diff --git a/api/client/client.go b/api/client/client.go index e40e0c854..f6d662a91 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -8,36 +8,36 @@ import ( ) // NewCommonRPC creates a new http jsonrpc client. -func NewCommonRPC(addr string, requestHeader http.Header) (api.Common, error) { +func NewCommonRPC(addr string, requestHeader http.Header) (api.Common, jsonrpc.ClientCloser, error) { var res api.CommonStruct - _, err := jsonrpc.NewMergeClient(addr, "Filecoin", + closer, err := jsonrpc.NewMergeClient(addr, "Filecoin", []interface{}{ &res.Internal, }, requestHeader) - return &res, err + return &res, closer, err } // NewFullNodeRPC creates a new http jsonrpc client. -func NewFullNodeRPC(addr string, requestHeader http.Header) (api.FullNode, error) { +func NewFullNodeRPC(addr string, requestHeader http.Header) (api.FullNode, jsonrpc.ClientCloser, error) { var res api.FullNodeStruct - _, err := jsonrpc.NewMergeClient(addr, "Filecoin", + closer, err := jsonrpc.NewMergeClient(addr, "Filecoin", []interface{}{ &res.CommonStruct.Internal, &res.Internal, }, requestHeader) - return &res, err + return &res, closer, err } // NewStorageMinerRPC creates a new http jsonrpc client for storage miner -func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMiner, error) { +func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMiner, jsonrpc.ClientCloser, error) { var res api.StorageMinerStruct - _, err := jsonrpc.NewMergeClient(addr, "Filecoin", + closer, err := jsonrpc.NewMergeClient(addr, "Filecoin", []interface{}{ &res.CommonStruct.Internal, &res.Internal, }, requestHeader) - return &res, err + return &res, closer, err } diff --git a/cli/auth.go b/cli/auth.go index f4b5ea132..e56f7676f 100644 --- a/cli/auth.go +++ b/cli/auth.go @@ -20,10 +20,12 @@ var authCreateAdminToken = &cli.Command{ Name: "create-admin-token", Usage: "Create admin token", Action: func(cctx *cli.Context) error { - napi, err := GetFullNodeAPI(cctx) + napi, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() + ctx := ReqContext(cctx) // TODO: Probably tell the user how powerful this token is diff --git a/cli/chain.go b/cli/chain.go index a2e1660f7..4eef95f5e 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -25,10 +25,11 @@ var chainHeadCmd = &cli.Command{ Name: "head", Usage: "Print chain head", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) head, err := api.ChainHead(ctx) @@ -53,10 +54,11 @@ var chainGetBlock = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) if !cctx.Args().Present() { @@ -128,10 +130,11 @@ var chainReadObjCmd = &cli.Command{ Name: "read-obj", Usage: "Read the raw bytes of an object", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) c, err := cid.Decode(cctx.Args().First()) diff --git a/cli/client.go b/cli/client.go index 625e32d3b..8b7eda72c 100644 --- a/cli/client.go +++ b/cli/client.go @@ -31,10 +31,11 @@ var clientImportCmd = &cli.Command{ Name: "import", Usage: "Import data", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) c, err := api.ClientImport(ctx, cctx.Args().First()) @@ -50,10 +51,11 @@ var clientLocalCmd = &cli.Command{ Name: "local", Usage: "List locally imported data", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) list, err := api.ClientListImports(ctx) @@ -71,10 +73,11 @@ var clientDealCmd = &cli.Command{ Name: "deal", Usage: "Initialize storage deal with a miner", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) if cctx.NArg() != 4 { @@ -128,10 +131,11 @@ var clientFindCmd = &cli.Command{ return err } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) // Check if we already have this data locally @@ -177,10 +181,11 @@ var clientRetrieveCmd = &cli.Command{ return nil } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) var payer address.Address @@ -248,10 +253,11 @@ var clientQueryAskCmd = &cli.Command{ return err } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) var pid peer.ID diff --git a/cli/cmd.go b/cli/cmd.go index 872c65bb4..5aef9509e 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api/client" + "github.com/filecoin-project/go-lotus/lib/jsonrpc" "github.com/filecoin-project/go-lotus/node/repo" ) @@ -52,7 +53,7 @@ func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { return "ws://" + addr + "/rpc/v0", headers, nil } -func GetAPI(ctx *cli.Context) (api.Common, error) { +func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) { f := "repo" if ctx.String("storagerepo") != "" { f = "storagerepo" @@ -60,25 +61,25 @@ func GetAPI(ctx *cli.Context) (api.Common, error) { addr, headers, err := getAPI(ctx, f) if err != nil { - return nil, err + return nil, nil, err } return client.NewCommonRPC(addr, headers) } -func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, error) { +func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error) { addr, headers, err := getAPI(ctx, "repo") if err != nil { - return nil, err + return nil, nil, err } return client.NewFullNodeRPC(addr, headers) } -func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, error) { +func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, error) { addr, headers, err := getAPI(ctx, "storagerepo") if err != nil { - return nil, err + return nil, nil, err } return client.NewStorageMinerRPC(addr, headers) diff --git a/cli/createminer.go b/cli/createminer.go index 47f9b38aa..bb2f8aa96 100644 --- a/cli/createminer.go +++ b/cli/createminer.go @@ -22,10 +22,11 @@ var createMinerCmd = &cli.Command{ return fmt.Errorf("must pass four arguments: worker address, owner address, sector size, peer ID") } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() args := cctx.Args().Slice() diff --git a/cli/mpool.go b/cli/mpool.go index f37402e52..d38d938f3 100644 --- a/cli/mpool.go +++ b/cli/mpool.go @@ -18,10 +18,11 @@ var mpoolPending = &cli.Command{ Name: "pending", Usage: "Get pending messages", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) diff --git a/cli/net.go b/cli/net.go index ce254c20a..b8cbad74e 100644 --- a/cli/net.go +++ b/cli/net.go @@ -27,10 +27,11 @@ var netPeers = &cli.Command{ Name: "peers", Usage: "Print peers", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, closer, err := GetAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) peers, err := api.NetPeers(ctx) if err != nil { @@ -49,10 +50,11 @@ var netListen = &cli.Command{ Name: "listen", Usage: "List listen addresses", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, closer, err := GetAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) addrs, err := api.NetAddrsListen(ctx) @@ -71,10 +73,11 @@ var netConnect = &cli.Command{ Name: "connect", Usage: "Connect to a peer", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, closer, err := GetAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) pis, err := parseAddresses(ctx, cctx.Args().Slice()) @@ -100,10 +103,11 @@ var netId = &cli.Command{ Name: "id", Usage: "Get node identity", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, closer, err := GetAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) diff --git a/cli/paych.go b/cli/paych.go index 15091bf30..1b11d809c 100644 --- a/cli/paych.go +++ b/cli/paych.go @@ -41,10 +41,11 @@ var paychGetCmd = &cli.Command{ return fmt.Errorf("parsing amount failed: %s", err) } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -62,10 +63,11 @@ var paychListCmd = &cli.Command{ Name: "list", Usage: "List all locally registered payment channels", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -121,10 +123,11 @@ var paychVoucherCreateCmd = &cli.Command{ lane := cctx.Int("lane") - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -161,10 +164,11 @@ var paychVoucherCheckCmd = &cli.Command{ return err } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -195,10 +199,11 @@ var paychVoucherAddCmd = &cli.Command{ return err } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -230,10 +235,11 @@ var paychVoucherListCmd = &cli.Command{ return err } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -272,10 +278,11 @@ var paychVoucherBestSpendableCmd = &cli.Command{ return err } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -330,10 +337,11 @@ var paychVoucherSubmitCmd = &cli.Command{ return err } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) diff --git a/cli/send.go b/cli/send.go index 5ed9b10e8..81e40e750 100644 --- a/cli/send.go +++ b/cli/send.go @@ -2,6 +2,7 @@ package cli import ( "fmt" + "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" "gopkg.in/urfave/cli.v2" @@ -17,10 +18,11 @@ var sendCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) diff --git a/cli/state.go b/cli/state.go index 1de4511fb..7984694f4 100644 --- a/cli/state.go +++ b/cli/state.go @@ -25,10 +25,11 @@ var statePowerCmd = &cli.Command{ Name: "power", Usage: "Query network or miner power", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -58,10 +59,11 @@ var stateSectorsCmd = &cli.Command{ Name: "sectors", Usage: "Query the sector set of a miner", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -91,10 +93,11 @@ var stateProvingSetCmd = &cli.Command{ Name: "proving", Usage: "Query the proving set of a miner", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -146,10 +149,11 @@ var stateReplaySetCmd = &cli.Command{ tscids = append(tscids, c) } - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) @@ -178,10 +182,11 @@ var statePledgeCollateralCmd = &cli.Command{ Name: "pledge-collateral", Usage: "Get minimum miner pledge collateral", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) diff --git a/cli/sync.go b/cli/sync.go index 8007c6e0c..868412665 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -21,10 +21,11 @@ var syncStatusCmd = &cli.Command{ Name: "status", Usage: "check sync status", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) ss, err := api.SyncState(ctx) diff --git a/cli/version.go b/cli/version.go index 2ccdb4360..f451b4514 100644 --- a/cli/version.go +++ b/cli/version.go @@ -10,10 +10,11 @@ var versionCmd = &cli.Command{ Name: "version", Usage: "Print version", Action: func(cctx *cli.Context) error { - api, err := GetAPI(cctx) + api, closer, err := GetAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) // TODO: print more useful things diff --git a/cli/wallet.go b/cli/wallet.go index 6d70ff948..0d78dae4d 100644 --- a/cli/wallet.go +++ b/cli/wallet.go @@ -21,10 +21,11 @@ var walletNew = &cli.Command{ Name: "new", Usage: "Generate a new key of the given type (bls or secp256k1)", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) t := cctx.Args().First() @@ -47,10 +48,11 @@ var walletList = &cli.Command{ Name: "list", Usage: "List wallet address", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) addrs, err := api.WalletList(ctx) @@ -69,10 +71,11 @@ var walletBalance = &cli.Command{ Name: "balance", Usage: "get account balance", Action: func(cctx *cli.Context) error { - api, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } + defer closer() ctx := ReqContext(cctx) var addr address.Address diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 03c61f9fe..28f9d9bd7 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -72,10 +72,11 @@ var initCmd = &cli.Command{ log.Info("Trying to connect to full node RPC") - api, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config + api, closer, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config if err != nil { return err } + defer closer() ctx := lcli.ReqContext(cctx) log.Info("Checking full node version") diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index bd1f74be7..360532cad 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -2,12 +2,13 @@ package main import ( "context" - "github.com/filecoin-project/go-lotus/build" "net/http" "os" "os/signal" "syscall" + "github.com/filecoin-project/go-lotus/build" + "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" @@ -36,10 +37,11 @@ var runCmd = &cli.Command{ return xerrors.Errorf("fetching proof parameters: %w", err) } - nodeApi, err := lcli.GetFullNodeAPI(cctx) + nodeApi, ncloser, err := lcli.GetFullNodeAPI(cctx) if err != nil { return err } + defer ncloser() ctx := lcli.DaemonContext(cctx) v, err := nodeApi.Version(ctx) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 16d151b5e..b048b2f67 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -13,10 +13,11 @@ var storeGarbageCmd = &cli.Command{ Name: "store-garbage", Usage: "store random data in a sector", Action: func(cctx *cli.Context) error { - nodeApi, err := lcli.GetStorageMinerAPI(cctx) + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) if err != nil { return err } + defer closer() ctx := lcli.ReqContext(cctx) sectorId, err := nodeApi.StoreGarbageData(ctx) @@ -44,10 +45,11 @@ var sectorsStatusCmd = &cli.Command{ Name: "status", Usage: "Get the seal status of a sector by its ID", Action: func(cctx *cli.Context) error { - nodeApi, err := lcli.GetStorageMinerAPI(cctx) + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) if err != nil { return err } + defer closer() ctx := lcli.ReqContext(cctx) if !cctx.Args().Present() { @@ -80,10 +82,11 @@ var sectorsStagedListCmd = &cli.Command{ Name: "list-staged", // TODO: nest this under a 'staged' subcommand? idk Usage: "List staged sectors", Action: func(cctx *cli.Context) error { - nodeApi, err := lcli.GetStorageMinerAPI(cctx) + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) if err != nil { return err } + defer closer() ctx := lcli.ReqContext(cctx) staged, err := nodeApi.SectorsStagedList(ctx) @@ -102,10 +105,11 @@ var sectorsStagedSealCmd = &cli.Command{ Name: "seal-staged", // TODO: nest this under a 'staged' subcommand? idk Usage: "Seal staged sectors", Action: func(cctx *cli.Context) error { - nodeApi, err := lcli.GetStorageMinerAPI(cctx) + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) if err != nil { return err } + defer closer() ctx := lcli.ReqContext(cctx) return nodeApi.SectorsStagedSeal(ctx) @@ -116,10 +120,11 @@ var sectorsRefsCmd = &cli.Command{ Name: "refs", Usage: "List References to sectors", Action: func(cctx *cli.Context) error { - nodeApi, err := lcli.GetStorageMinerAPI(cctx) + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) if err != nil { return err } + defer closer() ctx := lcli.ReqContext(cctx) refs, err := nodeApi.SectorsRefs(ctx) diff --git a/lib/jsonrpc/websocket.go b/lib/jsonrpc/websocket.go index 79674cc9d..8994b223f 100644 --- a/lib/jsonrpc/websocket.go +++ b/lib/jsonrpc/websocket.go @@ -420,7 +420,9 @@ func (c *wsConn) handleWsConn(ctx context.Context) { case r, ok := <-c.incoming: if !ok { if c.incomingErr != nil { - log.Warnw("websocket error", "error", c.incomingErr) + if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) { + log.Warnw("websocket error", "error", c.incomingErr) + } } return // remote closed } @@ -442,6 +444,10 @@ func (c *wsConn) handleWsConn(ctx context.Context) { } c.sendRequest(req.req) case <-c.stop: + cmsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") + if err := c.conn.WriteMessage(websocket.CloseMessage, cmsg); err != nil { + log.Warn("failed to write close message: ", err) + } if err := c.conn.Close(); err != nil { log.Warnw("websocket close error", "error", err) }