Return eth blocks not tipsets in subscriptions

This commit is contained in:
Ian Davis 2022-11-16 12:57:03 +00:00
parent 314fb31886
commit 5a1f8d8f28
2 changed files with 43 additions and 11 deletions

View File

@ -97,12 +97,13 @@ type EthModule struct {
var _ EthModuleAPI = (*EthModule)(nil) var _ EthModuleAPI = (*EthModule)(nil)
type EthEvent struct { type EthEvent struct {
EthModuleAPI
Chain *store.ChainStore Chain *store.ChainStore
EventFilterManager *filter.EventFilterManager EventFilterManager *filter.EventFilterManager
TipSetFilterManager *filter.TipSetFilterManager TipSetFilterManager *filter.TipSetFilterManager
MemPoolFilterManager *filter.MemPoolFilterManager MemPoolFilterManager *filter.MemPoolFilterManager
FilterStore filter.FilterStore FilterStore filter.FilterStore
SubManager ethSubscriptionManager SubManager *EthSubscriptionManager
MaxFilterHeightRange abi.ChainEpoch MaxFilterHeightRange abi.ChainEpoch
} }
@ -1136,6 +1137,9 @@ const (
) )
func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) {
if e.SubManager == nil {
return nil, api.ErrNotSupported
}
// Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the // Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the
// method to be "eth_subscription". This probably doesn't matter in practice. // method to be "eth_subscription". This probably doesn't matter in practice.
@ -1183,6 +1187,10 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *a
} }
func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) { func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) {
if e.SubManager == nil {
return false, api.ErrNotSupported
}
filters, err := e.SubManager.StopSubscription(ctx, string(id)) filters, err := e.SubManager.StopSubscription(ctx, string(id))
if err != nil { if err != nil {
return false, nil return false, nil
@ -1321,12 +1329,13 @@ func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) {
return res, nil return res, nil
} }
type ethSubscriptionManager struct { type EthSubscriptionManager struct {
EthModuleAPI
mu sync.Mutex mu sync.Mutex
subs map[string]*ethSubscription subs map[string]*ethSubscription
} }
func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) {
id, err := uuid.NewRandom() id, err := uuid.NewRandom()
if err != nil { if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err) return nil, xerrors.Errorf("new uuid: %w", err)
@ -1335,6 +1344,7 @@ func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
ctx, quit := context.WithCancel(ctx) ctx, quit := context.WithCancel(ctx)
sub := &ethSubscription{ sub := &ethSubscription{
EthModuleAPI: e.EthModuleAPI,
id: id.String(), id: id.String(),
in: make(chan interface{}, 200), in: make(chan interface{}, 200),
out: make(chan api.EthSubscriptionResponse), out: make(chan api.EthSubscriptionResponse),
@ -1353,7 +1363,7 @@ func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
return sub, nil return sub, nil
} }
func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) { func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
@ -1368,6 +1378,7 @@ func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string
} }
type ethSubscription struct { type ethSubscription struct {
EthModuleAPI
id string id string
in chan interface{} in chan interface{}
out chan api.EthSubscriptionResponse out chan api.EthSubscriptionResponse
@ -1400,7 +1411,24 @@ func (e *ethSubscription) start(ctx context.Context) {
case *filter.CollectedEvent: case *filter.CollectedEvent:
resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt}) resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt})
case *types.TipSet: case *types.TipSet:
resp.Result = vt // Sadly convoluted since the logic for conversion to eth block is long and buried away
// in unexported methods of EthModule
tsCid, err := vt.Key().Cid()
if err != nil {
break
}
hash, err := api.NewEthHashFromCid(tsCid)
if err != nil {
break
}
eb, err := e.EthGetBlockByHash(ctx, hash, true)
if err != nil {
break
}
resp.Result = eb
default: default:
log.Warnf("unexpected subscription value type: %T", vt) log.Warnf("unexpected subscription value type: %T", vt)
} }

View File

@ -31,9 +31,10 @@ type EventAPI struct {
var _ events.EventAPI = &EventAPI{} var _ events.EventAPI = &EventAPI{}
func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool) (*full.EthEvent, error) { func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.EthModuleAPI) (*full.EthEvent, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool) (*full.EthEvent, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, em full.EthModuleAPI) (*full.EthEvent, error) {
ee := &full.EthEvent{ ee := &full.EthEvent{
EthModuleAPI: em,
Chain: cs, Chain: cs,
MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange), MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange),
} }
@ -44,6 +45,9 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy
return ee, nil return ee, nil
} }
ee.SubManager = &full.EthSubscriptionManager{
EthModuleAPI: em,
}
ee.FilterStore = filter.NewMemFilterStore(cfg.MaxFilters) ee.FilterStore = filter.NewMemFilterStore(cfg.MaxFilters)
// Start garbage collection for filters // Start garbage collection for filters