lotus/gateway/handler.go

165 lines
4.2 KiB
Go
Raw Normal View History

2021-05-23 17:37:53 +00:00
package gateway
import (
	"context"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"

	"contrib.go.opencensus.io/exporter/prometheus"
	"github.com/gorilla/mux"
	promclient "github.com/prometheus/client_golang/prometheus"
	"golang.org/x/time/rate"

	"github.com/filecoin-project/go-jsonrpc"

	lapi "github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/api/v0api"
	"github.com/filecoin-project/lotus/api/v1api"
	"github.com/filecoin-project/lotus/metrics/proxy"
	"github.com/filecoin-project/lotus/node"
)
2022-06-06 23:37:11 +00:00
// perConnLimiterKeyType is the unexported type of perConnLimiterKey. Using a
// dedicated type keeps the key from colliding with context keys defined by
// other packages.
type perConnLimiterKeyType string
2022-06-07 17:36:57 +00:00
// perConnLimiterKey is the context key under which RateLimiterHandler.ServeHTTP
// stores its *rate.Limiter on each request.
const perConnLimiterKey perConnLimiterKeyType = "limiter"
2022-06-06 23:37:11 +00:00
// filterTrackerKeyType is the unexported type of filterTrackerKey. Using a
// dedicated type keeps the key from colliding with context keys defined by
// other packages.
type filterTrackerKeyType string

// filterTrackerKey is the context key under which RateLimiterHandler.ServeHTTP
// stores the value returned by newFilterTracker for each request.
const filterTrackerKey filterTrackerKeyType = "filterTracker"
2021-05-23 17:37:53 +00:00
// Handler returns a gateway http.Handler, to be mounted as-is on the server.
func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinute int64, opts ...jsonrpc.ServerOption) (http.Handler, error) {
2021-05-23 17:37:53 +00:00
m := mux.NewRouter()
serveRpc := func(path string, hnd interface{}) {
2022-07-18 16:36:51 +00:00
rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithServerErrors(lapi.RPCErrors))...)
2021-05-23 17:37:53 +00:00
rpcServer.Register("Filecoin", hnd)
2022-05-27 11:23:32 +00:00
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")
2022-12-13 17:14:03 +00:00
lapi.CrateEthRPCAliases(rpcServer)
2021-05-23 17:37:53 +00:00
m.Handle(path, rpcServer)
}
2022-05-21 01:38:17 +00:00
ma := proxy.MetricedGatewayAPI(gwapi)
2021-05-23 17:37:53 +00:00
serveRpc("/rpc/v1", ma)
2022-05-21 01:38:17 +00:00
serveRpc("/rpc/v0", lapi.Wrap(new(v1api.FullNodeStruct), new(v0api.WrapperV1Full), ma))
2021-05-23 17:37:53 +00:00
registry := promclient.DefaultRegisterer.(*promclient.Registry)
exporter, err := prometheus.NewExporter(prometheus.Options{
Registry: registry,
Namespace: "lotus_gw",
})
if err != nil {
return nil, err
}
m.Handle("/debug/metrics", exporter)
2022-05-21 01:38:17 +00:00
m.Handle("/health/livez", node.NewLiveHandler(api))
m.Handle("/health/readyz", node.NewReadyHandler(api))
2021-05-23 17:37:53 +00:00
m.PathPrefix("/").Handler(http.DefaultServeMux)
/*ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
}*/
2022-05-20 10:40:52 +00:00
rlh := NewRateLimiterHandler(m, rateLimit)
clh := NewConnectionRateLimiterHandler(rlh, connPerMinute)
return clh, nil
}
// NewRateLimiterHandler wraps handler in a RateLimiterHandler whose limiter is
// derived from rateLimit by limiterFromRateLimit.
func NewRateLimiterHandler(handler http.Handler, rateLimit int64) *RateLimiterHandler {
	h := new(RateLimiterHandler)
	h.handler = handler
	h.limiter = limiterFromRateLimit(rateLimit)
	return h
}
// RateLimiterHandler is an http.Handler that adds a rate limiter to the request
// context for per-connection rate limiting before passing the request on.
//
// NOTE(review): the limiter is created once per handler and the same instance
// is attached to every request, so it is shared by all requests that go
// through this handler; confirm that this matches the intended
// "per-connection" semantics.
type RateLimiterHandler struct {
	// handler is the wrapped handler that receives the request.
	handler http.Handler
	// limiter is the limiter stored in each request's context.
	limiter *rate.Limiter
}
func (h RateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(context.WithValue(r.Context(), perConnLimiterKey, h.limiter))
// also add a filter tracker to the context
r = r.WithContext(context.WithValue(r.Context(), filterTrackerKey, newFilterTracker()))
h.handler.ServeHTTP(w, r)
2022-05-20 10:40:52 +00:00
}
// NewConnectionRateLimiterHandler returns a handler that wraps handler and
// blocks new connections from a remote host once that host has already made
// too many. connPerMinute is the per-host budget; the per-host counter map
// starts out empty.
func NewConnectionRateLimiterHandler(handler http.Handler, connPerMinute int64) *ConnectionRateLimiterHandler {
	h := new(ConnectionRateLimiterHandler)
	h.handler = handler
	h.connPerMinute = connPerMinute
	h.ipmap = map[string]int64{}
	return h
}
// ConnectionRateLimiterHandler is an http.Handler that limits how many
// requests each remote host may make before forwarding to the wrapped handler.
// It contains a mutex and must not be copied; use it through a pointer.
type ConnectionRateLimiterHandler struct {
	// mu guards ipmap.
	mu sync.Mutex
	// ipmap holds, per remote host, the number of requests currently counted
	// against that host.
	ipmap map[string]int64
	// connPerMinute is the per-host limit; 0 disables limiting.
	connPerMinute int64
	// handler is the wrapped handler that receives admitted requests.
	handler http.Handler
}
// ServeHTTP admits at most connPerMinute requests per remote host within any
// one-minute window and forwards admitted requests to the wrapped handler.
//
// A connPerMinute of 0 disables limiting entirely. If r.RemoteAddr cannot be
// split into host and port, the request is answered with 500 Internal Server
// Error. A host that has used up its budget is answered with 429 Too Many
// Requests. Every admitted request, including the first one from a host, is
// counted for one minute and then released, so idle hosts are eventually
// removed from the map.
func (h *ConnectionRateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if h.connPerMinute == 0 {
		h.handler.ServeHTTP(w, r)
		return
	}

	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if !h.admit(host) {
		w.WriteHeader(http.StatusTooManyRequests)
		return
	}
	h.handler.ServeHTTP(w, r)
}

// admit reports whether host is still within its budget. If it is, the request
// is counted against host and a release is scheduled one minute from now.
func (h *ConnectionRateLimiterHandler) admit(host string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.ipmap[host] >= h.connPerMinute {
		return false
	}
	h.ipmap[host]++
	// AfterFunc runs release in its own goroutine once the timer fires, so no
	// goroutine is parked for the whole minute.
	time.AfterFunc(time.Minute, func() { h.release(host) })
	return true
}

// release gives back one unit of host's budget and deletes the map entry once
// nothing is counted against the host any more.
func (h *ConnectionRateLimiterHandler) release(host string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.ipmap[host] <= 1 {
		delete(h.ipmap, host)
		return
	}
	h.ipmap[host]--
}
func limiterFromRateLimit(rateLimit int64) *rate.Limiter {
var limit rate.Limit
if rateLimit == 0 {
limit = rate.Inf
} else {
limit = rate.Every(time.Second / time.Duration(rateLimit))
}
return rate.NewLimiter(limit, stateRateLimitTokens)
2021-05-23 17:37:53 +00:00
}