test: chain syncer & sync manager
Added a few tests for some uncovered methods of the syncer and sync manager. Deleted some dead code (sync.go:getLatestBeaconEntry).
This commit is contained in:
parent
e435b42426
commit
ae66f57fa0
@ -1244,25 +1244,3 @@ func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) {
|
|||||||
bbr, ok := syncer.bad.Has(blk)
|
bbr, ok := syncer.bad.Has(blk)
|
||||||
return bbr.String(), ok
|
return bbr.String(), ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
|
|
||||||
cur := ts
|
|
||||||
for i := 0; i < 20; i++ {
|
|
||||||
cbe := cur.Blocks()[0].BeaconEntries
|
|
||||||
if len(cbe) > 0 {
|
|
||||||
return &cbe[len(cbe)-1], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if cur.Height() == 0 {
|
|
||||||
return nil, xerrors.Errorf("made it back to genesis block without finding beacon entry")
|
|
||||||
}
|
|
||||||
|
|
||||||
next, err := syncer.store.LoadTipSet(ctx, cur.Parents())
|
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("failed to load parents when searching back for latest beacon entry: %w", err)
|
|
||||||
}
|
|
||||||
cur = next
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, xerrors.Errorf("found NO beacon entries in the 20 latest tipsets")
|
|
||||||
}
|
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/types/mock"
|
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -240,3 +241,35 @@ func TestSyncManager(t *testing.T) {
|
|||||||
op3.done()
|
op3.done()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncManagerBucketSet(t *testing.T) {
|
||||||
|
ts1 := mock.TipSet(mock.MkBlock(nil, 0, 0))
|
||||||
|
ts2 := mock.TipSet(mock.MkBlock(ts1, 1, 0))
|
||||||
|
bucket1 := newSyncTargetBucket(ts1, ts2)
|
||||||
|
bucketSet := syncBucketSet{buckets: []*syncTargetBucket{bucket1}}
|
||||||
|
fmt.Println("bucketSet: ", bucketSet.String())
|
||||||
|
|
||||||
|
// inserting a tipset from an existing chain, should add to an existing bucket
|
||||||
|
ts3 := mock.TipSet(mock.MkBlock(ts2, 2, 0))
|
||||||
|
bucketSet.Insert(ts3)
|
||||||
|
require.Equal(t, 1, len(bucketSet.buckets))
|
||||||
|
require.Equal(t, 3, len(bucketSet.buckets[0].tips))
|
||||||
|
fmt.Println("bucketSet: ", bucketSet.String())
|
||||||
|
|
||||||
|
// inserting a tipset from new chain, should create a new bucket
|
||||||
|
ts4fork := mock.TipSet(mock.MkBlock(nil, 1, 1))
|
||||||
|
bucketSet.Insert(ts4fork)
|
||||||
|
require.Equal(t, 2, len(bucketSet.buckets))
|
||||||
|
require.Equal(t, 3, len(bucketSet.buckets[0].tips))
|
||||||
|
require.Equal(t, 1, len(bucketSet.buckets[1].tips))
|
||||||
|
fmt.Println("bucketSet: ", bucketSet.String())
|
||||||
|
|
||||||
|
// Pop removes the best bucket, e.g. bucket1
|
||||||
|
popped := bucketSet.Pop()
|
||||||
|
require.Equal(t, popped, bucket1)
|
||||||
|
require.Equal(t, 1, len(bucketSet.buckets))
|
||||||
|
|
||||||
|
// PopRelated removes the bucket containing the given tipset, leaving the set empty
|
||||||
|
bucketSet.PopRelated(ts4fork)
|
||||||
|
require.Equal(t, 0, len(bucketSet.buckets))
|
||||||
|
}
|
||||||
|
@ -1098,3 +1098,153 @@ func TestInvalidHeight(t *testing.T) {
|
|||||||
|
|
||||||
tu.mineOnBlock(base, 0, nil, false, true, nil, -1, true)
|
tu.mineOnBlock(base, 0, nil, false, true, nil, -1, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestIncomingBlocks mines new blocks and checks if the incoming channel streams new block headers properly
|
||||||
|
func TestIncomingBlocks(t *testing.T) {
|
||||||
|
H := 50
|
||||||
|
tu := prepSyncTest(t, H)
|
||||||
|
|
||||||
|
client := tu.addClientNode()
|
||||||
|
require.NoError(t, tu.mn.LinkAll())
|
||||||
|
|
||||||
|
clientNode := tu.nds[client]
|
||||||
|
incoming, err := clientNode.SyncIncomingBlocks(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
tu.connect(client, 0)
|
||||||
|
tu.waitUntilSync(0, client)
|
||||||
|
tu.compareSourceState(client)
|
||||||
|
|
||||||
|
timeout := time.After(10 * time.Second)
|
||||||
|
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
tu.mineNewBlock(0, nil)
|
||||||
|
tu.waitUntilSync(0, client)
|
||||||
|
tu.compareSourceState(client)
|
||||||
|
|
||||||
|
// just in case, so we don't get deadlocked
|
||||||
|
select {
|
||||||
|
case <-incoming:
|
||||||
|
case <-timeout:
|
||||||
|
tu.t.Fatal("TestIncomingBlocks timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSyncManualBadTS tests manually marking and unmarking blocks in the bad TS cache
|
||||||
|
func TestSyncManualBadTS(t *testing.T) {
|
||||||
|
// Test setup:
|
||||||
|
// - source node is fully synced,
|
||||||
|
// - client node is unsynced
|
||||||
|
// - client manually marked source's head and it's parent as bad
|
||||||
|
H := 50
|
||||||
|
tu := prepSyncTest(t, H)
|
||||||
|
|
||||||
|
client := tu.addClientNode()
|
||||||
|
require.NoError(t, tu.mn.LinkAll())
|
||||||
|
|
||||||
|
sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
clientHead, err := tu.nds[client].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync in test setup")
|
||||||
|
|
||||||
|
err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHead.Cids()[0])
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
sourceHeadParent := sourceHead.Parents().Cids()[0]
|
||||||
|
err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHeadParent)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
reason, err := tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0])
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
require.NotEqual(tu.t, "", reason, "block is not bad after manually marking")
|
||||||
|
|
||||||
|
reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
require.NotEqual(tu.t, "", reason, "block is not bad after manually marking")
|
||||||
|
|
||||||
|
// Assertion 1:
|
||||||
|
// - client shouldn't be synced after timeout, because the source TS is marked bad.
|
||||||
|
// - bad block is the first block that should be synced, 1sec should be enough
|
||||||
|
tu.connect(1, 0)
|
||||||
|
timeout := time.After(1 * time.Second)
|
||||||
|
<-timeout
|
||||||
|
|
||||||
|
clientHead, err = tu.nds[client].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync if source head is bad")
|
||||||
|
|
||||||
|
// Assertion 2:
|
||||||
|
// - after unmarking blocks as bad and reconnecting, source & client should be in sync
|
||||||
|
err = tu.nds[client].SyncUnmarkBad(tu.ctx, sourceHead.Cids()[0])
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0])
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
require.Equal(tu.t, "", reason, "block is still bad after manually unmarking")
|
||||||
|
|
||||||
|
err = tu.nds[client].SyncUnmarkAllBad(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
require.Equal(tu.t, "", reason, "block is still bad after manually unmarking")
|
||||||
|
|
||||||
|
tu.disconnect(1, 0)
|
||||||
|
tu.connect(1, 0)
|
||||||
|
|
||||||
|
tu.waitUntilSync(0, client)
|
||||||
|
tu.compareSourceState(client)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestState tests fetching the sync worker state before, during & after the sync
|
||||||
|
func TestSyncState(t *testing.T) {
|
||||||
|
H := 50
|
||||||
|
tu := prepSyncTest(t, H)
|
||||||
|
|
||||||
|
client := tu.addClientNode()
|
||||||
|
require.NoError(t, tu.mn.LinkAll())
|
||||||
|
clientNode := tu.nds[client]
|
||||||
|
sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
|
||||||
|
// sync state should be empty before the sync
|
||||||
|
state, err := clientNode.SyncState(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
require.Equal(tu.t, len(state.ActiveSyncs), 0)
|
||||||
|
|
||||||
|
tu.connect(client, 0)
|
||||||
|
|
||||||
|
// wait until sync starts, or at most `timeout` seconds
|
||||||
|
timeout := time.After(5 * time.Second)
|
||||||
|
activeSyncs := []api.ActiveSync{}
|
||||||
|
|
||||||
|
for len(activeSyncs) == 0 {
|
||||||
|
state, err = clientNode.SyncState(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
activeSyncs = state.ActiveSyncs
|
||||||
|
|
||||||
|
sleep := time.After(100 * time.Millisecond)
|
||||||
|
select {
|
||||||
|
case <-sleep:
|
||||||
|
case <-timeout:
|
||||||
|
tu.t.Fatal("TestSyncState timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check state during sync
|
||||||
|
require.Equal(tu.t, len(activeSyncs), 1)
|
||||||
|
require.True(tu.t, activeSyncs[0].Target.Equals(sourceHead))
|
||||||
|
|
||||||
|
tu.waitUntilSync(0, client)
|
||||||
|
tu.compareSourceState(client)
|
||||||
|
|
||||||
|
// check state after sync
|
||||||
|
state, err = clientNode.SyncState(tu.ctx)
|
||||||
|
require.NoError(tu.t, err)
|
||||||
|
require.Equal(tu.t, len(state.ActiveSyncs), 1)
|
||||||
|
require.Equal(tu.t, state.ActiveSyncs[0].Stage, api.StageSyncComplete)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user