rpc: Switch eth_subscribe to reverse calls

This commit is contained in:
Łukasz Magiera 2023-01-16 14:54:26 +01:00
parent af72e6f6ac
commit 6491becbe1
9 changed files with 881 additions and 814 deletions

View File

@ -6,6 +6,8 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/filecoin-project/go-jsonrpc"
"github.com/google/uuid" "github.com/google/uuid"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -831,7 +833,7 @@ type FullNode interface {
// - logs: notify new event logs that match a criteria // - logs: notify new event logs that match a criteria
// params contains additional parameters used with the log event type // params contains additional parameters used with the log event type
// The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called. // The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called.
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) //perm:write EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error) //perm:write
// Unsubscribe from a websocket subscription // Unsubscribe from a websocket subscription
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) //perm:write EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) //perm:write
@ -849,6 +851,12 @@ type FullNode interface {
RaftLeader(ctx context.Context) (peer.ID, error) //perm:read RaftLeader(ctx context.Context) (peer.ID, error) //perm:read
} }
// reverse interface to the client, called after EthSubscribe
type EthSubscriber interface {
// note: the parameter is ethtypes.EthSubscriptionResponse serialized as json object
EthSubscription(ctx context.Context, r jsonrpc.RawParams) error //rpc_method:eth_subscription notify:true
}
type StorageAsk struct { type StorageAsk struct {
Response *storagemarket.StorageAsk Response *storagemarket.StorageAsk

View File

@ -7,21 +7,13 @@ import (
"encoding/json" "encoding/json"
"time" "time"
"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v8/paych" "github.com/filecoin-project/go-state-types/builtin/v8/paych"
@ -31,7 +23,6 @@ import (
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
abinetwork "github.com/filecoin-project/go-state-types/network" abinetwork "github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/go-state-types/proof" "github.com/filecoin-project/go-state-types/proof"
apitypes "github.com/filecoin-project/lotus/api/types" apitypes "github.com/filecoin-project/lotus/api/types"
builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin" builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -44,25 +35,38 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"golang.org/x/xerrors"
) )
var ErrNotSupported = xerrors.New("method not supported") var ErrNotSupported = xerrors.New("method not supported")
type ChainIOStruct struct { type ChainIOStruct struct {
Internal struct { Internal ChainIOMethods
}
type ChainIOMethods struct {
ChainHasObj func(p0 context.Context, p1 cid.Cid) (bool, error) `` ChainHasObj func(p0 context.Context, p1 cid.Cid) (bool, error) ``
ChainPutObj func(p0 context.Context, p1 blocks.Block) error `` ChainPutObj func(p0 context.Context, p1 blocks.Block) error ``
ChainReadObj func(p0 context.Context, p1 cid.Cid) ([]byte, error) `` ChainReadObj func(p0 context.Context, p1 cid.Cid) ([]byte, error) ``
} }
}
type ChainIOStub struct { type ChainIOStub struct {
} }
type CommonStruct struct { type CommonStruct struct {
Internal struct { Internal CommonMethods
}
type CommonMethods struct {
AuthNew func(p0 context.Context, p1 []auth.Permission) ([]byte, error) `perm:"admin"` AuthNew func(p0 context.Context, p1 []auth.Permission) ([]byte, error) `perm:"admin"`
AuthVerify func(p0 context.Context, p1 string) ([]auth.Permission, error) `perm:"read"` AuthVerify func(p0 context.Context, p1 string) ([]auth.Permission, error) `perm:"read"`
@ -85,7 +89,6 @@ type CommonStruct struct {
Version func(p0 context.Context) (APIVersion, error) `perm:"read"` Version func(p0 context.Context) (APIVersion, error) `perm:"read"`
} }
}
type CommonStub struct { type CommonStub struct {
} }
@ -95,8 +98,10 @@ type CommonNetStruct struct {
NetStruct NetStruct
Internal struct { Internal CommonNetMethods
} }
type CommonNetMethods struct {
} }
type CommonNetStub struct { type CommonNetStub struct {
@ -105,12 +110,26 @@ type CommonNetStub struct {
NetStub NetStub
} }
type EthSubscriberStruct struct {
Internal EthSubscriberMethods
}
type EthSubscriberMethods struct {
EthSubscription func(p0 context.Context, p1 jsonrpc.RawParams) error `notify:"true"rpc_method:"eth_subscription"`
}
type EthSubscriberStub struct {
}
type FullNodeStruct struct { type FullNodeStruct struct {
CommonStruct CommonStruct
NetStruct NetStruct
Internal struct { Internal FullNodeMethods
}
type FullNodeMethods struct {
ChainBlockstoreInfo func(p0 context.Context) (map[string]interface{}, error) `perm:"read"` ChainBlockstoreInfo func(p0 context.Context) (map[string]interface{}, error) `perm:"read"`
ChainCheckBlockstore func(p0 context.Context) error `perm:"admin"` ChainCheckBlockstore func(p0 context.Context) error `perm:"admin"`
@ -281,7 +300,7 @@ type FullNodeStruct struct {
EthSendRawTransaction func(p0 context.Context, p1 ethtypes.EthBytes) (ethtypes.EthHash, error) `perm:"read"` EthSendRawTransaction func(p0 context.Context, p1 ethtypes.EthBytes) (ethtypes.EthHash, error) `perm:"read"`
EthSubscribe func(p0 context.Context, p1 string, p2 *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) `perm:"write"` EthSubscribe func(p0 context.Context, p1 string, p2 *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error) `perm:"write"`
EthUninstallFilter func(p0 context.Context, p1 ethtypes.EthFilterID) (bool, error) `perm:"write"` EthUninstallFilter func(p0 context.Context, p1 ethtypes.EthFilterID) (bool, error) `perm:"write"`
@ -585,7 +604,6 @@ type FullNodeStruct struct {
Web3ClientVersion func(p0 context.Context) (string, error) `perm:"read"` Web3ClientVersion func(p0 context.Context) (string, error) `perm:"read"`
} }
}
type FullNodeStub struct { type FullNodeStub struct {
CommonStub CommonStub
@ -594,7 +612,10 @@ type FullNodeStub struct {
} }
type GatewayStruct struct { type GatewayStruct struct {
Internal struct { Internal GatewayMethods
}
type GatewayMethods struct {
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) `` ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) ``
ChainGetGenesis func(p0 context.Context) (*types.TipSet, error) `` ChainGetGenesis func(p0 context.Context) (*types.TipSet, error) ``
@ -739,13 +760,15 @@ type GatewayStruct struct {
WalletBalance func(p0 context.Context, p1 address.Address) (types.BigInt, error) `` WalletBalance func(p0 context.Context, p1 address.Address) (types.BigInt, error) ``
} }
}
type GatewayStub struct { type GatewayStub struct {
} }
type NetStruct struct { type NetStruct struct {
Internal struct { Internal NetMethods
}
type NetMethods struct {
ID func(p0 context.Context) (peer.ID, error) `perm:"read"` ID func(p0 context.Context) (peer.ID, error) `perm:"read"`
NetAddrsListen func(p0 context.Context) (peer.AddrInfo, error) `perm:"read"` NetAddrsListen func(p0 context.Context) (peer.AddrInfo, error) `perm:"read"`
@ -794,15 +817,16 @@ type NetStruct struct {
NetStat func(p0 context.Context, p1 string) (NetStat, error) `perm:"read"` NetStat func(p0 context.Context, p1 string) (NetStat, error) `perm:"read"`
} }
}
type NetStub struct { type NetStub struct {
} }
type SignableStruct struct { type SignableStruct struct {
Internal struct { Internal SignableMethods
Sign func(p0 context.Context, p1 SignFunc) error ``
} }
type SignableMethods struct {
Sign func(p0 context.Context, p1 SignFunc) error ``
} }
type SignableStub struct { type SignableStub struct {
@ -813,7 +837,10 @@ type StorageMinerStruct struct {
NetStruct NetStruct
Internal struct { Internal StorageMinerMethods
}
type StorageMinerMethods struct {
ActorAddress func(p0 context.Context) (address.Address, error) `perm:"read"` ActorAddress func(p0 context.Context) (address.Address, error) `perm:"read"`
ActorAddressConfig func(p0 context.Context) (AddressConfig, error) `perm:"read"` ActorAddressConfig func(p0 context.Context) (AddressConfig, error) `perm:"read"`
@ -1078,7 +1105,6 @@ type StorageMinerStruct struct {
WorkerStats func(p0 context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"` WorkerStats func(p0 context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"`
} }
}
type StorageMinerStub struct { type StorageMinerStub struct {
CommonStub CommonStub
@ -1087,7 +1113,10 @@ type StorageMinerStub struct {
} }
type WalletStruct struct { type WalletStruct struct {
Internal struct { Internal WalletMethods
}
type WalletMethods struct {
WalletDelete func(p0 context.Context, p1 address.Address) error `perm:"admin"` WalletDelete func(p0 context.Context, p1 address.Address) error `perm:"admin"`
WalletExport func(p0 context.Context, p1 address.Address) (*types.KeyInfo, error) `perm:"admin"` WalletExport func(p0 context.Context, p1 address.Address) (*types.KeyInfo, error) `perm:"admin"`
@ -1102,13 +1131,15 @@ type WalletStruct struct {
WalletSign func(p0 context.Context, p1 address.Address, p2 []byte, p3 MsgMeta) (*crypto.Signature, error) `perm:"admin"` WalletSign func(p0 context.Context, p1 address.Address, p2 []byte, p3 MsgMeta) (*crypto.Signature, error) `perm:"admin"`
} }
}
type WalletStub struct { type WalletStub struct {
} }
type WorkerStruct struct { type WorkerStruct struct {
Internal struct { Internal WorkerMethods
}
type WorkerMethods struct {
AddPiece func(p0 context.Context, p1 storiface.SectorRef, p2 []abi.UnpaddedPieceSize, p3 abi.UnpaddedPieceSize, p4 storiface.Data) (storiface.CallID, error) `perm:"admin"` AddPiece func(p0 context.Context, p1 storiface.SectorRef, p2 []abi.UnpaddedPieceSize, p3 abi.UnpaddedPieceSize, p4 storiface.Data) (storiface.CallID, error) `perm:"admin"`
DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data) (storiface.CallID, error) `perm:"admin"` DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data) (storiface.CallID, error) `perm:"admin"`
@ -1183,7 +1214,6 @@ type WorkerStruct struct {
WaitQuiet func(p0 context.Context) error `perm:"admin"` WaitQuiet func(p0 context.Context) error `perm:"admin"`
} }
}
type WorkerStub struct { type WorkerStub struct {
} }
@ -1342,6 +1372,17 @@ func (s *CommonStub) Version(p0 context.Context) (APIVersion, error) {
return *new(APIVersion), ErrNotSupported return *new(APIVersion), ErrNotSupported
} }
func (s *EthSubscriberStruct) EthSubscription(p0 context.Context, p1 jsonrpc.RawParams) error {
if s.Internal.EthSubscription == nil {
return ErrNotSupported
}
return s.Internal.EthSubscription(p0, p1)
}
func (s *EthSubscriberStub) EthSubscription(p0 context.Context, p1 jsonrpc.RawParams) error {
return ErrNotSupported
}
func (s *FullNodeStruct) ChainBlockstoreInfo(p0 context.Context) (map[string]interface{}, error) { func (s *FullNodeStruct) ChainBlockstoreInfo(p0 context.Context) (map[string]interface{}, error) {
if s.Internal.ChainBlockstoreInfo == nil { if s.Internal.ChainBlockstoreInfo == nil {
return *new(map[string]interface{}), ErrNotSupported return *new(map[string]interface{}), ErrNotSupported
@ -2277,15 +2318,15 @@ func (s *FullNodeStub) EthSendRawTransaction(p0 context.Context, p1 ethtypes.Eth
return *new(ethtypes.EthHash), ErrNotSupported return *new(ethtypes.EthHash), ErrNotSupported
} }
func (s *FullNodeStruct) EthSubscribe(p0 context.Context, p1 string, p2 *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) { func (s *FullNodeStruct) EthSubscribe(p0 context.Context, p1 string, p2 *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error) {
if s.Internal.EthSubscribe == nil { if s.Internal.EthSubscribe == nil {
return nil, ErrNotSupported return *new(ethtypes.EthSubscriptionID), ErrNotSupported
} }
return s.Internal.EthSubscribe(p0, p1, p2) return s.Internal.EthSubscribe(p0, p1, p2)
} }
func (s *FullNodeStub) EthSubscribe(p0 context.Context, p1 string, p2 *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) { func (s *FullNodeStub) EthSubscribe(p0 context.Context, p1 string, p2 *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error) {
return nil, ErrNotSupported return *new(ethtypes.EthSubscriptionID), ErrNotSupported
} }
func (s *FullNodeStruct) EthUninstallFilter(p0 context.Context, p1 ethtypes.EthFilterID) (bool, error) { func (s *FullNodeStruct) EthUninstallFilter(p0 context.Context, p1 ethtypes.EthFilterID) (bool, error) {
@ -6955,6 +6996,7 @@ func (s *WorkerStub) WaitQuiet(p0 context.Context) error {
var _ ChainIO = new(ChainIOStruct) var _ ChainIO = new(ChainIOStruct)
var _ Common = new(CommonStruct) var _ Common = new(CommonStruct)
var _ CommonNet = new(CommonNetStruct) var _ CommonNet = new(CommonNetStruct)
var _ EthSubscriber = new(EthSubscriberStruct)
var _ FullNode = new(FullNodeStruct) var _ FullNode = new(FullNodeStruct)
var _ Gateway = new(GatewayStruct) var _ Gateway = new(GatewayStruct)
var _ Net = new(NetStruct) var _ Net = new(NetStruct)

View File

@ -5,11 +5,6 @@ package v0api
import ( import (
"context" "context"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
@ -22,7 +17,6 @@ import (
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
abinetwork "github.com/filecoin-project/go-state-types/network" abinetwork "github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
apitypes "github.com/filecoin-project/lotus/api/types" apitypes "github.com/filecoin-project/lotus/api/types"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -30,6 +24,10 @@ import (
marketevents "github.com/filecoin-project/lotus/markets/loggers" marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports" "github.com/filecoin-project/lotus/node/repo/imports"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"golang.org/x/xerrors"
) )
var ErrNotSupported = xerrors.New("method not supported") var ErrNotSupported = xerrors.New("method not supported")
@ -39,7 +37,10 @@ type FullNodeStruct struct {
NetStruct NetStruct
Internal struct { Internal FullNodeMethods
}
type FullNodeMethods struct {
BeaconGetEntry func(p0 context.Context, p1 abi.ChainEpoch) (*types.BeaconEntry, error) `perm:"read"` BeaconGetEntry func(p0 context.Context, p1 abi.ChainEpoch) (*types.BeaconEntry, error) `perm:"read"`
ChainDeleteObj func(p0 context.Context, p1 cid.Cid) error `perm:"admin"` ChainDeleteObj func(p0 context.Context, p1 cid.Cid) error `perm:"admin"`
@ -416,7 +417,6 @@ type FullNodeStruct struct {
WalletVerify func(p0 context.Context, p1 address.Address, p2 []byte, p3 *crypto.Signature) (bool, error) `perm:"read"` WalletVerify func(p0 context.Context, p1 address.Address, p2 []byte, p3 *crypto.Signature) (bool, error) `perm:"read"`
} }
}
type FullNodeStub struct { type FullNodeStub struct {
CommonStub CommonStub
@ -425,7 +425,10 @@ type FullNodeStub struct {
} }
type GatewayStruct struct { type GatewayStruct struct {
Internal struct { Internal GatewayMethods
}
type GatewayMethods struct {
ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) `` ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*api.BlockMessages, error) ``
ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) `` ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) ``
@ -490,7 +493,6 @@ type GatewayStruct struct {
WalletBalance func(p0 context.Context, p1 address.Address) (types.BigInt, error) `` WalletBalance func(p0 context.Context, p1 address.Address) (types.BigInt, error) ``
} }
}
type GatewayStub struct { type GatewayStub struct {
} }

View File

@ -34,7 +34,7 @@ func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinu
m := mux.NewRouter() m := mux.NewRouter()
serveRpc := func(path string, hnd interface{}) { serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithServerErrors(lapi.RPCErrors))...) rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))...)
rpcServer.Register("Filecoin", hnd) rpcServer.Register("Filecoin", hnd)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover") rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")

View File

@ -259,7 +259,7 @@ func generate(path, pkg, outpkg, outfile string) error {
if len(tf) != 2 { if len(tf) != 2 {
continue continue
} }
if tf[0] != "perm" { // todo: allow more tag types if tf[0] != "perm" && tf[0] != "rpc_method" && tf[0] != "notify" { // todo: allow more tag types
continue continue
} }
info.Methods[mname].Tags[tf[0]] = tf info.Methods[mname].Tags[tf[0]] = tf
@ -302,12 +302,14 @@ type {{.Num}}Struct struct {
{{range .Include}} {{range .Include}}
{{.}}Struct {{.}}Struct
{{end}} {{end}}
Internal struct { Internal {{.Num}}Methods
}
type {{.Num}}Methods struct {
{{range .Methods}} {{range .Methods}}
{{.Num}} func({{.NamedParams}}) ({{.Results}}) `+"`"+`{{range .Tags}}{{index . 0}}:"{{index . 1}}"{{end}}`+"`"+` {{.Num}} func({{.NamedParams}}) ({{.Results}}) `+"`"+`{{range .Tags}}{{index . 0}}:"{{index . 1}}"{{end}}`+"`"+`
{{end}} {{end}}
} }
}
type {{.Num}}Stub struct { type {{.Num}}Stub struct {
{{range .Include}} {{range .Include}}

2
go.mod
View File

@ -40,7 +40,7 @@ require (
github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.25.2 github.com/filecoin-project/go-fil-markets v1.25.2
github.com/filecoin-project/go-jsonrpc v0.1.9 github.com/filecoin-project/go-jsonrpc v0.1.10-0.20230116095704-da92324a76e1
github.com/filecoin-project/go-legs v0.4.4 github.com/filecoin-project/go-legs v0.4.4
github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.4 github.com/filecoin-project/go-paramfetch v0.0.4

2
go.sum
View File

@ -342,6 +342,8 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AG
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g=
github.com/filecoin-project/go-jsonrpc v0.1.9 h1:HRWLxo7HAWzI3xZGeFG4LZJoYpms+Q+8kwmMTLnyS3A= github.com/filecoin-project/go-jsonrpc v0.1.9 h1:HRWLxo7HAWzI3xZGeFG4LZJoYpms+Q+8kwmMTLnyS3A=
github.com/filecoin-project/go-jsonrpc v0.1.9/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= github.com/filecoin-project/go-jsonrpc v0.1.9/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
github.com/filecoin-project/go-jsonrpc v0.1.10-0.20230116095704-da92324a76e1 h1:GcF3gSvesv1epB2SrTfalrYvhFzT1UnmSS1Bh+jprQY=
github.com/filecoin-project/go-jsonrpc v0.1.10-0.20230116095704-da92324a76e1/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
github.com/filecoin-project/go-legs v0.4.4 h1:mpMmAOOnamaz0CV9rgeKhEWA8j9kMC+f+UGCGrxKaZo= github.com/filecoin-project/go-legs v0.4.4 h1:mpMmAOOnamaz0CV9rgeKhEWA8j9kMC+f+UGCGrxKaZo=
github.com/filecoin-project/go-legs v0.4.4/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s= github.com/filecoin-project/go-legs v0.4.4/go.mod h1:JQ3hA6xpJdbR8euZ2rO0jkxaMxeidXf0LDnVuqPAe9s=
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak= github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=

View File

@ -3,8 +3,10 @@ package full
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/go-jsonrpc"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -75,7 +77,7 @@ type EthEventAPI interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
} }
@ -1100,16 +1102,19 @@ const (
EthSubscribeEventTypeLogs = "logs" EthSubscribeEventTypeLogs = "logs"
) )
func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error) { func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error) {
if e.SubManager == nil { if e.SubManager == nil {
return nil, api.ErrNotSupported return ethtypes.EthSubscriptionID{}, api.ErrNotSupported
} }
// Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the
// method to be "eth_subscription". This probably doesn't matter in practice.
sub, err := e.SubManager.StartSubscription(ctx) ethCb, ok := jsonrpc.ExtractReverseClient[api.EthSubscriberMethods](ctx)
if !ok {
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
}
sub, err := e.SubManager.StartSubscription(ctx, ethCb.EthSubscription)
if err != nil { if err != nil {
return nil, err return ethtypes.EthSubscriptionID{}, err
} }
switch eventType { switch eventType {
@ -1118,7 +1123,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
if err != nil { if err != nil {
// clean up any previous filters added and stop the sub // clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id) _, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err return ethtypes.EthSubscriptionID{}, err
} }
sub.addFilter(ctx, f) sub.addFilter(ctx, f)
@ -1138,14 +1143,14 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
if err != nil { if err != nil {
// clean up any previous filters added and stop the sub // clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id) _, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err return ethtypes.EthSubscriptionID{}, err
} }
sub.addFilter(ctx, f) sub.addFilter(ctx, f)
default: default:
return nil, xerrors.Errorf("unsupported event type: %s", eventType) return ethtypes.EthSubscriptionID{}, xerrors.Errorf("unsupported event type: %s", eventType)
} }
return sub.out, nil return sub.id, nil
} }
func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) { func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error) {
@ -1294,7 +1299,7 @@ type EthSubscriptionManager struct {
subs map[ethtypes.EthSubscriptionID]*ethSubscription subs map[ethtypes.EthSubscriptionID]*ethSubscription
} }
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { // nolint func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback) (*ethSubscription, error) { // nolint
rawid, err := uuid.NewRandom() rawid, err := uuid.NewRandom()
if err != nil { if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err) return nil, xerrors.Errorf("new uuid: %w", err)
@ -1310,7 +1315,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
ChainAPI: e.ChainAPI, ChainAPI: e.ChainAPI,
id: id, id: id,
in: make(chan interface{}, 200), in: make(chan interface{}, 200),
out: make(chan ethtypes.EthSubscriptionResponse, 20), out: out,
quit: quit, quit: quit,
} }
@ -1340,13 +1345,15 @@ func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtyp
return sub.filters, nil return sub.filters, nil
} }
type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error
type ethSubscription struct { type ethSubscription struct {
Chain *store.ChainStore Chain *store.ChainStore
StateAPI StateAPI StateAPI StateAPI
ChainAPI ChainAPI ChainAPI ChainAPI
id ethtypes.EthSubscriptionID id ethtypes.EthSubscriptionID
in chan interface{} in chan interface{}
out chan ethtypes.EthSubscriptionResponse out ethSubscriptionCallback
mu sync.Mutex mu sync.Mutex
filters []filter.Filter filters []filter.Filter
@ -1390,10 +1397,15 @@ func (e *ethSubscription) start(ctx context.Context) {
continue continue
} }
select { outParam, err := json.Marshal(resp)
case e.out <- resp: if err != nil {
default: log.Warnw("marshaling subscription response", "sub", e.id, "error", err)
// Skip if client is not reading responses continue
}
if err := e.out(ctx, outParam); err != nil {
log.Warnw("sending subscription response", "sub", e.id, "error", err)
continue
} }
} }
} }
@ -1405,7 +1417,6 @@ func (e *ethSubscription) stop() {
if e.quit != nil { if e.quit != nil {
e.quit() e.quit()
close(e.out)
e.quit = nil e.quit = nil
} }
} }

View File

@ -75,7 +75,7 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
m := mux.NewRouter() m := mux.NewRouter()
serveRpc := func(path string, hnd interface{}) { serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithServerErrors(api.RPCErrors))...) rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithReverseClient[api.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(api.RPCErrors))...)
rpcServer.Register("Filecoin", hnd) rpcServer.Register("Filecoin", hnd)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover") rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")