Tests for builtin actor events API

This commit is contained in:
Aarsh Shah 2024-02-07 12:17:46 +04:00 committed by Rod Vagg
parent cef19d5043
commit 01ac45c90a
19 changed files with 519 additions and 247 deletions

View File

@ -417,7 +417,7 @@ func init() {
addExample(&types.ActorEventBlock{
Codec: 0x51,
Value: []byte("data"),
Value: []byte("ddata"),
})
addExample(&types.ActorEventFilter{
@ -426,12 +426,12 @@ func init() {
"abc": {
{
Codec: 0x51,
Value: []byte("data"),
Value: []byte("ddata"),
},
},
},
MinEpoch: 2301220,
MaxEpoch: 2301220,
FromEpoch: "earliest",
ToEpoch: "latest",
})
addExample(&types.SubActorEventFilter{
@ -441,12 +441,12 @@ func init() {
"abc": {
{
Codec: 0x51,
Value: []byte("data"),
Value: []byte("ddata"),
},
},
},
MinEpoch: 2301220,
MaxEpoch: 2301220,
FromEpoch: "earliest",
ToEpoch: "latest",
},
Prefill: true,
})

Binary file not shown.

Binary file not shown.

View File

@ -8,33 +8,46 @@ import (
)
type ActorEventBlock struct {
// what value codec does client want to match on ?
// The value codec to match when filtering event values.
Codec uint64 `json:"codec"`
// data associated with the "event key"
// The value to want to match on associated with the corresponding "event key"
// when filtering events.
// Should be a byte array encoded with the specified codec.
// Assumes base64 encoding when converting to/from JSON strings.
Value []byte `json:"value"`
}
type SubActorEventFilter struct {
Filter ActorEventFilter `json:"filter"`
// If true, all available matching historical events will be written to the response stream
// before any new real-time events that match the given filter are written.
// If `Prefill` is true and `FromEpoch` is set to latest, the pre-fill operation will become a no-op.
// if `Prefill` is false and `FromEpoch` is set to earliest, historical events will still be sent to the client.
Prefill bool `json:"prefill"`
}
type ActorEventFilter struct {
// Matches events from one of these actors, or any actor if empty.
// TODO: Should we also allow Eth addresses here?
// For now, this MUST be a Filecoin address.
Addresses []address.Address `json:"address"`
Addresses []address.Address `json:"addresses"`
// Matches events with the specified key/values, or all events if empty.
// If the `Blocks` slice is empty, matches on the key only.
Fields map[string][]ActorEventBlock `json:"fields"`
// If the value is an empty slice, the filter will match on the key only, accepting any value.
Fields map[string][]ActorEventBlock `json:"fields,omitempty"`
// Epoch based filtering ?
// Start epoch for the filter; -1 means no minimum
MinEpoch abi.ChainEpoch `json:"minEpoch,omitempty"`
// Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first,
// Optional, default: "latest".
FromEpoch string `json:"fromEpoch,omitempty"`
// End epoch for the filter; -1 means no maximum
MaxEpoch abi.ChainEpoch `json:"maxEpoch,omitempty"`
// Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first,
// Optional, default: "latest".
ToEpoch string `json:"toEpoch,omitempty"`
// Restricts events returned to those emitted from messages contained in this tipset.
// If `TipSetCid` is present in the filter criteria, then neither `FromEpoch` nor `ToEpoch` are allowed.
TipSetCid *cid.Cid `json:"tipsetCid,omitempty"`
}
type ActorEvent struct {

View File

@ -0,0 +1,138 @@
package types
import (
"encoding/json"
pseudo "math/rand"
"testing"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
)
func TestActorEventJson(t *testing.T) {
// generate a mock Actor event for me
rng := pseudo.New(pseudo.NewSource(0))
in := ActorEvent{
Entries: []EventEntry{
{
Key: "key1",
Codec: 0x51,
Value: []byte("value1"),
},
{
Key: "key2",
Codec: 0x52,
Value: []byte("value2"),
},
},
EmitterAddr: randomF4Addr(t, rng),
Reverted: false,
Height: 1001,
TipSetKey: randomCid(t, rng),
MsgCid: randomCid(t, rng),
}
bz, err := json.Marshal(in)
require.NoError(t, err)
require.NotEmpty(t, bz)
var out ActorEvent
err = json.Unmarshal(bz, &out)
require.NoError(t, err)
require.Equal(t, in, out)
s := `
{"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetCid":{"/":"bafkqacx3dag26sfht3qlcdi"},"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}}
`
var out2 ActorEvent
err = json.Unmarshal([]byte(s), &out2)
require.NoError(t, err)
require.Equal(t, out, out2)
}
func TestActorEventBlockJson(t *testing.T) {
in := ActorEventBlock{
Codec: 1,
Value: []byte("test"),
}
bz, err := json.Marshal(in)
require.NoError(t, err)
require.NotEmpty(t, bz)
var out ActorEventBlock
err = json.Unmarshal(bz, &out)
require.NoError(t, err)
require.Equal(t, in, out)
var out2 ActorEventBlock
s := "{\"codec\":1,\"value\":\"dGVzdA==\"}"
err = json.Unmarshal([]byte(s), &out2)
require.NoError(t, err)
require.Equal(t, in, out2)
}
func TestSubActorEventFilterJson(t *testing.T) {
c := randomCid(t, pseudo.New(pseudo.NewSource(0)))
from := "earliest"
to := "latest"
f := ActorEventFilter{
Addresses: []address.Address{
randomF4Addr(t, pseudo.New(pseudo.NewSource(0))),
randomF4Addr(t, pseudo.New(pseudo.NewSource(0))),
},
Fields: map[string][]ActorEventBlock{
"key1": {
{
Codec: 0x51,
Value: []byte("value1"),
},
},
"key2": {
{
Codec: 0x52,
Value: []byte("value2"),
},
},
},
FromEpoch: from,
ToEpoch: to,
TipSetCid: &c,
}
bz, err := json.Marshal(f)
require.NoError(t, err)
require.NotEmpty(t, bz)
s := `{"addresses":["f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromEpoch":"earliest","toEpoch":"latest","tipsetCid":{"/":"bafkqacqbst64f6rp7taeduy"}}`
var out ActorEventFilter
err = json.Unmarshal([]byte(s), &out)
require.NoError(t, err)
require.Equal(t, f, out)
}
func randomF4Addr(tb testing.TB, rng *pseudo.Rand) address.Address {
tb.Helper()
addr, err := address.NewDelegatedAddress(builtintypes.EthereumAddressManagerActorID, randomBytes(32, rng))
require.NoError(tb, err)
return addr
}
func randomCid(tb testing.TB, rng *pseudo.Rand) cid.Cid {
tb.Helper()
cb := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY}
c, err := cb.Sum(randomBytes(10, rng))
require.NoError(tb, err)
return c
}
func randomBytes(n int, rng *pseudo.Rand) []byte {
buf := make([]byte, n)
rng.Read(buf)
return buf
}

View File

@ -598,7 +598,7 @@ type EthFilterSpec struct {
Topics EthTopicSpec `json:"topics"`
// Restricts event logs returned to those emitted from messages contained in this tipset.
// If BlockHash is present in in the filter criteria, then neither FromBlock nor ToBlock are allowed.
// If BlockHash is present in the filter criteria, then neither FromBlock nor ToBlock are allowed.
// Added in EIP-234
BlockHash *EthHash `json:"blockHash,omitempty"`
}

View File

@ -28,7 +28,7 @@ type EventEntry struct {
// The event value's codec
Codec uint64
// The event value
// The event value. It is encoded using the codec specified above
Value []byte
}

View File

@ -9,13 +9,11 @@ import (
"strings"
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-varint"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"
@ -109,6 +107,7 @@ var backfillEventsCmd = &cli.Command{
addressLookups := make(map[abi.ActorID]address.Address)
// TODO: We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands
resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
// we only want to match using f4 addresses
idAddr, err := address.NewIDAddress(uint64(emitter))
@ -118,18 +117,9 @@ var backfillEventsCmd = &cli.Command{
actor, err := api.StateGetActor(ctx, idAddr, ts.Key())
if err != nil || actor.Address == nil {
return address.Undef, false
return idAddr, true
}
// if robust address is not f4 then we won't match against it so bail early
if actor.Address.Protocol() != address.Delegated {
return address.Undef, false
}
// we have an f4 address, make sure it's assigned by the EAM
if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
return address.Undef, false
}
return *actor.Address, true
}

View File

@ -3400,19 +3400,19 @@ Inputs:
```json
[
{
"address": [
"addresses": [
"f01234"
],
"fields": {
"abc": [
{
"codec": 81,
"value": "ZGF0YQ=="
"value": "ZGRhdGE="
}
]
},
"minEpoch": 2301220,
"maxEpoch": 2301220
"fromEpoch": "earliest",
"toEpoch": "latest"
}
]
```
@ -8837,19 +8837,19 @@ Inputs:
[
{
"filter": {
"address": [
"addresses": [
"f01234"
],
"fields": {
"abc": [
{
"codec": 81,
"value": "ZGF0YQ=="
"value": "ZGRhdGE="
}
]
},
"minEpoch": 2301220,
"maxEpoch": 2301220
"fromEpoch": "earliest",
"toEpoch": "latest"
},
"prefill": true
}

View File

@ -330,6 +330,9 @@
# env var: LOTUS_FEVM_ENABLEETHRPC
#EnableEthRPC = false
# EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
# This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
#
# type: bool
# env var: LOTUS_FEVM_ENABLEACTOREVENTSAPI
#EnableActorEventsAPI = false
@ -342,9 +345,8 @@
#EthTxHashMappingLifetimeDays = 0
[Fevm.Events]
# EnableEthRPC enables APIs that
# DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted.
# The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag.
# The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag.
#
# type: bool
# env var: LOTUS_FEVM_EVENTS_DISABLEREALTIMEFILTERAPI
@ -352,7 +354,7 @@
# DisableHistoricFilterAPI will disable the HistoricFilterAPI that can create and query filters for actor events
# that occurred in the past. HistoricFilterAPI maintains a queryable index of events.
# The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag.
# The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag.
#
# type: bool
# env var: LOTUS_FEVM_EVENTS_DISABLEHISTORICFILTERAPI

View File

@ -1,94 +0,0 @@
package itests
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
)
func TestGetActorEvents(t *testing.T) {
t.Skip("skipping for now")
//require := require.New(t)
kit.QuietAllLogsExcept("events", "messagepool")
blockTime := 100 * time.Millisecond
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Set up the test fixture with a standard list of invocations
contract1, contract2, invocations := prepareEventMatrixInvocations(ctx, t, client)
fmt.Printf("contract1:%s; contract2:%s\n", contract1, contract2)
cf1, err := contract1.ToFilecoinAddress()
if err != nil {
panic(err)
}
cf2, err := contract2.ToFilecoinAddress()
if err != nil {
panic(err)
}
fmt.Printf("contract1 f4 is:%s; contract2 f4 is:%s\n", cf1.String(), cf2.String())
testCases := getCombinationFilterTestCases(contract1, contract2, "0x0")
messages := invokeAndWaitUntilAllOnChain(t, client, invocations)
// f410fiy2dwcbbvc5c6xwwrhlwgi2dby4rzgamxllpgva
for _, tc := range testCases {
tc := tc // appease the lint despot
t.Run(tc.name, func(t *testing.T) {
res, err := client.EthGetLogs(ctx, tc.spec)
require.NoError(t, err)
/*ch, _ := client.SubscribeActorEvents(ctx, &types.SubActorEventFilter{
Prefill: true,
ActorEventFilter: types.ActorEventFilter{
MinEpoch: 0,
MaxEpoch: 1000,
},
})
for i := range ch {
fmt.Println("Hello Chan", i.Entries[0].Key, i.Entries[0].Codec, i.EmitterAddr.String())
}*/
res2, _ := client.GetActorEvents(ctx, &types.ActorEventFilter{
MinEpoch: 0,
MaxEpoch: -1,
Addresses: []address.Address{cf2},
//EthAddresses: []ethtypes.EthAddress{
// contract1,
//},
})
for _, res := range res2 {
res := res
fmt.Println("Emitter Address is", res.EmitterAddr.String())
for _, entry := range res.Entries {
fmt.Println("Hello", entry.Key, entry.Codec, string(entry.Value))
}
}
fmt.Println("Hello", res2[0].Entries[0].Key, res2[0].Entries[0].Codec, res2[0].EmitterAddr.String())
elogs, err := parseEthLogsFromFilterResult(res)
require.NoError(t, err)
AssertEthLogs(t, elogs, tc.expected, messages)
})
}
}

View File

@ -4,8 +4,10 @@ import (
"bytes"
"context"
"crypto/rand"
"encoding/json"
"fmt"
"os"
"sort"
"strings"
"testing"
"time"
@ -149,22 +151,28 @@ func TestOnboardRawPieceVerified(t *testing.T) {
kit.Account(verifiedClientKey, abi.NewTokenAmount(bal.Int64())),
)
evtChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{
minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{
Filter: types.ActorEventFilter{
MinEpoch: -1,
MaxEpoch: -1,
Addresses: []address.Address{miner.ActorAddr},
},
Prefill: true,
})
require.NoError(t, err)
events := make([]types.ActorEvent, 0)
go func() {
for e := range evtChan {
fmt.Printf("%s Got ActorEvent: %+v", time.Now().Format(time.StampMilli), e)
events = append(events, *e)
}
}()
// only consume and match sector-activated events
sectorActivatedCbor, err := ipld.Encode(basicnode.NewString("sector-activated"), dagcbor.Encode)
require.NoError(t, err)
sectorActivatedEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{
Filter: types.ActorEventFilter{
Fields: map[string][]types.ActorEventBlock{
"$type": {
{Codec: 0x51, Value: sectorActivatedCbor},
},
},
},
Prefill: true,
})
require.NoError(t, err)
ens.InterconnectAll().BeginMiningMustPost(blocktime)
@ -319,26 +327,112 @@ func TestOnboardRawPieceVerified(t *testing.T) {
allocations, err = client.StateGetAllocations(ctx, verifiedClientAddr, types.EmptyTSK)
require.NoError(t, err)
require.Len(t, allocations, 0)
eventsFromMessages := buildActorEventsFromMessages(ctx, t, miner.FullNode)
fmt.Println("eventsFromMessages", eventsFromMessages)
writeEventsToFile(ctx, t, miner.FullNode, eventsFromMessages)
evts, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
MinEpoch: -1,
MaxEpoch: -1,
/* --- Tests for the Actor events API --- */
// Match events from Get API and receipts
allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{
FromEpoch: "earliest",
ToEpoch: "latest",
})
require.NoError(t, err)
for _, evt := range evts {
fmt.Printf("Got ActorEvent: %+v", evt)
matchEvents(t, eventsFromMessages, getEventsArray(allEvtsFromGetAPI))
// match Miner Actor events from subscription channel and Miner Actor events obtained from receipts
var subMinerEvts []types.ActorEvent
for evt := range minerEvtsChan {
subMinerEvts = append(subMinerEvts, *evt)
if len(subMinerEvts) == 4 {
break
}
}
var allMinerEvts []types.ActorEvent
for _, evt := range eventsFromMessages {
if evt.EmitterAddr == miner.ActorAddr {
allMinerEvts = append(allMinerEvts, evt)
}
}
matchEvents(t, allMinerEvts, subMinerEvts)
// Match pre-filled events from sector activated channel and events obtained from receipts
var prefillSectorActivatedEvts []types.ActorEvent
for evt := range sectorActivatedEvtsCh {
prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, *evt)
if len(prefillSectorActivatedEvts) == 2 {
break
}
}
require.Len(t, prefillSectorActivatedEvts, 2)
var sectorActivatedEvts []types.ActorEvent
for _, evt := range eventsFromMessages {
for _, entry := range evt.Entries {
if entry.Key == "$type" && bytes.Equal(entry.Value, sectorActivatedCbor) {
sectorActivatedEvts = append(sectorActivatedEvts, evt)
break
}
}
}
matchEvents(t, sectorActivatedEvts, prefillSectorActivatedEvts)
// Match pre-filled events from subscription channel and events obtained from receipts
allEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{
Filter: types.ActorEventFilter{
FromEpoch: "earliest",
ToEpoch: "latest",
},
Prefill: true,
})
require.NoError(t, err)
var prefillEvts []types.ActorEvent
for evt := range allEvtsCh {
prefillEvts = append(prefillEvts, *evt)
if len(prefillEvts) == len(eventsFromMessages) {
break
}
}
matchEvents(t, eventsFromMessages, prefillEvts)
}
eventsFromMessages := buildActorEventsFromMessages(t, ctx, miner.FullNode)
writeEventsToFile(t, ctx, miner.FullNode, eventsFromMessages)
for _, evt := range evts {
fmt.Printf("Got ActorEvent from messages: %+v", evt)
func getEventsArray(ptr []*types.ActorEvent) []types.ActorEvent {
var evts []types.ActorEvent
for _, evt := range ptr {
evts = append(evts, *evt)
}
return evts
}
// TODO: compare GetActorEvents & SubscribeActorEvents & eventsFromMessages for equality
func matchEvents(t *testing.T, exp []types.ActorEvent, actual []types.ActorEvent) {
// height and tipset cid can mismatch because expected events are sourced using APIs that can put in different tipsets
for i := range exp {
exp[i].Height = 0
exp[i].TipSetKey = cid.Undef
}
for i := range actual {
actual[i].Height = 0
actual[i].TipSetKey = cid.Undef
}
func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api.FullNode) []types.ActorEvent {
require.Equal(t, len(exp), len(actual))
// marshal both arrays to json, sort by json, and compare
bz1, err := json.Marshal(exp)
require.NoError(t, err)
sort.Slice(bz1, func(i, j int) bool {
return bz1[i] <= bz1[j]
})
bz2, err := json.Marshal(actual)
require.NoError(t, err)
sort.Slice(bz2, func(i, j int) bool {
return bz2[i] <= bz2[j]
})
fmt.Println("bz1", string(bz1))
fmt.Println("bz2", string(bz2))
require.True(t, bytes.Equal(bz1, bz2))
}
func buildActorEventsFromMessages(ctx context.Context, t *testing.T, node v1api.FullNode) []types.ActorEvent {
actorEvents := make([]types.ActorEvent, 0)
head, err := node.ChainHead(ctx)
@ -374,7 +468,7 @@ func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api.
Entries: evt.Entries,
EmitterAddr: addr,
Reverted: false,
Height: abi.ChainEpoch(height),
Height: ts.Height(),
TipSetKey: tsCid,
MsgCid: m.Cid,
})
@ -386,7 +480,7 @@ func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api.
return actorEvents
}
func writeEventsToFile(t *testing.T, ctx context.Context, node v1api.FullNode, events []types.ActorEvent) {
func writeEventsToFile(ctx context.Context, t *testing.T, node v1api.FullNode, events []types.ActorEvent) {
file, err := os.Create("block.out")
require.NoError(t, err)
defer func() {
@ -417,11 +511,11 @@ func writeEventsToFile(t *testing.T, ctx context.Context, node v1api.FullNode, e
if e.Key == "$type" && bytes.Equal(e.Value, claimKeyCbor) {
isClaim = true
} else if isClaim && e.Key == "id" {
nd, err := ipld.DecodeUsingPrototype([]byte(e.Value), dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil))
nd, err := ipld.DecodeUsingPrototype(e.Value, dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil))
require.NoError(t, err)
claimId = *bindnode.Unwrap(nd).(*int64)
} else if isClaim && e.Key == "provider" {
nd, err := ipld.DecodeUsingPrototype([]byte(e.Value), dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil))
nd, err := ipld.DecodeUsingPrototype(e.Value, dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil))
require.NoError(t, err)
providerId = *bindnode.Unwrap(nd).(*int64)
}

View File

@ -1,6 +1,8 @@
package kit
import (
"math"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
@ -64,6 +66,7 @@ var DefaultNodeOpts = nodeOpts{
cfg.Fevm.EnableEthRPC = true
cfg.Fevm.EnableActorEventsAPI = true
cfg.Fevm.Events.MaxFilterHeightRange = math.MaxInt64
return nil
},
},

View File

@ -362,9 +362,8 @@ see https://lotus.filecoin.io/storage-providers/advanced-configurations/market/#
Name: "DisableRealTimeFilterAPI",
Type: "bool",
Comment: `EnableEthRPC enables APIs that
DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted.
The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag.`,
Comment: `DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted.
The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag.`,
},
{
Name: "DisableHistoricFilterAPI",
@ -372,7 +371,7 @@ The API is enabled when EnableEthRPC is true, but can be disabled selectively wi
Comment: `DisableHistoricFilterAPI will disable the HistoricFilterAPI that can create and query filters for actor events
that occurred in the past. HistoricFilterAPI maintains a queryable index of events.
The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag.`,
The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag.`,
},
{
Name: "FilterTTL",
@ -459,7 +458,8 @@ This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, bu
Name: "EnableActorEventsAPI",
Type: "bool",
Comment: ``,
Comment: `EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`,
},
{
Name: "EthTxHashMappingLifetimeDays",

View File

@ -786,6 +786,8 @@ type FevmConfig struct {
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
EnableEthRPC bool
// EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors).
// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
EnableActorEventsAPI bool
// EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
@ -796,14 +798,13 @@ type FevmConfig struct {
}
type Events struct {
// EnableEthRPC enables APIs that
// DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted.
// The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag.
// The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag.
DisableRealTimeFilterAPI bool
// DisableHistoricFilterAPI will disable the HistoricFilterAPI that can create and query filters for actor events
// that occurred in the past. HistoricFilterAPI maintains a queryable index of events.
// The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag.
// The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag.
DisableHistoricFilterAPI bool
// FilterTTL specifies the time to live for actor event filters. Filters that haven't been accessed longer than

View File

@ -3,6 +3,7 @@ package full
import (
"context"
"fmt"
"strings"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
@ -11,6 +12,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -27,6 +29,7 @@ var (
type ActorEvent struct {
EventFilterManager *filter.EventFilterManager
MaxFilterHeightRange abi.ChainEpoch
Chain *store.ChainStore
}
var _ ActorEventAPI = (*ActorEvent)(nil)
@ -41,8 +44,13 @@ func (a *ActorEvent) GetActorEvents(ctx context.Context, filter *types.ActorEven
return nil, api.ErrNotSupported
}
params, err := a.parseFilter(filter)
if err != nil {
return nil, err
}
// Create a temporary filter
f, err := a.EventFilterManager.Install(ctx, filter.MinEpoch, filter.MaxEpoch, cid.Undef, filter.Addresses, filter.Fields, false)
f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, filter.Addresses, filter.Fields, false)
if err != nil {
return nil, err
}
@ -52,11 +60,58 @@ func (a *ActorEvent) GetActorEvents(ctx context.Context, filter *types.ActorEven
return evs, err
}
type filterParams struct {
MinHeight abi.ChainEpoch
MaxHeight abi.ChainEpoch
TipSetCid cid.Cid
}
func (a *ActorEvent) parseFilter(f *types.ActorEventFilter) (*filterParams, error) {
if f.TipSetCid != nil {
if len(f.FromEpoch) != 0 || len(f.ToEpoch) != 0 {
return nil, fmt.Errorf("cannot specify both TipSetCid and FromEpoch/ToEpoch")
}
return &filterParams{
MinHeight: 0,
MaxHeight: 0,
TipSetCid: *f.TipSetCid,
}, nil
}
from := f.FromEpoch
if len(from) != 0 && from != "latest" && from != "earliest" && !strings.HasPrefix(from, "0x") {
from = "0x" + from
}
to := f.ToEpoch
if len(to) != 0 && to != "latest" && to != "earliest" && !strings.HasPrefix(to, "0x") {
to = "0x" + to
}
min, max, err := parseBlockRange(a.Chain.GetHeaviestTipSet().Height(), &from, &to, a.MaxFilterHeightRange)
if err != nil {
return nil, err
}
return &filterParams{
MinHeight: min,
MaxHeight: max,
TipSetCid: cid.Undef,
}, nil
}
func (a *ActorEvent) SubscribeActorEvents(ctx context.Context, f *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) {
if a.EventFilterManager == nil {
return nil, api.ErrNotSupported
}
fm, err := a.EventFilterManager.Install(ctx, f.Filter.MinEpoch, f.Filter.MaxEpoch, cid.Undef, f.Filter.Addresses, f.Filter.Fields, false)
params, err := a.parseFilter(&f.Filter)
if err != nil {
return nil, err
}
fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, f.Filter.Addresses, f.Filter.Fields, false)
if err != nil {
return nil, err
}

View File

@ -1264,6 +1264,59 @@ func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID
return nil, xerrors.Errorf("wrong filter type")
}
func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) {
if fromBlock == nil || *fromBlock == "latest" || len(*fromBlock) == 0 {
minHeight = heaviest
} else if *fromBlock == "earliest" {
minHeight = 0
} else {
if !strings.HasPrefix(*fromBlock, "0x") {
return 0, 0, xerrors.Errorf("FromBlock is not a hex")
}
epoch, err := ethtypes.EthUint64FromHex(*fromBlock)
if err != nil {
return 0, 0, xerrors.Errorf("invalid epoch")
}
minHeight = abi.ChainEpoch(epoch)
}
if toBlock == nil || *toBlock == "latest" || len(*toBlock) == 0 {
// here latest means the latest at the time
maxHeight = -1
} else if *toBlock == "earliest" {
maxHeight = 0
} else {
if !strings.HasPrefix(*toBlock, "0x") {
return 0, 0, xerrors.Errorf("ToBlock is not a hex")
}
epoch, err := ethtypes.EthUint64FromHex(*toBlock)
if err != nil {
return 0, 0, xerrors.Errorf("invalid epoch")
}
maxHeight = abi.ChainEpoch(epoch)
}
// Validate height ranges are within limits set by node operator
if minHeight == -1 && maxHeight > 0 {
// Here the client is looking for events between the head and some future height
if maxHeight-heaviest > maxRange {
return 0, 0, xerrors.Errorf("invalid epoch range: to block is too far in the future (maximum: %d)", maxRange)
}
} else if minHeight >= 0 && maxHeight == -1 {
// Here the client is looking for events between some time in the past and the current head
if heaviest-minHeight > maxRange {
return 0, 0, xerrors.Errorf("invalid epoch range: from block is too far in the past (maximum: %d)", maxRange)
}
} else if minHeight >= 0 && maxHeight >= 0 {
if minHeight > maxHeight {
return 0, 0, xerrors.Errorf("invalid epoch range: to block (%d) must be after from block (%d)", minHeight, maxHeight)
} else if maxHeight-minHeight > maxRange {
return 0, 0, xerrors.Errorf("invalid epoch range: range between to and from blocks is too large (maximum: %d)", maxRange)
}
}
return minHeight, maxHeight, nil
}
func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*filter.EventFilter, error) {
var (
minHeight abi.ChainEpoch
@ -1280,64 +1333,11 @@ func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtype
tipsetCid = filterSpec.BlockHash.ToCid()
} else {
if filterSpec.FromBlock == nil || *filterSpec.FromBlock == "latest" {
ts := e.Chain.GetHeaviestTipSet()
minHeight = ts.Height()
} else if *filterSpec.FromBlock == "earliest" {
minHeight = 0
} else if *filterSpec.FromBlock == "pending" {
return nil, api.ErrNotSupported
} else {
if !strings.HasPrefix(*filterSpec.FromBlock, "0x") {
return nil, xerrors.Errorf("FromBlock is not a hex")
}
epoch, err := ethtypes.EthUint64FromHex(*filterSpec.FromBlock)
var err error
minHeight, maxHeight, err = parseBlockRange(e.Chain.GetHeaviestTipSet().Height(), filterSpec.FromBlock, filterSpec.ToBlock, e.MaxFilterHeightRange)
if err != nil {
return nil, xerrors.Errorf("invalid epoch")
return nil, err
}
minHeight = abi.ChainEpoch(epoch)
}
if filterSpec.ToBlock == nil || *filterSpec.ToBlock == "latest" {
// here latest means the latest at the time
maxHeight = -1
} else if *filterSpec.ToBlock == "earliest" {
maxHeight = 0
} else if *filterSpec.ToBlock == "pending" {
return nil, api.ErrNotSupported
} else {
if !strings.HasPrefix(*filterSpec.ToBlock, "0x") {
return nil, xerrors.Errorf("ToBlock is not a hex")
}
epoch, err := ethtypes.EthUint64FromHex(*filterSpec.ToBlock)
if err != nil {
return nil, xerrors.Errorf("invalid epoch")
}
maxHeight = abi.ChainEpoch(epoch)
}
// Validate height ranges are within limits set by node operator
if minHeight == -1 && maxHeight > 0 {
// Here the client is looking for events between the head and some future height
ts := e.Chain.GetHeaviestTipSet()
if maxHeight-ts.Height() > e.MaxFilterHeightRange {
return nil, xerrors.Errorf("invalid epoch range: to block is too far in the future (maximum: %d)", e.MaxFilterHeightRange)
}
} else if minHeight >= 0 && maxHeight == -1 {
// Here the client is looking for events between some time in the past and the current head
ts := e.Chain.GetHeaviestTipSet()
if ts.Height()-minHeight > e.MaxFilterHeightRange {
return nil, xerrors.Errorf("invalid epoch range: from block is too far in the past (maximum: %d)", e.MaxFilterHeightRange)
}
} else if minHeight >= 0 && maxHeight >= 0 {
if minHeight > maxHeight {
return nil, xerrors.Errorf("invalid epoch range: to block (%d) must be after from block (%d)", minHeight, maxHeight)
} else if maxHeight-minHeight > e.MaxFilterHeightRange {
return nil, xerrors.Errorf("invalid epoch range: range between to and from blocks is too large (maximum: %d)", e.MaxFilterHeightRange)
}
}
}
// Convert all addresses to filecoin f4 addresses
@ -1362,7 +1362,7 @@ func keysToKeysWithCodec(keys map[string][][]byte) map[string][]types.ActorEvent
for k, v := range keys {
for _, vv := range v {
keysWithCodec[k] = append(keysWithCodec[k], types.ActorEventBlock{
Codec: uint64(multicodec.Raw),
Codec: uint64(multicodec.Raw), // FEVM smart contract events are always encoded with the `raw` Codec.
Value: vv,
})
}

View File

@ -3,6 +3,7 @@ package full
import (
"bytes"
"encoding/hex"
"fmt"
"testing"
"github.com/ipfs/go-cid"
@ -10,12 +11,87 @@ import (
"github.com/stretchr/testify/require"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)
func TestParseBlockRange(t *testing.T) {
pstring := func(s string) *string { return &s }
tcs := map[string]struct {
heaviest abi.ChainEpoch
from *string
to *string
maxRange abi.ChainEpoch
minOut abi.ChainEpoch
maxOut abi.ChainEpoch
errStr string
}{
"fails when both are specified and range is greater than max allowed range": {
heaviest: 100,
from: pstring("0x100"),
to: pstring("0x200"),
maxRange: 10,
minOut: 0,
maxOut: 0,
errStr: "too large",
},
"fails when min is specified and range is greater than max allowed range": {
heaviest: 500,
from: pstring("0x10"),
to: pstring("latest"),
maxRange: 10,
minOut: 0,
maxOut: 0,
errStr: "too far in the past",
},
"fails when max is specified and range is greater than max allowed range": {
heaviest: 500,
from: pstring("earliest"),
to: pstring("0x10000"),
maxRange: 10,
minOut: 0,
maxOut: 0,
errStr: "too large",
},
"works when range is valid": {
heaviest: 500,
from: pstring("earliest"),
to: pstring("latest"),
maxRange: 1000,
minOut: 0,
maxOut: -1,
},
"works when range is valid and specified": {
heaviest: 500,
from: pstring("0x10"),
to: pstring("0x30"),
maxRange: 1000,
minOut: 16,
maxOut: 48,
},
}
for name, tc := range tcs {
tc2 := tc
t.Run(name, func(t *testing.T) {
min, max, err := parseBlockRange(tc2.heaviest, tc2.from, tc2.to, tc2.maxRange)
require.Equal(t, tc2.minOut, min)
require.Equal(t, tc2.maxOut, max)
if tc2.errStr != "" {
fmt.Println(err)
require.Error(t, err)
require.Contains(t, err.Error(), tc2.errStr)
} else {
require.NoError(t, err)
}
})
}
}
func TestEthLogFromEvent(t *testing.T) {
// basic empty
data, topics, ok := ethLogFromEvent(nil)

View File

@ -2,15 +2,14 @@ package modules
import (
"context"
"fmt"
"path/filepath"
"time"
"github.com/multiformats/go-varint"
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/filter"
@ -129,8 +128,9 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc
fm := &filter.EventFilterManager{
ChainStore: cs,
EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true
// TODO:
// We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
// we only want to match using f4 addresses
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
@ -138,18 +138,11 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc
actor, err := sm.LoadActor(ctx, idAddr, ts)
if err != nil || actor.Address == nil {
return address.Undef, false
return idAddr, true
}
// if robust address is not f4 then we won't match against it so bail early
if actor.Address.Protocol() != address.Delegated {
return address.Undef, false
}
// we have an f4 address, make sure it's assigned by the EAM
// What happens when we introduce events for built-in Actor events here ?
if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
return address.Undef, false
}
fmt.Println("")
return *actor.Address, true
},
@ -175,9 +168,10 @@ func ActorEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRe
return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEvent, error) {
ee := &full.ActorEvent{
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
Chain: cs,
}
if !cfg.EnableActorEventsAPI {
if !cfg.EnableActorEventsAPI || cfg.Events.DisableRealTimeFilterAPI {
// all Actor events functionality is disabled
return ee, nil
}