From f74c3ccd2d2b5081be1ada6561388cd00b5b039b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Wed, 20 Dec 2023 14:45:19 +0100
Subject: [PATCH] lpseal: Implement SDR-Trees

---
 cmd/lotus-provider/tasks/tasks.go |  4 ++
 lib/harmony/resources/getGPU.go   |  1 +
 node/config/types.go              | 10 +++-
 provider/lpffi/sdr_funcs.go       | 95 ++++++++++++++++++++++++++++++-
 provider/lpseal/poller.go         | 23 ++++----
 provider/lpseal/task_trees.go     | 32 ++++++++++-
 6 files changed, 150 insertions(+), 15 deletions(-)

diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go
index 418031157..b81814423 100644
--- a/cmd/lotus-provider/tasks/tasks.go
+++ b/cmd/lotus-provider/tasks/tasks.go
@@ -72,6 +72,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
 			sdrTask := lpseal.NewSDRTask(full, db, sp, slr, cfg.Subsystems.SealSDRMaxTasks)
 			activeTasks = append(activeTasks, sdrTask)
 		}
+		if cfg.Subsystems.EnableSealSDRTrees {
+			treesTask := lpseal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
+			activeTasks = append(activeTasks, treesTask)
+		}
 	}
 	log.Infow("This lotus_provider instance handles",
 		"miner_addresses", maddrs,
diff --git a/lib/harmony/resources/getGPU.go b/lib/harmony/resources/getGPU.go
index 9a73bcd0d..3489e7491 100644
--- a/lib/harmony/resources/getGPU.go
+++ b/lib/harmony/resources/getGPU.go
@@ -11,6 +11,7 @@ import (
 
 func getGPUDevices() float64 { // GPU boolean
 	gpus, err := ffi.GetGPUDevices()
+	logger.Infow("GPUs", "list", gpus)
 	if err != nil {
 		logger.Errorf("getting gpu devices failed: %+v", err)
 	}
diff --git a/node/config/types.go b/node/config/types.go
index 65fa4da02..e91d1dcf5 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -103,9 +103,17 @@ type ProviderSubsystemsConfig struct {
 	GuiAddress string
 
 	// EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation
-	// creating layers. In lotus-miner this was run as part of PreCommit1.
+	// creating layers.
+	// In lotus-miner this was run as part of PreCommit1.
 	EnableSealSDR   bool
 	SealSDRMaxTasks int
+
+	// EnableSealSDRTrees enables the SDR pipeline tree-building task to run.
+	// This task handles encoding of unsealed data into last sdr layer and building
+	// of TreeR, TreeC and TreeD.
+	// In lotus-miner this was run as part of PreCommit2 (TreeD was run in PreCommit1).
+	EnableSealSDRTrees   bool
+	SealSDRTreesMaxTasks int
 }
 
 type DAGStoreConfig struct {
diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go
index 39b2dffde..09099af84 100644
--- a/provider/lpffi/sdr_funcs.go
+++ b/provider/lpffi/sdr_funcs.go
@@ -2,14 +2,19 @@ package lpffi
 
 import (
 	"context"
+	"encoding/json"
+	"fmt"
 
 	ffi "github.com/filecoin-project/filecoin-ffi"
 	commcid "github.com/filecoin-project/go-fil-commcid"
 	"github.com/filecoin-project/go-state-types/abi"
 
 	"github.com/filecoin-project/lotus/storage/paths"
+	"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
 	"github.com/filecoin-project/lotus/storage/sealer/storiface"
 	"github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log/v2"
 	"golang.org/x/xerrors"
+	"io"
+	"os"
 )
@@ -104,6 +109,92 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef
 	return nil
 }
 
-func (sb *SealCalls) TreeRC() {
-	ffi.SealPreCommitPhase2()
+// TreeRC builds TreeR/TreeC for the sector by running PreCommit2 over the SDR
+// layer output, returning (sealed CID, unsealed CID).
+func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid) (cid.Cid, cid.Cid, error) {
+	p1o, err := sb.makePhase1Out(unsealed, sector.ProofType)
+	if err != nil {
+		return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
+	}
+
+	paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
+	if err != nil {
+		return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
+	}
+	defer releaseSector()
+
+	{
+		// create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place
+		ssize, err := sector.ProofType.SectorSize()
+		if err != nil {
+			return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err)
+		}
+
+		// paths.Sealed is a string filepath
+		f, err := os.Create(paths.Sealed)
+		if err != nil {
+			return cid.Undef, cid.Undef, xerrors.Errorf("creating sealed sector file: %w", err)
+		}
+		if err := f.Truncate(int64(ssize)); err != nil {
+			return cid.Undef, cid.Undef, xerrors.Errorf("truncating sealed sector file: %w", err)
+		}
+
+		if os.Getenv("SEAL_WRITE_UNSEALED") == "1" {
+			// explicitly write zeros to the sealed sector file (debug aid; Truncate alone leaves a sparse file)
+			_, err := io.CopyN(f, nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize)), int64(ssize))
+			if err != nil {
+				return cid.Undef, cid.Undef, xerrors.Errorf("writing zeros to sealed sector file: %w", err)
+			}
+		}
+	}
+
+	return ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed)
+}
+
+// makePhase1Out synthesizes the JSON phase1 output expected by
+// ffi.SealPreCommitPhase2 from just the unsealed (TreeD) CID.
+func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof) ([]byte, error) {
+	commd, err := commcid.CIDToDataCommitmentV1(unsCid)
+	if err != nil {
+		return nil, xerrors.Errorf("make uns cid: %w", err)
+	}
+
+	type Config struct {
+		ID            string `json:"id"`
+		Path          string `json:"path"`
+		RowsToDiscard int    `json:"rows_to_discard"`
+		Size          int    `json:"size"`
+	}
+
+	type Labels struct {
+		Labels []Config `json:"labels"`
+	}
+
+	var phase1Output struct {
+		CommD           [32]byte           `json:"comm_d"`
+		Config          Config             `json:"config"` // TreeD
+		Labels          map[string]*Labels `json:"labels"`
+		RegisteredProof string             `json:"registered_proof"`
+	}
+
+	copy(phase1Output.CommD[:], commd)
+
+	phase1Output.Config.ID = "tree-d"
+	phase1Output.Config.Path = "/placeholder"
+	phase1Output.Labels = map[string]*Labels{}
+
+	switch spt {
+	case abi.RegisteredSealProof_StackedDrg2KiBV1_1, abi.RegisteredSealProof_StackedDrg2KiBV1_1_Feat_SyntheticPoRep:
+		phase1Output.Config.RowsToDiscard = 0
+		phase1Output.Config.Size = 127
+		// NOTE: the map entry must be created before it is used; indexing a
+		// missing key yields a nil *Labels and would panic on field access.
+		phase1Output.Labels["StackedDrg2KiBV1"] = &Labels{Labels: make([]Config, 2)}
+		phase1Output.RegisteredProof = "StackedDrg2KiBV1_1"
+
+		// mutate the slice elements in place; a `for _, l := range` value
+		// would be a copy and the assignments would be lost
+		for i := range phase1Output.Labels["StackedDrg2KiBV1"].Labels {
+			phase1Output.Labels["StackedDrg2KiBV1"].Labels[i].ID = fmt.Sprintf("layer-%d", i+1)
+			phase1Output.Labels["StackedDrg2KiBV1"].Labels[i].Path = "/placeholder"
+			phase1Output.Labels["StackedDrg2KiBV1"].Labels[i].RowsToDiscard = 0
+			phase1Output.Labels["StackedDrg2KiBV1"].Labels[i].Size = 64
+		}
+	default:
+		panic("todo")
+	}
+
+	return json.Marshal(phase1Output)
 }
diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go
index a9f7b3ade..d3fae5990 100644
--- a/provider/lpseal/poller.go
+++ b/provider/lpseal/poller.go
@@ -109,7 +109,7 @@ func (s *SealPoller) poll(ctx context.Context) error {
 		if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() {
 			s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
-				n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3`, id, task.SpID, task.SectorNumber)
+				n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber)
 				if err != nil {
 					return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
 				}
@@ -120,16 +120,19 @@ func (s *SealPoller) poll(ctx context.Context) error {
 				return true, nil
 			})
 		}
-		if task.TaskTreeD == nil {
-			// todo start tree d task
-		}
+		if task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTrees].IsSet() && task.AfterSDR {
+			s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
+				n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1
+				WHERE sp_id = $2 AND sector_number = $3 and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber)
+				if err != nil {
+					return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
+				}
+				if n != 1 {
+					return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
+				}
 
-		// todo those two are really one pc2
-		if task.TaskTreeC == nil && task.AfterSDR {
-			// todo start tree c task
-		}
-		if task.TaskTreeR == nil && task.AfterTreeC {
-			// todo start tree r task
+				return true, nil
+			})
 		}
 
 		if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD {
diff --git a/provider/lpseal/task_trees.go b/provider/lpseal/task_trees.go
index 612794b99..610f280f0 100644
--- a/provider/lpseal/task_trees.go
+++ b/provider/lpseal/task_trees.go
@@ -20,6 +20,16 @@ type TreesTask struct {
 	max int
 }
 
+// NewTreesTask creates the harmonytask handler building TreeD/TreeC/TreeR.
+func NewTreesTask(sp *SealPoller, db *harmonydb.DB, sc *lpffi.SealCalls, maxTrees int) *TreesTask {
+	return &TreesTask{
+		sp: sp,
+		db: db,
+		sc: sc,
+
+		max: maxTrees,
+	}
+}
+
 func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
 	ctx := context.Background()
 
@@ -76,7 +86,25 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 		ProofType: sectorParams.RegSealProof,
 	}
 
-	t.sc.
+	sealed, unsealed, err := t.sc.TreeRC(ctx, sref, commd)
+	if err != nil {
+		return false, xerrors.Errorf("computing tree r and c: %w", err)
+	}
+
+	// todo tree d!! (?)
+
+	// placeholders must not collide: $1/$2 are the WHERE keys, $3/$4 the CIDs,
+	// matching the bind-argument order below
+	n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
+		SET after_tree_r = true, after_tree_c = true, after_tree_d = true, tree_r_cid = $3, tree_d_cid = $4
+		WHERE sp_id = $1 AND sector_number = $2`,
+		sectorParams.SpID, sectorParams.SectorNumber, sealed, unsealed)
+	if err != nil {
+		return false, xerrors.Errorf("store sdr-trees success: updating pipeline: %w", err)
+	}
+	if n != 1 {
+		return false, xerrors.Errorf("store sdr-trees success: updated %d rows", n)
+	}
+
+	return true, nil
 }
 
 func (t *TreesTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
@@ -93,7 +121,7 @@ func (t *TreesTask) TypeDetails() harmonytask.TaskTypeDetails {
 		Cost: resources.Resources{
 			Cpu: 1,
 			Gpu: 1,
-			Ram: 8000, // todo
+			Ram: 8000 << 20, // todo
 		},
 		MaxFailures: 3,
 		Follows:     nil,