It works!

whyrusleeping 2019-11-15 13:35:29 -08:00
parent 1d81c53f8f
commit 7aa76d21d1
5 changed files with 178 additions and 46 deletions

View File

@@ -71,14 +71,25 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID)
         return nil, err
     }
 
-    return &Syncer{
+    s := &Syncer{
         bad:     NewBadBlockCache(),
         Genesis: gent,
         Bsync:   bsync,
         store:   sm.ChainStore(),
         sm:      sm,
         self:    self,
-    }, nil
+    }
+
+    s.syncmgr = NewSyncManager(s.Sync)
+    return s, nil
+}
+
+func (syncer *Syncer) Start() {
+    syncer.syncmgr.Start()
+}
+
+func (syncer *Syncer) Stop() {
+    syncer.syncmgr.Stop()
 }
 
 // InformNewHead informs the syncer about a new potential tipset
@@ -118,18 +129,20 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
     syncer.Bsync.AddPeer(from)
     syncer.syncmgr.SetPeerHead(from, fts.TipSet())
 
-    bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
-    targetWeight := fts.TipSet().Blocks()[0].ParentWeight
-    if targetWeight.LessThan(bestPweight) {
-        log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now")
-        return
-    }
-
-    go func() {
-        if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
-            log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err)
-        }
-    }()
+    /*
+        bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
+        targetWeight := fts.TipSet().Blocks()[0].ParentWeight
+        if targetWeight.LessThan(bestPweight) {
+            log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now")
+            return
+        }
+
+        go func() {
+            if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
+                log.Errorf("sync error (curW=%s, targetW=%s): %+v", bestPweight, targetWeight, err)
+            }
+        }()
+    */
 }
 
 func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
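Note: with this change InformNewHead no longer compares chain weights or spawns Sync in its own goroutine; the old path is parked in the comment block above, and deciding when a Sync actually runs now belongs to the sync manager. A rough usage sketch of the reworked surface, based only on the functions visible in this diff (sm, bsync, self, from and fts stand for values the caller already has; this is illustrative, not code from the commit):

    syncer, err := chain.NewSyncer(sm, bsync, self)
    if err != nil {
        return err
    }
    syncer.Start()      // starts the scheduler plus syncWorkerCount workers
    defer syncer.Stop() // closes the manager's stop channel

    // for every head learned from a peer (hello protocol, gossip, ...):
    syncer.InformNewHead(from, fts) // records the head; the manager schedules the actual Sync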
@@ -748,9 +761,20 @@ func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig types.Signatur
     return nil
 }
 
+const syncStateKey = "syncStateKey"
+
+func extractSyncState(ctx context.Context) *SyncerState {
+    v := ctx.Value(syncStateKey)
+    if v != nil {
+        return v.(*SyncerState)
+    }
+    return nil
+}
+
 func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
     ctx, span := trace.StartSpan(ctx, "collectHeaders")
     defer span.End()
 
+    ss := extractSyncState(ctx)
     span.AddAttributes(
         trace.Int64Attribute("fromHeight", int64(from.Height())),
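Note: only the read side of the new context plumbing is visible in this hunk. Presumably whichever function kicks off a sync attaches a fresh SyncerState under the same key before collectChain and friends run; a hedged sketch of that assumed injection side (the exact call site is not in this diff):

    ss := new(SyncerState)
    ctx = context.WithValue(ctx, syncStateKey, ss)
    // anything further down the call tree can then do:
    extractSyncState(ctx).SetStage(api.StagePersistHeaders) // nil-safe, see the SyncerState guards below

Using a bare string constant as a context key works, though linters such as staticcheck usually suggest a private struct type for the key to rule out collisions.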
@@ -773,7 +797,7 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
     // we want to sync all the blocks until the height above the block we have
     untilHeight := to.Height() + 1
 
-    syncer.syncState.SetHeight(blockSet[len(blockSet)-1].Height())
+    ss.SetHeight(blockSet[len(blockSet)-1].Height())
 
     var acceptedBlocks []cid.Cid
@@ -841,7 +865,7 @@ loop:
         acceptedBlocks = append(acceptedBlocks, at...)
 
-        syncer.syncState.SetHeight(blks[len(blks)-1].Height())
+        ss.SetHeight(blks[len(blks)-1].Height())
         at = blks[len(blks)-1].Parents()
     }
@@ -904,7 +928,8 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
 }
 
 func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
-    syncer.syncState.SetHeight(0)
+    ss := extractSyncState(ctx)
+    ss.SetHeight(0)
 
     return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error {
         log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
@@ -913,7 +938,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
             return xerrors.Errorf("message processing failed: %w", err)
         }
 
-        syncer.syncState.SetHeight(fts.TipSet().Height())
+        ss.SetHeight(fts.TipSet().Height())
 
         return nil
     })
@@ -1008,8 +1033,9 @@ func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error {
 func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
     ctx, span := trace.StartSpan(ctx, "collectChain")
     defer span.End()
+    ss := extractSyncState(ctx)
 
-    syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
+    ss.Init(syncer.store.GetHeaviestTipSet(), ts)
 
     headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
     if err != nil {
@@ -1022,7 +1048,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
         log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
     }
 
-    syncer.syncState.SetStage(api.StagePersistHeaders)
+    ss.SetStage(api.StagePersistHeaders)
 
     toPersist := make([]*types.BlockHeader, 0, len(headers)*build.BlocksPerEpoch)
     for _, ts := range headers {
@@ -1033,13 +1059,13 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
     }
     toPersist = nil
 
-    syncer.syncState.SetStage(api.StageMessages)
+    ss.SetStage(api.StageMessages)
 
     if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
         return xerrors.Errorf("collectChain syncMessages: %w", err)
     }
 
-    syncer.syncState.SetStage(api.StageSyncComplete)
+    ss.SetStage(api.StageSyncComplete)
     log.Debugw("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids()))
 
     return nil
@@ -1059,5 +1085,6 @@ func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker
 }
 
 func (syncer *Syncer) State() SyncerState {
-    return syncer.syncState.Snapshot()
+    panic("NYI")
+    //return syncer.syncState.Snapshot()
 }

View File

@@ -20,11 +20,11 @@ type SyncManager struct {
     bspThresh int
 
-    syncTargets chan *types.TipSet
+    incomingTipSets chan *types.TipSet
+    syncTargets     chan *types.TipSet
+    syncResults     chan *syncResult
 
-    asLk        sync.Mutex
     activeSyncs map[types.TipSetKey]*types.TipSet
-    queuedSyncs map[types.TipSetKey]*types.TipSet
-
-    syncState SyncerState
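The three channels introduced here define the data flow of the reworked manager; every piece further down the diff plugs into this pipeline (names as in the diff):

    // SetPeerHead --> incomingTipSets --> syncScheduler --> syncTargets --> syncWorker
    //                                          ^                               |
    //                                          +--------- syncResults <--------+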
@@ -33,30 +33,48 @@ type SyncManager struct {
     stop chan struct{}
 }
 
+type syncResult struct {
+    ts      *types.TipSet
+    success bool
+}
+
+const syncWorkerCount = 3
+
 func NewSyncManager(sync SyncFunc) *SyncManager {
     return &SyncManager{
-        peerHeads:   make(map[peer.ID]*types.TipSet),
-        syncTargets: make(chan *types.TipSet),
-        activeSyncs: make([]*types.TipSet, syncWorkerCount),
-        doSync:      sync,
-        stop:        make(chan struct{}),
+        bspThresh:       1,
+        peerHeads:       make(map[peer.ID]*types.TipSet),
+        syncTargets:     make(chan *types.TipSet),
+        syncResults:     make(chan *syncResult),
+        incomingTipSets: make(chan *types.TipSet),
+        activeSyncs:     make(map[types.TipSetKey]*types.TipSet),
+        doSync:          sync,
+        stop:            make(chan struct{}),
     }
 }
 
 func (sm *SyncManager) Start() {
+    go sm.syncScheduler()
     for i := 0; i < syncWorkerCount; i++ {
         go sm.syncWorker(i)
     }
 }
 
+func (sm *SyncManager) Stop() {
+    close(sm.stop)
+}
+
 func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) {
+    log.Info("set peer head!")
     sm.lk.Lock()
     defer sm.lk.Unlock()
     sm.peerHeads[p] = ts
     if !sm.bootstrapped {
+        log.Info("not bootstrapped")
         spc := sm.syncedPeerCount()
         if spc >= sm.bspThresh {
+            log.Info("go time!")
             // Its go time!
             target, err := sm.selectSyncTarget()
             if err != nil {
@@ -64,16 +82,15 @@ func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) {
                 return
             }
 
-            sm.asLk.Lock()
-            sm.activeSyncs[target.Key()] = target
-            sm.asLk.Unlock()
-            sm.syncTargets <- target
+            sm.incomingTipSets <- target
+            // TODO: is this the right place to say we're bootstrapped? probably want to wait until the sync finishes
             sm.bootstrapped = true
         }
         log.Infof("sync bootstrap has %d peers", spc)
         return
     }
+
+    sm.incomingTipSets <- ts
 }
 
 type syncBucketSet struct {
@@ -103,14 +120,36 @@ func (sbs *syncBucketSet) Pop() *syncTargetBucket {
             bestTs = hts
         }
     }
-    nbuckets := make([]*syncTargetBucket, len(sbs.buckets)-1)
+
+    sbs.removeBucket(bestBuck)
+
     return bestBuck
 }
 
+func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
+    nbuckets := make([]*syncTargetBucket, 0, len(sbs.buckets)-1)
+    for _, b := range sbs.buckets {
+        if b != toremove {
+            nbuckets = append(nbuckets, b)
+        }
+    }
+    sbs.buckets = nbuckets
+}
+
+func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
+    for _, b := range sbs.buckets {
+        if b.sameChainAs(ts) {
+            sbs.removeBucket(b)
+            return b
+        }
+    }
+    return nil
+}
+
 func (sbs *syncBucketSet) Heaviest() *types.TipSet {
     // TODO: should also consider factoring in number of peers represented by each bucket here
     var bestTs *types.TipSet
-    for _, b := range buckets {
+    for _, b := range sbs.buckets {
         bhts := b.heaviestTipSet()
         if bestTs == nil || bhts.ParentWeight().GreaterThan(bestTs.ParentWeight()) {
             bestTs = bhts
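A brief orientation on the two helpers added above, since the scheduler below leans on them: Pop removes and returns the heaviest queued bucket, while PopRelated pulls out whichever bucket extends the same chain as a tipset whose sync just finished (or failed). A sketch of the intended usage, mirroring the scheduler further down (identifiers from the diff; doneTs is a placeholder for the finished tipset, res.ts in the scheduler; not standalone code):

    next := syncQueue.Pop() // heaviest pending bucket becomes the next target
    // ... later, when the sync for doneTs completes:
    if rel := activeSyncTips.PopRelated(doneTs); rel != nil {
        syncQueue.buckets = append(syncQueue.buckets, rel) // re-queue tips that built on it
    }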
@@ -160,6 +199,10 @@ func (stb *syncTargetBucket) add(ts *types.TipSet) {
 }
 
 func (stb *syncTargetBucket) heaviestTipSet() *types.TipSet {
+    if stb == nil {
+        return nil
+    }
+
     var best *types.TipSet
     for _, ts := range stb.tips {
         if best == nil || ts.ParentWeight().GreaterThan(best.ParentWeight()) {
@@ -195,6 +238,7 @@ func (sm *SyncManager) selectSyncTarget() (*types.TipSet, error) {
 func (sm *SyncManager) syncScheduler() {
     var syncQueue syncBucketSet
+    var activeSyncTips syncBucketSet
     var nextSyncTarget *syncTargetBucket
     var workerChan chan *types.TipSet
@@ -208,7 +252,6 @@ func (sm *SyncManager) syncScheduler() {
             }
 
             var relatedToActiveSync bool
-            sm.asLk.Lock()
             for _, acts := range sm.activeSyncs {
                 if ts.Equals(acts) {
                     break
@@ -219,28 +262,54 @@ func (sm *SyncManager) syncScheduler() {
                     relatedToActiveSync = true
                 }
             }
-            sm.asLk.Unlock()
 
             // if this is related to an active sync process, immediately bucket it
             // we don't want to start a parallel sync process that duplicates work
             if relatedToActiveSync {
-                syncQueue.Insert(ts)
+                log.Info("related to active sync")
+                activeSyncTips.Insert(ts)
+                continue
             }
 
             if nextSyncTarget != nil && nextSyncTarget.sameChainAs(ts) {
+                log.Info("new tipset is part of our next sync target")
                 nextSyncTarget.add(ts)
             } else {
+                log.Info("insert into that queue!")
                 syncQueue.Insert(ts)
                 if nextSyncTarget == nil {
                     nextSyncTarget = syncQueue.Pop()
-                    workerChan = workerChanVal
+                    workerChan = sm.syncTargets
+                    log.Info("setting next sync target")
                 }
             }
+        case res := <-sm.syncResults:
+            delete(sm.activeSyncs, res.ts.Key())
+            relbucket := activeSyncTips.PopRelated(res.ts)
+            if relbucket != nil {
+                if res.success {
+                    if nextSyncTarget == nil {
+                        nextSyncTarget = relbucket
+                        workerChan = sm.syncTargets
+                    } else {
+                        syncQueue.buckets = append(syncQueue.buckets, relbucket)
+                    }
+                } else {
+                    // TODO: this is the case where we try to sync a chain, and
+                    // fail, and we have more blocks on top of that chain that
+                    // have come in since. The question is, should we try to
+                    // sync these? or just drop them?
+                }
+            }
         case workerChan <- nextSyncTarget.heaviestTipSet():
+            hts := nextSyncTarget.heaviestTipSet()
+            sm.activeSyncs[hts.Key()] = hts
             if len(syncQueue.buckets) > 0 {
                 nextSyncTarget = syncQueue.Pop()
             } else {
+                nextSyncTarget = nil
                 workerChan = nil
             }
         case <-sm.stop:
@@ -253,19 +322,22 @@ func (sm *SyncManager) syncScheduler() {
 func (sm *SyncManager) syncWorker(id int) {
     for {
         select {
-        case ts, ok := sm.syncTargets:
+        case ts, ok := <-sm.syncTargets:
             if !ok {
                 log.Info("sync manager worker shutting down")
                 return
             }
 
-            if err := sm.doSync(context.TODO(), ts); err != nil {
+            log.Info("sync worker go time!", ts.Cids())
+
+            err := sm.doSync(context.TODO(), ts)
+            if err != nil {
                 log.Errorf("sync error: %+v", err)
             }
 
-            sm.asLk.Lock()
-            delete(sm.activeSyncs, ts.Key())
-            sm.asLk.Unlock()
+            sm.syncResults <- &syncResult{
+                ts:      ts,
+                success: err == nil,
+            }
         }
     }
 }
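The scheduler/worker handoff above relies on a standard Go idiom: a send on a nil channel blocks forever, so while there is no nextSyncTarget the scheduler sets workerChan to nil and the "hand a target to a worker" case simply drops out of the select; once a target exists, workerChan points at sm.syncTargets again and the send fires as soon as a worker is free. A self-contained illustration of that pattern with generic names (not lotus code):

    package main

    import "fmt"

    func main() {
        targets := make(chan int)
        results := make(chan int)

        // one "worker": take a target, report a result back
        go func() {
            for t := range targets {
                results <- t * t
            }
        }()

        queue := []int{2, 3, 4}
        next := queue[0]
        queue = queue[1:]
        out := targets // setting out = nil later disables the send case

        for done := 0; done < 3; {
            select {
            case out <- next: // only selectable while out != nil
                if len(queue) > 0 {
                    next, queue = queue[0], queue[1:]
                } else {
                    out = nil // nothing left to offer the worker
                }
            case r := <-results:
                fmt.Println("finished:", r)
                done++
            }
        }
    }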

View File

@@ -32,12 +32,20 @@ type SyncerState struct {
 }
 
 func (ss *SyncerState) SetStage(v api.SyncStateStage) {
+    if ss == nil {
+        return
+    }
+
     ss.lk.Lock()
     defer ss.lk.Unlock()
     ss.Stage = v
 }
 
 func (ss *SyncerState) Init(base, target *types.TipSet) {
+    if ss == nil {
+        return
+    }
+
     ss.lk.Lock()
     defer ss.lk.Unlock()
     ss.Target = target
@@ -47,6 +55,10 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {
 }
 
 func (ss *SyncerState) SetHeight(h uint64) {
+    if ss == nil {
+        return
+    }
+
     ss.lk.Lock()
     defer ss.lk.Unlock()
     ss.Height = h
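The nil checks added to SetStage, Init and SetHeight are what make extractSyncState's nil return safe: in Go a pointer-receiver method may be called on a nil receiver as long as it never dereferences it, so code paths with no attached sync state just become no-ops. A tiny standalone illustration of the idiom (not lotus code):

    package main

    import "fmt"

    type tracker struct{ height uint64 }

    // Safe on a nil *tracker, mirroring the guards added to SyncerState.
    func (t *tracker) SetHeight(h uint64) {
        if t == nil {
            return // nil receiver: do nothing
        }
        t.height = h
    }

    func main() {
        var t *tracker  // e.g. "no sync state attached to this context"
        t.SetHeight(42) // no panic, just a no-op
        fmt.Println(t)  // <nil>
    }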

View File

@@ -200,7 +200,7 @@ func Online() Option {
         Override(new(dtypes.ClientDAG), testing.MemoryClientDag),
 
         // Filecoin services
-        Override(new(*chain.Syncer), chain.NewSyncer),
+        Override(new(*chain.Syncer), modules.NewSyncer),
         Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient),
         Override(new(*chain.MessagePool), modules.MessagePool),

View File

@ -12,11 +12,13 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-core/routing"
peer "github.com/libp2p/go-libp2p-peer"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/blocksync"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@@ -119,3 +121,22 @@ func SetGenesis(cs *store.ChainStore, g Genesis) error {
     return cs.SetGenesis(genesis)
 }
+
+func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*chain.Syncer, error) {
+    syncer, err := chain.NewSyncer(sm, bsync, self)
+    if err != nil {
+        return nil, err
+    }
+
+    lc.Append(fx.Hook{
+        OnStart: func(_ context.Context) error {
+            syncer.Start()
+            return nil
+        },
+        OnStop: func(_ context.Context) error {
+            syncer.Stop()
+            return nil
+        },
+    })
+
+    return syncer, nil
+}
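The builder change above points the *chain.Syncer override at this wrapper, so the dependency-injection container constructs the Syncer and ties Start/Stop to application startup and shutdown through the registered lifecycle hooks. A simplified, generic sketch of how go.uber.org/fx drives hooks registered this way (illustrative names, not the lotus node builder):

    package main

    import (
        "context"
        "fmt"

        "go.uber.org/fx"
    )

    type service struct{}

    func (s *service) Start() { fmt.Println("started") }
    func (s *service) Stop()  { fmt.Println("stopped") }

    // newService mirrors the shape of the NewSyncer wrapper above:
    // construct the value, register lifecycle hooks, return it.
    func newService(lc fx.Lifecycle) *service {
        s := &service{}
        lc.Append(fx.Hook{
            OnStart: func(context.Context) error { s.Start(); return nil },
            OnStop:  func(context.Context) error { s.Stop(); return nil },
        })
        return s
    }

    func main() {
        app := fx.New(
            fx.Provide(newService),
            fx.Invoke(func(*service) {}), // force construction
        )
        _ = app.Start(context.Background()) // runs OnStart hooks
        _ = app.Stop(context.Background())  // runs OnStop hooks
    }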