diff --git a/chain/sync.go b/chain/sync.go index 9864600dd..ae3f8ee80 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -58,7 +58,14 @@ import ( // the theoretical max height based on systime are quickly rejected const MaxHeightDrift = 5 -var defaultMessageFetchWindowSize = 200 +var ( + // LocalIncoming is the _local_ pubsub (unrelated to libp2p pubsub) topic + // where the Syncer publishes candidate chain heads to be synced. + LocalIncoming = "incoming" + + log = logging.Logger("chain") + defaultMessageFetchWindowSize = 200 +) func init() { if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" { @@ -71,10 +78,6 @@ func init() { } } -var log = logging.Logger("chain") - -var LocalIncoming = "incoming" - // Syncer is in charge of running the chain synchronization logic. As such, it // is tasked with these functions, amongst others: // @@ -119,7 +122,7 @@ type Syncer struct { self peer.ID - syncmgr *SyncManager + syncmgr SyncManager connmgr connmgr.ConnManager @@ -140,8 +143,10 @@ type Syncer struct { ds dtypes.MetadataDS } +type SyncManagerCtor func(syncFn SyncFunc) SyncManager + // NewSyncer creates a new Syncer object. -func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*Syncer, error) { +func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, syncMgrCtor SyncManagerCtor, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*Syncer, error) { gen, err := sm.ChainStore().GetGenesis() if err != nil { return nil, xerrors.Errorf("getting genesis block: %w", err) @@ -181,7 +186,7 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C log.Warn("*********************************************************************************************") } - s.syncmgr = NewSyncManager(s.Sync) + s.syncmgr = syncMgrCtor(s.Sync) return s, nil } @@ -1665,11 +1670,7 @@ func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []b } func (syncer *Syncer) State() []SyncerState { - var out []SyncerState - for _, ss := range syncer.syncmgr.syncStates { - out = append(out, ss.Snapshot()) - } - return out + return syncer.syncmgr.State() } // MarkBad manually adds a block to the "bad blocks" cache. diff --git a/chain/sync_manager.go b/chain/sync_manager.go index 8c77b47c5..811092bc7 100644 --- a/chain/sync_manager.go +++ b/chain/sync_manager.go @@ -20,7 +20,28 @@ const ( type SyncFunc func(context.Context, *types.TipSet) error -type SyncManager struct { +// SyncManager manages the chain synchronization process, both at bootstrap time +// and during ongoing operation. +// +// It receives candidate chain heads in the form of tipsets from peers, +// and schedules them onto sync workers, deduplicating processing for +// already-active syncs. +type SyncManager interface { + // Start starts the SyncManager. + Start() + + // Stop stops the SyncManager. + Stop() + + // SetPeerHead informs the SyncManager that the supplied peer reported the + // supplied tipset. + SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) + + // State retrieves the state of the sync workers. + State() []SyncerState +} + +type syncManager struct { lk sync.Mutex peerHeads map[peer.ID]*types.TipSet @@ -48,6 +69,8 @@ type SyncManager struct { workerChan chan *types.TipSet } +var _ SyncManager = (*syncManager)(nil) + type syncResult struct { ts *types.TipSet success bool @@ -55,8 +78,8 @@ type syncResult struct { const syncWorkerCount = 3 -func NewSyncManager(sync SyncFunc) *SyncManager { - return &SyncManager{ +func NewSyncManager(sync SyncFunc) SyncManager { + return &syncManager{ bspThresh: 1, peerHeads: make(map[peer.ID]*types.TipSet), syncTargets: make(chan *types.TipSet), @@ -69,18 +92,18 @@ func NewSyncManager(sync SyncFunc) *SyncManager { } } -func (sm *SyncManager) Start() { +func (sm *syncManager) Start() { go sm.syncScheduler() for i := 0; i < syncWorkerCount; i++ { go sm.syncWorker(i) } } -func (sm *SyncManager) Stop() { +func (sm *syncManager) Stop() { close(sm.stop) } -func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { +func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) { sm.lk.Lock() defer sm.lk.Unlock() sm.peerHeads[p] = ts @@ -105,6 +128,14 @@ func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip sm.incomingTipSets <- ts } +func (sm *syncManager) State() []SyncerState { + ret := make([]SyncerState, 0, len(sm.syncStates)) + for _, s := range sm.syncStates { + ret = append(ret, s.Snapshot()) + } + return ret +} + type syncBucketSet struct { buckets []*syncTargetBucket } @@ -234,7 +265,7 @@ func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet { return best } -func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) { +func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) { var buckets syncBucketSet var peerHeads []*types.TipSet @@ -258,7 +289,7 @@ func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) { return buckets.Heaviest(), nil } -func (sm *SyncManager) syncScheduler() { +func (sm *syncManager) syncScheduler() { for { select { @@ -280,7 +311,7 @@ func (sm *SyncManager) syncScheduler() { } } -func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) { +func (sm *syncManager) scheduleIncoming(ts *types.TipSet) { log.Debug("scheduling incoming tipset sync: ", ts.Cids()) if sm.getBootstrapState() == BSStateSelected { sm.setBootstrapState(BSStateScheduled) @@ -328,10 +359,11 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) { } } -func (sm *SyncManager) scheduleProcessResult(res *syncResult) { +func (sm *syncManager) scheduleProcessResult(res *syncResult) { if res.success && sm.getBootstrapState() != BSStateComplete { sm.setBootstrapState(BSStateComplete) } + delete(sm.activeSyncs, res.ts.Key()) relbucket := sm.activeSyncTips.PopRelated(res.ts) if relbucket != nil { @@ -360,7 +392,7 @@ func (sm *SyncManager) scheduleProcessResult(res *syncResult) { } } -func (sm *SyncManager) scheduleWorkSent() { +func (sm *syncManager) scheduleWorkSent() { hts := sm.nextSyncTarget.heaviestTipSet() sm.activeSyncs[hts.Key()] = hts @@ -372,7 +404,7 @@ func (sm *SyncManager) scheduleWorkSent() { } } -func (sm *SyncManager) syncWorker(id int) { +func (sm *syncManager) syncWorker(id int) { ss := &SyncerState{} sm.syncStates[id] = ss for { @@ -397,7 +429,7 @@ func (sm *SyncManager) syncWorker(id int) { } } -func (sm *SyncManager) syncedPeerCount() int { +func (sm *syncManager) syncedPeerCount() int { var count int for _, ts := range sm.peerHeads { if ts.Height() > 0 { @@ -407,19 +439,19 @@ func (sm *SyncManager) syncedPeerCount() int { return count } -func (sm *SyncManager) getBootstrapState() int { +func (sm *syncManager) getBootstrapState() int { sm.bssLk.Lock() defer sm.bssLk.Unlock() return sm.bootstrapState } -func (sm *SyncManager) setBootstrapState(v int) { +func (sm *syncManager) setBootstrapState(v int) { sm.bssLk.Lock() defer sm.bssLk.Unlock() sm.bootstrapState = v } -func (sm *SyncManager) IsBootstrapped() bool { +func (sm *syncManager) IsBootstrapped() bool { sm.bssLk.Lock() defer sm.bssLk.Unlock() return sm.bootstrapState == BSStateComplete diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index ca2ced856..269b3a62e 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -17,7 +17,7 @@ type syncOp struct { done func() } -func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *SyncManager, chan *syncOp)) { +func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *syncManager, chan *syncOp)) { syncTargets := make(chan *syncOp) sm := NewSyncManager(func(ctx context.Context, ts *types.TipSet) error { ch := make(chan struct{}) @@ -27,7 +27,7 @@ func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, } <-ch return nil - }) + }).(*syncManager) sm.bspThresh = thresh sm.Start() @@ -77,12 +77,12 @@ func TestSyncManager(t *testing.T) { c3 := mock.TipSet(mock.MkBlock(b, 3, 5)) d := mock.TipSet(mock.MkBlock(c1, 4, 5)) - runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", c1) assertGetSyncOp(t, stc, c1) }) - runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", c1) assertNoOp(t, stc) @@ -90,7 +90,7 @@ func TestSyncManager(t *testing.T) { assertGetSyncOp(t, stc, c1) }) - runSyncMgrTest(t, "testSyncAfterBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + runSyncMgrTest(t, "testSyncAfterBootstrap", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", b) assertGetSyncOp(t, stc, b) @@ -101,7 +101,7 @@ func TestSyncManager(t *testing.T) { assertGetSyncOp(t, stc, c2) }) - runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", a) assertGetSyncOp(t, stc, a) @@ -122,7 +122,7 @@ func TestSyncManager(t *testing.T) { assertGetSyncOp(t, stc, d) }) - runSyncMgrTest(t, "testSyncIncomingTipset", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) { + runSyncMgrTest(t, "testSyncIncomingTipset", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) { sm.SetPeerHead(ctx, "peer1", a) assertGetSyncOp(t, stc, a) diff --git a/node/builder.go b/node/builder.go index 6c01258a6..bf11145df 100644 --- a/node/builder.go +++ b/node/builder.go @@ -242,6 +242,9 @@ func Online() Option { Override(new(dtypes.ChainBlockService), modules.ChainBlockService), // Filecoin services + // We don't want the SyncManagerCtor to be used as an fx constructor, but rather as a value. + // It will be called implicitly by the Syncer constructor. + Override(new(chain.SyncManagerCtor), func() chain.SyncManagerCtor { return chain.NewSyncManager }), Override(new(*chain.Syncer), modules.NewSyncer), Override(new(exchange.Client), exchange.NewClient), Override(new(*messagepool.MessagePool), modules.MessagePool), diff --git a/node/modules/chain.go b/node/modules/chain.go index 7b7e03e44..5eda51078 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -163,8 +163,31 @@ func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, return netName, err } -func NewSyncer(lc fx.Lifecycle, ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, h host.Host, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*chain.Syncer, error) { - syncer, err := chain.NewSyncer(ds, sm, exchange, h.ConnManager(), h.ID(), beacon, verifier) +type SyncerParams struct { + fx.In + + Lifecycle fx.Lifecycle + MetadataDS dtypes.MetadataDS + StateManager *stmgr.StateManager + ChainXchg exchange.Client + SyncMgrCtor chain.SyncManagerCtor + Host host.Host + Beacon beacon.Schedule + Verifier ffiwrapper.Verifier +} + +func NewSyncer(params SyncerParams) (*chain.Syncer, error) { + var ( + lc = params.Lifecycle + ds = params.MetadataDS + sm = params.StateManager + ex = params.ChainXchg + smCtor = params.SyncMgrCtor + h = params.Host + b = params.Beacon + v = params.Verifier + ) + syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, v) if err != nil { return nil, err }