diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 76e6db564..4d485c442 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -417,7 +417,7 @@ func init() { addExample(&types.ActorEventBlock{ Codec: 0x51, - Value: []byte("data"), + Value: []byte("ddata"), }) addExample(&types.ActorEventFilter{ @@ -426,12 +426,12 @@ func init() { "abc": { { Codec: 0x51, - Value: []byte("data"), + Value: []byte("ddata"), }, }, }, - MinEpoch: 2301220, - MaxEpoch: 2301220, + FromEpoch: "earliest", + ToEpoch: "latest", }) addExample(&types.SubActorEventFilter{ @@ -441,12 +441,12 @@ func init() { "abc": { { Codec: 0x51, - Value: []byte("data"), + Value: []byte("ddata"), }, }, }, - MinEpoch: 2301220, - MaxEpoch: 2301220, + FromEpoch: "earliest", + ToEpoch: "latest", }, Prefill: true, }) diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 663643bf7..176f299f2 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index f4e54a73b..db0e52442 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/chain/types/actor_event.go b/chain/types/actor_event.go index 5baca6caa..7aa268faf 100644 --- a/chain/types/actor_event.go +++ b/chain/types/actor_event.go @@ -8,33 +8,46 @@ import ( ) type ActorEventBlock struct { - // what value codec does client want to match on ? + // The value codec to match when filtering event values. Codec uint64 `json:"codec"` - // data associated with the "event key" + + // The value to want to match on associated with the corresponding "event key" + // when filtering events. + // Should be a byte array encoded with the specified codec. + // Assumes base64 encoding when converting to/from JSON strings. Value []byte `json:"value"` } type SubActorEventFilter struct { - Filter ActorEventFilter `json:"filter"` - Prefill bool `json:"prefill"` + Filter ActorEventFilter `json:"filter"` + + // If true, all available matching historical events will be written to the response stream + // before any new real-time events that match the given filter are written. + // If `Prefill` is true and `FromEpoch` is set to latest, the pre-fill operation will become a no-op. + // if `Prefill` is false and `FromEpoch` is set to earliest, historical events will still be sent to the client. + Prefill bool `json:"prefill"` } type ActorEventFilter struct { // Matches events from one of these actors, or any actor if empty. - // TODO: Should we also allow Eth addresses here? // For now, this MUST be a Filecoin address. - Addresses []address.Address `json:"address"` + Addresses []address.Address `json:"addresses"` // Matches events with the specified key/values, or all events if empty. - // If the `Blocks` slice is empty, matches on the key only. - Fields map[string][]ActorEventBlock `json:"fields"` + // If the value is an empty slice, the filter will match on the key only, accepting any value. + Fields map[string][]ActorEventBlock `json:"fields,omitempty"` - // Epoch based filtering ? - // Start epoch for the filter; -1 means no minimum - MinEpoch abi.ChainEpoch `json:"minEpoch,omitempty"` + // Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first, + // Optional, default: "latest". + FromEpoch string `json:"fromEpoch,omitempty"` - // End epoch for the filter; -1 means no maximum - MaxEpoch abi.ChainEpoch `json:"maxEpoch,omitempty"` + // Interpreted as an epoch (in hex) or one of "latest" for last mined block, "earliest" for first, + // Optional, default: "latest". + ToEpoch string `json:"toEpoch,omitempty"` + + // Restricts events returned to those emitted from messages contained in this tipset. + // If `TipSetCid` is present in the filter criteria, then neither `FromEpoch` nor `ToEpoch` are allowed. + TipSetCid *cid.Cid `json:"tipsetCid,omitempty"` } type ActorEvent struct { diff --git a/chain/types/actor_event_test.go b/chain/types/actor_event_test.go new file mode 100644 index 000000000..aae4865d3 --- /dev/null +++ b/chain/types/actor_event_test.go @@ -0,0 +1,138 @@ +package types + +import ( + "encoding/json" + pseudo "math/rand" + "testing" + + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + builtintypes "github.com/filecoin-project/go-state-types/builtin" +) + +func TestActorEventJson(t *testing.T) { + // generate a mock Actor event for me + rng := pseudo.New(pseudo.NewSource(0)) + in := ActorEvent{ + Entries: []EventEntry{ + { + Key: "key1", + Codec: 0x51, + Value: []byte("value1"), + }, + { + Key: "key2", + Codec: 0x52, + Value: []byte("value2"), + }, + }, + EmitterAddr: randomF4Addr(t, rng), + Reverted: false, + Height: 1001, + TipSetKey: randomCid(t, rng), + MsgCid: randomCid(t, rng), + } + + bz, err := json.Marshal(in) + require.NoError(t, err) + require.NotEmpty(t, bz) + + var out ActorEvent + err = json.Unmarshal(bz, &out) + require.NoError(t, err) + require.Equal(t, in, out) + + s := ` +{"entries":[{"Flags":0,"Key":"key1","Codec":81,"Value":"dmFsdWUx"},{"Flags":0,"Key":"key2","Codec":82,"Value":"dmFsdWUy"}],"emitter":"f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","reverted":false,"height":1001,"tipsetCid":{"/":"bafkqacx3dag26sfht3qlcdi"},"msgCid":{"/":"bafkqacrziziykd6uuf4islq"}} +` + var out2 ActorEvent + err = json.Unmarshal([]byte(s), &out2) + require.NoError(t, err) + require.Equal(t, out, out2) +} + +func TestActorEventBlockJson(t *testing.T) { + in := ActorEventBlock{ + Codec: 1, + Value: []byte("test"), + } + + bz, err := json.Marshal(in) + require.NoError(t, err) + require.NotEmpty(t, bz) + + var out ActorEventBlock + err = json.Unmarshal(bz, &out) + require.NoError(t, err) + require.Equal(t, in, out) + + var out2 ActorEventBlock + s := "{\"codec\":1,\"value\":\"dGVzdA==\"}" + err = json.Unmarshal([]byte(s), &out2) + require.NoError(t, err) + require.Equal(t, in, out2) +} + +func TestSubActorEventFilterJson(t *testing.T) { + c := randomCid(t, pseudo.New(pseudo.NewSource(0))) + from := "earliest" + to := "latest" + f := ActorEventFilter{ + Addresses: []address.Address{ + randomF4Addr(t, pseudo.New(pseudo.NewSource(0))), + randomF4Addr(t, pseudo.New(pseudo.NewSource(0))), + }, + Fields: map[string][]ActorEventBlock{ + "key1": { + { + Codec: 0x51, + Value: []byte("value1"), + }, + }, + "key2": { + { + Codec: 0x52, + Value: []byte("value2"), + }, + }, + }, + FromEpoch: from, + ToEpoch: to, + TipSetCid: &c, + } + + bz, err := json.Marshal(f) + require.NoError(t, err) + require.NotEmpty(t, bz) + + s := `{"addresses":["f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua","f410fagkp3qx2f76maqot74jaiw3tzbxe76k76zrkl3xifk67isrnbn2sll3yua"],"fields":{"key1":[{"codec":81,"value":"dmFsdWUx"}],"key2":[{"codec":82,"value":"dmFsdWUy"}]},"fromEpoch":"earliest","toEpoch":"latest","tipsetCid":{"/":"bafkqacqbst64f6rp7taeduy"}}` + var out ActorEventFilter + err = json.Unmarshal([]byte(s), &out) + require.NoError(t, err) + require.Equal(t, f, out) +} + +func randomF4Addr(tb testing.TB, rng *pseudo.Rand) address.Address { + tb.Helper() + addr, err := address.NewDelegatedAddress(builtintypes.EthereumAddressManagerActorID, randomBytes(32, rng)) + require.NoError(tb, err) + + return addr +} + +func randomCid(tb testing.TB, rng *pseudo.Rand) cid.Cid { + tb.Helper() + cb := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} + c, err := cb.Sum(randomBytes(10, rng)) + require.NoError(tb, err) + return c +} + +func randomBytes(n int, rng *pseudo.Rand) []byte { + buf := make([]byte, n) + rng.Read(buf) + return buf +} diff --git a/chain/types/ethtypes/eth_types.go b/chain/types/ethtypes/eth_types.go index a419874bd..b658b8856 100644 --- a/chain/types/ethtypes/eth_types.go +++ b/chain/types/ethtypes/eth_types.go @@ -598,7 +598,7 @@ type EthFilterSpec struct { Topics EthTopicSpec `json:"topics"` // Restricts event logs returned to those emitted from messages contained in this tipset. - // If BlockHash is present in in the filter criteria, then neither FromBlock nor ToBlock are allowed. + // If BlockHash is present in the filter criteria, then neither FromBlock nor ToBlock are allowed. // Added in EIP-234 BlockHash *EthHash `json:"blockHash,omitempty"` } diff --git a/chain/types/event.go b/chain/types/event.go index 106a120e2..5f6415d49 100644 --- a/chain/types/event.go +++ b/chain/types/event.go @@ -28,7 +28,7 @@ type EventEntry struct { // The event value's codec Codec uint64 - // The event value + // The event value. It is encoded using the codec specified above Value []byte } diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index be7d43e05..620933e25 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -9,13 +9,11 @@ import ( "strings" "github.com/mitchellh/go-homedir" - "github.com/multiformats/go-varint" "github.com/urfave/cli/v2" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" @@ -109,6 +107,7 @@ var backfillEventsCmd = &cli.Command{ addressLookups := make(map[abi.ActorID]address.Address) + // TODO: We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { // we only want to match using f4 addresses idAddr, err := address.NewIDAddress(uint64(emitter)) @@ -118,18 +117,9 @@ var backfillEventsCmd = &cli.Command{ actor, err := api.StateGetActor(ctx, idAddr, ts.Key()) if err != nil || actor.Address == nil { - return address.Undef, false + return idAddr, true } - // if robust address is not f4 then we won't match against it so bail early - if actor.Address.Protocol() != address.Delegated { - return address.Undef, false - } - - // we have an f4 address, make sure it's assigned by the EAM - if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { - return address.Undef, false - } return *actor.Address, true } diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 90879a69a..7c2e36ed7 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -3400,19 +3400,19 @@ Inputs: ```json [ { - "address": [ + "addresses": [ "f01234" ], "fields": { "abc": [ { "codec": 81, - "value": "ZGF0YQ==" + "value": "ZGRhdGE=" } ] }, - "minEpoch": 2301220, - "maxEpoch": 2301220 + "fromEpoch": "earliest", + "toEpoch": "latest" } ] ``` @@ -8837,19 +8837,19 @@ Inputs: [ { "filter": { - "address": [ + "addresses": [ "f01234" ], "fields": { "abc": [ { "codec": 81, - "value": "ZGF0YQ==" + "value": "ZGRhdGE=" } ] }, - "minEpoch": 2301220, - "maxEpoch": 2301220 + "fromEpoch": "earliest", + "toEpoch": "latest" }, "prefill": true } diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index c97ce0fe1..a403a580e 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -330,6 +330,9 @@ # env var: LOTUS_FEVM_ENABLEETHRPC #EnableEthRPC = false + # EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors). + # This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above. + # # type: bool # env var: LOTUS_FEVM_ENABLEACTOREVENTSAPI #EnableActorEventsAPI = false @@ -342,9 +345,8 @@ #EthTxHashMappingLifetimeDays = 0 [Fevm.Events] - # EnableEthRPC enables APIs that # DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted. - # The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag. + # The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag. # # type: bool # env var: LOTUS_FEVM_EVENTS_DISABLEREALTIMEFILTERAPI @@ -352,7 +354,7 @@ # DisableHistoricFilterAPI will disable the HistoricFilterAPI that can create and query filters for actor events # that occurred in the past. HistoricFilterAPI maintains a queryable index of events. - # The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag. + # The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag. # # type: bool # env var: LOTUS_FEVM_EVENTS_DISABLEHISTORICFILTERAPI diff --git a/itests/actor_events_filter_test.go b/itests/actor_events_filter_test.go deleted file mode 100644 index 9a3d4f8f3..000000000 --- a/itests/actor_events_filter_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package itests - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-address" - - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/itests/kit" -) - -func TestGetActorEvents(t *testing.T) { - t.Skip("skipping for now") - //require := require.New(t) - kit.QuietAllLogsExcept("events", "messagepool") - - blockTime := 100 * time.Millisecond - - client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC()) - ens.InterconnectAll().BeginMining(blockTime) - - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - // Set up the test fixture with a standard list of invocations - contract1, contract2, invocations := prepareEventMatrixInvocations(ctx, t, client) - fmt.Printf("contract1:%s; contract2:%s\n", contract1, contract2) - - cf1, err := contract1.ToFilecoinAddress() - if err != nil { - panic(err) - } - - cf2, err := contract2.ToFilecoinAddress() - if err != nil { - panic(err) - } - - fmt.Printf("contract1 f4 is:%s; contract2 f4 is:%s\n", cf1.String(), cf2.String()) - - testCases := getCombinationFilterTestCases(contract1, contract2, "0x0") - - messages := invokeAndWaitUntilAllOnChain(t, client, invocations) - - // f410fiy2dwcbbvc5c6xwwrhlwgi2dby4rzgamxllpgva - - for _, tc := range testCases { - tc := tc // appease the lint despot - t.Run(tc.name, func(t *testing.T) { - - res, err := client.EthGetLogs(ctx, tc.spec) - require.NoError(t, err) - - /*ch, _ := client.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ - Prefill: true, - ActorEventFilter: types.ActorEventFilter{ - MinEpoch: 0, - MaxEpoch: 1000, - }, - }) - - for i := range ch { - fmt.Println("Hello Chan", i.Entries[0].Key, i.Entries[0].Codec, i.EmitterAddr.String()) - }*/ - - res2, _ := client.GetActorEvents(ctx, &types.ActorEventFilter{ - MinEpoch: 0, - MaxEpoch: -1, - Addresses: []address.Address{cf2}, - //EthAddresses: []ethtypes.EthAddress{ - // contract1, - //}, - }) - for _, res := range res2 { - res := res - fmt.Println("Emitter Address is", res.EmitterAddr.String()) - for _, entry := range res.Entries { - fmt.Println("Hello", entry.Key, entry.Codec, string(entry.Value)) - } - - } - fmt.Println("Hello", res2[0].Entries[0].Key, res2[0].Entries[0].Codec, res2[0].EmitterAddr.String()) - - elogs, err := parseEthLogsFromFilterResult(res) - require.NoError(t, err) - AssertEthLogs(t, elogs, tc.expected, messages) - }) - } -} diff --git a/itests/direct_data_onboard_test.go b/itests/direct_data_onboard_test.go index 8377caecd..f769bdfb4 100644 --- a/itests/direct_data_onboard_test.go +++ b/itests/direct_data_onboard_test.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "crypto/rand" + "encoding/json" "fmt" "os" + "sort" "strings" "testing" "time" @@ -149,22 +151,28 @@ func TestOnboardRawPieceVerified(t *testing.T) { kit.Account(verifiedClientKey, abi.NewTokenAmount(bal.Int64())), ) - evtChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ + minerEvtsChan, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ Filter: types.ActorEventFilter{ - MinEpoch: -1, - MaxEpoch: -1, + Addresses: []address.Address{miner.ActorAddr}, }, Prefill: true, }) require.NoError(t, err) - events := make([]types.ActorEvent, 0) - go func() { - for e := range evtChan { - fmt.Printf("%s Got ActorEvent: %+v", time.Now().Format(time.StampMilli), e) - events = append(events, *e) - } - }() + // only consume and match sector-activated events + sectorActivatedCbor, err := ipld.Encode(basicnode.NewString("sector-activated"), dagcbor.Encode) + require.NoError(t, err) + sectorActivatedEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ + Filter: types.ActorEventFilter{ + Fields: map[string][]types.ActorEventBlock{ + "$type": { + {Codec: 0x51, Value: sectorActivatedCbor}, + }, + }, + }, + Prefill: true, + }) + require.NoError(t, err) ens.InterconnectAll().BeginMiningMustPost(blocktime) @@ -319,26 +327,112 @@ func TestOnboardRawPieceVerified(t *testing.T) { allocations, err = client.StateGetAllocations(ctx, verifiedClientAddr, types.EmptyTSK) require.NoError(t, err) require.Len(t, allocations, 0) + eventsFromMessages := buildActorEventsFromMessages(ctx, t, miner.FullNode) + fmt.Println("eventsFromMessages", eventsFromMessages) + writeEventsToFile(ctx, t, miner.FullNode, eventsFromMessages) - evts, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{ - MinEpoch: -1, - MaxEpoch: -1, + /* --- Tests for the Actor events API --- */ + // Match events from Get API and receipts + allEvtsFromGetAPI, err := miner.FullNode.GetActorEvents(ctx, &types.ActorEventFilter{ + FromEpoch: "earliest", + ToEpoch: "latest", }) require.NoError(t, err) - for _, evt := range evts { - fmt.Printf("Got ActorEvent: %+v", evt) - } + matchEvents(t, eventsFromMessages, getEventsArray(allEvtsFromGetAPI)) - eventsFromMessages := buildActorEventsFromMessages(t, ctx, miner.FullNode) - writeEventsToFile(t, ctx, miner.FullNode, eventsFromMessages) - for _, evt := range evts { - fmt.Printf("Got ActorEvent from messages: %+v", evt) + // match Miner Actor events from subscription channel and Miner Actor events obtained from receipts + var subMinerEvts []types.ActorEvent + for evt := range minerEvtsChan { + subMinerEvts = append(subMinerEvts, *evt) + if len(subMinerEvts) == 4 { + break + } } + var allMinerEvts []types.ActorEvent + for _, evt := range eventsFromMessages { + if evt.EmitterAddr == miner.ActorAddr { + allMinerEvts = append(allMinerEvts, evt) + } + } + matchEvents(t, allMinerEvts, subMinerEvts) - // TODO: compare GetActorEvents & SubscribeActorEvents & eventsFromMessages for equality + // Match pre-filled events from sector activated channel and events obtained from receipts + var prefillSectorActivatedEvts []types.ActorEvent + for evt := range sectorActivatedEvtsCh { + prefillSectorActivatedEvts = append(prefillSectorActivatedEvts, *evt) + if len(prefillSectorActivatedEvts) == 2 { + break + } + } + require.Len(t, prefillSectorActivatedEvts, 2) + var sectorActivatedEvts []types.ActorEvent + for _, evt := range eventsFromMessages { + for _, entry := range evt.Entries { + if entry.Key == "$type" && bytes.Equal(entry.Value, sectorActivatedCbor) { + sectorActivatedEvts = append(sectorActivatedEvts, evt) + break + } + } + } + matchEvents(t, sectorActivatedEvts, prefillSectorActivatedEvts) + + // Match pre-filled events from subscription channel and events obtained from receipts + allEvtsCh, err := miner.FullNode.SubscribeActorEvents(ctx, &types.SubActorEventFilter{ + Filter: types.ActorEventFilter{ + FromEpoch: "earliest", + ToEpoch: "latest", + }, + Prefill: true, + }) + require.NoError(t, err) + var prefillEvts []types.ActorEvent + for evt := range allEvtsCh { + prefillEvts = append(prefillEvts, *evt) + if len(prefillEvts) == len(eventsFromMessages) { + break + } + } + matchEvents(t, eventsFromMessages, prefillEvts) } -func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api.FullNode) []types.ActorEvent { +func getEventsArray(ptr []*types.ActorEvent) []types.ActorEvent { + var evts []types.ActorEvent + for _, evt := range ptr { + evts = append(evts, *evt) + } + return evts +} + +func matchEvents(t *testing.T, exp []types.ActorEvent, actual []types.ActorEvent) { + // height and tipset cid can mismatch because expected events are sourced using APIs that can put in different tipsets + for i := range exp { + exp[i].Height = 0 + exp[i].TipSetKey = cid.Undef + } + for i := range actual { + actual[i].Height = 0 + actual[i].TipSetKey = cid.Undef + } + + require.Equal(t, len(exp), len(actual)) + // marshal both arrays to json, sort by json, and compare + bz1, err := json.Marshal(exp) + require.NoError(t, err) + sort.Slice(bz1, func(i, j int) bool { + return bz1[i] <= bz1[j] + }) + + bz2, err := json.Marshal(actual) + require.NoError(t, err) + sort.Slice(bz2, func(i, j int) bool { + return bz2[i] <= bz2[j] + }) + fmt.Println("bz1", string(bz1)) + fmt.Println("bz2", string(bz2)) + require.True(t, bytes.Equal(bz1, bz2)) +} + +func buildActorEventsFromMessages(ctx context.Context, t *testing.T, node v1api.FullNode) []types.ActorEvent { actorEvents := make([]types.ActorEvent, 0) head, err := node.ChainHead(ctx) @@ -374,7 +468,7 @@ func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api. Entries: evt.Entries, EmitterAddr: addr, Reverted: false, - Height: abi.ChainEpoch(height), + Height: ts.Height(), TipSetKey: tsCid, MsgCid: m.Cid, }) @@ -386,7 +480,7 @@ func buildActorEventsFromMessages(t *testing.T, ctx context.Context, node v1api. return actorEvents } -func writeEventsToFile(t *testing.T, ctx context.Context, node v1api.FullNode, events []types.ActorEvent) { +func writeEventsToFile(ctx context.Context, t *testing.T, node v1api.FullNode, events []types.ActorEvent) { file, err := os.Create("block.out") require.NoError(t, err) defer func() { @@ -417,11 +511,11 @@ func writeEventsToFile(t *testing.T, ctx context.Context, node v1api.FullNode, e if e.Key == "$type" && bytes.Equal(e.Value, claimKeyCbor) { isClaim = true } else if isClaim && e.Key == "id" { - nd, err := ipld.DecodeUsingPrototype([]byte(e.Value), dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) + nd, err := ipld.DecodeUsingPrototype(e.Value, dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) require.NoError(t, err) claimId = *bindnode.Unwrap(nd).(*int64) } else if isClaim && e.Key == "provider" { - nd, err := ipld.DecodeUsingPrototype([]byte(e.Value), dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) + nd, err := ipld.DecodeUsingPrototype(e.Value, dagcbor.Decode, bindnode.Prototype((*int64)(nil), nil)) require.NoError(t, err) providerId = *bindnode.Unwrap(nd).(*int64) } diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 51db8d8b2..025334203 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -1,6 +1,8 @@ package kit import ( + "math" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" @@ -64,6 +66,7 @@ var DefaultNodeOpts = nodeOpts{ cfg.Fevm.EnableEthRPC = true cfg.Fevm.EnableActorEventsAPI = true + cfg.Fevm.Events.MaxFilterHeightRange = math.MaxInt64 return nil }, }, diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index a1ee2634f..1e8252ca9 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -362,9 +362,8 @@ see https://lotus.filecoin.io/storage-providers/advanced-configurations/market/# Name: "DisableRealTimeFilterAPI", Type: "bool", - Comment: `EnableEthRPC enables APIs that -DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted. -The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag.`, + Comment: `DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted. +The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag.`, }, { Name: "DisableHistoricFilterAPI", @@ -372,7 +371,7 @@ The API is enabled when EnableEthRPC is true, but can be disabled selectively wi Comment: `DisableHistoricFilterAPI will disable the HistoricFilterAPI that can create and query filters for actor events that occurred in the past. HistoricFilterAPI maintains a queryable index of events. -The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag.`, +The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag.`, }, { Name: "FilterTTL", @@ -459,7 +458,8 @@ This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, bu Name: "EnableActorEventsAPI", Type: "bool", - Comment: ``, + Comment: `EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors). +This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.`, }, { Name: "EthTxHashMappingLifetimeDays", diff --git a/node/config/types.go b/node/config/types.go index 57e06c093..6fb7389ff 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -786,6 +786,8 @@ type FevmConfig struct { // This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above. EnableEthRPC bool + // EnableActorEventsAPI enables the Actor events API that enables clients to consume events emitted by (smart contracts + built-in Actors). + // This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above. EnableActorEventsAPI bool // EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days @@ -796,14 +798,13 @@ type FevmConfig struct { } type Events struct { - // EnableEthRPC enables APIs that // DisableRealTimeFilterAPI will disable the RealTimeFilterAPI that can create and query filters for actor events as they are emitted. - // The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag. + // The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag. DisableRealTimeFilterAPI bool // DisableHistoricFilterAPI will disable the HistoricFilterAPI that can create and query filters for actor events // that occurred in the past. HistoricFilterAPI maintains a queryable index of events. - // The API is enabled when EnableEthRPC is true, but can be disabled selectively with this flag. + // The API is enabled when EnableEthRPC or EnableActorEventsAPI is true, but can be disabled selectively with this flag. DisableHistoricFilterAPI bool // FilterTTL specifies the time to live for actor event filters. Filters that haven't been accessed longer than diff --git a/node/impl/full/actor_event.go b/node/impl/full/actor_event.go index a1b038dc6..ce555d6d2 100644 --- a/node/impl/full/actor_event.go +++ b/node/impl/full/actor_event.go @@ -3,6 +3,7 @@ package full import ( "context" "fmt" + "strings" "github.com/ipfs/go-cid" "go.uber.org/fx" @@ -11,6 +12,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/events/filter" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -27,6 +29,7 @@ var ( type ActorEvent struct { EventFilterManager *filter.EventFilterManager MaxFilterHeightRange abi.ChainEpoch + Chain *store.ChainStore } var _ ActorEventAPI = (*ActorEvent)(nil) @@ -41,8 +44,13 @@ func (a *ActorEvent) GetActorEvents(ctx context.Context, filter *types.ActorEven return nil, api.ErrNotSupported } + params, err := a.parseFilter(filter) + if err != nil { + return nil, err + } + // Create a temporary filter - f, err := a.EventFilterManager.Install(ctx, filter.MinEpoch, filter.MaxEpoch, cid.Undef, filter.Addresses, filter.Fields, false) + f, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, filter.Addresses, filter.Fields, false) if err != nil { return nil, err } @@ -52,11 +60,58 @@ func (a *ActorEvent) GetActorEvents(ctx context.Context, filter *types.ActorEven return evs, err } +type filterParams struct { + MinHeight abi.ChainEpoch + MaxHeight abi.ChainEpoch + TipSetCid cid.Cid +} + +func (a *ActorEvent) parseFilter(f *types.ActorEventFilter) (*filterParams, error) { + if f.TipSetCid != nil { + if len(f.FromEpoch) != 0 || len(f.ToEpoch) != 0 { + return nil, fmt.Errorf("cannot specify both TipSetCid and FromEpoch/ToEpoch") + } + + return &filterParams{ + MinHeight: 0, + MaxHeight: 0, + TipSetCid: *f.TipSetCid, + }, nil + } + + from := f.FromEpoch + if len(from) != 0 && from != "latest" && from != "earliest" && !strings.HasPrefix(from, "0x") { + from = "0x" + from + } + + to := f.ToEpoch + if len(to) != 0 && to != "latest" && to != "earliest" && !strings.HasPrefix(to, "0x") { + to = "0x" + to + } + + min, max, err := parseBlockRange(a.Chain.GetHeaviestTipSet().Height(), &from, &to, a.MaxFilterHeightRange) + if err != nil { + return nil, err + } + + return &filterParams{ + MinHeight: min, + MaxHeight: max, + TipSetCid: cid.Undef, + }, nil +} + func (a *ActorEvent) SubscribeActorEvents(ctx context.Context, f *types.SubActorEventFilter) (<-chan *types.ActorEvent, error) { if a.EventFilterManager == nil { return nil, api.ErrNotSupported } - fm, err := a.EventFilterManager.Install(ctx, f.Filter.MinEpoch, f.Filter.MaxEpoch, cid.Undef, f.Filter.Addresses, f.Filter.Fields, false) + + params, err := a.parseFilter(&f.Filter) + if err != nil { + return nil, err + } + + fm, err := a.EventFilterManager.Install(ctx, params.MinHeight, params.MaxHeight, params.TipSetCid, f.Filter.Addresses, f.Filter.Fields, false) if err != nil { return nil, err } diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 6ab0a3d4e..f45b35a5a 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1264,6 +1264,59 @@ func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID return nil, xerrors.Errorf("wrong filter type") } +func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) { + if fromBlock == nil || *fromBlock == "latest" || len(*fromBlock) == 0 { + minHeight = heaviest + } else if *fromBlock == "earliest" { + minHeight = 0 + } else { + if !strings.HasPrefix(*fromBlock, "0x") { + return 0, 0, xerrors.Errorf("FromBlock is not a hex") + } + epoch, err := ethtypes.EthUint64FromHex(*fromBlock) + if err != nil { + return 0, 0, xerrors.Errorf("invalid epoch") + } + minHeight = abi.ChainEpoch(epoch) + } + + if toBlock == nil || *toBlock == "latest" || len(*toBlock) == 0 { + // here latest means the latest at the time + maxHeight = -1 + } else if *toBlock == "earliest" { + maxHeight = 0 + } else { + if !strings.HasPrefix(*toBlock, "0x") { + return 0, 0, xerrors.Errorf("ToBlock is not a hex") + } + epoch, err := ethtypes.EthUint64FromHex(*toBlock) + if err != nil { + return 0, 0, xerrors.Errorf("invalid epoch") + } + maxHeight = abi.ChainEpoch(epoch) + } + + // Validate height ranges are within limits set by node operator + if minHeight == -1 && maxHeight > 0 { + // Here the client is looking for events between the head and some future height + if maxHeight-heaviest > maxRange { + return 0, 0, xerrors.Errorf("invalid epoch range: to block is too far in the future (maximum: %d)", maxRange) + } + } else if minHeight >= 0 && maxHeight == -1 { + // Here the client is looking for events between some time in the past and the current head + if heaviest-minHeight > maxRange { + return 0, 0, xerrors.Errorf("invalid epoch range: from block is too far in the past (maximum: %d)", maxRange) + } + } else if minHeight >= 0 && maxHeight >= 0 { + if minHeight > maxHeight { + return 0, 0, xerrors.Errorf("invalid epoch range: to block (%d) must be after from block (%d)", minHeight, maxHeight) + } else if maxHeight-minHeight > maxRange { + return 0, 0, xerrors.Errorf("invalid epoch range: range between to and from blocks is too large (maximum: %d)", maxRange) + } + } + return minHeight, maxHeight, nil +} + func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*filter.EventFilter, error) { var ( minHeight abi.ChainEpoch @@ -1280,64 +1333,11 @@ func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtype tipsetCid = filterSpec.BlockHash.ToCid() } else { - if filterSpec.FromBlock == nil || *filterSpec.FromBlock == "latest" { - ts := e.Chain.GetHeaviestTipSet() - minHeight = ts.Height() - } else if *filterSpec.FromBlock == "earliest" { - minHeight = 0 - } else if *filterSpec.FromBlock == "pending" { - return nil, api.ErrNotSupported - } else { - if !strings.HasPrefix(*filterSpec.FromBlock, "0x") { - return nil, xerrors.Errorf("FromBlock is not a hex") - } - epoch, err := ethtypes.EthUint64FromHex(*filterSpec.FromBlock) - if err != nil { - return nil, xerrors.Errorf("invalid epoch") - } - minHeight = abi.ChainEpoch(epoch) + var err error + minHeight, maxHeight, err = parseBlockRange(e.Chain.GetHeaviestTipSet().Height(), filterSpec.FromBlock, filterSpec.ToBlock, e.MaxFilterHeightRange) + if err != nil { + return nil, err } - - if filterSpec.ToBlock == nil || *filterSpec.ToBlock == "latest" { - // here latest means the latest at the time - maxHeight = -1 - } else if *filterSpec.ToBlock == "earliest" { - maxHeight = 0 - } else if *filterSpec.ToBlock == "pending" { - return nil, api.ErrNotSupported - } else { - if !strings.HasPrefix(*filterSpec.ToBlock, "0x") { - return nil, xerrors.Errorf("ToBlock is not a hex") - } - epoch, err := ethtypes.EthUint64FromHex(*filterSpec.ToBlock) - if err != nil { - return nil, xerrors.Errorf("invalid epoch") - } - maxHeight = abi.ChainEpoch(epoch) - } - - // Validate height ranges are within limits set by node operator - if minHeight == -1 && maxHeight > 0 { - // Here the client is looking for events between the head and some future height - ts := e.Chain.GetHeaviestTipSet() - if maxHeight-ts.Height() > e.MaxFilterHeightRange { - return nil, xerrors.Errorf("invalid epoch range: to block is too far in the future (maximum: %d)", e.MaxFilterHeightRange) - } - } else if minHeight >= 0 && maxHeight == -1 { - // Here the client is looking for events between some time in the past and the current head - ts := e.Chain.GetHeaviestTipSet() - if ts.Height()-minHeight > e.MaxFilterHeightRange { - return nil, xerrors.Errorf("invalid epoch range: from block is too far in the past (maximum: %d)", e.MaxFilterHeightRange) - } - - } else if minHeight >= 0 && maxHeight >= 0 { - if minHeight > maxHeight { - return nil, xerrors.Errorf("invalid epoch range: to block (%d) must be after from block (%d)", minHeight, maxHeight) - } else if maxHeight-minHeight > e.MaxFilterHeightRange { - return nil, xerrors.Errorf("invalid epoch range: range between to and from blocks is too large (maximum: %d)", e.MaxFilterHeightRange) - } - } - } // Convert all addresses to filecoin f4 addresses @@ -1362,7 +1362,7 @@ func keysToKeysWithCodec(keys map[string][][]byte) map[string][]types.ActorEvent for k, v := range keys { for _, vv := range v { keysWithCodec[k] = append(keysWithCodec[k], types.ActorEventBlock{ - Codec: uint64(multicodec.Raw), + Codec: uint64(multicodec.Raw), // FEVM smart contract events are always encoded with the `raw` Codec. Value: vv, }) } diff --git a/node/impl/full/eth_test.go b/node/impl/full/eth_test.go index 05c3f2575..6f9d8f297 100644 --- a/node/impl/full/eth_test.go +++ b/node/impl/full/eth_test.go @@ -3,6 +3,7 @@ package full import ( "bytes" "encoding/hex" + "fmt" "testing" "github.com/ipfs/go-cid" @@ -10,12 +11,87 @@ import ( "github.com/stretchr/testify/require" cbg "github.com/whyrusleeping/cbor-gen" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" ) +func TestParseBlockRange(t *testing.T) { + pstring := func(s string) *string { return &s } + + tcs := map[string]struct { + heaviest abi.ChainEpoch + from *string + to *string + maxRange abi.ChainEpoch + minOut abi.ChainEpoch + maxOut abi.ChainEpoch + errStr string + }{ + "fails when both are specified and range is greater than max allowed range": { + heaviest: 100, + from: pstring("0x100"), + to: pstring("0x200"), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "too large", + }, + "fails when min is specified and range is greater than max allowed range": { + heaviest: 500, + from: pstring("0x10"), + to: pstring("latest"), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "too far in the past", + }, + "fails when max is specified and range is greater than max allowed range": { + heaviest: 500, + from: pstring("earliest"), + to: pstring("0x10000"), + maxRange: 10, + minOut: 0, + maxOut: 0, + errStr: "too large", + }, + "works when range is valid": { + heaviest: 500, + from: pstring("earliest"), + to: pstring("latest"), + maxRange: 1000, + minOut: 0, + maxOut: -1, + }, + "works when range is valid and specified": { + heaviest: 500, + from: pstring("0x10"), + to: pstring("0x30"), + maxRange: 1000, + minOut: 16, + maxOut: 48, + }, + } + + for name, tc := range tcs { + tc2 := tc + t.Run(name, func(t *testing.T) { + min, max, err := parseBlockRange(tc2.heaviest, tc2.from, tc2.to, tc2.maxRange) + require.Equal(t, tc2.minOut, min) + require.Equal(t, tc2.maxOut, max) + if tc2.errStr != "" { + fmt.Println(err) + require.Error(t, err) + require.Contains(t, err.Error(), tc2.errStr) + } else { + require.NoError(t, err) + } + }) + } +} + func TestEthLogFromEvent(t *testing.T) { // basic empty data, topics, ok := ethLogFromEvent(nil) diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 0f9049bf7..9cb7a4a16 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -2,15 +2,14 @@ package modules import ( "context" + "fmt" "path/filepath" "time" - "github.com/multiformats/go-varint" "go.uber.org/fx" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/filter" @@ -129,8 +128,9 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc fm := &filter.EventFilterManager{ ChainStore: cs, EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true + // TODO: + // We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { - // we only want to match using f4 addresses idAddr, err := address.NewIDAddress(uint64(emitter)) if err != nil { return address.Undef, false @@ -138,18 +138,11 @@ func EventFilterManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.Loc actor, err := sm.LoadActor(ctx, idAddr, ts) if err != nil || actor.Address == nil { - return address.Undef, false + return idAddr, true } - // if robust address is not f4 then we won't match against it so bail early - if actor.Address.Protocol() != address.Delegated { - return address.Undef, false - } - // we have an f4 address, make sure it's assigned by the EAM - // What happens when we introduce events for built-in Actor events here ? - if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { - return address.Undef, false - } + fmt.Println("") + return *actor.Address, true }, @@ -175,9 +168,10 @@ func ActorEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRe return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, fm *filter.EventFilterManager, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI) (*full.ActorEvent, error) { ee := &full.ActorEvent{ MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange), + Chain: cs, } - if !cfg.EnableActorEventsAPI { + if !cfg.EnableActorEventsAPI || cfg.Events.DisableRealTimeFilterAPI { // all Actor events functionality is disabled return ee, nil }