Ethereum compatible actor event API

This commit is contained in:
Ian Davis 2022-11-10 11:27:58 +00:00
parent 4eb94b5cea
commit 69210d0917
17 changed files with 2533 additions and 23 deletions

View File

@ -811,6 +811,11 @@ workflows:
- gofmt
- gen-check
- docs-check
- test:
name: test-itest-actor_events
suite: itest-actor_events
target: "./itests/actor_events_test.go"
- test:
name: test-itest-api
suite: itest-api

View File

@ -26,7 +26,7 @@ import (
abinetwork "github.com/filecoin-project/go-state-types/network"
apitypes "github.com/filecoin-project/lotus/api/types"
"github.com/filecoin-project/lotus/chain/actors/builtin"
builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/types"
@ -794,6 +794,41 @@ type FullNode interface {
EthSendRawTransaction(ctx context.Context, rawTx EthBytes) (EthHash, error) //perm:read
// Returns event logs matching given filter spec.
EthGetLogs(ctx context.Context, filter *EthFilterSpec) (*EthFilterResult, error) //perm:read
// Polling method for a filter, returns event logs which occurred since last poll.
// (requires write perm since timestamp of last filter execution will be written)
EthGetFilterChanges(ctx context.Context, id EthFilterID) (*EthFilterResult, error) //perm:write
// Returns event logs matching filter with given id.
// (requires write perm since timestamp of last filter execution will be written)
EthGetFilterLogs(ctx context.Context, id EthFilterID) (*EthFilterResult, error) //perm:write
// Installs a persistent filter based on given filter spec.
EthNewFilter(ctx context.Context, filter *EthFilterSpec) (EthFilterID, error) //perm:write
// Installs a persistent filter to notify when a new block arrives.
EthNewBlockFilter(ctx context.Context) (EthFilterID, error) //perm:write
// Installs a persistent filter to notify when new messages arrive in the message pool.
EthNewPendingTransactionFilter(ctx context.Context) (EthFilterID, error) //perm:write
// Uninstalls a filter with given id.
EthUninstallFilter(ctx context.Context, id EthFilterID) (bool, error) //perm:write
// Subscribe to different event types using websockets
// eventTypes is one or more of:
// - newHeads: notify when new blocks arrive.
// - pendingTransactions: notify when new messages arrive in the message pool.
// - logs: notify new event logs that match a criteria
// params contains additional parameters used with the log event type
// The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called.
EthSubscribe(ctx context.Context, eventTypes []string, params EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) //perm:write
// Unsubscribe from a websocket subscription
EthUnsubscribe(ctx context.Context, id EthSubscriptionID) (bool, error) //perm:write
// CreateBackup creates node backup onder the specified file name. The
// method requires that the lotus daemon is running with the
// LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
@ -1184,7 +1219,7 @@ type CirculatingSupply struct {
type MiningBaseInfo struct {
MinerPower types.BigInt
NetworkPower types.BigInt
Sectors []builtin.ExtendedSectorInfo
Sectors []builtinactors.ExtendedSectorInfo
WorkerKey address.Address
SectorSize abi.SectorSize
PrevBeaconEntry types.BeaconEntry
@ -1201,7 +1236,7 @@ type BlockTemplate struct {
Messages []*types.SignedMessage
Epoch abi.ChainEpoch
Timestamp uint64
WinningPoStProof []builtin.PoStProof
WinningPoStProof []builtinactors.PoStProof
}
type DataSize struct {

View File

@ -22,7 +22,7 @@ import (
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
abinetwork "github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/chain/actors/builtin"
builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
@ -152,7 +152,7 @@ type StorageMiner interface {
WorkerStats(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) //perm:admin
WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) //perm:admin
//storiface.WorkerReturn
// storiface.WorkerReturn
ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true
ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true
ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storiface.PreCommit1Out, err *storiface.CallError) error //perm:admin retry:true
@ -175,7 +175,7 @@ type StorageMiner interface {
// SealingSchedDiag dumps internal sealing scheduler state
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin
//SealingSchedRemove removes a request from sealing pipeline
// SealingSchedRemove removes a request from sealing pipeline
SealingRemoveRequest(ctx context.Context, schedId uuid.UUID) error //perm:admin
// paths.SectorIndex
@ -322,7 +322,7 @@ type StorageMiner interface {
CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storiface.SectorRef, expensive bool) (map[abi.SectorNumber]string, error) //perm:admin
ComputeProof(ctx context.Context, ssi []builtin.ExtendedSectorInfo, rand abi.PoStRandomness, poStEpoch abi.ChainEpoch, nv abinetwork.Version) ([]builtin.PoStProof, error) //perm:read
ComputeProof(ctx context.Context, ssi []builtinactors.ExtendedSectorInfo, rand abi.PoStRandomness, poStEpoch abi.ChainEpoch, nv abinetwork.Version) ([]builtinactors.PoStProof, error) //perm:read
// RecoverFault can be used to declare recoveries manually. It sends messages
// to the miner actor with details of recovered sectors and returns the CID of messages. It honors the

View File

@ -298,7 +298,8 @@ func init() {
"title": "Lotus RPC API",
"version": "1.2.1/generated=2020-11-22T08:22:42-06:00",
},
"methods": []interface{}{}},
"methods": []interface{}{},
},
)
addExample(api.CheckStatusCode(0))
@ -335,7 +336,8 @@ func init() {
NumConnsInbound: 3,
NumConnsOutbound: 4,
NumFD: 5,
}})
},
})
addExample(api.NetLimit{
Memory: 123,
StreamsInbound: 1,
@ -374,10 +376,18 @@ func init() {
ethFeeHistoryReward := [][]api.EthBigInt{}
addExample(&ethFeeHistoryReward)
addExample(api.EthFilterID("c5564560217c43e4bc0484df655e9019"))
addExample(api.EthSubscriptionID("b62df77831484129adf6682332ad0725"))
pstring := func(s string) *string { return &s }
addExample(&api.EthFilterSpec{
FromBlock: pstring("2301220"),
Address: []api.EthAddress{ethaddr},
})
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {
switch pkg {
case "api": // latest
switch name {

View File

@ -11,6 +11,7 @@ import (
"strings"
"github.com/ipfs/go-cid"
"github.com/minio/blake2b-simd"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
"golang.org/x/xerrors"
@ -48,9 +49,7 @@ func (e *EthUint64) UnmarshalJSON(b []byte) error {
type EthBigInt big.Int
var (
EthBigIntZero = EthBigInt{Int: big.Zero().Int}
)
var EthBigIntZero = EthBigInt{Int: big.Zero().Int}
func (e EthBigInt) MarshalJSON() ([]byte, error) {
if e.Int == nil {
@ -396,7 +395,6 @@ func handlePrefix(s *string) {
func decodeHexString(s string, length int) ([]byte, error) {
b, err := hex.DecodeString(s)
if err != nil {
return []byte{}, xerrors.Errorf("cannot parse hash: %w", err)
}
@ -423,6 +421,10 @@ func EthHashFromHex(s string) (EthHash, error) {
return h, nil
}
func EthHashData(b []byte) EthHash {
return EthHash(blake2b.Sum256(b))
}
func (h EthHash) String() string {
return "0x" + hex.EncodeToString(h[:])
}
@ -440,3 +442,177 @@ type EthFeeHistory struct {
GasUsedRatio []float64 `json:"gasUsedRatio"`
Reward *[][]EthBigInt `json:"reward,omitempty"`
}
// An opaque identifier generated by the Lotus node to refer to an installed filter.
type EthFilterID string
// An opaque identifier generated by the Lotus node to refer to an active subscription.
type EthSubscriptionID string
type EthFilterSpec struct {
// Interpreted as an epoch or one of "latest" for last mined block, "earliest" for first,
// "pending" for not yet committed messages.
// Optional, default: "latest".
FromBlock *string `json:"fromBlock,omitempty"`
// Interpreted as an epoch or one of "latest" for last mined block, "earliest" for first,
// "pending" for not yet committed messages.
// Optional, default: "latest".
ToBlock *string `json:"toBlock,omitempty"`
// Actor address or a list of addresses from which event logs should originate.
// Optional, default nil.
// The JSON decoding must treat a string as equivalent to an array with one value, for example
// "0x8888f1f195afa192cfee86069858" must be decoded as [ "0x8888f1f195afa192cfee86069858" ]
Address EthAddressList `json:"address"`
// List of topics to be matched.
// Optional, default: empty list
Topics EthTopicSpec `json:"topics"`
// Restricts event logs returned to those in receipts contained in the tipset this block is part of.
// If BlockHash is present in in the filter criteria, then neither FromBlock nor ToBlock are allowed.
// Added in EIP-234
BlockHash *EthHash `json:"blockHash,omitempty"`
}
// EthAddressSpec represents a list of addresses.
// The JSON decoding must treat a string as equivalent to an array with one value, for example
// "0x8888f1f195afa192cfee86069858" must be decoded as [ "0x8888f1f195afa192cfee86069858" ]
type EthAddressList []EthAddress
func (e *EthAddressList) UnmarshalJSON(b []byte) error {
if len(b) > 0 && b[0] == '[' {
var addrs []EthAddress
err := json.Unmarshal(b, &addrs)
if err != nil {
return err
}
*e = addrs
return nil
}
var addr EthAddress
err := json.Unmarshal(b, &addr)
if err != nil {
return err
}
*e = []EthAddress{addr}
return nil
}
// TopicSpec represents a specification for matching by topic. An empty spec means all topics
// will be matched. Otherwise topics are matched conjunctively in the first dimension of the
// slice and disjunctively in the second dimension. Topics are matched in order.
// An event log with topics [A, B] will be matched by the following topic specs:
// [] "all"
// [[A]] "A in first position (and anything after)"
// [[A]] "A in first position (and anything after)"
// [nil, [B] ] "anything in first position AND B in second position (and anything after)"
// [[A], [B]] "A in first position AND B in second position (and anything after)"
// [[A, B], [A, B]] "(A OR B) in first position AND (A OR B) in second position (and anything after)"
//
// The JSON decoding must treat string values as equivalent to arrays with one value, for example
// { "A", [ "B", "C" ] } must be decoded as [ [ A ], [ B, C ] ]
type EthTopicSpec []EthHashList
type EthHashList []EthHash
func (e *EthHashList) UnmarshalJSON(b []byte) error {
if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) {
return nil
}
if len(b) > 0 && b[0] == '[' {
var hashes []EthHash
err := json.Unmarshal(b, &hashes)
if err != nil {
return err
}
*e = hashes
return nil
}
var hash EthHash
err := json.Unmarshal(b, &hash)
if err != nil {
return err
}
*e = []EthHash{hash}
return nil
}
// FilterResult represents the response from executing a filter: a list of bloack hashes, a list of transaction hashes
// or a list of logs
// This is a union type. Only one field will be populated.
// The JSON encoding must produce an array of the populated field.
type EthFilterResult struct {
// List of block hashes. Only populated when the filter has been installed via EthNewBlockFilter
NewBlockHashes []EthHash
// List of transaction hashes. Only populated when the filter has been installed via EthNewPendingTransactionFilter
NewTransactionHashes []EthHash
// List of event logs. Only populated when the filter has been installed via EthNewFilter
NewLogs []EthLog
}
func (h EthFilterResult) MarshalJSON() ([]byte, error) {
if h.NewBlockHashes != nil {
return json.Marshal(h.NewBlockHashes)
}
if h.NewTransactionHashes != nil {
return json.Marshal(h.NewTransactionHashes)
}
if h.NewLogs != nil {
return json.Marshal(h.NewLogs)
}
return []byte{'[', ']'}, nil
}
type EthLog struct {
// Address is the address of the actor that produced the event log.
Address EthAddress `json:"address"`
// Data is the values of the event log, excluding topics
Data []EthHash `json:"data"`
// List of topics associated with the event log.
Topics []EthHash `json:"topics"`
// Following fields are derived from the transaction containing the log
// Indicates whether the log was removed due to a chain reorganization.
Removed bool `json:"removed"`
// LogIndex is the index of the event log in the sequence of events produced by the message execution.
// (this is the index in the events AMT on the message receipt)
LogIndex EthUint64 `json:"logIndex"`
// TransactionIndex is the index in the tipset of the transaction that produced the event log.
// The index corresponds to the sequence of messages produced by ChainGetParentMessages
TransactionIndex EthUint64 `json:"transactionIndex"`
// TransactionHash is the cid of the transaction that produced the event log.
TransactionHash EthHash `json:"transactionHash"`
// BlockHash is the hash of a block in the tipset containing the message receipt of the message execution.
// This may be passed to ChainGetParentReceipts to obtain a list of receipts. The receipt
// containing the events will be at TransactionIndex in the receipt list.
BlockHash EthHash `json:"blockHash"`
// BlockNumber is the epoch at which the message was executed. This is the epoch containing
// the message receipt.
BlockNumber EthUint64 `json:"blockNumber"`
}
type EthSubscriptionParams struct {
// List of topics to be matched.
// Optional, default: empty list
Topics EthTopicSpec `json:"topics,omitempty"`
}
type EthSubscriptionResponse struct {
// The persistent identifier for the subscription which can be used to unsubscribe.
SubscriptionID EthSubscriptionID `json:"subscription"`
// The object matching the subscription. This may be a Block (tipset), a Transaction (message) or an EthLog
Result interface{} `json:"result"`
}

View File

@ -2,6 +2,7 @@
package api
import (
"encoding/json"
"strings"
"testing"
@ -30,6 +31,7 @@ func TestEthIntMarshalJSON(t *testing.T) {
require.Equal(t, j, tc.Output)
}
}
func TestEthIntUnmarshalJSON(t *testing.T) {
testcases := []TestCase{
{[]byte("\"0x0\""), EthUint64(0)},
@ -155,3 +157,215 @@ func TestUnmarshalEthBytes(t *testing.T) {
require.Equal(t, string(data), tc)
}
}
func TestEthFilterResultMarshalJSON(t *testing.T) {
hash1, err := EthHashFromHex("013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184")
require.NoError(t, err, "eth hash")
hash2, err := EthHashFromHex("ab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738")
require.NoError(t, err, "eth hash")
addr, err := EthAddressFromHex("d4c5fb16488Aa48081296299d54b0c648C9333dA")
require.NoError(t, err, "eth address")
log := EthLog{
Removed: true,
LogIndex: 5,
TransactionIndex: 45,
TransactionHash: hash1,
BlockHash: hash2,
BlockNumber: 53,
Topics: []EthHash{hash1},
Data: []EthHash{hash1},
Address: addr,
}
logjson, err := json.Marshal(log)
require.NoError(t, err, "log json")
testcases := []struct {
res EthFilterResult
want string
}{
{
res: EthFilterResult{},
want: "[]",
},
{
res: EthFilterResult{
NewBlockHashes: []EthHash{hash1, hash2},
},
want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`,
},
{
res: EthFilterResult{
NewTransactionHashes: []EthHash{hash1, hash2},
},
want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`,
},
{
res: EthFilterResult{
NewLogs: []EthLog{log},
},
want: `[` + string(logjson) + `]`,
},
}
for _, tc := range testcases {
data, err := json.Marshal(tc.res)
require.NoError(t, err)
require.Equal(t, tc.want, string(data))
}
}
func TestEthFilterSpecUnmarshalJSON(t *testing.T) {
hash1, err := EthHashFromHex("013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184")
require.NoError(t, err, "eth hash")
hash2, err := EthHashFromHex("ab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738")
require.NoError(t, err, "eth hash")
addr, err := EthAddressFromHex("d4c5fb16488Aa48081296299d54b0c648C9333dA")
require.NoError(t, err, "eth address")
pstring := func(s string) *string { return &s }
phash := func(h EthHash) *EthHash { return &h }
testcases := []struct {
input string
want EthFilterSpec
}{
{
input: `{"fromBlock":"latest"}`,
want: EthFilterSpec{FromBlock: pstring("latest")},
},
{
input: `{"toBlock":"pending"}`,
want: EthFilterSpec{ToBlock: pstring("pending")},
},
{
input: `{"address":["0xd4c5fb16488Aa48081296299d54b0c648C9333dA"]}`,
want: EthFilterSpec{Address: EthAddressList{addr}},
},
{
input: `{"address":"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"}`,
want: EthFilterSpec{Address: EthAddressList{addr}},
},
{
input: `{"blockHash":"0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"}`,
want: EthFilterSpec{BlockHash: phash(hash1)},
},
{
input: `{"topics":["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"]}`,
want: EthFilterSpec{
Topics: EthTopicSpec{
{hash1},
},
},
},
{
input: `{"topics":["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]}`,
want: EthFilterSpec{
Topics: EthTopicSpec{
{hash1},
{hash2},
},
},
},
{
input: `{"topics":[null, ["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]]}`,
want: EthFilterSpec{
Topics: EthTopicSpec{
nil,
{hash1, hash2},
},
},
},
{
input: `{"topics":[null, "0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"]}`,
want: EthFilterSpec{
Topics: EthTopicSpec{
nil,
{hash1},
},
},
},
}
for _, tc := range testcases {
var got EthFilterSpec
err := json.Unmarshal([]byte(tc.input), &got)
require.NoError(t, err)
require.Equal(t, tc.want, got)
}
}
func TestEthAddressListUnmarshalJSON(t *testing.T) {
addr1, err := EthAddressFromHex("d4c5fb16488Aa48081296299d54b0c648C9333dA")
require.NoError(t, err, "eth address")
addr2, err := EthAddressFromHex("abbbfb16488Aa48081296299d54b0c648C9333dA")
require.NoError(t, err, "eth address")
testcases := []struct {
input string
want EthAddressList
}{
{
input: `["0xd4c5fb16488Aa48081296299d54b0c648C9333dA"]`,
want: EthAddressList{addr1},
},
{
input: `["0xd4c5fb16488Aa48081296299d54b0c648C9333dA","abbbfb16488Aa48081296299d54b0c648C9333dA"]`,
want: EthAddressList{addr1, addr2},
},
{
input: `"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"`,
want: EthAddressList{addr1},
},
}
for _, tc := range testcases {
var got EthAddressList
err := json.Unmarshal([]byte(tc.input), &got)
require.NoError(t, err)
require.Equal(t, tc.want, got)
}
}
func TestEthHashListUnmarshalJSON(t *testing.T) {
hash1, err := EthHashFromHex("013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184")
require.NoError(t, err, "eth hash")
hash2, err := EthHashFromHex("ab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738")
require.NoError(t, err, "eth hash")
testcases := []struct {
input string
want *EthHashList
}{
{
input: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"]`,
want: &EthHashList{hash1},
},
{
input: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`,
want: &EthHashList{hash1, hash2},
},
{
input: `"0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"`,
want: &EthHashList{hash1},
},
{
input: `null`,
want: nil,
},
}
for _, tc := range testcases {
var got *EthHashList
err := json.Unmarshal([]byte(tc.input), &got)
require.NoError(t, err)
require.Equal(t, tc.want, got)
}
}

View File

@ -0,0 +1,394 @@
package filter
import (
"bytes"
"context"
"sync"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
cstore "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
const indexed uint8 = 0x01
type EventFilter struct {
id string
minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum
maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum
tipsetCid cid.Cid
addresses []address.Address // list of actor ids that originated the event
keys map[string][][]byte // map of key names to a list of alternate values that may match
maxResults int // maximum number of results to collect, 0 is unlimited
mu sync.Mutex
collected []*CollectedEvent
lastTaken time.Time
ch chan<- interface{}
}
var _ Filter = (*EventFilter)(nil)
type CollectedEvent struct {
Event *types.Event
EventIdx int // index of the event within the list of emitted events
Reverted bool
Height abi.ChainEpoch
TipSetKey types.TipSetKey // tipset that contained the message
MsgIdx int // index of the message in the tipset
MsgCid cid.Cid // cid of message that produced event
}
func (f *EventFilter) ID() string {
return f.id
}
func (f *EventFilter) SetSubChannel(ch chan<- interface{}) {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = ch
f.collected = nil
}
func (f *EventFilter) ClearSubChannel() {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = nil
}
func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool) error {
if !f.matchTipset(te) {
return nil
}
ems, err := te.messages(ctx)
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
}
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
if !f.matchAddress(ev.Emitter) {
continue
}
if !f.matchKeys(ev.Entries) {
continue
}
// event matches filter, so record it
cev := &CollectedEvent{
Event: ev,
EventIdx: evIdx,
Reverted: revert,
Height: te.msgTs.Height(),
TipSetKey: te.msgTs.Key(),
MsgCid: em.Message().Cid(),
MsgIdx: msgIdx,
}
f.mu.Lock()
// if we have a subscription channel then push event to it
if f.ch != nil {
f.ch <- cev
f.mu.Unlock()
continue
}
if f.maxResults > 0 && len(f.collected) == f.maxResults {
copy(f.collected, f.collected[1:])
f.collected = f.collected[:len(f.collected)-1]
}
f.collected = append(f.collected, cev)
f.mu.Unlock()
}
}
return nil
}
func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
f.mu.Lock()
collected := f.collected
f.collected = nil
f.lastTaken = time.Now().UTC()
f.mu.Unlock()
return collected
}
func (f *EventFilter) LastTaken() time.Time {
f.mu.Lock()
defer f.mu.Unlock()
return f.lastTaken
}
// matchTipset reports whether this filter matches the given tipset
func (f *EventFilter) matchTipset(te *TipSetEvents) bool {
if f.tipsetCid != cid.Undef {
tsCid, err := te.Cid()
if err != nil {
return false
}
return f.tipsetCid.Equals(tsCid)
}
if f.minHeight >= 0 && f.minHeight > te.Height() {
return false
}
if f.maxHeight >= 0 && f.maxHeight < te.Height() {
return false
}
return true
}
func (f *EventFilter) matchAddress(o address.Address) bool {
if len(f.addresses) == 0 {
return true
}
// Assume short lists of addresses
// TODO: binary search for longer lists
for _, a := range f.addresses {
if a == o {
return true
}
}
return false
}
func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
if len(f.keys) == 0 {
return true
}
// TODO: optimize this naive algorithm
// Note keys names may be repeated so we may have multiple opportunities to match
matched := map[string]bool{}
for _, ee := range ees {
// Skip an entry that is not indexable
if ee.Flags&indexed != indexed {
continue
}
keyname := string(ee.Key)
// skip if we have already matched this key
if matched[keyname] {
continue
}
wantlist, ok := f.keys[keyname]
if !ok {
continue
}
for _, w := range wantlist {
if bytes.Equal(w, ee.Value) {
matched[keyname] = true
break
}
}
if len(matched) == len(f.keys) {
// all keys have been matched
return true
}
}
return false
}
type TipSetEvents struct {
rctTs *types.TipSet // rctTs is the tipset containing the receipts of executed messages
msgTs *types.TipSet // msgTs is the tipset containing the messages that have been executed
load func(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error)
once sync.Once // for lazy population of ems
ems []executedMessage
err error
}
func (te *TipSetEvents) Height() abi.ChainEpoch {
return te.msgTs.Height()
}
func (te *TipSetEvents) Cid() (cid.Cid, error) {
return te.msgTs.Key().Cid()
}
func (te *TipSetEvents) messages(ctx context.Context) ([]executedMessage, error) {
te.once.Do(func() {
// populate executed message list
ems, err := te.load(ctx, te.msgTs, te.rctTs)
if err != nil {
te.err = err
return
}
te.ems = ems
})
return te.ems, te.err
}
type executedMessage struct {
msg *types.Message
rct *types.MessageReceipt
// events extracted from receipt
evs []*types.Event
}
func (e *executedMessage) Message() *types.Message {
return e.msg
}
func (e *executedMessage) Receipt() *types.MessageReceipt {
return e.rct
}
func (e *executedMessage) Events() []*types.Event {
return e.evs
}
type EventFilterManager struct {
ChainStore *cstore.ChainStore
MaxFilterResults int
mu sync.Mutex // guards mutations to filters
filters map[string]*EventFilter
}
func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.filters) == 0 {
return nil
}
tse := &TipSetEvents{
msgTs: from,
rctTs: to,
load: m.loadExecutedMessages,
}
// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false); err != nil {
return err
}
}
return nil
}
func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) error {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.filters) == 0 {
return nil
}
tse := &TipSetEvents{
msgTs: to,
rctTs: from,
load: m.loadExecutedMessages,
}
// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true); err != nil {
return err
}
}
return nil
}
func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keys map[string][][]byte) (*EventFilter, error) {
id, err := uuid.NewRandom()
if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err)
}
f := &EventFilter{
id: id.String(),
minHeight: minHeight,
maxHeight: maxHeight,
tipsetCid: tipsetCid,
addresses: addresses,
keys: keys,
maxResults: m.MaxFilterResults,
}
m.mu.Lock()
m.filters[id.String()] = f
m.mu.Unlock()
return f, nil
}
func (m *EventFilterManager) Remove(ctx context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, found := m.filters[id]; !found {
return ErrFilterNotFound
}
delete(m.filters, id)
return nil
}
func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
msgs, err := m.ChainStore.MessagesForTipset(ctx, msgTs)
if err != nil {
return nil, xerrors.Errorf("read messages: %w", err)
}
st := m.ChainStore.ActorStore(ctx)
arr, err := blockadt.AsArray(st, rctTs.Blocks()[0].ParentMessageReceipts)
if err != nil {
return nil, xerrors.Errorf("load receipts amt: %w", err)
}
if uint64(len(msgs)) != arr.Length() {
return nil, xerrors.Errorf("mismatching message and receipt counts (%d msgs, %d rcts)", len(msgs), arr.Length())
}
ems := make([]executedMessage, len(msgs))
for i := 0; i < len(msgs); i++ {
ems[i].msg = msgs[i].VMMessage()
var rct types.MessageReceipt
found, err := arr.Get(uint64(i), &rct)
if err != nil {
return nil, xerrors.Errorf("load receipt: %w", err)
}
if !found {
return nil, xerrors.Errorf("receipt %d not found", i)
}
ems[i].rct = &rct
evtArr, err := blockadt.AsArray(st, rct.EventsRoot)
if err != nil {
return nil, xerrors.Errorf("load events amt: %w", err)
}
ems[i].evs = make([]*types.Event, evtArr.Length())
var evt types.Event
_ = arr.ForEach(&evt, func(i int64) error {
cpy := evt
ems[i].evs[int(i)] = &cpy
return nil
})
}
return ems, nil
}

View File

@ -0,0 +1,415 @@
package filter
import (
"context"
pseudo "math/rand"
"testing"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
mh "github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/types"
)
func TestEventFilterCollectEvents(t *testing.T) {
rng := pseudo.New(pseudo.NewSource(299792458))
a1 := randomActorAddr(t, rng)
a2 := randomActorAddr(t, rng)
ev1 := fakeEvent(
a1,
[]kv{
{k: "type", v: []byte("approval")},
{k: "signer", v: []byte("addr1")},
},
[]kv{
{k: "amount", v: []byte("2988181")},
},
)
st := newStore()
events := []*types.Event{ev1}
em := executedMessage{
msg: fakeMessage(randomActorAddr(t, rng), randomActorAddr(t, rng)),
rct: fakeReceipt(t, rng, st, events),
evs: events,
}
events14000 := buildTipSetEvents(t, rng, 14000, em)
cid14000, err := events14000.msgTs.Key().Cid()
require.NoError(t, err, "tipset cid")
noCollectedEvents := []*CollectedEvent{}
oneCollectedEvent := []*CollectedEvent{
{
Event: ev1,
EventIdx: 0,
Reverted: false,
Height: 14000,
TipSetKey: events14000.msgTs.Key(),
MsgIdx: 0,
MsgCid: em.msg.Cid(),
},
}
testCases := []struct {
name string
filter *EventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &EventFilter{
minHeight: 14001,
maxHeight: -1,
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch tipset max height",
filter: &EventFilter{
minHeight: -1,
maxHeight: 13999,
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match tipset min height",
filter: &EventFilter{
minHeight: 14000,
maxHeight: -1,
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match tipset cid",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: cid14000,
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch address",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a2},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match address",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a1},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match one entry",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match one entry with alternate values",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("cancel"),
[]byte("propose"),
[]byte("approval"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch one entry by missing value",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("cancel"),
[]byte("propose"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry by missing key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"method": {
[]byte("approval"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"signer": {
[]byte("addr1"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch one entry with one mismatching key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"approver": {
[]byte("addr1"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry with one mismatching value",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"signer": {
[]byte("addr2"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"amount": {
[]byte("2988181"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
}
for _, tc := range testCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := tc.filter.CollectEvents(context.Background(), tc.te, false); err != nil {
require.NoError(t, err, "collect events")
}
coll := tc.filter.TakeCollectedEvents(context.Background())
require.ElementsMatch(t, coll, tc.want)
})
}
}
type kv struct {
k string
v []byte
}
func fakeEvent(emitter address.Address, indexed []kv, unindexed []kv) *types.Event {
ev := &types.Event{
Emitter: emitter,
}
for _, in := range indexed {
ev.Entries = append(ev.Entries, types.EventEntry{
Flags: 0x01,
Key: []byte(in.k),
Value: in.v,
})
}
for _, in := range unindexed {
ev.Entries = append(ev.Entries, types.EventEntry{
Flags: 0x00,
Key: []byte(in.k),
Value: in.v,
})
}
return ev
}
func randomActorAddr(tb testing.TB, rng *pseudo.Rand) address.Address {
tb.Helper()
addr, err := address.NewActorAddress(randomBytes(32, rng))
require.NoError(tb, err)
return addr
}
func randomIDAddr(tb testing.TB, rng *pseudo.Rand) address.Address {
tb.Helper()
addr, err := address.NewIDAddress(uint64(rng.Int63()))
require.NoError(tb, err)
return addr
}
func randomCid(tb testing.TB, rng *pseudo.Rand) cid.Cid {
tb.Helper()
cb := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY}
c, err := cb.Sum(randomBytes(10, rng))
require.NoError(tb, err)
return c
}
func randomBytes(n int, rng *pseudo.Rand) []byte {
buf := make([]byte, n)
rng.Read(buf)
return buf
}
func fakeMessage(to, from address.Address) *types.Message {
return &types.Message{
To: to,
From: from,
Nonce: 197,
Method: 1,
Params: []byte("some random bytes"),
GasLimit: 126723,
GasPremium: types.NewInt(4),
GasFeeCap: types.NewInt(120),
}
}
func fakeReceipt(tb testing.TB, rng *pseudo.Rand, st adt.Store, events []*types.Event) *types.MessageReceipt {
arr := blockadt.MakeEmptyArray(st)
for _, ev := range events {
err := arr.AppendContinuous(ev)
require.NoError(tb, err, "append event")
}
eventsRoot, err := arr.Root()
require.NoError(tb, err, "flush events amt")
return &types.MessageReceipt{
ExitCode: exitcode.Ok,
Return: randomBytes(32, rng),
GasUsed: rng.Int63(),
Events: eventsRoot,
}
}
func fakeTipSet(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, parents []cid.Cid) *types.TipSet {
tb.Helper()
ts, err := types.NewTipSet([]*types.BlockHeader{
{
Height: h,
Miner: randomIDAddr(tb, rng),
Parents: parents,
Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}},
ParentStateRoot: randomCid(tb, rng),
Messages: randomCid(tb, rng),
ParentMessageReceipts: randomCid(tb, rng),
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
},
{
Height: h,
Miner: randomIDAddr(tb, rng),
Parents: parents,
Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}},
ParentStateRoot: randomCid(tb, rng),
Messages: randomCid(tb, rng),
ParentMessageReceipts: randomCid(tb, rng),
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
},
})
require.NoError(tb, err)
return ts
}
func newStore() adt.Store {
ctx := context.Background()
bs := blockstore.NewMemorySync()
store := cbor.NewCborStore(bs)
return adt.WrapStore(ctx, store)
}
func buildTipSetEvents(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, em executedMessage) *TipSetEvents {
tb.Helper()
msgTs := fakeTipSet(tb, rng, h, []cid.Cid{})
rctTs := fakeTipSet(tb, rng, h+1, msgTs.Cids())
return &TipSetEvents{
msgTs: msgTs,
rctTs: rctTs,
load: func(ctx context.Context, msgTs, rctTs *types.TipSet) ([]executedMessage, error) {
return []executedMessage{em}, nil
},
}
}

View File

@ -0,0 +1,141 @@
package filter
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
)
type MemPoolFilter struct {
id string
maxResults int // maximum number of results to collect, 0 is unlimited
ch chan<- interface{}
mu sync.Mutex
collected []cid.Cid
lastTaken time.Time
}
var _ Filter = (*MemPoolFilter)(nil)
func (f *MemPoolFilter) ID() string {
return f.id
}
func (f *MemPoolFilter) SetSubChannel(ch chan<- interface{}) {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = ch
f.collected = nil
}
func (f *MemPoolFilter) ClearSubChannel() {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = nil
}
func (f *MemPoolFilter) CollectMessage(ctx context.Context, msg *types.SignedMessage) {
f.mu.Lock()
defer f.mu.Unlock()
// if we have a subscription channel then push message to it
if f.ch != nil {
f.ch <- msg
return
}
if f.maxResults > 0 && len(f.collected) == f.maxResults {
copy(f.collected, f.collected[1:])
f.collected = f.collected[:len(f.collected)-1]
}
f.collected = append(f.collected, msg.Cid())
}
func (f *MemPoolFilter) TakeCollectedMessages(context.Context) []cid.Cid {
f.mu.Lock()
collected := f.collected
f.collected = nil
f.lastTaken = time.Now().UTC()
f.mu.Unlock()
return collected
}
func (f *MemPoolFilter) LastTaken() time.Time {
f.mu.Lock()
defer f.mu.Unlock()
return f.lastTaken
}
type MemPoolFilterManager struct {
MaxFilterResults int
mu sync.Mutex // guards mutations to filters
filters map[string]*MemPoolFilter
}
func (m *MemPoolFilterManager) WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate) {
for {
select {
case <-ctx.Done():
return
case u := <-ch:
m.processUpdate(ctx, u)
}
}
}
func (m *MemPoolFilterManager) processUpdate(ctx context.Context, u api.MpoolUpdate) {
// only process added messages
if u.Type == api.MpoolRemove {
return
}
m.mu.Lock()
defer m.mu.Unlock()
if len(m.filters) == 0 {
return
}
// TODO: could run this loop in parallel with errgroup if we expect large numbers of filters
for _, f := range m.filters {
f.CollectMessage(ctx, u.Message)
}
}
func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, error) {
id, err := uuid.NewRandom()
if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err)
}
f := &MemPoolFilter{
id: id.String(),
maxResults: m.MaxFilterResults,
}
m.mu.Lock()
m.filters[id.String()] = f
m.mu.Unlock()
return f, nil
}
func (m *MemPoolFilterManager) Remove(ctx context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, found := m.filters[id]; !found {
return ErrFilterNotFound
}
delete(m.filters, id)
return nil
}

View File

@ -0,0 +1,93 @@
package filter
import (
"context"
"errors"
"sync"
"time"
)
type Filter interface {
ID() string
LastTaken() time.Time
SetSubChannel(chan<- interface{})
ClearSubChannel()
}
type FilterStore interface {
Add(context.Context, Filter) error
Get(context.Context, string) (Filter, error)
Remove(context.Context, string) error
NotTakenSince(when time.Time) []Filter // returns a list of filters that have not had their collected results taken
}
var (
ErrFilterAlreadyRegistered = errors.New("filter already registered")
ErrFilterNotFound = errors.New("filter not found")
ErrMaximumNumberOfFilters = errors.New("maximum number of filters registered")
)
type memFilterStore struct {
max int
mu sync.Mutex
filters map[string]Filter
}
var _ FilterStore = (*memFilterStore)(nil)
func NewMemFilterStore(maxFilters int) FilterStore {
return &memFilterStore{
max: maxFilters,
filters: make(map[string]Filter),
}
}
func (m *memFilterStore) Add(_ context.Context, f Filter) error {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.filters) >= m.max {
return ErrMaximumNumberOfFilters
}
if _, exists := m.filters[f.ID()]; exists {
return ErrFilterAlreadyRegistered
}
m.filters[f.ID()] = f
return nil
}
func (m *memFilterStore) Get(_ context.Context, id string) (Filter, error) {
m.mu.Lock()
f, found := m.filters[id]
m.mu.Unlock()
if !found {
return nil, ErrFilterNotFound
}
return f, nil
}
func (m *memFilterStore) Remove(_ context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.filters[id]; !exists {
return ErrFilterNotFound
}
delete(m.filters, id)
return nil
}
func (m *memFilterStore) NotTakenSince(when time.Time) []Filter {
m.mu.Lock()
defer m.mu.Unlock()
var res []Filter
for _, f := range m.filters {
if f.LastTaken().Before(when) {
res = append(res, f)
}
}
return res
}

View File

@ -0,0 +1,128 @@
package filter
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
)
type TipSetFilter struct {
id string
maxResults int // maximum number of results to collect, 0 is unlimited
ch chan<- interface{}
mu sync.Mutex
collected []types.TipSetKey
lastTaken time.Time
}
var _ Filter = (*TipSetFilter)(nil)
func (f *TipSetFilter) ID() string {
return f.id
}
func (f *TipSetFilter) SetSubChannel(ch chan<- interface{}) {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = ch
f.collected = nil
}
func (f *TipSetFilter) ClearSubChannel() {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = nil
}
func (f *TipSetFilter) CollectTipSet(ctx context.Context, ts *types.TipSet) {
f.mu.Lock()
defer f.mu.Unlock()
// if we have a subscription channel then push tipset to it
if f.ch != nil {
f.ch <- ts
return
}
if f.maxResults > 0 && len(f.collected) == f.maxResults {
copy(f.collected, f.collected[1:])
f.collected = f.collected[:len(f.collected)-1]
}
f.collected = append(f.collected, ts.Key())
}
func (f *TipSetFilter) TakeCollectedTipSets(context.Context) []types.TipSetKey {
f.mu.Lock()
collected := f.collected
f.collected = nil
f.lastTaken = time.Now().UTC()
f.mu.Unlock()
return collected
}
func (f *TipSetFilter) LastTaken() time.Time {
f.mu.Lock()
defer f.mu.Unlock()
return f.lastTaken
}
type TipSetFilterManager struct {
MaxFilterResults int
mu sync.Mutex // guards mutations to filters
filters map[string]*TipSetFilter
}
func (m *TipSetFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.filters) == 0 {
return nil
}
// TODO: could run this loop in parallel with errgroup
for _, f := range m.filters {
f.CollectTipSet(ctx, to)
}
return nil
}
func (m *TipSetFilterManager) Revert(ctx context.Context, from, to *types.TipSet) error {
return nil
}
func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error) {
id, err := uuid.NewRandom()
if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err)
}
f := &TipSetFilter{
id: id.String(),
maxResults: m.MaxFilterResults,
}
m.mu.Lock()
m.filters[id.String()] = f
m.mu.Unlock()
return f, nil
}
func (m *TipSetFilterManager) Remove(ctx context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, found := m.filters[id]; !found {
return ErrFilterNotFound
}
delete(m.filters, id)
return nil
}

176
itests/actor_events_test.go Normal file
View File

@ -0,0 +1,176 @@
// stm: #integration
package itests
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
)
func TestActorEventsMpool(t *testing.T) {
ctx := context.Background()
kit.QuietMiningLogs()
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// create a new address where to send funds.
addr, err := client.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
// get the existing balance from the default wallet to then split it.
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
require.NoError(t, err)
// install filter
filterID, err := client.EthNewPendingTransactionFilter(ctx)
require.NoError(t, err)
const iterations = 100
// we'll send half our balance (saving the other half for gas),
// in `iterations` increments.
toSend := big.Div(bal, big.NewInt(2))
each := big.Div(toSend, big.NewInt(iterations))
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(t, err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(t, err)
count += len(msgs)
if count == iterations {
waitAllCh <- struct{}{}
}
}
}
}
}
}()
var sms []*types.SignedMessage
for i := 0; i < iterations; i++ {
msg := &types.Message{
From: client.DefaultKey.Address,
To: addr,
Value: each,
}
sm, err := client.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(t, err)
// expect to have seen iteration number of mpool messages
require.Equal(t, iterations, len(res.NewTransactionHashes))
}
func TestActorEventsTipsets(t *testing.T) {
ctx := context.Background()
kit.QuietMiningLogs()
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// create a new address where to send funds.
addr, err := client.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
// get the existing balance from the default wallet to then split it.
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
require.NoError(t, err)
// install filter
filterID, err := client.EthNewBlockFilter(ctx)
require.NoError(t, err)
const iterations = 100
// we'll send half our balance (saving the other half for gas),
// in `iterations` increments.
toSend := big.Div(bal, big.NewInt(2))
each := big.Div(toSend, big.NewInt(iterations))
waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(t, err)
<-headChangeCh // skip hccurrent
count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(t, err)
count += len(msgs)
if count == iterations {
waitAllCh <- struct{}{}
}
}
}
}
}
}()
var sms []*types.SignedMessage
for i := 0; i < iterations; i++ {
msg := &types.Message{
From: client.DefaultKey.Address,
To: addr,
Value: each,
}
sm, err := client.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}
// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(t, err)
// expect to have seen iteration number of tipsets
require.Equal(t, iterations, len(res.NewBlockHashes))
}

View File

@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/market"
@ -156,6 +157,7 @@ var ChainNode = Options(
Override(new(messagesigner.MpoolNonceAPI), From(new(*messagepool.MessagePool))),
Override(new(full.ChainModuleAPI), From(new(full.ChainModule))),
Override(new(full.EthModuleAPI), From(new(full.EthModule))),
Override(new(full.EthEventAPI), From(new(full.EthEvent))),
Override(new(full.GasModuleAPI), From(new(full.GasModule))),
Override(new(full.MpoolModuleAPI), From(new(full.MpoolModule))),
Override(new(full.StateModuleAPI), From(new(full.StateModule))),
@ -239,6 +241,10 @@ func ConfigFullNode(c interface{}) Option {
Unset(new(*wallet.LocalWallet)),
Override(new(wallet.Default), wallet.NilDefault),
),
// Actor event filtering support
Override(new(events.EventAPI), From(new(modules.EventAPI))),
Override(new(full.EthEventAPI), modules.EthEvent(cfg.ActorEvent)),
)
}

View File

@ -71,11 +71,12 @@ func defCommon() Common {
DirectPeers: nil,
},
}
}
var DefaultDefaultMaxFee = types.MustParseFIL("0.07")
var DefaultSimultaneousTransfers = uint64(20)
var (
DefaultDefaultMaxFee = types.MustParseFIL("0.07")
DefaultSimultaneousTransfers = uint64(20)
)
// DefaultFullNode returns the default config
func DefaultFullNode() *FullNode {
@ -98,6 +99,14 @@ func DefaultFullNode() *FullNode {
HotStoreFullGCFrequency: 20,
},
},
ActorEvent: ActorEventConfig{
EnableRealTimeFilterAPI: false,
EnableHistoricFilterAPI: false,
FilterTTL: Duration(time.Hour * 24),
MaxFilters: 100,
MaxFilterResults: 10000,
MaxFilterHeightRange: 2880, // conservative limit of one day
},
}
}
@ -253,8 +262,10 @@ func DefaultStorageMiner() *StorageMiner {
return cfg
}
var _ encoding.TextMarshaler = (*Duration)(nil)
var _ encoding.TextUnmarshaler = (*Duration)(nil)
var (
_ encoding.TextMarshaler = (*Duration)(nil)
_ encoding.TextUnmarshaler = (*Duration)(nil)
)
// Duration is a wrapper type for time.Duration
// for decoding and encoding from/to TOML

View File

@ -27,6 +27,7 @@ type FullNode struct {
Wallet Wallet
Fees FeeConfig
Chainstore Chainstore
ActorEvent ActorEventConfig
}
// // Common
@ -168,7 +169,6 @@ type DealmakingConfig struct {
}
type IndexProviderConfig struct {
// Enable set whether to enable indexing announcement to the network and expose endpoints that
// allow indexer nodes to process announcements. Enabled by default.
Enable bool
@ -601,3 +601,31 @@ type Wallet struct {
type FeeConfig struct {
DefaultMaxFee types.FIL
}
type ActorEventConfig struct {
// EnableRealTimeFilterAPI enables APIs that can create and query filters for actor events as they are emitted.
EnableRealTimeFilterAPI bool
// EnableHistoricFilterAPI enables APIs that can create and query filters for actor events that occurred in the past.
// A queryable index of events will be maintained.
EnableHistoricFilterAPI bool
// FilterTTL specifies the time to live for actor event filters. Filters that haven't been accessed longer than
// this time become eligible for automatic deletion.
FilterTTL Duration
// MaxFilters specifies the maximum number of filters that may exist at any one time.
MaxFilters int
// MaxFilterResults specifies the maximum number of results that can be accumulated by an actor event filter.
MaxFilterResults int
// MaxFilterHeightRange specifies the maximum range of heights that can be used in a filter (to avoid querying
// the entire chain)
MaxFilterHeightRange uint64
// Others, not implemented yet:
// Set a limit on the number of active websocket subscriptions (may be zero)
// Set a timeout for subscription clients
// Set upper bound on index size
}

View File

@ -3,9 +3,13 @@ package full
import (
"bytes"
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/fx"
@ -17,11 +21,12 @@ import (
builtintypes "github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/builtin/v10/eam"
"github.com/filecoin-project/go-state-types/builtin/v10/evm"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
@ -57,7 +62,22 @@ type EthModuleAPI interface {
// EthFeeHistory(ctx context.Context, blkCount string)
}
var _ EthModuleAPI = *new(api.FullNode)
type EthEventAPI interface {
EthGetLogs(ctx context.Context, filter *api.EthFilterSpec) (*api.EthFilterResult, error)
EthGetFilterChanges(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error)
EthGetFilterLogs(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error)
EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) (api.EthFilterID, error)
EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id api.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error)
EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error)
}
var (
_ EthModuleAPI = *new(api.FullNode)
_ EthEventAPI = *new(api.FullNode)
)
// EthModule provides a default implementation of EthModuleAPI.
// It can be swapped out with another implementation through Dependency
@ -76,12 +96,25 @@ type EthModule struct {
var _ EthModuleAPI = (*EthModule)(nil)
type EthEvent struct {
Chain *store.ChainStore
EventFilterManager *filter.EventFilterManager
TipSetFilterManager *filter.TipSetFilterManager
MemPoolFilterManager *filter.MemPoolFilterManager
FilterStore filter.FilterStore
SubManager ethSubscriptionManager
MaxFilterHeightRange abi.ChainEpoch
}
var _ EthEventAPI = (*EthEvent)(nil)
type EthAPI struct {
fx.In
Chain *store.ChainStore
EthModuleAPI
EthEventAPI
}
func (a *EthModule) StateNetworkName(ctx context.Context) (dtypes.NetworkName, error) {
@ -463,7 +496,7 @@ func (a *EthModule) EthProtocolVersion(ctx context.Context) (api.EthUint64, erro
}
func (a *EthModule) EthMaxPriorityFeePerGas(ctx context.Context) (api.EthBigInt, error) {
gasPremium, err := a.GasAPI.GasEstimateGasPremium(ctx, 0, builtin.SystemActorAddr, 10000, types.EmptyTSK)
gasPremium, err := a.GasAPI.GasEstimateGasPremium(ctx, 0, builtinactors.SystemActorAddr, 10000, types.EmptyTSK)
if err != nil {
return api.EthBigInt(big.Zero()), err
}
@ -826,3 +859,554 @@ func (a *EthModule) ethTxFromFilecoinMessageLookup(ctx context.Context, msgLooku
}
return tx, nil
}
func (e *EthEvent) EthGetLogs(ctx context.Context, filter *api.EthFilterSpec) (*api.EthFilterResult, error) {
// TODO: implement EthGetLogs
return nil, api.ErrNotSupported
}
func (e *EthEvent) EthGetFilterChanges(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error) {
if e.FilterStore == nil {
return nil, api.ErrNotSupported
}
f, err := e.FilterStore.Get(ctx, string(id))
if err != nil {
return nil, err
}
switch fc := f.(type) {
case filterEventCollector:
return ethFilterResultFromEvents(fc.TakeCollectedEvents(ctx))
case filterTipSetCollector:
return ethFilterResultFromTipSets(fc.TakeCollectedTipSets(ctx))
case filterMessageCollector:
return ethFilterResultFromMessages(fc.TakeCollectedMessages(ctx))
}
return nil, xerrors.Errorf("unknown filter type")
}
func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error) {
if e.FilterStore == nil {
return nil, api.ErrNotSupported
}
f, err := e.FilterStore.Get(ctx, string(id))
if err != nil {
return nil, err
}
switch fc := f.(type) {
case filterEventCollector:
return ethFilterResultFromEvents(fc.TakeCollectedEvents(ctx))
}
return nil, xerrors.Errorf("wrong filter type")
}
func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) (api.EthFilterID, error) {
if e.FilterStore == nil || e.EventFilterManager == nil {
return "", api.ErrNotSupported
}
var (
minHeight abi.ChainEpoch
maxHeight abi.ChainEpoch
tipsetCid cid.Cid
addresses []address.Address
keys = map[string][][]byte{}
)
if filter.BlockHash != nil {
if filter.FromBlock != nil || filter.ToBlock != nil {
return "", xerrors.Errorf("must not specify block hash and from/to block")
}
// TODO: derive a tipset hash from eth hash - might need to push this down into the EventFilterManager
} else {
if filter.FromBlock == nil || *filter.FromBlock == "latest" {
ts := e.Chain.GetHeaviestTipSet()
minHeight = ts.Height()
} else if *filter.FromBlock == "earliest" {
minHeight = 0
} else if *filter.FromBlock == "pending" {
return "", api.ErrNotSupported
} else {
epoch, err := strconv.ParseUint(*filter.FromBlock, 10, 64)
if err != nil {
return "", xerrors.Errorf("invalid epoch")
}
minHeight = abi.ChainEpoch(epoch)
}
if filter.ToBlock == nil || *filter.ToBlock == "latest" {
// here latest means the latest at the time
maxHeight = -1
} else if *filter.ToBlock == "earliest" {
maxHeight = 0
} else if *filter.ToBlock == "pending" {
return "", api.ErrNotSupported
} else {
epoch, err := strconv.ParseUint(*filter.ToBlock, 10, 64)
if err != nil {
return "", xerrors.Errorf("invalid epoch")
}
maxHeight = abi.ChainEpoch(epoch)
}
// Validate height ranges are within limits set by node operator
if minHeight == -1 && maxHeight > 0 {
// Here the client is looking for events between the head and some future height
ts := e.Chain.GetHeaviestTipSet()
if maxHeight-ts.Height() > e.MaxFilterHeightRange {
return "", xerrors.Errorf("invalid epoch range")
}
} else if minHeight >= 0 && maxHeight == -1 {
// Here the client is looking for events between some time in the past and the current head
ts := e.Chain.GetHeaviestTipSet()
if ts.Height()-minHeight > e.MaxFilterHeightRange {
return "", xerrors.Errorf("invalid epoch range")
}
} else if minHeight >= 0 && maxHeight >= 0 {
if minHeight > maxHeight || maxHeight-minHeight > e.MaxFilterHeightRange {
return "", xerrors.Errorf("invalid epoch range")
}
}
}
for _, ea := range filter.Address {
a, err := ea.ToFilecoinAddress()
if err != nil {
return "", xerrors.Errorf("invalid address %x", ea)
}
addresses = append(addresses, a)
}
for idx, vals := range filter.Topics {
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
keyvals := make([][]byte, len(vals))
for i, v := range vals {
keyvals[i] = v[:]
}
keys[key] = keyvals
}
f, err := e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys)
if err != nil {
return "", err
}
if err := e.FilterStore.Add(ctx, f); err != nil {
// Could not record in store, attempt to delete filter to clean up
err2 := e.TipSetFilterManager.Remove(ctx, f.ID())
if err2 != nil {
return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
}
return "", err
}
return api.EthFilterID(f.ID()), nil
}
func (e *EthEvent) EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error) {
if e.FilterStore == nil || e.TipSetFilterManager == nil {
return "", api.ErrNotSupported
}
f, err := e.TipSetFilterManager.Install(ctx)
if err != nil {
return "", err
}
if err := e.FilterStore.Add(ctx, f); err != nil {
// Could not record in store, attempt to delete filter to clean up
err2 := e.TipSetFilterManager.Remove(ctx, f.ID())
if err2 != nil {
return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
}
return "", err
}
return api.EthFilterID(f.ID()), nil
}
func (e *EthEvent) EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error) {
if e.FilterStore == nil || e.MemPoolFilterManager == nil {
return "", api.ErrNotSupported
}
f, err := e.MemPoolFilterManager.Install(ctx)
if err != nil {
return "", err
}
if err := e.FilterStore.Add(ctx, f); err != nil {
// Could not record in store, attempt to delete filter to clean up
err2 := e.MemPoolFilterManager.Remove(ctx, f.ID())
if err2 != nil {
return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
}
return "", err
}
return api.EthFilterID(f.ID()), nil
}
func (e *EthEvent) EthUninstallFilter(ctx context.Context, id api.EthFilterID) (bool, error) {
if e.FilterStore == nil {
return false, api.ErrNotSupported
}
f, err := e.FilterStore.Get(ctx, string(id))
if err != nil {
if errors.Is(err, filter.ErrFilterNotFound) {
return false, nil
}
return false, err
}
if err := e.uninstallFilter(ctx, f); err != nil {
return false, err
}
return true, nil
}
func (e *EthEvent) uninstallFilter(ctx context.Context, f filter.Filter) error {
switch f.(type) {
case *filter.EventFilter:
err := e.EventFilterManager.Remove(ctx, f.ID())
if err != nil && !errors.Is(err, filter.ErrFilterNotFound) {
return err
}
case *filter.TipSetFilter:
err := e.TipSetFilterManager.Remove(ctx, f.ID())
if err != nil && !errors.Is(err, filter.ErrFilterNotFound) {
return err
}
case *filter.MemPoolFilter:
err := e.MemPoolFilterManager.Remove(ctx, f.ID())
if err != nil && !errors.Is(err, filter.ErrFilterNotFound) {
return err
}
default:
return xerrors.Errorf("unknown filter type")
}
return e.FilterStore.Remove(ctx, f.ID())
}
const (
EthSubscribeEventTypeHeads = "newHeads"
EthSubscribeEventTypeLogs = "logs"
)
func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) {
// Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the
// method to be "eth_subscription". This probably doesn't matter in practice.
// Validate event types and parameters first
for _, et := range eventTypes {
switch et {
case EthSubscribeEventTypeHeads:
case EthSubscribeEventTypeLogs:
default:
return nil, xerrors.Errorf("unsupported event type: %s", et)
}
}
sub, err := e.SubManager.StartSubscription(ctx)
if err != nil {
return nil, err
}
for _, et := range eventTypes {
switch et {
case EthSubscribeEventTypeHeads:
f, err := e.TipSetFilterManager.Install(ctx)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id))
return nil, err
}
sub.addFilter(ctx, f)
case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{}
for idx, vals := range params.Topics {
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
keyvals := make([][]byte, len(vals))
for i, v := range vals {
keyvals[i] = v[:]
}
keys[key] = keyvals
}
f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id))
return nil, err
}
sub.addFilter(ctx, f)
}
}
return sub.out, nil
}
func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) {
filters, err := e.SubManager.StopSubscription(ctx, string(id))
if err != nil {
return false, nil
}
for _, f := range filters {
if err := e.uninstallFilter(ctx, f); err != nil {
// this will leave the filter a zombie, collecting events up to the maximum allowed
log.Warnf("failed to remove filter when unsubscribing: %v", err)
}
}
return true, nil
}
// GC runs a garbage collection loop, deleting filters that have not been used within the ttl window
func (e *EthEvent) GC(ctx context.Context, ttl time.Duration) {
if e.FilterStore == nil {
return
}
tt := time.NewTicker(time.Minute * 30)
defer tt.Stop()
for {
select {
case <-ctx.Done():
return
case <-tt.C:
fs := e.FilterStore.NotTakenSince(time.Now().Add(-ttl))
for _, f := range fs {
if err := e.uninstallFilter(ctx, f); err != nil {
log.Warnf("Failed to remove actor event filter during garbage collection: %v", err)
}
}
}
}
}
type filterEventCollector interface {
TakeCollectedEvents(context.Context) []*filter.CollectedEvent
}
type filterMessageCollector interface {
TakeCollectedMessages(context.Context) []cid.Cid
}
type filterTipSetCollector interface {
TakeCollectedTipSets(context.Context) []types.TipSetKey
}
var (
ethTopic1 = []byte("topic1")
ethTopic2 = []byte("topic2")
ethTopic3 = []byte("topic3")
ethTopic4 = []byte("topic4")
)
func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResult, error) {
res := &api.EthFilterResult{}
for _, ev := range evs {
log := api.EthLog{
Removed: ev.Reverted,
LogIndex: api.EthUint64(ev.EventIdx),
TransactionIndex: api.EthUint64(ev.MsgIdx),
BlockNumber: api.EthUint64(ev.Height),
}
var err error
for _, entry := range ev.Event.Entries {
hash := api.EthHashData(entry.Value)
if bytes.Equal(entry.Key, ethTopic1) || bytes.Equal(entry.Key, ethTopic2) || bytes.Equal(entry.Key, ethTopic3) || bytes.Equal(entry.Key, ethTopic4) {
log.Topics = append(log.Topics, hash)
} else {
log.Data = append(log.Data, hash)
}
}
log.Address, err = api.EthAddressFromFilecoinAddress(ev.Event.Emitter)
if err != nil {
return nil, err
}
log.TransactionHash, err = api.EthHashFromCid(ev.MsgCid)
if err != nil {
return nil, err
}
c, err := ev.TipSetKey.Cid()
if err != nil {
return nil, err
}
log.BlockHash, err = api.EthHashFromCid(c)
if err != nil {
return nil, err
}
res.NewLogs = append(res.NewLogs, log)
}
return res, nil
}
func ethFilterResultFromTipSets(tsks []types.TipSetKey) (*api.EthFilterResult, error) {
res := &api.EthFilterResult{}
for _, tsk := range tsks {
c, err := tsk.Cid()
if err != nil {
return nil, err
}
hash, err := api.EthHashFromCid(c)
if err != nil {
return nil, err
}
res.NewBlockHashes = append(res.NewBlockHashes, hash)
}
return res, nil
}
func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) {
res := &api.EthFilterResult{}
for _, c := range cs {
hash, err := api.EthHashFromCid(c)
if err != nil {
return nil, err
}
res.NewTransactionHashes = append(res.NewTransactionHashes, hash)
}
return res, nil
}
type ethSubscriptionManager struct {
mu sync.Mutex
subs map[string]*ethSubscription
}
func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) {
id, err := uuid.NewRandom()
if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err)
}
ctx, quit := context.WithCancel(ctx)
sub := &ethSubscription{
id: id.String(),
in: make(chan interface{}, 200),
out: make(chan api.EthSubscriptionResponse),
quit: quit,
}
e.mu.Lock()
if e.subs == nil {
e.subs = make(map[string]*ethSubscription)
}
e.subs[sub.id] = sub
e.mu.Unlock()
go sub.start(ctx)
return sub, nil
}
func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) {
e.mu.Lock()
defer e.mu.Unlock()
sub, ok := e.subs[id]
if !ok {
return nil, xerrors.Errorf("subscription not found")
}
sub.stop()
delete(e.subs, id)
return sub.filters, nil
}
type ethSubscription struct {
id string
in chan interface{}
out chan api.EthSubscriptionResponse
mu sync.Mutex
filters []filter.Filter
quit func()
}
func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) {
e.mu.Lock()
defer e.mu.Unlock()
f.SetSubChannel(e.in)
e.filters = append(e.filters, f)
}
func (e *ethSubscription) start(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case v := <-e.in:
resp := api.EthSubscriptionResponse{
SubscriptionID: api.EthSubscriptionID(e.id),
}
var err error
switch vt := v.(type) {
case *filter.CollectedEvent:
resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt})
case *types.TipSet:
resp.Result = vt
default:
log.Warnf("unexpected subscription value type: %T", vt)
}
if err != nil {
continue
}
select {
case e.out <- resp:
default:
// Skip if client is not reading responses
}
}
}
}
func (e *ethSubscription) stop() {
e.mu.Lock()
defer e.mu.Unlock()
if e.quit != nil {
e.quit()
close(e.out)
e.quit = nil
}
}

View File

@ -0,0 +1,94 @@
package modules
import (
"context"
"time"
"go.uber.org/fx"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/filter"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
type EventAPI struct {
fx.In
full.ChainAPI
full.StateAPI
}
var _ events.EventAPI = &EventAPI{}
func EthEvent(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, EventAPI, *messagepool.MessagePool) (*full.EthEvent, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, evapi EventAPI, mp *messagepool.MessagePool) (*full.EthEvent, error) {
ee := &full.EthEvent{
Chain: cs,
MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange),
}
if !cfg.EnableRealTimeFilterAPI && !cfg.EnableHistoricFilterAPI {
// all event functionality is disabled
return ee, nil
}
ee.FilterStore = filter.NewMemFilterStore(cfg.MaxFilters)
// Start garbage collection for filters
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
go ee.GC(ctx, time.Duration(cfg.FilterTTL))
return nil
},
})
if cfg.EnableRealTimeFilterAPI {
ee.EventFilterManager = &filter.EventFilterManager{
ChainStore: cs,
MaxFilterResults: cfg.MaxFilterResults,
}
ee.TipSetFilterManager = &filter.TipSetFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
}
ee.MemPoolFilterManager = &filter.MemPoolFilterManager{
MaxFilterResults: cfg.MaxFilterResults,
}
const ChainHeadConfidence = 1
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence)
if err != nil {
return err
}
// ignore returned tipsets
_ = ev.Observe(ee.EventFilterManager)
_ = ev.Observe(ee.TipSetFilterManager)
ch, err := mp.Updates(ctx)
if err != nil {
return err
}
go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch)
return nil
},
})
}
if cfg.EnableHistoricFilterAPI {
// TODO: enable indexer
}
return ee, nil
}
}