track recently synced tipsets to avoid unnecessary worker spawning
This commit is contained in:
parent
2b82e5a118
commit
ab7a66b90d
@ -16,6 +16,8 @@ import (
|
||||
|
||||
var BootstrapPeerThreshold = 1
|
||||
|
||||
var RecentSyncBufferSize = 10
|
||||
|
||||
var coalesceTipsets = false
|
||||
|
||||
func init() {
|
||||
@ -55,6 +57,7 @@ type syncManager struct {
|
||||
nextWorker uint64
|
||||
pend syncBucketSet
|
||||
heads map[peer.ID]*types.TipSet
|
||||
recent *syncBuffer
|
||||
|
||||
mx sync.Mutex
|
||||
state map[uint64]*workerState
|
||||
@ -92,6 +95,7 @@ func NewSyncManager(sync SyncFunc) SyncManager {
|
||||
|
||||
heads: make(map[peer.ID]*types.TipSet),
|
||||
state: make(map[uint64]*workerState),
|
||||
recent: newSyncBuffer(RecentSyncBufferSize),
|
||||
|
||||
doSync: sync,
|
||||
}
|
||||
@ -203,6 +207,9 @@ func (sm *syncManager) handleWorkerStatus(status workerStatus) {
|
||||
// we failed to sync this target -- log it and try to work on an extended chain
|
||||
// if there is nothing related to be worked on, we stop working on this chain.
|
||||
log.Errorf("error during sync in %s: %s", ws.ts, status.err)
|
||||
} else {
|
||||
// add to the recently synced buffer
|
||||
sm.recent.Push(ws.ts)
|
||||
}
|
||||
|
||||
// we are done with this target, select the next sync target and spawn a worker if there is work
|
||||
@ -286,6 +293,12 @@ func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, err
|
||||
// Note: we don't need the state lock here to access the active worker states, as the only
|
||||
// competing threads that may access it do so through State() which is read only.
|
||||
|
||||
// if we have recently synced this or any heavier tipset we just ignore it; this can happen
|
||||
// with an empty worker set after we just finished syncing to a target
|
||||
if sm.recent.Synced(ts) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// if the worker set is empty, we have finished syncing and were waiting for the next tipset
|
||||
// in this case, we just return the tipset as work to be done
|
||||
if len(sm.state) == 0 {
|
||||
@ -341,6 +354,10 @@ func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool
|
||||
}
|
||||
|
||||
heaviest := related.heaviestTipSet()
|
||||
if isHeavier(done, heaviest) {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
for _, ws := range sm.state {
|
||||
if isHeavier(ws.ts, heaviest) {
|
||||
return nil, false, nil
|
||||
@ -354,6 +371,47 @@ func isHeavier(a, b *types.TipSet) bool {
|
||||
return a.ParentWeight().GreaterThan(b.ParentWeight())
|
||||
}
|
||||
|
||||
// sync buffer -- this is a circular buffer of recently synced tipsets
|
||||
type syncBuffer struct {
|
||||
buf []*types.TipSet
|
||||
next int64
|
||||
}
|
||||
|
||||
func newSyncBuffer(size int) *syncBuffer {
|
||||
return &syncBuffer{buf: make([]*types.TipSet, size)}
|
||||
}
|
||||
|
||||
func (sb *syncBuffer) Push(ts *types.TipSet) {
|
||||
i := int(sb.next % int64(len(sb.buf)))
|
||||
sb.buf[i] = ts
|
||||
sb.next++
|
||||
}
|
||||
|
||||
func (sb *syncBuffer) Synced(ts *types.TipSet) bool {
|
||||
synced := func(a, b *types.TipSet) bool {
|
||||
return a.Equals(b) || isHeavier(a, b)
|
||||
}
|
||||
|
||||
if sb.next < int64(len(sb.buf)) {
|
||||
for i := int(sb.next - 1); i >= 0; i-- {
|
||||
if synced(sb.buf[i], ts) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
for j := 1; j < len(sb.buf); j++ {
|
||||
i := int((sb.next - int64(j)) % int64(len(sb.buf)))
|
||||
if synced(sb.buf[i], ts) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// sync buckets and related utilities
|
||||
type syncBucketSet struct {
|
||||
buckets []*syncTargetBucket
|
||||
|
Loading…
Reference in New Issue
Block a user