parallel chain sync
This commit is contained in:
parent
61c0b8c3db
commit
35f6e10646
106
chain/sync.go
106
chain/sync.go
@ -7,7 +7,6 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -64,19 +63,11 @@ var (
|
||||
LocalIncoming = "incoming"
|
||||
|
||||
log = logging.Logger("chain")
|
||||
defaultMessageFetchWindowSize = 200
|
||||
)
|
||||
|
||||
func init() {
|
||||
if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" {
|
||||
val, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
log.Errorf("failed to parse LOTUS_BSYNC_MSG_WINDOW: %s", err)
|
||||
return
|
||||
}
|
||||
defaultMessageFetchWindowSize = val
|
||||
}
|
||||
}
|
||||
concurrentSyncRequests = 16
|
||||
syncRequestBatchSize = 4
|
||||
syncRequestRetries = 5
|
||||
)
|
||||
|
||||
// Syncer is in charge of running the chain synchronization logic. As such, it
|
||||
// is tasked with these functions, amongst others:
|
||||
@ -132,8 +123,6 @@ type Syncer struct {
|
||||
|
||||
verifier ffiwrapper.Verifier
|
||||
|
||||
windowSize int
|
||||
|
||||
tickerCtxCancel context.CancelFunc
|
||||
|
||||
checkptLk sync.Mutex
|
||||
@ -175,7 +164,6 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C
|
||||
receiptTracker: newBlockReceiptTracker(),
|
||||
connmgr: connmgr,
|
||||
verifier: verifier,
|
||||
windowSize: defaultMessageFetchWindowSize,
|
||||
|
||||
incoming: pubsub.New(50),
|
||||
}
|
||||
@ -1483,8 +1471,6 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
|
||||
|
||||
span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))
|
||||
|
||||
windowSize := syncer.windowSize
|
||||
mainLoop:
|
||||
for i := len(headers) - 1; i >= 0; {
|
||||
fts, err := syncer.store.TryFillTipSet(headers[i])
|
||||
if err != nil {
|
||||
@ -1498,35 +1484,73 @@ mainLoop:
|
||||
continue
|
||||
}
|
||||
|
||||
batchSize := windowSize
|
||||
batchSize := concurrentSyncRequests * syncRequestBatchSize
|
||||
if i < batchSize {
|
||||
if i == 0 {
|
||||
batchSize = 1
|
||||
} else {
|
||||
batchSize = i
|
||||
}
|
||||
|
||||
nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index
|
||||
}
|
||||
|
||||
ss.SetStage(api.StageFetchingMessages)
|
||||
var bstout []*exchange.CompactedMessages
|
||||
for len(bstout) < batchSize {
|
||||
next := headers[nextI]
|
||||
bstout := make([]*exchange.CompactedMessages, batchSize)
|
||||
var wg sync.WaitGroup
|
||||
var mx sync.Mutex
|
||||
var batchErr error
|
||||
for j := 0; j < batchSize; j += syncRequestBatchSize {
|
||||
wg.Add(1)
|
||||
go func(j int) {
|
||||
defer wg.Done()
|
||||
|
||||
nreq := batchSize - len(bstout)
|
||||
bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq))
|
||||
nreq := syncRequestBatchSize
|
||||
if j*syncRequestBatchSize+nreq > batchSize {
|
||||
nreq = batchSize - j*syncRequestBatchSize
|
||||
}
|
||||
|
||||
failed := false
|
||||
for offset := 0; !failed && offset < nreq; {
|
||||
nextI := (i + 1) - batchSize + j*syncRequestBatchSize + offset
|
||||
nextHeader := headers[nextI]
|
||||
|
||||
var requestErr error
|
||||
var requestResult []*exchange.CompactedMessages
|
||||
for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
|
||||
if retry > 0 {
|
||||
log.Infof("fetching messages at %d (retry %d)", nextI, retry)
|
||||
} else {
|
||||
log.Infof("fetching messages at %d", nextI)
|
||||
}
|
||||
|
||||
result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
|
||||
if err != nil {
|
||||
// TODO check errors for temporary nature
|
||||
if windowSize > 1 {
|
||||
windowSize /= 2
|
||||
log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize)
|
||||
continue mainLoop
|
||||
requestErr = multierror.Append(requestErr, err)
|
||||
} else {
|
||||
requestResult = result
|
||||
}
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
|
||||
bstout = append(bstout, bstips...)
|
||||
nextI += len(bstips)
|
||||
mx.Lock()
|
||||
if requestResult == nil {
|
||||
// we failed!
|
||||
log.Errorf("error fetching messages at %d: %s", nextI, requestErr)
|
||||
batchErr = multierror.Append(batchErr, requestErr)
|
||||
failed = true
|
||||
} else {
|
||||
copy(bstout[j*syncRequestBatchSize+offset:], requestResult)
|
||||
offset += len(requestResult)
|
||||
}
|
||||
mx.Unlock()
|
||||
}
|
||||
}(j)
|
||||
}
|
||||
wg.Wait()
|
||||
ss.SetStage(api.StageMessages)
|
||||
|
||||
if batchErr != nil {
|
||||
return xerrors.Errorf("failed to fetch messages: %w", err)
|
||||
}
|
||||
|
||||
for bsi := 0; bsi < len(bstout); bsi++ {
|
||||
// temp storage so we don't persist data we dont want to
|
||||
bs := bstore.NewTemporary()
|
||||
@ -1555,23 +1579,9 @@ mainLoop:
|
||||
}
|
||||
}
|
||||
|
||||
if i >= windowSize {
|
||||
newWindowSize := windowSize + 10
|
||||
if newWindowSize > int(exchange.MaxRequestLength) {
|
||||
newWindowSize = int(exchange.MaxRequestLength)
|
||||
}
|
||||
if newWindowSize > windowSize {
|
||||
windowSize = newWindowSize
|
||||
log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize)
|
||||
}
|
||||
}
|
||||
|
||||
i -= batchSize
|
||||
}
|
||||
|
||||
// remember our window size
|
||||
syncer.windowSize = windowSize
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user