package syncer import ( "container/list" "context" "database/sql" "fmt" "sync" "time" "golang.org/x/xerrors" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) var log = logging.Logger("syncer") type Syncer struct { db *sql.DB lookbackLimit uint64 headerLk sync.Mutex node api.FullNode } func NewSyncer(db *sql.DB, node api.FullNode, lookbackLimit uint64) *Syncer { return &Syncer{ db: db, node: node, lookbackLimit: lookbackLimit, } } func (s *Syncer) setupSchemas() error { tx, err := s.db.Begin() if err != nil { return err } if _, err := tx.Exec(` /* tracks circulating fil available on the network at each tipset */ create table if not exists chain_economics ( parent_state_root text not null constraint chain_economics_pk primary key, circulating_fil text not null, vested_fil text not null, mined_fil text not null, burnt_fil text not null, locked_fil text not null ); create table if not exists block_cids ( cid text not null constraint block_cids_pk primary key ); create unique index if not exists block_cids_cid_uindex on block_cids (cid); create table if not exists blocks_synced ( cid text not null constraint blocks_synced_pk primary key constraint blocks_block_cids_cid_fk references block_cids (cid), synced_at int not null, processed_at int ); create unique index if not exists blocks_synced_cid_uindex on blocks_synced (cid,processed_at); create table if not exists block_parents ( block text not null constraint blocks_block_cids_cid_fk references block_cids (cid), parent text not null ); create unique index if not exists block_parents_block_parent_uindex on block_parents (block, parent); create table if not exists drand_entries ( round bigint not null constraint drand_entries_pk primary key, data bytea not null ); create unique index if not exists drand_entries_round_uindex on drand_entries (round); create table if not exists block_drand_entries ( round bigint not null constraint block_drand_entries_drand_entries_round_fk references drand_entries (round), block text not null constraint blocks_block_cids_cid_fk references block_cids (cid) ); create unique index if not exists block_drand_entries_round_uindex on block_drand_entries (round, block); create table if not exists blocks ( cid text not null constraint blocks_pk primary key constraint blocks_block_cids_cid_fk references block_cids (cid), parentWeight numeric not null, parentStateRoot text not null, height bigint not null, miner text not null, timestamp bigint not null, ticket bytea not null, election_proof bytea, win_count bigint, parent_base_fee text not null, forksig bigint not null ); create unique index if not exists block_cid_uindex on blocks (cid,height); create materialized view if not exists state_heights as select distinct height, parentstateroot from blocks; create index if not exists state_heights_height_index on state_heights (height); create index if not exists state_heights_parentstateroot_index on state_heights (parentstateroot); `); err != nil { return err } return tx.Commit() } func (s *Syncer) Start(ctx context.Context) { if err := logging.SetLogLevel("syncer", "info"); err != nil { log.Fatal(err) } log.Debug("Starting Syncer") if err := s.setupSchemas(); err != nil { log.Fatal(err) } // continue to keep the block headers table up to date. notifs, err := s.node.ChainNotify(ctx) if err != nil { log.Fatal(err) } // we need to ensure that on a restart we don't reprocess the whole flarping chain blkCID, height, err := s.mostRecentlySyncedBlockHeight() if err != nil { log.Fatalw("failed to find most recently synced block", "error", err) } log.Infow("Found starting point for syncing", "blockCID", blkCID.String(), "height", height) sinceEpoch := uint64(height) go func() { for notif := range notifs { for _, change := range notif { switch change.Type { case store.HCApply: unsynced, err := s.unsyncedBlocks(ctx, change.Val, sinceEpoch) if err != nil { log.Errorw("failed to gather unsynced blocks", "error", err) } if err := s.storeCirculatingSupply(ctx, change.Val); err != nil { log.Errorw("failed to store circulating supply", "error", err) } if len(unsynced) == 0 { continue } if err := s.storeHeaders(unsynced, true, time.Now()); err != nil { // so this is pretty bad, need some kind of retry.. // for now just log an error and the blocks will be attempted again on next notifi log.Errorw("failed to store unsynced blocks", "error", err) } sinceEpoch = uint64(change.Val.Height()) case store.HCRevert: log.Debug("revert todo") } } } }() } func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since uint64) (map[cid.Cid]*types.BlockHeader, error) { hasList, err := s.syncedBlocks(since, s.lookbackLimit) if err != nil { return nil, err } // build a list of blocks that we have not synced. toVisit := list.New() for _, header := range head.Blocks() { toVisit.PushBack(header) } toSync := map[cid.Cid]*types.BlockHeader{} for toVisit.Len() > 0 { bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) _, has := hasList[bh.Cid()] if _, seen := toSync[bh.Cid()]; seen || has { continue } toSync[bh.Cid()] = bh if len(toSync)%500 == 10 { log.Debugw("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height) } if bh.Height == 0 { continue } pts, err := s.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) if err != nil { log.Error(err) continue } for _, header := range pts.Blocks() { toVisit.PushBack(header) } } log.Debugw("Gathered unsynced blocks", "count", len(toSync)) return toSync, nil } func (s *Syncer) syncedBlocks(since, limit uint64) (map[cid.Cid]struct{}, error) { rws, err := s.db.Query(`select bs.cid FROM blocks_synced bs left join blocks b on b.cid = bs.cid where b.height <= $1 and bs.processed_at is not null limit $2`, since, limit) if err != nil { return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) } out := map[cid.Cid]struct{}{} for rws.Next() { var c string if err := rws.Scan(&c); err != nil { return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err) } ci, err := cid.Parse(c) if err != nil { return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err) } out[ci] = struct{}{} } return out, nil } func (s *Syncer) mostRecentlySyncedBlockHeight() (cid.Cid, int64, error) { rw := s.db.QueryRow(` select blocks_synced.cid, b.height from blocks_synced left join blocks b on blocks_synced.cid = b.cid where processed_at is not null order by height desc limit 1 `) var c string var h int64 if err := rw.Scan(&c, &h); err != nil { if err == sql.ErrNoRows { return cid.Undef, 0, nil } return cid.Undef, -1, err } ci, err := cid.Parse(c) if err != nil { return cid.Undef, -1, err } return ci, h, nil } func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error { supply, err := s.node.StateCirculatingSupply(ctx, tipset.Key()) if err != nil { return err } ceInsert := `insert into chain_economics (parent_state_root, circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) ` + `values ('%s', '%s', '%s', '%s', '%s', '%s') on conflict on constraint chain_economics_pk do ` + `update set (circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) = ('%[2]s', '%[3]s', '%[4]s', '%[5]s', '%[6]s') ` + `where chain_economics.parent_state_root = '%[1]s';` if _, err := s.db.Exec(fmt.Sprintf(ceInsert, tipset.ParentState().String(), supply.FilCirculating.String(), supply.FilVested.String(), supply.FilMined.String(), supply.FilBurnt.String(), supply.FilLocked.String(), )); err != nil { return xerrors.Errorf("insert circulating supply for tipset (%s): %w", tipset.Key().String(), err) } return nil } func (s *Syncer) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool, timestamp time.Time) error { s.headerLk.Lock() defer s.headerLk.Unlock() if len(bhs) == 0 { return nil } log.Debugw("Storing Headers", "count", len(bhs)) tx, err := s.db.Begin() if err != nil { return xerrors.Errorf("begin: %w", err) } if _, err := tx.Exec(` create temp table bc (like block_cids excluding constraints) on commit drop; create temp table de (like drand_entries excluding constraints) on commit drop; create temp table bde (like block_drand_entries excluding constraints) on commit drop; create temp table tbp (like block_parents excluding constraints) on commit drop; create temp table bs (like blocks_synced excluding constraints) on commit drop; create temp table b (like blocks excluding constraints) on commit drop; `); err != nil { return xerrors.Errorf("prep temp: %w", err) } { stmt, err := tx.Prepare(`copy bc (cid) from STDIN`) if err != nil { return err } for _, bh := range bhs { if _, err := stmt.Exec(bh.Cid().String()); err != nil { log.Error(err) } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into block_cids select * from bc on conflict do nothing `); err != nil { return xerrors.Errorf("drand entries put: %w", err) } } { stmt, err := tx.Prepare(`copy de (round, data) from STDIN`) if err != nil { return err } for _, bh := range bhs { for _, ent := range bh.BeaconEntries { if _, err := stmt.Exec(ent.Round, ent.Data); err != nil { log.Error(err) } } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into drand_entries select * from de on conflict do nothing `); err != nil { return xerrors.Errorf("drand entries put: %w", err) } } { stmt, err := tx.Prepare(`copy bde (round, block) from STDIN`) if err != nil { return err } for _, bh := range bhs { for _, ent := range bh.BeaconEntries { if _, err := stmt.Exec(ent.Round, bh.Cid().String()); err != nil { log.Error(err) } } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into block_drand_entries select * from bde on conflict do nothing `); err != nil { return xerrors.Errorf("block drand entries put: %w", err) } } { stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`) if err != nil { return err } for _, bh := range bhs { for _, parent := range bh.Parents { if _, err := stmt.Exec(bh.Cid().String(), parent.String()); err != nil { log.Error(err) } } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing `); err != nil { return xerrors.Errorf("parent put: %w", err) } } if sync { stmt, err := tx.Prepare(`copy bs (cid, synced_at) from stdin `) if err != nil { return err } for _, bh := range bhs { if _, err := stmt.Exec(bh.Cid().String(), timestamp.Unix()); err != nil { log.Error(err) } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing `); err != nil { return xerrors.Errorf("syncd put: %w", err) } } stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", ticket, election_proof, win_count, parent_base_fee, forksig) from stdin`) if err != nil { return err } for _, bh := range bhs { var eproof, winCount interface{} if bh.ElectionProof != nil { eproof = bh.ElectionProof.VRFProof winCount = bh.ElectionProof.WinCount } if bh.Ticket == nil { log.Warnf("got a block with nil ticket") bh.Ticket = &types.Ticket{ VRFProof: []byte{}, } } if _, err := stmt2.Exec( bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp, bh.Ticket.VRFProof, eproof, winCount, bh.ParentBaseFee.String(), bh.ForkSignaling); err != nil { log.Error(err) } } if err := stmt2.Close(); err != nil { return xerrors.Errorf("s2 close: %w", err) } if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing `); err != nil { return xerrors.Errorf("blk put: %w", err) } return tx.Commit() }