diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 41fd3e5c3..9a25b810b 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -97,12 +97,13 @@ type EthModule struct { var _ EthModuleAPI = (*EthModule)(nil) type EthEvent struct { + EthModuleAPI Chain *store.ChainStore EventFilterManager *filter.EventFilterManager TipSetFilterManager *filter.TipSetFilterManager MemPoolFilterManager *filter.MemPoolFilterManager FilterStore filter.FilterStore - SubManager ethSubscriptionManager + SubManager *EthSubscriptionManager MaxFilterHeightRange abi.ChainEpoch } @@ -1136,6 +1137,9 @@ const ( ) func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { + if e.SubManager == nil { + return nil, api.ErrNotSupported + } // Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the // method to be "eth_subscription". This probably doesn't matter in practice. @@ -1183,6 +1187,10 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *a } func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) { + if e.SubManager == nil { + return false, api.ErrNotSupported + } + filters, err := e.SubManager.StopSubscription(ctx, string(id)) if err != nil { return false, nil @@ -1321,12 +1329,13 @@ func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) { return res, nil } -type ethSubscriptionManager struct { +type EthSubscriptionManager struct { + EthModuleAPI mu sync.Mutex subs map[string]*ethSubscription } -func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { +func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { id, err := uuid.NewRandom() if err != nil { return nil, xerrors.Errorf("new uuid: %w", err) @@ -1335,10 +1344,11 @@ func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub ctx, quit := context.WithCancel(ctx) sub := ðSubscription{ - id: id.String(), - in: make(chan interface{}, 200), - out: make(chan api.EthSubscriptionResponse), - quit: quit, + EthModuleAPI: e.EthModuleAPI, + id: id.String(), + in: make(chan interface{}, 200), + out: make(chan api.EthSubscriptionResponse), + quit: quit, } e.mu.Lock() @@ -1353,7 +1363,7 @@ func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub return sub, nil } -func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) { +func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) { e.mu.Lock() defer e.mu.Unlock() @@ -1368,6 +1378,7 @@ func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string } type ethSubscription struct { + EthModuleAPI id string in chan interface{} out chan api.EthSubscriptionResponse @@ -1400,7 +1411,24 @@ func (e *ethSubscription) start(ctx context.Context) { case *filter.CollectedEvent: resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt}) case *types.TipSet: - resp.Result = vt + // Sadly convoluted since the logic for conversion to eth block is long and buried away + // in unexported methods of EthModule + tsCid, err := vt.Key().Cid() + if err != nil { + break + } + + hash, err := api.NewEthHashFromCid(tsCid) + if err != nil { + break + } + + eb, err := e.EthGetBlockByHash(ctx, hash, true) + if err != nil { + break + } + + resp.Result = eb default: log.Warnf("unexpected subscription value type: %T", vt) } diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 1eb58b4b1..4f25f0722 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -31,9 +31,10 @@ type EventAPI struct { var _ events.EventAPI = &EventAPI{} -func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool) (*full.EthEvent, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool) (*full.EthEvent, error) { +func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.EthModuleAPI) (*full.EthEvent, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, em full.EthModuleAPI) (*full.EthEvent, error) { ee := &full.EthEvent{ + EthModuleAPI: em, Chain: cs, MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange), } @@ -44,6 +45,9 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy return ee, nil } + ee.SubManager = &full.EthSubscriptionManager{ + EthModuleAPI: em, + } ee.FilterStore = filter.NewMemFilterStore(cfg.MaxFilters) // Start garbage collection for filters