Merge pull request #4541 from filecoin-project/feat/maybe-fix-something
Fix chain sync stopping to sync
This commit is contained in:
commit
bfdb74bfee
@ -2,7 +2,9 @@ package chain
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
@ -11,6 +13,14 @@ import (
|
|||||||
|
|
||||||
const BootstrapPeerThreshold = 2
|
const BootstrapPeerThreshold = 2
|
||||||
|
|
||||||
|
var coalesceForksParents = false
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
if os.Getenv("LOTUS_SYNC_REL_PARENT") == "yes" {
|
||||||
|
coalesceForksParents = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
BSStateInit = 0
|
BSStateInit = 0
|
||||||
BSStateSelected = 1
|
BSStateSelected = 1
|
||||||
@ -152,6 +162,19 @@ func newSyncTargetBucket(tipsets ...*types.TipSet) *syncTargetBucket {
|
|||||||
return &stb
|
return &stb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sbs *syncBucketSet) String() string {
|
||||||
|
var bStrings []string
|
||||||
|
for _, b := range sbs.buckets {
|
||||||
|
var tsStrings []string
|
||||||
|
for _, t := range b.tips {
|
||||||
|
tsStrings = append(tsStrings, t.String())
|
||||||
|
}
|
||||||
|
bStrings = append(bStrings, "["+strings.Join(tsStrings, ",")+"]")
|
||||||
|
}
|
||||||
|
|
||||||
|
return "{" + strings.Join(bStrings, ";") + "}"
|
||||||
|
}
|
||||||
|
|
||||||
func (sbs *syncBucketSet) RelatedToAny(ts *types.TipSet) bool {
|
func (sbs *syncBucketSet) RelatedToAny(ts *types.TipSet) bool {
|
||||||
for _, b := range sbs.buckets {
|
for _, b := range sbs.buckets {
|
||||||
if b.sameChainAs(ts) {
|
if b.sameChainAs(ts) {
|
||||||
@ -198,13 +221,17 @@ func (sbs *syncBucketSet) removeBucket(toremove *syncTargetBucket) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
|
func (sbs *syncBucketSet) PopRelated(ts *types.TipSet) *syncTargetBucket {
|
||||||
|
var bOut *syncTargetBucket
|
||||||
for _, b := range sbs.buckets {
|
for _, b := range sbs.buckets {
|
||||||
if b.sameChainAs(ts) {
|
if b.sameChainAs(ts) {
|
||||||
sbs.removeBucket(b)
|
sbs.removeBucket(b)
|
||||||
return b
|
if bOut == nil {
|
||||||
|
bOut = &syncTargetBucket{}
|
||||||
|
}
|
||||||
|
bOut.tips = append(bOut.tips, b.tips...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return bOut
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sbs *syncBucketSet) Heaviest() *types.TipSet {
|
func (sbs *syncBucketSet) Heaviest() *types.TipSet {
|
||||||
@ -224,8 +251,7 @@ func (sbs *syncBucketSet) Empty() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type syncTargetBucket struct {
|
type syncTargetBucket struct {
|
||||||
tips []*types.TipSet
|
tips []*types.TipSet
|
||||||
count int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
|
func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
|
||||||
@ -239,12 +265,14 @@ func (stb *syncTargetBucket) sameChainAs(ts *types.TipSet) bool {
|
|||||||
if ts.Parents() == t.Key() {
|
if ts.Parents() == t.Key() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
if coalesceForksParents && ts.Parents() == t.Parents() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (stb *syncTargetBucket) add(ts *types.TipSet) {
|
func (stb *syncTargetBucket) add(ts *types.TipSet) {
|
||||||
stb.count++
|
|
||||||
|
|
||||||
for _, t := range stb.tips {
|
for _, t := range stb.tips {
|
||||||
if t.Equals(ts) {
|
if t.Equals(ts) {
|
||||||
@ -294,7 +322,6 @@ func (sm *syncManager) selectSyncTarget() (*types.TipSet, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sm *syncManager) syncScheduler() {
|
func (sm *syncManager) syncScheduler() {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ts, ok := <-sm.incomingTipSets:
|
case ts, ok := <-sm.incomingTipSets:
|
||||||
@ -326,7 +353,8 @@ func (sm *syncManager) scheduleIncoming(ts *types.TipSet) {
|
|||||||
var relatedToActiveSync bool
|
var relatedToActiveSync bool
|
||||||
for _, acts := range sm.activeSyncs {
|
for _, acts := range sm.activeSyncs {
|
||||||
if ts.Equals(acts) {
|
if ts.Equals(acts) {
|
||||||
break
|
// ignore, we are already syncing it
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if ts.Parents() == acts.Key() {
|
if ts.Parents() == acts.Key() {
|
||||||
@ -376,7 +404,9 @@ func (sm *syncManager) scheduleProcessResult(res *syncResult) {
|
|||||||
sm.nextSyncTarget = relbucket
|
sm.nextSyncTarget = relbucket
|
||||||
sm.workerChan = sm.syncTargets
|
sm.workerChan = sm.syncTargets
|
||||||
} else {
|
} else {
|
||||||
sm.syncQueue.buckets = append(sm.syncQueue.buckets, relbucket)
|
for _, t := range relbucket.tips {
|
||||||
|
sm.syncQueue.Insert(t)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,69 @@ func assertGetSyncOp(t *testing.T, c chan *syncOp, ts *types.TipSet) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncManagerEdgeCase(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
a := mock.TipSet(mock.MkBlock(genTs, 1, 1))
|
||||||
|
t.Logf("a: %s", a)
|
||||||
|
b1 := mock.TipSet(mock.MkBlock(a, 1, 2))
|
||||||
|
t.Logf("b1: %s", b1)
|
||||||
|
b2 := mock.TipSet(mock.MkBlock(a, 2, 3))
|
||||||
|
t.Logf("b2: %s", b2)
|
||||||
|
c1 := mock.TipSet(mock.MkBlock(b1, 2, 4))
|
||||||
|
t.Logf("c1: %s", c1)
|
||||||
|
c2 := mock.TipSet(mock.MkBlock(b2, 1, 5))
|
||||||
|
t.Logf("c2: %s", c2)
|
||||||
|
d1 := mock.TipSet(mock.MkBlock(c1, 1, 6))
|
||||||
|
t.Logf("d1: %s", d1)
|
||||||
|
e1 := mock.TipSet(mock.MkBlock(d1, 1, 7))
|
||||||
|
t.Logf("e1: %s", e1)
|
||||||
|
|
||||||
|
runSyncMgrTest(t, "edgeCase", 1, func(t *testing.T, sm *syncManager, stc chan *syncOp) {
|
||||||
|
sm.SetPeerHead(ctx, "peer1", a)
|
||||||
|
assertGetSyncOp(t, stc, a)
|
||||||
|
|
||||||
|
sm.SetPeerHead(ctx, "peer1", b1)
|
||||||
|
sm.SetPeerHead(ctx, "peer1", b2)
|
||||||
|
// b1 and b2 are being processed
|
||||||
|
|
||||||
|
b1op := <-stc
|
||||||
|
b2op := <-stc
|
||||||
|
if !b1op.ts.Equals(b1) {
|
||||||
|
b1op, b2op = b2op, b1op
|
||||||
|
}
|
||||||
|
|
||||||
|
sm.SetPeerHead(ctx, "peer2", c2) // c2 is put into activeSyncTips at index 0
|
||||||
|
sm.SetPeerHead(ctx, "peer2", c1) // c1 is put into activeSyncTips at index 1
|
||||||
|
sm.SetPeerHead(ctx, "peer3", b2) // b2 is related to c2 and even though it is actively synced it is put into activeSyncTips index 0
|
||||||
|
sm.SetPeerHead(ctx, "peer1", a) // a is related to b2 and is put into activeSyncTips index 0
|
||||||
|
|
||||||
|
b1op.done() // b1 completes first, is related to a, so it pops activeSyncTips index 0
|
||||||
|
// even though correct one is index 1
|
||||||
|
|
||||||
|
b2op.done()
|
||||||
|
// b2 completes and is not related to c1, so it leaves activeSyncTips as it is
|
||||||
|
|
||||||
|
waitUntilAllWorkersAreDone(stc)
|
||||||
|
|
||||||
|
if len(sm.activeSyncTips.buckets) != 0 {
|
||||||
|
t.Errorf("activeSyncTips expected empty but got: %s", sm.activeSyncTips.String())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitUntilAllWorkersAreDone(stc chan *syncOp) {
|
||||||
|
for i := 0; i < 10; {
|
||||||
|
select {
|
||||||
|
case so := <-stc:
|
||||||
|
so.done()
|
||||||
|
default:
|
||||||
|
i++
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSyncManager(t *testing.T) {
|
func TestSyncManager(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user