Merge pull request #1392 from bas-vk/ipcpipelining
Several bugfixes to IPC channel
This commit is contained in:
		
						commit
						4dfcd6012b
					
				| @ -432,17 +432,17 @@ func MakeAccountManager(ctx *cli.Context) *accounts.Manager { | ||||
| func IpcSocketPath(ctx *cli.Context) (ipcpath string) { | ||||
| 	if common.IsWindows() { | ||||
| 		ipcpath = common.DefaultIpcPath() | ||||
| 		if ipcpath != ctx.GlobalString(IPCPathFlag.Name) { | ||||
| 		if ctx.GlobalIsSet(IPCPathFlag.Name) { | ||||
| 			ipcpath = ctx.GlobalString(IPCPathFlag.Name) | ||||
| 		} | ||||
| 	} else { | ||||
| 		ipcpath = common.DefaultIpcPath() | ||||
| 		if ctx.GlobalString(IPCPathFlag.Name) != common.DefaultIpcPath() { | ||||
| 			ipcpath = ctx.GlobalString(IPCPathFlag.Name) | ||||
| 		} else if ctx.GlobalString(DataDirFlag.Name) != "" && | ||||
| 			ctx.GlobalString(DataDirFlag.Name) != common.DefaultDataDir() { | ||||
| 		if ctx.GlobalIsSet(DataDirFlag.Name) { | ||||
| 			ipcpath = filepath.Join(ctx.GlobalString(DataDirFlag.Name), "geth.ipc") | ||||
| 		} | ||||
| 		if ctx.GlobalIsSet(IPCPathFlag.Name) { | ||||
| 			ipcpath = ctx.GlobalString(IPCPathFlag.Name) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return | ||||
|  | ||||
| @ -10,7 +10,7 @@ import ( | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| 	READ_TIMEOUT      = 15 // read timeout in seconds
 | ||||
| 	READ_TIMEOUT      = 60 // in seconds
 | ||||
| 	MAX_REQUEST_SIZE  = 1024 * 1024 | ||||
| 	MAX_RESPONSE_SIZE = 1024 * 1024 | ||||
| ) | ||||
| @ -18,51 +18,43 @@ const ( | ||||
| // Json serialization support
 | ||||
| type JsonCodec struct { | ||||
| 	c net.Conn | ||||
| 	d *json.Decoder | ||||
| } | ||||
| 
 | ||||
| // Create new JSON coder instance
 | ||||
| func NewJsonCoder(conn net.Conn) ApiCoder { | ||||
| 	return &JsonCodec{ | ||||
| 		c: conn, | ||||
| 		d: json.NewDecoder(conn), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Serialize obj to JSON and write it to conn
 | ||||
| // Read incoming request and parse it to RPC request
 | ||||
| func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { | ||||
| 	bytesInBuffer := 0 | ||||
| 	buf := make([]byte, MAX_REQUEST_SIZE) | ||||
| 
 | ||||
| 	deadline := time.Now().Add(READ_TIMEOUT * time.Second) | ||||
| 	if err := self.c.SetDeadline(deadline); err != nil { | ||||
| 		return nil, false, err | ||||
| 	} | ||||
| 
 | ||||
| 	for { | ||||
| 		n, err := self.c.Read(buf[bytesInBuffer:]) | ||||
| 		if err != nil { | ||||
| 			self.c.Close() | ||||
| 			return nil, false, err | ||||
| 		} | ||||
| 
 | ||||
| 		bytesInBuffer += n | ||||
| 
 | ||||
| 		singleRequest := shared.Request{} | ||||
| 		err = json.Unmarshal(buf[:bytesInBuffer], &singleRequest) | ||||
| 		if err == nil { | ||||
| 			requests := make([]*shared.Request, 1) | ||||
| 			requests[0] = &singleRequest | ||||
| 			return requests, false, nil | ||||
| 		} | ||||
| 
 | ||||
| 		requests = make([]*shared.Request, 0) | ||||
| 		err = json.Unmarshal(buf[:bytesInBuffer], &requests) | ||||
| 		if err == nil { | ||||
| 			return requests, true, nil | ||||
| 	var incoming json.RawMessage | ||||
| 	err = self.d.Decode(&incoming) | ||||
| 	if err == nil { | ||||
| 		isBatch = incoming[0] == '[' | ||||
| 		if isBatch { | ||||
| 			requests = make([]*shared.Request, 0) | ||||
| 			err = json.Unmarshal(incoming, &requests) | ||||
| 		} else { | ||||
| 			requests = make([]*shared.Request, 1) | ||||
| 			var singleRequest shared.Request | ||||
| 			if err = json.Unmarshal(incoming, &singleRequest); err == nil { | ||||
| 				requests[0] = &singleRequest | ||||
| 			} | ||||
| 		} | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	self.c.Close() // timeout
 | ||||
| 	return nil, false, fmt.Errorf("Unable to read response") | ||||
| 	self.c.Close() | ||||
| 	return nil, false, err | ||||
| } | ||||
| 
 | ||||
| func (self *JsonCodec) ReadResponse() (interface{}, error) { | ||||
| @ -81,15 +73,15 @@ func (self *JsonCodec) ReadResponse() (interface{}, error) { | ||||
| 		} | ||||
| 		bytesInBuffer += n | ||||
| 
 | ||||
| 		var failure shared.ErrorResponse | ||||
| 		if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil { | ||||
| 			return failure, fmt.Errorf(failure.Error.Message) | ||||
| 		} | ||||
| 
 | ||||
| 		var success shared.SuccessResponse | ||||
| 		if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil { | ||||
| 			return success, nil | ||||
| 		} | ||||
| 
 | ||||
| 		var failure shared.ErrorResponse | ||||
| 		if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil { | ||||
| 			return failure, nil | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	self.c.Close() | ||||
|  | ||||
							
								
								
									
										141
									
								
								rpc/codec/json_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										141
									
								
								rpc/codec/json_test.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,141 @@ | ||||
| package codec | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"io" | ||||
| 	"net" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| type jsonTestConn struct { | ||||
| 	buffer *bytes.Buffer | ||||
| } | ||||
| 
 | ||||
| func newJsonTestConn(data []byte) *jsonTestConn { | ||||
| 	return &jsonTestConn{ | ||||
| 		buffer: bytes.NewBuffer(data), | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (self *jsonTestConn) Read(p []byte) (n int, err error) { | ||||
| 	return self.buffer.Read(p) | ||||
| } | ||||
| 
 | ||||
| func (self *jsonTestConn) Write(p []byte) (n int, err error) { | ||||
| 	return self.buffer.Write(p) | ||||
| } | ||||
| 
 | ||||
| func (self *jsonTestConn) Close() error { | ||||
| 	// not implemented
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (self *jsonTestConn) LocalAddr() net.Addr { | ||||
| 	// not implemented
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (self *jsonTestConn) RemoteAddr() net.Addr { | ||||
| 	// not implemented
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (self *jsonTestConn) SetDeadline(t time.Time) error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (self *jsonTestConn) SetReadDeadline(t time.Time) error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (self *jsonTestConn) SetWriteDeadline(t time.Time) error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func TestJsonDecoderWithValidRequest(t *testing.T) { | ||||
| 	reqdata := []byte(`{"jsonrpc":"2.0","method":"modules","params":[],"id":64}`) | ||||
| 	decoder := newJsonTestConn(reqdata) | ||||
| 
 | ||||
| 	jsonDecoder := NewJsonCoder(decoder) | ||||
| 	requests, batch, err := jsonDecoder.ReadRequest() | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Read valid request failed - %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if len(requests) != 1 { | ||||
| 		t.Errorf("Expected to get a single request but got %d", len(requests)) | ||||
| 	} | ||||
| 
 | ||||
| 	if batch { | ||||
| 		t.Errorf("Got batch indication while expecting single request") | ||||
| 	} | ||||
| 
 | ||||
| 	if requests[0].Id != float64(64) { | ||||
| 		t.Errorf("Expected req.Id == 64 but got %v", requests[0].Id) | ||||
| 	} | ||||
| 
 | ||||
| 	if requests[0].Method != "modules" { | ||||
| 		t.Errorf("Expected req.Method == 'modules' got '%s'", requests[0].Method) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestJsonDecoderWithValidBatchRequest(t *testing.T) { | ||||
| 	reqdata := []byte(`[{"jsonrpc":"2.0","method":"modules","params":[],"id":64}, | ||||
| 		{"jsonrpc":"2.0","method":"modules","params":[],"id":64}]`) | ||||
| 	decoder := newJsonTestConn(reqdata) | ||||
| 
 | ||||
| 	jsonDecoder := NewJsonCoder(decoder) | ||||
| 	requests, batch, err := jsonDecoder.ReadRequest() | ||||
| 
 | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Read valid batch request failed - %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if len(requests) != 2 { | ||||
| 		t.Errorf("Expected to get two requests but got %d", len(requests)) | ||||
| 	} | ||||
| 
 | ||||
| 	if !batch { | ||||
| 		t.Errorf("Got no batch indication while expecting batch request") | ||||
| 	} | ||||
| 
 | ||||
| 	for i := 0; i < len(requests); i++ { | ||||
| 		if requests[i].Id != float64(64) { | ||||
| 			t.Errorf("Expected req.Id == 64 but got %v", requests[i].Id) | ||||
| 		} | ||||
| 
 | ||||
| 		if requests[i].Method != "modules" { | ||||
| 			t.Errorf("Expected req.Method == 'modules' got '%s'", requests[i].Method) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func TestJsonDecoderWithInvalidIncompleteMessage(t *testing.T) { | ||||
| 	reqdata := []byte(`{"jsonrpc":"2.0","method":"modules","pa`) | ||||
| 	decoder := newJsonTestConn(reqdata) | ||||
| 
 | ||||
| 	jsonDecoder := NewJsonCoder(decoder) | ||||
| 	requests, batch, err := jsonDecoder.ReadRequest() | ||||
| 
 | ||||
| 	if err != io.ErrUnexpectedEOF { | ||||
| 		t.Errorf("Expected to read an incomplete request err but got %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// remaining message
 | ||||
| 	decoder.Write([]byte(`rams":[],"id:64"}`)) | ||||
| 	requests, batch, err = jsonDecoder.ReadRequest() | ||||
| 
 | ||||
| 	if err == nil { | ||||
| 		t.Errorf("Expected an error but got nil") | ||||
| 	} | ||||
| 
 | ||||
| 	if len(requests) != 0 { | ||||
| 		t.Errorf("Expected to get no requests but got %d", len(requests)) | ||||
| 	} | ||||
| 
 | ||||
| 	if batch { | ||||
| 		t.Errorf("Got batch indication while expecting non batch") | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										18
									
								
								rpc/jeth.go
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								rpc/jeth.go
									
									
									
									
									
								
							| @ -3,6 +3,8 @@ package rpc | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 
 | ||||
| 	"fmt" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/jsre" | ||||
| 	"github.com/ethereum/go-ethereum/rpc/comms" | ||||
| 	"github.com/ethereum/go-ethereum/rpc/shared" | ||||
| @ -20,14 +22,13 @@ func NewJeth(ethApi shared.EthereumApi, re *jsre.JSRE, client comms.EthereumClie | ||||
| } | ||||
| 
 | ||||
| func (self *Jeth) err(call otto.FunctionCall, code int, msg string, id interface{}) (response otto.Value) { | ||||
| 	rpcerr := &shared.ErrorObject{code, msg} | ||||
| 	call.Otto.Set("ret_jsonrpc", shared.JsonRpcVersion) | ||||
| 	call.Otto.Set("ret_id", id) | ||||
| 	call.Otto.Set("ret_error", rpcerr) | ||||
| 	response, _ = call.Otto.Run(` | ||||
| 		ret_response = { jsonrpc: ret_jsonrpc, id: ret_id, error: ret_error }; | ||||
| 	`) | ||||
| 	return | ||||
| 	errObj := fmt.Sprintf("{\"message\": \"%s\", \"code\": %d}", msg, code) | ||||
| 	retResponse := fmt.Sprintf("ret_response = JSON.parse('{\"jsonrpc\": \"%s\", \"id\": %v, \"error\": %s}');", shared.JsonRpcVersion, id, errObj) | ||||
| 
 | ||||
| 	call.Otto.Run("ret_error = " + errObj) | ||||
| 	res, _ := call.Otto.Run(retResponse) | ||||
| 
 | ||||
| 	return res | ||||
| } | ||||
| 
 | ||||
| func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { | ||||
| @ -56,6 +57,7 @@ func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) { | ||||
| 			return self.err(call, -32603, err.Error(), req.Id) | ||||
| 		} | ||||
| 		respif, err = self.client.Recv() | ||||
| 
 | ||||
| 		if err != nil { | ||||
| 			return self.err(call, -32603, err.Error(), req.Id) | ||||
| 		} | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user