Merge pull request #273 from filecoin-project/fix/websocket-closing

fix websocket closing
This commit is contained in:
Whyrusleeping 2019-10-04 12:41:53 -06:00 committed by GitHub
commit 238142bc23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 119 additions and 63 deletions

View File

@ -8,36 +8,36 @@ import (
)
// NewCommonRPC creates a new http jsonrpc client.
func NewCommonRPC(addr string, requestHeader http.Header) (api.Common, error) {
func NewCommonRPC(addr string, requestHeader http.Header) (api.Common, jsonrpc.ClientCloser, error) {
var res api.CommonStruct
_, err := jsonrpc.NewMergeClient(addr, "Filecoin",
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
[]interface{}{
&res.Internal,
}, requestHeader)
return &res, err
return &res, closer, err
}
// NewFullNodeRPC creates a new http jsonrpc client.
func NewFullNodeRPC(addr string, requestHeader http.Header) (api.FullNode, error) {
func NewFullNodeRPC(addr string, requestHeader http.Header) (api.FullNode, jsonrpc.ClientCloser, error) {
var res api.FullNodeStruct
_, err := jsonrpc.NewMergeClient(addr, "Filecoin",
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
[]interface{}{
&res.CommonStruct.Internal,
&res.Internal,
}, requestHeader)
return &res, err
return &res, closer, err
}
// NewStorageMinerRPC creates a new http jsonrpc client for storage miner
func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMiner, error) {
func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMiner, jsonrpc.ClientCloser, error) {
var res api.StorageMinerStruct
_, err := jsonrpc.NewMergeClient(addr, "Filecoin",
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
[]interface{}{
&res.CommonStruct.Internal,
&res.Internal,
}, requestHeader)
return &res, err
return &res, closer, err
}

View File

@ -20,10 +20,12 @@ var authCreateAdminToken = &cli.Command{
Name: "create-admin-token",
Usage: "Create admin token",
Action: func(cctx *cli.Context) error {
napi, err := GetFullNodeAPI(cctx)
napi, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
// TODO: Probably tell the user how powerful this token is

View File

@ -25,10 +25,11 @@ var chainHeadCmd = &cli.Command{
Name: "head",
Usage: "Print chain head",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
head, err := api.ChainHead(ctx)
@ -53,10 +54,11 @@ var chainGetBlock = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if !cctx.Args().Present() {
@ -128,10 +130,11 @@ var chainReadObjCmd = &cli.Command{
Name: "read-obj",
Usage: "Read the raw bytes of an object",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
c, err := cid.Decode(cctx.Args().First())

View File

@ -31,10 +31,11 @@ var clientImportCmd = &cli.Command{
Name: "import",
Usage: "Import data",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
c, err := api.ClientImport(ctx, cctx.Args().First())
@ -50,10 +51,11 @@ var clientLocalCmd = &cli.Command{
Name: "local",
Usage: "List locally imported data",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
list, err := api.ClientListImports(ctx)
@ -71,10 +73,11 @@ var clientDealCmd = &cli.Command{
Name: "deal",
Usage: "Initialize storage deal with a miner",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if cctx.NArg() != 4 {
@ -128,10 +131,11 @@ var clientFindCmd = &cli.Command{
return err
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
// Check if we already have this data locally
@ -177,10 +181,11 @@ var clientRetrieveCmd = &cli.Command{
return nil
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
var payer address.Address
@ -248,10 +253,11 @@ var clientQueryAskCmd = &cli.Command{
return err
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
var pid peer.ID

View File

@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/api/client"
"github.com/filecoin-project/go-lotus/lib/jsonrpc"
"github.com/filecoin-project/go-lotus/node/repo"
)
@ -52,7 +53,7 @@ func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) {
return "ws://" + addr + "/rpc/v0", headers, nil
}
func GetAPI(ctx *cli.Context) (api.Common, error) {
func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) {
f := "repo"
if ctx.String("storagerepo") != "" {
f = "storagerepo"
@ -60,25 +61,25 @@ func GetAPI(ctx *cli.Context) (api.Common, error) {
addr, headers, err := getAPI(ctx, f)
if err != nil {
return nil, err
return nil, nil, err
}
return client.NewCommonRPC(addr, headers)
}
func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, error) {
func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error) {
addr, headers, err := getAPI(ctx, "repo")
if err != nil {
return nil, err
return nil, nil, err
}
return client.NewFullNodeRPC(addr, headers)
}
func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, error) {
func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, error) {
addr, headers, err := getAPI(ctx, "storagerepo")
if err != nil {
return nil, err
return nil, nil, err
}
return client.NewStorageMinerRPC(addr, headers)

View File

@ -22,10 +22,11 @@ var createMinerCmd = &cli.Command{
return fmt.Errorf("must pass four arguments: worker address, owner address, sector size, peer ID")
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
args := cctx.Args().Slice()

View File

@ -18,10 +18,11 @@ var mpoolPending = &cli.Command{
Name: "pending",
Usage: "Get pending messages",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

View File

@ -27,10 +27,11 @@ var netPeers = &cli.Command{
Name: "peers",
Usage: "Print peers",
Action: func(cctx *cli.Context) error {
api, err := GetAPI(cctx)
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
peers, err := api.NetPeers(ctx)
if err != nil {
@ -49,10 +50,11 @@ var netListen = &cli.Command{
Name: "listen",
Usage: "List listen addresses",
Action: func(cctx *cli.Context) error {
api, err := GetAPI(cctx)
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
addrs, err := api.NetAddrsListen(ctx)
@ -71,10 +73,11 @@ var netConnect = &cli.Command{
Name: "connect",
Usage: "Connect to a peer",
Action: func(cctx *cli.Context) error {
api, err := GetAPI(cctx)
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
pis, err := parseAddresses(ctx, cctx.Args().Slice())
@ -100,10 +103,11 @@ var netId = &cli.Command{
Name: "id",
Usage: "Get node identity",
Action: func(cctx *cli.Context) error {
api, err := GetAPI(cctx)
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

View File

@ -41,10 +41,11 @@ var paychGetCmd = &cli.Command{
return fmt.Errorf("parsing amount failed: %s", err)
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -62,10 +63,11 @@ var paychListCmd = &cli.Command{
Name: "list",
Usage: "List all locally registered payment channels",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -121,10 +123,11 @@ var paychVoucherCreateCmd = &cli.Command{
lane := cctx.Int("lane")
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -161,10 +164,11 @@ var paychVoucherCheckCmd = &cli.Command{
return err
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -195,10 +199,11 @@ var paychVoucherAddCmd = &cli.Command{
return err
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -230,10 +235,11 @@ var paychVoucherListCmd = &cli.Command{
return err
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -272,10 +278,11 @@ var paychVoucherBestSpendableCmd = &cli.Command{
return err
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -330,10 +337,11 @@ var paychVoucherSubmitCmd = &cli.Command{
return err
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

View File

@ -2,6 +2,7 @@ package cli
import (
"fmt"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"gopkg.in/urfave/cli.v2"
@ -17,10 +18,11 @@ var sendCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

View File

@ -25,10 +25,11 @@ var statePowerCmd = &cli.Command{
Name: "power",
Usage: "Query network or miner power",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -58,10 +59,11 @@ var stateSectorsCmd = &cli.Command{
Name: "sectors",
Usage: "Query the sector set of a miner",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -91,10 +93,11 @@ var stateProvingSetCmd = &cli.Command{
Name: "proving",
Usage: "Query the proving set of a miner",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -146,10 +149,11 @@ var stateReplaySetCmd = &cli.Command{
tscids = append(tscids, c)
}
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
@ -178,10 +182,11 @@ var statePledgeCollateralCmd = &cli.Command{
Name: "pledge-collateral",
Usage: "Get minimum miner pledge collateral",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

View File

@ -21,10 +21,11 @@ var syncStatusCmd = &cli.Command{
Name: "status",
Usage: "check sync status",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
ss, err := api.SyncState(ctx)

View File

@ -10,10 +10,11 @@ var versionCmd = &cli.Command{
Name: "version",
Usage: "Print version",
Action: func(cctx *cli.Context) error {
api, err := GetAPI(cctx)
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
// TODO: print more useful things

View File

@ -21,10 +21,11 @@ var walletNew = &cli.Command{
Name: "new",
Usage: "Generate a new key of the given type (bls or secp256k1)",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
t := cctx.Args().First()
@ -47,10 +48,11 @@ var walletList = &cli.Command{
Name: "list",
Usage: "List wallet address",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
addrs, err := api.WalletList(ctx)
@ -69,10 +71,11 @@ var walletBalance = &cli.Command{
Name: "balance",
Usage: "get account balance",
Action: func(cctx *cli.Context) error {
api, err := GetFullNodeAPI(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
var addr address.Address

View File

@ -64,10 +64,11 @@ var runCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
nodeApi, err := lcli.GetFullNodeAPI(cctx)
nodeApi, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
v, err := nodeApi.Version(ctx)

View File

@ -72,10 +72,11 @@ var initCmd = &cli.Command{
log.Info("Trying to connect to full node RPC")
api, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config
api, closer, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
log.Info("Checking full node version")

View File

@ -2,12 +2,13 @@ package main
import (
"context"
"github.com/filecoin-project/go-lotus/build"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/filecoin-project/go-lotus/build"
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
@ -36,10 +37,11 @@ var runCmd = &cli.Command{
return xerrors.Errorf("fetching proof parameters: %w", err)
}
nodeApi, err := lcli.GetFullNodeAPI(cctx)
nodeApi, ncloser, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer ncloser()
ctx := lcli.DaemonContext(cctx)
v, err := nodeApi.Version(ctx)

View File

@ -13,10 +13,11 @@ var storeGarbageCmd = &cli.Command{
Name: "store-garbage",
Usage: "store random data in a sector",
Action: func(cctx *cli.Context) error {
nodeApi, err := lcli.GetStorageMinerAPI(cctx)
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
sectorId, err := nodeApi.StoreGarbageData(ctx)
@ -44,10 +45,11 @@ var sectorsStatusCmd = &cli.Command{
Name: "status",
Usage: "Get the seal status of a sector by its ID",
Action: func(cctx *cli.Context) error {
nodeApi, err := lcli.GetStorageMinerAPI(cctx)
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if !cctx.Args().Present() {
@ -80,10 +82,11 @@ var sectorsStagedListCmd = &cli.Command{
Name: "list-staged", // TODO: nest this under a 'staged' subcommand? idk
Usage: "List staged sectors",
Action: func(cctx *cli.Context) error {
nodeApi, err := lcli.GetStorageMinerAPI(cctx)
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
staged, err := nodeApi.SectorsStagedList(ctx)
@ -102,10 +105,11 @@ var sectorsStagedSealCmd = &cli.Command{
Name: "seal-staged", // TODO: nest this under a 'staged' subcommand? idk
Usage: "Seal staged sectors",
Action: func(cctx *cli.Context) error {
nodeApi, err := lcli.GetStorageMinerAPI(cctx)
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return nodeApi.SectorsStagedSeal(ctx)
@ -116,10 +120,11 @@ var sectorsRefsCmd = &cli.Command{
Name: "refs",
Usage: "List References to sectors",
Action: func(cctx *cli.Context) error {
nodeApi, err := lcli.GetStorageMinerAPI(cctx)
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
refs, err := nodeApi.SectorsRefs(ctx)

View File

@ -420,8 +420,10 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
case r, ok := <-c.incoming:
if !ok {
if c.incomingErr != nil {
if !websocket.IsCloseError(c.incomingErr, websocket.CloseNormalClosure) {
log.Warnw("websocket error", "error", c.incomingErr)
}
}
return // remote closed
}
@ -442,9 +444,15 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
}
c.sendRequest(req.req)
case <-c.stop:
c.writeLk.Lock()
cmsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
if err := c.conn.WriteMessage(websocket.CloseMessage, cmsg); err != nil {
log.Warn("failed to write close message: ", err)
}
if err := c.conn.Close(); err != nil {
log.Warnw("websocket close error", "error", err)
}
c.writeLk.Unlock()
return
}
}

View File

@ -4,12 +4,13 @@ import (
"bytes"
"context"
"crypto/rand"
"github.com/libp2p/go-libp2p-core/crypto"
"io/ioutil"
"net/http/httptest"
"os"
"testing"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
@ -193,7 +194,7 @@ func rpcBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test
testServ := httptest.NewServer(rpcServer) // todo: close
var err error
fulls[i].FullNode, err = client.NewFullNodeRPC("ws://"+testServ.Listener.Addr().String(), nil)
fulls[i].FullNode, _, err = client.NewFullNodeRPC("ws://"+testServ.Listener.Addr().String(), nil)
if err != nil {
t.Fatal(err)
}
@ -206,7 +207,7 @@ func rpcBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test
testServ := httptest.NewServer(rpcServer) // todo: close
var err error
storers[i].StorageMiner, err = client.NewStorageMinerRPC("ws://"+testServ.Listener.Addr().String(), nil)
storers[i].StorageMiner, _, err = client.NewStorageMinerRPC("ws://"+testServ.Listener.Addr().String(), nil)
if err != nil {
t.Fatal(err)
}