lotus/lib/sectorbuilder/remote.go

175 lines
2.9 KiB
Go
Raw Normal View History

2019-11-21 00:52:59 +00:00
package sectorbuilder
import (
"context"
"golang.org/x/xerrors"
)
// WorkerTaskType identifies which kind of work a WorkerTask asks a remote
// worker to perform.
type WorkerTaskType int
// Known task types; values are assigned sequentially from zero via iota.
const (
// WorkerPreCommit tasks are queued on the SectorBuilder's precommitTasks
// channel.
WorkerPreCommit WorkerTaskType = iota
// WorkerCommit tasks are queued on the SectorBuilder's commitTasks channel.
WorkerCommit
)
type WorkerTask struct {
Type WorkerTaskType
TaskID uint64
SectorID uint64
// preCommit
2019-11-21 16:10:04 +00:00
SealTicket SealTicket
Pieces []PublicPieceInfo
2019-11-21 19:51:48 +00:00
// commit
SealSeed SealSeed
Rspco RawSealPreCommitOutput
2019-11-21 00:52:59 +00:00
}
type workerCall struct {
task WorkerTask
2019-11-21 16:10:04 +00:00
ret chan SealRes
2019-11-21 00:52:59 +00:00
}
func (sb *SectorBuilder) AddWorker(ctx context.Context, cfg WorkerCfg) (<-chan WorkerTask, error) {
2019-11-21 00:52:59 +00:00
sb.remoteLk.Lock()
2019-12-04 16:53:32 +00:00
defer sb.remoteLk.Unlock()
2019-11-21 00:52:59 +00:00
taskCh := make(chan WorkerTask)
r := &remote{
sealTasks: taskCh,
busy: 0,
}
2019-11-21 18:38:43 +00:00
sb.remoteCtr++
sb.remotes[sb.remoteCtr] = r
2019-11-21 00:52:59 +00:00
go sb.remoteWorker(ctx, r, cfg)
2019-11-21 00:52:59 +00:00
return taskCh, nil
}
func (sb *SectorBuilder) returnTask(task workerCall) {
var ret chan workerCall
switch task.task.Type {
case WorkerPreCommit:
ret = sb.precommitTasks
case WorkerCommit:
ret = sb.commitTasks
default:
log.Error("unknown task type", task.task.Type)
}
2019-11-21 00:52:59 +00:00
go func() {
select {
case ret <- task:
2019-11-21 00:52:59 +00:00
case <-sb.stopping:
return
}
}()
}
func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote, cfg WorkerCfg) {
2019-11-21 00:52:59 +00:00
defer log.Warn("Remote worker disconnected")
2019-11-21 18:38:43 +00:00
defer func() {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
for i, vr := range sb.remotes {
if vr == r {
delete(sb.remotes, i)
return
}
}
}()
precommits := sb.precommitTasks
if cfg.NoPreCommit {
precommits = nil
}
commits := sb.commitTasks
if cfg.NoCommit {
commits = nil
}
2019-11-21 00:52:59 +00:00
for {
select {
case task := <-commits:
sb.doTask(ctx, r, task)
case task := <-precommits:
sb.doTask(ctx, r, task)
2019-11-21 00:52:59 +00:00
case <-ctx.Done():
return
case <-sb.stopping:
return
}
r.lk.Lock()
r.busy = 0
r.lk.Unlock()
2019-11-21 00:52:59 +00:00
}
}
// doTask hands task to the remote worker r, waits for the result that the
// worker reports through TaskDone, and forwards it on task.ret.
//
// If ctx is done before the worker accepts the task, the task is re-queued
// via returnTask. If ctx is done or the SectorBuilder is stopping at any
// later point, doTask returns without forwarding a result.
func (sb *SectorBuilder) doTask(ctx context.Context, r *remote, task workerCall) {
	resCh := make(chan SealRes)

	sb.remoteLk.Lock()
	sb.remoteResults[task.task.TaskID] = resCh
	sb.remoteLk.Unlock()

	// Drop our pending-result entry on every exit path so the map does not
	// keep a channel nobody reads from. Only remove it while it is still
	// ours: after returnTask another worker may already have registered a
	// new channel under the same TaskID. When TaskDone has claimed the entry
	// this is a no-op.
	defer func() {
		sb.remoteLk.Lock()
		defer sb.remoteLk.Unlock()

		if cur, ok := sb.remoteResults[task.task.TaskID]; ok && cur == resCh {
			delete(sb.remoteResults, task.task.TaskID)
		}
	}()

	// send the task
	select {
	case r.sealTasks <- task.task:
	case <-ctx.Done():
		sb.returnTask(task)
		return
	case <-sb.stopping:
		// Shutting down: returnTask would give up on sb.stopping as well.
		return
	}

	r.lk.Lock()
	r.busy = task.task.TaskID
	r.lk.Unlock()

	// wait for the result
	select {
	case res := <-resCh:
		// send the result back to the caller
		select {
		case task.ret <- res:
		case <-ctx.Done():
			return
		case <-sb.stopping:
			return
		}
	case <-ctx.Done():
		// NOTE(review): nothing is sent on task.ret on this path; presumably
		// the party waiting on it has its own way to stop waiting - confirm.
		log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err())
		return
	case <-sb.stopping:
		return
	}
}
2019-11-21 00:52:59 +00:00
// TaskDone delivers the result res for the task with the given ID to the
// doTask call waiting for it.
//
// The pending-result entry is claimed (looked up and removed) under
// sb.remoteLk, so each task ID can be completed at most once. It returns an
// error if no entry exists for the ID, or ctx.Err() if ctx is done before
// the result is received.
func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error {
	sb.remoteLk.Lock()
	waiter, found := sb.remoteResults[task]
	// delete is a no-op when the key is absent, so no guard is needed.
	delete(sb.remoteResults, task)
	sb.remoteLk.Unlock()

	if !found {
		return xerrors.Errorf("task %d not found", task)
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case waiter <- res:
	}
	return nil
}