diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index f553cb5f4..90491ab31 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -83,14 +83,14 @@ func NewKadParams() *KadParams { // Kademlia is a table of live peers and a db of known peers (node records) type Kademlia struct { lock sync.RWMutex - *KadParams // Kademlia configuration parameters - base []byte // immutable baseaddress of the table - addrs *pot.Pot // pots container for known peer addresses - conns *pot.Pot // pots container for live peer connections - depth uint8 // stores the last current depth of saturation - nDepth int // stores the last neighbourhood depth - nDepthC chan int // returned by DepthC function to signal neighbourhood depth change - addrCountC chan int // returned by AddrCountC function to signal peer count change + *KadParams // Kademlia configuration parameters + base []byte // immutable baseaddress of the table + addrs *pot.Pot // pots container for known peer addresses + conns *pot.Pot // pots container for live peer connections + depth uint8 // stores the last current depth of saturation + nDepth int // stores the last neighbourhood depth + nDepthMu sync.RWMutex // protects neighbourhood depth nDepth + nDepthSig []chan struct{} // signals when neighbourhood depth nDepth is changed } // NewKademlia creates a Kademlia table for base address addr @@ -175,12 +175,8 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { } size++ } - // send new address count value only if there are new addresses - if k.addrCountC != nil && size-known > 0 { - k.addrCountC <- k.addrs.Size() - } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() return nil } @@ -323,10 +319,6 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { k.addrs, _, _, _ = pot.Swap(k.addrs, p, Pof, func(v pot.Val) pot.Val { return a }) - // send new address count value only if the peer is inserted - if k.addrCountC != nil { - k.addrCountC <- k.addrs.Size() - } } // calculate if depth of saturation changed depth := uint8(k.saturation()) @@ -335,75 +327,72 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { changed = true k.depth = depth } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() return k.depth, changed } -// NeighbourhoodDepthC returns the channel that sends a new kademlia -// neighbourhood depth on each change. -// Not receiving from the returned channel will block On function -// when the neighbourhood depth is changed. -// TODO: Why is this exported, and if it should be; why can't we have more subscribers than one? -func (k *Kademlia) NeighbourhoodDepthC() <-chan int { - k.lock.Lock() - defer k.lock.Unlock() - if k.nDepthC == nil { - k.nDepthC = make(chan int) +// setNeighbourhoodDepth calculates neighbourhood depth with depthForPot, +// sets it to the nDepth and sends a signal to every nDepthSig channel. +func (k *Kademlia) setNeighbourhoodDepth() { + nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) + var changed bool + k.nDepthMu.Lock() + if nDepth != k.nDepth { + k.nDepth = nDepth + changed = true } - return k.nDepthC -} + k.nDepthMu.Unlock() -// CloseNeighbourhoodDepthC closes the channel returned by -// NeighbourhoodDepthC and stops sending neighbourhood change. -func (k *Kademlia) CloseNeighbourhoodDepthC() { - k.lock.Lock() - defer k.lock.Unlock() - - if k.nDepthC != nil { - close(k.nDepthC) - k.nDepthC = nil - } -} - -// sendNeighbourhoodDepthChange sends new neighbourhood depth to k.nDepth channel -// if it is initialized. -func (k *Kademlia) sendNeighbourhoodDepthChange() { - // nDepthC is initialized when NeighbourhoodDepthC is called and returned by it. - // It provides signaling of neighbourhood depth change. - // This part of the code is sending new neighbourhood depth to nDepthC if that condition is met. - if k.nDepthC != nil { - nDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base) - if nDepth != k.nDepth { - k.nDepth = nDepth - k.nDepthC <- nDepth + if len(k.nDepthSig) > 0 && changed { + for _, c := range k.nDepthSig { + // Every nDepthSig channel has a buffer capacity of 1, + // so every receiver will get the signal even if the + // select statement has the default case to avoid blocking. + select { + case c <- struct{}{}: + default: + } } } } -// AddrCountC returns the channel that sends a new -// address count value on each change. -// Not receiving from the returned channel will block Register function -// when address count value changes. -func (k *Kademlia) AddrCountC() <-chan int { - k.lock.Lock() - defer k.lock.Unlock() - - if k.addrCountC == nil { - k.addrCountC = make(chan int) - } - return k.addrCountC +// NeighbourhoodDepth returns the value calculated by depthForPot function +// in setNeighbourhoodDepth method. +func (k *Kademlia) NeighbourhoodDepth() int { + k.nDepthMu.RLock() + defer k.nDepthMu.RUnlock() + return k.nDepth } -// CloseAddrCountC closes the channel returned by -// AddrCountC and stops sending address count change. -func (k *Kademlia) CloseAddrCountC() { +// SubscribeToNeighbourhoodDepthChange returns the channel that signals +// when neighbourhood depth value is changed. The current neighbourhood depth +// is returned by NeighbourhoodDepth method. Returned function unsubscribes +// the channel from signaling and releases the resources. Returned function is safe +// to be called multiple times. +func (k *Kademlia) SubscribeToNeighbourhoodDepthChange() (c <-chan struct{}, unsubscribe func()) { + channel := make(chan struct{}, 1) + var closeOnce sync.Once + k.lock.Lock() defer k.lock.Unlock() - if k.addrCountC != nil { - close(k.addrCountC) - k.addrCountC = nil + k.nDepthSig = append(k.nDepthSig, channel) + + unsubscribe = func() { + k.lock.Lock() + defer k.lock.Unlock() + + for i, c := range k.nDepthSig { + if c == channel { + k.nDepthSig = append(k.nDepthSig[:i], k.nDepthSig[i+1:]...) + break + } + } + + closeOnce.Do(func() { close(channel) }) } + + return channel, unsubscribe } // Off removes a peer from among live peers @@ -429,11 +418,7 @@ func (k *Kademlia) Off(p *Peer) { // v cannot be nil, but no need to check return nil }) - // send new address count value only if the peer is deleted - if k.addrCountC != nil { - k.addrCountC <- k.addrs.Size() - } - k.sendNeighbourhoodDepthChange() + k.setNeighbourhoodDepth() } } @@ -491,13 +476,6 @@ func (k *Kademlia) eachAddr(base []byte, o int, f func(*BzzAddr, int) bool) { }) } -// NeighbourhoodDepth returns the depth for the pot, see depthForPot -func (k *Kademlia) NeighbourhoodDepth() (depth int) { - k.lock.RLock() - defer k.lock.RUnlock() - return depthForPot(k.conns, k.NeighbourhoodSize, k.base) -} - // neighbourhoodRadiusForPot returns the neighbourhood radius of the kademlia // neighbourhood radius encloses the nearest neighbour set with size >= neighbourhoodSize // i.e., neighbourhood radius is the deepest PO such that all bins not shallower altogether diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index 93b990138..035879cd3 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -560,3 +560,113 @@ func newTestDiscoveryPeer(addr pot.Address, kad *Kademlia) *Peer { } return NewPeer(bp, kad) } + +// TestKademlia_SubscribeToNeighbourhoodDepthChange checks if correct +// signaling over SubscribeToNeighbourhoodDepthChange channels are made +// when neighbourhood depth is changed. +func TestKademlia_SubscribeToNeighbourhoodDepthChange(t *testing.T) { + + testSignal := func(t *testing.T, k *testKademlia, prevDepth int, c <-chan struct{}) (newDepth int) { + t.Helper() + + select { + case _, ok := <-c: + if !ok { + t.Error("closed signal channel") + } + newDepth = k.NeighbourhoodDepth() + if prevDepth == newDepth { + t.Error("depth not changed") + } + return newDepth + case <-time.After(2 * time.Second): + t.Error("timeout") + } + return newDepth + } + + t.Run("single subscription", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + testSignal(t, k, depth, c) + }) + + t.Run("multiple subscriptions", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c1, u1 := k.SubscribeToNeighbourhoodDepthChange() + defer u1() + + c2, u2 := k.SubscribeToNeighbourhoodDepthChange() + defer u2() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + testSignal(t, k, depth, c1) + + testSignal(t, k, depth, c2) + }) + + t.Run("multiple changes", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + depth := k.NeighbourhoodDepth() + + k.On("11111101", "01000000", "10000000", "00000010") + + depth = testSignal(t, k, depth, c) + + k.On("11111101", "01000010", "10000010", "00000110") + + testSignal(t, k, depth, c) + }) + + t.Run("no depth change", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + c, u := k.SubscribeToNeighbourhoodDepthChange() + defer u() + + // does not trigger the depth change + k.On("11111101") + + select { + case _, ok := <-c: + if !ok { + t.Error("closed signal channel") + } + t.Error("signal received") + case <-time.After(1 * time.Second): + // all fine + } + }) + + t.Run("no new peers", func(t *testing.T) { + k := newTestKademlia(t, "00000000") + + changeC, unsubscribe := k.SubscribeToNeighbourhoodDepthChange() + defer unsubscribe() + + select { + case _, ok := <-changeC: + if !ok { + t.Error("closed signal channel") + } + t.Error("signal received") + case <-time.After(1 * time.Second): + // all fine + } + }) +} diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index aa2c817ea..1b4a14ea2 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -145,7 +145,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int switch r := req.(type) { case *ChunkDeliveryMsgRetrieval: msg = (*ChunkDeliveryMsg)(r) - peerPO := chunk.Proximity(sp.ID().Bytes(), msg.Addr) + peerPO := chunk.Proximity(sp.BzzAddr.Over(), msg.Addr) po := chunk.Proximity(d.kad.BaseAddr(), msg.Addr) depth := d.kad.NeighbourhoodDepth() // chunks within the area of responsibility should always sync @@ -186,8 +186,6 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int } func (d *Delivery) Close() { - d.kad.CloseNeighbourhoodDepthC() - d.kad.CloseAddrCountC() close(d.quit) } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 4037243c1..5f73f7cc8 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -156,7 +156,7 @@ func TestRequestFromPeers(t *testing.T) { // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ - Peer: protocolsPeer, + BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr}, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: r, } @@ -196,7 +196,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil) // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished sp := &Peer{ - Peer: protocolsPeer, + BzzPeer: &network.BzzPeer{Peer: protocolsPeer, BzzAddr: addr}, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: r, } diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index b60d2fcc9..821cdaa9a 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -175,7 +175,11 @@ type QuitMsg struct { } func (p *Peer) handleQuitMsg(req *QuitMsg) error { - return p.removeClient(req.Stream) + err := p.removeClient(req.Stream) + if _, ok := err.(*notFoundError); ok { + return nil + } + return err } // OfferedHashesMsg is the protocol msg for offering to hand over a diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 2514dcad4..17ce0d798 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -24,8 +24,10 @@ import ( "time" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/p2p/protocols" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/network" pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" "github.com/ethereum/go-ethereum/swarm/network/stream/intervals" "github.com/ethereum/go-ethereum/swarm/spancontext" @@ -54,7 +56,7 @@ var ErrMaxPeerServers = errors.New("max peer servers") // Peer is the Peer extension for the streaming protocol type Peer struct { - *protocols.Peer + *network.BzzPeer streamer *Registry pq *pq.PriorityQueue serverMu sync.RWMutex @@ -74,9 +76,9 @@ type WrappedPriorityMsg struct { } // NewPeer is the constructor for Peer -func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { +func NewPeer(peer *network.BzzPeer, streamer *Registry) *Peer { p := &Peer{ - Peer: peer, + BzzPeer: peer, pq: pq.New(int(PriorityQueue), PriorityQueueCap), streamer: streamer, servers: make(map[Stream]*server), @@ -417,3 +419,165 @@ func (p *Peer) close() { s.Close() } } + +// runUpdateSyncing is a long running function that creates the initial +// syncing subscriptions to the peer and waits for neighbourhood depth change +// to create new ones or quit existing ones based on the new neighbourhood depth +// and if peer enters or leaves nearest neighbourhood by using +// syncSubscriptionsDiff and updateSyncSubscriptions functions. +func (p *Peer) runUpdateSyncing() { + timer := time.NewTimer(p.streamer.syncUpdateDelay) + defer timer.Stop() + + select { + case <-timer.C: + case <-p.streamer.quit: + return + } + + kad := p.streamer.delivery.kad + po := chunk.Proximity(p.BzzAddr.Over(), kad.BaseAddr()) + + depth := kad.NeighbourhoodDepth() + + log.Debug("update syncing subscriptions: initial", "peer", p.ID(), "po", po, "depth", depth) + + // initial subscriptions + p.updateSyncSubscriptions(syncSubscriptionsDiff(po, -1, depth, kad.MaxProxDisplay)) + + depthChangeSignal, unsubscribeDepthChangeSignal := kad.SubscribeToNeighbourhoodDepthChange() + defer unsubscribeDepthChangeSignal() + + prevDepth := depth + for { + select { + case _, ok := <-depthChangeSignal: + if !ok { + return + } + // update subscriptions for this peer when depth changes + depth := kad.NeighbourhoodDepth() + log.Debug("update syncing subscriptions", "peer", p.ID(), "po", po, "depth", depth) + p.updateSyncSubscriptions(syncSubscriptionsDiff(po, prevDepth, depth, kad.MaxProxDisplay)) + prevDepth = depth + case <-p.streamer.quit: + return + } + } + log.Debug("update syncing subscriptions: exiting", "peer", p.ID()) +} + +// updateSyncSubscriptions accepts two slices of integers, the first one +// representing proximity order bins for required syncing subscriptions +// and the second one representing bins for syncing subscriptions that +// need to be removed. This function sends request for subscription +// messages and quit messages for provided bins. +func (p *Peer) updateSyncSubscriptions(subBins, quitBins []int) { + if p.streamer.getPeer(p.ID()) == nil { + log.Debug("update syncing subscriptions", "peer not found", p.ID()) + return + } + log.Debug("update syncing subscriptions", "peer", p.ID(), "subscribe", subBins, "quit", quitBins) + for _, po := range subBins { + p.subscribeSync(po) + } + for _, po := range quitBins { + p.quitSync(po) + } +} + +// subscribeSync send the request for syncing subscriptions to the peer +// using subscriptionFunc. This function is used to request syncing subscriptions +// when new peer is added to the registry and on neighbourhood depth change. +func (p *Peer) subscribeSync(po int) { + err := subscriptionFunc(p.streamer, p.ID(), uint8(po)) + if err != nil { + log.Error("subscription", "err", err) + } +} + +// quitSync sends the quit message for live and history syncing streams to the peer. +// This function is used in runUpdateSyncing indirectly over updateSyncSubscriptions +// to remove unneeded syncing subscriptions on neighbourhood depth change. +func (p *Peer) quitSync(po int) { + live := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true) + history := getHistoryStream(live) + err := p.streamer.Quit(p.ID(), live) + if err != nil && err != p2p.ErrShuttingDown { + log.Error("quit", "err", err, "peer", p.ID(), "stream", live) + } + err = p.streamer.Quit(p.ID(), history) + if err != nil && err != p2p.ErrShuttingDown { + log.Error("quit", "err", err, "peer", p.ID(), "stream", history) + } + + err = p.removeServer(live) + if err != nil { + log.Error("remove server", "err", err, "peer", p.ID(), "stream", live) + } + err = p.removeServer(history) + if err != nil { + log.Error("remove server", "err", err, "peer", p.ID(), "stream", live) + } +} + +// syncSubscriptionsDiff calculates to which proximity order bins a peer +// (with po peerPO) needs to be subscribed after kademlia neighbourhood depth +// change from prevDepth to newDepth. Max argument limits the number of +// proximity order bins. Returned values are slices of integers which represent +// proximity order bins, the first one to which additional subscriptions need to +// be requested and the second one which subscriptions need to be quit. Argument +// prevDepth with value less then 0 represents no previous depth, used for +// initial syncing subscriptions. +func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []int) { + newStart, newEnd := syncBins(peerPO, newDepth, max) + if prevDepth < 0 { + // no previous depth, return the complete range + // for subscriptions requests and nothing for quitting + return intRange(newStart, newEnd), nil + } + + prevStart, prevEnd := syncBins(peerPO, prevDepth, max) + + if newStart < prevStart { + subBins = append(subBins, intRange(newStart, prevStart)...) + } + + if prevStart < newStart { + quitBins = append(quitBins, intRange(prevStart, newStart)...) + } + + if newEnd < prevEnd { + quitBins = append(quitBins, intRange(newEnd, prevEnd)...) + } + + if prevEnd < newEnd { + subBins = append(subBins, intRange(prevEnd, newEnd)...) + } + + return subBins, quitBins +} + +// syncBins returns the range to which proximity order bins syncing +// subscriptions need to be requested, based on peer proximity and +// kademlia neighbourhood depth. Returned range is [start,end), inclusive for +// start and exclusive for end. +func syncBins(peerPO, depth, max int) (start, end int) { + if peerPO < depth { + // subscribe only to peerPO bin if it is not + // in the nearest neighbourhood + return peerPO, peerPO + 1 + } + // subscribe from depth to max bin if the peer + // is in the nearest neighbourhood + return depth, max + 1 +} + +// intRange returns the slice of integers [start,end). The start +// is inclusive and the end is not. +func intRange(start, end int) (r []int) { + for i := start; i < end; i++ { + r = append(r, i) + } + return r +} diff --git a/swarm/network/stream/peer_test.go b/swarm/network/stream/peer_test.go new file mode 100644 index 000000000..98c5cc010 --- /dev/null +++ b/swarm/network/stream/peer_test.go @@ -0,0 +1,309 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package stream + +import ( + "context" + "fmt" + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/network" + "github.com/ethereum/go-ethereum/swarm/network/simulation" + "github.com/ethereum/go-ethereum/swarm/state" +) + +// TestSyncSubscriptionsDiff validates the output of syncSubscriptionsDiff +// function for various arguments. +func TestSyncSubscriptionsDiff(t *testing.T) { + max := network.NewKadParams().MaxProxDisplay + for _, tc := range []struct { + po, prevDepth, newDepth int + subBins, quitBins []int + }{ + { + po: 0, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 2, prevDepth: -1, newDepth: 0, + subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 0, prevDepth: -1, newDepth: 1, + subBins: []int{0}, + }, + { + po: 1, prevDepth: -1, newDepth: 1, + subBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 2, prevDepth: -1, newDepth: 2, + subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 3, prevDepth: -1, newDepth: 2, + subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: -1, newDepth: 2, + subBins: []int{1}, + }, + { + po: 0, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16 + }, + { + po: 1, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16 + }, + { + po: 0, prevDepth: 0, newDepth: 1, // 0-16 -> 0 + quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 0, prevDepth: 0, newDepth: 2, // 0-16 -> 0 + quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 1, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16 + quitBins: []int{0}, + }, + { + po: 1, prevDepth: 1, newDepth: 0, // 1-16 -> 0-16 + subBins: []int{0}, + }, + { + po: 4, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16 + quitBins: []int{0}, + }, + { + po: 4, prevDepth: 0, newDepth: 4, // 0-16 -> 4-16 + quitBins: []int{0, 1, 2, 3}, + }, + { + po: 4, prevDepth: 0, newDepth: 5, // 0-16 -> 4 + quitBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 4, prevDepth: 5, newDepth: 0, // 4 -> 0-16 + subBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + { + po: 4, prevDepth: 5, newDepth: 6, // 4 -> 4 + }, + } { + subBins, quitBins := syncSubscriptionsDiff(tc.po, tc.prevDepth, tc.newDepth, max) + if fmt.Sprint(subBins) != fmt.Sprint(tc.subBins) { + t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got subBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, subBins, tc.subBins) + } + if fmt.Sprint(quitBins) != fmt.Sprint(tc.quitBins) { + t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got quitBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, quitBins, tc.quitBins) + } + } +} + +// TestUpdateSyncingSubscriptions validates that syncing subscriptions are correctly +// made on initial node connections and that subscriptions are correctly changed +// when kademlia neighbourhood depth is changed by connecting more nodes. +func TestUpdateSyncingSubscriptions(t *testing.T) { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + SyncUpdateDelay: 100 * time.Millisecond, + Syncing: SyncingAutoSubscribe, + }, nil) + cleanup = func() { + r.Close() + clean() + } + bucket.Store("bzz-address", addr) + return r, cleanup, nil + }, + }) + defer sim.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + // initial nodes, first one as pivot center of the start + ids, err := sim.AddNodesAndConnectStar(10) + if err != nil { + return err + } + + // pivot values + pivotRegistryID := ids[0] + pivotRegistry := sim.Service("streamer", pivotRegistryID).(*Registry) + pivotKademlia := pivotRegistry.delivery.kad + // nodes proximities from the pivot node + nodeProximities := make(map[string]int) + for _, id := range ids[1:] { + bzzAddr, ok := sim.NodeItem(id, "bzz-address") + if !ok { + t.Fatal("no bzz address for node") + } + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.(*network.BzzAddr).Over()) + } + // wait until sync subscriptions are done for all nodes + waitForSubscriptions(t, pivotRegistry, ids[1:]...) + + // check initial sync streams + err = checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + if err != nil { + return err + } + + // add more nodes until the depth is changed + prevDepth := pivotKademlia.NeighbourhoodDepth() + var noDepthChangeChecked bool // true it there was a check when no depth is changed + for { + ids, err := sim.AddNodes(5) + if err != nil { + return err + } + // add new nodes to sync subscriptions check + for _, id := range ids { + bzzAddr, ok := sim.NodeItem(id, "bzz-address") + if !ok { + t.Fatal("no bzz address for node") + } + nodeProximities[id.String()] = chunk.Proximity(pivotKademlia.BaseAddr(), bzzAddr.(*network.BzzAddr).Over()) + } + err = sim.Net.ConnectNodesStar(ids, pivotRegistryID) + if err != nil { + return err + } + waitForSubscriptions(t, pivotRegistry, ids...) + + newDepth := pivotKademlia.NeighbourhoodDepth() + // depth is not changed, check if streams are still correct + if newDepth == prevDepth { + err = checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + if err != nil { + return err + } + noDepthChangeChecked = true + } + // do the final check when depth is changed and + // there has been at least one check + // for the case when depth is not changed + if newDepth != prevDepth && noDepthChangeChecked { + // check sync streams for changed depth + return checkSyncStreamsWithRetry(pivotRegistry, nodeProximities) + } + prevDepth = newDepth + } + }) + if result.Error != nil { + t.Fatal(result.Error) + } +} + +// waitForSubscriptions is a test helper function that blocks until +// stream server subscriptions are established on the provided registry +// to the nodes with provided IDs. +func waitForSubscriptions(t *testing.T, r *Registry, ids ...enode.ID) { + t.Helper() + + for retries := 0; retries < 100; retries++ { + subs := r.api.GetPeerServerSubscriptions() + if allSubscribed(subs, ids) { + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("missing subscriptions") +} + +// allSubscribed returns true if nodes with ids have subscriptions +// in provided subs map. +func allSubscribed(subs map[string][]string, ids []enode.ID) bool { + for _, id := range ids { + if s, ok := subs[id.String()]; !ok || len(s) == 0 { + return false + } + } + return true +} + +// checkSyncStreamsWithRetry is calling checkSyncStreams with retries. +func checkSyncStreamsWithRetry(r *Registry, nodeProximities map[string]int) (err error) { + for retries := 0; retries < 5; retries++ { + err = checkSyncStreams(r, nodeProximities) + if err == nil { + return nil + } + time.Sleep(500 * time.Millisecond) + } + return err +} + +// checkSyncStreams validates that registry contains expected sync +// subscriptions to nodes with proximities in a map nodeProximities. +func checkSyncStreams(r *Registry, nodeProximities map[string]int) error { + depth := r.delivery.kad.NeighbourhoodDepth() + maxPO := r.delivery.kad.MaxProxDisplay + for id, po := range nodeProximities { + wantStreams := syncStreams(po, depth, maxPO) + gotStreams := nodeStreams(r, id) + + if r.getPeer(enode.HexID(id)) == nil { + // ignore removed peer + continue + } + + if !reflect.DeepEqual(gotStreams, wantStreams) { + return fmt.Errorf("node %s got streams %v, want %v", id, gotStreams, wantStreams) + } + } + return nil +} + +// syncStreams returns expected sync streams that need to be +// established between a node with kademlia neighbourhood depth +// and a node with proximity order po. +func syncStreams(po, depth, maxPO int) (streams []string) { + start, end := syncBins(po, depth, maxPO) + for bin := start; bin < end; bin++ { + streams = append(streams, NewStream("SYNC", FormatSyncBinKey(uint8(bin)), false).String()) + streams = append(streams, NewStream("SYNC", FormatSyncBinKey(uint8(bin)), true).String()) + } + return streams +} + +// nodeStreams returns stream server subscriptions on a registry +// to the peer with provided id. +func nodeStreams(r *Registry, id string) []string { + streams := r.api.GetPeerServerSubscriptions()[id] + sort.Strings(streams) + return streams +} diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 10a8f7ec5..99235af66 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -66,22 +66,24 @@ var subscriptionFunc = doRequestSubscription // Registry registry for outgoing and incoming streamer constructors type Registry struct { - addr enode.ID - api *API - skipCheck bool - clientMu sync.RWMutex - serverMu sync.RWMutex - peersMu sync.RWMutex - serverFuncs map[string]func(*Peer, string, bool) (Server, error) - clientFuncs map[string]func(*Peer, string, bool) (Client, error) - peers map[enode.ID]*Peer - delivery *Delivery - intervalsStore state.Store - maxPeerServers int - spec *protocols.Spec //this protocol's spec - balance protocols.Balance //implements protocols.Balance, for accounting - prices protocols.Prices //implements protocols.Prices, provides prices to accounting - quit chan struct{} // terminates registry goroutines + addr enode.ID + api *API + skipCheck bool + clientMu sync.RWMutex + serverMu sync.RWMutex + peersMu sync.RWMutex + serverFuncs map[string]func(*Peer, string, bool) (Server, error) + clientFuncs map[string]func(*Peer, string, bool) (Client, error) + peers map[enode.ID]*Peer + delivery *Delivery + intervalsStore state.Store + maxPeerServers int + spec *protocols.Spec //this protocol's spec + balance protocols.Balance //implements protocols.Balance, for accounting + prices protocols.Prices //implements protocols.Prices, provides prices to accounting + quit chan struct{} // terminates registry goroutines + syncMode SyncingOption + syncUpdateDelay time.Duration } // RegistryOptions holds optional values for NewRegistry constructor. @@ -104,16 +106,18 @@ func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStor quit := make(chan struct{}) streamer := &Registry{ - addr: localID, - skipCheck: options.SkipCheck, - serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), - clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), - peers: make(map[enode.ID]*Peer), - delivery: delivery, - intervalsStore: intervalsStore, - maxPeerServers: options.MaxPeerServers, - balance: balance, - quit: quit, + addr: localID, + skipCheck: options.SkipCheck, + serverFuncs: make(map[string]func(*Peer, string, bool) (Server, error)), + clientFuncs: make(map[string]func(*Peer, string, bool) (Client, error)), + peers: make(map[enode.ID]*Peer), + delivery: delivery, + intervalsStore: intervalsStore, + maxPeerServers: options.MaxPeerServers, + balance: balance, + quit: quit, + syncUpdateDelay: options.SyncUpdateDelay, + syncMode: options.Syncing, } streamer.setupSpec() @@ -127,103 +131,6 @@ func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStor RegisterSwarmSyncerClient(streamer, netStore) } - // if syncing is set to automatically subscribe to the syncing stream, start the subscription process - if options.Syncing == SyncingAutoSubscribe { - // latestIntC function ensures that - // - receiving from the in chan is not blocked by processing inside the for loop - // - the latest int value is delivered to the loop after the processing is done - // In context of NeighbourhoodDepthC: - // after the syncing is done updating inside the loop, we do not need to update on the intermediate - // depth changes, only to the latest one - latestIntC := func(in <-chan int) <-chan int { - out := make(chan int, 1) - - go func() { - defer close(out) - - for { - select { - case i, ok := <-in: - if !ok { - return - } - select { - case <-out: - default: - } - out <- i - case <-quit: - return - } - } - }() - - return out - } - - kad := streamer.delivery.kad - // get notification channels from Kademlia before returning - // from this function to avoid race with Close method and - // the goroutine created below - depthC := latestIntC(kad.NeighbourhoodDepthC()) - addressBookSizeC := latestIntC(kad.AddrCountC()) - - go func() { - // wait for kademlia table to be healthy - // but return if Registry is closed before - select { - case <-time.After(options.SyncUpdateDelay): - case <-quit: - return - } - - // initial requests for syncing subscription to peers - streamer.updateSyncing() - - for depth := range depthC { - log.Debug("Kademlia neighbourhood depth change", "depth", depth) - - // Prevent too early sync subscriptions by waiting until there are no - // new peers connecting. Sync streams updating will be done after no - // peers are connected for at least SyncUpdateDelay period. - timer := time.NewTimer(options.SyncUpdateDelay) - // Hard limit to sync update delay, preventing long delays - // on a very dynamic network - maxTimer := time.NewTimer(3 * time.Minute) - loop: - for { - select { - case <-maxTimer.C: - // force syncing update when a hard timeout is reached - log.Trace("Sync subscriptions update on hard timeout") - // request for syncing subscription to new peers - streamer.updateSyncing() - break loop - case <-timer.C: - // start syncing as no new peers has been added to kademlia - // for some time - log.Trace("Sync subscriptions update") - // request for syncing subscription to new peers - streamer.updateSyncing() - break loop - case size := <-addressBookSizeC: - log.Trace("Kademlia address book size changed on depth change", "size", size) - // new peers has been added to kademlia, - // reset the timer to prevent early sync subscriptions - if !timer.Stop() { - <-timer.C - } - timer.Reset(options.SyncUpdateDelay) - case <-quit: - break loop - } - } - timer.Stop() - maxTimer.Stop() - } - }() - } - return streamer } @@ -422,8 +329,13 @@ func (r *Registry) peersCount() (c int) { // Run protocol run function func (r *Registry) Run(p *network.BzzPeer) error { - sp := NewPeer(p.Peer, r) + sp := NewPeer(p, r) r.setPeer(sp) + + if r.syncMode == SyncingAutoSubscribe { + go sp.runUpdateSyncing() + } + defer r.deletePeer(sp) defer close(sp.quit) defer sp.close() @@ -431,116 +343,17 @@ func (r *Registry) Run(p *network.BzzPeer) error { return sp.Run(sp.HandleMsg) } -// updateSyncing subscribes to SYNC streams by iterating over the -// kademlia connections and bins. If there are existing SYNC streams -// and they are no longer required after iteration, request to Quit -// them will be send to appropriate peers. -func (r *Registry) updateSyncing() { - kad := r.delivery.kad - // map of all SYNC streams for all peers - // used at the and of the function to remove servers - // that are not needed anymore - subs := make(map[enode.ID]map[Stream]struct{}) - r.peersMu.RLock() - for id, peer := range r.peers { - peer.serverMu.RLock() - for stream := range peer.servers { - if stream.Name == "SYNC" { - if _, ok := subs[id]; !ok { - subs[id] = make(map[Stream]struct{}) - } - subs[id][stream] = struct{}{} - } - } - peer.serverMu.RUnlock() - } - r.peersMu.RUnlock() - - // start requesting subscriptions from peers - r.requestPeerSubscriptions(kad, subs) - - // remove SYNC servers that do not need to be subscribed - for id, streams := range subs { - if len(streams) == 0 { - continue - } - peer := r.getPeer(id) - if peer == nil { - continue - } - for stream := range streams { - log.Debug("Remove sync server", "peer", id, "stream", stream) - err := r.Quit(peer.ID(), stream) - if err != nil && err != p2p.ErrShuttingDown { - log.Error("quit", "err", err, "peer", peer.ID(), "stream", stream) - } - } - } -} - -// requestPeerSubscriptions calls on each live peer in the kademlia table -// and sends a `RequestSubscription` to peers according to their bin -// and their relationship with kademlia's depth. -// Also check `TestRequestPeerSubscriptions` in order to understand the -// expected behavior. -// The function expects: -// * the kademlia -// * a map of subscriptions -// * the actual function to subscribe -// (in case of the test, it doesn't do real subscriptions) -func (r *Registry) requestPeerSubscriptions(kad *network.Kademlia, subs map[enode.ID]map[Stream]struct{}) { - - var startPo int - var endPo int - var ok bool - - // kademlia's depth - kadDepth := kad.NeighbourhoodDepth() - // request subscriptions for all nodes and bins - // nil as base takes the node's base; we need to pass 255 as `EachConn` runs - // from deepest bins backwards - kad.EachConn(nil, 255, func(p *network.Peer, po int) bool { - // nodes that do not provide stream protocol - // should not be subscribed, e.g. bootnodes - if !p.HasCap("stream") { - return true - } - //if the peer's bin is shallower than the kademlia depth, - //only the peer's bin should be subscribed - if po < kadDepth { - startPo = po - endPo = po - } else { - //if the peer's bin is equal or deeper than the kademlia depth, - //each bin from the depth up to k.MaxProxDisplay should be subscribed - startPo = kadDepth - endPo = kad.MaxProxDisplay - } - - for bin := startPo; bin <= endPo; bin++ { - //do the actual subscription - ok = subscriptionFunc(r, p, uint8(bin), subs) - } - return ok - }) -} - // doRequestSubscription sends the actual RequestSubscription to the peer -func doRequestSubscription(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { - log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", p.ID(), "bin", bin) +func doRequestSubscription(r *Registry, id enode.ID, bin uint8) error { + log.Debug("Requesting subscription by registry:", "registry", r.addr, "peer", id, "bin", bin) // bin is always less then 256 and it is safe to convert it to type uint8 stream := NewStream("SYNC", FormatSyncBinKey(bin), true) - if streams, ok := subs[p.ID()]; ok { - // delete live and history streams from the map, so that it won't be removed with a Quit request - delete(streams, stream) - delete(streams, getHistoryStream(stream)) - } - err := r.RequestSubscription(p.ID(), stream, NewRange(0, 0), High) + err := r.RequestSubscription(id, stream, NewRange(0, 0), High) if err != nil { - log.Debug("Request subscription", "err", err, "peer", p.ID(), "stream", stream) - return false + log.Debug("Request subscription", "err", err, "peer", id, "stream", stream) + return err } - return true + return nil } func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error { diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index c7da05014..767112b2b 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -28,9 +28,6 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/swarm/testutil" - - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p/enode" @@ -39,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" + "github.com/ethereum/go-ethereum/swarm/testutil" "golang.org/x/crypto/sha3" ) @@ -965,162 +963,6 @@ func TestHasPriceImplementation(t *testing.T) { } } -/* -TestRequestPeerSubscriptions is a unit test for stream's pull sync subscriptions. - -The test does: - * assign each connected peer to a bin map - * build up a known kademlia in advance - * run the EachConn function, which returns supposed subscription bins - * store all supposed bins per peer in a map - * check that all peers have the expected subscriptions - -This kad table and its peers are copied from network.TestKademliaCase1, -it represents an edge case but for the purpose of testing the -syncing subscriptions it is just fine. - -Addresses used in this test are discovered as part of the simulation network -in higher level tests for streaming. They were generated randomly. - -The resulting kademlia looks like this: -========================================================================= -Fri Dec 21 20:02:39 UTC 2018 KΛÐΞMLIΛ hive: queen's address: 7efef1 -population: 12 (12), MinProxBinSize: 2, MinBinSize: 2, MaxBinSize: 4 -000 2 8196 835f | 2 8196 (0) 835f (0) -001 2 2690 28f0 | 2 2690 (0) 28f0 (0) -002 2 4d72 4a45 | 2 4d72 (0) 4a45 (0) -003 1 646e | 1 646e (0) -004 3 769c 76d1 7656 | 3 769c (0) 76d1 (0) 7656 (0) -============ DEPTH: 5 ========================================== -005 1 7a48 | 1 7a48 (0) -006 1 7cbd | 1 7cbd (0) -007 0 | 0 -008 0 | 0 -009 0 | 0 -010 0 | 0 -011 0 | 0 -012 0 | 0 -013 0 | 0 -014 0 | 0 -015 0 | 0 -========================================================================= -*/ -func TestRequestPeerSubscriptions(t *testing.T) { - // the pivot address; this is the actual kademlia node - pivotAddr := "7efef1c41d77f843ad167be95f6660567eb8a4a59f39240000cce2e0d65baf8e" - - // a map of bin number to addresses from the given kademlia - binMap := make(map[int][]string) - binMap[0] = []string{ - "835fbbf1d16ba7347b6e2fc552d6e982148d29c624ea20383850df3c810fa8fc", - "81968a2d8fb39114342ee1da85254ec51e0608d7f0f6997c2a8354c260a71009", - } - binMap[1] = []string{ - "28f0bc1b44658548d6e05dd16d4c2fe77f1da5d48b6774bc4263b045725d0c19", - "2690a910c33ee37b91eb6c4e0731d1d345e2dc3b46d308503a6e85bbc242c69e", - } - binMap[2] = []string{ - "4a45f1fc63e1a9cb9dfa44c98da2f3d20c2923e5d75ff60b2db9d1bdb0c54d51", - "4d72a04ddeb851a68cd197ef9a92a3e2ff01fbbff638e64929dd1a9c2e150112", - } - binMap[3] = []string{ - "646e9540c84f6a2f9cf6585d45a4c219573b4fd1b64a3c9a1386fc5cf98c0d4d", - } - binMap[4] = []string{ - "7656caccdc79cd8d7ce66d415cc96a718e8271c62fb35746bfc2b49faf3eebf3", - "76d1e83c71ca246d042e37ff1db181f2776265fbcfdc890ce230bfa617c9c2f0", - "769ce86aa90b518b7ed382f9fdacfbed93574e18dc98fe6c342e4f9f409c2d5a", - } - binMap[5] = []string{ - "7a48f75f8ca60487ae42d6f92b785581b40b91f2da551ae73d5eae46640e02e8", - } - binMap[6] = []string{ - "7cbd42350bde8e18ae5b955b5450f8e2cef3419f92fbf5598160c60fd78619f0", - } - - // create the pivot's kademlia - addr := common.FromHex(pivotAddr) - k := network.NewKademlia(addr, network.NewKadParams()) - - // construct the peers and the kademlia - for _, binaddrs := range binMap { - for _, a := range binaddrs { - addr := common.FromHex(a) - k.On(network.NewPeer(&network.BzzPeer{BzzAddr: &network.BzzAddr{OAddr: addr}}, k)) - } - } - - // TODO: check kad table is same - // currently k.String() prints date so it will never be the same :) - // --> implement JSON representation of kad table - log.Debug(k.String()) - - // simulate that we would do subscriptions: just store the bin numbers - fakeSubscriptions := make(map[string][]int) - //after the test, we need to reset the subscriptionFunc to the default - defer func() { subscriptionFunc = doRequestSubscription }() - // define the function which should run for each connection - // instead of doing real subscriptions, we just store the bin numbers - subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { - // get the peer ID - peerstr := fmt.Sprintf("%x", p.Over()) - // create the array of bins per peer - if _, ok := fakeSubscriptions[peerstr]; !ok { - fakeSubscriptions[peerstr] = make([]int, 0) - } - // store the (fake) bin subscription - log.Debug(fmt.Sprintf("Adding fake subscription for peer %s with bin %d", peerstr, bin)) - fakeSubscriptions[peerstr] = append(fakeSubscriptions[peerstr], int(bin)) - return true - } - // create just a simple Registry object in order to be able to call... - r := &Registry{} - r.requestPeerSubscriptions(k, nil) - // calculate the kademlia depth - kdepth := k.NeighbourhoodDepth() - - // now, check that all peers have the expected (fake) subscriptions - // iterate the bin map - for bin, peers := range binMap { - // for every peer... - for _, peer := range peers { - // ...get its (fake) subscriptions - fakeSubsForPeer := fakeSubscriptions[peer] - // if the peer's bin is shallower than the kademlia depth... - if bin < kdepth { - // (iterate all (fake) subscriptions) - for _, subbin := range fakeSubsForPeer { - // ...only the peer's bin should be "subscribed" - // (and thus have only one subscription) - if subbin != bin || len(fakeSubsForPeer) != 1 { - t.Fatalf("Did not get expected subscription for bin < depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin) - } - } - } else { //if the peer's bin is equal or higher than the kademlia depth... - // (iterate all (fake) subscriptions) - for i, subbin := range fakeSubsForPeer { - // ...each bin from the peer's bin number up to k.MaxProxDisplay should be "subscribed" - // as we start from depth we can use the iteration index to check - if subbin != i+kdepth { - t.Fatalf("Did not get expected subscription for bin > depth; bin of peer %s: %d, subscription: %d", peer, bin, subbin) - } - // the last "subscription" should be k.MaxProxDisplay - if i == len(fakeSubsForPeer)-1 && subbin != k.MaxProxDisplay { - t.Fatalf("Expected last subscription to be: %d, but is: %d", k.MaxProxDisplay, subbin) - } - } - } - } - } - // print some output - for p, subs := range fakeSubscriptions { - log.Debug(fmt.Sprintf("Peer %s has the following fake subscriptions: ", p)) - for _, bin := range subs { - log.Debug(fmt.Sprintf("%d,", bin)) - } - } -} - // TestGetServerSubscriptions is a unit test for the api.GetPeerServerSubscriptions() function func TestGetServerSubscriptions(t *testing.T) { // create an amount of dummy peers @@ -1204,15 +1046,13 @@ func TestGetServerSubscriptionsRPC(t *testing.T) { defer func() { subscriptionFunc = doRequestSubscription }() // we use this subscriptionFunc for this test: just increases count and calls the actual subscription - subscriptionFunc = func(r *Registry, p *network.Peer, bin uint8, subs map[enode.ID]map[Stream]struct{}) bool { + subscriptionFunc = func(r *Registry, id enode.ID, bin uint8) error { // syncing starts after syncUpdateDelay and loops after that Duration; we only want to count at the first iteration // in the first iteration, subs will be empty (no existing subscriptions), thus we can use this check // this avoids flakyness - if len(subs) == 0 { - expectedMsgCount.inc() - } - doRequestSubscription(r, p, bin, subs) - return true + expectedMsgCount.inc() + doRequestSubscription(r, id, bin) + return nil } // create a standard sim sim := simulation.New(map[string]simulation.ServiceFunc{ diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index 79b04a307..043192903 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -88,6 +88,11 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { // are added in batchTimeout period, the batch will be returned. This function // will block until new chunks are received from localstore pull subscription. func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { + //TODO: maybe add unit test for intervals usage in netstore/localstore together with SwarmSyncerServer? + if from > 0 { + from-- + } + descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop()