Adjust actor event API after review

This commit is contained in:
Rod Vagg 2024-02-22 19:16:06 +11:00 committed by Phi-rjan
parent 4a0e3b877c
commit 13d7411bd9
24 changed files with 398 additions and 219 deletions

View File

@ -919,7 +919,7 @@ type FullNode interface {
// This API also allows clients to read all historical events matching the given filter before // This API also allows clients to read all historical events matching the given filter before
// any real-time events are written to the response stream. // any real-time events are written to the response stream.
// NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW // NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW
SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read
} }
// reverse interface to the client, called after EthSubscribe // reverse interface to the client, called after EthSubscribe

View File

@ -131,5 +131,5 @@ type Gateway interface {
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error) EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
} }

View File

@ -430,25 +430,8 @@ func init() {
}, },
}, },
}, },
FromEpoch: "earliest", FromHeight: epochPtr(1010),
ToEpoch: "latest", ToHeight: epochPtr(1020),
})
addExample(&types.SubActorEventFilter{
Filter: types.ActorEventFilter{
Addresses: []address.Address{addr},
Fields: map[string][]types.ActorEventBlock{
"abc": {
{
Codec: 0x51,
Value: []byte("ddata"),
},
},
},
FromEpoch: "earliest",
ToEpoch: "latest",
},
Prefill: true,
}) })
} }
@ -555,6 +538,11 @@ func exampleStruct(method string, t, parent reflect.Type) interface{} {
return ns.Interface() return ns.Interface()
} }
func epochPtr(ei int64) *abi.ChainEpoch {
ep := abi.ChainEpoch(ei)
return &ep
}
type Visitor struct { type Visitor struct {
Root string Root string
Methods map[string]ast.Node Methods map[string]ast.Node

View File

@ -3984,7 +3984,7 @@ func (mr *MockFullNodeMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4 in
} }
// SubscribeActorEvents mocks base method. // SubscribeActorEvents mocks base method.
func (m *MockFullNode) SubscribeActorEvents(arg0 context.Context, arg1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { func (m *MockFullNode) SubscribeActorEvents(arg0 context.Context, arg1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SubscribeActorEvents", arg0, arg1) ret := m.ctrl.Call(m, "SubscribeActorEvents", arg0, arg1)
ret0, _ := ret[0].(<-chan *types.ActorEvent) ret0, _ := ret[0].(<-chan *types.ActorEvent)

View File

@ -591,7 +591,7 @@ type FullNodeMethods struct {
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `perm:"read"` StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `perm:"read"`
SubscribeActorEvents func(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"` SubscribeActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"`
SyncCheckBad func(p0 context.Context, p1 cid.Cid) (string, error) `perm:"read"` SyncCheckBad func(p0 context.Context, p1 cid.Cid) (string, error) `perm:"read"`
@ -835,7 +835,7 @@ type GatewayMethods struct {
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `` StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) ``
SubscribeActorEvents func(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) `` SubscribeActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) ``
Version func(p0 context.Context) (APIVersion, error) `` Version func(p0 context.Context) (APIVersion, error) ``
@ -4000,14 +4000,14 @@ func (s *FullNodeStub) StateWaitMsg(p0 context.Context, p1 cid.Cid, p2 uint64, p
return nil, ErrNotSupported return nil, ErrNotSupported
} }
func (s *FullNodeStruct) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { func (s *FullNodeStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
if s.Internal.SubscribeActorEvents == nil { if s.Internal.SubscribeActorEvents == nil {
return nil, ErrNotSupported return nil, ErrNotSupported
} }
return s.Internal.SubscribeActorEvents(p0, p1) return s.Internal.SubscribeActorEvents(p0, p1)
} }
func (s *FullNodeStub) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { func (s *FullNodeStub) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
return nil, ErrNotSupported return nil, ErrNotSupported
} }
@ -5276,14 +5276,14 @@ func (s *GatewayStub) StateWaitMsg(p0 context.Context, p1 cid.Cid, p2 uint64, p3
return nil, ErrNotSupported return nil, ErrNotSupported
} }
func (s *GatewayStruct) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { func (s *GatewayStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
if s.Internal.SubscribeActorEvents == nil { if s.Internal.SubscribeActorEvents == nil {
return nil, ErrNotSupported return nil, ErrNotSupported
} }
return s.Internal.SubscribeActorEvents(p0, p1) return s.Internal.SubscribeActorEvents(p0, p1)
} }
func (s *GatewayStub) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { func (s *GatewayStub) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
return nil, ErrNotSupported return nil, ErrNotSupported
} }

Binary file not shown.

Binary file not shown.

View File

@ -18,16 +18,6 @@ type ActorEventBlock struct {
Value []byte `json:"value"` Value []byte `json:"value"`
} }
type SubActorEventFilter struct {
Filter ActorEventFilter `json:"filter"`
// If true, all available matching historical events will be written to the response stream
// before any new real-time events that match the given filter are written.
// If `Prefill` is true and `FromEpoch` is set to latest, the pre-fill operation will become a no-op.
// if `Prefill` is false and `FromEpoch` is set to earliest, historical events will still be sent to the client.
Prefill bool `json:"prefill"`
}
type ActorEventFilter struct { type ActorEventFilter struct {
// Matches events from one of these actors, or any actor if empty. // Matches events from one of these actors, or any actor if empty.
// For now, this MUST be a Filecoin address. // For now, this MUST be a Filecoin address.
@ -37,17 +27,17 @@ type ActorEventFilter struct {
// If the value is an empty slice, the filter will match on the key only, accepting any value. // If the value is an empty slice, the filter will match on the key only, accepting any value.
Fields map[string][]ActorEventBlock `json:"fields,omitempty"` Fields map[string][]ActorEventBlock `json:"fields,omitempty"`
// Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first, // The height of the earliest tipset to include in the query. If empty, the query starts at the
// Optional, default: "latest". // last finalized tipset.
FromEpoch string `json:"fromEpoch,omitempty"` FromHeight *abi.ChainEpoch `json:"fromHeight,omitempty"`
// Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first, // The height of the latest tipset to include in the query. If empty, the query ends at the
// Optional, default: "latest". // latest tipset.
ToEpoch string `json:"toEpoch,omitempty"` ToHeight *abi.ChainEpoch `json:"toHeight,omitempty"`
// Restricts events returned to those emitted from messages contained in this tipset. // Restricts events returned to those emitted from messages contained in this tipset.
// If `TipSetCid` is present in the filter criteria, then neither `FromEpoch` nor `ToEpoch` are allowed. // If `TipSetKey` is legt empty in the filter criteria, then neither `FromHeight` nor `ToHeight` are allowed.
TipSetCid *cid.Cid `json:"tipsetCid,omitempty"` TipSetKey *TipSetKey `json:"tipsetKey,omitempty"`
} }
type ActorEvent struct { type ActorEvent struct {
@ -55,7 +45,7 @@ type ActorEvent struct {
Entries []EventEntry `json:"entries"` Entries []EventEntry `json:"entries"`
// Filecoin address of the actor that emitted this event. // Filecoin address of the actor that emitted this event.
EmitterAddr address.Address `json:"emitter"` Emitter address.Address `json:"emitter"`
// Reverted is set to true if the message that produced this event was reverted because of a network re-org // Reverted is set to true if the message that produced this event was reverted because of a network re-org
// in that case, the event should be considered as reverted as well. // in that case, the event should be considered as reverted as well.
@ -64,8 +54,8 @@ type ActorEvent struct {
// Height of the tipset that contained the message that produced this event. // Height of the tipset that contained the message that produced this event.
Height abi.ChainEpoch `json:"height"` Height abi.ChainEpoch `json:"height"`
// CID of the tipset that contained the message that produced this event. // The tipset that contained the message that produced this event.
TipSetCid cid.Cid `json:"tipsetCid"` TipSetKey TipSetKey `json:"tipsetKey"`
// CID of message that produced this event. // CID of message that produced this event.
MsgCid cid.Cid `json:"msgCid"` MsgCid cid.Cid `json:"msgCid"`

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin" builtintypes "github.com/filecoin-project/go-state-types/builtin"
) )
@ -29,11 +30,11 @@ func TestActorEventJson(t *testing.T) {
Value: []byte("value2"), Value: []byte("value2"),
}, },
}, },
EmitterAddr: randomF4Addr(t, rng), Emitter: randomF4Addr(t, rng),
Reverted: false, Reverted: false,
Height: 1001, Height: 1001,
TipSetCid: randomCid(t, rng), TipSetKey: NewTipSetKey(randomCid(t, rng)),
MsgCid: randomCid(t, rng), MsgCid: randomCid(t, rng),
} }
bz, err := json.Marshal(in) bz, err := json.Marshal(in)
@ -46,7 +47,7 @@ func TestActorEventJson(t *testing.T) {
require.Equal(t, in, out) require.Equal(t, in, out)
s := ` s := `
{"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetCid":{"/":"bafkqacx3dag26sfht3qlcdi"},"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}} {"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetKey":[{"/":"bafkqacx3dag26sfht3qlcdi"}],"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}}
` `
var out2 ActorEvent var out2 ActorEvent
err = json.Unmarshal([]byte(s), &out2) err = json.Unmarshal([]byte(s), &out2)
@ -77,9 +78,9 @@ func TestActorEventBlockJson(t *testing.T) {
} }
func TestSubActorEventFilterJson(t *testing.T) { func TestSubActorEventFilterJson(t *testing.T) {
c := randomCid(t, pseudo.New(pseudo.NewSource(0))) tsk := NewTipSetKey(randomCid(t, pseudo.New(pseudo.NewSource(0))))
from := "earliest" from := abi.ChainEpoch(0)
to := "latest" to := abi.ChainEpoch(100)
f := ActorEventFilter{ f := ActorEventFilter{
Addresses: []address.Address{ Addresses: []address.Address{
randomF4Addr(t, pseudo.New(pseudo.NewSource(0))), randomF4Addr(t, pseudo.New(pseudo.NewSource(0))),
@ -99,16 +100,17 @@ func TestSubActorEventFilterJson(t *testing.T) {
}, },
}, },
}, },
FromEpoch: from, FromHeight: &from,
ToEpoch: to, ToHeight: &to,
TipSetCid: &c, TipSetKey: &tsk,
} }
bz, err := json.Marshal(f) bz, err := json.Marshal(f)
require.NoError(t, err) require.NoError(t, err)
require.NotEmpty(t, bz) require.NotEmpty(t, bz)
t.Logf("%s", bz)
s := `{"addresses":["f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromEpoch":"earliest","toEpoch":"latest","tipsetCid":{"/":"bafkqacqbst64f6rp7taeduy"}}` s := `{"addresses":["f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromHeight":0,"toHeight":100,"tipsetKey":[{"/":"bafkqacqbst64f6rp7taeduy"}]}`
var out ActorEventFilter var out ActorEventFilter
err = json.Unmarshal([]byte(s), &out) err = json.Unmarshal([]byte(s), &out)
require.NoError(t, err) require.NoError(t, err)

View File

@ -3411,8 +3411,8 @@ Inputs:
} }
] ]
}, },
"fromEpoch": "earliest", "fromHeight": 1010,
"toEpoch": "latest" "toHeight": 1020
} }
] ]
``` ```
@ -3432,9 +3432,14 @@ Response:
"emitter": "f01234", "emitter": "f01234",
"reverted": true, "reverted": true,
"height": 10101, "height": 10101,
"tipsetCid": { "tipsetKey": [
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" {
}, "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
],
"msgCid": { "msgCid": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
} }
@ -8836,22 +8841,19 @@ Inputs:
```json ```json
[ [
{ {
"filter": { "addresses": [
"addresses": [ "f01234"
"f01234" ],
], "fields": {
"fields": { "abc": [
"abc": [ {
{ "codec": 81,
"codec": 81, "value": "ZGRhdGE="
"value": "ZGRhdGE=" }
} ]
]
},
"fromEpoch": "earliest",
"toEpoch": "latest"
}, },
"prefill": true "fromHeight": 1010,
"toHeight": 1020
} }
] ]
``` ```
@ -8870,9 +8872,14 @@ Response:
"emitter": "f01234", "emitter": "f01234",
"reverted": true, "reverted": true,
"height": 10101, "height": 10101,
"tipsetCid": { "tipsetKey": [
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" {
}, "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
{
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
}
],
"msgCid": { "msgCid": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
} }

View File

@ -330,13 +330,6 @@
# env var: LOTUS_FEVM_ENABLEETHRPC # env var: LOTUS_FEVM_ENABLEETHRPC
#EnableEthRPC = false #EnableEthRPC = false
# EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
# This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
#
# type: bool
# env var: LOTUS_FEVM_ENABLEACTOREVENTSAPI
#EnableActorEventsAPI = false
# EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days # EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
# Set to 0 to keep all mappings # Set to 0 to keep all mappings
# #
@ -396,6 +389,17 @@
#DatabasePath = "" #DatabasePath = ""
[ActorEvents]
# EnableActorEventsAPI enables the Actor events API that enables clients to consume events
# emitted by (smart contracts + built-in Actors).
# This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be
# disabled by setting their respective Disable* options in Fevm.Events.
#
# type: bool
# env var: LOTUS_ACTOREVENTS_ENABLEACTOREVENTSAPI
#EnableActorEventsAPI = false
[Index] [Index]
# EXPERIMENTAL FEATURE. USE WITH CAUTION # EXPERIMENTAL FEATURE. USE WITH CAUTION
# EnableMsgIndex enables indexing of messages on chain. # EnableMsgIndex enables indexing of messages on chain.

View File

@ -148,7 +148,7 @@ type TargetAPI interface {
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error) EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
} }
var _ TargetAPI = *new(api.FullNode) // gateway depends on latest var _ TargetAPI = *new(api.FullNode) // gateway depends on latest

View File

@ -444,7 +444,7 @@ func (gw *Node) GetActorEvents(ctx context.Context, filter *types.ActorEventFilt
return gw.target.GetActorEvents(ctx, filter) return gw.target.GetActorEvents(ctx, filter)
} }
func (gw *Node) SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { func (gw *Node) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
if err := gw.limit(ctx, stateRateLimitTokens); err != nil { if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return nil, err return nil, err
} }

View File

@ -151,25 +151,19 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
/* --- Setup subscription channels for ActorEvents --- */ /* --- Setup subscription channels for ActorEvents --- */
// subscribe only to miner's actor events // subscribe only to miner's actor events
minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
Filter: types.ActorEventFilter{ Addresses: []address.Address{miner.ActorAddr},
Addresses: []address.Address{miner.ActorAddr},
},
Prefill: true,
}) })
require.NoError(t, err) require.NoError(t, err)
// subscribe only to sector-activated events // subscribe only to sector-activated events
sectorActivatedCbor := stringToEventKey(t, "sector-activated") sectorActivatedCbor := stringToEventKey(t, "sector-activated")
sectorActivatedEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ sectorActivatedEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
Filter: types.ActorEventFilter{ Fields: map[string][]types.ActorEventBlock{
Fields: map[string][]types.ActorEventBlock{ "$type": {
"$type": { {Codec: 0x51, Value: sectorActivatedCbor},
{Codec: 0x51, Value: sectorActivatedCbor},
},
}, },
}, },
Prefill: true,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -303,6 +297,13 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
head, err := client.ChainHead(ctx) head, err := client.ChainHead(ctx)
require.NoError(t, err) require.NoError(t, err)
// subscribe to actor events up until the current head
initialEventsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
FromHeight: epochPtr(0),
ToHeight: epochPtr(int64(head.Height())),
})
require.NoError(t, err)
so, err := miner.SectorAddPieceToAny(ctx, pieceSize, bytes.NewReader(pieceData), piece.PieceDealInfo{ so, err := miner.SectorAddPieceToAny(ctx, pieceSize, bytes.NewReader(pieceData), piece.PieceDealInfo{
PublishCid: nil, PublishCid: nil,
DealID: 0, DealID: 0,
@ -395,8 +396,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
// construct ActorEvents from GetActorEvents API // construct ActorEvents from GetActorEvents API
allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{ allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
FromEpoch: "earliest", FromHeight: epochPtr(0),
ToEpoch: "latest",
}) })
require.NoError(t, err) require.NoError(t, err)
fmt.Println("Events from GetActorEvents:") fmt.Println("Events from GetActorEvents:")
@ -414,7 +414,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
} }
var allMinerEvts []*types.ActorEvent var allMinerEvts []*types.ActorEvent
for _, evt := range eventsFromMessages { for _, evt := range eventsFromMessages {
if evt.EmitterAddr == miner.ActorAddr { if evt.Emitter == miner.ActorAddr {
allMinerEvts = append(allMinerEvts, evt) allMinerEvts = append(allMinerEvts, evt)
} }
} }
@ -423,7 +423,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
// construct ActorEvents from subscription channel for just the sector-activated events // construct ActorEvents from subscription channel for just the sector-activated events
var prefillSectorActivatedEvts []*types.ActorEvent var prefillSectorActivatedEvts []*types.ActorEvent
for evt := range sectorActivatedEvtsCh { for evt := range sectorActivatedEvtsChan {
prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, evt) prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, evt)
if len(prefillSectorActivatedEvts) == 2 { if len(prefillSectorActivatedEvts) == 2 {
break break
@ -442,17 +442,25 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
// compare events from messages and receipts with events from subscription channel // compare events from messages and receipts with events from subscription channel
require.Equal(t, sectorActivatedEvts, prefillSectorActivatedEvts) require.Equal(t, sectorActivatedEvts, prefillSectorActivatedEvts)
// check that our `ToHeight` filter works as expected
var initialEvents []*types.ActorEvent
// TODO: this should probably close the channel when it knows it's done
for evt := range initialEventsChan {
initialEvents = append(initialEvents, evt)
// sector-precommitted, sector-activated, verifier-balance, verifier-balance
if len(initialEvents) == 4 {
break
}
}
require.Equal(t, eventsFromMessages[0:4], initialEvents)
// construct ActorEvents from subscription channel for all actor events // construct ActorEvents from subscription channel for all actor events
allEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ allEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
Filter: types.ActorEventFilter{ FromHeight: epochPtr(0),
FromEpoch: "earliest",
ToEpoch: "latest",
},
Prefill: true,
}) })
require.NoError(t, err) require.NoError(t, err)
var prefillEvts []*types.ActorEvent var prefillEvts []*types.ActorEvent
for evt := range allEvtsCh { for evt := range allEvtsChan {
prefillEvts = append(prefillEvts, evt) prefillEvts = append(prefillEvts, evt)
if len(prefillEvts) == len(eventsFromMessages) { if len(prefillEvts) == len(eventsFromMessages) {
break break
@ -524,16 +532,14 @@ func buildActorEventsFromMessages(ctx context.Context, t *testing.T, node v1api.
// for each event // for each event
addr, err := address.NewIDAddress(uint64(evt.Emitter)) addr, err := address.NewIDAddress(uint64(evt.Emitter))
require.NoError(t, err) require.NoError(t, err)
tsCid, err := ts.Key().Cid()
require.NoError(t, err)
actorEvents = append(actorEvents, &types.ActorEvent{ actorEvents = append(actorEvents, &types.ActorEvent{
Entries: evt.Entries, Entries: evt.Entries,
EmitterAddr: addr, Emitter: addr,
Reverted: false, Reverted: false,
Height: ts.Height(), Height: ts.Height(),
TipSetCid: tsCid, TipSetKey: ts.Key(),
MsgCid: m.Cid, MsgCid: m.Cid,
}) })
} }
} }
@ -547,7 +553,7 @@ func printEvents(ctx context.Context, t *testing.T, node v1api.FullNode, events
entryStrings := []string{ entryStrings := []string{
fmt.Sprintf("height=%d", event.Height), fmt.Sprintf("height=%d", event.Height),
fmt.Sprintf("msg=%s", event.MsgCid), fmt.Sprintf("msg=%s", event.MsgCid),
fmt.Sprintf("emitter=%s", event.EmitterAddr), fmt.Sprintf("emitter=%s", event.Emitter),
fmt.Sprintf("reverted=%t", event.Reverted), fmt.Sprintf("reverted=%t", event.Reverted),
} }
for _, e := range event.Entries { for _, e := range event.Entries {
@ -848,3 +854,8 @@ func TestOnboardRawPieceSnap(t *testing.T) {
miner.WaitSectorsProving(ctx, toCheck) miner.WaitSectorsProving(ctx, toCheck)
} }
func epochPtr(ei int64) *abi.ChainEpoch {
ep := abi.ChainEpoch(ei)
return &ep
}

View File

@ -65,8 +65,8 @@ var DefaultNodeOpts = nodeOpts{
// test defaults // test defaults
cfg.Fevm.EnableEthRPC = true cfg.Fevm.EnableEthRPC = true
cfg.Fevm.EnableActorEventsAPI = true
cfg.Fevm.Events.MaxFilterHeightRange = math.MaxInt64 cfg.Fevm.Events.MaxFilterHeightRange = math.MaxInt64
cfg.ActorEvents.EnableActorEventsAPI = true
return nil return nil
}, },
}, },

View File

@ -281,10 +281,10 @@ func ConfigFullNode(c interface{}) Option {
), ),
ApplyIf(isFullNode, ApplyIf(isFullNode,
If(cfg.Fevm.EnableActorEventsAPI, If(cfg.ActorEvents.EnableActorEventsAPI,
Override(new(full.ActorEventAPI), modules.ActorEventHandler(cfg.Fevm)), Override(new(full.ActorEventAPI), modules.ActorEventHandler(cfg.ActorEvents.EnableActorEventsAPI, cfg.Fevm)),
), ),
If(!cfg.Fevm.EnableActorEventsAPI, If(!cfg.ActorEvents.EnableActorEventsAPI,
Override(new(full.ActorEventAPI), &full.ActorEventDummy{}), Override(new(full.ActorEventAPI), &full.ActorEventDummy{}),
), ),
), ),

View File

@ -109,7 +109,6 @@ func DefaultFullNode() *FullNode {
Cluster: *DefaultUserRaftConfig(), Cluster: *DefaultUserRaftConfig(),
Fevm: FevmConfig{ Fevm: FevmConfig{
EnableEthRPC: false, EnableEthRPC: false,
EnableActorEventsAPI: false,
EthTxHashMappingLifetimeDays: 0, EthTxHashMappingLifetimeDays: 0,
Events: Events{ Events: Events{
DisableRealTimeFilterAPI: false, DisableRealTimeFilterAPI: false,
@ -120,6 +119,9 @@ func DefaultFullNode() *FullNode {
MaxFilterHeightRange: 2880, // conservative limit of one day MaxFilterHeightRange: 2880, // conservative limit of one day
}, },
}, },
ActorEvents: ActorEventsConfig{
EnableActorEventsAPI: false,
},
} }
} }

View File

@ -29,6 +29,17 @@ var Doc = map[string][]DocField{
Comment: ``, Comment: ``,
}, },
}, },
"ActorEventsConfig": {
{
Name: "EnableActorEventsAPI",
Type: "bool",
Comment: `EnableActorEventsAPI enables the Actor events API that enables clients to consume events
emitted by (smart contracts + built-in Actors).
This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be
disabled by setting their respective Disable* options in Fevm.Events.`,
},
},
"ApisConfig": { "ApisConfig": {
{ {
Name: "ChainApiInfo", Name: "ChainApiInfo",
@ -452,13 +463,6 @@ rewards. This address should have adequate funds to cover gas fees.`,
Type: "bool", Type: "bool",
Comment: `EnableEthRPC enables eth_ rpc, and enables storing a mapping of eth transaction hashes to filecoin message Cids. Comment: `EnableEthRPC enables eth_ rpc, and enables storing a mapping of eth transaction hashes to filecoin message Cids.
This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`,
},
{
Name: "EnableActorEventsAPI",
Type: "bool",
Comment: `EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`, This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`,
}, },
{ {
@ -512,6 +516,12 @@ Set to 0 to keep all mappings`,
Comment: ``, Comment: ``,
}, },
{
Name: "ActorEvents",
Type: "ActorEventsConfig",
Comment: ``,
},
{ {
Name: "Index", Name: "Index",
Type: "IndexConfig", Type: "IndexConfig",

View File

@ -28,6 +28,7 @@ type FullNode struct {
Chainstore Chainstore Chainstore Chainstore
Cluster UserRaftConfig Cluster UserRaftConfig
Fevm FevmConfig Fevm FevmConfig
ActorEvents ActorEventsConfig
Index IndexConfig Index IndexConfig
FaultReporter FaultReporterConfig FaultReporter FaultReporterConfig
} }
@ -786,10 +787,6 @@ type FevmConfig struct {
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above. // This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
EnableEthRPC bool EnableEthRPC bool
// EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
EnableActorEventsAPI bool
// EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days // EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
// Set to 0 to keep all mappings // Set to 0 to keep all mappings
EthTxHashMappingLifetimeDays int EthTxHashMappingLifetimeDays int
@ -833,6 +830,14 @@ type Events struct {
// Set upper bound on index size // Set upper bound on index size
} }
type ActorEventsConfig struct {
// EnableActorEventsAPI enables the Actor events API that enables clients to consume events
// emitted by (smart contracts + built-in Actors).
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be
// disabled by setting their respective Disable* options in Fevm.Events.
EnableActorEventsAPI bool
}
type IndexConfig struct { type IndexConfig struct {
// EXPERIMENTAL FEATURE. USE WITH CAUTION // EXPERIMENTAL FEATURE. USE WITH CAUTION
// EnableMsgIndex enables indexing of messages on chain. // EnableMsgIndex enables indexing of messages on chain.
@ -856,6 +861,7 @@ type HarmonyDB struct {
// The port to find Yugabyte. Blank for default. // The port to find Yugabyte. Blank for default.
Port string Port string
} }
type FaultReporterConfig struct { type FaultReporterConfig struct {
// EnableConsensusFaultReporter controls whether the node will monitor and // EnableConsensusFaultReporter controls whether the node will monitor and
// report consensus faults. When enabled, the node will watch for malicious // report consensus faults. When enabled, the node will watch for malicious

View File

@ -0,0 +1,112 @@
package full
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
)
func TestParseHeightRange(t *testing.T) {
epochPtr := func(i int) *abi.ChainEpoch {
e := abi.ChainEpoch(i)
return &e
}
tcs := map[string]struct {
heaviest abi.ChainEpoch
from *abi.ChainEpoch
to *abi.ChainEpoch
maxRange abi.ChainEpoch
minOut abi.ChainEpoch
maxOut abi.ChainEpoch
errStr string
}{
"fails when both are specified and range is greater than max allowed range": {
heaviest: 100,
from: epochPtr(256),
to: epochPtr(512),
maxRange: 10,
minOut: 0,
maxOut: 0,
errStr: "too large",
},
"fails when min is specified and range is greater than max allowed range": {
heaviest: 500,
from: epochPtr(16),
to: nil,
maxRange: 10,
minOut: 0,
maxOut: 0,
errStr: "'from' height is too far in the past",
},
"fails when max is specified and range is greater than max allowed range": {
heaviest: 500,
from: nil,
to: epochPtr(65536),
maxRange: 10,
minOut: 0,
maxOut: 0,
errStr: "'to' height is too far in the future",
},
"fails when from is greater than to": {
heaviest: 100,
from: epochPtr(512),
to: epochPtr(256),
maxRange: 10,
minOut: 0,
maxOut: 0,
errStr: "must be after",
},
"works when range is valid (nil from)": {
heaviest: 500,
from: nil,
to: epochPtr(48),
maxRange: 1000,
minOut: -1,
maxOut: 48,
},
"works when range is valid (nil to)": {
heaviest: 500,
from: epochPtr(0),
to: nil,
maxRange: 1000,
minOut: 0,
maxOut: -1,
},
"works when range is valid (nil from and to)": {
heaviest: 500,
from: nil,
to: nil,
maxRange: 1000,
minOut: -1,
maxOut: -1,
},
"works when range is valid and specified": {
heaviest: 500,
from: epochPtr(16),
to: epochPtr(48),
maxRange: 1000,
minOut: 16,
maxOut: 48,
},
}
for name, tc := range tcs {
tc2 := tc
t.Run(name, func(t *testing.T) {
min, max, err := parseHeightRange(tc2.heaviest, tc2.from, tc2.to, tc2.maxRange)
require.Equal(t, tc2.minOut, min)
require.Equal(t, tc2.maxOut, max)
if tc2.errStr != "" {
fmt.Println(err)
require.Error(t, err)
require.Contains(t, err.Error(), tc2.errStr)
} else {
require.NoError(t, err)
}
})
}
}

View File

@ -3,7 +3,6 @@ package full
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"go.uber.org/fx" "go.uber.org/fx"
@ -18,7 +17,7 @@ import (
type ActorEventAPI interface { type ActorEventAPI interface {
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
} }
var ( var (
@ -39,18 +38,26 @@ type ActorEventsAPI struct {
ActorEventAPI ActorEventAPI
} }
func (a *ActorEventHandler) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) { func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
if a.EventFilterManager == nil { if a.EventFilterManager == nil {
return nil, api.ErrNotSupported return nil, api.ErrNotSupported
} }
params, err := a.parseFilter(filter) if evtFilter == nil {
evtFilter = &types.ActorEventFilter{}
}
params, err := a.parseFilter(*evtFilter)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Create a temporary filter // Install a filter just for this call, collect events, remove the filter
f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, filter.Addresses, filter.Fields, false)
tipSetCid, err := params.GetTipSetCid()
if err != nil {
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
}
f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -63,33 +70,34 @@ func (a *ActorEventHandler) GetActorEvents(ctx context.Context, filter *types.Ac
type filterParams struct { type filterParams struct {
MinHeight abi.ChainEpoch MinHeight abi.ChainEpoch
MaxHeight abi.ChainEpoch MaxHeight abi.ChainEpoch
TipSetCid cid.Cid TipSetKey types.TipSetKey
} }
func (a *ActorEventHandler) parseFilter(f *types.ActorEventFilter) (*filterParams, error) { func (fp filterParams) GetTipSetCid() (cid.Cid, error) {
if f.TipSetCid != nil { if fp.TipSetKey.IsEmpty() {
if len(f.FromEpoch) != 0 || len(f.ToEpoch) != 0 { return cid.Undef, nil
return nil, fmt.Errorf("cannot specify both TipSetCid and FromEpoch/ToEpoch") }
return fp.TipSetKey.Cid()
}
func (a *ActorEventHandler) parseFilter(f types.ActorEventFilter) (*filterParams, error) {
if f.TipSetKey != nil && !f.TipSetKey.IsEmpty() {
if f.FromHeight != nil || f.ToHeight != nil {
return nil, fmt.Errorf("cannot specify both TipSetKey and FromHeight/ToHeight")
} }
tsk := types.EmptyTSK
if f.TipSetKey != nil {
tsk = *f.TipSetKey
}
return &filterParams{ return &filterParams{
MinHeight: 0, MinHeight: 0,
MaxHeight: 0, MaxHeight: 0,
TipSetCid: *f.TipSetCid, TipSetKey: tsk,
}, nil }, nil
} }
from := f.FromEpoch min, max, err := parseHeightRange(a.Chain.GetHeaviestTipSet().Height(), f.FromHeight, f.ToHeight, a.MaxFilterHeightRange)
if len(from) != 0 && from != "latest" && from != "earliest" && !strings.HasPrefix(from, "0x") {
from = "0x" + from
}
to := f.ToEpoch
if len(to) != 0 && to != "latest" && to != "earliest" && !strings.HasPrefix(to, "0x") {
to = "0x" + to
}
min, max, err := parseBlockRange(a.Chain.GetHeaviestTipSet().Height(), &from, &to, a.MaxFilterHeightRange)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -97,21 +105,69 @@ func (a *ActorEventHandler) parseFilter(f *types.ActorEventFilter) (*filterParam
return &filterParams{ return &filterParams{
MinHeight: min, MinHeight: min,
MaxHeight: max, MaxHeight: max,
TipSetCid: cid.Undef, TipSetKey: types.EmptyTSK,
}, nil }, nil
} }
func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { // parseHeightRange is similar to eth's parseBlockRange but with slightly different semantics but
// results in equivalent values that we can plug in to the EventFilterManager.
//
// * Uses "height", allowing for nillable values rather than strings
// * No "latest" and "earliest", those are now represented by nil on the way in and -1 on the way out
// * No option for hex representation
func parseHeightRange(heaviest abi.ChainEpoch, fromHeight, toHeight *abi.ChainEpoch, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) {
if fromHeight != nil && *fromHeight < 0 {
return 0, 0, fmt.Errorf("range 'from' must be greater than or equal to 0")
}
if fromHeight == nil {
minHeight = -1
} else {
minHeight = *fromHeight
}
if toHeight == nil {
maxHeight = -1
} else {
maxHeight = *toHeight
}
// Validate height ranges are within limits set by node operator
if minHeight == -1 && maxHeight > 0 {
// Here the client is looking for events between the head and some future height
if maxHeight-heaviest > maxRange {
return 0, 0, fmt.Errorf("invalid epoch range: 'to' height is too far in the future (maximum: %d)", maxRange)
}
} else if minHeight >= 0 && maxHeight == -1 {
// Here the client is looking for events between some time in the past and the current head
if heaviest-minHeight > maxRange {
return 0, 0, fmt.Errorf("invalid epoch range: 'from' height is too far in the past (maximum: %d)", maxRange)
}
} else if minHeight >= 0 && maxHeight >= 0 {
if minHeight > maxHeight {
return 0, 0, fmt.Errorf("invalid epoch range: 'to' height (%d) must be after 'from' height (%d)", minHeight, maxHeight)
} else if maxHeight-minHeight > maxRange {
return 0, 0, fmt.Errorf("invalid epoch range: range between to and 'from' heights is too large (maximum: %d)", maxRange)
}
}
return minHeight, maxHeight, nil
}
func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
if a.EventFilterManager == nil { if a.EventFilterManager == nil {
return nil, api.ErrNotSupported return nil, api.ErrNotSupported
} }
if evtFilter == nil {
params, err := a.parseFilter(&f.Filter) evtFilter = &types.ActorEventFilter{}
}
params, err := a.parseFilter(*evtFilter)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, f.Filter.Addresses, f.Filter.Fields, false) tipSetCid, err := params.GetTipSetCid()
if err != nil {
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
}
fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -128,30 +184,29 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.S
_ = a.EventFilterManager.Remove(ctx, fm.ID()) _ = a.EventFilterManager.Remove(ctx, fm.ID())
}() }()
if f.Prefill { evs, err := getCollected(ctx, fm)
evs, err := getCollected(ctx, fm) if err != nil {
if err != nil { log.Errorf("failed to get collected events: %w", err)
log.Errorf("failed to get collected events: %w", err) return
return }
}
for _, ev := range evs { for _, ev := range evs {
ev := ev ev := ev
select { select {
case out <- ev: case out <- ev:
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
log.Errorf("closing event subscription due to slow reader") // TODO: need to fix this, buffer of 25 isn't going to work for prefill without a _really_ fast client or a small number of events
return log.Errorf("closing event subscription due to slow reader")
} return
} }
} }
in := make(chan interface{}, 256) in := make(chan interface{}, 256)
fm.SetSubChannel(in) fm.SetSubChannel(in)
for { for ctx.Err() == nil {
select { select {
case val, ok := <-in: case val, ok := <-in:
if !ok { if !ok {
@ -164,24 +219,19 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.S
log.Errorf("got unexpected value from event filter: %T", val) log.Errorf("got unexpected value from event filter: %T", val)
return return
} }
c, err := ce.TipSetKey.Cid()
if err != nil {
log.Errorf("failed to get tipset cid: %w", err)
return
}
ev := &types.ActorEvent{ ev := &types.ActorEvent{
Entries: ce.Entries, Entries: ce.Entries,
EmitterAddr: ce.EmitterAddr, Emitter: ce.EmitterAddr,
Reverted: ce.Reverted, Reverted: ce.Reverted,
Height: ce.Height, Height: ce.Height,
TipSetCid: c, TipSetKey: ce.TipSetKey,
MsgCid: ce.MsgCid, MsgCid: ce.MsgCid,
} }
select { select {
case out <- ev: case out <- ev:
default: default: // TODO: need to fix this to be more intelligent about the consumption rate vs the accumulation rate
log.Errorf("closing event subscription due to slow reader") log.Errorf("closing event subscription due to slow reader")
return return
} }
@ -204,22 +254,14 @@ func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEve
var out []*types.ActorEvent var out []*types.ActorEvent
for _, e := range ces { for _, e := range ces {
e := e out = append(out, &types.ActorEvent{
c, err := e.TipSetKey.Cid() Entries: e.Entries,
if err != nil { Emitter: e.EmitterAddr,
return nil, fmt.Errorf("failed to get tipset cid: %w", err) Reverted: e.Reverted,
} Height: e.Height,
TipSetKey: e.TipSetKey,
ev := &types.ActorEvent{ MsgCid: e.MsgCid,
Entries: e.Entries, })
EmitterAddr: e.EmitterAddr,
Reverted: e.Reverted,
Height: e.Height,
TipSetCid: c,
MsgCid: e.MsgCid,
}
out = append(out, ev)
} }
return out, nil return out, nil

View File

@ -198,7 +198,7 @@ func (a *ActorEventDummy) GetActorEvents(ctx context.Context, filter *types.Acto
return nil, ErrActorEventModuleDisabled return nil, ErrActorEventModuleDisabled
} }
func (a *ActorEventDummy) SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { func (a *ActorEventDummy) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
return nil, ErrActorEventModuleDisabled return nil, ErrActorEventModuleDisabled
} }

View File

@ -1264,6 +1264,11 @@ func (e *EthEventHandler) EthGetFilterLogs(ctx context.Context, id ethtypes.EthF
return nil, xerrors.Errorf("wrong filter type") return nil, xerrors.Errorf("wrong filter type")
} }
// parseBlockRange is similar to actor event's parseHeightRange but with slightly different semantics
//
// * "block" instead of "height"
// * strings that can have "latest" and "earliest" and nil
// * hex strings for actual heights
func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) { func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) {
if fromBlock == nil || *fromBlock == "latest" || len(*fromBlock) == 0 { if fromBlock == nil || *fromBlock == "latest" || len(*fromBlock) == 0 {
minHeight = heaviest minHeight = heaviest

View File

@ -164,14 +164,14 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc
} }
} }
func ActorEventHandler(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) { func ActorEventHandler(enable bool, fevmCfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) {
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEventHandler, error) { return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEventHandler, error) {
ee := &full.ActorEventHandler{ ee := &full.ActorEventHandler{
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange), MaxFilterHeightRange: abi.ChainEpoch(fevmCfg.Events.MaxFilterHeightRange),
Chain: cs, Chain: cs,
} }
if !cfg.EnableActorEventsAPI || cfg.Events.DisableRealTimeFilterAPI { if !enable || fevmCfg.Events.DisableRealTimeFilterAPI {
// all Actor events functionality is disabled // all Actor events functionality is disabled
return ee, nil return ee, nil
} }