package syncer import ( "container/list" "context" "database/sql" "sync" "time" "golang.org/x/xerrors" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) var log = logging.Logger("syncer") type Syncer struct { db *sql.DB headerLk sync.Mutex node api.FullNode } func NewSyncer(db *sql.DB, node api.FullNode) *Syncer { return &Syncer{ db: db, node: node, } } func (s *Syncer) setupSchemas() error { tx, err := s.db.Begin() if err != nil { return err } if _, err := tx.Exec(` create table if not exists block_cids ( cid text not null constraint block_cids_pk primary key ); create unique index if not exists block_cids_cid_uindex on block_cids (cid); create table if not exists blocks_synced ( cid text not null constraint blocks_synced_pk primary key constraint blocks_block_cids_cid_fk references block_cids (cid), synced_at int not null, processed_at int ); create unique index if not exists blocks_synced_cid_uindex on blocks_synced (cid,processed_at); create table if not exists block_parents ( block text not null constraint blocks_block_cids_cid_fk references block_cids (cid), parent text not null ); create unique index if not exists block_parents_block_parent_uindex on block_parents (block, parent); create table if not exists drand_entries ( round bigint not null constraint drand_entries_pk primary key, data bytea not null ); create unique index if not exists drand_entries_round_uindex on drand_entries (round); create table if not exists block_drand_entries ( round bigint not null constraint block_drand_entries_drand_entries_round_fk references drand_entries (round), block text not null constraint blocks_block_cids_cid_fk references block_cids (cid) ); create unique index if not exists block_drand_entries_round_uindex on block_drand_entries (round, block); create table if not exists blocks ( cid text not null constraint blocks_pk primary key constraint blocks_block_cids_cid_fk references block_cids (cid), parentWeight numeric not null, parentStateRoot text not null, height bigint not null, miner text not null, timestamp bigint not null, ticket bytea not null, election_proof bytea, win_count bigint, parent_base_fee text not null, forksig bigint not null ); create unique index if not exists block_cid_uindex on blocks (cid,height); create materialized view if not exists state_heights as select distinct height, parentstateroot from blocks; create index if not exists state_heights_height_index on state_heights (height); create index if not exists state_heights_parentstateroot_index on state_heights (parentstateroot); `); err != nil { return err } return tx.Commit() } func (s *Syncer) Start(ctx context.Context) { log.Debug("Starting Syncer") if err := s.setupSchemas(); err != nil { log.Fatal(err) } // doing the initial sync here lets us avoid the HCCurrent case in the switch head, err := s.node.ChainHead(ctx) if err != nil { log.Fatalw("Failed to get chain head form lotus", "error", err) } unsynced, err := s.unsyncedBlocks(ctx, head, time.Unix(0, 0)) if err != nil { log.Fatalw("failed to gather unsynced blocks", "error", err) } if err := s.storeHeaders(unsynced, true, time.Now()); err != nil { log.Fatalw("failed to store unsynced blocks", "error", err) } // continue to keep the block headers table up to date. notifs, err := s.node.ChainNotify(ctx) if err != nil { log.Fatal(err) } lastSynced := time.Now() go func() { for notif := range notifs { for _, change := range notif { switch change.Type { case store.HCApply: unsynced, err := s.unsyncedBlocks(ctx, change.Val, lastSynced) if err != nil { log.Errorw("failed to gather unsynced blocks", "error", err) } if len(unsynced) == 0 { continue } if err := s.storeHeaders(unsynced, true, lastSynced); err != nil { // so this is pretty bad, need some kind of retry.. // for now just log an error and the blocks will be attempted again on next notifi log.Errorw("failed to store unsynced blocks", "error", err) } lastSynced = time.Now() case store.HCRevert: log.Debug("revert todo") } } } }() } func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since time.Time) (map[cid.Cid]*types.BlockHeader, error) { // get a list of blocks we have already synced in the past 3 mins. This ensures we aren't returning the entire // table every time. lookback := since.Add(-(time.Minute * 3)) log.Debugw("Gathering unsynced blocks", "since", lookback.String()) hasList, err := s.syncedBlocks(lookback) if err != nil { return nil, err } // build a list of blocks that we have not synced. toVisit := list.New() for _, header := range head.Blocks() { toVisit.PushBack(header) } toSync := map[cid.Cid]*types.BlockHeader{} for toVisit.Len() > 0 { bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) _, has := hasList[bh.Cid()] if _, seen := toSync[bh.Cid()]; seen || has { continue } toSync[bh.Cid()] = bh if len(toSync)%500 == 10 { log.Debugw("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height) } if len(bh.Parents) == 0 { continue } pts, err := s.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) if err != nil { log.Error(err) continue } for _, header := range pts.Blocks() { toVisit.PushBack(header) } } log.Debugw("Gathered unsynced blocks", "count", len(toSync)) return toSync, nil } func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error) { // timestamp is used to return a configurable amount of rows based on when they were last added. rws, err := s.db.Query(`select cid FROM blocks_synced where synced_at > $1`, timestamp.Unix()) if err != nil { return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) } out := map[cid.Cid]struct{}{} for rws.Next() { var c string if err := rws.Scan(&c); err != nil { return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err) } ci, err := cid.Parse(c) if err != nil { return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err) } out[ci] = struct{}{} } return out, nil } func (s *Syncer) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool, timestamp time.Time) error { s.headerLk.Lock() defer s.headerLk.Unlock() if len(bhs) == 0 { return nil } log.Debugw("Storing Headers", "count", len(bhs)) tx, err := s.db.Begin() if err != nil { return xerrors.Errorf("begin: %w", err) } if _, err := tx.Exec(` create temp table bc (like block_cids excluding constraints) on commit drop; create temp table de (like drand_entries excluding constraints) on commit drop; create temp table bde (like block_drand_entries excluding constraints) on commit drop; create temp table tbp (like block_parents excluding constraints) on commit drop; create temp table bs (like blocks_synced excluding constraints) on commit drop; create temp table b (like blocks excluding constraints) on commit drop; `); err != nil { return xerrors.Errorf("prep temp: %w", err) } { stmt, err := tx.Prepare(`copy bc (cid) from STDIN`) if err != nil { return err } for _, bh := range bhs { if _, err := stmt.Exec(bh.Cid().String()); err != nil { log.Error(err) } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into block_cids select * from bc on conflict do nothing `); err != nil { return xerrors.Errorf("drand entries put: %w", err) } } { stmt, err := tx.Prepare(`copy de (round, data) from STDIN`) if err != nil { return err } for _, bh := range bhs { for _, ent := range bh.BeaconEntries { if _, err := stmt.Exec(ent.Round, ent.Data); err != nil { log.Error(err) } } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into drand_entries select * from de on conflict do nothing `); err != nil { return xerrors.Errorf("drand entries put: %w", err) } } { stmt, err := tx.Prepare(`copy bde (round, block) from STDIN`) if err != nil { return err } for _, bh := range bhs { for _, ent := range bh.BeaconEntries { if _, err := stmt.Exec(ent.Round, bh.Cid().String()); err != nil { log.Error(err) } } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into block_drand_entries select * from bde on conflict do nothing `); err != nil { return xerrors.Errorf("block drand entries put: %w", err) } } { stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`) if err != nil { return err } for _, bh := range bhs { for _, parent := range bh.Parents { if _, err := stmt.Exec(bh.Cid().String(), parent.String()); err != nil { log.Error(err) } } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing `); err != nil { return xerrors.Errorf("parent put: %w", err) } } if sync { stmt, err := tx.Prepare(`copy bs (cid, synced_at) from stdin `) if err != nil { return err } for _, bh := range bhs { if _, err := stmt.Exec(bh.Cid().String(), timestamp.Unix()); err != nil { log.Error(err) } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing `); err != nil { return xerrors.Errorf("syncd put: %w", err) } } stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", ticket, election_proof, win_count, parent_base_fee, forksig) from stdin`) if err != nil { return err } for _, bh := range bhs { var eproof, winCount interface{} if bh.ElectionProof != nil { eproof = bh.ElectionProof.VRFProof winCount = bh.ElectionProof.WinCount } if bh.Ticket == nil { log.Warnf("got a block with nil ticket") bh.Ticket = &types.Ticket{ VRFProof: []byte{}, } } if _, err := stmt2.Exec( bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp, bh.Ticket.VRFProof, eproof, winCount, bh.ParentBaseFee.String(), bh.ForkSignaling); err != nil { log.Error(err) } } if err := stmt2.Close(); err != nil { return xerrors.Errorf("s2 close: %w", err) } if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing `); err != nil { return xerrors.Errorf("blk put: %w", err) } return tx.Commit() }