diff --git a/core/plugin_hooks.go b/core/plugin_hooks.go index c5636d9c7..ffd9ff1b9 100644 --- a/core/plugin_hooks.go +++ b/core/plugin_hooks.go @@ -13,7 +13,7 @@ func PluginPreProcessBlock(pl *plugins.PluginLoader, block *types.Block) { _, ok := item.(func([]byte)) return ok }) - encoded, _ = rlp.EncodeToBytes(block) + encoded, _ := rlp.EncodeToBytes(block) for _, fni := range fnList { if fn, ok := fni.(func([]byte)); ok { fn(encoded) diff --git a/plugins/plugin_loader.go b/plugins/plugin_loader.go index 19da5986a..7cd4abd31 100644 --- a/plugins/plugin_loader.go +++ b/plugins/plugin_loader.go @@ -53,12 +53,9 @@ var DefaultPluginLoader *PluginLoader func NewPluginLoader(target string) (*PluginLoader, error) { pl := &PluginLoader{ Plugins: []*plugin.Plugin{}, - // RPCPlugins: []APILoader{}, Subcommands: make(map[string]Subcommand), Flags: []*flag.FlagSet{}, LookupCache: make(map[string][]interface{}), - // CreateConsensusEngine: ethconfig.CreateConsensusEngine, - // UpdateBlockchainVMConfig: func(cfg *vm.Config) {}, } files, err := ioutil.ReadDir(target) if err != nil { diff --git a/plugins/wrappers/wrappers.go b/plugins/wrappers/wrappers.go index f13602f7b..1bd952dae 100644 --- a/plugins/wrappers/wrappers.go +++ b/plugins/wrappers/wrappers.go @@ -6,6 +6,8 @@ import ( "math/big" "sync" "time" + "reflect" + "fmt" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" @@ -22,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/openrelayxyz/plugeth-utils/core" "github.com/openrelayxyz/plugeth-utils/restricted" + "github.com/openrelayxyz/plugeth-utils/restricted/params" ) type WrappedScopeContext struct { @@ -226,6 +229,7 @@ type Backend struct { pendingLogsOnce sync.Once removedLogsFeed event.Feed removedLogsOnce sync.Once + chainConfig *params.ChainConfig } func NewBackend(b interfaces.Backend) *Backend { @@ -563,3 +567,36 @@ func (b *Backend) SubscribeRemovedLogsEvent(ch chan<- []byte) core.Subscription }) return b.removedLogsFeed.Subscribe(ch) } // RLP encoded logs + +func convertAndSet(a, b reflect.Value) (err error) { + defer func() { + if recover() != nil { + fmt.Errorf("error converting: %v", err.Error()) + } + }() + a.Set(b.Convert(a.Type())) + return nil +} + +func (b *Backend) ChainConfig() *params.ChainConfig { + // We're using the reflect library to copy data from params.ChainConfig to + // pparams.ChainConfig, so this function shouldn't need to be touched for + // simple changes to ChainConfig (though pparams.ChainConfig may need to be + // updated). Note that this probably won't carry over consensus engine data. + if b.chainConfig != nil { return b.chainConfig } + b.chainConfig = ¶ms.ChainConfig{} + nval := reflect.ValueOf(b.b.ChainConfig()) + ntype := nval.Type() + lval := reflect.ValueOf(b.chainConfig) + for i := 0; i < nval.NumField(); i++ { + field := ntype.Field(i) + v := nval.FieldByName(field.Name) + lv := lval.FieldByName(field.Name) + if v.Type() == lv.Type() && lv.CanSet() { + lv.Set(v) + } else { + convertAndSet(lv, v) + } + } + return b.chainConfig +} diff --git a/rpc/plugin_subscriptions.go b/rpc/plugin_subscriptions.go new file mode 100644 index 000000000..165717176 --- /dev/null +++ b/rpc/plugin_subscriptions.go @@ -0,0 +1,140 @@ +package rpc + +import ( + "context" + "reflect" + "github.com/ethereum/go-ethereum/log" +) + + +func isChanType(t reflect.Type) bool { + // Pointers to channels are weird, but whatever + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // Make sure we have a channel + if t.Kind() != reflect.Chan { + return false + } + // Make sure it is a receivable channel + return (t.ChanDir() & reflect.RecvDir) == reflect.RecvDir +} + +func isChanPubsub(methodType reflect.Type) bool { + if methodType.NumIn() < 2 || methodType.NumOut() != 2 { + return false + } + return isContextType(methodType.In(1)) && + isChanType(methodType.Out(0)) && + isErrorType(methodType.Out(1)) +} + +func callbackifyChanPubSub(receiver, fn reflect.Value) *callback { + c := &callback{rcvr: receiver, errPos: 1, isSubscribe: true} + fntype := fn.Type() + // Skip receiver and context.Context parameter (if present). + firstArg := 0 + if c.rcvr.IsValid() { + firstArg++ + } + if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType { + c.hasCtx = true + firstArg++ + } + // Add all remaining parameters. + c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg) + for i := firstArg; i < fntype.NumIn(); i++ { + c.argTypes[i-firstArg] = fntype.In(i) + } + + retFnType := reflect.FuncOf(append([]reflect.Type{receiver.Type(), contextType}, c.argTypes...), []reflect.Type{subscriptionType, errorType}, false) + +// // What follows uses reflection to construct a dynamically typed function equivalent to: +// func(receiver , cctx context.Context, args ...) (rpc.Subscription, error) { +// notifier, supported := NotifierFromContext(cctx) +// if !supported { return Subscription{}, ErrNotificationsUnsupported} +// ctx, cancel := context.WithCancel(context.Background()) +// ch, err := fn() +// if err != nil { return Subscription{}, err } +// rpcSub := notifier.CreateSubscription() +// go func() { +// select { +// case v, ok := <- ch: +// if !ok { return } +// notifier.Notify(rpcSub.ID, v) +// case <-rpcSub.Err(): +// cancel() +// return +// case <-notifier.Closed(): +// cancel() +// return +// } +// }() +// return rpcSub, nil +// } +// + + c.fn = reflect.MakeFunc(retFnType, func(args []reflect.Value) ([]reflect.Value) { + notifier, supported := NotifierFromContext(args[1].Interface().(context.Context)) + if !supported { + return []reflect.Value{reflect.Zero(subscriptionType), reflect.ValueOf(ErrNotificationsUnsupported)} + } + ctx, cancel := context.WithCancel(context.Background()) + args[1] = reflect.ValueOf(ctx) + out := fn.Call(args) + if !out[1].IsNil() { + // This amounts to: if err != nil { return nil, err } + return []reflect.Value{reflect.Zero(subscriptionType), out[1]} + } + // Geth's provided context is done once we've returned the subscription id. + // This new context will cancel when the notifier closes. + + rpcSub := notifier.CreateSubscription() + go func() { + defer log.Info("Plugin subscription goroutine closed") + selectCases := []reflect.SelectCase{ + {Dir: reflect.SelectRecv, Chan: out[0]}, + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(rpcSub.Err())}, + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(notifier.Closed())}, + } + for { + chosen, val, recvOK := reflect.Select(selectCases) + switch chosen { + case 0: // val, ok := <-ch + if !recvOK { + return + } + notifier.Notify(rpcSub.ID, val.Interface()) + case 1: + cancel() + return + case 2: + cancel() + return + } + } + }() + return []reflect.Value{reflect.ValueOf(*rpcSub), reflect.Zero(errorType)} + }) + return c +} + +func pluginExtendedCallbacks(callbacks map[string]*callback, receiver reflect.Value) { + typ := receiver.Type() + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + if method.PkgPath != "" { + continue // method not exported + } + if method.Name == "Timer" { + methodType := method.Func.Type() + log.Info("Timer method", "in", methodType.NumIn(), "out", methodType.NumOut(), "contextType", isContextType(methodType.In(1)), "chanType", isChanType(methodType.Out(0)), "chandir", methodType.Out(0).ChanDir() & reflect.RecvDir == reflect.RecvDir, "errorType", isErrorType(methodType.Out(1))) + } + if isChanPubsub(method.Type) { + cb := callbackifyChanPubSub(receiver, method.Func) + name := formatName(method.Name) + callbacks[name] = cb + log.Info("Added chanPubsub", "name", name, "args", cb.argTypes) + } + } +} diff --git a/rpc/service.go b/rpc/service.go index bef891ea1..6f0a0921e 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -64,6 +64,7 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error { return fmt.Errorf("no service name for type %s", rcvrVal.Type().String()) } callbacks := suitableCallbacks(rcvrVal) + pluginExtendedCallbacks(callbacks, rcvrVal) if len(callbacks) == 0 { return fmt.Errorf("service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } @@ -198,7 +199,7 @@ func (c *callback) call(ctx context.Context, method string, args []reflect.Value const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] - log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf)) + log.Error("RPC method " + method + " crashed: " + fmt.Sprintf("%v\n%s", err, buf), "args", fullargs) errRes = errors.New("method handler crashed") } }()