From 3eea16c5286f76400fbac57efca4e33f5a0fbf46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 11 Jan 2024 23:55:26 +0100 Subject: [PATCH] lpseal: WORKING SDR PIPELINE --- cmd/lotus-provider/pipeline.go | 2 +- provider/lpffi/sdr_funcs.go | 4 ++++ provider/lpseal/poller.go | 9 +++++---- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/cmd/lotus-provider/pipeline.go b/cmd/lotus-provider/pipeline.go index 8e01044e2..20eb93451 100644 --- a/cmd/lotus-provider/pipeline.go +++ b/cmd/lotus-provider/pipeline.go @@ -47,7 +47,7 @@ var pipelineStartCmd = &cli.Command{ &cli.BoolFlag{ Name: "synthetic", Usage: "Use synthetic PoRep", - Value: true, + Value: false, // todo implement synthetic }, &cli.StringSliceFlag{ // todo consider moving layers top level Name: "layers", diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go index 5353ff87e..801b2e7ab 100644 --- a/provider/lpffi/sdr_funcs.go +++ b/provider/lpffi/sdr_funcs.go @@ -167,6 +167,10 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns return ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed) } +func (sb *SealCalls) GenerateSynthPoRep() { + panic("todo") +} + func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) { vproof, err := sb.sectors.storage.GenetartePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed) if err != nil { diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index 47e60ea6d..f2703067d 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -229,7 +229,7 @@ func (s *SealPoller) poll(ctx context.Context) error { }) } - if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskPrecommitMsg == nil { + if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() { s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) if err != nil { @@ -243,7 +243,7 @@ func (s *SealPoller) poll(ctx context.Context) error { }) } - if task.AfterCommitMsg && !task.AfterCommitMsgSuccess { + if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() { var execResult []struct { ExecutedTskCID string `db:"executed_tsk_cid"` ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` @@ -272,14 +272,15 @@ func (s *SealPoller) poll(ctx context.Context) error { return xerrors.Errorf("get sector info: %w", err) } - if si != nil { + if si == nil { + log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID) // todo handdle missing sector info (not found after cron) } else { // yay! _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_commit_msg_success = true, commit_msg_tsk = $1 - WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success is NULL`, + WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) if err != nil { return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)