diff --git a/chain/sync.go b/chain/sync.go index 0e5f1a126..1b1cbdde9 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -123,6 +123,8 @@ type Syncer struct { receiptTracker *blockReceiptTracker verifier ffiwrapper.Verifier + + windowSize int } // NewSyncer creates a new Syncer object. @@ -148,6 +150,7 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connm receiptTracker: newBlockReceiptTracker(), connmgr: connmgr, verifier: verifier, + windowSize: defaultMessageFetchWindowSize, incoming: pubsub.New(50), } @@ -1413,7 +1416,8 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers)))) - windowSize := defaultMessageFetchWindowSize + windowSize := syncer.windowSize +mainLoop: for i := len(headers) - 1; i >= 0; { fts, err := syncer.store.TryFillTipSet(headers[i]) if err != nil { @@ -1441,6 +1445,12 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS nreq := batchSize - len(bstout) bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(nreq)) if err != nil { + // TODO check errors for temporary nature + if windowSize > 1 { + windowSize /= 2 + log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize) + continue mainLoop + } return xerrors.Errorf("message processing failed: %w", err) } @@ -1475,9 +1485,24 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return xerrors.Errorf("message processing failed: %w", err) } } + + if i >= windowSize { + newWindowSize := windowSize + 10 + if newWindowSize > int(blocksync.MaxRequestLength) { + newWindowSize = int(blocksync.MaxRequestLength) + } + if newWindowSize > windowSize { + windowSize = newWindowSize + log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize) + } + } + i -= batchSize } + // remember our window size + syncer.windowSize = windowSize + return nil }