service is viable

+ fix subscriptions
+ error and test cleanup
+ cleanup, refactor
+ consistently log block number as "number"
+ MODE=statediff no longer needed
This commit is contained in:
Roy Crihfield 2023-06-16 17:53:15 +08:00
parent ccb517bf61
commit 5c59e09423
25 changed files with 513 additions and 772 deletions

74
adapt/state.go Normal file
View File

@ -0,0 +1,74 @@
package adapt
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie"
plugeth "github.com/openrelayxyz/plugeth-utils/core"
)
// StateView exposes a minimal interface for state access for diff building
type StateView interface {
OpenTrie(root common.Hash) (StateTrie, error)
ContractCode(codeHash common.Hash) ([]byte, error)
}
// StateTrie is an interface exposing only the necessary methods from state.Trie
type StateTrie interface {
GetKey([]byte) []byte
NodeIterator([]byte) trie.NodeIterator
}
// adapts a state.Database to StateView - used in tests
type stateDatabaseView struct {
db state.Database
}
var _ StateView = stateDatabaseView{}
func GethStateView(db state.Database) StateView {
return stateDatabaseView{db}
}
func (a stateDatabaseView) OpenTrie(root common.Hash) (StateTrie, error) {
return a.db.OpenTrie(common.Hash(root))
}
func (a stateDatabaseView) ContractCode(hash common.Hash) ([]byte, error) {
return a.db.ContractCode(common.Hash{}, hash)
}
// adapts geth Trie to plugeth
type adaptTrie struct {
plugeth.Trie
}
func NewStateTrie(t plugeth.Trie) StateTrie { return adaptTrie{t} }
func (a adaptTrie) NodeIterator(start []byte) trie.NodeIterator {
return NodeIterator(a.Trie.NodeIterator(start))
}
func NodeIterator(it plugeth.NodeIterator) trie.NodeIterator {
return adaptIter{it}
}
type adaptIter struct {
plugeth.NodeIterator
}
func (it adaptIter) Hash() common.Hash {
return common.Hash(it.NodeIterator.Hash())
}
func (it adaptIter) Parent() common.Hash {
return common.Hash(it.NodeIterator.Parent())
}
func (it adaptIter) AddResolver(resolver trie.NodeResolver) {
r := func(owner plugeth.Hash, path []byte, hash plugeth.Hash) []byte {
return resolver(common.Hash(owner), path, common.Hash(hash))
}
it.NodeIterator.AddResolver(r)
}

View File

@ -4,7 +4,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
plugeth "github.com/openrelayxyz/plugeth-utils/core" plugeth "github.com/openrelayxyz/plugeth-utils/core"
plugeth_params "github.com/openrelayxyz/plugeth-utils/restricted/params" plugeth_params "github.com/openrelayxyz/plugeth-utils/restricted/params"
@ -37,26 +36,3 @@ func ChainConfig(cc *plugeth_params.ChainConfig) *params.ChainConfig {
LondonBlock: cc.LondonBlock, LondonBlock: cc.LondonBlock,
} }
} }
func NodeIterator(it plugeth.NodeIterator) trie.NodeIterator {
return nodeiter{it}
}
type nodeiter struct {
plugeth.NodeIterator
}
func (it nodeiter) Hash() common.Hash {
return common.Hash(it.NodeIterator.Hash())
}
func (it nodeiter) Parent() common.Hash {
return common.Hash(it.NodeIterator.Parent())
}
func (it nodeiter) AddResolver(resolver trie.NodeResolver) {
r := func(owner plugeth.Hash, path []byte, hash plugeth.Hash) []byte {
return resolver(common.Hash(owner), path, common.Hash(hash))
}
it.NodeIterator.AddResolver(r)
}

16
api.go
View File

@ -36,11 +36,11 @@ const APIVersion = "0.0.1"
// that can be used to stream out state diffs as they // that can be used to stream out state diffs as they
// are produced by a full node // are produced by a full node
type PublicAPI struct { type PublicAPI struct {
sds IService sds *Service
} }
// NewPublicStateDiffAPI creates an rpc subscription interface for the underlying statediff service // NewPublicStateDiffAPI creates an rpc subscription interface for the underlying statediff service
func NewPublicAPI(sds IService) *PublicAPI { func NewPublicAPI(sds *Service) *PublicAPI {
return &PublicAPI{ return &PublicAPI{
sds: sds, sds: sds,
} }
@ -59,7 +59,7 @@ func (api *PublicAPI) Stream(ctx context.Context, params Params) (<-chan Payload
defer close(payloadChan) defer close(payloadChan)
defer func() { defer func() {
if err := api.sds.Unsubscribe(id); err != nil { if err := api.sds.Unsubscribe(id); err != nil {
log.Error("Failed to unsubscribe from statediff service", "err", err) log.Error("Failed to unsubscribe from statediff service", "error", err)
} }
}() }()
@ -144,16 +144,12 @@ func (api *PublicAPI) StreamWrites(ctx context.Context) (<-chan JobStatus, error
// subscribe to events from the statediff service // subscribe to events from the statediff service
statusChan := make(chan JobStatus, chainEventChanSize) statusChan := make(chan JobStatus, chainEventChanSize)
clientChan := make(chan JobStatus, chainEventChanSize) clientChan := make(chan JobStatus, chainEventChanSize)
quitChan := make(chan bool, 1) id := api.sds.SubscribeWriteStatus(statusChan)
id := api.sds.SubscribeWriteStatus(statusChan, quitChan)
go func() { go func() {
defer func() { defer func() {
close(statusChan) close(statusChan)
close(clientChan) close(clientChan)
if err := api.sds.UnsubscribeWriteStatus(id); err != nil {
log.Error("Failed to unsubscribe from job status stream", "error", err)
}
}() }()
for { for {
@ -161,9 +157,7 @@ func (api *PublicAPI) StreamWrites(ctx context.Context) (<-chan JobStatus, error
case status := <-statusChan: case status := <-statusChan:
clientChan <- status clientChan <- status
case <-ctx.Done(): case <-ctx.Done():
log.Error("Error from context", "error", ctx.Err()) api.sds.UnsubscribeWriteStatus(id)
return
case <-quitChan:
return return
} }
} }

View File

@ -3,20 +3,24 @@ package statediff
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"math/big" "math/big"
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
plugeth "github.com/openrelayxyz/plugeth-utils/core" plugeth "github.com/openrelayxyz/plugeth-utils/core"
// "github.com/openrelayxyz/plugeth-utils/restricted/types" "github.com/openrelayxyz/plugeth-utils/restricted"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/utils" "github.com/cerc-io/plugeth-statediff/utils"
) )
type blockChain interface { type BlockChain interface {
// SubscribeChainEvent(ch chan<- plugeth.ChainEvent) plugeth.Subscription // SubscribeChainEvent(ch chan<- plugeth.ChainEvent) plugeth.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
CurrentBlock() *types.Header CurrentBlock() *types.Header
@ -26,22 +30,23 @@ type blockChain interface {
GetTd(hash common.Hash, number uint64) *big.Int GetTd(hash common.Hash, number uint64) *big.Int
// TODO LockTrie is never used // TODO LockTrie is never used
// UnlockTrie(root core.Hash) // UnlockTrie(root core.Hash)
StateCache() StateView StateCache() adapt.StateView
} }
// Adapts the plugeth Backend to the blockChain interface // Adapts the plugeth Backend to the blockChain interface
type backendBlockChain struct { type backendBlockChain struct {
plugeth.Backend restricted.Backend
ctx context.Context ctx context.Context
// middleware?
// code mappings exposed in StateCache, safe for concurrent access
code map[common.Hash][]byte
codeMtx sync.RWMutex
} }
type backendStateView struct { var _ BlockChain = (*backendBlockChain)(nil)
plugeth.Backend
}
func asBlockChain(backend plugeth.Backend) blockChain { func NewPluginBlockChain(backend restricted.Backend) BlockChain {
return backendBlockChain{ return &backendBlockChain{
Backend: backend, Backend: backend,
ctx: context.Background(), ctx: context.Background(),
} }
@ -64,12 +69,12 @@ func (b backendBlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.
return sub return sub
} }
func (b backendBlockChain) CurrentBlock() *types.Header { func (b *backendBlockChain) CurrentBlock() *types.Header {
buf := b.Backend.CurrentBlock() buf := b.Backend.CurrentBlock()
return utils.MustDecode[types.Header](buf) return utils.MustDecode[types.Header](buf)
} }
func (b backendBlockChain) GetBlockByHash(hash common.Hash) *types.Block { func (b *backendBlockChain) GetBlockByHash(hash common.Hash) *types.Block {
buf, err := b.Backend.BlockByHash(b.ctx, plugeth.Hash(hash)) buf, err := b.Backend.BlockByHash(b.ctx, plugeth.Hash(hash))
if err != nil { if err != nil {
panic(err) panic(err)
@ -77,7 +82,7 @@ func (b backendBlockChain) GetBlockByHash(hash common.Hash) *types.Block {
return utils.MustDecode[types.Block](buf) return utils.MustDecode[types.Block](buf)
} }
func (b backendBlockChain) GetBlockByNumber(number uint64) *types.Block { func (b *backendBlockChain) GetBlockByNumber(number uint64) *types.Block {
buf, err := b.Backend.BlockByNumber(b.ctx, int64(number)) buf, err := b.Backend.BlockByNumber(b.ctx, int64(number))
if err != nil { if err != nil {
panic(err) panic(err)
@ -85,7 +90,7 @@ func (b backendBlockChain) GetBlockByNumber(number uint64) *types.Block {
return utils.MustDecode[types.Block](buf) return utils.MustDecode[types.Block](buf)
} }
func (b backendBlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { func (b *backendBlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
buf, err := b.Backend.GetReceipts(b.ctx, plugeth.Hash(hash)) buf, err := b.Backend.GetReceipts(b.ctx, plugeth.Hash(hash))
if err != nil { if err != nil {
panic(err) panic(err)
@ -98,11 +103,49 @@ func (b backendBlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
return receipts return receipts
} }
func (b backendBlockChain) GetTd(hash common.Hash, number uint64) *big.Int { func (b *backendBlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
return b.Backend.GetTd(b.ctx, plugeth.Hash(hash)) return b.Backend.GetTd(b.ctx, plugeth.Hash(hash))
} }
func (b backendBlockChain) StateCache() StateView { func (b *backendBlockChain) StateCache() adapt.StateView {
// TODO return &pluginStateView{backend: b}
return nil
} }
func (b *backendBlockChain) ChainConfig() *params.ChainConfig {
return adapt.ChainConfig(b.Backend.ChainConfig())
}
func (b *backendBlockChain) getCode(hash common.Hash) ([]byte, error) {
b.codeMtx.RLock()
defer b.codeMtx.RUnlock()
code, has := b.code[hash]
if !has {
return nil, errors.New("code not found")
}
return copybytes(code), nil
}
func (b *backendBlockChain) setCode(hash common.Hash, code []byte) {
b.codeMtx.Lock()
defer b.codeMtx.Unlock()
b.code[hash] = code
}
// exposes a StateView from a combination of plugeth's core Backend and cached contract code
type pluginStateView struct {
backend *backendBlockChain
}
func (p *pluginStateView) OpenTrie(root common.Hash) (adapt.StateTrie, error) {
t, err := p.backend.GetTrie(plugeth.Hash(root))
if err != nil {
return nil, err
}
return adapt.NewStateTrie(t), nil
}
func (p *pluginStateView) ContractCode(hash common.Hash) ([]byte, error) {
return p.backend.getCode(hash)
}
func copybytes(b []byte) []byte { return []byte(string(b)) }

View File

@ -31,7 +31,7 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
// "github.com/cerc-io/plugeth-statediff/adapt" "github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
@ -45,9 +45,7 @@ var (
emptyNode, _ = rlp.EncodeToBytes(&[]byte{}) emptyNode, _ = rlp.EncodeToBytes(&[]byte{})
emptyContractRoot = crypto.Keccak256Hash(emptyNode) emptyContractRoot = crypto.Keccak256Hash(emptyNode)
nullCodeHash = crypto.Keccak256Hash([]byte{}).Bytes() nullCodeHash = crypto.Keccak256Hash([]byte{}).Bytes()
zeroHash common.Hash
zeroHashBytes = utils.Hex2Bytes("0000000000000000000000000000000000000000000000000000000000000000")
// zeroHash = core.HexToHash("0000000000000000000000000000000000000000000000000000000000000000")
) )
// Builder interface exposes the method for building a state diff between two blocks // Builder interface exposes the method for building a state diff between two blocks
@ -57,8 +55,7 @@ type Builder interface {
} }
type StateDiffBuilder struct { type StateDiffBuilder struct {
// StateCache state.Database StateCache adapt.StateView
StateCache StateView
} }
type IterPair struct { type IterPair struct {
@ -85,7 +82,7 @@ func IPLDMappingAppender(iplds *[]sdtypes.IPLD) sdtypes.IPLDSink {
} }
// NewBuilder is used to create a statediff builder // NewBuilder is used to create a statediff builder
func NewBuilder(stateCache StateView) Builder { func NewBuilder(stateCache adapt.StateView) Builder {
return &StateDiffBuilder{ return &StateDiffBuilder{
StateCache: stateCache, // state cache is safe for concurrent reads StateCache: stateCache, // state cache is safe for concurrent reads
} }
@ -228,8 +225,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
// trie nodes will be written to blockstore only // trie nodes will be written to blockstore only
// reminder that this includes leaf nodes, since the geth iterator.Leaf() actually // reminder that this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node // signifies a "value" node
// TODO: use Hash type if it.Hash() == zeroHash {
if bytes.Equal(it.Hash().Bytes(), zeroHashBytes) {
continue continue
} }
nodeVal := make([]byte, len(it.NodeBlob())) nodeVal := make([]byte, len(it.NodeBlob()))
@ -264,25 +260,23 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
return diffAccountsAtB, it.Error() return diffAccountsAtB, it.Error()
} }
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something // decodes account at leaf and encodes RLP data to CID
// that actually exists in an MMPT // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
encodedPath := utils.HexToCompact(it.Path())
leafKey := encodedPath[1:]
var account types.StateAccount var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return nil, fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", leafKey, err) return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err)
} }
return &sdtypes.AccountWrapper{ return &sdtypes.AccountWrapper{
LeafKey: leafKey, LeafKey: it.LeafKey(),
Account: &account, Account: &account,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(parentBlob)).String(), CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(parentBlob)).String(),
}, nil }, nil
} }
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B // deletedOrUpdatedState returns a slice of all the paths that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B // and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB sdtypes.AccountMap, func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB sdtypes.AccountMap,
watchedAddressesLeafPaths [][]byte, output sdtypes.StateNodeSink, logger log.Logger, prefixPath []byte) (sdtypes.AccountMap, error) { watchedAddressesLeafPaths [][]byte, output sdtypes.StateNodeSink, logger log.Logger, prefixPath []byte) (sdtypes.AccountMap, error) {
@ -423,7 +417,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output sd
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil return nil
} }
log.Debug("Storage Root For Eventual Diff", "root", sr.String()) log.Debug("Storage root for eventual diff", "root", sr.String())
sTrie, err := sdb.StateCache.OpenTrie(sr) sTrie, err := sdb.StateCache.OpenTrie(sr)
if err != nil { if err != nil {
log.Info("error in build storage diff eventual", "error", err) log.Info("error in build storage diff eventual", "error", err)
@ -446,20 +440,15 @@ func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, out
var prevBlob []byte var prevBlob []byte
for it.Next(true) { for it.Next(true) {
if it.Leaf() { if it.Leaf() {
storageLeafNode, err := sdb.processStorageValueNode(it, prevBlob) storageLeafNode := sdb.processStorageValueNode(it, prevBlob)
if err != nil {
return err
}
if err := output(storageLeafNode); err != nil { if err := output(storageLeafNode); err != nil {
return err return err
} }
} else { } else {
nodeVal := make([]byte, len(it.NodeBlob())) nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob()) copy(nodeVal, it.NodeBlob())
nodeHash := make([]byte, len(it.Hash().Bytes()))
copy(nodeHash, it.Hash().Bytes())
if err := ipldOutput(sdtypes.IPLD{ if err := ipldOutput(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, nodeHash).String(), CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(),
Content: nodeVal, Content: nodeVal,
}); err != nil { }); err != nil {
return err return err
@ -470,26 +459,20 @@ func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, out
return it.Error() return it.Error()
} }
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something // decodes account at leaf and encodes RLP data to CID
// that actually exists in an MMPT // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator, parentBlob []byte) (sdtypes.StorageLeafNode, error) { // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
leafKey := make([]byte, len(it.LeafKey())) leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey()) copy(leafKey, it.LeafKey())
value := make([]byte, len(it.LeafBlob())) value := make([]byte, len(it.LeafBlob()))
copy(value, it.LeafBlob()) copy(value, it.LeafBlob())
// // since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
// // it should be in the fastcache since it necessarily was recently accessed to reach the current node
// parentNodeRLP, err := sdb.StateCache.TrieDB().Node(it.Parent())
// if err != nil {
// return sdtypes.StorageLeafNode{}, err
// }
return sdtypes.StorageLeafNode{ return sdtypes.StorageLeafNode{
LeafKey: leafKey, LeafKey: leafKey,
Value: value, Value: value,
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(parentBlob)).String(), CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(parentBlob)).String(),
}, nil }
} }
// buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account // buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account
@ -571,16 +554,13 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, ou
it, _ := trie.NewDifferenceIterator(a, b) it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) { for it.Next(true) {
if it.Leaf() { if it.Leaf() {
storageLeafNode, err := sdb.processStorageValueNode(it, prevBlob) storageLeafNode := sdb.processStorageValueNode(it, prevBlob)
if err != nil {
return nil, err
}
if err := output(storageLeafNode); err != nil { if err := output(storageLeafNode); err != nil {
return nil, err return nil, err
} }
diffSlotsAtB[hex.EncodeToString(storageLeafNode.LeafKey)] = true diffSlotsAtB[hex.EncodeToString(storageLeafNode.LeafKey)] = true
} else { } else {
if bytes.Equal(it.Hash().Bytes(), zeroHashBytes) { if it.Hash() == zeroHash {
continue continue
} }
nodeVal := make([]byte, len(it.NodeBlob())) nodeVal := make([]byte, len(it.NodeBlob()))

View File

@ -18,17 +18,16 @@ package statediff_test
import ( import (
"encoding/hex" "encoding/hex"
"fmt"
"math/big" "math/big"
"os"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/openrelayxyz/plugeth-utils/restricted/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/test_helpers" "github.com/cerc-io/plugeth-statediff/test_helpers"
@ -495,13 +494,6 @@ var (
}) })
) )
func init() {
if os.Getenv("MODE") != "statediff" {
fmt.Println("Skipping statediff test")
os.Exit(0)
}
}
func TestBuilder(t *testing.T) { func TestBuilder(t *testing.T) {
blocks, chain := test_helpers.MakeChain(3, test_helpers.Genesis, test_helpers.TestChainGen) blocks, chain := test_helpers.MakeChain(3, test_helpers.Genesis, test_helpers.TestChainGen)
contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr) contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr)
@ -804,7 +796,7 @@ func TestBuilder(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())),
tests, params, test_helpers.CheckedRoots{ tests, params, test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode, block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode, block1: block1BranchRootNode,
@ -1018,7 +1010,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode, block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode, block1: block1BranchRootNode,
block2: block2BranchRootNode, block2: block2BranchRootNode,
@ -1268,7 +1260,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
@ -1402,7 +1394,7 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
@ -1605,7 +1597,7 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
@ -1832,7 +1824,7 @@ func TestBuilderWithMovedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
block1: block01BranchRootNode, block1: block01BranchRootNode,
block2: bankAccountAtBlock02LeafNode, block2: bankAccountAtBlock02LeafNode,
}) })
@ -2358,7 +2350,7 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
block1: block1bBranchRootNode, block1: block1bBranchRootNode,
block2: block2bBranchRootNode, block2: block2bBranchRootNode,
block3: block3bBranchRootNode, block3: block3bBranchRootNode,
@ -2561,7 +2553,7 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
block1: block1bBranchRootNode, block1: block1bBranchRootNode,
block2: block2bBranchRootNode, block2: block2bBranchRootNode,
block3: block3bBranchRootNode, block3: block3bBranchRootNode,

View File

@ -44,10 +44,6 @@ var (
) )
func init() { func init() {
if os.Getenv("MODE") != "statediff" {
fmt.Println("Skipping statediff test")
os.Exit(0)
}
if os.Getenv("STATEDIFF_DB") != "file" { if os.Getenv("STATEDIFF_DB") != "file" {
fmt.Println("Skipping statediff .sql file writing mode test") fmt.Println("Skipping statediff .sql file writing mode test")
os.Exit(0) os.Exit(0)

View File

@ -17,7 +17,6 @@
package metrics package metrics
import ( import (
"fmt"
"strings" "strings"
"time" "time"
@ -253,7 +252,7 @@ func (met *dbMetricsHandles) Update(stats DbStats) {
func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) { func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) {
since := UpdateDuration(start, timer) since := UpdateDuration(start, timer)
logger.Trace(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds())) logger.Trace(msg, "duration", since.Milliseconds())
} }
func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration { func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration {

View File

@ -86,7 +86,7 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
case *copyFrom: case *copyFrom:
_, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) _, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows)
if err != nil { if err != nil {
log.Error("COPY error", "table", item.tableName, "err", err) log.Error("COPY error", "table", item.tableName, "error", err)
return err return err
} }
case cachedStmt: case cachedStmt:

View File

@ -18,9 +18,7 @@ package mainnet_tests
import ( import (
"context" "context"
"fmt"
"math/big" "math/big"
"os"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -41,13 +39,6 @@ var (
chainConf = params.MainnetChainConfig chainConf = params.MainnetChainConfig
) )
func init() {
if os.Getenv("MODE") != "statediff" {
fmt.Println("Skipping statediff test")
os.Exit(0)
}
}
func TestMainnetIndexer(t *testing.T) { func TestMainnetIndexer(t *testing.T) {
conf := test_helpers.GetTestConfig() conf := test_helpers.GetTestConfig()

View File

@ -1,33 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres_test
import (
"fmt"
"os"
// "github.com/cerc-io/plugeth-statediff/utils/log"
)
func init() {
if os.Getenv("MODE") != "statediff" {
fmt.Println("Skipping statediff test")
os.Exit(0)
}
// TODO
// log.Root().SetHandler(log.DiscardHandler())
}

View File

@ -17,7 +17,6 @@
package interfaces package interfaces
import ( import (
"io"
"math/big" "math/big"
"time" "time"
@ -41,7 +40,7 @@ type StateDiffIndexer interface {
SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error
ClearWatchedAddresses() error ClearWatchedAddresses() error
io.Closer Close() error
} }
// Batch required for indexing data atomically // Batch required for indexing data atomically

View File

@ -18,8 +18,6 @@ package test
import ( import (
"bytes" "bytes"
"fmt"
"os"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -52,11 +50,6 @@ var (
) )
func init() { func init() {
if os.Getenv("MODE") != "statediff" {
fmt.Println("Skipping statediff test")
os.Exit(0)
}
// canonical block at LondonBlock height // canonical block at LondonBlock height
mockBlock = mocks.MockBlock mockBlock = mocks.MockBlock
txs, rcts := mocks.MockBlock.Transactions(), mocks.MockReceipts txs, rcts := mocks.MockBlock.Transactions(), mocks.MockReceipts

View File

@ -151,9 +151,6 @@ func initConfig() {
config = statediff.Config{} config = statediff.Config{}
return return
} }
if !config.EnableWriteLoop {
return
}
if config.ID == "" { if config.ID == "" {
utils.Fatalf("Must specify node ID for statediff DB output") utils.Fatalf("Must specify node ID for statediff DB output")

View File

@ -7,7 +7,11 @@ import (
"github.com/openrelayxyz/plugeth-utils/core" "github.com/openrelayxyz/plugeth-utils/core"
"github.com/openrelayxyz/plugeth-utils/restricted" "github.com/openrelayxyz/plugeth-utils/restricted"
"github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
ind "github.com/cerc-io/plugeth-statediff/indexer"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/cerc-io/plugeth-statediff/utils/log" "github.com/cerc-io/plugeth-statediff/utils/log"
) )
@ -15,14 +19,23 @@ var (
pluginLoader core.PluginLoader pluginLoader core.PluginLoader
gethContext core.Context gethContext core.Context
service *statediff.Service service *statediff.Service
blockchain statediff.BlockChain
) )
func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) { func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) {
log.DefaultLogger = logger log.SetDefaultLogger(logger)
// lvl, err := strconv.ParseInt(ctx.String("verbosity"), 10, 8)
// if err != nil {
// log.Error("cannot parse verbosity", "error", err)
// }
// log.TestLogger.SetLevel(int(lvl))
// log.SetDefaultLogger(log.TestLogger)
pluginLoader = pl pluginLoader = pl
gethContext = ctx gethContext = ctx
log.Info("Initialized statediff plugin") log.Debug("Initialized statediff plugin")
} }
func InitializeNode(stack core.Node, b core.Backend) { func InitializeNode(stack core.Node, b core.Backend) {
@ -34,17 +47,32 @@ func InitializeNode(stack core.Node, b core.Backend) {
return return
} }
serviceConfig := GetConfig() serviceConfig := GetConfig()
service, err = statediff.NewIndexingService(serviceConfig, backend, networkid) chain := statediff.NewPluginBlockChain(backend)
var indexer interfaces.StateDiffIndexer
if serviceConfig.IndexerConfig != nil {
info := node.Info{
GenesisBlock: chain.GetBlockByNumber(0).Hash().String(),
NetworkID: strconv.FormatUint(networkid, 10),
ChainID: backend.ChainConfig().ChainID.Uint64(),
ID: serviceConfig.ID,
ClientName: serviceConfig.ClientName,
}
var err error
_, indexer, err = ind.NewStateDiffIndexer(serviceConfig.Context,
adapt.ChainConfig(backend.ChainConfig()), info, serviceConfig.IndexerConfig)
if err != nil {
log.Error("failed to construct indexer", "error", err)
}
}
service, err := statediff.NewService(serviceConfig, chain, backend, indexer)
if err != nil { if err != nil {
log.Error("failed to construct service", "error", err) log.Error("failed to construct service", "error", err)
return
} }
if err = service.Start(); err != nil { if err = service.Start(); err != nil {
log.Error("failed to start service", "error", err) log.Error("failed to start service", "error", err)
return return
} }
log.Info("Initialized node and backend")
} }
func GetAPIs(stack core.Node, backend core.Backend) []core.API { func GetAPIs(stack core.Node, backend core.Backend) []core.API {
@ -71,5 +99,5 @@ func StateUpdate(blockRoot core.Hash,
return return
} }
// TODO extract code // blockchain.
} }

View File

@ -18,7 +18,6 @@ package statediff_test
import ( import (
"bytes" "bytes"
"fmt"
"io" "io"
"log" "log"
"math/big" "math/big"
@ -35,7 +34,8 @@ import (
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/test_helpers" "github.com/cerc-io/plugeth-statediff/test_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
@ -420,10 +420,6 @@ var (
) )
func init() { func init() {
if os.Getenv("MODE") != "statediff" {
fmt.Println("Skipping statediff test")
os.Exit(0)
}
db = rawdb.NewMemoryDatabase() db = rawdb.NewMemoryDatabase()
genesisBlock = core.DefaultGenesisBlock().MustCommit(db) genesisBlock = core.DefaultGenesisBlock().MustCommit(db)
genBy, err := rlp.EncodeToBytes(genesisBlock) genBy, err := rlp.EncodeToBytes(genesisBlock)
@ -659,11 +655,10 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
} }
test_helpers.RunBuilderTests(t, test_helpers.RunBuilderTests(t,
statediff.NewBuilder(statediff.StateDatabaseView(chain.StateCache())), statediff.NewBuilder(adapt.GethStateView(chain.StateCache())),
tests, params, test_helpers.CheckedRoots{ tests, params, test_helpers.CheckedRoots{
block1: block1RootBranchNode, block1: block1RootBranchNode,
block2: block2RootBranchNode, block2: block2RootBranchNode,
block3: block3RootBranchNode, block3: block3RootBranchNode,
}) })
} }

View File

@ -18,9 +18,9 @@ package statediff
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"math/big" "math/big"
"strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -31,18 +31,12 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
plugeth "github.com/openrelayxyz/plugeth-utils/core" plugeth "github.com/openrelayxyz/plugeth-utils/core"
"github.com/openrelayxyz/plugeth-utils/restricted"
"github.com/thoas/go-funk" "github.com/thoas/go-funk"
"github.com/cerc-io/plugeth-statediff/adapt"
ind "github.com/cerc-io/plugeth-statediff/indexer"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/node"
types2 "github.com/cerc-io/plugeth-statediff/types" types2 "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils/log" "github.com/cerc-io/plugeth-statediff/utils/log"
) )
@ -51,57 +45,19 @@ const (
chainEventChanSize = 20000 chainEventChanSize = 20000
genesisBlockNumber = 0 genesisBlockNumber = 0
defaultRetryLimit = 3 // default retry limit once deadlock is detected. defaultRetryLimit = 3 // default retry limit once deadlock is detected.
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html pgDeadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
typeAssertionFailed = "type assertion failed"
unexpectedOperation = "unexpected operation"
) )
var writeLoopParams = ParamsWithMutex{ var (
Params: Params{ errTypeAssertionFailed = errors.New("type assertion failed")
errUnexpectedOperation = errors.New("unexpected operation")
)
var defaultWriteLoopParams = Params{
IncludeBlock: true, IncludeBlock: true,
IncludeReceipts: true, IncludeReceipts: true,
IncludeTD: true, IncludeTD: true,
IncludeCode: true, IncludeCode: true,
},
}
type RpcID int
// IService is the state-diffing service interface
type IService interface {
// Lifecycle methods
Start() error
Stop() error
// APIs method for getting API(s) for this service
// APIs() []rpc.API
// Loop is the main event loop for processing state diffs
Loop(chainEventCh chan core.ChainEvent)
// WriteLoop event loop for progressively processing and writing diffs directly to DB
WriteLoop(chainEventCh chan core.ChainEvent)
// Subscribe method to subscribe to receive state diff processing output
Subscribe(sub chan<- Payload, quitChan chan<- bool, params Params) RpcID
// Unsubscribe method to unsubscribe from state diff processing
Unsubscribe(id RpcID) error
// StateDiffAt method to get state diff object at specific block
StateDiffAt(blockNumber uint64, params Params) (*Payload, error)
// StateDiffFor method to get state diff object at specific block
StateDiffFor(blockHash common.Hash, params Params) (*Payload, error)
// WriteStateDiffAt method to write state diff object directly to DB
WriteStateDiffAt(blockNumber uint64, params Params) JobID
// WriteStateDiffFor method to write state diff object directly to DB
WriteStateDiffFor(blockHash common.Hash, params Params) error
// WatchAddress method to change the addresses being watched in write loop params
WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error
// StreamCodeAndCodeHash method to export all the codehash => code mappings at a block height
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool)
// SubscribeWriteStatus method to subscribe to receive state diff processing output
SubscribeWriteStatus(sub chan<- JobStatus, quitChan chan<- bool) RpcID
// UnsubscribeWriteStatus method to unsubscribe from state diff processing
UnsubscribeWriteStatus(id RpcID) error
} }
// Service is the underlying struct for the state diffing service // Service is the underlying struct for the state diffing service
@ -109,7 +65,7 @@ type Service struct {
// Used to build the state diff objects // Used to build the state diff objects
Builder Builder Builder Builder
// Used to subscribe to chain events (blocks) // Used to subscribe to chain events (blocks)
BlockChain blockChain BlockChain BlockChain
// Cache the last block so that we can avoid having to lookup the next block's parent // Cache the last block so that we can avoid having to lookup the next block's parent
BlockCache BlockCache BlockCache BlockCache
// The publicBackendAPI which provides useful information about the current state // The publicBackendAPI which provides useful information about the current state
@ -120,31 +76,31 @@ type Service struct {
indexer interfaces.StateDiffIndexer indexer interfaces.StateDiffIndexer
// Should the statediff service wait for geth to sync to head? // Should the statediff service wait for geth to sync to head?
WaitForSync bool ShouldWaitForSync bool
// Whether to enable writing state diffs directly to track blockchain head. // Whether to enable writing state diffs directly to track blockchain head.
enableWriteLoop bool enableWriteLoop bool
// Parameters to use in the service write loop, if enabled
writeLoopParams ParamsWithMutex
// Size of the worker pool // Size of the worker pool
numWorkers uint numWorkers uint
// Number of retry for aborted transactions due to deadlock. // Number of retry for aborted transactions due to deadlock.
maxRetry uint maxRetry uint
// Sequential ID for RPC subscriptions // Sequential ID for RPC subscriptions
lastRpcID uint64 lastSubID uint64
// PayloadSubs struct {
// A mapping of RpcIDs to their subscription channels, mapped to their subscription type (hash // A mapping of RpcIDs to their subscription channels, mapped to their subscription type (hash
// of the Params RLP) // of the Params RLP)
Subscriptions map[common.Hash]map[RpcID]Subscription Subscriptions map[common.Hash]map[SubID]Subscription
// A mapping of subscription params rlp hash to the corresponding subscription params // A mapping of subscription params rlp hash to the corresponding subscription params
SubscriptionTypes map[common.Hash]Params SubscriptionTypes map[common.Hash]Params
// Number of current subscribers // Number of current subscribers
subscribers int32 subscribers int32
// Used to sync access to the Subscriptions // Used to sync access to the Subscriptions
subscriptionsMutex sync.Mutex subscriptionsMutex sync.Mutex
// }
// Write job status subscriptions // Write job status subscriptions
jobStatusSubs map[RpcID]jobStatusSubscription jobStatusSubs map[SubID]jobStatusSubscription
jobStatusSubsMutex sync.RWMutex jobStatusSubsMutex sync.RWMutex
// Sequential ID for write jobs // Sequential ID for write jobs
lastJobID uint64 lastJobID uint64
@ -153,7 +109,10 @@ type Service struct {
currentJobsMutex sync.Mutex currentJobsMutex sync.Mutex
} }
// IDs used for tracking in-progress jobs (0 for invalid) // ID for identifying client subscriptions
type SubID uint64
// ID used for tracking in-progress jobs (0 for invalid)
type JobID uint64 type JobID uint64
// JobStatus represents the status of a completed job // JobStatus represents the status of a completed job
@ -174,6 +133,12 @@ type BlockCache struct {
maxSize uint maxSize uint
} }
type workerParams struct {
chainEventCh <-chan core.ChainEvent
wg *sync.WaitGroup
id uint
}
func NewBlockCache(max uint) BlockCache { func NewBlockCache(max uint) BlockCache {
return BlockCache{ return BlockCache{
blocks: make(map[common.Hash]*types.Block), blocks: make(map[common.Hash]*types.Block),
@ -181,36 +146,8 @@ func NewBlockCache(max uint) BlockCache {
} }
} }
// NewIndexingService creates and registers a new Service with indexing configured // NewService creates a new state diffing service with the given config and backend
func NewIndexingService(params Config, backend restricted.Backend, networkid uint64) (*Service, error) { func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, indexer interfaces.StateDiffIndexer) (*Service, error) {
blockChain := asBlockChain(backend)
var indexer interfaces.StateDiffIndexer
if params.IndexerConfig != nil {
info := node.Info{
GenesisBlock: blockChain.GetBlockByNumber(0).Hash().String(),
NetworkID: strconv.FormatUint(networkid, 10),
ChainID: backend.ChainConfig().ChainID.Uint64(),
ID: params.ID,
ClientName: params.ClientName,
}
var err error
_, indexer, err = ind.NewStateDiffIndexer(params.Context, adapt.ChainConfig(backend.ChainConfig()), info, params.IndexerConfig)
if err != nil {
return nil, err
}
err = loadWatchedAddresses(indexer)
if err != nil {
return nil, err
}
}
sds := NewService(blockChain, params, backend, indexer)
// stack.RegisterLifecycle(sds)
// stack.RegisterAPIs(sds.APIs())
return sds, nil
}
func NewService(blockChain blockChain, cfg Config, backend plugeth.Backend, indexer interfaces.StateDiffIndexer) *Service {
workers := cfg.NumWorkers workers := cfg.NumWorkers
if workers == 0 { if workers == 0 {
workers = 1 workers = 1
@ -221,40 +158,34 @@ func NewService(blockChain blockChain, cfg Config, backend plugeth.Backend, inde
BlockChain: blockChain, BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()), Builder: NewBuilder(blockChain.StateCache()),
QuitChan: quitCh, QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[RpcID]Subscription), Subscriptions: make(map[common.Hash]map[SubID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params), SubscriptionTypes: make(map[common.Hash]Params),
BlockCache: NewBlockCache(workers), BlockCache: NewBlockCache(workers),
BackendAPI: backend, BackendAPI: backend,
WaitForSync: cfg.WaitForSync, ShouldWaitForSync: cfg.WaitForSync,
indexer: indexer, indexer: indexer,
enableWriteLoop: cfg.EnableWriteLoop, enableWriteLoop: cfg.EnableWriteLoop,
numWorkers: workers, numWorkers: workers,
maxRetry: defaultRetryLimit, maxRetry: defaultRetryLimit,
jobStatusSubs: map[RpcID]jobStatusSubscription{}, jobStatusSubs: map[SubID]jobStatusSubscription{},
currentJobs: map[uint64]JobID{}, currentJobs: map[uint64]JobID{},
} }
if indexer != nil { if indexer != nil {
err := loadWatchedAddresses(indexer, &sds.writeLoopParams)
if err != nil {
return nil, err
}
indexer.ReportDBMetrics(10*time.Second, quitCh) indexer.ReportDBMetrics(10*time.Second, quitCh)
} else {
sds.writeLoopParams.Params = defaultWriteLoopParams
} }
return sds return sds, nil
}
// APIs returns the RPC descriptors the statediff.Service offers
func (sds *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: APIName,
Version: APIVersion,
Service: NewPublicAPI(sds),
Public: true,
},
}
} }
// Return the parent block of currentBlock, using the cached block if available; // Return the parent block of currentBlock, using the cached block if available;
// and cache the passed block // and cache the passed block
func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain) *types.Block { func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc BlockChain) *types.Block {
lbc.Lock() lbc.Lock()
parentHash := currentBlock.ParentHash() parentHash := currentBlock.ParentHash()
var parentBlock *types.Block var parentBlock *types.Block
@ -271,47 +202,44 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain)
return parentBlock return parentBlock
} }
type workerParams struct { // WriteLoop event loop for progressively processing and writing diffs directly to DB
chainEventCh <-chan core.ChainEvent
wg *sync.WaitGroup
id uint
}
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
log.Info("Starting statediff write loop")
log := log.New("context", "statediff writing")
sub := sds.BlockChain.SubscribeChainEvent(chainEventCh) sub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer sub.Unsubscribe() defer sub.Unsubscribe()
var wg sync.WaitGroup var wg sync.WaitGroup
// Process metrics for chain events, then forward to workers
chainEventFwd := make(chan core.ChainEvent, chainEventChanSize) chainEventFwd := make(chan core.ChainEvent, chainEventChanSize)
defer func() {
log.Info("Quitting")
close(chainEventFwd)
}()
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
for { for {
select { select {
case chainEvent := <-chainEventCh: case event := <-chainEventCh:
log.Debug("Chain event received", "event", event)
// First process metrics for chain events, then forward to workers
lastHeight := defaultStatediffMetrics.lastEventHeight.Value() lastHeight := defaultStatediffMetrics.lastEventHeight.Value()
block := chainEvent.Block block := event.Block
nextHeight := int64(block.Number().Uint64()) nextHeight := int64(block.Number().Uint64())
if nextHeight-lastHeight != 1 { if nextHeight-lastHeight != 1 {
log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight) log.Warn("Received block out-of-order", "next", nextHeight, "last", lastHeight)
} }
defaultStatediffMetrics.lastEventHeight.Update(nextHeight) defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
chainEventFwd <- chainEvent chainEventFwd <- event
case err := <-sub.Err(): case err := <-sub.Err():
log.Error("Error from chain event subscription", "error", err) if err != nil {
close(sds.QuitChan) log.Error("Error from subscription", "error", err)
log.Info("Quitting the statediffing writing loop")
if err := sds.indexer.Close(); err != nil {
log.Error("Error closing indexer", "err", err)
} }
close(sds.QuitChan)
return return
case <-sds.QuitChan: case <-sds.QuitChan:
log.Info("Quitting the statediffing writing loop")
if err := sds.indexer.Close(); err != nil {
log.Error("Error closing indexer", "err", err)
}
return return
} }
} }
@ -326,12 +254,13 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) { func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
// For genesis block we need to return the entire state trie hence we diff it with an empty trie. // For genesis block we need to return the entire state trie hence we diff it with an empty trie.
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId) log.Info("Writing genesis state diff", "number", genesisBlockNumber, "worker", workerId)
writeLoopParams.RLock() sds.writeLoopParams.RLock()
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.Params) defer sds.writeLoopParams.RUnlock()
writeLoopParams.RUnlock()
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, sds.writeLoopParams.Params)
if err != nil { if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height", log.Error("failed to write state diff", "number",
genesisBlockNumber, "error", err.Error(), "worker", workerId) genesisBlockNumber, "error", err.Error(), "worker", workerId)
return return
} }
@ -348,7 +277,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
block := chainEvent.Block block := chainEvent.Block
parent := sds.BlockCache.getParentBlock(block, sds.BlockChain) parent := sds.BlockCache.getParentBlock(block, sds.BlockChain)
if parent == nil { if parent == nil {
log.Error("Parent block is nil, skipping this block", "block height", block.Number()) log.Error("Parent block is nil, skipping this block", "number", block.Number())
continue continue
} }
@ -357,20 +286,20 @@ func (sds *Service) writeLoopWorker(params workerParams) {
sds.writeGenesisStateDiff(parent, params.id) sds.writeGenesisStateDiff(parent, params.id)
} }
log.Info("Writing state diff", "block height", block.Number().Uint64(), "worker", params.id) log.Info("Writing state diff", "number", block.Number().Uint64(), "worker", params.id)
writeLoopParams.RLock() sds.writeLoopParams.RLock()
err := sds.writeStateDiffWithRetry(block, parent.Root(), writeLoopParams.Params) err := sds.writeStateDiffWithRetry(block, parent.Root(), sds.writeLoopParams.Params)
writeLoopParams.RUnlock() sds.writeLoopParams.RUnlock()
if err != nil { if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", log.Error("failed to write state diff",
"block height", block.Number().Uint64(), "number", block.Number().Uint64(),
"block hash", block.Hash().String(), "hash", block.Hash().String(),
"error", err.Error(), "error", err.Error(),
"worker", params.id) "worker", params.id)
continue continue
} }
// TODO: how to handle with concurrent workers // FIXME: reported height will be non-monotonic with concurrent workers
defaultStatediffMetrics.lastStatediffHeight.Update(int64(block.Number().Uint64())) defaultStatediffMetrics.lastStatediffHeight.Update(int64(block.Number().Uint64()))
case <-sds.QuitChan: case <-sds.QuitChan:
log.Info("Quitting the statediff writing process", "worker", params.id) log.Info("Quitting the statediff writing process", "worker", params.id)
@ -379,28 +308,34 @@ func (sds *Service) writeLoopWorker(params workerParams) {
} }
} }
// Loop is the main processing method // PublishLoop processes and publishes statediff payloads to subscribed clients
func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { func (sds *Service) PublishLoop(chainEventCh chan core.ChainEvent) {
log.Info("Starting statediff listening loop") log.Info("Starting statediff publish loop")
log := log.New("context", "statediff publishing")
sub := sds.BlockChain.SubscribeChainEvent(chainEventCh) sub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer sub.Unsubscribe() defer func() {
log.Info("Quitting")
sds.close()
sub.Unsubscribe()
}()
for { for {
select { select {
//Notify chain event channel of events //Notify chain event channel of events
case event := <-chainEventCh: case event := <-chainEventCh:
defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
log.Debug("Loop(): chain event received", "event", event) log.Debug("Chain event received", "event", event)
// if we don't have any subscribers, do not process a statediff // if we don't have any subscribers, do not process a statediff
if atomic.LoadInt32(&sds.subscribers) == 0 { if atomic.LoadInt32(&sds.subscribers) == 0 {
log.Debug("Currently no subscribers to the statediffing service; processing is halted") log.Debug("Currently no subscribers; processing is halted")
continue continue
} }
block := event.Block block := event.Block
parent := sds.BlockCache.getParentBlock(block, sds.BlockChain) parent := sds.BlockCache.getParentBlock(block, sds.BlockChain)
if parent == nil { if parent == nil {
log.Error("Parent block is nil, skipping this block", "block height", block.Number()) log.Error("Parent block is nil, skipping this block", "number", block.Number())
continue continue
} }
@ -411,41 +346,40 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
} }
sds.streamStateDiff(block, parent.Root()) sds.streamStateDiff(block, parent.Root())
case err := <-sub.Err(): case err := <-sub.Err():
log.Error("Error from chain event subscription", "error", err) if err != nil {
log.Error("error from subscription", "error", err)
}
close(sds.QuitChan) close(sds.QuitChan)
log.Info("Quitting the statediffing listening loop")
sds.close()
return return
case <-sds.QuitChan: case <-sds.QuitChan:
log.Info("Quitting the statediffing listening loop")
sds.close()
return return
} }
} }
} }
// streamStateDiff method builds the state diff payload for each subscription according to their subscription type and sends them the result // streamStateDiff builds and delivers diff payloads for each subscription according to their
// subscription type
func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common.Hash) { func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common.Hash) {
sds.subscriptionsMutex.Lock() sds.subscriptionsMutex.Lock()
for ty, subs := range sds.Subscriptions { for ty, subs := range sds.Subscriptions {
params, ok := sds.SubscriptionTypes[ty] params, ok := sds.SubscriptionTypes[ty]
if !ok { if !ok {
log.Error("no parameter set associated with this subscription", "subscription type", ty.String()) log.Error("no parameter set associated with this subscription", "sub.type", ty.String())
sds.closeType(ty) sds.closeType(ty)
continue continue
} }
// create payload for this subscription type // create payload for this subscription type
payload, err := sds.processStateDiff(currentBlock, parentRoot, params) payload, err := sds.processStateDiff(currentBlock, parentRoot, params)
if err != nil { if err != nil {
log.Error("statediff processing error", "block height", currentBlock.Number().Uint64(), "parameters", params, "error", err.Error()) log.Error("statediff processing error", "number", currentBlock.Number().Uint64(), "parameters", params, "error", err.Error())
continue continue
} }
for id, sub := range subs { for id, sub := range subs {
select { select {
case sub.PayloadChan <- *payload: case sub.PayloadChan <- *payload:
log.Debug("sending statediff payload at head", "height", currentBlock.Number(), "subscription id", id) log.Debug("sending statediff payload at head", "height", currentBlock.Number(), "sub.id", id)
default: default:
log.Info("unable to send statediff payload; channel has no receiver", "subscription id", id) log.Info("unable to send statediff payload; channel has no receiver", "sub.id", id)
} }
} }
} }
@ -453,49 +387,44 @@ func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common
} }
// StateDiffAt returns a state diff object payload at the specific blockheight // StateDiffAt returns a state diff object payload at the specific blockheight
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data // This operation cannot be performed back past the point of db pruning; it requires an archival
// node for historical data
func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, error) { func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, error) {
log.Info("sending state diff", "number", blockNumber)
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state diff", "block height", blockNumber) parentRoot := common.Hash{}
if blockNumber != 0 {
// use watched addresses from statediffing write loop if not provided parentRoot = sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()).Root()
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
} }
// compute leaf paths of watched addresses in the params return sds.processStateDiff(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
params.ComputeWatchedAddressesLeafPaths()
if blockNumber == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params)
}
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
return sds.processStateDiff(currentBlock, parentBlock.Root(), params)
} }
// StateDiffFor returns a state diff object payload for the specific blockhash // StateDiffFor returns a state diff object payload for the specific blockhash
// This operation cannot be performed back past the point of db pruning; it requires an archival node for historical data // This operation cannot be performed back past the point of db pruning; it requires an archival
// node for historical data
func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload, error) { func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload, error) {
log.Info("sending state diff", "hash", blockHash)
currentBlock := sds.BlockChain.GetBlockByHash(blockHash) currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
log.Info("sending state diff", "block hash", blockHash) parentRoot := common.Hash{}
if currentBlock.NumberU64() != 0 {
// use watched addresses from statediffing write loop if not provided parentRoot = sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()).Root()
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil { }
writeLoopParams.RLock() return sds.processStateDiff(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses)) }
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock() // use watched addresses from statediffing write loop if not provided
// compute leaf paths of watched addresses in the params
func (sds *Service) maybeReplaceWatchedAddresses(params Params) Params {
if params.WatchedAddresses == nil && sds.writeLoopParams.WatchedAddresses != nil {
sds.writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(sds.writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, sds.writeLoopParams.WatchedAddresses)
sds.writeLoopParams.RUnlock()
} }
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths() params.ComputeWatchedAddressesLeafPaths()
return params
if currentBlock.NumberU64() == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params)
}
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
return sds.processStateDiff(currentBlock, parentBlock.Root(), params)
} }
// processStateDiff method builds the state diff payload from the current block, parent state root, and provided params // processStateDiff method builds the state diff payload from the current block, parent state root, and provided params
@ -515,7 +444,8 @@ func (sds *Service) processStateDiff(currentBlock *types.Block, parentRoot commo
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Info("state diff size", "at block height", currentBlock.Number().Uint64(), "rlp byte size", len(stateDiffRlp)) log.Debug("statediff RLP payload for block",
"number", currentBlock.Number().Uint64(), "byte size", len(stateDiffRlp))
return sds.newPayload(stateDiffRlp, currentBlock, params) return sds.newPayload(stateDiffRlp, currentBlock, params)
} }
@ -545,7 +475,7 @@ func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Pa
} }
// Subscribe is used by the API to subscribe to the service loop // Subscribe is used by the API to subscribe to the service loop
func (sds *Service) Subscribe(sub chan<- Payload, quitChan chan<- bool, params Params) RpcID { func (sds *Service) Subscribe(sub chan<- Payload, quitChan chan<- bool, params Params) SubID {
log.Info("Subscribing to the statediff service") log.Info("Subscribing to the statediff service")
if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) { if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) {
log.Info("State diffing subscription received; beginning statediff processing") log.Info("State diffing subscription received; beginning statediff processing")
@ -561,11 +491,11 @@ func (sds *Service) Subscribe(sub chan<- Payload, quitChan chan<- bool, params P
return 0 return 0
} }
subscriptionType := crypto.Keccak256Hash(by) subscriptionType := crypto.Keccak256Hash(by)
id := RpcID(atomic.AddUint64(&sds.lastRpcID, 1)) id := SubID(atomic.AddUint64(&sds.lastSubID, 1))
// Add subscriber // Add subscriber
sds.subscriptionsMutex.Lock() sds.subscriptionsMutex.Lock()
if sds.Subscriptions[subscriptionType] == nil { if sds.Subscriptions[subscriptionType] == nil {
sds.Subscriptions[subscriptionType] = make(map[RpcID]Subscription) sds.Subscriptions[subscriptionType] = make(map[SubID]Subscription)
} }
sds.Subscriptions[subscriptionType][id] = Subscription{ sds.Subscriptions[subscriptionType][id] = Subscription{
PayloadChan: sub, PayloadChan: sub,
@ -577,8 +507,8 @@ func (sds *Service) Subscribe(sub chan<- Payload, quitChan chan<- bool, params P
} }
// Unsubscribe is used to unsubscribe from the service loop // Unsubscribe is used to unsubscribe from the service loop
func (sds *Service) Unsubscribe(id RpcID) error { func (sds *Service) Unsubscribe(id SubID) error {
log.Info("Unsubscribing from the statediff service", "subscription id", id) log.Info("Unsubscribing from the statediff service", "sub.id", id)
sds.subscriptionsMutex.Lock() sds.subscriptionsMutex.Lock()
for ty := range sds.Subscriptions { for ty := range sds.Subscriptions {
delete(sds.Subscriptions[ty], id) delete(sds.Subscriptions[ty], id)
@ -597,55 +527,42 @@ func (sds *Service) Unsubscribe(id RpcID) error {
return nil return nil
} }
// GetSyncStatus will check the status of geth syncing. // IsSyncing returns true if geth is still syncing, and false if it has caught up to head.
// It will return false if geth has finished syncing. func (sds *Service) IsSyncing() bool {
// It will return a true Geth is still syncing.
func (sds *Service) GetSyncStatus() bool {
progress := sds.BackendAPI.Downloader().Progress() progress := sds.BackendAPI.Downloader().Progress()
return progress.CurrentBlock() < progress.HighestBlock() return progress.CurrentBlock() < progress.HighestBlock()
} }
// WaitingForSync calls GetSyncStatus to check if we have caught up to head. // WaitForSync continuously checks the status of geth syncing, only returning once it has caught
// It will keep looking and checking if we have caught up to head. // up to head.
// It will only complete if we catch up to head, otherwise it will keep looping forever. func (sds *Service) WaitForSync() {
func (sds *Service) WaitingForSync() error {
log.Info("We are going to wait for geth to sync to head!")
// Has the geth node synced to head?
synced := false synced := false
for !synced { for !synced {
syncing := sds.GetSyncStatus() if !sds.IsSyncing() {
if !syncing { log.Debug("Geth has completed syncing")
log.Info("Geth has caught up to the head of the chain")
synced = true synced = true
} else { } else {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
} }
} }
return nil
} }
// Start is used to begin the service // Start is used to begin the service
func (sds *Service) Start() error { func (sds *Service) Start() error {
log.Info("Starting statediff service") log.Info("Starting statediff service")
if sds.WaitForSync { if sds.ShouldWaitForSync {
log.Info("Statediff service will wait until geth has caught up to the head of the chain.") log.Info("Statediff service waiting until geth has caught up to the head of the chain")
err := sds.WaitingForSync() sds.WaitForSync()
if err != nil {
return err
}
log.Info("Continuing with startdiff start process")
} }
chainEventCh := make(chan core.ChainEvent, chainEventChanSize) chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.Loop(chainEventCh) go sds.PublishLoop(chainEventCh)
if sds.enableWriteLoop { if sds.enableWriteLoop {
log.Info("Starting statediff DB write loop", "params", writeLoopParams.Params) log.Debug("Starting statediff DB write loop", "params", sds.writeLoopParams.Params)
chainEventCh := make(chan core.ChainEvent, chainEventChanSize) chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.WriteLoop(chainEventCh) go sds.WriteLoop(chainEventCh)
} }
return nil return nil
} }
@ -653,7 +570,13 @@ func (sds *Service) Start() error {
func (sds *Service) Stop() error { func (sds *Service) Stop() error {
log.Info("Stopping statediff service") log.Info("Stopping statediff service")
close(sds.QuitChan) close(sds.QuitChan)
return nil var err error
if sds.indexer != nil {
if err = sds.indexer.Close(); err != nil {
log.Error("Error closing indexer", "error", err)
}
}
return err
} }
// close is used to close all listening subscriptions // close is used to close all listening subscriptions
@ -663,9 +586,9 @@ func (sds *Service) close() {
for id, sub := range subs { for id, sub := range subs {
select { select {
case sub.QuitChan <- true: case sub.QuitChan <- true:
log.Info("closing subscription", "id", id) log.Info("closing subscription", "sub.id", id)
default: default:
log.Info("unable to close subscription; channel has no receiver", "subscription id", id) log.Info("unable to close subscription; channel has no receiver", "sub.id", id)
} }
delete(sds.Subscriptions[ty], id) delete(sds.Subscriptions[ty], id)
} }
@ -676,7 +599,7 @@ func (sds *Service) close() {
} }
// closeType is used to close all subscriptions of given type // closeType is used to close all subscriptions of given type
// closeType needs to be called with subscription access locked // NOTE: this needs to be called with subscription access locked
func (sds *Service) closeType(subType common.Hash) { func (sds *Service) closeType(subType common.Hash) {
subs := sds.Subscriptions[subType] subs := sds.Subscriptions[subType]
for id, sub := range subs { for id, sub := range subs {
@ -686,12 +609,12 @@ func (sds *Service) closeType(subType common.Hash) {
delete(sds.SubscriptionTypes, subType) delete(sds.SubscriptionTypes, subType)
} }
func sendNonBlockingQuit(id RpcID, sub Subscription) { func sendNonBlockingQuit(id SubID, sub Subscription) {
select { select {
case sub.QuitChan <- true: case sub.QuitChan <- true:
log.Info("closing subscription", "id", id) log.Info("closing subscription", "sub.id", id)
default: default:
log.Info("unable to close subscription; channel has no receiver", "subscription id", id) log.Info("unable to close subscription; channel has no receiver", "sub.id", id)
} }
} }
@ -711,7 +634,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) JobID {
go func() { go func() {
err := sds.writeStateDiffAt(blockNumber, params) err := sds.writeStateDiffAt(blockNumber, params)
if err != nil { if err != nil {
log.Error("error from writeStateDiffAt", "error", err) log.Error("failed to write state diff", "error", err)
} }
sds.currentJobsMutex.Lock() sds.currentJobsMutex.Lock()
delete(sds.currentJobs, blockNumber) delete(sds.currentJobs, blockNumber)
@ -727,17 +650,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) JobID {
} }
func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error { func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error {
log.Info("writing state diff at", "block height", blockNumber) log.Info("Writing state diff at", "number", blockNumber)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
parentRoot := common.Hash{} parentRoot := common.Hash{}
@ -745,24 +658,14 @@ func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error {
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()) parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root() parentRoot = parentBlock.Root()
} }
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
} }
// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database // WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error { func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
log.Info("writing state diff for", "block hash", blockHash) log.Info("Writing state diff for", "hash", blockHash)
// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()
currentBlock := sds.BlockChain.GetBlockByHash(blockHash) currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
parentRoot := common.Hash{} parentRoot := common.Hash{}
@ -770,7 +673,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash()) parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root() parentRoot = parentBlock.Root()
} }
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params) return sds.writeStateDiffWithRetry(currentBlock, parentRoot, sds.maybeReplaceWatchedAddresses(params))
} }
// Writes a state diff from the current block, parent state root, and provided params // Writes a state diff from the current block, parent state root, and provided params
@ -781,6 +684,10 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
var tx interfaces.Batch var tx interfaces.Batch
start, logger := countStateDiffBegin(block) start, logger := countStateDiffBegin(block)
defer countStateDiffEnd(start, logger, err) defer countStateDiffEnd(start, logger, err)
if sds.indexer == nil {
return fmt.Errorf("indexer is not set; cannot write indexed diffs")
}
if params.IncludeTD { if params.IncludeTD {
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64()) totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
} }
@ -796,7 +703,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
defer func() { defer func() {
// This is very noisy so we log at Trace. // This is very noisy so we log at Trace.
since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer) since := metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
logger.Trace(fmt.Sprintf("statediff output duration=%dms", since.Milliseconds())) logger.Trace("statediff output", "duration", since.Milliseconds())
}() }()
return sds.indexer.PushStateNode(tx, node, block.Hash().String()) return sds.indexer.PushStateNode(tx, node, block.Hash().String())
} }
@ -811,12 +718,14 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
BlockHash: block.Hash(), BlockHash: block.Hash(),
BlockNumber: block.Number(), BlockNumber: block.Number(),
}, params, output, ipldOutput) }, params, output, ipldOutput)
// TODO this anti-pattern needs to be sorted out eventually // TODO this anti-pattern needs to be sorted out eventually
if err := tx.Submit(err); err != nil { if err := tx.Submit(err); err != nil {
return fmt.Errorf("batch transaction submission failed: %w", err) return fmt.Errorf("batch transaction submission failed: %w", err)
} }
// allow dereferencing of parent, keep current locked as it should be the next parent // allow dereferencing of parent, keep current locked as it should be the next parent
// TODO never locked
// sds.BlockChain.UnlockTrie(parentRoot) // sds.BlockChain.UnlockTrie(parentRoot)
return nil return nil
} }
@ -826,10 +735,10 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
var err error var err error
for i := uint(0); i < sds.maxRetry; i++ { for i := uint(0); i < sds.maxRetry; i++ {
err = sds.writeStateDiff(block, parentRoot, params) err = sds.writeStateDiff(block, parentRoot, params)
if err != nil && strings.Contains(err.Error(), deadlockDetected) { if err != nil && strings.Contains(err.Error(), pgDeadlockDetected) {
// Retry only when the deadlock is detected. // Retry only when the deadlock is detected.
if i+1 < sds.maxRetry { if i+1 < sds.maxRetry {
log.Warn("dead lock detected while writing statediff", "err", err, "retry number", i) log.Warn("dead lock detected while writing statediff", "error", err, "retry number", i)
} }
continue continue
} }
@ -839,35 +748,32 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
} }
// SubscribeWriteStatus is used by the API to subscribe to the job status updates // SubscribeWriteStatus is used by the API to subscribe to the job status updates
func (sds *Service) SubscribeWriteStatus(sub chan<- JobStatus, quitChan chan<- bool) RpcID { func (sds *Service) SubscribeWriteStatus(sub chan<- JobStatus) SubID {
id := RpcID(atomic.AddUint64(&sds.lastRpcID, 1)) id := SubID(atomic.AddUint64(&sds.lastSubID, 1))
log.Info("Subscribing to job status updates", "subscription id", id) log.Info("Subscribing to job status updates", "sub.id", id)
sds.jobStatusSubsMutex.Lock() sds.jobStatusSubsMutex.Lock()
sds.jobStatusSubs[id] = jobStatusSubscription{ sds.jobStatusSubs[id] = jobStatusSubscription{
statusChan: sub, statusChan: sub,
quitChan: quitChan,
} }
sds.jobStatusSubsMutex.Unlock() sds.jobStatusSubsMutex.Unlock()
return id return id
} }
// UnsubscribeWriteStatus is used to unsubscribe from job status updates // UnsubscribeWriteStatus is used to unsubscribe from job status updates
func (sds *Service) UnsubscribeWriteStatus(id RpcID) error { func (sds *Service) UnsubscribeWriteStatus(id SubID) {
log.Info("Unsubscribing from job status updates", "subscription id", id) log.Info("Unsubscribing from job status updates", "sub.id", id)
sds.jobStatusSubsMutex.Lock() sds.jobStatusSubsMutex.Lock()
close(sds.jobStatusSubs[id].quitChan)
delete(sds.jobStatusSubs, id) delete(sds.jobStatusSubs, id)
sds.jobStatusSubsMutex.Unlock() sds.jobStatusSubsMutex.Unlock()
return nil
} }
// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height // StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height
func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) { func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) {
current := sds.BlockChain.GetBlockByNumber(blockNumber) current := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending code and codehash", "block height", blockNumber) log.Info("sending code and codehash", "number", blockNumber)
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root()) currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
if err != nil { if err != nil {
log.Error("error creating trie for block", "block height", current.Number(), "err", err) log.Error("error getting trie for block", "number", current.Number(), "error", err)
close(quitChan) close(quitChan)
return return
} }
@ -882,13 +788,13 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
} }
account := new(types.StateAccount) account := new(types.StateAccount)
if err := rlp.DecodeBytes(leafIt.Value, account); err != nil { if err := rlp.DecodeBytes(leafIt.Value, account); err != nil {
log.Error("error decoding state account", "err", err) log.Error("error decoding state account", "error", err)
return return
} }
codeHash := common.BytesToHash(account.CodeHash) codeHash := common.BytesToHash(account.CodeHash)
code, err := sds.BlockChain.StateCache().ContractCode(codeHash) code, err := sds.BlockChain.StateCache().ContractCode(codeHash)
if err != nil { if err != nil {
log.Error("error collecting contract code", "err", err) log.Error("error collecting contract code", "error", err)
return return
} }
outChan <- types2.CodeAndCodeHash{ outChan <- types2.CodeAndCodeHash{
@ -899,12 +805,12 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
}() }()
} }
// WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db: // WatchAddress performs one of following operations on the watched addresses in sds.writeLoopParams and the db:
// add | remove | set | clear // add | remove | set | clear
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error { func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {
// lock writeLoopParams for a write sds.writeLoopParams.Lock()
writeLoopParams.Lock() log.Debug("WatchAddress: locked sds.writeLoopParams")
defer writeLoopParams.Unlock() defer sds.writeLoopParams.Unlock()
// get the current block number // get the current block number
currentBlockNumber := sds.BlockChain.CurrentBlock().Number currentBlockNumber := sds.BlockChain.CurrentBlock().Number
@ -913,20 +819,20 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
case types2.Add: case types2.Add:
// filter out args having an already watched address with a warning // filter out args having an already watched address with a warning
filteredArgs, ok := funk.Filter(args, func(arg types2.WatchAddressArg) bool { filteredArgs, ok := funk.Filter(args, func(arg types2.WatchAddressArg) bool {
if funk.Contains(writeLoopParams.WatchedAddresses, plugeth.HexToAddress(arg.Address)) { if funk.Contains(sds.writeLoopParams.WatchedAddresses, plugeth.HexToAddress(arg.Address)) {
log.Warn("Address already being watched", "address", arg.Address) log.Warn("Address already being watched", "address", arg.Address)
return false return false
} }
return true return true
}).([]types2.WatchAddressArg) }).([]types2.WatchAddressArg)
if !ok { if !ok {
return fmt.Errorf("add: filtered args %s", typeAssertionFailed) return fmt.Errorf("add: filtered args %w", errTypeAssertionFailed)
} }
// get addresses from the filtered args // get addresses from the filtered args
filteredAddresses, err := MapWatchAddressArgsToAddresses(filteredArgs) filteredAddresses, err := MapWatchAddressArgsToAddresses(filteredArgs)
if err != nil { if err != nil {
return fmt.Errorf("add: filtered addresses %s", err.Error()) return fmt.Errorf("add: filtered addresses %w", err)
} }
// update the db // update the db
@ -938,19 +844,19 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...) sds.writeLoopParams.WatchedAddresses = append(sds.writeLoopParams.WatchedAddresses, filteredAddresses...)
writeLoopParams.ComputeWatchedAddressesLeafPaths() sds.writeLoopParams.ComputeWatchedAddressesLeafPaths()
case types2.Remove: case types2.Remove:
// get addresses from args // get addresses from args
argAddresses, err := MapWatchAddressArgsToAddresses(args) argAddresses, err := MapWatchAddressArgsToAddresses(args)
if err != nil { if err != nil {
return fmt.Errorf("remove: mapped addresses %s", err.Error()) return fmt.Errorf("remove: mapped addresses %w", err)
} }
// remove the provided addresses from currently watched addresses // remove the provided addresses from currently watched addresses
addresses, ok := funk.Subtract(writeLoopParams.WatchedAddresses, argAddresses).([]common.Address) addresses, ok := funk.Subtract(sds.writeLoopParams.WatchedAddresses, argAddresses).([]common.Address)
if !ok { if !ok {
return fmt.Errorf("remove: filtered addresses %s", typeAssertionFailed) return fmt.Errorf("remove: filtered addresses %w", errTypeAssertionFailed)
} }
// update the db // update the db
@ -962,13 +868,13 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = addresses sds.writeLoopParams.WatchedAddresses = addresses
writeLoopParams.ComputeWatchedAddressesLeafPaths() sds.writeLoopParams.ComputeWatchedAddressesLeafPaths()
case types2.Set: case types2.Set:
// get addresses from args // get addresses from args
argAddresses, err := MapWatchAddressArgsToAddresses(args) argAddresses, err := MapWatchAddressArgsToAddresses(args)
if err != nil { if err != nil {
return fmt.Errorf("set: mapped addresses %s", err.Error()) return fmt.Errorf("set: mapped addresses %w", err)
} }
// update the db // update the db
@ -980,8 +886,8 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = argAddresses sds.writeLoopParams.WatchedAddresses = argAddresses
writeLoopParams.ComputeWatchedAddressesLeafPaths() sds.writeLoopParams.ComputeWatchedAddressesLeafPaths()
case types2.Clear: case types2.Clear:
// update the db // update the db
if sds.indexer != nil { if sds.indexer != nil {
@ -992,29 +898,27 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
} }
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = []common.Address{} sds.writeLoopParams.WatchedAddresses = []common.Address{}
writeLoopParams.ComputeWatchedAddressesLeafPaths() sds.writeLoopParams.ComputeWatchedAddressesLeafPaths()
default: default:
return fmt.Errorf("%s %s", unexpectedOperation, operation) return fmt.Errorf("%w: %v", errUnexpectedOperation, operation)
} }
return nil return nil
} }
// loadWatchedAddresses loads watched addresses to in-memory write loop params // loadWatchedAddresses loads watched addresses from an indexer to params
func loadWatchedAddresses(indexer interfaces.StateDiffIndexer) error { func loadWatchedAddresses(indexer interfaces.StateDiffIndexer, params *ParamsWithMutex) error {
watchedAddresses, err := indexer.LoadWatchedAddresses() watchedAddresses, err := indexer.LoadWatchedAddresses()
if err != nil { if err != nil {
return err return err
} }
params.Lock()
defer params.Unlock()
writeLoopParams.Lock() params.WatchedAddresses = watchedAddresses
defer writeLoopParams.Unlock() params.ComputeWatchedAddressesLeafPaths()
writeLoopParams.WatchedAddresses = watchedAddresses
writeLoopParams.ComputeWatchedAddressesLeafPaths()
return nil return nil
} }
@ -1024,7 +928,7 @@ func MapWatchAddressArgsToAddresses(args []types2.WatchAddressArg) ([]common.Add
return common.HexToAddress(arg.Address) return common.HexToAddress(arg.Address)
}).([]common.Address) }).([]common.Address)
if !ok { if !ok {
return nil, fmt.Errorf(typeAssertionFailed) return nil, errTypeAssertionFailed
} }
return addresses, nil return addresses, nil

View File

@ -17,12 +17,10 @@
package statediff_test package statediff_test
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"math/big" "math/big"
"math/rand" "math/rand"
"reflect"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -33,19 +31,25 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
// "github.com/ethereum/go-ethereum/rpc" geth_log "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
// plugeth "github.com/openrelayxyz/plugeth-utils/core"
"github.com/openrelayxyz/plugeth-utils/restricted/rlp"
"github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/test_helpers/mocks" "github.com/cerc-io/plugeth-statediff/test_helpers/mocks"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
// "github.com/cerc-io/plugeth-statediff/utils/log"
) )
func init() {
// The geth sync logs are noisy, silence them
geth_log.Root().SetHandler(geth_log.DiscardHandler())
// log.TestLogger.SetLevel(2)
}
func TestServiceLoop(t *testing.T) { func TestServiceLoop(t *testing.T) {
testErrorInChainEventLoop(t) t.Run("error in chain event loop", testErrorInChainEventLoop)
testErrorInBlockLoop(t) t.Run("error in block loop", testErrorInBlockLoop)
} }
var ( var (
@ -104,13 +108,14 @@ func testErrorInChainEventLoop(t *testing.T) {
Builder: &builder, Builder: &builder,
BlockChain: &blockChain, BlockChain: &blockChain,
QuitChan: serviceQuit, QuitChan: serviceQuit,
Subscriptions: make(map[common.Hash]map[statediff.RpcID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[statediff.SubID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1), BlockCache: statediff.NewBlockCache(1),
} }
payloadChan := make(chan statediff.Payload, 2) payloadChan := make(chan statediff.Payload, 2)
quitChan := make(chan bool) quitChan := make(chan bool)
service.Subscribe(payloadChan, quitChan, defaultParams) service.Subscribe(payloadChan, quitChan, defaultParams)
// FIXME why is this here?
testRoot2 = common.HexToHash("0xTestRoot2") testRoot2 = common.HexToHash("0xTestRoot2")
blockMapping := make(map[common.Hash]*types.Block) blockMapping := make(map[common.Hash]*types.Block)
blockMapping[parentBlock1.Hash()] = parentBlock1 blockMapping[parentBlock1.Hash()] = parentBlock1
@ -133,12 +138,9 @@ func testErrorInChainEventLoop(t *testing.T) {
} }
wg.Done() wg.Done()
}() }()
service.Loop(eventsChannel) service.PublishLoop(eventsChannel)
wg.Wait() wg.Wait()
if len(payloads) != 2 { require.Equal(t, 2, len(payloads), "number of payloads")
t.Error("Test failure:", t.Name())
t.Logf("Actual number of payloads does not equal expected.\nactual: %+v\nexpected: 3", len(payloads))
}
testReceipts1Rlp, err := rlp.EncodeToBytes(&testReceipts1) testReceipts1Rlp, err := rlp.EncodeToBytes(&testReceipts1)
if err != nil { if err != nil {
@ -150,34 +152,16 @@ func testErrorInChainEventLoop(t *testing.T) {
} }
expectedReceiptsRlp := [][]byte{testReceipts1Rlp, testReceipts2Rlp, nil} expectedReceiptsRlp := [][]byte{testReceipts1Rlp, testReceipts2Rlp, nil}
for i, payload := range payloads { for i, payload := range payloads {
if !bytes.Equal(payload.ReceiptsRlp, expectedReceiptsRlp[i]) { require.Equal(t, expectedReceiptsRlp[i], payload.ReceiptsRlp, "payload %d", i)
t.Error("Test failure:", t.Name())
t.Logf("Actual receipt rlp for payload %d does not equal expected.\nactual: %+v\nexpected: %+v", i, payload.ReceiptsRlp, expectedReceiptsRlp[i])
}
} }
if !reflect.DeepEqual(builder.Params, defaultParams) { require.Equal(t, builder.Params, defaultParams)
t.Error("Test failure:", t.Name()) require.Equal(t, testBlock2.Hash(), builder.Args.BlockHash)
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) require.Equal(t, parentBlock2.Root(), builder.Args.OldStateRoot)
} require.Equal(t, testBlock2.Root(), builder.Args.NewStateRoot)
if !bytes.Equal(builder.Args.BlockHash.Bytes(), testBlock2.Hash().Bytes()) {
t.Error("Test failure:", t.Name())
t.Logf("Actual blockhash does not equal expected.\nactual:%x\nexpected: %x", builder.Args.BlockHash.Bytes(), testBlock2.Hash().Bytes())
}
if !bytes.Equal(builder.Args.OldStateRoot.Bytes(), parentBlock2.Root().Bytes()) {
t.Error("Test failure:", t.Name())
t.Logf("Actual root does not equal expected.\nactual:%x\nexpected: %x", builder.Args.OldStateRoot.Bytes(), parentBlock2.Root().Bytes())
}
if !bytes.Equal(builder.Args.NewStateRoot.Bytes(), testBlock2.Root().Bytes()) {
t.Error("Test failure:", t.Name())
t.Logf("Actual root does not equal expected.\nactual:%x\nexpected: %x", builder.Args.NewStateRoot.Bytes(), testBlock2.Root().Bytes())
}
//look up the parent block from its hash //look up the parent block from its hash
expectedHashes := []common.Hash{testBlock1.ParentHash(), testBlock2.ParentHash()} expectedHashes := []common.Hash{testBlock1.ParentHash(), testBlock2.ParentHash()}
if !reflect.DeepEqual(blockChain.HashesLookedUp, expectedHashes) { require.Equal(t, expectedHashes, blockChain.HashesLookedUp)
t.Error("Test failure:", t.Name())
t.Logf("Actual looked up parent hashes does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.HashesLookedUp, expectedHashes)
}
} }
func testErrorInBlockLoop(t *testing.T) { func testErrorInBlockLoop(t *testing.T) {
@ -188,7 +172,7 @@ func testErrorInBlockLoop(t *testing.T) {
Builder: &builder, Builder: &builder,
BlockChain: &blockChain, BlockChain: &blockChain,
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[statediff.RpcID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[statediff.SubID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1), BlockCache: statediff.NewBlockCache(1),
} }
@ -206,24 +190,12 @@ func testErrorInBlockLoop(t *testing.T) {
case <-quitChan: case <-quitChan:
} }
}() }()
service.Loop(eventsChannel) service.PublishLoop(eventsChannel)
if !reflect.DeepEqual(builder.Params, defaultParams) { require.Equal(t, defaultParams, builder.Params)
t.Error("Test failure:", t.Name()) require.Equal(t, testBlock1.Hash(), builder.Args.BlockHash)
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot)
} require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot)
if !bytes.Equal(builder.Args.BlockHash.Bytes(), testBlock1.Hash().Bytes()) {
t.Error("Test failure:", t.Name())
t.Logf("Actual blockhash does not equal expected.\nactual:%+v\nexpected: %x", builder.Args.BlockHash.Bytes(), testBlock1.Hash().Bytes())
}
if !bytes.Equal(builder.Args.OldStateRoot.Bytes(), parentBlock1.Root().Bytes()) {
t.Error("Test failure:", t.Name())
t.Logf("Actual old state root does not equal expected.\nactual:%+v\nexpected: %x", builder.Args.OldStateRoot.Bytes(), parentBlock1.Root().Bytes())
}
if !bytes.Equal(builder.Args.NewStateRoot.Bytes(), testBlock1.Root().Bytes()) {
t.Error("Test failure:", t.Name())
t.Logf("Actual new state root does not equal expected.\nactual:%+v\nexpected: %x", builder.Args.NewStateRoot.Bytes(), testBlock1.Root().Bytes())
}
} }
func TestGetStateDiffAt(t *testing.T) { func TestGetStateDiffAt(t *testing.T) {
@ -264,7 +236,7 @@ func TestGetStateDiffAt(t *testing.T) {
Builder: &builder, Builder: &builder,
BlockChain: &blockChain, BlockChain: &blockChain,
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[statediff.RpcID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[statediff.SubID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1), BlockCache: statediff.NewBlockCache(1),
} }
@ -277,26 +249,11 @@ func TestGetStateDiffAt(t *testing.T) {
t.Error(err) t.Error(err)
} }
if !reflect.DeepEqual(builder.Params, defaultParams) { require.Equal(t, defaultParams, builder.Params)
t.Error("Test failure:", t.Name()) require.Equal(t, testBlock1.Hash(), builder.Args.BlockHash)
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot)
} require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot)
if !bytes.Equal(builder.Args.BlockHash.Bytes(), testBlock1.Hash().Bytes()) { require.Equal(t, stateDiffPayloadRlp, expectedStateDiffPayloadRlp)
t.Error("Test failure:", t.Name())
t.Logf("Actual blockhash does not equal expected.\nactual:%+v\nexpected: %x", builder.Args.BlockHash.Bytes(), testBlock1.Hash().Bytes())
}
if !bytes.Equal(builder.Args.OldStateRoot.Bytes(), parentBlock1.Root().Bytes()) {
t.Error("Test failure:", t.Name())
t.Logf("Actual old state root does not equal expected.\nactual:%+v\nexpected: %x", builder.Args.OldStateRoot.Bytes(), parentBlock1.Root().Bytes())
}
if !bytes.Equal(builder.Args.NewStateRoot.Bytes(), testBlock1.Root().Bytes()) {
t.Error("Test failure:", t.Name())
t.Logf("Actual new state root does not equal expected.\nactual:%+v\nexpected: %x", builder.Args.NewStateRoot.Bytes(), testBlock1.Root().Bytes())
}
if !bytes.Equal(expectedStateDiffPayloadRlp, stateDiffPayloadRlp) {
t.Error("Test failure:", t.Name())
t.Logf("Actual state diff payload does not equal expected.\nactual:%+v\nexpected: %+v", expectedStateDiffPayload, stateDiffPayload)
}
} }
type writeSub struct { type writeSub struct {
@ -338,7 +295,8 @@ func TestWriteStateDiffAt(t *testing.T) {
blockChain.SetBlockForNumber(testBlock1, testBlock1.NumberU64()) blockChain.SetBlockForNumber(testBlock1, testBlock1.NumberU64())
blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1) blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1)
service := statediff.NewService(&blockChain, statediff.Config{}, &mocks.Backend{}, &indexer) service, err := statediff.NewService(statediff.Config{}, &blockChain, &mocks.Backend{}, &indexer)
require.NoError(t, err)
service.Builder = &builder service.Builder = &builder
api := statediff.NewPublicAPI(service) api := statediff.NewPublicAPI(service)
@ -359,11 +317,8 @@ func TestWriteStateDiffAt(t *testing.T) {
require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot) require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot)
require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot) require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot)
// unsubscribe and verify we get nothing // verify we get nothing after unsubscribing
// TODO - StreamWrites receives EOF error after unsubscribing. Doesn't seem to impact
// anything but would be good to know why.
ws.unsubscribe() ws.unsubscribe()
job = service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams) job = service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams)
ok, _ = ws.awaitStatus(job, jobTimeout) ok, _ = ws.awaitStatus(job, jobTimeout)
require.False(t, ok) require.False(t, ok)
@ -377,11 +332,6 @@ func TestWriteStateDiffAt(t *testing.T) {
require.True(t, ok) require.True(t, ok)
} }
func TestWaitForSync(t *testing.T) {
testWaitForSync(t)
testGetSyncStatus(t)
}
// This function will create a backend and service object which includes a generic Backend // This function will create a backend and service object which includes a generic Backend
func createServiceWithMockBackend(t *testing.T, curBlock uint64, highestBlock uint64) (*mocks.Backend, *statediff.Service) { func createServiceWithMockBackend(t *testing.T, curBlock uint64, highestBlock uint64) (*mocks.Backend, *statediff.Service) {
builder := mocks.Builder{} builder := mocks.Builder{}
@ -408,105 +358,48 @@ func createServiceWithMockBackend(t *testing.T, curBlock uint64, highestBlock ui
Builder: &builder, Builder: &builder,
BlockChain: &blockChain, BlockChain: &blockChain,
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[statediff.RpcID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[statediff.SubID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1), BlockCache: statediff.NewBlockCache(1),
BackendAPI: backend, BackendAPI: backend,
WaitForSync: true, ShouldWaitForSync: true,
} }
return backend, service return backend, service
} }
// This function will test to make sure that the state diff waits // TestWaitForSync ensures that the service waits until the blockchain has caught up to head
// until the blockchain has caught up to head! func TestWaitForSync(t *testing.T) {
func testWaitForSync(t *testing.T) { // Trivial case
t.Log("Starting Sync")
_, service := createServiceWithMockBackend(t, 10, 10) _, service := createServiceWithMockBackend(t, 10, 10)
err := service.WaitingForSync() service.WaitForSync()
if err != nil {
t.Fatal("Sync Failed")
}
t.Log("Sync Complete")
}
// This test will run the WaitForSync() at the start of the execusion // Catching-up case
// It will then incrementally increase the currentBlock to match the highestBlock
// At each interval it will run the GetSyncStatus to ensure that the return value is not false.
// It will also check to make sure that the WaitForSync() function has not completed!
func testGetSyncStatus(t *testing.T) {
t.Log("Starting Get Sync Status Test")
var highestBlock uint64 = 5 var highestBlock uint64 = 5
// Create a backend and a service // Create a service and a backend that is lagging behind the sync.
// the backend is lagging behind the sync.
backend, service := createServiceWithMockBackend(t, 0, highestBlock) backend, service := createServiceWithMockBackend(t, 0, highestBlock)
syncComplete := make(chan int, 1)
checkSyncComplete := make(chan int, 1)
go func() { go func() {
// Start the sync function which will wait for the sync service.WaitForSync()
// Once the sync is complete add a value to the checkSyncComplet channel syncComplete <- 0
t.Log("Starting Sync")
err := service.WaitingForSync()
if err != nil {
t.Error("Sync Failed")
checkSyncComplete <- 1
}
t.Log("We have finally synced!")
checkSyncComplete <- 0
}() }()
tables := []struct { // Iterate blocks, updating the current synced block
currentBlock uint64 for currentBlock := uint64(0); currentBlock <= highestBlock; currentBlock++ {
highestBlock uint64 backend.SetCurrentBlock(currentBlock)
}{ if currentBlock < highestBlock {
{1, highestBlock}, // Ensure we are still waiting if we haven't actually reached head
{2, highestBlock}, require.Equal(t, len(syncComplete), 0)
{3, highestBlock}, }
{4, highestBlock},
{5, highestBlock},
} }
time.Sleep(2 * time.Second) timeout := time.After(time.Second)
for _, table := range tables { for {
// Iterate over each block select {
// Once the highest block reaches the current block the sync should complete case <-syncComplete:
return
// Update the backend current block value case <-timeout:
t.Log("Updating Current Block to: ", table.currentBlock) t.Fatal("timed out waiting for sync to complete")
backend.SetCurrentBlock(table.currentBlock)
syncStatus := service.GetSyncStatus()
time.Sleep(2 * time.Second)
// Make sure if syncStatus is false that WaitForSync has completed!
if !syncStatus && len(checkSyncComplete) == 0 {
t.Error("Sync is complete but WaitForSync is not")
}
if syncStatus && len(checkSyncComplete) == 1 {
t.Error("Sync is not complete but WaitForSync is")
}
// Make sure sync hasn't completed and that the checkSyncComplete channel is empty
if syncStatus && len(checkSyncComplete) == 0 {
continue
}
// This code will only be run if the sync is complete and the WaitForSync function is complete
// If syncstatus is complete, make sure that the blocks match
if !syncStatus && table.currentBlock != table.highestBlock {
t.Errorf("syncStatus indicated sync was complete even when current block, %d, and highest block %d aren't equal",
table.currentBlock, table.highestBlock)
}
// Make sure that WaitForSync completed once the current block caught up to head!
checkSyncCompleteVal := <-checkSyncComplete
if checkSyncCompleteVal != 0 {
t.Errorf("syncStatus indicated sync was complete but the checkSyncComplete has a value of %d",
checkSyncCompleteVal)
} else {
t.Log("Test Passed!")
} }
} }
} }

View File

@ -1,83 +0,0 @@
package statediff
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
// "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/trie"
plugeth "github.com/openrelayxyz/plugeth-utils/core"
// plugeth_trie "github.com/openrelayxyz/plugeth-utils/restricted/trie"
"github.com/cerc-io/plugeth-statediff/adapt"
)
// Exposes a minimal interface for state access for diff building
type StateView interface {
OpenTrie(root common.Hash) (StateTrie, error)
ContractCode(codeHash common.Hash) ([]byte, error)
}
// StateTrie is an interface exposing only the necessary methods from state.Trie
type StateTrie interface {
GetKey([]byte) []byte
// GetAccount(common.Address) (*types.StateAccount, error)
// Hash() common.Hash
NodeIterator([]byte) trie.NodeIterator
// Prove(key []byte, fromLevel uint, proofDb KeyValueWriter) error
}
// exposes a StateView from a combination of plugeth's core Backend and cached contract code
type plugethStateView struct {
b plugeth.Backend
code map[common.Hash][]byte
}
var _ StateView = &plugethStateView{}
func (p *plugethStateView) OpenTrie(root common.Hash) (StateTrie, error) {
t, err := p.b.GetTrie(plugeth.Hash(root))
if err != nil {
return nil, err
}
return adaptTrie{t}, nil
}
func (p *plugethStateView) ContractCode(hash common.Hash) ([]byte, error) {
return p.code[hash], nil
}
// adapts a state.Database to StateView - used in tests
type stateDatabaseView struct {
db state.Database
}
var _ StateView = stateDatabaseView{}
func StateDatabaseView(db state.Database) StateView {
return stateDatabaseView{db}
}
func (a stateDatabaseView) OpenTrie(root common.Hash) (StateTrie, error) {
// return adaptTrie{a.db.OpenTrie(common.Hash(root))}
return a.db.OpenTrie(common.Hash(root))
}
func (a stateDatabaseView) ContractCode(hash common.Hash) ([]byte, error) {
return a.db.ContractCode(common.Hash{}, hash)
}
// adapts geth Trie to plugeth
type adaptTrie struct {
plugeth.Trie
}
var _ StateTrie = adaptTrie{}
// func (a adaptTrie) GetAccount(addr *types.StateAccount) (*plugeth.StateAccount, error) {
// return adapt.StateAccount(a.Trie.GetAccount(addr))
// }
func (a adaptTrie) NodeIterator(start []byte) trie.NodeIterator {
return adapt.NodeIterator(a.Trie.NodeIterator(start))
}

View File

@ -28,12 +28,12 @@ func NewBackend(t *testing.T, progress ethereum.SyncProgress) *Backend {
MockBackend: NewMockBackend(ctl), MockBackend: NewMockBackend(ctl),
downloader: dler, downloader: dler,
} }
ret.EXPECT().Downloader().Return(&dler).AnyTimes() ret.EXPECT().Downloader().Return(&ret.downloader).AnyTimes()
return ret return ret
} }
func (b *Backend) SetCurrentBlock(block uint64) { func (b *Backend) SetCurrentBlock(block uint64) {
b.downloader.SyncProgress.StartingBlock = block b.downloader.SyncProgress.CurrentBlock = block
} }
func (d Downloader) Progress() plugeth.Progress { func (d Downloader) Progress() plugeth.Progress {
@ -67,8 +67,8 @@ func TestBackend(t *testing.T) {
} }
b.SetCurrentBlock(420) b.SetCurrentBlock(420)
block = b.Downloader().Progress().StartingBlock() block = b.Downloader().Progress().CurrentBlock()
if 420 != block { if 420 != block {
t.Fatalf("wrong StartingBlock; expected %d, got %d", 420, block) t.Fatalf("wrong CurrentBlock; expected %d, got %d", 420, block)
} }
} }

View File

@ -21,12 +21,11 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/cerc-io/plugeth-statediff"
) )
// BlockChain is a mock blockchain for testing // BlockChain is a mock blockchain for testing
@ -68,7 +67,7 @@ func (bc *BlockChain) SetChainEvents(chainEvents []core.ChainEvent) {
// SubscribeChainEvent mock method // SubscribeChainEvent mock method
func (bc *BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { func (bc *BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
subErr := errors.New("subscription error") subErr := errors.New("mock subscription error")
var eventCounter int var eventCounter int
subscription := event.NewSubscription(func(quit <-chan struct{}) error { subscription := event.NewSubscription(func(quit <-chan struct{}) error {
@ -153,6 +152,6 @@ func (bc *BlockChain) SetTd(hash common.Hash, blockNum uint64, td *big.Int) {
// func (bc *BlockChain) UnlockTrie(root core.Hash) {} // func (bc *BlockChain) UnlockTrie(root core.Hash) {}
// TODO // TODO
func (bc *BlockChain) StateCache() statediff.StateView { func (bc *BlockChain) StateCache() adapt.StateView {
return nil return nil
} }

View File

@ -17,9 +17,8 @@
package mocks package mocks
import ( import (
"github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/core/types"
) )
var _ statediff.Builder = &Builder{} var _ statediff.Builder = &Builder{}
@ -29,8 +28,6 @@ type Builder struct {
Args statediff.Args Args statediff.Args
Params statediff.Params Params statediff.Params
stateDiff sdtypes.StateObject stateDiff sdtypes.StateObject
block *types.Block
stateTrie sdtypes.StateObject
builderError error builderError error
} }
@ -50,19 +47,12 @@ func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statedi
return builder.builderError return builder.builderError
} }
// BuildStateTrieObject mock method
func (builder *Builder) BuildStateTrieObject(block *types.Block) (sdtypes.StateObject, error) {
builder.block = block
return builder.stateTrie, builder.builderError
}
// SetStateDiffToBuild mock method // SetStateDiffToBuild mock method
func (builder *Builder) SetStateDiffToBuild(stateDiff sdtypes.StateObject) { func (builder *Builder) SetStateDiffToBuild(stateDiff sdtypes.StateObject) {
builder.stateDiff = stateDiff builder.stateDiff = stateDiff
} }
// SetBuilderError mock method // SetBuilderError mock method
func (builder *Builder) SetBuilderError(err error) { func (builder *Builder) SetError(err error) {
builder.builderError = err builder.builderError = err
} }

View File

@ -21,8 +21,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
// common "github.com/openrelayxyz/plugeth-utils/core"
// "github.com/ethereum/go-ethereum/types"
) )
// StateRoots holds the state roots required for generating a state diff // StateRoots holds the state roots required for generating a state diff

View File

@ -1,6 +1,7 @@
package log package log
import ( import (
// geth_log "github.com/ethereum/go-ethereum/log"
"github.com/inconshreveable/log15" "github.com/inconshreveable/log15"
"github.com/openrelayxyz/plugeth-utils/core" "github.com/openrelayxyz/plugeth-utils/core"
) )
@ -9,12 +10,14 @@ type Logger = core.Logger
var ( var (
DefaultLogger core.Logger DefaultLogger core.Logger
TestLogger = Log15Logger()
) )
func init() { func init() {
// The plugeth logger is only initialized with the geth runtime, // The plugeth logger is only initialized with the geth runtime,
// but tests expect to have a logger available, so default to this. // but tests expect to have a logger available, so default to this.
DefaultLogger = TestLogger() DefaultLogger = TestLogger
} }
func Trace(m string, a ...interface{}) { DefaultLogger.Trace(m, a...) } func Trace(m string, a ...interface{}) { DefaultLogger.Trace(m, a...) }
@ -24,6 +27,19 @@ func Warn(m string, a ...interface{}) { DefaultLogger.Warn(m, a...) }
func Crit(m string, a ...interface{}) { DefaultLogger.Crit(m, a...) } func Crit(m string, a ...interface{}) { DefaultLogger.Crit(m, a...) }
func Error(m string, a ...interface{}) { DefaultLogger.Error(m, a...) } func Error(m string, a ...interface{}) { DefaultLogger.Error(m, a...) }
func SetDefaultLogger(l core.Logger) {
// gethlogger, ok := l.(geth_log.Logger)
// if !ok {
// panic("not a geth Logger")
// }
DefaultLogger = l
}
// Log15Logger returns a logger satisfying the same interface as geth's
func Log15Logger(ctx ...interface{}) wrapLog15 {
return wrapLog15{log15.New(ctx...)}
}
type wrapLog15 struct{ log15.Logger } type wrapLog15 struct{ log15.Logger }
func (l wrapLog15) New(ctx ...interface{}) Logger { func (l wrapLog15) New(ctx ...interface{}) Logger {
@ -34,8 +50,8 @@ func (l wrapLog15) Trace(m string, a ...interface{}) {
l.Logger.Debug(m, a...) l.Logger.Debug(m, a...)
} }
func TestLogger(ctx ...interface{}) Logger { func (l wrapLog15) SetLevel(lvl int) {
return wrapLog15{log15.New(ctx...)} l.SetHandler(log15.LvlFilterHandler(log15.Lvl(lvl), l.GetHandler()))
} }
// New returns a Logger that includes the contextual args in all output // New returns a Logger that includes the contextual args in all output

View File

@ -4,7 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/openrelayxyz/plugeth-utils/restricted/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
// Fatalf formats a message to standard error and exits the program. // Fatalf formats a message to standard error and exits the program.