Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
Showing only changes of commit ca9bce9a45 - Show all commits

View File

@ -59,6 +59,12 @@ const (
maxClientSubscriptionBuffer = 20000 maxClientSubscriptionBuffer = 20000
) )
const (
httpScheme = "http"
wsScheme = "ws"
ipcScheme = "ipc"
)
// BatchElem is an element in a batch request. // BatchElem is an element in a batch request.
type BatchElem struct { type BatchElem struct {
Method string Method string
@ -75,7 +81,7 @@ type BatchElem struct {
// Client represents a connection to an RPC server. // Client represents a connection to an RPC server.
type Client struct { type Client struct {
idgen func() ID // for subscriptions idgen func() ID // for subscriptions
isHTTP bool scheme string // connection type: http, ws or ipc
services *serviceRegistry services *serviceRegistry
idCounter uint32 idCounter uint32
@ -111,6 +117,10 @@ type clientConn struct {
func (c *Client) newClientConn(conn ServerCodec) *clientConn { func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.WithValue(context.Background(), clientContextKey{}, c) ctx := context.WithValue(context.Background(), clientContextKey{}, c)
// Http connections have already set the scheme
if !c.isHTTP() && c.scheme != "" {
ctx = context.WithValue(ctx, "scheme", c.scheme)
}
handler := newHandler(ctx, conn, c.idgen, c.services) handler := newHandler(ctx, conn, c.idgen, c.services)
return &clientConn{conn, handler} return &clientConn{conn, handler}
} }
@ -136,7 +146,7 @@ func (op *requestOp) wait(ctx context.Context, c *Client) (*jsonrpcMessage, erro
select { select {
case <-ctx.Done(): case <-ctx.Done():
// Send the timeout to dispatch so it can remove the request IDs. // Send the timeout to dispatch so it can remove the request IDs.
if !c.isHTTP { if !c.isHTTP() {
select { select {
case c.reqTimeout <- op: case c.reqTimeout <- op:
case <-c.closing: case <-c.closing:
@ -203,10 +213,18 @@ func newClient(initctx context.Context, connect reconnectFunc) (*Client, error)
} }
func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client {
_, isHTTP := conn.(*httpConn) scheme := ""
switch conn.(type) {
case *httpConn:
scheme = httpScheme
case *websocketCodec:
scheme = wsScheme
case *jsonCodec:
scheme = ipcScheme
}
c := &Client{ c := &Client{
idgen: idgen, idgen: idgen,
isHTTP: isHTTP, scheme: scheme,
services: services, services: services,
writeConn: conn, writeConn: conn,
close: make(chan struct{}), close: make(chan struct{}),
@ -219,7 +237,7 @@ func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *C
reqSent: make(chan error, 1), reqSent: make(chan error, 1),
reqTimeout: make(chan *requestOp), reqTimeout: make(chan *requestOp),
} }
if !isHTTP { if !c.isHTTP() {
go c.dispatch(conn) go c.dispatch(conn)
} }
return c return c
@ -250,7 +268,7 @@ func (c *Client) SupportedModules() (map[string]string, error) {
// Close closes the client, aborting any in-flight requests. // Close closes the client, aborting any in-flight requests.
func (c *Client) Close() { func (c *Client) Close() {
if c.isHTTP { if c.isHTTP() {
return return
} }
select { select {
@ -264,7 +282,7 @@ func (c *Client) Close() {
// This method only works for clients using HTTP, it doesn't have // This method only works for clients using HTTP, it doesn't have
// any effect for clients using another transport. // any effect for clients using another transport.
func (c *Client) SetHeader(key, value string) { func (c *Client) SetHeader(key, value string) {
if !c.isHTTP { if !c.isHTTP() {
return return
} }
conn := c.writeConn.(*httpConn) conn := c.writeConn.(*httpConn)
@ -298,7 +316,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
} }
op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)} op := &requestOp{ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage, 1)}
if c.isHTTP { if c.isHTTP() {
err = c.sendHTTP(ctx, op, msg) err = c.sendHTTP(ctx, op, msg)
} else { } else {
err = c.send(ctx, op, msg) err = c.send(ctx, op, msg)
@ -357,7 +375,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
} }
var err error var err error
if c.isHTTP { if c.isHTTP() {
err = c.sendBatchHTTP(ctx, op, msgs) err = c.sendBatchHTTP(ctx, op, msgs)
} else { } else {
err = c.send(ctx, op, msgs) err = c.send(ctx, op, msgs)
@ -402,7 +420,7 @@ func (c *Client) Notify(ctx context.Context, method string, args ...interface{})
} }
msg.ID = nil msg.ID = nil
if c.isHTTP { if c.isHTTP() {
return c.sendHTTP(ctx, op, msg) return c.sendHTTP(ctx, op, msg)
} }
return c.send(ctx, op, msg) return c.send(ctx, op, msg)
@ -440,7 +458,7 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, channel interf
if chanVal.IsNil() { if chanVal.IsNil() {
panic("channel given to Subscribe must not be nil") panic("channel given to Subscribe must not be nil")
} }
if c.isHTTP { if c.isHTTP() {
return nil, ErrNotificationsUnsupported return nil, ErrNotificationsUnsupported
} }
@ -642,3 +660,7 @@ func (c *Client) read(codec ServerCodec) {
c.readOp <- readOp{msgs, batch} c.readOp <- readOp{msgs, batch}
} }
} }
func (c *Client) isHTTP() bool {
return c.scheme == httpScheme
}