sealmgr: Implement all sealing steps

This commit is contained in:
Łukasz Magiera 2020-03-05 19:18:33 +01:00
parent 4beed065b6
commit f06bf6721e
4 changed files with 117 additions and 35 deletions

View File

@ -49,7 +49,7 @@ func (m *Sealing) PledgeSector() error {
size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
rt, _, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize())
_, rt, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
log.Error(err)
return

View File

@ -54,23 +54,48 @@ func (l *localWorker) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sn
}
func (l *localWorker) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
panic("implement me")
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealPreCommit1(ctx, sectorNum, ticket, pieces)
}
func (l *localWorker) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
panic("implement me")
sb, err := l.sb()
if err != nil {
return cid.Undef, cid.Undef, err
}
return sb.SealPreCommit2(ctx, sectorNum, phase1Out)
}
func (l *localWorker) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
panic("implement me")
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
}
func (l *localWorker) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
panic("implement me")
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealCommit2(ctx, sectorNum, phase1Out)
}
func (l *localWorker) FinalizeSector(context.Context, abi.SectorNumber) error {
panic("implement me")
func (l *localWorker) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
sb, err := l.sb()
if err != nil {
return err
}
return sb.FinalizeSector(ctx, sectorNum)
}
func (l *localWorker) TaskTypes() map[sealmgr.TaskType]struct{} {

View File

@ -97,32 +97,19 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, s
panic("implement me")
}
func (m *Manager) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sn abi.SectorNumber, r io.Reader, existingPieces []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
// TODO: consider multiple paths vs workers when initially allocating
var best []config.StorageMeta
var err error
if len(existingPieces) == 0 { // new
best, err = m.storage.findBestAllocStorage(sectorbuilder.FTUnsealed, true)
} else { // append to existing
best, err = m.storage.findSector(m.minerID(), sn, sectorbuilder.FTUnsealed)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
}
var candidateWorkers []Worker
bestPaths := map[int]config.StorageMeta{}
func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.StorageMeta) ([]Worker, map[int]config.StorageMeta) {
var workers []Worker
paths := map[int]config.StorageMeta{}
for i, worker := range m.workers {
if _, ok := worker.TaskTypes()[sealmgr.TTAddPiece]; !ok {
if _, ok := worker.TaskTypes()[task]; !ok {
continue
}
// check if the worker has access to the path we selected
var st *config.StorageMeta
for _, p := range worker.Paths() {
for _, m := range best {
for _, m := range inPaths {
if p.ID == m.ID {
if st != nil && st.Weight > p.Weight {
continue
@ -137,37 +124,103 @@ func (m *Manager) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sn abi
continue
}
bestPaths[i] = *st
candidateWorkers = append(candidateWorkers, worker)
paths[i] = *st
workers = append(workers, worker)
}
return workers, paths
}
func (m *Manager) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sn abi.SectorNumber, r io.Reader, existingPieces []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
// TODO: consider multiple paths vs workers when initially allocating
var best []config.StorageMeta
var err error
if len(existingPieces) == 0 { // new
best, err = m.storage.findBestAllocStorage(sectorbuilder.FTUnsealed, true)
} else { // append to existing
best, err = m.storage.findSector(m.minerID(), sn, sectorbuilder.FTUnsealed)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTAddPiece, best)
if len(candidateWorkers) == 0 {
return abi.PieceInfo{}, xerrors.New("no worker found")
}
// TODO: schedule(addpiece, ..., )
// TODO: remove sectorbuilder abstraction, pass path directly
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].AddPiece(ctx, sz, sn, r, existingPieces)
}
func (m *Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
panic("implement me")
// TODO: also consider where the unsealed data sits
best, err := m.storage.findBestAllocStorage(sectorbuilder.FTCache | sectorbuilder.FTSealed, true)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit1, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealPreCommit1(ctx, sectorNum, ticket, pieces)
}
func (m *Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
panic("implement me")
// TODO: allow workers to fetch the sectors
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache | sectorbuilder.FTSealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealPreCommit2(ctx, sectorNum, phase1Out)
}
func (m *Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
panic("implement me")
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache | sectorbuilder.FTSealed)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
}
func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
panic("implement me")
for _, worker := range m.workers {
if _, ok := worker.TaskTypes()[sealmgr.TTCommit2]; !ok {
continue
}
return worker.SealCommit2(ctx, sectorNum, phase1Out)
}
return nil, xerrors.New("no worker found")
}
func (m *Manager) FinalizeSector(context.Context, abi.SectorNumber) error {
panic("implement me")
func (m *Manager) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache | sectorbuilder.FTSealed | sectorbuilder.FTUnsealed)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) // find last worker with the sector
// TODO: Move the sector to long-term storage
return candidateWorkers[0].FinalizeSector(ctx, sectorNum)
}
func (m *Manager) minerID() abi.ActorID {

View File

@ -156,6 +156,10 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
if !sealing && !p.meta.CanStore {
continue
}
p.sectors[abi.SectorID{
Miner: mid,
Number: id,
}] |= fileType
// TODO: Check free space
// TODO: Calc weights