node, rpc: add configurable HTTP request limit (#28948)
Adds a configurable HTTP request limit, and bumps the engine default
This commit is contained in:
parent
449d3f0d87
commit
69f5d5ba1f
@ -41,6 +41,7 @@ const (
|
||||
// needs of all CLs.
|
||||
engineAPIBatchItemLimit = 2000
|
||||
engineAPIBatchResponseSizeLimit = 250 * 1000 * 1000
|
||||
engineAPIBodyLimit = 128 * 1024 * 1024
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -453,14 +453,16 @@ func (n *Node) startRPC() error {
|
||||
jwtSecret: secret,
|
||||
batchItemLimit: engineAPIBatchItemLimit,
|
||||
batchResponseSizeLimit: engineAPIBatchResponseSizeLimit,
|
||||
httpBodyLimit: engineAPIBodyLimit,
|
||||
}
|
||||
if err := server.enableRPC(allAPIs, httpConfig{
|
||||
err := server.enableRPC(allAPIs, httpConfig{
|
||||
CorsAllowedOrigins: DefaultAuthCors,
|
||||
Vhosts: n.config.AuthVirtualHosts,
|
||||
Modules: DefaultAuthModules,
|
||||
prefix: DefaultAuthPrefix,
|
||||
rpcEndpointConfig: sharedConfig,
|
||||
}); err != nil {
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
servers = append(servers, server)
|
||||
|
@ -56,6 +56,7 @@ type rpcEndpointConfig struct {
|
||||
jwtSecret []byte // optional JWT secret
|
||||
batchItemLimit int
|
||||
batchResponseSizeLimit int
|
||||
httpBodyLimit int
|
||||
}
|
||||
|
||||
type rpcHandler struct {
|
||||
@ -304,6 +305,9 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {
|
||||
// Create RPC server and handler.
|
||||
srv := rpc.NewServer()
|
||||
srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit)
|
||||
if config.httpBodyLimit > 0 {
|
||||
srv.SetHTTPBodyLimit(config.httpBodyLimit)
|
||||
}
|
||||
if err := RegisterApis(apis, config.Modules, srv); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -336,6 +340,9 @@ func (h *httpServer) enableWS(apis []rpc.API, config wsConfig) error {
|
||||
// Create RPC server and handler.
|
||||
srv := rpc.NewServer()
|
||||
srv.SetBatchLimits(config.batchItemLimit, config.batchResponseSizeLimit)
|
||||
if config.httpBodyLimit > 0 {
|
||||
srv.SetHTTPBodyLimit(config.httpBodyLimit)
|
||||
}
|
||||
if err := RegisterApis(apis, config.Modules, srv); err != nil {
|
||||
return err
|
||||
}
|
||||
|
16
rpc/http.go
16
rpc/http.go
@ -33,7 +33,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
maxRequestContentLength = 1024 * 1024 * 5
|
||||
defaultBodyLimit = 5 * 1024 * 1024
|
||||
contentType = "application/json"
|
||||
)
|
||||
|
||||
@ -253,8 +253,8 @@ type httpServerConn struct {
|
||||
r *http.Request
|
||||
}
|
||||
|
||||
func newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec {
|
||||
body := io.LimitReader(r.Body, maxRequestContentLength)
|
||||
func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) ServerCodec {
|
||||
body := io.LimitReader(r.Body, int64(s.httpBodyLimit))
|
||||
conn := &httpServerConn{Reader: body, Writer: w, r: r}
|
||||
|
||||
encoder := func(v any, isErrorResponse bool) error {
|
||||
@ -312,7 +312,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return
|
||||
}
|
||||
if code, err := validateRequest(r); err != nil {
|
||||
if code, err := s.validateRequest(r); err != nil {
|
||||
http.Error(w, err.Error(), code)
|
||||
return
|
||||
}
|
||||
@ -330,19 +330,19 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// until EOF, writes the response to w, and orders the server to process a
|
||||
// single request.
|
||||
w.Header().Set("content-type", contentType)
|
||||
codec := newHTTPServerConn(r, w)
|
||||
codec := s.newHTTPServerConn(r, w)
|
||||
defer codec.close()
|
||||
s.serveSingleRequest(ctx, codec)
|
||||
}
|
||||
|
||||
// validateRequest returns a non-zero response code and error message if the
|
||||
// request is invalid.
|
||||
func validateRequest(r *http.Request) (int, error) {
|
||||
func (s *Server) validateRequest(r *http.Request) (int, error) {
|
||||
if r.Method == http.MethodPut || r.Method == http.MethodDelete {
|
||||
return http.StatusMethodNotAllowed, errors.New("method not allowed")
|
||||
}
|
||||
if r.ContentLength > maxRequestContentLength {
|
||||
err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, maxRequestContentLength)
|
||||
if r.ContentLength > int64(s.httpBodyLimit) {
|
||||
err := fmt.Errorf("content length too large (%d>%d)", r.ContentLength, s.httpBodyLimit)
|
||||
return http.StatusRequestEntityTooLarge, err
|
||||
}
|
||||
// Allow OPTIONS (regardless of content-type)
|
||||
|
@ -40,11 +40,13 @@ func confirmStatusCode(t *testing.T, got, want int) {
|
||||
|
||||
func confirmRequestValidationCode(t *testing.T, method, contentType, body string, expectedStatusCode int) {
|
||||
t.Helper()
|
||||
|
||||
s := NewServer()
|
||||
request := httptest.NewRequest(method, "http://url.com", strings.NewReader(body))
|
||||
if len(contentType) > 0 {
|
||||
request.Header.Set("Content-Type", contentType)
|
||||
}
|
||||
code, err := validateRequest(request)
|
||||
code, err := s.validateRequest(request)
|
||||
if code == 0 {
|
||||
if err != nil {
|
||||
t.Errorf("validation: got error %v, expected nil", err)
|
||||
@ -64,7 +66,7 @@ func TestHTTPErrorResponseWithPut(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHTTPErrorResponseWithMaxContentLength(t *testing.T) {
|
||||
body := make([]rune, maxRequestContentLength+1)
|
||||
body := make([]rune, defaultBodyLimit+1)
|
||||
confirmRequestValidationCode(t,
|
||||
http.MethodPost, contentType, string(body), http.StatusRequestEntityTooLarge)
|
||||
}
|
||||
@ -104,7 +106,7 @@ func TestHTTPResponseWithEmptyGet(t *testing.T) {
|
||||
|
||||
// This checks that maxRequestContentLength is not applied to the response of a request.
|
||||
func TestHTTPRespBodyUnlimited(t *testing.T) {
|
||||
const respLength = maxRequestContentLength * 3
|
||||
const respLength = defaultBodyLimit * 3
|
||||
|
||||
s := NewServer()
|
||||
defer s.Stop()
|
||||
|
@ -51,6 +51,7 @@ type Server struct {
|
||||
run atomic.Bool
|
||||
batchItemLimit int
|
||||
batchResponseLimit int
|
||||
httpBodyLimit int
|
||||
}
|
||||
|
||||
// NewServer creates a new server instance with no registered handlers.
|
||||
@ -58,6 +59,7 @@ func NewServer() *Server {
|
||||
server := &Server{
|
||||
idgen: randomIDGenerator(),
|
||||
codecs: make(map[ServerCodec]struct{}),
|
||||
httpBodyLimit: defaultBodyLimit,
|
||||
}
|
||||
server.run.Store(true)
|
||||
// Register the default service providing meta information about the RPC service such
|
||||
@ -78,6 +80,13 @@ func (s *Server) SetBatchLimits(itemLimit, maxResponseSize int) {
|
||||
s.batchResponseLimit = maxResponseSize
|
||||
}
|
||||
|
||||
// SetHTTPBodyLimit sets the size limit for HTTP requests.
|
||||
//
|
||||
// This method should be called before processing any requests via ServeHTTP.
|
||||
func (s *Server) SetHTTPBodyLimit(limit int) {
|
||||
s.httpBodyLimit = limit
|
||||
}
|
||||
|
||||
// RegisterName creates a service for the given receiver type under the given name. When no
|
||||
// methods on the given receiver match the criteria to be either a RPC method or a
|
||||
// subscription an error is returned. Otherwise a new service is created and added to the
|
||||
|
@ -97,7 +97,7 @@ func TestWebsocketLargeCall(t *testing.T) {
|
||||
|
||||
// This call sends slightly less than the limit and should work.
|
||||
var result echoResult
|
||||
arg := strings.Repeat("x", maxRequestContentLength-200)
|
||||
arg := strings.Repeat("x", defaultBodyLimit-200)
|
||||
if err := client.Call(&result, "test_echo", arg, 1); err != nil {
|
||||
t.Fatalf("valid call didn't work: %v", err)
|
||||
}
|
||||
@ -106,7 +106,7 @@ func TestWebsocketLargeCall(t *testing.T) {
|
||||
}
|
||||
|
||||
// This call sends twice the allowed size and shouldn't work.
|
||||
arg = strings.Repeat("x", maxRequestContentLength*2)
|
||||
arg = strings.Repeat("x", defaultBodyLimit*2)
|
||||
err = client.Call(&result, "test_echo", arg)
|
||||
if err == nil {
|
||||
t.Fatal("no error for too large call")
|
||||
|
Loading…
Reference in New Issue
Block a user