Merge pull request #2 from openrelayxyz/utils-refactor

Utils refactor
This commit is contained in:
AusIV 2021-09-17 16:12:27 -05:00 committed by GitHub
commit 9446654386
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1143 additions and 213 deletions

View File

@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/plugins/wrappers"
"gopkg.in/urfave/cli.v1"
)
@ -314,15 +315,19 @@ func geth(ctx *cli.Context) error {
if err := plugins.Initialize(path.Join(ctx.GlobalString(utils.DataDirFlag.Name), "plugins"), ctx); err != nil { return err }
prepare(ctx)
stack, backend := makeFullNode(ctx)
pluginsInitializeNode(stack, backend)
wrapperBackend := wrappers.NewBackend(backend)
pluginsInitializeNode(stack, wrapperBackend)
if ok, err := plugins.RunSubcommand(ctx); ok {
stack.Close()
return err
}
defer stack.Close()
if ok, err := plugins.RunSubcommand(ctx); ok { return err }
if !plugins.ParseFlags(ctx.Args()) {
if args := ctx.Args(); len(args) > 0 {
return fmt.Errorf("invalid command: %q", args[0])
}
}
stack.RegisterAPIs(pluginGetAPIs(stack, backend))
stack.RegisterAPIs(pluginGetAPIs(stack, wrapperBackend))
startNode(ctx, stack, backend)
stack.Wait()

View File

@ -1,50 +1,83 @@
package main
import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/plugins/interfaces"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/plugins/wrappers"
"github.com/ethereum/go-ethereum/rpc"
"github.com/openrelayxyz/plugeth-utils/core"
"github.com/openrelayxyz/plugeth-utils/restricted"
)
type APILoader func(*node.Node, interfaces.Backend) []rpc.API
func GetAPIsFromLoader(pl *plugins.PluginLoader, stack *node.Node, backend interfaces.Backend) []rpc.API {
result := []rpc.API{}
fnList := pl.Lookup("GetAPIs", func(item interface{}) bool {
_, ok := item.(func(*node.Node, interfaces.Backend) []rpc.API)
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(*node.Node, interfaces.Backend) []rpc.API); ok {
result = append(result, fn(stack, backend)...)
}
}
return result
func apiTranslate(apis []core.API) []rpc.API {
result := make([]rpc.API, len(apis))
for i, api := range apis {
result[i] = rpc.API{
Namespace: api.Namespace,
Version: api.Version,
Service: api.Service,
Public: api.Public,
}
}
return result
}
func pluginGetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API {
func GetAPIsFromLoader(pl *plugins.PluginLoader, stack *node.Node, backend restricted.Backend) []rpc.API {
result := []core.API{}
fnList := pl.Lookup("GetAPIs", func(item interface{}) bool {
switch item.(type) {
case func(core.Node, restricted.Backend) []core.API:
return true
case func(core.Node, core.Backend) []core.API:
return true
default:
return false
}
})
for _, fni := range fnList {
switch fn := fni.(type) {
case func(core.Node, restricted.Backend) []core.API:
result = append(result, fn(wrappers.NewNode(stack), backend)...)
case func(core.Node, core.Backend) []core.API:
result = append(result, fn(wrappers.NewNode(stack), backend)...)
default:
}
}
return apiTranslate(result)
}
func pluginGetAPIs(stack *node.Node, backend restricted.Backend) []rpc.API {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting GetAPIs, but default PluginLoader has not been initialized")
return []rpc.API{}
}
}
return GetAPIsFromLoader(plugins.DefaultPluginLoader, stack, backend)
}
func InitializeNode(pl *plugins.PluginLoader, stack *node.Node, backend interfaces.Backend) {
fnList := pl.Lookup("InitializeNode", func(item interface{}) bool {
_, ok := item.(func(*node.Node, interfaces.Backend))
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(*node.Node, interfaces.Backend)); ok {
fn(stack, backend)
}
}
func InitializeNode(pl *plugins.PluginLoader, stack *node.Node, backend restricted.Backend) {
fnList := pl.Lookup("InitializeNode", func(item interface{}) bool {
switch item.(type) {
case func(core.Node, restricted.Backend):
return true
case func(core.Node, core.Backend):
return true
default:
return false
}
})
for _, fni := range fnList {
switch fn := fni.(type) {
case func(core.Node, restricted.Backend):
fn(wrappers.NewNode(stack), backend)
case func(core.Node, core.Backend):
fn(wrappers.NewNode(stack), backend)
default:
}
}
}
func pluginsInitializeNode(stack *node.Node, backend interfaces.Backend) {
func pluginsInitializeNode(stack *node.Node, backend restricted.Backend) {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting InitializeNode, but default PluginLoader has not been initialized")
return

View File

@ -0,0 +1,29 @@
package core
import (
"testing"
"github.com/ethereum/go-ethereum/plugins"
"github.com/openrelayxyz/plugeth-utils/core"
)
func TestReorgLongHeadersHook(t *testing.T) {
invoked := false
done := plugins.HookTester("NewHead", func(b []byte, h core.Hash, logs [][]byte) {
invoked = true
if b == nil {
t.Errorf("Expected block to be non-nil")
}
if h == (core.Hash{}) {
t.Errorf("Expected hash to be non-empty")
}
if len(logs) > 0 {
t.Errorf("Expected some logs")
}
})
defer done()
testReorgLong(t, true)
if !invoked {
t.Errorf("Expected plugin invocation")
}
}

View File

@ -1,20 +1,24 @@
package core
import (
"encoding/json"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/openrelayxyz/plugeth-utils/core"
)
func PluginPreProcessBlock(pl *plugins.PluginLoader, block *types.Block) {
fnList := pl.Lookup("PreProcessBlock", func(item interface{}) bool {
_, ok := item.(func(*types.Block))
_, ok := item.(func(core.Hash, uint64, []byte))
return ok
})
encoded, _ := rlp.EncodeToBytes(block)
for _, fni := range fnList {
if fn, ok := fni.(func(*types.Block)); ok {
fn(block)
if fn, ok := fni.(func(core.Hash, uint64, []byte)); ok {
fn(core.Hash(block.Hash()), block.NumberU64(), encoded)
}
}
}
@ -27,12 +31,13 @@ func pluginPreProcessBlock(block *types.Block) {
}
func PluginPreProcessTransaction(pl *plugins.PluginLoader, tx *types.Transaction, block *types.Block, i int) {
fnList := pl.Lookup("PreProcessTransaction", func(item interface{}) bool {
_, ok := item.(func(*types.Transaction, *types.Block, int))
_, ok := item.(func([]byte, core.Hash, core.Hash, int))
return ok
})
txBytes, _ := tx.MarshalBinary()
for _, fni := range fnList {
if fn, ok := fni.(func(*types.Transaction, *types.Block, int)); ok {
fn(tx, block, i)
if fn, ok := fni.(func([]byte, core.Hash, core.Hash, int)); ok {
fn(txBytes, core.Hash(tx.Hash()), core.Hash(block.Hash()), i)
}
}
}
@ -45,12 +50,12 @@ func pluginPreProcessTransaction(tx *types.Transaction, block *types.Block, i in
}
func PluginBlockProcessingError(pl *plugins.PluginLoader, tx *types.Transaction, block *types.Block, err error) {
fnList := pl.Lookup("BlockProcessingError", func(item interface{}) bool {
_, ok := item.(func(*types.Transaction, *types.Block, error))
_, ok := item.(func(core.Hash, core.Hash, error))
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(*types.Transaction, *types.Block, error)); ok {
fn(tx, block, err)
if fn, ok := fni.(func(core.Hash, core.Hash, error)); ok {
fn(core.Hash(tx.Hash()), core.Hash(block.Hash()), err)
}
}
}
@ -63,12 +68,13 @@ func pluginBlockProcessingError(tx *types.Transaction, block *types.Block, err e
}
func PluginPostProcessTransaction(pl *plugins.PluginLoader, tx *types.Transaction, block *types.Block, i int, receipt *types.Receipt) {
fnList := pl.Lookup("PostProcessTransaction", func(item interface{}) bool {
_, ok := item.(func(*types.Transaction, *types.Block, int, *types.Receipt))
_, ok := item.(func(core.Hash, core.Hash, int, []byte))
return ok
})
receiptBytes, _ := json.Marshal(receipt)
for _, fni := range fnList {
if fn, ok := fni.(func(*types.Transaction, *types.Block, int, *types.Receipt)); ok {
fn(tx, block, i, receipt)
if fn, ok := fni.(func(core.Hash, core.Hash, int, []byte)); ok {
fn(core.Hash(tx.Hash()), core.Hash(block.Hash()), i, receiptBytes)
}
}
}
@ -81,12 +87,12 @@ func pluginPostProcessTransaction(tx *types.Transaction, block *types.Block, i i
}
func PluginPostProcessBlock(pl *plugins.PluginLoader, block *types.Block) {
fnList := pl.Lookup("PostProcessBlock", func(item interface{}) bool {
_, ok := item.(func(*types.Block))
_, ok := item.(func(core.Hash))
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(*types.Block)); ok {
fn(block)
if fn, ok := fni.(func(core.Hash)); ok {
fn(core.Hash(block.Hash()))
}
}
}
@ -101,12 +107,17 @@ func pluginPostProcessBlock(block *types.Block) {
func PluginNewHead(pl *plugins.PluginLoader, block *types.Block, hash common.Hash, logs []*types.Log) {
fnList := pl.Lookup("NewHead", func(item interface{}) bool {
_, ok := item.(func(*types.Block, common.Hash, []*types.Log))
_, ok := item.(func([]byte, core.Hash, [][]byte))
return ok
})
blockBytes, _ := rlp.EncodeToBytes(block)
logBytes := make([][]byte, len(logs))
for i, l := range logs {
logBytes[i], _ = rlp.EncodeToBytes(l)
}
for _, fni := range fnList {
if fn, ok := fni.(func(*types.Block, common.Hash, []*types.Log)); ok {
fn(block, hash, logs)
if fn, ok := fni.(func([]byte, core.Hash, [][]byte)); ok {
fn(blockBytes, core.Hash(hash), logBytes)
}
}
}
@ -120,12 +131,17 @@ func pluginNewHead(block *types.Block, hash common.Hash, logs []*types.Log) {
func PluginNewSideBlock(pl *plugins.PluginLoader, block *types.Block, hash common.Hash, logs []*types.Log) {
fnList := pl.Lookup("NewSideBlock", func(item interface{}) bool {
_, ok := item.(func(*types.Block, common.Hash, []*types.Log))
_, ok := item.(func([]byte, core.Hash, [][]byte))
return ok
})
blockBytes, _ := rlp.EncodeToBytes(block)
logBytes := make([][]byte, len(logs))
for i, l := range logs {
logBytes[i], _ = rlp.EncodeToBytes(l)
}
for _, fni := range fnList {
if fn, ok := fni.(func(*types.Block, common.Hash, []*types.Log)); ok {
fn(block, hash, logs)
if fn, ok := fni.(func([]byte, core.Hash, [][]byte)); ok {
fn(blockBytes, core.Hash(hash), logBytes)
}
}
}
@ -139,12 +155,20 @@ func pluginNewSideBlock(block *types.Block, hash common.Hash, logs []*types.Log)
func PluginReorg(pl *plugins.PluginLoader, commonBlock *types.Block, oldChain, newChain types.Blocks) {
fnList := pl.Lookup("Reorg", func(item interface{}) bool {
_, ok := item.(func(common *types.Block, oldChain, newChain types.Blocks))
_, ok := item.(func(core.Hash, []core.Hash, []core.Hash))
return ok
})
oldChainHashes := make([]core.Hash, len(oldChain))
for i, block := range oldChain {
oldChainHashes[i] = core.Hash(block.Hash())
}
newChainHashes := make([]core.Hash, len(newChain))
for i, block := range newChain {
newChainHashes[i] = core.Hash(block.Hash())
}
for _, fni := range fnList {
if fn, ok := fni.(func(common *types.Block, oldChain, newChain types.Blocks)); ok {
fn(commonBlock, oldChain, newChain)
if fn, ok := fni.(func(core.Hash, []core.Hash, []core.Hash)); ok {
fn(core.Hash(commonBlock.Hash()), oldChainHashes, newChainHashes)
}
}
}

View File

@ -1,27 +1,44 @@
package state
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/plugins"
"github.com/openrelayxyz/plugeth-utils/core"
)
func PluginStateUpdate(pl *plugins.PluginLoader, blockRoot, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) {
fnList := pl.Lookup("StateUpdate", func(item interface{}) bool {
_, ok := item.(func(common.Hash, common.Hash, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte))
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(common.Hash, common.Hash, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)); ok {
fn(blockRoot, parentRoot, destructs, accounts, storage)
}
}
fnList := pl.Lookup("StateUpdate", func(item interface{}) bool {
_, ok := item.(func(core.Hash, core.Hash, map[core.Hash]struct{}, map[core.Hash][]byte, map[core.Hash]map[core.Hash][]byte))
return ok
})
coreDestructs := make(map[core.Hash]struct{})
for k, v := range destructs {
coreDestructs[core.Hash(k)] = v
}
coreAccounts := make(map[core.Hash][]byte)
for k, v := range accounts {
coreAccounts[core.Hash(k)] = v
}
coreStorage := make(map[core.Hash]map[core.Hash][]byte)
for k, v := range storage {
coreStorage[core.Hash(k)] = make(map[core.Hash][]byte)
for h, d := range v {
coreStorage[core.Hash(k)][core.Hash(h)] = d
}
}
for _, fni := range fnList {
if fn, ok := fni.(func(core.Hash, core.Hash, map[core.Hash]struct{}, map[core.Hash][]byte, map[core.Hash]map[core.Hash][]byte)); ok {
fn(core.Hash(blockRoot), core.Hash(parentRoot), coreDestructs, coreAccounts, coreStorage)
}
}
}
func pluginStateUpdate(blockRoot, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting StateUpdate, but default PluginLoader has not been initialized")
return
}
return
}
PluginStateUpdate(plugins.DefaultPluginLoader, blockRoot, parentRoot, destructs, accounts, storage)
}

5
core/vm/plugin_hooks.go Normal file
View File

@ -0,0 +1,5 @@
package vm
func (st *Stack) Len() int {
return len(st.data)
}

View File

@ -145,7 +145,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
chainDb: chainDb,
eventMux: stack.EventMux(),
accountManager: stack.AccountManager(),
engine: pluginCreateConsensusEngine(stack, chainConfig, &ethashConfig, config.Miner.Notify, config.Miner.Noverify, chainDb),
engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &ethashConfig, config.Miner.Notify, config.Miner.Noverify, chainDb),
closeBloomHandler: make(chan struct{}),
networkID: config.NetworkId,
gasPrice: config.Miner.GasPrice,

View File

@ -1,106 +1,71 @@
package eth
import (
"math/big"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"reflect"
"time"
"math/big"
"reflect"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/plugins/wrappers"
"github.com/openrelayxyz/plugeth-utils/core"
)
func PluginCreateConsensusEngine(pl *plugins.PluginLoader, stack *node.Node, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database) consensus.Engine {
fnList := pl.Lookup("CreateConsensusEngine", func(item interface{}) bool {
_, ok := item.(func(*node.Node, *params.ChainConfig, *ethash.Config, []string, bool, ethdb.Database) consensus.Engine)
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(*node.Node, *params.ChainConfig, *ethash.Config, []string, bool, ethdb.Database) consensus.Engine); ok {
return fn(stack, chainConfig, config, notify, noverify, db)
}
}
return ethconfig.CreateConsensusEngine(stack, chainConfig, config, notify, noverify, db)
}
func pluginCreateConsensusEngine(stack *node.Node, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database) consensus.Engine {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting CreateConsensusEngine, but default PluginLoader has not been initialized")
return ethconfig.CreateConsensusEngine(stack, chainConfig, config, notify, noverify, db)
}
return PluginCreateConsensusEngine(plugins.DefaultPluginLoader, stack, chainConfig, config, notify, noverify, db)
}
type metaTracer struct{
tracers []vm.Tracer
type metaTracer struct {
tracers []core.TracerResult
}
func (mt *metaTracer) CaptureStart(env *vm.EVM, from common.Address, to common.Address, create bool, input []byte, gas uint64, value *big.Int) {
for _, tracer := range mt.tracers {
tracer.CaptureStart(env, from, to, create, input, gas, value)
}
for _, tracer := range mt.tracers {
tracer.CaptureStart(core.Address(from), core.Address(to), create, input, gas, value)
}
}
func (mt *metaTracer) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, scope *vm.ScopeContext, rData []byte, depth int, err error) {
for _, tracer := range mt.tracers {
tracer.CaptureState(env, pc, op, gas, cost, scope, rData, depth, err)
}
for _, tracer := range mt.tracers {
tracer.CaptureState(pc, core.OpCode(op), gas, cost, wrappers.NewWrappedScopeContext(scope), rData, depth, err)
}
}
func (mt *metaTracer) CaptureFault(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, scope *vm.ScopeContext, depth int, err error) {
for _, tracer := range mt.tracers {
tracer.CaptureFault(env, pc, op, gas, cost, scope, depth, err)
}
for _, tracer := range mt.tracers {
tracer.CaptureFault(pc, core.OpCode(op), gas, cost, wrappers.NewWrappedScopeContext(scope), depth, err)
}
}
func (mt *metaTracer) CaptureEnd(output []byte, gasUsed uint64, t time.Duration, err error) {
for _, tracer := range mt.tracers {
tracer.CaptureEnd(output, gasUsed, t, err)
}
for _, tracer := range mt.tracers {
tracer.CaptureEnd(output, gasUsed, t, err)
}
}
func PluginUpdateBlockchainVMConfig(pl *plugins.PluginLoader, cfg *vm.Config) {
tracerList := plugins.Lookup("LiveTracer", func(item interface{}) bool {
_, ok := item.(*vm.Tracer)
log.Info("Item is LiveTracer", "ok", ok, "type", reflect.TypeOf(item))
return ok
})
if len(tracerList) > 0 {
mt := &metaTracer{tracers: []vm.Tracer{}}
for _, tracer := range(tracerList) {
if v, ok := tracer.(*vm.Tracer); ok {
log.Info("LiveTracer registered")
mt.tracers = append(mt.tracers, *v)
} else {
log.Info("Item is not tracer")
}
}
cfg.Debug = true
cfg.Tracer = mt
} else {
log.Warn("Module is not tracer")
}
tracerList := plugins.Lookup("LiveTracer", func(item interface{}) bool {
_, ok := item.(*vm.Tracer)
log.Info("Item is LiveTracer", "ok", ok, "type", reflect.TypeOf(item))
return ok
})
if len(tracerList) > 0 {
mt := &metaTracer{tracers: []core.TracerResult{}}
for _, tracer := range tracerList {
if v, ok := tracer.(core.TracerResult); ok {
log.Info("LiveTracer registered")
mt.tracers = append(mt.tracers, v)
} else {
log.Info("Item is not tracer")
}
}
cfg.Debug = true
cfg.Tracer = mt
} else {
log.Warn("Module is not tracer")
}
fnList := plugins.Lookup("UpdateBlockchainVMConfig", func(item interface{}) bool {
_, ok := item.(func(*vm.Config))
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(*vm.Config)); ok {
fn(cfg)
return
}
}
}
func pluginUpdateBlockchainVMConfig(cfg *vm.Config) {
if plugins.DefaultPluginLoader == nil {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting CreateConsensusEngine, but default PluginLoader has not been initialized")
return
}
PluginUpdateBlockchainVMConfig(plugins.DefaultPluginLoader, cfg)
return
}
PluginUpdateBlockchainVMConfig(plugins.DefaultPluginLoader, cfg)
}

View File

@ -1,34 +1,42 @@
package tracers
import (
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/plugins/interfaces"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/log"
"reflect"
"reflect"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/plugins/interfaces"
"github.com/ethereum/go-ethereum/plugins/wrappers"
"github.com/openrelayxyz/plugeth-utils/core"
)
func GetPluginTracer(pl *plugins.PluginLoader, name string) (func(*state.StateDB)interfaces.TracerResult, bool) {
tracers := pl.Lookup("Tracers", func(item interface{}) bool {
_, ok := item.(*map[string]func(*state.StateDB)interfaces.TracerResult)
if !ok { log.Warn("Found tracer that did not match type", "tracer", reflect.TypeOf(item) ) }
return ok
})
for _, tmap := range tracers {
if tracerMap, ok := tmap.(*map[string]func(*state.StateDB)interfaces.TracerResult); ok {
if tracer, ok := (*tracerMap)[name]; ok {
return tracer, true
}
}
}
log.Info("Tracer not found", "name", name, "tracers", len(tracers))
return nil, false
func GetPluginTracer(pl *plugins.PluginLoader, name string) (func(*state.StateDB) interfaces.TracerResult, bool) {
tracers := pl.Lookup("Tracers", func(item interface{}) bool {
_, ok := item.(*map[string]func(core.StateDB) core.TracerResult)
if !ok {
log.Warn("Found tracer that did not match type", "tracer", reflect.TypeOf(item))
}
return ok
})
for _, tmap := range tracers {
if tracerMap, ok := tmap.(*map[string]func(core.StateDB) core.TracerResult); ok {
if tracer, ok := (*tracerMap)[name]; ok {
return func(sdb *state.StateDB) interfaces.TracerResult {
return wrappers.NewWrappedTracer(tracer(wrappers.NewWrappedStateDB(sdb)))
}, true
}
}
}
log.Info("Tracer not found", "name", name, "tracers", len(tracers))
return nil, false
}
func getPluginTracer(name string) (func(*state.StateDB)interfaces.TracerResult, bool) {
if plugins.DefaultPluginLoader == nil {
func getPluginTracer(name string) (func(*state.StateDB) interfaces.TracerResult, bool) {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting GetPluginTracer, but default PluginLoader has not been initialized")
return nil, false
}
return GetPluginTracer(plugins.DefaultPluginLoader, name)
return nil, false
}
return GetPluginTracer(plugins.DefaultPluginLoader, name)
}

3
go.mod
View File

@ -49,6 +49,7 @@ require (
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/olekukonko/tablewriter v0.0.5
github.com/openrelayxyz/plugeth-utils v0.0.6
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
github.com/prometheus/tsdb v0.7.1
github.com/rjeczalik/notify v0.9.1
@ -69,3 +70,5 @@ require (
gopkg.in/urfave/cli.v1 v1.20.0
gotest.tools v2.2.0+incompatible // indirect
)
// replace github.com/openrelayxyz/plugeth-utils => /home/philip/src/rivet/plugeth-utils

2
go.sum
View File

@ -303,6 +303,8 @@ github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/openrelayxyz/plugeth-utils v0.0.6 h1:bnoUyRBrxbkfd5Zn89X1D6zEQicpPY3qW81iBrq+6e4=
github.com/openrelayxyz/plugeth-utils v0.0.6/go.mod h1:Lv47unyKJ3b/PVbVAt9Uk+RQmpdrzDOsjSCPhAMQAps=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=

19
plugins/hooktester.go Normal file
View File

@ -0,0 +1,19 @@
package plugins
// type PluginLoader struct{
// Plugins []*plugin.Plugin
// Subcommands map[string]Subcommand
// Flags []*flag.FlagSet
// LookupCache map[string][]interface{}
// }
func HookTester(name string, fn interface{}) func() {
oldDefault := DefaultPluginLoader
DefaultPluginLoader = &PluginLoader{
LookupCache: map[string][]interface{}{
name: []interface{}{fn},
},
}
return func() { DefaultPluginLoader = oldDefault }
}

View File

@ -4,7 +4,6 @@ import (
"context"
"math/big"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
@ -26,7 +25,6 @@ type Backend interface {
Downloader() *downloader.Downloader
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
ChainDb() ethdb.Database
AccountManager() *accounts.Manager
ExtRPCEnabled() bool
RPCGasCap() uint64 // global gas cap for eth_call over rpc: DoS protection
RPCTxFeeCap() float64 // global tx fee cap for all transaction related APIs

View File

@ -1,30 +1,34 @@
package plugins
import (
"plugin"
"github.com/ethereum/go-ethereum/log"
"gopkg.in/urfave/cli.v1"
"flag"
"io/ioutil"
"strings"
"path"
"fmt"
"reflect"
)
"github.com/openrelayxyz/plugeth-utils/core"
"flag"
"fmt"
"io/ioutil"
"path"
"plugin"
"reflect"
"strings"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/event"
"gopkg.in/urfave/cli.v1"
)
type Subcommand func(*cli.Context, []string) error
type PluginLoader struct{
Plugins []*plugin.Plugin
type PluginLoader struct {
Plugins []*plugin.Plugin
Subcommands map[string]Subcommand
Flags []*flag.FlagSet
Flags []*flag.FlagSet
LookupCache map[string][]interface{}
}
func (pl *PluginLoader) Lookup(name string, validate func(interface{}) bool) []interface{} {
if v, ok := pl.LookupCache[name]; ok { return v }
if v, ok := pl.LookupCache[name]; ok {
return v
}
results := []interface{}{}
for _, plugin := range pl.Plugins {
if v, err := plugin.Lookup(name); err == nil {
@ -45,19 +49,14 @@ func Lookup(name string, validate func(interface{}) bool) []interface{} {
return DefaultPluginLoader.Lookup(name, validate)
}
var DefaultPluginLoader *PluginLoader
func NewPluginLoader(target string) (*PluginLoader, error) {
pl := &PluginLoader{
Plugins: []*plugin.Plugin{},
// RPCPlugins: []APILoader{},
Subcommands: make(map[string]Subcommand),
Flags: []*flag.FlagSet{},
Flags: []*flag.FlagSet{},
LookupCache: make(map[string][]interface{}),
// CreateConsensusEngine: ethconfig.CreateConsensusEngine,
// UpdateBlockchainVMConfig: func(cfg *vm.Config) {},
}
files, err := ioutil.ReadDir(target)
if err != nil {
@ -106,33 +105,41 @@ func NewPluginLoader(target string) (*PluginLoader, error) {
func Initialize(target string, ctx *cli.Context) (err error) {
DefaultPluginLoader, err = NewPluginLoader(target)
if err != nil { return err }
if err != nil {
return err
}
DefaultPluginLoader.Initialize(ctx)
return nil
}
func (pl *PluginLoader) Initialize(ctx *cli.Context) {
fns := pl.Lookup("Initialize", func(i interface{}) bool {
_, ok := i.(func(*cli.Context, *PluginLoader))
_, ok := i.(func(*cli.Context, core.PluginLoader, core.Logger))
return ok
})
for _, fni := range fns {
if fn, ok := fni.(func(*cli.Context, *PluginLoader)); ok {
fn(ctx, pl)
if fn, ok := fni.(func(*cli.Context, core.PluginLoader, core.Logger)); ok {
fn(ctx, pl, log.Root())
}
}
}
func (pl *PluginLoader) RunSubcommand(ctx *cli.Context) (bool, error) {
args := ctx.Args()
if len(args) == 0 { return false, fmt.Errorf("No subcommand arguments")}
if len(args) == 0 {
return false, fmt.Errorf("no subcommand arguments")
}
subcommand, ok := pl.Subcommands[args[0]]
if !ok { return false, fmt.Errorf("Subcommand %v does not exist", args[0])}
if !ok {
return false, fmt.Errorf("Subcommand %v does not exist", args[0])
}
return true, subcommand(ctx, args[1:])
}
func RunSubcommand(ctx *cli.Context) (bool, error) {
if DefaultPluginLoader == nil { return false, fmt.Errorf("Plugin loader not initialized") }
if DefaultPluginLoader == nil {
return false, fmt.Errorf("Plugin loader not initialized")
}
return DefaultPluginLoader.RunSubcommand(ctx)
}
@ -150,3 +157,20 @@ func ParseFlags(args []string) bool {
}
return DefaultPluginLoader.ParseFlags(args)
}
type feedWrapper struct {
feed *event.Feed
}
func (f *feedWrapper) Send(item interface{}) int {
return f.feed.Send(item)
}
func (f *feedWrapper) Subscribe(ch interface{}) core.Subscription {
return f.feed.Subscribe(ch)
}
func (pl *PluginLoader) GetFeed() core.Feed {
return &feedWrapper{&event.Feed{}}
}

View File

@ -0,0 +1,604 @@
package wrappers
import (
"context"
"encoding/json"
"fmt"
"math/big"
"reflect"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
gcore "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/plugins/interfaces"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/openrelayxyz/plugeth-utils/core"
"github.com/openrelayxyz/plugeth-utils/restricted"
"github.com/openrelayxyz/plugeth-utils/restricted/params"
)
type WrappedScopeContext struct {
s *vm.ScopeContext
}
func NewWrappedScopeContext(s *vm.ScopeContext) *WrappedScopeContext {
return &WrappedScopeContext{s}
}
func (w *WrappedScopeContext) Memory() core.Memory {
return w.s.Memory
}
func (w *WrappedScopeContext) Stack() core.Stack {
return w.s.Stack
}
func (w *WrappedScopeContext) Contract() core.Contract {
return &WrappedContract{w.s.Contract}
}
type WrappedContract struct {
c *vm.Contract
}
func (w *WrappedContract) AsDelegate() core.Contract {
return &WrappedContract{w.c.AsDelegate()}
}
func (w *WrappedContract) GetOp(n uint64) core.OpCode {
return core.OpCode(w.c.GetOp(n))
}
func (w *WrappedContract) GetByte(n uint64) byte {
return w.c.GetByte(n)
}
func (w *WrappedContract) Caller() core.Address {
return core.Address(w.c.Caller())
}
func (w *WrappedContract) Address() core.Address {
return core.Address(w.c.Address())
}
func (w *WrappedContract) Value() *big.Int {
return w.c.Value()
}
// added UseGas bc compiler compained without it. Should investigate if the false return with effect performance.
// take this out of core.interface
func (w *WrappedContract) UseGas(gas uint64) (ok bool) {
return false
}
type WrappedTracer struct {
r core.TracerResult
}
func NewWrappedTracer(r core.TracerResult) *WrappedTracer {
return &WrappedTracer{r}
}
func (w WrappedTracer) CaptureStart(env *vm.EVM, from common.Address, to common.Address, create bool, input []byte, gas uint64, value *big.Int) {
w.r.CaptureStart(core.Address(from), core.Address(to), create, input, gas, value)
}
func (w WrappedTracer) CaptureState(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, scope *vm.ScopeContext, rData []byte, depth int, err error) {
w.r.CaptureState(pc, core.OpCode(op), gas, cost, &WrappedScopeContext{scope}, rData, depth, err)
}
func (w WrappedTracer) CaptureFault(env *vm.EVM, pc uint64, op vm.OpCode, gas, cost uint64, scope *vm.ScopeContext, depth int, err error) {
w.r.CaptureFault(pc, core.OpCode(op), gas, cost, &WrappedScopeContext{scope}, depth, err)
}
func (w WrappedTracer) CaptureEnd(output []byte, gasUsed uint64, t time.Duration, err error) {
w.r.CaptureEnd(output, gasUsed, t, err)
}
func (w WrappedTracer) GetResult() (interface{}, error) {
return w.r.Result()
}
type WrappedStateDB struct {
s *state.StateDB
}
func NewWrappedStateDB(d *state.StateDB) *WrappedStateDB {
return &WrappedStateDB{d}
}
// GetBalance(Address) *big.Int
func (w *WrappedStateDB) GetBalance(addr core.Address) *big.Int {
return w.s.GetBalance(common.Address(addr))
}
// GetNonce(Address) uint64
func (w *WrappedStateDB) GetNonce(addr core.Address) uint64 {
return w.s.GetNonce(common.Address(addr))
}
// GetCodeHash(Address) Hash
func (w *WrappedStateDB) GetCodeHash(addr core.Address) core.Hash {
return core.Hash(w.s.GetCodeHash(common.Address(addr)))
} // sort this out
// GetCode(Address) []byte
func (w *WrappedStateDB) GetCode(addr core.Address) []byte {
return w.s.GetCode(common.Address(addr))
}
// GetCodeSize(Address) int
func (w *WrappedStateDB) GetCodeSize(addr core.Address) int {
return w.s.GetCodeSize(common.Address(addr))
}
//GetRefund() uint64
func (w *WrappedStateDB) GetRefund() uint64 { //are we sure we want to include this? getting a refund seems like changing state
return w.s.GetRefund()
}
// GetCommittedState(Address, Hash) Hash
func (w *WrappedStateDB) GetCommittedState(addr core.Address, hsh core.Hash) core.Hash {
return core.Hash(w.s.GetCommittedState(common.Address(addr), common.Hash(hsh)))
}
// GetState(Address, Hash) Hash
func (w *WrappedStateDB) GetState(addr core.Address, hsh core.Hash) core.Hash {
return core.Hash(w.s.GetState(common.Address(addr), common.Hash(hsh)))
}
// HasSuicided(Address) bool
func (w *WrappedStateDB) HasSuicided(addr core.Address) bool { // I figured we'd skip some of the future labor and update the name now
return w.s.HasSuicided(common.Address(addr))
}
// // Exist reports whether the given account exists in state.
// // Notably this should also return true for suicided accounts.
// Exist(Address) bool
func (w *WrappedStateDB) Exist(addr core.Address) bool {
return w.s.Exist(common.Address(addr))
}
// // Empty returns whether the given account is empty. Empty
// // is defined according to EIP161 (balance = nonce = code = 0).
// Empty(Address) bool
func (w *WrappedStateDB) Empty(addr core.Address) bool {
return w.s.Empty(common.Address(addr))
}
// AddressInAccessList(addr Address) bool
func (w *WrappedStateDB) AddressInAccessList(addr core.Address) bool {
return w.s.AddressInAccessList(common.Address(addr))
}
// SlotInAccessList(addr Address, slot Hash) (addressOk bool, slotOk bool)
func (w *WrappedStateDB) SlotInAccessList(addr core.Address, slot core.Hash) (addressOK, slotOk bool) {
return w.s.SlotInAccessList(common.Address(addr), common.Hash(slot))
}
type Node struct {
n *node.Node
}
func NewNode(n *node.Node) *Node {
return &Node{n}
}
func (n *Node) Server() core.Server {
return n.n.Server()
}
func (n *Node) DataDir() string {
return n.n.DataDir()
}
func (n *Node) InstanceDir() string {
return n.n.InstanceDir()
}
func (n *Node) IPCEndpoint() string {
return n.n.IPCEndpoint()
}
func (n *Node) HTTPEndpoint() string {
return n.n.HTTPEndpoint()
}
func (n *Node) WSEndpoint() string {
return n.n.WSEndpoint()
}
func (n *Node) ResolvePath(x string) string {
return n.n.ResolvePath(x)
}
type Backend struct {
b interfaces.Backend
newTxsFeed event.Feed
newTxsOnce sync.Once
chainFeed event.Feed
chainOnce sync.Once
chainHeadFeed event.Feed
chainHeadOnce sync.Once
chainSideFeed event.Feed
chainSideOnce sync.Once
logsFeed event.Feed
logsOnce sync.Once
pendingLogsFeed event.Feed
pendingLogsOnce sync.Once
removedLogsFeed event.Feed
removedLogsOnce sync.Once
chainConfig *params.ChainConfig
}
func NewBackend(b interfaces.Backend) *Backend {
return &Backend{b: b}
}
func (b *Backend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
return b.b.SuggestGasTipCap(ctx)
}
func (b *Backend) ChainDb() restricted.Database {
return b.b.ChainDb()
}
func (b *Backend) ExtRPCEnabled() bool {
return b.b.ExtRPCEnabled()
}
func (b *Backend) RPCGasCap() uint64 {
return b.b.RPCGasCap()
}
func (b *Backend) RPCTxFeeCap() float64 {
return b.b.RPCTxFeeCap()
}
func (b *Backend) UnprotectedAllowed() bool {
return b.b.UnprotectedAllowed()
}
func (b *Backend) SetHead(number uint64) {
b.b.SetHead(number)
}
func (b *Backend) HeaderByNumber(ctx context.Context, number int64) ([]byte, error) {
header, err := b.b.HeaderByNumber(ctx, rpc.BlockNumber(number))
if err != nil {
return nil, err
}
return rlp.EncodeToBytes(header)
}
func (b *Backend) HeaderByHash(ctx context.Context, hash core.Hash) ([]byte, error) {
header, err := b.b.HeaderByHash(ctx, common.Hash(hash))
if err != nil {
return nil, err
}
return rlp.EncodeToBytes(header)
}
func (b *Backend) CurrentHeader() []byte {
ret, _ := rlp.EncodeToBytes(b.b.CurrentHeader())
return ret
}
func (b *Backend) CurrentBlock() []byte {
ret, _ := rlp.EncodeToBytes(b.b.CurrentBlock())
return ret
}
func (b *Backend) BlockByNumber(ctx context.Context, number int64) ([]byte, error) {
block, err := b.b.BlockByNumber(ctx, rpc.BlockNumber(number))
if err != nil {
return nil, err
}
return rlp.EncodeToBytes(block)
}
func (b *Backend) BlockByHash(ctx context.Context, hash core.Hash) ([]byte, error) {
block, err := b.b.BlockByHash(ctx, common.Hash(hash))
if err != nil {
return nil, err
}
return rlp.EncodeToBytes(block)
}
func (b *Backend) GetReceipts(ctx context.Context, hash core.Hash) ([]byte, error) {
receipts, err := b.b.GetReceipts(ctx, common.Hash(hash))
if err != nil {
return nil, err
}
return json.Marshal(receipts)
}
func (b *Backend) GetTd(ctx context.Context, hash core.Hash) *big.Int {
return b.b.GetTd(ctx, common.Hash(hash))
}
func (b *Backend) SendTx(ctx context.Context, signedTx []byte) error {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(signedTx); err != nil {
return err
}
return b.b.SendTx(ctx, tx)
}
func (b *Backend) GetTransaction(ctx context.Context, txHash core.Hash) ([]byte, core.Hash, uint64, uint64, error) { // RLP Encoded transaction {
tx, blockHash, blockNumber, index, err := b.b.GetTransaction(ctx, common.Hash(txHash))
if err != nil {
return nil, core.Hash(blockHash), blockNumber, index, err
}
enc, err := tx.MarshalBinary()
return enc, core.Hash(blockHash), blockNumber, index, err
}
func (b *Backend) GetPoolTransactions() ([][]byte, error) {
txs, err := b.b.GetPoolTransactions()
if err != nil {
return nil, err
}
results := make([][]byte, len(txs))
for i, tx := range txs {
results[i], _ = rlp.EncodeToBytes(tx)
}
return results, nil
}
func (b *Backend) GetPoolTransaction(txHash core.Hash) []byte {
tx := b.b.GetPoolTransaction(common.Hash(txHash))
if tx == nil {
return []byte{}
}
enc, _ := rlp.EncodeToBytes(tx)
return enc
}
func (b *Backend) GetPoolNonce(ctx context.Context, addr core.Address) (uint64, error) {
return b.b.GetPoolNonce(ctx, common.Address(addr))
}
func (b *Backend) Stats() (pending int, queued int) {
return b.b.Stats()
}
func (b *Backend) TxPoolContent() (map[core.Address][][]byte, map[core.Address][][]byte) {
pending, queued := b.b.TxPoolContent()
trpending, trqueued := make(map[core.Address][][]byte), make(map[core.Address][][]byte)
for k, v := range pending {
trpending[core.Address(k)] = make([][]byte, len(v))
for i, tx := range v {
trpending[core.Address(k)][i], _ = tx.MarshalBinary()
}
}
for k, v := range queued {
trqueued[core.Address(k)] = make([][]byte, len(v))
for i, tx := range v {
trpending[core.Address(k)][i], _ = tx.MarshalBinary()
}
}
return trpending, trqueued
} // RLP encoded transactions
func (b *Backend) BloomStatus() (uint64, uint64) {
return b.b.BloomStatus()
}
func (b *Backend) GetLogs(ctx context.Context, blockHash core.Hash) ([][]byte, error) {
logs, err := b.b.GetLogs(ctx, common.Hash(blockHash))
if err != nil {
return nil, err
}
encLogs := make([][]byte, len(logs))
for i, log := range logs {
encLogs[i], _ = rlp.EncodeToBytes(log)
}
return encLogs, nil
} // []RLP encoded logs
type dl struct {
dl *downloader.Downloader
}
type progress struct {
p ethereum.SyncProgress
}
func (p *progress) StartingBlock() uint64 {
return p.p.StartingBlock
}
func (p *progress) CurrentBlock() uint64 {
return p.p.CurrentBlock
}
func (p *progress) HighestBlock() uint64 {
return p.p.HighestBlock
}
func (p *progress) PulledStates() uint64 {
return p.p.PulledStates
}
func (p *progress) KnownStates() uint64 {
return p.p.KnownStates
}
func (d *dl) Progress() core.Progress {
return &progress{d.dl.Progress()}
}
func (b *Backend) Downloader() core.Downloader {
return &dl{b.b.Downloader()}
}
func (b *Backend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) core.Subscription {
var sub event.Subscription
b.newTxsOnce.Do(func() {
bch := make(chan gcore.NewTxsEvent, 100)
sub = b.b.SubscribeNewTxsEvent(bch)
go func() {
for {
select {
case item := <-bch:
txe := core.NewTxsEvent{
Txs: make([][]byte, len(item.Txs)),
}
for i, tx := range item.Txs {
txe.Txs[i], _ = tx.MarshalBinary()
}
b.newTxsFeed.Send(txe)
case err := <-sub.Err():
log.Warn("Subscription error for NewTxs", "err", err)
return
}
}
}()
})
return b.newTxsFeed.Subscribe(ch)
}
func (b *Backend) SubscribeChainEvent(ch chan<- core.ChainEvent) core.Subscription {
var sub event.Subscription
b.chainOnce.Do(func() {
bch := make(chan gcore.ChainEvent, 100)
sub = b.b.SubscribeChainEvent(bch)
go func() {
for {
select {
case item := <-bch:
ce := core.ChainEvent{
Hash: core.Hash(item.Hash),
}
ce.Block, _ = rlp.EncodeToBytes(item.Block)
ce.Logs, _ = rlp.EncodeToBytes(item.Logs)
b.chainFeed.Send(ce)
case err := <-sub.Err():
log.Warn("Subscription error for Chain", "err", err)
return
}
}
}()
})
return b.chainFeed.Subscribe(ch)
}
func (b *Backend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) core.Subscription {
var sub event.Subscription
b.chainHeadOnce.Do(func() {
bch := make(chan gcore.ChainHeadEvent, 100)
sub = b.b.SubscribeChainHeadEvent(bch)
go func() {
for {
select {
case item := <-bch:
che := core.ChainHeadEvent{}
che.Block, _ = rlp.EncodeToBytes(item.Block)
b.chainHeadFeed.Send(che)
case err := <-sub.Err():
log.Warn("Subscription error for ChainHead", "err", err)
return
}
}
}()
})
return b.chainHeadFeed.Subscribe(ch)
}
func (b *Backend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) core.Subscription {
var sub event.Subscription
b.chainSideOnce.Do(func() {
bch := make(chan gcore.ChainSideEvent, 100)
sub = b.b.SubscribeChainSideEvent(bch)
go func() {
for {
select {
case item := <-bch:
cse := core.ChainSideEvent{}
cse.Block, _ = rlp.EncodeToBytes(item.Block)
b.chainSideFeed.Send(cse)
case err := <-sub.Err():
log.Warn("Subscription error for ChainSide", "err", err)
return
}
}
}()
})
return b.chainSideFeed.Subscribe(ch)
}
func (b *Backend) SubscribeLogsEvent(ch chan<- [][]byte) core.Subscription {
var sub event.Subscription
b.logsOnce.Do(func() {
bch := make(chan []*types.Log, 100)
sub = b.b.SubscribeLogsEvent(bch)
go func() {
for {
select {
case item := <-bch:
logs := make([][]byte, len(item))
for i, log := range item {
logs[i], _ = rlp.EncodeToBytes(log)
}
b.logsFeed.Send(logs)
case err := <-sub.Err():
log.Warn("Subscription error for Logs", "err", err)
return
}
}
}()
})
return b.logsFeed.Subscribe(ch)
} // []RLP encoded logs
func (b *Backend) SubscribePendingLogsEvent(ch chan<- [][]byte) core.Subscription {
var sub event.Subscription
b.pendingLogsOnce.Do(func() {
bch := make(chan []*types.Log, 100)
sub = b.b.SubscribePendingLogsEvent(bch)
go func() {
for {
select {
case item := <-bch:
logs := make([][]byte, len(item))
for i, log := range item {
logs[i], _ = rlp.EncodeToBytes(log)
}
b.pendingLogsFeed.Send(logs)
case err := <-sub.Err():
log.Warn("Subscription error for PendingLogs", "err", err)
return
}
}
}()
})
return b.pendingLogsFeed.Subscribe(ch)
} // RLP Encoded logs
func (b *Backend) SubscribeRemovedLogsEvent(ch chan<- []byte) core.Subscription {
var sub event.Subscription
b.removedLogsOnce.Do(func() {
bch := make(chan gcore.RemovedLogsEvent, 100)
sub = b.b.SubscribeRemovedLogsEvent(bch)
go func() {
for {
select {
case item := <-bch:
logs := make([][]byte, len(item.Logs))
for i, log := range item.Logs {
logs[i], _ = rlp.EncodeToBytes(log)
}
b.removedLogsFeed.Send(item)
case err := <-sub.Err():
log.Warn("Subscription error for RemovedLogs", "err", err)
return
}
}
}()
})
return b.removedLogsFeed.Subscribe(ch)
} // RLP encoded logs
func convertAndSet(a, b reflect.Value) (err error) {
defer func() {
if recover() != nil {
fmt.Errorf("error converting: %v", err.Error())
}
}()
a.Set(b.Convert(a.Type()))
return nil
}
func (b *Backend) ChainConfig() *params.ChainConfig {
// We're using the reflect library to copy data from params.ChainConfig to
// pparams.ChainConfig, so this function shouldn't need to be touched for
// simple changes to ChainConfig (though pparams.ChainConfig may need to be
// updated). Note that this probably won't carry over consensus engine data.
if b.chainConfig != nil {
return b.chainConfig
}
b.chainConfig = &params.ChainConfig{}
nval := reflect.ValueOf(b.b.ChainConfig())
ntype := nval.Type()
lval := reflect.ValueOf(b.chainConfig)
for i := 0; i < nval.NumField(); i++ {
field := ntype.Field(i)
v := nval.FieldByName(field.Name)
lv := lval.FieldByName(field.Name)
if v.Type() == lv.Type() && lv.CanSet() {
lv.Set(v)
} else {
convertAndSet(lv, v)
}
}
return b.chainConfig
}

28
rpc/getRPCcall_test.go Normal file
View File

@ -0,0 +1,28 @@
package rpc
import (
"testing"
"github.com/ethereum/go-ethereum/plugins"
)
func TestGetRPCCalls(t *testing.T) {
invoked := false
done := plugins.HookTester("GetRPCCalls", func(id, method, params string) {
invoked = true
if id == "" {
t.Errorf("Expected id to be non-nil")
}
if method == "" {
t.Errorf("Expected method to be non-nil")
}
if params == "" {
t.Errorf("Expected params to be non-nil")
}
})
defer done()
TestClientResponseType(t)
if !invoked {
t.Errorf("Expected plugin invocation")
}
}

View File

@ -333,6 +333,8 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
pluginGetRPCCalls(string(msg.ID), string(msg.Method), string(msg.Params))
start := time.Now()
answer := h.runMethod(cp.ctx, msg, callb, args)

26
rpc/plugin_hooks.go Normal file
View File

@ -0,0 +1,26 @@
package rpc
import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/plugins"
)
func PluginGetRPCCalls(pl *plugins.PluginLoader, id, method, params string) {
fnList := pl.Lookup("GetRPCCalls", func(item interface{}) bool {
_, ok := item.(func(string, string, string))
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(string, string, string)); ok {
fn(id, method, params)
}
}
}
func pluginGetRPCCalls(id, method, params string) {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting GerRPCCalls, but default PluginLoader has not been initialized")
return
}
PluginGetRPCCalls(plugins.DefaultPluginLoader, id, method, params)
}

137
rpc/plugin_subscriptions.go Normal file
View File

@ -0,0 +1,137 @@
package rpc
import (
"context"
"reflect"
"github.com/ethereum/go-ethereum/log"
)
func isChanType(t reflect.Type) bool {
// Pointers to channels are weird, but whatever
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// Make sure we have a channel
if t.Kind() != reflect.Chan {
return false
}
// Make sure it is a receivable channel
return (t.ChanDir() & reflect.RecvDir) == reflect.RecvDir
}
func isChanPubsub(methodType reflect.Type) bool {
if methodType.NumIn() < 2 || methodType.NumOut() != 2 {
return false
}
return isContextType(methodType.In(1)) &&
isChanType(methodType.Out(0)) &&
isErrorType(methodType.Out(1))
}
func callbackifyChanPubSub(receiver, fn reflect.Value) *callback {
c := &callback{rcvr: receiver, errPos: 1, isSubscribe: true}
fntype := fn.Type()
// Skip receiver and context.Context parameter (if present).
firstArg := 0
if c.rcvr.IsValid() {
firstArg++
}
if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType {
c.hasCtx = true
firstArg++
}
// Add all remaining parameters.
c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg)
for i := firstArg; i < fntype.NumIn(); i++ {
c.argTypes[i-firstArg] = fntype.In(i)
}
retFnType := reflect.FuncOf(append([]reflect.Type{receiver.Type(), contextType}, c.argTypes...), []reflect.Type{subscriptionType, errorType}, false)
// // What follows uses reflection to construct a dynamically typed function equivalent to:
// func(receiver <T>, cctx context.Context, args ...<T>) (rpc.Subscription, error) {
// notifier, supported := NotifierFromContext(cctx)
// if !supported { return Subscription{}, ErrNotificationsUnsupported}
// ctx, cancel := context.WithCancel(context.Background())
// ch, err := fn()
// if err != nil { return Subscription{}, err }
// rpcSub := notifier.CreateSubscription()
// go func() {
// select {
// case v, ok := <- ch:
// if !ok { return }
// notifier.Notify(rpcSub.ID, v)
// case <-rpcSub.Err():
// cancel()
// return
// case <-notifier.Closed():
// cancel()
// return
// }
// }()
// return rpcSub, nil
// }
//
c.fn = reflect.MakeFunc(retFnType, func(args []reflect.Value) ([]reflect.Value) {
notifier, supported := NotifierFromContext(args[1].Interface().(context.Context))
if !supported {
return []reflect.Value{reflect.Zero(subscriptionType), reflect.ValueOf(ErrNotificationsUnsupported)}
}
ctx, cancel := context.WithCancel(context.Background())
args[1] = reflect.ValueOf(ctx)
out := fn.Call(args)
if !out[1].IsNil() {
// This amounts to: if err != nil { return nil, err }
return []reflect.Value{reflect.Zero(subscriptionType), out[1]}
}
// Geth's provided context is done once we've returned the subscription id.
// This new context will cancel when the notifier closes.
rpcSub := notifier.CreateSubscription()
go func() {
defer log.Info("Plugin subscription goroutine closed")
selectCases := []reflect.SelectCase{
{Dir: reflect.SelectRecv, Chan: out[0]},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(rpcSub.Err())},
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(notifier.Closed())},
}
for {
chosen, val, recvOK := reflect.Select(selectCases)
switch chosen {
case 0: // val, ok := <-ch
if !recvOK {
return
}
if err := notifier.Notify(rpcSub.ID, val.Interface()); err != nil {
log.Warn("Subscription notification failed", "id", rpcSub.ID, "err", err)
}
case 1:
cancel()
return
case 2:
cancel()
return
}
}
}()
return []reflect.Value{reflect.ValueOf(*rpcSub), reflect.Zero(errorType)}
})
return c
}
func pluginExtendedCallbacks(callbacks map[string]*callback, receiver reflect.Value) {
typ := receiver.Type()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
if method.PkgPath != "" {
continue // method not exported
}
if isChanPubsub(method.Type) {
cb := callbackifyChanPubSub(receiver, method.Func)
name := formatName(method.Name)
callbacks[name] = cb
}
}
}

View File

@ -64,6 +64,7 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
}
callbacks := suitableCallbacks(rcvrVal)
pluginExtendedCallbacks(callbacks, rcvrVal)
if len(callbacks) == 0 {
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}
@ -198,7 +199,7 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf))
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf), "args", fullargs)
errRes = errors.New("method handler crashed")
}
}()