Merge pull request #872 from filecoin-project/fix/incoming-tipset-bucketing
fix incoming tipset bucketing
This commit is contained in:
commit
f77fe9ed89
@ -33,6 +33,9 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
||||
}
|
||||
|
||||
go func() {
|
||||
log.Infof("New block over pubsub: %s", blk.Cid())
|
||||
|
||||
start := time.Now()
|
||||
log.Debug("about to fetch messages for block from pubsub")
|
||||
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
|
||||
if err != nil {
|
||||
@ -46,7 +49,8 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugw("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom())
|
||||
took := time.Since(start)
|
||||
log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom(), "msgfetch", took)
|
||||
if delay := time.Now().Unix() - int64(blk.Header.Timestamp); delay > 5 {
|
||||
log.Warnf("Received block with large delay %d from miner %s", delay, blk.Header.Miner)
|
||||
}
|
||||
|
@ -362,6 +362,7 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
|
||||
}
|
||||
|
||||
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
|
||||
log.Info("SYNC TIME: ", maybeHead.Cids())
|
||||
ctx, span := trace.StartSpan(ctx, "chain.Sync")
|
||||
defer span.End()
|
||||
|
||||
|
@ -203,9 +203,6 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
|
||||
if types.CidArrsEqual(ts.Parents(), t.Cids()) {
|
||||
return true
|
||||
}
|
||||
if types.CidArrsEqual(ts.Parents(), t.Parents()) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
@ -283,6 +280,7 @@ func (sm *SyncManager) syncScheduler() {
|
||||
}
|
||||
|
||||
func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
|
||||
log.Info("scheduling incoming tipset sync: ", ts.Cids())
|
||||
if sm.getBootstrapState() == BSStateSelected {
|
||||
sm.setBootstrapState(BSStateScheduled)
|
||||
sm.syncTargets <- ts
|
||||
@ -295,7 +293,7 @@ func (sm *SyncManager) scheduleIncoming(ts *types.TipSet) {
|
||||
break
|
||||
}
|
||||
|
||||
if types.CidArrsEqual(ts.Parents(), acts.Cids()) || types.CidArrsEqual(ts.Parents(), acts.Parents()) {
|
||||
if types.CidArrsEqual(ts.Parents(), acts.Cids()) {
|
||||
// sync this next, after that sync process finishes
|
||||
relatedToActiveSync = true
|
||||
}
|
||||
|
@ -74,6 +74,7 @@ func TestSyncManager(t *testing.T) {
|
||||
b := mock.TipSet(mock.MkBlock(a, 1, 2))
|
||||
c1 := mock.TipSet(mock.MkBlock(b, 1, 3))
|
||||
c2 := mock.TipSet(mock.MkBlock(b, 2, 4))
|
||||
c3 := mock.TipSet(mock.MkBlock(b, 3, 5))
|
||||
d := mock.TipSet(mock.MkBlock(c1, 4, 5))
|
||||
|
||||
runSyncMgrTest(t, "testBootstrap", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
|
||||
@ -120,4 +121,30 @@ func TestSyncManager(t *testing.T) {
|
||||
|
||||
assertGetSyncOp(t, stc, d)
|
||||
})
|
||||
|
||||
runSyncMgrTest(t, "testSyncIncomingTipset", 1, func(t *testing.T, sm *SyncManager, stc chan *syncOp) {
|
||||
sm.SetPeerHead(ctx, "peer1", a)
|
||||
assertGetSyncOp(t, stc, a)
|
||||
|
||||
sm.SetPeerHead(ctx, "peer2", b)
|
||||
op := <-stc
|
||||
op.done()
|
||||
|
||||
sm.SetPeerHead(ctx, "peer2", c1)
|
||||
op1 := <-stc
|
||||
fmt.Println("op1: ", op1.ts.Cids())
|
||||
|
||||
sm.SetPeerHead(ctx, "peer2", c2)
|
||||
sm.SetPeerHead(ctx, "peer2", c3)
|
||||
|
||||
op1.done()
|
||||
|
||||
op2 := <-stc
|
||||
fmt.Println("op2: ", op2.ts.Cids())
|
||||
op2.done()
|
||||
|
||||
op3 := <-stc
|
||||
fmt.Println("op3: ", op3.ts.Cids())
|
||||
op3.done()
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user