From 69210d0917c2519dd1e9c2f5cea071b4e12ab2d2 Mon Sep 17 00:00:00 2001 From: Ian Davis Date: Thu, 10 Nov 2022 11:27:58 +0000 Subject: [PATCH] Ethereum compatible actor event API --- .circleci/config.yml | 5 + api/api_full.go | 41 ++- api/api_storage.go | 8 +- api/docgen/docgen.go | 16 +- api/eth_types.go | 184 +++++++++- api/eth_types_test.go | 214 +++++++++++ chain/events/filter/event.go | 394 ++++++++++++++++++++ chain/events/filter/event_test.go | 415 +++++++++++++++++++++ chain/events/filter/mempool.go | 141 +++++++ chain/events/filter/store.go | 93 +++++ chain/events/filter/tipset.go | 128 +++++++ itests/actor_events_test.go | 176 +++++++++ node/builder_chain.go | 6 + node/config/def.go | 21 +- node/config/types.go | 30 +- node/impl/full/eth.go | 590 +++++++++++++++++++++++++++++- node/modules/actorevent.go | 94 +++++ 17 files changed, 2533 insertions(+), 23 deletions(-) create mode 100644 chain/events/filter/event.go create mode 100644 chain/events/filter/event_test.go create mode 100644 chain/events/filter/mempool.go create mode 100644 chain/events/filter/store.go create mode 100644 chain/events/filter/tipset.go create mode 100644 itests/actor_events_test.go create mode 100644 node/modules/actorevent.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 4af2bfc12..8ca10eade 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -811,6 +811,11 @@ workflows: - gofmt - gen-check - docs-check + - test: + name: test-itest-actor_events + suite: itest-actor_events + target: "./itests/actor_events_test.go" + - test: name: test-itest-api suite: itest-api diff --git a/api/api_full.go b/api/api_full.go index d431b55ac..211cb3b59 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -26,7 +26,7 @@ import ( abinetwork "github.com/filecoin-project/go-state-types/network" apitypes "github.com/filecoin-project/lotus/api/types" - "github.com/filecoin-project/lotus/chain/actors/builtin" + builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin" lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/power" "github.com/filecoin-project/lotus/chain/types" @@ -794,6 +794,41 @@ type FullNode interface { EthSendRawTransaction(ctx context.Context, rawTx EthBytes) (EthHash, error) //perm:read + // Returns event logs matching given filter spec. + EthGetLogs(ctx context.Context, filter *EthFilterSpec) (*EthFilterResult, error) //perm:read + + // Polling method for a filter, returns event logs which occurred since last poll. + // (requires write perm since timestamp of last filter execution will be written) + EthGetFilterChanges(ctx context.Context, id EthFilterID) (*EthFilterResult, error) //perm:write + + // Returns event logs matching filter with given id. + // (requires write perm since timestamp of last filter execution will be written) + EthGetFilterLogs(ctx context.Context, id EthFilterID) (*EthFilterResult, error) //perm:write + + // Installs a persistent filter based on given filter spec. + EthNewFilter(ctx context.Context, filter *EthFilterSpec) (EthFilterID, error) //perm:write + + // Installs a persistent filter to notify when a new block arrives. + EthNewBlockFilter(ctx context.Context) (EthFilterID, error) //perm:write + + // Installs a persistent filter to notify when new messages arrive in the message pool. + EthNewPendingTransactionFilter(ctx context.Context) (EthFilterID, error) //perm:write + + // Uninstalls a filter with given id. + EthUninstallFilter(ctx context.Context, id EthFilterID) (bool, error) //perm:write + + // Subscribe to different event types using websockets + // eventTypes is one or more of: + // - newHeads: notify when new blocks arrive. + // - pendingTransactions: notify when new messages arrive in the message pool. + // - logs: notify new event logs that match a criteria + // params contains additional parameters used with the log event type + // The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called. + EthSubscribe(ctx context.Context, eventTypes []string, params EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) //perm:write + + // Unsubscribe from a websocket subscription + EthUnsubscribe(ctx context.Context, id EthSubscriptionID) (bool, error) //perm:write + // CreateBackup creates node backup onder the specified file name. The // method requires that the lotus daemon is running with the // LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that @@ -1184,7 +1219,7 @@ type CirculatingSupply struct { type MiningBaseInfo struct { MinerPower types.BigInt NetworkPower types.BigInt - Sectors []builtin.ExtendedSectorInfo + Sectors []builtinactors.ExtendedSectorInfo WorkerKey address.Address SectorSize abi.SectorSize PrevBeaconEntry types.BeaconEntry @@ -1201,7 +1236,7 @@ type BlockTemplate struct { Messages []*types.SignedMessage Epoch abi.ChainEpoch Timestamp uint64 - WinningPoStProof []builtin.PoStProof + WinningPoStProof []builtinactors.PoStProof } type DataSize struct { diff --git a/api/api_storage.go b/api/api_storage.go index 100be5cca..b4a0cc5f7 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -22,7 +22,7 @@ import ( "github.com/filecoin-project/go-state-types/builtin/v9/miner" abinetwork "github.com/filecoin-project/go-state-types/network" - "github.com/filecoin-project/lotus/chain/actors/builtin" + builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sealer/fsutil" @@ -152,7 +152,7 @@ type StorageMiner interface { WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) //perm:admin WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) //perm:admin - //storiface.WorkerReturn + // storiface.WorkerReturn ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storiface.PreCommit1Out, err *storiface.CallError) error //perm:admin retry:true @@ -175,7 +175,7 @@ type StorageMiner interface { // SealingSchedDiag dumps internal sealing scheduler state SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin - //SealingSchedRemove removes a request from sealing pipeline + // SealingSchedRemove removes a request from sealing pipeline SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error //perm:admin // paths.SectorIndex @@ -322,7 +322,7 @@ type StorageMiner interface { CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storiface.SectorRef, expensive bool) (map[abi.SectorNumber]string, error) //perm:admin - ComputeProof(ctx context.Context, ssi []builtin.ExtendedSectorInfo, rand abi.PoStRandomness, poStEpoch abi.ChainEpoch, nv abinetwork.Version) ([]builtin.PoStProof, error) //perm:read + ComputeProof(ctx context.Context, ssi []builtinactors.ExtendedSectorInfo, rand abi.PoStRandomness, poStEpoch abi.ChainEpoch, nv abinetwork.Version) ([]builtinactors.PoStProof, error) //perm:read // RecoverFault can be used to declare recoveries manually. It sends messages // to the miner actor with details of recovered sectors and returns the CID of messages. It honors the diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index fc6c82157..b27df57dd 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -298,7 +298,8 @@ func init() { "title": "Lotus RPC API", "version": "1.2.1/generated=2020-11-22T08:22:42-06:00", }, - "methods": []interface{}{}}, + "methods": []interface{}{}, + }, ) addExample(api.CheckStatusCode(0)) @@ -335,7 +336,8 @@ func init() { NumConnsInbound: 3, NumConnsOutbound: 4, NumFD: 5, - }}) + }, + }) addExample(api.NetLimit{ Memory: 123, StreamsInbound: 1, @@ -374,10 +376,18 @@ func init() { ethFeeHistoryReward := [][]api.EthBigInt{} addExample(ðFeeHistoryReward) + + addExample(api.EthFilterID("c5564560217c43e4bc0484df655e9019")) + addExample(api.EthSubscriptionID("b62df77831484129adf6682332ad0725")) + + pstring := func(s string) *string { return &s } + addExample(&api.EthFilterSpec{ + FromBlock: pstring("2301220"), + Address: []api.EthAddress{ethaddr}, + }) } func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) { - switch pkg { case "api": // latest switch name { diff --git a/api/eth_types.go b/api/eth_types.go index 03bf85e05..d8dd6e64d 100644 --- a/api/eth_types.go +++ b/api/eth_types.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/ipfs/go-cid" + "github.com/minio/blake2b-simd" "github.com/multiformats/go-multihash" "github.com/multiformats/go-varint" "golang.org/x/xerrors" @@ -48,9 +49,7 @@ func (e *EthUint64) UnmarshalJSON(b []byte) error { type EthBigInt big.Int -var ( - EthBigIntZero = EthBigInt{Int: big.Zero().Int} -) +var EthBigIntZero = EthBigInt{Int: big.Zero().Int} func (e EthBigInt) MarshalJSON() ([]byte, error) { if e.Int == nil { @@ -396,7 +395,6 @@ func handlePrefix(s *string) { func decodeHexString(s string, length int) ([]byte, error) { b, err := hex.DecodeString(s) - if err != nil { return []byte{}, xerrors.Errorf("cannot parse hash: %w", err) } @@ -423,6 +421,10 @@ func EthHashFromHex(s string) (EthHash, error) { return h, nil } +func EthHashData(b []byte) EthHash { + return EthHash(blake2b.Sum256(b)) +} + func (h EthHash) String() string { return "0x" + hex.EncodeToString(h[:]) } @@ -440,3 +442,177 @@ type EthFeeHistory struct { GasUsedRatio []float64 `json:"gasUsedRatio"` Reward *[][]EthBigInt `json:"reward,omitempty"` } + +// An opaque identifier generated by the Lotus node to refer to an installed filter. +type EthFilterID string + +// An opaque identifier generated by the Lotus node to refer to an active subscription. +type EthSubscriptionID string + +type EthFilterSpec struct { + // Interpreted as an epoch or one of "latest" for last mined block, "earliest" for first, + // "pending" for not yet committed messages. + // Optional, default: "latest". + FromBlock *string `json:"fromBlock,omitempty"` + + // Interpreted as an epoch or one of "latest" for last mined block, "earliest" for first, + // "pending" for not yet committed messages. + // Optional, default: "latest". + ToBlock *string `json:"toBlock,omitempty"` + + // Actor address or a list of addresses from which event logs should originate. + // Optional, default nil. + // The JSON decoding must treat a string as equivalent to an array with one value, for example + // "0x8888f1f195afa192cfee86069858" must be decoded as [ "0x8888f1f195afa192cfee86069858" ] + Address EthAddressList `json:"address"` + + // List of topics to be matched. + // Optional, default: empty list + Topics EthTopicSpec `json:"topics"` + + // Restricts event logs returned to those in receipts contained in the tipset this block is part of. + // If BlockHash is present in in the filter criteria, then neither FromBlock nor ToBlock are allowed. + // Added in EIP-234 + BlockHash *EthHash `json:"blockHash,omitempty"` +} + +// EthAddressSpec represents a list of addresses. +// The JSON decoding must treat a string as equivalent to an array with one value, for example +// "0x8888f1f195afa192cfee86069858" must be decoded as [ "0x8888f1f195afa192cfee86069858" ] +type EthAddressList []EthAddress + +func (e *EthAddressList) UnmarshalJSON(b []byte) error { + if len(b) > 0 && b[0] == '[' { + var addrs []EthAddress + err := json.Unmarshal(b, &addrs) + if err != nil { + return err + } + *e = addrs + return nil + } + var addr EthAddress + err := json.Unmarshal(b, &addr) + if err != nil { + return err + } + *e = []EthAddress{addr} + return nil +} + +// TopicSpec represents a specification for matching by topic. An empty spec means all topics +// will be matched. Otherwise topics are matched conjunctively in the first dimension of the +// slice and disjunctively in the second dimension. Topics are matched in order. +// An event log with topics [A, B] will be matched by the following topic specs: +// [] "all" +// [[A]] "A in first position (and anything after)" +// [[A]] "A in first position (and anything after)" +// [nil, [B] ] "anything in first position AND B in second position (and anything after)" +// [[A], [B]] "A in first position AND B in second position (and anything after)" +// [[A, B], [A, B]] "(A OR B) in first position AND (A OR B) in second position (and anything after)" +// +// The JSON decoding must treat string values as equivalent to arrays with one value, for example +// { "A", [ "B", "C" ] } must be decoded as [ [ A ], [ B, C ] ] +type EthTopicSpec []EthHashList + +type EthHashList []EthHash + +func (e *EthHashList) UnmarshalJSON(b []byte) error { + if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) { + return nil + } + if len(b) > 0 && b[0] == '[' { + var hashes []EthHash + err := json.Unmarshal(b, &hashes) + if err != nil { + return err + } + *e = hashes + return nil + } + var hash EthHash + err := json.Unmarshal(b, &hash) + if err != nil { + return err + } + *e = []EthHash{hash} + return nil +} + +// FilterResult represents the response from executing a filter: a list of bloack hashes, a list of transaction hashes +// or a list of logs +// This is a union type. Only one field will be populated. +// The JSON encoding must produce an array of the populated field. +type EthFilterResult struct { + // List of block hashes. Only populated when the filter has been installed via EthNewBlockFilter + NewBlockHashes []EthHash + + // List of transaction hashes. Only populated when the filter has been installed via EthNewPendingTransactionFilter + NewTransactionHashes []EthHash + + // List of event logs. Only populated when the filter has been installed via EthNewFilter + NewLogs []EthLog +} + +func (h EthFilterResult) MarshalJSON() ([]byte, error) { + if h.NewBlockHashes != nil { + return json.Marshal(h.NewBlockHashes) + } + if h.NewTransactionHashes != nil { + return json.Marshal(h.NewTransactionHashes) + } + if h.NewLogs != nil { + return json.Marshal(h.NewLogs) + } + return []byte{'[', ']'}, nil +} + +type EthLog struct { + // Address is the address of the actor that produced the event log. + Address EthAddress `json:"address"` + + // Data is the values of the event log, excluding topics + Data []EthHash `json:"data"` + + // List of topics associated with the event log. + Topics []EthHash `json:"topics"` + + // Following fields are derived from the transaction containing the log + + // Indicates whether the log was removed due to a chain reorganization. + Removed bool `json:"removed"` + + // LogIndex is the index of the event log in the sequence of events produced by the message execution. + // (this is the index in the events AMT on the message receipt) + LogIndex EthUint64 `json:"logIndex"` + + // TransactionIndex is the index in the tipset of the transaction that produced the event log. + // The index corresponds to the sequence of messages produced by ChainGetParentMessages + TransactionIndex EthUint64 `json:"transactionIndex"` + + // TransactionHash is the cid of the transaction that produced the event log. + TransactionHash EthHash `json:"transactionHash"` + + // BlockHash is the hash of a block in the tipset containing the message receipt of the message execution. + // This may be passed to ChainGetParentReceipts to obtain a list of receipts. The receipt + // containing the events will be at TransactionIndex in the receipt list. + BlockHash EthHash `json:"blockHash"` + + // BlockNumber is the epoch at which the message was executed. This is the epoch containing + // the message receipt. + BlockNumber EthUint64 `json:"blockNumber"` +} + +type EthSubscriptionParams struct { + // List of topics to be matched. + // Optional, default: empty list + Topics EthTopicSpec `json:"topics,omitempty"` +} + +type EthSubscriptionResponse struct { + // The persistent identifier for the subscription which can be used to unsubscribe. + SubscriptionID EthSubscriptionID `json:"subscription"` + + // The object matching the subscription. This may be a Block (tipset), a Transaction (message) or an EthLog + Result interface{} `json:"result"` +} diff --git a/api/eth_types_test.go b/api/eth_types_test.go index 46ce4f49a..cea465965 100644 --- a/api/eth_types_test.go +++ b/api/eth_types_test.go @@ -2,6 +2,7 @@ package api import ( + "encoding/json" "strings" "testing" @@ -30,6 +31,7 @@ func TestEthIntMarshalJSON(t *testing.T) { require.Equal(t, j, tc.Output) } } + func TestEthIntUnmarshalJSON(t *testing.T) { testcases := []TestCase{ {[]byte("\"0x0\""), EthUint64(0)}, @@ -155,3 +157,215 @@ func TestUnmarshalEthBytes(t *testing.T) { require.Equal(t, string(data), tc) } } + +func TestEthFilterResultMarshalJSON(t *testing.T) { + hash1, err := EthHashFromHex("013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184") + require.NoError(t, err, "eth hash") + + hash2, err := EthHashFromHex("ab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738") + require.NoError(t, err, "eth hash") + + addr, err := EthAddressFromHex("d4c5fb16488Aa48081296299d54b0c648C9333dA") + require.NoError(t, err, "eth address") + + log := EthLog{ + Removed: true, + LogIndex: 5, + TransactionIndex: 45, + TransactionHash: hash1, + BlockHash: hash2, + BlockNumber: 53, + Topics: []EthHash{hash1}, + Data: []EthHash{hash1}, + Address: addr, + } + logjson, err := json.Marshal(log) + require.NoError(t, err, "log json") + + testcases := []struct { + res EthFilterResult + want string + }{ + { + res: EthFilterResult{}, + want: "[]", + }, + + { + res: EthFilterResult{ + NewBlockHashes: []EthHash{hash1, hash2}, + }, + want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, + }, + + { + res: EthFilterResult{ + NewTransactionHashes: []EthHash{hash1, hash2}, + }, + want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, + }, + + { + res: EthFilterResult{ + NewLogs: []EthLog{log}, + }, + want: `[` + string(logjson) + `]`, + }, + } + + for _, tc := range testcases { + data, err := json.Marshal(tc.res) + require.NoError(t, err) + require.Equal(t, tc.want, string(data)) + } +} + +func TestEthFilterSpecUnmarshalJSON(t *testing.T) { + hash1, err := EthHashFromHex("013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184") + require.NoError(t, err, "eth hash") + + hash2, err := EthHashFromHex("ab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738") + require.NoError(t, err, "eth hash") + + addr, err := EthAddressFromHex("d4c5fb16488Aa48081296299d54b0c648C9333dA") + require.NoError(t, err, "eth address") + + pstring := func(s string) *string { return &s } + phash := func(h EthHash) *EthHash { return &h } + + testcases := []struct { + input string + want EthFilterSpec + }{ + { + input: `{"fromBlock":"latest"}`, + want: EthFilterSpec{FromBlock: pstring("latest")}, + }, + { + input: `{"toBlock":"pending"}`, + want: EthFilterSpec{ToBlock: pstring("pending")}, + }, + { + input: `{"address":["0xd4c5fb16488Aa48081296299d54b0c648C9333dA"]}`, + want: EthFilterSpec{Address: EthAddressList{addr}}, + }, + { + input: `{"address":"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"}`, + want: EthFilterSpec{Address: EthAddressList{addr}}, + }, + { + input: `{"blockHash":"0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"}`, + want: EthFilterSpec{BlockHash: phash(hash1)}, + }, + { + input: `{"topics":["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"]}`, + want: EthFilterSpec{ + Topics: EthTopicSpec{ + {hash1}, + }, + }, + }, + { + input: `{"topics":["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]}`, + want: EthFilterSpec{ + Topics: EthTopicSpec{ + {hash1}, + {hash2}, + }, + }, + }, + { + input: `{"topics":[null, ["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]]}`, + want: EthFilterSpec{ + Topics: EthTopicSpec{ + nil, + {hash1, hash2}, + }, + }, + }, + { + input: `{"topics":[null, "0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"]}`, + want: EthFilterSpec{ + Topics: EthTopicSpec{ + nil, + {hash1}, + }, + }, + }, + } + + for _, tc := range testcases { + var got EthFilterSpec + err := json.Unmarshal([]byte(tc.input), &got) + require.NoError(t, err) + require.Equal(t, tc.want, got) + } +} + +func TestEthAddressListUnmarshalJSON(t *testing.T) { + addr1, err := EthAddressFromHex("d4c5fb16488Aa48081296299d54b0c648C9333dA") + require.NoError(t, err, "eth address") + + addr2, err := EthAddressFromHex("abbbfb16488Aa48081296299d54b0c648C9333dA") + require.NoError(t, err, "eth address") + + testcases := []struct { + input string + want EthAddressList + }{ + { + input: `["0xd4c5fb16488Aa48081296299d54b0c648C9333dA"]`, + want: EthAddressList{addr1}, + }, + { + input: `["0xd4c5fb16488Aa48081296299d54b0c648C9333dA","abbbfb16488Aa48081296299d54b0c648C9333dA"]`, + want: EthAddressList{addr1, addr2}, + }, + { + input: `"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"`, + want: EthAddressList{addr1}, + }, + } + for _, tc := range testcases { + var got EthAddressList + err := json.Unmarshal([]byte(tc.input), &got) + require.NoError(t, err) + require.Equal(t, tc.want, got) + } +} + +func TestEthHashListUnmarshalJSON(t *testing.T) { + hash1, err := EthHashFromHex("013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184") + require.NoError(t, err, "eth hash") + + hash2, err := EthHashFromHex("ab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738") + require.NoError(t, err, "eth hash") + + testcases := []struct { + input string + want *EthHashList + }{ + { + input: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"]`, + want: &EthHashList{hash1}, + }, + { + input: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, + want: &EthHashList{hash1, hash2}, + }, + { + input: `"0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"`, + want: &EthHashList{hash1}, + }, + { + input: `null`, + want: nil, + }, + } + for _, tc := range testcases { + var got *EthHashList + err := json.Unmarshal([]byte(tc.input), &got) + require.NoError(t, err) + require.Equal(t, tc.want, got) + } +} diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go new file mode 100644 index 000000000..c81aed7fe --- /dev/null +++ b/chain/events/filter/event.go @@ -0,0 +1,394 @@ +package filter + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + blockadt "github.com/filecoin-project/specs-actors/actors/util/adt" + + cstore "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" +) + +const indexed uint8 = 0x01 + +type EventFilter struct { + id string + minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum + maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum + tipsetCid cid.Cid + addresses []address.Address // list of actor ids that originated the event + keys map[string][][]byte // map of key names to a list of alternate values that may match + maxResults int // maximum number of results to collect, 0 is unlimited + + mu sync.Mutex + collected []*CollectedEvent + lastTaken time.Time + ch chan<- interface{} +} + +var _ Filter = (*EventFilter)(nil) + +type CollectedEvent struct { + Event *types.Event + EventIdx int // index of the event within the list of emitted events + Reverted bool + Height abi.ChainEpoch + TipSetKey types.TipSetKey // tipset that contained the message + MsgIdx int // index of the message in the tipset + MsgCid cid.Cid // cid of message that produced event +} + +func (f *EventFilter) ID() string { + return f.id +} + +func (f *EventFilter) SetSubChannel(ch chan<- interface{}) { + f.mu.Lock() + defer f.mu.Unlock() + f.ch = ch + f.collected = nil +} + +func (f *EventFilter) ClearSubChannel() { + f.mu.Lock() + defer f.mu.Unlock() + f.ch = nil +} + +func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool) error { + if !f.matchTipset(te) { + return nil + } + + ems, err := te.messages(ctx) + if err != nil { + return xerrors.Errorf("load executed messages: %w", err) + } + for msgIdx, em := range ems { + for evIdx, ev := range em.Events() { + if !f.matchAddress(ev.Emitter) { + continue + } + if !f.matchKeys(ev.Entries) { + continue + } + + // event matches filter, so record it + cev := &CollectedEvent{ + Event: ev, + EventIdx: evIdx, + Reverted: revert, + Height: te.msgTs.Height(), + TipSetKey: te.msgTs.Key(), + MsgCid: em.Message().Cid(), + MsgIdx: msgIdx, + } + + f.mu.Lock() + // if we have a subscription channel then push event to it + if f.ch != nil { + f.ch <- cev + f.mu.Unlock() + continue + } + + if f.maxResults > 0 && len(f.collected) == f.maxResults { + copy(f.collected, f.collected[1:]) + f.collected = f.collected[:len(f.collected)-1] + } + f.collected = append(f.collected, cev) + f.mu.Unlock() + } + } + + return nil +} + +func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent { + f.mu.Lock() + collected := f.collected + f.collected = nil + f.lastTaken = time.Now().UTC() + f.mu.Unlock() + + return collected +} + +func (f *EventFilter) LastTaken() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + return f.lastTaken +} + +// matchTipset reports whether this filter matches the given tipset +func (f *EventFilter) matchTipset(te *TipSetEvents) bool { + if f.tipsetCid != cid.Undef { + tsCid, err := te.Cid() + if err != nil { + return false + } + return f.tipsetCid.Equals(tsCid) + } + + if f.minHeight >= 0 && f.minHeight > te.Height() { + return false + } + if f.maxHeight >= 0 && f.maxHeight < te.Height() { + return false + } + return true +} + +func (f *EventFilter) matchAddress(o address.Address) bool { + if len(f.addresses) == 0 { + return true + } + // Assume short lists of addresses + // TODO: binary search for longer lists + for _, a := range f.addresses { + if a == o { + return true + } + } + return false +} + +func (f *EventFilter) matchKeys(ees []types.EventEntry) bool { + if len(f.keys) == 0 { + return true + } + // TODO: optimize this naive algorithm + // Note keys names may be repeated so we may have multiple opportunities to match + + matched := map[string]bool{} + for _, ee := range ees { + // Skip an entry that is not indexable + if ee.Flags&indexed != indexed { + continue + } + + keyname := string(ee.Key) + + // skip if we have already matched this key + if matched[keyname] { + continue + } + + wantlist, ok := f.keys[keyname] + if !ok { + continue + } + + for _, w := range wantlist { + if bytes.Equal(w, ee.Value) { + matched[keyname] = true + break + } + } + + if len(matched) == len(f.keys) { + // all keys have been matched + return true + } + + } + + return false +} + +type TipSetEvents struct { + rctTs *types.TipSet // rctTs is the tipset containing the receipts of executed messages + msgTs *types.TipSet // msgTs is the tipset containing the messages that have been executed + + load func(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) + + once sync.Once // for lazy population of ems + ems []executedMessage + err error +} + +func (te *TipSetEvents) Height() abi.ChainEpoch { + return te.msgTs.Height() +} + +func (te *TipSetEvents) Cid() (cid.Cid, error) { + return te.msgTs.Key().Cid() +} + +func (te *TipSetEvents) messages(ctx context.Context) ([]executedMessage, error) { + te.once.Do(func() { + // populate executed message list + ems, err := te.load(ctx, te.msgTs, te.rctTs) + if err != nil { + te.err = err + return + } + te.ems = ems + }) + return te.ems, te.err +} + +type executedMessage struct { + msg *types.Message + rct *types.MessageReceipt + // events extracted from receipt + evs []*types.Event +} + +func (e *executedMessage) Message() *types.Message { + return e.msg +} + +func (e *executedMessage) Receipt() *types.MessageReceipt { + return e.rct +} + +func (e *executedMessage) Events() []*types.Event { + return e.evs +} + +type EventFilterManager struct { + ChainStore *cstore.ChainStore + MaxFilterResults int + + mu sync.Mutex // guards mutations to filters + filters map[string]*EventFilter +} + +func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.filters) == 0 { + return nil + } + + tse := &TipSetEvents{ + msgTs: from, + rctTs: to, + load: m.loadExecutedMessages, + } + + // TODO: could run this loop in parallel with errgroup if there are many filters + for _, f := range m.filters { + if err := f.CollectEvents(ctx, tse, false); err != nil { + return err + } + } + + return nil +} + +func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) error { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.filters) == 0 { + return nil + } + + tse := &TipSetEvents{ + msgTs: to, + rctTs: from, + load: m.loadExecutedMessages, + } + + // TODO: could run this loop in parallel with errgroup if there are many filters + for _, f := range m.filters { + if err := f.CollectEvents(ctx, tse, true); err != nil { + return err + } + } + + return nil +} + +func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keys map[string][][]byte) (*EventFilter, error) { + id, err := uuid.NewRandom() + if err != nil { + return nil, xerrors.Errorf("new uuid: %w", err) + } + + f := &EventFilter{ + id: id.String(), + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keys: keys, + maxResults: m.MaxFilterResults, + } + + m.mu.Lock() + m.filters[id.String()] = f + m.mu.Unlock() + + return f, nil +} + +func (m *EventFilterManager) Remove(ctx context.Context, id string) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, found := m.filters[id]; !found { + return ErrFilterNotFound + } + delete(m.filters, id) + return nil +} + +func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + msgs, err := m.ChainStore.MessagesForTipset(ctx, msgTs) + if err != nil { + return nil, xerrors.Errorf("read messages: %w", err) + } + + st := m.ChainStore.ActorStore(ctx) + + arr, err := blockadt.AsArray(st, rctTs.Blocks()[0].ParentMessageReceipts) + if err != nil { + return nil, xerrors.Errorf("load receipts amt: %w", err) + } + + if uint64(len(msgs)) != arr.Length() { + return nil, xerrors.Errorf("mismatching message and receipt counts (%d msgs, %d rcts)", len(msgs), arr.Length()) + } + + ems := make([]executedMessage, len(msgs)) + + for i := 0; i < len(msgs); i++ { + ems[i].msg = msgs[i].VMMessage() + + var rct types.MessageReceipt + found, err := arr.Get(uint64(i), &rct) + if err != nil { + return nil, xerrors.Errorf("load receipt: %w", err) + } + if !found { + return nil, xerrors.Errorf("receipt %d not found", i) + } + ems[i].rct = &rct + + evtArr, err := blockadt.AsArray(st, rct.EventsRoot) + if err != nil { + return nil, xerrors.Errorf("load events amt: %w", err) + } + + ems[i].evs = make([]*types.Event, evtArr.Length()) + var evt types.Event + _ = arr.ForEach(&evt, func(i int64) error { + cpy := evt + ems[i].evs[int(i)] = &cpy + return nil + }) + + } + + return ems, nil +} diff --git a/chain/events/filter/event_test.go b/chain/events/filter/event_test.go new file mode 100644 index 000000000..76cad096e --- /dev/null +++ b/chain/events/filter/event_test.go @@ -0,0 +1,415 @@ +package filter + +import ( + "context" + pseudo "math/rand" + "testing" + + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/exitcode" + blockadt "github.com/filecoin-project/specs-actors/actors/util/adt" + + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors/adt" + "github.com/filecoin-project/lotus/chain/types" +) + +func TestEventFilterCollectEvents(t *testing.T) { + rng := pseudo.New(pseudo.NewSource(299792458)) + a1 := randomActorAddr(t, rng) + a2 := randomActorAddr(t, rng) + + ev1 := fakeEvent( + a1, + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr1")}, + }, + []kv{ + {k: "amount", v: []byte("2988181")}, + }, + ) + + st := newStore() + events := []*types.Event{ev1} + em := executedMessage{ + msg: fakeMessage(randomActorAddr(t, rng), randomActorAddr(t, rng)), + rct: fakeReceipt(t, rng, st, events), + evs: events, + } + + events14000 := buildTipSetEvents(t, rng, 14000, em) + cid14000, err := events14000.msgTs.Key().Cid() + require.NoError(t, err, "tipset cid") + + noCollectedEvents := []*CollectedEvent{} + oneCollectedEvent := []*CollectedEvent{ + { + Event: ev1, + EventIdx: 0, + Reverted: false, + Height: 14000, + TipSetKey: events14000.msgTs.Key(), + MsgIdx: 0, + MsgCid: em.msg.Cid(), + }, + } + + testCases := []struct { + name string + filter *EventFilter + te *TipSetEvents + want []*CollectedEvent + }{ + { + name: "nomatch tipset min height", + filter: &EventFilter{ + minHeight: 14001, + maxHeight: -1, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch tipset max height", + filter: &EventFilter{ + minHeight: -1, + maxHeight: 13999, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match tipset min height", + filter: &EventFilter{ + minHeight: 14000, + maxHeight: -1, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match tipset cid", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + tipsetCid: cid14000, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "nomatch address", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a2}, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match address", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a1}, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match one entry", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("approval"), + }, + }, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match one entry with alternate values", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("cancel"), + []byte("propose"), + []byte("approval"), + }, + }, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "nomatch one entry by missing value", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("cancel"), + []byte("propose"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry by missing key", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "method": { + []byte("approval"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match one entry with multiple keys", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr1"), + }, + }, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "nomatch one entry with one mismatching key", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("approval"), + }, + "approver": { + []byte("addr1"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry with one mismatching value", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr2"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry with one unindexed key", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "amount": { + []byte("2988181"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + } + + for _, tc := range testCases { + tc := tc // appease lint + t.Run(tc.name, func(t *testing.T) { + if err := tc.filter.CollectEvents(context.Background(), tc.te, false); err != nil { + require.NoError(t, err, "collect events") + } + + coll := tc.filter.TakeCollectedEvents(context.Background()) + require.ElementsMatch(t, coll, tc.want) + }) + } +} + +type kv struct { + k string + v []byte +} + +func fakeEvent(emitter address.Address, indexed []kv, unindexed []kv) *types.Event { + ev := &types.Event{ + Emitter: emitter, + } + + for _, in := range indexed { + ev.Entries = append(ev.Entries, types.EventEntry{ + Flags: 0x01, + Key: []byte(in.k), + Value: in.v, + }) + } + + for _, in := range unindexed { + ev.Entries = append(ev.Entries, types.EventEntry{ + Flags: 0x00, + Key: []byte(in.k), + Value: in.v, + }) + } + + return ev +} + +func randomActorAddr(tb testing.TB, rng *pseudo.Rand) address.Address { + tb.Helper() + addr, err := address.NewActorAddress(randomBytes(32, rng)) + require.NoError(tb, err) + + return addr +} + +func randomIDAddr(tb testing.TB, rng *pseudo.Rand) address.Address { + tb.Helper() + addr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(tb, err) + return addr +} + +func randomCid(tb testing.TB, rng *pseudo.Rand) cid.Cid { + tb.Helper() + cb := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} + c, err := cb.Sum(randomBytes(10, rng)) + require.NoError(tb, err) + return c +} + +func randomBytes(n int, rng *pseudo.Rand) []byte { + buf := make([]byte, n) + rng.Read(buf) + return buf +} + +func fakeMessage(to, from address.Address) *types.Message { + return &types.Message{ + To: to, + From: from, + Nonce: 197, + Method: 1, + Params: []byte("some random bytes"), + GasLimit: 126723, + GasPremium: types.NewInt(4), + GasFeeCap: types.NewInt(120), + } +} + +func fakeReceipt(tb testing.TB, rng *pseudo.Rand, st adt.Store, events []*types.Event) *types.MessageReceipt { + arr := blockadt.MakeEmptyArray(st) + for _, ev := range events { + err := arr.AppendContinuous(ev) + require.NoError(tb, err, "append event") + } + eventsRoot, err := arr.Root() + require.NoError(tb, err, "flush events amt") + + return &types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: randomBytes(32, rng), + GasUsed: rng.Int63(), + Events: eventsRoot, + } +} + +func fakeTipSet(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, parents []cid.Cid) *types.TipSet { + tb.Helper() + ts, err := types.NewTipSet([]*types.BlockHeader{ + { + Height: h, + Miner: randomIDAddr(tb, rng), + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, + + ParentStateRoot: randomCid(tb, rng), + Messages: randomCid(tb, rng), + ParentMessageReceipts: randomCid(tb, rng), + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + { + Height: h, + Miner: randomIDAddr(tb, rng), + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}}, + + ParentStateRoot: randomCid(tb, rng), + Messages: randomCid(tb, rng), + ParentMessageReceipts: randomCid(tb, rng), + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + }) + + require.NoError(tb, err) + + return ts +} + +func newStore() adt.Store { + ctx := context.Background() + bs := blockstore.NewMemorySync() + store := cbor.NewCborStore(bs) + return adt.WrapStore(ctx, store) +} + +func buildTipSetEvents(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, em executedMessage) *TipSetEvents { + tb.Helper() + + msgTs := fakeTipSet(tb, rng, h, []cid.Cid{}) + rctTs := fakeTipSet(tb, rng, h+1, msgTs.Cids()) + + return &TipSetEvents{ + msgTs: msgTs, + rctTs: rctTs, + load: func(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) { + return []executedMessage{em}, nil + }, + } +} diff --git a/chain/events/filter/mempool.go b/chain/events/filter/mempool.go new file mode 100644 index 000000000..dcea0f54c --- /dev/null +++ b/chain/events/filter/mempool.go @@ -0,0 +1,141 @@ +package filter + +import ( + "context" + "sync" + "time" + + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" +) + +type MemPoolFilter struct { + id string + maxResults int // maximum number of results to collect, 0 is unlimited + ch chan<- interface{} + + mu sync.Mutex + collected []cid.Cid + lastTaken time.Time +} + +var _ Filter = (*MemPoolFilter)(nil) + +func (f *MemPoolFilter) ID() string { + return f.id +} + +func (f *MemPoolFilter) SetSubChannel(ch chan<- interface{}) { + f.mu.Lock() + defer f.mu.Unlock() + f.ch = ch + f.collected = nil +} + +func (f *MemPoolFilter) ClearSubChannel() { + f.mu.Lock() + defer f.mu.Unlock() + f.ch = nil +} + +func (f *MemPoolFilter) CollectMessage(ctx context.Context, msg *types.SignedMessage) { + f.mu.Lock() + defer f.mu.Unlock() + + // if we have a subscription channel then push message to it + if f.ch != nil { + f.ch <- msg + return + } + + if f.maxResults > 0 && len(f.collected) == f.maxResults { + copy(f.collected, f.collected[1:]) + f.collected = f.collected[:len(f.collected)-1] + } + f.collected = append(f.collected, msg.Cid()) +} + +func (f *MemPoolFilter) TakeCollectedMessages(context.Context) []cid.Cid { + f.mu.Lock() + collected := f.collected + f.collected = nil + f.lastTaken = time.Now().UTC() + f.mu.Unlock() + + return collected +} + +func (f *MemPoolFilter) LastTaken() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + return f.lastTaken +} + +type MemPoolFilterManager struct { + MaxFilterResults int + + mu sync.Mutex // guards mutations to filters + filters map[string]*MemPoolFilter +} + +func (m *MemPoolFilterManager) WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate) { + for { + select { + case <-ctx.Done(): + return + case u := <-ch: + m.processUpdate(ctx, u) + } + } +} + +func (m *MemPoolFilterManager) processUpdate(ctx context.Context, u api.MpoolUpdate) { + // only process added messages + if u.Type == api.MpoolRemove { + return + } + + m.mu.Lock() + defer m.mu.Unlock() + + if len(m.filters) == 0 { + return + } + + // TODO: could run this loop in parallel with errgroup if we expect large numbers of filters + for _, f := range m.filters { + f.CollectMessage(ctx, u.Message) + } +} + +func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, error) { + id, err := uuid.NewRandom() + if err != nil { + return nil, xerrors.Errorf("new uuid: %w", err) + } + + f := &MemPoolFilter{ + id: id.String(), + maxResults: m.MaxFilterResults, + } + + m.mu.Lock() + m.filters[id.String()] = f + m.mu.Unlock() + + return f, nil +} + +func (m *MemPoolFilterManager) Remove(ctx context.Context, id string) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, found := m.filters[id]; !found { + return ErrFilterNotFound + } + delete(m.filters, id) + return nil +} diff --git a/chain/events/filter/store.go b/chain/events/filter/store.go new file mode 100644 index 000000000..2f8a09875 --- /dev/null +++ b/chain/events/filter/store.go @@ -0,0 +1,93 @@ +package filter + +import ( + "context" + "errors" + "sync" + "time" +) + +type Filter interface { + ID() string + LastTaken() time.Time + SetSubChannel(chan<- interface{}) + ClearSubChannel() +} + +type FilterStore interface { + Add(context.Context, Filter) error + Get(context.Context, string) (Filter, error) + Remove(context.Context, string) error + NotTakenSince(when time.Time) []Filter // returns a list of filters that have not had their collected results taken +} + +var ( + ErrFilterAlreadyRegistered = errors.New("filter already registered") + ErrFilterNotFound = errors.New("filter not found") + ErrMaximumNumberOfFilters = errors.New("maximum number of filters registered") +) + +type memFilterStore struct { + max int + mu sync.Mutex + filters map[string]Filter +} + +var _ FilterStore = (*memFilterStore)(nil) + +func NewMemFilterStore(maxFilters int) FilterStore { + return &memFilterStore{ + max: maxFilters, + filters: make(map[string]Filter), + } +} + +func (m *memFilterStore) Add(_ context.Context, f Filter) error { + m.mu.Lock() + defer m.mu.Unlock() + + if len(m.filters) >= m.max { + return ErrMaximumNumberOfFilters + } + + if _, exists := m.filters[f.ID()]; exists { + return ErrFilterAlreadyRegistered + } + m.filters[f.ID()] = f + return nil +} + +func (m *memFilterStore) Get(_ context.Context, id string) (Filter, error) { + m.mu.Lock() + f, found := m.filters[id] + m.mu.Unlock() + if !found { + return nil, ErrFilterNotFound + } + return f, nil +} + +func (m *memFilterStore) Remove(_ context.Context, id string) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.filters[id]; !exists { + return ErrFilterNotFound + } + delete(m.filters, id) + return nil +} + +func (m *memFilterStore) NotTakenSince(when time.Time) []Filter { + m.mu.Lock() + defer m.mu.Unlock() + + var res []Filter + for _, f := range m.filters { + if f.LastTaken().Before(when) { + res = append(res, f) + } + } + + return res +} diff --git a/chain/events/filter/tipset.go b/chain/events/filter/tipset.go new file mode 100644 index 000000000..1f43b09a3 --- /dev/null +++ b/chain/events/filter/tipset.go @@ -0,0 +1,128 @@ +package filter + +import ( + "context" + "sync" + "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/types" +) + +type TipSetFilter struct { + id string + maxResults int // maximum number of results to collect, 0 is unlimited + ch chan<- interface{} + + mu sync.Mutex + collected []types.TipSetKey + lastTaken time.Time +} + +var _ Filter = (*TipSetFilter)(nil) + +func (f *TipSetFilter) ID() string { + return f.id +} + +func (f *TipSetFilter) SetSubChannel(ch chan<- interface{}) { + f.mu.Lock() + defer f.mu.Unlock() + f.ch = ch + f.collected = nil +} + +func (f *TipSetFilter) ClearSubChannel() { + f.mu.Lock() + defer f.mu.Unlock() + f.ch = nil +} + +func (f *TipSetFilter) CollectTipSet(ctx context.Context, ts *types.TipSet) { + f.mu.Lock() + defer f.mu.Unlock() + + // if we have a subscription channel then push tipset to it + if f.ch != nil { + f.ch <- ts + return + } + + if f.maxResults > 0 && len(f.collected) == f.maxResults { + copy(f.collected, f.collected[1:]) + f.collected = f.collected[:len(f.collected)-1] + } + f.collected = append(f.collected, ts.Key()) +} + +func (f *TipSetFilter) TakeCollectedTipSets(context.Context) []types.TipSetKey { + f.mu.Lock() + collected := f.collected + f.collected = nil + f.lastTaken = time.Now().UTC() + f.mu.Unlock() + + return collected +} + +func (f *TipSetFilter) LastTaken() time.Time { + f.mu.Lock() + defer f.mu.Unlock() + return f.lastTaken +} + +type TipSetFilterManager struct { + MaxFilterResults int + + mu sync.Mutex // guards mutations to filters + filters map[string]*TipSetFilter +} + +func (m *TipSetFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.filters) == 0 { + return nil + } + + // TODO: could run this loop in parallel with errgroup + for _, f := range m.filters { + f.CollectTipSet(ctx, to) + } + + return nil +} + +func (m *TipSetFilterManager) Revert(ctx context.Context, from, to *types.TipSet) error { + return nil +} + +func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error) { + id, err := uuid.NewRandom() + if err != nil { + return nil, xerrors.Errorf("new uuid: %w", err) + } + + f := &TipSetFilter{ + id: id.String(), + maxResults: m.MaxFilterResults, + } + + m.mu.Lock() + m.filters[id.String()] = f + m.mu.Unlock() + + return f, nil +} + +func (m *TipSetFilterManager) Remove(ctx context.Context, id string) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, found := m.filters[id]; !found { + return ErrFilterNotFound + } + delete(m.filters, id) + return nil +} diff --git a/itests/actor_events_test.go b/itests/actor_events_test.go new file mode 100644 index 000000000..f0f05418e --- /dev/null +++ b/itests/actor_events_test.go @@ -0,0 +1,176 @@ +// stm: #integration +package itests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/big" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" +) + +func TestActorEventsMpool(t *testing.T) { + ctx := context.Background() + + kit.QuietMiningLogs() + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // create a new address where to send funds. + addr, err := client.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + // get the existing balance from the default wallet to then split it. + bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) + require.NoError(t, err) + + // install filter + filterID, err := client.EthNewPendingTransactionFilter(ctx) + require.NoError(t, err) + + const iterations = 100 + + // we'll send half our balance (saving the other half for gas), + // in `iterations` increments. + toSend := big.Div(bal, big.NewInt(2)) + each := big.Div(toSend, big.NewInt(iterations)) + + waitAllCh := make(chan struct{}) + go func() { + headChangeCh, err := client.ChainNotify(ctx) + require.NoError(t, err) + <-headChangeCh // skip hccurrent + + count := 0 + for { + select { + case headChanges := <-headChangeCh: + for _, change := range headChanges { + if change.Type == store.HCApply { + msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) + require.NoError(t, err) + count += len(msgs) + if count == iterations { + waitAllCh <- struct{}{} + } + } + } + } + } + }() + + var sms []*types.SignedMessage + for i := 0; i < iterations; i++ { + msg := &types.Message{ + From: client.DefaultKey.Address, + To: addr, + Value: each, + } + + sm, err := client.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + select { + case <-waitAllCh: + case <-time.After(time.Minute): + t.Errorf("timeout to wait for pack messages") + } + + // collect filter results + res, err := client.EthGetFilterChanges(ctx, filterID) + require.NoError(t, err) + + // expect to have seen iteration number of mpool messages + require.Equal(t, iterations, len(res.NewTransactionHashes)) +} + +func TestActorEventsTipsets(t *testing.T) { + ctx := context.Background() + + kit.QuietMiningLogs() + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // create a new address where to send funds. + addr, err := client.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + // get the existing balance from the default wallet to then split it. + bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) + require.NoError(t, err) + + // install filter + filterID, err := client.EthNewBlockFilter(ctx) + require.NoError(t, err) + + const iterations = 100 + + // we'll send half our balance (saving the other half for gas), + // in `iterations` increments. + toSend := big.Div(bal, big.NewInt(2)) + each := big.Div(toSend, big.NewInt(iterations)) + + waitAllCh := make(chan struct{}) + go func() { + headChangeCh, err := client.ChainNotify(ctx) + require.NoError(t, err) + <-headChangeCh // skip hccurrent + + count := 0 + for { + select { + case headChanges := <-headChangeCh: + for _, change := range headChanges { + if change.Type == store.HCApply { + msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) + require.NoError(t, err) + count += len(msgs) + if count == iterations { + waitAllCh <- struct{}{} + } + } + } + } + } + }() + + var sms []*types.SignedMessage + for i := 0; i < iterations; i++ { + msg := &types.Message{ + From: client.DefaultKey.Address, + To: addr, + Value: each, + } + + sm, err := client.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + select { + case <-waitAllCh: + case <-time.After(time.Minute): + t.Errorf("timeout to wait for pack messages") + } + + // collect filter results + res, err := client.EthGetFilterChanges(ctx, filterID) + require.NoError(t, err) + + // expect to have seen iteration number of tipsets + require.Equal(t, iterations, len(res.NewBlockHashes)) +} diff --git a/node/builder_chain.go b/node/builder_chain.go index 3157a354e..3777f33a0 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/consensus/filcns" + "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/market" @@ -156,6 +157,7 @@ var ChainNode = Options( Override(new(messagesigner.MpoolNonceAPI), From(new(*messagepool.MessagePool))), Override(new(full.ChainModuleAPI), From(new(full.ChainModule))), Override(new(full.EthModuleAPI), From(new(full.EthModule))), + Override(new(full.EthEventAPI), From(new(full.EthEvent))), Override(new(full.GasModuleAPI), From(new(full.GasModule))), Override(new(full.MpoolModuleAPI), From(new(full.MpoolModule))), Override(new(full.StateModuleAPI), From(new(full.StateModule))), @@ -239,6 +241,10 @@ func ConfigFullNode(c interface{}) Option { Unset(new(*wallet.LocalWallet)), Override(new(wallet.Default), wallet.NilDefault), ), + + // Actor event filtering support + Override(new(events.EventAPI), From(new(modules.EventAPI))), + Override(new(full.EthEventAPI), modules.EthEvent(cfg.ActorEvent)), ) } diff --git a/node/config/def.go b/node/config/def.go index a6e6fc66a..747b9b284 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -71,11 +71,12 @@ func defCommon() Common { DirectPeers: nil, }, } - } -var DefaultDefaultMaxFee = types.MustParseFIL("0.07") -var DefaultSimultaneousTransfers = uint64(20) +var ( + DefaultDefaultMaxFee = types.MustParseFIL("0.07") + DefaultSimultaneousTransfers = uint64(20) +) // DefaultFullNode returns the default config func DefaultFullNode() *FullNode { @@ -98,6 +99,14 @@ func DefaultFullNode() *FullNode { HotStoreFullGCFrequency: 20, }, }, + ActorEvent: ActorEventConfig{ + EnableRealTimeFilterAPI: false, + EnableHistoricFilterAPI: false, + FilterTTL: Duration(time.Hour * 24), + MaxFilters: 100, + MaxFilterResults: 10000, + MaxFilterHeightRange: 2880, // conservative limit of one day + }, } } @@ -253,8 +262,10 @@ func DefaultStorageMiner() *StorageMiner { return cfg } -var _ encoding.TextMarshaler = (*Duration)(nil) -var _ encoding.TextUnmarshaler = (*Duration)(nil) +var ( + _ encoding.TextMarshaler = (*Duration)(nil) + _ encoding.TextUnmarshaler = (*Duration)(nil) +) // Duration is a wrapper type for time.Duration // for decoding and encoding from/to TOML diff --git a/node/config/types.go b/node/config/types.go index dbfa2e432..0032930fd 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -27,6 +27,7 @@ type FullNode struct { Wallet Wallet Fees FeeConfig Chainstore Chainstore + ActorEvent ActorEventConfig } // // Common @@ -168,7 +169,6 @@ type DealmakingConfig struct { } type IndexProviderConfig struct { - // Enable set whether to enable indexing announcement to the network and expose endpoints that // allow indexer nodes to process announcements. Enabled by default. Enable bool @@ -601,3 +601,31 @@ type Wallet struct { type FeeConfig struct { DefaultMaxFee types.FIL } + +type ActorEventConfig struct { + // EnableRealTimeFilterAPI enables APIs that can create and query filters for actor events as they are emitted. + EnableRealTimeFilterAPI bool + + // EnableHistoricFilterAPI enables APIs that can create and query filters for actor events that occurred in the past. + // A queryable index of events will be maintained. + EnableHistoricFilterAPI bool + + // FilterTTL specifies the time to live for actor event filters. Filters that haven't been accessed longer than + // this time become eligible for automatic deletion. + FilterTTL Duration + + // MaxFilters specifies the maximum number of filters that may exist at any one time. + MaxFilters int + + // MaxFilterResults specifies the maximum number of results that can be accumulated by an actor event filter. + MaxFilterResults int + + // MaxFilterHeightRange specifies the maximum range of heights that can be used in a filter (to avoid querying + // the entire chain) + MaxFilterHeightRange uint64 + + // Others, not implemented yet: + // Set a limit on the number of active websocket subscriptions (may be zero) + // Set a timeout for subscription clients + // Set upper bound on index size +} diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 9b5472e37..a06c5448a 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -3,9 +3,13 @@ package full import ( "bytes" "context" + "errors" "fmt" "strconv" + "sync" + "time" + "github.com/google/uuid" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" "go.uber.org/fx" @@ -17,11 +21,12 @@ import ( builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/builtin/v10/eam" "github.com/filecoin-project/go-state-types/builtin/v10/evm" - "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" + builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin" + "github.com/filecoin-project/lotus/chain/events/filter" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" @@ -57,7 +62,22 @@ type EthModuleAPI interface { // EthFeeHistory(ctx context.Context, blkCount string) } -var _ EthModuleAPI = *new(api.FullNode) +type EthEventAPI interface { + EthGetLogs(ctx context.Context, filter *api.EthFilterSpec) (*api.EthFilterResult, error) + EthGetFilterChanges(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error) + EthGetFilterLogs(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error) + EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) (api.EthFilterID, error) + EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error) + EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error) + EthUninstallFilter(ctx context.Context, id api.EthFilterID) (bool, error) + EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) + EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) +} + +var ( + _ EthModuleAPI = *new(api.FullNode) + _ EthEventAPI = *new(api.FullNode) +) // EthModule provides a default implementation of EthModuleAPI. // It can be swapped out with another implementation through Dependency @@ -76,12 +96,25 @@ type EthModule struct { var _ EthModuleAPI = (*EthModule)(nil) +type EthEvent struct { + Chain *store.ChainStore + EventFilterManager *filter.EventFilterManager + TipSetFilterManager *filter.TipSetFilterManager + MemPoolFilterManager *filter.MemPoolFilterManager + FilterStore filter.FilterStore + SubManager ethSubscriptionManager + MaxFilterHeightRange abi.ChainEpoch +} + +var _ EthEventAPI = (*EthEvent)(nil) + type EthAPI struct { fx.In Chain *store.ChainStore EthModuleAPI + EthEventAPI } func (a *EthModule) StateNetworkName(ctx context.Context) (dtypes.NetworkName, error) { @@ -463,7 +496,7 @@ func (a *EthModule) EthProtocolVersion(ctx context.Context) (api.EthUint64, erro } func (a *EthModule) EthMaxPriorityFeePerGas(ctx context.Context) (api.EthBigInt, error) { - gasPremium, err := a.GasAPI.GasEstimateGasPremium(ctx, 0, builtin.SystemActorAddr, 10000, types.EmptyTSK) + gasPremium, err := a.GasAPI.GasEstimateGasPremium(ctx, 0, builtinactors.SystemActorAddr, 10000, types.EmptyTSK) if err != nil { return api.EthBigInt(big.Zero()), err } @@ -826,3 +859,554 @@ func (a *EthModule) ethTxFromFilecoinMessageLookup(ctx context.Context, msgLooku } return tx, nil } + +func (e *EthEvent) EthGetLogs(ctx context.Context, filter *api.EthFilterSpec) (*api.EthFilterResult, error) { + // TODO: implement EthGetLogs + return nil, api.ErrNotSupported +} + +func (e *EthEvent) EthGetFilterChanges(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error) { + if e.FilterStore == nil { + return nil, api.ErrNotSupported + } + + f, err := e.FilterStore.Get(ctx, string(id)) + if err != nil { + return nil, err + } + + switch fc := f.(type) { + case filterEventCollector: + return ethFilterResultFromEvents(fc.TakeCollectedEvents(ctx)) + case filterTipSetCollector: + return ethFilterResultFromTipSets(fc.TakeCollectedTipSets(ctx)) + case filterMessageCollector: + return ethFilterResultFromMessages(fc.TakeCollectedMessages(ctx)) + } + + return nil, xerrors.Errorf("unknown filter type") +} + +func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error) { + if e.FilterStore == nil { + return nil, api.ErrNotSupported + } + + f, err := e.FilterStore.Get(ctx, string(id)) + if err != nil { + return nil, err + } + + switch fc := f.(type) { + case filterEventCollector: + return ethFilterResultFromEvents(fc.TakeCollectedEvents(ctx)) + } + + return nil, xerrors.Errorf("wrong filter type") +} + +func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) (api.EthFilterID, error) { + if e.FilterStore == nil || e.EventFilterManager == nil { + return "", api.ErrNotSupported + } + + var ( + minHeight abi.ChainEpoch + maxHeight abi.ChainEpoch + tipsetCid cid.Cid + addresses []address.Address + keys = map[string][][]byte{} + ) + + if filter.BlockHash != nil { + if filter.FromBlock != nil || filter.ToBlock != nil { + return "", xerrors.Errorf("must not specify block hash and from/to block") + } + + // TODO: derive a tipset hash from eth hash - might need to push this down into the EventFilterManager + } else { + if filter.FromBlock == nil || *filter.FromBlock == "latest" { + ts := e.Chain.GetHeaviestTipSet() + minHeight = ts.Height() + } else if *filter.FromBlock == "earliest" { + minHeight = 0 + } else if *filter.FromBlock == "pending" { + return "", api.ErrNotSupported + } else { + epoch, err := strconv.ParseUint(*filter.FromBlock, 10, 64) + if err != nil { + return "", xerrors.Errorf("invalid epoch") + } + minHeight = abi.ChainEpoch(epoch) + } + + if filter.ToBlock == nil || *filter.ToBlock == "latest" { + // here latest means the latest at the time + maxHeight = -1 + } else if *filter.ToBlock == "earliest" { + maxHeight = 0 + } else if *filter.ToBlock == "pending" { + return "", api.ErrNotSupported + } else { + epoch, err := strconv.ParseUint(*filter.ToBlock, 10, 64) + if err != nil { + return "", xerrors.Errorf("invalid epoch") + } + maxHeight = abi.ChainEpoch(epoch) + } + + // Validate height ranges are within limits set by node operator + if minHeight == -1 && maxHeight > 0 { + // Here the client is looking for events between the head and some future height + ts := e.Chain.GetHeaviestTipSet() + if maxHeight-ts.Height() > e.MaxFilterHeightRange { + return "", xerrors.Errorf("invalid epoch range") + } + } else if minHeight >= 0 && maxHeight == -1 { + // Here the client is looking for events between some time in the past and the current head + ts := e.Chain.GetHeaviestTipSet() + if ts.Height()-minHeight > e.MaxFilterHeightRange { + return "", xerrors.Errorf("invalid epoch range") + } + + } else if minHeight >= 0 && maxHeight >= 0 { + if minHeight > maxHeight || maxHeight-minHeight > e.MaxFilterHeightRange { + return "", xerrors.Errorf("invalid epoch range") + } + } + + } + for _, ea := range filter.Address { + a, err := ea.ToFilecoinAddress() + if err != nil { + return "", xerrors.Errorf("invalid address %x", ea) + } + addresses = append(addresses, a) + } + + for idx, vals := range filter.Topics { + // Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4 + key := fmt.Sprintf("topic%d", idx+1) + keyvals := make([][]byte, len(vals)) + for i, v := range vals { + keyvals[i] = v[:] + } + keys[key] = keyvals + } + + f, err := e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys) + if err != nil { + return "", err + } + + if err := e.FilterStore.Add(ctx, f); err != nil { + // Could not record in store, attempt to delete filter to clean up + err2 := e.TipSetFilterManager.Remove(ctx, f.ID()) + if err2 != nil { + return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) + } + + return "", err + } + + return api.EthFilterID(f.ID()), nil +} + +func (e *EthEvent) EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error) { + if e.FilterStore == nil || e.TipSetFilterManager == nil { + return "", api.ErrNotSupported + } + + f, err := e.TipSetFilterManager.Install(ctx) + if err != nil { + return "", err + } + + if err := e.FilterStore.Add(ctx, f); err != nil { + // Could not record in store, attempt to delete filter to clean up + err2 := e.TipSetFilterManager.Remove(ctx, f.ID()) + if err2 != nil { + return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) + } + + return "", err + } + + return api.EthFilterID(f.ID()), nil +} + +func (e *EthEvent) EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error) { + if e.FilterStore == nil || e.MemPoolFilterManager == nil { + return "", api.ErrNotSupported + } + + f, err := e.MemPoolFilterManager.Install(ctx) + if err != nil { + return "", err + } + + if err := e.FilterStore.Add(ctx, f); err != nil { + // Could not record in store, attempt to delete filter to clean up + err2 := e.MemPoolFilterManager.Remove(ctx, f.ID()) + if err2 != nil { + return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) + } + + return "", err + } + + return api.EthFilterID(f.ID()), nil +} + +func (e *EthEvent) EthUninstallFilter(ctx context.Context, id api.EthFilterID) (bool, error) { + if e.FilterStore == nil { + return false, api.ErrNotSupported + } + + f, err := e.FilterStore.Get(ctx, string(id)) + if err != nil { + if errors.Is(err, filter.ErrFilterNotFound) { + return false, nil + } + return false, err + } + + if err := e.uninstallFilter(ctx, f); err != nil { + return false, err + } + + return true, nil +} + +func (e *EthEvent) uninstallFilter(ctx context.Context, f filter.Filter) error { + switch f.(type) { + case *filter.EventFilter: + err := e.EventFilterManager.Remove(ctx, f.ID()) + if err != nil && !errors.Is(err, filter.ErrFilterNotFound) { + return err + } + case *filter.TipSetFilter: + err := e.TipSetFilterManager.Remove(ctx, f.ID()) + if err != nil && !errors.Is(err, filter.ErrFilterNotFound) { + return err + } + case *filter.MemPoolFilter: + err := e.MemPoolFilterManager.Remove(ctx, f.ID()) + if err != nil && !errors.Is(err, filter.ErrFilterNotFound) { + return err + } + default: + return xerrors.Errorf("unknown filter type") + } + + return e.FilterStore.Remove(ctx, f.ID()) +} + +const ( + EthSubscribeEventTypeHeads = "newHeads" + EthSubscribeEventTypeLogs = "logs" +) + +func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { + // Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the + // method to be "eth_subscription". This probably doesn't matter in practice. + + // Validate event types and parameters first + for _, et := range eventTypes { + switch et { + case EthSubscribeEventTypeHeads: + case EthSubscribeEventTypeLogs: + default: + return nil, xerrors.Errorf("unsupported event type: %s", et) + + } + } + + sub, err := e.SubManager.StartSubscription(ctx) + if err != nil { + return nil, err + } + + for _, et := range eventTypes { + switch et { + case EthSubscribeEventTypeHeads: + f, err := e.TipSetFilterManager.Install(ctx) + if err != nil { + // clean up any previous filters added and stop the sub + _, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id)) + return nil, err + } + sub.addFilter(ctx, f) + + case EthSubscribeEventTypeLogs: + keys := map[string][][]byte{} + for idx, vals := range params.Topics { + // Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4 + key := fmt.Sprintf("topic%d", idx+1) + keyvals := make([][]byte, len(vals)) + for i, v := range vals { + keyvals[i] = v[:] + } + keys[key] = keyvals + } + + f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys) + if err != nil { + // clean up any previous filters added and stop the sub + _, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id)) + return nil, err + } + sub.addFilter(ctx, f) + } + } + + return sub.out, nil +} + +func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) { + filters, err := e.SubManager.StopSubscription(ctx, string(id)) + if err != nil { + return false, nil + } + + for _, f := range filters { + if err := e.uninstallFilter(ctx, f); err != nil { + // this will leave the filter a zombie, collecting events up to the maximum allowed + log.Warnf("failed to remove filter when unsubscribing: %v", err) + } + } + + return true, nil +} + +// GC runs a garbage collection loop, deleting filters that have not been used within the ttl window +func (e *EthEvent) GC(ctx context.Context, ttl time.Duration) { + if e.FilterStore == nil { + return + } + + tt := time.NewTicker(time.Minute * 30) + defer tt.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-tt.C: + fs := e.FilterStore.NotTakenSince(time.Now().Add(-ttl)) + for _, f := range fs { + if err := e.uninstallFilter(ctx, f); err != nil { + log.Warnf("Failed to remove actor event filter during garbage collection: %v", err) + } + } + } + } +} + +type filterEventCollector interface { + TakeCollectedEvents(context.Context) []*filter.CollectedEvent +} + +type filterMessageCollector interface { + TakeCollectedMessages(context.Context) []cid.Cid +} + +type filterTipSetCollector interface { + TakeCollectedTipSets(context.Context) []types.TipSetKey +} + +var ( + ethTopic1 = []byte("topic1") + ethTopic2 = []byte("topic2") + ethTopic3 = []byte("topic3") + ethTopic4 = []byte("topic4") +) + +func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResult, error) { + res := &api.EthFilterResult{} + + for _, ev := range evs { + log := api.EthLog{ + Removed: ev.Reverted, + LogIndex: api.EthUint64(ev.EventIdx), + TransactionIndex: api.EthUint64(ev.MsgIdx), + BlockNumber: api.EthUint64(ev.Height), + } + + var err error + + for _, entry := range ev.Event.Entries { + hash := api.EthHashData(entry.Value) + if bytes.Equal(entry.Key, ethTopic1) || bytes.Equal(entry.Key, ethTopic2) || bytes.Equal(entry.Key, ethTopic3) || bytes.Equal(entry.Key, ethTopic4) { + log.Topics = append(log.Topics, hash) + } else { + log.Data = append(log.Data, hash) + } + } + + log.Address, err = api.EthAddressFromFilecoinAddress(ev.Event.Emitter) + if err != nil { + return nil, err + } + + log.TransactionHash, err = api.EthHashFromCid(ev.MsgCid) + if err != nil { + return nil, err + } + + c, err := ev.TipSetKey.Cid() + if err != nil { + return nil, err + } + log.BlockHash, err = api.EthHashFromCid(c) + if err != nil { + return nil, err + } + + res.NewLogs = append(res.NewLogs, log) + } + + return res, nil +} + +func ethFilterResultFromTipSets(tsks []types.TipSetKey) (*api.EthFilterResult, error) { + res := &api.EthFilterResult{} + + for _, tsk := range tsks { + c, err := tsk.Cid() + if err != nil { + return nil, err + } + hash, err := api.EthHashFromCid(c) + if err != nil { + return nil, err + } + + res.NewBlockHashes = append(res.NewBlockHashes, hash) + } + + return res, nil +} + +func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) { + res := &api.EthFilterResult{} + + for _, c := range cs { + hash, err := api.EthHashFromCid(c) + if err != nil { + return nil, err + } + + res.NewTransactionHashes = append(res.NewTransactionHashes, hash) + } + + return res, nil +} + +type ethSubscriptionManager struct { + mu sync.Mutex + subs map[string]*ethSubscription +} + +func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { + id, err := uuid.NewRandom() + if err != nil { + return nil, xerrors.Errorf("new uuid: %w", err) + } + + ctx, quit := context.WithCancel(ctx) + + sub := ðSubscription{ + id: id.String(), + in: make(chan interface{}, 200), + out: make(chan api.EthSubscriptionResponse), + quit: quit, + } + + e.mu.Lock() + if e.subs == nil { + e.subs = make(map[string]*ethSubscription) + } + e.subs[sub.id] = sub + e.mu.Unlock() + + go sub.start(ctx) + + return sub, nil +} + +func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) { + e.mu.Lock() + defer e.mu.Unlock() + + sub, ok := e.subs[id] + if !ok { + return nil, xerrors.Errorf("subscription not found") + } + sub.stop() + delete(e.subs, id) + + return sub.filters, nil +} + +type ethSubscription struct { + id string + in chan interface{} + out chan api.EthSubscriptionResponse + + mu sync.Mutex + filters []filter.Filter + quit func() +} + +func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { + e.mu.Lock() + defer e.mu.Unlock() + + f.SetSubChannel(e.in) + e.filters = append(e.filters, f) +} + +func (e *ethSubscription) start(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case v := <-e.in: + resp := api.EthSubscriptionResponse{ + SubscriptionID: api.EthSubscriptionID(e.id), + } + + var err error + switch vt := v.(type) { + case *filter.CollectedEvent: + resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt}) + case *types.TipSet: + resp.Result = vt + default: + log.Warnf("unexpected subscription value type: %T", vt) + } + + if err != nil { + continue + } + + select { + case e.out <- resp: + default: + // Skip if client is not reading responses + } + } + } +} + +func (e *ethSubscription) stop() { + e.mu.Lock() + defer e.mu.Unlock() + + if e.quit != nil { + e.quit() + close(e.out) + e.quit = nil + } +} diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go new file mode 100644 index 000000000..7e64470f4 --- /dev/null +++ b/node/modules/actorevent.go @@ -0,0 +1,94 @@ +package modules + +import ( + "context" + "time" + + "go.uber.org/fx" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/events" + "github.com/filecoin-project/lotus/chain/events/filter" + "github.com/filecoin-project/lotus/chain/messagepool" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/impl/full" + "github.com/filecoin-project/lotus/node/modules/helpers" +) + +type EventAPI struct { + fx.In + + full.ChainAPI + full.StateAPI +} + +var _ events.EventAPI = &EventAPI{} + +func EthEvent(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, EventAPI, *messagepool.MessagePool) (*full.EthEvent, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, evapi EventAPI, mp *messagepool.MessagePool) (*full.EthEvent, error) { + ee := &full.EthEvent{ + Chain: cs, + MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange), + } + + if !cfg.EnableRealTimeFilterAPI && !cfg.EnableHistoricFilterAPI { + // all event functionality is disabled + return ee, nil + } + + ee.FilterStore = filter.NewMemFilterStore(cfg.MaxFilters) + + // Start garbage collection for filters + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + go ee.GC(ctx, time.Duration(cfg.FilterTTL)) + return nil + }, + }) + + if cfg.EnableRealTimeFilterAPI { + ee.EventFilterManager = &filter.EventFilterManager{ + ChainStore: cs, + MaxFilterResults: cfg.MaxFilterResults, + } + ee.TipSetFilterManager = &filter.TipSetFilterManager{ + MaxFilterResults: cfg.MaxFilterResults, + } + ee.MemPoolFilterManager = &filter.MemPoolFilterManager{ + MaxFilterResults: cfg.MaxFilterResults, + } + + const ChainHeadConfidence = 1 + + ctx := helpers.LifecycleCtx(mctx, lc) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence) + if err != nil { + return err + } + // ignore returned tipsets + _ = ev.Observe(ee.EventFilterManager) + _ = ev.Observe(ee.TipSetFilterManager) + + ch, err := mp.Updates(ctx) + if err != nil { + return err + } + go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch) + + return nil + }, + }) + + } + + if cfg.EnableHistoricFilterAPI { + // TODO: enable indexer + } + + return ee, nil + } +}