swarm/network: remove isproxbin bool from kad.Each* iterfunc (#18239)

* swarm/network, swarm/pss: remove isproxbin bool from kad.Each* iterfunc

* swarm/network: restore comment and unskip snapshot sync tests
This commit is contained in:
Viktor Trón 2019-01-10 03:36:19 +01:00 committed by GitHub
parent d70c4faf20
commit 6df3e4eeb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 25 additions and 35 deletions

View File

@ -65,7 +65,7 @@ func (d *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
// NotifyDepth sends a message to all connections if depth of saturation is changed // NotifyDepth sends a message to all connections if depth of saturation is changed
func NotifyDepth(depth uint8, kad *Kademlia) { func NotifyDepth(depth uint8, kad *Kademlia) {
f := func(val *Peer, po int, _ bool) bool { f := func(val *Peer, po int) bool {
val.NotifyDepth(depth) val.NotifyDepth(depth)
return true return true
} }
@ -74,7 +74,7 @@ func NotifyDepth(depth uint8, kad *Kademlia) {
// NotifyPeer informs all peers about a newly added node // NotifyPeer informs all peers about a newly added node
func NotifyPeer(p *BzzAddr, k *Kademlia) { func NotifyPeer(p *BzzAddr, k *Kademlia) {
f := func(val *Peer, po int, _ bool) bool { f := func(val *Peer, po int) bool {
val.NotifyPeer(p, uint8(po)) val.NotifyPeer(p, uint8(po))
return true return true
} }
@ -160,7 +160,7 @@ func (d *Peer) handleSubPeersMsg(msg *subPeersMsg) error {
if !d.sentPeers { if !d.sentPeers {
d.setDepth(msg.Depth) d.setDepth(msg.Depth)
var peers []*BzzAddr var peers []*BzzAddr
d.kad.EachConn(d.Over(), 255, func(p *Peer, po int, isproxbin bool) bool { d.kad.EachConn(d.Over(), 255, func(p *Peer, po int) bool {
if pob, _ := Pof(d, d.kad.BaseAddr(), 0); pob > po { if pob, _ := Pof(d, d.kad.BaseAddr(), 0); pob > po {
return false return false
} }

View File

@ -114,7 +114,7 @@ func (h *Hive) Stop() error {
} }
} }
log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4])) log.Info(fmt.Sprintf("%08x hive stopped, dropping peers", h.BaseAddr()[:4]))
h.EachConn(nil, 255, func(p *Peer, _ int, _ bool) bool { h.EachConn(nil, 255, func(p *Peer, _ int) bool {
log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4])) log.Info(fmt.Sprintf("%08x dropping peer %08x", h.BaseAddr()[:4], p.Address()[:4]))
p.Drop(nil) p.Drop(nil)
return true return true
@ -228,7 +228,7 @@ func (h *Hive) loadPeers() error {
// savePeers, savePeer implement persistence callback/ // savePeers, savePeer implement persistence callback/
func (h *Hive) savePeers() error { func (h *Hive) savePeers() error {
var peers []*BzzAddr var peers []*BzzAddr
h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int, _ bool) bool { h.Kademlia.EachAddr(nil, 256, func(pa *BzzAddr, i int) bool {
if pa == nil { if pa == nil {
log.Warn(fmt.Sprintf("empty addr: %v", i)) log.Warn(fmt.Sprintf("empty addr: %v", i))
return true return true

View File

@ -103,7 +103,7 @@ func TestHiveStatePersistance(t *testing.T) {
pp.Start(s1.Server) pp.Start(s1.Server)
i := 0 i := 0
pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int, nn bool) bool { pp.Kademlia.EachAddr(nil, 256, func(addr *BzzAddr, po int) bool {
delete(peers, addr.String()) delete(peers, addr.String())
i++ i++
return true return true

View File

@ -390,46 +390,42 @@ func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(con
// EachConn is an iterator with args (base, po, f) applies f to each live peer // EachConn is an iterator with args (base, po, f) applies f to each live peer
// that has proximity order po or less as measured from the base // that has proximity order po or less as measured from the base
// if base is nil, kademlia base address is used // if base is nil, kademlia base address is used
// It returns peers in order deepest to shallowest func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int) bool) {
func (k *Kademlia) EachConn(base []byte, o int, f func(*Peer, int, bool) bool) {
k.lock.RLock() k.lock.RLock()
defer k.lock.RUnlock() defer k.lock.RUnlock()
k.eachConn(base, o, f) k.eachConn(base, o, f)
} }
func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int, bool) bool) { func (k *Kademlia) eachConn(base []byte, o int, f func(*Peer, int) bool) {
if len(base) == 0 { if len(base) == 0 {
base = k.base base = k.base
} }
depth := depthForPot(k.conns, k.MinProxBinSize, k.base)
k.conns.EachNeighbour(base, Pof, func(val pot.Val, po int) bool { k.conns.EachNeighbour(base, Pof, func(val pot.Val, po int) bool {
if po > o { if po > o {
return true return true
} }
return f(val.(*Peer), po, po >= depth) return f(val.(*Peer), po)
}) })
} }
// EachAddr called with (base, po, f) is an iterator applying f to each known peer // EachAddr called with (base, po, f) is an iterator applying f to each known peer
// that has proximity order o or less as measured from the base // that has proximity order o or less as measured from the base
// if base is nil, kademlia base address is used // if base is nil, kademlia base address is used
// It returns peers in order deepest to shallowest func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
func (k *Kademlia) EachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) {
k.lock.RLock() k.lock.RLock()
defer k.lock.RUnlock() defer k.lock.RUnlock()
k.eachAddr(base, o, f) k.eachAddr(base, o, f)
} }
func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int, bool) bool) { func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) {
if len(base) == 0 { if len(base) == 0 {
base = k.base base = k.base
} }
depth := depthForPot(k.conns, k.MinProxBinSize, k.base)
k.addrs.EachNeighbour(base, Pof, func(val pot.Val, po int) bool { k.addrs.EachNeighbour(base, Pof, func(val pot.Val, po int) bool {
if po > o { if po > o {
return true return true
} }
return f(val.(*entry).BzzAddr, po, po >= depth) return f(val.(*entry).BzzAddr, po)
}) })
} }
@ -687,12 +683,11 @@ func (k *Kademlia) saturation() int {
// TODO move to separate testing tools file // TODO move to separate testing tools file
func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][]byte) { func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][]byte) {
pm := make(map[string]bool) pm := make(map[string]bool)
depth := depthForPot(k.conns, k.MinProxBinSize, k.base)
// create a map with all peers at depth and deeper known in the kademlia // create a map with all peers at depth and deeper known in the kademlia
// in order deepest to shallowest compared to the kademlia base address k.eachAddr(nil, 255, func(p *BzzAddr, po int) bool {
// all bins (except self) are included (0 <= bin <= 255) // in order deepest to shallowest compared to the kademlia base address
depth := depthForPot(k.addrs, k.MinProxBinSize, k.base) // all bins (except self) are included (0 <= bin <= 255)
k.eachAddr(nil, 255, func(p *BzzAddr, po int, nn bool) bool {
if po < depth { if po < depth {
return false return false
} }
@ -724,12 +719,8 @@ func (k *Kademlia) knowNeighbours(addrs [][]byte) (got bool, n int, missing [][]
// It is used in Healthy function for testing only // It is used in Healthy function for testing only
func (k *Kademlia) connectedNeighbours(peers [][]byte) (got bool, n int, missing [][]byte) { func (k *Kademlia) connectedNeighbours(peers [][]byte) (got bool, n int, missing [][]byte) {
pm := make(map[string]bool) pm := make(map[string]bool)
// create a map with all peers at depth and deeper that are connected in the kademlia
// in order deepest to shallowest compared to the kademlia base address
// all bins (except self) are included (0 <= bin <= 255)
depth := depthForPot(k.conns, k.MinProxBinSize, k.base) depth := depthForPot(k.conns, k.MinProxBinSize, k.base)
k.eachConn(nil, 255, func(p *Peer, po int, nn bool) bool { k.eachConn(nil, 255, func(p *Peer, po int) bool {
if po < depth { if po < depth {
return false return false
} }

View File

@ -232,7 +232,7 @@ func assertHealth(t *testing.T, k *Kademlia, expectHealthy bool, expectSaturatio
t.Helper() t.Helper()
kid := common.Bytes2Hex(k.BaseAddr()) kid := common.Bytes2Hex(k.BaseAddr())
addrs := [][]byte{k.BaseAddr()} addrs := [][]byte{k.BaseAddr()}
k.EachAddr(nil, 255, func(addr *BzzAddr, po int, _ bool) bool { k.EachAddr(nil, 255, func(addr *BzzAddr, po int) bool {
addrs = append(addrs, addr.Address()) addrs = append(addrs, addr.Address())
return true return true
}) })

View File

@ -92,7 +92,7 @@ func TestNetworkID(t *testing.T) {
if kademlias[node].addrs.Size() != len(netIDGroup)-1 { if kademlias[node].addrs.Size() != len(netIDGroup)-1 {
t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1) t.Fatalf("Kademlia size has not expected peer size. Kademlia size: %d, expected size: %d", kademlias[node].addrs.Size(), len(netIDGroup)-1)
} }
kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int, _ bool) bool { kademlias[node].EachAddr(nil, 0, func(addr *BzzAddr, _ int) bool {
found := false found := false
for _, nd := range netIDGroup { for _, nd := range netIDGroup {
if bytes.Equal(kademlias[nd].BaseAddr(), addr.Address()) { if bytes.Equal(kademlias[nd].BaseAddr(), addr.Address()) {

View File

@ -19,7 +19,6 @@ package stream
import ( import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
@ -245,7 +244,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
return nil, nil, fmt.Errorf("source peer %v not found", spID.String()) return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
} }
} else { } else {
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
id := p.ID() id := p.ID()
if p.LightNode { if p.LightNode {
// skip light nodes // skip light nodes

View File

@ -336,7 +336,7 @@ func (p *Peer) handleWantedHashesMsg(ctx context.Context, req *WantedHashesMsg)
// launch in go routine since GetBatch blocks until new hashes arrive // launch in go routine since GetBatch blocks until new hashes arrive
go func() { go func() {
if err := p.SendOfferedHashes(s, req.From, req.To); err != nil { if err := p.SendOfferedHashes(s, req.From, req.To); err != nil {
log.Warn("SendOfferedHashes error", "err", err) log.Warn("SendOfferedHashes error", "peer", p.ID().TerminalString(), "err", err)
} }
}() }()
// go p.SendOfferedHashes(s, req.From, req.To) // go p.SendOfferedHashes(s, req.From, req.To)

View File

@ -964,7 +964,7 @@ func (p *Pss) forward(msg *PssMsg) error {
onlySendOnce = true onlySendOnce = true
} }
p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int, _ bool) bool { p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool {
if po < broadcastThreshold && sent > 0 { if po < broadcastThreshold && sent > 0 {
return false // stop iterating return false // stop iterating
} }

View File

@ -491,12 +491,12 @@ func TestAddressMatchProx(t *testing.T) {
// meanwhile test regression for kademlia since we are compiling the test parameters from different packages // meanwhile test regression for kademlia since we are compiling the test parameters from different packages
var proxes int var proxes int
var conns int var conns int
kad.EachConn(nil, peerCount, func(p *network.Peer, po int, prox bool) bool { depth := kad.NeighbourhoodDepth()
kad.EachConn(nil, peerCount, func(p *network.Peer, po int) bool {
conns++ conns++
if prox { if po >= depth {
proxes++ proxes++
} }
log.Trace("kadconn", "po", po, "peer", p, "prox", prox)
return true return true
}) })
if proxes != nnPeerCount { if proxes != nnPeerCount {