storagemgr: Wire up most of AddPiece logic

This commit is contained in:
Łukasz Magiera 2020-03-05 01:38:07 +01:00
parent 76698fe2db
commit 7db1dd52bd
8 changed files with 213 additions and 32 deletions

View File

@ -136,10 +136,10 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
{
b, err := json.MarshalIndent(&config.StorageMeta{
ID: uuid.New().String(),
Weight: 0, // read-only
CanCommit: false,
CanStore: false,
ID: uuid.New().String(),
Weight: 0, // read-only
CanSeal: false,
CanStore: false,
}, "", " ")
if err != nil {
return nil, nil, xerrors.Errorf("marshaling storage config: %w", err)

View File

@ -363,7 +363,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
SealProofType: spt,
PoStProofType: ppt,
Miner: a,
})
}, nil)
if err != nil {
return err
}

View File

@ -257,6 +257,7 @@ func Online() Option {
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig),
Override(new(advmgr.LocalStorage), From(new(repo.LockedRepo))),
Override(new(advmgr.SectorIDCounter), modules.SectorIDCounter),
Override(new(*advmgr.Manager), advmgr.New),
Override(new(sealmgr.Manager), From(new(*advmgr.Manager))),

View File

@ -21,9 +21,9 @@ type StorageConfig struct {
// [path]/metadata.json
type StorageMeta struct {
ID string
Weight int // 0 = readonly
Weight uint64 // 0 = readonly
CanCommit bool
CanSeal bool
CanStore bool
}

View File

@ -15,6 +15,7 @@ import (
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-fil-markets/storedcounter"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-statestore"
@ -38,6 +39,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
@ -98,6 +100,20 @@ func SectorBuilderConfig(ds dtypes.MetadataDS, fnapi api.FullNode) (*sectorbuild
return sb, nil
}
type sidsc struct {
sc *storedcounter.StoredCounter
}
func (s *sidsc) Next() (abi.SectorNumber, error) {
i, err := s.sc.Next()
return abi.SectorNumber(i), err
}
func SectorIDCounter(ds dtypes.MetadataDS) advmgr.SectorIDCounter {
sc := storedcounter.New(ds, datastore.NewKey("/storage/nextid"))
return &sidsc{sc}
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sealmgr.Manager, tktFn sealing.TicketFn) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds)
if err != nil {

View File

@ -4,10 +4,10 @@ import (
"context"
"io"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -15,6 +15,10 @@ import (
"github.com/filecoin-project/lotus/storage/sealmgr"
)
type SectorIDCounter interface {
Next() (abi.SectorNumber, error)
}
type LocalStorage interface {
GetStorage() (config.StorageConfig, error)
SetStorage(config.StorageConfig) error
@ -31,18 +35,23 @@ type Path struct {
}
type Worker interface {
sealmgr.Worker
sectorbuilder.Sealer
TaskTypes() map[sealmgr.TaskType]struct{}
Paths() []Path
}
type Manager struct {
workers []sealmgr.Worker
workers []Worker
scfg *sectorbuilder.Config
sc SectorIDCounter
storage *storage
sectorbuilder.Prover
}
func New(ls LocalStorage, cfg *sectorbuilder.Config) (*Manager, error) {
func New(ls LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) (*Manager, error) {
stor := &storage{
localStorage: ls,
}
@ -62,6 +71,10 @@ func New(ls LocalStorage, cfg *sectorbuilder.Config) (*Manager, error) {
m := &Manager{
workers: nil,
scfg: cfg,
sc: sc,
storage: stor,
Prover: prover,
}
@ -69,40 +82,98 @@ func New(ls LocalStorage, cfg *sectorbuilder.Config) (*Manager, error) {
return m, nil
}
func (m Manager) SectorSize() abi.SectorSize {
func (m *Manager) SectorSize() abi.SectorSize {
sz, _ := m.scfg.SealProofType.SectorSize()
return sz
}
func (m *Manager) NewSector() (abi.SectorNumber, error) {
return m.sc.Next()
}
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
func (m Manager) NewSector() (abi.SectorNumber, error) {
func (m *Manager) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sn abi.SectorNumber, r io.Reader, existingPieces []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
// TODO: consider multiple paths vs workers when initially allocating
var best []config.StorageMeta
var err error
if len(existingPieces) == 0 { // new
best, err = m.storage.findBestAllocStorage(sectorbuilder.FTUnsealed, true)
} else { // append to existing
best, err = m.storage.findSector(m.minerID(), sn, sectorbuilder.FTUnsealed)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
}
var candidateWorkers []Worker
bestPaths := map[int]config.StorageMeta{}
for i, worker := range m.workers {
if _, ok := worker.TaskTypes()[sealmgr.TTAddPiece]; !ok {
continue
}
// check if the worker has access to the path we selected
var st *config.StorageMeta
for _, p := range worker.Paths() {
for _, m := range best {
if p.ID == m.ID {
if st != nil && st.Weight > p.Weight {
continue
}
p := m // copy
st = &p
}
}
}
if st == nil {
continue
}
bestPaths[i] = *st
candidateWorkers = append(candidateWorkers, worker)
}
if len(candidateWorkers) == 0 {
return abi.PieceInfo{}, xerrors.New("no worker selected")
}
// TODO: schedule(addpiece, ..., )
// TODO: remove sectorbuilder abstraction, pass path directly
return candidateWorkers[0].AddPiece(ctx, sz, sn, r, existingPieces)
}
func (m *Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
panic("implement me")
}
func (m Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
func (m *Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
panic("implement me")
}
func (m Manager) AddPiece(context.Context, abi.UnpaddedPieceSize, abi.SectorNumber, io.Reader, []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
func (m *Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
panic("implement me")
}
func (m Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
panic("implement me")
}
func (m Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
func (m *Manager) FinalizeSector(context.Context, abi.SectorNumber) error {
panic("implement me")
}
func (m Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
panic("implement me")
}
func (m Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
panic("implement me")
}
func (m Manager) FinalizeSector(context.Context, abi.SectorNumber) error {
panic("implement me")
func (m *Manager) minerID() abi.ActorID {
mid, err := address.IDFromAddress(m.scfg.Miner)
if err != nil {
panic(err)
}
return abi.ActorID(mid)
}
var _ sealmgr.Manager = &Manager{}

View File

@ -94,13 +94,12 @@ func (st *storage) open() error {
}
func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
st.localLk.RLock()
if allocate != 0 {
st.localLk.RUnlock()
return sectorbuilder.SectorPaths{}, nil, xerrors.New("acquire alloc todo")
if existing | allocate != existing ^ allocate {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
st.localLk.RLock()
var out sectorbuilder.SectorPaths
for _, fileType := range pathTypes {
@ -135,9 +134,102 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
}
}
for _, fileType := range pathTypes {
if fileType & allocate == 0 {
continue
}
var best string
for _, p := range st.paths {
if sealing && !p.meta.CanSeal {
continue
}
if !sealing && !p.meta.CanStore {
continue
}
s, ok := p.sectors[abi.SectorID{
Miner: mid,
Number: id,
}]
if !ok {
continue
}
if s & fileType == 0 {
continue
}
// TODO: Check free space
// TODO: Calc weights
best = filepath.Join(p.local, fileType.String(), fmt.Sprintf("s-t0%d-%d", mid, id))
break // todo: the first path won't always be the best
}
if best == "" {
st.localLk.RUnlock()
return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
}
switch fileType {
case sectorbuilder.FTUnsealed:
out.Unsealed = best
case sectorbuilder.FTSealed:
out.Sealed = best
case sectorbuilder.FTCache:
out.Cache = best
}
allocate ^= fileType
}
return out, st.localLk.RUnlock, nil
}
func (st *storage) findBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) {
var out []config.StorageMeta
for _, p := range st.paths {
if sealing && !p.meta.CanSeal {
continue
}
if !sealing && !p.meta.CanStore {
continue
}
// TODO: filter out of space
out = append(out, p.meta)
}
if len(out) == 0 {
return nil, xerrors.New("no good path found")
}
// todo: sort by some kind of preference
return out, nil
}
func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) {
var out []config.StorageMeta
for _, p := range st.paths {
t := p.sectors[abi.SectorID{
Miner: mid,
Number: sn,
}]
if t | typ == 0 {
continue
}
out = append(out, p.meta)
}
if len(out) == 0 {
return nil, xerrors.Errorf("sector %s/s-t0%d-%d not found", typ, mid, sn)
}
return out, nil
}
func parseSectorID(baseName string) (abi.SectorID, error) {
var n abi.SectorNumber
var mid abi.ActorID

View File

@ -3,6 +3,7 @@ package sealmgr
type TaskType string
const (
TTAddPiece TaskType = "seal/v0/addpiece"
TTPreCommit1 TaskType = "seal/v0/precommit/1"
TTPreCommit2 TaskType = "seal/v0/precommit/2" // Commit1 is called here too
TTCommit2 TaskType = "seal/v0/commit/2"