lotus/node/modules/actorevent.go

184 lines
5.5 KiB
Go
Raw Normal View History

package modules
import (
"context"
2024-02-07 08:17:46 +00:00
"fmt"
"path/filepath"
"time"
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
)
// EventAPI groups the chain and state APIs into a single value. It embeds
// fx.In alongside full.ChainAPI and full.StateAPI, and a pointer to it is
// what this file hands to events.NewEvents.
type EventAPI struct {
	fx.In

	full.ChainAPI
	full.StateAPI
}

// Compile-time check that *EventAPI implements events.EventAPI.
var _ events.EventAPI = (*EventAPI)(nil)
2023-12-19 11:07:08 +00:00
func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.EthEvent, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.EthEvent, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
ee := &full.EthEvent{
Chain: cs,
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
2023-01-26 14:20:49 +00:00
SubscribtionCtx: ctx,
}
if !cfg.EnableEthRPC || cfg.Events.DisableRealTimeFilterAPI {
// all event functionality is disabled
// the historic filter API relies on the real time one
return ee, nil
}
ee.SubManager = &full.EthSubscriptionManager{
Chain: cs,
StateAPI: stateapi,
ChainAPI: chainapi,
}
ee.FilterStore = filter.NewMemFilterStore(cfg.Events.MaxFilters)
// Start garbage collection for filters
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go ee.GC(ctx, time.Duration(cfg.Events.FilterTTL))
return nil
},
})
2023-12-19 11:07:08 +00:00
ee.TipSetFilterManager = &filter.TipSetFilterManager{
MaxFilterResults: cfg.Events.MaxFilterResults,
}
ee.MemPoolFilterManager = &filter.MemPoolFilterManager{
MaxFilterResults: cfg.Events.MaxFilterResults,
}
ee.EventFilterManager = fm
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
ev, err := events.NewEvents(ctx, &evapi)
if err != nil {
return err
}
// ignore returned tipsets
_ = ev.Observe(ee.TipSetFilterManager)
ch, err := mp.Updates(ctx)
if err != nil {
return err
}
go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch)
return nil
},
})
return ee, nil
}
}
func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, full.ChainAPI) (*filter.EventFilterManager, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, chainapi full.ChainAPI) (*filter.EventFilterManager, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
// Enable indexing of actor events
var eventIndex *filter.EventIndex
if !cfg.Events.DisableHistoricFilterAPI {
var dbPath string
if cfg.Events.DatabasePath == "" {
sqlitePath, err := r.SqlitePath()
if err != nil {
return nil, err
}
dbPath = filepath.Join(sqlitePath, "events.db")
} else {
dbPath = cfg.Events.DatabasePath
}
var err error
eventIndex, err = filter.NewEventIndex(ctx, dbPath, chainapi.Chain)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(context.Context) error {
return eventIndex.Close()
},
})
}
2023-12-19 11:07:08 +00:00
fm := &filter.EventFilterManager{
ChainStore: cs,
EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true
2024-02-07 08:17:46 +00:00
// TODO:
// We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
}
actor, err := sm.LoadActor(ctx, idAddr, ts)
if err != nil || actor.Address == nil {
2024-02-07 08:17:46 +00:00
return idAddr, true
}
2024-02-07 08:17:46 +00:00
fmt.Println("")
return *actor.Address, true
},
MaxFilterResults: cfg.Events.MaxFilterResults,
}
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
ev, err := events.NewEvents(ctx, &evapi)
if err != nil {
return err
}
2023-12-19 11:07:08 +00:00
_ = ev.Observe(fm)
return nil
},
})
2023-12-19 11:07:08 +00:00
return fm, nil
}
}
func ActorEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEvent, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEvent, error) {
ee := &full.ActorEvent{
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
2024-02-07 08:17:46 +00:00
Chain: cs,
2023-12-19 11:07:08 +00:00
}
2024-02-07 08:17:46 +00:00
if !cfg.EnableActorEventsAPI || cfg.Events.DisableRealTimeFilterAPI {
2023-12-19 11:07:08 +00:00
// all Actor events functionality is disabled
return ee, nil
}
ee.EventFilterManager = fm
return ee, nil
}
}