lotus/chain/sync_manager.go
package chain
import (
"context"
"sort"
"sync"
"github.com/filecoin-project/lotus/chain/types"
peer "github.com/libp2p/go-libp2p-core/peer"
)
const BootstrapPeerThreshold = 2
const (
BSStateInit = 0
BSStateSelected = 1
BSStateScheduled = 2
BSStateComplete = 3
)
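// SyncFunc is the function the manager calls to sync the chain to a target tipset.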
type SyncFunc func(context.Context, *types.TipSet) error
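// SyncManager coordinates chain sync: it tracks peer heads, handles the
// initial bootstrap, buckets incoming tipsets by chain, and feeds sync
// targets to a fixed pool of worker goroutines.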
type SyncManager struct {
lk sync.Mutex
peerHeads map[peer.ID]*types.TipSet
bssLk sync.Mutex
bootstrapState int
bspThresh int
incomingTipSets chan *types.TipSet
syncTargets chan *types.TipSet
syncResults chan *syncResult
syncStates []*SyncerState
// Normally this handler is set to `(*Syncer).Sync()`.
doSync func(context.Context, *types.TipSet) error
stop chan struct{}
// Sync Scheduler fields
activeSyncs map[types.TipSetKey]*types.TipSet
syncQueue syncBucketSet
activeSyncTips syncBucketSet
nextSyncTarget *syncTargetBucket
workerChan chan *types.TipSet
}
type syncResult struct {
ts *types.TipSet
success bool
}
const syncWorkerCount = 3
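// NewSyncManager returns a SyncManager that invokes sync for every selected
// target. A minimal usage sketch (assuming a *Syncer value named syncer, per
// the doSync field comment above):
//
//	mgr := NewSyncManager(syncer.Sync)
//	mgr.Start()
//	defer mgr.Stop()
//	// feed peer heads via mgr.SetPeerHead(...)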
func NewSyncManager(sync SyncFunc) *SyncManager {
return &SyncManager{
bspThresh: 1,
peerHeads: make(map[peer.ID]*types.TipSet),
syncTargets: make(chan *types.TipSet),
syncResults: make(chan *syncResult),
syncStates: make([]*SyncerState, syncWorkerCount),
incomingTipSets: make(chan *types.TipSet),
activeSyncs: make(map[types.TipSetKey]*types.TipSet),
doSync: sync,
stop: make(chan struct{}),
}
}
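// Start launches the sync scheduler and syncWorkerCount worker goroutines.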
func (sm *SyncManager) Start() {
go sm.syncScheduler()
for i := 0; i < syncWorkerCount; i++ {
go sm.syncWorker(i)
}
}
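// Stop signals the sync scheduler to shut down.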
func (sm *SyncManager) Stop() {
close(sm.stop)
}
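// SetPeerHead records the latest head reported by a peer. During bootstrap it
// waits until bspThresh peers have reported a non-genesis head before
// selecting the initial sync target; after that, every reported head is
// forwarded to the scheduler.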
func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.peerHeads[p] = ts
if sm.getBootstrapState() == BSStateInit {
spc := sm.syncedPeerCount()
if spc >= sm.bspThresh {
// It's go time!
target, err := sm.selectSyncTarget()
if err != nil {
log.Error("failed to select sync target: ", err)
return
}
sm.setBootstrapState(BSStateSelected)
sm.incomingTipSets <- target
}
log.Infof("sync bootstrap has %d peers", spc)
return
}
sm.incomingTipSets <- ts
}
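// syncBucketSet groups tipsets into buckets, one per distinct chain.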
type syncBucketSet struct {
buckets []*syncTargetBucket
}
func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
var stb syncTargetBucket
for _, ts := range tipsets {
stb.add(ts)
}
return &stb
}
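// RelatedToAny reports whether ts is on the same chain as any bucket in the set.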
func (sbs *syncBucketSet) RelatedToAny(ts *types.TipSet) bool {
for _, b := range sbs.buckets {
if b.sameChainAs(ts) {
return true
}
}
return false
}
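// Insert adds ts to the bucket on the same chain, creating a new bucket if none matches.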
func (sbs *syncBucketSet) Insert(ts *types.TipSet) {
for _, b := range sbs.buckets {
if b.sameChainAs(ts) {
b.add(ts)
return
}
}
sbs.buckets = append(sbs.buckets, newSyncTargetBucket(ts))
}
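// Pop removes and returns the bucket whose heaviest tipset has the greatest
// parent weight. The set must be non-empty.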
func (sbs *syncBucketSet) Pop() *syncTargetBucket {
var bestBuck *syncTargetBucket
var bestTs *types.TipSet
for _, b := range sbs.buckets {
hts := b.heaviestTipSet()
if bestBuck == nil || bestTs.ParentWeight().LessThan(hts.ParentWeight()) {
bestBuck = b
bestTs = hts
}
}
sbs.removeBucket(bestBuck)
return bestBuck
}
func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
nbuckets := make([]*syncTargetBucket, 0, len(sbs.buckets)-1)
for _, b := range sbs.buckets {
if b != toremove {
nbuckets = append(nbuckets, b)
}
}
sbs.buckets = nbuckets
}
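// PopRelated removes and returns the bucket on the same chain as ts, or nil if none matches.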
func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
for _, b := range sbs.buckets {
if b.sameChainAs(ts) {
sbs.removeBucket(b)
return b
}
}
return nil
}
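// Heaviest returns the tipset with the greatest parent weight across all buckets.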
func (sbs *syncBucketSet) Heaviest() *types.TipSet {
// TODO: should also consider factoring in number of peers represented by each bucket here
var bestTs *types.TipSet
for _, b := range sbs.buckets {
bhts := b.heaviestTipSet()
if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) {
bestTs = bhts
}
}
return bestTs
}
func (sbs *syncBucketSet) Empty() bool {
return len(sbs.buckets) == 0
}
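// syncTargetBucket accumulates tipsets that are on the same chain.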
type syncTargetBucket struct {
tips []*types.TipSet
count int
}
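// sameChainAs reports whether ts equals, is the parent of, or is a child of
// any tipset already in the bucket.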
func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
for _, t := range stb.tips {
if ts.Equals(t) {
return true
}
if ts.Key() == t.Parents() {
return true
}
if ts.Parents() == t.Key() {
return true
}
}
return false
}
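// add records ts in the bucket, skipping duplicates.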
func (stb *syncTargetBucket) add(ts *types.TipSet) {
stb.count++
for _, t := range stb.tips {
if t.Equals(ts) {
return
}
}
stb.tips = append(stb.tips, ts)
}
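// heaviestTipSet returns the tipset in the bucket with the greatest parent
// weight, or nil for a nil or empty bucket.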
func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
if stb == nil {
return nil
}
var best *types.TipSet
for _, ts := range stb.tips {
if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) {
best = ts
}
}
return best
}
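// selectSyncTarget groups the known peer heads into chains and returns the
// heaviest tipset among them as the initial sync target.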
func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
var buckets syncBucketSet
var peerHeads []*types.TipSet
for _, ts := range sm.peerHeads {
peerHeads = append(peerHeads, ts)
}
sort.Slice(peerHeads, func(i, j int) bool {
return peerHeads[i].Height() < peerHeads[j].Height()
})
for _, ts := range peerHeads {
buckets.Insert(ts)
}
if len(buckets.buckets) > 1 {
log.Warn("caution, multiple distinct chains seen during head selection")
// TODO: we *could* refuse to sync here without user intervention.
// For now, just select the best cluster
}
return buckets.Heaviest(), nil
}
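// syncScheduler is the manager's main loop. It buckets incoming tipsets,
// offers the next sync target to workers (workerChan is nil whenever there is
// nothing to hand out, which disables that select case), and processes worker
// results.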
func (sm *SyncManager) syncScheduler() {
for {
select {
case ts, ok := <-sm.incomingTipSets:
if !ok {
log.Info("shutting down sync scheduler")
return
}
sm.scheduleIncoming(ts)
case res := <-sm.syncResults:
sm.scheduleProcessResult(res)
case sm.workerChan <- sm.nextSyncTarget.heaviestTipSet():
sm.scheduleWorkSent()
case <-sm.stop:
log.Info("sync scheduler shutting down")
return
}
}
}
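// scheduleIncoming decides what to do with a newly received tipset: hand the
// bootstrap target straight to a worker, park tipsets that relate to an
// in-flight sync, or queue them as future sync targets.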
func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
log.Debug("scheduling incoming tipset sync: ", ts.Cids())
if sm.getBootstrapState() == BSStateSelected {
sm.setBootstrapState(BSStateScheduled)
sm.syncTargets <- ts
return
}
var relatedToActiveSync bool
for _, acts := range sm.activeSyncs {
if ts.Equals(acts) {
break
}
if ts.Parents() == acts.Key() {
// sync this next, after that sync process finishes
relatedToActiveSync = true
}
}
if !relatedToActiveSync && sm.activeSyncTips.RelatedToAny(ts) {
relatedToActiveSync = true
}
// if this is related to an active sync process, immediately bucket it
// we don't want to start a parallel sync process that duplicates work
if relatedToActiveSync {
sm.activeSyncTips.Insert(ts)
return
}
if sm.getBootstrapState() == BSStateScheduled {
sm.syncQueue.Insert(ts)
return
}
if sm.nextSyncTarget != nil && sm.nextSyncTarget.sameChainAs(ts) {
sm.nextSyncTarget.add(ts)
} else {
sm.syncQueue.Insert(ts)
if sm.nextSyncTarget == nil {
sm.nextSyncTarget = sm.syncQueue.Pop()
sm.workerChan = sm.syncTargets
}
}
}
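// scheduleProcessResult handles a result from a worker: it marks bootstrap
// complete on the first success, retires the finished sync, and selects the
// next sync target, preferring tipsets that were queued up behind the sync
// that just succeeded.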
func (sm *SyncManager) scheduleProcessResult(res *syncResult) {
if res.success && sm.getBootstrapState() != BSStateComplete {
sm.setBootstrapState(BSStateComplete)
}
delete(sm.activeSyncs, res.ts.Key())
relbucket := sm.activeSyncTips.PopRelated(res.ts)
if relbucket != nil {
if res.success {
if sm.nextSyncTarget == nil {
sm.nextSyncTarget = relbucket
sm.workerChan = sm.syncTargets
} else {
sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket)
}
return
}
// TODO: this is the case where we try to sync a chain, and
// fail, and we have more blocks on top of that chain that
// have come in since. The question is, should we try to
// sync these? or just drop them?
log.Error("failed to sync chain but have new unconnected blocks from chain")
}
if sm.nextSyncTarget == nil && !sm.syncQueue.Empty() {
next := sm.syncQueue.Pop()
if next != nil {
sm.nextSyncTarget = next
sm.workerChan = sm.syncTargets
}
}
}
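// scheduleWorkSent runs after a target has been handed to a worker: it records
// the target as an active sync and lines up the next one, or disables the send
// case by setting workerChan to nil.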
func (sm *SyncManager) scheduleWorkSent() {
hts := sm.nextSyncTarget.heaviestTipSet()
sm.activeSyncs[hts.Key()] = hts
if !sm.syncQueue.Empty() {
sm.nextSyncTarget = sm.syncQueue.Pop()
} else {
sm.nextSyncTarget = nil
sm.workerChan = nil
}
}
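// syncWorker pulls targets from syncTargets, runs doSync on each (tagging the
// context with this worker's SyncerState), and reports the outcome on
// syncResults.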
func (sm *SyncManager) syncWorker(id int) {
ss := &SyncerState{}
sm.syncStates[id] = ss
for {
select {
case ts, ok := <-sm.syncTargets:
if !ok {
log.Info("sync manager worker shutting down")
return
}
ctx := context.WithValue(context.TODO(), syncStateKey{}, ss)
err := sm.doSync(ctx, ts)
if err != nil {
log.Errorf("sync error: %+v", err)
}
sm.syncResults <- &syncResult{
ts: ts,
success: err == nil,
}
}
}
}
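// syncedPeerCount returns the number of peers whose reported head is above height 0.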
func (sm *SyncManager) syncedPeerCount() int {
var count int
for _, ts := range sm.peerHeads {
if ts.Height() > 0 {
count++
}
}
return count
}
func (sm *SyncManager) getBootstrapState() int {
sm.bssLk.Lock()
defer sm.bssLk.Unlock()
return sm.bootstrapState
}
func (sm *SyncManager) setBootstrapState(v int) {
sm.bssLk.Lock()
defer sm.bssLk.Unlock()
sm.bootstrapState = v
}
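// IsBootstrapped reports whether the initial bootstrap sync has completed.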
func (sm *SyncManager) IsBootstrapped() bool {
sm.bssLk.Lock()
defer sm.bssLk.Unlock()
return sm.bootstrapState == BSStateComplete
}