Add basic itests for ethereum filter api

This commit is contained in:
Ian Davis 2022-11-15 17:28:26 +00:00
parent ade75af765
commit 10357112e5
8 changed files with 167 additions and 45 deletions

View File

@ -484,6 +484,9 @@ type EthFilterSpec struct {
type EthAddressList []EthAddress type EthAddressList []EthAddress
func (e *EthAddressList) UnmarshalJSON(b []byte) error { func (e *EthAddressList) UnmarshalJSON(b []byte) error {
if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) {
return nil
}
if len(b) > 0 && b[0] == '[' { if len(b) > 0 && b[0] == '[' {
var addrs []EthAddress var addrs []EthAddress
err := json.Unmarshal(b, &addrs) err := json.Unmarshal(b, &addrs)
@ -542,34 +545,29 @@ func (e *EthHashList) UnmarshalJSON(b []byte) error {
return nil return nil
} }
// FilterResult represents the response from executing a filter: a list of bloack hashes, a list of transaction hashes // FilterResult represents the response from executing a filter: a list of block hashes, a list of transaction hashes
// or a list of logs // or a list of logs
// This is a union type. Only one field will be populated. // This is a union type. Only one field will be populated.
// The JSON encoding must produce an array of the populated field. // The JSON encoding must produce an array of the populated field.
type EthFilterResult struct { type EthFilterResult struct {
// List of block hashes. Only populated when the filter has been installed via EthNewBlockFilter Results []interface{}
NewBlockHashes []EthHash
// List of transaction hashes. Only populated when the filter has been installed via EthNewPendingTransactionFilter
NewTransactionHashes []EthHash
// List of event logs. Only populated when the filter has been installed via EthNewFilter
NewLogs []EthLog
} }
func (h EthFilterResult) MarshalJSON() ([]byte, error) { func (h EthFilterResult) MarshalJSON() ([]byte, error) {
if h.NewBlockHashes != nil { if h.Results != nil {
return json.Marshal(h.NewBlockHashes) return json.Marshal(h.Results)
}
if h.NewTransactionHashes != nil {
return json.Marshal(h.NewTransactionHashes)
}
if h.NewLogs != nil {
return json.Marshal(h.NewLogs)
} }
return []byte{'[', ']'}, nil return []byte{'[', ']'}, nil
} }
func (h *EthFilterResult) UnmarshalJSON(b []byte) error {
if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) {
return nil
}
err := json.Unmarshal(b, &h.Results)
return err
}
// EthLog represents the results of an event filter execution. // EthLog represents the results of an event filter execution.
type EthLog struct { type EthLog struct {
// Address is the address of the actor that produced the event log. // Address is the address of the actor that produced the event log.

View File

@ -193,30 +193,33 @@ func TestEthFilterResultMarshalJSON(t *testing.T) {
{ {
res: EthFilterResult{ res: EthFilterResult{
NewBlockHashes: []EthHash{hash1, hash2}, Results: []any{hash1, hash2},
}, },
want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`,
}, },
{ {
res: EthFilterResult{ res: EthFilterResult{
NewTransactionHashes: []EthHash{hash1, hash2}, Results: []any{hash1, hash2},
}, },
want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`,
}, },
{ {
res: EthFilterResult{ res: EthFilterResult{
NewLogs: []EthLog{log}, Results: []any{log},
}, },
want: `[` + string(logjson) + `]`, want: `[` + string(logjson) + `]`,
}, },
} }
for _, tc := range testcases { for _, tc := range testcases {
data, err := json.Marshal(tc.res) tc := tc
require.NoError(t, err) t.Run("", func(t *testing.T) {
require.Equal(t, tc.want, string(data)) data, err := json.Marshal(tc.res)
require.NoError(t, err)
require.Equal(t, tc.want, string(data))
})
} }
} }
@ -325,12 +328,23 @@ func TestEthAddressListUnmarshalJSON(t *testing.T) {
input: `"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"`, input: `"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"`,
want: EthAddressList{addr1}, want: EthAddressList{addr1},
}, },
{
input: `[]`,
want: EthAddressList{},
},
{
input: `null`,
want: EthAddressList(nil),
},
} }
for _, tc := range testcases { for _, tc := range testcases {
var got EthAddressList tc := tc
err := json.Unmarshal([]byte(tc.input), &got) t.Run("", func(t *testing.T) {
require.NoError(t, err) var got EthAddressList
require.Equal(t, tc.want, got) err := json.Unmarshal([]byte(tc.input), &got)
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
} }
} }

View File

@ -3,14 +3,17 @@ package filter
import ( import (
"bytes" "bytes"
"context" "context"
"math"
"sync" "sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt" blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
@ -385,6 +388,9 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
} }
m.mu.Lock() m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*EventFilter)
}
m.filters[id.String()] = f m.filters[id.String()] = f
m.mu.Unlock() m.mu.Unlock()
@ -437,19 +443,30 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
continue continue
} }
evtArr, err := blockadt.AsArray(st, *rct.EventsRoot) evtArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(5))
if err != nil { if err != nil {
return nil, xerrors.Errorf("load events amt: %w", err) return nil, xerrors.Errorf("load events amt: %w", err)
} }
ems[i].evs = make([]*types.Event, evtArr.Length()) ems[i].evs = make([]*types.Event, evtArr.Len())
var evt types.Event var evt types.Event
_ = arr.ForEach(&evt, func(i int64) error { err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error {
if u > math.MaxInt {
return xerrors.Errorf("too many events")
}
if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil {
return err
}
cpy := evt cpy := evt
ems[i].evs[int(i)] = &cpy ems[i].evs[int(u)] = &cpy
return nil return nil
}) })
if err != nil {
return nil, xerrors.Errorf("read events: %w", err)
}
} }
return ems, nil return ems, nil

View File

@ -124,6 +124,9 @@ func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, err
} }
m.mu.Lock() m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*MemPoolFilter)
}
m.filters[id.String()] = f m.filters[id.String()] = f
m.mu.Unlock() m.mu.Unlock()

View File

@ -111,6 +111,9 @@ func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error
} }
m.mu.Lock() m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*TipSetFilter)
}
m.filters[id.String()] = f m.filters[id.String()] = f
m.mu.Unlock() m.mu.Unlock()

View File

@ -3,24 +3,28 @@ package itests
import ( import (
"context" "context"
"encoding/hex"
"os"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/itests/kit"
) )
func TestActorEventsMpool(t *testing.T) { func TestEthNewPendingTransactionFilter(t *testing.T) {
ctx := context.Background() ctx := context.Background()
kit.QuietMiningLogs() kit.QuietMiningLogs()
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(10 * time.Millisecond) ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// create a new address where to send funds. // create a new address where to send funds.
@ -92,15 +96,15 @@ func TestActorEventsMpool(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// expect to have seen iteration number of mpool messages // expect to have seen iteration number of mpool messages
require.Equal(t, iterations, len(res.NewTransactionHashes)) require.Equal(t, iterations, len(res.Results))
} }
func TestActorEventsTipsets(t *testing.T) { func TestEthNewBlockFilter(t *testing.T) {
ctx := context.Background() ctx := context.Background()
kit.QuietMiningLogs() kit.QuietMiningLogs()
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(10 * time.Millisecond) ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// create a new address where to send funds. // create a new address where to send funds.
@ -115,7 +119,7 @@ func TestActorEventsTipsets(t *testing.T) {
filterID, err := client.EthNewBlockFilter(ctx) filterID, err := client.EthNewBlockFilter(ctx)
require.NoError(t, err) require.NoError(t, err)
const iterations = 100 const iterations = 30
// we'll send half our balance (saving the other half for gas), // we'll send half our balance (saving the other half for gas),
// in `iterations` increments. // in `iterations` increments.
@ -133,10 +137,8 @@ func TestActorEventsTipsets(t *testing.T) {
select { select {
case headChanges := <-headChangeCh: case headChanges := <-headChangeCh:
for _, change := range headChanges { for _, change := range headChanges {
if change.Type == store.HCApply { if change.Type == store.HCApply || change.Type == store.HCRevert {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) count++
require.NoError(t, err)
count += len(msgs)
if count == iterations { if count == iterations {
waitAllCh <- struct{}{} waitAllCh <- struct{}{}
} }
@ -172,5 +174,83 @@ func TestActorEventsTipsets(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// expect to have seen iteration number of tipsets // expect to have seen iteration number of tipsets
require.Equal(t, iterations, len(res.NewBlockHashes)) require.Equal(t, iterations, len(res.Results))
}
func TestEthNewFilterCatchAll(t *testing.T) {
require := require.New(t)
kit.QuietMiningLogs()
blockTime := 100 * time.Millisecond
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(blockTime)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// install contract
contractHex, err := os.ReadFile("contracts/events.bin")
require.NoError(err)
contract, err := hex.DecodeString(string(contractHex))
require.NoError(err)
fromAddr, err := client.WalletDefaultAddress(ctx)
require.NoError(err)
result := client.EVM().DeployContract(ctx, fromAddr, contract)
idAddr, err := address.NewIDAddress(result.ActorID)
require.NoError(err)
t.Logf("actor ID address is %s", idAddr)
// install filter
filterID, err := client.EthNewFilter(ctx, &api.EthFilterSpec{})
require.NoError(err)
const iterations = 10
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply || change.Type == store.HCRevert {
count++
if count == iterations*3 {
waitAllCh <- struct{}{}
}
}
}
}
}
}()
time.Sleep(blockTime * 6)
for i := 0; i < iterations; i++ {
// log a four topic event with data
ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(err)
// expect to have seen iteration number of events
require.Equal(iterations, len(res.Results))
} }

View File

@ -280,3 +280,10 @@ func SplitstoreMessges() NodeOpt {
return nil return nil
}) })
} }
func RealTimeFilterAPI() NodeOpt {
return WithCfgOpt(func(cfg *config.FullNode) error {
cfg.ActorEvent.EnableRealTimeFilterAPI = true
return nil
})
}

View File

@ -1293,7 +1293,7 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu
return nil, err return nil, err
} }
res.NewLogs = append(res.NewLogs, log) res.Results = append(res.Results, log)
} }
return res, nil return res, nil
@ -1312,7 +1312,7 @@ func ethFilterResultFromTipSets(tsks []types.TipSetKey) (*api.EthFilterResult, e
return nil, err return nil, err
} }
res.NewBlockHashes = append(res.NewBlockHashes, hash) res.Results = append(res.Results, hash)
} }
return res, nil return res, nil
@ -1327,7 +1327,7 @@ func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) {
return nil, err return nil, err
} }
res.NewTransactionHashes = append(res.NewTransactionHashes, hash) res.Results = append(res.Results, hash)
} }
return res, nil return res, nil