163 lines
4.6 KiB
Go
163 lines
4.6 KiB
Go
package rpc
|
|
|
|
import (
|
|
"context"
|
|
"reflect"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/plugins"
|
|
)
|
|
|
|
// Is t context.Context or *context.Context?
|
|
func isContextType(t reflect.Type) bool {
|
|
for t.Kind() == reflect.Ptr {
|
|
t = t.Elem()
|
|
}
|
|
return t == contextType
|
|
}
|
|
|
|
func isChanType(t reflect.Type) bool {
|
|
// Pointers to channels are weird, but whatever
|
|
for t.Kind() == reflect.Ptr {
|
|
t = t.Elem()
|
|
}
|
|
// Make sure we have a channel
|
|
if t.Kind() != reflect.Chan {
|
|
return false
|
|
}
|
|
// Make sure it is a receivable channel
|
|
return (t.ChanDir() & reflect.RecvDir) == reflect.RecvDir
|
|
}
|
|
|
|
func isChanPubsub(methodType reflect.Type) bool {
|
|
if methodType.NumIn() < 2 || methodType.NumOut() != 2 {
|
|
return false
|
|
}
|
|
return isContextType(methodType.In(1)) &&
|
|
isChanType(methodType.Out(0)) &&
|
|
isErrorType(methodType.Out(1))
|
|
}
|
|
|
|
func callbackifyChanPubSub(receiver, fn reflect.Value) *callback {
|
|
c := &callback{rcvr: receiver, errPos: 1, isSubscribe: true}
|
|
fntype := fn.Type()
|
|
// Skip receiver and context.Context parameter (if present).
|
|
firstArg := 0
|
|
if c.rcvr.IsValid() {
|
|
firstArg++
|
|
}
|
|
if fntype.NumIn() > firstArg && fntype.In(firstArg) == contextType {
|
|
c.hasCtx = true
|
|
firstArg++
|
|
}
|
|
// Add all remaining parameters.
|
|
c.argTypes = make([]reflect.Type, fntype.NumIn()-firstArg)
|
|
for i := firstArg; i < fntype.NumIn(); i++ {
|
|
c.argTypes[i-firstArg] = fntype.In(i)
|
|
}
|
|
|
|
retFnType := reflect.FuncOf(append([]reflect.Type{receiver.Type(), contextType}, c.argTypes...), []reflect.Type{subscriptionType, errorType}, false)
|
|
|
|
// // What follows uses reflection to construct a dynamically typed function equivalent to:
|
|
// func(receiver <T>, cctx context.Context, args ...<T>) (rpc.Subscription, error) {
|
|
// notifier, supported := NotifierFromContext(cctx)
|
|
// if !supported { return Subscription{}, ErrNotificationsUnsupported}
|
|
// ctx, cancel := context.WithCancel(context.Background())
|
|
// ch, err := fn()
|
|
// if err != nil { return Subscription{}, err }
|
|
// rpcSub := notifier.CreateSubscription()
|
|
// go func() {
|
|
// select {
|
|
// case v, ok := <- ch:
|
|
// if !ok { return }
|
|
// notifier.Notify(rpcSub.ID, v)
|
|
// case <-rpcSub.Err():
|
|
// cancel()
|
|
// return
|
|
// case <-notifier.Closed():
|
|
// cancel()
|
|
// return
|
|
// }
|
|
// }()
|
|
// return rpcSub, nil
|
|
// }
|
|
//
|
|
|
|
c.fn = reflect.MakeFunc(retFnType, func(args []reflect.Value) ([]reflect.Value) {
|
|
notifier, supported := NotifierFromContext(args[1].Interface().(context.Context))
|
|
if !supported {
|
|
return []reflect.Value{reflect.Zero(subscriptionType), reflect.ValueOf(ErrNotificationsUnsupported)}
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
args[1] = reflect.ValueOf(ctx)
|
|
out := fn.Call(args)
|
|
if !out[1].IsNil() {
|
|
// This amounts to: if err != nil { return nil, err }
|
|
return []reflect.Value{reflect.Zero(subscriptionType), out[1]}
|
|
}
|
|
// Geth's provided context is done once we've returned the subscription id.
|
|
// This new context will cancel when the notifier closes.
|
|
|
|
rpcSub := notifier.CreateSubscription()
|
|
go func() {
|
|
defer log.Info("Plugin subscription goroutine closed")
|
|
selectCases := []reflect.SelectCase{
|
|
{Dir: reflect.SelectRecv, Chan: out[0]},
|
|
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(rpcSub.Err())},
|
|
{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(notifier.Closed())},
|
|
}
|
|
for {
|
|
chosen, val, recvOK := reflect.Select(selectCases)
|
|
switch chosen {
|
|
case 0: // val, ok := <-ch
|
|
if !recvOK {
|
|
return
|
|
}
|
|
if err := notifier.Notify(rpcSub.ID, val.Interface()); err != nil {
|
|
log.Warn("Subscription notification failed", "id", rpcSub.ID, "err", err)
|
|
}
|
|
case 1:
|
|
cancel()
|
|
return
|
|
case 2:
|
|
cancel()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return []reflect.Value{reflect.ValueOf(*rpcSub), reflect.Zero(errorType)}
|
|
})
|
|
return c
|
|
}
|
|
|
|
func RPCSubscription(pl *plugins.PluginLoader) {
|
|
fnList := pl.Lookup("RPCSubscriptionTest", func(item interface{}) bool {
|
|
_, ok := item.(func())
|
|
return ok
|
|
})
|
|
for _, fni := range fnList {
|
|
if fn, ok := fni.(func()); ok {
|
|
fn()
|
|
}
|
|
}
|
|
}
|
|
|
|
func pluginExtendedCallbacks(callbacks map[string]*callback, receiver reflect.Value) {
|
|
if plugins.DefaultPluginLoader == nil {
|
|
log.Warn("Attempting RPCSubscriptionTest, but default PluginLoader has not been initialized")
|
|
return
|
|
}
|
|
RPCSubscription(plugins.DefaultPluginLoader)
|
|
typ := receiver.Type()
|
|
for m := 0; m < typ.NumMethod(); m++ {
|
|
method := typ.Method(m)
|
|
if method.PkgPath != "" {
|
|
continue // method not exported
|
|
}
|
|
if isChanPubsub(method.Type) {
|
|
cb := callbackifyChanPubSub(receiver, method.Func)
|
|
name := formatName(method.Name)
|
|
callbacks[name] = cb
|
|
}
|
|
}
|
|
}
|