Merge pull request #4024 from zgfzgf/sector-sched

modify for unsafe
This commit is contained in:
Łukasz Magiera 2020-11-24 16:19:17 +01:00 committed by GitHub
commit fe92a6e2d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -348,24 +348,25 @@ func (sh *scheduler) trySched() {
sh.workersLk.RLock() sh.workersLk.RLock()
defer sh.workersLk.RUnlock() defer sh.workersLk.RUnlock()
windows := make([]schedWindow, len(sh.openWindows)) windowsLen := len(sh.openWindows)
acceptableWindows := make([][]int, sh.schedQueue.Len()) queuneLen := sh.schedQueue.Len()
log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) log.Debugf("SCHED %d queued; %d open windows", queuneLen, windowsLen)
if len(sh.openWindows) == 0 { if windowsLen == 0 || queuneLen == 0 {
// nothing to schedule on // nothing to schedule on
return return
} }
windows := make([]schedWindow, windowsLen)
acceptableWindows := make([][]int, queuneLen)
// Step 1 // Step 1
concurrency := len(sh.openWindows) throttle := make(chan struct{}, windowsLen)
throttle := make(chan struct{}, concurrency)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(sh.schedQueue.Len()) wg.Add(queuneLen)
for i := 0; i < queuneLen; i++ {
for i := 0; i < sh.schedQueue.Len(); i++ {
throttle <- struct{}{} throttle <- struct{}{}
go func(sqi int) { go func(sqi int) {
@ -450,8 +451,9 @@ func (sh *scheduler) trySched() {
// Step 2 // Step 2
scheduled := 0 scheduled := 0
rmQueue := make([]int, 0, queuneLen)
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { for sqi := 0; sqi < queuneLen; sqi++ {
task := (*sh.schedQueue)[sqi] task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][task.sector.ProofType] needRes := ResourceTable[task.taskType][task.sector.ProofType]
@ -486,11 +488,16 @@ func (sh *scheduler) trySched() {
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task) windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
sh.schedQueue.Remove(sqi) rmQueue = append(rmQueue, sqi)
sqi--
scheduled++ scheduled++
} }
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.schedQueue.Remove(rmQueue[i])
}
}
// Step 3 // Step 3
if scheduled == 0 { if scheduled == 0 {
@ -515,7 +522,7 @@ func (sh *scheduler) trySched() {
} }
// Rewrite sh.openWindows array, removing scheduled windows // Rewrite sh.openWindows array, removing scheduled windows
newOpenWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)-len(scheduledWindows)) newOpenWindows := make([]*schedWindowRequest, 0, windowsLen-len(scheduledWindows))
for wnd, window := range sh.openWindows { for wnd, window := range sh.openWindows {
if _, scheduled := scheduledWindows[wnd]; scheduled { if _, scheduled := scheduledWindows[wnd]; scheduled {
// keep unscheduled windows open // keep unscheduled windows open