Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
6 changed files with 105 additions and 112 deletions
Showing only changes of commit a251bca67c - Show all commits

View File

@ -355,7 +355,7 @@ func (bn *bystander) loop() {
wasAdded = true
bn.notifyAdded()
case *v5wire.Findnode:
bn.conn.write(bn.l, &v5wire.Nodes{ReqID: p.ReqID, Total: 1}, nil)
bn.conn.write(bn.l, &v5wire.Nodes{ReqID: p.ReqID, RespCount: 1}, nil)
wasAdded = true
bn.notifyAdded()
case *v5wire.TalkRequest:

View File

@ -44,6 +44,8 @@ func (p *readError) Unwrap() error { return p.err }
func (p *readError) RequestID() []byte { return nil }
func (p *readError) SetRequestID([]byte) {}
func (p *readError) AppendLogInfo(ctx []interface{}) []interface{} { return ctx }
// readErrorf creates a readError with the given text.
func readErrorf(format string, args ...interface{}) *readError {
return &readError{fmt.Errorf(format, args...)}
@ -171,16 +173,16 @@ func (tc *conn) findnode(c net.PacketConn, dists []uint) ([]*enode.Node, error)
// Check total count. It should be greater than one
// and needs to be the same across all responses.
if first {
if resp.Total == 0 || resp.Total > 6 {
return nil, fmt.Errorf("invalid NODES response 'total' %d (not in (0,7))", resp.Total)
if resp.RespCount == 0 || resp.RespCount > 6 {
return nil, fmt.Errorf("invalid NODES response count %d (not in (0,7))", resp.RespCount)
}
total = resp.Total
total = resp.RespCount
n = int(total) - 1
first = false
} else {
n--
if resp.Total != total {
return nil, fmt.Errorf("invalid NODES response 'total' %d (!= %d)", resp.Total, total)
if resp.RespCount != total {
return nil, fmt.Errorf("invalid NODES response count %d (!= %d)", resp.RespCount, total)
}
}
// Check nodes.

View File

@ -70,6 +70,9 @@ type UDPv5 struct {
clock mclock.Clock
validSchemes enr.IdentityScheme
// misc buffers used during message handling
logcontext []interface{}
// talkreq handler registry
trlock sync.Mutex
trhandlers map[string]TalkRequestHandler
@ -158,6 +161,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
activeCallByNode: make(map[enode.ID]*callV5),
activeCallByAuth: make(map[v5wire.Nonce]*callV5),
callQueue: make(map[enode.ID][]*callV5),
// shutdown
closeCtx: closeCtx,
cancelCloseCtx: cancelCloseCtx,
@ -385,7 +389,7 @@ func (t *UDPv5) waitForNodes(c *callV5, distances []uint) ([]*enode.Node, error)
nodes = append(nodes, node)
}
if total == -1 {
total = min(int(response.Total), totalNodesResponseLimit)
total = min(int(response.RespCount), totalNodesResponseLimit)
}
if received++; received == total {
return nodes, nil
@ -601,13 +605,18 @@ func (t *UDPv5) sendResponse(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.P
// send sends a packet to the given node.
func (t *UDPv5) send(toID enode.ID, toAddr *net.UDPAddr, packet v5wire.Packet, c *v5wire.Whoareyou) (v5wire.Nonce, error) {
addr := toAddr.String()
t.logcontext = append(t.logcontext[:0], "id", toID, "addr", addr)
t.logcontext = packet.AppendLogInfo(t.logcontext)
enc, nonce, err := t.codec.Encode(toID, addr, packet, c)
if err != nil {
t.log.Warn(">> "+packet.Name(), "id", toID, "addr", addr, "err", err)
t.logcontext = append(t.logcontext, "err", err)
t.log.Warn(">> "+packet.Name(), t.logcontext...)
return nonce, err
}
_, err = t.conn.WriteToUDP(enc, toAddr)
t.log.Trace(">> "+packet.Name(), "id", toID, "addr", addr)
t.log.Trace(">> "+packet.Name(), t.logcontext...)
return nonce, err
}
@ -657,7 +666,9 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr *net.UDPAddr) error {
}
if packet.Kind() != v5wire.WhoareyouPacket {
// WHOAREYOU logged separately to report errors.
t.log.Trace("<< "+packet.Name(), "id", fromID, "addr", addr)
t.logcontext = append(t.logcontext[:0], "id", fromID, "addr", addr)
t.logcontext = packet.AppendLogInfo(t.logcontext)
t.log.Trace("<< "+packet.Name(), t.logcontext...)
}
t.handle(packet, fromID, fromAddr)
return nil
@ -712,7 +723,7 @@ func (t *UDPv5) handle(p v5wire.Packet, fromID enode.ID, fromAddr *net.UDPAddr)
case *v5wire.Nodes:
t.handleCallResponse(fromID, fromAddr, p)
case *v5wire.TalkRequest:
t.handleTalkRequest(p, fromID, fromAddr)
t.handleTalkRequest(fromID, fromAddr, p)
case *v5wire.TalkResponse:
t.handleCallResponse(fromID, fromAddr, p)
}
@ -827,7 +838,7 @@ func (t *UDPv5) collectTableNodes(rip net.IP, distances []uint, limit int) []*en
// packNodes creates NODES response packets for the given node list.
func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes {
if len(nodes) == 0 {
return []*v5wire.Nodes{{ReqID: reqid, Total: 1}}
return []*v5wire.Nodes{{ReqID: reqid, RespCount: 1}}
}
// This limit represents the available space for nodes in output packets. Maximum
@ -851,13 +862,13 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes {
resp = append(resp, p)
}
for _, msg := range resp {
msg.Total = uint8(len(resp))
msg.RespCount = uint8(len(resp))
}
return resp
}
// handleTalkRequest runs the talk request handler of the requested protocol.
func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAddr *net.UDPAddr) {
func (t *UDPv5) handleTalkRequest(fromID enode.ID, fromAddr *net.UDPAddr, p *v5wire.TalkRequest) {
t.trlock.Lock()
handler := t.trhandlers[p.Protocol]
t.trlock.Unlock()

View File

@ -208,8 +208,8 @@ func (test *udpV5Test) expectNodes(wantReqID []byte, wantTotal uint8, wantNodes
if !bytes.Equal(p.ReqID, wantReqID) {
test.t.Fatalf("wrong request ID %v in response, want %v", p.ReqID, wantReqID)
}
if p.Total != wantTotal {
test.t.Fatalf("wrong total response count %d, want %d", p.Total, wantTotal)
if p.RespCount != wantTotal {
test.t.Fatalf("wrong total response count %d, want %d", p.RespCount, wantTotal)
}
for _, record := range p.Nodes {
n, _ := enode.New(enode.ValidSchemesForTesting, record)
@ -301,14 +301,14 @@ func TestUDPv5_findnodeCall(t *testing.T) {
t.Fatalf("wrong distances in request: %v", p.Distances)
}
test.packetIn(&v5wire.Nodes{
ReqID: p.ReqID,
Total: 2,
Nodes: nodesToRecords(nodes[:4]),
ReqID: p.ReqID,
RespCount: 2,
Nodes: nodesToRecords(nodes[:4]),
})
test.packetIn(&v5wire.Nodes{
ReqID: p.ReqID,
Total: 2,
Nodes: nodesToRecords(nodes[4:]),
ReqID: p.ReqID,
RespCount: 2,
Nodes: nodesToRecords(nodes[4:]),
})
})
@ -409,16 +409,16 @@ func TestUDPv5_callTimeoutReset(t *testing.T) {
test.waitPacketOut(func(p *v5wire.Findnode, addr *net.UDPAddr, _ v5wire.Nonce) {
time.Sleep(respTimeout - 50*time.Millisecond)
test.packetIn(&v5wire.Nodes{
ReqID: p.ReqID,
Total: 2,
Nodes: nodesToRecords(nodes[:4]),
ReqID: p.ReqID,
RespCount: 2,
Nodes: nodesToRecords(nodes[:4]),
})
time.Sleep(respTimeout - 50*time.Millisecond)
test.packetIn(&v5wire.Nodes{
ReqID: p.ReqID,
Total: 2,
Nodes: nodesToRecords(nodes[4:]),
ReqID: p.ReqID,
RespCount: 2,
Nodes: nodesToRecords(nodes[4:]),
})
})
if err := <-done; err != nil {

View File

@ -92,7 +92,7 @@ func TestHandshake(t *testing.T) {
}
// A <- B NODES
nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1})
nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1})
net.nodeA.expectDecode(t, NodesMsg, nodes)
}
@ -150,7 +150,7 @@ func TestHandshake_norecord(t *testing.T) {
net.nodeB.expectDecode(t, FindnodeMsg, findnode)
// A <- B NODES
nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1})
nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1})
net.nodeA.expectDecode(t, NodesMsg, nodes)
}
@ -190,7 +190,7 @@ func TestHandshake_rekey(t *testing.T) {
net.nodeB.expectDecode(t, FindnodeMsg, findnode)
// A <- B NODES
nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1})
nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1})
net.nodeA.expectDecode(t, NodesMsg, nodes)
}
@ -225,7 +225,7 @@ func TestHandshake_rekey2(t *testing.T) {
net.nodeB.expectDecode(t, FindnodeMsg, findnode)
// A <- B NODES
nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{Total: 1})
nodes, _ := net.nodeB.encode(t, net.nodeA, &Nodes{RespCount: 1})
net.nodeA.expectDecode(t, NodesMsg, nodes)
}

View File

@ -20,6 +20,7 @@ import (
"fmt"
"net"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
@ -32,6 +33,10 @@ type Packet interface {
Kind() byte // Kind returns the message type.
RequestID() []byte // Returns the request ID.
SetRequestID([]byte) // Sets the request ID.
// AppendLogInfo returns its argument 'ctx' with additional fields
// appended for logging purposes.
AppendLogInfo(ctx []interface{}) []interface{}
}
// Message types.
@ -44,9 +49,6 @@ const (
TalkResponseMsg
RequestTicketMsg
TicketMsg
RegtopicMsg
RegconfirmationMsg
TopicQueryMsg
UnknownPacket = byte(255) // any non-decryptable packet
WhoareyouPacket = byte(254) // the WHOAREYOU packet
@ -59,7 +61,7 @@ type (
Nonce Nonce
}
// Whoareyou contains the handshake challenge.
// WHOAREYOU contains the handshake challenge.
Whoareyou struct {
ChallengeData []byte // Encoded challenge
Nonce Nonce // Nonce of request packet
@ -73,13 +75,13 @@ type (
sent mclock.AbsTime // for handshake GC.
}
// Ping is sent during liveness checks.
// PING is sent during liveness checks.
Ping struct {
ReqID []byte
ENRSeq uint64
}
// Pong is the reply to Ping.
// PONG is the reply to PING.
Pong struct {
ReqID []byte
ENRSeq uint64
@ -87,62 +89,35 @@ type (
ToPort uint16 // packet, which provides a way to discover the external address (after NAT).
}
// Findnode is a query for nodes in the given bucket.
// FINDNODE is a query for nodes in the given bucket.
Findnode struct {
ReqID []byte
Distances []uint
// OpID is for debugging purposes and is not part of the packet encoding.
// It identifies the 'operation' on behalf of which the request was sent.
OpID uint64 `rlp:"-"`
}
// Nodes is the reply to Findnode and Topicquery.
// NODES is a response to FINDNODE.
Nodes struct {
ReqID []byte
Total uint8
Nodes []*enr.Record
ReqID []byte
RespCount uint8 // total number of responses to the request
Nodes []*enr.Record
}
// TalkRequest is an application-level request.
// TALKREQ is an application-level request.
TalkRequest struct {
ReqID []byte
Protocol string
Message []byte
}
// TalkResponse is the reply to TalkRequest.
// TALKRESP is the reply to TALKREQ.
TalkResponse struct {
ReqID []byte
Message []byte
}
// RequestTicket requests a ticket for a topic queue.
RequestTicket struct {
ReqID []byte
Topic []byte
}
// Ticket is the response to RequestTicket.
Ticket struct {
ReqID []byte
Ticket []byte
}
// Regtopic registers the sender in a topic queue using a ticket.
Regtopic struct {
ReqID []byte
Ticket []byte
ENR *enr.Record
}
// Regconfirmation is the reply to Regtopic.
Regconfirmation struct {
ReqID []byte
Registered bool
}
// TopicQuery asks for nodes with the given topic.
TopicQuery struct {
ReqID []byte
Topic []byte
}
)
// DecodeMessage decodes the message body of a packet.
@ -161,16 +136,6 @@ func DecodeMessage(ptype byte, body []byte) (Packet, error) {
dec = new(TalkRequest)
case TalkResponseMsg:
dec = new(TalkResponse)
case RequestTicketMsg:
dec = new(RequestTicket)
case TicketMsg:
dec = new(Ticket)
case RegtopicMsg:
dec = new(Regtopic)
case RegconfirmationMsg:
dec = new(Regconfirmation)
case TopicQueryMsg:
dec = new(TopicQuery)
default:
return nil, fmt.Errorf("unknown packet type %d", ptype)
}
@ -188,62 +153,77 @@ func (*Whoareyou) Kind() byte { return WhoareyouPacket }
func (*Whoareyou) RequestID() []byte { return nil }
func (*Whoareyou) SetRequestID([]byte) {}
func (*Whoareyou) AppendLogInfo(ctx []interface{}) []interface{} {
return ctx
}
func (*Unknown) Name() string { return "UNKNOWN/v5" }
func (*Unknown) Kind() byte { return UnknownPacket }
func (*Unknown) RequestID() []byte { return nil }
func (*Unknown) SetRequestID([]byte) {}
func (*Unknown) AppendLogInfo(ctx []interface{}) []interface{} {
return ctx
}
func (*Ping) Name() string { return "PING/v5" }
func (*Ping) Kind() byte { return PingMsg }
func (p *Ping) RequestID() []byte { return p.ReqID }
func (p *Ping) SetRequestID(id []byte) { p.ReqID = id }
func (p *Ping) AppendLogInfo(ctx []interface{}) []interface{} {
return append(ctx, "req", hexutil.Bytes(p.ReqID), "enrseq", p.ENRSeq)
}
func (*Pong) Name() string { return "PONG/v5" }
func (*Pong) Kind() byte { return PongMsg }
func (p *Pong) RequestID() []byte { return p.ReqID }
func (p *Pong) SetRequestID(id []byte) { p.ReqID = id }
func (*Findnode) Name() string { return "FINDNODE/v5" }
func (*Findnode) Kind() byte { return FindnodeMsg }
func (p *Pong) AppendLogInfo(ctx []interface{}) []interface{} {
return append(ctx, "req", hexutil.Bytes(p.ReqID), "enrseq", p.ENRSeq)
}
func (p *Findnode) Name() string { return "FINDNODE/v5" }
func (p *Findnode) Kind() byte { return FindnodeMsg }
func (p *Findnode) RequestID() []byte { return p.ReqID }
func (p *Findnode) SetRequestID(id []byte) { p.ReqID = id }
func (p *Findnode) AppendLogInfo(ctx []interface{}) []interface{} {
ctx = append(ctx, "req", hexutil.Bytes(p.ReqID))
if p.OpID != 0 {
ctx = append(ctx, "opid", p.OpID)
}
return ctx
}
func (*Nodes) Name() string { return "NODES/v5" }
func (*Nodes) Kind() byte { return NodesMsg }
func (p *Nodes) RequestID() []byte { return p.ReqID }
func (p *Nodes) SetRequestID(id []byte) { p.ReqID = id }
func (p *Nodes) AppendLogInfo(ctx []interface{}) []interface{} {
return append(ctx,
"req", hexutil.Bytes(p.ReqID),
"tot", p.RespCount,
"n", len(p.Nodes),
)
}
func (*TalkRequest) Name() string { return "TALKREQ/v5" }
func (*TalkRequest) Kind() byte { return TalkRequestMsg }
func (p *TalkRequest) RequestID() []byte { return p.ReqID }
func (p *TalkRequest) SetRequestID(id []byte) { p.ReqID = id }
func (p *TalkRequest) AppendLogInfo(ctx []interface{}) []interface{} {
return append(ctx, "proto", p.Protocol, "reqid", hexutil.Bytes(p.ReqID), "len", len(p.Message))
}
func (*TalkResponse) Name() string { return "TALKRESP/v5" }
func (*TalkResponse) Kind() byte { return TalkResponseMsg }
func (p *TalkResponse) RequestID() []byte { return p.ReqID }
func (p *TalkResponse) SetRequestID(id []byte) { p.ReqID = id }
func (*RequestTicket) Name() string { return "REQTICKET/v5" }
func (*RequestTicket) Kind() byte { return RequestTicketMsg }
func (p *RequestTicket) RequestID() []byte { return p.ReqID }
func (p *RequestTicket) SetRequestID(id []byte) { p.ReqID = id }
func (*Regtopic) Name() string { return "REGTOPIC/v5" }
func (*Regtopic) Kind() byte { return RegtopicMsg }
func (p *Regtopic) RequestID() []byte { return p.ReqID }
func (p *Regtopic) SetRequestID(id []byte) { p.ReqID = id }
func (*Ticket) Name() string { return "TICKET/v5" }
func (*Ticket) Kind() byte { return TicketMsg }
func (p *Ticket) RequestID() []byte { return p.ReqID }
func (p *Ticket) SetRequestID(id []byte) { p.ReqID = id }
func (*Regconfirmation) Name() string { return "REGCONFIRMATION/v5" }
func (*Regconfirmation) Kind() byte { return RegconfirmationMsg }
func (p *Regconfirmation) RequestID() []byte { return p.ReqID }
func (p *Regconfirmation) SetRequestID(id []byte) { p.ReqID = id }
func (*TopicQuery) Name() string { return "TOPICQUERY/v5" }
func (*TopicQuery) Kind() byte { return TopicQueryMsg }
func (p *TopicQuery) RequestID() []byte { return p.ReqID }
func (p *TopicQuery) SetRequestID(id []byte) { p.ReqID = id }
func (p *TalkResponse) AppendLogInfo(ctx []interface{}) []interface{} {
return append(ctx, "req", p.ReqID, "len", len(p.Message))
}