workrs: refactor sector store for more composability

This commit is contained in:
Łukasz Magiera 2020-03-11 08:22:21 +01:00
parent 7e2e30f94b
commit 71afcb0333
10 changed files with 223 additions and 167 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
)
type workerStorage struct {
@ -38,6 +39,11 @@ func (w *workerStorage) AcquireSector(ctx context.Context, id abi.SectorNumber,
best := si[0].URLs // TODO: not necessarily true
sname := sectorutil.SectorName(abi.SectorID{
Miner: w.mid,
Number: id,
})
w.fetch(best, )
}

View File

@ -23,7 +23,10 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
"github.com/filecoin-project/go-fil-markets/storagemarket"
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-actors/actors/runtime"
storage2 "github.com/filecoin-project/specs-storage/storage"
@ -256,7 +259,7 @@ func Online() Option {
// Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig),
Override(new(advmgr.LocalStorage), From(new(repo.LockedRepo))),
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
Override(new(advmgr.SectorIDCounter), modules.SectorIDCounter),
Override(new(*advmgr.Manager), advmgr.New),

View File

@ -13,13 +13,14 @@ import (
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
type localWorker struct {
scfg *sectorbuilder.Config
storage *storage
storage *stores.Local
}
type localWorkerPathProvider struct {
@ -32,7 +33,10 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, id abi.Sect
return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("get miner ID: %w", err)
}
return l.w.storage.acquireSector(abi.ActorID(mid), id, existing, allocate, sealing)
return l.w.storage.AcquireSector(abi.SectorID{
Miner: abi.ActorID(mid),
Number: id,
}, existing, allocate, sealing)
}
func (l *localWorker) sb() (sectorbuilder.Basic, error) {
@ -103,7 +107,7 @@ func (l *localWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{},
}
func (l *localWorker) Paths(context.Context) ([]api.StoragePath, error) {
return l.storage.local(), nil
return l.storage.Local(), nil
}
var _ Worker = &localWorker{}

View File

@ -11,8 +11,10 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
@ -39,16 +41,16 @@ type Manager struct {
scfg *sectorbuilder.Config
sc SectorIDCounter
storage *storage
ls stores.LocalStorage
storage *stores.Local
remoteHnd *stores.FetchHandler
storage2.Prover
}
func New(ls LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) (*Manager, error) {
stor := &storage{
localStorage: ls,
}
if err := stor.open(); err != nil {
func New(ls stores.LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) (*Manager, error) {
stor, err := stores.NewLocal(ls)
if err != nil {
return nil, err
}
@ -69,7 +71,9 @@ func New(ls LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) (*Manag
scfg: cfg,
sc: sc,
ls: ls,
storage: stor,
remoteHnd: &stores.FetchHandler{Store: stor},
Prover: prover,
}
@ -83,11 +87,11 @@ func (m *Manager) AddLocalStorage(path string) error {
return xerrors.Errorf("expanding local path: %w", err)
}
if err := m.storage.openPath(path); err != nil {
if err := m.storage.OpenPath(path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}
if err := m.storage.localStorage.SetStorage(func(sc *config.StorageConfig) {
if err := m.ls.SetStorage(func(sc *config.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
@ -96,7 +100,7 @@ func (m *Manager) AddLocalStorage(path string) error {
}
func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.storage.ServeHTTP(w, r)
m.remoteHnd.ServeHTTP(w, r)
}
func (m *Manager) SectorSize() abi.SectorSize {
@ -163,9 +167,9 @@ func (m *Manager) AddPiece(ctx context.Context, sn abi.SectorNumber, existingPie
var best []config.StorageMeta
var err error
if len(existingPieces) == 0 { // new
best, err = m.storage.findBestAllocStorage(sectorbuilder.FTUnsealed, true)
best, err = m.storage.FindBestAllocStorage(sectorbuilder.FTUnsealed, true)
} else { // append to existing
best, err = m.storage.findSector(m.minerID(), sn, sectorbuilder.FTUnsealed)
best, err = m.storage.FindSector(m.minerID(), sn, sectorbuilder.FTUnsealed)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
@ -185,7 +189,7 @@ func (m *Manager) AddPiece(ctx context.Context, sn abi.SectorNumber, existingPie
func (m *Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
// TODO: also consider where the unsealed data sits
best, err := m.storage.findBestAllocStorage(sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
best, err := m.storage.FindBestAllocStorage(sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -200,7 +204,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber
func (m *Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage2.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
// TODO: allow workers to fetch the sectors
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed)
best, err := m.storage.FindSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -213,7 +217,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber
}
func (m *Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output storage2.Commit1Out, err error) {
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed)
best, err := m.storage.FindSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -243,7 +247,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, p
}
func (m *Manager) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
best, err := m.storage.FindSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}

View File

@ -4,13 +4,15 @@ import (
"context"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors"
)
type readonlyProvider struct {
miner abi.ActorID
stor *storage
stor *stores.Local
}
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
@ -18,5 +20,8 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorNumbe
return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage")
}
return l.stor.acquireSector(l.miner, id, existing, allocate, sealing)
return l.stor.AcquireSector(abi.SectorID{
Miner: l.miner,
Number: id,
}, existing, allocate, sealing)
}

View File

@ -1,10 +0,0 @@
package advmgr
import (
"github.com/filecoin-project/lotus/node/config"
)
type LocalStorage interface {
GetStorage() (config.StorageConfig, error)
SetStorage(func(*config.StorageConfig)) error
}

View File

@ -0,0 +1,31 @@
package sectorutil
import (
"fmt"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
)
func ParseSectorID(baseName string) (abi.SectorID, error) {
var n abi.SectorNumber
var mid abi.ActorID
read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n)
if err != nil {
return abi.SectorID{}, xerrors.Errorf(": %w", err)
}
if read != 2 {
return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read)
}
return abi.SectorID{
Miner: mid,
Number: n,
}, nil
}
func SectorName(sid abi.SectorID) string {
return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number)
}

View File

@ -0,0 +1,107 @@
package stores
import (
"io"
"net/http"
"os"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
)
var log = logging.Logger("stores")
type FetchHandler struct {
Store
}
func (handler *FetchHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /storage/
mux := mux.NewRouter()
mux.HandleFunc("/{type}/{id}", handler.remoteGetSector).Methods("GET")
log.Infof("SERVEGETREMOTE %s", r.URL)
mux.ServeHTTP(w, r)
}
func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := sectorutil.ParseSectorID(vars["id"])
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
ft, err := ftFromString(vars["type"])
if err != nil {
return
}
paths, done, err := handler.Store.AcquireSector(id, ft, 0, false)
if err != nil {
return
}
defer done()
var path string
switch ft {
case sectorbuilder.FTUnsealed:
path = paths.Unsealed
case sectorbuilder.FTSealed:
path = paths.Sealed
case sectorbuilder.FTCache:
path = paths.Cache
}
if path == "" {
log.Error("acquired path was empty")
w.WriteHeader(500)
return
}
stat, err := os.Stat(path)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
var rd io.Reader
if stat.IsDir() {
rd, err = tarutil.TarDirectory(path)
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
w.WriteHeader(200)
if _, err := io.Copy(w, rd); err != nil { // TODO: default 32k buf may be too small
log.Error(err)
return
}
}
func ftFromString(t string) (sectorbuilder.SectorFileType, error) {
switch t {
case sectorbuilder.FTUnsealed.String():
return sectorbuilder.FTUnsealed, nil
case sectorbuilder.FTSealed.String():
return sectorbuilder.FTSealed, nil
case sectorbuilder.FTCache.String():
return sectorbuilder.FTCache, nil
default:
return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
}
}

View File

@ -0,0 +1,11 @@
package stores
import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/go-sectorbuilder"
)
type Store interface {
AcquireSector(s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error)
}

View File

@ -1,32 +1,31 @@
package advmgr
package stores
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"sync"
"github.com/gorilla/mux"
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
)
const metaFile = "sectorstore.json"
type LocalStorage interface {
GetStorage() (config.StorageConfig, error)
SetStorage(func(*config.StorageConfig)) error
}
const MetaFile = "sectorstore.json"
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
type storage struct {
type Local struct {
localLk sync.RWMutex
localStorage LocalStorage
@ -42,8 +41,15 @@ type path struct {
sectors map[abi.SectorID]sectorbuilder.SectorFileType
}
func (st *storage) openPath(p string) error {
mb, err := ioutil.ReadFile(filepath.Join(p, metaFile))
func NewLocal(ls LocalStorage) (*Local, error) {
l := &Local{
localStorage: ls,
}
return l, l.open()
}
func (st *Local) OpenPath(p string) error {
mb, err := ioutil.ReadFile(filepath.Join(p, MetaFile))
if err != nil {
return xerrors.Errorf("reading storage metadata for %s: %w", p, err)
}
@ -75,7 +81,7 @@ func (st *storage) openPath(p string) error {
}
for _, ent := range ents {
sid, err := parseSectorID(ent.Name())
sid, err := sectorutil.ParseSectorID(ent.Name())
if err != nil {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
}
@ -89,7 +95,7 @@ func (st *storage) openPath(p string) error {
return nil
}
func (st *storage) open() error {
func (st *Local) open() error {
st.localLk.Lock()
defer st.localLk.Unlock()
@ -99,7 +105,7 @@ func (st *storage) open() error {
}
for _, path := range cfg.StoragePaths {
err := st.openPath(path.Path)
err := st.OpenPath(path.Path)
if err != nil {
return xerrors.Errorf("opening path %s: %w", path.Path, err)
}
@ -108,7 +114,7 @@ func (st *storage) open() error {
return nil
}
func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
func (st *Local) AcquireSector(sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
@ -124,10 +130,7 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
for _, p := range st.paths {
p.lk.Lock()
s, ok := p.sectors[abi.SectorID{
Miner: mid,
Number: id,
}]
s, ok := p.sectors[sid]
p.lk.Unlock()
if !ok {
continue
@ -136,10 +139,10 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
continue
}
if p.local == "" {
continue // TODO: fetch
continue
}
spath := filepath.Join(p.local, fileType.String(), fmt.Sprintf("s-t0%d-%d", mid, id))
spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
switch fileType {
case sectorbuilder.FTUnsealed:
@ -170,16 +173,13 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
}
p.lk.Lock()
p.sectors[abi.SectorID{
Miner: mid,
Number: id,
}] |= fileType
p.sectors[sid] |= fileType
p.lk.Unlock()
// TODO: Check free space
// TODO: Calc weights
best = filepath.Join(p.local, fileType.String(), fmt.Sprintf("s-t0%d-%d", mid, id))
best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
break // todo: the first path won't always be the best
}
@ -203,7 +203,7 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
return out, st.localLk.RUnlock, nil
}
func (st *storage) findBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) {
func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) {
var out []config.StorageMeta
for _, p := range st.paths {
@ -227,7 +227,7 @@ func (st *storage) findBestAllocStorage(allocate sectorbuilder.SectorFileType, s
return out, nil
}
func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) {
func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) {
var out []config.StorageMeta
for _, p := range st.paths {
p.lk.Lock()
@ -248,7 +248,7 @@ func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbu
return out, nil
}
func (st *storage) local() []api.StoragePath {
func (st *Local) Local() []api.StoragePath {
var out []api.StoragePath
for _, p := range st.paths {
if p.local == "" {
@ -266,108 +266,3 @@ func (st *storage) local() []api.StoragePath {
return out
}
func (st *storage) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /storage/
mux := mux.NewRouter()
mux.HandleFunc("/{type}/{id}", st.remoteGetSector).Methods("GET")
log.Infof("SERVEGETREMOTE %s", r.URL)
mux.ServeHTTP(w, r)
}
func (st *storage) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := parseSectorID(vars["id"])
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
ft, err := ftFromString(vars["type"])
if err != nil {
return
}
paths, done, err := st.acquireSector(id.Miner, id.Number, ft, 0, false)
if err != nil {
return
}
defer done()
var path string
switch ft {
case sectorbuilder.FTUnsealed:
path = paths.Unsealed
case sectorbuilder.FTSealed:
path = paths.Sealed
case sectorbuilder.FTCache:
path = paths.Cache
}
if path == "" {
log.Error("acquired path was empty")
w.WriteHeader(500)
return
}
stat, err := os.Stat(path)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
var rd io.Reader
if stat.IsDir() {
rd, err = tarutil.TarDirectory(path)
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
w.WriteHeader(200)
if _, err := io.Copy(w, rd); err != nil { // TODO: default 32k buf may be too small
log.Error(err)
return
}
}
func ftFromString(t string) (sectorbuilder.SectorFileType, error) {
switch t {
case sectorbuilder.FTUnsealed.String():
return sectorbuilder.FTUnsealed, nil
case sectorbuilder.FTSealed.String():
return sectorbuilder.FTSealed, nil
case sectorbuilder.FTCache.String():
return sectorbuilder.FTCache, nil
default:
return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
}
}
func parseSectorID(baseName string) (abi.SectorID, error) {
var n abi.SectorNumber
var mid abi.ActorID
read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n)
if err != nil {
return abi.SectorID{}, xerrors.Errorf(": %w", err)
}
if read != 2 {
return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read)
}
return abi.SectorID{
Miner: mid,
Number: n,
}, nil
}