diff --git a/provider/lpseal/finalize_pieces.go b/provider/lpseal/finalize_pieces.go new file mode 100644 index 000000000..8d52bf331 --- /dev/null +++ b/provider/lpseal/finalize_pieces.go @@ -0,0 +1,51 @@ +package lpseal + +import ( + "context" + "net/url" + "strconv" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" +) + +func DropSectorPieceRefs(ctx context.Context, db *harmonydb.DB, sid abi.SectorID) error { + //_, err := db.Exec(ctx, `SELECT FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number) + + var PieceURL []struct { + URL string `db:"data_url"` + } + + err := db.Select(ctx, &PieceURL, `SELECT data_url FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number) + if err != nil { + return xerrors.Errorf("getting piece url: %w", err) + } + + for _, pu := range PieceURL { + gourl, err := url.Parse(pu.URL) + if err != nil { + log.Errorw("failed to parse piece url", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + continue + } + + if gourl.Scheme == "pieceref" { + refID, err := strconv.ParseInt(gourl.Opaque, 10, 64) + if err != nil { + log.Errorw("failed to parse piece ref id", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + continue + } + + n, err := db.Exec(ctx, `DELETE FROM parked_piece_refs WHERE ref_id = $1`, refID) + if err != nil { + log.Errorw("failed to delete piece ref", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number) + } + + log.Debugw("deleted piece ref", "url", pu.URL, "miner", sid.Miner, "sector", sid.Number, "rows", n) + } + } + + return err +} diff --git a/provider/lpseal/task_finalize.go b/provider/lpseal/task_finalize.go index 8d425f76a..6246af733 100644 --- a/provider/lpseal/task_finalize.go +++ b/provider/lpseal/task_finalize.go @@ -69,6 +69,10 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do return false, xerrors.Errorf("finalizing sector: %w", err) } + if err := DropSectorPieceRefs(ctx, f.db, sector.ID); err != nil { + return false, xerrors.Errorf("dropping sector piece refs: %w", err) + } + // set after_finalize _, err = f.db.Exec(ctx, `update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1`, taskID) if err != nil {