Ethereum actor events API fixes (#9639)

Co-authored-by: Raúl Kripalani <raul@protocol.ai>
This commit is contained in:
Ian Davis 2022-11-15 13:02:11 +00:00 committed by GitHub
parent 30050a6cb2
commit 273ac513fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 124 additions and 47 deletions

View File

@ -18,6 +18,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
type RobustAddresser interface {
LookupRobustAddress(ctx context.Context, idAddr address.Address, ts *types.TipSet) (address.Address, error)
}
const indexed uint8 = 0x01
type EventFilter struct {
@ -25,7 +29,7 @@ type EventFilter struct {
minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum
maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum
tipsetCid cid.Cid
addresses []address.Address // list of actor ids that originated the event
addresses []address.Address // list of f4 actor addresses that are extpected to emit the event
keys map[string][][]byte // map of key names to a list of alternate values that may match
maxResults int // maximum number of results to collect, 0 is unlimited
@ -39,6 +43,7 @@ var _ Filter = (*EventFilter)(nil)
type CollectedEvent struct {
Event *types.Event
EmitterAddr address.Address // f4 address of emitter
EventIdx int // index of the event within the list of emitted events
Reverted bool
Height abi.ChainEpoch
@ -64,18 +69,33 @@ func (f *EventFilter) ClearSubChannel() {
f.ch = nil
}
func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool) error {
func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
if !f.matchTipset(te) {
return nil
}
// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)
ems, err := te.messages(ctx)
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
}
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
if !f.matchAddress(ev.Emitter) {
// lookup address corresponding to the actor id
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
if !ok {
// not an address we will be able to match against
continue
}
addressLookups[ev.Emitter] = addr
}
if !f.matchAddress(addr) {
continue
}
if !f.matchKeys(ev.Entries) {
@ -85,6 +105,7 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
// event matches filter, so record it
cev := &CollectedEvent{
Event: ev,
EmitterAddr: addr,
EventIdx: evIdx,
Reverted: revert,
Height: te.msgTs.Height(),
@ -152,8 +173,9 @@ func (f *EventFilter) matchAddress(o address.Address) bool {
if len(f.addresses) == 0 {
return true
}
// Assume short lists of addresses
// TODO: binary search for longer lists
// TODO: binary search for longer lists or restrict list length
for _, a := range f.addresses {
if a == o {
return true
@ -258,6 +280,7 @@ func (e *executedMessage) Events() []*types.Event {
type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
MaxFilterResults int
mu sync.Mutex // guards mutations to filters
@ -279,7 +302,7 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false); err != nil {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
return err
}
}
@ -302,7 +325,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true); err != nil {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
return err
}
}

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
@ -23,11 +24,18 @@ import (
func TestEventFilterCollectEvents(t *testing.T) {
rng := pseudo.New(pseudo.NewSource(299792458))
a1 := randomActorAddr(t, rng)
a2 := randomActorAddr(t, rng)
a1 := randomF4Addr(t, rng)
a2 := randomF4Addr(t, rng)
a1ID := abi.ActorID(1)
a2ID := abi.ActorID(2)
addrMap := addressMap{}
addrMap.add(a1ID, a1)
addrMap.add(a2ID, a2)
ev1 := fakeEvent(
a1,
a1ID,
[]kv{
{k: "type", v: []byte("approval")},
{k: "signer", v: []byte("addr1")},
@ -40,7 +48,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
st := newStore()
events := []*types.Event{ev1}
em := executedMessage{
msg: fakeMessage(randomActorAddr(t, rng), randomActorAddr(t, rng)),
msg: fakeMessage(randomF4Addr(t, rng), randomF4Addr(t, rng)),
rct: fakeReceipt(t, rng, st, events),
evs: events,
}
@ -53,6 +61,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
oneCollectedEvent := []*CollectedEvent{
{
Event: ev1,
EmitterAddr: a1,
EventIdx: 0,
Reverted: false,
Height: 14000,
@ -254,7 +263,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
for _, tc := range testCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := tc.filter.CollectEvents(context.Background(), tc.te, false); err != nil {
if err := tc.filter.CollectEvents(context.Background(), tc.te, false, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect events")
}
@ -269,7 +278,7 @@ type kv struct {
v []byte
}
func fakeEvent(emitter address.Address, indexed []kv, unindexed []kv) *types.Event {
func fakeEvent(emitter abi.ActorID, indexed []kv, unindexed []kv) *types.Event {
ev := &types.Event{
Emitter: emitter,
}
@ -293,9 +302,9 @@ func fakeEvent(emitter address.Address, indexed []kv, unindexed []kv) *types.Eve
return ev
}
func randomActorAddr(tb testing.TB, rng *pseudo.Rand) address.Address {
func randomF4Addr(tb testing.TB, rng *pseudo.Rand) address.Address {
tb.Helper()
addr, err := address.NewActorAddress(randomBytes(32, rng))
addr, err := address.NewDelegatedAddress(builtintypes.EthereumAddressManagerActorID, randomBytes(32, rng))
require.NoError(tb, err)
return addr
@ -409,3 +418,14 @@ func buildTipSetEvents(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, em exe
},
}
}
type addressMap map[abi.ActorID]address.Address
func (a addressMap) add(actorID abi.ActorID, addr address.Address) {
a[actorID] = addr
}
func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
ra, ok := a[emitter]
return ra, ok
}

View File

@ -1852,8 +1852,9 @@ func (t *Event) MarshalCBOR(w io.Writer) error {
return err
}
// t.Emitter (address.Address) (struct)
if err := t.Emitter.MarshalCBOR(cw); err != nil {
// t.Emitter (abi.ActorID) (uint64)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Emitter)); err != nil {
return err
}
@ -1896,13 +1897,18 @@ func (t *Event) UnmarshalCBOR(r io.Reader) (err error) {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.Emitter (address.Address) (struct)
// t.Emitter (abi.ActorID) (uint64)
{
if err := t.Emitter.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.Emitter: %w", err)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Emitter = abi.ActorID(extra)
}
// t.Entries ([]types.EventEntry) (slice)

View File

@ -1,12 +1,12 @@
package types
import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
)
type Event struct {
// The ID of the actor that emitted this event.
Emitter address.Address
Emitter abi.ActorID
// Key values making up this event.
Entries []EventEntry

View File

@ -244,7 +244,7 @@ func ConfigFullNode(c interface{}) Option {
// Actor event filtering support
Override(new(events.EventAPI), From(new(modules.EventAPI))),
Override(new(full.EthEventAPI), modules.EthEvent(cfg.ActorEvent)),
Override(new(full.EthEventAPI), modules.EthEventAPI(cfg.ActorEvent)),
)
}

View File

@ -986,6 +986,8 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec)
}
}
// Convert all addresses to filecoin f4 addresses
for _, ea := range filter.Address {
a, err := ea.ToFilecoinAddress()
if err != nil {
@ -1254,7 +1256,7 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu
}
}
log.Address, err = api.EthAddressFromFilecoinAddress(ev.Event.Emitter)
log.Address, err = api.EthAddressFromFilecoinAddress(ev.EmitterAddr)
if err != nil {
return nil, err
}

View File

@ -4,14 +4,19 @@ import (
"context"
"time"
"github.com/multiformats/go-varint"
"go.uber.org/fx"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
@ -26,8 +31,8 @@ type EventAPI struct {
var _ events.EventAPI = &EventAPI{}
func EthEvent(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, EventAPI, *messagepool.MessagePool) (*full.EthEvent, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, evapi EventAPI, mp *messagepool.MessagePool) (*full.EthEvent, error) {
func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool) (*full.EthEvent, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool) (*full.EthEvent, error) {
ee := &full.EthEvent{
Chain: cs,
MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange),
@ -51,6 +56,27 @@ func EthEvent(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle
if cfg.EnableRealTimeFilterAPI {
ee.EventFilterManager = &filter.EventFilterManager{
ChainStore: cs,
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
// we only want to match using f4 addresses
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
}
addr, err := sm.LookupRobustAddress(ctx, idAddr, ts)
if err != nil {
return address.Undef, false
}
// if robust address is not f4 then we won't match against it so bail early
if addr.Protocol() != address.Delegated {
return address.Undef, false
}
// we have an f4 address, make sure it's assigned by the EAM
if namespace, _, err := varint.FromUvarint(addr.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
return address.Undef, false
}
return addr, true
},
MaxFilterResults: cfg.MaxFilterResults,
}
ee.TipSetFilterManager = &filter.TipSetFilterManager{