lotus/chain/sync_manager.go

691 lines
17 KiB
Go

package chain
import (
"context"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// Tunables for the sync manager. These are package-level variables rather than
// constants; BootstrapPeerThreshold and coalesceTipsets are overridden from the
// environment in init.
var (
// BootstrapPeerThreshold is the number of distinct peer heads that must be
// tracked before the initial sync target is selected.
BootstrapPeerThreshold = 4
// RecentSyncBufferSize is the capacity of the buffer of recently synced tipsets.
RecentSyncBufferSize = 10
// MaxSyncWorkers is the worker count at or above which an unrelated sync
// target is deferred instead of being started on a new worker.
MaxSyncWorkers = 5
// SyncWorkerHistory is the number of finished worker states retained and
// reported by State alongside the active ones.
SyncWorkerHistory = 3
// InitialSyncTimeThreshold bounds the duration of a successful sync that
// marks the initial sync as done: any successful sync faster than this
// flips initialSyncDone.
InitialSyncTimeThreshold = 15 * time.Minute
// coalesceTipsets enables merging of tipsets with the same height and
// parents when they are added to a sync target bucket.
coalesceTipsets = false
)
// init applies environment overrides to the sync tunables:
//
//   - LOTUS_SYNC_FORMTS_PEND=yes turns on tipset coalescing.
//   - LOTUS_SYNC_BOOTSTRAP_PEERS=<int> replaces BootstrapPeerThreshold; a value
//     that does not parse as an integer is logged and otherwise ignored.
func init() {
	coalesceTipsets = os.Getenv("LOTUS_SYNC_FORMTS_PEND") == "yes"

	raw := os.Getenv("LOTUS_SYNC_BOOTSTRAP_PEERS")
	if raw == "" {
		// no override requested; keep the built-in default
		return
	}

	parsed, err := strconv.Atoi(raw)
	if err != nil {
		log.Errorf("failed to parse 'LOTUS_SYNC_BOOTSTRAP_PEERS' env var: %s", err)
		return
	}
	BootstrapPeerThreshold = parsed
}
// SyncFunc is the callback a sync worker invokes to sync to the given tipset.
// A non-nil error is reported back to the scheduler as a failed sync.
type SyncFunc func(context.Context, *types.TipSet) error
// SyncManager manages the chain synchronization process, both at bootstrap time
// and during ongoing operation.
//
// It receives candidate chain heads in the form of tipsets from peers,
// and schedules them onto sync workers, deduplicating processing for
// already-active syncs.
type SyncManager interface {
// Start starts the SyncManager. The implementation in this file launches
// its scheduler goroutine and returns immediately.
Start()
// Stop stops the SyncManager. The implementation in this file cancels the
// manager's context; calling it more than once is harmless.
Stop()
// SetPeerHead informs the SyncManager that the supplied peer reported the
// supplied tipset. The implementation in this file blocks until the
// scheduler accepts the head, the manager is stopped, or ctx is done.
SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet)
// State retrieves the state of the sync workers. The implementation in
// this file reports active workers plus a short history of finished ones,
// ordered by worker id.
State() []SyncerStateSnapshot
}
// syncManager is the SyncManager implementation. Apart from State, which may
// be called from any goroutine, its fields are read and written by the
// scheduler goroutine only.
type syncManager struct {
// ctx is the manager's lifetime context, created in NewSyncManager;
// cancel is invoked by Stop.
ctx context.Context
cancel func()
// workq carries peer heads from SetPeerHead to the scheduler.
workq chan peerHead
// statusq carries completion reports from workers to the scheduler.
statusq chan workerStatus
// nextWorker is the id assigned to the next spawned worker; zero means no
// worker has been spawned yet.
nextWorker uint64
// pend holds tipsets related to active syncs, to be synced next.
pend syncBucketSet
// deferred holds unrelated tipsets postponed until the initial sync is
// done or a worker finishes.
deferred syncBucketSet
// heads tracks peer heads until the initial sync target is selected, at
// which point it is set to nil.
heads map[peer.ID]*types.TipSet
// recent is the buffer of recently synced tipsets.
recent *syncBuffer
// initialSyncDone is set once a successful sync completes in less than
// InitialSyncTimeThreshold.
initialSyncDone bool
// mx is held while state and history are mutated and while State copies them.
mx sync.Mutex
// state maps worker id to the state of each active worker.
state map[uint64]*workerState
// history is a ring of the most recently finished workers; historyI is
// the slot that will be overwritten next.
history []*workerState
historyI int
// doSync is the sync callback supplied to NewSyncManager.
doSync func(context.Context, *types.TipSet) error
}
// Compile-time check that *syncManager implements SyncManager.
var _ SyncManager = (*syncManager)(nil)
// peerHead is the message sent over workq: a tipset reported as the chain head
// by a peer.
type peerHead struct {
// p is the reporting peer.
p peer.ID
// ts is the head tipset that peer reported.
ts *types.TipSet
}
// workerState describes one sync worker, either active (in syncManager.state)
// or finished (in syncManager.history).
type workerState struct {
// id is the worker id assigned by spawnWorker.
id uint64
// ts is the tipset this worker was asked to sync to.
ts *types.TipSet
// ss is the syncer state handed to the sync callback through the context
// and snapshotted by State.
ss *SyncerState
// dt is how long the sync callback took; set by the worker when it returns.
dt time.Duration
}
// workerStatus is the completion report a worker sends over statusq.
type workerStatus struct {
// id identifies the worker that finished.
id uint64
// err is the result of the sync callback; nil on success.
err error
}
// sync manager interface

// NewSyncManager builds a SyncManager that uses sync to perform the actual
// chain sync for each selected target. The returned manager owns a fresh
// cancellable context and does nothing until Start is called.
func NewSyncManager(sync SyncFunc) SyncManager {
	ctx, cancel := context.WithCancel(context.Background())

	sm := new(syncManager)
	sm.ctx, sm.cancel = ctx, cancel
	sm.doSync = sync

	// Unbuffered channels: senders rendezvous with the scheduler.
	sm.workq = make(chan peerHead)
	sm.statusq = make(chan workerStatus)

	sm.heads = make(map[peer.ID]*types.TipSet)
	sm.state = make(map[uint64]*workerState)
	sm.recent = newSyncBuffer(RecentSyncBufferSize)
	sm.history = make([]*workerState, SyncWorkerHistory)

	return sm
}
// Start launches the scheduler loop in its own goroutine and returns
// immediately. The goroutine exits when the manager's context is cancelled.
func (sm *syncManager) Start() {
go sm.scheduler()
}
// Stop cancels the manager's context, which terminates the scheduler and
// unblocks any worker or caller waiting on the manager's channels. It is a
// no-op when the context has already been cancelled.
func (sm *syncManager) Stop() {
	if sm.ctx.Err() != nil {
		// already stopped
		return
	}
	sm.cancel()
}
// SetPeerHead hands the head ts reported by peer p to the scheduler. It blocks
// until the scheduler receives it; the head is dropped instead if the manager
// has been stopped or ctx is done first.
func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
	report := peerHead{p: p, ts: ts}

	select {
	case <-ctx.Done():
		// caller gave up
	case <-sm.ctx.Done():
		// manager stopped
	case sm.workq <- report:
		// delivered to the scheduler
	}
}
// State returns a snapshot of every active worker plus the retained history of
// finished workers, ordered by ascending worker id. The worker list is copied
// under the manager's mutex; sorting and snapshotting happen after the lock is
// released.
func (sm *syncManager) State() []SyncerStateSnapshot {
	var collected []*workerState
	func() {
		sm.mx.Lock()
		defer sm.mx.Unlock()

		collected = make([]*workerState, 0, len(sm.state)+len(sm.history))
		for _, active := range sm.state {
			collected = append(collected, active)
		}
		for _, finished := range sm.history {
			// unused history slots are nil
			if finished == nil {
				continue
			}
			collected = append(collected, finished)
		}
	}()

	sort.Slice(collected, func(a, b int) bool {
		return collected[a].id < collected[b].id
	})

	snapshots := make([]SyncerStateSnapshot, len(collected))
	for i := range collected {
		snapshots[i] = collected[i].ss.Snapshot()
	}
	return snapshots
}
// sync manager internals

// scheduler is the manager's event loop. It serializes handling of peer heads
// (workq) and worker completions (statusq), and runs until the manager's
// context is cancelled.
//
// Once a minute it also checks whether the initial sync has completed; the
// first time it has, the ticker is disabled and handleInitialSyncDone is
// invoked to ramp up deferred syncs.
func (sm *syncManager) scheduler() {
	ticker := time.NewTicker(time.Minute)
	// Release the ticker when the scheduler exits on cancellation; stopping a
	// ticker that was already stopped below is harmless.
	defer ticker.Stop()
	tickerC := ticker.C

	for {
		select {
		case head := <-sm.workq:
			sm.handlePeerHead(head)
		case status := <-sm.statusq:
			sm.handleWorkerStatus(status)
		case <-tickerC:
			if sm.initialSyncDone {
				ticker.Stop()
				// a nil channel never becomes ready, so this case is disabled from now on
				tickerC = nil
				sm.handleInitialSyncDone()
			}
		case <-sm.ctx.Done():
			return
		}
	}
}
// handlePeerHead processes a head reported by a peer.
//
// Before any worker has been spawned, heads are only collected; once
// BootstrapPeerThreshold peers have reported, the initial sync target is
// selected and the first worker is started. After that, each head is offered
// to addSyncTarget and a worker is spawned whenever it reports work to do.
func (sm *syncManager) handlePeerHead(head peerHead) {
	log.Infof("new peer head: %s %s", head.p, head.ts)

	if sm.nextWorker != 0 {
		// Syncing is already under way: queue the head if applicable and maybe
		// spawn a worker if there is work to do (possibly on a fork).
		target, work, err := sm.addSyncTarget(head.ts)
		switch {
		case err != nil:
			log.Warnf("failed to add sync target: %s", err)
		case work:
			log.Infof("selected sync target: %s", target)
			sm.spawnWorker(target)
		}
		return
	}

	// Bootstrap phase: remember the head and wait until enough peers reported.
	sm.heads[head.p] = head.ts
	if len(sm.heads) < BootstrapPeerThreshold {
		return
	}

	// Enough peers: pick the initial target and start the first worker.
	target, err := sm.selectInitialSyncTarget()
	if err != nil {
		log.Errorf("failed to select initial sync target: %s", err)
		return
	}
	log.Infof("selected initial sync target: %s", target)
	sm.spawnWorker(target)
}
// handleWorkerStatus processes the completion report of a worker: it moves the
// worker from the active set into the history ring, records a successful sync
// in the recent buffer (possibly marking the initial sync as done), and then
// selects and spawns the next sync target, if any.
func (sm *syncManager) handleWorkerStatus(status workerStatus) {
	// %v rather than %s: status.err is nil on success, and %s would render a
	// nil error as "%!s(<nil>)".
	log.Debugf("worker %d done; status error: %v", status.id, status.err)

	sm.mx.Lock()
	ws := sm.state[status.id]
	delete(sm.state, status.id)

	// we track the last few workers for debug purposes
	sm.history[sm.historyI] = ws
	sm.historyI++
	sm.historyI %= len(sm.history)
	sm.mx.Unlock()

	if status.err != nil {
		// we failed to sync this target -- log it and try to work on an extended chain
		// if there is nothing related to be worked on, we stop working on this chain.
		log.Errorf("error during sync in %s: %s", ws.ts, status.err)
	} else {
		// add to the recently synced buffer
		sm.recent.Push(ws.ts)
		// if we are still in initial sync and this was fast enough, mark the end of the initial sync
		if !sm.initialSyncDone && ws.dt < InitialSyncTimeThreshold {
			sm.initialSyncDone = true
		}
	}

	// we are done with this target, select the next sync target and spawn a worker if there is work
	// to do, because of an extension of this chain.
	target, work, err := sm.selectSyncTarget(ws.ts)
	if err != nil {
		log.Warnf("failed to select sync target: %s", err)
		return
	}

	if work {
		log.Infof("selected sync target: %s", target)
		sm.spawnWorker(target)
	}
}
// handleInitialSyncDone runs once, right after the initial sync is observed to
// be finished. It keeps starting workers on deferred sync targets until
// MaxSyncWorkers are active, no deferred target remains, or selection fails.
func (sm *syncManager) handleInitialSyncDone() {
	for {
		if len(sm.state) >= MaxSyncWorkers {
			// worker budget exhausted
			return
		}

		target, work, err := sm.selectDeferredSyncTarget()
		switch {
		case err != nil:
			log.Errorf("error selecting deferred sync target: %s", err)
			return
		case !work:
			// nothing left to ramp up
			return
		}

		log.Infof("selected deferred sync target: %s", target)
		sm.spawnWorker(target)
	}
}
// spawnWorker allocates the next worker id, registers a new worker state for
// target in the active set (under the manager's mutex) and starts the worker
// goroutine.
func (sm *syncManager) spawnWorker(target *types.TipSet) {
	syncState := new(SyncerState)
	syncState.data.WorkerID = sm.nextWorker

	ws := &workerState{id: sm.nextWorker, ts: target, ss: syncState}
	sm.nextWorker++

	sm.mx.Lock()
	sm.state[ws.id] = ws
	sm.mx.Unlock()

	go sm.worker(ws)
}
// worker runs the sync callback for ws.ts, records how long it took in ws.dt
// and reports the outcome to the scheduler. The callback receives a context
// derived from the manager's context that carries the worker's SyncerState.
// If the manager is stopped before the report is accepted, the report is
// dropped.
func (sm *syncManager) worker(ws *workerState) {
	log.Infof("worker %d syncing in %s", ws.id, ws.ts)

	began := build.Clock.Now()
	syncErr := sm.doSync(context.WithValue(sm.ctx, syncStateKey{}, ws.ss), ws.ts)
	ws.dt = build.Clock.Since(began)
	log.Infof("worker %d done; took %s", ws.id, ws.dt)

	report := workerStatus{id: ws.id, err: syncErr}
	select {
	case <-sm.ctx.Done():
	case sm.statusq <- report:
	}
}
// selectInitialSyncTarget picks the target of the very first sync from the
// peer heads collected so far. The heads are inserted into buckets in order of
// ascending height and the heaviest tipset across all buckets is returned.
// The heads map is released as a side effect, so this is meant to be called
// only once. The returned error is always nil.
func (sm *syncManager) selectInitialSyncTarget() (*types.TipSet, error) {
	candidates := make([]*types.TipSet, 0, len(sm.heads))
	for _, head := range sm.heads {
		candidates = append(candidates, head)
	}

	// the map is not used past this point
	sm.heads = nil

	sort.Slice(candidates, func(a, b int) bool {
		return candidates[a].Height() < candidates[b].Height()
	})

	var clusters syncBucketSet
	for i := range candidates {
		clusters.Insert(candidates[i])
	}

	if len(clusters.buckets) > 1 {
		log.Warn("caution, multiple distinct chains seen during head selections")
		// TODO: we *could* refuse to sync here without user intervention.
		// For now, just select the best cluster
	}

	return clusters.Heaviest(), nil
}
// addSyncTarget offers ts as a potential sync target. It returns (ts, true)
// when a worker should be started on it right away; otherwise it returns
// (nil, false) after ignoring the tipset or parking it in the pending or
// deferred queue. Work arises either because nothing is currently being synced
// or because ts looks like a heavier, unrelated chain (a potential fork).
// The returned error is always nil.
//
// Note: the state lock is not needed to read the active worker states here;
// the only competing access is State(), which is read only.
func (sm *syncManager) addSyncTarget(ts *types.TipSet) (*types.TipSet, bool, error) {
	// Already synced this or something heavier recently; this can happen with
	// an empty worker set right after a sync finished.
	if sm.recent.Synced(ts) {
		return nil, false, nil
	}

	// No active workers: we were idle waiting for the next tipset, so sync it.
	if len(sm.state) == 0 {
		return ts, true, nil
	}

	// Related to an active sync?
	for _, active := range sm.state {
		switch {
		case ts.Equals(active.ts):
			// already being synced
			return nil, false, nil
		case ts.Parents() == active.ts.Key():
			// direct extension of an active sync; schedule it to go next
			sm.pend.Insert(ts)
			return nil, false, nil
		}
	}

	// Related to a pending sync? Then it joins the pending queue.
	if sm.pend.RelatedToAny(ts) {
		sm.pend.Insert(ts)
		return nil, false, nil
	}

	// Unrelated to anything active or pending, so possibly a fork. It is only
	// worth a worker if nothing active or pending is heavier.
	for _, active := range sm.state {
		if isHeavier(active.ts, ts) {
			return nil, false, nil
		}
	}
	if best := sm.pend.Heaviest(); best != nil && isHeavier(best, ts) {
		return nil, false, nil
	}

	// Heavy enough and unrelated: start a worker if the initial sync is over
	// and there is room for one.
	if sm.initialSyncDone && len(sm.state) < MaxSyncWorkers {
		return ts, true, nil
	}

	// Otherwise park it; it is picked up once a worker finishes or the initial
	// sync completes.
	log.Infof("deferring sync on %s", ts)
	sm.deferred.Insert(ts)
	return nil, false, nil
}
// selectSyncTarget chooses what to sync after the worker on done has finished.
// The pending bucket related to done is popped; its heaviest tipset is
// returned with true unless done itself, an active worker's target, or a
// recently synced tipset already covers it. In every other case the choice
// falls through to selectDeferredSyncTarget.
func (sm *syncManager) selectSyncTarget(done *types.TipSet) (*types.TipSet, bool, error) {
	bucket := sm.pend.PopRelated(done)
	if bucket == nil {
		return sm.selectDeferredSyncTarget()
	}

	candidate := bucket.heaviestTipSet()

	// superseded: what we just synced, or what another worker is syncing, is heavier
	superseded := isHeavier(done, candidate)
	if !superseded {
		for _, active := range sm.state {
			if isHeavier(active.ts, candidate) {
				superseded = true
				break
			}
		}
	}

	if superseded || sm.recent.Synced(candidate) {
		return sm.selectDeferredSyncTarget()
	}
	return candidate, true, nil
}
// selectDeferredSyncTarget picks the next deferred sync target, if any.
// Deferred targets are tipsets that were unrelated to the active syncs when
// they arrived and were postponed because the initial sync was still running
// or too many workers were busy.
//
// Buckets are popped from the deferred queue heaviest first. For each bucket's
// heaviest tipset:
//   - it is dropped if it (or something heavier) was synced recently;
//   - it is moved to the pending queue if it is related to a pending sync or
//     extends the target of an active worker;
//   - it is dropped if an active worker is already syncing it or something
//     heavier;
//   - otherwise it is returned with true.
//
// The tipset is compared against every active worker before it is accepted;
// in particular it is accepted when there are no active workers at all.
// Returns (nil, false, nil) once the deferred queue is exhausted. The returned
// error is always nil.
func (sm *syncManager) selectDeferredSyncTarget() (*types.TipSet, bool, error) {
deferredLoop:
	for !sm.deferred.Empty() {
		bucket := sm.deferred.Pop()
		heaviest := bucket.heaviestTipSet()

		if sm.recent.Synced(heaviest) {
			// we have synced it or something heavier recently, skip it
			continue deferredLoop
		}

		if sm.pend.RelatedToAny(heaviest) {
			// this has converged to a pending sync, insert it to the pending queue
			sm.pend.Insert(heaviest)
			continue deferredLoop
		}

		for _, ws := range sm.state {
			if ws.ts.Equals(heaviest) || isHeavier(ws.ts, heaviest) {
				// we have converged and are already syncing it or we are syncing on something heavier
				// ignore it and pop the next deferred bucket
				continue deferredLoop
			}

			if heaviest.Parents() == ws.ts.Key() {
				// we have converged and we are syncing its parent; insert it to the pending queue
				sm.pend.Insert(heaviest)
				continue deferredLoop
			}
		}

		// it's not related to any active or pending sync, so sync it!
		return heaviest, true, nil
	}

	return nil, false, nil
}
// isHeavier reports whether a's parent weight is strictly greater than b's.
// Note that the comparison uses ParentWeight, so tipsets with equal parent
// weight are never heavier than one another.
func isHeavier(a, b *types.TipSet) bool {
return a.ParentWeight().GreaterThan(b.ParentWeight())
}
// sync buffer -- this is a circular buffer of recently synced tipsets
type syncBuffer struct {
// buf is the fixed-size storage; slots that were never written are nil.
buf []*types.TipSet
// next is the index of the slot the next Push overwrites.
next int
}
// newSyncBuffer returns an empty syncBuffer that retains up to size tipsets.
func newSyncBuffer(size int) *syncBuffer {
return &syncBuffer{buf: make([]*types.TipSet, size)}
}
// Push records ts as recently synced, overwriting the oldest entry once the
// buffer is full.
func (sb *syncBuffer) Push(ts *types.TipSet) {
	slot := sb.next
	sb.buf[slot] = ts
	// advance the write position, wrapping around at the end
	sb.next = (slot + 1) % len(sb.buf)
}
// Synced reports whether ts is covered by a recent sync, i.e. the buffer holds
// a tipset that equals ts or is heavier than it.
func (sb *syncBuffer) Synced(ts *types.TipSet) bool {
	for i := range sb.buf {
		recent := sb.buf[i]
		if recent == nil {
			// slot not filled yet
			continue
		}
		if recent.Equals(ts) || isHeavier(recent, ts) {
			return true
		}
	}
	return false
}
// sync buckets and related utilities

// syncBucketSet is a collection of sync target buckets. Insert places a tipset
// into the first bucket it is on the same chain as, or into a new bucket if
// there is none. The zero value is an empty set ready for use.
type syncBucketSet struct {
buckets []*syncTargetBucket
}
// syncTargetBucket groups candidate sync targets that are on the same chain,
// as decided by sameChainAs (equal to, parent of, or child of a member).
type syncTargetBucket struct {
tips []*types.TipSet
}
// newSyncTargetBucket returns a bucket holding the given tipsets, added one at
// a time in argument order.
func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
	bucket := new(syncTargetBucket)
	for i := range tipsets {
		bucket.add(tipsets[i])
	}
	return bucket
}
// String renders the set as "{[t1,t2];[t3]}": buckets are wrapped in square
// brackets and separated by ';', tipsets within a bucket are separated by ','.
func (sbs *syncBucketSet) String() string {
	rendered := make([]string, len(sbs.buckets))
	for i, bucket := range sbs.buckets {
		names := make([]string, len(bucket.tips))
		for j, tip := range bucket.tips {
			names[j] = tip.String()
		}
		rendered[i] = "[" + strings.Join(names, ",") + "]"
	}
	return "{" + strings.Join(rendered, ";") + "}"
}
// RelatedToAny reports whether ts is on the same chain as any bucket in the set.
func (sbs *syncBucketSet) RelatedToAny(ts *types.TipSet) bool {
	related := false
	for i := 0; i < len(sbs.buckets) && !related; i++ {
		related = sbs.buckets[i].sameChainAs(ts)
	}
	return related
}
// Insert adds ts to the first bucket it is on the same chain as; when no such
// bucket exists a new bucket containing only ts is appended to the set.
func (sbs *syncBucketSet) Insert(ts *types.TipSet) {
	for i := range sbs.buckets {
		if bucket := sbs.buckets[i]; bucket.sameChainAs(ts) {
			bucket.add(ts)
			return
		}
	}

	// unrelated to everything we hold: it starts a bucket of its own
	sbs.buckets = append(sbs.buckets, newSyncTargetBucket(ts))
}
// Pop removes and returns the bucket whose heaviest tipset has the greatest
// parent weight; on ties the earliest such bucket wins. It returns nil when
// the set is empty.
func (sbs *syncBucketSet) Pop() *syncTargetBucket {
	// Nothing to pop; bail out before attempting to remove a nil bucket from
	// an empty set.
	if len(sbs.buckets) == 0 {
		return nil
	}

	var bestBuck *syncTargetBucket
	var bestTs *types.TipSet
	for _, b := range sbs.buckets {
		hts := b.heaviestTipSet()
		if bestBuck == nil || bestTs.ParentWeight().LessThan(hts.ParentWeight()) {
			bestBuck = b
			bestTs = hts
		}
	}

	sbs.removeBucket(bestBuck)
	return bestBuck
}
// removeBucket removes toremove (compared by pointer) from the set. It is a
// no-op on an empty set.
//
// The surviving buckets are copied into a freshly allocated slice rather than
// filtered in place, so a caller that is ranging over the previous
// sbs.buckets slice keeps seeing its original contents.
func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
	if len(sbs.buckets) == 0 {
		// nothing to remove; also avoids asking make for a negative capacity
		return
	}

	nbuckets := make([]*syncTargetBucket, 0, len(sbs.buckets)-1)
	for _, b := range sbs.buckets {
		if b != toremove {
			nbuckets = append(nbuckets, b)
		}
	}
	sbs.buckets = nbuckets
}
// PopRelated removes every bucket that is on the same chain as ts and returns
// a single new bucket holding all their tipsets, in bucket order. It returns
// nil, leaving the set untouched, when no bucket is related to ts.
//
// The set is partitioned in one pass into kept and related buckets, instead of
// rebuilding the bucket slice once per related bucket.
func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
	var bOut *syncTargetBucket
	kept := make([]*syncTargetBucket, 0, len(sbs.buckets))

	for _, b := range sbs.buckets {
		if !b.sameChainAs(ts) {
			kept = append(kept, b)
			continue
		}
		if bOut == nil {
			bOut = &syncTargetBucket{}
		}
		bOut.tips = append(bOut.tips, b.tips...)
	}

	// only replace the bucket list when something was actually removed
	if bOut != nil {
		sbs.buckets = kept
	}
	return bOut
}
// Heaviest returns the tipset with the greatest parent weight among the
// heaviest tipsets of all buckets, or nil when the set is empty. On ties the
// earliest bucket wins.
func (sbs *syncBucketSet) Heaviest() *types.TipSet {
	// TODO: should also consider factoring in number of peers represented by each bucket here
	var champion *types.TipSet
	for _, bucket := range sbs.buckets {
		contender := bucket.heaviestTipSet()
		if champion != nil && !contender.ParentWeight().GreaterThan(champion.ParentWeight()) {
			// not strictly heavier than what we already have
			continue
		}
		champion = contender
	}
	return champion
}
// Empty reports whether the set contains no buckets.
func (sbs *syncBucketSet) Empty() bool {
return len(sbs.buckets) == 0
}
// sameChainAs reports whether ts is directly related to any tipset in the
// bucket: equal to it, its parent, or its child.
func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
	for _, member := range stb.tips {
		related := ts.Equals(member) || // same tipset
			ts.Key() == member.Parents() || // ts is the member's parent
			ts.Parents() == member.Key() // ts is the member's child
		if related {
			return true
		}
	}
	return false
}
// add puts ts into the bucket unless an equal tipset is already present.
//
// When tipset coalescing is enabled and the bucket holds a tipset with the
// same height and the same parents as ts, the two are merged instead: their
// blocks are combined, keeping the first block seen per miner, and the result
// replaces the existing entry. If the merged tipset cannot be constructed, a
// warning is logged and the scan continues with the remaining entries.
func (stb *syncTargetBucket) add(ts *types.TipSet) {
	for idx, existing := range stb.tips {
		if existing.Equals(ts) {
			return
		}

		if !coalesceTipsets || existing.Height() != ts.Height() {
			continue
		}
		if !types.CidArrsEqual(existing.Blocks()[0].Parents, ts.Blocks()[0].Parents) {
			continue
		}

		// Same height and parents: merge, keeping one block per miner.
		seen := make(map[address.Address]struct{})
		merged := []*types.BlockHeader{}
		absorb := func(blocks []*types.BlockHeader) {
			for _, blk := range blocks {
				if _, dup := seen[blk.Miner]; dup {
					continue
				}
				merged = append(merged, blk)
				seen[blk.Miner] = struct{}{}
			}
		}
		absorb(existing.Blocks())
		absorb(ts.Blocks())

		combined, err := types.NewTipSet(merged)
		if err != nil {
			log.Warnf("error while trying to recombine a tipset in a bucket: %+v", err)
			continue
		}
		stb.tips[idx] = combined
		return
	}

	stb.tips = append(stb.tips, ts)
}
// heaviestTipSet returns the tipset with the greatest parent weight in the
// bucket; on ties the earliest one wins. It returns nil for a nil receiver or
// an empty bucket.
func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
	if stb == nil {
		return nil
	}

	var top *types.TipSet
	for i := range stb.tips {
		candidate := stb.tips[i]
		switch {
		case top == nil:
			top = candidate
		case candidate.ParentWeight().GreaterThan(top.ParentWeight()):
			top = candidate
		}
	}
	return top
}