refactor parallel fetch logic into a separate function

This commit is contained in:
vyzo 2020-09-17 17:21:26 +03:00
parent 2a428f09e6
commit fb605f6d7f

View File

@ -1494,57 +1494,8 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
}
ss.SetStage(api.StageFetchingMessages)
bstout := make([]*exchange.CompactedMessages, batchSize)
var wg sync.WaitGroup
var mx sync.Mutex
var batchErr error
for j := 0; j < batchSize; j += syncRequestBatchSize {
wg.Add(1)
go func(j int) {
defer wg.Done()
nreq := syncRequestBatchSize
if j+nreq > batchSize {
nreq = batchSize - j
}
failed := false
for offset := 0; !failed && offset < nreq; {
nextI := (i + 1) - batchSize + j + offset
nextHeader := headers[nextI]
var requestErr error
var requestResult []*exchange.CompactedMessages
for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
if retry > 0 {
log.Infof("fetching messages at %d (retry %d)", nextI, retry)
} else {
log.Infof("fetching messages at %d", nextI)
}
result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
if err != nil {
requestErr = multierror.Append(requestErr, err)
} else {
requestResult = result
}
}
mx.Lock()
if requestResult == nil {
// we failed!
log.Errorf("error fetching messages at %d: %s", nextI, requestErr)
batchErr = multierror.Append(batchErr, requestErr)
failed = true
} else {
copy(bstout[j+offset:], requestResult)
offset += len(requestResult)
}
mx.Unlock()
}
}(j)
}
wg.Wait()
startOffset := i + 1 - batchSize
bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset)
ss.SetStage(api.StageMessages)
if batchErr != nil {
@ -1585,6 +1536,71 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
return nil
}
func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) {
batchSize := len(headers)
batch := make([]*exchange.CompactedMessages, batchSize)
var wg sync.WaitGroup
var mx sync.Mutex
var batchErr error
start := build.Clock.Now()
for j := 0; j < batchSize; j += syncRequestBatchSize {
wg.Add(1)
go func(j int) {
defer wg.Done()
nreq := syncRequestBatchSize
if j+nreq > batchSize {
nreq = batchSize - j
}
failed := false
for offset := 0; !failed && offset < nreq; {
nextI := j + offset
nextHeader := headers[nextI]
var requestErr error
var requestResult []*exchange.CompactedMessages
for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
if retry > 0 {
log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry)
} else {
log.Infof("fetching messages at %d", startOffset+nextI)
}
result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
if err != nil {
requestErr = multierror.Append(requestErr, err)
} else {
requestResult = result
}
}
mx.Lock()
if requestResult != nil {
copy(batch[j+offset:], requestResult)
offset += len(requestResult)
} else {
log.Errorf("error fetching messages at %d: %s", nextI, requestErr)
batchErr = multierror.Append(batchErr, requestErr)
failed = true
}
mx.Unlock()
}
}(j)
}
wg.Wait()
if batchErr != nil {
dt := build.Clock.Since(start)
log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, dt)
}
return batch, batchErr
}
func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error {
for _, m := range bst.Bls {
//log.Infof("putting BLS message: %s", m.Cid())