lotus/provider/lppiece/task_park_piece.go

221 lines
6.0 KiB
Go
Raw Normal View History

2024-02-28 19:57:12 +00:00
package lppiece
import (
"context"
"encoding/json"
2024-02-29 10:11:40 +00:00
"strconv"
2024-02-28 20:50:12 +00:00
"time"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
2024-02-28 19:57:12 +00:00
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/provider/lpffi"
"github.com/filecoin-project/lotus/provider/lpseal"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var log = logging.Logger("lppiece")
2024-02-29 10:11:40 +00:00
var PieceParkPollInterval = time.Second * 15
2024-02-28 19:57:12 +00:00
// ParkPieceTask gets a piece from some origin, and parks it in storage
// Pieces are always f00, piece ID is mapped to pieceCID in the DB
type ParkPieceTask struct {
	// db is the database handle used to read and update the parked_pieces and
	// parked_piece_refs tables.
	db *harmonydb.DB
	// sc is used to write the piece data (WritePiece) and to describe the
	// storage requirement of the task (Storage).
	sc *lpffi.SealCalls

	// TF holds the function used to create new tasks. It is set through Adder
	// and read by the poller when scheduling work.
	TF promise.Promise[harmonytask.AddTaskFunc]

	// max is reported as the Max field of this task type's details.
	max int
}
func NewParkPieceTask(db *harmonydb.DB, sc *lpffi.SealCalls, max int) *ParkPieceTask {
2024-02-29 10:11:40 +00:00
pt := &ParkPieceTask{
2024-02-28 19:57:12 +00:00
db: db,
sc: sc,
max: max,
}
2024-02-29 10:11:40 +00:00
go pt.pollPieceTasks(context.Background())
return pt
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
func (p *ParkPieceTask) pollPieceTasks(ctx context.Context) {
for {
// select parked pieces with no task_id
var pieceIDs []struct {
ID storiface.PieceNumber `db:"id"`
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
err := p.db.Select(ctx, &pieceIDs, `SELECT id FROM parked_pieces WHERE complete = FALSE AND task_id IS NULL`)
2024-02-28 19:57:12 +00:00
if err != nil {
2024-02-29 10:11:40 +00:00
log.Errorf("failed to get parked pieces: %s", err)
time.Sleep(PieceParkPollInterval)
continue
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
if len(pieceIDs) == 0 {
time.Sleep(PieceParkPollInterval)
continue
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
for _, pieceID := range pieceIDs {
2024-02-29 22:41:55 +00:00
pieceID := pieceID
2024-02-29 10:11:40 +00:00
// create a task for each piece
p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
n, err := tx.Exec(`UPDATE parked_pieces SET task_id = $1 WHERE id = $2 AND complete = FALSE AND task_id IS NULL`, id, pieceID.ID)
if err != nil {
return false, xerrors.Errorf("updating parked piece: %w", err)
}
// commit only if we updated the piece
return n > 0, nil
})
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
}
2024-02-28 19:57:12 +00:00
}
func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()
2024-02-29 10:11:40 +00:00
// Define a struct to hold piece data.
var piecesData []struct {
PieceID int64 `db:"id"`
PieceCreatedAt time.Time `db:"created_at"`
PieceCID string `db:"piece_cid"`
Complete bool `db:"complete"`
PiecePaddedSize int64 `db:"piece_padded_size"`
PieceRawSize string `db:"piece_raw_size"`
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
// Select the piece data using the task ID.
err = p.db.Select(ctx, &piecesData, `
SELECT id, created_at, piece_cid, complete, piece_padded_size, piece_raw_size
FROM parked_pieces
WHERE task_id = $1
`, taskID)
2024-02-28 19:57:12 +00:00
if err != nil {
2024-02-29 10:11:40 +00:00
return false, xerrors.Errorf("fetching piece data: %w", err)
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
if len(piecesData) == 0 {
return false, xerrors.Errorf("no piece data found for task_id: %d", taskID)
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
pieceData := piecesData[0]
if pieceData.Complete {
log.Warnw("park piece task already complete", "task_id", taskID, "piece_cid", pieceData.PieceCID)
2024-02-28 19:57:12 +00:00
return true, nil
}
2024-02-29 10:11:40 +00:00
// Define a struct for reference data.
var refData []struct {
DataURL string `db:"data_url"`
DataHeaders json.RawMessage `db:"data_headers"`
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
// Now, select the first reference data that has a URL.
err = p.db.Select(ctx, &refData, `
SELECT data_url, data_headers
FROM parked_piece_refs
WHERE piece_id = $1 AND data_url IS NOT NULL
LIMIT 1
`, pieceData.PieceID)
if err != nil {
return false, xerrors.Errorf("fetching reference data: %w", err)
}
2024-02-28 19:57:12 +00:00
2024-02-29 10:11:40 +00:00
if len(refData) == 0 {
return false, xerrors.Errorf("no refs found for piece_id: %d", pieceData.PieceID)
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
// Convert piece_raw_size from string to int64.
pieceRawSize, err := strconv.ParseInt(pieceData.PieceRawSize, 10, 64)
2024-02-28 19:57:12 +00:00
if err != nil {
2024-02-29 10:11:40 +00:00
return false, xerrors.Errorf("parsing piece raw size: %w", err)
}
if refData[0].DataURL != "" {
upr := &lpseal.UrlPieceReader{
Url: refData[0].DataURL,
RawSize: pieceRawSize,
}
defer func() {
_ = upr.Close()
}()
pnum := storiface.PieceNumber(pieceData.PieceID)
if err := p.sc.WritePiece(ctx, pnum, pieceRawSize, upr); err != nil {
return false, xerrors.Errorf("write piece: %w", err)
}
// Update the piece as complete after a successful write.
_, err = p.db.Exec(ctx, `UPDATE parked_pieces SET complete = TRUE WHERE id = $1`, pieceData.PieceID)
if err != nil {
return false, xerrors.Errorf("marking piece as complete: %w", err)
}
return true, nil
2024-02-28 19:57:12 +00:00
}
2024-02-29 10:11:40 +00:00
// If no URL is found, this indicates an issue since at least one URL is expected.
return false, xerrors.Errorf("no data URL found for piece_id: %d", pieceData.PieceID)
2024-02-28 19:57:12 +00:00
}
// CanAccept chooses which of the offered tasks to take. It has no preference
// and always picks the first entry of ids; engine is not consulted.
//
// When ids is empty it returns (nil, nil) rather than panicking on ids[0].
func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
	if len(ids) == 0 {
		// NOTE(review): a nil ID with a nil error is presumably read by the
		// task engine as "nothing accepted" - confirm against the
		// harmonytask.TaskInterface contract.
		return nil, nil
	}

	id := ids[0]
	return &id, nil
}
func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails {
2024-03-15 13:10:48 +00:00
const maxSizePiece = 64 << 30
2024-02-28 19:57:12 +00:00
return harmonytask.TaskTypeDetails{
Max: p.max,
Name: "ParkPiece",
Cost: resources.Resources{
Cpu: 1,
Gpu: 0,
Ram: 64 << 20,
2024-03-15 13:10:48 +00:00
Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing),
2024-02-28 19:57:12 +00:00
},
MaxFailures: 10,
}
}
2024-03-15 13:10:48 +00:00
// taskToRef resolves a task ID to the sector reference of the piece assigned
// to it. It looks up the parked_pieces row whose task_id equals id and converts
// that piece's number, via its Ref method, into an lpffi.SectorRef.
//
// An error is returned if the query fails or if it does not yield exactly one
// row.
func (p *ParkPieceTask) taskToRef(id harmonytask.TaskID) (lpffi.SectorRef, error) {
	var rows []struct {
		ID storiface.PieceNumber `db:"id"`
	}

	if err := p.db.Select(context.Background(), &rows, `SELECT id FROM parked_pieces WHERE task_id = $1`, id); err != nil {
		return lpffi.SectorRef{}, xerrors.Errorf("getting piece id: %w", err)
	}

	if n := len(rows); n != 1 {
		return lpffi.SectorRef{}, xerrors.Errorf("expected 1 piece id, got %d", n)
	}

	ref := rows[0].ID.Ref()

	var out lpffi.SectorRef
	out.SpID = int64(ref.ID.Miner)
	out.SectorNumber = int64(ref.ID.Number)
	out.RegSealProof = ref.ProofType

	return out, nil
}
2024-02-28 19:57:12 +00:00
// Adder receives the function used to create new tasks and stores it in the
// TF promise, where the piece poller picks it up when scheduling work.
func (p *ParkPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) {
	p.TF.Set(taskFunc)
}
// Compile-time check that *ParkPieceTask implements harmonytask.TaskInterface.
var _ harmonytask.TaskInterface = (*ParkPieceTask)(nil)