fix(chainwatch): sync based on height

This commit is contained in:
frrist 2020-08-27 11:45:42 -07:00
parent fccdd70524
commit f934ebd429
2 changed files with 52 additions and 33 deletions

View File

@ -70,7 +70,7 @@ var runCmd = &cli.Command{
} }
db.SetMaxOpenConns(1350) db.SetMaxOpenConns(1350)
sync := syncer.NewSyncer(db, api) sync := syncer.NewSyncer(db, api, 1400)
sync.Start(ctx) sync.Start(ctx)
proc := processor.NewProcessor(ctx, db, api, maxBatch) proc := processor.NewProcessor(ctx, db, api, maxBatch)

View File

@ -23,14 +23,17 @@ var log = logging.Logger("syncer")
type Syncer struct { type Syncer struct {
db *sql.DB db *sql.DB
lookbackLimit uint64
headerLk sync.Mutex headerLk sync.Mutex
node api.FullNode node api.FullNode
} }
func NewSyncer(db *sql.DB, node api.FullNode) *Syncer { func NewSyncer(db *sql.DB, node api.FullNode, lookbackLimit uint64) *Syncer {
return &Syncer{ return &Syncer{
db: db, db: db,
node: node, node: node,
lookbackLimit: lookbackLimit,
} }
} }
@ -148,59 +151,53 @@ create index if not exists state_heights_parentstateroot_index
} }
func (s *Syncer) Start(ctx context.Context) { func (s *Syncer) Start(ctx context.Context) {
if err := logging.SetLogLevel("syncer", "info"); err != nil {
log.Fatal(err)
}
log.Debug("Starting Syncer") log.Debug("Starting Syncer")
if err := s.setupSchemas(); err != nil { if err := s.setupSchemas(); err != nil {
log.Fatal(err) log.Fatal(err)
} }
// doing the initial sync here lets us avoid the HCCurrent case in the switch
head, err := s.node.ChainHead(ctx)
if err != nil {
log.Fatalw("Failed to get chain head form lotus", "error", err)
}
unsynced, err := s.unsyncedBlocks(ctx, head, time.Unix(0, 0))
if err != nil {
log.Fatalw("failed to gather unsynced blocks", "error", err)
}
if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
log.Fatalw("failed to store unsynced blocks", "error", err)
}
// continue to keep the block headers table up to date. // continue to keep the block headers table up to date.
notifs, err := s.node.ChainNotify(ctx) notifs, err := s.node.ChainNotify(ctx)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
lastSynced := time.Now() // we need to ensure that on a restart we don't reprocess the whole flarping chain
blkCID, height, err := s.mostRecentlySyncedBlockHeight()
if err != nil {
log.Fatalw("failed to find most recently synced block", "error", err)
}
log.Infow("Found starting point for syncing", "blockCID", blkCID.String(), "height", height)
sinceEpoch := uint64(height)
go func() { go func() {
for notif := range notifs { for notif := range notifs {
for _, change := range notif { for _, change := range notif {
switch change.Type { switch change.Type {
case store.HCApply: case store.HCApply:
unsynced, err := s.unsyncedBlocks(ctx, change.Val, lastSynced) unsynced, err := s.unsyncedBlocks(ctx, change.Val, sinceEpoch)
if err != nil { if err != nil {
log.Errorw("failed to gather unsynced blocks", "error", err) log.Errorw("failed to gather unsynced blocks", "error", err)
} }
if err := s.storeCirculatingSupply(ctx, change.Val); err != nil { if err := s.storeCirculatingSupply(ctx, change.Val); err != nil {
log.Errorw("failed to store circulating supply", "error", err) // TODO do something with me
} }
if len(unsynced) == 0 { if len(unsynced) == 0 {
continue continue
} }
if err := s.storeHeaders(unsynced, true, lastSynced); err != nil { if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
// so this is pretty bad, need some kind of retry.. // so this is pretty bad, need some kind of retry..
// for now just log an error and the blocks will be attempted again on next notifi // for now just log an error and the blocks will be attempted again on next notifi
log.Errorw("failed to store unsynced blocks", "error", err) log.Errorw("failed to store unsynced blocks", "error", err)
} }
lastSynced = time.Now() sinceEpoch = uint64(change.Val.Height())
case store.HCRevert: case store.HCRevert:
log.Debug("revert todo") log.Debug("revert todo")
} }
@ -209,12 +206,8 @@ func (s *Syncer) Start(ctx context.Context) {
}() }()
} }
func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since time.Time) (map[cid.Cid]*types.BlockHeader, error) { func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since uint64) (map[cid.Cid]*types.BlockHeader, error) {
// get a list of blocks we have already synced in the past 3 mins. This ensures we aren't returning the entire hasList, err := s.syncedBlocks(since, s.lookbackLimit)
// table every time.
lookback := since.Add(-(time.Minute * 3))
log.Debugw("Gathering unsynced blocks", "since", lookback.String())
hasList, err := s.syncedBlocks(lookback)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -257,9 +250,8 @@ func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since t
return toSync, nil return toSync, nil
} }
func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error) { func (s *Syncer) syncedBlocks(since, limit uint64) (map[cid.Cid]struct{}, error) {
// timestamp is used to return a configurable amount of rows based on when they were last added. rws, err := s.db.Query(`select bs.cid FROM blocks_synced bs left join blocks b on b.cid = bs.cid where b.height <= $1 and bs.processed_at is not null limit $2`, since, limit)
rws, err := s.db.Query(`select cid FROM blocks_synced where synced_at > $1`, timestamp.Unix())
if err != nil { if err != nil {
return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err)
} }
@ -281,6 +273,33 @@ func (s *Syncer) syncedBlocks(timestamp time.Time) (map[cid.Cid]struct{}, error)
return out, nil return out, nil
} }
func (s *Syncer) mostRecentlySyncedBlockHeight() (cid.Cid, int64, error) {
rw := s.db.QueryRow(`
select blocks_synced.cid, b.height
from blocks_synced
left join blocks b on blocks_synced.cid = b.cid
where processed_at is not null
order by height desc
limit 1
`)
var c string
var h int64
if err := rw.Scan(&c, &h); err != nil {
if err == sql.ErrNoRows {
return cid.Undef, 0, nil
}
return cid.Undef, -1, err
}
ci, err := cid.Parse(c)
if err != nil {
return cid.Undef, -1, err
}
return ci, h, nil
}
func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error { func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error {
supply, err := s.node.StateCirculatingSupply(ctx, tipset.Key()) supply, err := s.node.StateCirculatingSupply(ctx, tipset.Key())
if err != nil { if err != nil {