curio: Add task storage to SDRTrees

This commit is contained in:
Łukasz Magiera 2024-03-24 11:22:43 +01:00 committed by Łukasz Magiera
parent c7b64bd6a9
commit 7b00cc5827
5 changed files with 102 additions and 58 deletions

View File

@ -98,6 +98,8 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
}
}
} else {
// No related reservation, acquire storage as usual
var err error
paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
if err != nil {
@ -159,17 +161,7 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID,
return nil
}
func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (cid.Cid, error) {
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()
return proof.BuildTreeD(data, unpaddedData, filepath.Join(paths.Cache, proofpaths.TreeDName), size)
}
func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid) (cid.Cid, cid.Cid, error) {
func (sb *SealCalls) TreeDRC(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (cid.Cid, cid.Cid, error) {
p1o, err := sb.makePhase1Out(unsealed, sector.ProofType)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
@ -181,6 +173,15 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns
}
defer releaseSector()
treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(paths.Cache, proofpaths.TreeDName), size)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("building tree-d: %w", err)
}
if treeDUnsealed != unsealed {
return cid.Undef, cid.Undef, xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid")
}
{
// create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place
ssize, err := sector.ProofType.SectorSize()
@ -224,7 +225,16 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns
}
}
return ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed)
sl, uns, err := ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err)
}
if uns != unsealed {
return cid.Undef, cid.Undef, xerrors.Errorf("unsealed cid changed after sealing")
}
return sl, uns, nil
}
func (sb *SealCalls) GenerateSynthPoRep() {

View File

@ -158,7 +158,7 @@ func (t *TaskStorage) Claim(taskID int) error {
// be fetched, so we will need to create reservations for that too.
// NOTE localStore.AcquireSector does not open or create any files, nor does it reserve space. It only proposes
// paths to be used.
pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, storiface.PathSealing, storiface.AcquireMove)
pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, t.pathType, storiface.AcquireMove)
if err != nil {
return err
}

View File

@ -186,22 +186,12 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
ProofType: sectorParams.RegSealProof,
}
// D
treeUnsealed, err := t.sc.TreeD(ctx, sref, abi.PaddedPieceSize(ssize), dataReader, unpaddedData)
if err != nil {
return false, xerrors.Errorf("computing tree d: %w", err)
}
// R / C
sealed, unsealed, err := t.sc.TreeRC(ctx, sref, commd)
// D / R / C
sealed, unsealed, err := t.sc.TreeDRC(ctx, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData)
if err != nil {
return false, xerrors.Errorf("computing tree r and c: %w", err)
}
if unsealed != treeUnsealed {
return false, xerrors.Errorf("tree-d and tree-r/c unsealed CIDs disagree")
}
// todo synth porep
// todo porep challenge check
@ -228,13 +218,19 @@ func (t *TreesTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Task
}
func (t *TreesTask) TypeDetails() harmonytask.TaskTypeDetails {
ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size
if isDevnet {
ssize = abi.SectorSize(2 << 20)
}
return harmonytask.TaskTypeDetails{
Max: t.max,
Name: "SDRTrees",
Cost: resources.Resources{
Cpu: 1,
Gpu: 1,
Ram: 8000 << 20, // todo
Cpu: 1,
Gpu: 1,
Ram: 8000 << 20, // todo
Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing),
},
MaxFailures: 3,
Follows: nil,
@ -245,6 +241,21 @@ func (t *TreesTask) Adder(taskFunc harmonytask.AddTaskFunc) {
t.sp.pollers[pollerTrees].Set(taskFunc)
}
func (t *TreesTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) {
var refs []ffi.SectorRef
err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_r = $1`, id)
if err != nil {
return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err)
}
if len(refs) != 1 {
return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs))
}
return refs[0], nil
}
type UrlPieceReader struct {
Url string
RawSize int64 // the exact number of bytes read, if we read more or less that's an error

View File

@ -53,7 +53,15 @@ type path struct {
reservations map[abi.SectorID]storiface.SectorFileType
}
func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
// statExistingSectorForReservation is optional parameter for stat method
// which will make it take into account existing sectors when calculating
// available space for new reservations
type statExistingSectorForReservation struct {
id abi.SectorID
ft storiface.SectorFileType
}
func (p *path) stat(ls LocalStorage, newReserve ...statExistingSectorForReservation) (fsutil.FsStat, error) {
start := time.Now()
stat, err := ls.Stat(p.local)
@ -63,34 +71,49 @@ func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
stat.Reserved = p.reserved
accountExistingFiles := func(id abi.SectorID, fileType storiface.SectorFileType) error {
sp := p.sectorPath(id, fileType)
used, err := ls.DiskUsage(sp)
if err == os.ErrNotExist {
p, ferr := tempFetchDest(sp, false)
if ferr != nil {
return ferr
}
used, err = ls.DiskUsage(p)
}
if err != nil {
// we don't care about 'not exist' errors, as storage can be
// reserved before any files are written, so this error is not
// unexpected
if !os.IsNotExist(err) {
log.Warnf("getting disk usage of '%s': %+v", p.sectorPath(id, fileType), err)
}
return nil
}
stat.Reserved -= used
return nil
}
for id, ft := range p.reservations {
for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 {
for _, fileType := range ft.AllSet() {
if err := accountExistingFiles(id, fileType); err != nil {
return fsutil.FsStat{}, err
}
}
}
for _, reservation := range newReserve {
for _, fileType := range reservation.ft.AllSet() {
if p.reservations[reservation.id]&fileType != 0 {
// already accounted for
continue
}
sp := p.sectorPath(id, fileType)
used, err := ls.DiskUsage(sp)
if err == os.ErrNotExist {
p, ferr := tempFetchDest(sp, false)
if ferr != nil {
return fsutil.FsStat{}, ferr
}
used, err = ls.DiskUsage(p)
if err := accountExistingFiles(reservation.id, fileType); err != nil {
return fsutil.FsStat{}, err
}
if err != nil {
// we don't care about 'not exist' errors, as storage can be
// reserved before any files are written, so this error is not
// unexpected
if !os.IsNotExist(err) {
log.Warnf("getting disk usage of '%s': %+v", p.sectorPath(id, fileType), err)
}
continue
}
stat.Reserved -= used
}
}
@ -414,11 +437,7 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
deferredDone()
}()
for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 {
continue
}
for _, fileType := range ft.AllSet() {
id := storiface.ID(storiface.PathByType(storageIDs, fileType))
p, ok := st.paths[id]
@ -426,7 +445,7 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
return nil, errPathNotFound
}
stat, err := p.stat(st.localStorage)
stat, err := p.stat(st.localStorage, statExistingSectorForReservation{sid.ID, fileType})
if err != nil {
return nil, xerrors.Errorf("getting local storage stat: %w", err)
}

View File

@ -167,6 +167,8 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
}
log.Debugw("Fetching sector data without existing reservation", "sector", s, "toFetch", toFetch, "fetchPaths", fetchPaths, "fetchIDs", fetchIDs)
overheadTable := storiface.FSOverheadSeal
if pathType == storiface.PathStorage {
overheadTable = storiface.FsOverheadFinalized
@ -183,6 +185,8 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist
} else {
fetchPaths = settings.Into.Paths
fetchIDs = settings.Into.IDs
log.Debugw("Fetching sector data with existing reservation", "sector", s, "toFetch", toFetch, "fetchPaths", fetchPaths, "fetchIDs", fetchIDs)
}
for _, fileType := range toFetch.AllSet() {