diff --git a/api/api_full.go b/api/api_full.go index 6a10d05a5..c07688ac0 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -919,7 +919,7 @@ type FullNode interface { // This API also allows clients to read all historical events matching the given filter before // any real-time events are written to the response stream. // NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW - SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read + SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read } // reverse interface to the client, called after EthSubscribe diff --git a/api/api_gateway.go b/api/api_gateway.go index 49fe7d7f1..e71a8b712 100644 --- a/api/api_gateway.go +++ b/api/api_gateway.go @@ -131,5 +131,5 @@ type Gateway interface { EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) - SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) + SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) } diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 4d485c442..bf76444e6 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -430,25 +430,8 @@ func init() { }, }, }, - FromEpoch: "earliest", - ToEpoch: "latest", - }) - - addExample(&types.SubActorEventFilter{ - Filter: types.ActorEventFilter{ - Addresses: []address.Address{addr}, - Fields: map[string][]types.ActorEventBlock{ - "abc": { - { - Codec: 0x51, - Value: []byte("ddata"), - }, - }, - }, - FromEpoch: "earliest", - ToEpoch: "latest", - }, - Prefill: true, + FromHeight: epochPtr(1010), + ToHeight: epochPtr(1020), }) } @@ -555,6 +538,11 @@ func exampleStruct(method string, t, parent reflect.Type) interface{} { return ns.Interface() } +func epochPtr(ei int64) *abi.ChainEpoch { + ep := abi.ChainEpoch(ei) + return &ep +} + type Visitor struct { Root string Methods map[string]ast.Node diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go index cc0359afc..01e0cec45 100644 --- a/api/mocks/mock_full.go +++ b/api/mocks/mock_full.go @@ -3984,7 +3984,7 @@ func (mr *MockFullNodeMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4 in } // SubscribeActorEvents mocks base method. -func (m *MockFullNode) SubscribeActorEvents(arg0 context.Context, arg1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { +func (m *MockFullNode) SubscribeActorEvents(arg0 context.Context, arg1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SubscribeActorEvents", arg0, arg1) ret0, _ := ret[0].(<-chan *types.ActorEvent) diff --git a/api/proxy_gen.go b/api/proxy_gen.go index b2e9cf97f..5bbd32aa9 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -591,7 +591,7 @@ type FullNodeMethods struct { StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `perm:"read"` - SubscribeActorEvents func(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"` + SubscribeActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"` SyncCheckBad func(p0 context.Context, p1 cid.Cid) (string, error) `perm:"read"` @@ -835,7 +835,7 @@ type GatewayMethods struct { StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `` - SubscribeActorEvents func(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) `` + SubscribeActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) `` Version func(p0 context.Context) (APIVersion, error) `` @@ -4000,14 +4000,14 @@ func (s *FullNodeStub) StateWaitMsg(p0 context.Context, p1 cid.Cid, p2 uint64, p return nil, ErrNotSupported } -func (s *FullNodeStruct) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { +func (s *FullNodeStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { if s.Internal.SubscribeActorEvents == nil { return nil, ErrNotSupported } return s.Internal.SubscribeActorEvents(p0, p1) } -func (s *FullNodeStub) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { +func (s *FullNodeStub) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { return nil, ErrNotSupported } @@ -5276,14 +5276,14 @@ func (s *GatewayStub) StateWaitMsg(p0 context.Context, p1 cid.Cid, p2 uint64, p3 return nil, ErrNotSupported } -func (s *GatewayStruct) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { +func (s *GatewayStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { if s.Internal.SubscribeActorEvents == nil { return nil, ErrNotSupported } return s.Internal.SubscribeActorEvents(p0, p1) } -func (s *GatewayStub) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { +func (s *GatewayStub) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { return nil, ErrNotSupported } diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index badaa1984..10e58d209 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index 2c85d0040..dd9fe19c5 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/chain/types/actor_event.go b/chain/types/actor_event.go index 127dfaea2..8987b00cc 100644 --- a/chain/types/actor_event.go +++ b/chain/types/actor_event.go @@ -18,16 +18,6 @@ type ActorEventBlock struct { Value []byte `json:"value"` } -type SubActorEventFilter struct { - Filter ActorEventFilter `json:"filter"` - - // If true, all available matching historical events will be written to the response stream - // before any new real-time events that match the given filter are written. - // If `Prefill` is true and `FromEpoch` is set to latest, the pre-fill operation will become a no-op. - // if `Prefill` is false and `FromEpoch` is set to earliest, historical events will still be sent to the client. - Prefill bool `json:"prefill"` -} - type ActorEventFilter struct { // Matches events from one of these actors, or any actor if empty. // For now, this MUST be a Filecoin address. @@ -37,17 +27,17 @@ type ActorEventFilter struct { // If the value is an empty slice, the filter will match on the key only, accepting any value. Fields map[string][]ActorEventBlock `json:"fields,omitempty"` - // Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first, - // Optional, default: "latest". - FromEpoch string `json:"fromEpoch,omitempty"` + // The height of the earliest tipset to include in the query. If empty, the query starts at the + // last finalized tipset. + FromHeight *abi.ChainEpoch `json:"fromHeight,omitempty"` - // Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first, - // Optional, default: "latest". - ToEpoch string `json:"toEpoch,omitempty"` + // The height of the latest tipset to include in the query. If empty, the query ends at the + // latest tipset. + ToHeight *abi.ChainEpoch `json:"toHeight,omitempty"` // Restricts events returned to those emitted from messages contained in this tipset. - // If `TipSetCid` is present in the filter criteria, then neither `FromEpoch` nor `ToEpoch` are allowed. - TipSetCid *cid.Cid `json:"tipsetCid,omitempty"` + // If `TipSetKey` is legt empty in the filter criteria, then neither `FromHeight` nor `ToHeight` are allowed. + TipSetKey *TipSetKey `json:"tipsetKey,omitempty"` } type ActorEvent struct { @@ -55,7 +45,7 @@ type ActorEvent struct { Entries []EventEntry `json:"entries"` // Filecoin address of the actor that emitted this event. - EmitterAddr address.Address `json:"emitter"` + Emitter address.Address `json:"emitter"` // Reverted is set to true if the message that produced this event was reverted because of a network re-org // in that case, the event should be considered as reverted as well. @@ -64,8 +54,8 @@ type ActorEvent struct { // Height of the tipset that contained the message that produced this event. Height abi.ChainEpoch `json:"height"` - // CID of the tipset that contained the message that produced this event. - TipSetCid cid.Cid `json:"tipsetCid"` + // The tipset that contained the message that produced this event. + TipSetKey TipSetKey `json:"tipsetKey"` // CID of message that produced this event. MsgCid cid.Cid `json:"msgCid"` diff --git a/chain/types/actor_event_test.go b/chain/types/actor_event_test.go index aaac329b2..1060c010b 100644 --- a/chain/types/actor_event_test.go +++ b/chain/types/actor_event_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" builtintypes "github.com/filecoin-project/go-state-types/builtin" ) @@ -29,11 +30,11 @@ func TestActorEventJson(t *testing.T) { Value: []byte("value2"), }, }, - EmitterAddr: randomF4Addr(t, rng), - Reverted: false, - Height: 1001, - TipSetCid: randomCid(t, rng), - MsgCid: randomCid(t, rng), + Emitter: randomF4Addr(t, rng), + Reverted: false, + Height: 1001, + TipSetKey: NewTipSetKey(randomCid(t, rng)), + MsgCid: randomCid(t, rng), } bz, err := json.Marshal(in) @@ -46,7 +47,7 @@ func TestActorEventJson(t *testing.T) { require.Equal(t, in, out) s := ` -{"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetCid":{"/":"bafkqacx3dag26sfht3qlcdi"},"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}} +{"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetKey":[{"/":"bafkqacx3dag26sfht3qlcdi"}],"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}} ` var out2 ActorEvent err = json.Unmarshal([]byte(s), &out2) @@ -77,9 +78,9 @@ func TestActorEventBlockJson(t *testing.T) { } func TestSubActorEventFilterJson(t *testing.T) { - c := randomCid(t, pseudo.New(pseudo.NewSource(0))) - from := "earliest" - to := "latest" + tsk := NewTipSetKey(randomCid(t, pseudo.New(pseudo.NewSource(0)))) + from := abi.ChainEpoch(0) + to := abi.ChainEpoch(100) f := ActorEventFilter{ Addresses: []address.Address{ randomF4Addr(t, pseudo.New(pseudo.NewSource(0))), @@ -99,16 +100,17 @@ func TestSubActorEventFilterJson(t *testing.T) { }, }, }, - FromEpoch: from, - ToEpoch: to, - TipSetCid: &c, + FromHeight: &from, + ToHeight: &to, + TipSetKey: &tsk, } bz, err := json.Marshal(f) require.NoError(t, err) require.NotEmpty(t, bz) + t.Logf("%s", bz) - s := `{"addresses":["f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromEpoch":"earliest","toEpoch":"latest","tipsetCid":{"/":"bafkqacqbst64f6rp7taeduy"}}` + s := `{"addresses":["f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromHeight":0,"toHeight":100,"tipsetKey":[{"/":"bafkqacqbst64f6rp7taeduy"}]}` var out ActorEventFilter err = json.Unmarshal([]byte(s), &out) require.NoError(t, err) diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 80f32158e..3185442ef 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -3411,8 +3411,8 @@ Inputs: } ] }, - "fromEpoch": "earliest", - "toEpoch": "latest" + "fromHeight": 1010, + "toHeight": 1020 } ] ``` @@ -3432,9 +3432,14 @@ Response: "emitter": "f01234", "reverted": true, "height": 10101, - "tipsetCid": { - "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" - }, + "tipsetKey": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], "msgCid": { "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" } @@ -8836,22 +8841,19 @@ Inputs: ```json [ { - "filter": { - "addresses": [ - "f01234" - ], - "fields": { - "abc": [ - { - "codec": 81, - "value": "ZGRhdGE=" - } - ] - }, - "fromEpoch": "earliest", - "toEpoch": "latest" + "addresses": [ + "f01234" + ], + "fields": { + "abc": [ + { + "codec": 81, + "value": "ZGRhdGE=" + } + ] }, - "prefill": true + "fromHeight": 1010, + "toHeight": 1020 } ] ``` @@ -8870,9 +8872,14 @@ Response: "emitter": "f01234", "reverted": true, "height": 10101, - "tipsetCid": { - "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" - }, + "tipsetKey": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ], "msgCid": { "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" } diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index a403a580e..2f8a9177a 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -330,13 +330,6 @@ # env var: LOTUS_FEVM_ENABLEETHRPC #EnableEthRPC = false - # EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors). - # This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above. - # - # type: bool - # env var: LOTUS_FEVM_ENABLEACTOREVENTSAPI - #EnableActorEventsAPI = false - # EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days # Set to 0 to keep all mappings # @@ -396,6 +389,17 @@ #DatabasePath = "" +[ActorEvents] + # EnableActorEventsAPI enables the Actor events API that enables clients to consume events + # emitted by (smart contracts + built-in Actors). + # This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be + # disabled by setting their respective Disable* options in Fevm.Events. + # + # type: bool + # env var: LOTUS_ACTOREVENTS_ENABLEACTOREVENTSAPI + #EnableActorEventsAPI = false + + [Index] # EXPERIMENTAL FEATURE. USE WITH CAUTION # EnableMsgIndex enables indexing of messages on chain. diff --git a/gateway/node.go b/gateway/node.go index 411820de6..f3ecb7640 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -148,7 +148,7 @@ type TargetAPI interface { EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) - SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) + SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) } var _ TargetAPI = *new(api.FullNode) // gateway depends on latest diff --git a/gateway/proxy_fil.go b/gateway/proxy_fil.go index c62d93789..e7ad3bdb4 100644 --- a/gateway/proxy_fil.go +++ b/gateway/proxy_fil.go @@ -444,7 +444,7 @@ func (gw *Node) GetActorEvents(ctx context.Context, filter *types.ActorEventFilt return gw.target.GetActorEvents(ctx, filter) } -func (gw *Node) SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { +func (gw *Node) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { if err := gw.limit(ctx, stateRateLimitTokens); err != nil { return nil, err } diff --git a/itests/direct_data_onboard_test.go b/itests/direct_data_onboard_test.go index b8e9be7da..7c6ce8fbb 100644 --- a/itests/direct_data_onboard_test.go +++ b/itests/direct_data_onboard_test.go @@ -151,25 +151,19 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { /* --- Setup subscription channels for ActorEvents --- */ // subscribe only to miner's actor events - minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ - Filter: types.ActorEventFilter{ - Addresses: []address.Address{miner.ActorAddr}, - }, - Prefill: true, + minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{ + Addresses: []address.Address{miner.ActorAddr}, }) require.NoError(t, err) // subscribe only to sector-activated events sectorActivatedCbor := stringToEventKey(t, "sector-activated") - sectorActivatedEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ - Filter: types.ActorEventFilter{ - Fields: map[string][]types.ActorEventBlock{ - "$type": { - {Codec: 0x51, Value: sectorActivatedCbor}, - }, + sectorActivatedEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{ + Fields: map[string][]types.ActorEventBlock{ + "$type": { + {Codec: 0x51, Value: sectorActivatedCbor}, }, }, - Prefill: true, }) require.NoError(t, err) @@ -303,6 +297,13 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { head, err := client.ChainHead(ctx) require.NoError(t, err) + // subscribe to actor events up until the current head + initialEventsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{ + FromHeight: epochPtr(0), + ToHeight: epochPtr(int64(head.Height())), + }) + require.NoError(t, err) + so, err := miner.SectorAddPieceToAny(ctx, pieceSize, bytes.NewReader(pieceData), piece.PieceDealInfo{ PublishCid: nil, DealID: 0, @@ -395,8 +396,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { // construct ActorEvents from GetActorEvents API allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{ - FromEpoch: "earliest", - ToEpoch: "latest", + FromHeight: epochPtr(0), }) require.NoError(t, err) fmt.Println("Events from GetActorEvents:") @@ -414,7 +414,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { } var allMinerEvts []*types.ActorEvent for _, evt := range eventsFromMessages { - if evt.EmitterAddr == miner.ActorAddr { + if evt.Emitter == miner.ActorAddr { allMinerEvts = append(allMinerEvts, evt) } } @@ -423,7 +423,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { // construct ActorEvents from subscription channel for just the sector-activated events var prefillSectorActivatedEvts []*types.ActorEvent - for evt := range sectorActivatedEvtsCh { + for evt := range sectorActivatedEvtsChan { prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, evt) if len(prefillSectorActivatedEvts) == 2 { break @@ -442,17 +442,25 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) { // compare events from messages and receipts with events from subscription channel require.Equal(t, sectorActivatedEvts, prefillSectorActivatedEvts) + // check that our `ToHeight` filter works as expected + var initialEvents []*types.ActorEvent + // TODO: this should probably close the channel when it knows it's done + for evt := range initialEventsChan { + initialEvents = append(initialEvents, evt) + // sector-precommitted, sector-activated, verifier-balance, verifier-balance + if len(initialEvents) == 4 { + break + } + } + require.Equal(t, eventsFromMessages[0:4], initialEvents) + // construct ActorEvents from subscription channel for all actor events - allEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ - Filter: types.ActorEventFilter{ - FromEpoch: "earliest", - ToEpoch: "latest", - }, - Prefill: true, + allEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{ + FromHeight: epochPtr(0), }) require.NoError(t, err) var prefillEvts []*types.ActorEvent - for evt := range allEvtsCh { + for evt := range allEvtsChan { prefillEvts = append(prefillEvts, evt) if len(prefillEvts) == len(eventsFromMessages) { break @@ -524,16 +532,14 @@ func buildActorEventsFromMessages(ctx context.Context, t *testing.T, node v1api. // for each event addr, err := address.NewIDAddress(uint64(evt.Emitter)) require.NoError(t, err) - tsCid, err := ts.Key().Cid() - require.NoError(t, err) actorEvents = append(actorEvents, &types.ActorEvent{ - Entries: evt.Entries, - EmitterAddr: addr, - Reverted: false, - Height: ts.Height(), - TipSetCid: tsCid, - MsgCid: m.Cid, + Entries: evt.Entries, + Emitter: addr, + Reverted: false, + Height: ts.Height(), + TipSetKey: ts.Key(), + MsgCid: m.Cid, }) } } @@ -547,7 +553,7 @@ func printEvents(ctx context.Context, t *testing.T, node v1api.FullNode, events entryStrings := []string{ fmt.Sprintf("height=%d", event.Height), fmt.Sprintf("msg=%s", event.MsgCid), - fmt.Sprintf("emitter=%s", event.EmitterAddr), + fmt.Sprintf("emitter=%s", event.Emitter), fmt.Sprintf("reverted=%t", event.Reverted), } for _, e := range event.Entries { @@ -848,3 +854,8 @@ func TestOnboardRawPieceSnap(t *testing.T) { miner.WaitSectorsProving(ctx, toCheck) } + +func epochPtr(ei int64) *abi.ChainEpoch { + ep := abi.ChainEpoch(ei) + return &ep +} diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 025334203..f96174c44 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -65,8 +65,8 @@ var DefaultNodeOpts = nodeOpts{ // test defaults cfg.Fevm.EnableEthRPC = true - cfg.Fevm.EnableActorEventsAPI = true cfg.Fevm.Events.MaxFilterHeightRange = math.MaxInt64 + cfg.ActorEvents.EnableActorEventsAPI = true return nil }, }, diff --git a/node/builder_chain.go b/node/builder_chain.go index a3de7d55f..261ea6120 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -281,10 +281,10 @@ func ConfigFullNode(c interface{}) Option { ), ApplyIf(isFullNode, - If(cfg.Fevm.EnableActorEventsAPI, - Override(new(full.ActorEventAPI), modules.ActorEventHandler(cfg.Fevm)), + If(cfg.ActorEvents.EnableActorEventsAPI, + Override(new(full.ActorEventAPI), modules.ActorEventHandler(cfg.ActorEvents.EnableActorEventsAPI, cfg.Fevm)), ), - If(!cfg.Fevm.EnableActorEventsAPI, + If(!cfg.ActorEvents.EnableActorEventsAPI, Override(new(full.ActorEventAPI), &full.ActorEventDummy{}), ), ), diff --git a/node/config/def.go b/node/config/def.go index a64b6d5e8..f65c62e5c 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -109,7 +109,6 @@ func DefaultFullNode() *FullNode { Cluster: *DefaultUserRaftConfig(), Fevm: FevmConfig{ EnableEthRPC: false, - EnableActorEventsAPI: false, EthTxHashMappingLifetimeDays: 0, Events: Events{ DisableRealTimeFilterAPI: false, @@ -120,6 +119,9 @@ func DefaultFullNode() *FullNode { MaxFilterHeightRange: 2880, // conservative limit of one day }, }, + ActorEvents: ActorEventsConfig{ + EnableActorEventsAPI: false, + }, } } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 1e8252ca9..47dde35cd 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -29,6 +29,17 @@ var Doc = map[string][]DocField{ Comment: ``, }, }, + "ActorEventsConfig": { + { + Name: "EnableActorEventsAPI", + Type: "bool", + + Comment: `EnableActorEventsAPI enables the Actor events API that enables clients to consume events +emitted by (smart contracts + built-in Actors). +This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be +disabled by setting their respective Disable* options in Fevm.Events.`, + }, + }, "ApisConfig": { { Name: "ChainApiInfo", @@ -452,13 +463,6 @@ rewards. This address should have adequate funds to cover gas fees.`, Type: "bool", Comment: `EnableEthRPC enables eth_ rpc, and enables storing a mapping of eth transaction hashes to filecoin message Cids. -This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`, - }, - { - Name: "EnableActorEventsAPI", - Type: "bool", - - Comment: `EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors). This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`, }, { @@ -512,6 +516,12 @@ Set to 0 to keep all mappings`, Comment: ``, }, + { + Name: "ActorEvents", + Type: "ActorEventsConfig", + + Comment: ``, + }, { Name: "Index", Type: "IndexConfig", diff --git a/node/config/types.go b/node/config/types.go index 6fb7389ff..ab95923d1 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -28,6 +28,7 @@ type FullNode struct { Chainstore Chainstore Cluster UserRaftConfig Fevm FevmConfig + ActorEvents ActorEventsConfig Index IndexConfig FaultReporter FaultReporterConfig } @@ -786,10 +787,6 @@ type FevmConfig struct { // This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above. EnableEthRPC bool - // EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors). - // This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above. - EnableActorEventsAPI bool - // EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days // Set to 0 to keep all mappings EthTxHashMappingLifetimeDays int @@ -833,6 +830,14 @@ type Events struct { // Set upper bound on index size } +type ActorEventsConfig struct { + // EnableActorEventsAPI enables the Actor events API that enables clients to consume events + // emitted by (smart contracts + built-in Actors). + // This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be + // disabled by setting their respective Disable* options in Fevm.Events. + EnableActorEventsAPI bool +} + type IndexConfig struct { // EXPERIMENTAL FEATURE. USE WITH CAUTION // EnableMsgIndex enables indexing of messages on chain. @@ -856,6 +861,7 @@ type HarmonyDB struct { // The port to find Yugabyte. Blank for default. Port string } + type FaultReporterConfig struct { // EnableConsensusFaultReporter controls whether the node will monitor and // report consensus faults. When enabled, the node will watch for malicious diff --git a/node/impl/full/actor_event_test.go b/node/impl/full/actor_event_test.go new file mode 100644 index 000000000..a7778ced5 --- /dev/null +++ b/node/impl/full/actor_event_test.go @@ -0,0 +1,112 @@ +package full + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" +) + +func TestParseHeightRange(t *testing.T) { + epochPtr := func(i int) *abi.ChainEpoch { + e := abi.ChainEpoch(i) + return &e + } + + tcs := map[string]struct { + heaviest abi.ChainEpoch + from *abi.ChainEpoch + to *abi.ChainEpoch + maxRange abi.ChainEpoch + minOut abi.ChainEpoch + maxOut abi.ChainEpoch + errStr string + }{ + "fails when both are specified and range is greater than max allowed range": { + heaviest: 100, + from: epochPtr(256), + to: epochPtr(512), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "too large", + }, + "fails when min is specified and range is greater than max allowed range": { + heaviest: 500, + from: epochPtr(16), + to: nil, + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "'from' height is too far in the past", + }, + "fails when max is specified and range is greater than max allowed range": { + heaviest: 500, + from: nil, + to: epochPtr(65536), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "'to' height is too far in the future", + }, + "fails when from is greater than to": { + heaviest: 100, + from: epochPtr(512), + to: epochPtr(256), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "must be after", + }, + "works when range is valid (nil from)": { + heaviest: 500, + from: nil, + to: epochPtr(48), + maxRange: 1000, + minOut: -1, + maxOut: 48, + }, + "works when range is valid (nil to)": { + heaviest: 500, + from: epochPtr(0), + to: nil, + maxRange: 1000, + minOut: 0, + maxOut: -1, + }, + "works when range is valid (nil from and to)": { + heaviest: 500, + from: nil, + to: nil, + maxRange: 1000, + minOut: -1, + maxOut: -1, + }, + "works when range is valid and specified": { + heaviest: 500, + from: epochPtr(16), + to: epochPtr(48), + maxRange: 1000, + minOut: 16, + maxOut: 48, + }, + } + + for name, tc := range tcs { + tc2 := tc + t.Run(name, func(t *testing.T) { + min, max, err := parseHeightRange(tc2.heaviest, tc2.from, tc2.to, tc2.maxRange) + require.Equal(t, tc2.minOut, min) + require.Equal(t, tc2.maxOut, max) + if tc2.errStr != "" { + fmt.Println(err) + require.Error(t, err) + require.Contains(t, err.Error(), tc2.errStr) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/node/impl/full/actor_events.go b/node/impl/full/actor_events.go index 60b086284..6ee62d3cb 100644 --- a/node/impl/full/actor_events.go +++ b/node/impl/full/actor_events.go @@ -3,7 +3,6 @@ package full import ( "context" "fmt" - "strings" "github.com/ipfs/go-cid" "go.uber.org/fx" @@ -18,7 +17,7 @@ import ( type ActorEventAPI interface { GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) - SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) + SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) } var ( @@ -39,18 +38,26 @@ type ActorEventsAPI struct { ActorEventAPI } -func (a *ActorEventHandler) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) { +func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) { if a.EventFilterManager == nil { return nil, api.ErrNotSupported } - params, err := a.parseFilter(filter) + if evtFilter == nil { + evtFilter = &types.ActorEventFilter{} + } + params, err := a.parseFilter(*evtFilter) if err != nil { return nil, err } - // Create a temporary filter - f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, filter.Addresses, filter.Fields, false) + // Install a filter just for this call, collect events, remove the filter + + tipSetCid, err := params.GetTipSetCid() + if err != nil { + return nil, fmt.Errorf("failed to get tipset cid: %w", err) + } + f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) if err != nil { return nil, err } @@ -63,33 +70,34 @@ func (a *ActorEventHandler) GetActorEvents(ctx context.Context, filter *types.Ac type filterParams struct { MinHeight abi.ChainEpoch MaxHeight abi.ChainEpoch - TipSetCid cid.Cid + TipSetKey types.TipSetKey } -func (a *ActorEventHandler) parseFilter(f *types.ActorEventFilter) (*filterParams, error) { - if f.TipSetCid != nil { - if len(f.FromEpoch) != 0 || len(f.ToEpoch) != 0 { - return nil, fmt.Errorf("cannot specify both TipSetCid and FromEpoch/ToEpoch") +func (fp filterParams) GetTipSetCid() (cid.Cid, error) { + if fp.TipSetKey.IsEmpty() { + return cid.Undef, nil + } + return fp.TipSetKey.Cid() +} + +func (a *ActorEventHandler) parseFilter(f types.ActorEventFilter) (*filterParams, error) { + if f.TipSetKey != nil && !f.TipSetKey.IsEmpty() { + if f.FromHeight != nil || f.ToHeight != nil { + return nil, fmt.Errorf("cannot specify both TipSetKey and FromHeight/ToHeight") } + tsk := types.EmptyTSK + if f.TipSetKey != nil { + tsk = *f.TipSetKey + } return &filterParams{ MinHeight: 0, MaxHeight: 0, - TipSetCid: *f.TipSetCid, + TipSetKey: tsk, }, nil } - from := f.FromEpoch - if len(from) != 0 && from != "latest" && from != "earliest" && !strings.HasPrefix(from, "0x") { - from = "0x" + from - } - - to := f.ToEpoch - if len(to) != 0 && to != "latest" && to != "earliest" && !strings.HasPrefix(to, "0x") { - to = "0x" + to - } - - min, max, err := parseBlockRange(a.Chain.GetHeaviestTipSet().Height(), &from, &to, a.MaxFilterHeightRange) + min, max, err := parseHeightRange(a.Chain.GetHeaviestTipSet().Height(), f.FromHeight, f.ToHeight, a.MaxFilterHeightRange) if err != nil { return nil, err } @@ -97,21 +105,69 @@ func (a *ActorEventHandler) parseFilter(f *types.ActorEventFilter) (*filterParam return &filterParams{ MinHeight: min, MaxHeight: max, - TipSetCid: cid.Undef, + TipSetKey: types.EmptyTSK, }, nil } -func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { +// parseHeightRange is similar to eth's parseBlockRange but with slightly different semantics but +// results in equivalent values that we can plug in to the EventFilterManager. +// +// * Uses "height", allowing for nillable values rather than strings +// * No "latest" and "earliest", those are now represented by nil on the way in and -1 on the way out +// * No option for hex representation +func parseHeightRange(heaviest abi.ChainEpoch, fromHeight, toHeight *abi.ChainEpoch, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) { + if fromHeight != nil && *fromHeight < 0 { + return 0, 0, fmt.Errorf("range 'from' must be greater than or equal to 0") + } + if fromHeight == nil { + minHeight = -1 + } else { + minHeight = *fromHeight + } + if toHeight == nil { + maxHeight = -1 + } else { + maxHeight = *toHeight + } + + // Validate height ranges are within limits set by node operator + if minHeight == -1 && maxHeight > 0 { + // Here the client is looking for events between the head and some future height + if maxHeight-heaviest > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: 'to' height is too far in the future (maximum: %d)", maxRange) + } + } else if minHeight >= 0 && maxHeight == -1 { + // Here the client is looking for events between some time in the past and the current head + if heaviest-minHeight > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: 'from' height is too far in the past (maximum: %d)", maxRange) + } + } else if minHeight >= 0 && maxHeight >= 0 { + if minHeight > maxHeight { + return 0, 0, fmt.Errorf("invalid epoch range: 'to' height (%d) must be after 'from' height (%d)", minHeight, maxHeight) + } else if maxHeight-minHeight > maxRange { + return 0, 0, fmt.Errorf("invalid epoch range: range between to and 'from' heights is too large (maximum: %d)", maxRange) + } + } + return minHeight, maxHeight, nil +} + +func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { if a.EventFilterManager == nil { return nil, api.ErrNotSupported } - - params, err := a.parseFilter(&f.Filter) + if evtFilter == nil { + evtFilter = &types.ActorEventFilter{} + } + params, err := a.parseFilter(*evtFilter) if err != nil { return nil, err } - fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, f.Filter.Addresses, f.Filter.Fields, false) + tipSetCid, err := params.GetTipSetCid() + if err != nil { + return nil, fmt.Errorf("failed to get tipset cid: %w", err) + } + fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false) if err != nil { return nil, err } @@ -128,30 +184,29 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.S _ = a.EventFilterManager.Remove(ctx, fm.ID()) }() - if f.Prefill { - evs, err := getCollected(ctx, fm) - if err != nil { - log.Errorf("failed to get collected events: %w", err) - return - } + evs, err := getCollected(ctx, fm) + if err != nil { + log.Errorf("failed to get collected events: %w", err) + return + } - for _, ev := range evs { - ev := ev - select { - case out <- ev: - case <-ctx.Done(): - return - default: - log.Errorf("closing event subscription due to slow reader") - return - } + for _, ev := range evs { + ev := ev + select { + case out <- ev: + case <-ctx.Done(): + return + default: + // TODO: need to fix this, buffer of 25 isn't going to work for prefill without a _really_ fast client or a small number of events + log.Errorf("closing event subscription due to slow reader") + return } } in := make(chan interface{}, 256) fm.SetSubChannel(in) - for { + for ctx.Err() == nil { select { case val, ok := <-in: if !ok { @@ -164,24 +219,19 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.S log.Errorf("got unexpected value from event filter: %T", val) return } - c, err := ce.TipSetKey.Cid() - if err != nil { - log.Errorf("failed to get tipset cid: %w", err) - return - } ev := &types.ActorEvent{ - Entries: ce.Entries, - EmitterAddr: ce.EmitterAddr, - Reverted: ce.Reverted, - Height: ce.Height, - TipSetCid: c, - MsgCid: ce.MsgCid, + Entries: ce.Entries, + Emitter: ce.EmitterAddr, + Reverted: ce.Reverted, + Height: ce.Height, + TipSetKey: ce.TipSetKey, + MsgCid: ce.MsgCid, } select { case out <- ev: - default: + default: // TODO: need to fix this to be more intelligent about the consumption rate vs the accumulation rate log.Errorf("closing event subscription due to slow reader") return } @@ -204,22 +254,14 @@ func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEve var out []*types.ActorEvent for _, e := range ces { - e := e - c, err := e.TipSetKey.Cid() - if err != nil { - return nil, fmt.Errorf("failed to get tipset cid: %w", err) - } - - ev := &types.ActorEvent{ - Entries: e.Entries, - EmitterAddr: e.EmitterAddr, - Reverted: e.Reverted, - Height: e.Height, - TipSetCid: c, - MsgCid: e.MsgCid, - } - - out = append(out, ev) + out = append(out, &types.ActorEvent{ + Entries: e.Entries, + Emitter: e.EmitterAddr, + Reverted: e.Reverted, + Height: e.Height, + TipSetKey: e.TipSetKey, + MsgCid: e.MsgCid, + }) } return out, nil diff --git a/node/impl/full/dummy.go b/node/impl/full/dummy.go index 9adb93a27..497c896ac 100644 --- a/node/impl/full/dummy.go +++ b/node/impl/full/dummy.go @@ -198,7 +198,7 @@ func (a *ActorEventDummy) GetActorEvents(ctx context.Context, filter *types.Acto return nil, ErrActorEventModuleDisabled } -func (a *ActorEventDummy) SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { +func (a *ActorEventDummy) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) { return nil, ErrActorEventModuleDisabled } diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 5c6f29d66..c7529c0b6 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1264,6 +1264,11 @@ func (e *EthEventHandler) EthGetFilterLogs(ctx context.Context, id ethtypes.EthF return nil, xerrors.Errorf("wrong filter type") } +// parseBlockRange is similar to actor event's parseHeightRange but with slightly different semantics +// +// * "block" instead of "height" +// * strings that can have "latest" and "earliest" and nil +// * hex strings for actual heights func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) { if fromBlock == nil || *fromBlock == "latest" || len(*fromBlock) == 0 { minHeight = heaviest diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index ddab4eda3..1e790179c 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -164,14 +164,14 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc } } -func ActorEventHandler(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) { +func ActorEventHandler(enable bool, fevmCfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) { return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEventHandler, error) { ee := &full.ActorEventHandler{ - MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange), + MaxFilterHeightRange: abi.ChainEpoch(fevmCfg.Events.MaxFilterHeightRange), Chain: cs, } - if !cfg.EnableActorEventsAPI || cfg.Events.DisableRealTimeFilterAPI { + if !enable || fevmCfg.Events.DisableRealTimeFilterAPI { // all Actor events functionality is disabled return ee, nil }