Merge pull request #8072 from filecoin-project/bloxico/syncer-tests
test: chain: unit tests for the syncer & sync manager
This commit is contained in:
commit
9c22065459
@ -1244,25 +1244,3 @@ func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) {
|
||||
bbr, ok := syncer.bad.Has(blk)
|
||||
return bbr.String(), ok
|
||||
}
|
||||
|
||||
func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
|
||||
cur := ts
|
||||
for i := 0; i < 20; i++ {
|
||||
cbe := cur.Blocks()[0].BeaconEntries
|
||||
if len(cbe) > 0 {
|
||||
return &cbe[len(cbe)-1], nil
|
||||
}
|
||||
|
||||
if cur.Height() == 0 {
|
||||
return nil, xerrors.Errorf("made it back to genesis block without finding beacon entry")
|
||||
}
|
||||
|
||||
next, err := syncer.store.LoadTipSet(ctx, cur.Parents())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load parents when searching back for latest beacon entry: %w", err)
|
||||
}
|
||||
cur = next
|
||||
}
|
||||
|
||||
return nil, xerrors.Errorf("found NO beacon entries in the 20 latest tipsets")
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -240,3 +241,34 @@ func TestSyncManager(t *testing.T) {
|
||||
op3.done()
|
||||
})
|
||||
}
|
||||
|
||||
func TestSyncManagerBucketSet(t *testing.T) {
|
||||
ts1 := mock.TipSet(mock.MkBlock(nil, 0, 0))
|
||||
ts2 := mock.TipSet(mock.MkBlock(ts1, 1, 0))
|
||||
bucket1 := newSyncTargetBucket(ts1, ts2)
|
||||
bucketSet := syncBucketSet{buckets: []*syncTargetBucket{bucket1}}
|
||||
|
||||
// inserting a tipset (potential sync target) from an existing chain, should add to an existing bucket
|
||||
//stm: @CHAIN_SYNCER_ADD_SYNC_TARGET_001
|
||||
ts3 := mock.TipSet(mock.MkBlock(ts2, 2, 0))
|
||||
bucketSet.Insert(ts3)
|
||||
require.Equal(t, 1, len(bucketSet.buckets))
|
||||
require.Equal(t, 3, len(bucketSet.buckets[0].tips))
|
||||
|
||||
// inserting a tipset from new chain, should create a new bucket
|
||||
ts4fork := mock.TipSet(mock.MkBlock(nil, 1, 1))
|
||||
bucketSet.Insert(ts4fork)
|
||||
require.Equal(t, 2, len(bucketSet.buckets))
|
||||
require.Equal(t, 3, len(bucketSet.buckets[0].tips))
|
||||
require.Equal(t, 1, len(bucketSet.buckets[1].tips))
|
||||
|
||||
// Pop removes the best bucket (best sync target), e.g. bucket1
|
||||
//stm: @CHAIN_SYNCER_SELECT_SYNC_TARGET_001
|
||||
popped := bucketSet.Pop()
|
||||
require.Equal(t, popped, bucket1)
|
||||
require.Equal(t, 1, len(bucketSet.buckets))
|
||||
|
||||
// PopRelated removes the bucket containing the given tipset, leaving the set empty
|
||||
bucketSet.PopRelated(ts4fork)
|
||||
require.Equal(t, 0, len(bucketSet.buckets))
|
||||
}
|
||||
|
@ -1098,3 +1098,158 @@ func TestInvalidHeight(t *testing.T) {
|
||||
|
||||
tu.mineOnBlock(base, 0, nil, false, true, nil, -1, true)
|
||||
}
|
||||
|
||||
// TestIncomingBlocks mines new blocks and checks if the incoming channel streams new block headers properly
|
||||
func TestIncomingBlocks(t *testing.T) {
|
||||
H := 50
|
||||
tu := prepSyncTest(t, H)
|
||||
|
||||
client := tu.addClientNode()
|
||||
require.NoError(t, tu.mn.LinkAll())
|
||||
|
||||
clientNode := tu.nds[client]
|
||||
//stm: @CHAIN_SYNCER_INCOMING_BLOCKS_001
|
||||
incoming, err := clientNode.SyncIncomingBlocks(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
tu.connect(client, 0)
|
||||
tu.waitUntilSync(0, client)
|
||||
tu.compareSourceState(client)
|
||||
|
||||
timeout := time.After(10 * time.Second)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
tu.mineNewBlock(0, nil)
|
||||
tu.waitUntilSync(0, client)
|
||||
tu.compareSourceState(client)
|
||||
|
||||
// just in case, so we don't get deadlocked
|
||||
select {
|
||||
case <-incoming:
|
||||
case <-timeout:
|
||||
tu.t.Fatal("TestIncomingBlocks timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestSyncManualBadTS tests manually marking and unmarking blocks in the bad TS cache
|
||||
func TestSyncManualBadTS(t *testing.T) {
|
||||
// Test setup:
|
||||
// - source node is fully synced,
|
||||
// - client node is unsynced
|
||||
// - client manually marked source's head and it's parent as bad
|
||||
H := 50
|
||||
tu := prepSyncTest(t, H)
|
||||
|
||||
client := tu.addClientNode()
|
||||
require.NoError(t, tu.mn.LinkAll())
|
||||
|
||||
sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
clientHead, err := tu.nds[client].ChainHead(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync in test setup")
|
||||
|
||||
//stm: @CHAIN_SYNCER_MARK_BAD_001
|
||||
err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHead.Cids()[0])
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
sourceHeadParent := sourceHead.Parents().Cids()[0]
|
||||
err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHeadParent)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
//stm: @CHAIN_SYNCER_CHECK_BAD_001
|
||||
reason, err := tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0])
|
||||
require.NoError(tu.t, err)
|
||||
require.NotEqual(tu.t, "", reason, "block is not bad after manually marking")
|
||||
|
||||
reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent)
|
||||
require.NoError(tu.t, err)
|
||||
require.NotEqual(tu.t, "", reason, "block is not bad after manually marking")
|
||||
|
||||
// Assertion 1:
|
||||
// - client shouldn't be synced after timeout, because the source TS is marked bad.
|
||||
// - bad block is the first block that should be synced, 1sec should be enough
|
||||
tu.connect(1, 0)
|
||||
timeout := time.After(1 * time.Second)
|
||||
<-timeout
|
||||
|
||||
clientHead, err = tu.nds[client].ChainHead(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync if source head is bad")
|
||||
|
||||
// Assertion 2:
|
||||
// - after unmarking blocks as bad and reconnecting, source & client should be in sync
|
||||
//stm: @CHAIN_SYNCER_UNMARK_BAD_001
|
||||
err = tu.nds[client].SyncUnmarkBad(tu.ctx, sourceHead.Cids()[0])
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0])
|
||||
require.NoError(tu.t, err)
|
||||
require.Equal(tu.t, "", reason, "block is still bad after manually unmarking")
|
||||
|
||||
err = tu.nds[client].SyncUnmarkAllBad(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent)
|
||||
require.NoError(tu.t, err)
|
||||
require.Equal(tu.t, "", reason, "block is still bad after manually unmarking")
|
||||
|
||||
tu.disconnect(1, 0)
|
||||
tu.connect(1, 0)
|
||||
|
||||
tu.waitUntilSync(0, client)
|
||||
tu.compareSourceState(client)
|
||||
}
|
||||
|
||||
// TestState tests fetching the sync worker state before, during & after the sync
|
||||
func TestSyncState(t *testing.T) {
|
||||
H := 50
|
||||
tu := prepSyncTest(t, H)
|
||||
|
||||
client := tu.addClientNode()
|
||||
require.NoError(t, tu.mn.LinkAll())
|
||||
clientNode := tu.nds[client]
|
||||
sourceHead, err := tu.nds[source].ChainHead(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
|
||||
// sync state should be empty before the sync
|
||||
state, err := clientNode.SyncState(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
require.Equal(tu.t, len(state.ActiveSyncs), 0)
|
||||
|
||||
tu.connect(client, 0)
|
||||
|
||||
// wait until sync starts, or at most `timeout` seconds
|
||||
timeout := time.After(5 * time.Second)
|
||||
activeSyncs := []api.ActiveSync{}
|
||||
|
||||
for len(activeSyncs) == 0 {
|
||||
//stm: @CHAIN_SYNCER_STATE_001
|
||||
state, err = clientNode.SyncState(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
activeSyncs = state.ActiveSyncs
|
||||
|
||||
sleep := time.After(100 * time.Millisecond)
|
||||
select {
|
||||
case <-sleep:
|
||||
case <-timeout:
|
||||
tu.t.Fatal("TestSyncState timeout")
|
||||
}
|
||||
}
|
||||
|
||||
// check state during sync
|
||||
require.Equal(tu.t, len(activeSyncs), 1)
|
||||
require.True(tu.t, activeSyncs[0].Target.Equals(sourceHead))
|
||||
|
||||
tu.waitUntilSync(0, client)
|
||||
tu.compareSourceState(client)
|
||||
|
||||
// check state after sync
|
||||
state, err = clientNode.SyncState(tu.ctx)
|
||||
require.NoError(tu.t, err)
|
||||
require.Equal(tu.t, len(state.ActiveSyncs), 1)
|
||||
require.Equal(tu.t, state.ActiveSyncs[0].Stage, api.StageSyncComplete)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user