unit test for WriteStateDiffAt

This commit is contained in:
Roy Crihfield 2023-04-03 23:58:56 +08:00
parent b51c467084
commit b275cef114
5 changed files with 154 additions and 17 deletions

View File

@ -194,7 +194,7 @@ func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscript
}
case err = <-rpcSub.Err():
if err != nil {
log.Error("State diff service rpcSub error: " + err.Error())
log.Error("statediff_StreamWrites RPC subscription error: " + err.Error())
return
}
case <-quitChan:

View File

@ -180,6 +180,7 @@ func NewBlockCache(max uint) BlockCache {
// New creates a new statediff.Service
// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
// func New(stack *node.Node, blockChain *core.BlockChain, networkID uint64, params Config, backend ethapi.Backend) error {
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
blockChain := ethServ.BlockChain()
var indexer interfaces.StateDiffIndexer
@ -237,6 +238,37 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
return nil
}
func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, indexer interfaces.StateDiffIndexer) *Service {
workers := cfg.NumWorkers
if workers == 0 {
workers = 1
}
quitCh := make(chan bool)
sds := &Service{
Mutex: sync.Mutex{},
BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers),
BackendAPI: backend,
WaitForSync: cfg.WaitForSync,
indexer: indexer,
enableWriteLoop: cfg.EnableWriteLoop,
numWorkers: workers,
maxRetry: defaultRetryLimit,
jobStatusSubs: map[rpc.ID]statusSubscription{},
currentJobs: map[uint64]JobID{},
}
if indexer != nil {
indexer.ReportDBMetrics(10*time.Second, quitCh)
}
return sds
}
// Protocols exports the services p2p protocols, this service has none
func (sds *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
@ -816,7 +848,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
}, params, output, ipldOutput)
// TODO this anti-pattern needs to be sorted out eventually
if err := tx.Submit(err); err != nil {
return fmt.Errorf("batch transaction submission failed: %s", err.Error())
return fmt.Errorf("batch transaction submission failed: %w", err)
}
// allow dereferencing of parent, keep current locked as it should be the next parent

View File

@ -18,6 +18,7 @@ package statediff_test
import (
"bytes"
"context"
"math/big"
"math/rand"
"reflect"
@ -25,6 +26,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
@ -85,6 +88,10 @@ var (
}
)
func init() {
defaultParams.ComputeWatchedAddressesLeafPaths()
}
func testErrorInChainEventLoop(t *testing.T) {
//the first chain event causes and error (in blockchain mock)
builder := mocks.Builder{}
@ -147,7 +154,6 @@ func testErrorInChainEventLoop(t *testing.T) {
}
}
defaultParams.ComputeWatchedAddressesLeafPaths()
if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
@ -200,7 +206,7 @@ func testErrorInBlockLoop(t *testing.T) {
}()
service.Loop(eventsChannel)
defaultParams.ComputeWatchedAddressesLeafPaths()
// defaultParams.ComputeWatchedAddressesLeafPaths()
if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
@ -220,10 +226,6 @@ func testErrorInBlockLoop(t *testing.T) {
}
func TestGetStateDiffAt(t *testing.T) {
testErrorInStateDiffAt(t)
}
func testErrorInStateDiffAt(t *testing.T) {
mockStateDiff := types2.StateObject{
BlockNumber: testBlock1.Number(),
BlockHash: testBlock1.Hash(),
@ -275,7 +277,7 @@ func testErrorInStateDiffAt(t *testing.T) {
t.Error(err)
}
defaultParams.ComputeWatchedAddressesLeafPaths()
// defaultParams.ComputeWatchedAddressesLeafPaths()
if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
@ -298,6 +300,82 @@ func testErrorInStateDiffAt(t *testing.T) {
}
}
type writeSub struct {
sub *rpc.ClientSubscription
statusChan <-chan statediff.JobStatus
client *rpc.Client
}
func (ws writeSub) close() {
ws.sub.Unsubscribe()
ws.client.Close()
}
// awaitStatus awaits status update for writeStateDiffAt job
func subscribeWrites(ctx context.Context, svc *statediff.Service) (writeSub, error) {
server := rpc.NewServer()
api := statediff.NewPublicStateDiffAPI(svc)
err := server.RegisterName("statediff", api)
if err != nil {
return writeSub{}, err
}
client := rpc.DialInProc(server)
statusChan := make(chan statediff.JobStatus)
sub, err := client.Subscribe(context.Background(), "statediff", statusChan, "streamWrites")
return writeSub{sub, statusChan, client}, err
}
func awaitJob(ws writeSub, job statediff.JobID, ctx context.Context) (bool, error) {
for {
select {
case err := <-ws.sub.Err():
return false, err
case status := <-ws.statusChan:
if status.Err != nil {
return false, status.Err
}
if status.ID == job {
return true, nil
}
case <-ctx.Done():
return false, ctx.Err()
}
}
}
func TestWriteStateDiffAt(t *testing.T) {
builder := mocks.Builder{}
indexer := mocks.StateDiffIndexer{}
blockChain := mocks.BlockChain{}
blockMapping := make(map[common.Hash]*types.Block)
blockMapping[parentBlock1.Hash()] = parentBlock1
blockChain.SetBlocksForHashes(blockMapping)
blockChain.SetBlockForNumber(testBlock1, testBlock1.NumberU64())
blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1)
service := statediff.NewService(&blockChain, statediff.Config{}, &mocks.Backend{}, &indexer)
service.Builder = &builder
// delay to avoid subscription request being sent after statediff is written,
// and timeout to prevent hanging just in case it still happens
writeDelay := time.Second
jobTimeout := time.Second
ws, err := subscribeWrites(context.Background(), service)
require.NoError(t, err)
defer ws.close()
time.Sleep(writeDelay)
job := service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams)
ctx, _ := context.WithTimeout(context.Background(), jobTimeout)
ok, err := awaitJob(ws, job, ctx)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, defaultParams, builder.Params)
require.Equal(t, testBlock1.Hash(), builder.Args.BlockHash)
require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot)
require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot)
}
func TestWaitForSync(t *testing.T) {
testWaitForSync(t)
testGetSyncStatus(t)

View File

@ -28,7 +28,6 @@ var _ statediff.Builder = &Builder{}
type Builder struct {
Args statediff.Args
Params statediff.Params
StateRoots sdtypes.StateRoots
stateDiff sdtypes.StateObject
block *types.Block
stateTrie sdtypes.StateObject
@ -45,7 +44,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi
// BuildStateDiffObject mock method
func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}
builder.Args = args
builder.Params = params
return builder.builderError

View File

@ -27,19 +27,34 @@ import (
)
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
var _ interfaces.Batch = &batch{}
// StateDiffIndexer is a mock state diff indexer
type StateDiffIndexer struct{}
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
return nil, nil
type StateDiffIndexer struct {
StateNodes []sdtypes.StateLeafNode
IPLDs []sdtypes.IPLD
}
func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
type batch struct {
sdi *StateDiffIndexer
StateNodes []sdtypes.StateLeafNode
IPLDs []sdtypes.IPLD
}
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
return &batch{}, nil
}
func (sdi *StateDiffIndexer) PushStateNode(txi interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
tx := txi.(*batch)
tx.StateNodes = append(tx.StateNodes, stateNode)
return nil
}
func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, iplds sdtypes.IPLD) error {
func (sdi *StateDiffIndexer) PushIPLD(txi interfaces.Batch, ipld sdtypes.IPLD) error {
tx := txi.(*batch)
tx.IPLDs = append(tx.IPLDs, ipld)
return nil
}
@ -68,3 +83,16 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
func (sdi *StateDiffIndexer) Close() error {
return nil
}
func (tx *batch) Submit(err error) error {
if err != nil {
return err
}
for _, sn := range tx.StateNodes {
tx.sdi.StateNodes = append(tx.sdi.StateNodes, sn)
}
for _, ipld := range tx.IPLDs {
tx.sdi.IPLDs = append(tx.sdi.IPLDs, ipld)
}
return nil
}