utils refactor work from the week

This commit is contained in:
philip-morlier 2021-09-03 15:20:49 -07:00
parent 0d5af1c7dc
commit c36c999383
No known key found for this signature in database
GPG Key ID: 0323A143B7B6F663
4 changed files with 364 additions and 278 deletions

View File

@ -4,24 +4,40 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/plugins" "github.com/ethereum/go-ethereum/plugins"
"github.com/opoenrelayxyz/plugeth-utils/core" "github.com/openrelayxyz/plugeth-utils/core"
) )
// TODO (philip): change common.Hash to core.Hash, // TODO (philip): change common.Hash to core.Hash,
func PluginStateUpdate(pl *plugins.PluginLoader, blockRoot, parentRoot core.Hash, destructs map[core.Hash]struct{}, accounts map[core.Hash][]byte, storage map[core.Hash]map[core.Hash][]byte) { func PluginStateUpdate(pl *plugins.PluginLoader, blockRoot, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) {
fnList := pl.Lookup("StateUpdate", func(item interface{}) bool { fnList := pl.Lookup("StateUpdate", func(item interface{}) bool {
_, ok := item.(func(core.Hash, core.Hash, map[core.Hash]struct{}, map[core.Hash][]byte, map[core.Hash]map[core.Hash][]byte)) _, ok := item.(func(common.Hash, common.Hash, map[common.Hash]struct{}, map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte))
return ok return ok
}) })
coreDestructs := make(map[core.Hash]struct{})
for k, v := range destructs {
coreDestructs[core.Hash(k)] = v
}
coreAccounts := make(map[core.Hash][]byte)
for k, v := range accounts {
coreAccounts[core.Hash(k)] = v
}
coreStorage := make(map[core.Hash]map[core.Hash][]byte)
for k, v := range storage {
coreStorage[core.Hash(k)] = make(map[core.Hash][]byte)
for h, d := range v {
coreStorage[core.Hash(k)][core.Hash(h)] = d
}
}
for _, fni := range fnList { for _, fni := range fnList {
if fn, ok := fni.(func(core.Hash, core.Hash, map[core.Hash]struct{}, map[core.Hash][]byte, map[core.Hash]map[core.Hash][]byte)); ok { if fn, ok := fni.(func(core.Hash, core.Hash, map[core.Hash]struct{}, map[core.Hash][]byte, map[core.Hash]map[core.Hash][]byte)); ok {
fn(blockRoot, parentRoot, destructs, accounts, storage) fn(core.Hash(blockRoot), core.Hash(parentRoot), coreDestructs, coreAccounts, coreStorage)
} }
} }
} }
func pluginStateUpdate(blockRoot, parentRoot core.Hash, destructs map[core.Hash]struct{}, accounts map[core.Hash][]byte, storage map[core.Hash]map[core.Hash][]byte) { func pluginStateUpdate(blockRoot, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) {
if plugins.DefaultPluginLoader == nil { if plugins.DefaultPluginLoader == nil {
log.Warn("Attempting StateUpdate, but default PluginLoader has not been initialized") log.Warn("Attempting StateUpdate, but default PluginLoader has not been initialized")
return return

View File

@ -145,7 +145,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
chainDb: chainDb, chainDb: chainDb,
eventMux: stack.EventMux(), eventMux: stack.EventMux(),
accountManager: stack.AccountManager(), accountManager: stack.AccountManager(),
engine: pluginCreateConsensusEngine(stack, chainConfig, &ethashConfig, config.Miner.Notify, config.Miner.Noverify, chainDb), engine: ethconfig.CreateConsensusEngine(stack, chainConfig, &ethashConfig, config.Miner.Notify, config.Miner.Noverify, chainDb),
closeBloomHandler: make(chan struct{}), closeBloomHandler: make(chan struct{}),
networkID: config.NetworkId, networkID: config.NetworkId,
gasPrice: config.Miner.GasPrice, gasPrice: config.Miner.GasPrice,

4
go.mod
View File

@ -49,7 +49,7 @@ require (
github.com/naoina/go-stringutil v0.1.0 // indirect github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
github.com/openrelayxyz/plugeth-utils v0.0.1 github.com/openrelayxyz/plugeth-utils v0.0.3
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
github.com/prometheus/tsdb v0.7.1 github.com/prometheus/tsdb v0.7.1
github.com/rjeczalik/notify v0.9.1 github.com/rjeczalik/notify v0.9.1
@ -70,3 +70,5 @@ require (
gopkg.in/urfave/cli.v1 v1.20.0 gopkg.in/urfave/cli.v1 v1.20.0
gotest.tools v2.2.0+incompatible // indirect gotest.tools v2.2.0+incompatible // indirect
) )
replace github.com/openrelayxyz/plugeth-utils => /home/philip/src/rivet/plugeth-utils

View File

@ -1,389 +1,457 @@
package wrappers package wrappers
import ( import (
"context" "context"
"math/big" "encoding/json"
"encoding/json" "math/big"
"github.com/ethereum/go-ethereum" "sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event" gcore "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/downloader"
gcore "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/plugins/interfaces" "github.com/ethereum/go-ethereum/node"
"github.com/openrelayxyz/plugeth-utils/core" "github.com/ethereum/go-ethereum/plugins/interfaces"
"github.com/openrelayxyz/plugeth-utils/restricted" "github.com/ethereum/go-ethereum/rlp"
"sync" "github.com/ethereum/go-ethereum/rpc"
"github.com/openrelayxyz/plugeth-utils/core"
"github.com/openrelayxyz/plugeth-utils/restricted"
) )
type WrapedScopeContext struct {
s *vm.ScopeContext
}
func (w *WrapedScopeContext) Memory() core.Memory {
return w.s.Memory
}
func (w *WrapedScopeContext) Stack() core.Stack {
return w.s.Stack
}
// type Contract interface { <= this is the core.Contract
// AsDelegate() Contract
// GetOp(n uint64) OpCode
// GetByte(n uint64) byte
// Caller() Address
// Address() Address
// Value() *big.Int
// }
type WrappedContract struct {
c *vm.Contract
}
func (w WrappedContract) AsDelegate() core.Contract {
return WrappedContract{w.c.AsDelegate()}
}
func (w WrappedContract) GetOp(n uint64) core.OpCode {
return core.OpCode(w.c.GetOp(n))
}
func (w WrappedContract) GetByte(n uint64) byte {
return w.c.GetByte(n)
}
func (w WrappedContract) Caller() core.Address {
return core.Address(w.c.Caller())
}
func (w WrappedContract) Address() core.Address {
return core.Address(w.c.Address())
}
func (w WrappedContract) Value() *big.Int {
return w.c.Value()
}
type Node struct { type Node struct {
n *node.Node n *node.Node
} }
func NewNode(n *node.Node) *Node { func NewNode(n *node.Node) *Node {
return &Node{n} return &Node{n}
} }
func (n *Node) Server() core.Server { func (n *Node) Server() core.Server {
return n.Server() return n.Server()
} }
func (n *Node) DataDir() string { func (n *Node) DataDir() string {
return n.n.DataDir() return n.n.DataDir()
} }
func (n *Node) InstanceDir() string { func (n *Node) InstanceDir() string {
return n.n.InstanceDir() return n.n.InstanceDir()
} }
func (n *Node) IPCEndpoint() string { func (n *Node) IPCEndpoint() string {
return n.n.IPCEndpoint() return n.n.IPCEndpoint()
} }
func (n *Node) HTTPEndpoint() string { func (n *Node) HTTPEndpoint() string {
return n.n.HTTPEndpoint() return n.n.HTTPEndpoint()
} }
func (n *Node) WSEndpoint() string { func (n *Node) WSEndpoint() string {
return n.n.WSEndpoint() return n.n.WSEndpoint()
} }
func (n *Node) ResolvePath(x string) string { func (n *Node) ResolvePath(x string) string {
return n.n.ResolvePath(x) return n.n.ResolvePath(x)
} }
type Backend struct{ type Backend struct {
b interfaces.Backend b interfaces.Backend
newTxsFeed event.Feed newTxsFeed event.Feed
newTxsOnce sync.Once newTxsOnce sync.Once
chainFeed event.Feed chainFeed event.Feed
chainOnce sync.Once chainOnce sync.Once
chainHeadFeed event.Feed chainHeadFeed event.Feed
chainHeadOnce sync.Once chainHeadOnce sync.Once
chainSideFeed event.Feed chainSideFeed event.Feed
chainSideOnce sync.Once chainSideOnce sync.Once
logsFeed event.Feed logsFeed event.Feed
logsOnce sync.Once logsOnce sync.Once
pendingLogsFeed event.Feed pendingLogsFeed event.Feed
pendingLogsOnce sync.Once pendingLogsOnce sync.Once
removedLogsFeed event.Feed removedLogsFeed event.Feed
removedLogsOnce sync.Once removedLogsOnce sync.Once
} }
func NewBackend(b interfaces.Backend) *Backend { func NewBackend(b interfaces.Backend) *Backend {
return &Backend{b: b} return &Backend{b: b}
} }
func (b *Backend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { func (b *Backend) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
return b.b.SuggestGasTipCap(ctx) return b.b.SuggestGasTipCap(ctx)
} }
func (b *Backend) ChainDb() restricted.Database { func (b *Backend) ChainDb() restricted.Database {
return b.b.ChainDb() return b.b.ChainDb()
} }
func (b *Backend) ExtRPCEnabled() bool { func (b *Backend) ExtRPCEnabled() bool {
return b.b.ExtRPCEnabled() return b.b.ExtRPCEnabled()
} }
func (b *Backend) RPCGasCap() uint64 { func (b *Backend) RPCGasCap() uint64 {
return b.b.RPCGasCap() return b.b.RPCGasCap()
} }
func (b *Backend) RPCTxFeeCap() float64 { func (b *Backend) RPCTxFeeCap() float64 {
return b.b.RPCTxFeeCap() return b.b.RPCTxFeeCap()
} }
func (b *Backend) UnprotectedAllowed() bool { func (b *Backend) UnprotectedAllowed() bool {
return b.b.UnprotectedAllowed() return b.b.UnprotectedAllowed()
} }
func (b *Backend) SetHead(number uint64) { func (b *Backend) SetHead(number uint64) {
b.b.SetHead(number) b.b.SetHead(number)
} }
func (b *Backend) HeaderByNumber(ctx context.Context, number int64) ([]byte, error) { func (b *Backend) HeaderByNumber(ctx context.Context, number int64) ([]byte, error) {
header, err := b.b.HeaderByNumber(ctx, rpc.BlockNumber(number)) header, err := b.b.HeaderByNumber(ctx, rpc.BlockNumber(number))
if err != nil { return nil, err } if err != nil {
return rlp.EncodeToBytes(header) return nil, err
}
return rlp.EncodeToBytes(header)
} }
func (b *Backend) HeaderByHash(ctx context.Context, hash core.Hash) ([]byte, error) { func (b *Backend) HeaderByHash(ctx context.Context, hash core.Hash) ([]byte, error) {
header, err := b.b.HeaderByHash(ctx, common.Hash(hash)) header, err := b.b.HeaderByHash(ctx, common.Hash(hash))
if err != nil { return nil, err } if err != nil {
return rlp.EncodeToBytes(header) return nil, err
}
return rlp.EncodeToBytes(header)
} }
func (b *Backend) CurrentHeader() []byte { func (b *Backend) CurrentHeader() []byte {
ret, _ := rlp.EncodeToBytes(b.b.CurrentHeader()) ret, _ := rlp.EncodeToBytes(b.b.CurrentHeader())
return ret return ret
} }
func (b *Backend) CurrentBlock() []byte { func (b *Backend) CurrentBlock() []byte {
ret, _ := rlp.EncodeToBytes(b.b.CurrentBlock()) ret, _ := rlp.EncodeToBytes(b.b.CurrentBlock())
return ret return ret
} }
func (b *Backend) BlockByNumber(ctx context.Context, number int64) ([]byte, error){ func (b *Backend) BlockByNumber(ctx context.Context, number int64) ([]byte, error) {
block, err := b.b.BlockByNumber(ctx, rpc.BlockNumber(number)) block, err := b.b.BlockByNumber(ctx, rpc.BlockNumber(number))
if err != nil { return nil, err } if err != nil {
return rlp.EncodeToBytes(block) return nil, err
}
return rlp.EncodeToBytes(block)
} }
func (b *Backend) BlockByHash(ctx context.Context, hash core.Hash) ([]byte, error){ func (b *Backend) BlockByHash(ctx context.Context, hash core.Hash) ([]byte, error) {
block, err := b.b.BlockByHash(ctx, common.Hash(hash)) block, err := b.b.BlockByHash(ctx, common.Hash(hash))
if err != nil { return nil, err } if err != nil {
return rlp.EncodeToBytes(block) return nil, err
}
return rlp.EncodeToBytes(block)
} }
func (b *Backend) GetReceipts(ctx context.Context, hash core.Hash) ([]byte, error) { func (b *Backend) GetReceipts(ctx context.Context, hash core.Hash) ([]byte, error) {
receipts, err := b.b.GetReceipts(ctx, common.Hash(hash)) receipts, err := b.b.GetReceipts(ctx, common.Hash(hash))
if err != nil { return nil, err } if err != nil {
return json.Marshal(receipts) return nil, err
}
return json.Marshal(receipts)
} }
func (b *Backend) GetTd(ctx context.Context, hash core.Hash) *big.Int { func (b *Backend) GetTd(ctx context.Context, hash core.Hash) *big.Int {
return b.b.GetTd(ctx, common.Hash(hash)) return b.b.GetTd(ctx, common.Hash(hash))
} }
func (b *Backend) SendTx(ctx context.Context, signedTx []byte) error { func (b *Backend) SendTx(ctx context.Context, signedTx []byte) error {
tx := new(types.Transaction) tx := new(types.Transaction)
if err := tx.UnmarshalBinary(signedTx); err != nil { if err := tx.UnmarshalBinary(signedTx); err != nil {
return err return err
} }
return b.b.SendTx(ctx, tx) return b.b.SendTx(ctx, tx)
} }
func (b *Backend) GetTransaction(ctx context.Context, txHash core.Hash) ([]byte, core.Hash, uint64, uint64, error) { // RLP Encoded transaction { func (b *Backend) GetTransaction(ctx context.Context, txHash core.Hash) ([]byte, core.Hash, uint64, uint64, error) { // RLP Encoded transaction {
tx, blockHash, blockNumber, index, err := b.b.GetTransaction(ctx, common.Hash(txHash)) tx, blockHash, blockNumber, index, err := b.b.GetTransaction(ctx, common.Hash(txHash))
if err != nil { return nil, core.Hash(blockHash), blockNumber, index, err } if err != nil {
enc, err := tx.MarshalBinary() return nil, core.Hash(blockHash), blockNumber, index, err
return enc, core.Hash(blockHash), blockNumber, index, err }
enc, err := tx.MarshalBinary()
return enc, core.Hash(blockHash), blockNumber, index, err
} }
func (b *Backend) GetPoolTransactions() ([][]byte, error) { func (b *Backend) GetPoolTransactions() ([][]byte, error) {
txs, err := b.b.GetPoolTransactions() txs, err := b.b.GetPoolTransactions()
if err != nil { return nil, err } if err != nil {
results := make([][]byte, len(txs)) return nil, err
for i, tx := range txs { }
results[i], _ = rlp.EncodeToBytes(tx) results := make([][]byte, len(txs))
} for i, tx := range txs {
return results, nil results[i], _ = rlp.EncodeToBytes(tx)
}
return results, nil
} }
func (b *Backend) GetPoolTransaction(txHash core.Hash) []byte { func (b *Backend) GetPoolTransaction(txHash core.Hash) []byte {
tx := b.b.GetPoolTransaction(common.Hash(txHash)) tx := b.b.GetPoolTransaction(common.Hash(txHash))
if tx == nil { return []byte{} } if tx == nil {
enc, _ := rlp.EncodeToBytes(tx) return []byte{}
return enc }
enc, _ := rlp.EncodeToBytes(tx)
return enc
} }
func (b *Backend) GetPoolNonce(ctx context.Context, addr core.Address) (uint64, error) { func (b *Backend) GetPoolNonce(ctx context.Context, addr core.Address) (uint64, error) {
return b.b.GetPoolNonce(ctx, common.Address(addr)) return b.b.GetPoolNonce(ctx, common.Address(addr))
} }
func (b *Backend) Stats() (pending int, queued int) { func (b *Backend) Stats() (pending int, queued int) {
return b.b.Stats() return b.b.Stats()
} }
func (b *Backend) TxPoolContent() (map[core.Address][][]byte, map[core.Address][][]byte) { func (b *Backend) TxPoolContent() (map[core.Address][][]byte, map[core.Address][][]byte) {
pending, queued := b.b.TxPoolContent() pending, queued := b.b.TxPoolContent()
trpending, trqueued := make(map[core.Address][][]byte), make(map[core.Address][][]byte) trpending, trqueued := make(map[core.Address][][]byte), make(map[core.Address][][]byte)
for k, v := range pending { for k, v := range pending {
trpending[core.Address(k)] = make([][]byte, len(v)) trpending[core.Address(k)] = make([][]byte, len(v))
for i, tx := range v { for i, tx := range v {
trpending[core.Address(k)][i], _ = tx.MarshalBinary() trpending[core.Address(k)][i], _ = tx.MarshalBinary()
} }
} }
for k, v := range queued { for k, v := range queued {
trqueued[core.Address(k)] = make([][]byte, len(v)) trqueued[core.Address(k)] = make([][]byte, len(v))
for i, tx := range v { for i, tx := range v {
trpending[core.Address(k)][i], _ = tx.MarshalBinary() trpending[core.Address(k)][i], _ = tx.MarshalBinary()
} }
} }
return trpending, trqueued return trpending, trqueued
} // RLP encoded transactions } // RLP encoded transactions
func (b *Backend) BloomStatus() (uint64, uint64) { func (b *Backend) BloomStatus() (uint64, uint64) {
return b.b.BloomStatus() return b.b.BloomStatus()
} }
func (b *Backend) GetLogs(ctx context.Context, blockHash core.Hash) ([][]byte, error) { func (b *Backend) GetLogs(ctx context.Context, blockHash core.Hash) ([][]byte, error) {
logs, err := b.b.GetLogs(ctx, common.Hash(blockHash)) logs, err := b.b.GetLogs(ctx, common.Hash(blockHash))
if err != nil { return nil, err } if err != nil {
encLogs := make([][]byte, len(logs)) return nil, err
for i, log := range logs { }
encLogs[i], _ = rlp.EncodeToBytes(log) encLogs := make([][]byte, len(logs))
} for i, log := range logs {
return encLogs, nil encLogs[i], _ = rlp.EncodeToBytes(log)
}
return encLogs, nil
} // []RLP encoded logs } // []RLP encoded logs
type dl struct {
type dl struct{ dl *downloader.Downloader
dl *downloader.Downloader
} }
type progress struct{ type progress struct {
p ethereum.SyncProgress p ethereum.SyncProgress
} }
func (p *progress) StartingBlock() uint64 { func (p *progress) StartingBlock() uint64 {
return p.p.StartingBlock return p.p.StartingBlock
} }
func (p *progress) CurrentBlock() uint64 { func (p *progress) CurrentBlock() uint64 {
return p.p.CurrentBlock return p.p.CurrentBlock
} }
func (p *progress) HighestBlock() uint64 { func (p *progress) HighestBlock() uint64 {
return p.p.HighestBlock return p.p.HighestBlock
} }
func (p *progress) PulledStates() uint64 { func (p *progress) PulledStates() uint64 {
return p.p.PulledStates return p.p.PulledStates
} }
func (p *progress) KnownStates() uint64 { func (p *progress) KnownStates() uint64 {
return p.p.KnownStates return p.p.KnownStates
} }
func (d *dl) Progress() core.Progress { func (d *dl) Progress() core.Progress {
return &progress{d.dl.Progress()} return &progress{d.dl.Progress()}
} }
func (b *Backend) Downloader() core.Downloader { func (b *Backend) Downloader() core.Downloader {
return &dl{b.b.Downloader()} return &dl{b.b.Downloader()}
} }
func (b *Backend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) core.Subscription { func (b *Backend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) core.Subscription {
var sub event.Subscription var sub event.Subscription
b.newTxsOnce.Do(func() { b.newTxsOnce.Do(func() {
bch := make(chan gcore.NewTxsEvent, 100) bch := make(chan gcore.NewTxsEvent, 100)
sub = b.b.SubscribeNewTxsEvent(bch) sub = b.b.SubscribeNewTxsEvent(bch)
go func(){ go func() {
for { for {
select { select {
case item := <-bch: case item := <-bch:
txe := core.NewTxsEvent{ txe := core.NewTxsEvent{
Txs: make([][]byte, len(item.Txs)), Txs: make([][]byte, len(item.Txs)),
} }
for i, tx := range item.Txs { for i, tx := range item.Txs {
txe.Txs[i], _ = tx.MarshalBinary() txe.Txs[i], _ = tx.MarshalBinary()
} }
b.newTxsFeed.Send(txe) b.newTxsFeed.Send(txe)
case err := <-sub.Err(): case err := <-sub.Err():
log.Warn("Subscription error for NewTxs", "err", err) log.Warn("Subscription error for NewTxs", "err", err)
return return
} }
} }
}() }()
}) })
return b.newTxsFeed.Subscribe(ch) return b.newTxsFeed.Subscribe(ch)
} }
func (b *Backend) SubscribeChainEvent(ch chan<- core.ChainEvent) core.Subscription { func (b *Backend) SubscribeChainEvent(ch chan<- core.ChainEvent) core.Subscription {
var sub event.Subscription var sub event.Subscription
b.chainOnce.Do(func() { b.chainOnce.Do(func() {
bch := make(chan gcore.ChainEvent, 100) bch := make(chan gcore.ChainEvent, 100)
sub = b.b.SubscribeChainEvent(bch) sub = b.b.SubscribeChainEvent(bch)
go func(){ go func() {
for { for {
select { select {
case item := <-bch: case item := <-bch:
ce := core.ChainEvent{ ce := core.ChainEvent{
Hash: core.Hash(item.Hash), Hash: core.Hash(item.Hash),
} }
ce.Block, _ = rlp.EncodeToBytes(item.Block) ce.Block, _ = rlp.EncodeToBytes(item.Block)
ce.Logs, _ = rlp.EncodeToBytes(item.Logs) ce.Logs, _ = rlp.EncodeToBytes(item.Logs)
b.chainFeed.Send(ce) b.chainFeed.Send(ce)
case err := <-sub.Err(): case err := <-sub.Err():
log.Warn("Subscription error for Chain", "err", err) log.Warn("Subscription error for Chain", "err", err)
return return
} }
} }
}() }()
}) })
return b.chainFeed.Subscribe(ch) return b.chainFeed.Subscribe(ch)
} }
func (b *Backend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) core.Subscription { func (b *Backend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) core.Subscription {
var sub event.Subscription var sub event.Subscription
b.chainHeadOnce.Do(func() { b.chainHeadOnce.Do(func() {
bch := make(chan gcore.ChainHeadEvent, 100) bch := make(chan gcore.ChainHeadEvent, 100)
sub = b.b.SubscribeChainHeadEvent(bch) sub = b.b.SubscribeChainHeadEvent(bch)
go func(){ go func() {
for { for {
select { select {
case item := <-bch: case item := <-bch:
che := core.ChainHeadEvent{} che := core.ChainHeadEvent{}
che.Block, _ = rlp.EncodeToBytes(item.Block) che.Block, _ = rlp.EncodeToBytes(item.Block)
b.chainHeadFeed.Send(che) b.chainHeadFeed.Send(che)
case err := <-sub.Err(): case err := <-sub.Err():
log.Warn("Subscription error for ChainHead", "err", err) log.Warn("Subscription error for ChainHead", "err", err)
return return
} }
} }
}() }()
}) })
return b.chainHeadFeed.Subscribe(ch) return b.chainHeadFeed.Subscribe(ch)
} }
func (b *Backend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) core.Subscription { func (b *Backend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) core.Subscription {
var sub event.Subscription var sub event.Subscription
b.chainSideOnce.Do(func() { b.chainSideOnce.Do(func() {
bch := make(chan gcore.ChainSideEvent, 100) bch := make(chan gcore.ChainSideEvent, 100)
sub = b.b.SubscribeChainSideEvent(bch) sub = b.b.SubscribeChainSideEvent(bch)
go func(){ go func() {
for { for {
select { select {
case item := <-bch: case item := <-bch:
cse := core.ChainSideEvent{} cse := core.ChainSideEvent{}
cse.Block, _ = rlp.EncodeToBytes(item.Block) cse.Block, _ = rlp.EncodeToBytes(item.Block)
b.chainSideFeed.Send(cse) b.chainSideFeed.Send(cse)
case err := <-sub.Err(): case err := <-sub.Err():
log.Warn("Subscription error for ChainSide", "err", err) log.Warn("Subscription error for ChainSide", "err", err)
return return
} }
} }
}() }()
}) })
return b.chainSideFeed.Subscribe(ch) return b.chainSideFeed.Subscribe(ch)
} }
func (b *Backend) SubscribeLogsEvent(ch chan<- [][]byte) core.Subscription { func (b *Backend) SubscribeLogsEvent(ch chan<- [][]byte) core.Subscription {
var sub event.Subscription var sub event.Subscription
b.logsOnce.Do(func() { b.logsOnce.Do(func() {
bch := make(chan []*types.Log, 100) bch := make(chan []*types.Log, 100)
sub = b.b.SubscribeLogsEvent(bch) sub = b.b.SubscribeLogsEvent(bch)
go func(){ go func() {
for { for {
select { select {
case item := <-bch: case item := <-bch:
logs := make([][]byte, len(item)) logs := make([][]byte, len(item))
for i, log := range item { for i, log := range item {
logs[i], _ = rlp.EncodeToBytes(log) logs[i], _ = rlp.EncodeToBytes(log)
} }
b.logsFeed.Send(logs) b.logsFeed.Send(logs)
case err := <-sub.Err(): case err := <-sub.Err():
log.Warn("Subscription error for Logs", "err", err) log.Warn("Subscription error for Logs", "err", err)
return return
} }
} }
}() }()
}) })
return b.logsFeed.Subscribe(ch) return b.logsFeed.Subscribe(ch)
} // []RLP encoded logs } // []RLP encoded logs
func (b *Backend) SubscribePendingLogsEvent(ch chan<- [][]byte) core.Subscription { func (b *Backend) SubscribePendingLogsEvent(ch chan<- [][]byte) core.Subscription {
var sub event.Subscription var sub event.Subscription
b.pendingLogsOnce.Do(func() { b.pendingLogsOnce.Do(func() {
bch := make(chan []*types.Log, 100) bch := make(chan []*types.Log, 100)
sub = b.b.SubscribePendingLogsEvent(bch) sub = b.b.SubscribePendingLogsEvent(bch)
go func(){ go func() {
for { for {
select { select {
case item := <-bch: case item := <-bch:
logs := make([][]byte, len(item)) logs := make([][]byte, len(item))
for i, log := range item { for i, log := range item {
logs[i], _ = rlp.EncodeToBytes(log) logs[i], _ = rlp.EncodeToBytes(log)
} }
b.pendingLogsFeed.Send(logs) b.pendingLogsFeed.Send(logs)
case err := <-sub.Err(): case err := <-sub.Err():
log.Warn("Subscription error for PendingLogs", "err", err) log.Warn("Subscription error for PendingLogs", "err", err)
return return
} }
} }
}() }()
}) })
return b.pendingLogsFeed.Subscribe(ch) return b.pendingLogsFeed.Subscribe(ch)
} // RLP Encoded logs } // RLP Encoded logs
func (b *Backend) SubscribeRemovedLogsEvent(ch chan<- []byte) core.Subscription { func (b *Backend) SubscribeRemovedLogsEvent(ch chan<- []byte) core.Subscription {
var sub event.Subscription var sub event.Subscription
b.removedLogsOnce.Do(func() { b.removedLogsOnce.Do(func() {
bch := make(chan gcore.RemovedLogsEvent, 100) bch := make(chan gcore.RemovedLogsEvent, 100)
sub = b.b.SubscribeRemovedLogsEvent(bch) sub = b.b.SubscribeRemovedLogsEvent(bch)
go func(){ go func() {
for { for {
select { select {
case item := <-bch: case item := <-bch:
logs := make([][]byte, len(item.Logs)) logs := make([][]byte, len(item.Logs))
for i, log := range item.Logs { for i, log := range item.Logs {
logs[i], _ = rlp.EncodeToBytes(log) logs[i], _ = rlp.EncodeToBytes(log)
} }
b.removedLogsFeed.Send(item) b.removedLogsFeed.Send(item)
case err := <-sub.Err(): case err := <-sub.Err():
log.Warn("Subscription error for RemovedLogs", "err", err) log.Warn("Subscription error for RemovedLogs", "err", err)
return return
} }
} }
}() }()
}) })
return b.removedLogsFeed.Subscribe(ch) return b.removedLogsFeed.Subscribe(ch)
} // RLP encoded logs } // RLP encoded logs