From dbc706b846a2865ee1684c0fe44a85ad1b26f309 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 10 Nov 2019 15:06:06 -0800 Subject: [PATCH] handle marking blocks as bad better --- chain/blocksync/blocksync_client.go | 16 ++++++-------- chain/sync.go | 33 ++++++++++++++++++++++++++--- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index e17263362..6af46a592 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -67,22 +67,20 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ) } - peers := bs.getPeers() - perm := rand.Perm(len(peers)) - // TODO: round robin through these peers on error - req := &BlockSyncRequest{ Start: tipset, RequestLength: uint64(count), Options: BSOptBlocks, } + peers := bs.getPeers() + var oerr error - for _, p := range perm { - res, err := bs.sendRequestToPeer(ctx, peers[p], req) + for _, p := range peers { + res, err := bs.sendRequestToPeer(ctx, p, req) if err != nil { oerr = err - log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err) + log.Warnf("BlockSync request failed for peer %s: %s", p.String(), err) continue } @@ -91,7 +89,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) } oerr = bs.processStatus(req, res) if oerr != nil { - log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), oerr) + log.Warnf("BlockSync peer %s response was an error: %s", p.String(), oerr) } } return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr) @@ -182,7 +180,7 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *Bloc s, err := bs.host.NewStream(inet.WithNoDial(ctx, "should already have connection"), p, BlockSyncProtocolID) if err != nil { bs.RemovePeer(p) - return nil, err + return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } if err := cborutil.WriteCborRPC(s, req); err != nil { diff --git a/chain/sync.go b/chain/sync.go index e7384e02f..d0e24c7a0 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -119,14 +119,15 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { syncer.Bsync.AddPeer(from) bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight - if fts.TipSet().Blocks()[0].ParentWeight.LessThan(bestPweight) { + targetWeight := fts.TipSet().Blocks()[0].ParentWeight + if targetWeight.LessThan(bestPweight) { log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now") return } go func() { if err := syncer.Sync(ctx, fts.TipSet()); err != nil { - log.Errorf("sync error: %+v", err) + log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err) } }() } @@ -358,6 +359,12 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ctx, span := trace.StartSpan(ctx, "chain.Sync") defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes( + trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())), + trace.Int64Attribute("height", int64(maybeHead.Height())), + ) + } syncer.syncLock.Lock() defer syncer.syncLock.Unlock() @@ -367,10 +374,12 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { } if err := syncer.collectChain(ctx, maybeHead); err != nil { + span.AddAttributes(trace.StringAttribute("col_error", err.Error())) return xerrors.Errorf("collectChain failed: %w", err) } if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil { + span.AddAttributes(trace.StringAttribute("put_error", err.Error())) return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err) } @@ -690,6 +699,15 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to trace.Int64Attribute("toHeight", int64(to.Height())), ) + for _, pcid := range from.Parents() { + if syncer.bad.Has(pcid) { + for _, b := range from.Cids() { + syncer.bad.Add(b) + } + return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s)", from.Cids(), pcid) + } + } + blockSet := []*types.TipSet{from} at := from.Parents() @@ -766,6 +784,13 @@ loop: log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) fork, err := syncer.syncFork(ctx, last, to) if err != nil { + if xerrors.Is(err, ErrForkTooLong) { + // TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish? + log.Warn("adding forked chain to our bad tipset cache") + for _, b := range from.Blocks() { + syncer.bad.Add(b.Cid()) + } + } return nil, xerrors.Errorf("failed to sync fork: %w", err) } @@ -775,6 +800,8 @@ loop: return blockSet, nil } +var ErrForkTooLong = fmt.Errorf("fork longer than threshold") + func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), build.ForkLengthThreshold) if err != nil { @@ -801,7 +828,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type } } } - return nil, xerrors.Errorf("fork was longer than our threshold") + return nil, ErrForkTooLong } func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {