swarm/network/simulation: use simulations.Event instead p2p.PeerEvent (#18098)

This commit is contained in:
Janoš Guljaš 2018-11-15 21:06:27 +01:00 committed by Viktor Trón
parent b91766fe6d
commit 324027640b
6 changed files with 101 additions and 55 deletions

View File

@ -20,16 +20,18 @@ import (
"context" "context"
"sync" "sync"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations"
) )
// PeerEvent is the type of the channel returned by Simulation.PeerEvents. // PeerEvent is the type of the channel returned by Simulation.PeerEvents.
type PeerEvent struct { type PeerEvent struct {
// NodeID is the ID of node that the event is caught on. // NodeID is the ID of node that the event is caught on.
NodeID enode.ID NodeID enode.ID
// PeerID is the ID of the peer node that the event is caught on.
PeerID enode.ID
// Event is the event that is caught. // Event is the event that is caught.
Event *p2p.PeerEvent Event *simulations.Event
// Error is the error that may have happened during event watching. // Error is the error that may have happened during event watching.
Error error Error error
} }
@ -37,9 +39,13 @@ type PeerEvent struct {
// PeerEventsFilter defines a filter on PeerEvents to exclude messages with // PeerEventsFilter defines a filter on PeerEvents to exclude messages with
// defined properties. Use PeerEventsFilter methods to set required options. // defined properties. Use PeerEventsFilter methods to set required options.
type PeerEventsFilter struct { type PeerEventsFilter struct {
t *p2p.PeerEventType eventType simulations.EventType
protocol *string
msgCode *uint64 connUp *bool
msgReceive *bool
protocol *string
msgCode *uint64
} }
// NewPeerEventsFilter returns a new PeerEventsFilter instance. // NewPeerEventsFilter returns a new PeerEventsFilter instance.
@ -47,20 +53,48 @@ func NewPeerEventsFilter() *PeerEventsFilter {
return &PeerEventsFilter{} return &PeerEventsFilter{}
} }
// Type sets the filter to only one peer event type. // Connect sets the filter to events when two nodes connect.
func (f *PeerEventsFilter) Type(t p2p.PeerEventType) *PeerEventsFilter { func (f *PeerEventsFilter) Connect() *PeerEventsFilter {
f.t = &t f.eventType = simulations.EventTypeConn
b := true
f.connUp = &b
return f
}
// Drop sets the filter to events when two nodes disconnect.
func (f *PeerEventsFilter) Drop() *PeerEventsFilter {
f.eventType = simulations.EventTypeConn
b := false
f.connUp = &b
return f
}
// ReceivedMessages sets the filter to only messages that are received.
func (f *PeerEventsFilter) ReceivedMessages() *PeerEventsFilter {
f.eventType = simulations.EventTypeMsg
b := true
f.msgReceive = &b
return f
}
// SentMessages sets the filter to only messages that are sent.
func (f *PeerEventsFilter) SentMessages() *PeerEventsFilter {
f.eventType = simulations.EventTypeMsg
b := false
f.msgReceive = &b
return f return f
} }
// Protocol sets the filter to only one message protocol. // Protocol sets the filter to only one message protocol.
func (f *PeerEventsFilter) Protocol(p string) *PeerEventsFilter { func (f *PeerEventsFilter) Protocol(p string) *PeerEventsFilter {
f.eventType = simulations.EventTypeMsg
f.protocol = &p f.protocol = &p
return f return f
} }
// MsgCode sets the filter to only one msg code. // MsgCode sets the filter to only one msg code.
func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter { func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter {
f.eventType = simulations.EventTypeMsg
f.msgCode = &c f.msgCode = &c
return f return f
} }
@ -80,19 +114,8 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ...
go func(id enode.ID) { go func(id enode.ID) {
defer s.shutdownWG.Done() defer s.shutdownWG.Done()
client, err := s.Net.GetNode(id).Client() events := make(chan *simulations.Event)
if err != nil { sub := s.Net.Events().Subscribe(events)
subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
events := make(chan *p2p.PeerEvent)
sub, err := client.Subscribe(ctx, "admin", events, "peerEvents")
if err != nil {
subsWG.Done()
eventC <- PeerEvent{NodeID: id, Error: err}
return
}
defer sub.Unsubscribe() defer sub.Unsubscribe()
subsWG.Done() subsWG.Done()
@ -110,28 +133,55 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ...
case <-s.Done(): case <-s.Done():
return return
case e := <-events: case e := <-events:
// ignore control events
if e.Control {
continue
}
match := len(filters) == 0 // if there are no filters match all events match := len(filters) == 0 // if there are no filters match all events
for _, f := range filters { for _, f := range filters {
if f.t != nil && *f.t != e.Type { if f.eventType == simulations.EventTypeConn && e.Conn != nil {
continue if *f.connUp != e.Conn.Up {
continue
}
// all connection filter parameters matched, break the loop
match = true
break
} }
if f.protocol != nil && *f.protocol != e.Protocol { if f.eventType == simulations.EventTypeMsg && e.Msg != nil {
continue if f.msgReceive != nil && *f.msgReceive != e.Msg.Received {
continue
}
if f.protocol != nil && *f.protocol != e.Msg.Protocol {
continue
}
if f.msgCode != nil && *f.msgCode != e.Msg.Code {
continue
}
// all message filter parameters matched, break the loop
match = true
break
} }
if f.msgCode != nil && e.MsgCode != nil && *f.msgCode != *e.MsgCode { }
continue var peerID enode.ID
switch e.Type {
case simulations.EventTypeConn:
peerID = e.Conn.One
if peerID == id {
peerID = e.Conn.Other
}
case simulations.EventTypeMsg:
peerID = e.Msg.One
if peerID == id {
peerID = e.Msg.Other
} }
// all filter parameters matched, break the loop
match = true
break
} }
if match { if match {
select { select {
case eventC <- PeerEvent{NodeID: id, Event: e}: case eventC <- PeerEvent{NodeID: id, PeerID: peerID, Event: e}:
case <-ctx.Done(): case <-ctx.Done():
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
select { select {
case eventC <- PeerEvent{NodeID: id, Error: err}: case eventC <- PeerEvent{NodeID: id, PeerID: peerID, Error: err}:
case <-s.Done(): case <-s.Done():
} }
} }

View File

@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network"
"github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/network/simulation"
@ -87,7 +86,7 @@ func ExampleSimulation_PeerEvents() {
log.Error("peer event", "err", e.Error) log.Error("peer event", "err", e.Error)
continue continue
} }
log.Info("peer event", "node", e.NodeID, "peer", e.Event.Peer, "msgcode", e.Event.MsgCode) log.Info("peer event", "node", e.NodeID, "peer", e.PeerID, "type", e.Event.Type)
} }
}() }()
} }
@ -100,7 +99,7 @@ func ExampleSimulation_PeerEvents_disconnections() {
disconnections := sim.PeerEvents( disconnections := sim.PeerEvents(
context.Background(), context.Background(),
sim.NodeIDs(), sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), simulation.NewPeerEventsFilter().Drop(),
) )
go func() { go func() {
@ -109,7 +108,7 @@ func ExampleSimulation_PeerEvents_disconnections() {
log.Error("peer drop", "err", d.Error) log.Error("peer drop", "err", d.Error)
continue continue
} }
log.Warn("peer drop", "node", d.NodeID, "peer", d.Event.Peer) log.Warn("peer drop", "node", d.NodeID, "peer", d.PeerID)
} }
}() }()
} }
@ -124,8 +123,8 @@ func ExampleSimulation_PeerEvents_multipleFilters() {
context.Background(), context.Background(),
sim.NodeIDs(), sim.NodeIDs(),
// Watch when bzz messages 1 and 4 are received. // Watch when bzz messages 1 and 4 are received.
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("bzz").MsgCode(1), simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz").MsgCode(1),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("bzz").MsgCode(4), simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz").MsgCode(4),
) )
go func() { go func() {
@ -134,7 +133,7 @@ func ExampleSimulation_PeerEvents_multipleFilters() {
log.Error("bzz message", "err", m.Error) log.Error("bzz message", "err", m.Error)
continue continue
} }
log.Info("bzz message", "node", m.NodeID, "peer", m.Event.Peer) log.Info("bzz message", "node", m.NodeID, "peer", m.PeerID)
} }
}() }()
} }

View File

@ -565,13 +565,13 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
disconnections := sim.PeerEvents( disconnections := sim.PeerEvents(
context.Background(), context.Background(),
sim.NodeIDs(), sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), simulation.NewPeerEventsFilter().Drop(),
) )
go func() { go func() {
for d := range disconnections { for d := range disconnections {
if d.Error != nil { if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal(d.Error) t.Fatal(d.Error)
} }
} }
@ -697,13 +697,13 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
disconnections := sim.PeerEvents( disconnections := sim.PeerEvents(
context.Background(), context.Background(),
sim.NodeIDs(), sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), simulation.NewPeerEventsFilter().Drop(),
) )
go func() { go func() {
for d := range disconnections { for d := range disconnections {
if d.Error != nil { if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
b.Fatal(d.Error) b.Fatal(d.Error)
} }
} }

View File

@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network"
@ -154,7 +153,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
disconnections := sim.PeerEvents( disconnections := sim.PeerEvents(
context.Background(), context.Background(),
sim.NodeIDs(), sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), simulation.NewPeerEventsFilter().Drop(),
) )
err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
@ -165,7 +164,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
go func() { go func() {
for d := range disconnections { for d := range disconnections {
if d.Error != nil { if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal(d.Error) t.Fatal(d.Error)
} }
} }

View File

@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
@ -210,12 +209,12 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
disconnections := sim.PeerEvents( disconnections := sim.PeerEvents(
context.Background(), context.Background(),
sim.NodeIDs(), sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), simulation.NewPeerEventsFilter().Drop(),
) )
go func() { go func() {
for d := range disconnections { for d := range disconnections {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal("unexpected disconnect") t.Fatal("unexpected disconnect")
cancelSimRun() cancelSimRun()
} }
@ -402,12 +401,12 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
disconnections := sim.PeerEvents( disconnections := sim.PeerEvents(
context.Background(), context.Background(),
sim.NodeIDs(), sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), simulation.NewPeerEventsFilter().Drop(),
) )
go func() { go func() {
for d := range disconnections { for d := range disconnections {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal("unexpected disconnect") t.Fatal("unexpected disconnect")
cancelSimRun() cancelSimRun()
} }
@ -428,7 +427,7 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
var subscriptionCount int var subscriptionCount int
filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4) filter := simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(4)
eventC := sim.PeerEvents(ctx, nodeIDs, filter) eventC := sim.PeerEvents(ctx, nodeIDs, filter)
for j, node := range nodeIDs { for j, node := range nodeIDs {

View File

@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/log"
@ -151,13 +150,13 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
disconnections := sim.PeerEvents( disconnections := sim.PeerEvents(
context.Background(), context.Background(),
sim.NodeIDs(), sim.NodeIDs(),
simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), simulation.NewPeerEventsFilter().Drop(),
) )
go func() { go func() {
for d := range disconnections { for d := range disconnections {
if d.Error != nil { if d.Error != nil {
log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
t.Fatal(d.Error) t.Fatal(d.Error)
} }
} }