From aa611e49fdd76db047a98b71126b22f1a9ecae00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 23:12:37 +0200 Subject: [PATCH] rpcenc: Add timeout for readers --- lib/rpcenc/reader.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 2b205f0e8..346c551a1 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -11,6 +11,7 @@ import ( "reflect" "strconv" "sync" + "time" "github.com/google/uuid" logging "github.com/ipfs/go-log/v2" @@ -21,7 +22,9 @@ import ( sealing "github.com/filecoin-project/storage-fsm" ) -var log = logging.Logger("rpc") +var log = logging.Logger("rpcenc") + +var Timeout = 30 * time.Second type StreamType string const ( @@ -111,10 +114,14 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { wait: make(chan struct{}), } + tctx, cancel := context.WithTimeout(req.Context(), Timeout) + defer cancel() + select { case ch <- wr: - case <-req.Context().Done(): - log.Error("context error in reader stream handler (1): %v", req.Context().Err()) + case <-tctx.Done(): + close(ch) + log.Error("context error in reader stream handler (1): %v", tctx.Err()) resp.WriteHeader(500) return } @@ -158,8 +165,15 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { } readersLk.Unlock() + ctx, cancel := context.WithTimeout(ctx, Timeout) + defer cancel() + select { - case wr := <-ch: + case wr, ok := <-ch: + if !ok { + return reflect.Value{}, xerrors.Errorf("handler timed out") + } + return reflect.ValueOf(wr), nil case <-ctx.Done(): return reflect.Value{}, ctx.Err()