added pipelining support
This commit is contained in:
		
							parent
							
								
									130f3b270a
								
							
						
					
					
						commit
						41de1cb723
					
				| @ -18,19 +18,19 @@ const ( | |||||||
| // Json serialization support
 | // Json serialization support
 | ||||||
| type JsonCodec struct { | type JsonCodec struct { | ||||||
| 	c net.Conn | 	c net.Conn | ||||||
|  | 	d *json.Decoder | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Create new JSON coder instance
 | // Create new JSON coder instance
 | ||||||
| func NewJsonCoder(conn net.Conn) ApiCoder { | func NewJsonCoder(conn net.Conn) ApiCoder { | ||||||
| 	return &JsonCodec{ | 	return &JsonCodec{ | ||||||
| 		c: conn, | 		c: conn, | ||||||
|  | 		d: json.NewDecoder(conn), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Serialize obj to JSON and write it to conn
 | // Serialize obj to JSON and write it to conn
 | ||||||
| func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { | func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) { | ||||||
| 	bytesInBuffer := 0 |  | ||||||
| 	buf := make([]byte, MAX_REQUEST_SIZE) |  | ||||||
| 
 | 
 | ||||||
| 	deadline := time.Now().Add(READ_TIMEOUT * time.Second) | 	deadline := time.Now().Add(READ_TIMEOUT * time.Second) | ||||||
| 	if err := self.c.SetDeadline(deadline); err != nil { | 	if err := self.c.SetDeadline(deadline); err != nil { | ||||||
| @ -38,31 +38,36 @@ func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	for { | 	for { | ||||||
| 		n, err := self.c.Read(buf[bytesInBuffer:]) | 		var err error | ||||||
| 		if err != nil { |  | ||||||
| 			self.c.Close() |  | ||||||
| 			return nil, false, err |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		bytesInBuffer += n |  | ||||||
| 
 |  | ||||||
| 		singleRequest := shared.Request{} | 		singleRequest := shared.Request{} | ||||||
| 		err = json.Unmarshal(buf[:bytesInBuffer], &singleRequest) | 		if err = self.d.Decode(&singleRequest); err == nil { | ||||||
| 		if err == nil { |  | ||||||
| 			requests := make([]*shared.Request, 1) | 			requests := make([]*shared.Request, 1) | ||||||
| 			requests[0] = &singleRequest | 			requests[0] = &singleRequest | ||||||
| 			return requests, false, nil | 			return requests, false, nil | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | 		fmt.Printf("err %T %v\n", err) | ||||||
|  | 
 | ||||||
|  | 		if opErr, ok := err.(*net.OpError); ok { | ||||||
|  | 			if opErr.Timeout() { | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
| 		requests = make([]*shared.Request, 0) | 		requests = make([]*shared.Request, 0) | ||||||
| 		err = json.Unmarshal(buf[:bytesInBuffer], &requests) | 		if err = self.d.Decode(&requests); err == nil { | ||||||
| 		if err == nil { |  | ||||||
| 			return requests, true, nil | 			return requests, true, nil | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | 		if opErr, ok := err.(*net.OpError); ok { | ||||||
|  | 			if opErr.Timeout() { | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	self.c.Close() // timeout
 | 	self.c.Close() // timeout
 | ||||||
| 	return nil, false, fmt.Errorf("Unable to read response") | 	return nil, false, fmt.Errorf("Timeout reading request") | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (self *JsonCodec) ReadResponse() (interface{}, error) { | func (self *JsonCodec) ReadResponse() (interface{}, error) { | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user