Merge remote-tracking branch 'refs/remotes/origin/utils-refactor' into utils-refactor
This commit is contained in:
commit
ea0f27c92d
@ -13,7 +13,7 @@ func PluginPreProcessBlock(pl *plugins.PluginLoader, block *types.Block) {
|
|||||||
_, ok := item.(func([]byte))
|
_, ok := item.(func([]byte))
|
||||||
return ok
|
return ok
|
||||||
})
|
})
|
||||||
encoded, _ = rlp.EncodeToBytes(block)
|
encoded, _ := rlp.EncodeToBytes(block)
|
||||||
for _, fni := range fnList {
|
for _, fni := range fnList {
|
||||||
if fn, ok := fni.(func([]byte)); ok {
|
if fn, ok := fni.(func([]byte)); ok {
|
||||||
fn(encoded)
|
fn(encoded)
|
||||||
|
@ -53,12 +53,9 @@ var DefaultPluginLoader *PluginLoader
|
|||||||
func NewPluginLoader(target string) (*PluginLoader, error) {
|
func NewPluginLoader(target string) (*PluginLoader, error) {
|
||||||
pl := &PluginLoader{
|
pl := &PluginLoader{
|
||||||
Plugins: []*plugin.Plugin{},
|
Plugins: []*plugin.Plugin{},
|
||||||
// RPCPlugins: []APILoader{},
|
|
||||||
Subcommands: make(map[string]Subcommand),
|
Subcommands: make(map[string]Subcommand),
|
||||||
Flags: []*flag.FlagSet{},
|
Flags: []*flag.FlagSet{},
|
||||||
LookupCache: make(map[string][]interface{}),
|
LookupCache: make(map[string][]interface{}),
|
||||||
// CreateConsensusEngine: ethconfig.CreateConsensusEngine,
|
|
||||||
// UpdateBlockchainVMConfig: func(cfg *vm.Config) {},
|
|
||||||
}
|
}
|
||||||
files, err := ioutil.ReadDir(target)
|
files, err := ioutil.ReadDir(target)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -6,6 +6,8 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"reflect"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum"
|
"github.com/ethereum/go-ethereum"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
@ -22,6 +24,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
"github.com/openrelayxyz/plugeth-utils/core"
|
"github.com/openrelayxyz/plugeth-utils/core"
|
||||||
"github.com/openrelayxyz/plugeth-utils/restricted"
|
"github.com/openrelayxyz/plugeth-utils/restricted"
|
||||||
|
"github.com/openrelayxyz/plugeth-utils/restricted/params"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WrappedScopeContext struct {
|
type WrappedScopeContext struct {
|
||||||
@ -226,6 +229,7 @@ type Backend struct {
|
|||||||
pendingLogsOnce sync.Once
|
pendingLogsOnce sync.Once
|
||||||
removedLogsFeed event.Feed
|
removedLogsFeed event.Feed
|
||||||
removedLogsOnce sync.Once
|
removedLogsOnce sync.Once
|
||||||
|
chainConfig *params.ChainConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBackend(b interfaces.Backend) *Backend {
|
func NewBackend(b interfaces.Backend) *Backend {
|
||||||
@ -563,3 +567,36 @@ func (b *Backend) SubscribeRemovedLogsEvent(ch chan<- []byte) core.Subscription
|
|||||||
})
|
})
|
||||||
return b.removedLogsFeed.Subscribe(ch)
|
return b.removedLogsFeed.Subscribe(ch)
|
||||||
} // RLP encoded logs
|
} // RLP encoded logs
|
||||||
|
|
||||||
|
func convertAndSet(a, b reflect.Value) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if recover() != nil {
|
||||||
|
fmt.Errorf("error converting: %v", err.Error())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
a.Set(b.Convert(a.Type()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Backend) ChainConfig() *params.ChainConfig {
|
||||||
|
// We're using the reflect library to copy data from params.ChainConfig to
|
||||||
|
// pparams.ChainConfig, so this function shouldn't need to be touched for
|
||||||
|
// simple changes to ChainConfig (though pparams.ChainConfig may need to be
|
||||||
|
// updated). Note that this probably won't carry over consensus engine data.
|
||||||
|
if b.chainConfig != nil { return b.chainConfig }
|
||||||
|
b.chainConfig = ¶ms.ChainConfig{}
|
||||||
|
nval := reflect.ValueOf(b.b.ChainConfig())
|
||||||
|
ntype := nval.Type()
|
||||||
|
lval := reflect.ValueOf(b.chainConfig)
|
||||||
|
for i := 0; i < nval.NumField(); i++ {
|
||||||
|
field := ntype.Field(i)
|
||||||
|
v := nval.FieldByName(field.Name)
|
||||||
|
lv := lval.FieldByName(field.Name)
|
||||||
|
if v.Type() == lv.Type() && lv.CanSet() {
|
||||||
|
lv.Set(v)
|
||||||
|
} else {
|
||||||
|
convertAndSet(lv, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return b.chainConfig
|
||||||
|
}
|
||||||
|
140
rpc/plugin_subscriptions.go
Normal file
140
rpc/plugin_subscriptions.go
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
package rpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
func isChanType(t reflect.Type) bool {
|
||||||
|
// Pointers to channels are weird, but whatever
|
||||||
|
for t.Kind() == reflect.Ptr {
|
||||||
|
t = t.Elem()
|
||||||
|
}
|
||||||
|
// Make sure we have a channel
|
||||||
|
if t.Kind() != reflect.Chan {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Make sure it is a receivable channel
|
||||||
|
return (t.ChanDir() & reflect.RecvDir) == reflect.RecvDir
|
||||||
|
}
|
||||||
|
|
||||||
|
func isChanPubsub(methodType reflect.Type) bool {
|
||||||
|
if methodType.NumIn() < 2 || methodType.NumOut() != 2 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return isContextType(methodType.In(1)) &&
|
||||||
|
isChanType(methodType.Out(0)) &&
|
||||||
|
isErrorType(methodType.Out(1))
|
||||||
|
}
|
||||||
|
|
||||||
|
func callbackifyChanPubSub(receiver, fn reflect.Value) *callback {
|
||||||
|
c := &callback{rcvr: receiver, errPos: 1, isSubscribe: true}
|
||||||
|
fntype := fn.Type()
|
||||||
|
// Skip receiver and context.Context parameter (if present).
|
||||||
|
firstArg := 0
|
||||||
|
if c.rcvr.IsValid() {
|
||||||
|
firstArg++
|
||||||
|
}
|
||||||
|
if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType {
|
||||||
|
c.hasCtx = true
|
||||||
|
firstArg++
|
||||||
|
}
|
||||||
|
// Add all remaining parameters.
|
||||||
|
c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg)
|
||||||
|
for i := firstArg; i < fntype.NumIn(); i++ {
|
||||||
|
c.argTypes[i-firstArg] = fntype.In(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
retFnType := reflect.FuncOf(append([]reflect.Type{receiver.Type(), contextType}, c.argTypes...), []reflect.Type{subscriptionType, errorType}, false)
|
||||||
|
|
||||||
|
// // What follows uses reflection to construct a dynamically typed function equivalent to:
|
||||||
|
// func(receiver <T>, cctx context.Context, args ...<T>) (rpc.Subscription, error) {
|
||||||
|
// notifier, supported := NotifierFromContext(cctx)
|
||||||
|
// if !supported { return Subscription{}, ErrNotificationsUnsupported}
|
||||||
|
// ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
// ch, err := fn()
|
||||||
|
// if err != nil { return Subscription{}, err }
|
||||||
|
// rpcSub := notifier.CreateSubscription()
|
||||||
|
// go func() {
|
||||||
|
// select {
|
||||||
|
// case v, ok := <- ch:
|
||||||
|
// if !ok { return }
|
||||||
|
// notifier.Notify(rpcSub.ID, v)
|
||||||
|
// case <-rpcSub.Err():
|
||||||
|
// cancel()
|
||||||
|
// return
|
||||||
|
// case <-notifier.Closed():
|
||||||
|
// cancel()
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
// }()
|
||||||
|
// return rpcSub, nil
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
|
||||||
|
c.fn = reflect.MakeFunc(retFnType, func(args []reflect.Value) ([]reflect.Value) {
|
||||||
|
notifier, supported := NotifierFromContext(args[1].Interface().(context.Context))
|
||||||
|
if !supported {
|
||||||
|
return []reflect.Value{reflect.Zero(subscriptionType), reflect.ValueOf(ErrNotificationsUnsupported)}
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
args[1] = reflect.ValueOf(ctx)
|
||||||
|
out := fn.Call(args)
|
||||||
|
if !out[1].IsNil() {
|
||||||
|
// This amounts to: if err != nil { return nil, err }
|
||||||
|
return []reflect.Value{reflect.Zero(subscriptionType), out[1]}
|
||||||
|
}
|
||||||
|
// Geth's provided context is done once we've returned the subscription id.
|
||||||
|
// This new context will cancel when the notifier closes.
|
||||||
|
|
||||||
|
rpcSub := notifier.CreateSubscription()
|
||||||
|
go func() {
|
||||||
|
defer log.Info("Plugin subscription goroutine closed")
|
||||||
|
selectCases := []reflect.SelectCase{
|
||||||
|
{Dir: reflect.SelectRecv, Chan: out[0]},
|
||||||
|
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(rpcSub.Err())},
|
||||||
|
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(notifier.Closed())},
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
chosen, val, recvOK := reflect.Select(selectCases)
|
||||||
|
switch chosen {
|
||||||
|
case 0: // val, ok := <-ch
|
||||||
|
if !recvOK {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
notifier.Notify(rpcSub.ID, val.Interface())
|
||||||
|
case 1:
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
case 2:
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return []reflect.Value{reflect.ValueOf(*rpcSub), reflect.Zero(errorType)}
|
||||||
|
})
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func pluginExtendedCallbacks(callbacks map[string]*callback, receiver reflect.Value) {
|
||||||
|
typ := receiver.Type()
|
||||||
|
for m := 0; m < typ.NumMethod(); m++ {
|
||||||
|
method := typ.Method(m)
|
||||||
|
if method.PkgPath != "" {
|
||||||
|
continue // method not exported
|
||||||
|
}
|
||||||
|
if method.Name == "Timer" {
|
||||||
|
methodType := method.Func.Type()
|
||||||
|
log.Info("Timer method", "in", methodType.NumIn(), "out", methodType.NumOut(), "contextType", isContextType(methodType.In(1)), "chanType", isChanType(methodType.Out(0)), "chandir", methodType.Out(0).ChanDir() & reflect.RecvDir == reflect.RecvDir, "errorType", isErrorType(methodType.Out(1)))
|
||||||
|
}
|
||||||
|
if isChanPubsub(method.Type) {
|
||||||
|
cb := callbackifyChanPubSub(receiver, method.Func)
|
||||||
|
name := formatName(method.Name)
|
||||||
|
callbacks[name] = cb
|
||||||
|
log.Info("Added chanPubsub", "name", name, "args", cb.argTypes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -64,6 +64,7 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
|
|||||||
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
|
return fmt.Errorf("no service name for type %s", rcvrVal.Type().String())
|
||||||
}
|
}
|
||||||
callbacks := suitableCallbacks(rcvrVal)
|
callbacks := suitableCallbacks(rcvrVal)
|
||||||
|
pluginExtendedCallbacks(callbacks, rcvrVal)
|
||||||
if len(callbacks) == 0 {
|
if len(callbacks) == 0 {
|
||||||
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
|
return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
|
||||||
}
|
}
|
||||||
@ -198,7 +199,7 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value
|
|||||||
const size = 64 << 10
|
const size = 64 << 10
|
||||||
buf := make([]byte, size)
|
buf := make([]byte, size)
|
||||||
buf = buf[:runtime.Stack(buf, false)]
|
buf = buf[:runtime.Stack(buf, false)]
|
||||||
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf))
|
log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf), "args", fullargs)
|
||||||
errRes = errors.New("method handler crashed")
|
errRes = errors.New("method handler crashed")
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
Loading…
Reference in New Issue
Block a user