Adjust actor event API after review
This commit is contained in:
parent
ce38c31121
commit
2194eacc0f
@ -919,7 +919,7 @@ type FullNode interface {
|
|||||||
// This API also allows clients to read all historical events matching the given filter before
|
// This API also allows clients to read all historical events matching the given filter before
|
||||||
// any real-time events are written to the response stream.
|
// any real-time events are written to the response stream.
|
||||||
// NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW
|
// NOTE: THIS API IS ONLY SUPPORTED OVER WEBSOCKETS FOR NOW
|
||||||
SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read
|
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) //perm:read
|
||||||
}
|
}
|
||||||
|
|
||||||
// reverse interface to the client, called after EthSubscribe
|
// reverse interface to the client, called after EthSubscribe
|
||||||
|
@ -131,5 +131,5 @@ type Gateway interface {
|
|||||||
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
|
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
|
||||||
|
|
||||||
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||||
SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error)
|
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||||
}
|
}
|
||||||
|
@ -430,25 +430,8 @@ func init() {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
FromEpoch: "earliest",
|
FromHeight: epochPtr(1010),
|
||||||
ToEpoch: "latest",
|
ToHeight: epochPtr(1020),
|
||||||
})
|
|
||||||
|
|
||||||
addExample(&types.SubActorEventFilter{
|
|
||||||
Filter: types.ActorEventFilter{
|
|
||||||
Addresses: []address.Address{addr},
|
|
||||||
Fields: map[string][]types.ActorEventBlock{
|
|
||||||
"abc": {
|
|
||||||
{
|
|
||||||
Codec: 0x51,
|
|
||||||
Value: []byte("ddata"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
FromEpoch: "earliest",
|
|
||||||
ToEpoch: "latest",
|
|
||||||
},
|
|
||||||
Prefill: true,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -555,6 +538,11 @@ func exampleStruct(method string, t, parent reflect.Type) interface{} {
|
|||||||
return ns.Interface()
|
return ns.Interface()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func epochPtr(ei int64) *abi.ChainEpoch {
|
||||||
|
ep := abi.ChainEpoch(ei)
|
||||||
|
return &ep
|
||||||
|
}
|
||||||
|
|
||||||
type Visitor struct {
|
type Visitor struct {
|
||||||
Root string
|
Root string
|
||||||
Methods map[string]ast.Node
|
Methods map[string]ast.Node
|
||||||
|
@ -3984,7 +3984,7 @@ func (mr *MockFullNodeMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4 in
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeActorEvents mocks base method.
|
// SubscribeActorEvents mocks base method.
|
||||||
func (m *MockFullNode) SubscribeActorEvents(arg0 context.Context, arg1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
func (m *MockFullNode) SubscribeActorEvents(arg0 context.Context, arg1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "SubscribeActorEvents", arg0, arg1)
|
ret := m.ctrl.Call(m, "SubscribeActorEvents", arg0, arg1)
|
||||||
ret0, _ := ret[0].(<-chan *types.ActorEvent)
|
ret0, _ := ret[0].(<-chan *types.ActorEvent)
|
||||||
|
@ -591,7 +591,7 @@ type FullNodeMethods struct {
|
|||||||
|
|
||||||
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `perm:"read"`
|
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `perm:"read"`
|
||||||
|
|
||||||
SubscribeActorEvents func(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"`
|
SubscribeActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) `perm:"read"`
|
||||||
|
|
||||||
SyncCheckBad func(p0 context.Context, p1 cid.Cid) (string, error) `perm:"read"`
|
SyncCheckBad func(p0 context.Context, p1 cid.Cid) (string, error) `perm:"read"`
|
||||||
|
|
||||||
@ -835,7 +835,7 @@ type GatewayMethods struct {
|
|||||||
|
|
||||||
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) ``
|
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) ``
|
||||||
|
|
||||||
SubscribeActorEvents func(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) ``
|
SubscribeActorEvents func(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) ``
|
||||||
|
|
||||||
Version func(p0 context.Context) (APIVersion, error) ``
|
Version func(p0 context.Context) (APIVersion, error) ``
|
||||||
|
|
||||||
@ -4000,14 +4000,14 @@ func (s *FullNodeStub) StateWaitMsg(p0 context.Context, p1 cid.Cid, p2 uint64, p
|
|||||||
return nil, ErrNotSupported
|
return nil, ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *FullNodeStruct) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
func (s *FullNodeStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
if s.Internal.SubscribeActorEvents == nil {
|
if s.Internal.SubscribeActorEvents == nil {
|
||||||
return nil, ErrNotSupported
|
return nil, ErrNotSupported
|
||||||
}
|
}
|
||||||
return s.Internal.SubscribeActorEvents(p0, p1)
|
return s.Internal.SubscribeActorEvents(p0, p1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *FullNodeStub) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
func (s *FullNodeStub) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
return nil, ErrNotSupported
|
return nil, ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -5276,14 +5276,14 @@ func (s *GatewayStub) StateWaitMsg(p0 context.Context, p1 cid.Cid, p2 uint64, p3
|
|||||||
return nil, ErrNotSupported
|
return nil, ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GatewayStruct) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
func (s *GatewayStruct) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
if s.Internal.SubscribeActorEvents == nil {
|
if s.Internal.SubscribeActorEvents == nil {
|
||||||
return nil, ErrNotSupported
|
return nil, ErrNotSupported
|
||||||
}
|
}
|
||||||
return s.Internal.SubscribeActorEvents(p0, p1)
|
return s.Internal.SubscribeActorEvents(p0, p1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *GatewayStub) SubscribeActorEvents(p0 context.Context, p1 *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
func (s *GatewayStub) SubscribeActorEvents(p0 context.Context, p1 *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
return nil, ErrNotSupported
|
return nil, ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Binary file not shown.
Binary file not shown.
@ -18,16 +18,6 @@ type ActorEventBlock struct {
|
|||||||
Value []byte `json:"value"`
|
Value []byte `json:"value"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SubActorEventFilter struct {
|
|
||||||
Filter ActorEventFilter `json:"filter"`
|
|
||||||
|
|
||||||
// If true, all available matching historical events will be written to the response stream
|
|
||||||
// before any new real-time events that match the given filter are written.
|
|
||||||
// If `Prefill` is true and `FromEpoch` is set to latest, the pre-fill operation will become a no-op.
|
|
||||||
// if `Prefill` is false and `FromEpoch` is set to earliest, historical events will still be sent to the client.
|
|
||||||
Prefill bool `json:"prefill"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ActorEventFilter struct {
|
type ActorEventFilter struct {
|
||||||
// Matches events from one of these actors, or any actor if empty.
|
// Matches events from one of these actors, or any actor if empty.
|
||||||
// For now, this MUST be a Filecoin address.
|
// For now, this MUST be a Filecoin address.
|
||||||
@ -37,17 +27,17 @@ type ActorEventFilter struct {
|
|||||||
// If the value is an empty slice, the filter will match on the key only, accepting any value.
|
// If the value is an empty slice, the filter will match on the key only, accepting any value.
|
||||||
Fields map[string][]ActorEventBlock `json:"fields,omitempty"`
|
Fields map[string][]ActorEventBlock `json:"fields,omitempty"`
|
||||||
|
|
||||||
// Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first,
|
// The height of the earliest tipset to include in the query. If empty, the query starts at the
|
||||||
// Optional, default: "latest".
|
// last finalized tipset.
|
||||||
FromEpoch string `json:"fromEpoch,omitempty"`
|
FromHeight *abi.ChainEpoch `json:"fromHeight,omitempty"`
|
||||||
|
|
||||||
// Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first,
|
// The height of the latest tipset to include in the query. If empty, the query ends at the
|
||||||
// Optional, default: "latest".
|
// latest tipset.
|
||||||
ToEpoch string `json:"toEpoch,omitempty"`
|
ToHeight *abi.ChainEpoch `json:"toHeight,omitempty"`
|
||||||
|
|
||||||
// Restricts events returned to those emitted from messages contained in this tipset.
|
// Restricts events returned to those emitted from messages contained in this tipset.
|
||||||
// If `TipSetCid` is present in the filter criteria, then neither `FromEpoch` nor `ToEpoch` are allowed.
|
// If `TipSetKey` is legt empty in the filter criteria, then neither `FromHeight` nor `ToHeight` are allowed.
|
||||||
TipSetCid *cid.Cid `json:"tipsetCid,omitempty"`
|
TipSetKey *TipSetKey `json:"tipsetKey,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ActorEvent struct {
|
type ActorEvent struct {
|
||||||
@ -55,7 +45,7 @@ type ActorEvent struct {
|
|||||||
Entries []EventEntry `json:"entries"`
|
Entries []EventEntry `json:"entries"`
|
||||||
|
|
||||||
// Filecoin address of the actor that emitted this event.
|
// Filecoin address of the actor that emitted this event.
|
||||||
EmitterAddr address.Address `json:"emitter"`
|
Emitter address.Address `json:"emitter"`
|
||||||
|
|
||||||
// Reverted is set to true if the message that produced this event was reverted because of a network re-org
|
// Reverted is set to true if the message that produced this event was reverted because of a network re-org
|
||||||
// in that case, the event should be considered as reverted as well.
|
// in that case, the event should be considered as reverted as well.
|
||||||
@ -64,8 +54,8 @@ type ActorEvent struct {
|
|||||||
// Height of the tipset that contained the message that produced this event.
|
// Height of the tipset that contained the message that produced this event.
|
||||||
Height abi.ChainEpoch `json:"height"`
|
Height abi.ChainEpoch `json:"height"`
|
||||||
|
|
||||||
// CID of the tipset that contained the message that produced this event.
|
// The tipset that contained the message that produced this event.
|
||||||
TipSetCid cid.Cid `json:"tipsetCid"`
|
TipSetKey TipSetKey `json:"tipsetKey"`
|
||||||
|
|
||||||
// CID of message that produced this event.
|
// CID of message that produced this event.
|
||||||
MsgCid cid.Cid `json:"msgCid"`
|
MsgCid cid.Cid `json:"msgCid"`
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
builtintypes "github.com/filecoin-project/go-state-types/builtin"
|
builtintypes "github.com/filecoin-project/go-state-types/builtin"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -29,10 +30,10 @@ func TestActorEventJson(t *testing.T) {
|
|||||||
Value: []byte("value2"),
|
Value: []byte("value2"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
EmitterAddr: randomF4Addr(t, rng),
|
Emitter: randomF4Addr(t, rng),
|
||||||
Reverted: false,
|
Reverted: false,
|
||||||
Height: 1001,
|
Height: 1001,
|
||||||
TipSetCid: randomCid(t, rng),
|
TipSetKey: NewTipSetKey(randomCid(t, rng)),
|
||||||
MsgCid: randomCid(t, rng),
|
MsgCid: randomCid(t, rng),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,7 +47,7 @@ func TestActorEventJson(t *testing.T) {
|
|||||||
require.Equal(t, in, out)
|
require.Equal(t, in, out)
|
||||||
|
|
||||||
s := `
|
s := `
|
||||||
{"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetCid":{"/":"bafkqacx3dag26sfht3qlcdi"},"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}}
|
{"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetKey":[{"/":"bafkqacx3dag26sfht3qlcdi"}],"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}}
|
||||||
`
|
`
|
||||||
var out2 ActorEvent
|
var out2 ActorEvent
|
||||||
err = json.Unmarshal([]byte(s), &out2)
|
err = json.Unmarshal([]byte(s), &out2)
|
||||||
@ -77,9 +78,9 @@ func TestActorEventBlockJson(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestSubActorEventFilterJson(t *testing.T) {
|
func TestSubActorEventFilterJson(t *testing.T) {
|
||||||
c := randomCid(t, pseudo.New(pseudo.NewSource(0)))
|
tsk := NewTipSetKey(randomCid(t, pseudo.New(pseudo.NewSource(0))))
|
||||||
from := "earliest"
|
from := abi.ChainEpoch(0)
|
||||||
to := "latest"
|
to := abi.ChainEpoch(100)
|
||||||
f := ActorEventFilter{
|
f := ActorEventFilter{
|
||||||
Addresses: []address.Address{
|
Addresses: []address.Address{
|
||||||
randomF4Addr(t, pseudo.New(pseudo.NewSource(0))),
|
randomF4Addr(t, pseudo.New(pseudo.NewSource(0))),
|
||||||
@ -99,16 +100,17 @@ func TestSubActorEventFilterJson(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
FromEpoch: from,
|
FromHeight: &from,
|
||||||
ToEpoch: to,
|
ToHeight: &to,
|
||||||
TipSetCid: &c,
|
TipSetKey: &tsk,
|
||||||
}
|
}
|
||||||
|
|
||||||
bz, err := json.Marshal(f)
|
bz, err := json.Marshal(f)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotEmpty(t, bz)
|
require.NotEmpty(t, bz)
|
||||||
|
t.Logf("%s", bz)
|
||||||
|
|
||||||
s := `{"addresses":["f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromEpoch":"earliest","toEpoch":"latest","tipsetCid":{"/":"bafkqacqbst64f6rp7taeduy"}}`
|
s := `{"addresses":["f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromHeight":0,"toHeight":100,"tipsetKey":[{"/":"bafkqacqbst64f6rp7taeduy"}]}`
|
||||||
var out ActorEventFilter
|
var out ActorEventFilter
|
||||||
err = json.Unmarshal([]byte(s), &out)
|
err = json.Unmarshal([]byte(s), &out)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -3411,8 +3411,8 @@ Inputs:
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"fromEpoch": "earliest",
|
"fromHeight": 1010,
|
||||||
"toEpoch": "latest"
|
"toHeight": 1020
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
```
|
```
|
||||||
@ -3432,9 +3432,14 @@ Response:
|
|||||||
"emitter": "f01234",
|
"emitter": "f01234",
|
||||||
"reverted": true,
|
"reverted": true,
|
||||||
"height": 10101,
|
"height": 10101,
|
||||||
"tipsetCid": {
|
"tipsetKey": [
|
||||||
|
{
|
||||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
|
||||||
|
}
|
||||||
|
],
|
||||||
"msgCid": {
|
"msgCid": {
|
||||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||||
}
|
}
|
||||||
@ -8836,7 +8841,6 @@ Inputs:
|
|||||||
```json
|
```json
|
||||||
[
|
[
|
||||||
{
|
{
|
||||||
"filter": {
|
|
||||||
"addresses": [
|
"addresses": [
|
||||||
"f01234"
|
"f01234"
|
||||||
],
|
],
|
||||||
@ -8848,10 +8852,8 @@ Inputs:
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"fromEpoch": "earliest",
|
"fromHeight": 1010,
|
||||||
"toEpoch": "latest"
|
"toHeight": 1020
|
||||||
},
|
|
||||||
"prefill": true
|
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
```
|
```
|
||||||
@ -8870,9 +8872,14 @@ Response:
|
|||||||
"emitter": "f01234",
|
"emitter": "f01234",
|
||||||
"reverted": true,
|
"reverted": true,
|
||||||
"height": 10101,
|
"height": 10101,
|
||||||
"tipsetCid": {
|
"tipsetKey": [
|
||||||
|
{
|
||||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve"
|
||||||
|
}
|
||||||
|
],
|
||||||
"msgCid": {
|
"msgCid": {
|
||||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||||
}
|
}
|
||||||
|
@ -330,13 +330,6 @@
|
|||||||
# env var: LOTUS_FEVM_ENABLEETHRPC
|
# env var: LOTUS_FEVM_ENABLEETHRPC
|
||||||
#EnableEthRPC = false
|
#EnableEthRPC = false
|
||||||
|
|
||||||
# EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
|
|
||||||
# This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
|
|
||||||
#
|
|
||||||
# type: bool
|
|
||||||
# env var: LOTUS_FEVM_ENABLEACTOREVENTSAPI
|
|
||||||
#EnableActorEventsAPI = false
|
|
||||||
|
|
||||||
# EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
|
# EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
|
||||||
# Set to 0 to keep all mappings
|
# Set to 0 to keep all mappings
|
||||||
#
|
#
|
||||||
@ -396,6 +389,17 @@
|
|||||||
#DatabasePath = ""
|
#DatabasePath = ""
|
||||||
|
|
||||||
|
|
||||||
|
[ActorEvents]
|
||||||
|
# EnableActorEventsAPI enables the Actor events API that enables clients to consume events
|
||||||
|
# emitted by (smart contracts + built-in Actors).
|
||||||
|
# This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be
|
||||||
|
# disabled by setting their respective Disable* options in Fevm.Events.
|
||||||
|
#
|
||||||
|
# type: bool
|
||||||
|
# env var: LOTUS_ACTOREVENTS_ENABLEACTOREVENTSAPI
|
||||||
|
#EnableActorEventsAPI = false
|
||||||
|
|
||||||
|
|
||||||
[Index]
|
[Index]
|
||||||
# EXPERIMENTAL FEATURE. USE WITH CAUTION
|
# EXPERIMENTAL FEATURE. USE WITH CAUTION
|
||||||
# EnableMsgIndex enables indexing of messages on chain.
|
# EnableMsgIndex enables indexing of messages on chain.
|
||||||
|
@ -148,7 +148,7 @@ type TargetAPI interface {
|
|||||||
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
|
EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error)
|
||||||
|
|
||||||
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||||
SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error)
|
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ TargetAPI = *new(api.FullNode) // gateway depends on latest
|
var _ TargetAPI = *new(api.FullNode) // gateway depends on latest
|
||||||
|
@ -444,7 +444,7 @@ func (gw *Node) GetActorEvents(ctx context.Context, filter *types.ActorEventFilt
|
|||||||
return gw.target.GetActorEvents(ctx, filter)
|
return gw.target.GetActorEvents(ctx, filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gw *Node) SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
func (gw *Node) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
|
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -151,25 +151,19 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
|||||||
/* --- Setup subscription channels for ActorEvents --- */
|
/* --- Setup subscription channels for ActorEvents --- */
|
||||||
|
|
||||||
// subscribe only to miner's actor events
|
// subscribe only to miner's actor events
|
||||||
minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{
|
minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||||
Filter: types.ActorEventFilter{
|
|
||||||
Addresses: []address.Address{miner.ActorAddr},
|
Addresses: []address.Address{miner.ActorAddr},
|
||||||
},
|
|
||||||
Prefill: true,
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// subscribe only to sector-activated events
|
// subscribe only to sector-activated events
|
||||||
sectorActivatedCbor := stringToEventKey(t, "sector-activated")
|
sectorActivatedCbor := stringToEventKey(t, "sector-activated")
|
||||||
sectorActivatedEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{
|
sectorActivatedEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||||
Filter: types.ActorEventFilter{
|
|
||||||
Fields: map[string][]types.ActorEventBlock{
|
Fields: map[string][]types.ActorEventBlock{
|
||||||
"$type": {
|
"$type": {
|
||||||
{Codec: 0x51, Value: sectorActivatedCbor},
|
{Codec: 0x51, Value: sectorActivatedCbor},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
|
||||||
Prefill: true,
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -303,6 +297,13 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
|||||||
head, err := client.ChainHead(ctx)
|
head, err := client.ChainHead(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// subscribe to actor events up until the current head
|
||||||
|
initialEventsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||||
|
FromHeight: epochPtr(0),
|
||||||
|
ToHeight: epochPtr(int64(head.Height())),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
so, err := miner.SectorAddPieceToAny(ctx, pieceSize, bytes.NewReader(pieceData), piece.PieceDealInfo{
|
so, err := miner.SectorAddPieceToAny(ctx, pieceSize, bytes.NewReader(pieceData), piece.PieceDealInfo{
|
||||||
PublishCid: nil,
|
PublishCid: nil,
|
||||||
DealID: 0,
|
DealID: 0,
|
||||||
@ -395,8 +396,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
|||||||
|
|
||||||
// construct ActorEvents from GetActorEvents API
|
// construct ActorEvents from GetActorEvents API
|
||||||
allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
|
allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
|
||||||
FromEpoch: "earliest",
|
FromHeight: epochPtr(0),
|
||||||
ToEpoch: "latest",
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
fmt.Println("Events from GetActorEvents:")
|
fmt.Println("Events from GetActorEvents:")
|
||||||
@ -414,7 +414,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
var allMinerEvts []*types.ActorEvent
|
var allMinerEvts []*types.ActorEvent
|
||||||
for _, evt := range eventsFromMessages {
|
for _, evt := range eventsFromMessages {
|
||||||
if evt.EmitterAddr == miner.ActorAddr {
|
if evt.Emitter == miner.ActorAddr {
|
||||||
allMinerEvts = append(allMinerEvts, evt)
|
allMinerEvts = append(allMinerEvts, evt)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -423,7 +423,7 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
|||||||
|
|
||||||
// construct ActorEvents from subscription channel for just the sector-activated events
|
// construct ActorEvents from subscription channel for just the sector-activated events
|
||||||
var prefillSectorActivatedEvts []*types.ActorEvent
|
var prefillSectorActivatedEvts []*types.ActorEvent
|
||||||
for evt := range sectorActivatedEvtsCh {
|
for evt := range sectorActivatedEvtsChan {
|
||||||
prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, evt)
|
prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, evt)
|
||||||
if len(prefillSectorActivatedEvts) == 2 {
|
if len(prefillSectorActivatedEvts) == 2 {
|
||||||
break
|
break
|
||||||
@ -442,17 +442,25 @@ func TestOnboardRawPieceVerified_WithActorEvents(t *testing.T) {
|
|||||||
// compare events from messages and receipts with events from subscription channel
|
// compare events from messages and receipts with events from subscription channel
|
||||||
require.Equal(t, sectorActivatedEvts, prefillSectorActivatedEvts)
|
require.Equal(t, sectorActivatedEvts, prefillSectorActivatedEvts)
|
||||||
|
|
||||||
|
// check that our `ToHeight` filter works as expected
|
||||||
|
var initialEvents []*types.ActorEvent
|
||||||
|
// TODO: this should probably close the channel when it knows it's done
|
||||||
|
for evt := range initialEventsChan {
|
||||||
|
initialEvents = append(initialEvents, evt)
|
||||||
|
// sector-precommitted, sector-activated, verifier-balance, verifier-balance
|
||||||
|
if len(initialEvents) == 4 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.Equal(t, eventsFromMessages[0:4], initialEvents)
|
||||||
|
|
||||||
// construct ActorEvents from subscription channel for all actor events
|
// construct ActorEvents from subscription channel for all actor events
|
||||||
allEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{
|
allEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.ActorEventFilter{
|
||||||
Filter: types.ActorEventFilter{
|
FromHeight: epochPtr(0),
|
||||||
FromEpoch: "earliest",
|
|
||||||
ToEpoch: "latest",
|
|
||||||
},
|
|
||||||
Prefill: true,
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
var prefillEvts []*types.ActorEvent
|
var prefillEvts []*types.ActorEvent
|
||||||
for evt := range allEvtsCh {
|
for evt := range allEvtsChan {
|
||||||
prefillEvts = append(prefillEvts, evt)
|
prefillEvts = append(prefillEvts, evt)
|
||||||
if len(prefillEvts) == len(eventsFromMessages) {
|
if len(prefillEvts) == len(eventsFromMessages) {
|
||||||
break
|
break
|
||||||
@ -524,15 +532,13 @@ func buildActorEventsFromMessages(ctx context.Context, t *testing.T, node v1api.
|
|||||||
// for each event
|
// for each event
|
||||||
addr, err := address.NewIDAddress(uint64(evt.Emitter))
|
addr, err := address.NewIDAddress(uint64(evt.Emitter))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
tsCid, err := ts.Key().Cid()
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
actorEvents = append(actorEvents, &types.ActorEvent{
|
actorEvents = append(actorEvents, &types.ActorEvent{
|
||||||
Entries: evt.Entries,
|
Entries: evt.Entries,
|
||||||
EmitterAddr: addr,
|
Emitter: addr,
|
||||||
Reverted: false,
|
Reverted: false,
|
||||||
Height: ts.Height(),
|
Height: ts.Height(),
|
||||||
TipSetCid: tsCid,
|
TipSetKey: ts.Key(),
|
||||||
MsgCid: m.Cid,
|
MsgCid: m.Cid,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -547,7 +553,7 @@ func printEvents(ctx context.Context, t *testing.T, node v1api.FullNode, events
|
|||||||
entryStrings := []string{
|
entryStrings := []string{
|
||||||
fmt.Sprintf("height=%d", event.Height),
|
fmt.Sprintf("height=%d", event.Height),
|
||||||
fmt.Sprintf("msg=%s", event.MsgCid),
|
fmt.Sprintf("msg=%s", event.MsgCid),
|
||||||
fmt.Sprintf("emitter=%s", event.EmitterAddr),
|
fmt.Sprintf("emitter=%s", event.Emitter),
|
||||||
fmt.Sprintf("reverted=%t", event.Reverted),
|
fmt.Sprintf("reverted=%t", event.Reverted),
|
||||||
}
|
}
|
||||||
for _, e := range event.Entries {
|
for _, e := range event.Entries {
|
||||||
@ -848,3 +854,8 @@ func TestOnboardRawPieceSnap(t *testing.T) {
|
|||||||
|
|
||||||
miner.WaitSectorsProving(ctx, toCheck)
|
miner.WaitSectorsProving(ctx, toCheck)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func epochPtr(ei int64) *abi.ChainEpoch {
|
||||||
|
ep := abi.ChainEpoch(ei)
|
||||||
|
return &ep
|
||||||
|
}
|
||||||
|
@ -65,8 +65,8 @@ var DefaultNodeOpts = nodeOpts{
|
|||||||
// test defaults
|
// test defaults
|
||||||
|
|
||||||
cfg.Fevm.EnableEthRPC = true
|
cfg.Fevm.EnableEthRPC = true
|
||||||
cfg.Fevm.EnableActorEventsAPI = true
|
|
||||||
cfg.Fevm.Events.MaxFilterHeightRange = math.MaxInt64
|
cfg.Fevm.Events.MaxFilterHeightRange = math.MaxInt64
|
||||||
|
cfg.ActorEvents.EnableActorEventsAPI = true
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -281,10 +281,10 @@ func ConfigFullNode(c interface{}) Option {
|
|||||||
),
|
),
|
||||||
|
|
||||||
ApplyIf(isFullNode,
|
ApplyIf(isFullNode,
|
||||||
If(cfg.Fevm.EnableActorEventsAPI,
|
If(cfg.ActorEvents.EnableActorEventsAPI,
|
||||||
Override(new(full.ActorEventAPI), modules.ActorEventHandler(cfg.Fevm)),
|
Override(new(full.ActorEventAPI), modules.ActorEventHandler(cfg.ActorEvents.EnableActorEventsAPI, cfg.Fevm)),
|
||||||
),
|
),
|
||||||
If(!cfg.Fevm.EnableActorEventsAPI,
|
If(!cfg.ActorEvents.EnableActorEventsAPI,
|
||||||
Override(new(full.ActorEventAPI), &full.ActorEventDummy{}),
|
Override(new(full.ActorEventAPI), &full.ActorEventDummy{}),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
@ -109,7 +109,6 @@ func DefaultFullNode() *FullNode {
|
|||||||
Cluster: *DefaultUserRaftConfig(),
|
Cluster: *DefaultUserRaftConfig(),
|
||||||
Fevm: FevmConfig{
|
Fevm: FevmConfig{
|
||||||
EnableEthRPC: false,
|
EnableEthRPC: false,
|
||||||
EnableActorEventsAPI: false,
|
|
||||||
EthTxHashMappingLifetimeDays: 0,
|
EthTxHashMappingLifetimeDays: 0,
|
||||||
Events: Events{
|
Events: Events{
|
||||||
DisableRealTimeFilterAPI: false,
|
DisableRealTimeFilterAPI: false,
|
||||||
@ -120,6 +119,9 @@ func DefaultFullNode() *FullNode {
|
|||||||
MaxFilterHeightRange: 2880, // conservative limit of one day
|
MaxFilterHeightRange: 2880, // conservative limit of one day
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
ActorEvents: ActorEventsConfig{
|
||||||
|
EnableActorEventsAPI: false,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,17 @@ var Doc = map[string][]DocField{
|
|||||||
Comment: ``,
|
Comment: ``,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"ActorEventsConfig": {
|
||||||
|
{
|
||||||
|
Name: "EnableActorEventsAPI",
|
||||||
|
Type: "bool",
|
||||||
|
|
||||||
|
Comment: `EnableActorEventsAPI enables the Actor events API that enables clients to consume events
|
||||||
|
emitted by (smart contracts + built-in Actors).
|
||||||
|
This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be
|
||||||
|
disabled by setting their respective Disable* options in Fevm.Events.`,
|
||||||
|
},
|
||||||
|
},
|
||||||
"ApisConfig": {
|
"ApisConfig": {
|
||||||
{
|
{
|
||||||
Name: "ChainApiInfo",
|
Name: "ChainApiInfo",
|
||||||
@ -452,13 +463,6 @@ rewards. This address should have adequate funds to cover gas fees.`,
|
|||||||
Type: "bool",
|
Type: "bool",
|
||||||
|
|
||||||
Comment: `EnableEthRPC enables eth_ rpc, and enables storing a mapping of eth transaction hashes to filecoin message Cids.
|
Comment: `EnableEthRPC enables eth_ rpc, and enables storing a mapping of eth transaction hashes to filecoin message Cids.
|
||||||
This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "EnableActorEventsAPI",
|
|
||||||
Type: "bool",
|
|
||||||
|
|
||||||
Comment: `EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
|
|
||||||
This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`,
|
This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -512,6 +516,12 @@ Set to 0 to keep all mappings`,
|
|||||||
|
|
||||||
Comment: ``,
|
Comment: ``,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "ActorEvents",
|
||||||
|
Type: "ActorEventsConfig",
|
||||||
|
|
||||||
|
Comment: ``,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: "Index",
|
Name: "Index",
|
||||||
Type: "IndexConfig",
|
Type: "IndexConfig",
|
||||||
|
@ -28,6 +28,7 @@ type FullNode struct {
|
|||||||
Chainstore Chainstore
|
Chainstore Chainstore
|
||||||
Cluster UserRaftConfig
|
Cluster UserRaftConfig
|
||||||
Fevm FevmConfig
|
Fevm FevmConfig
|
||||||
|
ActorEvents ActorEventsConfig
|
||||||
Index IndexConfig
|
Index IndexConfig
|
||||||
FaultReporter FaultReporterConfig
|
FaultReporter FaultReporterConfig
|
||||||
}
|
}
|
||||||
@ -786,10 +787,6 @@ type FevmConfig struct {
|
|||||||
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
|
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
|
||||||
EnableEthRPC bool
|
EnableEthRPC bool
|
||||||
|
|
||||||
// EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
|
|
||||||
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
|
|
||||||
EnableActorEventsAPI bool
|
|
||||||
|
|
||||||
// EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
|
// EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
|
||||||
// Set to 0 to keep all mappings
|
// Set to 0 to keep all mappings
|
||||||
EthTxHashMappingLifetimeDays int
|
EthTxHashMappingLifetimeDays int
|
||||||
@ -833,6 +830,14 @@ type Events struct {
|
|||||||
// Set upper bound on index size
|
// Set upper bound on index size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ActorEventsConfig struct {
|
||||||
|
// EnableActorEventsAPI enables the Actor events API that enables clients to consume events
|
||||||
|
// emitted by (smart contracts + built-in Actors).
|
||||||
|
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be
|
||||||
|
// disabled by setting their respective Disable* options in Fevm.Events.
|
||||||
|
EnableActorEventsAPI bool
|
||||||
|
}
|
||||||
|
|
||||||
type IndexConfig struct {
|
type IndexConfig struct {
|
||||||
// EXPERIMENTAL FEATURE. USE WITH CAUTION
|
// EXPERIMENTAL FEATURE. USE WITH CAUTION
|
||||||
// EnableMsgIndex enables indexing of messages on chain.
|
// EnableMsgIndex enables indexing of messages on chain.
|
||||||
@ -856,6 +861,7 @@ type HarmonyDB struct {
|
|||||||
// The port to find Yugabyte. Blank for default.
|
// The port to find Yugabyte. Blank for default.
|
||||||
Port string
|
Port string
|
||||||
}
|
}
|
||||||
|
|
||||||
type FaultReporterConfig struct {
|
type FaultReporterConfig struct {
|
||||||
// EnableConsensusFaultReporter controls whether the node will monitor and
|
// EnableConsensusFaultReporter controls whether the node will monitor and
|
||||||
// report consensus faults. When enabled, the node will watch for malicious
|
// report consensus faults. When enabled, the node will watch for malicious
|
||||||
|
112
node/impl/full/actor_event_test.go
Normal file
112
node/impl/full/actor_event_test.go
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
package full
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseHeightRange(t *testing.T) {
|
||||||
|
epochPtr := func(i int) *abi.ChainEpoch {
|
||||||
|
e := abi.ChainEpoch(i)
|
||||||
|
return &e
|
||||||
|
}
|
||||||
|
|
||||||
|
tcs := map[string]struct {
|
||||||
|
heaviest abi.ChainEpoch
|
||||||
|
from *abi.ChainEpoch
|
||||||
|
to *abi.ChainEpoch
|
||||||
|
maxRange abi.ChainEpoch
|
||||||
|
minOut abi.ChainEpoch
|
||||||
|
maxOut abi.ChainEpoch
|
||||||
|
errStr string
|
||||||
|
}{
|
||||||
|
"fails when both are specified and range is greater than max allowed range": {
|
||||||
|
heaviest: 100,
|
||||||
|
from: epochPtr(256),
|
||||||
|
to: epochPtr(512),
|
||||||
|
maxRange: 10,
|
||||||
|
minOut: 0,
|
||||||
|
maxOut: 0,
|
||||||
|
errStr: "too large",
|
||||||
|
},
|
||||||
|
"fails when min is specified and range is greater than max allowed range": {
|
||||||
|
heaviest: 500,
|
||||||
|
from: epochPtr(16),
|
||||||
|
to: nil,
|
||||||
|
maxRange: 10,
|
||||||
|
minOut: 0,
|
||||||
|
maxOut: 0,
|
||||||
|
errStr: "'from' height is too far in the past",
|
||||||
|
},
|
||||||
|
"fails when max is specified and range is greater than max allowed range": {
|
||||||
|
heaviest: 500,
|
||||||
|
from: nil,
|
||||||
|
to: epochPtr(65536),
|
||||||
|
maxRange: 10,
|
||||||
|
minOut: 0,
|
||||||
|
maxOut: 0,
|
||||||
|
errStr: "'to' height is too far in the future",
|
||||||
|
},
|
||||||
|
"fails when from is greater than to": {
|
||||||
|
heaviest: 100,
|
||||||
|
from: epochPtr(512),
|
||||||
|
to: epochPtr(256),
|
||||||
|
maxRange: 10,
|
||||||
|
minOut: 0,
|
||||||
|
maxOut: 0,
|
||||||
|
errStr: "must be after",
|
||||||
|
},
|
||||||
|
"works when range is valid (nil from)": {
|
||||||
|
heaviest: 500,
|
||||||
|
from: nil,
|
||||||
|
to: epochPtr(48),
|
||||||
|
maxRange: 1000,
|
||||||
|
minOut: -1,
|
||||||
|
maxOut: 48,
|
||||||
|
},
|
||||||
|
"works when range is valid (nil to)": {
|
||||||
|
heaviest: 500,
|
||||||
|
from: epochPtr(0),
|
||||||
|
to: nil,
|
||||||
|
maxRange: 1000,
|
||||||
|
minOut: 0,
|
||||||
|
maxOut: -1,
|
||||||
|
},
|
||||||
|
"works when range is valid (nil from and to)": {
|
||||||
|
heaviest: 500,
|
||||||
|
from: nil,
|
||||||
|
to: nil,
|
||||||
|
maxRange: 1000,
|
||||||
|
minOut: -1,
|
||||||
|
maxOut: -1,
|
||||||
|
},
|
||||||
|
"works when range is valid and specified": {
|
||||||
|
heaviest: 500,
|
||||||
|
from: epochPtr(16),
|
||||||
|
to: epochPtr(48),
|
||||||
|
maxRange: 1000,
|
||||||
|
minOut: 16,
|
||||||
|
maxOut: 48,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range tcs {
|
||||||
|
tc2 := tc
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
min, max, err := parseHeightRange(tc2.heaviest, tc2.from, tc2.to, tc2.maxRange)
|
||||||
|
require.Equal(t, tc2.minOut, min)
|
||||||
|
require.Equal(t, tc2.maxOut, max)
|
||||||
|
if tc2.errStr != "" {
|
||||||
|
fmt.Println(err)
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Contains(t, err.Error(), tc2.errStr)
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -3,7 +3,6 @@ package full
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
@ -18,7 +17,7 @@ import (
|
|||||||
|
|
||||||
type ActorEventAPI interface {
|
type ActorEventAPI interface {
|
||||||
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error)
|
||||||
SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error)
|
SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -39,18 +38,26 @@ type ActorEventsAPI struct {
|
|||||||
ActorEventAPI
|
ActorEventAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActorEventHandler) GetActorEvents(ctx context.Context, filter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
func (a *ActorEventHandler) GetActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) ([]*types.ActorEvent, error) {
|
||||||
if a.EventFilterManager == nil {
|
if a.EventFilterManager == nil {
|
||||||
return nil, api.ErrNotSupported
|
return nil, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
params, err := a.parseFilter(filter)
|
if evtFilter == nil {
|
||||||
|
evtFilter = &types.ActorEventFilter{}
|
||||||
|
}
|
||||||
|
params, err := a.parseFilter(*evtFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a temporary filter
|
// Install a filter just for this call, collect events, remove the filter
|
||||||
f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, filter.Addresses, filter.Fields, false)
|
|
||||||
|
tipSetCid, err := params.GetTipSetCid()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
|
||||||
|
}
|
||||||
|
f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -63,33 +70,34 @@ func (a *ActorEventHandler) GetActorEvents(ctx context.Context, filter *types.Ac
|
|||||||
type filterParams struct {
|
type filterParams struct {
|
||||||
MinHeight abi.ChainEpoch
|
MinHeight abi.ChainEpoch
|
||||||
MaxHeight abi.ChainEpoch
|
MaxHeight abi.ChainEpoch
|
||||||
TipSetCid cid.Cid
|
TipSetKey types.TipSetKey
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActorEventHandler) parseFilter(f *types.ActorEventFilter) (*filterParams, error) {
|
func (fp filterParams) GetTipSetCid() (cid.Cid, error) {
|
||||||
if f.TipSetCid != nil {
|
if fp.TipSetKey.IsEmpty() {
|
||||||
if len(f.FromEpoch) != 0 || len(f.ToEpoch) != 0 {
|
return cid.Undef, nil
|
||||||
return nil, fmt.Errorf("cannot specify both TipSetCid and FromEpoch/ToEpoch")
|
}
|
||||||
|
return fp.TipSetKey.Cid()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *ActorEventHandler) parseFilter(f types.ActorEventFilter) (*filterParams, error) {
|
||||||
|
if f.TipSetKey != nil && !f.TipSetKey.IsEmpty() {
|
||||||
|
if f.FromHeight != nil || f.ToHeight != nil {
|
||||||
|
return nil, fmt.Errorf("cannot specify both TipSetKey and FromHeight/ToHeight")
|
||||||
|
}
|
||||||
|
|
||||||
|
tsk := types.EmptyTSK
|
||||||
|
if f.TipSetKey != nil {
|
||||||
|
tsk = *f.TipSetKey
|
||||||
|
}
|
||||||
return &filterParams{
|
return &filterParams{
|
||||||
MinHeight: 0,
|
MinHeight: 0,
|
||||||
MaxHeight: 0,
|
MaxHeight: 0,
|
||||||
TipSetCid: *f.TipSetCid,
|
TipSetKey: tsk,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
from := f.FromEpoch
|
min, max, err := parseHeightRange(a.Chain.GetHeaviestTipSet().Height(), f.FromHeight, f.ToHeight, a.MaxFilterHeightRange)
|
||||||
if len(from) != 0 && from != "latest" && from != "earliest" && !strings.HasPrefix(from, "0x") {
|
|
||||||
from = "0x" + from
|
|
||||||
}
|
|
||||||
|
|
||||||
to := f.ToEpoch
|
|
||||||
if len(to) != 0 && to != "latest" && to != "earliest" && !strings.HasPrefix(to, "0x") {
|
|
||||||
to = "0x" + to
|
|
||||||
}
|
|
||||||
|
|
||||||
min, max, err := parseBlockRange(a.Chain.GetHeaviestTipSet().Height(), &from, &to, a.MaxFilterHeightRange)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -97,21 +105,69 @@ func (a *ActorEventHandler) parseFilter(f *types.ActorEventFilter) (*filterParam
|
|||||||
return &filterParams{
|
return &filterParams{
|
||||||
MinHeight: min,
|
MinHeight: min,
|
||||||
MaxHeight: max,
|
MaxHeight: max,
|
||||||
TipSetCid: cid.Undef,
|
TipSetKey: types.EmptyTSK,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
// parseHeightRange is similar to eth's parseBlockRange but with slightly different semantics but
|
||||||
|
// results in equivalent values that we can plug in to the EventFilterManager.
|
||||||
|
//
|
||||||
|
// * Uses "height", allowing for nillable values rather than strings
|
||||||
|
// * No "latest" and "earliest", those are now represented by nil on the way in and -1 on the way out
|
||||||
|
// * No option for hex representation
|
||||||
|
func parseHeightRange(heaviest abi.ChainEpoch, fromHeight, toHeight *abi.ChainEpoch, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) {
|
||||||
|
if fromHeight != nil && *fromHeight < 0 {
|
||||||
|
return 0, 0, fmt.Errorf("range 'from' must be greater than or equal to 0")
|
||||||
|
}
|
||||||
|
if fromHeight == nil {
|
||||||
|
minHeight = -1
|
||||||
|
} else {
|
||||||
|
minHeight = *fromHeight
|
||||||
|
}
|
||||||
|
if toHeight == nil {
|
||||||
|
maxHeight = -1
|
||||||
|
} else {
|
||||||
|
maxHeight = *toHeight
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate height ranges are within limits set by node operator
|
||||||
|
if minHeight == -1 && maxHeight > 0 {
|
||||||
|
// Here the client is looking for events between the head and some future height
|
||||||
|
if maxHeight-heaviest > maxRange {
|
||||||
|
return 0, 0, fmt.Errorf("invalid epoch range: 'to' height is too far in the future (maximum: %d)", maxRange)
|
||||||
|
}
|
||||||
|
} else if minHeight >= 0 && maxHeight == -1 {
|
||||||
|
// Here the client is looking for events between some time in the past and the current head
|
||||||
|
if heaviest-minHeight > maxRange {
|
||||||
|
return 0, 0, fmt.Errorf("invalid epoch range: 'from' height is too far in the past (maximum: %d)", maxRange)
|
||||||
|
}
|
||||||
|
} else if minHeight >= 0 && maxHeight >= 0 {
|
||||||
|
if minHeight > maxHeight {
|
||||||
|
return 0, 0, fmt.Errorf("invalid epoch range: 'to' height (%d) must be after 'from' height (%d)", minHeight, maxHeight)
|
||||||
|
} else if maxHeight-minHeight > maxRange {
|
||||||
|
return 0, 0, fmt.Errorf("invalid epoch range: range between to and 'from' heights is too large (maximum: %d)", maxRange)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return minHeight, maxHeight, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, evtFilter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
if a.EventFilterManager == nil {
|
if a.EventFilterManager == nil {
|
||||||
return nil, api.ErrNotSupported
|
return nil, api.ErrNotSupported
|
||||||
}
|
}
|
||||||
|
if evtFilter == nil {
|
||||||
params, err := a.parseFilter(&f.Filter)
|
evtFilter = &types.ActorEventFilter{}
|
||||||
|
}
|
||||||
|
params, err := a.parseFilter(*evtFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, f.Filter.Addresses, f.Filter.Fields, false)
|
tipSetCid, err := params.GetTipSetCid()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
|
||||||
|
}
|
||||||
|
fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, tipSetCid, evtFilter.Addresses, evtFilter.Fields, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -128,7 +184,6 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.S
|
|||||||
_ = a.EventFilterManager.Remove(ctx, fm.ID())
|
_ = a.EventFilterManager.Remove(ctx, fm.ID())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if f.Prefill {
|
|
||||||
evs, err := getCollected(ctx, fm)
|
evs, err := getCollected(ctx, fm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to get collected events: %w", err)
|
log.Errorf("failed to get collected events: %w", err)
|
||||||
@ -142,16 +197,16 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.S
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
|
// TODO: need to fix this, buffer of 25 isn't going to work for prefill without a _really_ fast client or a small number of events
|
||||||
log.Errorf("closing event subscription due to slow reader")
|
log.Errorf("closing event subscription due to slow reader")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
in := make(chan interface{}, 256)
|
in := make(chan interface{}, 256)
|
||||||
fm.SetSubChannel(in)
|
fm.SetSubChannel(in)
|
||||||
|
|
||||||
for {
|
for ctx.Err() == nil {
|
||||||
select {
|
select {
|
||||||
case val, ok := <-in:
|
case val, ok := <-in:
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -164,24 +219,19 @@ func (a *ActorEventHandler) SubscribeActorEvents(ctx context.Context, f *types.S
|
|||||||
log.Errorf("got unexpected value from event filter: %T", val)
|
log.Errorf("got unexpected value from event filter: %T", val)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c, err := ce.TipSetKey.Cid()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to get tipset cid: %w", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ev := &types.ActorEvent{
|
ev := &types.ActorEvent{
|
||||||
Entries: ce.Entries,
|
Entries: ce.Entries,
|
||||||
EmitterAddr: ce.EmitterAddr,
|
Emitter: ce.EmitterAddr,
|
||||||
Reverted: ce.Reverted,
|
Reverted: ce.Reverted,
|
||||||
Height: ce.Height,
|
Height: ce.Height,
|
||||||
TipSetCid: c,
|
TipSetKey: ce.TipSetKey,
|
||||||
MsgCid: ce.MsgCid,
|
MsgCid: ce.MsgCid,
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case out <- ev:
|
case out <- ev:
|
||||||
default:
|
default: // TODO: need to fix this to be more intelligent about the consumption rate vs the accumulation rate
|
||||||
log.Errorf("closing event subscription due to slow reader")
|
log.Errorf("closing event subscription due to slow reader")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -204,22 +254,14 @@ func getCollected(ctx context.Context, f *filter.EventFilter) ([]*types.ActorEve
|
|||||||
var out []*types.ActorEvent
|
var out []*types.ActorEvent
|
||||||
|
|
||||||
for _, e := range ces {
|
for _, e := range ces {
|
||||||
e := e
|
out = append(out, &types.ActorEvent{
|
||||||
c, err := e.TipSetKey.Cid()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to get tipset cid: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ev := &types.ActorEvent{
|
|
||||||
Entries: e.Entries,
|
Entries: e.Entries,
|
||||||
EmitterAddr: e.EmitterAddr,
|
Emitter: e.EmitterAddr,
|
||||||
Reverted: e.Reverted,
|
Reverted: e.Reverted,
|
||||||
Height: e.Height,
|
Height: e.Height,
|
||||||
TipSetCid: c,
|
TipSetKey: e.TipSetKey,
|
||||||
MsgCid: e.MsgCid,
|
MsgCid: e.MsgCid,
|
||||||
}
|
})
|
||||||
|
|
||||||
out = append(out, ev)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return out, nil
|
return out, nil
|
||||||
|
@ -198,7 +198,7 @@ func (a *ActorEventDummy) GetActorEvents(ctx context.Context, filter *types.Acto
|
|||||||
return nil, ErrActorEventModuleDisabled
|
return nil, ErrActorEventModuleDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *ActorEventDummy) SubscribeActorEvents(ctx context.Context, filter *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
|
func (a *ActorEventDummy) SubscribeActorEvents(ctx context.Context, filter *types.ActorEventFilter) (<-chan *types.ActorEvent, error) {
|
||||||
return nil, ErrActorEventModuleDisabled
|
return nil, ErrActorEventModuleDisabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1264,6 +1264,11 @@ func (e *EthEventHandler) EthGetFilterLogs(ctx context.Context, id ethtypes.EthF
|
|||||||
return nil, xerrors.Errorf("wrong filter type")
|
return nil, xerrors.Errorf("wrong filter type")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// parseBlockRange is similar to actor event's parseHeightRange but with slightly different semantics
|
||||||
|
//
|
||||||
|
// * "block" instead of "height"
|
||||||
|
// * strings that can have "latest" and "earliest" and nil
|
||||||
|
// * hex strings for actual heights
|
||||||
func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) {
|
func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) {
|
||||||
if fromBlock == nil || *fromBlock == "latest" || len(*fromBlock) == 0 {
|
if fromBlock == nil || *fromBlock == "latest" || len(*fromBlock) == 0 {
|
||||||
minHeight = heaviest
|
minHeight = heaviest
|
||||||
|
@ -164,14 +164,14 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ActorEventHandler(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) {
|
func ActorEventHandler(enable bool, fevmCfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *filter.EventFilterManager, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI) (*full.ActorEventHandler, error) {
|
||||||
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEventHandler, error) {
|
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEventHandler, error) {
|
||||||
ee := &full.ActorEventHandler{
|
ee := &full.ActorEventHandler{
|
||||||
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
|
MaxFilterHeightRange: abi.ChainEpoch(fevmCfg.Events.MaxFilterHeightRange),
|
||||||
Chain: cs,
|
Chain: cs,
|
||||||
}
|
}
|
||||||
|
|
||||||
if !cfg.EnableActorEventsAPI || cfg.Events.DisableRealTimeFilterAPI {
|
if !enable || fevmCfg.Events.DisableRealTimeFilterAPI {
|
||||||
// all Actor events functionality is disabled
|
// all Actor events functionality is disabled
|
||||||
return ee, nil
|
return ee, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user