itests: add event matrix tests for realtime eth filters and subscriptions

This commit is contained in:
Ian Davis 2023-01-20 14:01:25 +00:00
parent 1ea57740aa
commit c5ed5dd254
4 changed files with 876 additions and 593 deletions

View File

@ -20,7 +20,12 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
const indexed uint8 = 0x01 func isIndexedValue(b uint8) bool {
// currently we mark the full entry as indexed if either the key
// or the value are indexed; in the future we will need finer-grained
// management of indices
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}
type EventFilter struct { type EventFilter struct {
id types.FilterID id types.FilterID
@ -209,7 +214,7 @@ func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
matched := map[string]bool{} matched := map[string]bool{}
for _, ee := range ees { for _, ee := range ees {
// Skip an entry that is not indexable // Skip an entry that is not indexable
if ee.Flags&indexed != indexed { if !isIndexedValue(ee.Flags) {
continue continue
} }
@ -221,12 +226,15 @@ func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
} }
wantlist, ok := f.keys[keyname] wantlist, ok := f.keys[keyname]
if !ok { if !ok || len(wantlist) == 0 {
continue continue
} }
for _, w := range wantlist { for _, w := range wantlist {
if bytes.Equal(w, ee.Value) { // TODO: remove this. Currently the filters use raw values but the value in the entry is cbor-encoded
// We want to make all internal values cbor-encoded as per https://github.com/filecoin-project/ref-fvm/issues/1345
value := leftpad32(decodeLogBytes(ee.Value))
if bytes.Equal(w, value) {
matched[keyname] = true matched[keyname] = true
break break
} }
@ -481,3 +489,13 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
return ems, nil return ems, nil
} }
func leftpad32(orig []byte) []byte {
needed := 32 - len(orig)
if needed <= 0 {
return orig
}
ret := make([]byte, 32)
copy(ret[needed:], orig)
return ret
}

View File

@ -153,13 +153,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("prepare insert entry: %w", err) return xerrors.Errorf("prepare insert entry: %w", err)
} }
isIndexedValue := func(b uint8) bool {
// currently we mark the full entry as indexed if either the key
// or the value are indexed; in the future we will need finer-grained
// management of indices
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}
for msgIdx, em := range ems { for msgIdx, em := range ems {
for evIdx, ev := range em.Events() { for evIdx, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter] addr, found := addressLookups[ev.Emitter]

File diff suppressed because it is too large Load Diff

View File

@ -1016,6 +1016,8 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *ethtypes.EthFil
return ethtypes.EthFilterID{}, err return ethtypes.EthFilterID{}, err
} }
fmt.Printf("REMOVEME: EthNewFilter.f=%+v\n", f)
return ethtypes.EthFilterID(f.ID()), nil return ethtypes.EthFilterID(f.ID()), nil
} }
@ -1140,13 +1142,16 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
keys := map[string][][]byte{} keys := map[string][][]byte{}
if params != nil { if params != nil {
for idx, vals := range params.Topics { for idx, vals := range params.Topics {
if len(vals) == 0 {
continue
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4 // Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1) key := fmt.Sprintf("topic%d", idx+1)
keyvals := make([][]byte, len(vals)) for _, v := range vals {
for i, v := range vals { buf := make([]byte, len(v[:]))
keyvals[i] = v[:] copy(buf, v[:])
keys[key] = append(keys[key], buf)
} }
keys[key] = keyvals
} }
} }