From e060cd2f37774a408eaeac9beb7eced6aebec516 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 28 Feb 2024 20:57:12 +0100 Subject: [PATCH 01/15] lppiece: Implement Piece Park --- cmd/lotus-provider/tasks/tasks.go | 17 +- .../harmonydb/sql/20240228-piece-park.sql | 30 ++++ node/config/types.go | 8 + provider/lpffi/piece_funcs.go | 65 +++++++ provider/lppiece/task_park_piece.go | 162 ++++++++++++++++++ provider/lpseal/task_trees.go | 11 ++ storage/paths/http_handler.go | 19 +- storage/sealer/ffiwrapper/basicfs/fs.go | 3 + storage/sealer/storiface/filetype.go | 19 +- storage/sealer/storiface/storage.go | 11 ++ 10 files changed, 326 insertions(+), 19 deletions(-) create mode 100644 lib/harmony/harmonydb/sql/20240228-piece-park.sql create mode 100644 provider/lpffi/piece_funcs.go create mode 100644 provider/lppiece/task_park_piece.go diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index 6ecd680d0..11c12216b 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -3,6 +3,9 @@ package tasks import ( "context" + "github.com/filecoin-project/lotus/lib/lazy" + "github.com/filecoin-project/lotus/lib/must" + "github.com/filecoin-project/lotus/provider/lppiece" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" @@ -64,6 +67,18 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } } + slrLazy := lazy.MakeLazy(func() (*lpffi.SealCalls, error) { + return lpffi.NewSealCalls(stor, lstor, si), nil + }) + + { + // Piece handling + if cfg.Subsystems.EnableParkPiece { + parkPieceTask := lppiece.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks) + activeTasks = append(activeTasks, parkPieceTask) + } + } + hasAnySealingTask := cfg.Subsystems.EnableSealSDR || cfg.Subsystems.EnableSealSDRTrees || cfg.Subsystems.EnableSendPrecommitMsg || @@ -79,7 +94,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task sp = lpseal.NewPoller(db, full) go sp.RunPoller(ctx) - slr = lpffi.NewSealCalls(stor, lstor, si) + slr = must.One(slrLazy.Val()) } // NOTE: Tasks with the LEAST priority are at the top diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql new file mode 100644 index 000000000..c58ecdafe --- /dev/null +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -0,0 +1,30 @@ +create table parked_pieces ( + id serial primary key, + created_at timestamp default current_timestamp, + + piece_cid text not null, + piece_padded_size bigint not null, + piece_raw_size text not null, + + complete boolean not null default false +); + +create table parked_piece_refs ( + ref_id serial primary key, + piece_id int not null, + + foreign key (piece_id) references parked_pieces(id) on delete cascade +); + +create table park_piece_tasks ( + task_id bigint not null + constraint park_piece_tasks_pk + primary key, + + piece_ref_id int not null, + + data_url text not null, + data_headers jsonb not null default '{}', + data_raw_size bigint not null, + data_delete_on_finalize bool not null +); diff --git a/node/config/types.go b/node/config/types.go index 789d24103..d770512c5 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -112,6 +112,14 @@ type ProviderSubsystemsConfig struct { EnableWinningPost bool WinningPostMaxTasks int + // EnableParkPiece enables the "piece parking" task to run on this node. 
This task is responsible for fetching + // pieces from the network and storing them in the storage subsystem until sectors are sealed. This task is + // only applicable when integrating with boost, and should be enabled on nodes which will hold deal data + // from boost until sectors containing the related pieces have the TreeD/TreeR constructed. + // Note that future Curio implementations will have a separate task type for fetching pieces from the internet. + EnableParkPiece bool + ParkPieceMaxTasks int + // EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation // creating 11 layer files in sector cache directory. // diff --git a/provider/lpffi/piece_funcs.go b/provider/lpffi/piece_funcs.go new file mode 100644 index 000000000..4e2816786 --- /dev/null +++ b/provider/lpffi/piece_funcs.go @@ -0,0 +1,65 @@ +package lpffi + +import ( + "context" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "golang.org/x/xerrors" + "io" + "os" + "time" +) + +func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumber, size int64, data io.Reader) error { + // todo: config(?): allow setting PathStorage for this + // todo storage reservations + paths, done, err := sb.sectors.AcquireSector(ctx, nil, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing) + if err != nil { + return err + } + defer done() + + dest := paths.Piece + tempDest := dest + ".tmp" + + destFile, err := os.OpenFile(tempDest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return xerrors.Errorf("creating temp piece file '%s': %w", tempDest, err) + } + + removeTemp := true + defer func() { + if removeTemp { + rerr := os.Remove(tempDest) + if rerr != nil { + log.Errorf("removing temp file: %+v", rerr) + } + } + }() + + copyStart := time.Now() + + n, err := io.CopyBuffer(destFile, io.LimitReader(data, size), make([]byte, 8<<20)) + if err != nil { + _ = destFile.Close() + return xerrors.Errorf("copying piece data: %w", err) + } + + if err := destFile.Close(); err != nil { + return xerrors.Errorf("closing temp piece file: %w", err) + } + + if n != size { + return xerrors.Errorf("short write: %d", n) + } + + copyEnd := time.Now() + + log.Infow("wrote parked piece", "piece", pieceID, "size", size, "duration", copyEnd.Sub(copyStart), "dest", dest, "MiB/s", float64(size)/(1<<20)/copyEnd.Sub(copyStart).Seconds()) + + if err := os.Rename(tempDest, dest); err != nil { + return xerrors.Errorf("rename temp piece to dest %s -> %s: %w", tempDest, dest, err) + } + + removeTemp = false + return nil +} diff --git a/provider/lppiece/task_park_piece.go b/provider/lppiece/task_park_piece.go new file mode 100644 index 000000000..370775803 --- /dev/null +++ b/provider/lppiece/task_park_piece.go @@ -0,0 +1,162 @@ +package lppiece + +import ( + "context" + "encoding/json" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/provider/lpffi" + "github.com/filecoin-project/lotus/provider/lpseal" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + "net/http" + "time" +) + +var log = logging.Logger("lppiece") + +// ParkPieceTask gets a piece from some origin, and parks it in storage +// Pieces are always f00, 
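WritePiece above is built around a write-to-temp-then-rename pattern: data is copied into "<dest>.tmp", the file is closed and size-checked, and only a fully verified temp file is renamed over the final path, so a reader can never observe a partially written piece. A minimal standalone sketch of the same pattern, assuming nothing beyond the standard library (the function and file names here are hypothetical, and the storiface/path plumbing is left out):

package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// writeAtomically stages data in a temp file next to dest and renames it
// into place only after the copy succeeded and the size was verified.
// The temp file shares dest's directory so the final rename stays on one
// filesystem, which is what makes it atomic on POSIX systems.
func writeAtomically(dest string, size int64, data io.Reader) error {
	tmp := dest + ".tmp"

	f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return fmt.Errorf("creating temp file %s: %w", tmp, err)
	}

	// Until the rename succeeds, every exit path removes the temp file so
	// a retry starts from a clean slate.
	removeTemp := true
	defer func() {
		if removeTemp {
			_ = os.Remove(tmp)
		}
	}()

	// LimitReader mirrors WritePiece: never accept more than the declared size.
	n, err := io.Copy(f, io.LimitReader(data, size))
	if err != nil {
		_ = f.Close()
		return fmt.Errorf("copying data: %w", err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("closing temp file: %w", err)
	}
	if n != size {
		return fmt.Errorf("short write: got %d bytes, want %d", n, size)
	}

	if err := os.Rename(tmp, dest); err != nil {
		return fmt.Errorf("renaming %s -> %s: %w", tmp, dest, err)
	}
	removeTemp = false
	return nil
}

func main() {
	data := strings.NewReader("hello piece data")
	if err := writeAtomically("piece.dat", int64(data.Len()), data); err != nil {
		fmt.Println("write failed:", err)
	}
}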
piece ID is mapped to pieceCID in the DB +type ParkPieceTask struct { + db *harmonydb.DB + sc *lpffi.SealCalls + + TF promise.Promise[harmonytask.AddTaskFunc] + + max int +} + +func NewParkPieceTask(db *harmonydb.DB, sc *lpffi.SealCalls, max int) *ParkPieceTask { + return &ParkPieceTask{ + db: db, + sc: sc, + + max: max, + } +} + +func (p *ParkPieceTask) PullPiece(ctx context.Context, pieceCID cid.Cid, rawSize int64, paddedSize abi.PaddedPieceSize, dataUrl string, headers http.Header) error { + p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + var pieceID int + err = tx.QueryRow(`INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) VALUES ($1, $2, $3) RETURNING id`, pieceCID.String(), int64(paddedSize), rawSize).Scan(&pieceID) + if err != nil { + return false, xerrors.Errorf("inserting parked piece: %w", err) + } + + var refID int + err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id) VALUES ($1) RETURNING ref_id`, pieceID).Scan(&refID) + if err != nil { + return false, xerrors.Errorf("inserting parked piece ref: %w", err) + } + + headersJson, err := json.Marshal(headers) + if err != nil { + return false, xerrors.Errorf("marshaling headers: %w", err) + } + + _, err = tx.Exec(`INSERT INTO park_piece_tasks (task_id, piece_ref_id, data_url, data_headers, data_raw_size, data_delete_on_finalize) + VALUES ($1, $2, $3, $4, $5, $6)`, id, refID, dataUrl, headersJson, rawSize, false) + if err != nil { + return false, xerrors.Errorf("inserting park piece task: %w", err) + } + + return true, nil + }) + + return nil +} + +func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + ctx := context.Background() + + var taskData []struct { + PieceID int `db:"id"` + PieceCreatedAt time.Time `db:"created_at"` + PieceCID string `db:"piece_cid"` + Complete bool `db:"complete"` + + DataURL string `db:"data_url"` + DataHeaders string `db:"data_headers"` + DataRawSize int64 `db:"data_raw_size"` + DataDeleteOnFinalize bool `db:"data_delete_on_finalize"` + } + + err = p.db.Select(ctx, &taskData, ` + select + pp.id, + pp.created_at, + pp.piece_cid, + pp.complete, + ppt.data_url, + ppt.data_headers, + ppt.data_raw_size, + ppt.data_delete_on_finalize + from park_piece_tasks ppt + join parked_piece_refs ppr on ppt.piece_ref_id = ppr.ref_id + join parked_pieces pp on ppr.piece_id = pp.id + where ppt.task_id = $1 + `, taskID) + if err != nil { + return false, err + } + + if len(taskData) != 1 { + return false, xerrors.Errorf("expected 1 task, got %d", len(taskData)) + } + + if taskData[0].Complete { + log.Warnw("park piece task already complete", "task_id", taskID, "piece_cid", taskData[0].PieceCID) + return true, nil + } + + upr := &lpseal.UrlPieceReader{ + Url: taskData[0].DataURL, + RawSize: taskData[0].DataRawSize, + } + defer func() { + _ = upr.Close() + }() + + pnum := storiface.PieceNumber(taskData[0].PieceID) + + if err := p.sc.WritePiece(ctx, pnum, taskData[0].DataRawSize, upr); err != nil { + return false, xerrors.Errorf("write piece: %w", err) + } + + _, err = p.db.Exec(ctx, `update parked_pieces set complete = true where id = $1`, taskData[0].PieceID) + if err != nil { + return false, xerrors.Errorf("marking piece as complete: %w", err) + } + + return true, nil +} + +func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + id := ids[0] + return &id, nil +} + +func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { + 
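	// TypeDetails describes this task type to the harmonytask scheduler:
	// "ParkPiece" runs at most p.max instances at once (ParkPieceMaxTasks in
	// the config), each instance is costed at 1 CPU and 64 MiB of RAM for
	// scheduling purposes, and MaxFailures caps how often a failing task is
	// retried.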
return harmonytask.TaskTypeDetails{ + Max: p.max, + Name: "ParkPiece", + Cost: resources.Resources{ + Cpu: 1, + Gpu: 0, + Ram: 64 << 20, + Storage: nil, // TODO + }, + MaxFailures: 10, + } +} + +func (p *ParkPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) { + p.TF.Set(taskFunc) +} + +var _ harmonytask.TaskInterface = &ParkPieceTask{} diff --git a/provider/lpseal/task_trees.go b/provider/lpseal/task_trees.go index da0fcf1e9..a632c5e87 100644 --- a/provider/lpseal/task_trees.go +++ b/provider/lpseal/task_trees.go @@ -200,6 +200,7 @@ type UrlPieceReader struct { RawSize int64 // the exact number of bytes read, if we read more or less that's an error readSoFar int64 + closed bool active io.ReadCloser // auto-closed on EOF } @@ -239,6 +240,7 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) { // If EOF is reached, close the reader if err == io.EOF { cerr := u.active.Close() + u.closed = true if cerr != nil { log.Errorf("error closing http piece reader: %s", cerr) } @@ -253,4 +255,13 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) { return n, err } +func (u *UrlPieceReader) Close() error { + if !u.closed { + u.closed = true + return u.active.Close() + } + + return nil +} + var _ harmonytask.TaskInterface = &TreesTask{} diff --git a/storage/paths/http_handler.go b/storage/paths/http_handler.go index 57de578a6..92f4162db 100644 --- a/storage/paths/http_handler.go +++ b/storage/paths/http_handler.go @@ -9,12 +9,10 @@ import ( "strconv" "time" + "github.com/filecoin-project/go-state-types/abi" "github.com/gorilla/mux" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/storage/sealer/partialfile" "github.com/filecoin-project/lotus/storage/sealer/storiface" @@ -340,18 +338,5 @@ func (handler *FetchHandler) generatePoRepVanillaProof(w http.ResponseWriter, r } func FileTypeFromString(t string) (storiface.SectorFileType, error) { - switch t { - case storiface.FTUnsealed.String(): - return storiface.FTUnsealed, nil - case storiface.FTSealed.String(): - return storiface.FTSealed, nil - case storiface.FTCache.String(): - return storiface.FTCache, nil - case storiface.FTUpdate.String(): - return storiface.FTUpdate, nil - case storiface.FTUpdateCache.String(): - return storiface.FTUpdateCache, nil - default: - return 0, xerrors.Errorf("unknown sector file type: '%s'", t) - } + return storiface.TypeFromString(t) } diff --git a/storage/sealer/ffiwrapper/basicfs/fs.go b/storage/sealer/ffiwrapper/basicfs/fs.go index 4fd8e271f..47c7f526e 100644 --- a/storage/sealer/ffiwrapper/basicfs/fs.go +++ b/storage/sealer/ffiwrapper/basicfs/fs.go @@ -39,6 +39,9 @@ func (b *Provider) AcquireSector(ctx context.Context, id storiface.SectorRef, ex if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUpdateCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint return storiface.SectorPaths{}, nil, err } + if err := os.Mkdir(filepath.Join(b.Root, storiface.FTPiece.String()), 0755); err != nil && !os.IsExist(err) { // nolint + return storiface.SectorPaths{}, nil, err + } done := func() {} diff --git a/storage/sealer/storiface/filetype.go b/storage/sealer/storiface/filetype.go index ec3c5450c..887dda688 100644 --- a/storage/sealer/storiface/filetype.go +++ b/storage/sealer/storiface/filetype.go @@ -9,16 +9,22 @@ import ( ) const ( + // "regular" sectors FTUnsealed SectorFileType = 1 << iota FTSealed FTCache + + // snap FTUpdate FTUpdateCache + // Piece Park + FTPiece + 
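	// The constants above are bit flags (1 << iota), so a SectorFileType can
	// name a set of file types at once (e.g. FTCache|FTSealed); FileTypes
	// below ends up equal to the number of defined flags, 6 now that FTPiece
	// exists.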
FileTypes = iota ) -var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache} +var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache, FTPiece} const ( FTNone SectorFileType = 0 @@ -39,6 +45,7 @@ var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads FTUpdate: FSOverheadDen, FTUpdateCache: FSOverheadDen*2 + 1, FTCache: 141, // 11 layers + D(2x ssize) + C + R' + FTPiece: FSOverheadDen, } // sector size * disk / fs overhead. FSOverheadDen is like the unit of sector size @@ -49,6 +56,7 @@ var FsOverheadFinalized = map[SectorFileType]int{ FTUpdate: FSOverheadDen, FTUpdateCache: 1, FTCache: 1, + FTPiece: FSOverheadDen, } type SectorFileType int @@ -65,6 +73,8 @@ func TypeFromString(s string) (SectorFileType, error) { return FTUpdate, nil case "update-cache": return FTUpdateCache, nil + case "piece": + return FTPiece, nil default: return 0, xerrors.Errorf("unknown sector file type '%s'", s) } @@ -82,6 +92,8 @@ func (t SectorFileType) String() string { return "update" case FTUpdateCache: return "update-cache" + case FTPiece: + return "piece" default: return fmt.Sprintf("", t, (t & ((1 << FileTypes) - 1)).Strings()) } @@ -206,6 +218,7 @@ type SectorPaths struct { Cache string Update string UpdateCache string + Piece string } func ParseSectorID(baseName string) (abi.SectorID, error) { @@ -242,6 +255,8 @@ func PathByType(sps SectorPaths, fileType SectorFileType) string { return sps.Update case FTUpdateCache: return sps.UpdateCache + case FTPiece: + return sps.Piece } panic("requested unknown path type") @@ -259,5 +274,7 @@ func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) { sps.Update = p case FTUpdateCache: sps.UpdateCache = p + case FTPiece: + sps.Piece = p } } diff --git a/storage/sealer/storiface/storage.go b/storage/sealer/storiface/storage.go index fe4e1e208..75cc9399c 100644 --- a/storage/sealer/storiface/storage.go +++ b/storage/sealer/storiface/storage.go @@ -20,6 +20,17 @@ type SectorRef struct { var NoSectorRef = SectorRef{} +// PieceNumber is a reference to a piece in the storage system; mapping between +// pieces in the storage system and piece CIDs is maintained by the storage index +type PieceNumber uint64 + +func (pn PieceNumber) Ref() SectorRef { + return SectorRef{ + ID: abi.SectorID{Miner: 0, Number: abi.SectorNumber(pn)}, + ProofType: abi.RegisteredSealProof_StackedDrg64GiBV1, // This only cares about TreeD which is the same for all sizes + } +} + type ProverPoSt interface { GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, ppt abi.RegisteredPoStProof, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error) From 0800e6e5a7c015b89057d15616745330ca64f79c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 28 Feb 2024 21:50:12 +0100 Subject: [PATCH 02/15] make gen --- build/openrpc/miner.json | 10 ++++++---- cmd/lotus-provider/tasks/tasks.go | 6 +++--- documentation/en/api-v0-methods-miner.md | 2 ++ .../en/default-lotus-provider-config.toml | 12 ++++++++++++ .../harmonydb/sql/20240228-piece-park.sql | 16 +++++++++++----- node/config/doc_gen.go | 16 ++++++++++++++++ provider/lpffi/piece_funcs.go | 6 ++++-- provider/lppiece/task_park_piece.go | 13 ++++++++----- storage/paths/http_handler.go | 3 ++- 9 files changed, 64 
insertions(+), 20 deletions(-) diff --git a/build/openrpc/miner.json b/build/openrpc/miner.json index 0686987c8..2b0887ae2 100644 --- a/build/openrpc/miner.json +++ b/build/openrpc/miner.json @@ -9711,6 +9711,7 @@ 0, 1, 0, + 0, 0 ], "Read": [ @@ -9718,6 +9719,7 @@ 3, 0, 0, + 0, 0 ] } @@ -9736,8 +9738,8 @@ "title": "number", "type": "number" }, - "maxItems": 5, - "minItems": 5, + "maxItems": 6, + "minItems": 6, "type": "array" }, "Sector": { @@ -9760,8 +9762,8 @@ "title": "number", "type": "number" }, - "maxItems": 5, - "minItems": 5, + "maxItems": 6, + "minItems": 6, "type": "array" } }, diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index 11c12216b..71e794545 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -3,9 +3,6 @@ package tasks import ( "context" - "github.com/filecoin-project/lotus/lib/lazy" - "github.com/filecoin-project/lotus/lib/must" - "github.com/filecoin-project/lotus/provider/lppiece" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" @@ -13,11 +10,14 @@ import ( "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/lazy" + "github.com/filecoin-project/lotus/lib/must" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/provider" "github.com/filecoin-project/lotus/provider/chainsched" "github.com/filecoin-project/lotus/provider/lpffi" "github.com/filecoin-project/lotus/provider/lpmessage" + "github.com/filecoin-project/lotus/provider/lppiece" "github.com/filecoin-project/lotus/provider/lpseal" "github.com/filecoin-project/lotus/provider/lpwinning" ) diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index b133930bc..dd2f511db 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -3954,6 +3954,7 @@ Response: 0, 1, 0, + 0, 0 ], "Read": [ @@ -3961,6 +3962,7 @@ Response: 3, 0, 0, + 0, 0 ] } diff --git a/documentation/en/default-lotus-provider-config.toml b/documentation/en/default-lotus-provider-config.toml index dd9921769..83a7c243d 100644 --- a/documentation/en/default-lotus-provider-config.toml +++ b/documentation/en/default-lotus-provider-config.toml @@ -25,6 +25,18 @@ # type: int #WinningPostMaxTasks = 0 + # EnableParkPiece enables the "piece parking" task to run on this node. This task is responsible for fetching + # pieces from the network and storing them in the storage subsystem until sectors are sealed. This task is + # only applicable when integrating with boost, and should be enabled on nodes which will hold deal data + # from boost until sectors containing the related pieces have the TreeD/TreeR constructed. + # Note that future Curio implementations will have a separate task type for fetching pieces from the internet. + # + # type: bool + #EnableParkPiece = false + + # type: int + #ParkPieceMaxTasks = 0 + # EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation # creating 11 layer files in sector cache directory. 
# diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql index c58ecdafe..a1bf0efa9 100644 --- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -1,17 +1,23 @@ create table parked_pieces ( - id serial primary key, + id bigserial primary key, created_at timestamp default current_timestamp, piece_cid text not null, piece_padded_size bigint not null, piece_raw_size text not null, - + complete boolean not null default false ); +/* + * This table is used to keep track of the references to the parked pieces + * so that we can delete them when they are no longer needed. + * + * All references into the parked_pieces table should be done through this table. + */ create table parked_piece_refs ( - ref_id serial primary key, - piece_id int not null, + ref_id bigserial primary key, + piece_id bigint not null, foreign key (piece_id) references parked_pieces(id) on delete cascade ); @@ -21,7 +27,7 @@ create table park_piece_tasks ( constraint park_piece_tasks_pk primary key, - piece_ref_id int not null, + piece_ref_id bigint not null, data_url text not null, data_headers jsonb not null default '{}', diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index f28c5abd8..c49dad3ac 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -1033,6 +1033,22 @@ documentation.`, Comment: ``, }, + { + Name: "EnableParkPiece", + Type: "bool", + + Comment: `EnableParkPiece enables the "piece parking" task to run on this node. This task is responsible for fetching +pieces from the network and storing them in the storage subsystem until sectors are sealed. This task is +only applicable when integrating with boost, and should be enabled on nodes which will hold deal data +from boost until sectors containing the related pieces have the TreeD/TreeR constructed. 
+Note that future Curio implementations will have a separate task type for fetching pieces from the internet.`, + }, + { + Name: "ParkPieceMaxTasks", + Type: "int", + + Comment: ``, + }, { Name: "EnableSealSDR", Type: "bool", diff --git a/provider/lpffi/piece_funcs.go b/provider/lpffi/piece_funcs.go index 4e2816786..2ee75151b 100644 --- a/provider/lpffi/piece_funcs.go +++ b/provider/lpffi/piece_funcs.go @@ -2,11 +2,13 @@ package lpffi import ( "context" - "github.com/filecoin-project/lotus/storage/sealer/storiface" - "golang.org/x/xerrors" "io" "os" "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumber, size int64, data io.Reader) error { diff --git a/provider/lppiece/task_park_piece.go b/provider/lppiece/task_park_piece.go index 370775803..2ddcd7617 100644 --- a/provider/lppiece/task_park_piece.go +++ b/provider/lppiece/task_park_piece.go @@ -3,7 +3,15 @@ package lppiece import ( "context" "encoding/json" + "net/http" + "time" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" @@ -11,11 +19,6 @@ import ( "github.com/filecoin-project/lotus/provider/lpffi" "github.com/filecoin-project/lotus/provider/lpseal" "github.com/filecoin-project/lotus/storage/sealer/storiface" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" - "net/http" - "time" ) var log = logging.Logger("lppiece") diff --git a/storage/paths/http_handler.go b/storage/paths/http_handler.go index 92f4162db..c828f6006 100644 --- a/storage/paths/http_handler.go +++ b/storage/paths/http_handler.go @@ -9,11 +9,12 @@ import ( "strconv" "time" - "github.com/filecoin-project/go-state-types/abi" "github.com/gorilla/mux" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/storage/sealer/partialfile" "github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/tarutil" From 1bb228898ea1936724576a1aeb46c4b98f484321 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 29 Feb 2024 11:11:40 +0100 Subject: [PATCH 03/15] Integrate PiecePark into boost-proxy --- cmd/lotus-shed/lpdeal.go | 79 ++++++-- .../harmonydb/sql/20240228-piece-park.sql | 25 ++- provider/lppiece/task_park_piece.go | 170 ++++++++++-------- 3 files changed, 173 insertions(+), 101 deletions(-) diff --git a/cmd/lotus-shed/lpdeal.go b/cmd/lotus-shed/lpdeal.go index 01f886015..d5c718f66 100644 --- a/cmd/lotus-shed/lpdeal.go +++ b/cmd/lotus-shed/lpdeal.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "fmt" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/google/uuid" "io" "net" "net/http" @@ -14,7 +16,6 @@ import ( "time" "github.com/fatih/color" - "github.com/ipfs/go-cid" "github.com/mitchellh/go-homedir" manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" @@ -427,7 +428,7 @@ var lpBoostProxyCmd = &cli.Command{ } pieceInfoLk := new(sync.Mutex) - pieceInfos := map[cid.Cid][]pieceInfo{} + pieceInfos := map[uuid.UUID][]pieceInfo{} ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData 
storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) { origPieceData := pieceData @@ -449,25 +450,68 @@ var lpBoostProxyCmd = &cli.Command{ done: make(chan struct{}), } + pieceUUID := uuid.New() + pieceInfoLk.Lock() - pieceInfos[deal.DealProposal.PieceCID] = append(pieceInfos[deal.DealProposal.PieceCID], pi) + pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi) pieceInfoLk.Unlock() // /piece?piece_cid=xxxx dataUrl := rootUrl dataUrl.Path = "/piece" - dataUrl.RawQuery = "piece_cid=" + deal.DealProposal.PieceCID.String() + dataUrl.RawQuery = "piece_id=" + pieceUUID.String() + + // add piece entry + + var refID int64 + + comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + // Add parked_piece, on conflict do nothing + var pieceID int64 + err = tx.QueryRow(` + INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) + VALUES ($1, $2, $3) + ON CONFLICT (piece_cid) DO UPDATE + SET piece_cid = EXCLUDED.piece_cid + RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) + if err != nil { + return false, xerrors.Errorf("upserting parked piece and getting id: %w", err) + } + + // Add parked_piece_ref + err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) + VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) + if err != nil { + return false, xerrors.Errorf("inserting parked piece ref: %w", err) + } + + // If everything went well, commit the transaction + return true, nil // This will commit the transaction + }, harmonydb.OptionRetry()) + if err != nil { + return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err) + } + if !comm { + return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit") + } + + // wait for piece to be parked + + <-pi.done + + pieceIDUrl := url.URL{ + Scheme: "pieceref", + Opaque: fmt.Sprintf("%d", refID), + } // make a sector - so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), dataUrl, nil) + so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil) if err != nil { return api.SectorOffset{}, err } color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset) - <-pi.done - return so, nil } @@ -484,10 +528,12 @@ var lpBoostProxyCmd = &cli.Command{ ast.Internal.StorageGetLocks = si.StorageGetLocks var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) { - // /piece?piece_cid=xxxx - pieceCid, err := cid.Decode(r.URL.Query().Get("piece_cid")) + // /piece?piece_id=xxxx + pieceUUID := r.URL.Query().Get("piece_id") + + pu, err := uuid.Parse(pieceUUID) if err != nil { - http.Error(w, "bad piece_cid", http.StatusBadRequest) + http.Error(w, "bad piece id", http.StatusBadRequest) return } @@ -496,13 +542,13 @@ var lpBoostProxyCmd = &cli.Command{ return } - fmt.Printf("%s request for piece from %s\n", pieceCid, r.RemoteAddr) + fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr) pieceInfoLk.Lock() - pis, ok := pieceInfos[pieceCid] + pis, ok := pieceInfos[pu] if !ok { http.Error(w, "piece not found", http.StatusNotFound) - color.Red("%s not found", pieceCid) + color.Red("%s not found", pu) pieceInfoLk.Unlock() return } @@ -511,7 +557,10 @@ var lpBoostProxyCmd = &cli.Command{ pi := pis[0] pis = pis[1:] - pieceInfos[pieceCid] = pis + pieceInfos[pu] = pis + if len(pis) == 0 { + delete(pieceInfos, pu) + } pieceInfoLk.Unlock() @@ -533,7 +582,7 @@ var 
lpBoostProxyCmd = &cli.Command{ return } - color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pieceCid, float64(n)/(1024*1024), took, mbps) + color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps) } finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast) diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql index a1bf0efa9..ebba68fae 100644 --- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -2,11 +2,14 @@ create table parked_pieces ( id bigserial primary key, created_at timestamp default current_timestamp, - piece_cid text not null, + piece_cid text not null unique constraint parked_pieces_piece_cid_key, piece_padded_size bigint not null, piece_raw_size text not null, - complete boolean not null default false + complete boolean not null default false, + task_id bigint default null, + + foreign key (task_id) references harmony_task (id) on delete set null ); /* @@ -14,23 +17,15 @@ create table parked_pieces ( * so that we can delete them when they are no longer needed. * * All references into the parked_pieces table should be done through this table. + * + * data_url is optional for refs which also act as data sources. */ create table parked_piece_refs ( ref_id bigserial primary key, piece_id bigint not null, + data_url text, + data_headers jsonb not null default '{}', + foreign key (piece_id) references parked_pieces(id) on delete cascade ); - -create table park_piece_tasks ( - task_id bigint not null - constraint park_piece_tasks_pk - primary key, - - piece_ref_id bigint not null, - - data_url text not null, - data_headers jsonb not null default '{}', - data_raw_size bigint not null, - data_delete_on_finalize bool not null -); diff --git a/provider/lppiece/task_park_piece.go b/provider/lppiece/task_park_piece.go index 2ddcd7617..7fb5a0ac3 100644 --- a/provider/lppiece/task_park_piece.go +++ b/provider/lppiece/task_park_piece.go @@ -3,15 +3,12 @@ package lppiece import ( "context" "encoding/json" - "net/http" + "strconv" "time" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" @@ -22,6 +19,7 @@ import ( ) var log = logging.Logger("lppiece") +var PieceParkPollInterval = time.Second * 15 // ParkPieceTask gets a piece from some origin, and parks it in storage // Pieces are always f00, piece ID is mapped to pieceCID in the DB @@ -35,108 +33,138 @@ type ParkPieceTask struct { } func NewParkPieceTask(db *harmonydb.DB, sc *lpffi.SealCalls, max int) *ParkPieceTask { - return &ParkPieceTask{ + pt := &ParkPieceTask{ db: db, sc: sc, max: max, } + go pt.pollPieceTasks(context.Background()) + return pt } -func (p *ParkPieceTask) PullPiece(ctx context.Context, pieceCID cid.Cid, rawSize int64, paddedSize abi.PaddedPieceSize, dataUrl string, headers http.Header) error { - p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { - var pieceID int - err = tx.QueryRow(`INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) VALUES ($1, $2, $3) RETURNING id`, pieceCID.String(), int64(paddedSize), rawSize).Scan(&pieceID) - if err != nil { - return false, xerrors.Errorf("inserting parked piece: %w", err) +func (p 
*ParkPieceTask) pollPieceTasks(ctx context.Context) { + for { + // select parked pieces with no task_id + var pieceIDs []struct { + ID storiface.PieceNumber `db:"id"` } - var refID int - err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id) VALUES ($1) RETURNING ref_id`, pieceID).Scan(&refID) + err := p.db.Select(ctx, &pieceIDs, `SELECT id FROM parked_pieces WHERE complete = FALSE AND task_id IS NULL`) if err != nil { - return false, xerrors.Errorf("inserting parked piece ref: %w", err) + log.Errorf("failed to get parked pieces: %s", err) + time.Sleep(PieceParkPollInterval) + continue } - headersJson, err := json.Marshal(headers) - if err != nil { - return false, xerrors.Errorf("marshaling headers: %w", err) + if len(pieceIDs) == 0 { + time.Sleep(PieceParkPollInterval) + continue } - _, err = tx.Exec(`INSERT INTO park_piece_tasks (task_id, piece_ref_id, data_url, data_headers, data_raw_size, data_delete_on_finalize) - VALUES ($1, $2, $3, $4, $5, $6)`, id, refID, dataUrl, headersJson, rawSize, false) - if err != nil { - return false, xerrors.Errorf("inserting park piece task: %w", err) + for _, pieceID := range pieceIDs { + // create a task for each piece + p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + // update + n, err := tx.Exec(`UPDATE parked_pieces SET task_id = $1 WHERE id = $2 AND complete = FALSE AND task_id IS NULL`, id, pieceID.ID) + if err != nil { + return false, xerrors.Errorf("updating parked piece: %w", err) + } + + // commit only if we updated the piece + return n > 0, nil + }) } - - return true, nil - }) - - return nil + } } func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { ctx := context.Background() - var taskData []struct { - PieceID int `db:"id"` - PieceCreatedAt time.Time `db:"created_at"` - PieceCID string `db:"piece_cid"` - Complete bool `db:"complete"` - - DataURL string `db:"data_url"` - DataHeaders string `db:"data_headers"` - DataRawSize int64 `db:"data_raw_size"` - DataDeleteOnFinalize bool `db:"data_delete_on_finalize"` + // Define a struct to hold piece data. + var piecesData []struct { + PieceID int64 `db:"id"` + PieceCreatedAt time.Time `db:"created_at"` + PieceCID string `db:"piece_cid"` + Complete bool `db:"complete"` + PiecePaddedSize int64 `db:"piece_padded_size"` + PieceRawSize string `db:"piece_raw_size"` } - err = p.db.Select(ctx, &taskData, ` - select - pp.id, - pp.created_at, - pp.piece_cid, - pp.complete, - ppt.data_url, - ppt.data_headers, - ppt.data_raw_size, - ppt.data_delete_on_finalize - from park_piece_tasks ppt - join parked_piece_refs ppr on ppt.piece_ref_id = ppr.ref_id - join parked_pieces pp on ppr.piece_id = pp.id - where ppt.task_id = $1 - `, taskID) + // Select the piece data using the task ID. 
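	// pollPieceTasks above claimed the row for us by setting task_id inside
	// the AddTaskFunc transaction, so this query sees at most the one piece
	// owned by this task.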
+ err = p.db.Select(ctx, &piecesData, ` + SELECT id, created_at, piece_cid, complete, piece_padded_size, piece_raw_size + FROM parked_pieces + WHERE task_id = $1 + `, taskID) if err != nil { - return false, err + return false, xerrors.Errorf("fetching piece data: %w", err) } - if len(taskData) != 1 { - return false, xerrors.Errorf("expected 1 task, got %d", len(taskData)) + if len(piecesData) == 0 { + return false, xerrors.Errorf("no piece data found for task_id: %d", taskID) } - if taskData[0].Complete { - log.Warnw("park piece task already complete", "task_id", taskID, "piece_cid", taskData[0].PieceCID) + pieceData := piecesData[0] + + if pieceData.Complete { + log.Warnw("park piece task already complete", "task_id", taskID, "piece_cid", pieceData.PieceCID) return true, nil } - upr := &lpseal.UrlPieceReader{ - Url: taskData[0].DataURL, - RawSize: taskData[0].DataRawSize, - } - defer func() { - _ = upr.Close() - }() - - pnum := storiface.PieceNumber(taskData[0].PieceID) - - if err := p.sc.WritePiece(ctx, pnum, taskData[0].DataRawSize, upr); err != nil { - return false, xerrors.Errorf("write piece: %w", err) + // Define a struct for reference data. + var refData []struct { + DataURL string `db:"data_url"` + DataHeaders json.RawMessage `db:"data_headers"` } - _, err = p.db.Exec(ctx, `update parked_pieces set complete = true where id = $1`, taskData[0].PieceID) + // Now, select the first reference data that has a URL. + err = p.db.Select(ctx, &refData, ` + SELECT data_url, data_headers + FROM parked_piece_refs + WHERE piece_id = $1 AND data_url IS NOT NULL + LIMIT 1 + `, pieceData.PieceID) if err != nil { - return false, xerrors.Errorf("marking piece as complete: %w", err) + return false, xerrors.Errorf("fetching reference data: %w", err) } - return true, nil + if len(refData) == 0 { + return false, xerrors.Errorf("no refs found for piece_id: %d", pieceData.PieceID) + } + + // Convert piece_raw_size from string to int64. + pieceRawSize, err := strconv.ParseInt(pieceData.PieceRawSize, 10, 64) + if err != nil { + return false, xerrors.Errorf("parsing piece raw size: %w", err) + } + + if refData[0].DataURL != "" { + upr := &lpseal.UrlPieceReader{ + Url: refData[0].DataURL, + RawSize: pieceRawSize, + } + defer func() { + _ = upr.Close() + }() + + pnum := storiface.PieceNumber(pieceData.PieceID) + + if err := p.sc.WritePiece(ctx, pnum, pieceRawSize, upr); err != nil { + return false, xerrors.Errorf("write piece: %w", err) + } + + // Update the piece as complete after a successful write. + _, err = p.db.Exec(ctx, `UPDATE parked_pieces SET complete = TRUE WHERE id = $1`, pieceData.PieceID) + if err != nil { + return false, xerrors.Errorf("marking piece as complete: %w", err) + } + + return true, nil + } + + // If no URL is found, this indicates an issue since at least one URL is expected. 
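	// Returning an error here fails only this attempt; the task can be
	// retried until the MaxFailures limit declared in TypeDetails below is
	// reached.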
+ return false, xerrors.Errorf("no data URL found for piece_id: %d", pieceData.PieceID) } func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { From 6b3038d51a2109cae0246b74dd3a7bc6747f7d03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 29 Feb 2024 23:41:55 +0100 Subject: [PATCH 04/15] fix lint --- cmd/lotus-shed/lpdeal.go | 4 ++-- provider/lppiece/task_park_piece.go | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/lotus-shed/lpdeal.go b/cmd/lotus-shed/lpdeal.go index d5c718f66..0a585bf66 100644 --- a/cmd/lotus-shed/lpdeal.go +++ b/cmd/lotus-shed/lpdeal.go @@ -4,8 +4,6 @@ import ( "bytes" "context" "fmt" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/google/uuid" "io" "net" "net/http" @@ -16,6 +14,7 @@ import ( "time" "github.com/fatih/color" + "github.com/google/uuid" "github.com/mitchellh/go-homedir" manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" @@ -36,6 +35,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/must" "github.com/filecoin-project/lotus/lib/nullreader" "github.com/filecoin-project/lotus/metrics/proxy" diff --git a/provider/lppiece/task_park_piece.go b/provider/lppiece/task_park_piece.go index 7fb5a0ac3..034ef9b2c 100644 --- a/provider/lppiece/task_park_piece.go +++ b/provider/lppiece/task_park_piece.go @@ -63,6 +63,8 @@ func (p *ParkPieceTask) pollPieceTasks(ctx context.Context) { } for _, pieceID := range pieceIDs { + pieceID := pieceID + // create a task for each piece p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { // update From b90cf19604df174cbf0da1e7263f19fd803f020b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 1 Mar 2024 00:07:51 +0100 Subject: [PATCH 05/15] lpseal: PiecePark in SDRTrees --- provider/lpffi/piece_funcs.go | 4 +++ provider/lpffi/sdr_funcs.go | 4 +-- provider/lpseal/task_trees.go | 58 ++++++++++++++++++++++++++++++++--- storage/paths/remote.go | 40 ++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 6 deletions(-) diff --git a/provider/lpffi/piece_funcs.go b/provider/lpffi/piece_funcs.go index 2ee75151b..986d0fe4d 100644 --- a/provider/lpffi/piece_funcs.go +++ b/provider/lpffi/piece_funcs.go @@ -65,3 +65,7 @@ func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumb removeTemp = false return nil } + +func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) { + return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece) +} diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go index 05a7bbd80..77019e4d2 100644 --- a/provider/lpffi/sdr_funcs.go +++ b/provider/lpffi/sdr_funcs.go @@ -42,7 +42,7 @@ type SealCalls struct { externCalls ExternalSealer*/ } -func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCalls { +func NewSealCalls(st *paths.Remote, ls *paths.Local, si paths.SectorIndex) *SealCalls { return &SealCalls{ sectors: &storageProvider{ storage: st, @@ -54,7 +54,7 @@ func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCa } type storageProvider struct { - storage paths.Store + storage *paths.Remote localStore *paths.Local sindex 
paths.SectorIndex storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation] diff --git a/provider/lpseal/task_trees.go b/provider/lpseal/task_trees.go index a632c5e87..31775d290 100644 --- a/provider/lpseal/task_trees.go +++ b/provider/lpseal/task_trees.go @@ -4,6 +4,8 @@ import ( "context" "io" "net/http" + "net/url" + "strconv" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -88,6 +90,15 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done var dataReader io.Reader var unpaddedData bool + var closers []io.Closer + defer func() { + for _, c := range closers { + if err := c.Close(); err != nil { + log.Errorw("error closing piece reader", "error", err) + } + } + }() + if len(pieces) > 0 { pieceInfos := make([]abi.PieceInfo, len(pieces)) pieceReaders := make([]io.Reader, len(pieces)) @@ -106,10 +117,49 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done // make pieceReader if p.DataUrl != nil { - pieceReaders[i], _ = padreader.New(&UrlPieceReader{ - Url: *p.DataUrl, - RawSize: *p.DataRawSize, - }, uint64(*p.DataRawSize)) + dataUrl := *p.DataUrl + + goUrl, err := url.Parse(dataUrl) + if err != nil { + return false, xerrors.Errorf("parsing data URL: %w", err) + } + + if goUrl.Scheme == "pieceref" { + // url is to a piece reference + + refNum, err := strconv.ParseInt(goUrl.Opaque, 10, 64) + if err != nil { + return false, xerrors.Errorf("parsing piece reference number: %w", err) + } + + // get pieceID + var pieceID []struct { + PieceID storiface.PieceNumber `db:"piece_id"` + } + err = t.db.Select(ctx, &pieceID, `SELECT piece_id FROM parked_piece_refs WHERE ref_id = $1`, refNum) + if err != nil { + return false, xerrors.Errorf("getting pieceID: %w", err) + } + + if len(pieceID) != 1 { + return false, xerrors.Errorf("expected 1 pieceID, got %d", len(pieceID)) + } + + pr, err := t.sc.PieceReader(ctx, pieceID[0].PieceID) + if err != nil { + return false, xerrors.Errorf("getting piece reader: %w", err) + } + + closers = append(closers, pr) + + pieceReaders[i], _ = padreader.New(pr, uint64(*p.DataRawSize)) + } else { + pieceReaders[i], _ = padreader.New(&UrlPieceReader{ + Url: dataUrl, + RawSize: *p.DataRawSize, + }, uint64(*p.DataRawSize)) + } + } else { // padding piece (w/o fr32 padding, added in TreeD) pieceReaders[i] = nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded()) } diff --git a/storage/paths/remote.go b/storage/paths/remote.go index 882c98c4a..dfceaeace 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -747,6 +747,46 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size return nil, nil } +func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storiface.SectorFileType) (io.ReadCloser, error) { + paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) + if err != nil { + return nil, xerrors.Errorf("acquire local: %w", err) + } + + path := storiface.PathByType(paths, ft) + if path != "" { + return os.Open(path) + } + + si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false) + if err != nil { + log.Debugf("Reader, did not find file on any of the workers %s (%s)", path, ft.String()) + return nil, err + } + + if len(si) == 0 { + return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) + } + + sort.Slice(si, func(i, j int) bool { + return si[i].Weight > si[j].Weight + }) + + for _, info := range si { + for _, url := 
range info.URLs { + rd, err := r.readRemote(ctx, url, 0, 0) + if err != nil { + log.Warnw("reading from remote", "url", url, "error", err) + continue + } + + return rd, err + } + } + + return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound) +} + func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) { log.Warnf("reserve called on remote store, sectorID: %v", sid.ID) return func() { From 10453cd5afa515117c39cee087849e4522ec4151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 1 Mar 2024 00:23:30 +0100 Subject: [PATCH 06/15] lpseal: Rm piecepark refs in finalize --- provider/lpseal/finalize_pieces.go | 51 ++++++++++++++++++++++++++++++ provider/lpseal/task_finalize.go | 4 +++ 2 files changed, 55 insertions(+) create mode 100644 provider/lpseal/finalize_pieces.go diff --git a/provider/lpseal/finalize_pieces.go b/provider/lpseal/finalize_pieces.go new file mode 100644 index 000000000..8d52bf331 --- /dev/null +++ b/provider/lpseal/finalize_pieces.go @@ -0,0 +1,51 @@ +package lpseal + +import ( + "context" + "net/url" + "strconv" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" +) + +func DropSectorPieceRefs(ctx context.Context, db *harmonydb.DB, sid abi.SectorID) error { + //_, err := db.Exec(ctx, `SELECT FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number) + + var PieceURL []struct { + URL string `db:"data_url"` + } + + err := db.Select(ctx, &PieceURL, `SELECT data_url FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number) + if err != nil { + return xerrors.Errorf("getting piece url: %w", err) + } + + for _, pu := range PieceURL { + gourl, err := url.Parse(pu.URL) + if err != nil { + log.Errorw("failed to parse piece url", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + continue + } + + if gourl.Scheme == "pieceref" { + refID, err := strconv.ParseInt(gourl.Opaque, 10, 64) + if err != nil { + log.Errorw("failed to parse piece ref id", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + continue + } + + n, err := db.Exec(ctx, `DELETE FROM parked_piece_refs WHERE ref_id = $1`, refID) + if err != nil { + log.Errorw("failed to delete piece ref", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + } + + log.Debugw("deleted piece ref", "url", pu.URL, "miner", sid.Miner, "sector", sid.Number, "rows", n) + } + } + + return err +} diff --git a/provider/lpseal/task_finalize.go b/provider/lpseal/task_finalize.go index 8d425f76a..6246af733 100644 --- a/provider/lpseal/task_finalize.go +++ b/provider/lpseal/task_finalize.go @@ -69,6 +69,10 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do return false, xerrors.Errorf("finalizing sector: %w", err) } + if err := DropSectorPieceRefs(ctx, f.db, sector.ID); err != nil { + return false, xerrors.Errorf("dropping sector piece refs: %w", err) + } + // set after_finalize _, err = f.db.Exec(ctx, `update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1`, taskID) if err != nil { From e93a3e0d4a6c0f25c3494ac5976521a9464afb35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 11 Mar 2024 21:35:16 +0100 Subject: [PATCH 07/15] 
lppiece: GC task --- cmd/lotus-provider/tasks/tasks.go | 3 +- .../harmonydb/sql/20240228-piece-park.sql | 2 + provider/lpffi/piece_funcs.go | 4 + provider/lppiece/task_cleanup_piece.go | 130 ++++++++++++++++++ 4 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 provider/lppiece/task_cleanup_piece.go diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index 71e794545..309cebd01 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -75,7 +75,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task // Piece handling if cfg.Subsystems.EnableParkPiece { parkPieceTask := lppiece.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks) - activeTasks = append(activeTasks, parkPieceTask) + cleanupPieceTask := lppiece.NewCleanupPieceTask(db, must.One(slrLazy.Val()), 0) + activeTasks = append(activeTasks, parkPieceTask, cleanupPieceTask) } } diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql index ebba68fae..b07fd3973 100644 --- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -9,6 +9,8 @@ create table parked_pieces ( complete boolean not null default false, task_id bigint default null, + cleanup_task_id bigint default null, + foreign key (task_id) references harmony_task (id) on delete set null ); diff --git a/provider/lpffi/piece_funcs.go b/provider/lpffi/piece_funcs.go index 986d0fe4d..9ffc0b3d1 100644 --- a/provider/lpffi/piece_funcs.go +++ b/provider/lpffi/piece_funcs.go @@ -69,3 +69,7 @@ func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumb func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) { return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece) } + +func (sb *SealCalls) RemovePiece(ctx context.Context, id storiface.PieceNumber) error { + return sb.sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil) +} diff --git a/provider/lppiece/task_cleanup_piece.go b/provider/lppiece/task_cleanup_piece.go new file mode 100644 index 000000000..d7f24037a --- /dev/null +++ b/provider/lppiece/task_cleanup_piece.go @@ -0,0 +1,130 @@ +package lppiece + +import ( + "context" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/provider/lpffi" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +type CleanupPieceTask struct { + max int + db *harmonydb.DB + sc *lpffi.SealCalls + + TF promise.Promise[harmonytask.AddTaskFunc] +} + +func NewCleanupPieceTask(db *harmonydb.DB, sc *lpffi.SealCalls, max int) *CleanupPieceTask { + pt := &CleanupPieceTask{ + db: db, + sc: sc, + + max: max, + } + go pt.pollCleanupTasks(context.Background()) + return pt +} + +func (c *CleanupPieceTask) pollCleanupTasks(ctx context.Context) { + for { + // select pieces with no refs and null cleanup_task_id + var pieceIDs []struct { + ID storiface.PieceNumber `db:"id"` + } + + err := c.db.Select(ctx, &pieceIDs, `SELECT id FROM parked_pieces WHERE cleanup_task_id IS NULL AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = parked_pieces.id) = 0`) + if err != nil { + log.Errorf("failed to get parked 
pieces: %s", err)
+			time.Sleep(PieceParkPollInterval)
+			continue
+		}
+
+		if len(pieceIDs) == 0 {
+			time.Sleep(PieceParkPollInterval)
+			continue
+		}
+
+		for _, pieceID := range pieceIDs {
+			pieceID := pieceID
+
+			// create a task for each piece
+			c.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
+				// update
+				n, err := tx.Exec(`UPDATE parked_pieces SET cleanup_task_id = $1 WHERE id = $2 AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = parked_pieces.id) = 0`, id, pieceID.ID)
+				if err != nil {
+					return false, xerrors.Errorf("updating parked piece: %w", err)
+				}
+
+				// commit only if we updated the piece
+				return n > 0, nil
+			})
+		}
+	}
+}
+
+func (c *CleanupPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
+	ctx := context.Background()
+
+	// select by cleanup_task_id
+	var pieceID int64
+
+	err = c.db.QueryRow(ctx, "SELECT piece_id FROM parked_pieces WHERE cleanup_task_id = $1", taskID).Scan(&pieceID)
+	if err != nil {
+		return false, xerrors.Errorf("query parked_piece: %w", err)
+	}
+
+	// delete from parked_pieces where id = $1 where ref count = 0
+	// note: we delete from the db first because that guarantees that the piece is no longer in use
+	// if storage delete fails, it will be retried later in other cleanup tasks
+	n, err := c.db.Exec(ctx, "DELETE FROM parked_pieces WHERE id = $1 AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = $1) = 0", pieceID)
+	if err != nil {
+		return false, xerrors.Errorf("delete parked_piece: %w", err)
+	}
+
+	if n == 0 {
+		return true, nil
+	}
+
+	// remove from storage
+	err = c.sc.RemovePiece(ctx, storiface.PieceNumber(pieceID))
+	if err != nil {
+		log.Errorw("remove piece", "piece_id", pieceID, "error", err)
+	}
+
+	return true, nil
+}
+
+func (c *CleanupPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
+	// the remove call runs on paths.Remote storage, so it doesn't really matter where it runs
+
+	id := ids[0]
+	return &id, nil
+}
+
+func (c *CleanupPieceTask) TypeDetails() harmonytask.TaskTypeDetails {
+	return harmonytask.TaskTypeDetails{
+		Max:  c.max,
+		Name: "DropPiece",
+		Cost: resources.Resources{
+			Cpu:     1,
+			Gpu:     0,
+			Ram:     64 << 20,
+			Storage: nil,
+		},
+		MaxFailures: 10,
+	}
+}
+
+func (c *CleanupPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) {
+	c.TF.Set(taskFunc)
+}
+
+var _ harmonytask.TaskInterface = &CleanupPieceTask{}

From 27317a94896f93a6bec14e2af3c23c0bcb2c1971 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 14 Mar 2024 12:24:00 +0100
Subject: [PATCH 08/15] fix 20240228-piece-park.sql

---
 lib/harmony/harmonydb/sql/20240228-piece-park.sql | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql
index b07fd3973..8b6719dc5 100644
--- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql
+++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql
@@ -2,7 +2,7 @@ create table parked_pieces (
     id bigserial primary key,
     created_at timestamp default current_timestamp,
 
-    piece_cid text not null unique constraint parked_pieces_piece_cid_key,
+    piece_cid text not null,
     piece_padded_size bigint not null,
     piece_raw_size text not null,
 
@@ -11,7 +11,8 @@ create table parked_pieces (
 
     cleanup_task_id bigint default null,
 
-    foreign key (task_id) references harmony_task (id) on delete
set null, + unique (piece_cid) ); /* From 465ec58a7c7a705321b331ace7947731dd96ca07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Mar 2024 13:03:14 +0100 Subject: [PATCH 09/15] lpdeal: Fix adapter deadlock with duplicate pieces --- cmd/lotus-shed/lpdeal.go | 56 +++++++++++++++---- .../harmonydb/sql/20240228-piece-park.sql | 2 +- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/cmd/lotus-shed/lpdeal.go b/cmd/lotus-shed/lpdeal.go index 0a585bf66..b6237e13f 100644 --- a/cmd/lotus-shed/lpdeal.go +++ b/cmd/lotus-shed/lpdeal.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "database/sql" "fmt" "io" "net" @@ -452,6 +453,8 @@ var lpBoostProxyCmd = &cli.Command{ pieceUUID := uuid.New() + color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID) + pieceInfoLk.Lock() pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi) pieceInfoLk.Unlock() @@ -464,23 +467,36 @@ var lpBoostProxyCmd = &cli.Command{ // add piece entry var refID int64 + var pieceWasCreated bool comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - // Add parked_piece, on conflict do nothing var pieceID int64 - err = tx.QueryRow(` - INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) - VALUES ($1, $2, $3) - ON CONFLICT (piece_cid) DO UPDATE - SET piece_cid = EXCLUDED.piece_cid - RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) + // Attempt to select the piece ID first + err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) + if err != nil { - return false, xerrors.Errorf("upserting parked piece and getting id: %w", err) + if err == sql.ErrNoRows { + // Piece does not exist, attempt to insert + err = tx.QueryRow(` + INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) + VALUES ($1, $2, $3) + ON CONFLICT (piece_cid) DO NOTHING + RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) + if err != nil { + return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) + } + pieceWasCreated = true // New piece was created + } else { + // Some other error occurred during select + return false, xerrors.Errorf("checking existing parked piece: %w", err) + } + } else { + pieceWasCreated = false // Piece already exists, no new piece was created } // Add parked_piece_ref err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) - VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) + VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) if err != nil { return false, xerrors.Errorf("inserting parked piece ref: %w", err) } @@ -496,8 +512,28 @@ var lpBoostProxyCmd = &cli.Command{ } // wait for piece to be parked + if pieceWasCreated { + <-pi.done + } else { + // If the piece was not created, we need to close the done channel + close(pi.done) - <-pi.done + go func() { + // close the data reader (drain to eof if it's not a closer) + if closer, ok := pieceData.(io.Closer); ok { + if err := closer.Close(); err != nil { + log.Warnw("closing pieceData in DataCid", "error", err) + } + } else { + log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData)) + + _, err := io.Copy(io.Discard, pieceData) + if err != nil { + log.Warnw("draining pieceData in DataCid", "error", err) + } + } + 
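				// Draining or closing pieceData is the actual deadlock fix: for a
				// duplicate piece nothing else will ever read this stream, and
				// boost's sending side would otherwise block on it forever.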
}() + } pieceIDUrl := url.URL{ Scheme: "pieceref", diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql index 8b6719dc5..95792628a 100644 --- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -4,7 +4,7 @@ create table parked_pieces ( piece_cid text not null, piece_padded_size bigint not null, - piece_raw_size text not null, + piece_raw_size bigint not null, complete boolean not null default false, task_id bigint default null, From b123e700eaed4045e05a55b8673b3165474470cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Mar 2024 13:14:00 +0100 Subject: [PATCH 10/15] fix curio AcquireSector with reservations --- provider/lpffi/sdr_funcs.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go index 77019e4d2..6ebcd1faf 100644 --- a/provider/lpffi/sdr_funcs.go +++ b/provider/lpffi/sdr_funcs.go @@ -69,7 +69,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask if taskID != nil { resv, ok = l.storageReservations.Load(*taskID) } - if ok { + if ok && resv != nil { if resv.Alloc != allocate || resv.Existing != existing { // this should never happen, only when task definition is wrong return storiface.SectorPaths{}, nil, xerrors.Errorf("storage reservation type mismatch") @@ -78,6 +78,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate) paths = resv.Paths + storageIDs = resv.PathIDs releaseStorage = resv.Release } else { var err error From 6b361443ba8c0d20d4045c47d07e05b62d4b5ed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Mar 2024 14:29:33 +0100 Subject: [PATCH 11/15] sdrtrees: Fix min expiration math --- provider/lpseal/task_submit_precommit.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/provider/lpseal/task_submit_precommit.go b/provider/lpseal/task_submit_precommit.go index 9f6233f39..f7229a125 100644 --- a/provider/lpseal/task_submit_precommit.go +++ b/provider/lpseal/task_submit_precommit.go @@ -9,12 +9,15 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + actorstypes "github.com/filecoin-project/go-state-types/actors" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" miner12 "github.com/filecoin-project/go-state-types/builtin/v12/miner" + "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" @@ -27,6 +30,7 @@ import ( type SubmitPrecommitTaskApi interface { StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (big.Int, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) + StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error) ctladdr.NodeApi } @@ -136,6 +140,23 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo } } + nv, err := 
s.api.StateNetworkVersion(ctx, types.EmptyTSK) + if err != nil { + return false, xerrors.Errorf("getting network version: %w", err) + } + av, err := actorstypes.VersionForNetwork(nv) + if err != nil { + return false, xerrors.Errorf("failed to get actors version: %w", err) + } + msd, err := policy.GetMaxProveCommitDuration(av, sectorParams.RegSealProof) + if err != nil { + return false, xerrors.Errorf("failed to get max prove commit duration: %w", err) + } + + if minExpiration := sectorParams.TicketEpoch + policy.MaxPreCommitRandomnessLookback + msd + miner.MinSectorExpiration; params.Sectors[0].Expiration < minExpiration { + params.Sectors[0].Expiration = minExpiration + } + var pbuf bytes.Buffer if err := params.MarshalCBOR(&pbuf); err != nil { return false, xerrors.Errorf("serializing params: %w", err) From ad37cf5ead5a7269d61e6c0dfa152474139b0dff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Mar 2024 14:37:27 +0100 Subject: [PATCH 12/15] harmony: Dev env var to override gpu count --- lib/harmony/resources/getGPU.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/harmony/resources/getGPU.go b/lib/harmony/resources/getGPU.go index 3489e7491..62d5c091e 100644 --- a/lib/harmony/resources/getGPU.go +++ b/lib/harmony/resources/getGPU.go @@ -4,12 +4,23 @@ package resources import ( + "os" + "strconv" "strings" ffi "github.com/filecoin-project/filecoin-ffi" ) func getGPUDevices() float64 { // GPU boolean + if nstr := os.Getenv("HARMONY_OVERRIDE_GPUS"); nstr != "" { + n, err := strconv.ParseFloat(nstr, 64) + if err != nil { + logger.Errorf("parsing HARMONY_OVERRIDE_GPUS failed: %+v", err) + } else { + return n + } + } + gpus, err := ffi.GetGPUDevices() logger.Infow("GPUs", "list", gpus) if err != nil { From 5dfec4ab36f0e57d695a2e99104e8dd368d0456e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Mar 2024 14:44:58 +0100 Subject: [PATCH 13/15] lppiece: Fix piece cleanup task --- cmd/lotus-shed/lpdeal.go | 4 ++-- lib/harmony/harmonydb/sql/20240228-piece-park.sql | 1 + provider/lppiece/task_cleanup_piece.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-shed/lpdeal.go b/cmd/lotus-shed/lpdeal.go index b6237e13f..0819ddc13 100644 --- a/cmd/lotus-shed/lpdeal.go +++ b/cmd/lotus-shed/lpdeal.go @@ -3,7 +3,6 @@ package main import ( "bytes" "context" - "database/sql" "fmt" "io" "net" @@ -16,6 +15,7 @@ import ( "github.com/fatih/color" "github.com/google/uuid" + "github.com/jackc/pgx/v5" "github.com/mitchellh/go-homedir" manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" @@ -475,7 +475,7 @@ var lpBoostProxyCmd = &cli.Command{ err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) if err != nil { - if err == sql.ErrNoRows { + if err == pgx.ErrNoRows { // Piece does not exist, attempt to insert err = tx.QueryRow(` INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql index 95792628a..9ee6b447f 100644 --- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -12,6 +12,7 @@ create table parked_pieces ( cleanup_task_id bigint default null, foreign key (task_id) references harmony_task (id) on delete set null, + foreign key (cleanup_task_id) references harmony_task (id) on delete set null, unique (piece_cid) ); diff --git 
a/provider/lppiece/task_cleanup_piece.go b/provider/lppiece/task_cleanup_piece.go index d7f24037a..eeb4a44f0 100644 --- a/provider/lppiece/task_cleanup_piece.go +++ b/provider/lppiece/task_cleanup_piece.go @@ -76,7 +76,7 @@ func (c *CleanupPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) // select by cleanup_task_id var pieceID int64 - err = c.db.QueryRow(ctx, "SELECT piece_id FROM parked_pieces WHERE cleanup_task_id = $1", taskID).Scan(&pieceID) + err = c.db.QueryRow(ctx, "SELECT id FROM parked_pieces WHERE cleanup_task_id = $1", taskID).Scan(&pieceID) if err != nil { return false, xerrors.Errorf("query parked_piece: %w", err) } From 6ca55d18a71de7097fc2ffe835cb29c185dae5c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 15 Mar 2024 14:10:48 +0100 Subject: [PATCH 14/15] address review --- provider/lppiece/task_park_piece.go | 27 ++++++++++++++++++++++++++- provider/lpseal/task_sdr.go | 2 -- storage/paths/remote.go | 2 ++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/provider/lppiece/task_park_piece.go b/provider/lppiece/task_park_piece.go index 034ef9b2c..62bb91660 100644 --- a/provider/lppiece/task_park_piece.go +++ b/provider/lppiece/task_park_piece.go @@ -175,6 +175,8 @@ func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask. } func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { + const maxSizePiece = 64 << 30 + return harmonytask.TaskTypeDetails{ Max: p.max, Name: "ParkPiece", @@ -182,12 +184,35 @@ func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 0, Ram: 64 << 20, - Storage: nil, // TODO + Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing), }, MaxFailures: 10, } } +func (p *ParkPieceTask) taskToRef(id harmonytask.TaskID) (lpffi.SectorRef, error) { + var pieceIDs []struct { + ID storiface.PieceNumber `db:"id"` + } + + err := p.db.Select(context.Background(), &pieceIDs, `SELECT id FROM parked_pieces WHERE task_id = $1`, id) + if err != nil { + return lpffi.SectorRef{}, xerrors.Errorf("getting piece id: %w", err) + } + + if len(pieceIDs) != 1 { + return lpffi.SectorRef{}, xerrors.Errorf("expected 1 piece id, got %d", len(pieceIDs)) + } + + pref := pieceIDs[0].ID.Ref() + + return lpffi.SectorRef{ + SpID: int64(pref.ID.Miner), + SectorNumber: int64(pref.ID.Number), + RegSealProof: pref.ProofType, + }, nil +} + func (p *ParkPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) { p.TF.Set(taskFunc) } diff --git a/provider/lpseal/task_sdr.go b/provider/lpseal/task_sdr.go index 694ff6f46..ab86c6a82 100644 --- a/provider/lpseal/task_sdr.go +++ b/provider/lpseal/task_sdr.go @@ -187,8 +187,6 @@ func (s *SDRTask) getTicket(ctx context.Context, maddr address.Address) (abi.Sea } func (s *SDRTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { - // todo check storage (reserve too?) - id := ids[0] return &id, nil } diff --git a/storage/paths/remote.go b/storage/paths/remote.go index dfceaeace..9ff719954 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -747,6 +747,8 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size return nil, nil } +// ReaderSeq creates a simple sequential reader for a file. Does not work for +// file types which are a directory (e.g. FTCache). 
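+// The whole file is streamed as one sequential read rather than in ranged
+// chunks, and the caller is responsible for closing the returned ReadCloser.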
func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storiface.SectorFileType) (io.ReadCloser, error) { paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove) if err != nil { From 154cf09f523ee0a41e6b4ca5a808cb7dc5d8eb13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 17 Mar 2024 17:40:56 +0100 Subject: [PATCH 15/15] make gen --- curiosrc/piece/task_cleanup_piece.go | 2 +- node/config/doc_gen.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/curiosrc/piece/task_cleanup_piece.go b/curiosrc/piece/task_cleanup_piece.go index 43c5d6f0c..ed22ccb46 100644 --- a/curiosrc/piece/task_cleanup_piece.go +++ b/curiosrc/piece/task_cleanup_piece.go @@ -2,11 +2,11 @@ package piece import ( "context" - "github.com/filecoin-project/lotus/curiosrc/ffi" "time" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/curiosrc/ffi" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 092af75c7..dcb832976 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -330,6 +330,22 @@ documentation.`, Comment: ``, }, + { + Name: "EnableParkPiece", + Type: "bool", + + Comment: `EnableParkPiece enables the "piece parking" task to run on this node. This task is responsible for fetching +pieces from the network and storing them in the storage subsystem until sectors are sealed. This task is +only applicable when integrating with boost, and should be enabled on nodes which will hold deal data +from boost until sectors containing the related pieces have the TreeD/TreeR constructed. +Note that future Curio implementations will have a separate task type for fetching pieces from the internet.`, + }, + { + Name: "ParkPieceMaxTasks", + Type: "int", + + Comment: ``, + }, { Name: "EnableSealSDR", Type: "bool",
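
A note on the min-expiration clamp in the submit-precommit patch above: the lower bound is the ticket epoch, plus the maximum precommit randomness lookback, plus the maximum prove-commit duration for the sector's proof type, plus the minimum sector lifetime. The names policy.MaxPreCommitRandomnessLookback, policy.GetMaxProveCommitDuration and miner.MinSectorExpiration are taken from that patch; the concrete epoch values in the sketch below are assumptions for illustration only, not values read from lotus policy.

package main

import "fmt"

func main() {
	const epochsPerDay = 2880 // 30s epochs on mainnet

	var (
		ticketEpoch        int64 = 3_500_000          // hypothetical SDR ticket epoch
		randomnessLookback int64 = 900 + epochsPerDay // assumed MaxPreCommitRandomnessLookback
		maxProveCommitDur  int64 = 30 * epochsPerDay  // assumed msd for the proof type
		minSectorLifetime  int64 = 180 * epochsPerDay // assumed MinSectorExpiration
	)

	minExpiration := ticketEpoch + randomnessLookback + maxProveCommitDur + minSectorLifetime

	requested := int64(3_600_000) // requested expiration, below the bound
	if requested < minExpiration {
		requested = minExpiration // clamp, as SubmitPrecommitTask.Do now does
	}
	fmt.Println("expiration:", requested) // prints 4108580
}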