diff --git a/statediff/api.go b/statediff/api.go index 6c15bd57c..b2614d5e7 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -194,7 +194,7 @@ func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscript } case err = <-rpcSub.Err(): if err != nil { - log.Error("State diff service rpcSub error: " + err.Error()) + log.Error("statediff_StreamWrites RPC subscription error: " + err.Error()) return } case <-quitChan: diff --git a/statediff/service.go b/statediff/service.go index 5aaff6adf..5a171ef74 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -155,8 +155,8 @@ type JobID uint64 // JobStatus represents the status of a completed job type JobStatus struct { - id JobID - err error + ID JobID + Err error } type statusSubscription struct { @@ -180,6 +180,7 @@ func NewBlockCache(max uint) BlockCache { // New creates a new statediff.Service // func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error { +// func New(stack *node.Node, blockChain *core.BlockChain, networkID uint64, params Config, backend ethapi.Backend) error { func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error { blockChain := ethServ.BlockChain() var indexer interfaces.StateDiffIndexer @@ -221,6 +222,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params enableWriteLoop: params.EnableWriteLoop, numWorkers: workers, maxRetry: defaultRetryLimit, + jobStatusSubs: map[rpc.ID]statusSubscription{}, + currentJobs: map[uint64]JobID{}, } stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) @@ -235,6 +238,37 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params return nil } +func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, indexer interfaces.StateDiffIndexer) *Service { + workers := cfg.NumWorkers + if workers == 0 { + workers = 1 + } + + quitCh := make(chan bool) + sds := &Service{ + Mutex: sync.Mutex{}, + BlockChain: blockChain, + Builder: NewBuilder(blockChain.StateCache()), + QuitChan: quitCh, + Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), + SubscriptionTypes: make(map[common.Hash]Params), + BlockCache: NewBlockCache(workers), + BackendAPI: backend, + WaitForSync: cfg.WaitForSync, + indexer: indexer, + enableWriteLoop: cfg.EnableWriteLoop, + numWorkers: workers, + maxRetry: defaultRetryLimit, + jobStatusSubs: map[rpc.ID]statusSubscription{}, + currentJobs: map[uint64]JobID{}, + } + + if indexer != nil { + indexer.ReportDBMetrics(10*time.Second, quitCh) + } + return sds +} + // Protocols exports the services p2p protocols, this service has none func (sds *Service) Protocols() []p2p.Protocol { return []p2p.Protocol{} @@ -814,7 +848,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p }, params, output, ipldOutput) // TODO this anti-pattern needs to be sorted out eventually if err := tx.Submit(err); err != nil { - return fmt.Errorf("batch transaction submission failed: %s", err.Error()) + return fmt.Errorf("batch transaction submission failed: %w", err) } // allow dereferencing of parent, keep current locked as it should be the next parent @@ -843,9 +877,6 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) { log.Info("Subscribing to job status updates", "subscription id", id) sds.Lock() - if sds.jobStatusSubs == nil { - sds.jobStatusSubs = map[rpc.ID]statusSubscription{} - } sds.jobStatusSubs[id] = statusSubscription{ statusChan: sub, quitChan: quitChan, diff --git a/statediff/service_test.go b/statediff/service_test.go index 1df068608..ca5c1116e 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -18,6 +18,8 @@ package statediff_test import ( "bytes" + "context" + "errors" "math/big" "math/rand" "reflect" @@ -25,6 +27,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -85,6 +89,10 @@ var ( } ) +func init() { + defaultParams.ComputeWatchedAddressesLeafPaths() +} + func testErrorInChainEventLoop(t *testing.T) { //the first chain event causes and error (in blockchain mock) builder := mocks.Builder{} @@ -147,7 +155,6 @@ func testErrorInChainEventLoop(t *testing.T) { } } - defaultParams.ComputeWatchedAddressesLeafPaths() if !reflect.DeepEqual(builder.Params, defaultParams) { t.Error("Test failure:", t.Name()) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) @@ -200,7 +207,6 @@ func testErrorInBlockLoop(t *testing.T) { }() service.Loop(eventsChannel) - defaultParams.ComputeWatchedAddressesLeafPaths() if !reflect.DeepEqual(builder.Params, defaultParams) { t.Error("Test failure:", t.Name()) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) @@ -220,10 +226,6 @@ func testErrorInBlockLoop(t *testing.T) { } func TestGetStateDiffAt(t *testing.T) { - testErrorInStateDiffAt(t) -} - -func testErrorInStateDiffAt(t *testing.T) { mockStateDiff := types2.StateObject{ BlockNumber: testBlock1.Number(), BlockHash: testBlock1.Hash(), @@ -275,7 +277,6 @@ func testErrorInStateDiffAt(t *testing.T) { t.Error(err) } - defaultParams.ComputeWatchedAddressesLeafPaths() if !reflect.DeepEqual(builder.Params, defaultParams) { t.Error("Test failure:", t.Name()) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) @@ -298,6 +299,81 @@ func testErrorInStateDiffAt(t *testing.T) { } } +type writeSub struct { + sub *rpc.ClientSubscription + statusChan <-chan statediff.JobStatus + client *rpc.Client +} + +func (ws writeSub) close() { + ws.sub.Unsubscribe() + ws.client.Close() +} + +// awaitStatus awaits status update for writeStateDiffAt job +func subscribeWrites(ctx context.Context, svc *statediff.Service) (writeSub, error) { + server := rpc.NewServer() + api := statediff.NewPublicStateDiffAPI(svc) + err := server.RegisterName("statediff", api) + if err != nil { + return writeSub{}, err + } + client := rpc.DialInProc(server) + statusChan := make(chan statediff.JobStatus) + sub, err := client.Subscribe(ctx, "statediff", statusChan, "streamWrites") + return writeSub{sub, statusChan, client}, err +} + +func awaitJob(ws writeSub, job statediff.JobID, timeout time.Duration) (bool, error) { + for { + select { + case err := <-ws.sub.Err(): + return false, err + case status := <-ws.statusChan: + if status.Err != nil { + return false, status.Err + } + if status.ID == job { + return true, nil + } + case <-time.After(timeout): + return false, errors.New("timeout") + } + } +} + +func TestWriteStateDiffAt(t *testing.T) { + builder := mocks.Builder{} + indexer := mocks.StateDiffIndexer{} + blockChain := mocks.BlockChain{} + blockMapping := make(map[common.Hash]*types.Block) + blockMapping[parentBlock1.Hash()] = parentBlock1 + blockChain.SetBlocksForHashes(blockMapping) + blockChain.SetBlockForNumber(testBlock1, testBlock1.NumberU64()) + blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1) + + service := statediff.NewService(&blockChain, statediff.Config{}, &mocks.Backend{}, &indexer) + service.Builder = &builder + + // delay to avoid subscription request being sent after statediff is written, + // and timeout to prevent hanging just in case it still happens + writeDelay := 100 * time.Millisecond + jobTimeout := time.Second + ws, err := subscribeWrites(context.Background(), service) + require.NoError(t, err) + defer ws.close() + time.Sleep(writeDelay) + job := service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams) + ok, err := awaitJob(ws, job, jobTimeout) + require.NoError(t, err) + require.True(t, ok) + + require.Equal(t, defaultParams, builder.Params) + require.Equal(t, testBlock1.Hash(), builder.Args.BlockHash) + require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot) + require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot) +} + func TestWaitForSync(t *testing.T) { testWaitForSync(t) testGetSyncStatus(t) diff --git a/statediff/test_helpers/mocks/builder.go b/statediff/test_helpers/mocks/builder.go index 490393e53..f50c4e978 100644 --- a/statediff/test_helpers/mocks/builder.go +++ b/statediff/test_helpers/mocks/builder.go @@ -28,7 +28,6 @@ var _ statediff.Builder = &Builder{} type Builder struct { Args statediff.Args Params statediff.Params - StateRoots sdtypes.StateRoots stateDiff sdtypes.StateObject block *types.Block stateTrie sdtypes.StateObject @@ -45,7 +44,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi // BuildStateDiffObject mock method func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error { - builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot} + builder.Args = args builder.Params = params return builder.builderError diff --git a/statediff/test_helpers/mocks/indexer.go b/statediff/test_helpers/mocks/indexer.go index 218947d77..0524fbc14 100644 --- a/statediff/test_helpers/mocks/indexer.go +++ b/statediff/test_helpers/mocks/indexer.go @@ -27,19 +27,22 @@ import ( ) var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} +var _ interfaces.Batch = &batch{} // StateDiffIndexer is a mock state diff indexer type StateDiffIndexer struct{} +type batch struct{} + func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { - return nil, nil + return &batch{}, nil } -func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { +func (sdi *StateDiffIndexer) PushStateNode(txi interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error { return nil } -func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, iplds sdtypes.IPLD) error { +func (sdi *StateDiffIndexer) PushIPLD(txi interfaces.Batch, ipld sdtypes.IPLD) error { return nil } @@ -68,3 +71,7 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { func (sdi *StateDiffIndexer) Close() error { return nil } + +func (tx *batch) Submit(err error) error { + return nil +}