diff --git a/chain/sync.go b/chain/sync.go index 0ab8ac183..95c2e2e84 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1494,57 +1494,8 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS } ss.SetStage(api.StageFetchingMessages) - bstout := make([]*exchange.CompactedMessages, batchSize) - var wg sync.WaitGroup - var mx sync.Mutex - var batchErr error - for j := 0; j < batchSize; j += syncRequestBatchSize { - wg.Add(1) - go func(j int) { - defer wg.Done() - - nreq := syncRequestBatchSize - if j+nreq > batchSize { - nreq = batchSize - j - } - - failed := false - for offset := 0; !failed && offset < nreq; { - nextI := (i + 1) - batchSize + j + offset - nextHeader := headers[nextI] - - var requestErr error - var requestResult []*exchange.CompactedMessages - for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ { - if retry > 0 { - log.Infof("fetching messages at %d (retry %d)", nextI, retry) - } else { - log.Infof("fetching messages at %d", nextI) - } - - result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) - if err != nil { - requestErr = multierror.Append(requestErr, err) - } else { - requestResult = result - } - } - - mx.Lock() - if requestResult == nil { - // we failed! - log.Errorf("error fetching messages at %d: %s", nextI, requestErr) - batchErr = multierror.Append(batchErr, requestErr) - failed = true - } else { - copy(bstout[j+offset:], requestResult) - offset += len(requestResult) - } - mx.Unlock() - } - }(j) - } - wg.Wait() + startOffset := i + 1 - batchSize + bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset) ss.SetStage(api.StageMessages) if batchErr != nil { @@ -1585,6 +1536,71 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return nil } +func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) { + batchSize := len(headers) + batch := make([]*exchange.CompactedMessages, batchSize) + + var wg sync.WaitGroup + var mx sync.Mutex + var batchErr error + + start := build.Clock.Now() + + for j := 0; j < batchSize; j += syncRequestBatchSize { + wg.Add(1) + go func(j int) { + defer wg.Done() + + nreq := syncRequestBatchSize + if j+nreq > batchSize { + nreq = batchSize - j + } + + failed := false + for offset := 0; !failed && offset < nreq; { + nextI := j + offset + nextHeader := headers[nextI] + + var requestErr error + var requestResult []*exchange.CompactedMessages + for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ { + if retry > 0 { + log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry) + } else { + log.Infof("fetching messages at %d", startOffset+nextI) + } + + result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) + if err != nil { + requestErr = multierror.Append(requestErr, err) + } else { + requestResult = result + } + } + + mx.Lock() + if requestResult != nil { + copy(batch[j+offset:], requestResult) + offset += len(requestResult) + } else { + log.Errorf("error fetching messages at %d: %s", nextI, requestErr) + batchErr = multierror.Append(batchErr, requestErr) + failed = true + } + mx.Unlock() + } + }(j) + } + wg.Wait() + + if batchErr != nil { + dt := build.Clock.Since(start) + log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, dt) + } + + return batch, batchErr +} + func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error { for _, m := range bst.Bls { //log.Infof("putting BLS message: %s", m.Cid())