plugeth/p2p/discv5/ticket.go
Felföldi Zsolt 92580d69d3 p2p, p2p/discover, p2p/discv5: implement UDP port sharing (#15200)
This commit affects p2p/discv5 "topic discovery" by running it on
the same UDP port where the old discovery works. This is realized
by giving an "unhandled" packet channel to the old v4 discovery
packet handler where all invalid packets are sent. These packets
are then processed by v5. v5 packets are always invalid when
interpreted by v4 and vice versa. This is ensured by adding one
to the first byte of the packet hash in v5 packets.

DiscoveryV5Bootnodes is also changed to point to new bootnodes
that are implementing the changed packet format with modified
hash. Existing and new v5 bootnodes are both running on different
ports ATM.
2018-01-22 13:38:34 +01:00

955 lines
25 KiB
Go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package discv5
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"math/rand"
"sort"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
const (
ticketTimeBucketLen = time.Minute
timeWindow = 10 // * ticketTimeBucketLen
wantTicketsInWindow = 10
collectFrequency = time.Second * 30
registerFrequency = time.Second * 60
maxCollectDebt = 10
maxRegisterDebt = 5
keepTicketConst = time.Minute * 10
keepTicketExp = time.Minute * 5
targetWaitTime = time.Minute * 10
topicQueryTimeout = time.Second * 5
topicQueryResend = time.Minute
// topic radius detection
maxRadius = 0xffffffffffffffff
radiusTC = time.Minute * 20
radiusBucketsPerBit = 8
minSlope = 1
minPeakSize = 40
maxNoAdjust = 20
lookupWidth = 8
minRightSum = 20
searchForceQuery = 4
)
// timeBucket represents absolute monotonic time in minutes.
// It is used as the index into the per-topic ticket buckets.
type timeBucket int
type ticket struct {
topics []Topic
regTime []mclock.AbsTime // Per-topic local absolute time when the ticket can be used.
// The serial number that was issued by the server.
serial uint32
// Used by registrar, tracks absolute time when the ticket was created.
issueTime mclock.AbsTime
// Fields used only by registrants
node *Node // the registrar node that signed this ticket
refCnt int // tracks number of topics that will be registered using this ticket
pong []byte // encoded pong packet signed by the registrar
}
// ticketRef refers to a single topic in a ticket.
type ticketRef struct {
t *ticket
idx int // index of the topic in t.topics and t.regTime
}
func (ref ticketRef) topic() Topic {
return ref.t.topics[ref.idx]
}
func (ref ticketRef) topicRegTime() mclock.AbsTime {
return ref.t.regTime[ref.idx]
}
func pongToTicket(localTime mclock.AbsTime, topics []Topic, node *Node, p *ingressPacket) (*ticket, error) {
wps := p.data.(*pong).WaitPeriods
if len(topics) != len(wps) {
return nil, fmt.Errorf("bad wait period list: got %d values, want %d", len(topics), len(wps))
}
if rlpHash(topics) != p.data.(*pong).TopicHash {
return nil, fmt.Errorf("bad topic hash")
}
t := &ticket{
issueTime: localTime,
node: node,
topics: topics,
pong: p.rawData,
regTime: make([]mclock.AbsTime, len(wps)),
}
// Convert wait periods to local absolute time.
for i, wp := range wps {
t.regTime[i] = localTime + mclock.AbsTime(time.Second*time.Duration(wp))
}
return t, nil
}
func ticketToPong(t *ticket, pong *pong) {
pong.Expiration = uint64(t.issueTime / mclock.AbsTime(time.Second))
pong.TopicHash = rlpHash(t.topics)
pong.TicketSerial = t.serial
pong.WaitPeriods = make([]uint32, len(t.regTime))
for i, regTime := range t.regTime {
pong.WaitPeriods[i] = uint32(time.Duration(regTime-t.issueTime) / time.Second)
}
}
type ticketStore struct {
// radius detector and target address generator
// exists for both searched and registered topics
radius map[Topic]*topicRadius
// Contains buckets (for each absolute minute) of tickets
// that can be used in that minute.
// This is only set if the topic is being registered.
tickets map[Topic]*topicTickets
regQueue []Topic // Topic registration queue for round robin attempts
regSet map[Topic]struct{} // Topic registration queue contents for fast filling
nodes map[*Node]*ticket
nodeLastReq map[*Node]reqInfo
lastBucketFetched timeBucket
nextTicketCached *ticketRef
nextTicketReg mclock.AbsTime
searchTopicMap map[Topic]searchTopic
nextTopicQueryCleanup mclock.AbsTime
queriesSent map[*Node]map[common.Hash]sentQuery
}
type searchTopic struct {
foundChn chan<- *Node
}
type sentQuery struct {
sent mclock.AbsTime
lookup lookupInfo
}
type topicTickets struct {
buckets map[timeBucket][]ticketRef
nextLookup mclock.AbsTime
nextReg mclock.AbsTime
}
func newTicketStore() *ticketStore {
return &ticketStore{
radius: make(map[Topic]*topicRadius),
tickets: make(map[Topic]*topicTickets),
regSet: make(map[Topic]struct{}),
nodes: make(map[*Node]*ticket),
nodeLastReq: make(map[*Node]reqInfo),
searchTopicMap: make(map[Topic]searchTopic),
queriesSent: make(map[*Node]map[common.Hash]sentQuery),
}
}
// addTopic starts tracking a topic. If register is true,
// the local node will register the topic and tickets will be collected.
func (s *ticketStore) addTopic(topic Topic, register bool) {
log.Trace("Adding discovery topic", "topic", topic, "register", register)
if s.radius[topic] == nil {
s.radius[topic] = newTopicRadius(topic)
}
if register && s.tickets[topic] == nil {
s.tickets[topic] = &topicTickets{buckets: make(map[timeBucket][]ticketRef)}
}
}
func (s *ticketStore) addSearchTopic(t Topic, foundChn chan<- *Node) {
s.addTopic(t, false)
if s.searchTopicMap[t].foundChn == nil {
s.searchTopicMap[t] = searchTopic{foundChn: foundChn}
}
}
func (s *ticketStore) removeSearchTopic(t Topic) {
if st := s.searchTopicMap[t]; st.foundChn != nil {
delete(s.searchTopicMap, t)
}
}
// removeRegisterTopic deletes all tickets for the given topic.
func (s *ticketStore) removeRegisterTopic(topic Topic) {
log.Trace("Removing discovery topic", "topic", topic)
if s.tickets[topic] == nil {
log.Warn("Removing non-existent discovery topic", "topic", topic)
return
}
for _, list := range s.tickets[topic].buckets {
for _, ref := range list {
ref.t.refCnt--
if ref.t.refCnt == 0 {
delete(s.nodes, ref.t.node)
delete(s.nodeLastReq, ref.t.node)
}
}
}
delete(s.tickets, topic)
}
func (s *ticketStore) regTopicSet() []Topic {
topics := make([]Topic, 0, len(s.tickets))
for topic := range s.tickets {
topics = append(topics, topic)
}
return topics
}
// nextRegisterLookup returns the target of the next lookup for ticket collection.
func (s *ticketStore) nextRegisterLookup() (lookupInfo, time.Duration) {
// Queue up any new topics (or discarded ones), preserving iteration order
for topic := range s.tickets {
if _, ok := s.regSet[topic]; !ok {
s.regQueue = append(s.regQueue, topic)
s.regSet[topic] = struct{}{}
}
}
// Iterate over the set of all topics and look up the next suitable one
for len(s.regQueue) > 0 {
// Fetch the next topic from the queue, and ensure it still exists
topic := s.regQueue[0]
s.regQueue = s.regQueue[1:]
delete(s.regSet, topic)
if s.tickets[topic] == nil {
continue
}
// If the topic needs more tickets, return it
if s.tickets[topic].nextLookup < mclock.Now() {
next, delay := s.radius[topic].nextTarget(false), 100*time.Millisecond
log.Trace("Found discovery topic to register", "topic", topic, "target", next.target, "delay", delay)
return next, delay
}
}
// No registration topics found or all exhausted, sleep
delay := 40 * time.Second
log.Trace("No topic found to register", "delay", delay)
return lookupInfo{}, delay
}
func (s *ticketStore) nextSearchLookup(topic Topic) lookupInfo {
tr := s.radius[topic]
target := tr.nextTarget(tr.radiusLookupCnt >= searchForceQuery)
if target.radiusLookup {
tr.radiusLookupCnt++
} else {
tr.radiusLookupCnt = 0
}
return target
}
// ticketsInWindow returns the tickets of a given topic in the registration window.
func (s *ticketStore) ticketsInWindow(topic Topic) []ticketRef {
// Sanity check that the topic still exists before operating on it
if s.tickets[topic] == nil {
log.Warn("Listing non-existing discovery tickets", "topic", topic)
return nil
}
// Gather all the tickers in the next time window
var tickets []ticketRef
buckets := s.tickets[topic].buckets
for idx := timeBucket(0); idx < timeWindow; idx++ {
tickets = append(tickets, buckets[s.lastBucketFetched+idx]...)
}
log.Trace("Retrieved discovery registration tickets", "topic", topic, "from", s.lastBucketFetched, "tickets", len(tickets))
return tickets
}
func (s *ticketStore) removeExcessTickets(t Topic) {
tickets := s.ticketsInWindow(t)
if len(tickets) <= wantTicketsInWindow {
return
}
sort.Sort(ticketRefByWaitTime(tickets))
for _, r := range tickets[wantTicketsInWindow:] {
s.removeTicketRef(r)
}
}
type ticketRefByWaitTime []ticketRef
// Len is the number of elements in the collection.
func (s ticketRefByWaitTime) Len() int {
return len(s)
}
func (r ticketRef) waitTime() mclock.AbsTime {
return r.t.regTime[r.idx] - r.t.issueTime
}
// Less reports whether the element with
// index i should sort before the element with index j.
func (s ticketRefByWaitTime) Less(i, j int) bool {
return s[i].waitTime() < s[j].waitTime()
}
// Swap swaps the elements with indexes i and j.
func (s ticketRefByWaitTime) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s *ticketStore) addTicketRef(r ticketRef) {
topic := r.t.topics[r.idx]
tickets := s.tickets[topic]
if tickets == nil {
log.Warn("Adding ticket to non-existent topic", "topic", topic)
return
}
bucket := timeBucket(r.t.regTime[r.idx] / mclock.AbsTime(ticketTimeBucketLen))
tickets.buckets[bucket] = append(tickets.buckets[bucket], r)
r.t.refCnt++
min := mclock.Now() - mclock.AbsTime(collectFrequency)*maxCollectDebt
if tickets.nextLookup < min {
tickets.nextLookup = min
}
tickets.nextLookup += mclock.AbsTime(collectFrequency)
//s.removeExcessTickets(topic)
}
func (s *ticketStore) nextFilteredTicket() (*ticketRef, time.Duration) {
now := mclock.Now()
for {
ticket, wait := s.nextRegisterableTicket()
if ticket == nil {
return ticket, wait
}
log.Trace("Found discovery ticket to register", "node", ticket.t.node, "serial", ticket.t.serial, "wait", wait)
regTime := now + mclock.AbsTime(wait)
topic := ticket.t.topics[ticket.idx]
if regTime >= s.tickets[topic].nextReg {
return ticket, wait
}
s.removeTicketRef(*ticket)
}
}
func (s *ticketStore) ticketRegistered(ref ticketRef) {
now := mclock.Now()
topic := ref.t.topics[ref.idx]
tickets := s.tickets[topic]
min := now - mclock.AbsTime(registerFrequency)*maxRegisterDebt
if min > tickets.nextReg {
tickets.nextReg = min
}
tickets.nextReg += mclock.AbsTime(registerFrequency)
s.tickets[topic] = tickets
s.removeTicketRef(ref)
}
// nextRegisterableTicket returns the next ticket that can be used
// to register.
//
// If the returned wait time <= zero the ticket can be used. For a positive
// wait time, the caller should requery the next ticket later.
//
// A ticket can be returned more than once with <= zero wait time in case
// the ticket contains multiple topics.
func (s *ticketStore) nextRegisterableTicket() (*ticketRef, time.Duration) {
now := mclock.Now()
if s.nextTicketCached != nil {
return s.nextTicketCached, time.Duration(s.nextTicketCached.topicRegTime() - now)
}
for bucket := s.lastBucketFetched; ; bucket++ {
var (
empty = true // true if there are no tickets
nextTicket ticketRef // uninitialized if this bucket is empty
)
for _, tickets := range s.tickets {
//s.removeExcessTickets(topic)
if len(tickets.buckets) != 0 {
empty = false
list := tickets.buckets[bucket]
for _, ref := range list {
//debugLog(fmt.Sprintf(" nrt bucket = %d node = %x sn = %v wait = %v", bucket, ref.t.node.ID[:8], ref.t.serial, time.Duration(ref.topicRegTime()-now)))
if nextTicket.t == nil || ref.topicRegTime() < nextTicket.topicRegTime() {
nextTicket = ref
}
}
}
}
if empty {
return nil, 0
}
if nextTicket.t != nil {
s.nextTicketCached = &nextTicket
return &nextTicket, time.Duration(nextTicket.topicRegTime() - now)
}
s.lastBucketFetched = bucket
}
}
// removeTicket removes a ticket from the ticket store
func (s *ticketStore) removeTicketRef(ref ticketRef) {
log.Trace("Removing discovery ticket reference", "node", ref.t.node.ID, "serial", ref.t.serial)
topic := ref.topic()
tickets := s.tickets[topic]
if tickets == nil {
log.Warn("Removing tickets from unknown topic", "topic", topic)
return
}
bucket := timeBucket(ref.t.regTime[ref.idx] / mclock.AbsTime(ticketTimeBucketLen))
list := tickets.buckets[bucket]
idx := -1
for i, bt := range list {
if bt.t == ref.t {
idx = i
break
}
}
if idx == -1 {
panic(nil)
}
list = append(list[:idx], list[idx+1:]...)
if len(list) != 0 {
tickets.buckets[bucket] = list
} else {
delete(tickets.buckets, bucket)
}
ref.t.refCnt--
if ref.t.refCnt == 0 {
delete(s.nodes, ref.t.node)
delete(s.nodeLastReq, ref.t.node)
}
// Make nextRegisterableTicket return the next available ticket.
s.nextTicketCached = nil
}
type lookupInfo struct {
target common.Hash
topic Topic
radiusLookup bool
}
type reqInfo struct {
pingHash []byte
lookup lookupInfo
time mclock.AbsTime
}
// returns -1 if not found
func (t *ticket) findIdx(topic Topic) int {
for i, tt := range t.topics {
if tt == topic {
return i
}
}
return -1
}
func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte) {
now := mclock.Now()
for i, n := range nodes {
if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
if lookup.radiusLookup {
if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
}
} else {
if s.nodes[n] == nil {
s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
}
}
}
}
}
func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte, query func(n *Node, topic Topic) []byte) {
now := mclock.Now()
for i, n := range nodes {
if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius {
if lookup.radiusLookup {
if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC {
s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now}
}
} // else {
if s.canQueryTopic(n, lookup.topic) {
hash := query(n, lookup.topic)
if hash != nil {
s.addTopicQuery(common.BytesToHash(hash), n, lookup)
}
}
//}
}
}
}
func (s *ticketStore) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t *ticket) {
for i, topic := range t.topics {
if tt, ok := s.radius[topic]; ok {
tt.adjustWithTicket(now, targetHash, ticketRef{t, i})
}
}
}
func (s *ticketStore) addTicket(localTime mclock.AbsTime, pingHash []byte, ticket *ticket) {
log.Trace("Adding discovery ticket", "node", ticket.node.ID, "serial", ticket.serial)
lastReq, ok := s.nodeLastReq[ticket.node]
if !(ok && bytes.Equal(pingHash, lastReq.pingHash)) {
return
}
s.adjustWithTicket(localTime, lastReq.lookup.target, ticket)
if lastReq.lookup.radiusLookup || s.nodes[ticket.node] != nil {
return
}
topic := lastReq.lookup.topic
topicIdx := ticket.findIdx(topic)
if topicIdx == -1 {
return
}
bucket := timeBucket(localTime / mclock.AbsTime(ticketTimeBucketLen))
if s.lastBucketFetched == 0 || bucket < s.lastBucketFetched {
s.lastBucketFetched = bucket
}
if _, ok := s.tickets[topic]; ok {
wait := ticket.regTime[topicIdx] - localTime
rnd := rand.ExpFloat64()
if rnd > 10 {
rnd = 10
}
if float64(wait) < float64(keepTicketConst)+float64(keepTicketExp)*rnd {
// use the ticket to register this topic
//fmt.Println("addTicket", ticket.node.ID[:8], ticket.node.addr().String(), ticket.serial, ticket.pong)
s.addTicketRef(ticketRef{ticket, topicIdx})
}
}
if ticket.refCnt > 0 {
s.nextTicketCached = nil
s.nodes[ticket.node] = ticket
}
}
func (s *ticketStore) getNodeTicket(node *Node) *ticket {
if s.nodes[node] == nil {
log.Trace("Retrieving node ticket", "node", node.ID, "serial", nil)
} else {
log.Trace("Retrieving node ticket", "node", node.ID, "serial", s.nodes[node].serial)
}
return s.nodes[node]
}
func (s *ticketStore) canQueryTopic(node *Node, topic Topic) bool {
qq := s.queriesSent[node]
if qq != nil {
now := mclock.Now()
for _, sq := range qq {
if sq.lookup.topic == topic && sq.sent > now-mclock.AbsTime(topicQueryResend) {
return false
}
}
}
return true
}
func (s *ticketStore) addTopicQuery(hash common.Hash, node *Node, lookup lookupInfo) {
now := mclock.Now()
qq := s.queriesSent[node]
if qq == nil {
qq = make(map[common.Hash]sentQuery)
s.queriesSent[node] = qq
}
qq[hash] = sentQuery{sent: now, lookup: lookup}
s.cleanupTopicQueries(now)
}
func (s *ticketStore) cleanupTopicQueries(now mclock.AbsTime) {
if s.nextTopicQueryCleanup > now {
return
}
exp := now - mclock.AbsTime(topicQueryResend)
for n, qq := range s.queriesSent {
for h, q := range qq {
if q.sent < exp {
delete(qq, h)
}
}
if len(qq) == 0 {
delete(s.queriesSent, n)
}
}
s.nextTopicQueryCleanup = now + mclock.AbsTime(topicQueryTimeout)
}
func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNode) (timeout bool) {
now := mclock.Now()
//fmt.Println("got", from.addr().String(), hash, len(nodes))
qq := s.queriesSent[from]
if qq == nil {
return true
}
q, ok := qq[hash]
if !ok || now > q.sent+mclock.AbsTime(topicQueryTimeout) {
return true
}
inside := float64(0)
if len(nodes) > 0 {
inside = 1
}
s.radius[q.lookup.topic].adjust(now, q.lookup.target, from.sha, inside)
chn := s.searchTopicMap[q.lookup.topic].foundChn
if chn == nil {
//fmt.Println("no channel")
return false
}
for _, node := range nodes {
ip := node.IP
if ip.IsUnspecified() || ip.IsLoopback() {
ip = from.IP
}
n := NewNode(node.ID, ip, node.UDP, node.TCP)
select {
case chn <- n:
default:
return false
}
}
return false
}
type topicRadius struct {
topic Topic
topicHashPrefix uint64
radius, minRadius uint64
buckets []topicRadiusBucket
converged bool
radiusLookupCnt int
}
type topicRadiusEvent int
const (
trOutside topicRadiusEvent = iota
trInside
trNoAdjust
trCount
)
type topicRadiusBucket struct {
weights [trCount]float64
lastTime mclock.AbsTime
value float64
lookupSent map[common.Hash]mclock.AbsTime
}
func (b *topicRadiusBucket) update(now mclock.AbsTime) {
if now == b.lastTime {
return
}
exp := math.Exp(-float64(now-b.lastTime) / float64(radiusTC))
for i, w := range b.weights {
b.weights[i] = w * exp
}
b.lastTime = now
for target, tm := range b.lookupSent {
if now-tm > mclock.AbsTime(respTimeout) {
b.weights[trNoAdjust] += 1
delete(b.lookupSent, target)
}
}
}
func (b *topicRadiusBucket) adjust(now mclock.AbsTime, inside float64) {
b.update(now)
if inside <= 0 {
b.weights[trOutside] += 1
} else {
if inside >= 1 {
b.weights[trInside] += 1
} else {
b.weights[trInside] += inside
b.weights[trOutside] += 1 - inside
}
}
}
func newTopicRadius(t Topic) *topicRadius {
topicHash := crypto.Keccak256Hash([]byte(t))
topicHashPrefix := binary.BigEndian.Uint64(topicHash[0:8])
return &topicRadius{
topic: t,
topicHashPrefix: topicHashPrefix,
radius: maxRadius,
minRadius: maxRadius,
}
}
func (r *topicRadius) getBucketIdx(addrHash common.Hash) int {
prefix := binary.BigEndian.Uint64(addrHash[0:8])
var log2 float64
if prefix != r.topicHashPrefix {
log2 = math.Log2(float64(prefix ^ r.topicHashPrefix))
}
bucket := int((64 - log2) * radiusBucketsPerBit)
max := 64*radiusBucketsPerBit - 1
if bucket > max {
return max
}
if bucket < 0 {
return 0
}
return bucket
}
func (r *topicRadius) targetForBucket(bucket int) common.Hash {
min := math.Pow(2, 64-float64(bucket+1)/radiusBucketsPerBit)
max := math.Pow(2, 64-float64(bucket)/radiusBucketsPerBit)
a := uint64(min)
b := randUint64n(uint64(max - min))
xor := a + b
if xor < a {
xor = ^uint64(0)
}
prefix := r.topicHashPrefix ^ xor
var target common.Hash
binary.BigEndian.PutUint64(target[0:8], prefix)
globalRandRead(target[8:])
return target
}
// package rand provides a Read function in Go 1.6 and later, but
// we can't use it yet because we still support Go 1.5.
func globalRandRead(b []byte) {
pos := 0
val := 0
for n := 0; n < len(b); n++ {
if pos == 0 {
val = rand.Int()
pos = 7
}
b[n] = byte(val)
val >>= 8
pos--
}
}
func (r *topicRadius) isInRadius(addrHash common.Hash) bool {
nodePrefix := binary.BigEndian.Uint64(addrHash[0:8])
dist := nodePrefix ^ r.topicHashPrefix
return dist < r.radius
}
func (r *topicRadius) chooseLookupBucket(a, b int) int {
if a < 0 {
a = 0
}
if a > b {
return -1
}
c := 0
for i := a; i <= b; i++ {
if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
c++
}
}
if c == 0 {
return -1
}
rnd := randUint(uint32(c))
for i := a; i <= b; i++ {
if i >= len(r.buckets) || r.buckets[i].weights[trNoAdjust] < maxNoAdjust {
if rnd == 0 {
return i
}
rnd--
}
}
panic(nil) // should never happen
}
func (r *topicRadius) needMoreLookups(a, b int, maxValue float64) bool {
var max float64
if a < 0 {
a = 0
}
if b >= len(r.buckets) {
b = len(r.buckets) - 1
if r.buckets[b].value > max {
max = r.buckets[b].value
}
}
if b >= a {
for i := a; i <= b; i++ {
if r.buckets[i].value > max {
max = r.buckets[i].value
}
}
}
return maxValue-max < minPeakSize
}
func (r *topicRadius) recalcRadius() (radius uint64, radiusLookup int) {
maxBucket := 0
maxValue := float64(0)
now := mclock.Now()
v := float64(0)
for i := range r.buckets {
r.buckets[i].update(now)
v += r.buckets[i].weights[trOutside] - r.buckets[i].weights[trInside]
r.buckets[i].value = v
//fmt.Printf("%v %v | ", v, r.buckets[i].weights[trNoAdjust])
}
//fmt.Println()
slopeCross := -1
for i, b := range r.buckets {
v := b.value
if v < float64(i)*minSlope {
slopeCross = i
break
}
if v > maxValue {
maxValue = v
maxBucket = i + 1
}
}
minRadBucket := len(r.buckets)
sum := float64(0)
for minRadBucket > 0 && sum < minRightSum {
minRadBucket--
b := r.buckets[minRadBucket]
sum += b.weights[trInside] + b.weights[trOutside]
}
r.minRadius = uint64(math.Pow(2, 64-float64(minRadBucket)/radiusBucketsPerBit))
lookupLeft := -1
if r.needMoreLookups(0, maxBucket-lookupWidth-1, maxValue) {
lookupLeft = r.chooseLookupBucket(maxBucket-lookupWidth, maxBucket-1)
}
lookupRight := -1
if slopeCross != maxBucket && (minRadBucket <= maxBucket || r.needMoreLookups(maxBucket+lookupWidth, len(r.buckets)-1, maxValue)) {
for len(r.buckets) <= maxBucket+lookupWidth {
r.buckets = append(r.buckets, topicRadiusBucket{lookupSent: make(map[common.Hash]mclock.AbsTime)})
}
lookupRight = r.chooseLookupBucket(maxBucket, maxBucket+lookupWidth-1)
}
if lookupLeft == -1 {
radiusLookup = lookupRight
} else {
if lookupRight == -1 {
radiusLookup = lookupLeft
} else {
if randUint(2) == 0 {
radiusLookup = lookupLeft
} else {
radiusLookup = lookupRight
}
}
}
//fmt.Println("mb", maxBucket, "sc", slopeCross, "mrb", minRadBucket, "ll", lookupLeft, "lr", lookupRight, "mv", maxValue)
if radiusLookup == -1 {
// no more radius lookups needed at the moment, return a radius
r.converged = true
rad := maxBucket
if minRadBucket < rad {
rad = minRadBucket
}
radius = ^uint64(0)
if rad > 0 {
radius = uint64(math.Pow(2, 64-float64(rad)/radiusBucketsPerBit))
}
r.radius = radius
}
return
}
func (r *topicRadius) nextTarget(forceRegular bool) lookupInfo {
if !forceRegular {
_, radiusLookup := r.recalcRadius()
if radiusLookup != -1 {
target := r.targetForBucket(radiusLookup)
r.buckets[radiusLookup].lookupSent[target] = mclock.Now()
return lookupInfo{target: target, topic: r.topic, radiusLookup: true}
}
}
radExt := r.radius / 2
if radExt > maxRadius-r.radius {
radExt = maxRadius - r.radius
}
rnd := randUint64n(r.radius) + randUint64n(2*radExt)
if rnd > radExt {
rnd -= radExt
} else {
rnd = radExt - rnd
}
prefix := r.topicHashPrefix ^ rnd
var target common.Hash
binary.BigEndian.PutUint64(target[0:8], prefix)
globalRandRead(target[8:])
return lookupInfo{target: target, topic: r.topic, radiusLookup: false}
}
func (r *topicRadius) adjustWithTicket(now mclock.AbsTime, targetHash common.Hash, t ticketRef) {
wait := t.t.regTime[t.idx] - t.t.issueTime
inside := float64(wait)/float64(targetWaitTime) - 0.5
if inside > 1 {
inside = 1
}
if inside < 0 {
inside = 0
}
r.adjust(now, targetHash, t.t.node.sha, inside)
}
func (r *topicRadius) adjust(now mclock.AbsTime, targetHash, addrHash common.Hash, inside float64) {
bucket := r.getBucketIdx(addrHash)
//fmt.Println("adjust", bucket, len(r.buckets), inside)
if bucket >= len(r.buckets) {
return
}
r.buckets[bucket].adjust(now, inside)
delete(r.buckets[bucket].lookupSent, targetHash)
}