update plugeth-utils version

This commit is contained in:
Austin Roberts 2021-08-31 15:29:09 -05:00
parent 40377543bf
commit d9d51dd345
5 changed files with 446 additions and 23 deletions

View File

@ -41,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/plugins/wrappers"
"gopkg.in/urfave/cli.v1"
)
@ -314,7 +315,8 @@ func geth(ctx *cli.Context) error {
if err := plugins.Initialize(path.Join(ctx.GlobalString(utils.DataDirFlag.Name), "plugins"), ctx); err != nil { return err }
prepare(ctx)
stack, backend := makeFullNode(ctx)
pluginsInitializeNode(stack, backend)
wrapperBackend := wrappers.NewBackend(backend)
pluginsInitializeNode(stack, wrapperBackend)
if ok, err := plugins.RunSubcommand(ctx); ok {
stack.Close()
return err
@ -325,7 +327,7 @@ func geth(ctx *cli.Context) error {
return fmt.Errorf("invalid command: %q", args[0])
}
}
stack.RegisterAPIs(pluginGetAPIs(stack, backend))
stack.RegisterAPIs(pluginGetAPIs(stack, wrapperBackend))
startNode(ctx, stack, backend)
stack.Wait()

View File

@ -3,28 +3,51 @@ package main
import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/plugins"
"github.com/ethereum/go-ethereum/plugins/interfaces"
"github.com/ethereum/go-ethereum/plugins/wrappers"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/log"
"github.com/openrelayxyz/plugeth-utils/core"
"github.com/openrelayxyz/plugeth-utils/restricted"
)
type APILoader func(*node.Node, interfaces.Backend) []rpc.API
func GetAPIsFromLoader(pl *plugins.PluginLoader, stack *node.Node, backend interfaces.Backend) []rpc.API {
result := []rpc.API{}
fnList := pl.Lookup("GetAPIs", func(item interface{}) bool {
_, ok := item.(func(*node.Node, interfaces.Backend) []rpc.API)
return ok
})
for _, fni := range fnList {
if fn, ok := fni.(func(*node.Node, interfaces.Backend) []rpc.API); ok {
result = append(result, fn(stack, backend)...)
func apiTranslate(apis []core.API) []rpc.API {
result := make([]rpc.API, len(apis))
for i, api := range apis {
result[i] = rpc.API{
Namespace: api.Namespace,
Version: api.Version,
Service: api.Service,
Public: api.Public,
}
}
return result
}
func pluginGetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API {
func GetAPIsFromLoader(pl *plugins.PluginLoader, stack *node.Node, backend restricted.Backend) []rpc.API {
result := []core.API{}
fnList := pl.Lookup("GetAPIs", func(item interface{}) bool {
switch item.(type) {
case func(core.Node, restricted.Backend) []core.API:
return true
case func(core.Node, core.Backend) []core.API:
return true
default:
return false
}
})
for _, fni := range fnList {
switch fn := fni.(type) {
case func(core.Node, restricted.Backend) []core.API:
result = append(result, fn(wrappers.NewNode(stack), backend)...)
case func(core.Node, core.Backend) []core.API:
result = append(result, fn(wrappers.NewNode(stack), backend)...)
default:
}
}
return apiTranslate(result)
}
func pluginGetAPIs(stack *node.Node, backend restricted.Backend) []rpc.API {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting GetAPIs, but default PluginLoader has not been initialized")
return []rpc.API{}
@ -32,19 +55,29 @@ func pluginGetAPIs(stack *node.Node, backend interfaces.Backend) []rpc.API {
return GetAPIsFromLoader(plugins.DefaultPluginLoader, stack, backend)
}
func InitializeNode(pl *plugins.PluginLoader, stack *node.Node, backend interfaces.Backend) {
func InitializeNode(pl *plugins.PluginLoader, stack *node.Node, backend restricted.Backend) {
fnList := pl.Lookup("InitializeNode", func(item interface{}) bool {
_, ok := item.(func(*node.Node, interfaces.Backend))
return ok
switch item.(type) {
case func(core.Logger, core.Node, restricted.Backend):
return true
case func(core.Logger, core.Node, core.Backend):
return true
default:
return false
}
})
for _, fni := range fnList {
if fn, ok := fni.(func(*node.Node, interfaces.Backend)); ok {
fn(stack, backend)
switch fn := fni.(type) {
case func(core.Logger, core.Node, restricted.Backend):
fn(log.Root(), wrappers.NewNode(stack), backend)
case func(core.Logger, core.Node, core.Backend):
fn(log.Root(), wrappers.NewNode(stack), backend)
default:
}
}
}
func pluginsInitializeNode(stack *node.Node, backend interfaces.Backend) {
func pluginsInitializeNode(stack *node.Node, backend restricted.Backend) {
if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting InitializeNode, but default PluginLoader has not been initialized")
return

1
go.mod
View File

@ -49,6 +49,7 @@ require (
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/olekukonko/tablewriter v0.0.5
github.com/openrelayxyz/plugeth-utils v0.0.0
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
github.com/prometheus/tsdb v0.7.1
github.com/rjeczalik/notify v0.9.1

View File

@ -4,7 +4,6 @@ import (
"context"
"math/big"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
@ -26,7 +25,6 @@ type Backend interface {
Downloader() *downloader.Downloader
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
ChainDb() ethdb.Database
AccountManager() *accounts.Manager
ExtRPCEnabled() bool
RPCGasCap() uint64 // global gas cap for eth_call over rpc: DoS protection
RPCTxFeeCap() float64 // global tx fee cap for all transaction related APIs

View File

@ -0,0 +1,389 @@
package wrappers
import (
"context"
"math/big"
"encoding/json"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/common"
gcore "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/plugins/interfaces"
"github.com/openrelayxyz/plugeth-utils/core"
"github.com/openrelayxyz/plugeth-utils/restricted"
"sync"
)
type Node struct {
n *node.Node
}
func NewNode(n *node.Node) *Node {
return &Node{n}
}
func (n *Node) Server() core.Server {
return n.Server()
}
func (n *Node) DataDir() string {
return n.n.DataDir()
}
func (n *Node) InstanceDir() string {
return n.n.InstanceDir()
}
func (n *Node) IPCEndpoint() string {
return n.n.IPCEndpoint()
}
func (n *Node) HTTPEndpoint() string {
return n.n.HTTPEndpoint()
}
func (n *Node) WSEndpoint() string {
return n.n.WSEndpoint()
}
func (n *Node) ResolvePath(x string) string {
return n.n.ResolvePath(x)
}
type Backend struct{
b interfaces.Backend
newTxsFeed event.Feed
newTxsOnce sync.Once
chainFeed event.Feed
chainOnce sync.Once
chainHeadFeed event.Feed
chainHeadOnce sync.Once
chainSideFeed event.Feed
chainSideOnce sync.Once
logsFeed event.Feed
logsOnce sync.Once
pendingLogsFeed event.Feed
pendingLogsOnce sync.Once
removedLogsFeed event.Feed
removedLogsOnce sync.Once
}
func NewBackend(b interfaces.Backend) *Backend {
return &Backend{b: b}
}
func (b *Backend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
return b.b.SuggestGasTipCap(ctx)
}
func (b *Backend) ChainDb() restricted.Database {
return b.b.ChainDb()
}
func (b *Backend) ExtRPCEnabled() bool {
return b.b.ExtRPCEnabled()
}
func (b *Backend) RPCGasCap() uint64 {
return b.b.RPCGasCap()
}
func (b *Backend) RPCTxFeeCap() float64 {
return b.b.RPCTxFeeCap()
}
func (b *Backend) UnprotectedAllowed() bool {
return b.b.UnprotectedAllowed()
}
func (b *Backend) SetHead(number uint64) {
b.b.SetHead(number)
}
func (b *Backend) HeaderByNumber(ctx context.Context, number int64) ([]byte, error) {
header, err := b.b.HeaderByNumber(ctx, rpc.BlockNumber(number))
if err != nil { return nil, err }
return rlp.EncodeToBytes(header)
}
func (b *Backend) HeaderByHash(ctx context.Context, hash core.Hash) ([]byte, error) {
header, err := b.b.HeaderByHash(ctx, common.Hash(hash))
if err != nil { return nil, err }
return rlp.EncodeToBytes(header)
}
func (b *Backend) CurrentHeader() []byte {
ret, _ := rlp.EncodeToBytes(b.b.CurrentHeader())
return ret
}
func (b *Backend) CurrentBlock() []byte {
ret, _ := rlp.EncodeToBytes(b.b.CurrentBlock())
return ret
}
func (b *Backend) BlockByNumber(ctx context.Context, number int64) ([]byte, error){
block, err := b.b.BlockByNumber(ctx, rpc.BlockNumber(number))
if err != nil { return nil, err }
return rlp.EncodeToBytes(block)
}
func (b *Backend) BlockByHash(ctx context.Context, hash core.Hash) ([]byte, error){
block, err := b.b.BlockByHash(ctx, common.Hash(hash))
if err != nil { return nil, err }
return rlp.EncodeToBytes(block)
}
func (b *Backend) GetReceipts(ctx context.Context, hash core.Hash) ([]byte, error) {
receipts, err := b.b.GetReceipts(ctx, common.Hash(hash))
if err != nil { return nil, err }
return json.Marshal(receipts)
}
func (b *Backend) GetTd(ctx context.Context, hash core.Hash) *big.Int {
return b.b.GetTd(ctx, common.Hash(hash))
}
func (b *Backend) SendTx(ctx context.Context, signedTx []byte) error {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(signedTx); err != nil {
return err
}
return b.b.SendTx(ctx, tx)
}
func (b *Backend) GetTransaction(ctx context.Context, txHash core.Hash) ([]byte, core.Hash, uint64, uint64, error) { // RLP Encoded transaction {
tx, blockHash, blockNumber, index, err := b.b.GetTransaction(ctx, common.Hash(txHash))
if err != nil { return nil, core.Hash(blockHash), blockNumber, index, err }
enc, err := tx.MarshalBinary()
return enc, core.Hash(blockHash), blockNumber, index, err
}
func (b *Backend) GetPoolTransactions() ([][]byte, error) {
txs, err := b.b.GetPoolTransactions()
if err != nil { return nil, err }
results := make([][]byte, len(txs))
for i, tx := range txs {
results[i], _ = rlp.EncodeToBytes(tx)
}
return results, nil
}
func (b *Backend) GetPoolTransaction(txHash core.Hash) []byte {
tx := b.b.GetPoolTransaction(common.Hash(txHash))
if tx == nil { return []byte{} }
enc, _ := rlp.EncodeToBytes(tx)
return enc
}
func (b *Backend) GetPoolNonce(ctx context.Context, addr core.Address) (uint64, error) {
return b.b.GetPoolNonce(ctx, common.Address(addr))
}
func (b *Backend) Stats() (pending int, queued int) {
return b.b.Stats()
}
func (b *Backend) TxPoolContent() (map[core.Address][][]byte, map[core.Address][][]byte) {
pending, queued := b.b.TxPoolContent()
trpending, trqueued := make(map[core.Address][][]byte), make(map[core.Address][][]byte)
for k, v := range pending {
trpending[core.Address(k)] = make([][]byte, len(v))
for i, tx := range v {
trpending[core.Address(k)][i], _ = tx.MarshalBinary()
}
}
for k, v := range queued {
trqueued[core.Address(k)] = make([][]byte, len(v))
for i, tx := range v {
trpending[core.Address(k)][i], _ = tx.MarshalBinary()
}
}
return trpending, trqueued
} // RLP encoded transactions
func (b *Backend) BloomStatus() (uint64, uint64) {
return b.b.BloomStatus()
}
func (b *Backend) GetLogs(ctx context.Context, blockHash core.Hash) ([][]byte, error) {
logs, err := b.b.GetLogs(ctx, common.Hash(blockHash))
if err != nil { return nil, err }
encLogs := make([][]byte, len(logs))
for i, log := range logs {
encLogs[i], _ = rlp.EncodeToBytes(log)
}
return encLogs, nil
} // []RLP encoded logs
type dl struct{
dl *downloader.Downloader
}
type progress struct{
p ethereum.SyncProgress
}
func (p *progress) StartingBlock() uint64 {
return p.p.StartingBlock
}
func (p *progress) CurrentBlock() uint64 {
return p.p.CurrentBlock
}
func (p *progress) HighestBlock() uint64 {
return p.p.HighestBlock
}
func (p *progress) PulledStates() uint64 {
return p.p.PulledStates
}
func (p *progress) KnownStates() uint64 {
return p.p.KnownStates
}
func (d *dl) Progress() core.Progress {
return &progress{d.dl.Progress()}
}
func (b *Backend) Downloader() core.Downloader {
return &dl{b.b.Downloader()}
}
func (b *Backend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) core.Subscription {
var sub event.Subscription
b.newTxsOnce.Do(func() {
bch := make(chan gcore.NewTxsEvent, 100)
sub = b.b.SubscribeNewTxsEvent(bch)
go func(){
for {
select {
case item := <-bch:
txe := core.NewTxsEvent{
Txs: make([][]byte, len(item.Txs)),
}
for i, tx := range item.Txs {
txe.Txs[i], _ = tx.MarshalBinary()
}
b.newTxsFeed.Send(txe)
case err := <-sub.Err():
log.Warn("Subscription error for NewTxs", "err", err)
return
}
}
}()
})
return b.newTxsFeed.Subscribe(ch)
}
func (b *Backend) SubscribeChainEvent(ch chan<- core.ChainEvent) core.Subscription {
var sub event.Subscription
b.chainOnce.Do(func() {
bch := make(chan gcore.ChainEvent, 100)
sub = b.b.SubscribeChainEvent(bch)
go func(){
for {
select {
case item := <-bch:
ce := core.ChainEvent{
Hash: core.Hash(item.Hash),
}
ce.Block, _ = rlp.EncodeToBytes(item.Block)
ce.Logs, _ = rlp.EncodeToBytes(item.Logs)
b.chainFeed.Send(ce)
case err := <-sub.Err():
log.Warn("Subscription error for Chain", "err", err)
return
}
}
}()
})
return b.chainFeed.Subscribe(ch)
}
func (b *Backend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) core.Subscription {
var sub event.Subscription
b.chainHeadOnce.Do(func() {
bch := make(chan gcore.ChainHeadEvent, 100)
sub = b.b.SubscribeChainHeadEvent(bch)
go func(){
for {
select {
case item := <-bch:
che := core.ChainHeadEvent{}
che.Block, _ = rlp.EncodeToBytes(item.Block)
b.chainHeadFeed.Send(che)
case err := <-sub.Err():
log.Warn("Subscription error for ChainHead", "err", err)
return
}
}
}()
})
return b.chainHeadFeed.Subscribe(ch)
}
func (b *Backend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) core.Subscription {
var sub event.Subscription
b.chainSideOnce.Do(func() {
bch := make(chan gcore.ChainSideEvent, 100)
sub = b.b.SubscribeChainSideEvent(bch)
go func(){
for {
select {
case item := <-bch:
cse := core.ChainSideEvent{}
cse.Block, _ = rlp.EncodeToBytes(item.Block)
b.chainSideFeed.Send(cse)
case err := <-sub.Err():
log.Warn("Subscription error for ChainSide", "err", err)
return
}
}
}()
})
return b.chainSideFeed.Subscribe(ch)
}
func (b *Backend) SubscribeLogsEvent(ch chan<- [][]byte) core.Subscription {
var sub event.Subscription
b.logsOnce.Do(func() {
bch := make(chan []*types.Log, 100)
sub = b.b.SubscribeLogsEvent(bch)
go func(){
for {
select {
case item := <-bch:
logs := make([][]byte, len(item))
for i, log := range item {
logs[i], _ = rlp.EncodeToBytes(log)
}
b.logsFeed.Send(logs)
case err := <-sub.Err():
log.Warn("Subscription error for Logs", "err", err)
return
}
}
}()
})
return b.logsFeed.Subscribe(ch)
} // []RLP encoded logs
func (b *Backend) SubscribePendingLogsEvent(ch chan<- [][]byte) core.Subscription {
var sub event.Subscription
b.pendingLogsOnce.Do(func() {
bch := make(chan []*types.Log, 100)
sub = b.b.SubscribePendingLogsEvent(bch)
go func(){
for {
select {
case item := <-bch:
logs := make([][]byte, len(item))
for i, log := range item {
logs[i], _ = rlp.EncodeToBytes(log)
}
b.pendingLogsFeed.Send(logs)
case err := <-sub.Err():
log.Warn("Subscription error for PendingLogs", "err", err)
return
}
}
}()
})
return b.pendingLogsFeed.Subscribe(ch)
} // RLP Encoded logs
func (b *Backend) SubscribeRemovedLogsEvent(ch chan<- []byte) core.Subscription {
var sub event.Subscription
b.removedLogsOnce.Do(func() {
bch := make(chan gcore.RemovedLogsEvent, 100)
sub = b.b.SubscribeRemovedLogsEvent(bch)
go func(){
for {
select {
case item := <-bch:
logs := make([][]byte, len(item.Logs))
for i, log := range item.Logs {
logs[i], _ = rlp.EncodeToBytes(log)
}
b.removedLogsFeed.Send(item)
case err := <-sub.Err():
log.Warn("Subscription error for RemovedLogs", "err", err)
return
}
}
}()
})
return b.removedLogsFeed.Subscribe(ch)
} // RLP encoded logs