Merge pull request #10083 from filecoin-project/iand/issue-9849-realtime

itests: add event matrix tests for realtime eth filters and subscriptions
This commit is contained in:
Geoff Stuart 2023-01-26 08:50:51 -05:00 committed by GitHub
commit c3be4f2f15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 923 additions and 637 deletions

View File

@ -20,7 +20,12 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
const indexed uint8 = 0x01 func isIndexedValue(b uint8) bool {
// currently we mark the full entry as indexed if either the key
// or the value are indexed; in the future we will need finer-grained
// management of indices
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}
type EventFilter struct { type EventFilter struct {
id types.FilterID id types.FilterID
@ -100,18 +105,18 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
continue continue
} }
decodedEntries := make([]types.EventEntry, len(ev.Entries)) entries := make([]types.EventEntry, len(ev.Entries))
for i, entry := range ev.Entries { for i, entry := range ev.Entries {
decodedEntries[i] = types.EventEntry{ entries[i] = types.EventEntry{
Flags: entry.Flags, Flags: entry.Flags,
Key: entry.Key, Key: entry.Key,
Value: decodeLogBytes(entry.Value), Value: entry.Value,
} }
} }
// event matches filter, so record it // event matches filter, so record it
cev := &CollectedEvent{ cev := &CollectedEvent{
Entries: decodedEntries, Entries: entries,
EmitterAddr: addr, EmitterAddr: addr,
EventIdx: evIdx, EventIdx: evIdx,
Reverted: revert, Reverted: revert,
@ -209,7 +214,7 @@ func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
matched := map[string]bool{} matched := map[string]bool{}
for _, ee := range ees { for _, ee := range ees {
// Skip an entry that is not indexable // Skip an entry that is not indexable
if ee.Flags&indexed != indexed { if !isIndexedValue(ee.Flags) {
continue continue
} }
@ -221,7 +226,7 @@ func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
} }
wantlist, ok := f.keys[keyname] wantlist, ok := f.keys[keyname]
if !ok { if !ok || len(wantlist) == 0 {
continue continue
} }

View File

@ -1,7 +1,6 @@
package filter package filter
import ( import (
"bytes"
"context" "context"
"database/sql" "database/sql"
"errors" "errors"
@ -11,7 +10,6 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -153,13 +151,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("prepare insert entry: %w", err) return xerrors.Errorf("prepare insert entry: %w", err)
} }
isIndexedValue := func(b uint8) bool {
// currently we mark the full entry as indexed if either the key
// or the value are indexed; in the future we will need finer-grained
// management of indices
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}
for msgIdx, em := range ems { for msgIdx, em := range ems {
for evIdx, ev := range em.Events() { for evIdx, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter] addr, found := addressLookups[ev.Emitter]
@ -198,13 +189,12 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
} }
for _, entry := range ev.Entries { for _, entry := range ev.Entries {
value := decodeLogBytes(entry.Value)
_, err := stmtEntry.Exec( _, err := stmtEntry.Exec(
lastID, // event_id lastID, // event_id
isIndexedValue(entry.Flags), // indexed isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags []byte{entry.Flags}, // flags
entry.Key, // key entry.Key, // key
value, // value entry.Value, // value
) )
if err != nil { if err != nil {
return xerrors.Errorf("exec insert entry: %w", err) return xerrors.Errorf("exec insert entry: %w", err)
@ -220,21 +210,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil return nil
} }
// decodeLogBytes decodes a CBOR-serialized array into its original form.
//
// This function swallows errors and returns the original array if it failed
// to decode.
func decodeLogBytes(orig []byte) []byte {
if len(orig) == 0 {
return orig
}
decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig)))
if err != nil {
return orig
}
return decoded
}
// PrefillFilter fills a filter's collection of events from the historic index // PrefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
clauses := []string{} clauses := []string{}

File diff suppressed because it is too large Load Diff

View File

@ -982,17 +982,9 @@ func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtype
addresses = append(addresses, a) addresses = append(addresses, a)
} }
for idx, vals := range filterSpec.Topics { keys, err := parseEthTopics(filterSpec.Topics)
if len(vals) == 0 { if err != nil {
continue return nil, err
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
for _, v := range vals {
buf := make([]byte, len(v[:]))
copy(buf, v[:])
keys[key] = append(keys[key], buf)
}
} }
return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys) return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys)
@ -1017,7 +1009,6 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *ethtypes.EthFil
return ethtypes.EthFilterID{}, err return ethtypes.EthFilterID{}, err
} }
return ethtypes.EthFilterID(f.ID()), nil return ethtypes.EthFilterID(f.ID()), nil
} }
@ -1141,14 +1132,12 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
case EthSubscribeEventTypeLogs: case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{} keys := map[string][][]byte{}
if params != nil { if params != nil {
for idx, vals := range params.Topics { var err error
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4 keys, err = parseEthTopics(params.Topics)
key := fmt.Sprintf("topic%d", idx+1) if err != nil {
keyvals := make([][]byte, len(vals)) // clean up any previous filters added and stop the sub
for i, v := range vals { _, _ = e.EthUnsubscribe(ctx, sub.id)
keyvals[i] = v[:] return nil, err
}
keys[key] = keyvals
} }
} }
@ -1235,7 +1224,10 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent, sa StateAPI) (*etht
var err error var err error
for _, entry := range ev.Entries { for _, entry := range ev.Entries {
value := ethtypes.EthBytes(leftpad32(entry.Value)) // value has already been cbor-decoded but see https://github.com/filecoin-project/ref-fvm/issues/1345 value, err := cborDecodeTopicValue(entry.Value)
if err != nil {
return nil, err
}
if entry.Key == ethtypes.EthTopic1 || entry.Key == ethtypes.EthTopic2 || entry.Key == ethtypes.EthTopic3 || entry.Key == ethtypes.EthTopic4 { if entry.Key == ethtypes.EthTopic1 || entry.Key == ethtypes.EthTopic2 || entry.Key == ethtypes.EthTopic3 || entry.Key == ethtypes.EthTopic4 {
log.Topics = append(log.Topics, value) log.Topics = append(log.Topics, value)
} else { } else {
@ -1778,7 +1770,10 @@ func newEthTxReceipt(ctx context.Context, tx ethtypes.EthTx, lookup *api.MsgLook
} }
for _, entry := range evt.Entries { for _, entry := range evt.Entries {
value := ethtypes.EthBytes(leftpad32(entry.Value)) // value has already been cbor-decoded but see https://github.com/filecoin-project/ref-fvm/issues/1345 value, err := cborDecodeTopicValue(entry.Value)
if err != nil {
return api.EthTxReceipt{}, xerrors.Errorf("failed to decode event log value: %w", err)
}
if entry.Key == ethtypes.EthTopic1 || entry.Key == ethtypes.EthTopic2 || entry.Key == ethtypes.EthTopic3 || entry.Key == ethtypes.EthTopic4 { if entry.Key == ethtypes.EthTopic1 || entry.Key == ethtypes.EthTopic2 || entry.Key == ethtypes.EthTopic3 || entry.Key == ethtypes.EthTopic4 {
l.Topics = append(l.Topics, value) l.Topics = append(l.Topics, value)
} else { } else {
@ -1889,10 +1884,6 @@ func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManag
} }
} }
// TODO we could also emit full EVM words from the EVM runtime, but not doing so
// makes the contract slightly cheaper (and saves storage bytes), at the expense
// of having to left pad in the API, which is a pretty acceptable tradeoff at
// face value. There may be other protocol implications to consider.
func leftpad32(orig []byte) []byte { func leftpad32(orig []byte) []byte {
needed := 32 - len(orig) needed := 32 - len(orig)
if needed <= 0 { if needed <= 0 {
@ -1902,3 +1893,51 @@ func leftpad32(orig []byte) []byte {
copy(ret[needed:], orig) copy(ret[needed:], orig)
return ret return ret
} }
func trimLeadingZeros(b []byte) []byte {
for i := range b {
if b[i] != 0 {
return b[i:]
}
}
return []byte{}
}
func cborEncodeTopicValue(orig []byte) ([]byte, error) {
var buf bytes.Buffer
err := cbg.WriteByteArray(&buf, trimLeadingZeros(orig))
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func cborDecodeTopicValue(orig []byte) ([]byte, error) {
if len(orig) == 0 {
return orig, nil
}
decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig)))
if err != nil {
return nil, err
}
return leftpad32(decoded), nil
}
func parseEthTopics(topics ethtypes.EthTopicSpec) (map[string][][]byte, error) {
keys := map[string][][]byte{}
for idx, vals := range topics {
if len(vals) == 0 {
continue
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
for _, v := range vals {
encodedVal, err := cborEncodeTopicValue(v[:])
if err != nil {
return nil, xerrors.Errorf("failed to encode topic value")
}
keys[key] = append(keys[key], encodedVal)
}
}
return keys, nil
}