Merge pull request #9819 from filecoin-project/issue/ref-fvm-1207

Fix getting event logs by topic
This commit is contained in:
Aayush Rajasekaran 2022-12-08 14:29:33 -05:00 committed by GitHub
commit 6da93cf921
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 150 additions and 38 deletions

View File

@ -1,6 +1,7 @@
package filter package filter
import ( import (
"bytes"
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
@ -10,6 +11,7 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -151,6 +153,10 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("prepare insert entry: %w", err) return xerrors.Errorf("prepare insert entry: %w", err)
} }
isIndexedValue := func(b uint8) bool {
return b&types.EventFlagIndexedValue == types.EventFlagIndexedValue
}
for msgIdx, em := range ems { for msgIdx, em := range ems {
for evIdx, ev := range em.Events() { for evIdx, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter] addr, found := addressLookups[ev.Emitter]
@ -189,12 +195,13 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
} }
for _, entry := range ev.Entries { for _, entry := range ev.Entries {
value := decodeLogBytes(entry.Value)
_, err := stmtEntry.Exec( _, err := stmtEntry.Exec(
lastID, // event_id lastID, // event_id
entry.Flags&indexed == indexed, // indexed isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags []byte{entry.Flags}, // flags
entry.Key, // key entry.Key, // key
entry.Value, // value value, // value
) )
if err != nil { if err != nil {
return xerrors.Errorf("exec insert entry: %w", err) return xerrors.Errorf("exec insert entry: %w", err)
@ -210,6 +217,21 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil return nil
} }
// decodeLogBytes decodes a CBOR-serialized array into its original form.
//
// This function swallows errors and returns the original array if it failed
// to decode.
func decodeLogBytes(orig []byte) []byte {
if orig == nil {
return orig
}
decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig)))
if err != nil {
return orig
}
return decoded
}
// PrefillFilter fills a filter's collection of events from the historic index // PrefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
clauses := []string{} clauses := []string{}
@ -242,6 +264,7 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
if len(f.keys) > 0 { if len(f.keys) > 0 {
join := 0 join := 0
for key, vals := range f.keys { for key, vals := range f.keys {
if len(vals) > 0 {
join++ join++
joinAlias := fmt.Sprintf("ee%d", join) joinAlias := fmt.Sprintf("ee%d", join)
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias)) joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
@ -250,10 +273,10 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
subclauses := []string{} subclauses := []string{}
for _, val := range vals { for _, val := range vals {
subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias)) subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias))
values = append(values, val) values = append(values, trimLeadingZeros(val))
} }
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
} }
} }
@ -397,3 +420,12 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
return nil return nil
} }
func trimLeadingZeros(b []byte) []byte {
for i := range b {
if b[i] != 0 {
return b[i:]
}
}
return []byte{}
}

View File

@ -24,3 +24,9 @@ type EventEntry struct {
} }
type FilterID [32]byte // compatible with EthHash type FilterID [32]byte // compatible with EthHash
// EventEntry flags defined in fvm_shared
const (
EventFlagIndexedKey = 0b00000001
EventFlagIndexedValue = 0b00000010
)

View File

@ -4,6 +4,7 @@ package itests
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"encoding/json"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
@ -446,17 +447,15 @@ func ParseEthLog(in map[string]interface{}) (*api.EthLog, error) {
return el, err return el, err
} }
func TestEthGetLogsAll(t *testing.T) { type msgInTipset struct {
msg api.Message
ts *types.TipSet
reverted bool
}
func invokeContractAndWaitUntilAllOnChain(t *testing.T, client *kit.TestFullNode, iterations int) (api.EthAddress, map[api.EthHash]msgInTipset) {
require := require.New(t) require := require.New(t)
kit.QuietMiningLogs()
blockTime := 100 * time.Millisecond
dbpath := filepath.Join(t.TempDir(), "actorevents.db")
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath))
ens.InterconnectAll().BeginMining(blockTime)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel() defer cancel()
@ -476,13 +475,6 @@ func TestEthGetLogsAll(t *testing.T) {
require.NoError(err) require.NoError(err)
t.Logf("actor ID address is %s", idAddr) t.Logf("actor ID address is %s", idAddr)
const iterations = 10
type msgInTipset struct {
msg api.Message
ts *types.TipSet
}
msgChan := make(chan msgInTipset, iterations) msgChan := make(chan msgInTipset, iterations)
waitAllCh := make(chan struct{}) waitAllCh := make(chan struct{})
@ -503,7 +495,7 @@ func TestEthGetLogsAll(t *testing.T) {
count += len(msgs) count += len(msgs)
for _, m := range msgs { for _, m := range msgs {
select { select {
case msgChan <- msgInTipset{msg: m, ts: change.Val}: case msgChan <- msgInTipset{msg: m, ts: change.Val, reverted: change.Type == store.HCRevert}:
default: default:
} }
} }
@ -550,6 +542,22 @@ func TestEthGetLogsAll(t *testing.T) {
ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address) ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address)
require.NoError(err) require.NoError(err)
return ethContractAddr, received
}
func TestEthGetLogsAll(t *testing.T) {
require := require.New(t)
kit.QuietMiningLogs()
blockTime := 100 * time.Millisecond
dbpath := filepath.Join(t.TempDir(), "actorevents.db")
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath))
ens.InterconnectAll().BeginMining(blockTime)
ethContractAddr, received := invokeContractAndWaitUntilAllOnChain(t, client, 10)
topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11})) topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11}))
topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22})) topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22}))
topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33})) topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33}))
@ -558,8 +566,8 @@ func TestEthGetLogsAll(t *testing.T) {
pstring := func(s string) *string { return &s } pstring := func(s string) *string { return &s }
// get logs // get all logs
res, err := client.EthGetLogs(ctx, &api.EthFilterSpec{ res, err := client.EthGetLogs(context.Background(), &api.EthFilterSpec{
FromBlock: pstring("0x0"), FromBlock: pstring("0x0"),
}) })
require.NoError(err) require.NoError(err)
@ -600,6 +608,70 @@ func TestEthGetLogsAll(t *testing.T) {
} }
} }
func TestEthGetLogsByTopic(t *testing.T) {
require := require.New(t)
kit.QuietMiningLogs()
blockTime := 100 * time.Millisecond
dbpath := filepath.Join(t.TempDir(), "actorevents.db")
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath))
ens.InterconnectAll().BeginMining(blockTime)
invocations := 1
ethContractAddr, received := invokeContractAndWaitUntilAllOnChain(t, client, invocations)
topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11}))
topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22}))
topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33}))
topic4 := api.EthBytes(leftpad32([]byte{0x44, 0x44}))
data1 := api.EthBytes(leftpad32([]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88}))
// find log by known topic1
var spec api.EthFilterSpec
err := json.Unmarshal([]byte(`{"fromBlock":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000001111"]}`), &spec)
require.NoError(err)
res, err := client.EthGetLogs(context.Background(), &spec)
require.NoError(err)
require.Equal(invocations, len(res.Results))
for _, r := range res.Results {
// since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results
rc, ok := r.(map[string]interface{})
require.True(ok, "result type")
elog, err := ParseEthLog(rc)
require.NoError(err)
require.Equal(ethContractAddr, elog.Address, "event address")
require.Equal(api.EthUint64(0), elog.TransactionIndex, "transaction index") // only one message per tipset
msg, exists := received[elog.TransactionHash]
require.True(exists, "message seen on chain")
tsCid, err := msg.ts.Key().Cid()
require.NoError(err)
tsCidHash, err := api.NewEthHashFromCid(tsCid)
require.NoError(err)
require.Equal(tsCidHash, elog.BlockHash, "block hash")
require.Equal(4, len(elog.Topics), "number of topics")
require.Equal(topic1, elog.Topics[0], "topic1")
require.Equal(topic2, elog.Topics[1], "topic2")
require.Equal(topic3, elog.Topics[2], "topic3")
require.Equal(topic4, elog.Topics[3], "topic4")
require.Equal(data1, elog.Data, "data1")
}
}
func TestEthSubscribeLogs(t *testing.T) { func TestEthSubscribeLogs(t *testing.T) {
require := require.New(t) require := require.New(t)

View File

@ -29,6 +29,7 @@ func TestFEVMEvents(t *testing.T) {
defer cancel() defer cancel()
// install contract // install contract
// See https://github.com/filecoin-project/builtin-actors/blob/next/actors/evm/tests/events.rs#L12
contractHex, err := os.ReadFile("contracts/events.bin") contractHex, err := os.ReadFile("contracts/events.bin")
require.NoError(err) require.NoError(err)
@ -65,17 +66,18 @@ func TestFEVMEvents(t *testing.T) {
ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x00}, nil) ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x00}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
require.NotNil(ret.Receipt.EventsRoot) require.NotNil(ret.Receipt.EventsRoot)
fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) fmt.Println(ret)
fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))
// log a zero topic event with no data // log a zero topic event with no data
ret = client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x01}, nil) ret = client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x01}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
fmt.Println(ret) fmt.Println(ret)
fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))
// log a four topic event with data // log a four topic event with data
ret = client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil) ret = client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
fmt.Println(ret) fmt.Println(ret)
fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))
} }