rpclib: Id -> ID, fix TestCtx

This commit is contained in:
Łukasz Magiera 2019-07-02 21:08:30 +02:00
parent 0f8f61fc7b
commit 5eb60c7ce7
3 changed files with 31 additions and 17 deletions

View File

@ -16,7 +16,7 @@ var (
contextType = reflect.TypeOf(new(context.Context)).Elem() contextType = reflect.TypeOf(new(context.Context)).Elem()
) )
// ErrClient is an error which occured on the client side the library // ErrClient is an error which occurred on the client side the library
type ErrClient struct { type ErrClient struct {
err error err error
} }
@ -39,7 +39,7 @@ func (r *result) UnmarshalJSON(raw []byte) error {
type clientResponse struct { type clientResponse struct {
Jsonrpc string `json:"jsonrpc"` Jsonrpc string `json:"jsonrpc"`
Result result `json:"result"` Result result `json:"result"`
Id int64 `json:"id"` ID int64 `json:"id"`
Error *respError `json:"error,omitempty"` Error *respError `json:"error,omitempty"`
} }
@ -117,7 +117,7 @@ func NewClient(addr string, namespace string, handler interface{}) {
req := request{ req := request{
Jsonrpc: "2.0", Jsonrpc: "2.0",
Id: &id, ID: &id,
Method: namespace + "." + f.Name, Method: namespace + "." + f.Name,
Params: params, Params: params,
} }
@ -142,7 +142,6 @@ func NewClient(addr string, namespace string, handler interface{}) {
if err != nil { if err != nil {
return processError(err) return processError(err)
} }
defer httpResp.Body.Close()
// process response // process response
@ -155,7 +154,11 @@ func NewClient(addr string, namespace string, handler interface{}) {
return processError(err) return processError(err)
} }
if resp.Id != *req.Id { if err := httpResp.Body.Close(); err != nil {
return processError(err)
}
if resp.ID != *req.ID {
return processError(errors.New("request and response id didn't match")) return processError(errors.New("request and response id didn't match"))
} }

View File

@ -10,10 +10,8 @@ import (
const ( const (
rpcParseError = -32700 rpcParseError = -32700
// rpcInvalidRequest = -32600
rpcMethodNotFound = -32601 rpcMethodNotFound = -32601
rpcInvalidParams = -32602 rpcInvalidParams = -32602
// rpcInternalError = -32603
) )
type rpcHandler struct { type rpcHandler struct {
@ -59,7 +57,7 @@ func (p *param) MarshalJSON() ([]byte, error) {
type request struct { type request struct {
Jsonrpc string `json:"jsonrpc"` Jsonrpc string `json:"jsonrpc"`
Id *int64 `json:"id,omitempty"` ID *int64 `json:"id,omitempty"`
Method string `json:"method"` Method string `json:"method"`
Params []param `json:"params"` Params []param `json:"params"`
} }
@ -79,7 +77,7 @@ func (e *respError) Error() string {
type response struct { type response struct {
Jsonrpc string `json:"jsonrpc"` Jsonrpc string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"` Result interface{} `json:"result,omitempty"`
Id int64 `json:"id"` ID int64 `json:"id"`
Error *respError `json:"error,omitempty"` Error *respError `json:"error,omitempty"`
} }
@ -119,13 +117,13 @@ func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
callResult := handler.handlerFunc.Call(callParams) callResult := handler.handlerFunc.Call(callParams)
if req.Id == nil { if req.ID == nil {
return // notification return // notification
} }
resp := response{ resp := response{
Jsonrpc: "2.0", Jsonrpc: "2.0",
Id: *req.Id, ID: *req.ID,
} }
if handler.errOut != -1 { if handler.errOut != -1 {
@ -149,13 +147,13 @@ func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (s *RPCServer) rpcError(w http.ResponseWriter, req *request, code int, err error) { func (s *RPCServer) rpcError(w http.ResponseWriter, req *request, code int, err error) {
w.WriteHeader(500) w.WriteHeader(500)
if req.Id == nil { // notification if req.ID == nil { // notification
return return
} }
resp := response{ resp := response{
Jsonrpc: "2.0", Jsonrpc: "2.0",
Id: *req.Id, ID: *req.ID,
Error: &respError{ Error: &respError{
Code: code, Code: code,
Message: err.Error(), Message: err.Error(),

View File

@ -5,7 +5,7 @@ import (
"errors" "errors"
"net/http/httptest" "net/http/httptest"
"strconv" "strconv"
"sync/atomic" "sync"
"testing" "testing"
"time" "time"
) )
@ -177,13 +177,17 @@ func TestRPC(t *testing.T) {
} }
type CtxHandler struct { type CtxHandler struct {
lk sync.Mutex
cancelled bool cancelled bool
i int32 i int
} }
func (h *CtxHandler) Test(ctx context.Context) { func (h *CtxHandler) Test(ctx context.Context) {
h.lk.Lock()
defer h.lk.Unlock()
timeout := time.After(300 * time.Millisecond) timeout := time.After(300 * time.Millisecond)
atomic.AddInt32(&h.i, 1) h.i++
select { select {
case <-timeout: case <-timeout:
@ -215,19 +219,28 @@ func TestCtx(t *testing.T) {
defer cancel() defer cancel()
client.Test(ctx) client.Test(ctx)
serverHandler.lk.Lock()
if !serverHandler.cancelled { if !serverHandler.cancelled {
t.Error("expected cancellation on the server side") t.Error("expected cancellation on the server side")
} }
serverHandler.cancelled = false serverHandler.cancelled = false
serverHandler.lk.Unlock()
var noCtxClient struct { var noCtxClient struct {
Test func() Test func()
} }
NewClient(testServ.URL, "CtxHandler", &noCtxClient) NewClient(testServ.URL, "CtxHandler", &noCtxClient)
noCtxClient.Test() noCtxClient.Test()
if serverHandler.cancelled || atomic.LoadInt32(&serverHandler.i) != 2 {
serverHandler.lk.Lock()
if serverHandler.cancelled || serverHandler.i != 2 {
t.Error("wrong serverHandler state") t.Error("wrong serverHandler state")
} }
serverHandler.lk.Unlock()
} }