Simplify sync logic around tipset expansions, ff and forking

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
Jakub Sztandera 2020-08-07 23:01:28 +02:00
parent 383e28e6e7
commit bf774fc29b
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
2 changed files with 28 additions and 24 deletions

View File

@ -1301,31 +1301,33 @@ loop:
at = blks[len(blks)-1].Parents() at = blks[len(blks)-1].Parents()
} }
// base is the tipset in the candidate chain at the height equal to our known tipset height. base := blockSet[len(blockSet)-1]
if base := blockSet[len(blockSet)-1]; !types.CidArrsEqual(base.Parents().Cids(), known.Cids()) { if base.Parents() == known.Parents() {
if base.Parents() == known.Parents() { // common case: receiving a block thats potentially part of the same tipset as our best block
// common case: receiving a block thats potentially part of the same tipset as our best block return blockSet, nil
return blockSet, nil
}
// We have now ascertained that this is *not* a 'fast forward'
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height())
fork, err := syncer.syncFork(ctx, base, known)
if err != nil {
if xerrors.Is(err, ErrForkTooLong) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
log.Warn("adding forked chain to our bad tipset cache")
for _, b := range incoming.Blocks() {
syncer.bad.Add(b.Cid(), NewBadBlockReason(incoming.Cids(), "fork past finality"))
}
}
return nil, xerrors.Errorf("failed to sync fork: %w", err)
}
blockSet = append(blockSet, fork...)
} }
if types.CidArrsEqual(base.Parents().Cids(), known.Cids()) {
// common case: receiving blocks that are building on top of our best tipset
return blockSet, nil
}
// We have now ascertained that this is *not* a 'fast forward'
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height())
fork, err := syncer.syncFork(ctx, base, known)
if err != nil {
if xerrors.Is(err, ErrForkTooLong) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
log.Warn("adding forked chain to our bad tipset cache")
for _, b := range incoming.Blocks() {
syncer.bad.Add(b.Cid(), NewBadBlockReason(incoming.Cids(), "fork past finality"))
}
}
return nil, xerrors.Errorf("failed to sync fork: %w", err)
}
blockSet = append(blockSet, fork...)
return blockSet, nil return blockSet, nil
} }

View File

@ -100,12 +100,14 @@ func (fsj *fsJournal) putEntry(je *JournalEntry) error {
return nil return nil
} }
const RFC3339nocolon = "2006-01-02T150405Z0700"
func (fsj *fsJournal) rollJournalFile() error { func (fsj *fsJournal) rollJournalFile() error {
if fsj.fi != nil { if fsj.fi != nil {
fsj.fi.Close() fsj.fi.Close()
} }
nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(time.RFC3339)))) nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(RFC3339nocolon))))
if err != nil { if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err) return xerrors.Errorf("failed to open journal file: %w", err)
} }