Merge pull request #3887 from filecoin-project/feat/parallel-sync

Parallel fetch for chain sync
commit a2278e26af
Łukasz Magiera, 2020-09-23 19:24:54 +02:00 (committed by GitHub)
4 changed files with 91 additions and 58 deletions
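
This replaces the syncer's adaptive message-fetch window (a windowSize that was halved on each fetch error, grown by 10 on success, and seeded from the LOTUS_BSYNC_MSG_WINDOW environment variable) with fixed-size parallel fetching: up to concurrentSyncRequests goroutines each own a syncRequestBatchSize slice of tipsets, retry failed requests up to syncRequestRetries times, and aggregate errors with multierror. ShufflePeersPrefix rises from 5 to 16, and concurrentSyncRequests is set to it directly, which keeps the pool of shuffled peers at least as large as the worker count. Both exchange endpoints also switch from stream.Close to helpers.FullClose pending the go-libp2p-core 0.7.0 migration.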

chain/exchange/client.go

@@ -7,6 +7,7 @@ import (
 	"math/rand"
 	"time"
 
+	"github.com/libp2p/go-libp2p-core/helpers"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
@@ -357,6 +358,12 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Request)
 		return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
 	}
 
+	defer func() {
+		// Note: this will become just stream.Close once we've completed the go-libp2p migration to
+		// go-libp2p-core 0.7.0
+		go helpers.FullClose(stream) //nolint:errcheck
+	}()
+
 	// Write request.
 	_ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline))
 	if err := cborutil.WriteCborRPC(stream, req); err != nil {
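
A note on the close pattern above: with go-libp2p-core 0.6.x, helpers.FullClose closes the local side of the stream and then waits for the remote's EOF, and the client wraps it in a goroutine so the deferred cleanup never blocks a response on a slow peer. Roughly, and only as a sketch of the idea (not the library source):

import (
	"fmt"
	"io"
	"time"

	"github.com/libp2p/go-libp2p-core/network"
)

// fullClose approximates helpers.FullClose: close our side, then expect a
// clean EOF from the remote within a deadline, resetting the stream if the
// shutdown is not graceful.
func fullClose(s network.Stream) error {
	if err := s.Close(); err != nil {
		s.Reset() //nolint:errcheck
		return err
	}
	s.SetDeadline(time.Now().Add(60 * time.Second)) //nolint:errcheck
	defer s.SetDeadline(time.Time{})                //nolint:errcheck
	n, err := s.Read(make([]byte, 1))
	if n > 0 || err != io.EOF {
		s.Reset() //nolint:errcheck
		return fmt.Errorf("expected remote EOF, got n=%d err=%v", n, err)
	}
	return nil
}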

chain/exchange/protocol.go

@@ -40,7 +40,7 @@ const (
 	WriteReqDeadline   = 5 * time.Second
 	ReadResDeadline    = WriteReqDeadline
 	ReadResMinSpeed    = 50 << 10
-	ShufflePeersPrefix = 5
+	ShufflePeersPrefix = 16
 	WriteResDeadline   = 60 * time.Second
 )
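
Raising ShufflePeersPrefix from 5 to 16 is tied to the sync changes below, where concurrentSyncRequests is set to exchange.ShufflePeersPrefix: the client shuffles only the first ShufflePeersPrefix entries of its ranked peer list before issuing requests, so with 16 parallel fetchers the load rotates across 16 peers rather than concentrating on the top 5. A minimal sketch of that selection, assuming a ranked peer slice (the helper name and shape here are illustrative, not the client's exact code):

import (
	"math/rand"

	"github.com/libp2p/go-libp2p-core/peer"
)

// shufflePrefix randomizes only the head of the ranked peer list: the best
// peers still come first as a group, but which of them is asked first varies
// from request to request.
func shufflePrefix(peers []peer.ID) {
	prefix := ShufflePeersPrefix
	if len(peers) < prefix {
		prefix = len(peers)
	}
	buf := make([]peer.ID, prefix)
	for i, v := range rand.Perm(prefix) {
		buf[i] = peers[v]
	}
	copy(peers, buf)
}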

chain/exchange/server.go

@@ -15,6 +15,7 @@ import (
 	"github.com/filecoin-project/lotus/chain/types"
 
 	"github.com/ipfs/go-cid"
+	"github.com/libp2p/go-libp2p-core/helpers"
 	inet "github.com/libp2p/go-libp2p-core/network"
 )
@@ -39,7 +40,9 @@ func (s *server) HandleStream(stream inet.Stream) {
 	ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream")
 	defer span.End()
 
-	defer stream.Close() //nolint:errcheck
+	// Note: this will become just stream.Close once we've completed the go-libp2p migration to
+	// go-libp2p-core 0.7.0
+	defer helpers.FullClose(stream) //nolint:errcheck
 
 	var req Request
 	if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {

chain/sync.go

@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"os"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -62,20 +61,12 @@
 	// where the Syncer publishes candidate chain heads to be synced.
 	LocalIncoming = "incoming"
 
 	log = logging.Logger("chain")
 
-	defaultMessageFetchWindowSize = 200
-)
-
-func init() {
-	if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" {
-		val, err := strconv.Atoi(s)
-		if err != nil {
-			log.Errorf("failed to parse LOTUS_BSYNC_MSG_WINDOW: %s", err)
-			return
-		}
-		defaultMessageFetchWindowSize = val
-	}
-}
+	concurrentSyncRequests = exchange.ShufflePeersPrefix
+	syncRequestBatchSize   = 8
+	syncRequestRetries     = 5
+)
 
 // Syncer is in charge of running the chain synchronization logic. As such, it
 // is tasked with these functions, amongst others:
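
With these defaults, one fetch window in iterFullTipsets below covers concurrentSyncRequests * syncRequestBatchSize = 16 * 8 = 128 tipsets, in place of the old adaptive window that started at 200 and shrank on every error; each 8-tipset slice is retried up to 5 times before the whole window is abandoned.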
@@ -131,8 +122,6 @@ type Syncer struct {
 	verifier ffiwrapper.Verifier
 
-	windowSize int
-
 	tickerCtxCancel context.CancelFunc
 
 	checkptLk sync.Mutex
@@ -174,7 +163,6 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client,
 		receiptTracker: newBlockReceiptTracker(),
 		connmgr:        connmgr,
 		verifier:       verifier,
-		windowSize:     defaultMessageFetchWindowSize,
 		incoming:       pubsub.New(50),
 	}
@@ -1481,8 +1469,6 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipSet,
 	span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))
 
-	windowSize := syncer.windowSize
-
-mainLoop:
 	for i := len(headers) - 1; i >= 0; {
 		fts, err := syncer.store.TryFillTipSet(headers[i])
 		if err != nil {
@@ -1496,35 +1482,20 @@
 			continue
 		}
 
-		batchSize := windowSize
+		batchSize := concurrentSyncRequests * syncRequestBatchSize
 		if i < batchSize {
-			batchSize = i
+			batchSize = i + 1
 		}
 
-		nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index
-
 		ss.SetStage(api.StageFetchingMessages)
-		var bstout []*exchange.CompactedMessages
-		for len(bstout) < batchSize {
-			next := headers[nextI]
-
-			nreq := batchSize - len(bstout)
-			bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq))
-			if err != nil {
-				// TODO check errors for temporary nature
-				if windowSize > 1 {
-					windowSize /= 2
-					log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize)
-					continue mainLoop
-				}
-				return xerrors.Errorf("message processing failed: %w", err)
-			}
-
-			bstout = append(bstout, bstips...)
-			nextI += len(bstips)
-		}
+		startOffset := i + 1 - batchSize
+		bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset)
 		ss.SetStage(api.StageMessages)
 
+		if batchErr != nil {
+			return xerrors.Errorf("failed to fetch messages: %w", batchErr)
+		}
+
 		for bsi := 0; bsi < len(bstout); bsi++ {
 			// temp storage so we don't persist data we dont want to
 			bs := bstore.NewTemporary()
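
The index math, worked through: headers runs oldest to newest and i walks downward, pointing at the last tipset still to fetch, inclusive. With 10 tipsets left (i == 9) and the default 128-wide window, batchSize clamps to i + 1 == 10 and startOffset == i + 1 - batchSize == 0, so the call fetches exactly headers[0:10]. The i + 1 clamp (previously batchSize = i) also appears to fix the boundary at i == 0, where the old code produced a zero-size request.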
@@ -1553,26 +1524,78 @@
 			}
 		}
 
-		if i >= windowSize {
-			newWindowSize := windowSize + 10
-			if newWindowSize > int(exchange.MaxRequestLength) {
-				newWindowSize = int(exchange.MaxRequestLength)
-			}
-			if newWindowSize > windowSize {
-				windowSize = newWindowSize
-				log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize)
-			}
-		}
-
 		i -= batchSize
 	}
 
-	// remember our window size
-	syncer.windowSize = windowSize
-
 	return nil
 }
 
+func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) {
+	batchSize := len(headers)
+	batch := make([]*exchange.CompactedMessages, batchSize)
+
+	var wg sync.WaitGroup
+	var mx sync.Mutex
+	var batchErr error
+
+	start := build.Clock.Now()
+
+	for j := 0; j < batchSize; j += syncRequestBatchSize {
+		wg.Add(1)
+		go func(j int) {
+			defer wg.Done()
+
+			nreq := syncRequestBatchSize
+			if j+nreq > batchSize {
+				nreq = batchSize - j
+			}
+
+			failed := false
+			for offset := 0; !failed && offset < nreq; {
+				nextI := j + offset
+				nextHeader := headers[nextI]
+
+				var requestErr error
+				var requestResult []*exchange.CompactedMessages
+				for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
+					if retry > 0 {
+						log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry)
+					} else {
+						log.Infof("fetching messages at %d", startOffset+nextI)
+					}
+
+					result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
+					if err != nil {
+						requestErr = multierror.Append(requestErr, err)
+					} else {
+						requestResult = result
+					}
+				}
+
+				mx.Lock()
+				if requestResult != nil {
+					copy(batch[j+offset:], requestResult)
+
+					offset += len(requestResult)
+				} else {
+					log.Errorf("error fetching messages at %d: %s", startOffset+nextI, requestErr)
+					batchErr = multierror.Append(batchErr, requestErr)
+					failed = true
+				}
+				mx.Unlock()
+			}
+		}(j)
+	}
+	wg.Wait()
+
+	if batchErr != nil {
+		return nil, batchErr
+	}
+
+	log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start))
+	return batch, nil
+}
+
 func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error {
 	for _, m := range bst.Bls {
 		//log.Infof("putting BLS message: %s", m.Cid())
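
To restate the shape of fetchMessages: the batch is cut into contiguous groups of syncRequestBatchSize, one goroutine per group; each goroutine asks for everything its group still lacks in a single GetChainMessages call, so a short reply simply advances offset and triggers a follow-up request instead of failing, while the mutex serializes writes to batch and batchErr. The same pattern in isolation, as a self-contained sketch with hypothetical names (fetchAll and fetch stand in for fetchMessages and Exchange.GetChainMessages, and plain error wrapping stands in for multierror):

package main

import (
	"fmt"
	"sync"
)

const (
	groupSize = 8 // cf. syncRequestBatchSize
	retries   = 5 // cf. syncRequestRetries
)

// fetchAll fills one output slot per index in [0, n). Each contiguous group
// of groupSize indices is owned by one goroutine; fetch may return fewer
// items than requested, in which case the owner re-requests the remainder.
func fetchAll(n int, fetch func(start, want int) ([]string, error)) ([]string, error) {
	out := make([]string, n)
	var (
		wg       sync.WaitGroup
		mx       sync.Mutex
		batchErr error
	)
	for j := 0; j < n; j += groupSize {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			want := groupSize
			if j+want > n {
				want = n - j
			}
			for off := 0; off < want; {
				var res []string
				var err error
				for r := 0; r < retries && len(res) == 0; r++ {
					if res, err = fetch(j+off, want-off); err != nil {
						res = nil
					}
				}
				mx.Lock()
				if len(res) == 0 {
					if err == nil {
						err = fmt.Errorf("empty response")
					}
					batchErr = fmt.Errorf("group at %d failed: %w", j+off, err)
					mx.Unlock()
					return // this group gives up; the others keep going
				}
				copy(out[j+off:], res)
				off += len(res)
				mx.Unlock()
			}
		}(j)
	}
	wg.Wait()
	if batchErr != nil {
		return nil, batchErr
	}
	return out, nil
}

func main() {
	// Toy fetch that returns at most 3 items per call, exercising the
	// re-request path; a real fetcher would hit a peer per call.
	vals, err := fetchAll(20, func(start, want int) ([]string, error) {
		if want > 3 {
			want = 3
		}
		res := make([]string, want)
		for i := range res {
			res[i] = fmt.Sprintf("item-%d", start+i)
		}
		return res, nil
	})
	fmt.Println(len(vals), err) // prints: 20 <nil>
}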