workers: get to executing tasks remotely!

This commit is contained in:
Łukasz Magiera 2020-03-19 18:11:45 +01:00
parent d87b7c264d
commit 80cca91584
8 changed files with 65 additions and 26 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
)
@ -222,7 +223,10 @@ var runCmd = &cli.Command{
// Create / expose the worker
workerApi := &worker{
LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore, nodeApi),
LocalWorker: advmgr.NewLocalWorker(advmgr.WorkerConfig{
SealProof: spt,
TaskTypes: []sealmgr.TaskType{sealmgr.TTPreCommit1, sealmgr.TTPreCommit2, sealmgr.TTCommit2},
}, remote, localStore, nodeApi),
}
mux := mux.NewRouter()

View File

@ -51,6 +51,7 @@ import (
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/impl/common"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
@ -263,6 +264,8 @@ func Online() Option {
// Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
Override(new(api.Common), From(new(common.CommonAPI))),
Override(new(*stores.Index), stores.NewIndex()),
Override(new(stores.SectorIndex), From(new(*stores.Index))),
Override(new(dtypes.MinerID), modules.MinerID),

View File

@ -138,8 +138,17 @@ func as(in interface{}, as interface{}) interface{} {
return reflect.MakeFunc(ctype, func(args []reflect.Value) (results []reflect.Value) {
outs := reflect.ValueOf(in).Call(args)
out := reflect.New(outType.Elem())
out.Elem().Set(outs[0])
if outs[0].Type().AssignableTo(outType.Elem()) {
// Out: Iface = In: *Struct; Out: Iface = In: OtherIface
out.Elem().Set(outs[0])
} else {
// Out: Iface = &(In: Struct)
t := reflect.New(outs[0].Type())
t.Elem().Set(outs[0])
out.Elem().Set(t)
}
outs[0] = out.Elem()
return outs

View File

@ -69,7 +69,10 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
m := &Manager{
workers: []Worker{
//&LocalWorker{scfg: cfg, localStore: lstor, storage: lstor, sindex: sindex},
NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType,
TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece},
}, stor, lstor, si),
},
scfg: cfg,
@ -212,6 +215,9 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit1, best)
if len(candidateWorkers) == 0 {
return nil, xerrors.New("no suitable workers found") // TODO: wait?
}
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
@ -227,6 +233,9 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
if len(candidateWorkers) == 0 {
return storage2.SectorCids{}, xerrors.New("no suitable workers found") // TODO: wait?
}
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
@ -240,6 +249,9 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
if len(candidateWorkers) == 0 {
return nil, xerrors.New("no suitable workers found") // TODO: wait?
}
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly

View File

@ -4,7 +4,6 @@ import (
"context"
"io"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
@ -16,26 +15,41 @@ import (
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
type WorkerConfig struct {
SealProof abi.RegisteredProof
TaskTypes []sealmgr.TaskType
}
type LocalWorker struct {
scfg *sectorbuilder.Config
storage stores.Store
localStore *stores.Local
sindex stores.SectorIndex
acceptTasks map[sealmgr.TaskType]struct{}
}
func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker {
ppt, err := spt.RegisteredPoStProof()
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker {
ppt, err := wcfg.SealProof.RegisteredPoStProof()
if err != nil {
panic(err)
}
acceptTasks := map[sealmgr.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
}
return &LocalWorker{
scfg: &sectorbuilder.Config{
SealProofType: spt,
SealProofType: wcfg.SealProof,
PoStProofType: ppt,
},
storage: store,
localStore: local,
sindex: sindex,
acceptTasks: acceptTasks,
}
}
@ -49,6 +63,8 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
return sectorbuilder.SectorPaths{}, nil, err
}
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
return paths, func() {
done()
@ -134,12 +150,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) {
return map[sealmgr.TaskType]struct{}{
sealmgr.TTAddPiece: {},
sealmgr.TTPreCommit1: {},
sealmgr.TTPreCommit2: {},
sealmgr.TTCommit2: {},
}, nil
return l.acceptTasks, nil
}
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {

View File

@ -20,10 +20,10 @@ type FetchHandler struct {
Store
}
func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /storage/
func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /remote/
mux := mux.NewRouter()
mux.HandleFunc("/{type}/{id}", handler.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", handler.remoteGetSector).Methods("GET")
log.Infof("SERVEGETREMOTE %s", r.URL)

View File

@ -169,7 +169,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
out[j] = StorageInfo{
ID: id,
URLs: nil,
URLs: urls,
Weight: st.info.Weight,
CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore,

View File

@ -20,20 +20,20 @@ import (
)
type Remote struct {
local *Local
remote SectorIndex
auth http.Header
local *Local
index SectorIndex
auth http.Header
fetchLk sync.Mutex // TODO: this can be much smarter
// TODO: allow multiple parallel fetches
// (make sure to not fetch the same sector data twice)
}
func NewRemote(local *Local, remote SectorIndex, auth http.Header) *Remote {
func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
return &Remote{
local: local,
remote: remote,
auth: auth,
local: local,
index: index,
auth: auth,
}
}
@ -69,7 +69,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
sectorutil.SetPathByType(&paths, fileType, ap)
sectorutil.SetPathByType(&stores, fileType, string(storageID))
if err := r.remote.StorageDeclareSector(ctx, storageID, s, fileType); err != nil {
if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
}
}
@ -78,7 +78,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
}
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, func(), error) {
si, err := r.remote.StorageFindSector(ctx, s, fileType)
si, err := r.index.StorageFindSector(ctx, s, fileType)
if err != nil {
return "", "", nil, err
}
@ -111,7 +111,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
}
done()
return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr)
return "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
}
func (r *Remote) fetch(url, outname string) error {