279 lines
7.1 KiB
Go
279 lines
7.1 KiB
Go
package p2p
|
|
|
|
import (
|
|
"container/heap"
|
|
"crypto/rand"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/logger"
|
|
"github.com/ethereum/go-ethereum/logger/glog"
|
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
|
)
|
|
|
|
const (
|
|
// This is the amount of time spent waiting in between
|
|
// redialing a certain node.
|
|
dialHistoryExpiration = 30 * time.Second
|
|
|
|
// Discovery lookups are throttled and can only run
|
|
// once every few seconds.
|
|
lookupInterval = 4 * time.Second
|
|
)
|
|
|
|
// dialstate schedules dials and discovery lookups.
|
|
// it get's a chance to compute new tasks on every iteration
|
|
// of the main loop in Server.run.
|
|
type dialstate struct {
|
|
maxDynDials int
|
|
ntab discoverTable
|
|
|
|
lookupRunning bool
|
|
bootstrapped bool
|
|
|
|
dialing map[discover.NodeID]connFlag
|
|
lookupBuf []*discover.Node // current discovery lookup results
|
|
randomNodes []*discover.Node // filled from Table
|
|
static map[discover.NodeID]*discover.Node
|
|
hist *dialHistory
|
|
}
|
|
|
|
type discoverTable interface {
|
|
Self() *discover.Node
|
|
Close()
|
|
Bootstrap([]*discover.Node)
|
|
Lookup(target discover.NodeID) []*discover.Node
|
|
ReadRandomNodes([]*discover.Node) int
|
|
}
|
|
|
|
// the dial history remembers recent dials.
|
|
type dialHistory []pastDial
|
|
|
|
// pastDial is an entry in the dial history.
|
|
type pastDial struct {
|
|
id discover.NodeID
|
|
exp time.Time
|
|
}
|
|
|
|
type task interface {
|
|
Do(*Server)
|
|
}
|
|
|
|
// A dialTask is generated for each node that is dialed.
|
|
type dialTask struct {
|
|
flags connFlag
|
|
dest *discover.Node
|
|
}
|
|
|
|
// discoverTask runs discovery table operations.
|
|
// Only one discoverTask is active at any time.
|
|
//
|
|
// If bootstrap is true, the task runs Table.Bootstrap,
|
|
// otherwise it performs a random lookup and leaves the
|
|
// results in the task.
|
|
type discoverTask struct {
|
|
bootstrap bool
|
|
results []*discover.Node
|
|
}
|
|
|
|
// A waitExpireTask is generated if there are no other tasks
|
|
// to keep the loop in Server.run ticking.
|
|
type waitExpireTask struct {
|
|
time.Duration
|
|
}
|
|
|
|
func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dialstate {
|
|
s := &dialstate{
|
|
maxDynDials: maxdyn,
|
|
ntab: ntab,
|
|
static: make(map[discover.NodeID]*discover.Node),
|
|
dialing: make(map[discover.NodeID]connFlag),
|
|
randomNodes: make([]*discover.Node, maxdyn/2),
|
|
hist: new(dialHistory),
|
|
}
|
|
for _, n := range static {
|
|
s.static[n.ID] = n
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (s *dialstate) addStatic(n *discover.Node) {
|
|
s.static[n.ID] = n
|
|
}
|
|
|
|
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
|
|
var newtasks []task
|
|
addDial := func(flag connFlag, n *discover.Node) bool {
|
|
_, dialing := s.dialing[n.ID]
|
|
if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) {
|
|
return false
|
|
}
|
|
s.dialing[n.ID] = flag
|
|
newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
|
|
return true
|
|
}
|
|
|
|
// Compute number of dynamic dials necessary at this point.
|
|
needDynDials := s.maxDynDials
|
|
for _, p := range peers {
|
|
if p.rw.is(dynDialedConn) {
|
|
needDynDials--
|
|
}
|
|
}
|
|
for _, flag := range s.dialing {
|
|
if flag&dynDialedConn != 0 {
|
|
needDynDials--
|
|
}
|
|
}
|
|
|
|
// Expire the dial history on every invocation.
|
|
s.hist.expire(now)
|
|
|
|
// Create dials for static nodes if they are not connected.
|
|
for _, n := range s.static {
|
|
addDial(staticDialedConn, n)
|
|
}
|
|
|
|
// Use random nodes from the table for half of the necessary
|
|
// dynamic dials.
|
|
randomCandidates := needDynDials / 2
|
|
if randomCandidates > 0 && s.bootstrapped {
|
|
n := s.ntab.ReadRandomNodes(s.randomNodes)
|
|
for i := 0; i < randomCandidates && i < n; i++ {
|
|
if addDial(dynDialedConn, s.randomNodes[i]) {
|
|
needDynDials--
|
|
}
|
|
}
|
|
}
|
|
// Create dynamic dials from random lookup results, removing tried
|
|
// items from the result buffer.
|
|
i := 0
|
|
for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
|
|
if addDial(dynDialedConn, s.lookupBuf[i]) {
|
|
needDynDials--
|
|
}
|
|
}
|
|
s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
|
|
// Launch a discovery lookup if more candidates are needed. The
|
|
// first discoverTask bootstraps the table and won't return any
|
|
// results.
|
|
if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
|
|
s.lookupRunning = true
|
|
newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped})
|
|
}
|
|
|
|
// Launch a timer to wait for the next node to expire if all
|
|
// candidates have been tried and no task is currently active.
|
|
// This should prevent cases where the dialer logic is not ticked
|
|
// because there are no pending events.
|
|
if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
|
|
t := &waitExpireTask{s.hist.min().exp.Sub(now)}
|
|
newtasks = append(newtasks, t)
|
|
}
|
|
return newtasks
|
|
}
|
|
|
|
func (s *dialstate) taskDone(t task, now time.Time) {
|
|
switch t := t.(type) {
|
|
case *dialTask:
|
|
s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration))
|
|
delete(s.dialing, t.dest.ID)
|
|
case *discoverTask:
|
|
if t.bootstrap {
|
|
s.bootstrapped = true
|
|
}
|
|
s.lookupRunning = false
|
|
s.lookupBuf = append(s.lookupBuf, t.results...)
|
|
}
|
|
}
|
|
|
|
func (t *dialTask) Do(srv *Server) {
|
|
addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}
|
|
glog.V(logger.Debug).Infof("dialing %v\n", t.dest)
|
|
fd, err := srv.Dialer.Dial("tcp", addr.String())
|
|
if err != nil {
|
|
glog.V(logger.Detail).Infof("dial error: %v", err)
|
|
return
|
|
}
|
|
mfd := newMeteredConn(fd, false)
|
|
|
|
srv.setupConn(mfd, t.flags, t.dest)
|
|
}
|
|
func (t *dialTask) String() string {
|
|
return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
|
|
}
|
|
|
|
func (t *discoverTask) Do(srv *Server) {
|
|
if t.bootstrap {
|
|
srv.ntab.Bootstrap(srv.BootstrapNodes)
|
|
return
|
|
}
|
|
// newTasks generates a lookup task whenever dynamic dials are
|
|
// necessary. Lookups need to take some time, otherwise the
|
|
// event loop spins too fast.
|
|
next := srv.lastLookup.Add(lookupInterval)
|
|
if now := time.Now(); now.Before(next) {
|
|
time.Sleep(next.Sub(now))
|
|
}
|
|
srv.lastLookup = time.Now()
|
|
var target discover.NodeID
|
|
rand.Read(target[:])
|
|
t.results = srv.ntab.Lookup(target)
|
|
}
|
|
|
|
func (t *discoverTask) String() (s string) {
|
|
if t.bootstrap {
|
|
s = "discovery bootstrap"
|
|
} else {
|
|
s = "discovery lookup"
|
|
}
|
|
if len(t.results) > 0 {
|
|
s += fmt.Sprintf(" (%d results)", len(t.results))
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (t waitExpireTask) Do(*Server) {
|
|
time.Sleep(t.Duration)
|
|
}
|
|
func (t waitExpireTask) String() string {
|
|
return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration)
|
|
}
|
|
|
|
// Use only these methods to access or modify dialHistory.
|
|
func (h dialHistory) min() pastDial {
|
|
return h[0]
|
|
}
|
|
func (h *dialHistory) add(id discover.NodeID, exp time.Time) {
|
|
heap.Push(h, pastDial{id, exp})
|
|
}
|
|
func (h dialHistory) contains(id discover.NodeID) bool {
|
|
for _, v := range h {
|
|
if v.id == id {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
func (h *dialHistory) expire(now time.Time) {
|
|
for h.Len() > 0 && h.min().exp.Before(now) {
|
|
heap.Pop(h)
|
|
}
|
|
}
|
|
|
|
// heap.Interface boilerplate
|
|
func (h dialHistory) Len() int { return len(h) }
|
|
func (h dialHistory) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) }
|
|
func (h dialHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
func (h *dialHistory) Push(x interface{}) {
|
|
*h = append(*h, x.(pastDial))
|
|
}
|
|
func (h *dialHistory) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|