diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index 6ecd680d0..11c12216b 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -3,6 +3,9 @@ package tasks import ( "context" + "github.com/filecoin-project/lotus/lib/lazy" + "github.com/filecoin-project/lotus/lib/must" + "github.com/filecoin-project/lotus/provider/lppiece" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" @@ -64,6 +67,18 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } } + slrLazy := lazy.MakeLazy(func() (*lpffi.SealCalls, error) { + return lpffi.NewSealCalls(stor, lstor, si), nil + }) + + { + // Piece handling + if cfg.Subsystems.EnableParkPiece { + parkPieceTask := lppiece.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks) + activeTasks = append(activeTasks, parkPieceTask) + } + } + hasAnySealingTask := cfg.Subsystems.EnableSealSDR || cfg.Subsystems.EnableSealSDRTrees || cfg.Subsystems.EnableSendPrecommitMsg || @@ -79,7 +94,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task sp = lpseal.NewPoller(db, full) go sp.RunPoller(ctx) - slr = lpffi.NewSealCalls(stor, lstor, si) + slr = must.One(slrLazy.Val()) } // NOTE: Tasks with the LEAST priority are at the top diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql new file mode 100644 index 000000000..c58ecdafe --- /dev/null +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -0,0 +1,30 @@ +create table parked_pieces ( + id serial primary key, + created_at timestamp default current_timestamp, + + piece_cid text not null, + piece_padded_size bigint not null, + piece_raw_size text not null, + + complete boolean not null default false +); + +create table parked_piece_refs ( + ref_id serial primary key, + piece_id int not null, + + foreign key (piece_id) references parked_pieces(id) on delete cascade +); + +create table park_piece_tasks ( + task_id bigint not null + constraint park_piece_tasks_pk + primary key, + + piece_ref_id int not null, + + data_url text not null, + data_headers jsonb not null default '{}', + data_raw_size bigint not null, + data_delete_on_finalize bool not null +); diff --git a/node/config/types.go b/node/config/types.go index 789d24103..d770512c5 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -112,6 +112,14 @@ type ProviderSubsystemsConfig struct { EnableWinningPost bool WinningPostMaxTasks int + // EnableParkPiece enables the "piece parking" task to run on this node. This task is responsible for fetching + // pieces from the network and storing them in the storage subsystem until sectors are sealed. This task is + // only applicable when integrating with boost, and should be enabled on nodes which will hold deal data + // from boost until sectors containing the related pieces have the TreeD/TreeR constructed. + // Note that future Curio implementations will have a separate task type for fetching pieces from the internet. + EnableParkPiece bool + ParkPieceMaxTasks int + // EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation // creating 11 layer files in sector cache directory. // diff --git a/provider/lpffi/piece_funcs.go b/provider/lpffi/piece_funcs.go new file mode 100644 index 000000000..4e2816786 --- /dev/null +++ b/provider/lpffi/piece_funcs.go @@ -0,0 +1,65 @@ +package lpffi + +import ( + "context" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "golang.org/x/xerrors" + "io" + "os" + "time" +) + +func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumber, size int64, data io.Reader) error { + // todo: config(?): allow setting PathStorage for this + // todo storage reservations + paths, done, err := sb.sectors.AcquireSector(ctx, nil, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing) + if err != nil { + return err + } + defer done() + + dest := paths.Piece + tempDest := dest + ".tmp" + + destFile, err := os.OpenFile(tempDest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return xerrors.Errorf("creating temp piece file '%s': %w", tempDest, err) + } + + removeTemp := true + defer func() { + if removeTemp { + rerr := os.Remove(tempDest) + if rerr != nil { + log.Errorf("removing temp file: %+v", rerr) + } + } + }() + + copyStart := time.Now() + + n, err := io.CopyBuffer(destFile, io.LimitReader(data, size), make([]byte, 8<<20)) + if err != nil { + _ = destFile.Close() + return xerrors.Errorf("copying piece data: %w", err) + } + + if err := destFile.Close(); err != nil { + return xerrors.Errorf("closing temp piece file: %w", err) + } + + if n != size { + return xerrors.Errorf("short write: %d", n) + } + + copyEnd := time.Now() + + log.Infow("wrote parked piece", "piece", pieceID, "size", size, "duration", copyEnd.Sub(copyStart), "dest", dest, "MiB/s", float64(size)/(1<<20)/copyEnd.Sub(copyStart).Seconds()) + + if err := os.Rename(tempDest, dest); err != nil { + return xerrors.Errorf("rename temp piece to dest %s -> %s: %w", tempDest, dest, err) + } + + removeTemp = false + return nil +} diff --git a/provider/lppiece/task_park_piece.go b/provider/lppiece/task_park_piece.go new file mode 100644 index 000000000..370775803 --- /dev/null +++ b/provider/lppiece/task_park_piece.go @@ -0,0 +1,162 @@ +package lppiece + +import ( + "context" + "encoding/json" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/provider/lpffi" + "github.com/filecoin-project/lotus/provider/lpseal" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + "net/http" + "time" +) + +var log = logging.Logger("lppiece") + +// ParkPieceTask gets a piece from some origin, and parks it in storage +// Pieces are always f00, piece ID is mapped to pieceCID in the DB +type ParkPieceTask struct { + db *harmonydb.DB + sc *lpffi.SealCalls + + TF promise.Promise[harmonytask.AddTaskFunc] + + max int +} + +func NewParkPieceTask(db *harmonydb.DB, sc *lpffi.SealCalls, max int) *ParkPieceTask { + return &ParkPieceTask{ + db: db, + sc: sc, + + max: max, + } +} + +func (p *ParkPieceTask) PullPiece(ctx context.Context, pieceCID cid.Cid, rawSize int64, paddedSize abi.PaddedPieceSize, dataUrl string, headers http.Header) error { + p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + var pieceID int + err = tx.QueryRow(`INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) VALUES ($1, $2, $3) RETURNING id`, pieceCID.String(), int64(paddedSize), rawSize).Scan(&pieceID) + if err != nil { + return false, xerrors.Errorf("inserting parked piece: %w", err) + } + + var refID int + err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id) VALUES ($1) RETURNING ref_id`, pieceID).Scan(&refID) + if err != nil { + return false, xerrors.Errorf("inserting parked piece ref: %w", err) + } + + headersJson, err := json.Marshal(headers) + if err != nil { + return false, xerrors.Errorf("marshaling headers: %w", err) + } + + _, err = tx.Exec(`INSERT INTO park_piece_tasks (task_id, piece_ref_id, data_url, data_headers, data_raw_size, data_delete_on_finalize) + VALUES ($1, $2, $3, $4, $5, $6)`, id, refID, dataUrl, headersJson, rawSize, false) + if err != nil { + return false, xerrors.Errorf("inserting park piece task: %w", err) + } + + return true, nil + }) + + return nil +} + +func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + ctx := context.Background() + + var taskData []struct { + PieceID int `db:"id"` + PieceCreatedAt time.Time `db:"created_at"` + PieceCID string `db:"piece_cid"` + Complete bool `db:"complete"` + + DataURL string `db:"data_url"` + DataHeaders string `db:"data_headers"` + DataRawSize int64 `db:"data_raw_size"` + DataDeleteOnFinalize bool `db:"data_delete_on_finalize"` + } + + err = p.db.Select(ctx, &taskData, ` + select + pp.id, + pp.created_at, + pp.piece_cid, + pp.complete, + ppt.data_url, + ppt.data_headers, + ppt.data_raw_size, + ppt.data_delete_on_finalize + from park_piece_tasks ppt + join parked_piece_refs ppr on ppt.piece_ref_id = ppr.ref_id + join parked_pieces pp on ppr.piece_id = pp.id + where ppt.task_id = $1 + `, taskID) + if err != nil { + return false, err + } + + if len(taskData) != 1 { + return false, xerrors.Errorf("expected 1 task, got %d", len(taskData)) + } + + if taskData[0].Complete { + log.Warnw("park piece task already complete", "task_id", taskID, "piece_cid", taskData[0].PieceCID) + return true, nil + } + + upr := &lpseal.UrlPieceReader{ + Url: taskData[0].DataURL, + RawSize: taskData[0].DataRawSize, + } + defer func() { + _ = upr.Close() + }() + + pnum := storiface.PieceNumber(taskData[0].PieceID) + + if err := p.sc.WritePiece(ctx, pnum, taskData[0].DataRawSize, upr); err != nil { + return false, xerrors.Errorf("write piece: %w", err) + } + + _, err = p.db.Exec(ctx, `update parked_pieces set complete = true where id = $1`, taskData[0].PieceID) + if err != nil { + return false, xerrors.Errorf("marking piece as complete: %w", err) + } + + return true, nil +} + +func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + id := ids[0] + return &id, nil +} + +func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: p.max, + Name: "ParkPiece", + Cost: resources.Resources{ + Cpu: 1, + Gpu: 0, + Ram: 64 << 20, + Storage: nil, // TODO + }, + MaxFailures: 10, + } +} + +func (p *ParkPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) { + p.TF.Set(taskFunc) +} + +var _ harmonytask.TaskInterface = &ParkPieceTask{} diff --git a/provider/lpseal/task_trees.go b/provider/lpseal/task_trees.go index da0fcf1e9..a632c5e87 100644 --- a/provider/lpseal/task_trees.go +++ b/provider/lpseal/task_trees.go @@ -200,6 +200,7 @@ type UrlPieceReader struct { RawSize int64 // the exact number of bytes read, if we read more or less that's an error readSoFar int64 + closed bool active io.ReadCloser // auto-closed on EOF } @@ -239,6 +240,7 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) { // If EOF is reached, close the reader if err == io.EOF { cerr := u.active.Close() + u.closed = true if cerr != nil { log.Errorf("error closing http piece reader: %s", cerr) } @@ -253,4 +255,13 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) { return n, err } +func (u *UrlPieceReader) Close() error { + if !u.closed { + u.closed = true + return u.active.Close() + } + + return nil +} + var _ harmonytask.TaskInterface = &TreesTask{} diff --git a/storage/paths/http_handler.go b/storage/paths/http_handler.go index 57de578a6..92f4162db 100644 --- a/storage/paths/http_handler.go +++ b/storage/paths/http_handler.go @@ -9,12 +9,10 @@ import ( "strconv" "time" + "github.com/filecoin-project/go-state-types/abi" "github.com/gorilla/mux" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/storage/sealer/partialfile" "github.com/filecoin-project/lotus/storage/sealer/storiface" @@ -340,18 +338,5 @@ func (handler *FetchHandler) generatePoRepVanillaProof(w http.ResponseWriter, r } func FileTypeFromString(t string) (storiface.SectorFileType, error) { - switch t { - case storiface.FTUnsealed.String(): - return storiface.FTUnsealed, nil - case storiface.FTSealed.String(): - return storiface.FTSealed, nil - case storiface.FTCache.String(): - return storiface.FTCache, nil - case storiface.FTUpdate.String(): - return storiface.FTUpdate, nil - case storiface.FTUpdateCache.String(): - return storiface.FTUpdateCache, nil - default: - return 0, xerrors.Errorf("unknown sector file type: '%s'", t) - } + return storiface.TypeFromString(t) } diff --git a/storage/sealer/ffiwrapper/basicfs/fs.go b/storage/sealer/ffiwrapper/basicfs/fs.go index 4fd8e271f..47c7f526e 100644 --- a/storage/sealer/ffiwrapper/basicfs/fs.go +++ b/storage/sealer/ffiwrapper/basicfs/fs.go @@ -39,6 +39,9 @@ func (b *Provider) AcquireSector(ctx context.Context, id storiface.SectorRef, ex if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUpdateCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint return storiface.SectorPaths{}, nil, err } + if err := os.Mkdir(filepath.Join(b.Root, storiface.FTPiece.String()), 0755); err != nil && !os.IsExist(err) { // nolint + return storiface.SectorPaths{}, nil, err + } done := func() {} diff --git a/storage/sealer/storiface/filetype.go b/storage/sealer/storiface/filetype.go index ec3c5450c..887dda688 100644 --- a/storage/sealer/storiface/filetype.go +++ b/storage/sealer/storiface/filetype.go @@ -9,16 +9,22 @@ import ( ) const ( + // "regular" sectors FTUnsealed SectorFileType = 1 << iota FTSealed FTCache + + // snap FTUpdate FTUpdateCache + // Piece Park + FTPiece + FileTypes = iota ) -var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache} +var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache, FTPiece} const ( FTNone SectorFileType = 0 @@ -39,6 +45,7 @@ var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads FTUpdate: FSOverheadDen, FTUpdateCache: FSOverheadDen*2 + 1, FTCache: 141, // 11 layers + D(2x ssize) + C + R' + FTPiece: FSOverheadDen, } // sector size * disk / fs overhead. FSOverheadDen is like the unit of sector size @@ -49,6 +56,7 @@ var FsOverheadFinalized = map[SectorFileType]int{ FTUpdate: FSOverheadDen, FTUpdateCache: 1, FTCache: 1, + FTPiece: FSOverheadDen, } type SectorFileType int @@ -65,6 +73,8 @@ func TypeFromString(s string) (SectorFileType, error) { return FTUpdate, nil case "update-cache": return FTUpdateCache, nil + case "piece": + return FTPiece, nil default: return 0, xerrors.Errorf("unknown sector file type '%s'", s) } @@ -82,6 +92,8 @@ func (t SectorFileType) String() string { return "update" case FTUpdateCache: return "update-cache" + case FTPiece: + return "piece" default: return fmt.Sprintf("", t, (t & ((1 << FileTypes) - 1)).Strings()) } @@ -206,6 +218,7 @@ type SectorPaths struct { Cache string Update string UpdateCache string + Piece string } func ParseSectorID(baseName string) (abi.SectorID, error) { @@ -242,6 +255,8 @@ func PathByType(sps SectorPaths, fileType SectorFileType) string { return sps.Update case FTUpdateCache: return sps.UpdateCache + case FTPiece: + return sps.Piece } panic("requested unknown path type") @@ -259,5 +274,7 @@ func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) { sps.Update = p case FTUpdateCache: sps.UpdateCache = p + case FTPiece: + sps.Piece = p } } diff --git a/storage/sealer/storiface/storage.go b/storage/sealer/storiface/storage.go index fe4e1e208..75cc9399c 100644 --- a/storage/sealer/storiface/storage.go +++ b/storage/sealer/storiface/storage.go @@ -20,6 +20,17 @@ type SectorRef struct { var NoSectorRef = SectorRef{} +// PieceNumber is a reference to a piece in the storage system; mapping between +// pieces in the storage system and piece CIDs is maintained by the storage index +type PieceNumber uint64 + +func (pn PieceNumber) Ref() SectorRef { + return SectorRef{ + ID: abi.SectorID{Miner: 0, Number: abi.SectorNumber(pn)}, + ProofType: abi.RegisteredSealProof_StackedDrg64GiBV1, // This only cares about TreeD which is the same for all sizes + } +} + type ProverPoSt interface { GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, ppt abi.RegisteredPoStProof, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error)