From 6e90066a208a3946ff95865ecb2c66a1170df00f Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 20 Oct 2019 15:37:51 +0900 Subject: [PATCH] dont hang requests if websockets server shuts down --- cmd/lotus/rpc.go | 6 +++--- lib/jsonrpc/client.go | 27 +++++++++++++++++++++------ 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go index 598be1f7d..d3c1d8295 100644 --- a/cmd/lotus/rpc.go +++ b/cmd/lotus/rpc.go @@ -33,12 +33,12 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr string) error { sigChan := make(chan os.Signal, 2) go func() { <-sigChan - if err := stop(context.TODO()); err != nil { - log.Errorf("graceful shutting down failed: %s", err) - } if err := srv.Shutdown(context.TODO()); err != nil { log.Errorf("shutting down RPC server failed: %s", err) } + if err := stop(context.TODO()); err != nil { + log.Errorf("graceful shutting down failed: %s", err) + } }() signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 7218d087e..360e36aef 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -72,6 +72,7 @@ type client struct { namespace string requests chan clientRequest + exiting <-chan struct{} idCtr int64 } @@ -90,6 +91,7 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe stop := make(chan struct{}) exiting := make(chan struct{}) c.requests = make(chan clientRequest) + c.exiting = exiting handlers := map[string]rpcHandler{} go (&wsConn{ @@ -189,14 +191,20 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) return func() reflect.Value { return retVal }, chCtor } -func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) clientResponse { +func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) (clientResponse, error) { rchan := make(chan clientResponse, 1) - c.requests <- clientRequest{ + creq := clientRequest{ req: req, ready: rchan, retCh: chCtor, } + select { + case c.requests <- creq: + case <-c.exiting: + return clientResponse{}, fmt.Errorf("websocket routine exiting") + } + var ctxDone <-chan struct{} var resp clientResponse @@ -213,18 +221,22 @@ loop: case <-ctxDone: // send cancel request ctxDone = nil - c.requests <- clientRequest{ - + cancelReq := clientRequest{ req: request{ Jsonrpc: "2.0", Method: wsCancel, Params: []param{{v: reflect.ValueOf(*req.ID)}}, }, } + select { + case c.requests <- cancelReq: + case <-c.exiting: + log.Warn("failed to send request cancellation, websocket routing exited") + } } } - return resp + return resp, nil } type rpcFunc struct { @@ -314,7 +326,10 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) } } - resp := fn.client.sendRequest(ctx, req, chCtor) + resp, err := fn.client.sendRequest(ctx, req, chCtor) + if err != nil { + return fn.processError(fmt.Errorf("sendRequest failed: %w", err)) + } if resp.ID != *req.ID { return fn.processError(xerrors.New("request and response id didn't match"))