lotus/lib/jsonrpc/handler.go

263 lines
6.6 KiB
Go
Raw Normal View History

2019-07-03 17:39:07 +00:00
package jsonrpc
import (
"bytes"
2019-07-12 17:12:38 +00:00
"context"
"encoding/base64"
"encoding/json"
"fmt"
2019-07-12 17:12:38 +00:00
"io"
"reflect"
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"
"golang.org/x/xerrors"
)
// rpcHandler holds everything needed to invoke one registered method through
// reflection.
type rpcHandler struct {
	// paramReceivers are the types of the JSON-decoded arguments, i.e. the
	// method inputs after the receiver and the optional context.
	paramReceivers []reflect.Type
	// nParams is the number of JSON-decoded arguments the method expects.
	nParams int

	// receiver is the value the method is called on; handlerFunc is the
	// method's function value, which takes receiver as its first input.
	receiver, handlerFunc reflect.Value

	// hasCtx is 1 when the method's first input after the receiver is a
	// context, 0 otherwise; it doubles as an offset into the call arguments.
	//
	// errOut and valOut are the indices of the error and value results in the
	// method's outputs; -1 is treated as "no such result".
	hasCtx, errOut, valOut int
}
2019-07-12 17:12:38 +00:00
// handlers maps a fully qualified method name ("<namespace>.<MethodName>", as
// built by register) to the handler that serves it.
type handlers map[string]rpcHandler
2019-07-12 17:12:38 +00:00
// Request / response
// request is the wire form of an incoming JSON-RPC call.
type request struct {
	Jsonrpc string `json:"jsonrpc"`
	// ID is nil for notifications: handle still runs the method but sends no
	// response for a successful call.
	ID *int64 `json:"id,omitempty"`
	// Method is the key looked up in handlers.
	Method string `json:"method"`
	// Params holds one entry per method argument; each entry is decoded
	// separately into the matching parameter type.
	Params []param `json:"params"`
	// Meta carries optional out-of-band data. The "SpanContext" key, when
	// present, is read as a base64-encoded binary trace span context.
	Meta map[string]string `json:"meta,omitempty"`
}
// respError is the error object carried in a response.
type respError struct {
	// Code is the numeric error code sent to the peer.
	Code int `json:"code"`
	// Message is the human-readable error text.
	Message string `json:"message"`
}
2019-07-02 13:49:10 +00:00
// Error implements the error interface.
//
// Codes in [-32768, -32000] (the range the JSON-RPC 2.0 specification reserves
// for pre-defined errors) are rendered with an "RPC error (<code>)" prefix;
// any other code yields the bare message.
func (e *respError) Error() string {
	reserved := e.Code >= -32768 && e.Code <= -32000
	if !reserved {
		return e.Message
	}
	return fmt.Sprintf("RPC error (%d): %s", e.Code, e.Message)
}
type response struct {
Jsonrpc string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
2019-07-02 19:08:30 +00:00
ID int64 `json:"id"`
Error *respError `json:"error,omitempty"`
}
2019-07-12 17:12:38 +00:00
// Register
// register adds every method in r's method set to h under the key
// "<namespace>.<MethodName>".
//
// For each method it records the receiver, the function value, whether the
// first input after the receiver is a context, the types of the remaining
// inputs (which are filled from the request's JSON params), and the result
// indices reported by processFuncOut.
func (h handlers) register(namespace string, r interface{}) {
	recv := reflect.ValueOf(r)

	//TODO: expect ptr

	nMethods := recv.NumMethod()
	for m := 0; m < nMethods; m++ {
		method := recv.Type().Method(m)
		sig := method.Func.Type()

		// Input 0 is the receiver. Input 1 is skipped as well when it is a
		// context, because it is not taken from the request params.
		ctxSlots := 0
		if sig.NumIn() >= 2 && sig.In(1) == contextType {
			ctxSlots = 1
		}
		firstParam := 1 + ctxSlots

		nParams := sig.NumIn() - firstParam
		paramTypes := make([]reflect.Type, 0, nParams)
		for in := firstParam; in < sig.NumIn(); in++ {
			paramTypes = append(paramTypes, sig.In(in))
		}

		valOut, errOut, _ := processFuncOut(sig)

		key := namespace + "." + method.Name
		h[key] = rpcHandler{
			paramReceivers: paramTypes,
			nParams:        nParams,

			handlerFunc: method.Func,
			receiver:    recv,

			hasCtx: ctxSlots,

			errOut: errOut,
			valOut: valOut,
		}
	}
}
// Handle
2019-07-15 16:21:48 +00:00
// rpcErrFunc reports a failed request. It receives the writer callback w used
// for responses, the offending request, an error code, and the error itself.
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
// chanOut is called by handle with a channel-valued method result and the
// request ID. When it returns nil, handle sends no response of its own.
type chanOut func(reflect.Value, int64) error
2019-07-12 17:12:38 +00:00
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
2019-07-15 16:21:48 +00:00
wf := func(cb func(io.Writer)) {
cb(w)
}
var req request
2019-07-12 17:12:38 +00:00
if err := json.NewDecoder(r).Decode(&req); err != nil {
rpcError(wf, &req, rpcParseError, xerrors.Errorf("unmarshaling request: %w", err))
return
}
2019-07-22 18:13:41 +00:00
h.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
2019-07-12 17:12:38 +00:00
}
2019-09-16 16:40:26 +00:00
func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []reflect.Value, err error) {
2019-07-25 14:08:41 +00:00
defer func() {
if i := recover(); i != nil {
2019-09-16 16:40:26 +00:00
err = xerrors.Errorf("panic in rpc method '%s': %s", methodName, i)
2019-07-25 14:08:41 +00:00
log.Error(err)
}
}()
out = f.Call(params)
return out, nil
}
// getSpan starts an "api.handle" trace span whose remote parent is taken from
// the request's "SpanContext" meta entry (a base64-encoded binary span
// context) and tags it with the method name.
//
// It returns the input context and a nil span when the entry is absent or
// cannot be decoded; decode failures are logged.
func (handlers) getSpan(ctx context.Context, req request) (context.Context, *trace.Span) {
	// Indexing a nil map is safe, so this also covers requests without Meta.
	eSC, ok := req.Meta["SpanContext"]
	if !ok {
		return ctx, nil
	}

	// DecodeString returns exactly the decoded bytes. A buffer sized with
	// DecodedLen would keep trailing zero bytes whenever the input is padded.
	bSC, err := base64.StdEncoding.DecodeString(eSC)
	if err != nil {
		log.Errorf("SpanContext: decode: %v", err)
		return ctx, nil
	}

	sc, ok := propagation.FromBinary(bSC)
	if !ok {
		log.Errorf("SpanContext: could not create span, data: %x", bSC)
		return ctx, nil
	}

	ctx, span := trace.StartSpanWithRemoteParent(ctx, "api.handle", sc)
	span.AddAttributes(trace.StringAttribute("method", req.Method))
	return ctx, span
}
2019-07-22 18:13:41 +00:00
func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
// Not sure if we need to sanitize the incoming req.Method or not.
ctx, span := h.getSpan(ctx, req)
ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method))
defer span.End()
2019-07-12 17:12:38 +00:00
handler, ok := h[req.Method]
if !ok {
2019-07-12 17:12:38 +00:00
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
stats.Record(ctx, metrics.RPCInvalidMethod.M(1))
2019-07-22 18:13:41 +00:00
done(false)
return
}
if len(req.Params) != handler.nParams {
2019-07-12 17:12:38 +00:00
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
stats.Record(ctx, metrics.RPCRequestError.M(1))
2019-07-22 18:13:41 +00:00
done(false)
return
}
outCh := handler.valOut != -1 && handler.handlerFunc.Type().Out(handler.valOut).Kind() == reflect.Chan
defer done(outCh)
if chOut == nil && outCh {
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method))
stats.Record(ctx, metrics.RPCRequestError.M(1))
return
}
callParams := make([]reflect.Value, 1+handler.hasCtx+handler.nParams)
callParams[0] = handler.receiver
if handler.hasCtx == 1 {
2019-07-12 17:12:38 +00:00
callParams[1] = reflect.ValueOf(ctx)
}
for i := 0; i < handler.nParams; i++ {
rp := reflect.New(handler.paramReceivers[i])
if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil {
rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling params for '%s': %w", handler.handlerFunc, err))
stats.Record(ctx, metrics.RPCRequestError.M(1))
return
}
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Elem().Interface())
}
2019-07-12 17:12:38 +00:00
///////////////////
2019-09-16 16:40:26 +00:00
callResult, err := doCall(req.Method, handler.handlerFunc, callParams)
2019-07-25 14:08:41 +00:00
if err != nil {
rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err))
stats.Record(ctx, metrics.RPCRequestError.M(1))
2019-07-25 14:08:41 +00:00
return
}
2019-07-02 19:08:30 +00:00
if req.ID == nil {
return // notification
}
2019-07-12 17:12:38 +00:00
///////////////////
resp := response{
Jsonrpc: "2.0",
2019-07-02 19:08:30 +00:00
ID: *req.ID,
}
if handler.errOut != -1 {
err := callResult[handler.errOut].Interface()
if err != nil {
log.Warnf("error in RPC call to '%s': %+v", req.Method, err)
stats.Record(ctx, metrics.RPCResponseError.M(1))
resp.Error = &respError{
Code: 1,
Message: err.(error).Error(),
}
}
}
if handler.valOut != -1 {
resp.Result = callResult[handler.valOut].Interface()
}
if resp.Result != nil && reflect.TypeOf(resp.Result).Kind() == reflect.Chan {
// Channel responses are sent from channel control goroutine.
// Sending responses here could cause deadlocks on writeLk, or allow
// sending channel messages before this rpc call returns
//noinspection GoNilness // already checked above
err = chOut(callResult[handler.valOut], *req.ID)
if err == nil {
return // channel goroutine handles responding
}
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
stats.Record(ctx, metrics.RPCResponseError.M(1))
resp.Error = &respError{
Code: 1,
Message: err.(error).Error(),
2019-07-22 18:13:41 +00:00
}
}
w(func(w io.Writer) {
2019-07-15 16:21:48 +00:00
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Error(err)
stats.Record(ctx, metrics.RPCResponseError.M(1))
2019-07-15 16:21:48 +00:00
return
}
})
}