p2p: improve readability of dial task scheduling code

Felix Lange 2016-05-02 17:01:13 +02:00
parent 1c20313a6a
commit 32bb280179
2 changed files with 78 additions and 29 deletions

p2p/server.go

@@ -398,12 +398,11 @@ type dialer interface {
 func (srv *Server) run(dialstate dialer) {
 	defer srv.loopWG.Done()
 	var (
 		peers        = make(map[discover.NodeID]*Peer)
 		trusted      = make(map[discover.NodeID]bool, len(srv.TrustedNodes))
-		tasks        []task
-		pendingTasks []task
 		taskdone     = make(chan task, maxActiveDialTasks)
+		runningTasks []task
+		queuedTasks  []task // tasks that can't run yet
 	)
 	// Put trusted nodes into a map to speed up checks.
 	// Trusted peers are loaded on startup and cannot be
@@ -412,39 +411,39 @@ func (srv *Server) run(dialstate dialer) {
 		trusted[n.ID] = true
 	}
-	// Some task list helpers.
+	// removes t from runningTasks
 	delTask := func(t task) {
-		for i := range tasks {
-			if tasks[i] == t {
-				tasks = append(tasks[:i], tasks[i+1:]...)
+		for i := range runningTasks {
+			if runningTasks[i] == t {
+				runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
 				break
 			}
 		}
 	}
-	scheduleTasks := func(new []task) {
-		pt := append(pendingTasks, new...)
-		start := maxActiveDialTasks - len(tasks)
-		if len(pt) < start {
-			start = len(pt)
-		}
-		if start > 0 {
-			tasks = append(tasks, pt[:start]...)
-			for _, t := range pt[:start] {
-				t := t
-				glog.V(logger.Detail).Infoln("new task:", t)
-				go func() { t.Do(srv); taskdone <- t }()
-			}
-			copy(pt, pt[start:])
-			pendingTasks = pt[:len(pt)-start]
-		}
-	}
+	// starts until max number of active tasks is satisfied
+	startTasks := func(ts []task) (rest []task) {
+		i := 0
+		for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
+			t := ts[i]
+			glog.V(logger.Detail).Infoln("new task:", t)
+			go func() { t.Do(srv); taskdone <- t }()
+			runningTasks = append(runningTasks, t)
+		}
+		return ts[i:]
+	}
+	scheduleTasks := func() {
+		// Start from queue first.
+		queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
+		// Query dialer for new tasks and start as many as possible now.
+		if len(runningTasks) < maxActiveDialTasks {
+			nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
+			queuedTasks = append(queuedTasks, startTasks(nt)...)
+		}
+	}

 running:
 	for {
-		// Query the dialer for new tasks and launch them.
-		now := time.Now()
-		nt := dialstate.newTasks(len(pendingTasks)+len(tasks), peers, now)
-		scheduleTasks(nt)
+		scheduleTasks()

 		select {
 		case <-srv.quit:
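
The heart of the rewrite is the split into startTasks, which launches tasks until the maxActiveDialTasks cap is reached and returns the un-started remainder, and scheduleTasks, which drains queuedTasks before asking the dialer for more. Below is a minimal standalone sketch of that pattern; the names task, maxActive, and running are illustrative stand-ins, not the real p2p identifiers:

package main

import "fmt"

const maxActive = 3 // stand-in for maxActiveDialTasks

type task func()

func main() {
	running := 0 // stand-in for len(runningTasks)

	// startTasks launches tasks until the active cap is reached and
	// returns the tasks that could not be started yet.
	startTasks := func(ts []task) (rest []task) {
		i := 0
		for ; running < maxActive && i < len(ts); i++ {
			go ts[i]()
			running++
		}
		return ts[i:]
	}

	var queued []task
	fresh := make([]task, 5)
	for i := range fresh {
		fresh[i] = func() {} // placeholder work
	}

	// scheduleTasks: drain the queue first, then start as many fresh
	// tasks as the cap allows and queue the remainder.
	queued = append(queued[:0], startTasks(queued)...)
	if running < maxActive {
		queued = append(queued, startTasks(fresh)...)
	}
	fmt.Printf("running=%d queued=%d\n", running, len(queued)) // running=3 queued=2
}

Because startTasks hands back the tail it could not launch, the caller can requeue it instead of dropping it, which is exactly the property the new TestServerManyTasks below exercises.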
@@ -466,7 +465,7 @@ running:
 			// can update its state and remove it from the active
 			// tasks list.
 			glog.V(logger.Detail).Infoln("<-taskdone:", t)
-			dialstate.taskDone(t, now)
+			dialstate.taskDone(t, time.Now())
 			delTask(t)
 		case c := <-srv.posthandshake:
 			// A connection has passed the encryption handshake so
@@ -513,7 +512,7 @@ running:
 	// Wait for peers to shut down. Pending connections and tasks are
 	// not handled here and will terminate soon-ish because srv.quit
 	// is closed.
-	glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(tasks))
+	glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(runningTasks))
 	for len(peers) > 0 {
 		p := <-srv.delpeer
 		glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p)
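
For context, the dialer interface that the loop drives declares at least the two methods called above; this is a reconstruction from the calls in these hunks (the interface itself is unchanged and therefore not part of the diff):

type dialer interface {
	newTasks(running int, peers map[discover.NodeID]*Peer, now time.Time) []task
	taskDone(task, time.Time)
	// the real interface may declare further methods not used in these hunks
}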

p2p/server_test.go

@@ -235,6 +235,56 @@ func TestServerTaskScheduling(t *testing.T) {
 	}
 }

+// This test checks that Server doesn't drop tasks,
+// even if newTasks returns more than the maximum number of tasks.
+func TestServerManyTasks(t *testing.T) {
+	alltasks := make([]task, 300)
+	for i := range alltasks {
+		alltasks[i] = &testTask{index: i}
+	}
+
+	var (
+		srv        = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true}
+		done       = make(chan *testTask)
+		start, end = 0, 0
+	)
+	defer srv.Stop()
+	srv.loopWG.Add(1)
+	go srv.run(taskgen{
+		newFunc: func(running int, peers map[discover.NodeID]*Peer) []task {
+			start, end = end, end+maxActiveDialTasks+10
+			if end > len(alltasks) {
+				end = len(alltasks)
+			}
+			return alltasks[start:end]
+		},
+		doneFunc: func(tt task) {
+			done <- tt.(*testTask)
+		},
+	})
+
+	doneset := make(map[int]bool)
+	timeout := time.After(2 * time.Second)
+	for len(doneset) < len(alltasks) {
+		select {
+		case tt := <-done:
+			if doneset[tt.index] {
+				t.Errorf("task %d got done more than once", tt.index)
+			} else {
+				doneset[tt.index] = true
+			}
+		case <-timeout:
+			t.Errorf("%d of %d tasks got done within 2s", len(doneset), len(alltasks))
+			for i := 0; i < len(alltasks); i++ {
+				if !doneset[i] {
+					t.Logf("task %d not done", i)
+				}
+			}
+			return
+		}
+	}
+}
+
 type taskgen struct {
 	newFunc  func(running int, peers map[discover.NodeID]*Peer) []task
 	doneFunc func(task)
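
taskgen satisfies the dialer interface by delegating to its two function fields. Its methods are unchanged context and thus not shown in this diff; they presumably look like this:

func (tg taskgen) newTasks(running int, peers map[discover.NodeID]*Peer, now time.Time) []task {
	return tg.newFunc(running, peers)
}

func (tg taskgen) taskDone(t task, now time.Time) {
	tg.doneFunc(t)
}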