rpc: honour pending requests before tearing conn down (#3814)

This commit is contained in:
Péter Szilágyi 2017-03-24 13:07:12 +02:00 committed by Felix Lange
parent 37e252587a
commit 1018bf6a00

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"runtime" "runtime"
"sync"
"sync/atomic" "sync/atomic"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -143,6 +144,8 @@ func hasOption(option CodecOption, options []CodecOption) bool {
// requests until the codec returns an error when reading a request (in most cases // requests until the codec returns an error when reading a request (in most cases
// an EOF). It executes requests in parallel when singleShot is false. // an EOF). It executes requests in parallel when singleShot is false.
func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error { func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
var pend sync.WaitGroup
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
const size = 64 << 10 const size = 64 << 10
@ -150,7 +153,6 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
buf = buf[:runtime.Stack(buf, false)] buf = buf[:runtime.Stack(buf, false)]
log.Error(fmt.Sprint(string(buf))) log.Error(fmt.Sprint(string(buf)))
} }
s.codecsMu.Lock() s.codecsMu.Lock()
s.codecs.Remove(codec) s.codecs.Remove(codec)
s.codecsMu.Unlock() s.codecsMu.Unlock()
@ -179,8 +181,13 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
for atomic.LoadInt32(&s.run) == 1 { for atomic.LoadInt32(&s.run) == 1 {
reqs, batch, err := s.readRequest(codec) reqs, batch, err := s.readRequest(codec)
if err != nil { if err != nil {
// If a parsing error occurred, send an error
if err.Error() != "EOF" {
log.Debug(fmt.Sprintf("read error %v\n", err)) log.Debug(fmt.Sprintf("read error %v\n", err))
codec.Write(codec.CreateErrorResponse(nil, err)) codec.Write(codec.CreateErrorResponse(nil, err))
}
// Error or end of stream, wait for requests and tear down
pend.Wait()
return nil return nil
} }
@ -199,20 +206,27 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
} }
return nil return nil
} }
// If a single shot request is executing, run and return immediately
if singleShot && batch { if singleShot {
if batch {
s.execBatch(ctx, codec, reqs) s.execBatch(ctx, codec, reqs)
return nil
} else if singleShot && !batch {
s.exec(ctx, codec, reqs[0])
return nil
} else if !singleShot && batch {
go s.execBatch(ctx, codec, reqs)
} else { } else {
go s.exec(ctx, codec, reqs[0]) s.exec(ctx, codec, reqs[0])
} }
return nil
} }
// For multi-shot connections, start a goroutine to serve and loop back
pend.Add(1)
go func(reqs []*serverRequest, batch bool) {
defer pend.Done()
if batch {
s.execBatch(ctx, codec, reqs)
} else {
s.exec(ctx, codec, reqs[0])
}
}(reqs, batch)
}
return nil return nil
} }