handle marking blocks as bad better

This commit is contained in:
whyrusleeping 2019-11-10 15:06:06 -08:00
parent c74f87fd51
commit dbc706b846
2 changed files with 37 additions and 12 deletions

View File

@ -67,22 +67,20 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
) )
} }
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error
req := &BlockSyncRequest{ req := &BlockSyncRequest{
Start: tipset, Start: tipset,
RequestLength: uint64(count), RequestLength: uint64(count),
Options: BSOptBlocks, Options: BSOptBlocks,
} }
peers := bs.getPeers()
var oerr error var oerr error
for _, p := range perm { for _, p := range peers {
res, err := bs.sendRequestToPeer(ctx, peers[p], req) res, err := bs.sendRequestToPeer(ctx, p, req)
if err != nil { if err != nil {
oerr = err oerr = err
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err) log.Warnf("BlockSync request failed for peer %s: %s", p.String(), err)
continue continue
} }
@ -91,7 +89,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
} }
oerr = bs.processStatus(req, res) oerr = bs.processStatus(req, res)
if oerr != nil { if oerr != nil {
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), oerr) log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr)
} }
} }
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr) return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
@ -182,7 +180,7 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *Bloc
s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID) s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID)
if err != nil { if err != nil {
bs.RemovePeer(p) bs.RemovePeer(p)
return nil, err return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
} }
if err := cborutil.WriteCborRPC(s, req); err != nil { if err := cborutil.WriteCborRPC(s, req); err != nil {

View File

@ -119,14 +119,15 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
syncer.Bsync.AddPeer(from) syncer.Bsync.AddPeer(from)
bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
if fts.TipSet().Blocks()[0].ParentWeight.LessThan(bestPweight) { targetWeight := fts.TipSet().Blocks()[0].ParentWeight
if targetWeight.LessThan(bestPweight) {
log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now") log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now")
return return
} }
go func() { go func() {
if err := syncer.Sync(ctx, fts.TipSet()); err != nil { if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
log.Errorf("sync error: %+v", err) log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err)
} }
}() }()
} }
@ -358,6 +359,12 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "chain.Sync") ctx, span := trace.StartSpan(ctx, "chain.Sync")
defer span.End() defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())),
trace.Int64Attribute("height", int64(maybeHead.Height())),
)
}
syncer.syncLock.Lock() syncer.syncLock.Lock()
defer syncer.syncLock.Unlock() defer syncer.syncLock.Unlock()
@ -367,10 +374,12 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
} }
if err := syncer.collectChain(ctx, maybeHead); err != nil { if err := syncer.collectChain(ctx, maybeHead); err != nil {
span.AddAttributes(trace.StringAttribute("col_error", err.Error()))
return xerrors.Errorf("collectChain failed: %w", err) return xerrors.Errorf("collectChain failed: %w", err)
} }
if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil { if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil {
span.AddAttributes(trace.StringAttribute("put_error", err.Error()))
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err) return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
} }
@ -690,6 +699,15 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
trace.Int64Attribute("toHeight", int64(to.Height())), trace.Int64Attribute("toHeight", int64(to.Height())),
) )
for _, pcid := range from.Parents() {
if syncer.bad.Has(pcid) {
for _, b := range from.Cids() {
syncer.bad.Add(b)
}
return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s)", from.Cids(), pcid)
}
}
blockSet := []*types.TipSet{from} blockSet := []*types.TipSet{from}
at := from.Parents() at := from.Parents()
@ -766,6 +784,13 @@ loop:
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
fork, err := syncer.syncFork(ctx, last, to) fork, err := syncer.syncFork(ctx, last, to)
if err != nil { if err != nil {
if xerrors.Is(err, ErrForkTooLong) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
log.Warn("adding forked chain to our bad tipset cache")
for _, b := range from.Blocks() {
syncer.bad.Add(b.Cid())
}
}
return nil, xerrors.Errorf("failed to sync fork: %w", err) return nil, xerrors.Errorf("failed to sync fork: %w", err)
} }
@ -775,6 +800,8 @@ loop:
return blockSet, nil return blockSet, nil
} }
var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), build.ForkLengthThreshold) tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), build.ForkLengthThreshold)
if err != nil { if err != nil {
@ -801,7 +828,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
} }
} }
} }
return nil, xerrors.Errorf("fork was longer than our threshold") return nil, ErrForkTooLong
} }
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error { func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {