getting closer, squashme
This commit is contained in:
parent
4bc523e6b4
commit
1d81c53f8f
@ -55,8 +55,7 @@ type Syncer struct {
|
||||
|
||||
self peer.ID
|
||||
|
||||
syncLock sync.Mutex
|
||||
syncState SyncerState
|
||||
syncLock sync.Mutex
|
||||
|
||||
syncmgr *SyncManager
|
||||
}
|
||||
@ -73,13 +72,12 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID)
|
||||
}
|
||||
|
||||
return &Syncer{
|
||||
bad: NewBadBlockCache(),
|
||||
Genesis: gent,
|
||||
Bsync: bsync,
|
||||
peerHeads: make(map[peer.ID]*types.TipSet),
|
||||
store: sm.ChainStore(),
|
||||
sm: sm,
|
||||
self: self,
|
||||
bad: NewBadBlockCache(),
|
||||
Genesis: gent,
|
||||
Bsync: bsync,
|
||||
store: sm.ChainStore(),
|
||||
sm: sm,
|
||||
self: self,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package chain
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
@ -21,14 +22,24 @@ type SyncManager struct {
|
||||
|
||||
syncTargets chan *types.TipSet
|
||||
|
||||
asLk sync.Mutex
|
||||
activeSyncs map[types.TipSetKey]*types.TipSet
|
||||
queuedSyncs map[types.TipSetKey]*types.TipSet
|
||||
|
||||
syncState SyncerState
|
||||
|
||||
doSync func(context.Context, *types.TipSet) error
|
||||
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func NewSyncManager(sync SyncFunc) *SyncManager {
|
||||
return &SyncManager{
|
||||
peerHeads: make(map[peer.ID]*types.TipSet),
|
||||
syncTargets: make(chan *types.TipSet),
|
||||
activeSyncs: make([]*types.TipSet, syncWorkerCount),
|
||||
doSync: sync,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,6 +58,17 @@ func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) {
|
||||
spc := sm.syncedPeerCount()
|
||||
if spc >= sm.bspThresh {
|
||||
// Its go time!
|
||||
target, err := sm.selectSyncTarget()
|
||||
if err != nil {
|
||||
log.Error("failed to select sync target: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
sm.asLk.Lock()
|
||||
sm.activeSyncs[target.Key()] = target
|
||||
sm.asLk.Unlock()
|
||||
sm.syncTargets <- target
|
||||
sm.bootstrapped = true
|
||||
}
|
||||
log.Infof("sync bootstrap has %d peers", spc)
|
||||
return
|
||||
@ -54,8 +76,178 @@ func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) {
|
||||
|
||||
}
|
||||
|
||||
// syncBucketSet groups candidate sync targets into buckets, one bucket
// per distinct chain (chain membership is decided by
// syncTargetBucket.sameChainAs).
type syncBucketSet struct {
	buckets []*syncTargetBucket
}
|
||||
|
||||
func (sbs *syncBucketSet) Insert(ts *types.TipSet) {
|
||||
for _, b := range sbs.buckets {
|
||||
if b.sameChainAs(ts) {
|
||||
b.add(ts)
|
||||
return
|
||||
}
|
||||
}
|
||||
sbs.buckets = append(sbs.buckets, &syncTargetBucket{
|
||||
tips: []*types.TipSet{ts},
|
||||
count: 1,
|
||||
})
|
||||
}
|
||||
|
||||
func (sbs *syncBucketSet) Pop() *syncTargetBucket {
|
||||
var bestBuck *syncTargetBucket
|
||||
var bestTs *types.TipSet
|
||||
for _, b := range sbs.buckets {
|
||||
hts := b.heaviestTipSet()
|
||||
if bestBuck == nil || bestTs.ParentWeight().LessThan(hts.ParentWeight()) {
|
||||
bestBuck = b
|
||||
bestTs = hts
|
||||
}
|
||||
}
|
||||
nbuckets := make([]*syncTargetBucket, len(sbs.buckets)-1)
|
||||
return bestBuck
|
||||
}
|
||||
|
||||
func (sbs *syncBucketSet) Heaviest() *types.TipSet {
|
||||
// TODO: should also consider factoring in number of peers represented by each bucket here
|
||||
var bestTs *types.TipSet
|
||||
for _, b := range buckets {
|
||||
bhts := b.heaviestTipSet()
|
||||
if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) {
|
||||
bestTs = bhts
|
||||
}
|
||||
}
|
||||
return bestTs
|
||||
}
|
||||
|
||||
// syncTargetBucket collects tipsets believed to belong to one chain.
type syncTargetBucket struct {
	// tips holds the distinct tipsets seen for this chain (deduplicated
	// by TipSet.Equals in add).
	tips []*types.TipSet
	// count is incremented on every add call, including duplicates, so
	// it can exceed len(tips).
	count int
}
|
||||
|
||||
func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
|
||||
var stb syncTargetBucket
|
||||
for _, ts := range tipsets {
|
||||
stb.add(ts)
|
||||
}
|
||||
return &stb
|
||||
}
|
||||
|
||||
func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
|
||||
for _, t := range stb.tips {
|
||||
if ts.Equals(t) {
|
||||
return true
|
||||
}
|
||||
if types.CidArrsEqual(ts.Cids(), t.Parents()) {
|
||||
return true
|
||||
}
|
||||
if types.CidArrsEqual(ts.Parents(), t.Cids()) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (stb *syncTargetBucket) add(ts *types.TipSet) {
|
||||
stb.count++
|
||||
|
||||
for _, t := range stb.tips {
|
||||
if t.Equals(ts) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
stb.tips = append(stb.tips, ts)
|
||||
}
|
||||
|
||||
func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
|
||||
var best *types.TipSet
|
||||
for _, ts := range stb.tips {
|
||||
if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) {
|
||||
best = ts
|
||||
}
|
||||
}
|
||||
return best
|
||||
}
|
||||
|
||||
func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
|
||||
panic("NYI")
|
||||
var buckets syncBucketSet
|
||||
|
||||
var peerHeads []*types.TipSet
|
||||
for _, ts := range sm.peerHeads {
|
||||
peerHeads = append(peerHeads, ts)
|
||||
}
|
||||
sort.Slice(peerHeads, func(i, j int) bool {
|
||||
return peerHeads[i].Height() < peerHeads[j].Height()
|
||||
})
|
||||
|
||||
for _, ts := range peerHeads {
|
||||
buckets.Insert(ts)
|
||||
}
|
||||
|
||||
if len(buckets.buckets) > 1 {
|
||||
log.Warning("caution, multiple distinct chains seen during head selections")
|
||||
// TODO: we *could* refuse to sync here without user intervention.
|
||||
// For now, just select the best cluster
|
||||
}
|
||||
|
||||
return buckets.Heaviest(), nil
|
||||
}
|
||||
|
||||
func (sm *SyncManager) syncScheduler() {
|
||||
var syncQueue syncBucketSet
|
||||
|
||||
var nextSyncTarget *syncTargetBucket
|
||||
var workerChan chan *types.TipSet
|
||||
|
||||
for {
|
||||
select {
|
||||
case ts, ok := <-sm.incomingTipSets:
|
||||
if !ok {
|
||||
log.Info("shutting down sync scheduler")
|
||||
return
|
||||
}
|
||||
|
||||
var relatedToActiveSync bool
|
||||
sm.asLk.Lock()
|
||||
for _, acts := range sm.activeSyncs {
|
||||
if ts.Equals(acts) {
|
||||
break
|
||||
}
|
||||
|
||||
if types.CidArrsEqual(ts.Parents(), acts.Cids()) {
|
||||
// sync this next, after that sync process finishes
|
||||
relatedToActiveSync = true
|
||||
}
|
||||
}
|
||||
sm.asLk.Unlock()
|
||||
|
||||
// if this is related to an active sync process, immediately bucket it
|
||||
// we don't want to start a parallel sync process that duplicates work
|
||||
if relatedToActiveSync {
|
||||
syncQueue.Insert(ts)
|
||||
}
|
||||
|
||||
if nextSyncTarget != nil && nextSyncTarget.sameChainAs(ts) {
|
||||
nextSyncTarget.add(ts)
|
||||
} else {
|
||||
syncQueue.Insert(ts)
|
||||
|
||||
if nextSyncTarget == nil {
|
||||
nextSyncTarget = syncQueue.Pop()
|
||||
workerChan = workerChanVal
|
||||
}
|
||||
}
|
||||
case workerChan <- nextSyncTarget.heaviestTipSet():
|
||||
if len(syncQueue.buckets) > 0 {
|
||||
nextSyncTarget = syncQueue.Pop()
|
||||
} else {
|
||||
workerChan = nil
|
||||
}
|
||||
case <-sm.stop:
|
||||
log.Info("sync scheduler shutting down")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *SyncManager) syncWorker(id int) {
|
||||
@ -70,6 +262,10 @@ func (sm *SyncManager) syncWorker(id int) {
|
||||
if err := sm.doSync(context.TODO(), ts); err != nil {
|
||||
log.Errorf("sync error: %+v", err)
|
||||
}
|
||||
|
||||
sm.asLk.Lock()
|
||||
delete(sm.activeSyncs, ts.Key())
|
||||
sm.asLk.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
func (ts *TipSet) ParentState() cid.Cid {
	return ts.blks[0].ParentStateRoot
}

// ParentWeight returns the chain weight of this tipset's parent, as
// recorded in the first block header. (Presumably all blocks in a
// tipset carry the same ParentWeight — verify that invariant holds.)
func (ts *TipSet) ParentWeight() BigInt {
	return ts.blks[0].ParentWeight
}
|
||||
|
||||
func (ts *TipSet) Contains(oc cid.Cid) bool {
|
||||
for _, c := range ts.cids {
|
||||
if c == oc {
|
||||
|
Loading…
Reference in New Issue
Block a user