websocket stream of block + state diff data for seed node
This commit is contained in:
parent
22d78f23cd
commit
131bd1d81d
@ -143,8 +143,6 @@ var (
|
|||||||
utils.EWASMInterpreterFlag,
|
utils.EWASMInterpreterFlag,
|
||||||
utils.EVMInterpreterFlag,
|
utils.EVMInterpreterFlag,
|
||||||
utils.StateDiffFlag,
|
utils.StateDiffFlag,
|
||||||
utils.StateDiffModeFlag,
|
|
||||||
utils.StateDiffPathFlag,
|
|
||||||
configFileFlag,
|
configFileFlag,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -249,8 +249,6 @@ var AppHelpFlagGroups = []flagGroup{
|
|||||||
Name: "STATE DIFF",
|
Name: "STATE DIFF",
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
utils.StateDiffFlag,
|
utils.StateDiffFlag,
|
||||||
utils.StateDiffModeFlag,
|
|
||||||
utils.StateDiffPathFlag,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -56,7 +56,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/p2p/nat"
|
"github.com/ethereum/go-ethereum/p2p/nat"
|
||||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/service"
|
"github.com/ethereum/go-ethereum/statediff/service"
|
||||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
||||||
cli "gopkg.in/urfave/cli.v1"
|
cli "gopkg.in/urfave/cli.v1"
|
||||||
@ -714,18 +713,6 @@ var (
|
|||||||
Name: "statediff",
|
Name: "statediff",
|
||||||
Usage: "Enables the calculation of state diffs between each block, persists these state diffs the configured persistence mode.",
|
Usage: "Enables the calculation of state diffs between each block, persists these state diffs the configured persistence mode.",
|
||||||
}
|
}
|
||||||
|
|
||||||
StateDiffModeFlag = cli.StringFlag{
|
|
||||||
Name: "statediff.mode",
|
|
||||||
Usage: "Enables the user to determine which persistence mode they'd like to store the state diffs in.",
|
|
||||||
Value: "csv",
|
|
||||||
}
|
|
||||||
|
|
||||||
StateDiffPathFlag = cli.StringFlag{
|
|
||||||
Name: "statediff.path",
|
|
||||||
Usage: "Enables the user to determine where to persist the state diffs.",
|
|
||||||
Value: ".",
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// MakeDataDir retrieves the currently requested data directory, terminating
|
// MakeDataDir retrieves the currently requested data directory, terminating
|
||||||
@ -937,6 +924,9 @@ func setWS(ctx *cli.Context, cfg *node.Config) {
|
|||||||
if ctx.GlobalIsSet(WSApiFlag.Name) {
|
if ctx.GlobalIsSet(WSApiFlag.Name) {
|
||||||
cfg.WSModules = splitAndTrim(ctx.GlobalString(WSApiFlag.Name))
|
cfg.WSModules = splitAndTrim(ctx.GlobalString(WSApiFlag.Name))
|
||||||
}
|
}
|
||||||
|
if ctx.GlobalBool(StateDiffFlag.Name) {
|
||||||
|
cfg.WSModules = append(cfg.WSModules, "statediff")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setIPC creates an IPC path configuration from the set command line flags,
|
// setIPC creates an IPC path configuration from the set command line flags,
|
||||||
@ -1535,29 +1525,14 @@ func RegisterEthStatsService(stack *node.Node, url string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
|
||||||
func RegisterStateDiffService(stack *node.Node, ctx *cli.Context) {
|
func RegisterStateDiffService(stack *node.Node, ctx *cli.Context) {
|
||||||
//based on the context, if path and mode are set, update the config here
|
|
||||||
//otherwise pass in an empty config
|
|
||||||
|
|
||||||
modeFlag := ctx.GlobalString(StateDiffModeFlag.Name)
|
|
||||||
mode, err := statediff.NewMode(modeFlag)
|
|
||||||
if err != nil {
|
|
||||||
Fatalf("Failed to register State Diff Service", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
path := ctx.GlobalString(StateDiffPathFlag.Name)
|
|
||||||
|
|
||||||
config := statediff.Config{
|
|
||||||
Mode: mode,
|
|
||||||
Path: path,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
|
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
|
||||||
var ethServ *eth.Ethereum
|
var ethServ *eth.Ethereum
|
||||||
ctx.Service(ðServ)
|
ctx.Service(ðServ)
|
||||||
chainDb := ethServ.ChainDb()
|
chainDb := ethServ.ChainDb()
|
||||||
blockChain := ethServ.BlockChain()
|
blockChain := ethServ.BlockChain()
|
||||||
return service.NewStateDiffService(chainDb, blockChain, config)
|
return service.NewStateDiffService(chainDb, blockChain)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
Fatalf("Failed to register State Diff Service", err)
|
Fatalf("Failed to register State Diff Service", err)
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// Copyright 2015 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package builder_test
|
package builder_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type AccountDiffsMap map[common.Hash]AccountDiff
|
type AccountDiffsMap map[common.Hash]AccountDiff
|
||||||
|
|
||||||
type StateDiff struct {
|
type StateDiff struct {
|
||||||
BlockNumber int64 `json:"blockNumber" gencodec:"required"`
|
BlockNumber int64 `json:"blockNumber" gencodec:"required"`
|
||||||
BlockHash common.Hash `json:"blockHash" gencodec:"required"`
|
BlockHash common.Hash `json:"blockHash" gencodec:"required"`
|
||||||
@ -77,3 +78,27 @@ type DiffUint64 struct {
|
|||||||
type DiffBigInt struct {
|
type DiffBigInt struct {
|
||||||
Value *big.Int `json:"value" gencodec:"optional"`
|
Value *big.Int `json:"value" gencodec:"optional"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
// State trie leaf is just a short node, below
|
||||||
|
// that has an rlp encoded account as the value
|
||||||
|
|
||||||
|
|
||||||
|
// SO each account diffs map is reall a map of shortnode keys to values
|
||||||
|
// Flatten to a slice of short nodes?
|
||||||
|
|
||||||
|
// Need to coerce into:
|
||||||
|
|
||||||
|
type TrieNode struct {
|
||||||
|
// leaf, extension or branch
|
||||||
|
nodeKind string
|
||||||
|
|
||||||
|
// If leaf or extension: [0] is key, [1] is val.
|
||||||
|
// If branch: [0] - [16] are children.
|
||||||
|
elements []interface{}
|
||||||
|
|
||||||
|
// IPLD block information
|
||||||
|
cid *cid.Cid
|
||||||
|
rawdata []byte
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
@ -1,93 +0,0 @@
|
|||||||
// Copyright 2015 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
// Contains a batch of utility type declarations used by the tests. As the node
|
|
||||||
// operates on unique types, a lot of them are needed to check various features.
|
|
||||||
|
|
||||||
package statediff
|
|
||||||
|
|
||||||
import "fmt"
|
|
||||||
|
|
||||||
type Config struct {
|
|
||||||
Mode StateDiffMode // Mode for storing diffs
|
|
||||||
Path string // Path for storing diffs
|
|
||||||
}
|
|
||||||
|
|
||||||
type StateDiffMode int
|
|
||||||
|
|
||||||
const (
|
|
||||||
CSV StateDiffMode = iota
|
|
||||||
IPLD
|
|
||||||
LDB
|
|
||||||
SQL
|
|
||||||
)
|
|
||||||
|
|
||||||
func (mode StateDiffMode) IsValid() bool {
|
|
||||||
return mode >= IPLD && mode <= SQL
|
|
||||||
}
|
|
||||||
|
|
||||||
// String implements the stringer interface.
|
|
||||||
func (mode StateDiffMode) String() string {
|
|
||||||
switch mode {
|
|
||||||
case CSV:
|
|
||||||
return "csv"
|
|
||||||
case IPLD:
|
|
||||||
return "ipfs"
|
|
||||||
case LDB:
|
|
||||||
return "ldb"
|
|
||||||
case SQL:
|
|
||||||
return "sql"
|
|
||||||
default:
|
|
||||||
return "unknown"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMode(mode string) (StateDiffMode, error) {
|
|
||||||
stateDiffMode := StateDiffMode(0)
|
|
||||||
err := stateDiffMode.UnmarshalText([]byte(mode))
|
|
||||||
return stateDiffMode, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mode StateDiffMode) MarshalText() ([]byte, error) {
|
|
||||||
switch mode {
|
|
||||||
case CSV:
|
|
||||||
return []byte("ipfs"), nil
|
|
||||||
case IPLD:
|
|
||||||
return []byte("ipfs"), nil
|
|
||||||
case LDB:
|
|
||||||
return []byte("ldb"), nil
|
|
||||||
case SQL:
|
|
||||||
return []byte("sql"), nil
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unknown state diff storage mode %d", mode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mode *StateDiffMode) UnmarshalText(text []byte) error {
|
|
||||||
switch string(text) {
|
|
||||||
case "csv":
|
|
||||||
*mode = CSV
|
|
||||||
case "ipfs":
|
|
||||||
*mode = IPLD
|
|
||||||
case "ldb":
|
|
||||||
*mode = LDB
|
|
||||||
case "sql":
|
|
||||||
*mode = SQL
|
|
||||||
default:
|
|
||||||
return fmt.Errorf(`unknown state diff storage mode %q, want "ipfs", "ldb" or "sql"`, text)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,24 +0,0 @@
|
|||||||
package statediff_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/testhelpers"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestNewMode(t *testing.T) {
|
|
||||||
mode, err := statediff.NewMode("csv")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if mode != statediff.CSV {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = statediff.NewMode("not a real mode")
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Expected an error, and got nil.")
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,51 +0,0 @@
|
|||||||
// Copyright 2015 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
// Contains a batch of utility type declarations used by the tests. As the node
|
|
||||||
// operates on unique types, a lot of them are needed to check various features.
|
|
||||||
|
|
||||||
package extractor
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/builder"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/publisher"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Extractor interface {
|
|
||||||
ExtractStateDiff(parent, current types.Block) (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type extractor struct {
|
|
||||||
Builder builder.Builder // Interface for building state diff objects from two blocks
|
|
||||||
Publisher publisher.Publisher // Interface for publishing state diff objects to a datastore (e.g. IPFS)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewExtractor(builder builder.Builder, publisher publisher.Publisher) *extractor {
|
|
||||||
return &extractor{
|
|
||||||
Builder: builder,
|
|
||||||
Publisher: publisher,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *extractor) ExtractStateDiff(parent, current types.Block) (string, error) {
|
|
||||||
stateDiff, err := e.Builder.BuildStateDiff(parent.Root(), current.Root(), current.Number().Int64(), current.Hash())
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return e.Publisher.PublishStateDiff(stateDiff)
|
|
||||||
}
|
|
@ -1,123 +0,0 @@
|
|||||||
package extractor_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"math/big"
|
|
||||||
"math/rand"
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
|
||||||
b "github.com/ethereum/go-ethereum/statediff/builder"
|
|
||||||
e "github.com/ethereum/go-ethereum/statediff/extractor"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/testhelpers/mocks"
|
|
||||||
)
|
|
||||||
|
|
||||||
var publisher mocks.Publisher
|
|
||||||
var builder mocks.Builder
|
|
||||||
var currentBlockNumber *big.Int
|
|
||||||
var parentBlock, currentBlock *types.Block
|
|
||||||
var expectedStateDiff b.StateDiff
|
|
||||||
var extractor e.Extractor
|
|
||||||
var err error
|
|
||||||
|
|
||||||
func TestExtractor(t *testing.T) {
|
|
||||||
publisher = mocks.Publisher{}
|
|
||||||
builder = mocks.Builder{}
|
|
||||||
extractor = e.NewExtractor(&builder, &publisher)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
blockNumber := rand.Int63()
|
|
||||||
parentBlockNumber := big.NewInt(blockNumber - int64(1))
|
|
||||||
currentBlockNumber = big.NewInt(blockNumber)
|
|
||||||
parentBlock = types.NewBlock(&types.Header{Number: parentBlockNumber}, nil, nil, nil)
|
|
||||||
currentBlock = types.NewBlock(&types.Header{Number: currentBlockNumber}, nil, nil, nil)
|
|
||||||
|
|
||||||
expectedStateDiff = b.StateDiff{
|
|
||||||
BlockNumber: blockNumber,
|
|
||||||
BlockHash: currentBlock.Hash(),
|
|
||||||
CreatedAccounts: nil,
|
|
||||||
DeletedAccounts: nil,
|
|
||||||
UpdatedAccounts: nil,
|
|
||||||
}
|
|
||||||
|
|
||||||
testBuildStateDiffStruct(t)
|
|
||||||
testBuildStateDiffErrorHandling(t)
|
|
||||||
testPublishingStateDiff(t)
|
|
||||||
testPublisherErrorHandling(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testBuildStateDiffStruct(t *testing.T) {
|
|
||||||
builder.SetStateDiffToBuild(&expectedStateDiff)
|
|
||||||
|
|
||||||
_, err = extractor.ExtractStateDiff(*parentBlock, *currentBlock)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !equals(builder.OldStateRoot, parentBlock.Root()) {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
if !equals(builder.NewStateRoot, currentBlock.Root()) {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
if !equals(builder.BlockNumber, currentBlockNumber.Int64()) {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
if !equals(builder.BlockHash, currentBlock.Hash()) {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testBuildStateDiffErrorHandling(t *testing.T) {
|
|
||||||
builder.SetBuilderError(mocks.Error)
|
|
||||||
|
|
||||||
_, err = extractor.ExtractStateDiff(*parentBlock, *currentBlock)
|
|
||||||
if err == nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !equals(err, mocks.Error) {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
builder.SetBuilderError(nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testPublishingStateDiff(t *testing.T) {
|
|
||||||
builder.SetStateDiffToBuild(&expectedStateDiff)
|
|
||||||
|
|
||||||
_, err = extractor.ExtractStateDiff(*parentBlock, *currentBlock)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !equals(publisher.StateDiff, &expectedStateDiff) {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testPublisherErrorHandling(t *testing.T) {
|
|
||||||
publisher.SetPublisherError(mocks.Error)
|
|
||||||
|
|
||||||
_, err = extractor.ExtractStateDiff(*parentBlock, *currentBlock)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Expected an error, but it didn't occur.")
|
|
||||||
}
|
|
||||||
if !equals(err, mocks.Error) {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
publisher.SetPublisherError(nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func equals(actual, expected interface{}) (success bool) {
|
|
||||||
if actualByteSlice, ok := actual.([]byte); ok {
|
|
||||||
if expectedByteSlice, ok := expected.([]byte); ok {
|
|
||||||
return bytes.Equal(actualByteSlice, expectedByteSlice)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return reflect.DeepEqual(actual, expected)
|
|
||||||
}
|
|
@ -1,130 +0,0 @@
|
|||||||
package publisher
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/csv"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/builder"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
Headers = []string{
|
|
||||||
"blockNumber", "blockHash", "accountAction", "codeHash",
|
|
||||||
"nonceValue", "balanceValue", "contractRoot", "storageDiffPaths",
|
|
||||||
"accountLeafKey", "storageKey", "storageValue",
|
|
||||||
}
|
|
||||||
|
|
||||||
timeStampFormat = "20060102150405.00000"
|
|
||||||
deletedAccountAction = "deleted"
|
|
||||||
createdAccountAction = "created"
|
|
||||||
updatedAccountAction = "updated"
|
|
||||||
)
|
|
||||||
|
|
||||||
func createCSVFilePath(path, blockNumber string) string {
|
|
||||||
now := time.Now()
|
|
||||||
timeStamp := now.Format(timeStampFormat)
|
|
||||||
suffix := timeStamp + "-" + blockNumber
|
|
||||||
filePath := filepath.Join(path, suffix)
|
|
||||||
filePath = filePath + ".csv"
|
|
||||||
return filePath
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *publisher) publishStateDiffToCSV(sd builder.StateDiff) (string, error) {
|
|
||||||
filePath := createCSVFilePath(p.Config.Path, strconv.FormatInt(sd.BlockNumber, 10))
|
|
||||||
|
|
||||||
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
writer := csv.NewWriter(file)
|
|
||||||
defer writer.Flush()
|
|
||||||
|
|
||||||
var data [][]string
|
|
||||||
data = append(data, Headers)
|
|
||||||
data = append(data, accumulateAccountRows(sd)...)
|
|
||||||
for _, value := range data {
|
|
||||||
err := writer.Write(value)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return filePath, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func accumulateAccountRows(sd builder.StateDiff) [][]string {
|
|
||||||
var accountRows [][]string
|
|
||||||
for accountAddr, accountDiff := range sd.CreatedAccounts {
|
|
||||||
formattedAccountData := formatAccountData(accountAddr, accountDiff, sd, createdAccountAction)
|
|
||||||
|
|
||||||
accountRows = append(accountRows, formattedAccountData...)
|
|
||||||
}
|
|
||||||
|
|
||||||
for accountAddr, accountDiff := range sd.UpdatedAccounts {
|
|
||||||
formattedAccountData := formatAccountData(accountAddr, accountDiff, sd, updatedAccountAction)
|
|
||||||
|
|
||||||
accountRows = append(accountRows, formattedAccountData...)
|
|
||||||
}
|
|
||||||
|
|
||||||
for accountAddr, accountDiff := range sd.DeletedAccounts {
|
|
||||||
formattedAccountData := formatAccountData(accountAddr, accountDiff, sd, deletedAccountAction)
|
|
||||||
|
|
||||||
accountRows = append(accountRows, formattedAccountData...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return accountRows
|
|
||||||
}
|
|
||||||
|
|
||||||
func formatAccountData(accountAddr common.Hash, accountDiff builder.AccountDiff, sd builder.StateDiff, accountAction string) [][]string {
|
|
||||||
blockNumberString := strconv.FormatInt(sd.BlockNumber, 10)
|
|
||||||
blockHash := sd.BlockHash.String()
|
|
||||||
codeHash := accountDiff.CodeHash
|
|
||||||
nonce := strconv.FormatUint(*accountDiff.Nonce.Value, 10)
|
|
||||||
balance := accountDiff.Balance.Value.String()
|
|
||||||
newContractRoot := accountDiff.ContractRoot.Value
|
|
||||||
address := accountAddr.String()
|
|
||||||
var result [][]string
|
|
||||||
|
|
||||||
if len(accountDiff.Storage) > 0 {
|
|
||||||
for storagePath, storage := range accountDiff.Storage {
|
|
||||||
formattedAccountData := []string{
|
|
||||||
blockNumberString,
|
|
||||||
blockHash,
|
|
||||||
accountAction,
|
|
||||||
codeHash,
|
|
||||||
nonce,
|
|
||||||
balance,
|
|
||||||
*newContractRoot,
|
|
||||||
storagePath,
|
|
||||||
address,
|
|
||||||
*storage.Key,
|
|
||||||
*storage.Value,
|
|
||||||
}
|
|
||||||
|
|
||||||
result = append(result, formattedAccountData)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
formattedAccountData := []string{
|
|
||||||
blockNumberString,
|
|
||||||
blockHash,
|
|
||||||
accountAction,
|
|
||||||
codeHash,
|
|
||||||
nonce,
|
|
||||||
balance,
|
|
||||||
*newContractRoot,
|
|
||||||
"",
|
|
||||||
address,
|
|
||||||
"",
|
|
||||||
"",
|
|
||||||
}
|
|
||||||
result = append(result, formattedAccountData)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
|
@ -1,48 +0,0 @@
|
|||||||
// Copyright 2015 The go-ethereum Authors
|
|
||||||
// This file is part of the go-ethereum library.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Lesser General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Lesser General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Lesser General Public License
|
|
||||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
// Contains a batch of utility type declarations used by the tests. As the node
|
|
||||||
// operates on unique types, a lot of them are needed to check various features.
|
|
||||||
|
|
||||||
package publisher
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/builder"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Publisher interface {
|
|
||||||
PublishStateDiff(sd *builder.StateDiff) (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type publisher struct {
|
|
||||||
Config statediff.Config
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPublisher(config statediff.Config) (*publisher, error) {
|
|
||||||
return &publisher{
|
|
||||||
Config: config,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *publisher) PublishStateDiff(sd *builder.StateDiff) (string, error) {
|
|
||||||
switch p.Config.Mode {
|
|
||||||
case statediff.CSV:
|
|
||||||
return p.publishStateDiffToCSV(*sd)
|
|
||||||
default:
|
|
||||||
return p.publishStateDiffToCSV(*sd)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,316 +0,0 @@
|
|||||||
package publisher_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"encoding/csv"
|
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"reflect"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/builder"
|
|
||||||
p "github.com/ethereum/go-ethereum/statediff/publisher"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/testhelpers"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
tempDir = os.TempDir()
|
|
||||||
testFilePrefix = "test-statediff"
|
|
||||||
publisher p.Publisher
|
|
||||||
dir string
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
var expectedCreatedAccountRow = []string{
|
|
||||||
strconv.FormatInt(testhelpers.BlockNumber, 10),
|
|
||||||
testhelpers.BlockHash,
|
|
||||||
"created",
|
|
||||||
testhelpers.CodeHash,
|
|
||||||
strconv.FormatUint(testhelpers.NewNonceValue, 10),
|
|
||||||
strconv.FormatInt(testhelpers.NewBalanceValue, 10),
|
|
||||||
testhelpers.ContractRoot,
|
|
||||||
testhelpers.StoragePath,
|
|
||||||
testhelpers.ContractLeafKey.Hex(),
|
|
||||||
"0000000000000000000000000000000000000000000000000000000000000001",
|
|
||||||
testhelpers.StorageValue,
|
|
||||||
}
|
|
||||||
|
|
||||||
var expectedCreatedAccountWithoutStorageUpdateRow = []string{
|
|
||||||
strconv.FormatInt(testhelpers.BlockNumber, 10),
|
|
||||||
testhelpers.BlockHash,
|
|
||||||
"created",
|
|
||||||
testhelpers.CodeHash,
|
|
||||||
strconv.FormatUint(testhelpers.NewNonceValue, 10),
|
|
||||||
strconv.FormatInt(testhelpers.NewBalanceValue, 10),
|
|
||||||
testhelpers.ContractRoot,
|
|
||||||
"",
|
|
||||||
testhelpers.AnotherContractLeafKey.Hex(),
|
|
||||||
"",
|
|
||||||
"",
|
|
||||||
}
|
|
||||||
|
|
||||||
var expectedUpdatedAccountRow = []string{
|
|
||||||
strconv.FormatInt(testhelpers.BlockNumber, 10),
|
|
||||||
testhelpers.BlockHash,
|
|
||||||
"updated",
|
|
||||||
testhelpers.CodeHash,
|
|
||||||
strconv.FormatUint(testhelpers.NewNonceValue, 10),
|
|
||||||
strconv.FormatInt(testhelpers.NewBalanceValue, 10),
|
|
||||||
testhelpers.ContractRoot,
|
|
||||||
testhelpers.StoragePath,
|
|
||||||
testhelpers.ContractLeafKey.Hex(),
|
|
||||||
"0000000000000000000000000000000000000000000000000000000000000001",
|
|
||||||
testhelpers.StorageValue,
|
|
||||||
}
|
|
||||||
|
|
||||||
var expectedDeletedAccountRow = []string{
|
|
||||||
strconv.FormatInt(testhelpers.BlockNumber, 10),
|
|
||||||
testhelpers.BlockHash,
|
|
||||||
"deleted",
|
|
||||||
testhelpers.CodeHash,
|
|
||||||
strconv.FormatUint(testhelpers.NewNonceValue, 10),
|
|
||||||
strconv.FormatInt(testhelpers.NewBalanceValue, 10),
|
|
||||||
testhelpers.ContractRoot,
|
|
||||||
testhelpers.StoragePath,
|
|
||||||
testhelpers.ContractLeafKey.Hex(),
|
|
||||||
"0000000000000000000000000000000000000000000000000000000000000001",
|
|
||||||
testhelpers.StorageValue,
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPublisher(t *testing.T) {
|
|
||||||
dir, err = ioutil.TempDir(tempDir, testFilePrefix)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
config := statediff.Config{
|
|
||||||
Path: dir,
|
|
||||||
Mode: statediff.CSV,
|
|
||||||
}
|
|
||||||
publisher, err = p.NewPublisher(config)
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Test func(t *testing.T)
|
|
||||||
|
|
||||||
var tests = []Test{
|
|
||||||
testFileName,
|
|
||||||
testColumnHeaders,
|
|
||||||
testAccountDiffs,
|
|
||||||
testWhenNoDiff,
|
|
||||||
testDefaultPublisher,
|
|
||||||
testDefaultDirectory,
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
test(t)
|
|
||||||
err := removeFilesFromDir(dir)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Error removing files from temp dir: %s", dir)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func removeFilesFromDir(dir string) error {
|
|
||||||
files, err := filepath.Glob(filepath.Join(dir, "*"))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, file := range files {
|
|
||||||
err = os.RemoveAll(file)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func testFileName(t *testing.T) {
|
|
||||||
fileName, err := publisher.PublishStateDiff(&testhelpers.TestStateDiff)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !strings.HasPrefix(fileName, dir) {
|
|
||||||
t.Errorf(testhelpers.TestFailureFormatString, t.Name(), dir, fileName)
|
|
||||||
}
|
|
||||||
blockNumberWithFileExt := strconv.FormatInt(testhelpers.BlockNumber, 10) + ".csv"
|
|
||||||
if !strings.HasSuffix(fileName, blockNumberWithFileExt) {
|
|
||||||
t.Errorf(testhelpers.TestFailureFormatString, t.Name(), blockNumberWithFileExt, fileName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testColumnHeaders(t *testing.T) {
|
|
||||||
_, err = publisher.PublishStateDiff(&testhelpers.TestStateDiff)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
file, err := getTestDiffFile(dir)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lines, err := csv.NewReader(file).ReadAll()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if len(lines) < 1 {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(lines[0], p.Headers) {
|
|
||||||
t.Error()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testAccountDiffs(t *testing.T) {
|
|
||||||
// it persists the created, updated and deleted account diffs to a CSV file
|
|
||||||
_, err = publisher.PublishStateDiff(&testhelpers.TestStateDiff)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
file, err := getTestDiffFile(dir)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lines, err := csv.NewReader(file).ReadAll()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if len(lines) <= 3 {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(lines[1], expectedCreatedAccountRow) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(lines[2], expectedCreatedAccountWithoutStorageUpdateRow) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(lines[3], expectedUpdatedAccountRow) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(lines[4], expectedDeletedAccountRow) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testWhenNoDiff(t *testing.T) {
|
|
||||||
//it creates an empty CSV when there is no diff
|
|
||||||
emptyDiff := builder.StateDiff{}
|
|
||||||
_, err = publisher.PublishStateDiff(&emptyDiff)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
file, err := getTestDiffFile(dir)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lines, err := csv.NewReader(file).ReadAll()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !equals(len(lines), 1) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testDefaultPublisher(t *testing.T) {
|
|
||||||
//it defaults to publishing state diffs to a CSV file when no mode is configured
|
|
||||||
config := statediff.Config{Path: dir}
|
|
||||||
publisher, err = p.NewPublisher(config)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = publisher.PublishStateDiff(&testhelpers.TestStateDiff)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
file, err := getTestDiffFile(dir)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lines, err := csv.NewReader(file).ReadAll()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(len(lines), 5) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(lines[0], p.Headers) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func testDefaultDirectory(t *testing.T) {
|
|
||||||
//it defaults to publishing CSV files in the current directory when no path is configured
|
|
||||||
config := statediff.Config{}
|
|
||||||
publisher, err = p.NewPublisher(config)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := os.Chdir(dir)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = publisher.PublishStateDiff(&testhelpers.TestStateDiff)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
file, err := getTestDiffFile(dir)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lines, err := csv.NewReader(file).ReadAll()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(len(lines), 5) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
if !equals(lines[0], p.Headers) {
|
|
||||||
t.Errorf(testhelpers.ErrorFormatString, t.Name(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getTestDiffFile(dir string) (*os.File, error) {
|
|
||||||
files, err := ioutil.ReadDir(dir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(files) == 0 {
|
|
||||||
return nil, errors.New("There are 0 files.")
|
|
||||||
}
|
|
||||||
|
|
||||||
fileName := files[0].Name()
|
|
||||||
filePath := filepath.Join(dir, fileName)
|
|
||||||
|
|
||||||
return os.Open(filePath)
|
|
||||||
}
|
|
||||||
|
|
||||||
func equals(actual, expected interface{}) (success bool) {
|
|
||||||
if actualByteSlice, ok := actual.([]byte); ok {
|
|
||||||
if expectedByteSlice, ok := expected.([]byte); ok {
|
|
||||||
return bytes.Equal(actualByteSlice, expectedByteSlice)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return reflect.DeepEqual(actual, expected)
|
|
||||||
}
|
|
95
statediff/service/api.go
Normal file
95
statediff/service/api.go
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
// Copyright 2015 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
const APIName = "statediff"
|
||||||
|
const APIVersion = "0.0.1"
|
||||||
|
|
||||||
|
// PublicStateDiffAPI provides the a websocket service
|
||||||
|
// that can be used to stream out state diffs as they
|
||||||
|
// are produced by a full node
|
||||||
|
type PublicStateDiffAPI struct {
|
||||||
|
sds *StateDiffService
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
lastUsed map[string]time.Time // keeps track when a filter was polled for the last time.
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPublicStateDiffAPI create a new state diff websocket streaming service.
|
||||||
|
func NewPublicStateDiffAPI(sds *StateDiffService) *PublicStateDiffAPI {
|
||||||
|
return &PublicStateDiffAPI{
|
||||||
|
sds: sds,
|
||||||
|
lastUsed: make(map[string]time.Time),
|
||||||
|
mu: sync.Mutex{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamData set up a subscription that fires off state-diffs when they are created
|
||||||
|
func (api *PublicStateDiffAPI) StreamData(ctx context.Context) (*rpc.Subscription, error) {
|
||||||
|
// ensure that the RPC connection supports subscriptions
|
||||||
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
|
if !supported {
|
||||||
|
return nil, rpc.ErrNotificationsUnsupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// create subscription and start waiting for statediff events
|
||||||
|
rpcSub := notifier.CreateSubscription()
|
||||||
|
id := rpcSub.ID
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// subscribe to events from the state diff service
|
||||||
|
payloadChannel := make(chan StateDiffPayload)
|
||||||
|
quitChan := make(chan bool)
|
||||||
|
api.sds.Subscribe(id, payloadChannel, quitChan)
|
||||||
|
|
||||||
|
// loop and await state diff payloads and relay them to the subscriber with then notifier
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case packet := <-payloadChannel:
|
||||||
|
if err := notifier.Notify(id, packet); err != nil {
|
||||||
|
log.Error("Failed to send state diff packet", "err", err)
|
||||||
|
}
|
||||||
|
case <-rpcSub.Err():
|
||||||
|
err := api.sds.Unsubscribe(id)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to unsubscribe from the state diff service", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case <-notifier.Closed():
|
||||||
|
err := api.sds.Unsubscribe(id)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Failed to unsubscribe from the state diff service", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
case <-quitChan:
|
||||||
|
// don't need to unsubscribe, statediff service does so before sending the quit signal
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return rpcSub, nil
|
||||||
|
}
|
@ -1,19 +1,19 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"bytes"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"fmt"
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"sync"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
|
||||||
b "github.com/ethereum/go-ethereum/statediff/builder"
|
|
||||||
e "github.com/ethereum/go-ethereum/statediff/extractor"
|
|
||||||
p "github.com/ethereum/go-ethereum/statediff/publisher"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/builder"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BlockChain interface {
|
type BlockChain interface {
|
||||||
@ -23,31 +23,49 @@ type BlockChain interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type StateDiffService struct {
|
type StateDiffService struct {
|
||||||
Builder *b.Builder
|
sync.Mutex
|
||||||
Extractor e.Extractor
|
Builder builder.Builder
|
||||||
BlockChain BlockChain
|
BlockChain BlockChain
|
||||||
|
QuitChan chan bool
|
||||||
|
Subscriptions Subscriptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config statediff.Config) (*StateDiffService, error) {
|
type Subscriptions map[rpc.ID]subscription
|
||||||
builder := b.NewBuilder(db, blockChain)
|
|
||||||
publisher, err := p.NewPublisher(config)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
extractor := e.NewExtractor(builder, publisher)
|
type subscription struct {
|
||||||
|
PayloadChan chan<- StateDiffPayload
|
||||||
|
QuitChan chan<- bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type StateDiffPayload struct {
|
||||||
|
BlockRlp []byte `json:"block"`
|
||||||
|
StateDiff builder.StateDiff `json:"state_diff"`
|
||||||
|
Err error `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain) (*StateDiffService, error) {
|
||||||
return &StateDiffService{
|
return &StateDiffService{
|
||||||
BlockChain: blockChain,
|
BlockChain: blockChain,
|
||||||
Extractor: extractor,
|
Builder: builder.NewBuilder(db, blockChain),
|
||||||
|
QuitChan: make(chan bool),
|
||||||
|
Subscriptions: make(Subscriptions),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (StateDiffService) Protocols() []p2p.Protocol {
|
func (sds *StateDiffService) Protocols() []p2p.Protocol {
|
||||||
return []p2p.Protocol{}
|
return []p2p.Protocol{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (StateDiffService) APIs() []rpc.API {
|
// APIs returns the RPC descriptors the Whisper implementation offers
|
||||||
return []rpc.API{}
|
func (sds *StateDiffService) APIs() []rpc.API {
|
||||||
|
return []rpc.API{
|
||||||
|
{
|
||||||
|
Namespace: APIName,
|
||||||
|
Version: APIVersion,
|
||||||
|
Service: NewPublicStateDiffAPI(sds),
|
||||||
|
Public: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sds *StateDiffService) Loop(chainEventCh chan core.ChainEvent) {
|
func (sds *StateDiffService) Loop(chainEventCh chan core.ChainEvent) {
|
||||||
@ -56,7 +74,6 @@ func (sds *StateDiffService) Loop(chainEventCh chan core.ChainEvent) {
|
|||||||
|
|
||||||
blocksCh := make(chan *types.Block, 10)
|
blocksCh := make(chan *types.Block, 10)
|
||||||
errCh := chainEventSub.Err()
|
errCh := chainEventSub.Err()
|
||||||
quitCh := make(chan struct{})
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
HandleChainEventChLoop:
|
HandleChainEventChLoop:
|
||||||
@ -66,13 +83,15 @@ func (sds *StateDiffService) Loop(chainEventCh chan core.ChainEvent) {
|
|||||||
case chainEvent := <-chainEventCh:
|
case chainEvent := <-chainEventCh:
|
||||||
log.Debug("Event received from chainEventCh", "event", chainEvent)
|
log.Debug("Event received from chainEventCh", "event", chainEvent)
|
||||||
blocksCh <- chainEvent.Block
|
blocksCh <- chainEvent.Block
|
||||||
//if node stopped
|
//if node stopped
|
||||||
case err := <-errCh:
|
case err := <-errCh:
|
||||||
log.Warn("Error from chain event subscription, breaking loop.", "error", err)
|
log.Warn("Error from chain event subscription, breaking loop.", "error", err)
|
||||||
|
close(sds.QuitChan)
|
||||||
|
break HandleChainEventChLoop
|
||||||
|
case <-sds.QuitChan:
|
||||||
break HandleChainEventChLoop
|
break HandleChainEventChLoop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(quitCh)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
//loop through chain events until no more
|
//loop through chain events until no more
|
||||||
@ -90,21 +109,77 @@ HandleBlockChLoop:
|
|||||||
break HandleBlockChLoop
|
break HandleBlockChLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
stateDiffLocation, err := sds.Extractor.ExtractStateDiff(*parentBlock, *currentBlock)
|
stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number().Int64(), currentBlock.Hash())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err)
|
log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err)
|
||||||
} else {
|
|
||||||
log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation)
|
|
||||||
sds.BlockChain.AddToStateDiffProcessedCollection(parentBlock.Root())
|
|
||||||
sds.BlockChain.AddToStateDiffProcessedCollection(currentBlock.Root())
|
|
||||||
}
|
}
|
||||||
case <-quitCh:
|
rlpBuff := new(bytes.Buffer)
|
||||||
|
currentBlock.EncodeRLP(rlpBuff)
|
||||||
|
blockRlp := rlpBuff.Bytes()
|
||||||
|
payload := StateDiffPayload{
|
||||||
|
BlockRlp: blockRlp,
|
||||||
|
StateDiff: *stateDiff,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
// If we have any websocket subscription listening in, send the data to them
|
||||||
|
sds.Send(payload)
|
||||||
|
case <-sds.QuitChan:
|
||||||
log.Debug("Quitting the statediff block channel")
|
log.Debug("Quitting the statediff block channel")
|
||||||
|
sds.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sds *StateDiffService) Subscribe(id rpc.ID, sub chan<- StateDiffPayload, quitChan chan<- bool) {
|
||||||
|
log.Info("Subscribing to the statediff service")
|
||||||
|
sds.Lock()
|
||||||
|
sds.Subscriptions[id] = subscription{
|
||||||
|
PayloadChan: sub,
|
||||||
|
QuitChan: quitChan,
|
||||||
|
}
|
||||||
|
sds.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sds *StateDiffService) Unsubscribe(id rpc.ID) error {
|
||||||
|
log.Info("Unsubscribing from the statediff service")
|
||||||
|
sds.Lock()
|
||||||
|
_, ok := sds.Subscriptions[id]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id)
|
||||||
|
}
|
||||||
|
delete(sds.Subscriptions, id)
|
||||||
|
sds.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sds *StateDiffService) Send(payload StateDiffPayload) {
|
||||||
|
sds.Lock()
|
||||||
|
for id, sub := range sds.Subscriptions {
|
||||||
|
select {
|
||||||
|
case sub.PayloadChan <- payload:
|
||||||
|
log.Info("sending state diff payload to subscription %s", id)
|
||||||
|
default:
|
||||||
|
log.Info("unable to send payload to subscription %s; channel has no receiver", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sds.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sds *StateDiffService) Close() {
|
||||||
|
sds.Lock()
|
||||||
|
for id, sub := range sds.Subscriptions {
|
||||||
|
select {
|
||||||
|
case sub.QuitChan <- true:
|
||||||
|
delete(sds.Subscriptions, id)
|
||||||
|
log.Info("closing subscription %s", id)
|
||||||
|
default:
|
||||||
|
log.Info("unable to close subscription %s; channel has no receiver", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sds.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (sds *StateDiffService) Start(server *p2p.Server) error {
|
func (sds *StateDiffService) Start(server *p2p.Server) error {
|
||||||
log.Info("Starting statediff service")
|
log.Info("Starting statediff service")
|
||||||
|
|
||||||
@ -114,7 +189,8 @@ func (sds *StateDiffService) Start(server *p2p.Server) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (StateDiffService) Stop() error {
|
func (sds *StateDiffService) Stop() error {
|
||||||
log.Info("Stopping statediff service")
|
log.Info("Stopping statediff service")
|
||||||
|
close(sds.QuitChan)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,6 @@ func testErrorInChainEventLoop(t *testing.T) {
|
|||||||
blockChain := mocks.BlockChain{}
|
blockChain := mocks.BlockChain{}
|
||||||
service := s.StateDiffService{
|
service := s.StateDiffService{
|
||||||
Builder: nil,
|
Builder: nil,
|
||||||
Extractor: &extractor,
|
|
||||||
BlockChain: &blockChain,
|
BlockChain: &blockChain,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,7 +84,6 @@ func testErrorInBlockLoop(t *testing.T) {
|
|||||||
blockChain := mocks.BlockChain{}
|
blockChain := mocks.BlockChain{}
|
||||||
service := s.StateDiffService{
|
service := s.StateDiffService{
|
||||||
Builder: nil,
|
Builder: nil,
|
||||||
Extractor: &extractor,
|
|
||||||
BlockChain: &blockChain,
|
BlockChain: &blockChain,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user