From 6ca55d18a71de7097fc2ffe835cb29c185dae5c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 15 Mar 2024 14:10:48 +0100 Subject: [PATCH] address review --- provider/lppiece/task_park_piece.go | 27 ++++++++++++++++++++++++++- provider/lpseal/task_sdr.go | 2 -- storage/paths/remote.go | 2 ++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/provider/lppiece/task_park_piece.go b/provider/lppiece/task_park_piece.go index 034ef9b2c..62bb91660 100644 --- a/provider/lppiece/task_park_piece.go +++ b/provider/lppiece/task_park_piece.go @@ -175,6 +175,8 @@ func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask. } func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { + const maxSizePiece = 64 << 30 + return harmonytask.TaskTypeDetails{ Max: p.max, Name: "ParkPiece", @@ -182,12 +184,35 @@ func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 0, Ram: 64 << 20, - Storage: nil, // TODO + Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing), }, MaxFailures: 10, } } +func (p *ParkPieceTask) taskToRef(id harmonytask.TaskID) (lpffi.SectorRef, error) { + var pieceIDs []struct { + ID storiface.PieceNumber `db:"id"` + } + + err := p.db.Select(context.Background(), &pieceIDs, `SELECT id FROM parked_pieces WHERE task_id = $1`, id) + if err != nil { + return lpffi.SectorRef{}, xerrors.Errorf("getting piece id: %w", err) + } + + if len(pieceIDs) != 1 { + return lpffi.SectorRef{}, xerrors.Errorf("expected 1 piece id, got %d", len(pieceIDs)) + } + + pref := pieceIDs[0].ID.Ref() + + return lpffi.SectorRef{ + SpID: int64(pref.ID.Miner), + SectorNumber: int64(pref.ID.Number), + RegSealProof: pref.ProofType, + }, nil +} + func (p *ParkPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) { p.TF.Set(taskFunc) } diff --git a/provider/lpseal/task_sdr.go b/provider/lpseal/task_sdr.go index 694ff6f46..ab86c6a82 100644 --- a/provider/lpseal/task_sdr.go +++ b/provider/lpseal/task_sdr.go @@ -187,8 +187,6 @@ func (s *SDRTask) getTicket(ctx context.Context, maddr address.Address) (abi.Sea } func (s *SDRTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { - // todo check storage (reserve too?) - id := ids[0] return &id, nil } diff --git a/storage/paths/remote.go b/storage/paths/remote.go index dfceaeace..9ff719954 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -747,6 +747,8 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size return nil, nil } +// ReaderSeq creates a simple sequential reader for a file. Does not work for +// file types which are a directory (e.g. FTCache). func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storiface.SectorFileType) (io.ReadCloser, error) { paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil {