jsonrpc: Channel support
This commit is contained in:
parent
978bd5c17e
commit
ff4d1b5819
@ -121,4 +121,6 @@ type API interface {
|
|||||||
|
|
||||||
// Version provides information about API provider
|
// Version provides information about API provider
|
||||||
Version(context.Context) (Version, error)
|
Version(context.Context) (Version, error)
|
||||||
|
|
||||||
|
TestCh(context.Context) (<-chan int, error)
|
||||||
}
|
}
|
||||||
|
@ -41,9 +41,15 @@ type Struct struct {
|
|||||||
NetPeers func(context.Context) ([]peer.AddrInfo, error)
|
NetPeers func(context.Context) ([]peer.AddrInfo, error)
|
||||||
NetConnect func(context.Context, peer.AddrInfo) error
|
NetConnect func(context.Context, peer.AddrInfo) error
|
||||||
NetAddrsListen func(context.Context) (peer.AddrInfo, error)
|
NetAddrsListen func(context.Context) (peer.AddrInfo, error)
|
||||||
|
|
||||||
|
TestCh func(context.Context) (<-chan int, error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Struct) TestCh(ctx context.Context) (<-chan int, error) {
|
||||||
|
return c.Internal.TestCh(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Struct) ClientListImports(ctx context.Context) ([]Import, error) {
|
func (c *Struct) ClientListImports(ctx context.Context) ([]Import, error) {
|
||||||
return c.Internal.ClientListImports(ctx)
|
return c.Internal.ClientListImports(ctx)
|
||||||
}
|
}
|
||||||
|
26
cli/cmd.go
26
cli/cmd.go
@ -2,6 +2,7 @@ package cli
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
@ -67,4 +68,29 @@ var Commands = []*cli.Command{
|
|||||||
versionCmd,
|
versionCmd,
|
||||||
walletCmd,
|
walletCmd,
|
||||||
createMinerCmd,
|
createMinerCmd,
|
||||||
|
|
||||||
|
{
|
||||||
|
Name:"testch",
|
||||||
|
Action: func(cctx *cli.Context) error {
|
||||||
|
api, err := getAPI(cctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ctx := reqContext(cctx)
|
||||||
|
|
||||||
|
c, err := api.TestCh(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case n := <-c:
|
||||||
|
fmt.Println(n)
|
||||||
|
case <- ctx.Done():
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
@ -34,17 +34,10 @@ func (e *ErrClient) Unwrap(err error) error {
|
|||||||
return e.err
|
return e.err
|
||||||
}
|
}
|
||||||
|
|
||||||
type result []byte
|
|
||||||
|
|
||||||
func (p *result) UnmarshalJSON(raw []byte) error {
|
|
||||||
*p = make([]byte, len(raw))
|
|
||||||
copy(*p, raw)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type clientResponse struct {
|
type clientResponse struct {
|
||||||
Jsonrpc string `json:"jsonrpc"`
|
Jsonrpc string `json:"jsonrpc"`
|
||||||
Result result `json:"result"`
|
Result json.RawMessage `json:"result"`
|
||||||
ID int64 `json:"id"`
|
ID int64 `json:"id"`
|
||||||
Error *respError `json:"error,omitempty"`
|
Error *respError `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
@ -52,6 +45,8 @@ type clientResponse struct {
|
|||||||
type clientRequest struct {
|
type clientRequest struct {
|
||||||
req request
|
req request
|
||||||
ready chan clientResponse
|
ready chan clientResponse
|
||||||
|
|
||||||
|
retCh func() func([]byte, bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClientCloser is used to close Client from further use
|
// ClientCloser is used to close Client from further use
|
||||||
@ -100,7 +95,7 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser
|
|||||||
out := make([]reflect.Value, nout)
|
out := make([]reflect.Value, nout)
|
||||||
|
|
||||||
if valOut != -1 {
|
if valOut != -1 {
|
||||||
out[valOut] = rval.Elem()
|
out[valOut] = rval
|
||||||
}
|
}
|
||||||
if errOut != -1 {
|
if errOut != -1 {
|
||||||
out[errOut] = reflect.New(errorType).Elem()
|
out[errOut] = reflect.New(errorType).Elem()
|
||||||
@ -130,6 +125,7 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser
|
|||||||
if ftyp.NumIn() > 0 && ftyp.In(0) == contextType {
|
if ftyp.NumIn() > 0 && ftyp.In(0) == contextType {
|
||||||
hasCtx = 1
|
hasCtx = 1
|
||||||
}
|
}
|
||||||
|
retCh := valOut != -1 && ftyp.Out(valOut).Kind() == reflect.Chan
|
||||||
|
|
||||||
fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) {
|
fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) {
|
||||||
id := atomic.AddInt64(&idCtr, 1)
|
id := atomic.AddInt64(&idCtr, 1)
|
||||||
@ -140,6 +136,31 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var retVal reflect.Value
|
||||||
|
var chCtor func() func([]byte, bool)
|
||||||
|
if retCh {
|
||||||
|
chCtor = func() func([]byte, bool) {
|
||||||
|
// unpack chan type to make sure it's reflect.BothDir
|
||||||
|
ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem())
|
||||||
|
ch := reflect.MakeChan(ctyp, 0) // todo: buffer?
|
||||||
|
retVal = ch.Convert(ftyp.Out(valOut))
|
||||||
|
|
||||||
|
return func(result []byte, ok bool) {
|
||||||
|
if !ok {
|
||||||
|
ch.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val := reflect.New(ftyp.Out(valOut).Elem())
|
||||||
|
if err := json.Unmarshal(result, val.Interface()); err != nil {
|
||||||
|
log.Errorf("error unmarshaling chan response: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ch.Send(val.Elem()) // todo: select on ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
req := request{
|
req := request{
|
||||||
Jsonrpc: "2.0",
|
Jsonrpc: "2.0",
|
||||||
ID: &id,
|
ID: &id,
|
||||||
@ -151,6 +172,8 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser
|
|||||||
requests <- clientRequest{
|
requests <- clientRequest{
|
||||||
req: req,
|
req: req,
|
||||||
ready: rchan,
|
ready: rchan,
|
||||||
|
|
||||||
|
retCh: chCtor,
|
||||||
}
|
}
|
||||||
var ctxDone <-chan struct{}
|
var ctxDone <-chan struct{}
|
||||||
var resp clientResponse
|
var resp clientResponse
|
||||||
@ -159,6 +182,7 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser
|
|||||||
ctxDone = args[0].Interface().(context.Context).Done()
|
ctxDone = args[0].Interface().(context.Context).Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wait for response, handle context cancellation
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -176,24 +200,25 @@ func NewClient(addr string, namespace string, handler interface{}) (ClientCloser
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var rval reflect.Value
|
|
||||||
|
|
||||||
if valOut != -1 {
|
if valOut != -1 && !retCh {
|
||||||
rval = reflect.New(ftyp.Out(valOut))
|
retVal = reflect.New(ftyp.Out(valOut))
|
||||||
|
|
||||||
if resp.Result != nil {
|
if resp.Result != nil {
|
||||||
log.Debugw("rpc result", "type", ftyp.Out(valOut))
|
log.Debugw("rpc result", "type", ftyp.Out(valOut))
|
||||||
if err := json.Unmarshal(resp.Result, rval.Interface()); err != nil {
|
if err := json.Unmarshal(resp.Result, retVal.Interface()); err != nil {
|
||||||
return processError(xerrors.Errorf("unmarshaling result: %w", err))
|
return processError(xerrors.Errorf("unmarshaling result: %w", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
retVal = retVal.Elem()
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.ID != *req.ID {
|
if resp.ID != *req.ID {
|
||||||
return processError(errors.New("request and response id didn't match"))
|
return processError(errors.New("request and response id didn't match"))
|
||||||
}
|
}
|
||||||
|
|
||||||
return processResponse(resp, rval)
|
return processResponse(resp, retVal)
|
||||||
})
|
})
|
||||||
|
|
||||||
val.Elem().Field(i).Set(fn)
|
val.Elem().Field(i).Set(fn)
|
||||||
|
@ -95,6 +95,7 @@ func (h handlers) register(namespace string, r interface{}) {
|
|||||||
// Handle
|
// Handle
|
||||||
|
|
||||||
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
|
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
|
||||||
|
type chanOut func(reflect.Value) interface{}
|
||||||
|
|
||||||
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
|
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
|
||||||
wf := func(cb func(io.Writer)) {
|
wf := func(cb func(io.Writer)) {
|
||||||
@ -107,20 +108,28 @@ func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rp
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
h.handle(ctx, req, wf, rpcError, func() {})
|
h.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func()) {
|
func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
|
||||||
defer done()
|
|
||||||
|
|
||||||
handler, ok := h[req.Method]
|
handler, ok := h[req.Method]
|
||||||
if !ok {
|
if !ok {
|
||||||
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
|
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
|
||||||
|
done(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(req.Params) != handler.nParams {
|
if len(req.Params) != handler.nParams {
|
||||||
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
|
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
|
||||||
|
done(false)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
outCh := handler.valOut != -1 && handler.handlerFunc.Type().Out(handler.valOut).Kind() == reflect.Chan
|
||||||
|
defer done(outCh)
|
||||||
|
|
||||||
|
if chOut == nil && outCh {
|
||||||
|
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -165,6 +174,11 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
|
|||||||
}
|
}
|
||||||
if handler.valOut != -1 {
|
if handler.valOut != -1 {
|
||||||
resp.Result = callResult[handler.valOut].Interface()
|
resp.Result = callResult[handler.valOut].Interface()
|
||||||
|
|
||||||
|
if reflect.TypeOf(resp.Result).Kind() == reflect.Chan {
|
||||||
|
//noinspection GoNilness // already checked above
|
||||||
|
resp.Result = chOut(callResult[handler.valOut])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
w(func(w io.Writer) {
|
w(func(w io.Writer) {
|
||||||
|
@ -60,6 +60,8 @@ func rpcError(wf func(func(io.Writer)), req *request, code int, err error) {
|
|||||||
hw.WriteHeader(500)
|
hw.WriteHeader(500)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Warnf("rpc error: %s", err)
|
||||||
|
|
||||||
if req.ID == nil { // notification
|
if req.ID == nil { // notification
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -73,7 +75,11 @@ func rpcError(wf func(func(io.Writer)), req *request, code int, err error) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = json.NewEncoder(w).Encode(resp)
|
err = json.NewEncoder(w).Encode(resp)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to write rpc error: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,12 +6,16 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
const wsCancel = "xrpc.cancel"
|
const wsCancel = "xrpc.cancel"
|
||||||
|
const chValue = "xrpc.ch.val"
|
||||||
|
const chClose = "xrpc.ch.close"
|
||||||
|
|
||||||
type frame struct {
|
type frame struct {
|
||||||
// common
|
// common
|
||||||
@ -23,7 +27,7 @@ type frame struct {
|
|||||||
Params []param `json:"params,omitempty"`
|
Params []param `json:"params,omitempty"`
|
||||||
|
|
||||||
// response
|
// response
|
||||||
Result result `json:"result,omitempty"`
|
Result json.RawMessage `json:"result,omitempty"`
|
||||||
Error *respError `json:"error,omitempty"`
|
Error *respError `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -31,6 +35,7 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
incoming := make(chan io.Reader)
|
incoming := make(chan io.Reader)
|
||||||
var incErr error
|
var incErr error
|
||||||
|
|
||||||
|
// nextMessage wait for one message and puts it to the incoming channel
|
||||||
nextMessage := func() {
|
nextMessage := func() {
|
||||||
mtype, r, err := conn.NextReader()
|
mtype, r, err := conn.NextReader()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -47,6 +52,9 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
}
|
}
|
||||||
|
|
||||||
var writeLk sync.Mutex
|
var writeLk sync.Mutex
|
||||||
|
|
||||||
|
// nextWriter waits for writeLk and invokes the cb callback with WS message
|
||||||
|
// writer when the lock is acquired
|
||||||
nextWriter := func(cb func(io.Writer)) {
|
nextWriter := func(cb func(io.Writer)) {
|
||||||
writeLk.Lock()
|
writeLk.Lock()
|
||||||
defer writeLk.Unlock()
|
defer writeLk.Unlock()
|
||||||
@ -65,12 +73,113 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sendReq := func(req request) {
|
||||||
|
writeLk.Lock()
|
||||||
|
if err := conn.WriteJSON(req); err != nil {
|
||||||
|
log.Error("handle me:", err)
|
||||||
|
writeLk.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeLk.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the first message
|
||||||
go nextMessage()
|
go nextMessage()
|
||||||
|
|
||||||
|
// inflight are requests we sent to the remote
|
||||||
inflight := map[int64]clientRequest{}
|
inflight := map[int64]clientRequest{}
|
||||||
|
|
||||||
|
// handling are the calls we handle
|
||||||
handling := map[int64]context.CancelFunc{}
|
handling := map[int64]context.CancelFunc{}
|
||||||
var handlingLk sync.Mutex
|
var handlingLk sync.Mutex
|
||||||
|
|
||||||
|
// ////
|
||||||
|
// Subscriptions (func() <-chan Typ - like methods)
|
||||||
|
|
||||||
|
var chOnce sync.Once
|
||||||
|
var outId uint64
|
||||||
|
type chReg struct { id uint64; ch reflect.Value }
|
||||||
|
registerCh := make(chan chReg)
|
||||||
|
defer close(registerCh)
|
||||||
|
|
||||||
|
handleOutChans := func() {
|
||||||
|
regV := reflect.ValueOf(registerCh)
|
||||||
|
|
||||||
|
cases := []reflect.SelectCase{
|
||||||
|
{ // registration chan always 0
|
||||||
|
Dir: reflect.SelectRecv,
|
||||||
|
Chan: regV,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var caseToId []uint64
|
||||||
|
|
||||||
|
|
||||||
|
for {
|
||||||
|
chosen, val, ok := reflect.Select(cases)
|
||||||
|
|
||||||
|
if chosen == 0 { // control channel
|
||||||
|
if !ok {
|
||||||
|
// not closing any channels as we're on receiving end.
|
||||||
|
// Also, context cancellation below should take care of any running
|
||||||
|
// requests
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
registration := val.Interface().(chReg)
|
||||||
|
|
||||||
|
caseToId = append(caseToId, registration.id)
|
||||||
|
cases = append(cases, reflect.SelectCase{
|
||||||
|
Dir: reflect.SelectRecv,
|
||||||
|
Chan: registration.ch,
|
||||||
|
})
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
n := len(caseToId)
|
||||||
|
if n > 0 {
|
||||||
|
cases[chosen] = cases[n]
|
||||||
|
caseToId[chosen - 1] = caseToId[n - 1]
|
||||||
|
}
|
||||||
|
|
||||||
|
cases = cases[:n]
|
||||||
|
caseToId = caseToId[:n-1]
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sendReq(request{
|
||||||
|
Jsonrpc: "2.0",
|
||||||
|
ID: nil, // notification
|
||||||
|
Method: chValue,
|
||||||
|
Params: []param{{v: reflect.ValueOf(caseToId[chosen - 1])}, {v: val}},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
handleChanOut := func(ch reflect.Value) interface{} {
|
||||||
|
chOnce.Do(func() {
|
||||||
|
go handleOutChans()
|
||||||
|
})
|
||||||
|
id := atomic.AddUint64(&outId, 1)
|
||||||
|
|
||||||
|
registerCh <- chReg{
|
||||||
|
id: id,
|
||||||
|
ch: ch,
|
||||||
|
}
|
||||||
|
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
|
// client side subs
|
||||||
|
|
||||||
|
chanHandlers := map[uint64]func(m []byte, ok bool){}
|
||||||
|
|
||||||
|
// ////
|
||||||
|
|
||||||
|
// on close, make sure to return from all pending calls, and cancel context
|
||||||
|
// on all calls we handle
|
||||||
defer func() {
|
defer func() {
|
||||||
for id, req := range inflight {
|
for id, req := range inflight {
|
||||||
req.ready <- clientResponse{
|
req.ready <- clientResponse{
|
||||||
@ -89,6 +198,7 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// cancelCtx is a built-in rpc which handles context cancellation over rpc
|
||||||
cancelCtx := func(req frame) {
|
cancelCtx := func(req frame) {
|
||||||
if req.ID != nil {
|
if req.ID != nil {
|
||||||
log.Warnf("%s call with ID set, won't respond", wsCancel)
|
log.Warnf("%s call with ID set, won't respond", wsCancel)
|
||||||
@ -125,6 +235,10 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get message type by method name:
|
||||||
|
// "" - response
|
||||||
|
// "xrpc.*" - builtin
|
||||||
|
// anything else - incoming remote call
|
||||||
switch frame.Method {
|
switch frame.Method {
|
||||||
case "": // Response to our call
|
case "": // Response to our call
|
||||||
req, ok := inflight[*frame.ID]
|
req, ok := inflight[*frame.ID]
|
||||||
@ -133,6 +247,17 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if req.retCh != nil {
|
||||||
|
// output is channel
|
||||||
|
var chid uint64
|
||||||
|
if err := json.Unmarshal(frame.Result, &chid); err != nil {
|
||||||
|
log.Error("failed to unmarshal channel id response: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
chanHandlers[chid] = req.retCh()
|
||||||
|
}
|
||||||
|
|
||||||
req.ready <- clientResponse{
|
req.ready <- clientResponse{
|
||||||
Jsonrpc: frame.Jsonrpc,
|
Jsonrpc: frame.Jsonrpc,
|
||||||
Result: frame.Result,
|
Result: frame.Result,
|
||||||
@ -142,6 +267,20 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
delete(inflight, *frame.ID)
|
delete(inflight, *frame.ID)
|
||||||
case wsCancel:
|
case wsCancel:
|
||||||
cancelCtx(frame)
|
cancelCtx(frame)
|
||||||
|
case chValue:
|
||||||
|
var chid uint64
|
||||||
|
if err := json.Unmarshal(frame.Params[0].data, &chid); err != nil {
|
||||||
|
log.Error("failed to unmarshal channel id in xrpc.ch.val: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
hnd, ok := chanHandlers[chid]
|
||||||
|
if !ok {
|
||||||
|
log.Error("xrpc.ch.val: handler %d not found", chid)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
hnd(frame.Params[1].data, true)
|
||||||
default: // Remote call
|
default: // Remote call
|
||||||
req := request{
|
req := request{
|
||||||
Jsonrpc: frame.Jsonrpc,
|
Jsonrpc: frame.Jsonrpc,
|
||||||
@ -155,7 +294,11 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
nw := func(cb func(io.Writer)) {
|
nw := func(cb func(io.Writer)) {
|
||||||
cb(ioutil.Discard)
|
cb(ioutil.Discard)
|
||||||
}
|
}
|
||||||
done := cf
|
done := func(keepctx bool) {
|
||||||
|
if !keepctx {
|
||||||
|
cf()
|
||||||
|
}
|
||||||
|
}
|
||||||
if frame.ID != nil {
|
if frame.ID != nil {
|
||||||
nw = nextWriter
|
nw = nextWriter
|
||||||
|
|
||||||
@ -163,27 +306,27 @@ func handleWsConn(ctx context.Context, conn *websocket.Conn, handler handlers, r
|
|||||||
handling[*frame.ID] = cf
|
handling[*frame.ID] = cf
|
||||||
handlingLk.Unlock()
|
handlingLk.Unlock()
|
||||||
|
|
||||||
done = func() {
|
done = func(keepctx bool) {
|
||||||
handlingLk.Lock()
|
handlingLk.Lock()
|
||||||
defer handlingLk.Unlock()
|
defer handlingLk.Unlock()
|
||||||
|
|
||||||
|
if !keepctx {
|
||||||
cf()
|
cf()
|
||||||
|
}
|
||||||
|
|
||||||
delete(handling, *frame.ID)
|
delete(handling, *frame.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go handler.handle(ctx, req, nw, rpcError, done)
|
go handler.handle(ctx, req, nw, rpcError, done, handleChanOut)
|
||||||
}
|
}
|
||||||
|
|
||||||
go nextMessage()
|
go nextMessage() // TODO: fix on errors
|
||||||
case req := <-requests:
|
case req := <-requests:
|
||||||
if req.req.ID != nil {
|
if req.req.ID != nil {
|
||||||
inflight[*req.req.ID] = req
|
inflight[*req.req.ID] = req
|
||||||
}
|
}
|
||||||
if err := conn.WriteJSON(req.req); err != nil {
|
sendReq(req.req)
|
||||||
log.Error("handle me:", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-stop:
|
case <-stop:
|
||||||
if err := conn.Close(); err != nil {
|
if err := conn.Close(); err != nil {
|
||||||
log.Debugf("websocket close error", "error", err)
|
log.Debugf("websocket close error", "error", err)
|
||||||
|
22
node/api.go
22
node/api.go
@ -2,6 +2,8 @@ package node
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/api"
|
"github.com/filecoin-project/go-lotus/api"
|
||||||
"github.com/filecoin-project/go-lotus/build"
|
"github.com/filecoin-project/go-lotus/build"
|
||||||
@ -28,6 +30,26 @@ type API struct {
|
|||||||
Wallet *chain.Wallet
|
Wallet *chain.Wallet
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *API) TestCh(ctx context.Context) (<-chan int, error) {
|
||||||
|
out := make(chan int)
|
||||||
|
go func() {
|
||||||
|
defer close(out)
|
||||||
|
var n int
|
||||||
|
for {
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
n++
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
fmt.Println("CTXCANCEL!")
|
||||||
|
return
|
||||||
|
case out <- n:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (a *API) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error {
|
func (a *API) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error {
|
||||||
if err := a.Chain.AddBlock(blk.Header); err != nil {
|
if err := a.Chain.AddBlock(blk.Header); err != nil {
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user