Merge pull request #428 from filecoin-project/fix/jsonrpc-ws-exiting

dont hang requests if websockets server shuts down
This commit is contained in:
Łukasz Magiera 2019-10-21 10:12:41 +02:00 committed by GitHub
commit 31590fcd30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -72,6 +72,7 @@ type client struct {
namespace string namespace string
requests chan clientRequest requests chan clientRequest
exiting <-chan struct{}
idCtr int64 idCtr int64
} }
@ -90,6 +91,7 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe
stop := make(chan struct{}) stop := make(chan struct{})
exiting := make(chan struct{}) exiting := make(chan struct{})
c.requests = make(chan clientRequest) c.requests = make(chan clientRequest)
c.exiting = exiting
handlers := map[string]rpcHandler{} handlers := map[string]rpcHandler{}
go (&wsConn{ go (&wsConn{
@ -189,14 +191,20 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int)
return func() reflect.Value { return retVal }, chCtor return func() reflect.Value { return retVal }, chCtor
} }
func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) clientResponse { func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) (clientResponse, error) {
rchan := make(chan clientResponse, 1) rchan := make(chan clientResponse, 1)
c.requests <- clientRequest{ creq := clientRequest{
req: req, req: req,
ready: rchan, ready: rchan,
retCh: chCtor, retCh: chCtor,
} }
select {
case c.requests <- creq:
case <-c.exiting:
return clientResponse{}, fmt.Errorf("websocket routine exiting")
}
var ctxDone <-chan struct{} var ctxDone <-chan struct{}
var resp clientResponse var resp clientResponse
@ -213,18 +221,22 @@ loop:
case <-ctxDone: // send cancel request case <-ctxDone: // send cancel request
ctxDone = nil ctxDone = nil
c.requests <- clientRequest{ cancelReq := clientRequest{
req: request{ req: request{
Jsonrpc: "2.0", Jsonrpc: "2.0",
Method: wsCancel, Method: wsCancel,
Params: []param{{v: reflect.ValueOf(*req.ID)}}, Params: []param{{v: reflect.ValueOf(*req.ID)}},
}, },
} }
select {
case c.requests <- cancelReq:
case <-c.exiting:
log.Warn("failed to send request cancellation, websocket routing exited")
}
} }
} }
return resp return resp, nil
} }
type rpcFunc struct { type rpcFunc struct {
@ -314,7 +326,10 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
} }
} }
resp := fn.client.sendRequest(ctx, req, chCtor) resp, err := fn.client.sendRequest(ctx, req, chCtor)
if err != nil {
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
}
if resp.ID != *req.ID { if resp.ID != *req.ID {
return fn.processError(xerrors.New("request and response id didn't match")) return fn.processError(xerrors.New("request and response id didn't match"))