syncer: make SyncManager an interface.

This commit is contained in:
Raúl Kripalani 2020-09-14 21:58:59 +01:00
parent 86607452d7
commit dd841f32db
5 changed files with 97 additions and 38 deletions

View File

@ -58,7 +58,14 @@ import (
// the theoretical max height based on systime are quickly rejected
const MaxHeightDrift = 5
var defaultMessageFetchWindowSize = 200
var (
// LocalIncoming is the _local_ pubsub (unrelated to libp2p pubsub) topic
// where the Syncer publishes candidate chain heads to be synced.
LocalIncoming = "incoming"
log = logging.Logger("chain")
defaultMessageFetchWindowSize = 200
)
func init() {
if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" {
@ -71,10 +78,6 @@ func init() {
}
}
var log = logging.Logger("chain")
var LocalIncoming = "incoming"
// Syncer is in charge of running the chain synchronization logic. As such, it
// is tasked with these functions, amongst others:
//
@ -119,7 +122,7 @@ type Syncer struct {
self peer.ID
syncmgr *SyncManager
syncmgr SyncManager
connmgr connmgr.ConnManager
@ -140,8 +143,10 @@ type Syncer struct {
ds dtypes.MetadataDS
}
type SyncManagerCtor func(syncFn SyncFunc) SyncManager
// NewSyncer creates a new Syncer object.
func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*Syncer, error) {
func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, syncMgrCtor SyncManagerCtor, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*Syncer, error) {
gen, err := sm.ChainStore().GetGenesis()
if err != nil {
return nil, xerrors.Errorf("getting genesis block: %w", err)
@ -181,7 +186,7 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C
log.Warn("*********************************************************************************************")
}
s.syncmgr = NewSyncManager(s.Sync)
s.syncmgr = syncMgrCtor(s.Sync)
return s, nil
}
@ -1665,11 +1670,7 @@ func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []b
}
func (syncer *Syncer) State() []SyncerState {
var out []SyncerState
for _, ss := range syncer.syncmgr.syncStates {
out = append(out, ss.Snapshot())
}
return out
return syncer.syncmgr.State()
}
// MarkBad manually adds a block to the "bad blocks" cache.

View File

@ -20,7 +20,28 @@ const (
type SyncFunc func(context.Context, *types.TipSet) error
type SyncManager struct {
// SyncManager manages the chain synchronization process, both at bootstrap time
// and during ongoing operation.
//
// It receives candidate chain heads in the form of tipsets from peers,
// and schedules them onto sync workers, deduplicating processing for
// already-active syncs.
type SyncManager interface {
// Start starts the SyncManager.
Start()
// Stop stops the SyncManager.
Stop()
// SetPeerHead informs the SyncManager that the supplied peer reported the
// supplied tipset.
SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet)
// State retrieves the state of the sync workers.
State() []SyncerState
}
type syncManager struct {
lk sync.Mutex
peerHeads map[peer.ID]*types.TipSet
@ -48,6 +69,8 @@ type SyncManager struct {
workerChan chan *types.TipSet
}
var _ SyncManager = (*syncManager)(nil)
type syncResult struct {
ts *types.TipSet
success bool
@ -55,8 +78,8 @@ type syncResult struct {
const syncWorkerCount = 3
func NewSyncManager(sync SyncFunc) *SyncManager {
return &SyncManager{
func NewSyncManager(sync SyncFunc) SyncManager {
return &syncManager{
bspThresh: 1,
peerHeads: make(map[peer.ID]*types.TipSet),
syncTargets: make(chan *types.TipSet),
@ -69,18 +92,18 @@ func NewSyncManager(sync SyncFunc) *SyncManager {
}
}
func (sm *SyncManager) Start() {
func (sm *syncManager) Start() {
go sm.syncScheduler()
for i := 0; i < syncWorkerCount; i++ {
go sm.syncWorker(i)
}
}
func (sm *SyncManager) Stop() {
func (sm *syncManager) Stop() {
close(sm.stop)
}
func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.peerHeads[p] = ts
@ -105,6 +128,14 @@ func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip
sm.incomingTipSets <- ts
}
func (sm *syncManager) State() []SyncerState {
ret := make([]SyncerState, 0, len(sm.syncStates))
for _, s := range sm.syncStates {
ret = append(ret, s.Snapshot())
}
return ret
}
type syncBucketSet struct {
buckets []*syncTargetBucket
}
@ -234,7 +265,7 @@ func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
return best
}
func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) {
var buckets syncBucketSet
var peerHeads []*types.TipSet
@ -258,7 +289,7 @@ func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
return buckets.Heaviest(), nil
}
func (sm *SyncManager) syncScheduler() {
func (sm *syncManager) syncScheduler() {
for {
select {
@ -280,7 +311,7 @@ func (sm *SyncManager) syncScheduler() {
}
}
func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
func (sm *syncManager) scheduleIncoming(ts *types.TipSet) {
log.Debug("scheduling incoming tipset sync: ", ts.Cids())
if sm.getBootstrapState() == BSStateSelected {
sm.setBootstrapState(BSStateScheduled)
@ -328,10 +359,11 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
}
}
func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
func (sm *syncManager) scheduleProcessResult(res *syncResult) {
if res.success && sm.getBootstrapState() != BSStateComplete {
sm.setBootstrapState(BSStateComplete)
}
delete(sm.activeSyncs, res.ts.Key())
relbucket := sm.activeSyncTips.PopRelated(res.ts)
if relbucket != nil {
@ -360,7 +392,7 @@ func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
}
}
func (sm *SyncManager) scheduleWorkSent() {
func (sm *syncManager) scheduleWorkSent() {
hts := sm.nextSyncTarget.heaviestTipSet()
sm.activeSyncs[hts.Key()] = hts
@ -372,7 +404,7 @@ func (sm *SyncManager) scheduleWorkSent() {
}
}
func (sm *SyncManager) syncWorker(id int) {
func (sm *syncManager) syncWorker(id int) {
ss := &SyncerState{}
sm.syncStates[id] = ss
for {
@ -397,7 +429,7 @@ func (sm *SyncManager) syncWorker(id int) {
}
}
func (sm *SyncManager) syncedPeerCount() int {
func (sm *syncManager) syncedPeerCount() int {
var count int
for _, ts := range sm.peerHeads {
if ts.Height() > 0 {
@ -407,19 +439,19 @@ func (sm *SyncManager) syncedPeerCount() int {
return count
}
func (sm *SyncManager) getBootstrapState() int {
func (sm *syncManager) getBootstrapState() int {
sm.bssLk.Lock()
defer sm.bssLk.Unlock()
return sm.bootstrapState
}
func (sm *SyncManager) setBootstrapState(v int) {
func (sm *syncManager) setBootstrapState(v int) {
sm.bssLk.Lock()
defer sm.bssLk.Unlock()
sm.bootstrapState = v
}
func (sm *SyncManager) IsBootstrapped() bool {
func (sm *syncManager) IsBootstrapped() bool {
sm.bssLk.Lock()
defer sm.bssLk.Unlock()
return sm.bootstrapState == BSStateComplete

View File

@ -17,7 +17,7 @@ type syncOp struct {
done func()
}
func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *SyncManager, chan *syncOp)) {
func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T, *syncManager, chan *syncOp)) {
syncTargets := make(chan *syncOp)
sm := NewSyncManager(func(ctx context.Context, ts *types.TipSet) error {
ch := make(chan struct{})
@ -27,7 +27,7 @@ func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T,
}
<-ch
return nil
})
}).(*syncManager)
sm.bspThresh = thresh
sm.Start()
@ -77,12 +77,12 @@ func TestSyncManager(t *testing.T) {
c3 := mock.TipSet(mock.MkBlock(b, 3, 5))
d := mock.TipSet(mock.MkBlock(c1, 4, 5))
runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
sm.SetPeerHead(ctx, "peer1", c1)
assertGetSyncOp(t, stc, c1)
})
runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
runSyncMgrTest(t, "testBootstrap", 2, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
sm.SetPeerHead(ctx, "peer1", c1)
assertNoOp(t, stc)
@ -90,7 +90,7 @@ func TestSyncManager(t *testing.T) {
assertGetSyncOp(t, stc, c1)
})
runSyncMgrTest(t, "testSyncAfterBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
runSyncMgrTest(t, "testSyncAfterBootstrap", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
sm.SetPeerHead(ctx, "peer1", b)
assertGetSyncOp(t, stc, b)
@ -101,7 +101,7 @@ func TestSyncManager(t *testing.T) {
assertGetSyncOp(t, stc, c2)
})
runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
runSyncMgrTest(t, "testCoalescing", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
sm.SetPeerHead(ctx, "peer1", a)
assertGetSyncOp(t, stc, a)
@ -122,7 +122,7 @@ func TestSyncManager(t *testing.T) {
assertGetSyncOp(t, stc, d)
})
runSyncMgrTest(t, "testSyncIncomingTipset", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
runSyncMgrTest(t, "testSyncIncomingTipset", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
sm.SetPeerHead(ctx, "peer1", a)
assertGetSyncOp(t, stc, a)

View File

@ -242,6 +242,9 @@ func Online() Option {
Override(new(dtypes.ChainBlockService), modules.ChainBlockService),
// Filecoin services
// We don't want the SyncManagerCtor to be used as an fx constructor, but rather as a value.
// It will be called implicitly by the Syncer constructor.
Override(new(chain.SyncManagerCtor), func() chain.SyncManagerCtor { return chain.NewSyncManager }),
Override(new(*chain.Syncer), modules.NewSyncer),
Override(new(exchange.Client), exchange.NewClient),
Override(new(*messagepool.MessagePool), modules.MessagePool),

View File

@ -163,8 +163,31 @@ func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore,
return netName, err
}
func NewSyncer(lc fx.Lifecycle, ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, h host.Host, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*chain.Syncer, error) {
syncer, err := chain.NewSyncer(ds, sm, exchange, h.ConnManager(), h.ID(), beacon, verifier)
type SyncerParams struct {
fx.In
Lifecycle fx.Lifecycle
MetadataDS dtypes.MetadataDS
StateManager *stmgr.StateManager
ChainXchg exchange.Client
SyncMgrCtor chain.SyncManagerCtor
Host host.Host
Beacon beacon.Schedule
Verifier ffiwrapper.Verifier
}
func NewSyncer(params SyncerParams) (*chain.Syncer, error) {
var (
lc = params.Lifecycle
ds = params.MetadataDS
sm = params.StateManager
ex = params.ChainXchg
smCtor = params.SyncMgrCtor
h = params.Host
b = params.Beacon
v = params.Verifier
)
syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, v)
if err != nil {
return nil, err
}