Merge pull request #9646 from iand/feat/nv18-events-historic

This commit is contained in:
raulk 2022-11-16 14:38:24 +00:00 committed by GitHub
commit cd180f1325
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1849 additions and 430 deletions

View File

@ -811,11 +811,6 @@ workflows:
- gofmt
- gen-check
- docs-check
- test:
name: test-itest-actor_events
suite: itest-actor_events
target: "./itests/actor_events_test.go"
- test:
name: test-itest-api
suite: itest-api
@ -916,6 +911,16 @@ workflows:
suite: itest-dup_mpool_messages
target: "./itests/dup_mpool_messages_test.go"
- test:
name: test-itest-eth_filter
suite: itest-eth_filter
target: "./itests/eth_filter_test.go"
- test:
name: test-itest-fevm_events
suite: itest-fevm_events
target: "./itests/fevm_events_test.go"
- test:
name: test-itest-fevm
suite: itest-fevm

View File

@ -181,6 +181,9 @@ type FullNode interface {
// ChainBlockstoreInfo returns some basic information about the blockstore
ChainBlockstoreInfo(context.Context) (map[string]interface{}, error) //perm:read
// ChainGetEvents returns the events under an event AMT root CID.
ChainGetEvents(context.Context, cid.Cid) ([]types.Event, error) //perm:read
// GasEstimateFeeCap estimates gas fee cap
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) //perm:read
@ -824,7 +827,7 @@ type FullNode interface {
// - logs: notify new event logs that match a criteria
// params contains additional parameters used with the log event type
// The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called.
EthSubscribe(ctx context.Context, eventTypes []string, params EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) //perm:write
EthSubscribe(ctx context.Context, eventType string, params *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) //perm:write
// Unsubscribe from a websocket subscription
EthUnsubscribe(ctx context.Context, id EthSubscriptionID) (bool, error) //perm:write

View File

@ -19,11 +19,17 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/builtin/v10/eam"
"github.com/filecoin-project/lotus/build"
)
var (
EthTopic1 = "topic1"
EthTopic2 = "topic2"
EthTopic3 = "topic3"
EthTopic4 = "topic4"
)
type EthUint64 uint64
func (e EthUint64) MarshalJSON() ([]byte, error) {
@ -47,6 +53,14 @@ func (e *EthUint64) UnmarshalJSON(b []byte) error {
return nil
}
func EthUint64FromHex(s string) (EthUint64, error) {
parsedInt, err := strconv.ParseUint(strings.Replace(s, "0x", "", -1), 16, 64)
if err != nil {
return EthUint64(0), err
}
return EthUint64(parsedInt), nil
}
type EthBigInt big.Int
var EthBigIntZero = EthBigInt{Int: big.Zero().Int}
@ -177,14 +191,12 @@ func (c *EthCall) UnmarshalJSON(b []byte) error {
}
type EthTxReceipt struct {
TransactionHash EthHash `json:"transactionHash"`
TransactionIndex EthUint64 `json:"transactionIndex"`
BlockHash EthHash `json:"blockHash"`
BlockNumber EthUint64 `json:"blockNumber"`
From EthAddress `json:"from"`
To *EthAddress `json:"to"`
// Logs
// LogsBloom
TransactionHash EthHash `json:"transactionHash"`
TransactionIndex EthUint64 `json:"transactionIndex"`
BlockHash EthHash `json:"blockHash"`
BlockNumber EthUint64 `json:"blockNumber"`
From EthAddress `json:"from"`
To *EthAddress `json:"to"`
StateRoot EthHash `json:"root"`
Status EthUint64 `json:"status"`
ContractAddress *EthAddress `json:"contractAddress"`
@ -192,47 +204,7 @@ type EthTxReceipt struct {
GasUsed EthUint64 `json:"gasUsed"`
EffectiveGasPrice EthBigInt `json:"effectiveGasPrice"`
LogsBloom EthBytes `json:"logsBloom"`
Logs []string `json:"logs"`
}
func NewEthTxReceipt(tx EthTx, lookup *MsgLookup, replay *InvocResult) (EthTxReceipt, error) {
receipt := EthTxReceipt{
TransactionHash: tx.Hash,
TransactionIndex: tx.TransactionIndex,
BlockHash: tx.BlockHash,
BlockNumber: tx.BlockNumber,
From: tx.From,
To: tx.To,
StateRoot: EmptyEthHash,
LogsBloom: []byte{0},
Logs: []string{},
}
if receipt.To == nil && lookup.Receipt.ExitCode.IsSuccess() {
// Create and Create2 return the same things.
var ret eam.CreateReturn
if err := ret.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
return EthTxReceipt{}, xerrors.Errorf("failed to parse contract creation result: %w", err)
}
addr := EthAddress(ret.EthAddress)
receipt.ContractAddress = &addr
}
if lookup.Receipt.ExitCode.IsSuccess() {
receipt.Status = 1
}
if lookup.Receipt.ExitCode.IsError() {
receipt.Status = 0
}
receipt.GasUsed = EthUint64(lookup.Receipt.GasUsed)
// TODO: handle CumulativeGasUsed
receipt.CumulativeGasUsed = EmptyEthInt
effectiveGasPrice := big.Div(replay.GasCost.TotalCost, big.NewInt(lookup.Receipt.GasUsed))
receipt.EffectiveGasPrice = EthBigInt(effectiveGasPrice)
return receipt, nil
Logs []EthLog `json:"logs"`
}
const (
@ -484,6 +456,9 @@ type EthFilterSpec struct {
type EthAddressList []EthAddress
func (e *EthAddressList) UnmarshalJSON(b []byte) error {
if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) {
return nil
}
if len(b) > 0 && b[0] == '[' {
var addrs []EthAddress
err := json.Unmarshal(b, &addrs)
@ -542,34 +517,29 @@ func (e *EthHashList) UnmarshalJSON(b []byte) error {
return nil
}
// FilterResult represents the response from executing a filter: a list of bloack hashes, a list of transaction hashes
// FilterResult represents the response from executing a filter: a list of block hashes, a list of transaction hashes
// or a list of logs
// This is a union type. Only one field will be populated.
// The JSON encoding must produce an array of the populated field.
type EthFilterResult struct {
// List of block hashes. Only populated when the filter has been installed via EthNewBlockFilter
NewBlockHashes []EthHash
// List of transaction hashes. Only populated when the filter has been installed via EthNewPendingTransactionFilter
NewTransactionHashes []EthHash
// List of event logs. Only populated when the filter has been installed via EthNewFilter
NewLogs []EthLog
Results []interface{}
}
func (h EthFilterResult) MarshalJSON() ([]byte, error) {
if h.NewBlockHashes != nil {
return json.Marshal(h.NewBlockHashes)
}
if h.NewTransactionHashes != nil {
return json.Marshal(h.NewTransactionHashes)
}
if h.NewLogs != nil {
return json.Marshal(h.NewLogs)
if h.Results != nil {
return json.Marshal(h.Results)
}
return []byte{'[', ']'}, nil
}
func (h *EthFilterResult) UnmarshalJSON(b []byte) error {
if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) {
return nil
}
err := json.Unmarshal(b, &h.Results)
return err
}
// EthLog represents the results of an event filter execution.
type EthLog struct {
// Address is the address of the actor that produced the event log.

View File

@ -193,30 +193,33 @@ func TestEthFilterResultMarshalJSON(t *testing.T) {
{
res: EthFilterResult{
NewBlockHashes: []EthHash{hash1, hash2},
Results: []any{hash1, hash2},
},
want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`,
},
{
res: EthFilterResult{
NewTransactionHashes: []EthHash{hash1, hash2},
Results: []any{hash1, hash2},
},
want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`,
},
{
res: EthFilterResult{
NewLogs: []EthLog{log},
Results: []any{log},
},
want: `[` + string(logjson) + `]`,
},
}
for _, tc := range testcases {
data, err := json.Marshal(tc.res)
require.NoError(t, err)
require.Equal(t, tc.want, string(data))
tc := tc
t.Run("", func(t *testing.T) {
data, err := json.Marshal(tc.res)
require.NoError(t, err)
require.Equal(t, tc.want, string(data))
})
}
}
@ -325,12 +328,23 @@ func TestEthAddressListUnmarshalJSON(t *testing.T) {
input: `"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"`,
want: EthAddressList{addr1},
},
{
input: `[]`,
want: EthAddressList{},
},
{
input: `null`,
want: EthAddressList(nil),
},
}
for _, tc := range testcases {
var got EthAddressList
err := json.Unmarshal([]byte(tc.input), &got)
require.NoError(t, err)
require.Equal(t, tc.want, got)
tc := tc
t.Run("", func(t *testing.T) {
var got EthAddressList
err := json.Unmarshal([]byte(tc.input), &got)
require.NoError(t, err)
require.Equal(t, tc.want, got)
})
}
}

View File

@ -183,6 +183,21 @@ func (mr *MockFullNodeMockRecorder) ChainGetBlockMessages(arg0, arg1 interface{}
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetBlockMessages", reflect.TypeOf((*MockFullNode)(nil).ChainGetBlockMessages), arg0, arg1)
}
// ChainGetEvents mocks base method.
func (m *MockFullNode) ChainGetEvents(arg0 context.Context, arg1 cid.Cid) ([]types.Event, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainGetEvents", arg0, arg1)
ret0, _ := ret[0].([]types.Event)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainGetEvents indicates an expected call of ChainGetEvents.
func (mr *MockFullNodeMockRecorder) ChainGetEvents(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetEvents", reflect.TypeOf((*MockFullNode)(nil).ChainGetEvents), arg0, arg1)
}
// ChainGetGenesis mocks base method.
func (m *MockFullNode) ChainGetGenesis(arg0 context.Context) (*types.TipSet, error) {
m.ctrl.T.Helper()
@ -1342,7 +1357,7 @@ func (mr *MockFullNodeMockRecorder) EthSendRawTransaction(arg0, arg1 interface{}
}
// EthSubscribe mocks base method.
func (m *MockFullNode) EthSubscribe(arg0 context.Context, arg1 []string, arg2 api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) {
func (m *MockFullNode) EthSubscribe(arg0 context.Context, arg1 string, arg2 *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EthSubscribe", arg0, arg1, arg2)
ret0, _ := ret[0].(<-chan api.EthSubscriptionResponse)

View File

@ -123,6 +123,8 @@ type FullNodeStruct struct {
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) `perm:"read"`
ChainGetEvents func(p0 context.Context, p1 cid.Cid) ([]types.Event, error) `perm:"read"`
ChainGetGenesis func(p0 context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) `perm:"read"`
@ -275,7 +277,7 @@ type FullNodeStruct struct {
EthSendRawTransaction func(p0 context.Context, p1 EthBytes) (EthHash, error) `perm:"read"`
EthSubscribe func(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) `perm:"write"`
EthSubscribe func(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) `perm:"write"`
EthUninstallFilter func(p0 context.Context, p1 EthFilterID) (bool, error) `perm:"write"`
@ -1330,6 +1332,17 @@ func (s *FullNodeStub) ChainGetBlockMessages(p0 context.Context, p1 cid.Cid) (*B
return nil, ErrNotSupported
}
func (s *FullNodeStruct) ChainGetEvents(p0 context.Context, p1 cid.Cid) ([]types.Event, error) {
if s.Internal.ChainGetEvents == nil {
return *new([]types.Event), ErrNotSupported
}
return s.Internal.ChainGetEvents(p0, p1)
}
func (s *FullNodeStub) ChainGetEvents(p0 context.Context, p1 cid.Cid) ([]types.Event, error) {
return *new([]types.Event), ErrNotSupported
}
func (s *FullNodeStruct) ChainGetGenesis(p0 context.Context) (*types.TipSet, error) {
if s.Internal.ChainGetGenesis == nil {
return nil, ErrNotSupported
@ -2166,14 +2179,14 @@ func (s *FullNodeStub) EthSendRawTransaction(p0 context.Context, p1 EthBytes) (E
return *new(EthHash), ErrNotSupported
}
func (s *FullNodeStruct) EthSubscribe(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) {
func (s *FullNodeStruct) EthSubscribe(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) {
if s.Internal.EthSubscribe == nil {
return nil, ErrNotSupported
}
return s.Internal.EthSubscribe(p0, p1, p2)
}
func (s *FullNodeStub) EthSubscribe(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) {
func (s *FullNodeStub) EthSubscribe(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) {
return nil, ErrNotSupported
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -3,14 +3,17 @@ package filter
import (
"bytes"
"context"
"math"
"sync"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
@ -18,10 +21,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
type RobustAddresser interface {
LookupRobustAddress(ctx context.Context, idAddr address.Address, ts *types.TipSet) (address.Address, error)
}
const indexed uint8 = 0x01
type EventFilter struct {
@ -42,7 +41,7 @@ type EventFilter struct {
var _ Filter = (*EventFilter)(nil)
type CollectedEvent struct {
Event *types.Event
Entries []types.EventEntry
EmitterAddr address.Address // f4 address of emitter
EventIdx int // index of the event within the list of emitted events
Reverted bool
@ -104,7 +103,7 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
// event matches filter, so record it
cev := &CollectedEvent{
Event: ev,
Entries: ev.Entries,
EmitterAddr: addr,
EventIdx: evIdx,
Reverted: revert,
@ -134,6 +133,12 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}
func (f *EventFilter) setCollectedEvents(ces []*CollectedEvent) {
f.mu.Lock()
f.collected = ces
f.mu.Unlock()
}
func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
f.mu.Lock()
collected := f.collected
@ -282,24 +287,33 @@ type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
MaxFilterResults int
EventIndex *EventIndex
mu sync.Mutex // guards mutations to filters
filters map[string]*EventFilter
mu sync.Mutex // guards mutations to filters
filters map[string]*EventFilter
currentHeight abi.ChainEpoch
}
func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.filters) == 0 {
m.currentHeight = to.Height()
if len(m.filters) == 0 && m.EventIndex == nil {
return nil
}
tse := &TipSetEvents{
msgTs: from,
rctTs: to,
load: m.loadExecutedMessages,
}
if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
return err
}
}
// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
@ -313,7 +327,9 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) error {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.filters) == 0 {
m.currentHeight = to.Height()
if len(m.filters) == 0 && m.EventIndex == nil {
return nil
}
@ -323,6 +339,12 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages,
}
if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
return err
}
}
// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
@ -334,6 +356,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}
func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keys map[string][][]byte) (*EventFilter, error) {
m.mu.Lock()
currentHeight := m.currentHeight
m.mu.Unlock()
if m.EventIndex == nil && minHeight < currentHeight {
return nil, xerrors.Errorf("historic event index disabled")
}
id, err := uuid.NewRandom()
if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err)
@ -349,7 +379,17 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
maxResults: m.MaxFilterResults,
}
if m.EventIndex != nil && minHeight < currentHeight {
// Filter needs historic events
if err := m.EventIndex.PrefillFilter(ctx, f); err != nil {
return nil, err
}
}
m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*EventFilter)
}
m.filters[id.String()] = f
m.mu.Unlock()
@ -402,19 +442,30 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
continue
}
evtArr, err := blockadt.AsArray(st, *rct.EventsRoot)
evtArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(5))
if err != nil {
return nil, xerrors.Errorf("load events amt: %w", err)
}
ems[i].evs = make([]*types.Event, evtArr.Length())
ems[i].evs = make([]*types.Event, evtArr.Len())
var evt types.Event
_ = arr.ForEach(&evt, func(i int64) error {
err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error {
if u > math.MaxInt {
return xerrors.Errorf("too many events")
}
if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil {
return err
}
cpy := evt
ems[i].evs[int(i)] = &cpy
ems[i].evs[int(u)] = &cpy //nolint:scopelint
return nil
})
if err != nil {
return nil, xerrors.Errorf("read events: %w", err)
}
}
return ems, nil

View File

@ -60,7 +60,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
noCollectedEvents := []*CollectedEvent{}
oneCollectedEvent := []*CollectedEvent{
{
Event: ev1,
Entries: ev1.Entries,
EmitterAddr: a1,
EventIdx: 0,
Reverted: false,

View File

@ -0,0 +1,399 @@
package filter
import (
"context"
"database/sql"
"errors"
"fmt"
"sort"
"strings"
"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
)
var pragmas = []string{
"PRAGMA synchronous = normal",
"PRAGMA temp_store = memory",
"PRAGMA mmap_size = 30000000000",
"PRAGMA page_size = 32768",
"PRAGMA auto_vacuum = NONE",
"PRAGMA automatic_index = OFF",
"PRAGMA journal_mode = WAL",
"PRAGMA read_uncommitted = ON",
}
var ddls = []string{
`CREATE TABLE IF NOT EXISTS event (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key BLOB NOT NULL,
tipset_key_cid BLOB NOT NULL,
emitter_addr BLOB NOT NULL,
event_index INTEGER NOT NULL,
message_cid BLOB NOT NULL,
message_index INTEGER NOT NULL,
reverted INTEGER NOT NULL
)`,
`CREATE TABLE IF NOT EXISTS event_entry (
event_id INTEGER,
indexed INTEGER NOT NULL,
flags BLOB NOT NULL,
key TEXT NOT NULL,
value BLOB NOT NULL
)`,
// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,
// version 1.
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
}
const schemaVersion = 1
const (
insertEvent = `INSERT OR IGNORE INTO event
(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted)
VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry
(event_id, indexed, flags, key, value)
VALUES(?, ?, ?, ?, ?)`
)
type EventIndex struct {
db *sql.DB
}
func NewEventIndex(path string) (*EventIndex, error) {
db, err := sql.Open("sqlite3", path+"?mode=rwc")
if err != nil {
return nil, xerrors.Errorf("open sqlite3 database: %w", err)
}
for _, pragma := range pragmas {
if _, err := db.Exec(pragma); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("exec pragma %q: %w", pragma, err)
}
}
q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
if err == sql.ErrNoRows || !q.Next() {
// empty database, create the schema
for _, ddl := range ddls {
if _, err := db.Exec(ddl); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("exec ddl %q: %w", ddl, err)
}
}
} else if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("looking for _meta table: %w", err)
} else {
// Ensure we don't open a database from a different schema version
row := db.QueryRow("SELECT max(version) FROM _meta")
var version int
err := row.Scan(&version)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: no version found")
}
if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
}
}
return &EventIndex{
db: db,
}, nil
}
func (ei *EventIndex) Close() error {
if ei.db == nil {
return nil
}
return ei.db.Close()
}
func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)
ems, err := te.messages(ctx)
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
}
tx, err := ei.db.Begin()
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
stmtEvent, err := tx.Prepare(insertEvent)
if err != nil {
return xerrors.Errorf("prepare insert event: %w", err)
}
stmtEntry, err := tx.Prepare(insertEntry)
if err != nil {
return xerrors.Errorf("prepare insert entry: %w", err)
}
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
if !ok {
// not an address we will be able to match against
continue
}
addressLookups[ev.Emitter] = addr
}
tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}
res, err := stmtEvent.Exec(
te.msgTs.Height(), // height
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
revert, // reverted
)
if err != nil {
return xerrors.Errorf("exec insert event: %w", err)
}
lastID, err := res.LastInsertId()
if err != nil {
return xerrors.Errorf("get last row id: %w", err)
}
for _, entry := range ev.Entries {
_, err := stmtEntry.Exec(
lastID, // event_id
entry.Flags&indexed == indexed, // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
entry.Value, // value
)
if err != nil {
return xerrors.Errorf("exec insert entry: %w", err)
}
}
}
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
return nil
}
// PrefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
clauses := []string{}
values := []any{}
joins := []string{}
if f.tipsetCid != cid.Undef {
clauses = append(clauses, "event.tipset_key_cid=?")
values = append(values, f.tipsetCid.Bytes())
} else {
if f.minHeight >= 0 {
clauses = append(clauses, "event.height>=?")
values = append(values, f.minHeight)
}
if f.maxHeight >= 0 {
clauses = append(clauses, "event.height<=?")
values = append(values, f.maxHeight)
}
}
if len(f.addresses) > 0 {
subclauses := []string{}
for _, addr := range f.addresses {
subclauses = append(subclauses, "emitter_addr=?")
values = append(values, addr.Bytes())
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
if len(f.keys) > 0 {
join := 0
for key, vals := range f.keys {
join++
joinAlias := fmt.Sprintf("ee%d", join)
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias))
values = append(values, key)
subclauses := []string{}
for _, val := range vals {
subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias))
values = append(values, val)
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
}
s := `SELECT
event.id,
event.height,
event.tipset_key,
event.tipset_key_cid,
event.emitter_addr,
event.event_index,
event.message_cid,
event.message_index,
event.reverted,
event_entry.flags,
event_entry.key,
event_entry.value
FROM event JOIN event_entry ON event.id=event_entry.event_id`
if len(joins) > 0 {
s = s + ", " + strings.Join(joins, ", ")
}
if len(clauses) > 0 {
s = s + " WHERE " + strings.Join(clauses, " AND ")
}
s += " ORDER BY event.height DESC"
stmt, err := ei.db.Prepare(s)
if err != nil {
return xerrors.Errorf("prepare prefill query: %w", err)
}
q, err := stmt.QueryContext(ctx, values...)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return xerrors.Errorf("exec prefill query: %w", err)
}
var ces []*CollectedEvent
var currentID int64 = -1
var ce *CollectedEvent
for q.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var row struct {
id int64
height uint64
tipsetKey []byte
tipsetKeyCid []byte
emitterAddr []byte
eventIndex int
messageCid []byte
messageIndex int
reverted bool
flags []byte
key string
value []byte
}
if err := q.Scan(
&row.id,
&row.height,
&row.tipsetKey,
&row.tipsetKeyCid,
&row.emitterAddr,
&row.eventIndex,
&row.messageCid,
&row.messageIndex,
&row.reverted,
&row.flags,
&row.key,
&row.value,
); err != nil {
return xerrors.Errorf("read prefill row: %w", err)
}
if row.id != currentID {
if ce != nil {
ces = append(ces, ce)
ce = nil
// Unfortunately we can't easily incorporate the max results limit into the query due to the
// unpredictable number of rows caused by joins
// Break here to stop collecting rows
if f.maxResults > 0 && len(ces) >= f.maxResults {
break
}
}
currentID = row.id
ce = &CollectedEvent{
EventIdx: row.eventIndex,
Reverted: row.reverted,
Height: abi.ChainEpoch(row.height),
MsgIdx: row.messageIndex,
}
ce.EmitterAddr, err = address.NewFromBytes(row.emitterAddr)
if err != nil {
return xerrors.Errorf("parse emitter addr: %w", err)
}
ce.TipSetKey, err = types.TipSetKeyFromBytes(row.tipsetKey)
if err != nil {
return xerrors.Errorf("parse tipsetkey: %w", err)
}
ce.MsgCid, err = cid.Cast(row.messageCid)
if err != nil {
return xerrors.Errorf("parse message cid: %w", err)
}
}
ce.Entries = append(ce.Entries, types.EventEntry{
Flags: row.flags[0],
Key: row.key,
Value: row.value,
})
}
if ce != nil {
ces = append(ces, ce)
}
if len(ces) == 0 {
return nil
}
// collected event list is in inverted order since we selected only the most recent events
// sort it into height order
sort.Slice(ces, func(i, j int) bool { return ces[i].Height < ces[j].Height })
f.setCollectedEvents(ces)
return nil
}

View File

@ -0,0 +1,283 @@
package filter
import (
"context"
pseudo "math/rand"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
)
func TestEventIndexPrefillFilter(t *testing.T) {
rng := pseudo.New(pseudo.NewSource(299792458))
a1 := randomF4Addr(t, rng)
a2 := randomF4Addr(t, rng)
a1ID := abi.ActorID(1)
a2ID := abi.ActorID(2)
addrMap := addressMap{}
addrMap.add(a1ID, a1)
addrMap.add(a2ID, a2)
ev1 := fakeEvent(
a1ID,
[]kv{
{k: "type", v: []byte("approval")},
{k: "signer", v: []byte("addr1")},
},
[]kv{
{k: "amount", v: []byte("2988181")},
},
)
st := newStore()
events := []*types.Event{ev1}
em := executedMessage{
msg: fakeMessage(randomF4Addr(t, rng), randomF4Addr(t, rng)),
rct: fakeReceipt(t, rng, st, events),
evs: events,
}
events14000 := buildTipSetEvents(t, rng, 14000, em)
cid14000, err := events14000.msgTs.Key().Cid()
require.NoError(t, err, "tipset cid")
noCollectedEvents := []*CollectedEvent{}
oneCollectedEvent := []*CollectedEvent{
{
Entries: ev1.Entries,
EmitterAddr: a1,
EventIdx: 0,
Reverted: false,
Height: 14000,
TipSetKey: events14000.msgTs.Key(),
MsgIdx: 0,
MsgCid: em.msg.Cid(),
},
}
workDir, err := os.MkdirTemp("", "lotusevents")
require.NoError(t, err, "create temporary work directory")
defer func() {
_ = os.RemoveAll(workDir)
}()
t.Logf("using work dir %q", workDir)
dbPath := filepath.Join(workDir, "actorevents.db")
ei, err := NewEventIndex(dbPath)
require.NoError(t, err, "create event index")
if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect events")
}
testCases := []struct {
name string
filter *EventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &EventFilter{
minHeight: 14001,
maxHeight: -1,
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch tipset max height",
filter: &EventFilter{
minHeight: -1,
maxHeight: 13999,
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match tipset min height",
filter: &EventFilter{
minHeight: 14000,
maxHeight: -1,
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match tipset cid",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: cid14000,
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch address",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a2},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match address",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a1},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match one entry",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match one entry with alternate values",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("cancel"),
[]byte("propose"),
[]byte("approval"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch one entry by missing value",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("cancel"),
[]byte("propose"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry by missing key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"method": {
[]byte("approval"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"signer": {
[]byte("addr1"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch one entry with one mismatching key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"approver": {
[]byte("addr1"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry with one mismatching value",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"signer": {
[]byte("addr2"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"amount": {
[]byte("2988181"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
}
for _, tc := range testCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := ei.PrefillFilter(context.Background(), tc.filter); err != nil {
require.NoError(t, err, "prefill filter events")
}
coll := tc.filter.TakeCollectedEvents(context.Background())
require.ElementsMatch(t, coll, tc.want)
})
}
}

View File

@ -124,6 +124,9 @@ func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, err
}
m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*MemPoolFilter)
}
m.filters[id.String()] = f
m.mu.Unlock()

View File

@ -111,6 +111,9 @@ func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error
}
m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*TipSetFilter)
}
m.filters[id.String()] = f
m.mu.Unlock()

View File

@ -15,6 +15,7 @@
* [ChainExport](#ChainExport)
* [ChainGetBlock](#ChainGetBlock)
* [ChainGetBlockMessages](#ChainGetBlockMessages)
* [ChainGetEvents](#ChainGetEvents)
* [ChainGetGenesis](#ChainGetGenesis)
* [ChainGetMessage](#ChainGetMessage)
* [ChainGetMessagesInTipset](#ChainGetMessagesInTipset)
@ -612,6 +613,37 @@ Response:
}
```
### ChainGetEvents
ChainGetEvents returns the events under an event AMT root CID.
Perms: read
Inputs:
```json
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
]
```
Response:
```json
[
{
"Emitter": 1000,
"Entries": [
{
"Flags": 7,
"Key": "string value",
"Value": "Ynl0ZSBhcnJheQ=="
}
]
}
]
```
### ChainGetGenesis
ChainGetGenesis returns the genesis tipset.
@ -2485,7 +2517,7 @@ Inputs:
Response:
```json
[
"0x0707070707070707070707070707070707070707070707070707070707070707"
{}
]
```
@ -2506,7 +2538,7 @@ Inputs:
Response:
```json
[
"0x0707070707070707070707070707070707070707070707070707070707070707"
{}
]
```
@ -2532,7 +2564,7 @@ Inputs:
Response:
```json
[
"0x0707070707070707070707070707070707070707070707070707070707070707"
{}
]
```
@ -2703,7 +2735,21 @@ Response:
"effectiveGasPrice": "0x0",
"logsBloom": "0x07",
"logs": [
"string value"
{
"address": "0x0707070707070707070707070707070707070707",
"data": [
"0x0707070707070707070707070707070707070707070707070707070707070707"
],
"topics": [
"0x0707070707070707070707070707070707070707070707070707070707070707"
],
"removed": true,
"logIndex": "0x5",
"transactionIndex": "0x5",
"transactionHash": "0x0707070707070707070707070707070707070707070707070707070707070707",
"blockHash": "0x0707070707070707070707070707070707070707070707070707070707070707",
"blockNumber": "0x5"
}
]
}
```
@ -2796,9 +2842,7 @@ Perms: write
Inputs:
```json
[
[
"string value"
],
"string value",
{
"topics": [
[

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit 14151f5e623b37c9d47aee5192af6b3eeeae4a35
Subproject commit 39bed0d7a477eae618d310a476233eafe3e6b571

1
go.mod
View File

@ -117,6 +117,7 @@ require (
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-maddr-filter v0.1.0
github.com/mattn/go-isatty v0.0.16
github.com/mattn/go-sqlite3 v1.14.16
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-base32 v0.0.4

2
go.sum
View File

@ -1317,6 +1317,8 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg=
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-xmlrpc v0.0.3/go.mod h1:mqc2dz7tP5x5BKlCahN/n+hs7OSZKJkS9JsHNBRlrxA=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=

View File

@ -1,176 +0,0 @@
// stm: #integration
package itests
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
)
func TestActorEventsMpool(t *testing.T) {
ctx := context.Background()
kit.QuietMiningLogs()
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// create a new address where to send funds.
addr, err := client.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
// get the existing balance from the default wallet to then split it.
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
require.NoError(t, err)
// install filter
filterID, err := client.EthNewPendingTransactionFilter(ctx)
require.NoError(t, err)
const iterations = 100
// we'll send half our balance (saving the other half for gas),
// in `iterations` increments.
toSend := big.Div(bal, big.NewInt(2))
each := big.Div(toSend, big.NewInt(iterations))
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(t, err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(t, err)
count += len(msgs)
if count == iterations {
waitAllCh <- struct{}{}
}
}
}
}
}
}()
var sms []*types.SignedMessage
for i := 0; i < iterations; i++ {
msg := &types.Message{
From: client.DefaultKey.Address,
To: addr,
Value: each,
}
sm, err := client.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(t, err)
// expect to have seen iteration number of mpool messages
require.Equal(t, iterations, len(res.NewTransactionHashes))
}
func TestActorEventsTipsets(t *testing.T) {
ctx := context.Background()
kit.QuietMiningLogs()
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// create a new address where to send funds.
addr, err := client.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
// get the existing balance from the default wallet to then split it.
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
require.NoError(t, err)
// install filter
filterID, err := client.EthNewBlockFilter(ctx)
require.NoError(t, err)
const iterations = 100
// we'll send half our balance (saving the other half for gas),
// in `iterations` increments.
toSend := big.Div(bal, big.NewInt(2))
each := big.Div(toSend, big.NewInt(iterations))
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(t, err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(t, err)
count += len(msgs)
if count == iterations {
waitAllCh <- struct{}{}
}
}
}
}
}
}()
var sms []*types.SignedMessage
for i := 0; i < iterations; i++ {
msg := &types.Message{
From: client.DefaultKey.Address,
To: addr,
Value: each,
}
sm, err := client.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(t, err)
// expect to have seen iteration number of tipsets
require.Equal(t, iterations, len(res.NewBlockHashes))
}

594
itests/eth_filter_test.go Normal file
View File

@ -0,0 +1,594 @@
// stm: #integration
package itests
import (
"context"
"encoding/hex"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
)
func TestEthNewPendingTransactionFilter(t *testing.T) {
ctx := context.Background()
kit.QuietMiningLogs()
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// create a new address where to send funds.
addr, err := client.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
// get the existing balance from the default wallet to then split it.
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
require.NoError(t, err)
// install filter
filterID, err := client.EthNewPendingTransactionFilter(ctx)
require.NoError(t, err)
const iterations = 100
// we'll send half our balance (saving the other half for gas),
// in `iterations` increments.
toSend := big.Div(bal, big.NewInt(2))
each := big.Div(toSend, big.NewInt(iterations))
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(t, err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(t, err)
count += len(msgs)
if count == iterations {
waitAllCh <- struct{}{}
}
}
}
}
}
}()
var sms []*types.SignedMessage
for i := 0; i < iterations; i++ {
msg := &types.Message{
From: client.DefaultKey.Address,
To: addr,
Value: each,
}
sm, err := client.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(t, err)
// expect to have seen iteration number of mpool messages
require.Equal(t, iterations, len(res.Results))
}
func TestEthNewBlockFilter(t *testing.T) {
ctx := context.Background()
kit.QuietMiningLogs()
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// create a new address where to send funds.
addr, err := client.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
// get the existing balance from the default wallet to then split it.
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
require.NoError(t, err)
// install filter
filterID, err := client.EthNewBlockFilter(ctx)
require.NoError(t, err)
const iterations = 30
// we'll send half our balance (saving the other half for gas),
// in `iterations` increments.
toSend := big.Div(bal, big.NewInt(2))
each := big.Div(toSend, big.NewInt(iterations))
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(t, err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply || change.Type == store.HCRevert {
count++
if count == iterations {
waitAllCh <- struct{}{}
}
}
}
}
}
}()
var sms []*types.SignedMessage
for i := 0; i < iterations; i++ {
msg := &types.Message{
From: client.DefaultKey.Address,
To: addr,
Value: each,
}
sm, err := client.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(t, err)
// expect to have seen iteration number of tipsets
require.Equal(t, iterations, len(res.Results))
}
func TestEthNewFilterCatchAll(t *testing.T) {
require := require.New(t)
kit.QuietMiningLogs()
blockTime := 100 * time.Millisecond
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(blockTime)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// install contract
contractHex, err := os.ReadFile("contracts/events.bin")
require.NoError(err)
contract, err := hex.DecodeString(string(contractHex))
require.NoError(err)
fromAddr, err := client.WalletDefaultAddress(ctx)
require.NoError(err)
result := client.EVM().DeployContract(ctx, fromAddr, contract)
idAddr, err := address.NewIDAddress(result.ActorID)
require.NoError(err)
t.Logf("actor ID address is %s", idAddr)
// install filter
filterID, err := client.EthNewFilter(ctx, &api.EthFilterSpec{})
require.NoError(err)
const iterations = 10
type msgInTipset struct {
msg api.Message
ts *types.TipSet
}
msgChan := make(chan msgInTipset, iterations)
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply || change.Type == store.HCRevert {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(err)
count += len(msgs)
for _, m := range msgs {
select {
case msgChan <- msgInTipset{msg: m, ts: change.Val}:
default:
}
}
if count == iterations {
close(msgChan)
close(waitAllCh)
return
}
}
}
}
}
}()
time.Sleep(blockTime * 6)
for i := 0; i < iterations; i++ {
// log a four topic event with data
ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
received := make(map[api.EthHash]msgInTipset)
for m := range msgChan {
eh, err := api.NewEthHashFromCid(m.msg.Cid)
require.NoError(err)
received[eh] = m
}
require.Equal(iterations, len(received), "all messages on chain")
ts, err := client.ChainHead(ctx)
require.NoError(err)
actor, err := client.StateGetActor(ctx, idAddr, ts.Key())
require.NoError(err)
require.NotNil(actor.Address)
ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address)
require.NoError(err)
// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(err)
// expect to have seen iteration number of events
require.Equal(iterations, len(res.Results))
topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11})
topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22})
topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33})
topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44})
data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})
for _, r := range res.Results {
// since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results
rc, ok := r.(map[string]interface{})
require.True(ok, "result type")
elog, err := ParseEthLog(rc)
require.NoError(err)
require.Equal(ethContractAddr, elog.Address, "event address")
require.Equal(api.EthUint64(0), elog.TransactionIndex, "transaction index") // only one message per tipset
msg, exists := received[elog.TransactionHash]
require.True(exists, "message seen on chain")
tsCid, err := msg.ts.Key().Cid()
require.NoError(err)
tsCidHash, err := api.NewEthHashFromCid(tsCid)
require.NoError(err)
require.Equal(tsCidHash, elog.BlockHash, "block hash")
require.Equal(4, len(elog.Topics), "number of topics")
require.Equal(topic1Hash, elog.Topics[0], "topic1")
require.Equal(topic2Hash, elog.Topics[1], "topic2")
require.Equal(topic3Hash, elog.Topics[2], "topic3")
require.Equal(topic4Hash, elog.Topics[3], "topic4")
require.Equal(1, len(elog.Data), "number of data")
require.Equal(data1Hash, elog.Data[0], "data1")
}
}
func ParseEthLog(in map[string]interface{}) (*api.EthLog, error) {
el := &api.EthLog{}
ethHash := func(k string, v interface{}) (api.EthHash, error) {
s, ok := v.(string)
if !ok {
return api.EthHash{}, xerrors.Errorf(k + " not a string")
}
return api.EthHashFromHex(s)
}
ethUint64 := func(k string, v interface{}) (api.EthUint64, error) {
s, ok := v.(string)
if !ok {
return 0, xerrors.Errorf(k + " not a string")
}
parsedInt, err := strconv.ParseUint(strings.Replace(s, "0x", "", -1), 16, 64)
if err != nil {
return 0, err
}
return api.EthUint64(parsedInt), nil
}
var err error
for k, v := range in {
switch k {
case "removed":
b, ok := v.(bool)
if ok {
el.Removed = b
continue
}
s, ok := v.(string)
if !ok {
return nil, xerrors.Errorf(k + " not a string")
}
el.Removed, err = strconv.ParseBool(s)
if err != nil {
return nil, err
}
case "address":
s, ok := v.(string)
if !ok {
return nil, xerrors.Errorf(k + " not a string")
}
el.Address, err = api.EthAddressFromHex(s)
if err != nil {
return nil, err
}
case "logIndex":
el.LogIndex, err = ethUint64(k, v)
if err != nil {
return nil, err
}
case "transactionIndex":
el.TransactionIndex, err = ethUint64(k, v)
if err != nil {
return nil, err
}
case "blockNumber":
el.BlockNumber, err = ethUint64(k, v)
if err != nil {
return nil, err
}
case "transactionHash":
el.TransactionHash, err = ethHash(k, v)
if err != nil {
return nil, err
}
case "blockHash":
el.BlockHash, err = ethHash(k, v)
if err != nil {
return nil, err
}
case "data":
sl, ok := v.([]interface{})
if !ok {
return nil, xerrors.Errorf(k + " not a slice")
}
for _, s := range sl {
data, err := ethHash(k, s)
if err != nil {
return nil, err
}
el.Data = append(el.Data, data)
}
case "topics":
sl, ok := v.([]interface{})
if !ok {
return nil, xerrors.Errorf(k + " not a slice")
}
for _, s := range sl {
topic, err := ethHash(k, s)
if err != nil {
return nil, err
}
el.Topics = append(el.Topics, topic)
}
}
}
return el, err
}
func TestEthGetLogsAll(t *testing.T) {
require := require.New(t)
kit.QuietMiningLogs()
blockTime := 100 * time.Millisecond
dbpath := filepath.Join(t.TempDir(), "actorevents.db")
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath))
ens.InterconnectAll().BeginMining(blockTime)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// install contract
contractHex, err := os.ReadFile("contracts/events.bin")
require.NoError(err)
contract, err := hex.DecodeString(string(contractHex))
require.NoError(err)
fromAddr, err := client.WalletDefaultAddress(ctx)
require.NoError(err)
result := client.EVM().DeployContract(ctx, fromAddr, contract)
idAddr, err := address.NewIDAddress(result.ActorID)
require.NoError(err)
t.Logf("actor ID address is %s", idAddr)
const iterations = 10
type msgInTipset struct {
msg api.Message
ts *types.TipSet
}
msgChan := make(chan msgInTipset, iterations)
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply || change.Type == store.HCRevert {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(err)
count += len(msgs)
for _, m := range msgs {
select {
case msgChan <- msgInTipset{msg: m, ts: change.Val}:
default:
}
}
if count == iterations {
close(msgChan)
close(waitAllCh)
return
}
}
}
}
}
}()
time.Sleep(blockTime * 6)
for i := 0; i < iterations; i++ {
// log a four topic event with data
ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
received := make(map[api.EthHash]msgInTipset)
for m := range msgChan {
eh, err := api.NewEthHashFromCid(m.msg.Cid)
require.NoError(err)
received[eh] = m
}
require.Equal(iterations, len(received), "all messages on chain")
head, err := client.ChainHead(ctx)
require.NoError(err)
actor, err := client.StateGetActor(ctx, idAddr, head.Key())
require.NoError(err)
require.NotNil(actor.Address)
ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address)
require.NoError(err)
topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11})
topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22})
topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33})
topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44})
data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})
pstring := func(s string) *string { return &s }
// get logs
res, err := client.EthGetLogs(ctx, &api.EthFilterSpec{
FromBlock: pstring("0x0"),
})
require.NoError(err)
// expect to have all messages sent
require.Equal(len(received), len(res.Results))
for _, r := range res.Results {
// since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results
rc, ok := r.(map[string]interface{})
require.True(ok, "result type")
elog, err := ParseEthLog(rc)
require.NoError(err)
require.Equal(ethContractAddr, elog.Address, "event address")
require.Equal(api.EthUint64(0), elog.TransactionIndex, "transaction index") // only one message per tipset
msg, exists := received[elog.TransactionHash]
require.True(exists, "message seen on chain")
tsCid, err := msg.ts.Key().Cid()
require.NoError(err)
tsCidHash, err := api.NewEthHashFromCid(tsCid)
require.NoError(err)
require.Equal(tsCidHash, elog.BlockHash, "block hash")
require.Equal(4, len(elog.Topics), "number of topics")
require.Equal(topic1Hash, elog.Topics[0], "topic1")
require.Equal(topic2Hash, elog.Topics[1], "topic2")
require.Equal(topic3Hash, elog.Topics[2], "topic3")
require.Equal(topic4Hash, elog.Topics[3], "topic4")
require.Equal(1, len(elog.Data), "number of data")
require.Equal(data1Hash, elog.Data[0], "data1")
}
}

View File

@ -280,3 +280,19 @@ func SplitstoreMessges() NodeOpt {
return nil
})
}
func RealTimeFilterAPI() NodeOpt {
return WithCfgOpt(func(cfg *config.FullNode) error {
cfg.ActorEvent.EnableRealTimeFilterAPI = true
return nil
})
}
func HistoricFilterAPI(dbpath string) NodeOpt {
return WithCfgOpt(func(cfg *config.FullNode) error {
cfg.ActorEvent.EnableRealTimeFilterAPI = true
cfg.ActorEvent.EnableHistoricFilterAPI = true
cfg.ActorEvent.ActorEventDatabasePath = dbpath
return nil
})
}

View File

@ -69,6 +69,14 @@ this time become eligible for automatic deletion.`,
Comment: `MaxFilterHeightRange specifies the maximum range of heights that can be used in a filter (to avoid querying
the entire chain)`,
},
{
Name: "ActorEventDatabasePath",
Type: "string",
Comment: `EventHistoryDatabasePath is the full path to a sqlite database that will be used to index actor events to
support the historic filter APIs. If the database does not exist it will be created. The directory containing
the database must already exist and be writeable.`,
},
},
"Backup": []DocField{
{

View File

@ -624,6 +624,11 @@ type ActorEventConfig struct {
// the entire chain)
MaxFilterHeightRange uint64
// EventHistoryDatabasePath is the full path to a sqlite database that will be used to index actor events to
// support the historic filter APIs. If the database does not exist it will be created. The directory containing
// the database must already exist and be writeable.
ActorEventDatabasePath string
// Others, not implemented yet:
// Set a limit on the number of active websocket subscriptions (may be zero)
// Set a timeout for subscription clients

View File

@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"io"
"math"
"strconv"
"strings"
"sync"
@ -24,6 +25,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/util/adt"
@ -654,6 +656,33 @@ func (a *ChainAPI) ChainBlockstoreInfo(ctx context.Context) (map[string]interfac
return info.Info(), nil
}
// ChainGetEvents returns the events under an event AMT root CID.
//
// TODO (raulk) make copies of this logic elsewhere use this (e.g. itests, CLI, events filter).
func (a *ChainAPI) ChainGetEvents(ctx context.Context, root cid.Cid) ([]types.Event, error) {
store := cbor.NewCborStore(a.ExposedBlockstore)
evtArr, err := amt4.LoadAMT(ctx, store, root, amt4.UseTreeBitWidth(5))
if err != nil {
return nil, xerrors.Errorf("load events amt: %w", err)
}
ret := make([]types.Event, 0, evtArr.Len())
var evt types.Event
err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error {
if u > math.MaxInt {
return xerrors.Errorf("too many events")
}
if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil {
return err
}
ret = append(ret, evt)
return nil
})
return ret, err
}
func (a *ChainAPI) ChainPrune(ctx context.Context, opts api.PruneOpts) error {
pruner, ok := a.BaseBlockstore.(interface {
PruneChain(opts api.PruneOpts) error

View File

@ -59,7 +59,6 @@ type EthModuleAPI interface {
EthCall(ctx context.Context, tx api.EthCall, blkParam string) (api.EthBytes, error)
EthMaxPriorityFeePerGas(ctx context.Context) (api.EthBigInt, error)
EthSendRawTransaction(ctx context.Context, rawTx api.EthBytes) (api.EthHash, error)
// EthFeeHistory(ctx context.Context, blkCount string)
}
type EthEventAPI interface {
@ -70,7 +69,7 @@ type EthEventAPI interface {
EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id api.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error)
EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error)
EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error)
}
@ -97,12 +96,13 @@ type EthModule struct {
var _ EthModuleAPI = (*EthModule)(nil)
type EthEvent struct {
EthModuleAPI
Chain *store.ChainStore
EventFilterManager *filter.EventFilterManager
TipSetFilterManager *filter.TipSetFilterManager
MemPoolFilterManager *filter.MemPoolFilterManager
FilterStore filter.FilterStore
SubManager ethSubscriptionManager
SubManager *EthSubscriptionManager
MaxFilterHeightRange abi.ChainEpoch
}
@ -236,10 +236,19 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash api.Eth
return nil, nil
}
receipt, err := api.NewEthTxReceipt(tx, msgLookup, replay)
var events []types.Event
if rct := replay.MsgRct; rct != nil && rct.EventsRoot != nil {
events, err = a.ChainAPI.ChainGetEvents(ctx, *rct.EventsRoot)
if err != nil {
return nil, nil
}
}
receipt, err := a.newEthTxReceipt(ctx, tx, msgLookup, replay, events)
if err != nil {
return nil, nil
}
return &receipt, nil
}
@ -870,9 +879,96 @@ func (a *EthModule) newEthTxFromFilecoinMessageLookup(ctx context.Context, msgLo
return tx, nil
}
func (e *EthEvent) EthGetLogs(ctx context.Context, filter *api.EthFilterSpec) (*api.EthFilterResult, error) {
// TODO: implement EthGetLogs
return nil, api.ErrNotSupported
func (a *EthModule) newEthTxReceipt(ctx context.Context, tx api.EthTx, lookup *api.MsgLookup, replay *api.InvocResult, events []types.Event) (api.EthTxReceipt, error) {
receipt := api.EthTxReceipt{
TransactionHash: tx.Hash,
TransactionIndex: tx.TransactionIndex,
BlockHash: tx.BlockHash,
BlockNumber: tx.BlockNumber,
From: tx.From,
To: tx.To,
StateRoot: api.EmptyEthHash,
LogsBloom: []byte{0},
}
if receipt.To == nil && lookup.Receipt.ExitCode.IsSuccess() {
// Create and Create2 return the same things.
var ret eam.CreateReturn
if err := ret.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
return api.EthTxReceipt{}, xerrors.Errorf("failed to parse contract creation result: %w", err)
}
addr := api.EthAddress(ret.EthAddress)
receipt.ContractAddress = &addr
}
if lookup.Receipt.ExitCode.IsSuccess() {
receipt.Status = 1
}
if lookup.Receipt.ExitCode.IsError() {
receipt.Status = 0
}
if len(events) > 0 {
receipt.Logs = make([]api.EthLog, 0, len(events))
for i, evt := range events {
l := api.EthLog{
Removed: false,
LogIndex: api.EthUint64(i),
TransactionIndex: tx.TransactionIndex,
TransactionHash: tx.Hash,
BlockHash: tx.BlockHash,
BlockNumber: tx.BlockNumber,
}
for _, entry := range evt.Entries {
hash := api.EthHashData(entry.Value)
if entry.Key == api.EthTopic1 || entry.Key == api.EthTopic2 || entry.Key == api.EthTopic3 || entry.Key == api.EthTopic4 {
l.Topics = append(l.Topics, hash)
} else {
l.Data = append(l.Data, hash)
}
}
addr, err := address.NewIDAddress(uint64(evt.Emitter))
if err != nil {
return api.EthTxReceipt{}, xerrors.Errorf("failed to create ID address: %w", err)
}
l.Address, err = a.lookupEthAddress(ctx, addr)
if err != nil {
return api.EthTxReceipt{}, xerrors.Errorf("failed to resolve Ethereum address: %w", err)
}
receipt.Logs = append(receipt.Logs, l)
}
}
receipt.GasUsed = api.EthUint64(lookup.Receipt.GasUsed)
// TODO: handle CumulativeGasUsed
receipt.CumulativeGasUsed = api.EmptyEthInt
effectiveGasPrice := big.Div(replay.GasCost.TotalCost, big.NewInt(lookup.Receipt.GasUsed))
receipt.EffectiveGasPrice = api.EthBigInt(effectiveGasPrice)
return receipt, nil
}
func (e *EthEvent) EthGetLogs(ctx context.Context, filterSpec *api.EthFilterSpec) (*api.EthFilterResult, error) {
if e.EventFilterManager == nil {
return nil, api.ErrNotSupported
}
// Create a temporary filter
f, err := e.installEthFilterSpec(ctx, filterSpec)
if err != nil {
return nil, err
}
ces := f.TakeCollectedEvents(ctx)
_ = e.uninstallFilter(ctx, f)
return ethFilterResultFromEvents(ces)
}
func (e *EthEvent) EthGetFilterChanges(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error) {
@ -915,11 +1011,7 @@ func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id api.EthFilterID) (*a
return nil, xerrors.Errorf("wrong filter type")
}
func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) (api.EthFilterID, error) {
if e.FilterStore == nil || e.EventFilterManager == nil {
return "", api.ErrNotSupported
}
func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *api.EthFilterSpec) (*filter.EventFilter, error) {
var (
minHeight abi.ChainEpoch
maxHeight abi.ChainEpoch
@ -928,39 +1020,39 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec)
keys = map[string][][]byte{}
)
if filter.BlockHash != nil {
if filter.FromBlock != nil || filter.ToBlock != nil {
return "", xerrors.Errorf("must not specify block hash and from/to block")
if filterSpec.BlockHash != nil {
if filterSpec.FromBlock != nil || filterSpec.ToBlock != nil {
return nil, xerrors.Errorf("must not specify block hash and from/to block")
}
// TODO: derive a tipset hash from eth hash - might need to push this down into the EventFilterManager
} else {
if filter.FromBlock == nil || *filter.FromBlock == "latest" {
if filterSpec.FromBlock == nil || *filterSpec.FromBlock == "latest" {
ts := e.Chain.GetHeaviestTipSet()
minHeight = ts.Height()
} else if *filter.FromBlock == "earliest" {
} else if *filterSpec.FromBlock == "earliest" {
minHeight = 0
} else if *filter.FromBlock == "pending" {
return "", api.ErrNotSupported
} else if *filterSpec.FromBlock == "pending" {
return nil, api.ErrNotSupported
} else {
epoch, err := strconv.ParseUint(*filter.FromBlock, 10, 64)
epoch, err := api.EthUint64FromHex(*filterSpec.FromBlock)
if err != nil {
return "", xerrors.Errorf("invalid epoch")
return nil, xerrors.Errorf("invalid epoch")
}
minHeight = abi.ChainEpoch(epoch)
}
if filter.ToBlock == nil || *filter.ToBlock == "latest" {
if filterSpec.ToBlock == nil || *filterSpec.ToBlock == "latest" {
// here latest means the latest at the time
maxHeight = -1
} else if *filter.ToBlock == "earliest" {
} else if *filterSpec.ToBlock == "earliest" {
maxHeight = 0
} else if *filter.ToBlock == "pending" {
return "", api.ErrNotSupported
} else if *filterSpec.ToBlock == "pending" {
return nil, api.ErrNotSupported
} else {
epoch, err := strconv.ParseUint(*filter.ToBlock, 10, 64)
epoch, err := api.EthUint64FromHex(*filterSpec.FromBlock)
if err != nil {
return "", xerrors.Errorf("invalid epoch")
return nil, xerrors.Errorf("invalid epoch")
}
maxHeight = abi.ChainEpoch(epoch)
}
@ -970,33 +1062,33 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec)
// Here the client is looking for events between the head and some future height
ts := e.Chain.GetHeaviestTipSet()
if maxHeight-ts.Height() > e.MaxFilterHeightRange {
return "", xerrors.Errorf("invalid epoch range")
return nil, xerrors.Errorf("invalid epoch range")
}
} else if minHeight >= 0 && maxHeight == -1 {
// Here the client is looking for events between some time in the past and the current head
ts := e.Chain.GetHeaviestTipSet()
if ts.Height()-minHeight > e.MaxFilterHeightRange {
return "", xerrors.Errorf("invalid epoch range")
return nil, xerrors.Errorf("invalid epoch range")
}
} else if minHeight >= 0 && maxHeight >= 0 {
if minHeight > maxHeight || maxHeight-minHeight > e.MaxFilterHeightRange {
return "", xerrors.Errorf("invalid epoch range")
return nil, xerrors.Errorf("invalid epoch range")
}
}
}
// Convert all addresses to filecoin f4 addresses
for _, ea := range filter.Address {
for _, ea := range filterSpec.Address {
a, err := ea.ToFilecoinAddress()
if err != nil {
return "", xerrors.Errorf("invalid address %x", ea)
return nil, xerrors.Errorf("invalid address %x", ea)
}
addresses = append(addresses, a)
}
for idx, vals := range filter.Topics {
for idx, vals := range filterSpec.Topics {
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
keyvals := make([][]byte, len(vals))
@ -1006,7 +1098,15 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec)
keys[key] = keyvals
}
f, err := e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys)
return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys)
}
func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *api.EthFilterSpec) (api.EthFilterID, error) {
if e.FilterStore == nil || e.EventFilterManager == nil {
return "", api.ErrNotSupported
}
f, err := e.installEthFilterSpec(ctx, filterSpec)
if err != nil {
return "", err
}
@ -1119,39 +1219,31 @@ const (
EthSubscribeEventTypeLogs = "logs"
)
func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) {
func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) {
if e.SubManager == nil {
return nil, api.ErrNotSupported
}
// Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the
// method to be "eth_subscription". This probably doesn't matter in practice.
// Validate event types and parameters first
for _, et := range eventTypes {
switch et {
case EthSubscribeEventTypeHeads:
case EthSubscribeEventTypeLogs:
default:
return nil, xerrors.Errorf("unsupported event type: %s", et)
}
}
sub, err := e.SubManager.StartSubscription(ctx)
if err != nil {
return nil, err
}
for _, et := range eventTypes {
switch et {
case EthSubscribeEventTypeHeads:
f, err := e.TipSetFilterManager.Install(ctx)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id))
return nil, err
}
sub.addFilter(ctx, f)
switch eventType {
case EthSubscribeEventTypeHeads:
f, err := e.TipSetFilterManager.Install(ctx)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id))
return nil, err
}
sub.addFilter(ctx, f)
case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{}
case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{}
if params != nil {
for idx, vals := range params.Topics {
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
@ -1161,21 +1253,27 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params
}
keys[key] = keyvals
}
f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id))
return nil, err
}
sub.addFilter(ctx, f)
}
f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id))
return nil, err
}
sub.addFilter(ctx, f)
default:
return nil, xerrors.Errorf("unsupported event type: %s", eventType)
}
return sub.out, nil
}
func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) {
if e.SubManager == nil {
return false, api.ErrNotSupported
}
filters, err := e.SubManager.StopSubscription(ctx, string(id))
if err != nil {
return false, nil
@ -1227,16 +1325,8 @@ type filterTipSetCollector interface {
TakeCollectedTipSets(context.Context) []types.TipSetKey
}
var (
ethTopic1 = "topic1"
ethTopic2 = "topic2"
ethTopic3 = "topic3"
ethTopic4 = "topic4"
)
func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResult, error) {
res := &api.EthFilterResult{}
for _, ev := range evs {
log := api.EthLog{
Removed: ev.Reverted,
@ -1247,9 +1337,9 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu
var err error
for _, entry := range ev.Event.Entries {
for _, entry := range ev.Entries {
hash := api.EthHashData(entry.Value)
if entry.Key == ethTopic1 || entry.Key == ethTopic2 || entry.Key == ethTopic3 || entry.Key == ethTopic4 {
if entry.Key == api.EthTopic1 || entry.Key == api.EthTopic2 || entry.Key == api.EthTopic3 || entry.Key == api.EthTopic4 {
log.Topics = append(log.Topics, hash)
} else {
log.Data = append(log.Data, hash)
@ -1275,7 +1365,7 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu
return nil, err
}
res.NewLogs = append(res.NewLogs, log)
res.Results = append(res.Results, log)
}
return res, nil
@ -1294,7 +1384,7 @@ func ethFilterResultFromTipSets(tsks []types.TipSetKey) (*api.EthFilterResult, e
return nil, err
}
res.NewBlockHashes = append(res.NewBlockHashes, hash)
res.Results = append(res.Results, hash)
}
return res, nil
@ -1309,18 +1399,19 @@ func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) {
return nil, err
}
res.NewTransactionHashes = append(res.NewTransactionHashes, hash)
res.Results = append(res.Results, hash)
}
return res, nil
}
type ethSubscriptionManager struct {
type EthSubscriptionManager struct {
EthModuleAPI
mu sync.Mutex
subs map[string]*ethSubscription
}
func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) {
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) {
id, err := uuid.NewRandom()
if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err)
@ -1329,10 +1420,11 @@ func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
ctx, quit := context.WithCancel(ctx)
sub := &ethSubscription{
id: id.String(),
in: make(chan interface{}, 200),
out: make(chan api.EthSubscriptionResponse),
quit: quit,
EthModuleAPI: e.EthModuleAPI,
id: id.String(),
in: make(chan interface{}, 200),
out: make(chan api.EthSubscriptionResponse),
quit: quit,
}
e.mu.Lock()
@ -1347,7 +1439,7 @@ func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
return sub, nil
}
func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) {
func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) {
e.mu.Lock()
defer e.mu.Unlock()
@ -1362,6 +1454,7 @@ func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string
}
type ethSubscription struct {
EthModuleAPI
id string
in chan interface{}
out chan api.EthSubscriptionResponse
@ -1394,7 +1487,24 @@ func (e *ethSubscription) start(ctx context.Context) {
case *filter.CollectedEvent:
resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt})
case *types.TipSet:
resp.Result = vt
// Sadly convoluted since the logic for conversion to eth block is long and buried away
// in unexported methods of EthModule
tsCid, err := vt.Key().Cid()
if err != nil {
break
}
hash, err := api.NewEthHashFromCid(tsCid)
if err != nil {
break
}
eb, err := e.EthGetBlockByHash(ctx, hash, true)
if err != nil {
break
}
resp.Result = eb
default:
log.Warnf("unexpected subscription value type: %T", vt)
}

View File

@ -31,18 +31,23 @@ type EventAPI struct {
var _ events.EventAPI = &EventAPI{}
func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool) (*full.EthEvent, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool) (*full.EthEvent, error) {
func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.EthModuleAPI) (*full.EthEvent, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, em full.EthModuleAPI) (*full.EthEvent, error) {
ee := &full.EthEvent{
EthModuleAPI: em,
Chain: cs,
MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange),
}
if !cfg.EnableRealTimeFilterAPI && !cfg.EnableHistoricFilterAPI {
if !cfg.EnableRealTimeFilterAPI {
// all event functionality is disabled
// the historic filter API relies on the real time one
return ee, nil
}
ee.SubManager = &full.EthSubscriptionManager{
EthModuleAPI: em,
}
ee.FilterStore = filter.NewMemFilterStore(cfg.MaxFilters)
// Start garbage collection for filters
@ -53,67 +58,79 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy
},
})
if cfg.EnableRealTimeFilterAPI {
ee.EventFilterManager = &filter.EventFilterManager{
ChainStore: cs,
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
// we only want to match using f4 addresses
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
}
addr, err := sm.LookupRobustAddress(ctx, idAddr, ts)
if err != nil {
return address.Undef, false
}
// if robust address is not f4 then we won't match against it so bail early
if addr.Protocol() != address.Delegated {
return address.Undef, false
}
// we have an f4 address, make sure it's assigned by the EAM
if namespace, _, err := varint.FromUvarint(addr.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
return address.Undef, false
}
return addr, true
},
MaxFilterResults: cfg.MaxFilterResults,
}
ee.TipSetFilterManager = &filter.TipSetFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
}
ee.MemPoolFilterManager = &filter.MemPoolFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
// Enable indexing of actor events
var eventIndex *filter.EventIndex
if cfg.EnableHistoricFilterAPI {
var err error
eventIndex, err = filter.NewEventIndex(cfg.ActorEventDatabasePath)
if err != nil {
return nil, err
}
const ChainHeadConfidence = 1
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence)
if err != nil {
return err
}
// ignore returned tipsets
_ = ev.Observe(ee.EventFilterManager)
_ = ev.Observe(ee.TipSetFilterManager)
ch, err := mp.Updates(ctx)
if err != nil {
return err
}
go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch)
return nil
OnStop: func(ctx context.Context) error {
return eventIndex.Close()
},
})
}
if cfg.EnableHistoricFilterAPI {
// TODO: enable indexer
ee.EventFilterManager = &filter.EventFilterManager{
ChainStore: cs,
EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
// we only want to match using f4 addresses
idAddr, err := address.NewIDAddress(uint64(emitter))
if err != nil {
return address.Undef, false
}
actor, err := sm.LoadActor(ctx, idAddr, ts)
if err != nil || actor.Address == nil {
return address.Undef, false
}
// if robust address is not f4 then we won't match against it so bail early
if actor.Address.Protocol() != address.Delegated {
return address.Undef, false
}
// we have an f4 address, make sure it's assigned by the EAM
if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID {
return address.Undef, false
}
return *actor.Address, true
},
MaxFilterResults: cfg.MaxFilterResults,
}
ee.TipSetFilterManager = &filter.TipSetFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
}
ee.MemPoolFilterManager = &filter.MemPoolFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
}
const ChainHeadConfidence = 1
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence)
if err != nil {
return err
}
// ignore returned tipsets
_ = ev.Observe(ee.EventFilterManager)
_ = ev.Observe(ee.TipSetFilterManager)
ch, err := mp.Updates(ctx)
if err != nil {
return err
}
go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch)
return nil
},
})
return ee, nil
}

View File

@ -99,6 +99,16 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
rpcServer.AliasMethod("eth_estimateGas", "Filecoin.EthEstimateGas")
rpcServer.AliasMethod("eth_call", "Filecoin.EthCall")
rpcServer.AliasMethod("eth_getLogs", "Filecoin.EthGetLogs")
rpcServer.AliasMethod("eth_getFilterChanges", "Filecoin.EthGetFilterChanges")
rpcServer.AliasMethod("eth_getFilterLogs", "Filecoin.EthGetFilterLogs")
rpcServer.AliasMethod("eth_newFilter", "Filecoin.EthNewFilter")
rpcServer.AliasMethod("eth_newBlockFilter", "Filecoin.EthNewBlockFilter")
rpcServer.AliasMethod("eth_newPendingTransactionFilter", "Filecoin.EthNewPendingTransactionFilter")
rpcServer.AliasMethod("eth_uninstallFilter", "Filecoin.EthUninstallFilter")
rpcServer.AliasMethod("eth_subscribe", "Filecoin.EthSubscribe")
rpcServer.AliasMethod("eth_unsubscribe", "Filecoin.EthUnsubscribe")
rpcServer.AliasMethod("net_version", "Filecoin.NetVersion")
rpcServer.AliasMethod("net_listening", "Filecoin.NetListening")