forked from cerc-io/plugeth
cmd/utils, rpc/comms: stop XEth when IPC connection ends
There are a bunch of changes required to make this work: - in miner: allow unregistering agents, fix RemoteAgent.Stop - in eth/filters: make FilterSystem.Stop not crash - in rpc/comms: move listen loop to platform-independent code Fixes #1930. I ran the shell loop there for a few minutes and didn't see any changes in the memory profile.
This commit is contained in:
parent
56f8699a6c
commit
fbdb44dcc1
@ -627,17 +627,14 @@ func StartIPC(eth *eth.Ethereum, ctx *cli.Context) error {
|
||||
Endpoint: IpcSocketPath(ctx),
|
||||
}
|
||||
|
||||
initializer := func(conn net.Conn) (shared.EthereumApi, error) {
|
||||
initializer := func(conn net.Conn) (comms.Stopper, shared.EthereumApi, error) {
|
||||
fe := useragent.NewRemoteFrontend(conn, eth.AccountManager())
|
||||
xeth := xeth.New(eth, fe)
|
||||
codec := codec.JSON
|
||||
|
||||
apis, err := api.ParseApiString(ctx.GlobalString(IPCApiFlag.Name), codec, xeth, eth)
|
||||
apis, err := api.ParseApiString(ctx.GlobalString(IPCApiFlag.Name), codec.JSON, xeth, eth)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return api.Merge(apis...), nil
|
||||
return xeth, api.Merge(apis...), nil
|
||||
}
|
||||
|
||||
return comms.StartIpc(config, codec.JSON, initializer)
|
||||
|
@ -31,30 +31,32 @@ import (
|
||||
// block, transaction and log events. The Filtering system can be used to listen
|
||||
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
|
||||
type FilterSystem struct {
|
||||
eventMux *event.TypeMux
|
||||
|
||||
filterMu sync.RWMutex
|
||||
filterId int
|
||||
filters map[int]*Filter
|
||||
created map[int]time.Time
|
||||
|
||||
quit chan struct{}
|
||||
sub event.Subscription
|
||||
}
|
||||
|
||||
// NewFilterSystem returns a newly allocated filter manager
|
||||
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
|
||||
fs := &FilterSystem{
|
||||
eventMux: mux,
|
||||
filters: make(map[int]*Filter),
|
||||
created: make(map[int]time.Time),
|
||||
filters: make(map[int]*Filter),
|
||||
created: make(map[int]time.Time),
|
||||
}
|
||||
fs.sub = mux.Subscribe(
|
||||
//core.PendingBlockEvent{},
|
||||
core.ChainEvent{},
|
||||
core.TxPreEvent{},
|
||||
vm.Logs(nil),
|
||||
)
|
||||
go fs.filterLoop()
|
||||
return fs
|
||||
}
|
||||
|
||||
// Stop quits the filter loop required for polling events
|
||||
func (fs *FilterSystem) Stop() {
|
||||
close(fs.quit)
|
||||
fs.sub.Unsubscribe()
|
||||
}
|
||||
|
||||
// Add adds a filter to the filter manager
|
||||
@ -89,57 +91,37 @@ func (fs *FilterSystem) Get(id int) *Filter {
|
||||
// filterLoop waits for specific events from ethereum and fires their handlers
|
||||
// when the filter matches the requirements.
|
||||
func (fs *FilterSystem) filterLoop() {
|
||||
// Subscribe to events
|
||||
eventCh := fs.eventMux.Subscribe(
|
||||
//core.PendingBlockEvent{},
|
||||
core.ChainEvent{},
|
||||
core.TxPreEvent{},
|
||||
vm.Logs(nil),
|
||||
).Chan()
|
||||
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
case <-fs.quit:
|
||||
break out
|
||||
case event, ok := <-eventCh:
|
||||
if !ok {
|
||||
// Event subscription closed, set the channel to nil to stop spinning
|
||||
eventCh = nil
|
||||
continue
|
||||
for event := range fs.sub.Chan() {
|
||||
switch ev := event.Data.(type) {
|
||||
case core.ChainEvent:
|
||||
fs.filterMu.RLock()
|
||||
for id, filter := range fs.filters {
|
||||
if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
|
||||
filter.BlockCallback(ev.Block, ev.Logs)
|
||||
}
|
||||
}
|
||||
// A real event arrived, notify the registered filters
|
||||
switch ev := event.Data.(type) {
|
||||
case core.ChainEvent:
|
||||
fs.filterMu.RLock()
|
||||
for id, filter := range fs.filters {
|
||||
if filter.BlockCallback != nil && fs.created[id].Before(event.Time) {
|
||||
filter.BlockCallback(ev.Block, ev.Logs)
|
||||
}
|
||||
}
|
||||
fs.filterMu.RUnlock()
|
||||
fs.filterMu.RUnlock()
|
||||
|
||||
case core.TxPreEvent:
|
||||
fs.filterMu.RLock()
|
||||
for id, filter := range fs.filters {
|
||||
if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
|
||||
filter.TransactionCallback(ev.Tx)
|
||||
}
|
||||
case core.TxPreEvent:
|
||||
fs.filterMu.RLock()
|
||||
for id, filter := range fs.filters {
|
||||
if filter.TransactionCallback != nil && fs.created[id].Before(event.Time) {
|
||||
filter.TransactionCallback(ev.Tx)
|
||||
}
|
||||
fs.filterMu.RUnlock()
|
||||
|
||||
case vm.Logs:
|
||||
fs.filterMu.RLock()
|
||||
for id, filter := range fs.filters {
|
||||
if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
|
||||
msgs := filter.FilterLogs(ev)
|
||||
if len(msgs) > 0 {
|
||||
filter.LogsCallback(msgs)
|
||||
}
|
||||
}
|
||||
}
|
||||
fs.filterMu.RUnlock()
|
||||
}
|
||||
fs.filterMu.RUnlock()
|
||||
|
||||
case vm.Logs:
|
||||
fs.filterMu.RLock()
|
||||
for id, filter := range fs.filters {
|
||||
if filter.LogsCallback != nil && fs.created[id].Before(event.Time) {
|
||||
msgs := filter.FilterLogs(ev)
|
||||
if len(msgs) > 0 {
|
||||
filter.LogsCallback(msgs)
|
||||
}
|
||||
}
|
||||
}
|
||||
fs.filterMu.RUnlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -133,10 +133,13 @@ func (self *Miner) Register(agent Agent) {
|
||||
if self.Mining() {
|
||||
agent.Start()
|
||||
}
|
||||
|
||||
self.worker.register(agent)
|
||||
}
|
||||
|
||||
func (self *Miner) Unregister(agent Agent) {
|
||||
self.worker.unregister(agent)
|
||||
}
|
||||
|
||||
func (self *Miner) Mining() bool {
|
||||
return atomic.LoadInt32(&self.mining) > 0
|
||||
}
|
||||
@ -146,7 +149,7 @@ func (self *Miner) HashRate() (tot int64) {
|
||||
// do we care this might race? is it worth we're rewriting some
|
||||
// aspects of the worker/locking up agents so we can get an accurate
|
||||
// hashrate?
|
||||
for _, agent := range self.worker.agents {
|
||||
for agent := range self.worker.agents {
|
||||
tot += agent.GetHashRate()
|
||||
}
|
||||
return
|
||||
|
@ -48,9 +48,10 @@ type RemoteAgent struct {
|
||||
}
|
||||
|
||||
func NewRemoteAgent() *RemoteAgent {
|
||||
agent := &RemoteAgent{work: make(map[common.Hash]*Work), hashrate: make(map[common.Hash]hashrate)}
|
||||
|
||||
return agent
|
||||
return &RemoteAgent{
|
||||
work: make(map[common.Hash]*Work),
|
||||
hashrate: make(map[common.Hash]hashrate),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *RemoteAgent) SubmitHashrate(id common.Hash, rate uint64) {
|
||||
@ -75,8 +76,12 @@ func (a *RemoteAgent) Start() {
|
||||
}
|
||||
|
||||
func (a *RemoteAgent) Stop() {
|
||||
close(a.quit)
|
||||
close(a.workCh)
|
||||
if a.quit != nil {
|
||||
close(a.quit)
|
||||
}
|
||||
if a.workCh != nil {
|
||||
close(a.workCh)
|
||||
}
|
||||
}
|
||||
|
||||
// GetHashRate returns the accumulated hashrate of all identifier combined
|
||||
|
@ -92,7 +92,7 @@ type Result struct {
|
||||
type worker struct {
|
||||
mu sync.Mutex
|
||||
|
||||
agents []Agent
|
||||
agents map[Agent]struct{}
|
||||
recv chan *Result
|
||||
mux *event.TypeMux
|
||||
quit chan struct{}
|
||||
@ -136,6 +136,7 @@ func newWorker(coinbase common.Address, eth core.Backend) *worker {
|
||||
coinbase: coinbase,
|
||||
txQueue: make(map[common.Hash]*types.Transaction),
|
||||
quit: make(chan struct{}),
|
||||
agents: make(map[Agent]struct{}),
|
||||
fullValidation: false,
|
||||
}
|
||||
go worker.update()
|
||||
@ -180,7 +181,7 @@ func (self *worker) start() {
|
||||
atomic.StoreInt32(&self.mining, 1)
|
||||
|
||||
// spin up agents
|
||||
for _, agent := range self.agents {
|
||||
for agent := range self.agents {
|
||||
agent.Start()
|
||||
}
|
||||
}
|
||||
@ -190,16 +191,14 @@ func (self *worker) stop() {
|
||||
defer self.mu.Unlock()
|
||||
|
||||
if atomic.LoadInt32(&self.mining) == 1 {
|
||||
var keep []Agent
|
||||
// stop all agents
|
||||
for _, agent := range self.agents {
|
||||
// Stop all agents.
|
||||
for agent := range self.agents {
|
||||
agent.Stop()
|
||||
// keep all that's not a cpu agent
|
||||
if _, ok := agent.(*CpuAgent); !ok {
|
||||
keep = append(keep, agent)
|
||||
// Remove CPU agents.
|
||||
if _, ok := agent.(*CpuAgent); ok {
|
||||
delete(self.agents, agent)
|
||||
}
|
||||
}
|
||||
self.agents = keep
|
||||
}
|
||||
|
||||
atomic.StoreInt32(&self.mining, 0)
|
||||
@ -209,10 +208,17 @@ func (self *worker) stop() {
|
||||
func (self *worker) register(agent Agent) {
|
||||
self.mu.Lock()
|
||||
defer self.mu.Unlock()
|
||||
self.agents = append(self.agents, agent)
|
||||
self.agents[agent] = struct{}{}
|
||||
agent.SetReturnCh(self.recv)
|
||||
}
|
||||
|
||||
func (self *worker) unregister(agent Agent) {
|
||||
self.mu.Lock()
|
||||
defer self.mu.Unlock()
|
||||
delete(self.agents, agent)
|
||||
agent.Stop()
|
||||
}
|
||||
|
||||
func (self *worker) update() {
|
||||
eventSub := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
|
||||
defer eventSub.Unsubscribe()
|
||||
@ -341,11 +347,9 @@ func (self *worker) push(work *Work) {
|
||||
glog.Infoln("You turn back and abort mining")
|
||||
return
|
||||
}
|
||||
|
||||
// push new work to agents
|
||||
for _, agent := range self.agents {
|
||||
for agent := range self.agents {
|
||||
atomic.AddInt32(&self.atWork, 1)
|
||||
|
||||
if agent.Work() != nil {
|
||||
agent.Work() <- work
|
||||
}
|
||||
|
@ -20,13 +20,22 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/rpc/codec"
|
||||
"github.com/ethereum/go-ethereum/rpc/shared"
|
||||
)
|
||||
|
||||
type Stopper interface {
|
||||
Stop()
|
||||
}
|
||||
|
||||
type InitFunc func(conn net.Conn) (Stopper, shared.EthereumApi, error)
|
||||
|
||||
type IpcConfig struct {
|
||||
Endpoint string
|
||||
}
|
||||
@ -90,8 +99,38 @@ func NewIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) {
|
||||
}
|
||||
|
||||
// Start IPC server
|
||||
func StartIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error {
|
||||
return startIpc(cfg, codec, initializer)
|
||||
func StartIpc(cfg IpcConfig, codec codec.Codec, initializer InitFunc) error {
|
||||
l, err := ipcListen(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go ipcLoop(cfg, codec, initializer, l)
|
||||
return nil
|
||||
}
|
||||
|
||||
func ipcLoop(cfg IpcConfig, codec codec.Codec, initializer InitFunc, l net.Listener) {
|
||||
glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint)
|
||||
defer os.Remove(cfg.Endpoint)
|
||||
defer l.Close()
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
glog.V(logger.Debug).Infof("accept: %v", err)
|
||||
return
|
||||
}
|
||||
id := newIpcConnId()
|
||||
go func() {
|
||||
defer conn.Close()
|
||||
glog.V(logger.Debug).Infof("new connection with id %06d started", id)
|
||||
stopper, api, err := initializer(conn)
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("Unable to initialize IPC connection: %v", err)
|
||||
return
|
||||
}
|
||||
defer stopper.Stop()
|
||||
handle(id, conn, api, codec)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func newIpcConnId() int {
|
||||
|
@ -23,8 +23,6 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/rpc/codec"
|
||||
"github.com/ethereum/go-ethereum/rpc/shared"
|
||||
"github.com/ethereum/go-ethereum/rpc/useragent"
|
||||
@ -69,44 +67,16 @@ func (self *ipcClient) reconnect() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error {
|
||||
func ipcListen(cfg IpcConfig) (net.Listener, error) {
|
||||
// Ensure the IPC path exists and remove any previous leftover
|
||||
if err := os.MkdirAll(filepath.Dir(cfg.Endpoint), 0751); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
os.Remove(cfg.Endpoint)
|
||||
|
||||
l, err := net.ListenUnix("unix", &net.UnixAddr{Name: cfg.Endpoint, Net: "unix"})
|
||||
l, err := net.Listen("unix", cfg.Endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
os.Chmod(cfg.Endpoint, 0600)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
conn, err := l.AcceptUnix()
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
id := newIpcConnId()
|
||||
glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id)
|
||||
|
||||
api, err := initializer(conn)
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err)
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
go handle(id, conn, api, codec)
|
||||
}
|
||||
|
||||
os.Remove(cfg.Endpoint)
|
||||
}()
|
||||
|
||||
glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint)
|
||||
|
||||
return nil
|
||||
return l, nil
|
||||
}
|
||||
|
@ -28,8 +28,6 @@ import (
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/rpc/codec"
|
||||
"github.com/ethereum/go-ethereum/rpc/shared"
|
||||
"github.com/ethereum/go-ethereum/rpc/useragent"
|
||||
@ -688,40 +686,12 @@ func (self *ipcClient) reconnect() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func startIpc(cfg IpcConfig, codec codec.Codec, initializer func(conn net.Conn) (shared.EthereumApi, error)) error {
|
||||
func ipcListen(cfg IpcConfig) (net.Listener, error) {
|
||||
os.Remove(cfg.Endpoint) // in case it still exists from a previous run
|
||||
|
||||
l, err := Listen(cfg.Endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
os.Chmod(cfg.Endpoint, 0600)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("Error accepting ipc connection - %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
id := newIpcConnId()
|
||||
glog.V(logger.Debug).Infof("New IPC connection with id %06d started\n", id)
|
||||
|
||||
api, err := initializer(conn)
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("Unable to initialize IPC connection - %v\n", err)
|
||||
conn.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
go handle(id, conn, api, codec)
|
||||
}
|
||||
|
||||
os.Remove(cfg.Endpoint)
|
||||
}()
|
||||
|
||||
glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint)
|
||||
|
||||
return nil
|
||||
return l, nil
|
||||
}
|
||||
|
14
xeth/xeth.go
14
xeth/xeth.go
@ -113,19 +113,15 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
|
||||
if frontend == nil {
|
||||
xeth.frontend = dummyFrontend{}
|
||||
}
|
||||
state, err := xeth.backend.BlockChain().State()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
state, _ := xeth.backend.BlockChain().State()
|
||||
xeth.state = NewState(xeth, state)
|
||||
|
||||
go xeth.start()
|
||||
|
||||
return xeth
|
||||
}
|
||||
|
||||
func (self *XEth) start() {
|
||||
timer := time.NewTicker(2 * time.Second)
|
||||
defer timer.Stop()
|
||||
done:
|
||||
for {
|
||||
select {
|
||||
@ -171,8 +167,12 @@ done:
|
||||
}
|
||||
}
|
||||
|
||||
func (self *XEth) stop() {
|
||||
// Stop releases any resources associated with self.
|
||||
// It may not be called more than once.
|
||||
func (self *XEth) Stop() {
|
||||
close(self.quit)
|
||||
self.filterManager.Stop()
|
||||
self.backend.Miner().Unregister(self.agent)
|
||||
}
|
||||
|
||||
func cAddress(a []string) []common.Address {
|
||||
|
Loading…
Reference in New Issue
Block a user