lotus/storage/sealmgr/advmgr/manager.go

255 lines
7.5 KiB
Go
Raw Normal View History

2020-03-03 22:19:22 +00:00
package advmgr
import (
"context"
"io"
2020-03-11 05:49:17 +00:00
"net/http"
2020-03-03 22:19:22 +00:00
"github.com/ipfs/go-cid"
2020-03-11 01:57:52 +00:00
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
2020-03-03 22:19:22 +00:00
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
2020-03-11 05:49:17 +00:00
2020-03-03 22:19:22 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
2020-03-06 05:30:47 +00:00
storage2 "github.com/filecoin-project/specs-storage/storage"
2020-03-03 22:19:22 +00:00
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
2020-03-11 01:57:52 +00:00
// log is the package-level logger, registered under the "advmgr" name.
var log = logging.Logger("advmgr")
2020-03-16 17:50:07 +00:00
// URLs is a list of strings that New hands to stores.DeclareLocalStorage.
// NOTE(review): presumably the URLs under which this node's local storage is
// reachable by others - confirm against the stores package.
type URLs []string
2020-03-03 22:19:22 +00:00
type Worker interface {
sectorbuilder.Sealer
2020-03-03 22:19:22 +00:00
2020-03-11 01:57:52 +00:00
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error)
2020-03-13 11:59:19 +00:00
Paths(context.Context) ([]stores.StoragePath, error)
2020-03-03 22:19:22 +00:00
}
type Manager struct {
workers []Worker
2020-03-05 19:21:06 +00:00
scfg *sectorbuilder.Config
2020-03-13 11:59:19 +00:00
ls stores.LocalStorage
storage *stores.Local
remoteHnd *stores.FetchHandler
2020-03-03 22:19:22 +00:00
2020-03-06 05:30:47 +00:00
storage2.Prover
}
2020-03-18 01:08:11 +00:00
func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, urls URLs, sindex stores.SectorIndex) (*Manager, error) {
stor, err := stores.NewLocal(ls)
if err != nil {
return nil, err
}
2020-03-16 17:50:07 +00:00
if err := stores.DeclareLocalStorage(context.TODO(), si, stor, urls, 10); err != nil {
log.Errorf("Declaring local storage failed: %+v")
}
2020-03-17 20:19:52 +00:00
prover, err := sectorbuilder.New(&readonlyProvider{stor: stor}, cfg)
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
m := &Manager{
2020-03-05 19:21:06 +00:00
workers: []Worker{
2020-03-18 01:08:11 +00:00
&LocalWorker{scfg: cfg, localStore: stor, storage: stor, sindex: sindex},
2020-03-05 02:18:22 +00:00
},
scfg: cfg,
2020-03-13 11:59:19 +00:00
ls: ls,
storage: stor,
remoteHnd: &stores.FetchHandler{Store: stor},
Prover: prover,
}
return m, nil
2020-03-03 22:19:22 +00:00
}
func (m *Manager) AddLocalStorage(path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}
if err := m.storage.OpenPath(path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}
if err := m.ls.SetStorage(func(sc *config.StorageConfig) {
2020-03-09 19:22:30 +00:00
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
return nil
}
2020-03-11 05:49:17 +00:00
func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.remoteHnd.ServeHTTP(w, r)
2020-03-11 05:49:17 +00:00
}
func (m *Manager) SectorSize() abi.SectorSize {
sz, _ := m.scfg.SealProofType.SectorSize()
return sz
2020-03-03 22:19:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
2020-03-03 22:19:22 +00:00
panic("implement me")
}
2020-03-13 11:59:19 +00:00
func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.StorageMeta) ([]Worker, map[int]stores.StorageMeta) {
2020-03-05 18:18:33 +00:00
var workers []Worker
2020-03-13 11:59:19 +00:00
paths := map[int]stores.StorageMeta{}
for i, worker := range m.workers {
2020-03-11 01:57:52 +00:00
tt, err := worker.TaskTypes(context.TODO())
if err != nil {
log.Errorf("error getting supported worker task types: %+v", err)
continue
}
if _, ok := tt[task]; !ok {
continue
}
2020-03-11 05:49:17 +00:00
phs, err := worker.Paths(context.TODO())
if err != nil {
log.Errorf("error getting worker paths: %+v", err)
continue
}
// check if the worker has access to the path we selected
2020-03-13 11:59:19 +00:00
var st *stores.StorageMeta
2020-03-11 05:49:17 +00:00
for _, p := range phs {
for _, meta := range inPaths {
if p.ID == meta.ID {
if st != nil && st.Weight > p.Weight {
continue
}
2020-03-11 05:49:17 +00:00
p := meta // copy
st = &p
}
}
}
if st == nil {
continue
}
2020-03-05 18:18:33 +00:00
paths[i] = *st
workers = append(workers, worker)
}
return workers, paths
}
2020-03-17 20:19:52 +00:00
// NewSector is currently a stub: it logs a warning and reports success
// without using ctx or sector.
func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
	log.Warnf("stub NewSector")
	return nil
}
func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
2020-03-05 18:18:33 +00:00
// TODO: consider multiple paths vs workers when initially allocating
2020-03-13 11:59:19 +00:00
var best []stores.StorageMeta
2020-03-05 18:18:33 +00:00
var err error
if len(existingPieces) == 0 { // new
best, err = m.storage.FindBestAllocStorage(sectorbuilder.FTUnsealed, true)
2020-03-05 18:18:33 +00:00
} else { // append to existing
2020-03-17 20:19:52 +00:00
best, err = m.storage.FindSector(sector, sectorbuilder.FTUnsealed)
2020-03-05 18:18:33 +00:00
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
}
2020-03-05 18:18:33 +00:00
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTAddPiece, best)
if len(candidateWorkers) == 0 {
return abi.PieceInfo{}, xerrors.New("no worker found")
}
2020-03-05 18:18:33 +00:00
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
2020-03-17 20:19:52 +00:00
return candidateWorkers[0].AddPiece(ctx, sector, existingPieces, sz, r)
2020-03-03 22:19:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
2020-03-05 18:18:33 +00:00
// TODO: also consider where the unsealed data sits
best, err := m.storage.FindBestAllocStorage(sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
2020-03-05 18:18:33 +00:00
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit1, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
2020-03-17 20:19:52 +00:00
return candidateWorkers[0].SealPreCommit1(ctx, sector, ticket, pieces)
2020-03-03 22:19:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) {
2020-03-05 18:18:33 +00:00
// TODO: allow workers to fetch the sectors
2020-03-17 20:19:52 +00:00
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
2020-03-05 18:18:33 +00:00
if err != nil {
2020-03-17 20:19:52 +00:00
return storage2.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err)
2020-03-05 18:18:33 +00:00
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
2020-03-17 20:19:52 +00:00
return candidateWorkers[0].SealPreCommit2(ctx, sector, phase1Out)
2020-03-03 22:19:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) {
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
2020-03-05 18:18:33 +00:00
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
2020-03-17 20:19:52 +00:00
return candidateWorkers[0].SealCommit1(ctx, sector, ticket, seed, pieces, cids)
2020-03-03 22:19:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
2020-03-05 18:18:33 +00:00
for _, worker := range m.workers {
2020-03-11 01:57:52 +00:00
tt, err := worker.TaskTypes(context.TODO())
if err != nil {
log.Errorf("error getting supported worker task types: %+v", err)
continue
}
if _, ok := tt[sealmgr.TTCommit2]; !ok {
2020-03-05 18:18:33 +00:00
continue
}
2020-03-17 20:19:52 +00:00
return worker.SealCommit2(ctx, sector, phase1Out)
2020-03-05 18:18:33 +00:00
}
return nil, xerrors.New("no worker found")
2020-03-03 22:19:22 +00:00
}
2020-03-17 20:19:52 +00:00
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
2020-03-05 18:18:33 +00:00
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) // find last worker with the sector
// TODO: Move the sector to long-term storage
2020-03-17 20:19:52 +00:00
return candidateWorkers[0].FinalizeSector(ctx, sector)
}
2020-03-03 22:19:22 +00:00
// Compile-time check that *Manager implements sealmgr.Manager.
var _ sealmgr.Manager = &Manager{}