WIP: implement chain sync manager and concurrent sync

This commit is contained in:
whyrusleeping 2019-11-12 21:51:36 -08:00
parent 52a05c3d0e
commit 9e4b3ae88a
2 changed files with 90 additions and 11 deletions

View File

@ -47,8 +47,6 @@ type Syncer struct {
// The known Genesis tipset
Genesis *types.TipSet
syncLock sync.Mutex
// TipSets known to be invalid
bad *BadBlockCache
@ -57,12 +55,10 @@ type Syncer struct {
self peer.ID
syncLock sync.Mutex
syncState SyncerState
// peer heads
// Note: clear cache on disconnects
peerHeads map[peer.ID]*types.TipSet
peerHeadsLk sync.Mutex
syncmgr *SyncManager
}
func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) {
@ -87,8 +83,6 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID)
}, nil
}
const BootstrapPeerThreshold = 1
// InformNewHead informs the syncer about a new potential tipset
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
@ -123,10 +117,8 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
return
}
syncer.peerHeadsLk.Lock()
syncer.peerHeads[from] = fts.TipSet()
syncer.peerHeadsLk.Unlock()
syncer.Bsync.AddPeer(from)
syncer.syncmgr.SetPeerHead(from, fts.TipSet())
bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
targetWeight := fts.TipSet().Blocks()[0].ParentWeight

87
chain/sync_manager.go Normal file
View File

@ -0,0 +1,87 @@
package chain
import (
"context"
"sync"
"github.com/filecoin-project/lotus/chain/types"
peer "github.com/libp2p/go-libp2p-peer"
)
const BootstrapPeerThreshold = 2
type SyncFunc func(context.Context, *types.TipSet) error
type SyncManager struct {
lk sync.Mutex
peerHeads map[peer.ID]*types.TipSet
bootstrapped bool
bspThresh int
syncTargets chan *types.TipSet
doSync func(context.Context, *types.TipSet) error
}
func NewSyncManager(sync SyncFunc) *SyncManager {
return &SyncManager{
peerHeads: make(map[peer.ID]*types.TipSet),
syncTargets: make(chan *types.TipSet),
doSync: sync,
}
}
func (sm *SyncManager) Start() {
for i := 0; i < syncWorkerCount; i++ {
go sm.syncWorker(i)
}
}
func (sm *SyncManager) SetPeerHead(p peer.ID, ts *types.TipSet) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.peerHeads[p] = ts
if !sm.bootstrapped {
spc := sm.syncedPeerCount()
if spc >= sm.bspThresh {
// Its go time!
}
log.Infof("sync bootstrap has %d peers", spc)
return
}
}
func (sm *SyncManager) syncWorker(id int) {
for {
select {
case ts, ok := sm.syncTargets:
if !ok {
log.Info("sync manager worker shutting down")
return
}
if err := sm.doSync(context.TODO(), ts); err != nil {
log.Errorf("sync error: %+v", err)
}
}
}
}
func (sm *SyncManager) syncedPeerCount() int {
var count int
for _, ts := range sm.peerHeads {
if ts.Height() > 0 {
count++
}
}
return count
}
func (sm *SyncManager) IsBootstrapped() bool {
sm.lk.Lock()
defer sm.lk.Unlock()
return sm.bootstrapped
}