lotus/node/impl/full/actor_events.go

269 lines
7.1 KiB
Go
Raw Normal View History

2023-12-19 11:07:08 +00:00
package full
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/events/filter"
2024-02-07 08:17:46 +00:00
"github.com/filecoin-project/lotus/chain/store"
2023-12-19 11:07:08 +00:00
"github.com/filecoin-project/lotus/chain/types"
)
type ActorEventAPI interface {
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
2024-02-22 08:16:06 +00:00
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
2023-12-19 11:07:08 +00:00
}
// Compile-time assertion that api.FullNode provides every ActorEventAPI method.
var _ ActorEventAPI = *new(api.FullNode)

// Compile-time assertion that api.Gateway provides every ActorEventAPI method.
var _ ActorEventAPI = *new(api.Gateway)
type ActorEventHandler struct {
2023-12-19 11:07:08 +00:00
EventFilterManager *filter.EventFilterManager
MaxFilterHeightRange abi.ChainEpoch
2024-02-07 08:17:46 +00:00
Chain *store.ChainStore
2023-12-19 11:07:08 +00:00
}
var _ ActorEventAPI = (*ActorEventHandler)(nil)
2023-12-19 11:07:08 +00:00
// ActorEventsAPI bundles an ActorEventAPI implementation into a struct that
// embeds fx.In, so that the implementation can be supplied through fx
// dependency injection.
type ActorEventsAPI struct {
	fx.In

	ActorEventAPI
}
2024-02-22 08:16:06 +00:00
func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
2023-12-19 11:07:08 +00:00
if a.EventFilterManager == nil {
return nil, api.ErrNotSupported
}
2024-02-22 08:16:06 +00:00
if evtFilter == nil {
evtFilter = &types.ActorEventFilter{}
}
params, err := a.parseFilter(*evtFilter)
2024-02-07 08:17:46 +00:00
if err != nil {
return nil, err
}
2024-02-22 08:16:06 +00:00
// Install a filter just for this call, collect events, remove the filter
tipSetCid, err := params.GetTipSetCid()
if err != nil {
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
}
f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
2023-12-19 11:07:08 +00:00
if err != nil {
return nil, err
}
evs, err := getCollected(ctx, f)
_ = a.EventFilterManager.Remove(ctx, f.ID())
return evs, err
}
2024-02-07 08:17:46 +00:00
type filterParams struct {
MinHeight abi.ChainEpoch
MaxHeight abi.ChainEpoch
2024-02-22 08:16:06 +00:00
TipSetKey types.TipSetKey
}
func (fp filterParams) GetTipSetCid() (cid.Cid, error) {
if fp.TipSetKey.IsEmpty() {
return cid.Undef, nil
}
return fp.TipSetKey.Cid()
2024-02-07 08:17:46 +00:00
}
2024-02-22 08:16:06 +00:00
func (a *ActorEventHandler) parseFilter(f types.ActorEventFilter) (*filterParams, error) {
if f.TipSetKey != nil && !f.TipSetKey.IsEmpty() {
if f.FromHeight != nil || f.ToHeight != nil {
return nil, fmt.Errorf("cannot specify both TipSetKey and FromHeight/ToHeight")
2024-02-07 08:17:46 +00:00
}
2024-02-22 08:16:06 +00:00
tsk := types.EmptyTSK
if f.TipSetKey != nil {
tsk = *f.TipSetKey
}
2024-02-07 08:17:46 +00:00
return &filterParams{
MinHeight: 0,
MaxHeight: 0,
2024-02-22 08:16:06 +00:00
TipSetKey: tsk,
2024-02-07 08:17:46 +00:00
}, nil
}
2024-02-22 08:16:06 +00:00
min, max, err := parseHeightRange(a.Chain.GetHeaviestTipSet().Height(), f.FromHeight, f.ToHeight, a.MaxFilterHeightRange)
2024-02-07 08:17:46 +00:00
if err != nil {
return nil, err
}
return &filterParams{
MinHeight: min,
MaxHeight: max,
2024-02-22 08:16:06 +00:00
TipSetKey: types.EmptyTSK,
2024-02-07 08:17:46 +00:00
}, nil
}
2024-02-22 08:16:06 +00:00
// parseHeightRange is similar to eth's parseBlockRange but with slightly
// different semantics; it results in equivalent values that we can plug in to
// the EventFilterManager.
//
//   - Uses "height", allowing for nillable values rather than strings
//   - No "latest" and "earliest", those are now represented by nil on the way
//     in and -1 on the way out
//   - No option for hex representation
//
// heaviest is the height of the current chain head and maxRange is the largest
// epoch span permitted. A nil fromHeight or toHeight yields -1 for the
// corresponding result. An error is returned when fromHeight is negative, when
// both heights are given and 'from' is after 'to', or when the span (measured
// against heaviest for an open-ended range) exceeds maxRange.
func parseHeightRange(heaviest abi.ChainEpoch, fromHeight, toHeight *abi.ChainEpoch, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) {
	if fromHeight != nil && *fromHeight < 0 {
		return 0, 0, fmt.Errorf("range 'from' must be greater than or equal to 0")
	}

	// -1 stands for "not specified" on either side of the range.
	minHeight, maxHeight = -1, -1
	if fromHeight != nil {
		minHeight = *fromHeight
	}
	if toHeight != nil {
		maxHeight = *toHeight
	}

	// Validate height ranges are within limits set by node operator
	switch {
	case minHeight == -1 && maxHeight > 0:
		// Here the client is looking for events between the head and some future height
		if maxHeight-heaviest > maxRange {
			return 0, 0, fmt.Errorf("invalid epoch range: 'to' height is too far in the future (maximum: %d)", maxRange)
		}
	case minHeight >= 0 && maxHeight == -1:
		// Here the client is looking for events between some time in the past and the current head
		if heaviest-minHeight > maxRange {
			return 0, 0, fmt.Errorf("invalid epoch range: 'from' height is too far in the past (maximum: %d)", maxRange)
		}
	case minHeight >= 0 && maxHeight >= 0:
		if minHeight > maxHeight {
			// The message names the 'to' height first, so maxHeight must
			// be the first argument.
			return 0, 0, fmt.Errorf("invalid epoch range: 'to' height (%d) must be after 'from' height (%d)", maxHeight, minHeight)
		}
		if maxHeight-minHeight > maxRange {
			return 0, 0, fmt.Errorf("invalid epoch range: range between to and 'from' heights is too large (maximum: %d)", maxRange)
		}
	}

	return minHeight, maxHeight, nil
}
func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
2023-12-19 11:07:08 +00:00
if a.EventFilterManager == nil {
return nil, api.ErrNotSupported
}
2024-02-22 08:16:06 +00:00
if evtFilter == nil {
evtFilter = &types.ActorEventFilter{}
}
params, err := a.parseFilter(*evtFilter)
2024-02-07 08:17:46 +00:00
if err != nil {
return nil, err
}
2024-02-22 08:16:06 +00:00
tipSetCid, err := params.GetTipSetCid()
if err != nil {
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
}
fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
2023-12-19 11:07:08 +00:00
if err != nil {
return nil, err
}
out := make(chan *types.ActorEvent, 25)
go func() {
defer func() {
// Tell the caller we're done
close(out)
// Unsubscribe.
fm.ClearSubChannel()
_ = a.EventFilterManager.Remove(ctx, fm.ID())
}()
2024-02-22 08:16:06 +00:00
evs, err := getCollected(ctx, fm)
if err != nil {
log.Errorf("failed to get collected events: %w", err)
return
}
2023-12-19 11:07:08 +00:00
2024-02-22 08:16:06 +00:00
for _, ev := range evs {
ev := ev
select {
case out <- ev:
case <-ctx.Done():
return
default:
// TODO: need to fix this, buffer of 25 isn't going to work for prefill without a _really_ fast client or a small number of events
log.Errorf("closing event subscription due to slow reader")
return
2023-12-19 11:07:08 +00:00
}
}
in := make(chan interface{}, 256)
fm.SetSubChannel(in)
2024-02-22 08:16:06 +00:00
for ctx.Err() == nil {
2023-12-19 11:07:08 +00:00
select {
case val, ok := <-in:
if !ok {
// Shutting down.
return
}
ce, ok := val.(*filter.CollectedEvent)
if !ok {
log.Errorf("got unexpected value from event filter: %T", val)
return
}
ev := &types.ActorEvent{
2024-02-22 08:16:06 +00:00
Entries: ce.Entries,
Emitter: ce.EmitterAddr,
Reverted: ce.Reverted,
Height: ce.Height,
TipSetKey: ce.TipSetKey,
MsgCid: ce.MsgCid,
2023-12-19 11:07:08 +00:00
}
select {
case out <- ev:
2024-02-22 08:16:06 +00:00
default: // TODO: need to fix this to be more intelligent about the consumption rate vs the accumulation rate
2023-12-19 11:07:08 +00:00
log.Errorf("closing event subscription due to slow reader")
return
}
if len(out) > 5 {
log.Warnf("event subscription is slow, has %d buffered entries", len(out))
}
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEvent, error) {
ces := f.TakeCollectedEvents(ctx)
var out []*types.ActorEvent
for _, e := range ces {
2024-02-22 08:16:06 +00:00
out = append(out, &types.ActorEvent{
Entries: e.Entries,
Emitter: e.EmitterAddr,
Reverted: e.Reverted,
Height: e.Height,
TipSetKey: e.TipSetKey,
MsgCid: e.MsgCid,
})
2023-12-19 11:07:08 +00:00
}
return out, nil
}