From 3679a9aa17ec658df51a5bbe669fb1e60f470c2e Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 3 Apr 2023 17:34:09 +0800 Subject: [PATCH 1/4] publicize jobstatus fields --- statediff/service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 5aaff6adf..8a8c570f8 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -155,8 +155,8 @@ type JobID uint64 // JobStatus represents the status of a completed job type JobStatus struct { - id JobID - err error + ID JobID + Err error } type statusSubscription struct { From b51c467084a8d0c5a93545c5f1e367af499f9d07 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 3 Apr 2023 22:43:14 +0800 Subject: [PATCH 2/4] init jobs maps --- statediff/service.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/statediff/service.go b/statediff/service.go index 8a8c570f8..acfe9574c 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -221,6 +221,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params enableWriteLoop: params.EnableWriteLoop, numWorkers: workers, maxRetry: defaultRetryLimit, + jobStatusSubs: map[rpc.ID]statusSubscription{}, + currentJobs: map[uint64]JobID{}, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -843,9 +845,6 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) { log.Info("Subscribing to job status updates", "subscription id", id) sds.Lock() - if sds.jobStatusSubs == nil { - sds.jobStatusSubs = map[rpc.ID]statusSubscription{} - } sds.jobStatusSubs[id] = statusSubscription{ statusChan: sub, quitChan: quitChan, From b275cef1142d7e60f4ac3aadab546602c61af61a Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Mon, 3 Apr 2023 23:58:56 +0800 Subject: [PATCH 3/4] unit test for WriteStateDiffAt --- statediff/api.go | 2 +- statediff/service.go | 34 ++++++++- statediff/service_test.go | 92 +++++++++++++++++++++++-- statediff/test_helpers/mocks/builder.go | 3 +- statediff/test_helpers/mocks/indexer.go | 40 +++++++++-- 5 files changed, 154 insertions(+), 17 deletions(-) diff --git a/statediff/api.go b/statediff/api.go index 6c15bd57c..b2614d5e7 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -194,7 +194,7 @@ func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscript } case err = <-rpcSub.Err(): if err != nil { - log.Error("State diff service rpcSub error: " + err.Error()) + log.Error("statediff_StreamWrites RPC subscription error: " + err.Error()) return } case <-quitChan: diff --git a/statediff/service.go b/statediff/service.go index acfe9574c..5a171ef74 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -180,6 +180,7 @@ func NewBlockCache(max uint) BlockCache { // New creates a new statediff.Service // func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +// func New(stack *node.Node, blockChain *core.BlockChain, networkID uint64, params Config, backend ethapi.Backend) error { func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error { blockChain := ethServ.BlockChain() var indexer interfaces.StateDiffIndexer @@ -237,6 +238,37 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params return nil } +func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, indexer interfaces.StateDiffIndexer) *Service { + workers := cfg.NumWorkers + if workers == 0 { + workers = 1 + } + + quitCh := make(chan bool) + sds := &Service{ + Mutex: sync.Mutex{}, + BlockChain: blockChain, + Builder: NewBuilder(blockChain.StateCache()), + QuitChan: quitCh, + Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), + SubscriptionTypes: make(map[common.Hash]Params), + BlockCache: NewBlockCache(workers), + BackendAPI: backend, + WaitForSync: cfg.WaitForSync, + indexer: indexer, + enableWriteLoop: cfg.EnableWriteLoop, + numWorkers: workers, + maxRetry: defaultRetryLimit, + jobStatusSubs: map[rpc.ID]statusSubscription{}, + currentJobs: map[uint64]JobID{}, + } + + if indexer != nil { + indexer.ReportDBMetrics(10*time.Second, quitCh) + } + return sds +} + // Protocols exports the services p2p protocols, this service has none func (sds *Service) Protocols() []p2p.Protocol { return []p2p.Protocol{} @@ -816,7 +848,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p }, params, output, ipldOutput) // TODO this anti-pattern needs to be sorted out eventually if err := tx.Submit(err); err != nil { - return fmt.Errorf("batch transaction submission failed: %s", err.Error()) + return fmt.Errorf("batch transaction submission failed: %w", err) } // allow dereferencing of parent, keep current locked as it should be the next parent diff --git a/statediff/service_test.go b/statediff/service_test.go index 1df068608..88fe14520 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -18,6 +18,7 @@ package statediff_test import ( "bytes" + "context" "math/big" "math/rand" "reflect" @@ -25,6 +26,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -85,6 +88,10 @@ var ( } ) +func init() { + defaultParams.ComputeWatchedAddressesLeafPaths() +} + func testErrorInChainEventLoop(t *testing.T) { //the first chain event causes and error (in blockchain mock) builder := mocks.Builder{} @@ -147,7 +154,6 @@ func testErrorInChainEventLoop(t *testing.T) { } } - defaultParams.ComputeWatchedAddressesLeafPaths() if !reflect.DeepEqual(builder.Params, defaultParams) { t.Error("Test failure:", t.Name()) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) @@ -200,7 +206,7 @@ func testErrorInBlockLoop(t *testing.T) { }() service.Loop(eventsChannel) - defaultParams.ComputeWatchedAddressesLeafPaths() + // defaultParams.ComputeWatchedAddressesLeafPaths() if !reflect.DeepEqual(builder.Params, defaultParams) { t.Error("Test failure:", t.Name()) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) @@ -220,10 +226,6 @@ func testErrorInBlockLoop(t *testing.T) { } func TestGetStateDiffAt(t *testing.T) { - testErrorInStateDiffAt(t) -} - -func testErrorInStateDiffAt(t *testing.T) { mockStateDiff := types2.StateObject{ BlockNumber: testBlock1.Number(), BlockHash: testBlock1.Hash(), @@ -275,7 +277,7 @@ func testErrorInStateDiffAt(t *testing.T) { t.Error(err) } - defaultParams.ComputeWatchedAddressesLeafPaths() + // defaultParams.ComputeWatchedAddressesLeafPaths() if !reflect.DeepEqual(builder.Params, defaultParams) { t.Error("Test failure:", t.Name()) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) @@ -298,6 +300,82 @@ func testErrorInStateDiffAt(t *testing.T) { } } +type writeSub struct { + sub *rpc.ClientSubscription + statusChan <-chan statediff.JobStatus + client *rpc.Client +} + +func (ws writeSub) close() { + ws.sub.Unsubscribe() + ws.client.Close() +} + +// awaitStatus awaits status update for writeStateDiffAt job +func subscribeWrites(ctx context.Context, svc *statediff.Service) (writeSub, error) { + server := rpc.NewServer() + api := statediff.NewPublicStateDiffAPI(svc) + err := server.RegisterName("statediff", api) + if err != nil { + return writeSub{}, err + } + client := rpc.DialInProc(server) + statusChan := make(chan statediff.JobStatus) + sub, err := client.Subscribe(context.Background(), "statediff", statusChan, "streamWrites") + return writeSub{sub, statusChan, client}, err +} + +func awaitJob(ws writeSub, job statediff.JobID, ctx context.Context) (bool, error) { + for { + select { + case err := <-ws.sub.Err(): + return false, err + case status := <-ws.statusChan: + if status.Err != nil { + return false, status.Err + } + if status.ID == job { + return true, nil + } + case <-ctx.Done(): + return false, ctx.Err() + } + } +} + +func TestWriteStateDiffAt(t *testing.T) { + builder := mocks.Builder{} + indexer := mocks.StateDiffIndexer{} + blockChain := mocks.BlockChain{} + blockMapping := make(map[common.Hash]*types.Block) + blockMapping[parentBlock1.Hash()] = parentBlock1 + blockChain.SetBlocksForHashes(blockMapping) + blockChain.SetBlockForNumber(testBlock1, testBlock1.NumberU64()) + blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1) + + service := statediff.NewService(&blockChain, statediff.Config{}, &mocks.Backend{}, &indexer) + service.Builder = &builder + + // delay to avoid subscription request being sent after statediff is written, + // and timeout to prevent hanging just in case it still happens + writeDelay := time.Second + jobTimeout := time.Second + ws, err := subscribeWrites(context.Background(), service) + require.NoError(t, err) + defer ws.close() + time.Sleep(writeDelay) + job := service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams) + ctx, _ := context.WithTimeout(context.Background(), jobTimeout) + ok, err := awaitJob(ws, job, ctx) + require.NoError(t, err) + require.True(t, ok) + + require.Equal(t, defaultParams, builder.Params) + require.Equal(t, testBlock1.Hash(), builder.Args.BlockHash) + require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot) + require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot) +} + func TestWaitForSync(t *testing.T) { testWaitForSync(t) testGetSyncStatus(t) diff --git a/statediff/test_helpers/mocks/builder.go b/statediff/test_helpers/mocks/builder.go index 490393e53..f50c4e978 100644 --- a/statediff/test_helpers/mocks/builder.go +++ b/statediff/test_helpers/mocks/builder.go @@ -28,7 +28,6 @@ var _ statediff.Builder = &Builder{} type Builder struct { Args statediff.Args Params statediff.Params - StateRoots sdtypes.StateRoots stateDiff sdtypes.StateObject block *types.Block stateTrie sdtypes.StateObject @@ -45,7 +44,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi // BuildStateDiffObject mock method func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error { - builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot} + builder.Args = args builder.Params = params return builder.builderError diff --git a/statediff/test_helpers/mocks/indexer.go b/statediff/test_helpers/mocks/indexer.go index 218947d77..f96a9829a 100644 --- a/statediff/test_helpers/mocks/indexer.go +++ b/statediff/test_helpers/mocks/indexer.go @@ -27,19 +27,34 @@ import ( ) var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} +var _ interfaces.Batch = &batch{} // StateDiffIndexer is a mock state diff indexer -type StateDiffIndexer struct{} - -func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - return nil, nil +type StateDiffIndexer struct { + StateNodes []sdtypes.StateLeafNode + IPLDs []sdtypes.IPLD } -func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { +type batch struct { + sdi *StateDiffIndexer + + StateNodes []sdtypes.StateLeafNode + IPLDs []sdtypes.IPLD +} + +func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { + return &batch{}, nil +} + +func (sdi *StateDiffIndexer) PushStateNode(txi interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { + tx := txi.(*batch) + tx.StateNodes = append(tx.StateNodes, stateNode) return nil } -func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, iplds sdtypes.IPLD) error { +func (sdi *StateDiffIndexer) PushIPLD(txi interfaces.Batch, ipld sdtypes.IPLD) error { + tx := txi.(*batch) + tx.IPLDs = append(tx.IPLDs, ipld) return nil } @@ -68,3 +83,16 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { func (sdi *StateDiffIndexer) Close() error { return nil } + +func (tx *batch) Submit(err error) error { + if err != nil { + return err + } + for _, sn := range tx.StateNodes { + tx.sdi.StateNodes = append(tx.sdi.StateNodes, sn) + } + for _, ipld := range tx.IPLDs { + tx.sdi.IPLDs = append(tx.sdi.IPLDs, ipld) + } + return nil +} From 12590d204527e07ea65bf90805e175e7b4f2aeab Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 4 Apr 2023 11:42:41 +0800 Subject: [PATCH 4/4] lint, cleanup --- statediff/service_test.go | 16 +++++++--------- statediff/test_helpers/mocks/indexer.go | 25 ++----------------------- 2 files changed, 9 insertions(+), 32 deletions(-) diff --git a/statediff/service_test.go b/statediff/service_test.go index 88fe14520..ca5c1116e 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -19,6 +19,7 @@ package statediff_test import ( "bytes" "context" + "errors" "math/big" "math/rand" "reflect" @@ -206,7 +207,6 @@ func testErrorInBlockLoop(t *testing.T) { }() service.Loop(eventsChannel) - // defaultParams.ComputeWatchedAddressesLeafPaths() if !reflect.DeepEqual(builder.Params, defaultParams) { t.Error("Test failure:", t.Name()) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) @@ -277,7 +277,6 @@ func TestGetStateDiffAt(t *testing.T) { t.Error(err) } - // defaultParams.ComputeWatchedAddressesLeafPaths() if !reflect.DeepEqual(builder.Params, defaultParams) { t.Error("Test failure:", t.Name()) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) @@ -321,11 +320,11 @@ func subscribeWrites(ctx context.Context, svc *statediff.Service) (writeSub, err } client := rpc.DialInProc(server) statusChan := make(chan statediff.JobStatus) - sub, err := client.Subscribe(context.Background(), "statediff", statusChan, "streamWrites") + sub, err := client.Subscribe(ctx, "statediff", statusChan, "streamWrites") return writeSub{sub, statusChan, client}, err } -func awaitJob(ws writeSub, job statediff.JobID, ctx context.Context) (bool, error) { +func awaitJob(ws writeSub, job statediff.JobID, timeout time.Duration) (bool, error) { for { select { case err := <-ws.sub.Err(): @@ -337,8 +336,8 @@ func awaitJob(ws writeSub, job statediff.JobID, ctx context.Context) (bool, erro if status.ID == job { return true, nil } - case <-ctx.Done(): - return false, ctx.Err() + case <-time.After(timeout): + return false, errors.New("timeout") } } } @@ -358,15 +357,14 @@ func TestWriteStateDiffAt(t *testing.T) { // delay to avoid subscription request being sent after statediff is written, // and timeout to prevent hanging just in case it still happens - writeDelay := time.Second + writeDelay := 100 * time.Millisecond jobTimeout := time.Second ws, err := subscribeWrites(context.Background(), service) require.NoError(t, err) defer ws.close() time.Sleep(writeDelay) job := service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams) - ctx, _ := context.WithTimeout(context.Background(), jobTimeout) - ok, err := awaitJob(ws, job, ctx) + ok, err := awaitJob(ws, job, jobTimeout) require.NoError(t, err) require.True(t, ok) diff --git a/statediff/test_helpers/mocks/indexer.go b/statediff/test_helpers/mocks/indexer.go index f96a9829a..0524fbc14 100644 --- a/statediff/test_helpers/mocks/indexer.go +++ b/statediff/test_helpers/mocks/indexer.go @@ -30,31 +30,19 @@ var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} var _ interfaces.Batch = &batch{} // StateDiffIndexer is a mock state diff indexer -type StateDiffIndexer struct { - StateNodes []sdtypes.StateLeafNode - IPLDs []sdtypes.IPLD -} +type StateDiffIndexer struct{} -type batch struct { - sdi *StateDiffIndexer - - StateNodes []sdtypes.StateLeafNode - IPLDs []sdtypes.IPLD -} +type batch struct{} func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { return &batch{}, nil } func (sdi *StateDiffIndexer) PushStateNode(txi interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { - tx := txi.(*batch) - tx.StateNodes = append(tx.StateNodes, stateNode) return nil } func (sdi *StateDiffIndexer) PushIPLD(txi interfaces.Batch, ipld sdtypes.IPLD) error { - tx := txi.(*batch) - tx.IPLDs = append(tx.IPLDs, ipld) return nil } @@ -85,14 +73,5 @@ func (sdi *StateDiffIndexer) Close() error { } func (tx *batch) Submit(err error) error { - if err != nil { - return err - } - for _, sn := range tx.StateNodes { - tx.sdi.StateNodes = append(tx.sdi.StateNodes, sn) - } - for _, ipld := range tx.IPLDs { - tx.sdi.IPLDs = append(tx.sdi.IPLDs, ipld) - } return nil }