itests: Fix TestEthSubscribeLogs

This commit is contained in:
Łukasz Magiera 2023-01-26 15:20:49 +01:00
parent 1286d76988
commit 965b1cf03c
9 changed files with 31 additions and 15 deletions

View File

@ -359,7 +359,7 @@ func GetFullNodeAPIV1(ctx *cli.Context, opts ...GetFullNodeOption) (v1api.FullNo
var closers []jsonrpc.ClientCloser var closers []jsonrpc.ClientCloser
for _, head := range heads { for _, head := range heads {
v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header) v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header, rpcOpts...)
if err != nil { if err != nil {
log.Warnf("Not able to establish connection to node with addr: ", head.addr) log.Warnf("Not able to establish connection to node with addr: ", head.addr)
continue continue

View File

@ -24,11 +24,12 @@ func NewEthSubHandler() *EthSubHandler {
} }
} }
func (e *EthSubHandler) addSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error { func (e *EthSubHandler) AddSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error {
e.lk.Lock() e.lk.Lock()
defer e.lk.Unlock() defer e.lk.Unlock()
for _, p := range e.queued[id] { for _, p := range e.queued[id] {
p := p // copy
if err := sink(ctx, &p); err != nil { if err := sink(ctx, &p); err != nil {
return err return err
} }
@ -38,7 +39,7 @@ func (e *EthSubHandler) addSub(ctx context.Context, id ethtypes.EthSubscriptionI
return nil return nil
} }
func (e *EthSubHandler) removeSub(id ethtypes.EthSubscriptionID) { func (e *EthSubHandler) RemoveSub(id ethtypes.EthSubscriptionID) {
e.lk.Lock() e.lk.Lock()
defer e.lk.Unlock() defer e.lk.Unlock()

View File

@ -463,7 +463,7 @@ func (gw *Node) EthSubscribe(ctx context.Context, eventType string, params *etht
return ethtypes.EthSubscriptionID{}, err return ethtypes.EthSubscriptionID{}, err
} }
err = gw.subHnd.addSub(ctx, sub, func(ctx context.Context, response *ethtypes.EthSubscriptionResponse) error { err = gw.subHnd.AddSub(ctx, sub, func(ctx context.Context, response *ethtypes.EthSubscriptionResponse) error {
outParam, err := json.Marshal(response) outParam, err := json.Marshal(response)
if err != nil { if err != nil {
return err return err
@ -502,7 +502,7 @@ func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionI
delete(ft.userSubscriptions, id) delete(ft.userSubscriptions, id)
if gw.subHnd != nil { if gw.subHnd != nil {
gw.subHnd.removeSub(id) gw.subHnd.RemoveSub(id)
} }
return ok, nil return ok, nil

View File

@ -438,15 +438,15 @@ func TestEthSubscribeLogsNoTopicSpec(t *testing.T) {
t.Logf("actor ID address is %s", idAddr) t.Logf("actor ID address is %s", idAddr)
// install filter // install filter
respCh, err := client.EthSubscribe(ctx, "logs", nil) subId, err := client.EthSubscribe(ctx, "logs", nil)
require.NoError(err) require.NoError(err)
subResponses := []ethtypes.EthSubscriptionResponse{} var subResponses []ethtypes.EthSubscriptionResponse
go func() { err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
for resp := range respCh { subResponses = append(subResponses, *resp)
subResponses = append(subResponses, resp) return nil
} })
}() require.NoError(err)
const iterations = 10 const iterations = 10
ethContractAddr, messages := invokeLogFourData(t, client, iterations) ethContractAddr, messages := invokeLogFourData(t, client, iterations)

View File

@ -47,6 +47,7 @@ import (
"github.com/filecoin-project/lotus/chain/wallet/key" "github.com/filecoin-project/lotus/chain/wallet/key"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
"github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker"
"github.com/filecoin-project/lotus/gateway"
"github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/markets/idxprov" "github.com/filecoin-project/lotus/markets/idxprov"
"github.com/filecoin-project/lotus/markets/idxprov/idxprov_test" "github.com/filecoin-project/lotus/markets/idxprov/idxprov_test"
@ -210,7 +211,7 @@ func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble {
n.genesis.accounts = append(n.genesis.accounts, genacc) n.genesis.accounts = append(n.genesis.accounts, genacc)
} }
*full = TestFullNode{t: n.t, options: options, DefaultKey: key} *full = TestFullNode{t: n.t, options: options, DefaultKey: key, EthSubRouter: gateway.NewEthSubHandler()}
n.inactive.fullnodes = append(n.inactive.fullnodes, full) n.inactive.fullnodes = append(n.inactive.fullnodes, full)
return n return n

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet/key" "github.com/filecoin-project/lotus/chain/wallet/key"
cliutil "github.com/filecoin-project/lotus/cli/util" cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/gateway"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
) )
@ -46,6 +47,10 @@ type TestFullNode struct {
Stop node.StopFunc Stop node.StopFunc
// gateway handler makes it convenient to register callbalks per topic, so we
// also use it for tests
EthSubRouter *gateway.EthSubHandler
options nodeOpts options nodeOpts
} }

View File

@ -13,6 +13,8 @@ import (
manet "github.com/multiformats/go-multiaddr/net" manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
@ -52,7 +54,12 @@ func fullRpc(t *testing.T, f *TestFullNode) (*TestFullNode, Closer) {
fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String()) fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String()) sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String())
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil) rpcOpts := []jsonrpc.Option{
jsonrpc.WithClientHandler("Filecoin", f.EthSubRouter),
jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"),
}
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, rpcOpts...)
require.NoError(t, err) require.NoError(t, err)
f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl

View File

@ -134,6 +134,7 @@ type EthEvent struct {
FilterStore filter.FilterStore FilterStore filter.FilterStore
SubManager *EthSubscriptionManager SubManager *EthSubscriptionManager
MaxFilterHeightRange abi.ChainEpoch MaxFilterHeightRange abi.ChainEpoch
SubscribtionCtx context.Context
} }
var _ EthEventAPI = (*EthEvent)(nil) var _ EthEventAPI = (*EthEvent)(nil)
@ -1112,7 +1113,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks") return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
} }
sub, err := e.SubManager.StartSubscription(ctx, ethCb.EthSubscription) sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription)
if err != nil { if err != nil {
return ethtypes.EthSubscriptionID{}, err return ethtypes.EthSubscriptionID{}, err
} }

View File

@ -40,6 +40,7 @@ func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo
ee := &full.EthEvent{ ee := &full.EthEvent{
Chain: cs, Chain: cs,
MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange), MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange),
SubscribtionCtx: ctx,
} }
if !cfg.EnableEthRPC || cfg.Events.DisableRealTimeFilterAPI { if !cfg.EnableEthRPC || cfg.Events.DisableRealTimeFilterAPI {