lpseal: Implement SDR-Trees

This commit is contained in:
Łukasz Magiera 2023-12-20 14:45:19 +01:00
parent d79bc0ba13
commit f74c3ccd2d
6 changed files with 150 additions and 15 deletions

View File

@ -72,6 +72,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
sdrTask := lpseal.NewSDRTask(full, db, sp, slr, cfg.Subsystems.SealSDRMaxTasks) sdrTask := lpseal.NewSDRTask(full, db, sp, slr, cfg.Subsystems.SealSDRMaxTasks)
activeTasks = append(activeTasks, sdrTask) activeTasks = append(activeTasks, sdrTask)
} }
if cfg.Subsystems.EnableSealSDRTrees {
treesTask := lpseal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
activeTasks = append(activeTasks, treesTask)
}
} }
log.Infow("This lotus_provider instance handles", log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs, "miner_addresses", maddrs,

View File

@ -11,6 +11,7 @@ import (
func getGPUDevices() float64 { // GPU boolean func getGPUDevices() float64 { // GPU boolean
gpus, err := ffi.GetGPUDevices() gpus, err := ffi.GetGPUDevices()
logger.Infow("GPUs", "list", gpus)
if err != nil { if err != nil {
logger.Errorf("getting gpu devices failed: %+v", err) logger.Errorf("getting gpu devices failed: %+v", err)
} }

View File

@ -103,9 +103,17 @@ type ProviderSubsystemsConfig struct {
GuiAddress string GuiAddress string
// EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation // EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation
// creating layers. In lotus-miner this was run as part of PreCommit1. // creating layers.
// In lotus-miner this was run as part of PreCommit1.
EnableSealSDR bool EnableSealSDR bool
SealSDRMaxTasks int SealSDRMaxTasks int
// EnableSealSDRTrees enables the SDR pipeline tree-building task to run.
// This task handles encoding of unsealed data into last sdr layer and building
// of TreeR, TreeC and TreeD.
// In lotus-miner this was run as part of PreCommit2 (TreeD was run in PreCommit1).
EnableSealSDRTrees bool
SealSDRTreesMaxTasks int
} }
type DAGStoreConfig struct { type DAGStoreConfig struct {

View File

@ -2,14 +2,19 @@ package lpffi
import ( import (
"context" "context"
"encoding/json"
"fmt"
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
commcid "github.com/filecoin-project/go-fil-commcid" commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"io"
"os"
) )
var log = logging.Logger("lpffi") var log = logging.Logger("lpffi")
@ -104,6 +109,92 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef
return nil return nil
} }
func (sb *SealCalls) TreeRC() { func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid) (cid.Cid, cid.Cid, error) {
ffi.SealPreCommitPhase2() p1o, err := sb.makePhase1Out(unsealed, sector.ProofType)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
}
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()
{
// create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err)
}
// paths.Sealed is a string filepath
f, err := os.Create(paths.Sealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("creating sealed sector file: %w", err)
}
if err := f.Truncate(int64(ssize)); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("truncating sealed sector file: %w", err)
}
if os.Getenv("SEAL_WRITE_UNSEALED") == "1" {
// expliticly write zeros to unsealed sector
_, err := io.CopyN(f, nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize)), int64(ssize))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("writing zeros to sealed sector file: %w", err)
}
}
}
return ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed)
}
func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof) ([]byte, error) {
commd, err := commcid.CIDToDataCommitmentV1(unsCid)
if err != nil {
return nil, xerrors.Errorf("make uns cid: %w", err)
}
type Config struct {
ID string `json:"id"`
Path string `json:"path"`
RowsToDiscard int `json:"rows_to_discard"`
Size int `json:"size"`
}
type Labels struct {
Labels []Config `json:"labels"`
}
var phase1Output struct {
CommD [32]byte `json:"comm_d"`
Config Config `json:"config"` // TreeD
Labels map[string]*Labels `json:"labels"`
RegisteredProof string `json:"registered_proof"`
}
copy(phase1Output.CommD[:], commd)
phase1Output.Config.ID = "tree-d"
phase1Output.Config.Path = "/placeholder"
phase1Output.Labels = map[string]*Labels{}
switch spt {
case abi.RegisteredSealProof_StackedDrg2KiBV1_1, abi.RegisteredSealProof_StackedDrg2KiBV1_1_Feat_SyntheticPoRep:
phase1Output.Config.RowsToDiscard = 0
phase1Output.Config.Size = 127
phase1Output.Labels["StackedDrg2KiBV1"].Labels = make([]Config, 2)
phase1Output.RegisteredProof = "StackedDrg2KiBV1_1"
for i, l := range phase1Output.Labels["StackedDrg2KiBV1"].Labels {
l.ID = fmt.Sprintf("layer-%d", i+1)
l.Path = "/placeholder"
l.RowsToDiscard = 0
l.Size = 64
}
default:
panic("todo")
}
return json.Marshal(phase1Output)
} }

View File

@ -109,7 +109,7 @@ func (s *SealPoller) poll(ctx context.Context) error {
if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { if task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() {
s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3`, id, task.SpID, task.SectorNumber) n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber)
if err != nil { if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
} }
@ -120,16 +120,19 @@ func (s *SealPoller) poll(ctx context.Context) error {
return true, nil return true, nil
}) })
} }
if task.TaskTreeD == nil { if task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTrees].IsSet() && task.AfterSDR {
// todo start tree d task s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1
WHERE sp_id = $2 AND sector_number = $3 and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber)
if err != nil {
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
} }
// todo those two are really one pc2 return true, nil
if task.TaskTreeC == nil && task.AfterSDR { })
// todo start tree c task
}
if task.TaskTreeR == nil && task.AfterTreeC {
// todo start tree r task
} }
if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD { if task.TaskPrecommitMsg == nil && task.AfterTreeR && task.AfterTreeD {

View File

@ -20,6 +20,16 @@ type TreesTask struct {
max int max int
} }
func NewTreesTask(sp *SealPoller, db *harmonydb.DB, sc *lpffi.SealCalls, maxTrees int) *TreesTask {
return &TreesTask{
sp: sp,
db: db,
sc: sc,
max: maxTrees,
}
}
func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background() ctx := context.Background()
@ -76,7 +86,25 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
ProofType: sectorParams.RegSealProof, ProofType: sectorParams.RegSealProof,
} }
t.sc. sealed, unsealed, err := t.sc.TreeRC(ctx, sref, commd)
if err != nil {
return false, xerrors.Errorf("computing tree r and c: %w", err)
}
// todo tree d!! (?)
n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
SET after_tree_r = true, after_tree_c = true, after_tree_d = true, tree_r_cid = $1, tree_d_cid = $3
WHERE sp_id = $1 AND sector_number = $2`,
sectorParams.SpID, sectorParams.SectorNumber, sealed, unsealed)
if err != nil {
return false, xerrors.Errorf("store sdr-trees success: updating pipeline: %w", err)
}
if n != 1 {
return false, xerrors.Errorf("store sdr-trees success: updated %d rows", n)
}
return true, nil
} }
func (t *TreesTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { func (t *TreesTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
@ -93,7 +121,7 @@ func (t *TreesTask) TypeDetails() harmonytask.TaskTypeDetails {
Cost: resources.Resources{ Cost: resources.Resources{
Cpu: 1, Cpu: 1,
Gpu: 1, Gpu: 1,
Ram: 8000, // todo Ram: 8000 << 20, // todo
}, },
MaxFailures: 3, MaxFailures: 3,
Follows: nil, Follows: nil,