2019-11-15 16:38:56 +00:00
package main
import (
2020-06-25 00:57:51 +00:00
"context"
2019-11-15 16:38:56 +00:00
"database/sql"
2019-11-19 12:57:16 +00:00
"sync"
2019-11-17 12:01:10 +00:00
"time"
2019-11-15 18:37:57 +00:00
2020-01-20 00:49:52 +00:00
"github.com/filecoin-project/go-address"
2020-07-01 04:26:46 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
2019-11-15 16:38:56 +00:00
"github.com/ipfs/go-cid"
2019-12-10 23:42:36 +00:00
_ "github.com/lib/pq"
2020-07-02 04:08:45 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2020-01-20 00:49:52 +00:00
"golang.org/x/xerrors"
2019-11-15 18:37:57 +00:00
2020-01-20 00:49:52 +00:00
"github.com/filecoin-project/lotus/api"
2020-07-02 04:08:45 +00:00
"github.com/filecoin-project/lotus/chain/events/state"
2019-11-15 18:37:57 +00:00
"github.com/filecoin-project/lotus/chain/types"
2019-11-15 16:38:56 +00:00
)
type storage struct {
db * sql . DB
2019-11-19 12:57:16 +00:00
headerLk sync . Mutex
2020-07-01 04:26:46 +00:00
2020-07-02 04:08:45 +00:00
genesisTs * types . TipSet
2019-11-15 16:38:56 +00:00
}
2019-12-05 11:58:19 +00:00
func openStorage ( dbSource string ) ( * storage , error ) {
2019-12-10 23:42:36 +00:00
db , err := sql . Open ( "postgres" , dbSource )
2019-11-15 16:38:56 +00:00
if err != nil {
return nil , err
}
2019-12-13 09:30:51 +00:00
db . SetMaxOpenConns ( 1350 )
2020-07-02 04:08:45 +00:00
st := & storage { db : db }
2019-11-15 16:38:56 +00:00
return st , st . setup ( )
}
// setup creates the database schema. It opens a transaction, sends every DDL
// statement below in a single Exec call, and commits if that call succeeds.
//
// Most statements use "if not exists" / "or replace" forms. The trailing part
// of the statement text (miner_tips, deals, deal_activations) sits inside a
// block comment in the SQL itself.
//
// NOTE(review): "create type miner_sector_event_type" has no existence guard,
// unlike the surrounding statements; presumably it fails against a database
// where the type already exists. Confirm and guard if so.
// NOTE(review): when Exec fails the function returns without calling
// tx.Rollback; confirm whether the transaction should be rolled back here.
// NOTE(review): the statement text contains timestamp-looking lines and
// spaced-out tokens that look like extraction residue; verify the literal
// against the canonical file before relying on it.
func ( st * storage ) setup ( ) error {
tx , err := st . db . Begin ( )
if err != nil {
return err
}
_ , err = tx . Exec ( `
2020-04-24 22:25:33 +00:00
create table if not exists block_cids
(
cid text not null
constraint block_cids_pk
primary key
) ;
create unique index if not exists block_cids_cid_uindex
on block_cids ( cid ) ;
2019-12-10 23:42:36 +00:00
create table if not exists blocks_synced
(
cid text not null
constraint blocks_synced_pk
2020-04-24 22:25:33 +00:00
primary key
constraint blocks_block_cids_cid_fk
references block_cids ( cid ) ,
2019-12-10 23:42:36 +00:00
add_ts int not null
) ;
create unique index if not exists blocks_synced_cid_uindex
on blocks_synced ( cid ) ;
create table if not exists block_parents
(
2020-04-24 22:25:33 +00:00
block text not null
constraint blocks_block_cids_cid_fk
references block_cids ( cid ) ,
2019-12-10 23:42:36 +00:00
parent text not null
) ;
create unique index if not exists block_parents_block_parent_uindex
on block_parents ( block , parent ) ;
2020-04-24 22:25:33 +00:00
create table if not exists drand_entries
(
round bigint not null
constraint drand_entries_pk
primary key ,
data bytea not null
) ;
create unique index if not exists drand_entries_round_uindex
on drand_entries ( round ) ;
create table if not exists block_drand_entries
(
round bigint not null
constraint block_drand_entries_drand_entries_round_fk
references drand_entries ( round ) ,
block text not null
constraint blocks_block_cids_cid_fk
references block_cids ( cid )
) ;
create unique index if not exists block_drand_entries_round_uindex
on block_drand_entries ( round , block ) ;
2019-12-10 23:42:36 +00:00
create table if not exists blocks
(
cid text not null
constraint blocks_pk
2020-04-24 22:25:33 +00:00
primary key
constraint blocks_block_cids_cid_fk
references block_cids ( cid ) ,
2019-12-10 23:42:36 +00:00
parentWeight numeric not null ,
parentStateRoot text not null ,
2019-12-12 18:34:28 +00:00
height bigint not null ,
2019-12-10 23:42:36 +00:00
miner text not null ,
2019-12-12 18:34:28 +00:00
timestamp bigint not null ,
2020-04-24 22:25:33 +00:00
ticket bytea not null ,
eprof bytea ,
forksig bigint not null
2019-12-10 23:42:36 +00:00
) ;
create unique index if not exists block_cid_uindex
on blocks ( cid ) ;
2020-01-22 15:10:22 +00:00
create materialized view if not exists state_heights
as select distinct height , parentstateroot from blocks ;
2020-06-24 12:34:53 +00:00
create index if not exists state_heights_index
2020-01-22 15:10:22 +00:00
on state_heights ( height ) ;
create index if not exists state_heights_height_index
on state_heights ( parentstateroot ) ;
2019-12-10 23:42:36 +00:00
create table if not exists id_address_map
(
id text not null ,
address text not null ,
constraint id_address_map_pk
primary key ( id , address )
) ;
create unique index if not exists id_address_map_id_uindex
on id_address_map ( id ) ;
create unique index if not exists id_address_map_address_uindex
on id_address_map ( address ) ;
2019-11-16 19:47:06 +00:00
create table if not exists actors
2019-11-15 18:37:57 +00:00
(
2019-12-10 23:42:36 +00:00
id text not null
constraint id_address_map_actors_id_fk
references id_address_map ( id ) ,
2019-11-15 18:37:57 +00:00
code text not null ,
head text not null ,
nonce int not null ,
2019-11-16 19:47:06 +00:00
balance text not null ,
2019-11-16 12:34:52 +00:00
stateroot text
2019-11-15 18:37:57 +00:00
) ;
2019-11-16 19:47:06 +00:00
create index if not exists actors_id_index
on actors ( id ) ;
2019-11-15 18:37:57 +00:00
2019-11-16 19:47:06 +00:00
create index if not exists id_address_map_address_index
on id_address_map ( address ) ;
create index if not exists id_address_map_id_index
on id_address_map ( id ) ;
2020-01-22 15:10:22 +00:00
create or replace function actor_tips ( epoch bigint )
returns table ( id text ,
code text ,
head text ,
nonce int ,
balance text ,
stateroot text ,
height bigint ,
parentstateroot text ) as
$ body $
select distinct on ( id ) * from actors
inner join state_heights sh on sh . parentstateroot = stateroot
where height < $ 1
order by id , height desc ;
$ body $ language sql ;
2020-01-19 16:18:47 +00:00
create table if not exists actor_states
(
head text not null ,
code text not null ,
state json not null
) ;
create unique index if not exists actor_states_head_code_uindex
on actor_states ( head , code ) ;
create index if not exists actor_states_head_index
on actor_states ( head ) ;
create index if not exists actor_states_code_head_index
on actor_states ( head , code ) ;
2019-11-16 19:47:06 +00:00
create table if not exists messages
2019-11-15 16:38:56 +00:00
(
cid text not null
constraint messages_pk
primary key ,
2019-12-12 18:34:28 +00:00
"from" text not null ,
"to" text not null ,
2020-01-19 16:18:47 +00:00
nonce bigint not null ,
2019-11-15 16:38:56 +00:00
value text not null ,
2020-01-19 16:18:47 +00:00
gasprice bigint not null ,
gaslimit bigint not null ,
method bigint ,
2019-12-10 23:42:36 +00:00
params bytea
2019-11-15 16:38:56 +00:00
) ;
2019-11-16 19:47:06 +00:00
create unique index if not exists messages_cid_uindex
2019-11-15 16:38:56 +00:00
on messages ( cid ) ;
2020-03-09 08:34:04 +00:00
2020-03-09 18:18:30 +00:00
create index if not exists messages_from_index
2020-03-09 08:34:04 +00:00
on messages ( "from" ) ;
2020-03-09 18:18:30 +00:00
create index if not exists messages_to_index
2020-03-09 08:34:04 +00:00
on messages ( "to" ) ;
2019-11-16 19:47:06 +00:00
create table if not exists block_messages
2019-11-15 16:38:56 +00:00
(
2020-04-24 22:25:33 +00:00
block text not null
constraint blocks_block_cids_cid_fk
references block_cids ( cid ) ,
2019-12-12 18:34:28 +00:00
message text not null ,
2019-11-15 16:38:56 +00:00
constraint block_messages_pk
2019-11-16 12:34:52 +00:00
primary key ( block , message )
) ;
2019-11-17 12:01:10 +00:00
create table if not exists mpool_messages
(
msg text not null
constraint mpool_messages_pk
primary key
constraint mpool_messages_messages_cid_fk
references messages ,
add_ts int not null
) ;
create unique index if not exists mpool_messages_msg_uindex
on mpool_messages ( msg ) ;
2019-12-03 11:05:12 +00:00
create table if not exists receipts
(
2019-12-12 18:34:28 +00:00
msg text not null ,
2019-12-10 23:42:36 +00:00
state text not null ,
2019-12-03 11:05:12 +00:00
idx int not null ,
exit int not null ,
gas_used int not null ,
2019-12-10 23:42:36 +00:00
return bytea ,
2019-12-03 11:05:12 +00:00
constraint receipts_pk
primary key ( msg , state )
) ;
create index if not exists receipts_msg_state_index
on receipts ( msg , state ) ;
2020-06-25 00:57:51 +00:00
create table if not exists miner_sectors
(
2020-06-30 20:22:58 +00:00
miner_id text not null ,
sector_id bigint not null ,
2020-07-01 04:26:46 +00:00
2020-06-30 20:22:58 +00:00
activation_epoch bigint not null ,
expiration_epoch bigint not null ,
2020-07-01 04:26:46 +00:00
termination_epoch bigint ,
2020-06-30 20:22:58 +00:00
deal_weight text not null ,
verified_deal_weight text not null ,
seal_cid text not null ,
seal_rand_epoch bigint not null ,
2020-06-25 00:57:51 +00:00
constraint miner_sectors_pk
2020-06-30 20:22:58 +00:00
primary key ( miner_id , sector_id )
2020-06-25 00:57:51 +00:00
) ;
2020-06-30 17:26:41 +00:00
create index if not exists miner_sectors_miner_sectorid_index
2020-06-30 20:22:58 +00:00
on miner_sectors ( miner_id , sector_id ) ;
2020-06-30 17:26:41 +00:00
2020-06-30 20:22:58 +00:00
create table if not exists miner_info
2019-11-16 12:34:52 +00:00
(
2020-06-30 20:22:58 +00:00
miner_id text not null ,
owner_addr text not null ,
worker_addr text not null ,
peer_id text ,
sector_size text not null ,
2020-06-30 17:26:41 +00:00
2020-06-30 20:22:58 +00:00
precommit_deposits text not null ,
locked_funds text not null ,
next_deadline_process_faults bigint not null ,
constraint miner_info_pk
primary key ( miner_id )
) ;
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads
(
miner_id text not null ,
miner_sectors_cid text not null ,
state_root text not null ,
constraint miner_sectors_heads_pk
primary key ( miner_id , miner_sectors_cid )
2019-11-15 16:38:56 +00:00
) ;
2019-11-16 12:34:52 +00:00
2020-07-02 04:08:45 +00:00
create type miner_sector_event_type as enum ( ' ADDED ' , ' EXTENDED ' , ' EXPIRED ' , ' TERMINATED ' ) ;
create table if not exists miner_sector_events
(
miner_id text not null ,
sector_id bigint not null ,
state_root text not null ,
event miner_sector_event_type not null ,
constraint miner_sector_events_pk
primary key ( sector_id , event , miner_id , state_root )
)
2020-06-30 17:26:41 +00:00
/ *
2020-01-22 15:10:22 +00:00
create or replace function miner_tips ( epoch bigint )
returns table ( head text ,
addr text ,
stateroot text ,
sectorset text ,
setsize decimal ,
provingset text ,
provingsize decimal ,
owner text ,
worker text ,
peerid text ,
sectorsize bigint ,
power decimal ,
active bool ,
ppe bigint ,
slashed_at bigint ,
height bigint ,
parentstateroot text ) as
$ body $
select distinct on ( addr ) * from miner_heads
inner join state_heights sh on sh . parentstateroot = stateroot
where height < $ 1
order by addr , height desc ;
$ body $ language sql ;
2020-01-20 00:49:52 +00:00
create table if not exists deals
(
id int not null ,
pieceRef text not null ,
pieceSize bigint not null ,
client text not null ,
provider text not null ,
2020-02-10 19:16:36 +00:00
start decimal not null ,
end decimal not null ,
2020-01-20 00:49:52 +00:00
epochPrice decimal not null ,
collateral decimal not null ,
constraint deals_pk
primary key ( id )
) ;
create index if not exists deals_client_index
on deals ( client ) ;
create unique index if not exists deals_id_uindex
on deals ( id ) ;
create index if not exists deals_pieceRef_index
on deals ( pieceRef ) ;
create index if not exists deals_provider_index
on deals ( provider ) ;
2020-01-22 02:22:12 +00:00
create table if not exists deal_activations
(
deal bigint not null
constraint deal_activations_deals_id_fk
references deals ,
activation_epoch bigint not null ,
constraint deal_activations_pk
primary key ( deal )
) ;
create index if not exists deal_activations_activation_epoch_index
on deal_activations ( activation_epoch ) ;
create unique index if not exists deal_activations_deal_uindex
on deal_activations ( deal ) ;
2020-03-09 05:43:32 +00:00
* /
2020-01-27 20:16:08 +00:00
2019-11-15 16:38:56 +00:00
` )
if err != nil {
return err
}
return tx . Commit ( )
}
2019-12-11 22:17:44 +00:00
func ( st * storage ) hasList ( ) map [ cid . Cid ] struct { } {
rws , err := st . db . Query ( ` select cid FROM blocks_synced ` )
2019-11-15 16:38:56 +00:00
if err != nil {
log . Error ( err )
2019-12-11 22:17:44 +00:00
return map [ cid . Cid ] struct { } { }
2019-11-15 16:38:56 +00:00
}
2019-12-11 22:17:44 +00:00
out := map [ cid . Cid ] struct { } { }
for rws . Next ( ) {
var c string
if err := rws . Scan ( & c ) ; err != nil {
log . Error ( err )
continue
}
ci , err := cid . Parse ( c )
if err != nil {
log . Error ( err )
continue
}
out [ ci ] = struct { } { }
}
return out
2019-11-15 16:38:56 +00:00
}
2020-01-19 16:18:47 +00:00
func ( st * storage ) storeActors ( actors map [ address . Address ] map [ types . Actor ] actorInfo ) error {
// Basic
2019-11-15 16:38:56 +00:00
tx , err := st . db . Begin ( )
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _ , err := tx . Exec ( `
2020-01-19 16:18:47 +00:00
create temp table a ( like actors excluding constraints ) on commit drop ;
` ) ; err != nil {
2019-12-12 18:34:28 +00:00
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy a (id, code, head, nonce, balance, stateroot) from stdin ` )
2019-11-15 18:37:57 +00:00
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
2019-11-15 18:37:57 +00:00
for addr , acts := range actors {
2019-11-16 12:34:52 +00:00
for act , st := range acts {
2020-01-19 16:18:47 +00:00
if _ , err := stmt . Exec ( addr . String ( ) , act . Code . String ( ) , act . Head . String ( ) , act . Nonce , act . Balance . String ( ) , st . stateroot . String ( ) ) ; err != nil {
2019-11-15 18:37:57 +00:00
return err
}
}
}
2019-12-12 18:34:28 +00:00
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into actors select * from a on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
2020-01-19 16:18:47 +00:00
if err := tx . Commit ( ) ; err != nil {
return err
}
// States
tx , err = st . db . Begin ( )
if err != nil {
return err
}
if _ , err := tx . Exec ( `
create temp table a ( like actor_states excluding constraints ) on commit drop ;
` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err = tx . Prepare ( ` copy a (head, code, state) from stdin ` )
if err != nil {
return err
}
for _ , acts := range actors {
for act , st := range acts {
if _ , err := stmt . Exec ( act . Head . String ( ) , act . Code . String ( ) , st . state ) ; err != nil {
return err
}
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into actor_states select * from a on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
if err := tx . Commit ( ) ; err != nil {
return err
}
return nil
2019-11-15 18:37:57 +00:00
}
2020-06-25 00:57:51 +00:00
// storeSectorsAPI is the single node API method storeSectors depends on.
// Keeping it this narrow lets callers pass any value that can list a miner's
// sectors at a given tipset.
type storeSectorsAPI interface {
	StateMinerSectors(ctx context.Context, maddr address.Address, filter *abi.BitField, filterOut bool, tsk types.TipSetKey) ([]*api.ChainSectorInfo, error)
}
2020-06-30 20:22:58 +00:00
func ( st * storage ) storeSectors ( minerTips map [ types . TipSetKey ] [ ] * minerStateInfo , sectorApi storeSectorsAPI ) error {
2020-06-25 00:57:51 +00:00
tx , err := st . db . Begin ( )
if err != nil {
return err
}
if _ , err := tx . Exec ( ` create temp table ms (like miner_sectors excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
2020-06-30 20:22:58 +00:00
stmt , err := tx . Prepare ( ` copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN ` )
2020-06-25 00:57:51 +00:00
if err != nil {
return err
}
for tipset , miners := range minerTips {
for _ , miner := range miners {
sectors , err := sectorApi . StateMinerSectors ( context . TODO ( ) , miner . addr , nil , true , tipset )
if err != nil {
log . Debugw ( "Failed to load sectors" , "tipset" , tipset . String ( ) , "miner" , miner . addr . String ( ) , "error" , err )
}
for _ , sector := range sectors {
if _ , err := stmt . Exec (
miner . addr . String ( ) ,
uint64 ( sector . ID ) ,
2020-07-01 10:05:50 +00:00
int64 ( sector . Info . Activation ) ,
int64 ( sector . Info . Expiration ) ,
2020-06-30 20:22:58 +00:00
sector . Info . DealWeight . String ( ) ,
sector . Info . VerifiedDealWeight . String ( ) ,
2020-07-01 10:05:50 +00:00
sector . Info . SealedCID . String ( ) ,
0 , // TODO: Not there now?
2020-06-25 00:57:51 +00:00
) ; err != nil {
return err
}
}
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into miner_sectors select * from ms on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
return tx . Commit ( )
}
2020-06-30 20:22:58 +00:00
func ( st * storage ) storeMiners ( minerTips map [ types . TipSetKey ] [ ] * minerStateInfo ) error {
2020-06-30 17:26:41 +00:00
tx , err := st . db . Begin ( )
if err != nil {
return err
}
2019-11-16 12:34:52 +00:00
2020-06-30 20:22:58 +00:00
if _ , err := tx . Exec ( ` create temp table mi (like miner_info excluding constraints) on commit drop; ` ) ; err != nil {
2020-06-30 17:26:41 +00:00
return xerrors . Errorf ( "prep temp: %w" , err )
}
2019-12-12 18:34:28 +00:00
2020-06-30 20:22:58 +00:00
stmt , err := tx . Prepare ( ` copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN ` )
2020-06-30 17:26:41 +00:00
if err != nil {
return err
}
for ts , miners := range minerTips {
for _ , miner := range miners {
var pid string
if len ( miner . info . PeerId ) != 0 {
peerid , err := peer . IDFromBytes ( miner . info . PeerId )
if err != nil {
// this should "never happen", but if it does we should still store info about the miner.
log . Warnw ( "failed to decode peerID" , "peerID (bytes)" , miner . info . PeerId , "miner" , miner . addr , "tipset" , ts . String ( ) )
} else {
pid = peerid . String ( )
}
}
2020-03-09 21:47:58 +00:00
if _ , err := stmt . Exec (
2020-06-30 17:26:41 +00:00
miner . addr . String ( ) ,
miner . info . Owner . String ( ) ,
miner . info . Worker . String ( ) ,
pid ,
miner . info . SectorSize . ShortString ( ) ,
miner . state . PreCommitDeposits . String ( ) ,
miner . state . LockedFunds . String ( ) ,
miner . state . NextDeadlineToProcessFaults ,
2020-03-09 21:47:58 +00:00
) ; err != nil {
2020-06-30 20:22:58 +00:00
log . Errorw ( "failed to store miner state" , "state" , miner . state , "info" , miner . info , "error" , err )
return err
}
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into miner_info select * from mi on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
return tx . Commit ( )
}
2020-07-01 04:26:46 +00:00
func ( st * storage ) storeMinerSectorsHeads ( minerTips map [ types . TipSetKey ] [ ] * minerStateInfo , api api . FullNode ) error {
2020-06-30 20:22:58 +00:00
tx , err := st . db . Begin ( )
if err != nil {
return err
}
if _ , err := tx . Exec ( ` create temp table msh (like miner_sectors_heads excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy msh (miner_id, miner_sectors_cid, state_root) from STDIN ` )
if err != nil {
return err
}
2020-07-02 04:08:45 +00:00
for _ , miners := range minerTips {
2020-06-30 20:22:58 +00:00
for _ , miner := range miners {
if _ , err := stmt . Exec (
miner . addr . String ( ) ,
miner . state . Sectors . String ( ) ,
miner . stateroot . String ( ) ,
) ; err != nil {
log . Errorw ( "failed to store miners sectors head" , "state" , miner . state , "info" , miner . info , "error" , err )
2020-03-09 21:47:58 +00:00
return err
}
2019-12-12 18:34:28 +00:00
2020-03-09 21:47:58 +00:00
}
2020-06-30 17:26:41 +00:00
}
if err := stmt . Close ( ) ; err != nil {
return err
}
2019-11-16 12:34:52 +00:00
2020-06-30 20:22:58 +00:00
if _ , err := tx . Exec ( ` insert into miner_sectors_heads select * from msh on conflict do nothing ` ) ; err != nil {
2020-06-30 17:26:41 +00:00
return xerrors . Errorf ( "actor put: %w" , err )
}
2020-07-02 04:08:45 +00:00
return tx . Commit ( )
2020-07-01 04:26:46 +00:00
}
2020-07-02 04:08:45 +00:00
type sectorUpdate struct {
terminationEpoch abi . ChainEpoch
terminated bool
expirationEpoch abi . ChainEpoch
sectorID abi . SectorNumber
minerID address . Address
2020-07-01 04:26:46 +00:00
}
2020-07-02 04:08:45 +00:00
func ( st * storage ) updateMinerSectors ( minerTips map [ types . TipSetKey ] [ ] * minerStateInfo , api api . FullNode ) error {
log . Debugw ( "updating miners constant sector table" , "#tipsets" , len ( minerTips ) )
pred := state . NewStatePredicates ( api )
2020-07-01 04:26:46 +00:00
2020-07-02 04:08:45 +00:00
eventTx , err := st . db . Begin ( )
if err != nil {
return err
}
2020-07-01 04:26:46 +00:00
2020-07-02 04:08:45 +00:00
if _ , err := eventTx . Exec ( ` create temp table mse (like miner_sector_events excluding constraints) on commit drop; ` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
eventStmt , err := eventTx . Prepare ( ` copy mse (sector_id, event, miner_id, state_root) from STDIN ` )
if err != nil {
return err
}
var sectorUpdates [ ] sectorUpdate
// TODO consider performing the miner sector diffing in parallel and performing the database update after.
for _ , miners := range minerTips {
for _ , miner := range miners {
// special case genesis miners
if miner . tsKey == st . genesisTs . Key ( ) {
sectors , err := api . StateMinerSectors ( context . TODO ( ) , miner . addr , nil , true , miner . tsKey )
if err != nil {
log . Debugw ( "failed to get miner info for genesis" , "miner" , miner . addr . String ( ) )
continue
}
for _ , sector := range sectors {
2020-07-08 12:35:40 +00:00
if _ , err := eventStmt . Exec ( sector . Info . SectorNumber , "ADDED" , miner . addr . String ( ) , miner . stateroot . String ( ) ) ; err != nil {
2020-07-02 04:08:45 +00:00
return err
}
}
} else {
sectorDiffFn := pred . OnMinerActorChange ( miner . addr , pred . OnMinerSectorChange ( ) )
changed , val , err := sectorDiffFn ( context . TODO ( ) , miner . parentTsKey , miner . tsKey )
if err != nil {
log . Debugw ( "error getting miner sector diff" , "miner" , miner . addr , "error" , err )
continue
}
if ! changed {
continue
}
changes := val . ( * state . MinerSectorChanges )
log . Debugw ( "sector changes for miner" , "miner" , miner . addr . String ( ) , "Added" , len ( changes . Added ) , "Extended" , len ( changes . Extended ) , "Removed" , len ( changes . Removed ) , "oldState" , miner . parentTsKey , "newState" , miner . tsKey )
for _ , extended := range changes . Extended {
2020-07-08 12:35:40 +00:00
if _ , err := eventStmt . Exec ( extended . To . SectorNumber , "EXTENDED" , miner . addr . String ( ) , miner . stateroot . String ( ) ) ; err != nil {
2020-07-02 04:08:45 +00:00
return err
}
sectorUpdates = append ( sectorUpdates , sectorUpdate {
terminationEpoch : 0 ,
terminated : false ,
2020-07-08 12:35:40 +00:00
expirationEpoch : extended . To . Expiration ,
sectorID : extended . To . SectorNumber ,
2020-07-02 04:08:45 +00:00
minerID : miner . addr ,
} )
2020-07-08 12:35:40 +00:00
log . Debugw ( "sector extended" , "miner" , miner . addr . String ( ) , "sector" , extended . To . SectorNumber , "old" , extended . To . Expiration , "new" , extended . From . Expiration )
2020-07-02 04:08:45 +00:00
}
curTs , err := api . ChainGetTipSet ( context . TODO ( ) , miner . tsKey )
if err != nil {
return err
}
for _ , removed := range changes . Removed {
// decide if they were terminated or extended
2020-07-08 12:35:40 +00:00
if removed . Expiration > curTs . Height ( ) {
if _ , err := eventStmt . Exec ( removed . SectorNumber , "TERMINATED" , miner . addr . String ( ) , miner . stateroot . String ( ) ) ; err != nil {
2020-07-02 04:08:45 +00:00
return err
}
2020-07-08 12:35:40 +00:00
log . Debugw ( "sector terminated" , "miner" , miner . addr . String ( ) , "sector" , removed . SectorNumber , "old" , "sectorExpiration" , removed . Expiration , "terminationEpoch" , curTs . Height ( ) )
2020-07-02 04:08:45 +00:00
sectorUpdates = append ( sectorUpdates , sectorUpdate {
terminationEpoch : curTs . Height ( ) ,
terminated : true ,
2020-07-08 12:35:40 +00:00
expirationEpoch : removed . Expiration ,
sectorID : removed . SectorNumber ,
2020-07-02 04:08:45 +00:00
minerID : miner . addr ,
} )
}
2020-07-08 12:35:40 +00:00
if _ , err := eventStmt . Exec ( removed . SectorNumber , "EXPIRED" , miner . addr . String ( ) , miner . stateroot . String ( ) ) ; err != nil {
2020-07-02 04:08:45 +00:00
return err
}
2020-07-08 12:35:40 +00:00
log . Debugw ( "sector removed" , "miner" , miner . addr . String ( ) , "sector" , removed . SectorNumber , "old" , "sectorExpiration" , removed . Expiration , "currEpoch" , curTs . Height ( ) )
2020-07-02 04:08:45 +00:00
}
for _ , added := range changes . Added {
2020-07-08 12:35:40 +00:00
if _ , err := eventStmt . Exec ( miner . addr . String ( ) , added . SectorNumber , miner . stateroot . String ( ) , "ADDED" ) ; err != nil {
2020-07-02 04:08:45 +00:00
return err
}
}
2020-07-01 04:26:46 +00:00
}
}
}
2020-07-02 04:08:45 +00:00
if err := eventStmt . Close ( ) ; err != nil {
return err
}
if _ , err := eventTx . Exec ( ` insert into miner_sector_events select * from mse on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
if err := eventTx . Commit ( ) ; err != nil {
return err
}
updateTx , err := st . db . Begin ( )
2020-07-01 04:26:46 +00:00
if err != nil {
return err
}
2020-07-02 04:08:45 +00:00
updateStmt , err := updateTx . Prepare ( ` UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4 ` )
2020-07-01 04:26:46 +00:00
if err != nil {
return err
}
2020-07-02 04:08:45 +00:00
for _ , update := range sectorUpdates {
if update . terminated {
if _ , err := updateStmt . Exec ( update . terminationEpoch , update . expirationEpoch , update . minerID . String ( ) , update . sectorID ) ; err != nil {
return err
}
} else {
if _ , err := updateStmt . Exec ( nil , update . expirationEpoch , update . minerID . String ( ) , update . sectorID ) ; err != nil {
return err
}
2020-07-01 04:26:46 +00:00
}
}
2020-07-02 04:08:45 +00:00
if err := updateStmt . Close ( ) ; err != nil {
2020-07-01 04:26:46 +00:00
return err
}
2020-07-02 04:08:45 +00:00
return updateTx . Commit ( )
2019-11-16 12:34:52 +00:00
}
2019-11-19 12:57:16 +00:00
func ( st * storage ) storeHeaders ( bhs map [ cid . Cid ] * types . BlockHeader , sync bool ) error {
st . headerLk . Lock ( )
defer st . headerLk . Unlock ( )
2019-11-15 18:37:57 +00:00
tx , err := st . db . Begin ( )
if err != nil {
2019-12-12 18:34:28 +00:00
return xerrors . Errorf ( "begin: %w" , err )
2019-11-15 18:37:57 +00:00
}
2019-12-12 18:34:28 +00:00
if _ , err := tx . Exec ( `
2020-04-24 22:25:33 +00:00
create temp table bc ( like block_cids excluding constraints ) on commit drop ;
create temp table de ( like drand_entries excluding constraints ) on commit drop ;
create temp table bde ( like block_drand_entries excluding constraints ) on commit drop ;
2019-12-12 18:34:28 +00:00
create temp table tbp ( like block_parents excluding constraints ) on commit drop ;
create temp table bs ( like blocks_synced excluding constraints ) on commit drop ;
create temp table b ( like blocks excluding constraints ) on commit drop ;
` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
2020-04-24 22:25:33 +00:00
{
stmt , err := tx . Prepare ( ` copy bc (cid) from STDIN ` )
if err != nil {
return err
}
2019-11-15 18:37:57 +00:00
2020-04-24 22:25:33 +00:00
for _ , bh := range bhs {
if _ , err := stmt . Exec ( bh . Cid ( ) . String ( ) ) ; err != nil {
2019-12-11 22:17:44 +00:00
log . Error ( err )
}
2019-11-18 20:11:43 +00:00
}
2020-04-24 22:25:33 +00:00
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into block_cids select * from bc on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "drand entries put: %w" , err )
}
2019-12-12 16:56:57 +00:00
}
2019-11-18 20:11:43 +00:00
2020-04-24 22:25:33 +00:00
{
stmt , err := tx . Prepare ( ` copy de (round, data) from STDIN ` )
if err != nil {
return err
}
for _ , bh := range bhs {
for _ , ent := range bh . BeaconEntries {
if _ , err := stmt . Exec ( ent . Round , ent . Data ) ; err != nil {
log . Error ( err )
}
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into drand_entries select * from de on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "drand entries put: %w" , err )
}
}
{
stmt , err := tx . Prepare ( ` copy bde (round, block) from STDIN ` )
if err != nil {
return err
}
for _ , bh := range bhs {
for _ , ent := range bh . BeaconEntries {
if _ , err := stmt . Exec ( ent . Round , bh . Cid ( ) . String ( ) ) ; err != nil {
log . Error ( err )
}
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into block_drand_entries select * from bde on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "block drand entries put: %w" , err )
}
2019-12-12 18:34:28 +00:00
}
2020-04-24 22:25:33 +00:00
{
stmt , err := tx . Prepare ( ` copy tbp (block, parent) from STDIN ` )
if err != nil {
return err
}
for _ , bh := range bhs {
for _ , parent := range bh . Parents {
if _ , err := stmt . Exec ( bh . Cid ( ) . String ( ) , parent . String ( ) ) ; err != nil {
log . Error ( err )
}
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into block_parents select * from tbp on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "parent put: %w" , err )
}
2019-12-12 18:34:28 +00:00
}
2019-11-19 12:57:16 +00:00
if sync {
2019-12-08 15:49:13 +00:00
now := time . Now ( ) . Unix ( )
2019-12-12 18:34:28 +00:00
stmt , err := tx . Prepare ( ` copy bs (cid, add_ts) from stdin ` )
2019-12-12 16:56:57 +00:00
if err != nil {
return err
}
for _ , bh := range bhs {
2019-12-12 18:34:28 +00:00
if _ , err := stmt . Exec ( bh . Cid ( ) . String ( ) , now ) ; err != nil {
2019-12-11 22:17:44 +00:00
log . Error ( err )
2019-11-19 12:57:16 +00:00
}
2019-12-12 16:56:57 +00:00
}
2019-12-12 18:34:28 +00:00
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into blocks_synced select * from bs on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "syncd put: %w" , err )
}
2019-11-19 12:57:16 +00:00
}
2020-04-24 22:25:33 +00:00
stmt2 , err := tx . Prepare ( ` copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", ticket, eprof, forksig) from stdin ` )
2019-12-12 16:56:57 +00:00
if err != nil {
return err
}
2019-11-15 16:38:56 +00:00
for _ , bh := range bhs {
2020-04-24 22:25:33 +00:00
var eprof interface { }
if bh . ElectionProof != nil {
eprof = bh . ElectionProof . VRFProof
}
2019-12-12 13:53:38 +00:00
2020-05-15 17:06:52 +00:00
if bh . Ticket == nil {
log . Warnf ( "got a block with nil ticket" )
bh . Ticket = & types . Ticket {
VRFProof : [ ] byte { } ,
}
}
2019-12-12 16:56:57 +00:00
if _ , err := stmt2 . Exec (
bh . Cid ( ) . String ( ) ,
2019-12-12 13:53:38 +00:00
bh . ParentWeight . String ( ) ,
bh . ParentStateRoot . String ( ) ,
bh . Height ,
bh . Miner . String ( ) ,
bh . Timestamp ,
2020-04-24 22:25:33 +00:00
bh . Ticket . VRFProof ,
eprof ,
bh . ForkSignaling ) ; err != nil {
2019-12-11 22:17:44 +00:00
log . Error ( err )
2019-12-10 23:42:36 +00:00
}
2019-11-15 16:38:56 +00:00
}
2019-12-10 23:42:36 +00:00
2019-12-12 18:34:28 +00:00
if err := stmt2 . Close ( ) ; err != nil {
return xerrors . Errorf ( "s2 close: %w" , err )
}
if _ , err := tx . Exec ( ` insert into blocks select * from b on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "blk put: %w" , err )
}
2020-04-24 22:25:33 +00:00
return tx . Commit ( )
2019-11-15 16:38:56 +00:00
}
func ( st * storage ) storeMessages ( msgs map [ cid . Cid ] * types . Message ) error {
tx , err := st . db . Begin ( )
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _ , err := tx . Exec ( `
create temp table msgs ( like messages excluding constraints ) on commit drop ;
` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy msgs (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) from stdin ` )
2019-11-15 16:38:56 +00:00
if err != nil {
return err
}
for c , m := range msgs {
if _ , err := stmt . Exec (
c . String ( ) ,
m . From . String ( ) ,
m . To . String ( ) ,
m . Nonce ,
m . Value . String ( ) ,
m . GasPrice . String ( ) ,
2020-03-18 20:45:37 +00:00
m . GasLimit ,
2019-11-15 16:38:56 +00:00
m . Method ,
m . Params ,
) ; err != nil {
return err
}
}
2019-12-12 18:34:28 +00:00
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into messages select * from msgs on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
2019-11-15 16:38:56 +00:00
return tx . Commit ( )
}
2019-12-03 11:05:12 +00:00
func ( st * storage ) storeReceipts ( recs map [ mrec ] * types . MessageReceipt ) error {
tx , err := st . db . Begin ( )
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _ , err := tx . Exec ( `
create temp table recs ( like receipts excluding constraints ) on commit drop ;
` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy recs (msg, state, idx, exit, gas_used, return) from stdin ` )
2019-12-03 11:05:12 +00:00
if err != nil {
return err
}
for c , m := range recs {
if _ , err := stmt . Exec (
c . msg . String ( ) ,
c . state . String ( ) ,
c . idx ,
m . ExitCode ,
2020-03-18 20:45:37 +00:00
m . GasUsed ,
2019-12-03 11:05:12 +00:00
m . Return ,
) ; err != nil {
return err
}
}
2019-12-12 18:34:28 +00:00
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into receipts select * from recs on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
2019-12-03 11:05:12 +00:00
return tx . Commit ( )
}
2019-11-15 18:37:57 +00:00
func ( st * storage ) storeAddressMap ( addrs map [ address . Address ] address . Address ) error {
tx , err := st . db . Begin ( )
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _ , err := tx . Exec ( `
create temp table iam ( like id_address_map excluding constraints ) on commit drop ;
` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy iam (id, address) from STDIN ` )
2019-11-15 18:37:57 +00:00
if err != nil {
return err
}
for a , i := range addrs {
if i == address . Undef {
continue
}
if _ , err := stmt . Exec (
i . String ( ) ,
a . String ( ) ,
) ; err != nil {
return err
}
}
2019-12-12 18:34:28 +00:00
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into id_address_map select * from iam on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
2019-11-15 18:37:57 +00:00
return tx . Commit ( )
}
2019-11-15 16:38:56 +00:00
func ( st * storage ) storeMsgInclusions ( incls map [ cid . Cid ] [ ] cid . Cid ) error {
tx , err := st . db . Begin ( )
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _ , err := tx . Exec ( `
create temp table mi ( like block_messages excluding constraints ) on commit drop ;
` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy mi (block, message) from STDIN ` )
2019-11-15 16:38:56 +00:00
if err != nil {
return err
}
for b , msgs := range incls {
for _ , msg := range msgs {
if _ , err := stmt . Exec (
b . String ( ) ,
msg . String ( ) ,
) ; err != nil {
return err
}
}
}
2019-12-12 18:34:28 +00:00
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into block_messages select * from mi on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
2019-11-15 16:38:56 +00:00
return tx . Commit ( )
}
2019-12-13 11:04:24 +00:00
func ( st * storage ) storeMpoolInclusions ( msgs [ ] api . MpoolUpdate ) error {
2019-11-17 12:01:10 +00:00
tx , err := st . db . Begin ( )
if err != nil {
return err
}
2019-12-13 11:04:24 +00:00
if _ , err := tx . Exec ( `
2020-01-20 00:49:52 +00:00
create temp table mi ( like mpool_messages excluding constraints ) on commit drop ;
` ) ; err != nil {
2019-12-13 11:04:24 +00:00
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err := tx . Prepare ( ` copy mi (msg, add_ts) from stdin ` )
2019-11-17 12:01:10 +00:00
if err != nil {
return err
}
2019-12-13 11:04:24 +00:00
for _ , msg := range msgs {
if msg . Type != api . MpoolAdd {
continue
}
if _ , err := stmt . Exec (
msg . Message . Message . Cid ( ) . String ( ) ,
time . Now ( ) . Unix ( ) ,
) ; err != nil {
return err
}
}
if err := stmt . Close ( ) ; err != nil {
2019-11-17 12:01:10 +00:00
return err
}
2019-12-13 11:04:24 +00:00
if _ , err := tx . Exec ( ` insert into mpool_messages select * from mi on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
2019-11-17 12:01:10 +00:00
return tx . Commit ( )
}
2020-02-10 19:16:36 +00:00
func ( st * storage ) storeDeals ( deals map [ string ] api . MarketDeal ) error {
2020-03-09 05:43:32 +00:00
/ * tx , err := st . db . Begin ( )
2020-01-20 00:49:52 +00:00
if err != nil {
return err
}
if _ , err := tx . Exec ( `
create temp table d ( like deals excluding constraints ) on commit drop ;
` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
2020-02-10 19:16:36 +00:00
stmt , err := tx . Prepare ( ` copy d (id, pieceref, piecesize, client, "provider", "start", "end", epochprice, collateral) from stdin ` )
2020-01-20 00:49:52 +00:00
if err != nil {
return err
}
var bloat uint64
for id , deal := range deals {
2020-02-10 19:16:36 +00:00
if len ( deal . Proposal . PieceCID . String ( ) ) > 100 {
bloat += uint64 ( len ( deal . Proposal . PieceCID . String ( ) ) )
2020-01-20 00:49:52 +00:00
continue
}
if _ , err := stmt . Exec (
id ,
2020-02-10 19:16:36 +00:00
deal . Proposal . PieceCID . String ( ) ,
deal . Proposal . PieceSize ,
deal . Proposal . Client . String ( ) ,
deal . Proposal . Provider . String ( ) ,
fmt . Sprint ( deal . Proposal . StartEpoch ) ,
fmt . Sprint ( deal . Proposal . EndEpoch ) ,
deal . Proposal . StoragePricePerEpoch . String ( ) ,
deal . Proposal . ProviderCollateral . String ( ) ,
2020-01-20 00:49:52 +00:00
) ; err != nil {
return err
}
}
if bloat > 0 {
log . Warnf ( "deal PieceRefs had %d bytes of garbage" , bloat )
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into deals select * from d on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
2020-01-22 02:22:12 +00:00
if err := tx . Commit ( ) ; err != nil {
return err
}
// Activations
tx , err = st . db . Begin ( )
if err != nil {
return err
}
if _ , err := tx . Exec ( `
create temp table d ( like deal_activations excluding constraints ) on commit drop ;
` ) ; err != nil {
return xerrors . Errorf ( "prep temp: %w" , err )
}
stmt , err = tx . Prepare ( ` copy d (deal, activation_epoch) from stdin ` )
if err != nil {
return err
}
for id , deal := range deals {
2020-02-10 19:16:36 +00:00
if deal . State . SectorStartEpoch <= 0 {
2020-01-22 02:22:12 +00:00
continue
}
if _ , err := stmt . Exec (
id ,
2020-02-10 19:16:36 +00:00
deal . State . SectorStartEpoch ,
2020-01-22 02:22:12 +00:00
) ; err != nil {
return err
}
}
if err := stmt . Close ( ) ; err != nil {
return err
}
if _ , err := tx . Exec ( ` insert into deal_activations select * from d on conflict do nothing ` ) ; err != nil {
return xerrors . Errorf ( "actor put: %w" , err )
}
if err := tx . Commit ( ) ; err != nil {
return err
}
2020-03-09 21:47:58 +00:00
* /
2020-01-22 02:22:12 +00:00
return nil
2020-01-20 00:49:52 +00:00
}
2020-01-22 15:10:22 +00:00
// refreshViews issues a "refresh materialized view" statement for the
// state_heights view and returns the error from the database, or nil when the
// statement succeeds.
func (st *storage) refreshViews() error {
	_, err := st.db.Exec(` refresh materialized view state_heights `)
	return err
}
2019-11-15 16:38:56 +00:00
// close closes the underlying database handle and returns the error reported
// by that call, if any.
func (st *storage) close() error {
	db := st.db
	return db.Close()
}