diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index 51f302bcc..cfe7e921a 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -98,7 +98,7 @@ var wdPostTaskCmd = &cli.Command{ return false, xerrors.Errorf("inserting into harmony_tests: %w", err) } return true, nil - }, harmonydb.RetrySerializationErr()) + }, harmonydb.OptionRetry()) if err != nil { return xerrors.Errorf("writing SQL transaction: %w", err) } diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go index 6dd066d25..759cbd322 100644 --- a/lib/harmony/harmonydb/userfuncs.go +++ b/lib/harmony/harmonydb/userfuncs.go @@ -137,13 +137,13 @@ type TransactionOptions struct { type TransactionOption func(*TransactionOptions) -func RetrySerializationErr() TransactionOption { +func OptionRetry() TransactionOption { return func(o *TransactionOptions) { o.RetrySerializationError = true } } -func InitialSerializationErrorRetryWait(d time.Duration) TransactionOption { +func OptionSerialRetryTime(d time.Duration) TransactionOption { return func(o *TransactionOptions) { o.InitialSerializationErrorRetryWait = d } diff --git a/provider/lpmarket/deal_ingest.go b/provider/lpmarket/deal_ingest.go index bfab45e34..0858cf801 100644 --- a/provider/lpmarket/deal_ingest.go +++ b/provider/lpmarket/deal_ingest.go @@ -78,7 +78,7 @@ func (p *PieceIngester) AllocatePieceToSector(ctx context.Context, maddr address } n := numbers[0] - _, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt) + _, err := tx.Exec("INSERT INTO sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) VALUES ($1, $2, $3)", mid, n, spt) if err != nil { return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err) } diff --git a/provider/lpmarket/fakelm/lmimpl.go b/provider/lpmarket/fakelm/lmimpl.go index fff0a3685..abbc4d04d 100644 --- a/provider/lpmarket/fakelm/lmimpl.go +++ b/provider/lpmarket/fakelm/lmimpl.go @@ -75,7 +75,7 @@ func (l *LMRPCProvider) SectorsStatus(ctx context.Context, sid abi.SectorNumber, DealID *int64 `db:"f05_deal_id"` } - err = l.db.Select(ctx, &ssip, "select ssip.piece_cid, ssip.f05_deal_id from sectors_sdr_pipeline p left join sectors_sdr_initial_pieces ssip on p.sp_id = ssip.sp_id and p.sector_number = ssip.sector_number where p.sp_id = $1 and p.sector_number = $2", l.minerID, sid) + err = l.db.Select(ctx, &ssip, "SELECT ssip.piece_cid, ssip.f05_deal_id FROM sectors_sdr_pipeline p LEFT JOIN sectors_sdr_initial_pieces ssip ON p.sp_id = ssip.sp_id AND p.sector_number = ssip.sector_number WHERE p.sp_id = $1 AND p.sector_number = $2", l.minerID, sid) if err != nil { return api.SectorInfo{}, err } diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index 2cf3c72a6..cbdcb3fd9 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -137,7 +137,7 @@ func (s *SealPoller) poll(ctx context.Context) error { task_id_commit_msg, after_commit_msg, after_commit_msg_success, failed, failed_reason - FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true or after_move_storage != true`) + FROM sectors_sdr_pipeline WHERE after_commit_msg_success != TRUE OR after_move_storage != TRUE`) if err != nil { return err } @@ -170,7 +170,7 @@ func (s *SealPoller) poll(ctx context.Context) error { func (s *SealPoller) pollStartSDR(ctx context.Context, task pollTask) { if !task.AfterSDR && task.TaskSDR == nil && s.pollers[pollerSDR].IsSet() { s.pollers[pollerSDR].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_sdr is null`, id, task.SpID, task.SectorNumber) + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_sdr IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } @@ -194,7 +194,7 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1 - WHERE sp_id = $2 AND sector_number = $3 and after_sdr = true and task_id_tree_d is null and task_id_tree_c is null and task_id_tree_r is null`, id, task.SpID, task.SectorNumber) + WHERE sp_id = $2 AND sector_number = $3 AND after_sdr = TRUE AND task_id_tree_d IS NULL AND task_id_tree_c IS NULL AND task_id_tree_r IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } @@ -225,7 +225,7 @@ func (s *SealPoller) pollStartPoRep(ctx context.Context, task pollTask, ts *type ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) { s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber) + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_porep IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } @@ -245,7 +245,7 @@ func (t pollTask) afterPoRep() bool { func (s *SealPoller) pollStartFinalize(ctx context.Context, task pollTask, ts *types.TipSet) { if s.pollers[pollerFinalize].IsSet() && task.afterPoRep() && !task.AfterFinalize && task.TaskFinalize == nil { s.pollers[pollerFinalize].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_finalize is null`, id, task.SpID, task.SectorNumber) + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_finalize = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_finalize IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } @@ -265,7 +265,7 @@ func (t pollTask) afterFinalize() bool { func (s *SealPoller) pollStartMoveStorage(ctx context.Context, task pollTask) { if s.pollers[pollerMoveStorage].IsSet() && task.afterFinalize() && !task.AfterMoveStorage && task.TaskMoveStorage == nil { s.pollers[pollerMoveStorage].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_move_storage is null`, id, task.SpID, task.SectorNumber) + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_move_storage = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_move_storage IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } diff --git a/provider/lpseal/poller_commit_msg.go b/provider/lpseal/poller_commit_msg.go index 0350200ef..d17ccd906 100644 --- a/provider/lpseal/poller_commit_msg.go +++ b/provider/lpseal/poller_commit_msg.go @@ -17,7 +17,7 @@ import ( func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) { if task.afterPoRep() && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && !task.AfterCommitMsg && s.pollers[pollerCommitMsg].IsSet() { s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber) + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_commit_msg IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } @@ -37,7 +37,7 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used FROM sectors_sdr_pipeline spipeline JOIN message_waits ON spipeline.commit_msg_cid = message_waits.signed_message_cid - WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) + WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch IS NOT NULL`, task.SpID, task.SectorNumber) if err != nil { log.Errorw("failed to query message_waits", "error", err) } @@ -64,8 +64,8 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err // yay! _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET - after_commit_msg_success = true, commit_msg_tsk = $1 - WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success = false`, + after_commit_msg_success = TRUE, commit_msg_tsk = $1 + WHERE sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = FALSE`, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) if err != nil { return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) @@ -97,8 +97,8 @@ func (s *SealPoller) pollRetryCommitMsgSend(ctx context.Context, task pollTask, // make the pipeline entry seem like precommit send didn't happen, next poll loop will retry _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET - commit_msg_cid = null, task_id_commit_msg = null - WHERE commit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = false`, + commit_msg_cid = NULL, task_id_commit_msg = NULL + WHERE commit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_commit_msg_success = FALSE`, *execResult.CommitMsgCID, task.SpID, task.SectorNumber) if err != nil { return xerrors.Errorf("update sectors_sdr_pipeline to retry precommit msg send: %w", err) diff --git a/provider/lpseal/poller_precommit_msg.go b/provider/lpseal/poller_precommit_msg.go index be80ec1df..5f6958b53 100644 --- a/provider/lpseal/poller_precommit_msg.go +++ b/provider/lpseal/poller_precommit_msg.go @@ -18,7 +18,7 @@ import ( func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.afterTrees() && s.pollers[pollerPrecommitMsg].IsSet() { s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_precommit_msg is null and after_tree_r = true and after_tree_d = true`, id, task.SpID, task.SectorNumber) + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_precommit_msg IS NULL AND after_tree_r = TRUE AND after_tree_d = TRUE`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } @@ -50,7 +50,7 @@ func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) err := s.db.Select(ctx, &execResult, `SELECT spipeline.precommit_msg_cid, spipeline.commit_msg_cid, executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used FROM sectors_sdr_pipeline spipeline JOIN message_waits ON spipeline.precommit_msg_cid = message_waits.signed_message_cid - WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber) + WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch IS NOT NULL`, task.SpID, task.SectorNumber) if err != nil { log.Errorw("failed to query message_waits", "error", err) } @@ -74,8 +74,8 @@ func (s *SealPoller) pollPrecommitMsgLanded(ctx context.Context, task pollTask) randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay() _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET - seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = true - WHERE sp_id = $3 AND sector_number = $4 and seed_epoch is NULL`, + seed_epoch = $1, precommit_msg_tsk = $2, after_precommit_msg_success = TRUE + WHERE sp_id = $3 AND sector_number = $4 AND seed_epoch IS NULL`, randHeight, execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber) if err != nil { return xerrors.Errorf("update sectors_sdr_pipeline: %w", err) @@ -108,8 +108,8 @@ func (s *SealPoller) pollRetryPrecommitMsgSend(ctx context.Context, task pollTas // make the pipeline entry seem like precommit send didn't happen, next poll loop will retry _, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET - precommit_msg_cid = null, task_id_precommit_msg = null - WHERE precommit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_precommit_msg_success = false`, + precommit_msg_cid = NULL, task_id_precommit_msg = NULL + WHERE precommit_msg_cid = $1 AND sp_id = $2 AND sector_number = $3 AND after_precommit_msg_success = FALSE`, *execResult.PrecommitMsgCID, task.SpID, task.SectorNumber) if err != nil { return xerrors.Errorf("update sectors_sdr_pipeline to retry precommit msg send: %w", err) diff --git a/provider/lpseal/sector_num_alloc.go b/provider/lpseal/sector_num_alloc.go index 15f6b3435..6958a5e6b 100644 --- a/provider/lpseal/sector_num_alloc.go +++ b/provider/lpseal/sector_num_alloc.go @@ -38,7 +38,7 @@ func AllocateSectorNumbers(ctx context.Context, a AllocAPI, db *harmonydb.DB, ma var dbAllocated bitfield.BitField var rawJson []byte - err = tx.QueryRow("select COALESCE(allocated, '[0]') from sectors_allocated_numbers sa FULL OUTER JOIN (SELECT 1) AS d ON true where sp_id = $1 or sp_id is null", mid).Scan(&rawJson) + err = tx.QueryRow("SELECT COALESCE(allocated, '[0]') from sectors_allocated_numbers sa FULL OUTER JOIN (SELECT 1) AS d ON TRUE WHERE sp_id = $1 OR sp_id IS NULL", mid).Scan(&rawJson) if err != nil { return false, xerrors.Errorf("querying allocated sector numbers: %w", err) } @@ -97,7 +97,7 @@ func AllocateSectorNumbers(ctx context.Context, a AllocAPI, db *harmonydb.DB, ma return false, xerrors.Errorf("marshaling allocated sector numbers: %w", err) } - _, err = tx.Exec("insert into sectors_allocated_numbers(sp_id, allocated) values($1, $2) on conflict(sp_id) do update set allocated = $2", mid, rawJson) + _, err = tx.Exec("INSERT INTO sectors_allocated_numbers(sp_id, allocated) VALUES($1, $2) ON CONFLICT(sp_id) DO UPDATE SET allocated = $2", mid, rawJson) if err != nil { return false, xerrors.Errorf("persisting allocated sector numbers: %w", err) } @@ -114,7 +114,7 @@ func AllocateSectorNumbers(ctx context.Context, a AllocAPI, db *harmonydb.DB, ma } return true, nil - }, harmonydb.RetrySerializationErr()) + }, harmonydb.OptionRetry()) if err != nil { return nil, xerrors.Errorf("allocating sector numbers: %w", err) diff --git a/provider/lpseal/task_finalize.go b/provider/lpseal/task_finalize.go index 92183655a..8d425f76a 100644 --- a/provider/lpseal/task_finalize.go +++ b/provider/lpseal/task_finalize.go @@ -40,7 +40,7 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do ctx := context.Background() err = f.db.Select(ctx, &tasks, ` - select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_finalize=$1`, taskID) + SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_finalize = $1`, taskID) if err != nil { return false, xerrors.Errorf("getting task: %w", err) } @@ -52,7 +52,7 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do var keepUnsealed bool - if err := f.db.QueryRow(ctx, `select coalesce(bool_or(not data_delete_on_finalize), false) from sectors_sdr_initial_pieces where sp_id=$1 and sector_number=$2`, task.SpID, task.SectorNumber).Scan(&keepUnsealed); err != nil { + if err := f.db.QueryRow(ctx, `SELECT COALESCE(BOOL_OR(NOT data_delete_on_finalize), FALSE) FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, task.SpID, task.SectorNumber).Scan(&keepUnsealed); err != nil { return false, err } @@ -98,9 +98,10 @@ func (f *FinalizeTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T } err := f.db.Select(ctx, &tasks, ` - select p.task_id_finalize, p.sp_id, p.sector_number, l.storage_id from sectors_sdr_pipeline p - inner join sector_location l on p.sp_id=l.miner_id and p.sector_number=l.sector_num - where task_id_finalize = ANY ($1) and l.sector_filetype=4`, indIDs) + SELECT p.task_id_finalize, p.sp_id, p.sector_number, l.storage_id FROM sectors_sdr_pipeline p + INNER JOIN sector_location l ON p.sp_id = l.miner_id AND p.sector_number = l.sector_num + WHERE task_id_finalize = ANY ($1) AND l.sector_filetype = 4 +`, indIDs) if err != nil { return nil, xerrors.Errorf("getting tasks: %w", err) } diff --git a/provider/lpseal/task_movestorage.go b/provider/lpseal/task_movestorage.go index aa16d08e9..2085214b7 100644 --- a/provider/lpseal/task_movestorage.go +++ b/provider/lpseal/task_movestorage.go @@ -41,7 +41,7 @@ func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) ctx := context.Background() err = m.db.Select(ctx, &tasks, ` - select sp_id, sector_number, reg_seal_proof from sectors_sdr_pipeline where task_id_move_storage=$1`, taskID) + SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_move_storage = $1`, taskID) if err != nil { return false, xerrors.Errorf("getting task: %w", err) } @@ -63,7 +63,7 @@ func (m *MoveStorageTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("moving storage: %w", err) } - _, err = m.db.Exec(ctx, `update sectors_sdr_pipeline set after_move_storage=true where task_id_move_storage=$1`, taskID) + _, err = m.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_move_storage = true WHERE task_id_move_storage = $1`, taskID) if err != nil { return false, xerrors.Errorf("updating task: %w", err) } diff --git a/provider/lpseal/task_porep.go b/provider/lpseal/task_porep.go index f13acab3d..ba0283e0c 100644 --- a/provider/lpseal/task_porep.go +++ b/provider/lpseal/task_porep.go @@ -116,7 +116,7 @@ func (p *PoRepTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done // store success! n, err := p.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_porep = true, seed_value = $3, porep_proof = $4 + SET after_porep = TRUE, seed_value = $3, porep_proof = $4 WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, []byte(rand), proof) if err != nil { diff --git a/provider/lpseal/task_sdr.go b/provider/lpseal/task_sdr.go index d8bf907f8..68dca5bde 100644 --- a/provider/lpseal/task_sdr.go +++ b/provider/lpseal/task_sdr.go @@ -81,7 +81,7 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo err = s.db.Select(ctx, &pieces, ` SELECT piece_index, piece_cid, piece_size FROM sectors_sdr_initial_pieces - WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index asc`, sectorParams.SpID, sectorParams.SectorNumber) + WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, sectorParams.SpID, sectorParams.SectorNumber) if err != nil { return false, xerrors.Errorf("getting pieces: %w", err) } diff --git a/provider/lpseal/task_submit_commit.go b/provider/lpseal/task_submit_commit.go index 0c3a9ffe7..5b46b7e00 100644 --- a/provider/lpseal/task_submit_commit.go +++ b/provider/lpseal/task_submit_commit.go @@ -140,7 +140,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("pushing message to mpool: %w", err) } - _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET commit_msg_cid = $1, after_commit_msg = true WHERE sp_id = $2 AND sector_number = $3`, mcid, sectorParams.SpID, sectorParams.SectorNumber) + _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET commit_msg_cid = $1, after_commit_msg = TRUE WHERE sp_id = $2 AND sector_number = $3`, mcid, sectorParams.SpID, sectorParams.SectorNumber) if err != nil { return false, xerrors.Errorf("updating commit_msg_cid: %w", err) } diff --git a/provider/lpseal/task_submit_precommit.go b/provider/lpseal/task_submit_precommit.go index 616336103..9f6233f39 100644 --- a/provider/lpseal/task_submit_precommit.go +++ b/provider/lpseal/task_submit_precommit.go @@ -117,7 +117,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo err = s.db.Select(ctx, &pieces, ` SELECT piece_index, piece_cid, piece_size, f05_deal_id, f05_deal_end_epoch FROM sectors_sdr_initial_pieces - WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index asc`, sectorParams.SpID, sectorParams.SectorNumber) + WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, sectorParams.SpID, sectorParams.SectorNumber) if err != nil { return false, xerrors.Errorf("getting pieces: %w", err) } @@ -175,7 +175,7 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo // set precommit_msg_cid _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET precommit_msg_cid = $1, after_precommit_msg = true + SET precommit_msg_cid = $1, after_precommit_msg = TRUE WHERE task_id_precommit_msg = $2`, mcid, taskID) if err != nil { return false, xerrors.Errorf("updating precommit_msg_cid: %w", err) diff --git a/provider/lpseal/task_trees.go b/provider/lpseal/task_trees.go index 3f9b000cd..da0fcf1e9 100644 --- a/provider/lpseal/task_trees.go +++ b/provider/lpseal/task_trees.go @@ -51,7 +51,7 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done err = t.db.Select(ctx, §orParamsArr, ` SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline - WHERE task_id_tree_r = $1 and task_id_tree_c = $1 and task_id_tree_d = $1`, taskID) + WHERE task_id_tree_r = $1 AND task_id_tree_c = $1 AND task_id_tree_d = $1`, taskID) if err != nil { return false, xerrors.Errorf("getting sector params: %w", err) } @@ -74,7 +74,7 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done err = t.db.Select(ctx, &pieces, ` SELECT piece_index, piece_cid, piece_size, data_url, data_headers, data_raw_size FROM sectors_sdr_initial_pieces - WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index asc`, sectorParams.SpID, sectorParams.SectorNumber) + WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, sectorParams.SpID, sectorParams.SectorNumber) if err != nil { return false, xerrors.Errorf("getting pieces: %w", err) } diff --git a/storage/paths/db_index.go b/storage/paths/db_index.go index 4c935762f..a867da3f9 100644 --- a/storage/paths/db_index.go +++ b/storage/paths/db_index.go @@ -241,7 +241,7 @@ func (dbi *DBIndex) StorageAttach(ctx context.Context, si storiface.StorageInfo, return false, xerrors.Errorf("StorageAttach insert fails: %v", err) } return true, nil - }, harmonydb.RetrySerializationErr()) + }, harmonydb.OptionRetry()) return err } @@ -293,7 +293,7 @@ func (dbi *DBIndex) StorageDetach(ctx context.Context, id storiface.ID, url stri return false, err } return true, nil - }, harmonydb.RetrySerializationErr()) + }, harmonydb.OptionRetry()) if err != nil { return err } @@ -407,7 +407,7 @@ func (dbi *DBIndex) StorageDeclareSector(ctx context.Context, storageID storifac } return true, nil - }, harmonydb.RetrySerializationErr()) + }, harmonydb.OptionRetry()) return err } @@ -846,7 +846,7 @@ func (dbi *DBIndex) lock(ctx context.Context, sector abi.SectorID, read storifac } return true, nil - }, harmonydb.RetrySerializationErr()) + }, harmonydb.OptionRetry()) if err != nil { return false, err }