fix: event filters use cbor encoding internally (#10085)
This commit is contained in:
parent
c5ed5dd254
commit
ddd5ff9c42
@ -105,18 +105,18 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
decodedEntries := make([]types.EventEntry, len(ev.Entries))
|
entries := make([]types.EventEntry, len(ev.Entries))
|
||||||
for i, entry := range ev.Entries {
|
for i, entry := range ev.Entries {
|
||||||
decodedEntries[i] = types.EventEntry{
|
entries[i] = types.EventEntry{
|
||||||
Flags: entry.Flags,
|
Flags: entry.Flags,
|
||||||
Key: entry.Key,
|
Key: entry.Key,
|
||||||
Value: decodeLogBytes(entry.Value),
|
Value: entry.Value,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// event matches filter, so record it
|
// event matches filter, so record it
|
||||||
cev := &CollectedEvent{
|
cev := &CollectedEvent{
|
||||||
Entries: decodedEntries,
|
Entries: entries,
|
||||||
EmitterAddr: addr,
|
EmitterAddr: addr,
|
||||||
EventIdx: evIdx,
|
EventIdx: evIdx,
|
||||||
Reverted: revert,
|
Reverted: revert,
|
||||||
@ -231,10 +231,7 @@ func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, w := range wantlist {
|
for _, w := range wantlist {
|
||||||
// TODO: remove this. Currently the filters use raw values but the value in the entry is cbor-encoded
|
if bytes.Equal(w, ee.Value) {
|
||||||
// We want to make all internal values cbor-encoded as per https://github.com/filecoin-project/ref-fvm/issues/1345
|
|
||||||
value := leftpad32(decodeLogBytes(ee.Value))
|
|
||||||
if bytes.Equal(w, value) {
|
|
||||||
matched[keyname] = true
|
matched[keyname] = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -489,13 +486,3 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
|
|||||||
|
|
||||||
return ems, nil
|
return ems, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func leftpad32(orig []byte) []byte {
|
|
||||||
needed := 32 - len(orig)
|
|
||||||
if needed <= 0 {
|
|
||||||
return orig
|
|
||||||
}
|
|
||||||
ret := make([]byte, 32)
|
|
||||||
copy(ret[needed:], orig)
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package filter
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
@ -11,7 +10,6 @@ import (
|
|||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
@ -191,13 +189,12 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, entry := range ev.Entries {
|
for _, entry := range ev.Entries {
|
||||||
value := decodeLogBytes(entry.Value)
|
|
||||||
_, err := stmtEntry.Exec(
|
_, err := stmtEntry.Exec(
|
||||||
lastID, // event_id
|
lastID, // event_id
|
||||||
isIndexedValue(entry.Flags), // indexed
|
isIndexedValue(entry.Flags), // indexed
|
||||||
[]byte{entry.Flags}, // flags
|
[]byte{entry.Flags}, // flags
|
||||||
entry.Key, // key
|
entry.Key, // key
|
||||||
value, // value
|
entry.Value, // value
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("exec insert entry: %w", err)
|
return xerrors.Errorf("exec insert entry: %w", err)
|
||||||
@ -213,21 +210,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// decodeLogBytes decodes a CBOR-serialized array into its original form.
|
|
||||||
//
|
|
||||||
// This function swallows errors and returns the original array if it failed
|
|
||||||
// to decode.
|
|
||||||
func decodeLogBytes(orig []byte) []byte {
|
|
||||||
if len(orig) == 0 {
|
|
||||||
return orig
|
|
||||||
}
|
|
||||||
decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig)))
|
|
||||||
if err != nil {
|
|
||||||
return orig
|
|
||||||
}
|
|
||||||
return decoded
|
|
||||||
}
|
|
||||||
|
|
||||||
// PrefillFilter fills a filter's collection of events from the historic index
|
// PrefillFilter fills a filter's collection of events from the historic index
|
||||||
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
|
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
|
||||||
clauses := []string{}
|
clauses := []string{}
|
||||||
|
@ -980,17 +980,9 @@ func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtype
|
|||||||
addresses = append(addresses, a)
|
addresses = append(addresses, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
for idx, vals := range filterSpec.Topics {
|
keys, err := parseEthTopics(filterSpec.Topics)
|
||||||
if len(vals) == 0 {
|
if err != nil {
|
||||||
continue
|
return nil, err
|
||||||
}
|
|
||||||
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
|
|
||||||
key := fmt.Sprintf("topic%d", idx+1)
|
|
||||||
for _, v := range vals {
|
|
||||||
buf := make([]byte, len(v[:]))
|
|
||||||
copy(buf, v[:])
|
|
||||||
keys[key] = append(keys[key], buf)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys)
|
return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys)
|
||||||
@ -1141,17 +1133,12 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
|
|||||||
case EthSubscribeEventTypeLogs:
|
case EthSubscribeEventTypeLogs:
|
||||||
keys := map[string][][]byte{}
|
keys := map[string][][]byte{}
|
||||||
if params != nil {
|
if params != nil {
|
||||||
for idx, vals := range params.Topics {
|
var err error
|
||||||
if len(vals) == 0 {
|
keys, err = parseEthTopics(params.Topics)
|
||||||
continue
|
if err != nil {
|
||||||
}
|
// clean up any previous filters added and stop the sub
|
||||||
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
|
_, _ = e.EthUnsubscribe(ctx, sub.id)
|
||||||
key := fmt.Sprintf("topic%d", idx+1)
|
return nil, err
|
||||||
for _, v := range vals {
|
|
||||||
buf := make([]byte, len(v[:]))
|
|
||||||
copy(buf, v[:])
|
|
||||||
keys[key] = append(keys[key], buf)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1238,7 +1225,10 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent, sa StateAPI) (*etht
|
|||||||
var err error
|
var err error
|
||||||
|
|
||||||
for _, entry := range ev.Entries {
|
for _, entry := range ev.Entries {
|
||||||
value := ethtypes.EthBytes(leftpad32(entry.Value)) // value has already been cbor-decoded but see https://github.com/filecoin-project/ref-fvm/issues/1345
|
value, err := cborDecodeTopicValue(entry.Value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if entry.Key == ethtypes.EthTopic1 || entry.Key == ethtypes.EthTopic2 || entry.Key == ethtypes.EthTopic3 || entry.Key == ethtypes.EthTopic4 {
|
if entry.Key == ethtypes.EthTopic1 || entry.Key == ethtypes.EthTopic2 || entry.Key == ethtypes.EthTopic3 || entry.Key == ethtypes.EthTopic4 {
|
||||||
log.Topics = append(log.Topics, value)
|
log.Topics = append(log.Topics, value)
|
||||||
} else {
|
} else {
|
||||||
@ -1892,10 +1882,6 @@ func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManag
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO we could also emit full EVM words from the EVM runtime, but not doing so
|
|
||||||
// makes the contract slightly cheaper (and saves storage bytes), at the expense
|
|
||||||
// of having to left pad in the API, which is a pretty acceptable tradeoff at
|
|
||||||
// face value. There may be other protocol implications to consider.
|
|
||||||
func leftpad32(orig []byte) []byte {
|
func leftpad32(orig []byte) []byte {
|
||||||
needed := 32 - len(orig)
|
needed := 32 - len(orig)
|
||||||
if needed <= 0 {
|
if needed <= 0 {
|
||||||
@ -1905,3 +1891,51 @@ func leftpad32(orig []byte) []byte {
|
|||||||
copy(ret[needed:], orig)
|
copy(ret[needed:], orig)
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func trimLeadingZeros(b []byte) []byte {
|
||||||
|
for i := range b {
|
||||||
|
if b[i] != 0 {
|
||||||
|
return b[i:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func cborEncodeTopicValue(orig []byte) ([]byte, error) {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
err := cbg.WriteByteArray(&buf, trimLeadingZeros(orig))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return buf.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func cborDecodeTopicValue(orig []byte) ([]byte, error) {
|
||||||
|
if len(orig) == 0 {
|
||||||
|
return orig, nil
|
||||||
|
}
|
||||||
|
decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig)))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return leftpad32(decoded), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseEthTopics(topics ethtypes.EthTopicSpec) (map[string][][]byte, error) {
|
||||||
|
keys := map[string][][]byte{}
|
||||||
|
for idx, vals := range topics {
|
||||||
|
if len(vals) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
|
||||||
|
key := fmt.Sprintf("topic%d", idx+1)
|
||||||
|
for _, v := range vals {
|
||||||
|
encodedVal, err := cborEncodeTopicValue(v[:])
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to encode topic value")
|
||||||
|
}
|
||||||
|
keys[key] = append(keys[key], encodedVal)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keys, nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user