diff --git a/cli/util/api.go b/cli/util/api.go index 3d06f9e31..1d6928c3f 100644 --- a/cli/util/api.go +++ b/cli/util/api.go @@ -359,7 +359,7 @@ func GetFullNodeAPIV1(ctx *cli.Context, opts ...GetFullNodeOption) (v1api.FullNo var closers []jsonrpc.ClientCloser for _, head := range heads { - v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header) + v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header, rpcOpts...) if err != nil { log.Warnf("Not able to establish connection to node with addr: ", head.addr) continue diff --git a/gateway/eth_sub.go b/gateway/eth_sub.go index 2f82fc32b..76d913983 100644 --- a/gateway/eth_sub.go +++ b/gateway/eth_sub.go @@ -24,11 +24,12 @@ func NewEthSubHandler() *EthSubHandler { } } -func (e *EthSubHandler) addSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error { +func (e *EthSubHandler) AddSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error { e.lk.Lock() defer e.lk.Unlock() for _, p := range e.queued[id] { + p := p // copy if err := sink(ctx, &p); err != nil { return err } @@ -38,7 +39,7 @@ func (e *EthSubHandler) addSub(ctx context.Context, id ethtypes.EthSubscriptionI return nil } -func (e *EthSubHandler) removeSub(id ethtypes.EthSubscriptionID) { +func (e *EthSubHandler) RemoveSub(id ethtypes.EthSubscriptionID) { e.lk.Lock() defer e.lk.Unlock() diff --git a/gateway/proxy_eth.go b/gateway/proxy_eth.go index 9518b2a15..4e8136835 100644 --- a/gateway/proxy_eth.go +++ b/gateway/proxy_eth.go @@ -463,7 +463,7 @@ func (gw *Node) EthSubscribe(ctx context.Context, eventType string, params *etht return ethtypes.EthSubscriptionID{}, err } - err = gw.subHnd.addSub(ctx, sub, func(ctx context.Context, response *ethtypes.EthSubscriptionResponse) error { + err = gw.subHnd.AddSub(ctx, sub, func(ctx context.Context, response *ethtypes.EthSubscriptionResponse) error { outParam, err := json.Marshal(response) if err != nil { return err @@ -502,7 +502,7 @@ func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionI delete(ft.userSubscriptions, id) if gw.subHnd != nil { - gw.subHnd.removeSub(id) + gw.subHnd.RemoveSub(id) } return ok, nil diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index 5fbe1cb6a..1127ffdb2 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -438,15 +438,15 @@ func TestEthSubscribeLogsNoTopicSpec(t *testing.T) { t.Logf("actor ID address is %s", idAddr) // install filter - respCh, err := client.EthSubscribe(ctx, "logs", nil) + subId, err := client.EthSubscribe(ctx, "logs", nil) require.NoError(err) - subResponses := []ethtypes.EthSubscriptionResponse{} - go func() { - for resp := range respCh { - subResponses = append(subResponses, resp) - } - }() + var subResponses []ethtypes.EthSubscriptionResponse + err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + subResponses = append(subResponses, *resp) + return nil + }) + require.NoError(err) const iterations = 10 ethContractAddr, messages := invokeLogFourData(t, client, iterations) diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 8b92202dc..6d4ca1c12 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -47,6 +47,7 @@ import ( "github.com/filecoin-project/lotus/chain/wallet/key" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" + "github.com/filecoin-project/lotus/gateway" "github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/markets/idxprov" "github.com/filecoin-project/lotus/markets/idxprov/idxprov_test" @@ -210,7 +211,7 @@ func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble { n.genesis.accounts = append(n.genesis.accounts, genacc) } - *full = TestFullNode{t: n.t, options: options, DefaultKey: key} + *full = TestFullNode{t: n.t, options: options, DefaultKey: key, EthSubRouter: gateway.NewEthSubHandler()} n.inactive.fullnodes = append(n.inactive.fullnodes, full) return n diff --git a/itests/kit/node_full.go b/itests/kit/node_full.go index 682ae118a..3e80ed688 100644 --- a/itests/kit/node_full.go +++ b/itests/kit/node_full.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet/key" cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/gateway" "github.com/filecoin-project/lotus/node" ) @@ -46,6 +47,10 @@ type TestFullNode struct { Stop node.StopFunc + // gateway handler makes it convenient to register callbalks per topic, so we + // also use it for tests + EthSubRouter *gateway.EthSubHandler + options nodeOpts } diff --git a/itests/kit/rpc.go b/itests/kit/rpc.go index f8c2c6e53..5d40ac3e9 100644 --- a/itests/kit/rpc.go +++ b/itests/kit/rpc.go @@ -13,6 +13,8 @@ import ( manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" "github.com/filecoin-project/lotus/node" @@ -52,7 +54,12 @@ func fullRpc(t *testing.T, f *TestFullNode) (*TestFullNode, Closer) { fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String()) sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String()) - cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil) + rpcOpts := []jsonrpc.Option{ + jsonrpc.WithClientHandler("Filecoin", f.EthSubRouter), + jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"), + } + + cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, rpcOpts...) require.NoError(t, err) f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 898776d10..83ea6fcd5 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -134,6 +134,7 @@ type EthEvent struct { FilterStore filter.FilterStore SubManager *EthSubscriptionManager MaxFilterHeightRange abi.ChainEpoch + SubscribtionCtx context.Context } var _ EthEventAPI = (*EthEvent)(nil) @@ -1112,7 +1113,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks") } - sub, err := e.SubManager.StartSubscription(ctx, ethCb.EthSubscription) + sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription) if err != nil { return ethtypes.EthSubscriptionID{}, err } diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index eb5afb8e6..55a79a59a 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -40,6 +40,7 @@ func EthEventAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo ee := &full.EthEvent{ Chain: cs, MaxFilterHeightRange: abi.ChainEpoch(cfg.Events.MaxFilterHeightRange), + SubscribtionCtx: ctx, } if !cfg.EnableEthRPC || cfg.Events.DisableRealTimeFilterAPI {