Merge pull request #10027 from filecoin-project/feat/correct-eth-sub

feat: rpc: correct `eth_subscribe` implementation
This commit is contained in:
Jiaying Wang 2023-01-31 09:44:59 -05:00 committed by GitHub
commit dc5f865d54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1269 additions and 1190 deletions

View File

@ -16,6 +16,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin/v8/paych" "github.com/filecoin-project/go-state-types/builtin/v8/paych"
@ -831,7 +832,7 @@ type FullNode interface {
// - logs: notify new event logs that match a criteria // - logs: notify new event logs that match a criteria
// params contains additional parameters used with the log event type // params contains additional parameters used with the log event type
// The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called. // The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called.
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) //perm:write EthSubscribe(ctx context.Context, params jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) //perm:write
// Unsubscribe from a websocket subscription // Unsubscribe from a websocket subscription
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) //perm:write EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) //perm:write
@ -849,6 +850,12 @@ type FullNode interface {
RaftLeader(ctx context.Context) (peer.ID, error) //perm:read RaftLeader(ctx context.Context) (peer.ID, error) //perm:read
} }
// reverse interface to the client, called after EthSubscribe
type EthSubscriber interface {
// note: the parameter is ethtypes.EthSubscriptionResponse serialized as json object
EthSubscription(ctx context.Context, r jsonrpc.RawParams) error //rpc_method:eth_subscription notify:true
}
type StorageAsk struct { type StorageAsk struct {
Response *storagemarket.StorageAsk Response *storagemarket.StorageAsk

View File

@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/miner" "github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
@ -102,6 +103,6 @@ type Gateway interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) EthSubscribe(ctx context.Context, params jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
} }

View File

@ -35,10 +35,10 @@ func NewFullNodeRPCV0(ctx context.Context, addr string, requestHeader http.Heade
} }
// NewFullNodeRPCV1 creates a new http jsonrpc client. // NewFullNodeRPCV1 creates a new http jsonrpc client.
func NewFullNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header) (api.FullNode, jsonrpc.ClientCloser, error) { func NewFullNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.FullNode, jsonrpc.ClientCloser, error) {
var res v1api.FullNodeStruct var res v1api.FullNodeStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin", closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), requestHeader, jsonrpc.WithErrors(api.RPCErrors)) api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(api.RPCErrors)}, opts...)...)
return &res, closer, err return &res, closer, err
} }

View File

@ -23,6 +23,7 @@ import (
bitfield "github.com/filecoin-project/go-bitfield" bitfield "github.com/filecoin-project/go-bitfield"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
retrievalmarket "github.com/filecoin-project/go-fil-markets/retrievalmarket" retrievalmarket "github.com/filecoin-project/go-fil-markets/retrievalmarket"
jsonrpc "github.com/filecoin-project/go-jsonrpc"
auth "github.com/filecoin-project/go-jsonrpc/auth" auth "github.com/filecoin-project/go-jsonrpc/auth"
abi "github.com/filecoin-project/go-state-types/abi" abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big" big "github.com/filecoin-project/go-state-types/big"
@ -1388,18 +1389,18 @@ func (mr *MockFullNodeMockRecorder) EthSendRawTransaction(arg0, arg1 interface{}
} }
// EthSubscribe mocks base method. // EthSubscribe mocks base method.
func (m *MockFullNode) EthSubscribe(arg0 context.Context, arg1 string, arg2 *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) { func (m *MockFullNode) EthSubscribe(arg0 context.Context, arg1 jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EthSubscribe", arg0, arg1, arg2) ret := m.ctrl.Call(m, "EthSubscribe", arg0, arg1)
ret0, _ := ret[0].(<-chan ethtypes.EthSubscriptionResponse) ret0, _ := ret[0].(ethtypes.EthSubscriptionID)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// EthSubscribe indicates an expected call of EthSubscribe. // EthSubscribe indicates an expected call of EthSubscribe.
func (mr *MockFullNodeMockRecorder) EthSubscribe(arg0, arg1, arg2 interface{}) *gomock.Call { func (mr *MockFullNodeMockRecorder) EthSubscribe(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EthSubscribe", reflect.TypeOf((*MockFullNode)(nil).EthSubscribe), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EthSubscribe", reflect.TypeOf((*MockFullNode)(nil).EthSubscribe), arg0, arg1)
} }
// EthUninstallFilter mocks base method. // EthUninstallFilter mocks base method.

File diff suppressed because it is too large Load Diff

View File

@ -39,383 +39,385 @@ type FullNodeStruct struct {
NetStruct NetStruct
Internal struct { Internal FullNodeMethods
BeaconGetEntry func(p0 context.Context, p1 abi.ChainEpoch) (*types.BeaconEntry, error) `perm:"read"` }
ChainDeleteObj func(p0 context.Context, p1 cid.Cid) error `perm:"admin"` type FullNodeMethods struct {
BeaconGetEntry func(p0 context.Context, p1 abi.ChainEpoch) (*types.BeaconEntry, error) `perm:"read"`
ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"` ChainDeleteObj func(p0 context.Context, p1 cid.Cid) error `perm:"admin"`
ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"` ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) `perm:"read"` ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"`
ChainGetGenesis func(p0 context.Context) (*types.TipSet, error) `perm:"read"` ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) `perm:"read"`
ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) `perm:"read"` ChainGetGenesis func(p0 context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetMessagesInTipset func(p0 context.Context, p1 types.TipSetKey) ([]api.Message, error) `perm:"read"` ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) `perm:"read"`
ChainGetNode func(p0 context.Context, p1 string) (*api.IpldObject, error) `perm:"read"` ChainGetMessagesInTipset func(p0 context.Context, p1 types.TipSetKey) ([]api.Message, error) `perm:"read"`
ChainGetParentMessages func(p0 context.Context, p1 cid.Cid) ([]api.Message, error) `perm:"read"` ChainGetNode func(p0 context.Context, p1 string) (*api.IpldObject, error) `perm:"read"`
ChainGetParentReceipts func(p0 context.Context, p1 cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` ChainGetParentMessages func(p0 context.Context, p1 cid.Cid) ([]api.Message, error) `perm:"read"`
ChainGetPath func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey) ([]*api.HeadChange, error) `perm:"read"` ChainGetParentReceipts func(p0 context.Context, p1 cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"`
ChainGetRandomnessFromBeacon func(p0 context.Context, p1 types.TipSetKey, p2 crypto.DomainSeparationTag, p3 abi.ChainEpoch, p4 []byte) (abi.Randomness, error) `perm:"read"` ChainGetPath func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey) ([]*api.HeadChange, error) `perm:"read"`
ChainGetRandomnessFromTickets func(p0 context.Context, p1 types.TipSetKey, p2 crypto.DomainSeparationTag, p3 abi.ChainEpoch, p4 []byte) (abi.Randomness, error) `perm:"read"` ChainGetRandomnessFromBeacon func(p0 context.Context, p1 types.TipSetKey, p2 crypto.DomainSeparationTag, p3 abi.ChainEpoch, p4 []byte) (abi.Randomness, error) `perm:"read"`
ChainGetTipSet func(p0 context.Context, p1 types.TipSetKey) (*types.TipSet, error) `perm:"read"` ChainGetRandomnessFromTickets func(p0 context.Context, p1 types.TipSetKey, p2 crypto.DomainSeparationTag, p3 abi.ChainEpoch, p4 []byte) (abi.Randomness, error) `perm:"read"`
ChainGetTipSetByHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) `perm:"read"` ChainGetTipSet func(p0 context.Context, p1 types.TipSetKey) (*types.TipSet, error) `perm:"read"`
ChainHasObj func(p0 context.Context, p1 cid.Cid) (bool, error) `perm:"read"` ChainGetTipSetByHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) `perm:"read"`
ChainHead func(p0 context.Context) (*types.TipSet, error) `perm:"read"` ChainHasObj func(p0 context.Context, p1 cid.Cid) (bool, error) `perm:"read"`
ChainNotify func(p0 context.Context) (<-chan []*api.HeadChange, error) `perm:"read"` ChainHead func(p0 context.Context) (*types.TipSet, error) `perm:"read"`
ChainPutObj func(p0 context.Context, p1 blocks.Block) error `` ChainNotify func(p0 context.Context) (<-chan []*api.HeadChange, error) `perm:"read"`
ChainReadObj func(p0 context.Context, p1 cid.Cid) ([]byte, error) `perm:"read"` ChainPutObj func(p0 context.Context, p1 blocks.Block) error ``
ChainSetHead func(p0 context.Context, p1 types.TipSetKey) error `perm:"admin"` ChainReadObj func(p0 context.Context, p1 cid.Cid) ([]byte, error) `perm:"read"`
ChainStatObj func(p0 context.Context, p1 cid.Cid, p2 cid.Cid) (api.ObjStat, error) `perm:"read"` ChainSetHead func(p0 context.Context, p1 types.TipSetKey) error `perm:"admin"`
ChainTipSetWeight func(p0 context.Context, p1 types.TipSetKey) (types.BigInt, error) `perm:"read"` ChainStatObj func(p0 context.Context, p1 cid.Cid, p2 cid.Cid) (api.ObjStat, error) `perm:"read"`
ClientCalcCommP func(p0 context.Context, p1 string) (*api.CommPRet, error) `perm:"write"` ChainTipSetWeight func(p0 context.Context, p1 types.TipSetKey) (types.BigInt, error) `perm:"read"`
ClientCancelDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` ClientCalcCommP func(p0 context.Context, p1 string) (*api.CommPRet, error) `perm:"write"`
ClientCancelRetrievalDeal func(p0 context.Context, p1 retrievalmarket.DealID) error `perm:"write"` ClientCancelDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
ClientDataTransferUpdates func(p0 context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` ClientCancelRetrievalDeal func(p0 context.Context, p1 retrievalmarket.DealID) error `perm:"write"`
ClientDealPieceCID func(p0 context.Context, p1 cid.Cid) (api.DataCIDSize, error) `perm:"read"` ClientDataTransferUpdates func(p0 context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
ClientDealSize func(p0 context.Context, p1 cid.Cid) (api.DataSize, error) `perm:"read"` ClientDealPieceCID func(p0 context.Context, p1 cid.Cid) (api.DataCIDSize, error) `perm:"read"`
ClientFindData func(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]api.QueryOffer, error) `perm:"read"` ClientDealSize func(p0 context.Context, p1 cid.Cid) (api.DataSize, error) `perm:"read"`
ClientGenCar func(p0 context.Context, p1 api.FileRef, p2 string) error `perm:"write"` ClientFindData func(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientGetDealInfo func(p0 context.Context, p1 cid.Cid) (*api.DealInfo, error) `perm:"read"` ClientGenCar func(p0 context.Context, p1 api.FileRef, p2 string) error `perm:"write"`
ClientGetDealStatus func(p0 context.Context, p1 uint64) (string, error) `perm:"read"` ClientGetDealInfo func(p0 context.Context, p1 cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientGetDealUpdates func(p0 context.Context) (<-chan api.DealInfo, error) `perm:"write"` ClientGetDealStatus func(p0 context.Context, p1 uint64) (string, error) `perm:"read"`
ClientGetRetrievalUpdates func(p0 context.Context) (<-chan api.RetrievalInfo, error) `perm:"write"` ClientGetDealUpdates func(p0 context.Context) (<-chan api.DealInfo, error) `perm:"write"`
ClientHasLocal func(p0 context.Context, p1 cid.Cid) (bool, error) `perm:"write"` ClientGetRetrievalUpdates func(p0 context.Context) (<-chan api.RetrievalInfo, error) `perm:"write"`
ClientImport func(p0 context.Context, p1 api.FileRef) (*api.ImportRes, error) `perm:"admin"` ClientHasLocal func(p0 context.Context, p1 cid.Cid) (bool, error) `perm:"write"`
ClientListDataTransfers func(p0 context.Context) ([]api.DataTransferChannel, error) `perm:"write"` ClientImport func(p0 context.Context, p1 api.FileRef) (*api.ImportRes, error) `perm:"admin"`
ClientListDeals func(p0 context.Context) ([]api.DealInfo, error) `perm:"write"` ClientListDataTransfers func(p0 context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
ClientListImports func(p0 context.Context) ([]api.Import, error) `perm:"write"` ClientListDeals func(p0 context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientListRetrievals func(p0 context.Context) ([]api.RetrievalInfo, error) `perm:"write"` ClientListImports func(p0 context.Context) ([]api.Import, error) `perm:"write"`
ClientMinerQueryOffer func(p0 context.Context, p1 address.Address, p2 cid.Cid, p3 *cid.Cid) (api.QueryOffer, error) `perm:"read"` ClientListRetrievals func(p0 context.Context) ([]api.RetrievalInfo, error) `perm:"write"`
ClientQueryAsk func(p0 context.Context, p1 peer.ID, p2 address.Address) (*storagemarket.StorageAsk, error) `perm:"read"` ClientMinerQueryOffer func(p0 context.Context, p1 address.Address, p2 cid.Cid, p3 *cid.Cid) (api.QueryOffer, error) `perm:"read"`
ClientRemoveImport func(p0 context.Context, p1 imports.ID) error `perm:"admin"` ClientQueryAsk func(p0 context.Context, p1 peer.ID, p2 address.Address) (*storagemarket.StorageAsk, error) `perm:"read"`
ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` ClientRemoveImport func(p0 context.Context, p1 imports.ID) error `perm:"admin"`
ClientRetrieve func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error `perm:"admin"` ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"`
ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"` ClientRetrieve func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) error `perm:"admin"`
ClientRetrieveWithEvents func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"` ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"`
ClientStartDeal func(p0 context.Context, p1 *api.StartDealParams) (*cid.Cid, error) `perm:"admin"` ClientRetrieveWithEvents func(p0 context.Context, p1 RetrievalOrder, p2 *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientStatelessDeal func(p0 context.Context, p1 *api.StartDealParams) (*cid.Cid, error) `perm:"write"` ClientStartDeal func(p0 context.Context, p1 *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
CreateBackup func(p0 context.Context, p1 string) error `perm:"admin"` ClientStatelessDeal func(p0 context.Context, p1 *api.StartDealParams) (*cid.Cid, error) `perm:"write"`
GasEstimateFeeCap func(p0 context.Context, p1 *types.Message, p2 int64, p3 types.TipSetKey) (types.BigInt, error) `perm:"read"` CreateBackup func(p0 context.Context, p1 string) error `perm:"admin"`
GasEstimateGasLimit func(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (int64, error) `perm:"read"` GasEstimateFeeCap func(p0 context.Context, p1 *types.Message, p2 int64, p3 types.TipSetKey) (types.BigInt, error) `perm:"read"`
GasEstimateGasPremium func(p0 context.Context, p1 uint64, p2 address.Address, p3 int64, p4 types.TipSetKey) (types.BigInt, error) `perm:"read"` GasEstimateGasLimit func(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (int64, error) `perm:"read"`
GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) `perm:"read"` GasEstimateGasPremium func(p0 context.Context, p1 uint64, p2 address.Address, p3 int64, p4 types.TipSetKey) (types.BigInt, error) `perm:"read"`
MarketAddBalance func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (cid.Cid, error) `perm:"sign"` GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) `perm:"read"`
MarketGetReserved func(p0 context.Context, p1 address.Address) (types.BigInt, error) `perm:"sign"` MarketAddBalance func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (cid.Cid, error) `perm:"sign"`
MarketReleaseFunds func(p0 context.Context, p1 address.Address, p2 types.BigInt) error `perm:"sign"` MarketGetReserved func(p0 context.Context, p1 address.Address) (types.BigInt, error) `perm:"sign"`
MarketReserveFunds func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (cid.Cid, error) `perm:"sign"` MarketReleaseFunds func(p0 context.Context, p1 address.Address, p2 types.BigInt) error `perm:"sign"`
MarketWithdraw func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (cid.Cid, error) `perm:"sign"` MarketReserveFunds func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (cid.Cid, error) `perm:"sign"`
MinerCreateBlock func(p0 context.Context, p1 *api.BlockTemplate) (*types.BlockMsg, error) `perm:"write"` MarketWithdraw func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (cid.Cid, error) `perm:"sign"`
MinerGetBaseInfo func(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*api.MiningBaseInfo, error) `perm:"read"` MinerCreateBlock func(p0 context.Context, p1 *api.BlockTemplate) (*types.BlockMsg, error) `perm:"write"`
MpoolBatchPush func(p0 context.Context, p1 []*types.SignedMessage) ([]cid.Cid, error) `perm:"write"` MinerGetBaseInfo func(p0 context.Context, p1 address.Address, p2 abi.ChainEpoch, p3 types.TipSetKey) (*api.MiningBaseInfo, error) `perm:"read"`
MpoolBatchPushMessage func(p0 context.Context, p1 []*types.Message, p2 *api.MessageSendSpec) ([]*types.SignedMessage, error) `perm:"sign"` MpoolBatchPush func(p0 context.Context, p1 []*types.SignedMessage) ([]cid.Cid, error) `perm:"write"`
MpoolBatchPushUntrusted func(p0 context.Context, p1 []*types.SignedMessage) ([]cid.Cid, error) `perm:"write"` MpoolBatchPushMessage func(p0 context.Context, p1 []*types.Message, p2 *api.MessageSendSpec) ([]*types.SignedMessage, error) `perm:"sign"`
MpoolClear func(p0 context.Context, p1 bool) error `perm:"write"` MpoolBatchPushUntrusted func(p0 context.Context, p1 []*types.SignedMessage) ([]cid.Cid, error) `perm:"write"`
MpoolGetConfig func(p0 context.Context) (*types.MpoolConfig, error) `perm:"read"` MpoolClear func(p0 context.Context, p1 bool) error `perm:"write"`
MpoolGetNonce func(p0 context.Context, p1 address.Address) (uint64, error) `perm:"read"` MpoolGetConfig func(p0 context.Context) (*types.MpoolConfig, error) `perm:"read"`
MpoolPending func(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` MpoolGetNonce func(p0 context.Context, p1 address.Address) (uint64, error) `perm:"read"`
MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) `perm:"write"` MpoolPending func(p0 context.Context, p1 types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPushMessage func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"` MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushUntrusted func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) `perm:"write"` MpoolPushMessage func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"`
MpoolSelect func(p0 context.Context, p1 types.TipSetKey, p2 float64) ([]*types.SignedMessage, error) `perm:"read"` MpoolPushUntrusted func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolSetConfig func(p0 context.Context, p1 *types.MpoolConfig) error `perm:"admin"` MpoolSelect func(p0 context.Context, p1 types.TipSetKey, p2 float64) ([]*types.SignedMessage, error) `perm:"read"`
MpoolSub func(p0 context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"` MpoolSetConfig func(p0 context.Context, p1 *types.MpoolConfig) error `perm:"admin"`
MsigAddApprove func(p0 context.Context, p1 address.Address, p2 address.Address, p3 uint64, p4 address.Address, p5 address.Address, p6 bool) (cid.Cid, error) `perm:"sign"` MpoolSub func(p0 context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"`
MsigAddCancel func(p0 context.Context, p1 address.Address, p2 address.Address, p3 uint64, p4 address.Address, p5 bool) (cid.Cid, error) `perm:"sign"` MsigAddApprove func(p0 context.Context, p1 address.Address, p2 address.Address, p3 uint64, p4 address.Address, p5 address.Address, p6 bool) (cid.Cid, error) `perm:"sign"`
MsigAddPropose func(p0 context.Context, p1 address.Address, p2 address.Address, p3 address.Address, p4 bool) (cid.Cid, error) `perm:"sign"` MsigAddCancel func(p0 context.Context, p1 address.Address, p2 address.Address, p3 uint64, p4 address.Address, p5 bool) (cid.Cid, error) `perm:"sign"`
MsigApprove func(p0 context.Context, p1 address.Address, p2 uint64, p3 address.Address) (cid.Cid, error) `perm:"sign"` MsigAddPropose func(p0 context.Context, p1 address.Address, p2 address.Address, p3 address.Address, p4 bool) (cid.Cid, error) `perm:"sign"`
MsigApproveTxnHash func(p0 context.Context, p1 address.Address, p2 uint64, p3 address.Address, p4 address.Address, p5 types.BigInt, p6 address.Address, p7 uint64, p8 []byte) (cid.Cid, error) `perm:"sign"` MsigApprove func(p0 context.Context, p1 address.Address, p2 uint64, p3 address.Address) (cid.Cid, error) `perm:"sign"`
MsigCancel func(p0 context.Context, p1 address.Address, p2 uint64, p3 address.Address, p4 types.BigInt, p5 address.Address, p6 uint64, p7 []byte) (cid.Cid, error) `perm:"sign"` MsigApproveTxnHash func(p0 context.Context, p1 address.Address, p2 uint64, p3 address.Address, p4 address.Address, p5 types.BigInt, p6 address.Address, p7 uint64, p8 []byte) (cid.Cid, error) `perm:"sign"`
MsigCreate func(p0 context.Context, p1 uint64, p2 []address.Address, p3 abi.ChainEpoch, p4 types.BigInt, p5 address.Address, p6 types.BigInt) (cid.Cid, error) `perm:"sign"` MsigCancel func(p0 context.Context, p1 address.Address, p2 uint64, p3 address.Address, p4 types.BigInt, p5 address.Address, p6 uint64, p7 []byte) (cid.Cid, error) `perm:"sign"`
MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) `perm:"read"` MsigCreate func(p0 context.Context, p1 uint64, p2 []address.Address, p3 abi.ChainEpoch, p4 types.BigInt, p5 address.Address, p6 types.BigInt) (cid.Cid, error) `perm:"sign"`
MsigGetPending func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) ([]*api.MsigTransaction, error) `perm:"read"` MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) `perm:"read"`
MsigGetVested func(p0 context.Context, p1 address.Address, p2 types.TipSetKey, p3 types.TipSetKey) (types.BigInt, error) `perm:"read"` MsigGetPending func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) ([]*api.MsigTransaction, error) `perm:"read"`
MsigGetVestingSchedule func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MsigVesting, error) `perm:"read"` MsigGetVested func(p0 context.Context, p1 address.Address, p2 types.TipSetKey, p3 types.TipSetKey) (types.BigInt, error) `perm:"read"`
MsigPropose func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 address.Address, p5 uint64, p6 []byte) (cid.Cid, error) `perm:"sign"` MsigGetVestingSchedule func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MsigVesting, error) `perm:"read"`
MsigRemoveSigner func(p0 context.Context, p1 address.Address, p2 address.Address, p3 address.Address, p4 bool) (cid.Cid, error) `perm:"sign"` MsigPropose func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 address.Address, p5 uint64, p6 []byte) (cid.Cid, error) `perm:"sign"`
MsigSwapApprove func(p0 context.Context, p1 address.Address, p2 address.Address, p3 uint64, p4 address.Address, p5 address.Address, p6 address.Address) (cid.Cid, error) `perm:"sign"` MsigRemoveSigner func(p0 context.Context, p1 address.Address, p2 address.Address, p3 address.Address, p4 bool) (cid.Cid, error) `perm:"sign"`
MsigSwapCancel func(p0 context.Context, p1 address.Address, p2 address.Address, p3 uint64, p4 address.Address, p5 address.Address) (cid.Cid, error) `perm:"sign"` MsigSwapApprove func(p0 context.Context, p1 address.Address, p2 address.Address, p3 uint64, p4 address.Address, p5 address.Address, p6 address.Address) (cid.Cid, error) `perm:"sign"`
MsigSwapPropose func(p0 context.Context, p1 address.Address, p2 address.Address, p3 address.Address, p4 address.Address) (cid.Cid, error) `perm:"sign"` MsigSwapCancel func(p0 context.Context, p1 address.Address, p2 address.Address, p3 uint64, p4 address.Address, p5 address.Address) (cid.Cid, error) `perm:"sign"`
PaychAllocateLane func(p0 context.Context, p1 address.Address) (uint64, error) `perm:"sign"` MsigSwapPropose func(p0 context.Context, p1 address.Address, p2 address.Address, p3 address.Address, p4 address.Address) (cid.Cid, error) `perm:"sign"`
PaychAvailableFunds func(p0 context.Context, p1 address.Address) (*api.ChannelAvailableFunds, error) `perm:"sign"` PaychAllocateLane func(p0 context.Context, p1 address.Address) (uint64, error) `perm:"sign"`
PaychAvailableFundsByFromTo func(p0 context.Context, p1 address.Address, p2 address.Address) (*api.ChannelAvailableFunds, error) `perm:"sign"` PaychAvailableFunds func(p0 context.Context, p1 address.Address) (*api.ChannelAvailableFunds, error) `perm:"sign"`
PaychCollect func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"` PaychAvailableFundsByFromTo func(p0 context.Context, p1 address.Address, p2 address.Address) (*api.ChannelAvailableFunds, error) `perm:"sign"`
PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*api.ChannelInfo, error) `perm:"sign"` PaychCollect func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"`
PaychGetWaitReady func(p0 context.Context, p1 cid.Cid) (address.Address, error) `perm:"sign"` PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
PaychList func(p0 context.Context) ([]address.Address, error) `perm:"read"` PaychGetWaitReady func(p0 context.Context, p1 cid.Cid) (address.Address, error) `perm:"sign"`
PaychNewPayment func(p0 context.Context, p1 address.Address, p2 address.Address, p3 []api.VoucherSpec) (*api.PaymentInfo, error) `perm:"sign"` PaychList func(p0 context.Context) ([]address.Address, error) `perm:"read"`
PaychSettle func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"` PaychNewPayment func(p0 context.Context, p1 address.Address, p2 address.Address, p3 []api.VoucherSpec) (*api.PaymentInfo, error) `perm:"sign"`
PaychStatus func(p0 context.Context, p1 address.Address) (*api.PaychStatus, error) `perm:"read"` PaychSettle func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"`
PaychVoucherAdd func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher, p3 []byte, p4 types.BigInt) (types.BigInt, error) `perm:"write"` PaychStatus func(p0 context.Context, p1 address.Address) (*api.PaychStatus, error) `perm:"read"`
PaychVoucherCheckSpendable func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher, p3 []byte, p4 []byte) (bool, error) `perm:"read"` PaychVoucherAdd func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher, p3 []byte, p4 types.BigInt) (types.BigInt, error) `perm:"write"`
PaychVoucherCheckValid func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher) error `perm:"read"` PaychVoucherCheckSpendable func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher, p3 []byte, p4 []byte) (bool, error) `perm:"read"`
PaychVoucherCreate func(p0 context.Context, p1 address.Address, p2 types.BigInt, p3 uint64) (*api.VoucherCreateResult, error) `perm:"sign"` PaychVoucherCheckValid func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher) error `perm:"read"`
PaychVoucherList func(p0 context.Context, p1 address.Address) ([]*paych.SignedVoucher, error) `perm:"write"` PaychVoucherCreate func(p0 context.Context, p1 address.Address, p2 types.BigInt, p3 uint64) (*api.VoucherCreateResult, error) `perm:"sign"`
PaychVoucherSubmit func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher, p3 []byte, p4 []byte) (cid.Cid, error) `perm:"sign"` PaychVoucherList func(p0 context.Context, p1 address.Address) ([]*paych.SignedVoucher, error) `perm:"write"`
StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `perm:"read"` PaychVoucherSubmit func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher, p3 []byte, p4 []byte) (cid.Cid, error) `perm:"sign"`
StateActorCodeCIDs func(p0 context.Context, p1 abinetwork.Version) (map[string]cid.Cid, error) `perm:"read"` StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `perm:"read"`
StateActorManifestCID func(p0 context.Context, p1 abinetwork.Version) (cid.Cid, error) `perm:"read"` StateActorCodeCIDs func(p0 context.Context, p1 abinetwork.Version) (map[string]cid.Cid, error) `perm:"read"`
StateAllMinerFaults func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) ([]*api.Fault, error) `perm:"read"` StateActorManifestCID func(p0 context.Context, p1 abinetwork.Version) (cid.Cid, error) `perm:"read"`
StateCall func(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (*api.InvocResult, error) `perm:"read"` StateAllMinerFaults func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) ([]*api.Fault, error) `perm:"read"`
StateChangedActors func(p0 context.Context, p1 cid.Cid, p2 cid.Cid) (map[string]types.Actor, error) `perm:"read"` StateCall func(p0 context.Context, p1 *types.Message, p2 types.TipSetKey) (*api.InvocResult, error) `perm:"read"`
StateCirculatingSupply func(p0 context.Context, p1 types.TipSetKey) (abi.TokenAmount, error) `perm:"read"` StateChangedActors func(p0 context.Context, p1 cid.Cid, p2 cid.Cid) (map[string]types.Actor, error) `perm:"read"`
StateCompute func(p0 context.Context, p1 abi.ChainEpoch, p2 []*types.Message, p3 types.TipSetKey) (*api.ComputeStateOutput, error) `perm:"read"` StateCirculatingSupply func(p0 context.Context, p1 types.TipSetKey) (abi.TokenAmount, error) `perm:"read"`
StateDealProviderCollateralBounds func(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (api.DealCollateralBounds, error) `perm:"read"` StateCompute func(p0 context.Context, p1 abi.ChainEpoch, p2 []*types.Message, p3 types.TipSetKey) (*api.ComputeStateOutput, error) `perm:"read"`
StateDecodeParams func(p0 context.Context, p1 address.Address, p2 abi.MethodNum, p3 []byte, p4 types.TipSetKey) (interface{}, error) `perm:"read"` StateDealProviderCollateralBounds func(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (api.DealCollateralBounds, error) `perm:"read"`
StateGetActor func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*types.Actor, error) `perm:"read"` StateDecodeParams func(p0 context.Context, p1 address.Address, p2 abi.MethodNum, p3 []byte, p4 types.TipSetKey) (interface{}, error) `perm:"read"`
StateGetAllocation func(p0 context.Context, p1 address.Address, p2 verifregtypes.AllocationId, p3 types.TipSetKey) (*verifregtypes.Allocation, error) `perm:"read"` StateGetActor func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*types.Actor, error) `perm:"read"`
StateGetAllocationForPendingDeal func(p0 context.Context, p1 abi.DealID, p2 types.TipSetKey) (*verifregtypes.Allocation, error) `perm:"read"` StateGetAllocation func(p0 context.Context, p1 address.Address, p2 verifregtypes.AllocationId, p3 types.TipSetKey) (*verifregtypes.Allocation, error) `perm:"read"`
StateGetAllocations func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (map[verifregtypes.AllocationId]verifregtypes.Allocation, error) `perm:"read"` StateGetAllocationForPendingDeal func(p0 context.Context, p1 abi.DealID, p2 types.TipSetKey) (*verifregtypes.Allocation, error) `perm:"read"`
StateGetClaim func(p0 context.Context, p1 address.Address, p2 verifregtypes.ClaimId, p3 types.TipSetKey) (*verifregtypes.Claim, error) `perm:"read"` StateGetAllocations func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (map[verifregtypes.AllocationId]verifregtypes.Allocation, error) `perm:"read"`
StateGetClaims func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (map[verifregtypes.ClaimId]verifregtypes.Claim, error) `perm:"read"` StateGetClaim func(p0 context.Context, p1 address.Address, p2 verifregtypes.ClaimId, p3 types.TipSetKey) (*verifregtypes.Claim, error) `perm:"read"`
StateGetNetworkParams func(p0 context.Context) (*api.NetworkParams, error) `perm:"read"` StateGetClaims func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (map[verifregtypes.ClaimId]verifregtypes.Claim, error) `perm:"read"`
StateGetRandomnessFromBeacon func(p0 context.Context, p1 crypto.DomainSeparationTag, p2 abi.ChainEpoch, p3 []byte, p4 types.TipSetKey) (abi.Randomness, error) `perm:"read"` StateGetNetworkParams func(p0 context.Context) (*api.NetworkParams, error) `perm:"read"`
StateGetRandomnessFromTickets func(p0 context.Context, p1 crypto.DomainSeparationTag, p2 abi.ChainEpoch, p3 []byte, p4 types.TipSetKey) (abi.Randomness, error) `perm:"read"` StateGetRandomnessFromBeacon func(p0 context.Context, p1 crypto.DomainSeparationTag, p2 abi.ChainEpoch, p3 []byte, p4 types.TipSetKey) (abi.Randomness, error) `perm:"read"`
StateGetReceipt func(p0 context.Context, p1 cid.Cid, p2 types.TipSetKey) (*types.MessageReceipt, error) `perm:"read"` StateGetRandomnessFromTickets func(p0 context.Context, p1 crypto.DomainSeparationTag, p2 abi.ChainEpoch, p3 []byte, p4 types.TipSetKey) (abi.Randomness, error) `perm:"read"`
StateListActors func(p0 context.Context, p1 types.TipSetKey) ([]address.Address, error) `perm:"read"` StateGetReceipt func(p0 context.Context, p1 cid.Cid, p2 types.TipSetKey) (*types.MessageReceipt, error) `perm:"read"`
StateListMessages func(p0 context.Context, p1 *api.MessageMatch, p2 types.TipSetKey, p3 abi.ChainEpoch) ([]cid.Cid, error) `perm:"read"` StateListActors func(p0 context.Context, p1 types.TipSetKey) ([]address.Address, error) `perm:"read"`
StateListMiners func(p0 context.Context, p1 types.TipSetKey) ([]address.Address, error) `perm:"read"` StateListMessages func(p0 context.Context, p1 *api.MessageMatch, p2 types.TipSetKey, p3 abi.ChainEpoch) ([]cid.Cid, error) `perm:"read"`
StateLookupID func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `perm:"read"` StateListMiners func(p0 context.Context, p1 types.TipSetKey) ([]address.Address, error) `perm:"read"`
StateMarketBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MarketBalance, error) `perm:"read"` StateLookupID func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `perm:"read"`
StateMarketDeals func(p0 context.Context, p1 types.TipSetKey) (map[string]*api.MarketDeal, error) `perm:"read"` StateMarketBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MarketBalance, error) `perm:"read"`
StateMarketParticipants func(p0 context.Context, p1 types.TipSetKey) (map[string]api.MarketBalance, error) `perm:"read"` StateMarketDeals func(p0 context.Context, p1 types.TipSetKey) (map[string]*api.MarketDeal, error) `perm:"read"`
StateMarketStorageDeal func(p0 context.Context, p1 abi.DealID, p2 types.TipSetKey) (*api.MarketDeal, error) `perm:"read"` StateMarketParticipants func(p0 context.Context, p1 types.TipSetKey) (map[string]api.MarketBalance, error) `perm:"read"`
StateMinerActiveSectors func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) ([]*miner.SectorOnChainInfo, error) `perm:"read"` StateMarketStorageDeal func(p0 context.Context, p1 abi.DealID, p2 types.TipSetKey) (*api.MarketDeal, error) `perm:"read"`
StateMinerAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) `perm:"read"` StateMinerActiveSectors func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) ([]*miner.SectorOnChainInfo, error) `perm:"read"`
StateMinerDeadlines func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) ([]api.Deadline, error) `perm:"read"` StateMinerAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) `perm:"read"`
StateMinerFaults func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (bitfield.BitField, error) `perm:"read"` StateMinerDeadlines func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) ([]api.Deadline, error) `perm:"read"`
StateMinerInfo func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MinerInfo, error) `perm:"read"` StateMinerFaults func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (bitfield.BitField, error) `perm:"read"`
StateMinerInitialPledgeCollateral func(p0 context.Context, p1 address.Address, p2 miner.SectorPreCommitInfo, p3 types.TipSetKey) (types.BigInt, error) `perm:"read"` StateMinerInfo func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MinerInfo, error) `perm:"read"`
StateMinerPartitions func(p0 context.Context, p1 address.Address, p2 uint64, p3 types.TipSetKey) ([]api.Partition, error) `perm:"read"` StateMinerInitialPledgeCollateral func(p0 context.Context, p1 address.Address, p2 miner.SectorPreCommitInfo, p3 types.TipSetKey) (types.BigInt, error) `perm:"read"`
StateMinerPower func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*api.MinerPower, error) `perm:"read"` StateMinerPartitions func(p0 context.Context, p1 address.Address, p2 uint64, p3 types.TipSetKey) ([]api.Partition, error) `perm:"read"`
StateMinerPreCommitDepositForPower func(p0 context.Context, p1 address.Address, p2 miner.SectorPreCommitInfo, p3 types.TipSetKey) (types.BigInt, error) `perm:"read"` StateMinerPower func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*api.MinerPower, error) `perm:"read"`
StateMinerProvingDeadline func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*dline.Info, error) `perm:"read"` StateMinerPreCommitDepositForPower func(p0 context.Context, p1 address.Address, p2 miner.SectorPreCommitInfo, p3 types.TipSetKey) (types.BigInt, error) `perm:"read"`
StateMinerRecoveries func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (bitfield.BitField, error) `perm:"read"` StateMinerProvingDeadline func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*dline.Info, error) `perm:"read"`
StateMinerSectorAllocated func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (bool, error) `perm:"read"` StateMinerRecoveries func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (bitfield.BitField, error) `perm:"read"`
StateMinerSectorCount func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MinerSectors, error) `perm:"read"` StateMinerSectorAllocated func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (bool, error) `perm:"read"`
StateMinerSectors func(p0 context.Context, p1 address.Address, p2 *bitfield.BitField, p3 types.TipSetKey) ([]*miner.SectorOnChainInfo, error) `perm:"read"` StateMinerSectorCount func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MinerSectors, error) `perm:"read"`
StateNetworkName func(p0 context.Context) (dtypes.NetworkName, error) `perm:"read"` StateMinerSectors func(p0 context.Context, p1 address.Address, p2 *bitfield.BitField, p3 types.TipSetKey) ([]*miner.SectorOnChainInfo, error) `perm:"read"`
StateNetworkVersion func(p0 context.Context, p1 types.TipSetKey) (apitypes.NetworkVersion, error) `perm:"read"` StateNetworkName func(p0 context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateReadState func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*api.ActorState, error) `perm:"read"` StateNetworkVersion func(p0 context.Context, p1 types.TipSetKey) (apitypes.NetworkVersion, error) `perm:"read"`
StateReplay func(p0 context.Context, p1 types.TipSetKey, p2 cid.Cid) (*api.InvocResult, error) `perm:"read"` StateReadState func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*api.ActorState, error) `perm:"read"`
StateSearchMsg func(p0 context.Context, p1 cid.Cid) (*api.MsgLookup, error) `perm:"read"` StateReplay func(p0 context.Context, p1 types.TipSetKey, p2 cid.Cid) (*api.InvocResult, error) `perm:"read"`
StateSearchMsgLimited func(p0 context.Context, p1 cid.Cid, p2 abi.ChainEpoch) (*api.MsgLookup, error) `perm:"read"` StateSearchMsg func(p0 context.Context, p1 cid.Cid) (*api.MsgLookup, error) `perm:"read"`
StateSectorExpiration func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*lminer.SectorExpiration, error) `perm:"read"` StateSearchMsgLimited func(p0 context.Context, p1 cid.Cid, p2 abi.ChainEpoch) (*api.MsgLookup, error) `perm:"read"`
StateSectorGetInfo func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*miner.SectorOnChainInfo, error) `perm:"read"` StateSectorExpiration func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*lminer.SectorExpiration, error) `perm:"read"`
StateSectorPartition func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*lminer.SectorLocation, error) `perm:"read"` StateSectorGetInfo func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*miner.SectorOnChainInfo, error) `perm:"read"`
StateSectorPreCommitInfo func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) `perm:"read"` StateSectorPartition func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*lminer.SectorLocation, error) `perm:"read"`
StateVMCirculatingSupplyInternal func(p0 context.Context, p1 types.TipSetKey) (api.CirculatingSupply, error) `perm:"read"` StateSectorPreCommitInfo func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) `perm:"read"`
StateVerifiedClientStatus func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*abi.StoragePower, error) `perm:"read"` StateVMCirculatingSupplyInternal func(p0 context.Context, p1 types.TipSetKey) (api.CirculatingSupply, error) `perm:"read"`
StateVerifiedRegistryRootKey func(p0 context.Context, p1 types.TipSetKey) (address.Address, error) `perm:"read"` StateVerifiedClientStatus func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*abi.StoragePower, error) `perm:"read"`
StateVerifierStatus func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*abi.StoragePower, error) `perm:"read"` StateVerifiedRegistryRootKey func(p0 context.Context, p1 types.TipSetKey) (address.Address, error) `perm:"read"`
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64) (*api.MsgLookup, error) `perm:"read"` StateVerifierStatus func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*abi.StoragePower, error) `perm:"read"`
StateWaitMsgLimited func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch) (*api.MsgLookup, error) `perm:"read"` StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64) (*api.MsgLookup, error) `perm:"read"`
SyncCheckBad func(p0 context.Context, p1 cid.Cid) (string, error) `perm:"read"` StateWaitMsgLimited func(p0 context.Context, p1 cid.Cid, p2 uint64, p3 abi.ChainEpoch) (*api.MsgLookup, error) `perm:"read"`
SyncCheckpoint func(p0 context.Context, p1 types.TipSetKey) error `perm:"admin"` SyncCheckBad func(p0 context.Context, p1 cid.Cid) (string, error) `perm:"read"`
SyncIncomingBlocks func(p0 context.Context) (<-chan *types.BlockHeader, error) `perm:"read"` SyncCheckpoint func(p0 context.Context, p1 types.TipSetKey) error `perm:"admin"`
SyncMarkBad func(p0 context.Context, p1 cid.Cid) error `perm:"admin"` SyncIncomingBlocks func(p0 context.Context) (<-chan *types.BlockHeader, error) `perm:"read"`
SyncState func(p0 context.Context) (*api.SyncState, error) `perm:"read"` SyncMarkBad func(p0 context.Context, p1 cid.Cid) error `perm:"admin"`
SyncSubmitBlock func(p0 context.Context, p1 *types.BlockMsg) error `perm:"write"` SyncState func(p0 context.Context) (*api.SyncState, error) `perm:"read"`
SyncUnmarkAllBad func(p0 context.Context) error `perm:"admin"` SyncSubmitBlock func(p0 context.Context, p1 *types.BlockMsg) error `perm:"write"`
SyncUnmarkBad func(p0 context.Context, p1 cid.Cid) error `perm:"admin"` SyncUnmarkAllBad func(p0 context.Context) error `perm:"admin"`
SyncValidateTipset func(p0 context.Context, p1 types.TipSetKey) (bool, error) `perm:"read"` SyncUnmarkBad func(p0 context.Context, p1 cid.Cid) error `perm:"admin"`
WalletBalance func(p0 context.Context, p1 address.Address) (types.BigInt, error) `perm:"read"` SyncValidateTipset func(p0 context.Context, p1 types.TipSetKey) (bool, error) `perm:"read"`
WalletDefaultAddress func(p0 context.Context) (address.Address, error) `perm:"write"` WalletBalance func(p0 context.Context, p1 address.Address) (types.BigInt, error) `perm:"read"`
WalletDelete func(p0 context.Context, p1 address.Address) error `perm:"admin"` WalletDefaultAddress func(p0 context.Context) (address.Address, error) `perm:"write"`
WalletExport func(p0 context.Context, p1 address.Address) (*types.KeyInfo, error) `perm:"admin"` WalletDelete func(p0 context.Context, p1 address.Address) error `perm:"admin"`
WalletHas func(p0 context.Context, p1 address.Address) (bool, error) `perm:"write"` WalletExport func(p0 context.Context, p1 address.Address) (*types.KeyInfo, error) `perm:"admin"`
WalletImport func(p0 context.Context, p1 *types.KeyInfo) (address.Address, error) `perm:"admin"` WalletHas func(p0 context.Context, p1 address.Address) (bool, error) `perm:"write"`
WalletList func(p0 context.Context) ([]address.Address, error) `perm:"write"` WalletImport func(p0 context.Context, p1 *types.KeyInfo) (address.Address, error) `perm:"admin"`
WalletNew func(p0 context.Context, p1 types.KeyType) (address.Address, error) `perm:"write"` WalletList func(p0 context.Context) ([]address.Address, error) `perm:"write"`
WalletSetDefault func(p0 context.Context, p1 address.Address) error `perm:"write"` WalletNew func(p0 context.Context, p1 types.KeyType) (address.Address, error) `perm:"write"`
WalletSign func(p0 context.Context, p1 address.Address, p2 []byte) (*crypto.Signature, error) `perm:"sign"` WalletSetDefault func(p0 context.Context, p1 address.Address) error `perm:"write"`
WalletSignMessage func(p0 context.Context, p1 address.Address, p2 *types.Message) (*types.SignedMessage, error) `perm:"sign"` WalletSign func(p0 context.Context, p1 address.Address, p2 []byte) (*crypto.Signature, error) `perm:"sign"`
WalletValidateAddress func(p0 context.Context, p1 string) (address.Address, error) `perm:"read"` WalletSignMessage func(p0 context.Context, p1 address.Address, p2 *types.Message) (*types.SignedMessage, error) `perm:"sign"`
WalletVerify func(p0 context.Context, p1 address.Address, p2 []byte, p3 *crypto.Signature) (bool, error) `perm:"read"` WalletValidateAddress func(p0 context.Context, p1 string) (address.Address, error) `perm:"read"`
}
WalletVerify func(p0 context.Context, p1 address.Address, p2 []byte, p3 *crypto.Signature) (bool, error) `perm:"read"`
} }
type FullNodeStub struct { type FullNodeStub struct {
@ -425,71 +427,73 @@ type FullNodeStub struct {
} }
type GatewayStruct struct { type GatewayStruct struct {
Internal struct { Internal GatewayMethods
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) `` }
ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) `` type GatewayMethods struct {
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) ``
ChainGetTipSet func(p0 context.Context, p1 types.TipSetKey) (*types.TipSet, error) `` ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) ``
ChainGetTipSetByHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) `` ChainGetTipSet func(p0 context.Context, p1 types.TipSetKey) (*types.TipSet, error) ``
ChainHasObj func(p0 context.Context, p1 cid.Cid) (bool, error) `` ChainGetTipSetByHeight func(p0 context.Context, p1 abi.ChainEpoch, p2 types.TipSetKey) (*types.TipSet, error) ``
ChainHead func(p0 context.Context) (*types.TipSet, error) `` ChainHasObj func(p0 context.Context, p1 cid.Cid) (bool, error) ``
ChainNotify func(p0 context.Context) (<-chan []*api.HeadChange, error) `` ChainHead func(p0 context.Context) (*types.TipSet, error) ``
ChainPutObj func(p0 context.Context, p1 blocks.Block) error `` ChainNotify func(p0 context.Context) (<-chan []*api.HeadChange, error) ``
ChainReadObj func(p0 context.Context, p1 cid.Cid) ([]byte, error) `` ChainPutObj func(p0 context.Context, p1 blocks.Block) error ``
GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) `` ChainReadObj func(p0 context.Context, p1 cid.Cid) ([]byte, error) ``
MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) `` GasEstimateMessageGas func(p0 context.Context, p1 *types.Message, p2 *api.MessageSendSpec, p3 types.TipSetKey) (*types.Message, error) ``
MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) `` MpoolPush func(p0 context.Context, p1 *types.SignedMessage) (cid.Cid, error) ``
MsigGetPending func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) ([]*api.MsigTransaction, error) `` MsigGetAvailableBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (types.BigInt, error) ``
MsigGetVested func(p0 context.Context, p1 address.Address, p2 types.TipSetKey, p3 types.TipSetKey) (types.BigInt, error) `` MsigGetPending func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) ([]*api.MsigTransaction, error) ``
StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `` MsigGetVested func(p0 context.Context, p1 address.Address, p2 types.TipSetKey, p3 types.TipSetKey) (types.BigInt, error) ``
StateDealProviderCollateralBounds func(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (api.DealCollateralBounds, error) `` StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) ``
StateGetActor func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*types.Actor, error) `` StateDealProviderCollateralBounds func(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (api.DealCollateralBounds, error) ``
StateGetReceipt func(p0 context.Context, p1 cid.Cid, p2 types.TipSetKey) (*types.MessageReceipt, error) `` StateGetActor func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*types.Actor, error) ``
StateListMiners func(p0 context.Context, p1 types.TipSetKey) ([]address.Address, error) `` StateGetReceipt func(p0 context.Context, p1 cid.Cid, p2 types.TipSetKey) (*types.MessageReceipt, error) ``
StateLookupID func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `` StateListMiners func(p0 context.Context, p1 types.TipSetKey) ([]address.Address, error) ``
StateMarketBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MarketBalance, error) `` StateLookupID func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) ``
StateMarketStorageDeal func(p0 context.Context, p1 abi.DealID, p2 types.TipSetKey) (*api.MarketDeal, error) `` StateMarketBalance func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MarketBalance, error) ``
StateMinerInfo func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MinerInfo, error) `` StateMarketStorageDeal func(p0 context.Context, p1 abi.DealID, p2 types.TipSetKey) (*api.MarketDeal, error) ``
StateMinerPower func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*api.MinerPower, error) `` StateMinerInfo func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (api.MinerInfo, error) ``
StateMinerProvingDeadline func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*dline.Info, error) `` StateMinerPower func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*api.MinerPower, error) ``
StateNetworkVersion func(p0 context.Context, p1 types.TipSetKey) (abinetwork.Version, error) `` StateMinerProvingDeadline func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*dline.Info, error) ``
StateSearchMsg func(p0 context.Context, p1 cid.Cid) (*api.MsgLookup, error) `` StateNetworkVersion func(p0 context.Context, p1 types.TipSetKey) (abinetwork.Version, error) ``
StateSectorGetInfo func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*miner.SectorOnChainInfo, error) `` StateSearchMsg func(p0 context.Context, p1 cid.Cid) (*api.MsgLookup, error) ``
StateVerifiedClientStatus func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*abi.StoragePower, error) `` StateSectorGetInfo func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*miner.SectorOnChainInfo, error) ``
StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64) (*api.MsgLookup, error) `` StateVerifiedClientStatus func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*abi.StoragePower, error) ``
Version func(p0 context.Context) (api.APIVersion, error) `` StateWaitMsg func(p0 context.Context, p1 cid.Cid, p2 uint64) (*api.MsgLookup, error) ``
WalletBalance func(p0 context.Context, p1 address.Address) (types.BigInt, error) `` Version func(p0 context.Context) (api.APIVersion, error) ``
}
WalletBalance func(p0 context.Context, p1 address.Address) (types.BigInt, error) ``
} }
type GatewayStub struct { type GatewayStub struct {

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -363,6 +363,18 @@ func (h *EthHash) UnmarshalJSON(b []byte) error {
return nil return nil
} }
func (h EthHash) String() string {
return "0x" + hex.EncodeToString(h[:])
}
// Should ONLY be used for blocks and Filecoin messages. Eth transactions expect a different hashing scheme.
func (h EthHash) ToCid() cid.Cid {
// err is always nil
mh, _ := multihash.EncodeName(h[:], "blake2b-256")
return cid.NewCidV1(cid.DagCBOR, mh)
}
func decodeHexString(s string, expectedLen int) ([]byte, error) { func decodeHexString(s string, expectedLen int) ([]byte, error) {
s = handleHexStringPrefix(s) s = handleHexStringPrefix(s)
if len(s) != expectedLen*2 { if len(s) != expectedLen*2 {
@ -420,18 +432,6 @@ func EthHashFromTxBytes(b []byte) EthHash {
return ethHash return ethHash
} }
func (h EthHash) String() string {
return "0x" + hex.EncodeToString(h[:])
}
// Should ONLY be used for blocks and Filecoin messages. Eth transactions expect a different hashing scheme.
func (h EthHash) ToCid() cid.Cid {
// err is always nil
mh, _ := multihash.EncodeName(h[:], "blake2b-256")
return cid.NewCidV1(cid.DagCBOR, mh)
}
type EthFeeHistory struct { type EthFeeHistory struct {
OldestBlock uint64 `json:"oldestBlock"` OldestBlock uint64 `json:"oldestBlock"`
BaseFeePerGas []EthBigInt `json:"baseFeePerGas"` BaseFeePerGas []EthBigInt `json:"baseFeePerGas"`
@ -441,9 +441,33 @@ type EthFeeHistory struct {
type EthFilterID EthHash type EthFilterID EthHash
func (h EthFilterID) MarshalJSON() ([]byte, error) {
return (EthHash)(h).MarshalJSON()
}
func (h *EthFilterID) UnmarshalJSON(b []byte) error {
return (*EthHash)(h).UnmarshalJSON(b)
}
func (h EthFilterID) String() string {
return (EthHash)(h).String()
}
// An opaque identifier generated by the Lotus node to refer to an active subscription. // An opaque identifier generated by the Lotus node to refer to an active subscription.
type EthSubscriptionID EthHash type EthSubscriptionID EthHash
func (h EthSubscriptionID) MarshalJSON() ([]byte, error) {
return (EthHash)(h).MarshalJSON()
}
func (h *EthSubscriptionID) UnmarshalJSON(b []byte) error {
return (*EthHash)(h).UnmarshalJSON(b)
}
func (h EthSubscriptionID) String() string {
return (EthHash)(h).String()
}
type EthFilterSpec struct { type EthFilterSpec struct {
// Interpreted as an epoch or one of "latest" for last mined block, "earliest" for first, // Interpreted as an epoch or one of "latest" for last mined block, "earliest" for first,
// "pending" for not yet committed messages. // "pending" for not yet committed messages.
@ -595,6 +619,43 @@ type EthLog struct {
BlockNumber EthUint64 `json:"blockNumber"` BlockNumber EthUint64 `json:"blockNumber"`
} }
// EthSubscribeParams handles raw jsonrpc params for eth_subscribe
type EthSubscribeParams struct {
EventType string
Params *EthSubscriptionParams
}
func (e *EthSubscribeParams) UnmarshalJSON(b []byte) error {
var params []json.RawMessage
err := json.Unmarshal(b, &params)
if err != nil {
return err
}
switch len(params) {
case 2:
err = json.Unmarshal(params[1], &e.Params)
if err != nil {
return err
}
fallthrough
case 1:
err = json.Unmarshal(params[0], &e.EventType)
if err != nil {
return err
}
default:
return xerrors.Errorf("expected 1 or 2 params, got %d", len(params))
}
return nil
}
func (e EthSubscribeParams) MarshalJSON() ([]byte, error) {
if e.Params != nil {
return json.Marshal([]interface{}{e.EventType, e.Params})
}
return json.Marshal([]interface{}{e.EventType})
}
type EthSubscriptionParams struct { type EthSubscriptionParams struct {
// List of topics to be matched. // List of topics to be matched.
// Optional, default: empty list // Optional, default: empty list

View File

@ -93,6 +93,48 @@ func TestEthHash(t *testing.T) {
h1, err := EthHashFromCid(c) h1, err := EthHashFromCid(c)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, h, h1) require.Equal(t, h, h1)
jm, err := json.Marshal(h)
require.NoError(t, err)
require.Equal(t, hash, string(jm))
}
}
func TestEthFilterID(t *testing.T) {
testcases := []string{
`"0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"`,
`"0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"`,
}
for _, hash := range testcases {
var h EthFilterID
err := h.UnmarshalJSON([]byte(hash))
require.Nil(t, err)
require.Equal(t, h.String(), strings.Replace(hash, `"`, "", -1))
jm, err := json.Marshal(h)
require.NoError(t, err)
require.Equal(t, hash, string(jm))
}
}
func TestEthSubscriptionID(t *testing.T) {
testcases := []string{
`"0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184"`,
`"0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"`,
}
for _, hash := range testcases {
var h EthSubscriptionID
err := h.UnmarshalJSON([]byte(hash))
require.Nil(t, err)
require.Equal(t, h.String(), strings.Replace(hash, `"`, "", -1))
jm, err := json.Marshal(h)
require.NoError(t, err)
require.Equal(t, hash, string(jm))
} }
} }

View File

@ -319,11 +319,33 @@ func GetFullNodeAPIV1Single(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientClo
return v1API, closer, nil return v1API, closer, nil
} }
func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) { type GetFullNodeOptions struct {
ethSubHandler api.EthSubscriber
}
type GetFullNodeOption func(*GetFullNodeOptions)
func FullNodeWithEthSubscribtionHandler(sh api.EthSubscriber) GetFullNodeOption {
return func(opts *GetFullNodeOptions) {
opts.ethSubHandler = sh
}
}
func GetFullNodeAPIV1(ctx *cli.Context, opts ...GetFullNodeOption) (v1api.FullNode, jsonrpc.ClientCloser, error) {
if tn, ok := ctx.App.Metadata["testnode-full"]; ok { if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
return tn.(v1api.FullNode), func() {}, nil return tn.(v1api.FullNode), func() {}, nil
} }
var options GetFullNodeOptions
for _, opt := range opts {
opt(&options)
}
var rpcOpts []jsonrpc.Option
if options.ethSubHandler != nil {
rpcOpts = append(rpcOpts, jsonrpc.WithClientHandler("Filecoin", options.ethSubHandler), jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"))
}
heads, err := GetRawAPIMulti(ctx, repo.FullNode, "v1") heads, err := GetRawAPIMulti(ctx, repo.FullNode, "v1")
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -337,7 +359,7 @@ func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, e
var closers []jsonrpc.ClientCloser var closers []jsonrpc.ClientCloser
for _, head := range heads { for _, head := range heads {
v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header) v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header, rpcOpts...)
if err != nil { if err != nil {
log.Warnf("Not able to establish connection to node with addr: ", head.addr) log.Warnf("Not able to establish connection to node with addr: ", head.addr)
continue continue

View File

@ -162,7 +162,9 @@ var runCmd = &cli.Command{
log.Fatalf("Cannot register the view: %v", err) log.Fatalf("Cannot register the view: %v", err)
} }
api, closer, err := lcli.GetFullNodeAPIV1(cctx) subHnd := gateway.NewEthSubHandler()
api, closer, err := lcli.GetFullNodeAPIV1(cctx, cliutil.FullNodeWithEthSubscribtionHandler(subHnd))
if err != nil { if err != nil {
return err return err
} }
@ -195,7 +197,7 @@ var runCmd = &cli.Command{
return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err) return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err)
} }
gwapi := gateway.NewNode(api, lookbackCap, waitLookback, rateLimit, rateLimitTimeout) gwapi := gateway.NewNode(api, subHnd, lookbackCap, waitLookback, rateLimit, rateLimitTimeout)
h, err := gateway.Handler(gwapi, api, perConnRateLimit, connPerMinute, serverOptions...) h, err := gateway.Handler(gwapi, api, perConnRateLimit, connPerMinute, serverOptions...)
if err != nil { if err != nil {
return xerrors.Errorf("failed to set up gateway HTTP handler") return xerrors.Errorf("failed to set up gateway HTTP handler")

View File

@ -2520,40 +2520,7 @@ Perms: write
Inputs: Inputs:
```json ```json
[ [
[ "0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
] ]
``` ```
@ -2574,40 +2541,7 @@ Perms: write
Inputs: Inputs:
```json ```json
[ [
[ "0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
] ]
``` ```
@ -2881,43 +2815,7 @@ Perms: write
Inputs: `null` Inputs: `null`
Response: Response: `"0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"`
```json
[
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
```
### EthNewFilter ### EthNewFilter
Installs a persistent filter based on given filter spec. Installs a persistent filter based on given filter spec.
@ -2938,43 +2836,7 @@ Inputs:
] ]
``` ```
Response: Response: `"0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"`
```json
[
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
```
### EthNewPendingTransactionFilter ### EthNewPendingTransactionFilter
Installs a persistent filter to notify when new messages arrive in the message pool. Installs a persistent filter to notify when new messages arrive in the message pool.
@ -2984,43 +2846,7 @@ Perms: write
Inputs: `null` Inputs: `null`
Response: Response: `"0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"`
```json
[
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
```
### EthProtocolVersion ### EthProtocolVersion
@ -3060,57 +2886,11 @@ Perms: write
Inputs: Inputs:
```json ```json
[ [
"string value", "Bw=="
{
"topics": [
[
"0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"
]
]
}
] ]
``` ```
Response: Response: `"0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"`
```json
{
"subscription": [
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
],
"result": {}
}
```
### EthUninstallFilter ### EthUninstallFilter
Uninstalls a filter with given id. Uninstalls a filter with given id.
@ -3121,40 +2901,7 @@ Perms: write
Inputs: Inputs:
```json ```json
[ [
[ "0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
] ]
``` ```
@ -3169,40 +2916,7 @@ Perms: write
Inputs: Inputs:
```json ```json
[ [
[ "0x37690cfec6c1bf4c3b9288c7a5d783e98731e90b0a4c177c2a374c7a9427355e"
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
] ]
``` ```

71
gateway/eth_sub.go Normal file
View File

@ -0,0 +1,71 @@
package gateway
import (
"context"
"sync"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)
type EthSubHandler struct {
queued map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse
sinks map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error
lk sync.Mutex
}
func NewEthSubHandler() *EthSubHandler {
return &EthSubHandler{
queued: make(map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse),
sinks: make(map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error),
}
}
func (e *EthSubHandler) AddSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error {
e.lk.Lock()
defer e.lk.Unlock()
for _, p := range e.queued[id] {
p := p // copy
if err := sink(ctx, &p); err != nil {
return err
}
}
delete(e.queued, id)
e.sinks[id] = sink
return nil
}
func (e *EthSubHandler) RemoveSub(id ethtypes.EthSubscriptionID) {
e.lk.Lock()
defer e.lk.Unlock()
delete(e.sinks, id)
delete(e.queued, id)
}
func (e *EthSubHandler) EthSubscription(ctx context.Context, r jsonrpc.RawParams) error {
p, err := jsonrpc.DecodeParams[ethtypes.EthSubscriptionResponse](r)
if err != nil {
return err
}
e.lk.Lock()
sink := e.sinks[p.SubscriptionID]
if sink == nil {
e.queued[p.SubscriptionID] = append(e.queued[p.SubscriptionID], p)
e.lk.Unlock()
return nil
}
e.lk.Unlock()
return sink(ctx, &p) // todo track errors and auto-unsubscribe on rpc conn close?
}
var _ api.EthSubscriber = (*EthSubHandler)(nil)

View File

@ -27,14 +27,14 @@ const perConnLimiterKey perConnLimiterKeyType = "limiter"
type filterTrackerKeyType string type filterTrackerKeyType string
const filterTrackerKey filterTrackerKeyType = "filterTracker" const statefulCallTrackerKey filterTrackerKeyType = "statefulCallTracker"
// Handler returns a gateway http.Handler, to be mounted as-is on the server. // Handler returns a gateway http.Handler, to be mounted as-is on the server.
func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinute int64, opts ...jsonrpc.ServerOption) (http.Handler, error) { func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinute int64, opts ...jsonrpc.ServerOption) (http.Handler, error) {
m := mux.NewRouter() m := mux.NewRouter()
serveRpc := func(path string, hnd interface{}) { serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithServerErrors(lapi.RPCErrors))...) rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))...)
rpcServer.Register("Filecoin", hnd) rpcServer.Register("Filecoin", hnd)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover") rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")
@ -90,7 +90,7 @@ func (h RateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(context.WithValue(r.Context(), perConnLimiterKey, h.limiter)) r = r.WithContext(context.WithValue(r.Context(), perConnLimiterKey, h.limiter))
// also add a filter tracker to the context // also add a filter tracker to the context
r = r.WithContext(context.WithValue(r.Context(), filterTrackerKey, newFilterTracker())) r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, newStatefulCallTracker()))
h.handler.ServeHTTP(w, r) h.handler.ServeHTTP(w, r)
} }

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-state-types/network"
@ -117,7 +118,7 @@ type TargetAPI interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) EthSubscribe(ctx context.Context, params jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
} }
@ -125,6 +126,7 @@ var _ TargetAPI = *new(api.FullNode) // gateway depends on latest
type Node struct { type Node struct {
target TargetAPI target TargetAPI
subHnd *EthSubHandler
lookbackCap time.Duration lookbackCap time.Duration
stateWaitLookbackLimit abi.ChainEpoch stateWaitLookbackLimit abi.ChainEpoch
rateLimiter *rate.Limiter rateLimiter *rate.Limiter
@ -141,7 +143,7 @@ var (
) )
// NewNode creates a new gateway node. // NewNode creates a new gateway node.
func NewNode(api TargetAPI, lookbackCap time.Duration, stateWaitLookbackLimit abi.ChainEpoch, rateLimit int64, rateLimitTimeout time.Duration) *Node { func NewNode(api TargetAPI, sHnd *EthSubHandler, lookbackCap time.Duration, stateWaitLookbackLimit abi.ChainEpoch, rateLimit int64, rateLimitTimeout time.Duration) *Node {
var limit rate.Limit var limit rate.Limit
if rateLimit == 0 { if rateLimit == 0 {
limit = rate.Inf limit = rate.Inf
@ -150,6 +152,7 @@ func NewNode(api TargetAPI, lookbackCap time.Duration, stateWaitLookbackLimit ab
} }
return &Node{ return &Node{
target: api, target: api,
subHnd: sHnd,
lookbackCap: lookbackCap, lookbackCap: lookbackCap,
stateWaitLookbackLimit: stateWaitLookbackLimit, stateWaitLookbackLimit: stateWaitLookbackLimit,
rateLimiter: rate.NewLimiter(limit, stateRateLimitTokens), rateLimiter: rate.NewLimiter(limit, stateRateLimitTokens),

View File

@ -89,7 +89,7 @@ func TestGatewayAPIChainGetTipSetByHeight(t *testing.T) {
tt := tt tt := tt
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
mock := &mockGatewayDepsAPI{} mock := &mockGatewayDepsAPI{}
a := NewNode(mock, DefaultLookbackCap, DefaultStateWaitLookbackLimit, 0, time.Minute) a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, 0, time.Minute)
// Create tipsets from genesis up to tskh and return the highest // Create tipsets from genesis up to tskh and return the highest
ts := mock.createTipSets(tt.args.tskh, tt.args.genesisTS) ts := mock.createTipSets(tt.args.tskh, tt.args.genesisTS)
@ -245,7 +245,7 @@ func TestGatewayVersion(t *testing.T) {
//stm: @GATEWAY_NODE_GET_VERSION_001 //stm: @GATEWAY_NODE_GET_VERSION_001
ctx := context.Background() ctx := context.Background()
mock := &mockGatewayDepsAPI{} mock := &mockGatewayDepsAPI{}
a := NewNode(mock, DefaultLookbackCap, DefaultStateWaitLookbackLimit, 0, time.Minute) a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, 0, time.Minute)
v, err := a.Version(ctx) v, err := a.Version(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -256,7 +256,7 @@ func TestGatewayLimitTokensAvailable(t *testing.T) {
ctx := context.Background() ctx := context.Background()
mock := &mockGatewayDepsAPI{} mock := &mockGatewayDepsAPI{}
tokens := 3 tokens := 3
a := NewNode(mock, DefaultLookbackCap, DefaultStateWaitLookbackLimit, int64(tokens), time.Minute) a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, int64(tokens), time.Minute)
require.NoError(t, a.limit(ctx, tokens), "requests should not be limited when there are enough tokens available") require.NoError(t, a.limit(ctx, tokens), "requests should not be limited when there are enough tokens available")
} }
@ -264,7 +264,7 @@ func TestGatewayLimitTokensNotAvailable(t *testing.T) {
ctx := context.Background() ctx := context.Background()
mock := &mockGatewayDepsAPI{} mock := &mockGatewayDepsAPI{}
tokens := 3 tokens := 3
a := NewNode(mock, DefaultLookbackCap, DefaultStateWaitLookbackLimit, int64(1), time.Millisecond) a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, int64(1), time.Millisecond)
var err error var err error
// try to be rate limited // try to be rate limited
for i := 0; i <= 1000; i++ { for i := 0; i <= 1000; i++ {

View File

@ -3,12 +3,14 @@ package gateway
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"sync" "sync"
"time" "time"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
@ -352,7 +354,7 @@ func (gw *Node) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID
return nil, err return nil, err
} }
ft := filterTrackerFromContext(ctx) ft := statefulCallFromContext(ctx)
ft.lk.Lock() ft.lk.Lock()
_, ok := ft.userFilters[id] _, ok := ft.userFilters[id]
ft.lk.Unlock() ft.lk.Unlock()
@ -369,7 +371,7 @@ func (gw *Node) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID) (
return nil, err return nil, err
} }
ft := filterTrackerFromContext(ctx) ft := statefulCallFromContext(ctx)
ft.lk.Lock() ft.lk.Lock()
_, ok := ft.userFilters[id] _, ok := ft.userFilters[id]
ft.lk.Unlock() ft.lk.Unlock()
@ -417,7 +419,7 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID)
} }
// check if the filter belongs to this connection // check if the filter belongs to this connection
ft := filterTrackerFromContext(ctx) ft := statefulCallFromContext(ctx)
ft.lk.Lock() ft.lk.Lock()
defer ft.lk.Unlock() defer ft.lk.Unlock()
@ -434,18 +436,88 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID)
return ok, nil return ok, nil
} }
func (gw *Node) EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) { func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
return nil, xerrors.Errorf("not implemented") // validate params
_, err := jsonrpc.DecodeParams[ethtypes.EthSubscribeParams](p)
if err != nil {
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("decoding params: %w", err)
}
if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return ethtypes.EthSubscriptionID{}, err
}
if gw.subHnd == nil {
return ethtypes.EthSubscriptionID{}, xerrors.New("subscription support not enabled")
}
ethCb, ok := jsonrpc.ExtractReverseClient[api.EthSubscriberMethods](ctx)
if !ok {
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
}
ft := statefulCallFromContext(ctx)
ft.lk.Lock()
defer ft.lk.Unlock()
if len(ft.userSubscriptions) >= EthMaxFiltersPerConn {
return ethtypes.EthSubscriptionID{}, fmt.Errorf("too many subscriptions")
}
sub, err := gw.target.EthSubscribe(ctx, p)
if err != nil {
return ethtypes.EthSubscriptionID{}, err
}
err = gw.subHnd.AddSub(ctx, sub, func(ctx context.Context, response *ethtypes.EthSubscriptionResponse) error {
outParam, err := json.Marshal(response)
if err != nil {
return err
}
return ethCb.EthSubscription(ctx, outParam)
})
if err != nil {
return ethtypes.EthSubscriptionID{}, err
}
ft.userSubscriptions[sub] = time.Now()
return sub, err
} }
func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) { func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) {
return false, xerrors.Errorf("not implemented") if err := gw.limit(ctx, stateRateLimitTokens); err != nil {
return false, err
}
// check if the filter belongs to this connection
ft := statefulCallFromContext(ctx)
ft.lk.Lock()
defer ft.lk.Unlock()
if _, ok := ft.userSubscriptions[id]; !ok {
return false, nil
}
ok, err := gw.target.EthUnsubscribe(ctx, id)
if err != nil {
return false, err
}
delete(ft.userSubscriptions, id)
if gw.subHnd != nil {
gw.subHnd.RemoveSub(id)
}
return ok, nil
} }
var EthMaxFiltersPerConn = 16 // todo make this configurable var EthMaxFiltersPerConn = 16 // todo make this configurable
func addUserFilterLimited(ctx context.Context, cb func() (ethtypes.EthFilterID, error)) (ethtypes.EthFilterID, error) { func addUserFilterLimited(ctx context.Context, cb func() (ethtypes.EthFilterID, error)) (ethtypes.EthFilterID, error) {
ft := filterTrackerFromContext(ctx) ft := statefulCallFromContext(ctx)
ft.lk.Lock() ft.lk.Lock()
defer ft.lk.Unlock() defer ft.lk.Unlock()
@ -463,19 +535,21 @@ func addUserFilterLimited(ctx context.Context, cb func() (ethtypes.EthFilterID,
return id, nil return id, nil
} }
func filterTrackerFromContext(ctx context.Context) *filterTracker { func statefulCallFromContext(ctx context.Context) *statefulCallTracker {
return ctx.Value(filterTrackerKey).(*filterTracker) return ctx.Value(statefulCallTrackerKey).(*statefulCallTracker)
} }
type filterTracker struct { type statefulCallTracker struct {
lk sync.Mutex lk sync.Mutex
userFilters map[ethtypes.EthFilterID]time.Time userFilters map[ethtypes.EthFilterID]time.Time
userSubscriptions map[ethtypes.EthSubscriptionID]time.Time
} }
// called per request (ws connection) // called per request (ws connection)
func newFilterTracker() *filterTracker { func newStatefulCallTracker() *statefulCallTracker {
return &filterTracker{ return &statefulCallTracker{
userFilters: make(map[ethtypes.EthFilterID]time.Time), userFilters: make(map[ethtypes.EthFilterID]time.Time),
userSubscriptions: make(map[ethtypes.EthSubscriptionID]time.Time),
} }
} }

View File

@ -259,7 +259,7 @@ func generate(path, pkg, outpkg, outfile string) error {
if len(tf) != 2 { if len(tf) != 2 {
continue continue
} }
if tf[0] != "perm" { // todo: allow more tag types if tf[0] != "perm" && tf[0] != "rpc_method" && tf[0] != "notify" { // todo: allow more tag types
continue continue
} }
info.Methods[mname].Tags[tf[0]] = tf info.Methods[mname].Tags[tf[0]] = tf
@ -302,12 +302,14 @@ type {{.Num}}Struct struct {
{{range .Include}} {{range .Include}}
{{.}}Struct {{.}}Struct
{{end}} {{end}}
Internal struct { Internal {{.Num}}Methods
}
type {{.Num}}Methods struct {
{{range .Methods}} {{range .Methods}}
{{.Num}} func({{.NamedParams}}) ({{.Results}}) `+"`"+`{{range .Tags}}{{index . 0}}:"{{index . 1}}"{{end}}`+"`"+` {{.Num}} func({{.NamedParams}}) ({{.Results}}) `+"`"+`{{range .Tags}}{{index . 0}}:"{{index . 1}}"{{end}}`+"`"+`
{{end}} {{end}}
} }
}
type {{.Num}}Stub struct { type {{.Num}}Stub struct {
{{range .Include}} {{range .Include}}

2
go.mod
View File

@ -40,7 +40,7 @@ require (
github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.25.2 github.com/filecoin-project/go-fil-markets v1.25.2
github.com/filecoin-project/go-jsonrpc v0.1.9 github.com/filecoin-project/go-jsonrpc v0.2.0
github.com/filecoin-project/go-legs v0.4.4 github.com/filecoin-project/go-legs v0.4.4
github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4 github.com/filecoin-project/go-paramfetch v0.0.4

5
go.sum
View File

@ -340,8 +340,8 @@ github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0/go.mod h1:7aWZdaQ1b16BVoQUYR+
github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGyDjJjYSRX7hp/FGOStdqrWyDI= github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGyDjJjYSRX7hp/FGOStdqrWyDI=
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI=
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g=
github.com/filecoin-project/go-jsonrpc v0.1.9 h1:HRWLxo7HAWzI3xZGeFG4LZJoYpms+Q+8kwmMTLnyS3A= github.com/filecoin-project/go-jsonrpc v0.2.0 h1:lhcu0Oa7xxwdclU4EyBPgnZihiZOaTFiAnW6P3NRqko=
github.com/filecoin-project/go-jsonrpc v0.1.9/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= github.com/filecoin-project/go-jsonrpc v0.2.0/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/filecoin-project/go-legs v0.4.4 h1:mpMmAOOnamaz0CV9rgeKhEWA8j9kMC+f+UGCGrxKaZo= github.com/filecoin-project/go-legs v0.4.4 h1:mpMmAOOnamaz0CV9rgeKhEWA8j9kMC+f+UGCGrxKaZo=
github.com/filecoin-project/go-legs v0.4.4/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s= github.com/filecoin-project/go-legs v0.4.4/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak= github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
@ -824,7 +824,6 @@ github.com/ipfs/go-log/v2 v2.0.1/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBW
github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0=
github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw= github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
github.com/ipfs/go-log/v2 v2.0.8/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscwkWG+dw=
github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.2/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM= github.com/ipfs/go-log/v2 v2.1.2/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=

View File

@ -6,6 +6,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt" "fmt"
"os" "os"
"sort" "sort"
@ -21,6 +22,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
@ -29,6 +31,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes" "github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/itests/kit"
res "github.com/filecoin-project/lotus/lib/result"
) )
// SolidityContractDef holds information about one of the test contracts // SolidityContractDef holds information about one of the test contracts
@ -438,15 +441,15 @@ func TestEthSubscribeLogsNoTopicSpec(t *testing.T) {
t.Logf("actor ID address is %s", idAddr) t.Logf("actor ID address is %s", idAddr)
// install filter // install filter
respCh, err := client.EthSubscribe(ctx, "logs", nil) subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "logs"})).Assert(require.NoError))
require.NoError(err) require.NoError(err)
subResponses := []ethtypes.EthSubscriptionResponse{} var subResponses []ethtypes.EthSubscriptionResponse
go func() { err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
for resp := range respCh { subResponses = append(subResponses, *resp)
subResponses = append(subResponses, resp) return nil
} })
}() require.NoError(err)
const iterations = 10 const iterations = 10
ethContractAddr, messages := invokeLogFourData(t, client, iterations) ethContractAddr, messages := invokeLogFourData(t, client, iterations)
@ -567,44 +570,33 @@ func TestEthSubscribeLogs(t *testing.T) {
testResponses := map[string]chan ethtypes.EthSubscriptionResponse{} testResponses := map[string]chan ethtypes.EthSubscriptionResponse{}
// quit is used to signal that we're ready to start testing collected results
quit := make(chan struct{})
// Create all the filters // Create all the filters
for _, tc := range testCases { for _, tc := range testCases {
// subscribe to topics in filter // subscribe to topics in filter
subCh, err := client.EthSubscribe(ctx, "logs", &ethtypes.EthSubscriptionParams{Topics: tc.spec.Topics}) subParam, err := json.Marshal(ethtypes.EthSubscribeParams{
EventType: "logs",
Params: &ethtypes.EthSubscriptionParams{Topics: tc.spec.Topics},
})
require.NoError(err)
subId, err := client.EthSubscribe(ctx, subParam)
require.NoError(err) require.NoError(err)
responseCh := make(chan ethtypes.EthSubscriptionResponse, len(invocations)) responseCh := make(chan ethtypes.EthSubscriptionResponse, len(invocations))
testResponses[tc.name] = responseCh testResponses[tc.name] = responseCh
// start a goroutine to forward responses from subscription to a buffered channel with guaranteed capacity err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
go func(subCh <-chan ethtypes.EthSubscriptionResponse, responseCh chan<- ethtypes.EthSubscriptionResponse, quit chan struct{}) { responseCh <- *resp
defer func() { return nil
close(responseCh) })
}() require.NoError(err)
for {
select {
case resp := <-subCh:
responseCh <- resp
case <-quit:
return
case <-ctx.Done():
return
}
}
}(subCh, responseCh, quit)
} }
// Perform all the invocations // Perform all the invocations
messages := invokeAndWaitUntilAllOnChain(t, client, invocations) messages := invokeAndWaitUntilAllOnChain(t, client, invocations)
// wait a little for subscriptions to gather results and then tell all the goroutines to stop // wait a little for subscriptions to gather results
time.Sleep(blockTime * 6) time.Sleep(blockTime * 6)
close(quit)
for _, tc := range testCases { for _, tc := range testCases {
tc := tc // appease the lint despot tc := tc // appease the lint despot
@ -612,6 +604,9 @@ func TestEthSubscribeLogs(t *testing.T) {
responseCh, ok := testResponses[tc.name] responseCh, ok := testResponses[tc.name]
require.True(ok) require.True(ok)
// don't expect any more responses
close(responseCh)
var elogs []*ethtypes.EthLog var elogs []*ethtypes.EthLog
for resp := range responseCh { for resp := range responseCh {
rlist, ok := resp.Result.([]interface{}) rlist, ok := resp.Result.([]interface{})

View File

@ -290,7 +290,7 @@ func startNodes(
ens.InterconnectAll().BeginMining(blocktime) ens.InterconnectAll().BeginMining(blocktime)
// Create a gateway server in front of the full node // Create a gateway server in front of the full node
gwapi := gateway.NewNode(full, lookbackCap, stateWaitLookbackLimit, 0, time.Minute) gwapi := gateway.NewNode(full, nil, lookbackCap, stateWaitLookbackLimit, 0, time.Minute)
handler, err := gateway.Handler(gwapi, full, 0, 0) handler, err := gateway.Handler(gwapi, full, 0, 0)
require.NoError(t, err) require.NoError(t, err)

View File

@ -47,6 +47,7 @@ import (
"github.com/filecoin-project/lotus/chain/wallet/key" "github.com/filecoin-project/lotus/chain/wallet/key"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
"github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker"
"github.com/filecoin-project/lotus/gateway"
"github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/markets/idxprov" "github.com/filecoin-project/lotus/markets/idxprov"
"github.com/filecoin-project/lotus/markets/idxprov/idxprov_test" "github.com/filecoin-project/lotus/markets/idxprov/idxprov_test"
@ -210,7 +211,7 @@ func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble {
n.genesis.accounts = append(n.genesis.accounts, genacc) n.genesis.accounts = append(n.genesis.accounts, genacc)
} }
*full = TestFullNode{t: n.t, options: options, DefaultKey: key} *full = TestFullNode{t: n.t, options: options, DefaultKey: key, EthSubRouter: gateway.NewEthSubHandler()}
n.inactive.fullnodes = append(n.inactive.fullnodes, full) n.inactive.fullnodes = append(n.inactive.fullnodes, full)
return n return n

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet/key" "github.com/filecoin-project/lotus/chain/wallet/key"
cliutil "github.com/filecoin-project/lotus/cli/util" cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/gateway"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
) )
@ -46,6 +47,10 @@ type TestFullNode struct {
Stop node.StopFunc Stop node.StopFunc
// gateway handler makes it convenient to register callbalks per topic, so we
// also use it for tests
EthSubRouter *gateway.EthSubHandler
options nodeOpts options nodeOpts
} }

View File

@ -13,6 +13,8 @@ import (
manet "github.com/multiformats/go-multiaddr/net" manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
@ -52,7 +54,12 @@ func fullRpc(t *testing.T, f *TestFullNode) (*TestFullNode, Closer) {
fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String()) fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String()) sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String())
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil) rpcOpts := []jsonrpc.Option{
jsonrpc.WithClientHandler("Filecoin", f.EthSubRouter),
jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"),
}
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, rpcOpts...)
require.NoError(t, err) require.NoError(t, err)
f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl

View File

@ -30,6 +30,12 @@ func Wrap[T any](value T, err error) Result[T] {
} }
} }
func (r *Result[T]) Unwrap() (T, error) { func (r Result[T]) Unwrap() (T, error) {
return r.Value, r.Error return r.Value, r.Error
} }
func (r Result[T]) Assert(noErrFn func(err error, msgAndArgs ...interface{})) T {
noErrFn(r.Error)
return r.Value
}

View File

@ -3,6 +3,7 @@ package full
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
@ -16,6 +17,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
builtintypes "github.com/filecoin-project/go-state-types/builtin" builtintypes "github.com/filecoin-project/go-state-types/builtin"
@ -75,7 +77,7 @@ type EthEventAPI interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) EthSubscribe(ctx context.Context, params jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
} }
@ -132,6 +134,7 @@ type EthEvent struct {
FilterStore filter.FilterStore FilterStore filter.FilterStore
SubManager *EthSubscriptionManager SubManager *EthSubscriptionManager
MaxFilterHeightRange abi.ChainEpoch MaxFilterHeightRange abi.ChainEpoch
SubscribtionCtx context.Context
} }
var _ EthEventAPI = (*EthEvent)(nil) var _ EthEventAPI = (*EthEvent)(nil)
@ -1100,37 +1103,45 @@ const (
EthSubscribeEventTypeLogs = "logs" EthSubscribeEventTypeLogs = "logs"
) )
func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) { func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
if e.SubManager == nil { params, err := jsonrpc.DecodeParams[ethtypes.EthSubscribeParams](p)
return nil, api.ErrNotSupported
}
// Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the
// method to be "eth_subscription". This probably doesn't matter in practice.
sub, err := e.SubManager.StartSubscription(ctx)
if err != nil { if err != nil {
return nil, err return ethtypes.EthSubscriptionID{}, xerrors.Errorf("decoding params: %w", err)
} }
switch eventType { if e.SubManager == nil {
return ethtypes.EthSubscriptionID{}, api.ErrNotSupported
}
ethCb, ok := jsonrpc.ExtractReverseClient[api.EthSubscriberMethods](ctx)
if !ok {
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
}
sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription)
if err != nil {
return ethtypes.EthSubscriptionID{}, err
}
switch params.EventType {
case EthSubscribeEventTypeHeads: case EthSubscribeEventTypeHeads:
f, err := e.TipSetFilterManager.Install(ctx) f, err := e.TipSetFilterManager.Install(ctx)
if err != nil { if err != nil {
// clean up any previous filters added and stop the sub // clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id) _, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err return ethtypes.EthSubscriptionID{}, err
} }
sub.addFilter(ctx, f) sub.addFilter(ctx, f)
case EthSubscribeEventTypeLogs: case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{} keys := map[string][][]byte{}
if params != nil { if params.Params != nil {
var err error var err error
keys, err = parseEthTopics(params.Topics) keys, err = parseEthTopics(params.Params.Topics)
if err != nil { if err != nil {
// clean up any previous filters added and stop the sub // clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id) _, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err return ethtypes.EthSubscriptionID{}, err
} }
} }
@ -1138,14 +1149,14 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
if err != nil { if err != nil {
// clean up any previous filters added and stop the sub // clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id) _, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err return ethtypes.EthSubscriptionID{}, err
} }
sub.addFilter(ctx, f) sub.addFilter(ctx, f)
default: default:
return nil, xerrors.Errorf("unsupported event type: %s", eventType) return ethtypes.EthSubscriptionID{}, xerrors.Errorf("unsupported event type: %s", params.EventType)
} }
return sub.out, nil return sub.id, nil
} }
func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) { func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) {
@ -1294,7 +1305,7 @@ type EthSubscriptionManager struct {
subs map[ethtypes.EthSubscriptionID]*ethSubscription subs map[ethtypes.EthSubscriptionID]*ethSubscription
} }
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { // nolint func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback) (*ethSubscription, error) { // nolint
rawid, err := uuid.NewRandom() rawid, err := uuid.NewRandom()
if err != nil { if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err) return nil, xerrors.Errorf("new uuid: %w", err)
@ -1310,7 +1321,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
ChainAPI: e.ChainAPI, ChainAPI: e.ChainAPI,
id: id, id: id,
in: make(chan interface{}, 200), in: make(chan interface{}, 200),
out: make(chan ethtypes.EthSubscriptionResponse, 20), out: out,
quit: quit, quit: quit,
} }
@ -1340,13 +1351,15 @@ func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtyp
return sub.filters, nil return sub.filters, nil
} }
type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error
type ethSubscription struct { type ethSubscription struct {
Chain *store.ChainStore Chain *store.ChainStore
StateAPI StateAPI StateAPI StateAPI
ChainAPI ChainAPI ChainAPI ChainAPI
id ethtypes.EthSubscriptionID id ethtypes.EthSubscriptionID
in chan interface{} in chan interface{}
out chan ethtypes.EthSubscriptionResponse out ethSubscriptionCallback
mu sync.Mutex mu sync.Mutex
filters []filter.Filter filters []filter.Filter
@ -1390,10 +1403,15 @@ func (e *ethSubscription) start(ctx context.Context) {
continue continue
} }
select { outParam, err := json.Marshal(resp)
case e.out <- resp: if err != nil {
default: log.Warnw("marshaling subscription response", "sub", e.id, "error", err)
// Skip if client is not reading responses continue
}
if err := e.out(ctx, outParam); err != nil {
log.Warnw("sending subscription response", "sub", e.id, "error", err)
continue
} }
} }
} }
@ -1405,7 +1423,6 @@ func (e *ethSubscription) stop() {
if e.quit != nil { if e.quit != nil {
e.quit() e.quit()
close(e.out)
e.quit = nil e.quit = nil
} }
} }

View File

@ -40,6 +40,7 @@ func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo
ee := &full.EthEvent{ ee := &full.EthEvent{
Chain: cs, Chain: cs,
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange), MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
SubscribtionCtx: ctx,
} }
if !cfg.EnableEthRPC || cfg.Events.DisableRealTimeFilterAPI { if !cfg.EnableEthRPC || cfg.Events.DisableRealTimeFilterAPI {

View File

@ -75,7 +75,7 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
m := mux.NewRouter() m := mux.NewRouter()
serveRpc := func(path string, hnd interface{}) { serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithServerErrors(api.RPCErrors))...) rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithReverseClient[api.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(api.RPCErrors))...)
rpcServer.Register("Filecoin", hnd) rpcServer.Register("Filecoin", hnd)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover") rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")