rewrite sync manager
parent d4cdc6d334
commit 5d34b7d618
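For context, a minimal sketch of how the rewritten manager is driven: a caller constructs it with a SyncFunc, starts it, and feeds it peer heads; once BootstrapPeerThreshold peers have reported, the scheduler selects an initial target and spawns workers. The runSync helper and its channel plumbing below are illustrative only (not part of this commit); the identifiers it calls come from the diff.

package example

import (
	"context"

	"github.com/filecoin-project/lotus/chain"
	"github.com/filecoin-project/lotus/chain/types"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// runSync is a hypothetical caller of the rewritten sync manager.
func runSync(ctx context.Context, pid peer.ID, newHeads <-chan *types.TipSet) {
	// The SyncFunc is the actual sync implementation; in lotus this is
	// normally (*Syncer).Sync().
	doSync := func(ctx context.Context, ts *types.TipSet) error {
		// ... fetch and validate the chain up to ts ...
		return nil
	}

	sm := chain.NewSyncManager(doSync)
	sm.Start()
	defer sm.Stop()

	// Every reported head goes through SetPeerHead; the scheduler decides
	// whether it becomes a new sync target, extends an active one, or is ignored.
	for ts := range newHeads {
		sm.SetPeerHead(ctx, pid, ts)
	}
}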
@@ -7,11 +7,13 @@ import (
	"strings"
	"sync"

	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain/types"

	peer "github.com/libp2p/go-libp2p-core/peer"
)

const BootstrapPeerThreshold = 2
var BootstrapPeerThreshold = 2

var coalesceForksParents = false

@@ -21,13 +23,6 @@ func init() {
	}
}

const (
	BSStateInit = 0
	BSStateSelected = 1
	BSStateScheduled = 2
	BSStateComplete = 3
)

type SyncFunc func(context.Context, *types.TipSet) error

// SyncManager manages the chain synchronization process, both at bootstrap time
@@ -52,108 +47,327 @@ type SyncManager interface {
}

type syncManager struct {
	lk sync.Mutex
	peerHeads map[peer.ID]*types.TipSet
	ctx    context.Context
	cancel func()

	bssLk sync.Mutex
	bootstrapState int
	workq   chan peerHead
	statusq chan workerStatus

	bspThresh int
	nextWorker uint64
	pend       syncBucketSet
	heads      map[peer.ID]*types.TipSet

	incomingTipSets chan *types.TipSet
	syncTargets chan *types.TipSet
	syncResults chan *syncResult
	mx    sync.Mutex
	state map[uint64]*workerState

	syncStates []*SyncerState

	// Normally this handler is set to `(*Syncer).Sync()`.
	doSync func(context.Context, *types.TipSet) error

	stop chan struct{}

	// Sync Scheduler fields
	activeSyncs    map[types.TipSetKey]*types.TipSet
	syncQueue      syncBucketSet
	activeSyncTips syncBucketSet
	nextSyncTarget *syncTargetBucket
	workerChan     chan *types.TipSet
}

var _ SyncManager = (*syncManager)(nil)

type syncResult struct {
	ts      *types.TipSet
	success bool
type peerHead struct {
	p  peer.ID
	ts *types.TipSet
}

const syncWorkerCount = 3
type workerState struct {
	id uint64
	ts *types.TipSet
	ss *SyncerState
}

type workerStatus struct {
	id  uint64
	err error
}

// sync manager interface
func NewSyncManager(sync SyncFunc) SyncManager {
	sm := &syncManager{
		bspThresh: 1,
		peerHeads: make(map[peer.ID]*types.TipSet),
		syncTargets: make(chan *types.TipSet),
		syncResults: make(chan *syncResult),
		syncStates: make([]*SyncerState, syncWorkerCount),
		incomingTipSets: make(chan *types.TipSet),
		activeSyncs: make(map[types.TipSetKey]*types.TipSet),
		doSync: sync,
		stop: make(chan struct{}),
	ctx, cancel := context.WithCancel(context.Background())
	return &syncManager{
		ctx:    ctx,
		cancel: cancel,

		workq:   make(chan peerHead),
		statusq: make(chan workerStatus),

		heads: make(map[peer.ID]*types.TipSet),
		state: make(map[uint64]*workerState),

		doSync: sync,
	}
	for i := range sm.syncStates {
		sm.syncStates[i] = new(SyncerState)
	}
	return sm
}

func (sm *syncManager) Start() {
	go sm.syncScheduler()
	for i := 0; i < syncWorkerCount; i++ {
		go sm.syncWorker(i)
	}
	go sm.scheduler()
}

func (sm *syncManager) Stop() {
	close(sm.stop)
	select {
	case <-sm.ctx.Done():
	default:
		sm.cancel()
	}
}

func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
	sm.lk.Lock()
	defer sm.lk.Unlock()
	sm.peerHeads[p] = ts

	if sm.getBootstrapState() == BSStateInit {
		spc := sm.syncedPeerCount()
		if spc >= sm.bspThresh {
			// Its go time!
			target, err := sm.selectSyncTarget()
			if err != nil {
				log.Error("failed to select sync target: ", err)
				return
			}
			sm.setBootstrapState(BSStateSelected)

			sm.incomingTipSets <- target
		}
		log.Infof("sync bootstrap has %d peers", spc)
		return
	select {
	case sm.workq <- peerHead{p: p, ts: ts}:
	case <-sm.ctx.Done():
	case <-ctx.Done():
	}

	sm.incomingTipSets <- ts
}

func (sm *syncManager) State() []SyncerStateSnapshot {
	ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates))
	for _, s := range sm.syncStates {
		ret = append(ret, s.Snapshot())
	sm.mx.Lock()
	workerStates := make([]*workerState, 0, len(sm.state))
	for _, ws := range sm.state {
		workerStates = append(workerStates, ws)
	}
	return ret
	sm.mx.Unlock()

	sort.Slice(workerStates, func(i, j int) bool {
		return workerStates[i].id < workerStates[j].id
	})

	result := make([]SyncerStateSnapshot, 0, len(workerStates))
	for _, ws := range workerStates {
		result = append(result, ws.ss.Snapshot())
	}

	return result
}

// sync manager internals
func (sm *syncManager) scheduler() {
	for {
		select {
		case head := <-sm.workq:
			sm.handlePeerHead(head)
		case status := <-sm.statusq:
			sm.handleWorkerStatus(status)
		case <-sm.ctx.Done():
			return
		}
	}
}

func (sm *syncManager) handlePeerHead(head peerHead) {
	log.Infof("new peer head: %s %s", head.p, head.ts)

	// have we started syncing yet?
	if sm.nextWorker == 0 {
		// track the peer head until we start syncing
		sm.heads[head.p] = head.ts

		// not yet; do we have enough peers?
		if len(sm.heads) < BootstrapPeerThreshold {
			// not enough peers; track it and wait
			return
		}

		// we are ready to start syncing; select the sync target and spawn a worker
		target, err := sm.selectInitialSyncTarget()
		if err != nil {
			log.Errorf("failed to select initial sync target: %s", err)
			return
		}

		log.Infof("selected initial sync target: %s", target)
		sm.spawnWorker(target)
		return
	}

	// we have started syncing, add peer head to the queue if applicable and maybe spawn a worker
	// if there is work to do (possibly in a fork)
	target, work, err := sm.addSyncTarget(head.ts)
	if err != nil {
		log.Warnf("failed to add sync target: %s", err)
		return
	}

	if work {
		log.Infof("selected sync target: %s", target)
		sm.spawnWorker(target)
	}
}

func (sm *syncManager) handleWorkerStatus(status workerStatus) {
	log.Debugf("worker %d done; status error: %s", status.id, status.err)

	sm.mx.Lock()
	ws := sm.state[status.id]
	delete(sm.state, status.id)
	sm.mx.Unlock()

	if status.err != nil {
		// we failed to sync this target -- log it and try to work on an extended chain
		// if there is nothing related to be worked on, we stop working on this chain.
		log.Errorf("error during sync in %s: %s", ws.ts, status.err)
	}

	// we are done with this target, select the next sync target and spawn a worker if there is work
	// to do, because of an extension of this chain.
	target, work, err := sm.selectSyncTarget(ws.ts)
	if err != nil {
		log.Warnf("failed to select sync target: %s", err)
		return
	}

	if work {
		log.Infof("selected sync target: %s", target)
		sm.spawnWorker(target)
	}
}

func (sm *syncManager) spawnWorker(target *types.TipSet) {
	id := sm.nextWorker
	sm.nextWorker++
	ws := &workerState{
		id: id,
		ts: target,
		ss: new(SyncerState),
	}

	sm.mx.Lock()
	sm.state[id] = ws
	sm.mx.Unlock()

	go sm.worker(ws)
}

func (sm *syncManager) worker(ws *workerState) {
	log.Infof("worker %d syncing in %s", ws.id, ws.ts)

	start := build.Clock.Now()
	defer func() {
		log.Infof("worker %d done; took %s", ws.id, build.Clock.Since(start))
	}()

	ctx := context.WithValue(sm.ctx, syncStateKey{}, ws.ss)
	err := sm.doSync(ctx, ws.ts)

	select {
	case sm.statusq <- workerStatus{id: ws.id, err: err}:
	case <-sm.ctx.Done():
	}
}

// selects the initial sync target by examining known peer heads; only called once for the initial
// sync.
func (sm *syncManager) selectInitialSyncTarget() (*types.TipSet, error) {
	var buckets syncBucketSet

	var peerHeads []*types.TipSet
	for _, ts := range sm.heads {
		peerHeads = append(peerHeads, ts)
	}
	// clear the map, we don't use it any longer
	sm.heads = nil

	sort.Slice(peerHeads, func(i, j int) bool {
		return peerHeads[i].Height() < peerHeads[j].Height()
	})

	for _, ts := range peerHeads {
		buckets.Insert(ts)
	}

	if len(buckets.buckets) > 1 {
		log.Warn("caution, multiple distinct chains seen during head selections")
		// TODO: we *could* refuse to sync here without user intervention.
		// For now, just select the best cluster
	}

	return buckets.Heaviest(), nil
}

// adds a tipset to the potential sync targets; returns true if there is a tipset to work on.
// this could be either a restart, e.g. because there is no currently scheduled sync work or a worker
// failed, or a potential fork.
func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, error) {
	// Note: we don't need the state lock here to access the active worker states, as the only
	// competing threads that may access it do so through State() which is read only.

	// if the worker set is empty, we have finished syncing and were waiting for the next tipset
	// in this case, we just return the tipset as work to be done
	if len(sm.state) == 0 {
		return ts, true, nil
	}

	// check if it is related to any active sync; if so insert into the pending sync queue
	for _, ws := range sm.state {
		if ts.Equals(ws.ts) {
			// ignore it, we are already syncing it
			return nil, false, nil
		}

		if ts.Parents() == ws.ts.Key() {
			// schedule for syncing next; it's an extension of an active sync
			sm.pend.Insert(ts)
			return nil, false, nil
		}
	}

	// check to see if it is related to any pending sync; if so insert it into the pending sync queue
	if sm.pend.RelatedToAny(ts) {
		sm.pend.Insert(ts)
		return nil, false, nil
	}

	// it's not related to any active or pending sync; this could be a fork in which case we
	// start a new worker to sync it, if it is *heavier* than any active or pending set;
	// if it is not, we ignore it.
	activeHeavier := false
	for _, ws := range sm.state {
		if ws.ts.Height() > ts.Height() {
			activeHeavier = true
			break
		}
	}

	if activeHeavier {
		return nil, false, nil
	}

	pendHeaviest := sm.pend.Heaviest()
	if pendHeaviest != nil && pendHeaviest.Height() > ts.Height() {
		return nil, false, nil
	}

	// start a new worker, seems heavy enough and unrelated to active or pending syncs
	return ts, true, nil
}

// selects the next sync target after a worker sync has finished; returns true and a target
// TipSet if this chain should continue to sync because there is a heavier related tipset.
func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool, error) {
	// we pop the related bucket and if there is any related tipset, we work on the heaviest one next
	// if we are not already working on a heavier tipset
	related := sm.pend.PopRelated(done)
	if related == nil {
		return nil, false, nil
	}

	heaviest := related.heaviestTipSet()
	for _, ws := range sm.state {
		if ws.ts.Height() > heaviest.Height() {
			return nil, false, nil
		}
	}

	return heaviest, true, nil
}

// sync buckets and related utilities
type syncBucketSet struct {
	buckets []*syncTargetBucket
}

type syncTargetBucket struct {
	tips []*types.TipSet
}

func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
	var stb syncTargetBucket
	for _, ts := range tipsets {
@@ -250,10 +464,6 @@ func (sbs *syncBucketSet) Empty() bool {
	return len(sbs.buckets) == 0
}

type syncTargetBucket struct {
	tips []*types.TipSet
}

func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
	for _, t := range stb.tips {
		if ts.Equals(t) {
@@ -296,196 +506,3 @@ func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
	}
	return best
}

func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) {
	var buckets syncBucketSet

	var peerHeads []*types.TipSet
	for _, ts := range sm.peerHeads {
		peerHeads = append(peerHeads, ts)
	}
	sort.Slice(peerHeads, func(i, j int) bool {
		return peerHeads[i].Height() < peerHeads[j].Height()
	})

	for _, ts := range peerHeads {
		buckets.Insert(ts)
	}

	if len(buckets.buckets) > 1 {
		log.Warn("caution, multiple distinct chains seen during head selections")
		// TODO: we *could* refuse to sync here without user intervention.
		// For now, just select the best cluster
	}

	return buckets.Heaviest(), nil
}

func (sm *syncManager) syncScheduler() {
	for {
		select {
		case ts, ok := <-sm.incomingTipSets:
			if !ok {
				log.Info("shutting down sync scheduler")
				return
			}

			sm.scheduleIncoming(ts)
		case res := <-sm.syncResults:
			sm.scheduleProcessResult(res)
		case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet():
			sm.scheduleWorkSent()
		case <-sm.stop:
			log.Info("sync scheduler shutting down")
			return
		}
	}
}

func (sm *syncManager) scheduleIncoming(ts *types.TipSet) {
	log.Debug("scheduling incoming tipset sync: ", ts.Cids())
	if sm.getBootstrapState() == BSStateSelected {
		sm.setBootstrapState(BSStateScheduled)
		sm.syncTargets <- ts
		return
	}

	var relatedToActiveSync bool
	for _, acts := range sm.activeSyncs {
		if ts.Equals(acts) {
			// ignore, we are already syncing it
			return
		}

		if ts.Parents() == acts.Key() {
			// sync this next, after that sync process finishes
			relatedToActiveSync = true
		}
	}

	if !relatedToActiveSync && sm.activeSyncTips.RelatedToAny(ts) {
		relatedToActiveSync = true
	}

	// if this is related to an active sync process, immediately bucket it
	// we don't want to start a parallel sync process that duplicates work
	if relatedToActiveSync {
		sm.activeSyncTips.Insert(ts)
		return
	}

	if sm.getBootstrapState() == BSStateScheduled {
		sm.syncQueue.Insert(ts)
		return
	}

	if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) {
		sm.nextSyncTarget.add(ts)
	} else {
		sm.syncQueue.Insert(ts)

		if sm.nextSyncTarget == nil {
			sm.nextSyncTarget = sm.syncQueue.Pop()
			sm.workerChan = sm.syncTargets
		}
	}
}

func (sm *syncManager) scheduleProcessResult(res *syncResult) {
	if res.success && sm.getBootstrapState() != BSStateComplete {
		sm.setBootstrapState(BSStateComplete)
	}

	delete(sm.activeSyncs, res.ts.Key())
	relbucket := sm.activeSyncTips.PopRelated(res.ts)
	if relbucket != nil {
		if res.success {
			if sm.nextSyncTarget == nil {
				sm.nextSyncTarget = relbucket
				sm.workerChan = sm.syncTargets
			} else {
				for _, t := range relbucket.tips {
					sm.syncQueue.Insert(t)
				}
			}
			return
		}
		// TODO: this is the case where we try to sync a chain, and
		// fail, and we have more blocks on top of that chain that
		// have come in since. The question is, should we try to
		// sync these? or just drop them?
		log.Error("failed to sync chain but have new unconnected blocks from chain")
	}

	if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() {
		next := sm.syncQueue.Pop()
		if next != nil {
			sm.nextSyncTarget = next
			sm.workerChan = sm.syncTargets
		}
	}
}

func (sm *syncManager) scheduleWorkSent() {
	hts := sm.nextSyncTarget.heaviestTipSet()
	sm.activeSyncs[hts.Key()] = hts

	if !sm.syncQueue.Empty() {
		sm.nextSyncTarget = sm.syncQueue.Pop()
	} else {
		sm.nextSyncTarget = nil
		sm.workerChan = nil
	}
}

func (sm *syncManager) syncWorker(id int) {
	ss := sm.syncStates[id]
	for {
		select {
		case ts, ok := <-sm.syncTargets:
			if !ok {
				log.Info("sync manager worker shutting down")
				return
			}

			ctx := context.WithValue(context.TODO(), syncStateKey{}, ss)
			err := sm.doSync(ctx, ts)
			if err != nil {
				log.Errorf("sync error: %+v", err)
			}

			sm.syncResults <- &syncResult{
				ts: ts,
				success: err == nil,
			}
		}
	}
}

func (sm *syncManager) syncedPeerCount() int {
	var count int
	for _, ts := range sm.peerHeads {
		if ts.Height() > 0 {
			count++
		}
	}
	return count
}

func (sm *syncManager) getBootstrapState() int {
	sm.bssLk.Lock()
	defer sm.bssLk.Unlock()
	return sm.bootstrapState
}

func (sm *syncManager) setBootstrapState(v int) {
	sm.bssLk.Lock()
	defer sm.bssLk.Unlock()
	sm.bootstrapState = v
}

func (sm *syncManager) IsBootstrapped() bool {
	sm.bssLk.Lock()
	defer sm.bssLk.Unlock()
	return sm.bootstrapState == BSStateComplete
}

@@ -28,7 +28,12 @@ func runSyncMgrTest(t *testing.T, tname string, thresh int, tf func(*testing.T,
		<-ch
		return nil
	}).(*syncManager)
	sm.bspThresh = thresh

	oldBootstrapPeerThreshold := BootstrapPeerThreshold
	BootstrapPeerThreshold = thresh
	defer func() {
		BootstrapPeerThreshold = oldBootstrapPeerThreshold
	}()

	sm.Start()
	defer sm.Stop()
@@ -112,8 +117,8 @@ func TestSyncManagerEdgeCase(t *testing.T) {

		waitUntilAllWorkersAreDone(stc)

		if len(sm.activeSyncTips.buckets) != 0 {
			t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String())
		if len(sm.state) != 0 {
			t.Errorf("active syncs expected empty but got: %d", len(sm.state))
		}
	})
}