workers: Address review
This commit is contained in:
parent
290b7ebd26
commit
dce6d8e4ae
23
manager.go
23
manager.go
@ -3,6 +3,7 @@ package sectorstorage
|
|||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
@ -24,6 +25,8 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("advmgr")
|
var log = logging.Logger("advmgr")
|
||||||
|
|
||||||
|
var ErrNoWorkers = errors.New("no suitable workers found")
|
||||||
|
|
||||||
type URLs []string
|
type URLs []string
|
||||||
|
|
||||||
type Worker interface {
|
type Worker interface {
|
||||||
@ -71,9 +74,7 @@ type Manager struct {
|
|||||||
schedQueue *list.List // List[*workerRequest]
|
schedQueue *list.List // List[*workerRequest]
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) {
|
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) {
|
||||||
ctx := context.TODO()
|
|
||||||
|
|
||||||
lstor, err := stores.NewLocal(ctx, ls, si, urls)
|
lstor, err := stores.NewLocal(ctx, ls, si, urls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -84,7 +85,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
|
|||||||
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
token, err := ca.AuthNew(context.TODO(), []api.Permission{"admin"})
|
token, err := ca.AuthNew(ctx, []api.Permission{"admin"})
|
||||||
headers := http.Header{}
|
headers := http.Header{}
|
||||||
headers.Add("Authorization", "Bearer "+string(token))
|
headers.Add("Authorization", "Bearer "+string(token))
|
||||||
stor := stores.NewRemote(lstor, si, headers)
|
stor := stores.NewRemote(lstor, si, headers)
|
||||||
@ -272,7 +273,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
|
|||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best)
|
||||||
|
|
||||||
if len(candidateWorkers) == 0 {
|
if len(candidateWorkers) == 0 {
|
||||||
return abi.PieceInfo{}, xerrors.New("no worker found")
|
return abi.PieceInfo{}, ErrNoWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers)
|
worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers)
|
||||||
@ -296,7 +297,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
|
|||||||
|
|
||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best)
|
||||||
if len(candidateWorkers) == 0 {
|
if len(candidateWorkers) == 0 {
|
||||||
return nil, xerrors.New("no suitable workers found")
|
return nil, ErrNoWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers)
|
worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers)
|
||||||
@ -320,7 +321,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
|
|||||||
|
|
||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best)
|
||||||
if len(candidateWorkers) == 0 {
|
if len(candidateWorkers) == 0 {
|
||||||
return storage.SectorCids{}, xerrors.New("no suitable workers found")
|
return storage.SectorCids{}, ErrNoWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers)
|
worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers)
|
||||||
@ -342,7 +343,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
|
|||||||
|
|
||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best)
|
||||||
if len(candidateWorkers) == 0 {
|
if len(candidateWorkers) == 0 {
|
||||||
return nil, xerrors.New("no suitable workers found") // TODO: wait?
|
return nil, ErrNoWorkers
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Try very hard to execute on worker with access to the sectors
|
// TODO: Try very hard to execute on worker with access to the sectors
|
||||||
@ -373,6 +374,9 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
|
|||||||
candidateWorkers = append(candidateWorkers, id)
|
candidateWorkers = append(candidateWorkers, id)
|
||||||
}
|
}
|
||||||
m.workersLk.Unlock()
|
m.workersLk.Unlock()
|
||||||
|
if len(candidateWorkers) == 0 {
|
||||||
|
return nil, ErrNoWorkers
|
||||||
|
}
|
||||||
|
|
||||||
worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers)
|
worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -390,6 +394,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best)
|
candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best)
|
||||||
|
if len(candidateWorkers) == 0 {
|
||||||
|
return ErrNoWorkers
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Remove sector from sealing stores
|
// TODO: Remove sector from sealing stores
|
||||||
// TODO: Move the sector to long-term storage
|
// TODO: Move the sector to long-term storage
|
||||||
|
Loading…
Reference in New Issue
Block a user