lpseal: WORKING SDR PIPELINE

This commit is contained in:
Łukasz Magiera 2024-01-11 23:55:26 +01:00
parent f2e1b5a35c
commit 3eea16c528
3 changed files with 10 additions and 5 deletions

View File

@ -47,7 +47,7 @@ var pipelineStartCmd = &cli.Command{
&cli.BoolFlag{ &cli.BoolFlag{
Name: "synthetic", Name: "synthetic",
Usage: "Use synthetic PoRep", Usage: "Use synthetic PoRep",
Value: true, Value: false, // todo implement synthetic
}, },
&cli.StringSliceFlag{ // todo consider moving layers top level &cli.StringSliceFlag{ // todo consider moving layers top level
Name: "layers", Name: "layers",

View File

@ -167,6 +167,10 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns
return ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed) return ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed)
} }
func (sb *SealCalls) GenerateSynthPoRep() {
panic("todo")
}
func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) { func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sealed, unsealed cid.Cid, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness) ([]byte, error) {
vproof, err := sb.sectors.storage.GenetartePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed) vproof, err := sb.sectors.storage.GenetartePoRepVanillaProof(ctx, sn, sealed, unsealed, ticket, seed)
if err != nil { if err != nil {

View File

@ -229,7 +229,7 @@ func (s *SealPoller) poll(ctx context.Context) error {
}) })
} }
if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskPrecommitMsg == nil { if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && s.pollers[pollerCommitMsg].IsSet() {
s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber)
if err != nil { if err != nil {
@ -243,7 +243,7 @@ func (s *SealPoller) poll(ctx context.Context) error {
}) })
} }
if task.AfterCommitMsg && !task.AfterCommitMsgSuccess { if task.AfterCommitMsg && !task.AfterCommitMsgSuccess && s.pollers[pollerCommitMsg].IsSet() {
var execResult []struct { var execResult []struct {
ExecutedTskCID string `db:"executed_tsk_cid"` ExecutedTskCID string `db:"executed_tsk_cid"`
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"` ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
@ -272,14 +272,15 @@ func (s *SealPoller) poll(ctx context.Context) error {
return xerrors.Errorf("get sector info: %w", err) return xerrors.Errorf("get sector info: %w", err)
} }
if si != nil { if si == nil {
log.Errorw("todo handle missing sector info (not found after cron)", "sp", task.SpID, "sector", task.SectorNumber, "exec_epoch", execResult[0].ExecutedTskEpoch, "exec_tskcid", execResult[0].ExecutedTskCID, "msg_cid", execResult[0].ExecutedMsgCID)
// todo handdle missing sector info (not found after cron) // todo handdle missing sector info (not found after cron)
} else { } else {
// yay! // yay!
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
after_commit_msg_success = true, commit_msg_tsk = $1 after_commit_msg_success = true, commit_msg_tsk = $1
WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success is NULL`, WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`,
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
if err != nil { if err != nil {
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)