Patch for concurrent iterator & others (onto v1.11.6) #386
@ -63,8 +63,9 @@ func (s *Suite) dial() (*Conn, error) {
|
|||||||
conn.caps = []p2p.Cap{
|
conn.caps = []p2p.Cap{
|
||||||
{Name: "eth", Version: 66},
|
{Name: "eth", Version: 66},
|
||||||
{Name: "eth", Version: 67},
|
{Name: "eth", Version: 67},
|
||||||
|
{Name: "eth", Version: 68},
|
||||||
}
|
}
|
||||||
conn.ourHighestProtoVersion = 67
|
conn.ourHighestProtoVersion = 68
|
||||||
return &conn, nil
|
return &conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -359,6 +360,8 @@ func (s *Suite) waitAnnounce(conn *Conn, blockAnnouncement *NewBlock) error {
|
|||||||
return nil
|
return nil
|
||||||
|
|
||||||
// ignore tx announcements from previous tests
|
// ignore tx announcements from previous tests
|
||||||
|
case *NewPooledTransactionHashes66:
|
||||||
|
continue
|
||||||
case *NewPooledTransactionHashes:
|
case *NewPooledTransactionHashes:
|
||||||
continue
|
continue
|
||||||
case *Transactions:
|
case *Transactions:
|
||||||
|
@ -510,17 +510,18 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// generate 50 txs
|
// generate 50 txs
|
||||||
hashMap, _, err := generateTxs(s, 50)
|
_, txs, err := generateTxs(s, 50)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to generate transactions: %v", err)
|
t.Fatalf("failed to generate transactions: %v", err)
|
||||||
}
|
}
|
||||||
|
hashes := make([]common.Hash, len(txs))
|
||||||
// create new pooled tx hashes announcement
|
types := make([]byte, len(txs))
|
||||||
hashes := make([]common.Hash, 0)
|
sizes := make([]uint32, len(txs))
|
||||||
for _, hash := range hashMap {
|
for i, tx := range txs {
|
||||||
hashes = append(hashes, hash)
|
hashes[i] = tx.Hash()
|
||||||
|
types[i] = tx.Type()
|
||||||
|
sizes[i] = uint32(tx.Size())
|
||||||
}
|
}
|
||||||
announce := NewPooledTransactionHashes(hashes)
|
|
||||||
|
|
||||||
// send announcement
|
// send announcement
|
||||||
conn, err := s.dial()
|
conn, err := s.dial()
|
||||||
@ -531,7 +532,13 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) {
|
|||||||
if err = conn.peer(s.chain, nil); err != nil {
|
if err = conn.peer(s.chain, nil); err != nil {
|
||||||
t.Fatalf("peering failed: %v", err)
|
t.Fatalf("peering failed: %v", err)
|
||||||
}
|
}
|
||||||
if err = conn.Write(announce); err != nil {
|
|
||||||
|
var ann Message = NewPooledTransactionHashes{Types: types, Sizes: sizes, Hashes: hashes}
|
||||||
|
if conn.negotiatedProtoVersion < eth.ETH68 {
|
||||||
|
ann = NewPooledTransactionHashes66(hashes)
|
||||||
|
}
|
||||||
|
err = conn.Write(ann)
|
||||||
|
if err != nil {
|
||||||
t.Fatalf("failed to write to connection: %v", err)
|
t.Fatalf("failed to write to connection: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -546,6 +553,8 @@ func (s *Suite) TestNewPooledTxs(t *utesting.T) {
|
|||||||
return
|
return
|
||||||
|
|
||||||
// ignore propagated txs from previous tests
|
// ignore propagated txs from previous tests
|
||||||
|
case *NewPooledTransactionHashes66:
|
||||||
|
continue
|
||||||
case *NewPooledTransactionHashes:
|
case *NewPooledTransactionHashes:
|
||||||
continue
|
continue
|
||||||
case *Transactions:
|
case *Transactions:
|
||||||
|
@ -95,7 +95,7 @@ func sendSuccessfulTx(s *Suite, tx *types.Transaction, prevTx *types.Transaction
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("missing transaction: got %v missing %v", recTxs, tx.Hash())
|
return fmt.Errorf("missing transaction: got %v missing %v", recTxs, tx.Hash())
|
||||||
case *NewPooledTransactionHashes:
|
case *NewPooledTransactionHashes66:
|
||||||
txHashes := *msg
|
txHashes := *msg
|
||||||
// if you receive an old tx propagation, read from connection again
|
// if you receive an old tx propagation, read from connection again
|
||||||
if len(txHashes) == 1 && prevTx != nil {
|
if len(txHashes) == 1 && prevTx != nil {
|
||||||
@ -110,6 +110,34 @@ func sendSuccessfulTx(s *Suite, tx *types.Transaction, prevTx *types.Transaction
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("missing transaction announcement: got %v missing %v", txHashes, tx.Hash())
|
return fmt.Errorf("missing transaction announcement: got %v missing %v", txHashes, tx.Hash())
|
||||||
|
case *NewPooledTransactionHashes:
|
||||||
|
txHashes := msg.Hashes
|
||||||
|
if len(txHashes) != len(msg.Sizes) {
|
||||||
|
return fmt.Errorf("invalid msg size lengths: hashes: %v sizes: %v", len(txHashes), len(msg.Sizes))
|
||||||
|
}
|
||||||
|
if len(txHashes) != len(msg.Types) {
|
||||||
|
return fmt.Errorf("invalid msg type lengths: hashes: %v types: %v", len(txHashes), len(msg.Types))
|
||||||
|
}
|
||||||
|
// if you receive an old tx propagation, read from connection again
|
||||||
|
if len(txHashes) == 1 && prevTx != nil {
|
||||||
|
if txHashes[0] == prevTx.Hash() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for index, gotHash := range txHashes {
|
||||||
|
if gotHash == tx.Hash() {
|
||||||
|
if msg.Sizes[index] != uint32(tx.Size()) {
|
||||||
|
return fmt.Errorf("invalid tx size: got %v want %v", msg.Sizes[index], tx.Size())
|
||||||
|
}
|
||||||
|
if msg.Types[index] != tx.Type() {
|
||||||
|
return fmt.Errorf("invalid tx type: got %v want %v", msg.Types[index], tx.Type())
|
||||||
|
}
|
||||||
|
// Ok
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("missing transaction announcement: got %v missing %v", txHashes, tx.Hash())
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unexpected message in sendSuccessfulTx: %s", pretty.Sdump(msg))
|
return fmt.Errorf("unexpected message in sendSuccessfulTx: %s", pretty.Sdump(msg))
|
||||||
}
|
}
|
||||||
@ -201,8 +229,10 @@ func sendMultipleSuccessfulTxs(t *utesting.T, s *Suite, txs []*types.Transaction
|
|||||||
for _, tx := range *msg {
|
for _, tx := range *msg {
|
||||||
recvHashes = append(recvHashes, tx.Hash())
|
recvHashes = append(recvHashes, tx.Hash())
|
||||||
}
|
}
|
||||||
case *NewPooledTransactionHashes:
|
case *NewPooledTransactionHashes66:
|
||||||
recvHashes = append(recvHashes, *msg...)
|
recvHashes = append(recvHashes, *msg...)
|
||||||
|
case *NewPooledTransactionHashes:
|
||||||
|
recvHashes = append(recvHashes, msg.Hashes...)
|
||||||
default:
|
default:
|
||||||
if !strings.Contains(pretty.Sdump(msg), "i/o timeout") {
|
if !strings.Contains(pretty.Sdump(msg), "i/o timeout") {
|
||||||
return fmt.Errorf("unexpected message while waiting to receive txs: %s", pretty.Sdump(msg))
|
return fmt.Errorf("unexpected message while waiting to receive txs: %s", pretty.Sdump(msg))
|
||||||
@ -246,11 +276,16 @@ func checkMaliciousTxPropagation(s *Suite, txs []*types.Transaction, conn *Conn)
|
|||||||
if len(badTxs) > 0 {
|
if len(badTxs) > 0 {
|
||||||
return fmt.Errorf("received %d bad txs: \n%v", len(badTxs), badTxs)
|
return fmt.Errorf("received %d bad txs: \n%v", len(badTxs), badTxs)
|
||||||
}
|
}
|
||||||
case *NewPooledTransactionHashes:
|
case *NewPooledTransactionHashes66:
|
||||||
badTxs, _ := compareReceivedTxs(*msg, txs)
|
badTxs, _ := compareReceivedTxs(*msg, txs)
|
||||||
if len(badTxs) > 0 {
|
if len(badTxs) > 0 {
|
||||||
return fmt.Errorf("received %d bad txs: \n%v", len(badTxs), badTxs)
|
return fmt.Errorf("received %d bad txs: \n%v", len(badTxs), badTxs)
|
||||||
}
|
}
|
||||||
|
case *NewPooledTransactionHashes:
|
||||||
|
badTxs, _ := compareReceivedTxs(msg.Hashes, txs)
|
||||||
|
if len(badTxs) > 0 {
|
||||||
|
return fmt.Errorf("received %d bad txs: \n%v", len(badTxs), badTxs)
|
||||||
|
}
|
||||||
case *Error:
|
case *Error:
|
||||||
// Transaction should not be announced -> wait for timeout
|
// Transaction should not be announced -> wait for timeout
|
||||||
return nil
|
return nil
|
||||||
|
@ -126,8 +126,14 @@ type NewBlock eth.NewBlockPacket
|
|||||||
func (msg NewBlock) Code() int { return 23 }
|
func (msg NewBlock) Code() int { return 23 }
|
||||||
func (msg NewBlock) ReqID() uint64 { return 0 }
|
func (msg NewBlock) ReqID() uint64 { return 0 }
|
||||||
|
|
||||||
|
// NewPooledTransactionHashes66 is the network packet for the tx hash propagation message.
|
||||||
|
type NewPooledTransactionHashes66 eth.NewPooledTransactionHashesPacket66
|
||||||
|
|
||||||
|
func (msg NewPooledTransactionHashes66) Code() int { return 24 }
|
||||||
|
func (msg NewPooledTransactionHashes66) ReqID() uint64 { return 0 }
|
||||||
|
|
||||||
// NewPooledTransactionHashes is the network packet for the tx hash propagation message.
|
// NewPooledTransactionHashes is the network packet for the tx hash propagation message.
|
||||||
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket66
|
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket68
|
||||||
|
|
||||||
func (msg NewPooledTransactionHashes) Code() int { return 24 }
|
func (msg NewPooledTransactionHashes) Code() int { return 24 }
|
||||||
func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 }
|
func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 }
|
||||||
@ -202,8 +208,13 @@ func (c *Conn) Read() Message {
|
|||||||
msg = new(NewBlockHashes)
|
msg = new(NewBlockHashes)
|
||||||
case (Transactions{}).Code():
|
case (Transactions{}).Code():
|
||||||
msg = new(Transactions)
|
msg = new(Transactions)
|
||||||
case (NewPooledTransactionHashes{}).Code():
|
case (NewPooledTransactionHashes66{}).Code():
|
||||||
msg = new(NewPooledTransactionHashes)
|
// Try decoding to eth68
|
||||||
|
ethMsg := new(NewPooledTransactionHashes)
|
||||||
|
if err := rlp.DecodeBytes(rawData, ethMsg); err == nil {
|
||||||
|
return ethMsg
|
||||||
|
}
|
||||||
|
msg = new(NewPooledTransactionHashes66)
|
||||||
case (GetPooledTransactions{}.Code()):
|
case (GetPooledTransactions{}.Code()):
|
||||||
ethMsg := new(eth.GetPooledTransactionsPacket66)
|
ethMsg := new(eth.GetPooledTransactionsPacket66)
|
||||||
if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
|
if err := rlp.DecodeBytes(rawData, ethMsg); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user