PopRelated should pop all
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
24fc7d4cbd
commit
810feee5a1
@ -5,7 +5,6 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||||
@ -212,16 +211,12 @@ func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
|
|||||||
sbs.buckets = nbuckets
|
sbs.buckets = nbuckets
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
|
func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) []*syncTargetBucket {
|
||||||
var bOut *syncTargetBucket
|
var bOut []*syncTargetBucket
|
||||||
for _, b := range sbs.buckets {
|
for _, b := range sbs.buckets {
|
||||||
if b.sameChainAs(ts) {
|
if b.sameChainAs(ts) {
|
||||||
if bOut == nil {
|
|
||||||
sbs.removeBucket(b)
|
sbs.removeBucket(b)
|
||||||
bOut = b
|
bOut = append(bOut, b)
|
||||||
} else {
|
|
||||||
log.Errorf("REPORT THIS more that one related bucket for %s", ts)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return bOut
|
return bOut
|
||||||
@ -312,8 +307,6 @@ func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *syncManager) syncScheduler() {
|
func (sm *syncManager) syncScheduler() {
|
||||||
t := time.NewTicker(10 * time.Second)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ts, ok := <-sm.incomingTipSets:
|
case ts, ok := <-sm.incomingTipSets:
|
||||||
@ -330,16 +323,6 @@ func (sm *syncManager) syncScheduler() {
|
|||||||
case <-sm.stop:
|
case <-sm.stop:
|
||||||
log.Info("sync scheduler shutting down")
|
log.Info("sync scheduler shutting down")
|
||||||
return
|
return
|
||||||
case <-t.C:
|
|
||||||
activeSyncs := make([]types.TipSetKey, 0, len(sm.activeSyncs))
|
|
||||||
for tsk := range sm.activeSyncs {
|
|
||||||
activeSyncs = append(activeSyncs, tsk)
|
|
||||||
}
|
|
||||||
sort.Slice(activeSyncs, func(i, j int) bool {
|
|
||||||
return string(activeSyncs[i].Bytes()) < string(activeSyncs[j].Bytes())
|
|
||||||
})
|
|
||||||
|
|
||||||
log.Infof("activeSyncs: %v, activeSyncTips: %s ", activeSyncs, sm.activeSyncTips.String())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -399,14 +382,17 @@ func (sm *syncManager) scheduleProcessResult(res *syncResult) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
delete(sm.activeSyncs, res.ts.Key())
|
delete(sm.activeSyncs, res.ts.Key())
|
||||||
relbucket := sm.activeSyncTips.PopRelated(res.ts)
|
relbuckets := sm.activeSyncTips.PopRelated(res.ts)
|
||||||
if relbucket != nil {
|
if len(relbuckets) != 0 {
|
||||||
if res.success {
|
if res.success {
|
||||||
if sm.nextSyncTarget == nil {
|
if sm.nextSyncTarget == nil {
|
||||||
sm.nextSyncTarget = relbucket
|
sm.nextSyncTarget = relbuckets[0]
|
||||||
sm.workerChan = sm.syncTargets
|
sm.workerChan = sm.syncTargets
|
||||||
} else {
|
relbuckets = relbuckets[1:]
|
||||||
for _, t := range relbucket.tips {
|
}
|
||||||
|
|
||||||
|
for _, b := range relbuckets {
|
||||||
|
for _, t := range b.tips {
|
||||||
sm.syncQueue.Insert(t)
|
sm.syncQueue.Insert(t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,12 +80,14 @@ func TestSyncManagerEdgeCase(t *testing.T) {
|
|||||||
t.Logf("c1: %s", c1)
|
t.Logf("c1: %s", c1)
|
||||||
c2 := mock.TipSet(mock.MkBlock(b2, 1, 5))
|
c2 := mock.TipSet(mock.MkBlock(b2, 1, 5))
|
||||||
t.Logf("c2: %s", c2)
|
t.Logf("c2: %s", c2)
|
||||||
|
d1 := mock.TipSet(mock.MkBlock(c1, 1, 6))
|
||||||
|
t.Logf("d1: %s", d1)
|
||||||
|
e1 := mock.TipSet(mock.MkBlock(d1, 1, 7))
|
||||||
|
t.Logf("e1: %s", e1)
|
||||||
|
|
||||||
runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
|
runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
|
||||||
sm.SetPeerHead(ctx, "peer1", a)
|
sm.SetPeerHead(ctx, "peer1", a)
|
||||||
assertGetSyncOp(t, stc, a)
|
assertGetSyncOp(t, stc, a)
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
t.Logf("bootstate: %d", sm.bootstrapState)
|
|
||||||
|
|
||||||
sm.SetPeerHead(ctx, "peer1", b1)
|
sm.SetPeerHead(ctx, "peer1", b1)
|
||||||
sm.SetPeerHead(ctx, "peer1", b2)
|
sm.SetPeerHead(ctx, "peer1", b2)
|
||||||
@ -96,7 +98,6 @@ func TestSyncManagerEdgeCase(t *testing.T) {
|
|||||||
if !b1op.ts.Equals(b1) {
|
if !b1op.ts.Equals(b1) {
|
||||||
b1op, b2op = b2op, b1op
|
b1op, b2op = b2op, b1op
|
||||||
}
|
}
|
||||||
t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String())
|
|
||||||
|
|
||||||
sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0
|
sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0
|
||||||
sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1
|
sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1
|
||||||
@ -106,10 +107,18 @@ func TestSyncManagerEdgeCase(t *testing.T) {
|
|||||||
b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0
|
b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0
|
||||||
// even though correct one is index 1
|
// even though correct one is index 1
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String())
|
|
||||||
b2op.done()
|
b2op.done()
|
||||||
// b2 completes and is not related to c1, so it leaves activeSyncTips as it is
|
// b2 completes and is not related to c1, so it leaves activeSyncTips as it is
|
||||||
|
|
||||||
|
waitUntilAllWorkersAreDone(stc)
|
||||||
|
|
||||||
|
if len(sm.activeSyncTips.buckets) != 0 {
|
||||||
|
t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitUntilAllWorkersAreDone(stc chan *syncOp) {
|
||||||
for i := 0; i < 10; {
|
for i := 0; i < 10; {
|
||||||
select {
|
select {
|
||||||
case so := <-stc:
|
case so := <-stc:
|
||||||
@ -119,13 +128,6 @@ func TestSyncManagerEdgeCase(t *testing.T) {
|
|||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
if len(sm.activeSyncTips.buckets) != 0 {
|
|
||||||
t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String())
|
|
||||||
}
|
|
||||||
t.Logf("activeSyncs: %s: activeSyncTips: %s", sm.activeSyncs, sm.activeSyncTips.String())
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncManager(t *testing.T) {
|
func TestSyncManager(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user