lotus/storage/sealmgr/advmgr/worker_local.go

199 lines
5.3 KiB
Go
Raw Normal View History

2020-03-05 02:18:22 +00:00
package advmgr
import (
"context"
"io"
2020-03-20 22:30:17 +00:00
"os"
2020-03-05 02:18:22 +00:00
2020-03-20 22:30:17 +00:00
"github.com/elastic/go-sysinfo"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-sectorbuilder"
2020-03-05 02:18:22 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
2020-03-11 05:49:17 +00:00
storage2 "github.com/filecoin-project/specs-storage/storage"
2020-03-20 22:30:17 +00:00
"github.com/filecoin-project/lotus/api"
2020-03-05 02:18:22 +00:00
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
2020-03-05 02:18:22 +00:00
)
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
type WorkerConfig struct {
SealProof abi.RegisteredProof
TaskTypes []sealmgr.TaskType
}
2020-03-11 21:23:16 +00:00
type LocalWorker struct {
2020-03-13 11:59:19 +00:00
scfg *sectorbuilder.Config
storage stores.Store
2020-03-13 01:37:38 +00:00
localStore *stores.Local
sindex stores.SectorIndex
acceptTasks map[sealmgr.TaskType]struct{}
2020-03-13 01:37:38 +00:00
}
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker {
ppt, err := wcfg.SealProof.RegisteredPoStProof()
2020-03-13 01:37:38 +00:00
if err != nil {
panic(err)
}
acceptTasks := map[sealmgr.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
}
2020-03-13 01:37:38 +00:00
return &LocalWorker{
2020-03-13 11:59:19 +00:00
scfg: &sectorbuilder.Config{
SealProofType: wcfg.SealProof,
2020-03-13 01:37:38 +00:00
PoStProofType: ppt,
},
storage: store,
localStore: local,
sindex: sindex,
acceptTasks: acceptTasks,
2020-03-13 01:37:38 +00:00
}
2020-03-05 02:18:22 +00:00
}
type localWorkerPathProvider struct {
2020-03-11 21:23:16 +00:00
w *LocalWorker
2020-03-05 02:18:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
}
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
return paths, func() {
done()
for _, fileType := range pathTypes {
if fileType&allocate == 0 {
continue
}
sid := sectorutil.PathByType(storageIDs, fileType)
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil {
log.Errorf("declare sector error: %+v", err)
}
}
}, nil
2020-03-05 02:18:22 +00:00
}
2020-03-11 21:23:16 +00:00
func (l *LocalWorker) sb() (sectorbuilder.Basic, error) {
2020-03-05 19:21:06 +00:00
return sectorbuilder.New(&localWorkerPathProvider{w: l}, l.scfg)
2020-03-05 02:18:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
sb, err := l.sb()
if err != nil {
return err
}
return sb.NewSector(ctx, sector)
}
func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
2020-03-05 02:18:22 +00:00
sb, err := l.sb()
if err != nil {
return abi.PieceInfo{}, err
}
2020-03-17 20:19:52 +00:00
return sb.AddPiece(ctx, sector, epcs, sz, r)
2020-03-05 02:18:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
2020-03-05 18:18:33 +00:00
sb, err := l.sb()
if err != nil {
return nil, err
}
2020-03-17 20:19:52 +00:00
return sb.SealPreCommit1(ctx, sector, ticket, pieces)
2020-03-05 02:18:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) {
2020-03-05 18:18:33 +00:00
sb, err := l.sb()
if err != nil {
2020-03-17 20:19:52 +00:00
return storage2.SectorCids{}, err
2020-03-05 18:18:33 +00:00
}
2020-03-17 20:19:52 +00:00
return sb.SealPreCommit2(ctx, sector, phase1Out)
2020-03-05 02:18:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) {
2020-03-05 18:18:33 +00:00
sb, err := l.sb()
if err != nil {
return nil, err
}
2020-03-17 20:19:52 +00:00
return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
2020-03-05 02:18:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
2020-03-05 18:18:33 +00:00
sb, err := l.sb()
if err != nil {
return nil, err
}
2020-03-17 20:19:52 +00:00
return sb.SealCommit2(ctx, sector, phase1Out)
2020-03-05 02:18:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
2020-03-05 18:18:33 +00:00
sb, err := l.sb()
if err != nil {
return err
}
2020-03-17 20:19:52 +00:00
return sb.FinalizeSector(ctx, sector)
2020-03-05 02:18:22 +00:00
}
2020-03-11 21:23:16 +00:00
func (l *LocalWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) {
return l.acceptTasks, nil
2020-03-05 02:18:22 +00:00
}
2020-03-19 15:10:19 +00:00
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(ctx)
2020-03-05 02:18:22 +00:00
}
2020-03-20 22:30:17 +00:00
func (l *LocalWorker) Info(context.Context) (api.WorkerInfo, error) {
hostname, err := os.Hostname() // TODO: allow overriding from config
if err != nil {
panic(err)
}
gpus, err := ffi.GetGPUDevices()
if err != nil {
log.Errorf("getting gpu devices failed: %+v", err)
}
h, err := sysinfo.Host()
if err != nil {
return api.WorkerInfo{}, xerrors.Errorf("getting host info: %w", err)
}
mem, err := h.Memory()
if err != nil {
return api.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
}
return api.WorkerInfo{
Hostname: hostname,
Resources: api.WorkerResources{
MemPhysical: mem.Total,
MemSwap: mem.VirtualTotal,
MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process
GPUs: gpus,
},
}, nil
}
2020-03-11 21:23:16 +00:00
var _ Worker = &LocalWorker{}