Add TestEthGetLogsAll itest
This commit is contained in:
parent
b5f95b7837
commit
d5177a394a
@ -299,10 +299,9 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
|
||||
defer m.mu.Unlock()
|
||||
m.currentHeight = to.Height()
|
||||
|
||||
if len(m.filters) == 0 {
|
||||
if len(m.filters) == 0 && m.EventIndex == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
tse := &TipSetEvents{
|
||||
msgTs: from,
|
||||
rctTs: to,
|
||||
@ -330,7 +329,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
|
||||
defer m.mu.Unlock()
|
||||
m.currentHeight = to.Height()
|
||||
|
||||
if len(m.filters) == 0 {
|
||||
if len(m.filters) == 0 && m.EventIndex == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -130,6 +130,7 @@ func (ei *EventIndex) Close() error {
|
||||
|
||||
func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
|
||||
// cache of lookups between actor id and f4 address
|
||||
|
||||
addressLookups := make(map[abi.ActorID]address.Address)
|
||||
|
||||
ems, err := te.messages(ctx)
|
||||
@ -243,7 +244,6 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
|
||||
for key, vals := range f.keys {
|
||||
join++
|
||||
joinAlias := fmt.Sprintf("ee%d", join)
|
||||
// JOIN ee1 event_entry ON event.id=ee1.event_id
|
||||
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
|
||||
clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias))
|
||||
values = append(values, key)
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -436,3 +437,158 @@ func ParseEthLog(in map[string]interface{}) (*api.EthLog, error) {
|
||||
|
||||
return el, err
|
||||
}
|
||||
|
||||
func TestEthGetLogsAll(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
blockTime := 100 * time.Millisecond
|
||||
dbpath := filepath.Join(t.TempDir(), "actorevents.db")
|
||||
|
||||
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath))
|
||||
ens.InterconnectAll().BeginMining(blockTime)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
|
||||
// install contract
|
||||
contractHex, err := os.ReadFile("contracts/events.bin")
|
||||
require.NoError(err)
|
||||
|
||||
contract, err := hex.DecodeString(string(contractHex))
|
||||
require.NoError(err)
|
||||
|
||||
fromAddr, err := client.WalletDefaultAddress(ctx)
|
||||
require.NoError(err)
|
||||
|
||||
result := client.EVM().DeployContract(ctx, fromAddr, contract)
|
||||
|
||||
idAddr, err := address.NewIDAddress(result.ActorID)
|
||||
require.NoError(err)
|
||||
t.Logf("actor ID address is %s", idAddr)
|
||||
|
||||
const iterations = 10
|
||||
|
||||
type msgInTipset struct {
|
||||
msg api.Message
|
||||
ts *types.TipSet
|
||||
}
|
||||
|
||||
msgChan := make(chan msgInTipset, iterations)
|
||||
|
||||
waitAllCh := make(chan struct{})
|
||||
go func() {
|
||||
headChangeCh, err := client.ChainNotify(ctx)
|
||||
require.NoError(err)
|
||||
<-headChangeCh // skip hccurrent
|
||||
|
||||
count := 0
|
||||
for {
|
||||
select {
|
||||
case headChanges := <-headChangeCh:
|
||||
for _, change := range headChanges {
|
||||
if change.Type == store.HCApply || change.Type == store.HCRevert {
|
||||
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
|
||||
require.NoError(err)
|
||||
|
||||
count += len(msgs)
|
||||
for _, m := range msgs {
|
||||
select {
|
||||
case msgChan <- msgInTipset{msg: m, ts: change.Val}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
if count == iterations {
|
||||
close(msgChan)
|
||||
close(waitAllCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(blockTime * 6)
|
||||
|
||||
for i := 0; i < iterations; i++ {
|
||||
// log a four topic event with data
|
||||
ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil)
|
||||
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-waitAllCh:
|
||||
case <-time.After(time.Minute):
|
||||
t.Errorf("timeout to wait for pack messages")
|
||||
}
|
||||
|
||||
received := make(map[api.EthHash]msgInTipset)
|
||||
for m := range msgChan {
|
||||
eh, err := api.NewEthHashFromCid(m.msg.Cid)
|
||||
require.NoError(err)
|
||||
received[eh] = m
|
||||
}
|
||||
require.Equal(iterations, len(received), "all messages on chain")
|
||||
|
||||
ts, err := client.ChainHead(ctx)
|
||||
require.NoError(err)
|
||||
|
||||
actor, err := client.StateGetActor(ctx, idAddr, ts.Key())
|
||||
require.NoError(err)
|
||||
require.NotNil(actor.Address)
|
||||
ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address)
|
||||
require.NoError(err)
|
||||
|
||||
topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11})
|
||||
topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22})
|
||||
topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33})
|
||||
topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44})
|
||||
data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})
|
||||
|
||||
pstring := func(s string) *string { return &s }
|
||||
|
||||
// get logs
|
||||
res, err := client.EthGetLogs(ctx, &api.EthFilterSpec{
|
||||
FromBlock: pstring("0"),
|
||||
})
|
||||
require.NoError(err)
|
||||
|
||||
// expect to have seen iteration number of events
|
||||
require.Equal(iterations, len(res.Results))
|
||||
|
||||
for _, r := range res.Results {
|
||||
// since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results
|
||||
rc, ok := r.(map[string]interface{})
|
||||
require.True(ok, "result type")
|
||||
|
||||
elog, err := ParseEthLog(rc)
|
||||
require.NoError(err)
|
||||
|
||||
require.Equal(ethContractAddr, elog.Address, "event address")
|
||||
require.Equal(api.EthUint64(0), elog.TransactionIndex, "transaction index") // only one message per tipset
|
||||
|
||||
msg, exists := received[elog.TransactionHash]
|
||||
require.True(exists, "message seen on chain")
|
||||
|
||||
tsCid, err := msg.ts.Key().Cid()
|
||||
require.NoError(err)
|
||||
|
||||
tsCidHash, err := api.NewEthHashFromCid(tsCid)
|
||||
require.NoError(err)
|
||||
|
||||
require.Equal(tsCidHash, elog.BlockHash, "block hash")
|
||||
|
||||
require.Equal(4, len(elog.Topics), "number of topics")
|
||||
require.Equal(topic1Hash, elog.Topics[0], "topic1")
|
||||
require.Equal(topic2Hash, elog.Topics[1], "topic2")
|
||||
require.Equal(topic3Hash, elog.Topics[2], "topic3")
|
||||
require.Equal(topic4Hash, elog.Topics[3], "topic4")
|
||||
|
||||
require.Equal(1, len(elog.Data), "number of data")
|
||||
require.Equal(data1Hash, elog.Data[0], "data1")
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -287,3 +287,12 @@ func RealTimeFilterAPI() NodeOpt {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func HistoricFilterAPI(dbpath string) NodeOpt {
|
||||
return WithCfgOpt(func(cfg *config.FullNode) error {
|
||||
cfg.ActorEvent.EnableRealTimeFilterAPI = true
|
||||
cfg.ActorEvent.EnableHistoricFilterAPI = true
|
||||
cfg.ActorEvent.ActorEventDatabasePath = dbpath
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -882,9 +882,7 @@ func (e *EthEvent) EthGetLogs(ctx context.Context, filterSpec *api.EthFilterSpec
|
||||
}
|
||||
ces := f.TakeCollectedEvents(ctx)
|
||||
|
||||
if err := e.uninstallFilter(ctx, f); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_ = e.uninstallFilter(ctx, f)
|
||||
|
||||
return ethFilterResultFromEvents(ces)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user