improve bootstrap sync handling

This commit is contained in:
whyrusleeping 2019-12-08 12:44:47 +01:00
parent 8bb6ab02e3
commit 17b0cb1d3d

View File

@ -2,6 +2,7 @@ package chain
import ( import (
"context" "context"
"fmt"
"sort" "sort"
"sync" "sync"
@ -11,12 +12,21 @@ import (
const BootstrapPeerThreshold = 2 const BootstrapPeerThreshold = 2
const (
BSStateInit = 0
BSStateSelected = 1
BSStateScheduled = 2
BSStateComplete = 3
)
type SyncFunc func(context.Context, *types.TipSet) error type SyncFunc func(context.Context, *types.TipSet) error
type SyncManager struct { type SyncManager struct {
lk sync.Mutex lk sync.Mutex
peerHeads map[peer.ID]*types.TipSet peerHeads map[peer.ID]*types.TipSet
bootstrapped bool
bssLk sync.Mutex
bootstrapState int
bspThresh int bspThresh int
@ -75,7 +85,7 @@ func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip
defer sm.lk.Unlock() defer sm.lk.Unlock()
sm.peerHeads[p] = ts sm.peerHeads[p] = ts
if !sm.bootstrapped { if sm.getBootstrapState() == BSStateInit {
spc := sm.syncedPeerCount() spc := sm.syncedPeerCount()
if spc >= sm.bspThresh { if spc >= sm.bspThresh {
// Its go time! // Its go time!
@ -84,14 +94,15 @@ func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip
log.Error("failed to select sync target: ", err) log.Error("failed to select sync target: ", err)
return return
} }
sm.setBootstrapState(BSStateSelected)
fmt.Println("schedule bootstrap sync")
sm.incomingTipSets <- target sm.incomingTipSets <- target
// TODO: is this the right place to say we're bootstrapped? probably want to wait until the sync finishes
sm.bootstrapped = true
} }
log.Infof("sync bootstrap has %d peers", spc) log.Infof("sync bootstrap has %d peers", spc)
return return
} }
fmt.Println("sending off incoming tipset")
sm.incomingTipSets <- ts sm.incomingTipSets <- ts
} }
@ -175,6 +186,10 @@ func (sbs *syncBucketSet) Heaviest() *types.TipSet {
return bestTs return bestTs
} }
func (sbs *syncBucketSet) Empty() bool {
return len(sbs.buckets) == 0
}
type syncTargetBucket struct { type syncTargetBucket struct {
tips []*types.TipSet tips []*types.TipSet
count int count int
@ -271,6 +286,14 @@ func (sm *SyncManager) syncScheduler() {
} }
func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) { func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
fmt.Println("INCOMING: ", ts.Height())
if sm.getBootstrapState() == BSStateSelected {
sm.setBootstrapState(BSStateScheduled)
fmt.Println("start bootstrap sync")
sm.syncTargets <- ts
return
}
var relatedToActiveSync bool var relatedToActiveSync bool
for _, acts := range sm.activeSyncs { for _, acts := range sm.activeSyncs {
if ts.Equals(acts) { if ts.Equals(acts) {
@ -294,6 +317,12 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
return return
} }
if sm.getBootstrapState() == BSStateScheduled {
fmt.Println("received new head while bootstrapping...")
sm.syncQueue.Insert(ts)
return
}
if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) { if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) {
sm.nextSyncTarget.add(ts) sm.nextSyncTarget.add(ts)
} else { } else {
@ -307,6 +336,11 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
} }
func (sm *SyncManager) scheduleProcessResult(res *syncResult) { func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
fmt.Println("result!")
if res.success && sm.getBootstrapState() != BSStateComplete {
fmt.Println("finally out of bootstrapping")
sm.setBootstrapState(BSStateComplete)
}
delete(sm.activeSyncs, res.ts.Key()) delete(sm.activeSyncs, res.ts.Key())
relbucket := sm.activeSyncTips.PopRelated(res.ts) relbucket := sm.activeSyncTips.PopRelated(res.ts)
if relbucket != nil { if relbucket != nil {
@ -317,6 +351,7 @@ func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
} else { } else {
sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket) sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket)
} }
return
} else { } else {
// TODO: this is the case where we try to sync a chain, and // TODO: this is the case where we try to sync a chain, and
// fail, and we have more blocks on top of that chain that // fail, and we have more blocks on top of that chain that
@ -324,13 +359,21 @@ func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
// sync these? or just drop them? // sync these? or just drop them?
} }
} }
if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() {
next := sm.syncQueue.Pop()
if next != nil {
sm.nextSyncTarget = next
sm.workerChan = sm.syncTargets
}
}
} }
func (sm *SyncManager) scheduleWorkSent() { func (sm *SyncManager) scheduleWorkSent() {
hts := sm.nextSyncTarget.heaviestTipSet() hts := sm.nextSyncTarget.heaviestTipSet()
sm.activeSyncs[hts.Key()] = hts sm.activeSyncs[hts.Key()] = hts
if len(sm.syncQueue.buckets) > 0 { if !sm.syncQueue.Empty() {
sm.nextSyncTarget = sm.syncQueue.Pop() sm.nextSyncTarget = sm.syncQueue.Pop()
} else { } else {
sm.nextSyncTarget = nil sm.nextSyncTarget = nil
@ -373,8 +416,20 @@ func (sm *SyncManager) syncedPeerCount() int {
return count return count
} }
func (sm *SyncManager) IsBootstrapped() bool { func (sm *SyncManager) getBootstrapState() int {
sm.lk.Lock() sm.bssLk.Lock()
defer sm.lk.Unlock() defer sm.bssLk.Unlock()
return sm.bootstrapped return sm.bootstrapState
}
func (sm *SyncManager) setBootstrapState(v int) {
sm.bssLk.Lock()
defer sm.bssLk.Unlock()
sm.bootstrapState = v
}
func (sm *SyncManager) IsBootstrapped() bool {
sm.bssLk.Lock()
defer sm.bssLk.Unlock()
return sm.bootstrapState == BSStateComplete
} }