dont hang requests if websockets server shuts down

This commit is contained in:
whyrusleeping 2019-10-20 15:37:51 +09:00
parent 1641ee48a1
commit 6e90066a20
2 changed files with 24 additions and 9 deletions

View File

@ -33,12 +33,12 @@ func serveRPC(a api.FullNode, stop node.StopFunc, addr string) error {
sigChan := make(chan os.Signal, 2)
go func() {
<-sigChan
if err := stop(context.TODO()); err != nil {
log.Errorf("graceful shutting down failed: %s", err)
}
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
if err := stop(context.TODO()); err != nil {
log.Errorf("graceful shutting down failed: %s", err)
}
}()
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)

View File

@ -72,6 +72,7 @@ type client struct {
namespace string
requests chan clientRequest
exiting <-chan struct{}
idCtr int64
}
@ -90,6 +91,7 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe
stop := make(chan struct{})
exiting := make(chan struct{})
c.requests = make(chan clientRequest)
c.exiting = exiting
handlers := map[string]rpcHandler{}
go (&wsConn{
@ -189,14 +191,20 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int)
return func() reflect.Value { return retVal }, chCtor
}
func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) clientResponse {
func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) (clientResponse, error) {
rchan := make(chan clientResponse, 1)
c.requests <- clientRequest{
creq := clientRequest{
req: req,
ready: rchan,
retCh: chCtor,
}
select {
case c.requests <- creq:
case <-c.exiting:
return clientResponse{}, fmt.Errorf("websocket routine exiting")
}
var ctxDone <-chan struct{}
var resp clientResponse
@ -213,18 +221,22 @@ loop:
case <-ctxDone: // send cancel request
ctxDone = nil
c.requests <- clientRequest{
cancelReq := clientRequest{
req: request{
Jsonrpc: "2.0",
Method: wsCancel,
Params: []param{{v: reflect.ValueOf(*req.ID)}},
},
}
select {
case c.requests <- cancelReq:
case <-c.exiting:
log.Warn("failed to send request cancellation, websocket routing exited")
}
}
}
return resp
return resp, nil
}
type rpcFunc struct {
@ -314,7 +326,10 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
}
}
resp := fn.client.sendRequest(ctx, req, chCtor)
resp, err := fn.client.sendRequest(ctx, req, chCtor)
if err != nil {
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
}
if resp.ID != *req.ID {
return fn.processError(xerrors.New("request and response id didn't match"))