Rename internal events modules for clarity
This commit is contained in:
parent
af6cecbd32
commit
ce38c31121
@ -26,7 +26,7 @@ type cache struct {
|
|||||||
uncachedAPI
|
uncachedAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCache(api EventAPI, gcConfidence abi.ChainEpoch) *cache {
|
func newCache(api EventHelperAPI, gcConfidence abi.ChainEpoch) *cache {
|
||||||
return &cache{
|
return &cache{
|
||||||
newTSCache(api, gcConfidence),
|
newTSCache(api, gcConfidence),
|
||||||
newMessageCache(api),
|
newMessageCache(api),
|
||||||
|
@ -28,7 +28,7 @@ type TipSetObserver interface {
|
|||||||
Revert(ctx context.Context, from, to *types.TipSet) error
|
Revert(ctx context.Context, from, to *types.TipSet) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type EventAPI interface {
|
type EventHelperAPI interface {
|
||||||
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
|
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
|
||||||
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
|
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
|
||||||
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
|
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
|
||||||
@ -47,7 +47,7 @@ type Events struct {
|
|||||||
*hcEvents
|
*hcEvents
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEventsWithGCConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) (*Events, error) {
|
func newEventsWithGCConfidence(ctx context.Context, api EventHelperAPI, gcConfidence abi.ChainEpoch) (*Events, error) {
|
||||||
cache := newCache(api, gcConfidence)
|
cache := newCache(api, gcConfidence)
|
||||||
|
|
||||||
ob := newObserver(cache, gcConfidence)
|
ob := newObserver(cache, gcConfidence)
|
||||||
@ -61,7 +61,7 @@ func newEventsWithGCConfidence(ctx context.Context, api EventAPI, gcConfidence a
|
|||||||
return &Events{ob, he, headChange}, nil
|
return &Events{ob, he, headChange}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEvents(ctx context.Context, api EventAPI) (*Events, error) {
|
func NewEvents(ctx context.Context, api EventHelperAPI) (*Events, error) {
|
||||||
gcConfidence := 2 * build.ForkLengthThreshold
|
gcConfidence := 2 * build.ForkLengthThreshold
|
||||||
return newEventsWithGCConfidence(ctx, api, gcConfidence)
|
return newEventsWithGCConfidence(ctx, api, gcConfidence)
|
||||||
}
|
}
|
||||||
|
@ -69,7 +69,7 @@ type queuedEvent struct {
|
|||||||
// Manages chain head change events, which may be forward (new tipset added to
|
// Manages chain head change events, which may be forward (new tipset added to
|
||||||
// chain) or backward (chain branch discarded in favour of heavier branch)
|
// chain) or backward (chain branch discarded in favour of heavier branch)
|
||||||
type hcEvents struct {
|
type hcEvents struct {
|
||||||
cs EventAPI
|
cs EventHelperAPI
|
||||||
|
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
lastTs *types.TipSet
|
lastTs *types.TipSet
|
||||||
@ -94,7 +94,7 @@ type hcEvents struct {
|
|||||||
watcherEvents
|
watcherEvents
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHCEvents(api EventAPI, obs *observer) *hcEvents {
|
func newHCEvents(api EventHelperAPI, obs *observer) *hcEvents {
|
||||||
e := &hcEvents{
|
e := &hcEvents{
|
||||||
cs: api,
|
cs: api,
|
||||||
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
|
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
|
||||||
@ -326,14 +326,14 @@ type headChangeAPI interface {
|
|||||||
|
|
||||||
// watcherEvents watches for a state change
|
// watcherEvents watches for a state change
|
||||||
type watcherEvents struct {
|
type watcherEvents struct {
|
||||||
cs EventAPI
|
cs EventHelperAPI
|
||||||
hcAPI headChangeAPI
|
hcAPI headChangeAPI
|
||||||
|
|
||||||
lk sync.RWMutex
|
lk sync.RWMutex
|
||||||
matchers map[triggerID]StateMatchFunc
|
matchers map[triggerID]StateMatchFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func newWatcherEvents(hcAPI headChangeAPI, cs EventAPI) watcherEvents {
|
func newWatcherEvents(hcAPI headChangeAPI, cs EventHelperAPI) watcherEvents {
|
||||||
return watcherEvents{
|
return watcherEvents{
|
||||||
cs: cs,
|
cs: cs,
|
||||||
hcAPI: hcAPI,
|
hcAPI: hcAPI,
|
||||||
@ -426,14 +426,14 @@ func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler,
|
|||||||
|
|
||||||
// messageEvents watches for message calls to actors
|
// messageEvents watches for message calls to actors
|
||||||
type messageEvents struct {
|
type messageEvents struct {
|
||||||
cs EventAPI
|
cs EventHelperAPI
|
||||||
hcAPI headChangeAPI
|
hcAPI headChangeAPI
|
||||||
|
|
||||||
lk sync.RWMutex
|
lk sync.RWMutex
|
||||||
matchers map[triggerID]MsgMatchFunc
|
matchers map[triggerID]MsgMatchFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMessageEvents(hcAPI headChangeAPI, cs EventAPI) messageEvents {
|
func newMessageEvents(hcAPI headChangeAPI, cs EventHelperAPI) messageEvents {
|
||||||
return messageEvents{
|
return messageEvents{
|
||||||
cs: cs,
|
cs: cs,
|
||||||
hcAPI: hcAPI,
|
hcAPI: hcAPI,
|
||||||
|
@ -22,7 +22,7 @@ type heightHandler struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type heightEvents struct {
|
type heightEvents struct {
|
||||||
api EventAPI
|
api EventHelperAPI
|
||||||
gcConfidence abi.ChainEpoch
|
gcConfidence abi.ChainEpoch
|
||||||
|
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
@ -31,7 +31,7 @@ type heightEvents struct {
|
|||||||
lastGc abi.ChainEpoch //nolint:structcheck
|
lastGc abi.ChainEpoch //nolint:structcheck
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHeightEvents(api EventAPI, obs *observer, gcConfidence abi.ChainEpoch) *heightEvents {
|
func newHeightEvents(api EventHelperAPI, obs *observer, gcConfidence abi.ChainEpoch) *heightEvents {
|
||||||
he := &heightEvents{
|
he := &heightEvents{
|
||||||
api: api,
|
api: api,
|
||||||
gcConfidence: gcConfidence,
|
gcConfidence: gcConfidence,
|
||||||
|
@ -358,7 +358,7 @@ func (fcs *fakeCS) advance(rev, app, drop int, msgs map[int]cid.Cid, nulls ...in
|
|||||||
fcs.sub(nil, nil)
|
fcs.sub(nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ EventAPI = &fakeCS{}
|
var _ EventHelperAPI = &fakeCS{}
|
||||||
|
|
||||||
func TestAt(t *testing.T) {
|
func TestAt(t *testing.T) {
|
||||||
//stm: @EVENTS_HEIGHT_CHAIN_AT_001, @EVENTS_HEIGHT_REVERT_001
|
//stm: @EVENTS_HEIGHT_CHAIN_AT_001, @EVENTS_HEIGHT_REVERT_001
|
||||||
|
@ -11,13 +11,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type messageCache struct {
|
type messageCache struct {
|
||||||
api EventAPI
|
api EventHelperAPI
|
||||||
|
|
||||||
blockMsgLk sync.Mutex
|
blockMsgLk sync.Mutex
|
||||||
blockMsgCache *arc.ARCCache[cid.Cid, *api.BlockMessages]
|
blockMsgCache *arc.ARCCache[cid.Cid, *api.BlockMessages]
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMessageCache(a EventAPI) *messageCache {
|
func newMessageCache(a EventHelperAPI) *messageCache {
|
||||||
blsMsgCache, _ := arc.NewARC[cid.Cid, *api.BlockMessages](500)
|
blsMsgCache, _ := arc.NewARC[cid.Cid, *api.BlockMessages](500)
|
||||||
|
|
||||||
return &messageCache{
|
return &messageCache{
|
||||||
|
@ -17,7 +17,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type observer struct {
|
type observer struct {
|
||||||
api EventAPI
|
api EventHelperAPI
|
||||||
|
|
||||||
gcConfidence abi.ChainEpoch
|
gcConfidence abi.ChainEpoch
|
||||||
|
|
||||||
|
@ -265,15 +265,14 @@ func ConfigFullNode(c interface{}) Option {
|
|||||||
),
|
),
|
||||||
|
|
||||||
// Actor event filtering support
|
// Actor event filtering support
|
||||||
Override(new(events.EventAPI), From(new(modules.EventAPI))),
|
Override(new(events.EventHelperAPI), From(new(modules.EventHelperAPI))),
|
||||||
|
|
||||||
Override(new(*filter.EventFilterManager), modules.EventFilterManager(cfg.Fevm)),
|
Override(new(*filter.EventFilterManager), modules.EventFilterManager(cfg.Fevm)),
|
||||||
|
|
||||||
// in lite-mode Eth api is provided by gateway
|
// in lite-mode Eth api is provided by gateway
|
||||||
ApplyIf(isFullNode,
|
ApplyIf(isFullNode,
|
||||||
If(cfg.Fevm.EnableEthRPC,
|
If(cfg.Fevm.EnableEthRPC,
|
||||||
Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm)),
|
Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm)),
|
||||||
Override(new(full.EthEventAPI), modules.EthEventAPI(cfg.Fevm)),
|
Override(new(full.EthEventAPI), modules.EthEventHandler(cfg.Fevm)),
|
||||||
),
|
),
|
||||||
If(!cfg.Fevm.EnableEthRPC,
|
If(!cfg.Fevm.EnableEthRPC,
|
||||||
Override(new(full.EthModuleAPI), &full.EthModuleDummy{}),
|
Override(new(full.EthModuleAPI), &full.EthModuleDummy{}),
|
||||||
@ -283,7 +282,7 @@ func ConfigFullNode(c interface{}) Option {
|
|||||||
|
|
||||||
ApplyIf(isFullNode,
|
ApplyIf(isFullNode,
|
||||||
If(cfg.Fevm.EnableActorEventsAPI,
|
If(cfg.Fevm.EnableActorEventsAPI,
|
||||||
Override(new(full.ActorEventAPI), modules.ActorEventAPI(cfg.Fevm)),
|
Override(new(full.ActorEventAPI), modules.ActorEventHandler(cfg.Fevm)),
|
||||||
),
|
),
|
||||||
If(!cfg.Fevm.EnableActorEventsAPI,
|
If(!cfg.Fevm.EnableActorEventsAPI,
|
||||||
Override(new(full.ActorEventAPI), &full.ActorEventDummy{}),
|
Override(new(full.ActorEventAPI), &full.ActorEventDummy{}),
|
||||||
|
@ -26,20 +26,20 @@ var (
|
|||||||
_ ActorEventAPI = *new(api.Gateway)
|
_ ActorEventAPI = *new(api.Gateway)
|
||||||
)
|
)
|
||||||
|
|
||||||
type ActorEvent struct {
|
type ActorEventHandler struct {
|
||||||
EventFilterManager *filter.EventFilterManager
|
EventFilterManager *filter.EventFilterManager
|
||||||
MaxFilterHeightRange abi.ChainEpoch
|
MaxFilterHeightRange abi.ChainEpoch
|
||||||
Chain *store.ChainStore
|
Chain *store.ChainStore
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ ActorEventAPI = (*ActorEvent)(nil)
|
var _ ActorEventAPI = (*ActorEventHandler)(nil)
|
||||||
|
|
||||||
type ActorEventsAPI struct {
|
type ActorEventsAPI struct {
|
||||||
fx.In
|
fx.In
|
||||||
ActorEventAPI
|
ActorEventAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActorEvent) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
func (a *ActorEventHandler) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||||
if a.EventFilterManager == nil {
|
if a.EventFilterManager == nil {
|
||||||
return nil, api.ErrNotSupported
|
return nil, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -66,7 +66,7 @@ type filterParams struct {
|
|||||||
TipSetCid cid.Cid
|
TipSetCid cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActorEvent) parseFilter(f *types.ActorEventFilter) (*filterParams, error) {
|
func (a *ActorEventHandler) parseFilter(f *types.ActorEventFilter) (*filterParams, error) {
|
||||||
if f.TipSetCid != nil {
|
if f.TipSetCid != nil {
|
||||||
if len(f.FromEpoch) != 0 || len(f.ToEpoch) != 0 {
|
if len(f.FromEpoch) != 0 || len(f.ToEpoch) != 0 {
|
||||||
return nil, fmt.Errorf("cannot specify both TipSetCid and FromEpoch/ToEpoch")
|
return nil, fmt.Errorf("cannot specify both TipSetCid and FromEpoch/ToEpoch")
|
||||||
@ -101,7 +101,7 @@ func (a *ActorEvent) parseFilter(f *types.ActorEventFilter) (*filterParams, erro
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActorEvent) SubscribeActorEvents(ctx context.Context, f *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
if a.EventFilterManager == nil {
|
if a.EventFilterManager == nil {
|
||||||
return nil, api.ErrNotSupported
|
return nil, api.ErrNotSupported
|
||||||
}
|
}
|
@ -135,7 +135,7 @@ type EthModule struct {
|
|||||||
|
|
||||||
var _ EthModuleAPI = (*EthModule)(nil)
|
var _ EthModuleAPI = (*EthModule)(nil)
|
||||||
|
|
||||||
type EthEvent struct {
|
type EthEventHandler struct {
|
||||||
Chain *store.ChainStore
|
Chain *store.ChainStore
|
||||||
EventFilterManager *filter.EventFilterManager
|
EventFilterManager *filter.EventFilterManager
|
||||||
TipSetFilterManager *filter.TipSetFilterManager
|
TipSetFilterManager *filter.TipSetFilterManager
|
||||||
@ -146,7 +146,7 @@ type EthEvent struct {
|
|||||||
SubscribtionCtx context.Context
|
SubscribtionCtx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ EthEventAPI = (*EthEvent)(nil)
|
var _ EthEventAPI = (*EthEventHandler)(nil)
|
||||||
|
|
||||||
type EthAPI struct {
|
type EthAPI struct {
|
||||||
fx.In
|
fx.In
|
||||||
@ -1207,7 +1207,7 @@ func (a *EthModule) EthCall(ctx context.Context, tx ethtypes.EthCall, blkParam e
|
|||||||
return ethtypes.EthBytes{}, nil
|
return ethtypes.EthBytes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) EthGetLogs(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*ethtypes.EthFilterResult, error) {
|
func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*ethtypes.EthFilterResult, error) {
|
||||||
if e.EventFilterManager == nil {
|
if e.EventFilterManager == nil {
|
||||||
return nil, api.ErrNotSupported
|
return nil, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -1224,7 +1224,7 @@ func (e *EthEvent) EthGetLogs(ctx context.Context, filterSpec *ethtypes.EthFilte
|
|||||||
return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI)
|
return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) {
|
func (e *EthEventHandler) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) {
|
||||||
if e.FilterStore == nil {
|
if e.FilterStore == nil {
|
||||||
return nil, api.ErrNotSupported
|
return nil, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -1246,7 +1246,7 @@ func (e *EthEvent) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilte
|
|||||||
return nil, xerrors.Errorf("unknown filter type")
|
return nil, xerrors.Errorf("unknown filter type")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) {
|
func (e *EthEventHandler) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) {
|
||||||
if e.FilterStore == nil {
|
if e.FilterStore == nil {
|
||||||
return nil, api.ErrNotSupported
|
return nil, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -1317,7 +1317,7 @@ func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRan
|
|||||||
return minHeight, maxHeight, nil
|
return minHeight, maxHeight, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*filter.EventFilter, error) {
|
func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*filter.EventFilter, error) {
|
||||||
var (
|
var (
|
||||||
minHeight abi.ChainEpoch
|
minHeight abi.ChainEpoch
|
||||||
maxHeight abi.ChainEpoch
|
maxHeight abi.ChainEpoch
|
||||||
@ -1370,7 +1370,7 @@ func keysToKeysWithCodec(keys map[string][][]byte) map[string][]types.ActorEvent
|
|||||||
return keysWithCodec
|
return keysWithCodec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (ethtypes.EthFilterID, error) {
|
func (e *EthEventHandler) EthNewFilter(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (ethtypes.EthFilterID, error) {
|
||||||
if e.FilterStore == nil || e.EventFilterManager == nil {
|
if e.FilterStore == nil || e.EventFilterManager == nil {
|
||||||
return ethtypes.EthFilterID{}, api.ErrNotSupported
|
return ethtypes.EthFilterID{}, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -1392,7 +1392,7 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *ethtypes.EthFil
|
|||||||
return ethtypes.EthFilterID(f.ID()), nil
|
return ethtypes.EthFilterID(f.ID()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error) {
|
func (e *EthEventHandler) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error) {
|
||||||
if e.FilterStore == nil || e.TipSetFilterManager == nil {
|
if e.FilterStore == nil || e.TipSetFilterManager == nil {
|
||||||
return ethtypes.EthFilterID{}, api.ErrNotSupported
|
return ethtypes.EthFilterID{}, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -1415,7 +1415,7 @@ func (e *EthEvent) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID,
|
|||||||
return ethtypes.EthFilterID(f.ID()), nil
|
return ethtypes.EthFilterID(f.ID()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error) {
|
func (e *EthEventHandler) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error) {
|
||||||
if e.FilterStore == nil || e.MemPoolFilterManager == nil {
|
if e.FilterStore == nil || e.MemPoolFilterManager == nil {
|
||||||
return ethtypes.EthFilterID{}, api.ErrNotSupported
|
return ethtypes.EthFilterID{}, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -1438,7 +1438,7 @@ func (e *EthEvent) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes
|
|||||||
return ethtypes.EthFilterID(f.ID()), nil
|
return ethtypes.EthFilterID(f.ID()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error) {
|
func (e *EthEventHandler) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error) {
|
||||||
if e.FilterStore == nil {
|
if e.FilterStore == nil {
|
||||||
return false, api.ErrNotSupported
|
return false, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -1458,7 +1458,7 @@ func (e *EthEvent) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilter
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) uninstallFilter(ctx context.Context, f filter.Filter) error {
|
func (e *EthEventHandler) uninstallFilter(ctx context.Context, f filter.Filter) error {
|
||||||
switch f.(type) {
|
switch f.(type) {
|
||||||
case *filter.EventFilter:
|
case *filter.EventFilter:
|
||||||
err := e.EventFilterManager.Remove(ctx, f.ID())
|
err := e.EventFilterManager.Remove(ctx, f.ID())
|
||||||
@ -1488,7 +1488,7 @@ const (
|
|||||||
EthSubscribeEventTypePendingTransactions = "newPendingTransactions"
|
EthSubscribeEventTypePendingTransactions = "newPendingTransactions"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
|
func (e *EthEventHandler) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
|
||||||
params, err := jsonrpc.DecodeParams[ethtypes.EthSubscribeParams](p)
|
params, err := jsonrpc.DecodeParams[ethtypes.EthSubscribeParams](p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("decoding params: %w", err)
|
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("decoding params: %w", err)
|
||||||
@ -1564,7 +1564,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty
|
|||||||
return sub.id, nil
|
return sub.id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) {
|
func (e *EthEventHandler) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) {
|
||||||
if e.SubManager == nil {
|
if e.SubManager == nil {
|
||||||
return false, api.ErrNotSupported
|
return false, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
@ -1578,7 +1578,7 @@ func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscripti
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GC runs a garbage collection loop, deleting filters that have not been used within the ttl window
|
// GC runs a garbage collection loop, deleting filters that have not been used within the ttl window
|
||||||
func (e *EthEvent) GC(ctx context.Context, ttl time.Duration) {
|
func (e *EthEventHandler) GC(ctx context.Context, ttl time.Duration) {
|
||||||
if e.FilterStore == nil {
|
if e.FilterStore == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -23,20 +23,20 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EventAPI struct {
|
type EventHelperAPI struct {
|
||||||
fx.In
|
fx.In
|
||||||
|
|
||||||
full.ChainAPI
|
full.ChainAPI
|
||||||
full.StateAPI
|
full.StateAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ events.EventAPI = &EventAPI{}
|
var _ events.EventHelperAPI = &EventHelperAPI{}
|
||||||
|
|
||||||
func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.EthEvent, error) {
|
func EthEventHandler(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.EthEventHandler, error) {
|
||||||
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.EthEvent, error) {
|
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.EthEventHandler, error) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
ee := &full.EthEvent{
|
ee := &full.EthEventHandler{
|
||||||
Chain: cs,
|
Chain: cs,
|
||||||
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
|
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
|
||||||
SubscribtionCtx: ctx,
|
SubscribtionCtx: ctx,
|
||||||
@ -94,8 +94,8 @@ func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, full.ChainAPI) (*filter.EventFilterManager, error) {
|
func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, full.ChainAPI) (*filter.EventFilterManager, error) {
|
||||||
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, chainapi full.ChainAPI) (*filter.EventFilterManager, error) {
|
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, chainapi full.ChainAPI) (*filter.EventFilterManager, error) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
// Enable indexing of actor events
|
// Enable indexing of actor events
|
||||||
@ -164,9 +164,9 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ActorEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEvent, error) {
|
func ActorEventHandler(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) {
|
||||||
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEvent, error) {
|
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEventHandler, error) {
|
||||||
ee := &full.ActorEvent{
|
ee := &full.ActorEventHandler{
|
||||||
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
|
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
|
||||||
Chain: cs,
|
Chain: cs,
|
||||||
}
|
}
|
||||||
|
@ -21,8 +21,8 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI) (*full.EthModule, error) {
|
func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI) (*full.EthModule, error) {
|
||||||
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI) (*full.EthModule, error) {
|
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI) (*full.EthModule, error) {
|
||||||
sqlitePath, err := r.SqlitePath()
|
sqlitePath, err := r.SqlitePath()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Loading…
Reference in New Issue
Block a user