Merge pull request #3845 from filecoin-project/sync-manager

syncer: make SyncManager an interface.
Łukasz Magiera, 2020-09-15 10:28:48 +02:00, committed by GitHub
commit 6665a9ca69

5 changed files with 97 additions and 38 deletions

chain/sync.go

@@ -58,7 +58,14 @@ import (
 // the theoretical max height based on systime are quickly rejected
 const MaxHeightDrift = 5
 
-var defaultMessageFetchWindowSize = 200
+var (
+	// LocalIncoming is the _local_ pubsub (unrelated to libp2p pubsub) topic
+	// where the Syncer publishes candidate chain heads to be synced.
+	LocalIncoming = "incoming"
+
+	log = logging.Logger("chain")
+	defaultMessageFetchWindowSize = 200
+)
 
 func init() {
 	if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" {
@@ -71,10 +78,6 @@ func init() {
 	}
 }
 
-var log = logging.Logger("chain")
-
-var LocalIncoming = "incoming"
-
 // Syncer is in charge of running the chain synchronization logic. As such, it
 // is tasked with these functions, amongst others:
 //
@@ -119,7 +122,7 @@ type Syncer struct {
 	self peer.ID
 
-	syncmgr *SyncManager
+	syncmgr SyncManager
 
 	connmgr connmgr.ConnManager
@@ -140,8 +143,10 @@ type Syncer struct {
 	ds dtypes.MetadataDS
 }
 
+type SyncManagerCtor func(syncFn SyncFunc) SyncManager
+
 // NewSyncer creates a new Syncer object.
-func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*Syncer, error) {
+func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, syncMgrCtor SyncManagerCtor, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*Syncer, error) {
 	gen, err := sm.ChainStore().GetGenesis()
 	if err != nil {
 		return nil, xerrors.Errorf("getting genesis block: %w", err)
@@ -181,7 +186,7 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C
 		log.Warn("*********************************************************************************************")
 	}
 
-	s.syncmgr = NewSyncManager(s.Sync)
+	s.syncmgr = syncMgrCtor(s.Sync)
 	return s, nil
 }
 
@@ -1665,11 +1670,7 @@ func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []b
 }
 
 func (syncer *Syncer) State() []SyncerState {
-	var out []SyncerState
-	for _, ss := range syncer.syncmgr.syncStates {
-		out = append(out, ss.Snapshot())
-	}
-	return out
+	return syncer.syncmgr.State()
 }
 
 // MarkBad manually adds a block to the "bad blocks" cache.
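
Note: with the new SyncManagerCtor hook, callers of chain.NewSyncer decide how the sync manager is built, which makes it easy to swap in a stub for testing. A minimal sketch of that, assuming the 2020-era lotus and libp2p import paths; noopSyncManager and newNoopSyncManager are hypothetical names, not part of this change:

package chain_test

import (
	"context"

	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/libp2p/go-libp2p-core/peer"
)

// noopSyncManager satisfies chain.SyncManager but performs no work.
type noopSyncManager struct{}

func (noopSyncManager) Start() {}
func (noopSyncManager) Stop()  {}
func (noopSyncManager) SetPeerHead(context.Context, peer.ID, *types.TipSet) {}
func (noopSyncManager) State() []chain.SyncerState { return nil }

// newNoopSyncManager matches the chain.SyncManagerCtor signature; the real
// sync function is deliberately ignored.
func newNoopSyncManager(chain.SyncFunc) chain.SyncManager { return noopSyncManager{} }

A test could then call chain.NewSyncer with newNoopSyncManager in place of chain.NewSyncManager.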

chain/sync_manager.go

@@ -20,7 +20,28 @@ const (
 
 type SyncFunc func(context.Context, *types.TipSet) error
 
-type SyncManager struct {
+// SyncManager manages the chain synchronization process, both at bootstrap time
+// and during ongoing operation.
+//
+// It receives candidate chain heads in the form of tipsets from peers,
+// and schedules them onto sync workers, deduplicating processing for
+// already-active syncs.
+type SyncManager interface {
+	// Start starts the SyncManager.
+	Start()
+
+	// Stop stops the SyncManager.
+	Stop()
+
+	// SetPeerHead informs the SyncManager that the supplied peer reported the
+	// supplied tipset.
+	SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet)
+
+	// State retrieves the state of the sync workers.
+	State() []SyncerState
+}
+
+type syncManager struct {
 	lk        sync.Mutex
 	peerHeads map[peer.ID]*types.TipSet
@@ -48,6 +69,8 @@ type SyncManager struct {
 	workerChan chan *types.TipSet
 }
 
+var _ SyncManager = (*syncManager)(nil)
+
 type syncResult struct {
 	ts      *types.TipSet
 	success bool
@@ -55,8 +78,8 @@ type syncResult struct {
 
 const syncWorkerCount = 3
 
-func NewSyncManager(sync SyncFunc) *SyncManager {
-	return &SyncManager{
+func NewSyncManager(sync SyncFunc) SyncManager {
+	return &syncManager{
 		bspThresh:   1,
 		peerHeads:   make(map[peer.ID]*types.TipSet),
 		syncTargets: make(chan *types.TipSet),
@@ -69,18 +92,18 @@ func NewSyncManager(sync SyncFunc) *SyncManager {
 	}
 }
 
-func (sm *SyncManager) Start() {
+func (sm *syncManager) Start() {
 	go sm.syncScheduler()
 	for i := 0; i < syncWorkerCount; i++ {
 		go sm.syncWorker(i)
 	}
 }
 
-func (sm *SyncManager) Stop() {
+func (sm *syncManager) Stop() {
 	close(sm.stop)
 }
 
-func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
+func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
 	sm.lk.Lock()
 	defer sm.lk.Unlock()
 	sm.peerHeads[p] = ts
@@ -105,6 +128,14 @@ func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip
 	sm.incomingTipSets <- ts
 }
 
+func (sm *syncManager) State() []SyncerState {
+	ret := make([]SyncerState, 0, len(sm.syncStates))
+	for _, s := range sm.syncStates {
+		ret = append(ret, s.Snapshot())
+	}
+	return ret
+}
+
 type syncBucketSet struct {
 	buckets []*syncTargetBucket
 }
@@ -234,7 +265,7 @@ func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
 	return best
 }
 
-func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
+func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) {
 	var buckets syncBucketSet
 
 	var peerHeads []*types.TipSet
@@ -258,7 +289,7 @@ func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
 	return buckets.Heaviest(), nil
 }
 
-func (sm *SyncManager) syncScheduler() {
+func (sm *syncManager) syncScheduler() {
 	for {
 		select {
@@ -280,7 +311,7 @@ func (sm *SyncManager) syncScheduler() {
 	}
 }
 
-func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
+func (sm *syncManager) scheduleIncoming(ts *types.TipSet) {
 	log.Debug("scheduling incoming tipset sync: ", ts.Cids())
 	if sm.getBootstrapState() == BSStateSelected {
 		sm.setBootstrapState(BSStateScheduled)
@@ -328,10 +359,11 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
 	}
 }
 
-func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
+func (sm *syncManager) scheduleProcessResult(res *syncResult) {
 	if res.success && sm.getBootstrapState() != BSStateComplete {
 		sm.setBootstrapState(BSStateComplete)
 	}
+
 	delete(sm.activeSyncs, res.ts.Key())
 	relbucket := sm.activeSyncTips.PopRelated(res.ts)
 	if relbucket != nil {
@@ -360,7 +392,7 @@ func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
 	}
 }
 
-func (sm *SyncManager) scheduleWorkSent() {
+func (sm *syncManager) scheduleWorkSent() {
 	hts := sm.nextSyncTarget.heaviestTipSet()
 	sm.activeSyncs[hts.Key()] = hts
@@ -372,7 +404,7 @@ func (sm *SyncManager) scheduleWorkSent() {
 	}
 }
 
-func (sm *SyncManager) syncWorker(id int) {
+func (sm *syncManager) syncWorker(id int) {
 	ss := &SyncerState{}
 	sm.syncStates[id] = ss
 	for {
@@ -397,7 +429,7 @@ func (sm *SyncManager) syncWorker(id int) {
 	}
 }
 
-func (sm *SyncManager) syncedPeerCount() int {
+func (sm *syncManager) syncedPeerCount() int {
 	var count int
 	for _, ts := range sm.peerHeads {
 		if ts.Height() > 0 {
@@ -407,19 +439,19 @@ func (sm *SyncManager) syncedPeerCount() int {
 	return count
 }
 
-func (sm *SyncManager) getBootstrapState() int {
+func (sm *syncManager) getBootstrapState() int {
 	sm.bssLk.Lock()
 	defer sm.bssLk.Unlock()
 	return sm.bootstrapState
 }
 
-func (sm *SyncManager) setBootstrapState(v int) {
+func (sm *syncManager) setBootstrapState(v int) {
 	sm.bssLk.Lock()
 	defer sm.bssLk.Unlock()
 	sm.bootstrapState = v
}
 
-func (sm *SyncManager) IsBootstrapped() bool {
+func (sm *syncManager) IsBootstrapped() bool {
 	sm.bssLk.Lock()
 	defer sm.bssLk.Unlock()
 	return sm.bootstrapState == BSStateComplete
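
Note: the var _ SyncManager = (*syncManager)(nil) guard added above is the standard Go idiom for a compile-time interface check; the build fails if the unexported concrete type ever drifts out of sync with the exported interface. A generic, self-contained illustration (Greeter/greeter are illustrative names only):

package main

type Greeter interface {
	Greet() string
}

type greeter struct{}

func (greeter) Greet() string { return "hello" }

// Compile-time assertion: the zero-cost nil pointer conversion forces the
// compiler to verify that *greeter implements Greeter.
var _ Greeter = (*greeter)(nil)

func main() {}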

chain/sync_manager_test.go

@@ -17,7 +17,7 @@ type syncOp struct {
 	done func()
 }
 
-func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *SyncManager, chan *syncOp)) {
+func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *syncManager, chan *syncOp)) {
 	syncTargets := make(chan *syncOp)
 	sm := NewSyncManager(func(ctx context.Context, ts *types.TipSet) error {
 		ch := make(chan struct{})
@@ -27,7 +27,7 @@ func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T,
 		}
 		<-ch
 		return nil
-	})
+	}).(*syncManager)
 	sm.bspThresh = thresh
 
 	sm.Start()
@@ -77,12 +77,12 @@ func TestSyncManager(t *testing.T) {
 	c3 := mock.TipSet(mock.MkBlock(b, 3, 5))
 	d := mock.TipSet(mock.MkBlock(c1, 4, 5))
 
-	runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
+	runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
 		sm.SetPeerHead(ctx, "peer1", c1)
 		assertGetSyncOp(t, stc, c1)
 	})
 
-	runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
+	runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
 		sm.SetPeerHead(ctx, "peer1", c1)
 		assertNoOp(t, stc)
 
@@ -90,7 +90,7 @@ func TestSyncManager(t *testing.T) {
 		assertGetSyncOp(t, stc, c1)
 	})
 
-	runSyncMgrTest(t, "testSyncAfterBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
+	runSyncMgrTest(t, "testSyncAfterBootstrap", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
 		sm.SetPeerHead(ctx, "peer1", b)
 		assertGetSyncOp(t, stc, b)
 
@@ -101,7 +101,7 @@ func TestSyncManager(t *testing.T) {
 		assertGetSyncOp(t, stc, c2)
 	})
 
-	runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
+	runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
 		sm.SetPeerHead(ctx, "peer1", a)
 		assertGetSyncOp(t, stc, a)
 
@@ -122,7 +122,7 @@ func TestSyncManager(t *testing.T) {
 		assertGetSyncOp(t, stc, d)
 	})
 
-	runSyncMgrTest(t, "testSyncIncomingTipset", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
+	runSyncMgrTest(t, "testSyncIncomingTipset", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
 		sm.SetPeerHead(ctx, "peer1", a)
 		assertGetSyncOp(t, stc, a)
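
Note: since NewSyncManager now returns the interface, the test harness asserts back to the concrete *syncManager to reach unexported knobs such as bspThresh. A standalone sketch of the same constructor-returns-interface, caller-downcasts pattern (Counter/counter are illustrative names, not from this change):

package main

import "fmt"

type Counter interface {
	Inc() int
}

// counter is unexported; its extra field is a tuning knob not exposed
// through the interface, analogous to syncManager.bspThresh.
type counter struct {
	step, n int
}

func NewCounter() Counter { return &counter{step: 1} }

func (c *counter) Inc() int { c.n += c.step; return c.n }

func main() {
	// A test can downcast to adjust unexported state before use,
	// exactly as runSyncMgrTest does with NewSyncManager(...).(*syncManager).
	c := NewCounter().(*counter)
	c.step = 10
	fmt.Println(c.Inc()) // 10
}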

node/builder.go

@@ -266,6 +266,9 @@ func Online() Option {
 			Override(new(dtypes.ChainBlockService), modules.ChainBlockService),
 
 			// Filecoin services
+			// We don't want the SyncManagerCtor to be used as an fx constructor, but rather as a value.
+			// It will be called implicitly by the Syncer constructor.
+			Override(new(chain.SyncManagerCtor), func() chain.SyncManagerCtor { return chain.NewSyncManager }),
 			Override(new(*chain.Syncer), modules.NewSyncer),
 			Override(new(exchange.Client), exchange.NewClient),
 			Override(new(*messagepool.MessagePool), modules.MessagePool),
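
Note: the wrapper closure is needed because fx treats any provided function as a constructor and calls it to produce its return type; to publish a function value (here chain.NewSyncManager itself), it has to be returned from a zero-argument provider. A minimal, hypothetical fx program showing the distinction:

package main

import (
	"fmt"

	"go.uber.org/fx"
)

// Ctor is a function type we want available in the container as a value.
type Ctor func(int) int

func double(x int) int { return x * 2 }

func main() {
	fx.New(
		// fx.Provide(double) would register a constructor producing an int
		// (and requiring one); returning the function from a zero-arg
		// provider registers the function value itself instead.
		fx.Provide(func() Ctor { return double }),
		// Invoke functions run during fx.New; this prints 42.
		fx.Invoke(func(c Ctor) { fmt.Println(c(21)) }),
	)
}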

node/modules/chain.go

@@ -163,8 +163,31 @@ func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore,
 	return netName, err
 }
 
-func NewSyncer(lc fx.Lifecycle, ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, h host.Host, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*chain.Syncer, error) {
-	syncer, err := chain.NewSyncer(ds, sm, exchange, h.ConnManager(), h.ID(), beacon, verifier)
+type SyncerParams struct {
+	fx.In
+
+	Lifecycle    fx.Lifecycle
+	MetadataDS   dtypes.MetadataDS
+	StateManager *stmgr.StateManager
+	ChainXchg    exchange.Client
+	SyncMgrCtor  chain.SyncManagerCtor
+	Host         host.Host
+	Beacon       beacon.Schedule
+	Verifier     ffiwrapper.Verifier
+}
+
+func NewSyncer(params SyncerParams) (*chain.Syncer, error) {
+	var (
+		lc     = params.Lifecycle
+		ds     = params.MetadataDS
+		sm     = params.StateManager
+		ex     = params.ChainXchg
+		smCtor = params.SyncMgrCtor
+		h      = params.Host
+		b      = params.Beacon
+		v      = params.Verifier
+	)
+	syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, v)
 	if err != nil {
 		return nil, err
 	}
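
Note: SyncerParams adopts fx's parameter-object convention: embedding fx.In makes fx fill every exported field from the dependency graph, so the constructor signature no longer grows with each new input. A small, self-contained sketch of the pattern (Server/ServerParams are illustrative names):

package main

import (
	"fmt"

	"go.uber.org/fx"
)

type Server struct{ addr string }

// ServerParams embeds fx.In, so fx injects each exported field by type
// instead of matching positional constructor arguments.
type ServerParams struct {
	fx.In

	Addr string
}

func NewServer(p ServerParams) *Server { return &Server{addr: p.Addr} }

func main() {
	fx.New(
		fx.Provide(
			func() string { return ":1234" }, // provides ServerParams.Addr
			NewServer,
		),
		fx.Invoke(func(s *Server) { fmt.Println("serving on", s.addr) }),
	)
}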