lpseal: Rm piecepark refs in finalize
This commit is contained in:
parent
b90cf19604
commit
10453cd5af
51
provider/lpseal/finalize_pieces.go
Normal file
51
provider/lpseal/finalize_pieces.go
Normal file
@ -0,0 +1,51 @@
|
||||
package lpseal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
)
|
||||
|
||||
func DropSectorPieceRefs(ctx context.Context, db *harmonydb.DB, sid abi.SectorID) error {
|
||||
//_, err := db.Exec(ctx, `SELECT FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number)
|
||||
|
||||
var PieceURL []struct {
|
||||
URL string `db:"data_url"`
|
||||
}
|
||||
|
||||
err := db.Select(ctx, &PieceURL, `SELECT data_url FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting piece url: %w", err)
|
||||
}
|
||||
|
||||
for _, pu := range PieceURL {
|
||||
gourl, err := url.Parse(pu.URL)
|
||||
if err != nil {
|
||||
log.Errorw("failed to parse piece url", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number)
|
||||
continue
|
||||
}
|
||||
|
||||
if gourl.Scheme == "pieceref" {
|
||||
refID, err := strconv.ParseInt(gourl.Opaque, 10, 64)
|
||||
if err != nil {
|
||||
log.Errorw("failed to parse piece ref id", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number)
|
||||
continue
|
||||
}
|
||||
|
||||
n, err := db.Exec(ctx, `DELETE FROM parked_piece_refs WHERE ref_id = $1`, refID)
|
||||
if err != nil {
|
||||
log.Errorw("failed to delete piece ref", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number)
|
||||
}
|
||||
|
||||
log.Debugw("deleted piece ref", "url", pu.URL, "miner", sid.Miner, "sector", sid.Number, "rows", n)
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
@ -69,6 +69,10 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
|
||||
return false, xerrors.Errorf("finalizing sector: %w", err)
|
||||
}
|
||||
|
||||
if err := DropSectorPieceRefs(ctx, f.db, sector.ID); err != nil {
|
||||
return false, xerrors.Errorf("dropping sector piece refs: %w", err)
|
||||
}
|
||||
|
||||
// set after_finalize
|
||||
_, err = f.db.Exec(ctx, `update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1`, taskID)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user