lotus/chain/sync_manager.go

362 lines
7.8 KiB
Go
Raw Normal View History

package chain
import (
"context"
2019-11-15 02:27:43 +00:00
"sort"
"sync"
"github.com/filecoin-project/lotus/chain/types"
peer "github.com/libp2p/go-libp2p-peer"
)
// BootstrapPeerThreshold is a peer-count threshold related to sync bootstrapping.
// NOTE(review): nothing in this file reads it; NewSyncManager hard-codes
// bspThresh to 1 instead — confirm whether that is intentional.
const BootstrapPeerThreshold = 2
// SyncFunc is the callback a SyncManager invokes to sync the chain to the
// given tipset. A nil return is reported as a successful sync.
type SyncFunc func(context.Context, *types.TipSet) error
type SyncManager struct {
lk sync.Mutex
peerHeads map[peer.ID]*types.TipSet
bootstrapped bool
bspThresh int
2019-11-15 21:35:29 +00:00
incomingTipSets chan *types.TipSet
syncTargets chan *types.TipSet
syncResults chan *syncResult
2019-11-16 01:05:16 +00:00
syncStates []*SyncerState
2019-11-15 02:27:43 +00:00
2019-11-16 01:05:16 +00:00
activeSyncs map[types.TipSetKey]*types.TipSet
2019-11-15 02:27:43 +00:00
doSync func(context.Context, *types.TipSet) error
2019-11-15 02:27:43 +00:00
stop chan struct{}
}
2019-11-15 21:35:29 +00:00
// syncResult is the report a worker sends to the scheduler after a sync
// attempt has finished.
type syncResult struct {
	// ts is the tipset the worker was asked to sync.
	ts *types.TipSet
	// success is true when the sync callback returned a nil error.
	success bool
}
// syncWorkerCount is the number of sync worker goroutines launched by Start,
// and the length of SyncManager.syncStates.
const syncWorkerCount = 3
func NewSyncManager(sync SyncFunc) *SyncManager {
return &SyncManager{
2019-11-15 21:35:29 +00:00
bspThresh: 1,
peerHeads: make(map[peer.ID]*types.TipSet),
syncTargets: make(chan *types.TipSet),
syncResults: make(chan *syncResult),
2019-11-16 01:05:16 +00:00
syncStates: make([]*SyncerState, syncWorkerCount),
2019-11-15 21:35:29 +00:00
incomingTipSets: make(chan *types.TipSet),
activeSyncs: make(map[types.TipSetKey]*types.TipSet),
doSync: sync,
stop: make(chan struct{}),
}
}
func (sm *SyncManager) Start() {
2019-11-15 21:35:29 +00:00
go sm.syncScheduler()
for i := 0; i < syncWorkerCount; i++ {
go sm.syncWorker(i)
}
}
2019-11-15 21:35:29 +00:00
// Stop signals shutdown by closing the stop channel. It must be called at
// most once: closing an already-closed channel panics.
func (sm *SyncManager) Stop() {
	close(sm.stop)
}
2019-11-16 01:05:16 +00:00
func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
log.Info("set peer head!", ts.Height(), ts.Cids())
sm.lk.Lock()
defer sm.lk.Unlock()
sm.peerHeads[p] = ts
if !sm.bootstrapped {
spc := sm.syncedPeerCount()
if spc >= sm.bspThresh {
// Its go time!
2019-11-15 02:27:43 +00:00
target, err := sm.selectSyncTarget()
if err != nil {
log.Error("failed to select sync target: ", err)
return
}
2019-11-15 21:35:29 +00:00
sm.incomingTipSets <- target
// TODO: is this the right place to say we're bootstrapped? probably want to wait until the sync finishes
2019-11-15 02:27:43 +00:00
sm.bootstrapped = true
}
log.Infof("sync bootstrap has %d peers", spc)
return
}
2019-11-15 21:35:29 +00:00
sm.incomingTipSets <- ts
}
2019-11-15 02:27:43 +00:00
// syncBucketSet is a collection of sync target buckets. Insert keeps tipsets
// that a bucket considers part of the same chain together in that bucket.
type syncBucketSet struct {
	buckets []*syncTargetBucket
}
// Insert adds ts to the first bucket that reports it as being on the same
// chain. If no bucket matches, a new single-tipset bucket is appended.
func (sbs *syncBucketSet) Insert(ts *types.TipSet) {
	for i := range sbs.buckets {
		if bucket := sbs.buckets[i]; bucket.sameChainAs(ts) {
			bucket.add(ts)
			return
		}
	}

	fresh := &syncTargetBucket{
		tips:  []*types.TipSet{ts},
		count: 1,
	}
	sbs.buckets = append(sbs.buckets, fresh)
}
func (sbs *syncBucketSet) Pop() *syncTargetBucket {
var bestBuck *syncTargetBucket
var bestTs *types.TipSet
for _, b := range sbs.buckets {
hts := b.heaviestTipSet()
if bestBuck == nil || bestTs.ParentWeight().LessThan(hts.ParentWeight()) {
bestBuck = b
bestTs = hts
}
}
2019-11-15 21:35:29 +00:00
sbs.removeBucket(bestBuck)
2019-11-15 02:27:43 +00:00
return bestBuck
}
2019-11-15 21:35:29 +00:00
// removeBucket drops every occurrence of toremove (compared by pointer) from
// the set. Removing a bucket that is not present, or removing from an empty
// set, leaves the contents unchanged.
//
// The survivors are copied into a fresh slice rather than filtered in place,
// since PopRelated calls this while ranging over sbs.buckets.
func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
	if len(sbs.buckets) == 0 {
		// Guard: the capacity below would be -1 and make would panic.
		return
	}

	nbuckets := make([]*syncTargetBucket, 0, len(sbs.buckets)-1)
	for _, b := range sbs.buckets {
		if b != toremove {
			nbuckets = append(nbuckets, b)
		}
	}
	sbs.buckets = nbuckets
}
// PopRelated removes and returns the first bucket that reports ts as being on
// its chain. It returns nil when no bucket matches.
func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
	for _, candidate := range sbs.buckets {
		if !candidate.sameChainAs(ts) {
			continue
		}
		sbs.removeBucket(candidate)
		return candidate
	}
	return nil
}
2019-11-15 02:27:43 +00:00
func (sbs *syncBucketSet) Heaviest() *types.TipSet {
// TODO: should also consider factoring in number of peers represented by each bucket here
var bestTs *types.TipSet
2019-11-15 21:35:29 +00:00
for _, b := range sbs.buckets {
2019-11-15 02:27:43 +00:00
bhts := b.heaviestTipSet()
if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) {
bestTs = bhts
}
}
return bestTs
}
// syncTargetBucket groups tipsets that are treated as one sync target.
type syncTargetBucket struct {
	// tips holds the distinct tipsets added to the bucket.
	tips []*types.TipSet
	// count is the number of add calls, duplicates included.
	count int
}
// newSyncTargetBucket returns a bucket to which the given tipsets have been
// added, in order.
func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
	bucket := new(syncTargetBucket)
	for i := range tipsets {
		bucket.add(tipsets[i])
	}
	return bucket
}
// sameChainAs reports whether ts is equal to, the direct parent of, or a
// direct child of any tipset held in the bucket.
func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
	for _, member := range stb.tips {
		switch {
		case ts.Equals(member), // same tipset
			types.CidArrsEqual(ts.Cids(), member.Parents()), // ts is member's parent
			types.CidArrsEqual(ts.Parents(), member.Cids()): // ts is member's child
			return true
		}
	}
	return false
}
// add counts one more sighting for the bucket and stores ts unless an equal
// tipset is already held. The counter is incremented even for duplicates.
func (stb *syncTargetBucket) add(ts *types.TipSet) {
	stb.count++

	for i := range stb.tips {
		if stb.tips[i].Equals(ts) {
			return
		}
	}
	stb.tips = append(stb.tips, ts)
}
func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
2019-11-15 21:35:29 +00:00
if stb == nil {
return nil
}
2019-11-15 02:27:43 +00:00
var best *types.TipSet
for _, ts := range stb.tips {
if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) {
best = ts
}
}
return best
}
2019-11-13 17:03:56 +00:00
func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
2019-11-15 02:27:43 +00:00
var buckets syncBucketSet
var peerHeads []*types.TipSet
for _, ts := range sm.peerHeads {
peerHeads = append(peerHeads, ts)
}
sort.Slice(peerHeads, func(i, j int) bool {
return peerHeads[i].Height() < peerHeads[j].Height()
})
for _, ts := range peerHeads {
buckets.Insert(ts)
}
if len(buckets.buckets) > 1 {
log.Warning("caution, multiple distinct chains seen during head selections")
// TODO: we *could* refuse to sync here without user intervention.
// For now, just select the best cluster
}
return buckets.Heaviest(), nil
}
func (sm *SyncManager) syncScheduler() {
var syncQueue syncBucketSet
2019-11-15 21:35:29 +00:00
var activeSyncTips syncBucketSet
2019-11-15 02:27:43 +00:00
var nextSyncTarget *syncTargetBucket
var workerChan chan *types.TipSet
for {
select {
case ts, ok := <-sm.incomingTipSets:
if !ok {
log.Info("shutting down sync scheduler")
return
}
var relatedToActiveSync bool
for _, acts := range sm.activeSyncs {
if ts.Equals(acts) {
break
}
if types.CidArrsEqual(ts.Parents(), acts.Cids()) {
// sync this next, after that sync process finishes
relatedToActiveSync = true
}
}
// if this is related to an active sync process, immediately bucket it
// we don't want to start a parallel sync process that duplicates work
if relatedToActiveSync {
2019-11-15 21:35:29 +00:00
log.Info("related to active sync")
activeSyncTips.Insert(ts)
continue
2019-11-15 02:27:43 +00:00
}
if nextSyncTarget != nil && nextSyncTarget.sameChainAs(ts) {
2019-11-15 21:35:29 +00:00
log.Info("new tipset is part of our next sync target")
2019-11-15 02:27:43 +00:00
nextSyncTarget.add(ts)
} else {
2019-11-15 21:35:29 +00:00
log.Info("insert into that queue!")
2019-11-15 02:27:43 +00:00
syncQueue.Insert(ts)
if nextSyncTarget == nil {
nextSyncTarget = syncQueue.Pop()
2019-11-15 21:35:29 +00:00
workerChan = sm.syncTargets
log.Info("setting next sync target")
}
}
case res := <-sm.syncResults:
delete(sm.activeSyncs, res.ts.Key())
relbucket := activeSyncTips.PopRelated(res.ts)
if relbucket != nil {
if res.success {
if nextSyncTarget == nil {
nextSyncTarget = relbucket
workerChan = sm.syncTargets
} else {
syncQueue.buckets = append(syncQueue.buckets, relbucket)
}
} else {
// TODO: this is the case where we try to sync a chain, and
// fail, and we have more blocks on top of that chain that
// have come in since. The question is, should we try to
// sync these? or just drop them?
2019-11-15 02:27:43 +00:00
}
}
case workerChan <- nextSyncTarget.heaviestTipSet():
2019-11-15 21:35:29 +00:00
hts := nextSyncTarget.heaviestTipSet()
sm.activeSyncs[hts.Key()] = hts
2019-11-15 02:27:43 +00:00
if len(syncQueue.buckets) > 0 {
nextSyncTarget = syncQueue.Pop()
} else {
2019-11-15 21:35:29 +00:00
nextSyncTarget = nil
2019-11-15 02:27:43 +00:00
workerChan = nil
}
case <-sm.stop:
log.Info("sync scheduler shutting down")
return
}
}
2019-11-13 17:03:56 +00:00
}
func (sm *SyncManager) syncWorker(id int) {
2019-11-16 01:05:16 +00:00
ss := &SyncerState{}
sm.syncStates[id] = ss
for {
select {
2019-11-15 21:35:29 +00:00
case ts, ok := <-sm.syncTargets:
if !ok {
log.Info("sync manager worker shutting down")
return
}
2019-11-16 01:05:16 +00:00
log.Info("sync worker go time!", ts.Height(), ts.Cids())
2019-11-16 01:05:16 +00:00
ctx := context.WithValue(context.TODO(), syncStateKey, ss)
err := sm.doSync(ctx, ts)
2019-11-15 21:35:29 +00:00
if err != nil {
log.Errorf("sync error: %+v", err)
}
2019-11-15 02:27:43 +00:00
2019-11-15 21:35:29 +00:00
sm.syncResults <- &syncResult{
ts: ts,
success: err == nil,
}
}
}
}
// syncedPeerCount returns how many known peers have reported a head whose
// height is greater than zero. It does not lock; SetPeerHead calls it with
// sm.lk held.
func (sm *SyncManager) syncedPeerCount() int {
	synced := 0
	for p := range sm.peerHeads {
		if sm.peerHeads[p].Height() > 0 {
			synced++
		}
	}
	return synced
}
// IsBootstrapped reports whether the manager has been marked bootstrapped.
// The flag is read under sm.lk.
func (sm *SyncManager) IsBootstrapped() bool {
	sm.lk.Lock()
	bootstrapped := sm.bootstrapped
	sm.lk.Unlock()

	return bootstrapped
}