feat(events): add "Raw" suffix to {Get,Subscribe}ActorEvents
This is done with the intention to add new {Get,Subscribe}ActorEvents in a future release (i.e. soon!) with both decoded values (dag-json represented) and simplified (no flags or codec). But because this comes with some trade-offs wrt fidelity of information (e.g. likely needing to drop events with badly encoded values, and not retaining original codec), we need to also have a Raw form of these APIs for consumers that want to take on the burden of consuming them as they are.
This commit is contained in:
parent
81f4645fca
commit
d7f59b3d74
10
CHANGELOG.md
10
CHANGELOG.md
@ -144,20 +144,22 @@ Additionally, Filecoin is not Ethereum no matter how much we try to provide API/
|
||||
|
||||
[handlefilecoinmethod]: https://fips.filecoin.io/FIPS/fip-0054.html#handlefilecoinmethod-general-handler-for-method-numbers--1024
|
||||
|
||||
### GetActorEvents and SubscribeActorEvents
|
||||
### GetActorEventsRaw and SubscribeActorEventsRaw
|
||||
|
||||
[FIP-0049](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0049.md) introduced _Actor Events_ that can be emitted by user programmed actors. [FIP-0083](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0083.md) introduces new events emitted by the builtin Verified Registry, Miner and Market Actors. These new events for builtin actors are being activated with network version 22 to coincide with _Direct Data Onboarding_ as defined in [FIP-0076](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0076.md) which introduces additional flexibility for data onboarding. Sector, Deal and DataCap lifecycles can be tracked with these events, providing visibility and options for programmatic responses to changes in state.
|
||||
|
||||
Actor events are available on message receipts, but can now be retrieved from a node using the new `GetActorEvents` and `SubscribeActorEvents` methods. These methods allow for querying and subscribing to actor events, respectively. They depend on the Lotus node both collecting events (with `Events.RealTimeFilterAPI` and `Events.HistoricFilterAPI`) and being enabled with the new configuration option `Events.EnableActorEventsAPI`. Note that a Lotus node can only respond to requests for historic events that it retains in its event store.
|
||||
Actor events are available on message receipts, but can now be retrieved from a node using the new `GetActorEventsRaw` and `SubscribeActorEventsRaw` methods. These methods allow for querying and subscribing to actor events, respectively. They depend on the Lotus node both collecting events (with `Fevm.Events.RealTimeFilterAPI` and `Fevm.Events.HistoricFilterAPI`) and being enabled with the new configuration option `Events.EnableActorEventsAPI`. Note that a Lotus node can only respond to requests for historic events that it retains in its event store.
|
||||
|
||||
Both `GetActorEvents` and `SubscribeActorEvents` take a filter parameter which can optionally filter events on:
|
||||
Both `GetActorEventsRaw` and `SubscribeActorEventsRaw` take a filter parameter which can optionally filter events on:
|
||||
|
||||
* `Addresses` of the actor(s) emitting the event
|
||||
* Specific `Fields` within the event
|
||||
* `FromHeight` and `ToHeight` to filter events by block height
|
||||
* `TipSetKey` to restrict events contained within a specific tipset
|
||||
|
||||
`GetActorEvents` provides a one-time query for actor events, while `SubscribeActorEvents` provides a long-lived connection (via websockets) to the Lotus node, allowing for real-time updates on actor events. The subscription can be cancelled by the client at any time.
|
||||
`GetActorEventsRaw` provides a one-time query for actor events, while `SubscribeActorEventsRaw` provides a long-lived connection (via websockets) to the Lotus node, allowing for real-time updates on actor events. The subscription can be cancelled by the client at any time.
|
||||
|
||||
A future Lotus release may include `GetActorEvents` and `SubscribeActorEvents` methods which will provide a more user-friendly interface to actor events, including deserialization of event data.
|
||||
|
||||
### Events Configuration Changes
|
||||
|
||||
|
@ -909,16 +909,16 @@ type FullNode interface {
|
||||
|
||||
// Actor events
|
||||
|
||||
// GetActorEvents returns all user-programmed and built-in actor events that match the given
|
||||
// GetActorEventsRaw returns all user-programmed and built-in actor events that match the given
|
||||
// filter.
|
||||
// This is a request/response API.
|
||||
// Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange
|
||||
// configuration options and also the amount of historical data available in the node.
|
||||
//
|
||||
// This is an EXPERIMENTAL API and may be subject to change.
|
||||
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) //perm:read
|
||||
GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) //perm:read
|
||||
|
||||
// SubscribeActorEvents returns a long-lived stream of all user-programmed and built-in actor
|
||||
// SubscribeActorEventsRaw returns a long-lived stream of all user-programmed and built-in actor
|
||||
// events that match the given filter.
|
||||
// Events that match the given filter are written to the stream in real-time as they are emitted
|
||||
// from the FVM.
|
||||
@ -932,7 +932,7 @@ type FullNode interface {
|
||||
//
|
||||
// Note: this API is only available via websocket connections.
|
||||
// This is an EXPERIMENTAL API and may be subject to change.
|
||||
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read
|
||||
SubscribeActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read
|
||||
}
|
||||
|
||||
// reverse interface to the client, called after EthSubscribe
|
||||
|
@ -130,7 +130,7 @@ type Gateway interface {
|
||||
EthTraceBlock(ctx context.Context, blkNum string) ([]*ethtypes.EthTraceBlock, error)
|
||||
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
|
||||
|
||||
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||
GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||
SubscribeActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||
ChainGetEvents(context.Context, cid.Cid) ([]types.Event, error)
|
||||
}
|
||||
|
@ -1626,19 +1626,19 @@ func (mr *MockFullNodeMockRecorder) GasEstimateMessageGas(arg0, arg1, arg2, arg3
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasEstimateMessageGas", reflect.TypeOf((*MockFullNode)(nil).GasEstimateMessageGas), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// GetActorEvents mocks base method.
|
||||
func (m *MockFullNode) GetActorEvents(arg0 context.Context, arg1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
// GetActorEventsRaw mocks base method.
|
||||
func (m *MockFullNode) GetActorEventsRaw(arg0 context.Context, arg1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetActorEvents", arg0, arg1)
|
||||
ret := m.ctrl.Call(m, "GetActorEventsRaw", arg0, arg1)
|
||||
ret0, _ := ret[0].([]*types.ActorEvent)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetActorEvents indicates an expected call of GetActorEvents.
|
||||
func (mr *MockFullNodeMockRecorder) GetActorEvents(arg0, arg1 interface{}) *gomock.Call {
|
||||
// GetActorEventsRaw indicates an expected call of GetActorEventsRaw.
|
||||
func (mr *MockFullNodeMockRecorder) GetActorEventsRaw(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActorEvents", reflect.TypeOf((*MockFullNode)(nil).GetActorEvents), arg0, arg1)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActorEventsRaw", reflect.TypeOf((*MockFullNode)(nil).GetActorEventsRaw), arg0, arg1)
|
||||
}
|
||||
|
||||
// ID mocks base method.
|
||||
@ -3983,19 +3983,19 @@ func (mr *MockFullNodeMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4 in
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockFullNode)(nil).StateWaitMsg), arg0, arg1, arg2, arg3, arg4)
|
||||
}
|
||||
|
||||
// SubscribeActorEvents mocks base method.
|
||||
func (m *MockFullNode) SubscribeActorEvents(arg0 context.Context, arg1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
// SubscribeActorEventsRaw mocks base method.
|
||||
func (m *MockFullNode) SubscribeActorEventsRaw(arg0 context.Context, arg1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SubscribeActorEvents", arg0, arg1)
|
||||
ret := m.ctrl.Call(m, "SubscribeActorEventsRaw", arg0, arg1)
|
||||
ret0, _ := ret[0].(<-chan *types.ActorEvent)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// SubscribeActorEvents indicates an expected call of SubscribeActorEvents.
|
||||
func (mr *MockFullNodeMockRecorder) SubscribeActorEvents(arg0, arg1 interface{}) *gomock.Call {
|
||||
// SubscribeActorEventsRaw indicates an expected call of SubscribeActorEventsRaw.
|
||||
func (mr *MockFullNodeMockRecorder) SubscribeActorEventsRaw(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeActorEvents", reflect.TypeOf((*MockFullNode)(nil).SubscribeActorEvents), arg0, arg1)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeActorEventsRaw", reflect.TypeOf((*MockFullNode)(nil).SubscribeActorEventsRaw), arg0, arg1)
|
||||
}
|
||||
|
||||
// SyncCheckBad mocks base method.
|
||||
|
@ -335,7 +335,7 @@ type FullNodeMethods struct {
|
||||
|
||||
GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) `perm:"read"`
|
||||
|
||||
GetActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) `perm:"read"`
|
||||
GetActorEventsRaw func(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) `perm:"read"`
|
||||
|
||||
MarketAddBalance func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (cid.Cid, error) `perm:"sign"`
|
||||
|
||||
@ -591,7 +591,7 @@ type FullNodeMethods struct {
|
||||
|
||||
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `perm:"read"`
|
||||
|
||||
SubscribeActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"`
|
||||
SubscribeActorEventsRaw func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"`
|
||||
|
||||
SyncCheckBad func(p0 context.Context, p1 cid.Cid) (string, error) `perm:"read"`
|
||||
|
||||
@ -761,7 +761,7 @@ type GatewayMethods struct {
|
||||
|
||||
GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) ``
|
||||
|
||||
GetActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) ``
|
||||
GetActorEventsRaw func(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) ``
|
||||
|
||||
MinerGetBaseInfo func(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*MiningBaseInfo, error) ``
|
||||
|
||||
@ -837,7 +837,7 @@ type GatewayMethods struct {
|
||||
|
||||
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) ``
|
||||
|
||||
SubscribeActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) ``
|
||||
SubscribeActorEventsRaw func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) ``
|
||||
|
||||
Version func(p0 context.Context) (APIVersion, error) ``
|
||||
|
||||
@ -2594,14 +2594,14 @@ func (s *FullNodeStub) GasEstimateMessageGas(p0 context.Context, p1 *types.Messa
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) GetActorEvents(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
if s.Internal.GetActorEvents == nil {
|
||||
func (s *FullNodeStruct) GetActorEventsRaw(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
if s.Internal.GetActorEventsRaw == nil {
|
||||
return *new([]*types.ActorEvent), ErrNotSupported
|
||||
}
|
||||
return s.Internal.GetActorEvents(p0, p1)
|
||||
return s.Internal.GetActorEventsRaw(p0, p1)
|
||||
}
|
||||
|
||||
func (s *FullNodeStub) GetActorEvents(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
func (s *FullNodeStub) GetActorEventsRaw(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
return *new([]*types.ActorEvent), ErrNotSupported
|
||||
}
|
||||
|
||||
@ -4002,14 +4002,14 @@ func (s *FullNodeStub) StateWaitMsg(p0 context.Context, p1 cid.Cid, p2 uint64, p
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
if s.Internal.SubscribeActorEvents == nil {
|
||||
func (s *FullNodeStruct) SubscribeActorEventsRaw(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
if s.Internal.SubscribeActorEventsRaw == nil {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
return s.Internal.SubscribeActorEvents(p0, p1)
|
||||
return s.Internal.SubscribeActorEventsRaw(p0, p1)
|
||||
}
|
||||
|
||||
func (s *FullNodeStub) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
func (s *FullNodeStub) SubscribeActorEventsRaw(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
@ -4871,14 +4871,14 @@ func (s *GatewayStub) GasEstimateMessageGas(p0 context.Context, p1 *types.Messag
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *GatewayStruct) GetActorEvents(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
if s.Internal.GetActorEvents == nil {
|
||||
func (s *GatewayStruct) GetActorEventsRaw(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
if s.Internal.GetActorEventsRaw == nil {
|
||||
return *new([]*types.ActorEvent), ErrNotSupported
|
||||
}
|
||||
return s.Internal.GetActorEvents(p0, p1)
|
||||
return s.Internal.GetActorEventsRaw(p0, p1)
|
||||
}
|
||||
|
||||
func (s *GatewayStub) GetActorEvents(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
func (s *GatewayStub) GetActorEventsRaw(p0 context.Context, p1 *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
return *new([]*types.ActorEvent), ErrNotSupported
|
||||
}
|
||||
|
||||
@ -5289,14 +5289,14 @@ func (s *GatewayStub) StateWaitMsg(p0 context.Context, p1 cid.Cid, p2 uint64, p3
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *GatewayStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
if s.Internal.SubscribeActorEvents == nil {
|
||||
func (s *GatewayStruct) SubscribeActorEventsRaw(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
if s.Internal.SubscribeActorEventsRaw == nil {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
return s.Internal.SubscribeActorEvents(p0, p1)
|
||||
return s.Internal.SubscribeActorEventsRaw(p0, p1)
|
||||
}
|
||||
|
||||
func (s *GatewayStub) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
func (s *GatewayStub) SubscribeActorEventsRaw(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
|
@ -116,7 +116,7 @@
|
||||
* [GasEstimateGasPremium](#GasEstimateGasPremium)
|
||||
* [GasEstimateMessageGas](#GasEstimateMessageGas)
|
||||
* [Get](#Get)
|
||||
* [GetActorEvents](#GetActorEvents)
|
||||
* [GetActorEventsRaw](#GetActorEventsRaw)
|
||||
* [I](#I)
|
||||
* [ID](#ID)
|
||||
* [Log](#Log)
|
||||
@ -285,7 +285,7 @@
|
||||
* [StateVerifierStatus](#StateVerifierStatus)
|
||||
* [StateWaitMsg](#StateWaitMsg)
|
||||
* [Subscribe](#Subscribe)
|
||||
* [SubscribeActorEvents](#SubscribeActorEvents)
|
||||
* [SubscribeActorEventsRaw](#SubscribeActorEventsRaw)
|
||||
* [Sync](#Sync)
|
||||
* [SyncCheckBad](#SyncCheckBad)
|
||||
* [SyncCheckpoint](#SyncCheckpoint)
|
||||
@ -3389,8 +3389,8 @@ Response:
|
||||
## Get
|
||||
|
||||
|
||||
### GetActorEvents
|
||||
GetActorEvents returns all user-programmed and built-in actor events that match the given
|
||||
### GetActorEventsRaw
|
||||
GetActorEventsRaw returns all user-programmed and built-in actor events that match the given
|
||||
filter.
|
||||
This is a request/response API.
|
||||
Results available from this API may be limited by the MaxFilterResults and MaxFilterHeightRange
|
||||
@ -8831,8 +8831,8 @@ Response:
|
||||
## Subscribe
|
||||
|
||||
|
||||
### SubscribeActorEvents
|
||||
SubscribeActorEvents returns a long-lived stream of all user-programmed and built-in actor
|
||||
### SubscribeActorEventsRaw
|
||||
SubscribeActorEventsRaw returns a long-lived stream of all user-programmed and built-in actor
|
||||
events that match the given filter.
|
||||
Events that match the given filter are written to the stream in real-time as they are emitted
|
||||
from the FVM.
|
||||
|
@ -147,8 +147,8 @@ type TargetAPI interface {
|
||||
EthTraceBlock(ctx context.Context, blkNum string) ([]*ethtypes.EthTraceBlock, error)
|
||||
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
|
||||
|
||||
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||
GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||
SubscribeActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||
ChainGetEvents(ctx context.Context, eventsRoot cid.Cid) ([]types.Event, error)
|
||||
}
|
||||
|
||||
|
@ -437,18 +437,28 @@ func (gw *Node) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64
|
||||
return gw.target.StateWaitMsg(ctx, msg, confidence, limit, allowReplaced)
|
||||
}
|
||||
|
||||
func (gw *Node) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
func (gw *Node) GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return gw.target.GetActorEvents(ctx, filter)
|
||||
if filter != nil && filter.FromHeight != nil {
|
||||
if err := gw.checkTipSetHeight(ctx, *filter.FromHeight, types.EmptyTSK); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return gw.target.GetActorEventsRaw(ctx, filter)
|
||||
}
|
||||
|
||||
func (gw *Node) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
func (gw *Node) SubscribeActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return gw.target.SubscribeActorEvents(ctx, filter)
|
||||
if filter != nil && filter.FromHeight != nil {
|
||||
if err := gw.checkTipSetHeight(ctx, *filter.FromHeight, types.EmptyTSK); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return gw.target.SubscribeActorEventsRaw(ctx, filter)
|
||||
}
|
||||
|
||||
func (gw *Node) ChainGetEvents(ctx context.Context, eventsRoot cid.Cid) ([]types.Event, error) {
|
||||
|
@ -274,7 +274,7 @@ func TestOnboardMixedMarketDDO(t *testing.T) {
|
||||
|
||||
// check "deal-published" actor event
|
||||
var epochZero abi.ChainEpoch
|
||||
allEvents, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
|
||||
allEvents, err := miner.FullNode.GetActorEventsRaw(ctx, &types.ActorEventFilter{
|
||||
FromHeight: &epochZero,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -71,14 +71,14 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
||||
/* --- Setup subscription channels for ActorEvents --- */
|
||||
|
||||
// subscribe only to miner's actor events
|
||||
minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||
minerEvtsChan, err := miner.FullNode.SubscribeActorEventsRaw(ctx, &types.ActorEventFilter{
|
||||
Addresses: []address.Address{miner.ActorAddr},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// subscribe only to sector-activated events
|
||||
sectorActivatedCbor := must.One(ipld.Encode(basicnode.NewString("sector-activated"), dagcbor.Encode))
|
||||
sectorActivatedEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||
sectorActivatedEvtsChan, err := miner.FullNode.SubscribeActorEventsRaw(ctx, &types.ActorEventFilter{
|
||||
Fields: map[string][]types.ActorEventBlock{
|
||||
"$type": {
|
||||
{Codec: uint64(multicodec.Cbor), Value: sectorActivatedCbor},
|
||||
@ -127,7 +127,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// subscribe to actor events up until the current head
|
||||
initialEventsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||
initialEventsChan, err := miner.FullNode.SubscribeActorEventsRaw(ctx, &types.ActorEventFilter{
|
||||
FromHeight: epochPtr(0),
|
||||
ToHeight: epochPtr(int64(head.Height())),
|
||||
})
|
||||
@ -284,19 +284,19 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
||||
},
|
||||
}, claims)
|
||||
|
||||
// construct ActorEvents from GetActorEvents API
|
||||
t.Logf("Inspecting full events list from GetActorEvents")
|
||||
allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
|
||||
// construct ActorEvents from GetActorEventsRaw API
|
||||
t.Logf("Inspecting full events list from GetActorEventsRaw")
|
||||
allEvtsFromGetAPI, err := miner.FullNode.GetActorEventsRaw(ctx, &types.ActorEventFilter{
|
||||
FromHeight: epochPtr(0),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
fmt.Println("Events from GetActorEvents:")
|
||||
fmt.Println("Events from GetActorEventsRaw:")
|
||||
printEvents(t, allEvtsFromGetAPI)
|
||||
// compare events from messages and receipts with events from GetActorEvents API
|
||||
// compare events from messages and receipts with events from GetActorEventsRaw API
|
||||
require.Equal(t, eventsFromMessages, allEvtsFromGetAPI)
|
||||
|
||||
// construct ActorEvents from subscription channel for just the miner actor
|
||||
t.Logf("Inspecting only miner's events list from SubscribeActorEvents")
|
||||
t.Logf("Inspecting only miner's events list from SubscribeActorEventsRaw")
|
||||
var subMinerEvts []*types.ActorEvent
|
||||
for evt := range minerEvtsChan {
|
||||
subMinerEvts = append(subMinerEvts, evt)
|
||||
@ -325,7 +325,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
||||
}
|
||||
require.Len(t, sectorActivatedEvts, 2) // sanity check
|
||||
|
||||
t.Logf("Inspecting only sector-activated events list from real-time SubscribeActorEvents")
|
||||
t.Logf("Inspecting only sector-activated events list from real-time SubscribeActorEventsRaw")
|
||||
var subscribedSectorActivatedEvts []*types.ActorEvent
|
||||
for evt := range sectorActivatedEvtsChan {
|
||||
subscribedSectorActivatedEvts = append(subscribedSectorActivatedEvts, evt)
|
||||
@ -337,8 +337,8 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
||||
require.Equal(t, sectorActivatedEvts, subscribedSectorActivatedEvts)
|
||||
|
||||
// same thing but use historical event fetching to see the same list
|
||||
t.Logf("Inspecting only sector-activated events list from historical SubscribeActorEvents")
|
||||
sectorActivatedEvtsChan, err = miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||
t.Logf("Inspecting only sector-activated events list from historical SubscribeActorEventsRaw")
|
||||
sectorActivatedEvtsChan, err = miner.FullNode.SubscribeActorEventsRaw(ctx, &types.ActorEventFilter{
|
||||
Fields: map[string][]types.ActorEventBlock{
|
||||
"$type": {
|
||||
{Codec: uint64(multicodec.Cbor), Value: sectorActivatedCbor},
|
||||
@ -358,7 +358,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
||||
require.Equal(t, sectorActivatedEvts, subscribedSectorActivatedEvts)
|
||||
|
||||
// check that our `ToHeight` filter works as expected
|
||||
t.Logf("Inspecting only initial list of events SubscribeActorEvents with ToHeight")
|
||||
t.Logf("Inspecting only initial list of events SubscribeActorEventsRaw with ToHeight")
|
||||
var initialEvents []*types.ActorEvent
|
||||
for evt := range initialEventsChan {
|
||||
initialEvents = append(initialEvents, evt)
|
||||
@ -367,8 +367,8 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
||||
require.Equal(t, eventsFromMessages[0:6], initialEvents)
|
||||
|
||||
// construct ActorEvents from subscription channel for all actor events
|
||||
t.Logf("Inspecting full events list from historical SubscribeActorEvents")
|
||||
allEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||
t.Logf("Inspecting full events list from historical SubscribeActorEventsRaw")
|
||||
allEvtsChan, err := miner.FullNode.SubscribeActorEventsRaw(ctx, &types.ActorEventFilter{
|
||||
FromHeight: epochPtr(0),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
@ -383,7 +383,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
||||
require.Equal(t, eventsFromMessages, prefillEvts)
|
||||
t.Logf("All done comparing events")
|
||||
|
||||
// NOTE: There is a delay in finishing this test because the SubscribeActorEvents
|
||||
// NOTE: There is a delay in finishing this test because the SubscribeActorEventsRaw
|
||||
// with the ToHeight (initialEventsChan) has to wait at least a full actual epoch before
|
||||
// realising that there's no more events for that filter. itests run with a different block
|
||||
// speed than the ActorEventHandler is aware of.
|
||||
|
@ -173,7 +173,7 @@ loop:
|
||||
|
||||
// check "sector-terminated" actor event
|
||||
var epochZero abi.ChainEpoch
|
||||
allEvents, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
|
||||
allEvents, err := miner.FullNode.GetActorEventsRaw(ctx, &types.ActorEventFilter{
|
||||
FromHeight: &epochZero,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -18,8 +18,8 @@ import (
|
||||
)
|
||||
|
||||
type ActorEventAPI interface {
|
||||
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||
GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||
SubscribeActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||
}
|
||||
|
||||
var (
|
||||
@ -89,7 +89,7 @@ func NewActorEventHandlerWithClock(
|
||||
}
|
||||
}
|
||||
|
||||
func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
func (a *ActorEventHandler) GetActorEventsRaw(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
if a.eventFilterManager == nil {
|
||||
return nil, api.ErrNotSupported
|
||||
}
|
||||
@ -200,7 +200,7 @@ func parseHeightRange(heaviest abi.ChainEpoch, fromHeight, toHeight *abi.ChainEp
|
||||
return minHeight, maxHeight, nil
|
||||
}
|
||||
|
||||
func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
func (a *ActorEventHandler) SubscribeActorEventsRaw(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
if a.eventFilterManager == nil {
|
||||
return nil, api.ErrNotSupported
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ func TestParseHeightRange(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetActorEvents(t *testing.T) {
|
||||
func TestGetActorEventsRaw(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
req := require.New(t)
|
||||
|
||||
@ -231,7 +231,7 @@ func TestGetActorEvents(t *testing.T) {
|
||||
|
||||
handler := NewActorEventHandler(chain, efm, 50*time.Millisecond, maxFilterHeightRange)
|
||||
|
||||
gotEvents, err := handler.GetActorEvents(ctx, tc.filter)
|
||||
gotEvents, err := handler.GetActorEventsRaw(ctx, tc.filter)
|
||||
if tc.expectErr != "" {
|
||||
req.Error(err)
|
||||
req.Contains(err.Error(), tc.expectErr)
|
||||
@ -245,7 +245,7 @@ func TestGetActorEvents(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeActorEvents(t *testing.T) {
|
||||
func TestSubscribeActorEventsRaw(t *testing.T) {
|
||||
const (
|
||||
seed = 984651320
|
||||
maxFilterHeightRange = 100
|
||||
@ -300,7 +300,7 @@ func TestSubscribeActorEvents(t *testing.T) {
|
||||
if tc.endEpoch >= 0 {
|
||||
aef.ToHeight = epochPtr(tc.endEpoch)
|
||||
}
|
||||
eventChan, err := handler.SubscribeActorEvents(ctx, aef)
|
||||
eventChan, err := handler.SubscribeActorEventsRaw(ctx, aef)
|
||||
req.NoError(err)
|
||||
|
||||
// assume we can cleanly pick up all historical events in one go
|
||||
@ -411,8 +411,8 @@ func TestSubscribeActorEvents(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) {
|
||||
// Similar to TestSubscribeActorEvents but we set an explicit end that caps out at the current height
|
||||
func TestSubscribeActorEventsRaw_OnlyHistorical(t *testing.T) {
|
||||
// Similar to TestSubscribeActorEventsRaw but we set an explicit end that caps out at the current height
|
||||
const (
|
||||
seed = 984651320
|
||||
maxFilterHeightRange = 100
|
||||
@ -458,7 +458,7 @@ func TestSubscribeActorEvents_OnlyHistorical(t *testing.T) {
|
||||
handler := NewActorEventHandlerWithClock(mockChain, mockFilterManager, blockDelay, maxFilterHeightRange, mockClock)
|
||||
|
||||
aef := &types.ActorEventFilter{FromHeight: epochPtr(0), ToHeight: epochPtr(currentHeight)}
|
||||
eventChan, err := handler.SubscribeActorEvents(ctx, aef)
|
||||
eventChan, err := handler.SubscribeActorEventsRaw(ctx, aef)
|
||||
req.NoError(err)
|
||||
|
||||
var gotEvents []*types.ActorEvent
|
||||
|
@ -194,11 +194,11 @@ var ErrActorEventModuleDisabled = errors.New("module disabled, enable with Fevm.
|
||||
|
||||
type ActorEventDummy struct{}
|
||||
|
||||
func (a *ActorEventDummy) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
func (a *ActorEventDummy) GetActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||
return nil, ErrActorEventModuleDisabled
|
||||
}
|
||||
|
||||
func (a *ActorEventDummy) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
func (a *ActorEventDummy) SubscribeActorEventsRaw(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||
return nil, ErrActorEventModuleDisabled
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user