Merge pull request #358 from cerc-io/roy/v5-dev
Fix and test WriteStateDiffAt
This commit is contained in:
commit
60f1d6b4ef
@ -194,7 +194,7 @@ func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscript
|
|||||||
}
|
}
|
||||||
case err = <-rpcSub.Err():
|
case err = <-rpcSub.Err():
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("State diff service rpcSub error: " + err.Error())
|
log.Error("statediff_StreamWrites RPC subscription error: " + err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-quitChan:
|
case <-quitChan:
|
||||||
|
@ -155,8 +155,8 @@ type JobID uint64
|
|||||||
|
|
||||||
// JobStatus represents the status of a completed job
|
// JobStatus represents the status of a completed job
|
||||||
type JobStatus struct {
|
type JobStatus struct {
|
||||||
id JobID
|
ID JobID
|
||||||
err error
|
Err error
|
||||||
}
|
}
|
||||||
|
|
||||||
type statusSubscription struct {
|
type statusSubscription struct {
|
||||||
@ -180,6 +180,7 @@ func NewBlockCache(max uint) BlockCache {
|
|||||||
|
|
||||||
// New creates a new statediff.Service
|
// New creates a new statediff.Service
|
||||||
// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
|
// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
|
||||||
|
// func New(stack *node.Node, blockChain *core.BlockChain, networkID uint64, params Config, backend ethapi.Backend) error {
|
||||||
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
|
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
|
||||||
blockChain := ethServ.BlockChain()
|
blockChain := ethServ.BlockChain()
|
||||||
var indexer interfaces.StateDiffIndexer
|
var indexer interfaces.StateDiffIndexer
|
||||||
@ -221,6 +222,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
enableWriteLoop: params.EnableWriteLoop,
|
enableWriteLoop: params.EnableWriteLoop,
|
||||||
numWorkers: workers,
|
numWorkers: workers,
|
||||||
maxRetry: defaultRetryLimit,
|
maxRetry: defaultRetryLimit,
|
||||||
|
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
||||||
|
currentJobs: map[uint64]JobID{},
|
||||||
}
|
}
|
||||||
stack.RegisterLifecycle(sds)
|
stack.RegisterLifecycle(sds)
|
||||||
stack.RegisterAPIs(sds.APIs())
|
stack.RegisterAPIs(sds.APIs())
|
||||||
@ -235,6 +238,37 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewService(blockChain blockChain, cfg Config, backend ethapi.Backend, indexer interfaces.StateDiffIndexer) *Service {
|
||||||
|
workers := cfg.NumWorkers
|
||||||
|
if workers == 0 {
|
||||||
|
workers = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
quitCh := make(chan bool)
|
||||||
|
sds := &Service{
|
||||||
|
Mutex: sync.Mutex{},
|
||||||
|
BlockChain: blockChain,
|
||||||
|
Builder: NewBuilder(blockChain.StateCache()),
|
||||||
|
QuitChan: quitCh,
|
||||||
|
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
|
||||||
|
SubscriptionTypes: make(map[common.Hash]Params),
|
||||||
|
BlockCache: NewBlockCache(workers),
|
||||||
|
BackendAPI: backend,
|
||||||
|
WaitForSync: cfg.WaitForSync,
|
||||||
|
indexer: indexer,
|
||||||
|
enableWriteLoop: cfg.EnableWriteLoop,
|
||||||
|
numWorkers: workers,
|
||||||
|
maxRetry: defaultRetryLimit,
|
||||||
|
jobStatusSubs: map[rpc.ID]statusSubscription{},
|
||||||
|
currentJobs: map[uint64]JobID{},
|
||||||
|
}
|
||||||
|
|
||||||
|
if indexer != nil {
|
||||||
|
indexer.ReportDBMetrics(10*time.Second, quitCh)
|
||||||
|
}
|
||||||
|
return sds
|
||||||
|
}
|
||||||
|
|
||||||
// Protocols exports the services p2p protocols, this service has none
|
// Protocols exports the services p2p protocols, this service has none
|
||||||
func (sds *Service) Protocols() []p2p.Protocol {
|
func (sds *Service) Protocols() []p2p.Protocol {
|
||||||
return []p2p.Protocol{}
|
return []p2p.Protocol{}
|
||||||
@ -814,7 +848,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
|||||||
}, params, output, ipldOutput)
|
}, params, output, ipldOutput)
|
||||||
// TODO this anti-pattern needs to be sorted out eventually
|
// TODO this anti-pattern needs to be sorted out eventually
|
||||||
if err := tx.Submit(err); err != nil {
|
if err := tx.Submit(err); err != nil {
|
||||||
return fmt.Errorf("batch transaction submission failed: %s", err.Error())
|
return fmt.Errorf("batch transaction submission failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// allow dereferencing of parent, keep current locked as it should be the next parent
|
// allow dereferencing of parent, keep current locked as it should be the next parent
|
||||||
@ -843,9 +877,6 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
|
|||||||
func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) {
|
func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) {
|
||||||
log.Info("Subscribing to job status updates", "subscription id", id)
|
log.Info("Subscribing to job status updates", "subscription id", id)
|
||||||
sds.Lock()
|
sds.Lock()
|
||||||
if sds.jobStatusSubs == nil {
|
|
||||||
sds.jobStatusSubs = map[rpc.ID]statusSubscription{}
|
|
||||||
}
|
|
||||||
sds.jobStatusSubs[id] = statusSubscription{
|
sds.jobStatusSubs[id] = statusSubscription{
|
||||||
statusChan: sub,
|
statusChan: sub,
|
||||||
quitChan: quitChan,
|
quitChan: quitChan,
|
||||||
|
@ -18,6 +18,8 @@ package statediff_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"math/big"
|
"math/big"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
@ -25,6 +27,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
@ -85,6 +89,10 @@ var (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
defaultParams.ComputeWatchedAddressesLeafPaths()
|
||||||
|
}
|
||||||
|
|
||||||
func testErrorInChainEventLoop(t *testing.T) {
|
func testErrorInChainEventLoop(t *testing.T) {
|
||||||
//the first chain event causes and error (in blockchain mock)
|
//the first chain event causes and error (in blockchain mock)
|
||||||
builder := mocks.Builder{}
|
builder := mocks.Builder{}
|
||||||
@ -147,7 +155,6 @@ func testErrorInChainEventLoop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
defaultParams.ComputeWatchedAddressesLeafPaths()
|
|
||||||
if !reflect.DeepEqual(builder.Params, defaultParams) {
|
if !reflect.DeepEqual(builder.Params, defaultParams) {
|
||||||
t.Error("Test failure:", t.Name())
|
t.Error("Test failure:", t.Name())
|
||||||
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
|
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
|
||||||
@ -200,7 +207,6 @@ func testErrorInBlockLoop(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
service.Loop(eventsChannel)
|
service.Loop(eventsChannel)
|
||||||
|
|
||||||
defaultParams.ComputeWatchedAddressesLeafPaths()
|
|
||||||
if !reflect.DeepEqual(builder.Params, defaultParams) {
|
if !reflect.DeepEqual(builder.Params, defaultParams) {
|
||||||
t.Error("Test failure:", t.Name())
|
t.Error("Test failure:", t.Name())
|
||||||
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
|
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
|
||||||
@ -220,10 +226,6 @@ func testErrorInBlockLoop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGetStateDiffAt(t *testing.T) {
|
func TestGetStateDiffAt(t *testing.T) {
|
||||||
testErrorInStateDiffAt(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testErrorInStateDiffAt(t *testing.T) {
|
|
||||||
mockStateDiff := types2.StateObject{
|
mockStateDiff := types2.StateObject{
|
||||||
BlockNumber: testBlock1.Number(),
|
BlockNumber: testBlock1.Number(),
|
||||||
BlockHash: testBlock1.Hash(),
|
BlockHash: testBlock1.Hash(),
|
||||||
@ -275,7 +277,6 @@ func testErrorInStateDiffAt(t *testing.T) {
|
|||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
defaultParams.ComputeWatchedAddressesLeafPaths()
|
|
||||||
if !reflect.DeepEqual(builder.Params, defaultParams) {
|
if !reflect.DeepEqual(builder.Params, defaultParams) {
|
||||||
t.Error("Test failure:", t.Name())
|
t.Error("Test failure:", t.Name())
|
||||||
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
|
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
|
||||||
@ -298,6 +299,81 @@ func testErrorInStateDiffAt(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type writeSub struct {
|
||||||
|
sub *rpc.ClientSubscription
|
||||||
|
statusChan <-chan statediff.JobStatus
|
||||||
|
client *rpc.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws writeSub) close() {
|
||||||
|
ws.sub.Unsubscribe()
|
||||||
|
ws.client.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// awaitStatus awaits status update for writeStateDiffAt job
|
||||||
|
func subscribeWrites(ctx context.Context, svc *statediff.Service) (writeSub, error) {
|
||||||
|
server := rpc.NewServer()
|
||||||
|
api := statediff.NewPublicStateDiffAPI(svc)
|
||||||
|
err := server.RegisterName("statediff", api)
|
||||||
|
if err != nil {
|
||||||
|
return writeSub{}, err
|
||||||
|
}
|
||||||
|
client := rpc.DialInProc(server)
|
||||||
|
statusChan := make(chan statediff.JobStatus)
|
||||||
|
sub, err := client.Subscribe(ctx, "statediff", statusChan, "streamWrites")
|
||||||
|
return writeSub{sub, statusChan, client}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func awaitJob(ws writeSub, job statediff.JobID, timeout time.Duration) (bool, error) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case err := <-ws.sub.Err():
|
||||||
|
return false, err
|
||||||
|
case status := <-ws.statusChan:
|
||||||
|
if status.Err != nil {
|
||||||
|
return false, status.Err
|
||||||
|
}
|
||||||
|
if status.ID == job {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
case <-time.After(timeout):
|
||||||
|
return false, errors.New("timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteStateDiffAt(t *testing.T) {
|
||||||
|
builder := mocks.Builder{}
|
||||||
|
indexer := mocks.StateDiffIndexer{}
|
||||||
|
blockChain := mocks.BlockChain{}
|
||||||
|
blockMapping := make(map[common.Hash]*types.Block)
|
||||||
|
blockMapping[parentBlock1.Hash()] = parentBlock1
|
||||||
|
blockChain.SetBlocksForHashes(blockMapping)
|
||||||
|
blockChain.SetBlockForNumber(testBlock1, testBlock1.NumberU64())
|
||||||
|
blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1)
|
||||||
|
|
||||||
|
service := statediff.NewService(&blockChain, statediff.Config{}, &mocks.Backend{}, &indexer)
|
||||||
|
service.Builder = &builder
|
||||||
|
|
||||||
|
// delay to avoid subscription request being sent after statediff is written,
|
||||||
|
// and timeout to prevent hanging just in case it still happens
|
||||||
|
writeDelay := 100 * time.Millisecond
|
||||||
|
jobTimeout := time.Second
|
||||||
|
ws, err := subscribeWrites(context.Background(), service)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer ws.close()
|
||||||
|
time.Sleep(writeDelay)
|
||||||
|
job := service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams)
|
||||||
|
ok, err := awaitJob(ws, job, jobTimeout)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
require.Equal(t, defaultParams, builder.Params)
|
||||||
|
require.Equal(t, testBlock1.Hash(), builder.Args.BlockHash)
|
||||||
|
require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot)
|
||||||
|
require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot)
|
||||||
|
}
|
||||||
|
|
||||||
func TestWaitForSync(t *testing.T) {
|
func TestWaitForSync(t *testing.T) {
|
||||||
testWaitForSync(t)
|
testWaitForSync(t)
|
||||||
testGetSyncStatus(t)
|
testGetSyncStatus(t)
|
||||||
|
@ -28,7 +28,6 @@ var _ statediff.Builder = &Builder{}
|
|||||||
type Builder struct {
|
type Builder struct {
|
||||||
Args statediff.Args
|
Args statediff.Args
|
||||||
Params statediff.Params
|
Params statediff.Params
|
||||||
StateRoots sdtypes.StateRoots
|
|
||||||
stateDiff sdtypes.StateObject
|
stateDiff sdtypes.StateObject
|
||||||
block *types.Block
|
block *types.Block
|
||||||
stateTrie sdtypes.StateObject
|
stateTrie sdtypes.StateObject
|
||||||
@ -45,7 +44,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi
|
|||||||
|
|
||||||
// BuildStateDiffObject mock method
|
// BuildStateDiffObject mock method
|
||||||
func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
|
func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
|
||||||
builder.StateRoots = sdtypes.StateRoots{OldStateRoot: args.OldStateRoot, NewStateRoot: args.NewStateRoot}
|
builder.Args = args
|
||||||
builder.Params = params
|
builder.Params = params
|
||||||
|
|
||||||
return builder.builderError
|
return builder.builderError
|
||||||
|
@ -27,19 +27,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||||
|
var _ interfaces.Batch = &batch{}
|
||||||
|
|
||||||
// StateDiffIndexer is a mock state diff indexer
|
// StateDiffIndexer is a mock state diff indexer
|
||||||
type StateDiffIndexer struct{}
|
type StateDiffIndexer struct{}
|
||||||
|
|
||||||
|
type batch struct{}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
||||||
return nil, nil
|
return &batch{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
|
func (sdi *StateDiffIndexer) PushStateNode(txi interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, iplds sdtypes.IPLD) error {
|
func (sdi *StateDiffIndexer) PushIPLD(txi interfaces.Batch, ipld sdtypes.IPLD) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,3 +71,7 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
|
|||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tx *batch) Submit(err error) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user