diff --git a/core/plugin_hooks.go b/core/plugin_hooks.go index ffd9ff1b9..0d6661741 100644 --- a/core/plugin_hooks.go +++ b/core/plugin_hooks.go @@ -1,22 +1,24 @@ package core import ( + "encoding/json" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/plugins" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "github.com/openrelayxyz/plugeth-utils/core" ) func PluginPreProcessBlock(pl *plugins.PluginLoader, block *types.Block) { fnList := pl.Lookup("PreProcessBlock", func(item interface{}) bool { - _, ok := item.(func([]byte)) + _, ok := item.(func(core.Hash, uint64, []byte)) return ok }) encoded, _ := rlp.EncodeToBytes(block) for _, fni := range fnList { - if fn, ok := fni.(func([]byte)); ok { - fn(encoded) + if fn, ok := fni.(func(core.Hash, uint64, []byte)); ok { + fn(core.Hash(block.Hash()), block.NumberU64(), encoded) } } } @@ -29,14 +31,13 @@ func pluginPreProcessBlock(block *types.Block) { } func PluginPreProcessTransaction(pl *plugins.PluginLoader, tx *types.Transaction, block *types.Block, i int) { fnList := pl.Lookup("PreProcessTransaction", func(item interface{}) bool { - _, ok := item.(func([]byte, []byte, int)) + _, ok := item.(func([]byte, core.Hash, core.Hash, int)) return ok }) txBytes, _ := tx.MarshalBinary() - blockBytes, _ := rlp.EncodeToBytes(block) for _, fni := range fnList { - if fn, ok := fni.(func([]byte, []byte, int)); ok { - fn(txBytes, blockBytes, i) + if fn, ok := fni.(func([]byte, core.Hash, core.Hash, int)); ok { + fn(txBytes, core.Hash(tx.Hash()), core.Hash(block.Hash()), i) } } } @@ -49,12 +50,12 @@ func pluginPreProcessTransaction(tx *types.Transaction, block *types.Block, i in } func PluginBlockProcessingError(pl *plugins.PluginLoader, tx *types.Transaction, block *types.Block, err error) { fnList := pl.Lookup("BlockProcessingError", func(item interface{}) bool { - _, ok := item.(func(*types.Transaction, *types.Block, error)) + _, ok := item.(func(core.Hash, core.Hash, error)) return ok }) for _, fni := range fnList { - if fn, ok := fni.(func(*types.Transaction, *types.Block, error)); ok { - fn(tx, block, err) + if fn, ok := fni.(func(core.Hash, core.Hash, error)); ok { + fn(core.Hash(tx.Hash()), core.Hash(block.Hash()), err) } } } @@ -67,12 +68,13 @@ func pluginBlockProcessingError(tx *types.Transaction, block *types.Block, err e } func PluginPostProcessTransaction(pl *plugins.PluginLoader, tx *types.Transaction, block *types.Block, i int, receipt *types.Receipt) { fnList := pl.Lookup("PostProcessTransaction", func(item interface{}) bool { - _, ok := item.(func(*types.Transaction, *types.Block, int, *types.Receipt)) + _, ok := item.(func(core.Hash, core.Hash, int, []byte)) return ok }) + receiptBytes, _ := json.Marshal(receipt) for _, fni := range fnList { - if fn, ok := fni.(func(*types.Transaction, *types.Block, int, *types.Receipt)); ok { - fn(tx, block, i, receipt) + if fn, ok := fni.(func(core.Hash, core.Hash, int, []byte)); ok { + fn(core.Hash(tx.Hash()), core.Hash(block.Hash()), i, receiptBytes) } } } @@ -85,12 +87,12 @@ func pluginPostProcessTransaction(tx *types.Transaction, block *types.Block, i i } func PluginPostProcessBlock(pl *plugins.PluginLoader, block *types.Block) { fnList := pl.Lookup("PostProcessBlock", func(item interface{}) bool { - _, ok := item.(func(*types.Block)) + _, ok := item.(func(core.Hash)) return ok }) for _, fni := range fnList { - if fn, ok := fni.(func(*types.Block)); ok { - fn(block) + if fn, ok := fni.(func(core.Hash)); ok { + fn(core.Hash(block.Hash())) } } } @@ -105,12 +107,17 @@ func pluginPostProcessBlock(block *types.Block) { func PluginNewHead(pl *plugins.PluginLoader, block *types.Block, hash common.Hash, logs []*types.Log) { fnList := pl.Lookup("NewHead", func(item interface{}) bool { - _, ok := item.(func(*types.Block, common.Hash, []*types.Log)) + _, ok := item.(func([]byte, core.Hash, [][]byte)) return ok }) + blockBytes, _ := rlp.EncodeToBytes(block) + logBytes := make([][]byte, len(logs)) + for i, l := range logs { + logBytes[i], _ = rlp.EncodeToBytes(l) + } for _, fni := range fnList { - if fn, ok := fni.(func(*types.Block, common.Hash, []*types.Log)); ok { - fn(block, hash, logs) + if fn, ok := fni.(func([]byte, core.Hash, [][]byte)); ok { + fn(blockBytes, core.Hash(hash), logBytes) } } } @@ -124,12 +131,17 @@ func pluginNewHead(block *types.Block, hash common.Hash, logs []*types.Log) { func PluginNewSideBlock(pl *plugins.PluginLoader, block *types.Block, hash common.Hash, logs []*types.Log) { fnList := pl.Lookup("NewSideBlock", func(item interface{}) bool { - _, ok := item.(func(*types.Block, common.Hash, []*types.Log)) + _, ok := item.(func([]byte, core.Hash, [][]byte)) return ok }) + blockBytes, _ := rlp.EncodeToBytes(block) + logBytes := make([][]byte, len(logs)) + for i, l := range logs { + logBytes[i], _ = rlp.EncodeToBytes(l) + } for _, fni := range fnList { - if fn, ok := fni.(func(*types.Block, common.Hash, []*types.Log)); ok { - fn(block, hash, logs) + if fn, ok := fni.(func([]byte, core.Hash, [][]byte)); ok { + fn(blockBytes, core.Hash(hash), logBytes) } } } @@ -143,12 +155,20 @@ func pluginNewSideBlock(block *types.Block, hash common.Hash, logs []*types.Log) func PluginReorg(pl *plugins.PluginLoader, commonBlock *types.Block, oldChain, newChain types.Blocks) { fnList := pl.Lookup("Reorg", func(item interface{}) bool { - _, ok := item.(func(common *types.Block, oldChain, newChain types.Blocks)) + _, ok := item.(func(core.Hash, []core.Hash, []core.Hash)) return ok }) + oldChainHashes := make([]core.Hash, len(oldChain)) + for i, block := range oldChain { + oldChainHashes[i] = core.Hash(block.Hash()) + } + newChainHashes := make([]core.Hash, len(newChain)) + for i, block := range newChain { + newChainHashes[i] = core.Hash(block.Hash()) + } for _, fni := range fnList { - if fn, ok := fni.(func(common *types.Block, oldChain, newChain types.Blocks)); ok { - fn(commonBlock, oldChain, newChain) + if fn, ok := fni.(func(core.Hash, []core.Hash, []core.Hash)); ok { + fn(core.Hash(commonBlock.Hash()), oldChainHashes, newChainHashes) } } } diff --git a/core/state/plugin_hooks.go b/core/state/plugin_hooks.go index ecafcdfab..46933901b 100644 --- a/core/state/plugin_hooks.go +++ b/core/state/plugin_hooks.go @@ -11,7 +11,7 @@ import ( func PluginStateUpdate(pl *plugins.PluginLoader, blockRoot, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) { fnList := pl.Lookup("StateUpdate", func(item interface{}) bool { - _, ok := item.(func(common.Hash, common.Hash, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte)) + _, ok := item.(func(core.Hash, core.Hash, map[core.Hash]struct{}, map[core.Hash][]byte, map[core.Hash]map[core.Hash][]byte)) return ok }) coreDestructs := make(map[core.Hash]struct{}) diff --git a/plugins/plugin_loader.go b/plugins/plugin_loader.go index 7cd4abd31..358cb6285 100644 --- a/plugins/plugin_loader.go +++ b/plugins/plugin_loader.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/event" "gopkg.in/urfave/cli.v1" ) @@ -156,3 +157,20 @@ func ParseFlags(args []string) bool { } return DefaultPluginLoader.ParseFlags(args) } + + +type feedWrapper struct { + feed *event.Feed +} + +func (f *feedWrapper) Send(item interface{}) int { + return f.feed.Send(item) +} + +func (f *feedWrapper) Subscribe(ch interface{}) core.Subscription { + return f.feed.Subscribe(ch) +} + +func (pl *PluginLoader) GetFeed() core.Feed { + return &feedWrapper{&event.Feed{}} +} diff --git a/rpc/plugin_subscriptions.go b/rpc/plugin_subscriptions.go index 165717176..ac8282925 100644 --- a/rpc/plugin_subscriptions.go +++ b/rpc/plugin_subscriptions.go @@ -104,7 +104,9 @@ func callbackifyChanPubSub(receiver, fn reflect.Value) *callback { if !recvOK { return } - notifier.Notify(rpcSub.ID, val.Interface()) + if err := notifier.Notify(rpcSub.ID, val.Interface()); err != nil { + log.Warn("Subscription notification failed", "id", rpcSub.ID, "err", err) + } case 1: cancel() return @@ -126,15 +128,10 @@ func pluginExtendedCallbacks(callbacks map[string]*callback, receiver reflect.Va if method.PkgPath != "" { continue // method not exported } - if method.Name == "Timer" { - methodType := method.Func.Type() - log.Info("Timer method", "in", methodType.NumIn(), "out", methodType.NumOut(), "contextType", isContextType(methodType.In(1)), "chanType", isChanType(methodType.Out(0)), "chandir", methodType.Out(0).ChanDir() & reflect.RecvDir == reflect.RecvDir, "errorType", isErrorType(methodType.Out(1))) - } if isChanPubsub(method.Type) { cb := callbackifyChanPubSub(receiver, method.Func) name := formatName(method.Name) callbacks[name] = cb - log.Info("Added chanPubsub", "name", name, "args", cb.argTypes) } } }