Scaffolding for UnsealRange

This commit is contained in:
Łukasz Magiera 2020-05-14 03:01:38 +02:00
parent 920c43f589
commit 4065c94c1f
3 changed files with 70 additions and 15 deletions

View File

@ -28,7 +28,10 @@ type URLs []string
type Worker interface {
ffiwrapper.StorageSealer
Fetch(context.Context, abi.SectorID, stores.SectorFileType, bool) error
UnsealPiece(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize) error
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
@ -46,7 +49,7 @@ type Worker interface {
type SectorManager interface {
SectorSize() abi.SectorSize
ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
ReadPieceFromSealedSector(context.Context, io.Writer, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ffiwrapper.StorageSealer
storage.Prover
@ -73,6 +76,7 @@ type SealerConfig struct {
AllowPreCommit1 bool
AllowPreCommit2 bool
AllowCommit bool
AllowUnseal bool
}
type StorageAuth http.Header
@ -107,7 +111,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
go m.sched.runSched()
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTReadUnsealed,
}
if sc.AllowPreCommit1 {
localTasks = append(localTasks, sealtasks.TTPreCommit1)
@ -118,6 +122,9 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
if sc.AllowCommit {
localTasks = append(localTasks, sealtasks.TTCommit2)
}
if sc.AllowUnseal {
localTasks = append(localTasks, sealtasks.TTUnseal)
}
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType,
@ -172,10 +179,6 @@ func (m *Manager) SectorSize() abi.SectorSize {
return sz
}
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
func schedNop(context.Context, Worker) error {
return nil
}
@ -186,6 +189,49 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool) fun
}
}
func (m *Manager) ReadPieceFromSealedSector(ctx context.Context, sink io.Writer, sector abi.SectorID, offset ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
}
var selector WorkerSelector
if len(best) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed)
} else { // append to existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
if err != nil {
return xerrors.Errorf("creating unsealPiece selector: %w", err)
}
// TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed
// TODO!!!! make schedFetch COPY stores.FTSealed and stores.FTCache
// Moving those to a temp sealing storage may make PoSts fail
err = m.sched.Schedule(ctx, sealtasks.TTUnseal, selector, schedFetch(sector, stores.FTUnsealed|stores.FTSealed|stores.FTCache, true), func(ctx context.Context, w Worker) error {
return w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)
})
if err != nil {
return err
}
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("creating readPiece selector: %w", err)
}
err = m.sched.Schedule(ctx, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error {
return w.ReadPiece(ctx, sink, sector, offset, size)
})
if err != nil {
return xerrors.Errorf("reading piece from sealed sector: %w", err)
}
return nil
}
func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
log.Warnf("stub NewSector")
return nil
@ -196,7 +242,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
var err error
if len(existingPieces) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed)
} else { // append to existing
} else { // use existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
if err != nil {

View File

@ -288,3 +288,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
},
},
}
func init() {
ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately
ResourceTable[sealtasks.TTReadUnsealed] = ResourceTable[sealtasks.TTFetch]
}

View File

@ -12,6 +12,8 @@ const (
TTFinalize TaskType = "seal/v0/finalize"
TTFetch TaskType = "seal/v0/fetch"
TTUnseal TaskType = "seal/v0/unseal"
TTReadUnsealed TaskType = "seal/v0/unsealread"
)
var order = map[TaskType]int{
@ -22,6 +24,8 @@ var order = map[TaskType]int{
TTCommit1: 3,
TTFetch: 2,
TTFinalize: 1,
TTUnseal: 0,
TTReadUnsealed: 0,
}
func (a TaskType) Less(b TaskType) bool {