Merge pull request #11664 from filecoin-project/feat/lpdeal-cache

feat: curio deal cache (Piece Park)
Łukasz Magiera 2024-03-18 21:13:35 +01:00 committed by GitHub
commit 356ea2d774
23 changed files with 852 additions and 45 deletions

View File

@ -9711,6 +9711,7 @@
0,
1,
0,
0,
0
],
"Read": [
@ -9718,6 +9719,7 @@
3,
0,
0,
0,
0
]
}
@ -9736,8 +9738,8 @@
"title": "number",
"type": "number"
},
"maxItems": 5,
"minItems": 5,
"maxItems": 6,
"minItems": 6,
"type": "array"
},
"Sector": {
@ -9760,8 +9762,8 @@
"title": "number",
"type": "number"
},
"maxItems": 5,
"minItems": 5,
"maxItems": 6,
"minItems": 6,
"type": "array"
}
},

View File

@ -13,9 +13,12 @@ import (
"github.com/filecoin-project/lotus/curiosrc/chainsched"
"github.com/filecoin-project/lotus/curiosrc/ffi"
"github.com/filecoin-project/lotus/curiosrc/message"
"github.com/filecoin-project/lotus/curiosrc/piece"
"github.com/filecoin-project/lotus/curiosrc/seal"
"github.com/filecoin-project/lotus/curiosrc/winning"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/lazy"
"github.com/filecoin-project/lotus/lib/must"
"github.com/filecoin-project/lotus/node/modules"
)
@ -66,6 +69,19 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
}
}
slrLazy := lazy.MakeLazy(func() (*ffi.SealCalls, error) {
return ffi.NewSealCalls(stor, lstor, si), nil
})
{
// Piece handling
if cfg.Subsystems.EnableParkPiece {
parkPieceTask := piece.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks)
cleanupPieceTask := piece.NewCleanupPieceTask(db, must.One(slrLazy.Val()), 0)
activeTasks = append(activeTasks, parkPieceTask, cleanupPieceTask)
}
}
hasAnySealingTask := cfg.Subsystems.EnableSealSDR ||
cfg.Subsystems.EnableSealSDRTrees ||
cfg.Subsystems.EnableSendPrecommitMsg ||
@ -81,7 +97,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
sp = seal.NewPoller(db, full)
go sp.RunPoller(ctx)
slr = ffi.NewSealCalls(stor, lstor, si)
slr = must.One(slrLazy.Val())
}
// NOTE: Tasks with the LEAST priority are at the top
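
The SealCalls construction above is wrapped in lazy.MakeLazy so the piece tasks and the sealing tasks share one instance that is only built if something actually needs it. A minimal stand-alone sketch of that once-then-share pattern (Lazy/MakeLazy below are illustrative stand-ins, assuming lib/lazy has the usual build-on-first-Val semantics; they are not the real package):

package main

import (
	"fmt"
	"sync"
)

// Lazy builds its value on the first Val() call and returns the same instance
// (and error) for every later call.
type Lazy[T any] struct {
	once sync.Once
	mk   func() (T, error)
	val  T
	err  error
}

func MakeLazy[T any](mk func() (T, error)) *Lazy[T] {
	return &Lazy[T]{mk: mk}
}

func (l *Lazy[T]) Val() (T, error) {
	l.once.Do(func() { l.val, l.err = l.mk() })
	return l.val, l.err
}

func main() {
	built := 0
	sealCalls := MakeLazy(func() (string, error) {
		built++
		return "SealCalls", nil
	})

	// The park-piece tasks and the sealing tasks both ask for the value;
	// the constructor still runs exactly once.
	a, _ := sealCalls.Val()
	b, _ := sealCalls.Val()
	fmt.Println(a, b, "built", built, "time(s)")
}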

View File

@ -14,7 +14,8 @@ import (
"time"
"github.com/fatih/color"
"github.com/ipfs/go-cid"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/mitchellh/go-homedir"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
@ -37,6 +38,7 @@ import (
"github.com/filecoin-project/lotus/cmd/curio/deps"
cumarket "github.com/filecoin-project/lotus/curiosrc/market"
"github.com/filecoin-project/lotus/curiosrc/market/fakelm"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/must"
"github.com/filecoin-project/lotus/lib/nullreader"
"github.com/filecoin-project/lotus/metrics/proxy"
@ -427,7 +429,7 @@ var lpBoostProxyCmd = &cli.Command{
}
pieceInfoLk := new(sync.Mutex)
pieceInfos := map[cid.Cid][]pieceInfo{}
pieceInfos := map[uuid.UUID][]pieceInfo{}
ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) {
origPieceData := pieceData
@ -449,25 +451,103 @@ var lpBoostProxyCmd = &cli.Command{
done: make(chan struct{}),
}
pieceUUID := uuid.New()
color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID)
pieceInfoLk.Lock()
pieceInfos[deal.DealProposal.PieceCID] = append(pieceInfos[deal.DealProposal.PieceCID], pi)
pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi)
pieceInfoLk.Unlock()
// /piece?piece_cid=xxxx
dataUrl := rootUrl
dataUrl.Path = "/piece"
dataUrl.RawQuery = "piece_cid=" + deal.DealProposal.PieceCID.String()
dataUrl.RawQuery = "piece_id=" + pieceUUID.String()
// add piece entry
var refID int64
var pieceWasCreated bool
comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var pieceID int64
// Attempt to select the piece ID first
err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)
if err != nil {
if err == pgx.ErrNoRows {
// Piece does not exist, attempt to insert
err = tx.QueryRow(`
INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
VALUES ($1, $2, $3)
ON CONFLICT (piece_cid) DO NOTHING
RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
if err != nil {
return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
}
pieceWasCreated = true // New piece was created
} else {
// Some other error occurred during select
return false, xerrors.Errorf("checking existing parked piece: %w", err)
}
} else {
pieceWasCreated = false // Piece already exists, no new piece was created
}
// Add parked_piece_ref
err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
if err != nil {
return false, xerrors.Errorf("inserting parked piece ref: %w", err)
}
// If everything went well, commit the transaction
return true, nil // This will commit the transaction
}, harmonydb.OptionRetry())
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
}
if !comm {
return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
}
// wait for piece to be parked
if pieceWasCreated {
<-pi.done
} else {
// If the piece was not created, we need to close the done channel
close(pi.done)
go func() {
// close the data reader (drain to eof if it's not a closer)
if closer, ok := pieceData.(io.Closer); ok {
if err := closer.Close(); err != nil {
log.Warnw("closing pieceData in DataCid", "error", err)
}
} else {
log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData))
_, err := io.Copy(io.Discard, pieceData)
if err != nil {
log.Warnw("draining pieceData in DataCid", "error", err)
}
}
}()
}
pieceIDUrl := url.URL{
Scheme: "pieceref",
Opaque: fmt.Sprintf("%d", refID),
}
// make a sector
so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), dataUrl, nil)
so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil)
if err != nil {
return api.SectorOffset{}, err
}
color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset)
<-pi.done
return so, nil
}
@ -484,10 +564,12 @@ var lpBoostProxyCmd = &cli.Command{
ast.Internal.StorageGetLocks = si.StorageGetLocks
var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
// /piece?piece_cid=xxxx
pieceCid, err := cid.Decode(r.URL.Query().Get("piece_cid"))
// /piece?piece_id=xxxx
pieceUUID := r.URL.Query().Get("piece_id")
pu, err := uuid.Parse(pieceUUID)
if err != nil {
http.Error(w, "bad piece_cid", http.StatusBadRequest)
http.Error(w, "bad piece id", http.StatusBadRequest)
return
}
@ -496,13 +578,13 @@ var lpBoostProxyCmd = &cli.Command{
return
}
fmt.Printf("%s request for piece from %s\n", pieceCid, r.RemoteAddr)
fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr)
pieceInfoLk.Lock()
pis, ok := pieceInfos[pieceCid]
pis, ok := pieceInfos[pu]
if !ok {
http.Error(w, "piece not found", http.StatusNotFound)
color.Red("%s not found", pieceCid)
color.Red("%s not found", pu)
pieceInfoLk.Unlock()
return
}
@ -511,7 +593,10 @@ var lpBoostProxyCmd = &cli.Command{
pi := pis[0]
pis = pis[1:]
pieceInfos[pieceCid] = pis
pieceInfos[pu] = pis
if len(pis) == 0 {
delete(pieceInfos, pu)
}
pieceInfoLk.Unlock()
@ -533,7 +618,7 @@ var lpBoostProxyCmd = &cli.Command{
return
}
color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pieceCid, float64(n)/(1024*1024), took, mbps)
color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps)
}
finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast)
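
The proxy above hands AllocatePieceToSector a URL with the custom "pieceref" scheme, carrying the parked_piece_refs ref_id in the opaque part; the same scheme is parsed back later (for example in DropSectorPieceRefs and in the Trees task). A small self-contained sketch of that round trip (makePieceRefURL and parsePieceRefURL are illustrative helpers, not functions from the PR):

package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// makePieceRefURL encodes a parked_piece_refs ref_id in the opaque part of a
// "pieceref" URL, the same shape the proxy passes to AllocatePieceToSector.
func makePieceRefURL(refID int64) string {
	u := url.URL{Scheme: "pieceref", Opaque: fmt.Sprintf("%d", refID)}
	return u.String() // e.g. "pieceref:123"
}

// parsePieceRefURL recovers the ref_id, mirroring what DropSectorPieceRefs
// and the Trees task do before looking up parked_piece_refs.
func parsePieceRefURL(s string) (int64, error) {
	u, err := url.Parse(s)
	if err != nil {
		return 0, err
	}
	if u.Scheme != "pieceref" {
		return 0, fmt.Errorf("not a pieceref URL: %s", s)
	}
	return strconv.ParseInt(u.Opaque, 10, 64)
}

func main() {
	s := makePieceRefURL(123)
	id, err := parsePieceRefURL(s)
	fmt.Println(s, id, err) // pieceref:123 123 <nil>
}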

View File

@ -0,0 +1,75 @@
package ffi
import (
"context"
"io"
"os"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
func (sb *SealCalls) WritePiece(ctx context.Context, pieceID storiface.PieceNumber, size int64, data io.Reader) error {
// todo: config(?): allow setting PathStorage for this
// todo storage reservations
paths, done, err := sb.sectors.AcquireSector(ctx, nil, pieceID.Ref(), storiface.FTNone, storiface.FTPiece, storiface.PathSealing)
if err != nil {
return err
}
defer done()
dest := paths.Piece
tempDest := dest + ".tmp"
destFile, err := os.OpenFile(tempDest, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return xerrors.Errorf("creating temp piece file '%s': %w", tempDest, err)
}
removeTemp := true
defer func() {
if removeTemp {
rerr := os.Remove(tempDest)
if rerr != nil {
log.Errorf("removing temp file: %+v", rerr)
}
}
}()
copyStart := time.Now()
n, err := io.CopyBuffer(destFile, io.LimitReader(data, size), make([]byte, 8<<20))
if err != nil {
_ = destFile.Close()
return xerrors.Errorf("copying piece data: %w", err)
}
if err := destFile.Close(); err != nil {
return xerrors.Errorf("closing temp piece file: %w", err)
}
if n != size {
return xerrors.Errorf("short write: %d", n)
}
copyEnd := time.Now()
log.Infow("wrote parked piece", "piece", pieceID, "size", size, "duration", copyEnd.Sub(copyStart), "dest", dest, "MiB/s", float64(size)/(1<<20)/copyEnd.Sub(copyStart).Seconds())
if err := os.Rename(tempDest, dest); err != nil {
return xerrors.Errorf("rename temp piece to dest %s -> %s: %w", tempDest, dest, err)
}
removeTemp = false
return nil
}
func (sb *SealCalls) PieceReader(ctx context.Context, id storiface.PieceNumber) (io.ReadCloser, error) {
return sb.sectors.storage.ReaderSeq(ctx, id.Ref(), storiface.FTPiece)
}
func (sb *SealCalls) RemovePiece(ctx context.Context, id storiface.PieceNumber) error {
return sb.sectors.storage.Remove(ctx, id.Ref().ID, storiface.FTPiece, true, nil)
}
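
WritePiece above copies into "<dest>.tmp" and only renames to the final path once the full size has been written, so a crash or short read never leaves a truncated piece at the destination. A stand-alone sketch of that write-temp-then-rename pattern (writeAtomically is an illustrative helper, not part of the PR):

package main

import (
	"io"
	"os"
	"path/filepath"
	"strings"
)

// writeAtomically copies exactly size bytes into dest via a ".tmp" sibling and
// renames only on success, so the final path never holds a partial piece.
func writeAtomically(dest string, size int64, data io.Reader) error {
	tmp := dest + ".tmp"
	f, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}

	n, err := io.Copy(f, io.LimitReader(data, size))
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	if err == nil && n != size {
		err = io.ErrUnexpectedEOF // short write
	}
	if err != nil {
		_ = os.Remove(tmp) // never leave the temp file behind on failure
		return err
	}

	return os.Rename(tmp, dest)
}

func main() {
	dest := filepath.Join(os.TempDir(), "piece-example")
	if err := writeAtomically(dest, 5, strings.NewReader("hello")); err != nil {
		panic(err)
	}
}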

View File

@ -42,7 +42,7 @@ type SealCalls struct {
externCalls ExternalSealer*/
}
func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCalls {
func NewSealCalls(st *paths.Remote, ls *paths.Local, si paths.SectorIndex) *SealCalls {
return &SealCalls{
sectors: &storageProvider{
storage: st,
@ -54,7 +54,7 @@ func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCa
}
type storageProvider struct {
storage paths.Store
storage *paths.Remote
localStore *paths.Local
sindex paths.SectorIndex
storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
@ -69,7 +69,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
if taskID != nil {
resv, ok = l.storageReservations.Load(*taskID)
}
if ok {
if ok && resv != nil {
if resv.Alloc != allocate || resv.Existing != existing {
// this should never happen, only when task definition is wrong
return storiface.SectorPaths{}, nil, xerrors.Errorf("storage reservation type mismatch")
@ -78,6 +78,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate)
paths = resv.Paths
storageIDs = resv.PathIDs
releaseStorage = resv.Release
} else {
var err error

View File

@ -0,0 +1,130 @@
package piece
import (
"context"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/curiosrc/ffi"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
type CleanupPieceTask struct {
max int
db *harmonydb.DB
sc *ffi.SealCalls
TF promise.Promise[harmonytask.AddTaskFunc]
}
func NewCleanupPieceTask(db *harmonydb.DB, sc *ffi.SealCalls, max int) *CleanupPieceTask {
pt := &CleanupPieceTask{
db: db,
sc: sc,
max: max,
}
go pt.pollCleanupTasks(context.Background())
return pt
}
func (c *CleanupPieceTask) pollCleanupTasks(ctx context.Context) {
for {
// select pieces with no refs and null cleanup_task_id
var pieceIDs []struct {
ID storiface.PieceNumber `db:"id"`
}
err := c.db.Select(ctx, &pieceIDs, `SELECT id FROM parked_pieces WHERE cleanup_task_id IS NULL AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = parked_pieces.id) = 0`)
if err != nil {
log.Errorf("failed to get parked pieces: %s", err)
time.Sleep(PieceParkPollInterval)
continue
}
if len(pieceIDs) == 0 {
time.Sleep(PieceParkPollInterval)
continue
}
for _, pieceID := range pieceIDs {
pieceID := pieceID
// create a task for each piece
c.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
n, err := tx.Exec(`UPDATE parked_pieces SET cleanup_task_id = $1 WHERE id = $2 AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = parked_pieces.id) = 0`, id, pieceID.ID)
if err != nil {
return false, xerrors.Errorf("updating parked piece: %w", err)
}
// commit only if we updated the piece
return n > 0, nil
})
}
}
}
func (c *CleanupPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()
// select by cleanup_task_id
var pieceID int64
err = c.db.QueryRow(ctx, "SELECT id FROM parked_pieces WHERE cleanup_task_id = $1", taskID).Scan(&pieceID)
if err != nil {
return false, xerrors.Errorf("query parked_piece: %w", err)
}
// delete the row from parked_pieces, but only if its ref count is still 0
// note: we delete from the db first because that guarantees that the piece is no longer in use
// if the storage delete fails, it will be retried later in other cleanup tasks
n, err := c.db.Exec(ctx, "DELETE FROM parked_pieces WHERE id = $1 AND (SELECT count(*) FROM parked_piece_refs WHERE piece_id = $1) = 0", pieceID)
if err != nil {
return false, xerrors.Errorf("delete parked_piece: %w", err)
}
if n == 0 {
return true, nil
}
// remove from storage
err = c.sc.RemovePiece(ctx, storiface.PieceNumber(pieceID))
if err != nil {
log.Errorw("remove piece", "piece_id", pieceID, "error", err)
}
return true, nil
}
func (c *CleanupPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
// the remove call runs on paths.Remote storage, so it doesn't really matter where it runs
id := ids[0]
return &id, nil
}
func (c *CleanupPieceTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: c.max,
Name: "DropPiece",
Cost: resources.Resources{
Cpu: 1,
Gpu: 0,
Ram: 64 << 20,
Storage: nil,
},
MaxFailures: 10,
}
}
func (c *CleanupPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) {
c.TF.Set(taskFunc)
}
var _ harmonytask.TaskInterface = &CleanupPieceTask{}
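
Both pollers in this package create a task only when their conditional UPDATE actually matches a row (task id still NULL, ref count still zero), so concurrent nodes cannot claim the same piece twice. An in-memory sketch of that claim-by-conditional-update idea (store and claim below are illustrative; the real code runs inside a harmonydb transaction):

package main

import (
	"fmt"
	"sync"
)

// piece mirrors a parked_pieces row for this sketch: a cleanup task id that
// starts out unset.
type piece struct {
	cleanupTaskID *int64
}

type store struct {
	mu     sync.Mutex
	pieces map[int64]*piece
}

// claim mirrors the effect of:
//   UPDATE parked_pieces SET cleanup_task_id = $1
//   WHERE id = $2 AND cleanup_task_id IS NULL AND <ref count is 0>
// Returning false stands for "0 rows updated", in which case the poller's
// AddTaskFunc callback returns shouldCommit=false and the task is dropped.
func (s *store) claim(pieceID, taskID int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	p, ok := s.pieces[pieceID]
	if !ok || p.cleanupTaskID != nil {
		return false
	}
	p.cleanupTaskID = &taskID
	return true
}

func main() {
	s := &store{pieces: map[int64]*piece{7: {}}}
	fmt.Println(s.claim(7, 100)) // true: the first poller's task commits
	fmt.Println(s.claim(7, 101)) // false: the racing poller backs off
}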

View File

@ -0,0 +1,220 @@
package piece
import (
"context"
"encoding/json"
"strconv"
"time"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/curiosrc/ffi"
"github.com/filecoin-project/lotus/curiosrc/seal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var log = logging.Logger("lppiece")
var PieceParkPollInterval = time.Second * 15
// ParkPieceTask fetches a piece from some origin and parks it in storage.
// Parked pieces always live under the synthetic miner f00; the piece ID is mapped to the piece CID in the DB.
type ParkPieceTask struct {
db *harmonydb.DB
sc *ffi.SealCalls
TF promise.Promise[harmonytask.AddTaskFunc]
max int
}
func NewParkPieceTask(db *harmonydb.DB, sc *ffi.SealCalls, max int) *ParkPieceTask {
pt := &ParkPieceTask{
db: db,
sc: sc,
max: max,
}
go pt.pollPieceTasks(context.Background())
return pt
}
func (p *ParkPieceTask) pollPieceTasks(ctx context.Context) {
for {
// select parked pieces with no task_id
var pieceIDs []struct {
ID storiface.PieceNumber `db:"id"`
}
err := p.db.Select(ctx, &pieceIDs, `SELECT id FROM parked_pieces WHERE complete = FALSE AND task_id IS NULL`)
if err != nil {
log.Errorf("failed to get parked pieces: %s", err)
time.Sleep(PieceParkPollInterval)
continue
}
if len(pieceIDs) == 0 {
time.Sleep(PieceParkPollInterval)
continue
}
for _, pieceID := range pieceIDs {
pieceID := pieceID
// create a task for each piece
p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update
n, err := tx.Exec(`UPDATE parked_pieces SET task_id = $1 WHERE id = $2 AND complete = FALSE AND task_id IS NULL`, id, pieceID.ID)
if err != nil {
return false, xerrors.Errorf("updating parked piece: %w", err)
}
// commit only if we updated the piece
return n > 0, nil
})
}
}
}
func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()
// Define a struct to hold piece data.
var piecesData []struct {
PieceID int64 `db:"id"`
PieceCreatedAt time.Time `db:"created_at"`
PieceCID string `db:"piece_cid"`
Complete bool `db:"complete"`
PiecePaddedSize int64 `db:"piece_padded_size"`
PieceRawSize string `db:"piece_raw_size"`
}
// Select the piece data using the task ID.
err = p.db.Select(ctx, &piecesData, `
SELECT id, created_at, piece_cid, complete, piece_padded_size, piece_raw_size
FROM parked_pieces
WHERE task_id = $1
`, taskID)
if err != nil {
return false, xerrors.Errorf("fetching piece data: %w", err)
}
if len(piecesData) == 0 {
return false, xerrors.Errorf("no piece data found for task_id: %d", taskID)
}
pieceData := piecesData[0]
if pieceData.Complete {
log.Warnw("park piece task already complete", "task_id", taskID, "piece_cid", pieceData.PieceCID)
return true, nil
}
// Define a struct for reference data.
var refData []struct {
DataURL string `db:"data_url"`
DataHeaders json.RawMessage `db:"data_headers"`
}
// Now, select the first reference data that has a URL.
err = p.db.Select(ctx, &refData, `
SELECT data_url, data_headers
FROM parked_piece_refs
WHERE piece_id = $1 AND data_url IS NOT NULL
LIMIT 1
`, pieceData.PieceID)
if err != nil {
return false, xerrors.Errorf("fetching reference data: %w", err)
}
if len(refData) == 0 {
return false, xerrors.Errorf("no refs found for piece_id: %d", pieceData.PieceID)
}
// Convert piece_raw_size from string to int64.
pieceRawSize, err := strconv.ParseInt(pieceData.PieceRawSize, 10, 64)
if err != nil {
return false, xerrors.Errorf("parsing piece raw size: %w", err)
}
if refData[0].DataURL != "" {
upr := &seal.UrlPieceReader{
Url: refData[0].DataURL,
RawSize: pieceRawSize,
}
defer func() {
_ = upr.Close()
}()
pnum := storiface.PieceNumber(pieceData.PieceID)
if err := p.sc.WritePiece(ctx, pnum, pieceRawSize, upr); err != nil {
return false, xerrors.Errorf("write piece: %w", err)
}
// Update the piece as complete after a successful write.
_, err = p.db.Exec(ctx, `UPDATE parked_pieces SET complete = TRUE WHERE id = $1`, pieceData.PieceID)
if err != nil {
return false, xerrors.Errorf("marking piece as complete: %w", err)
}
return true, nil
}
// If no URL is found, this indicates an issue since at least one URL is expected.
return false, xerrors.Errorf("no data URL found for piece_id: %d", pieceData.PieceID)
}
func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
id := ids[0]
return &id, nil
}
func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails {
const maxSizePiece = 64 << 30
return harmonytask.TaskTypeDetails{
Max: p.max,
Name: "ParkPiece",
Cost: resources.Resources{
Cpu: 1,
Gpu: 0,
Ram: 64 << 20,
Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing),
},
MaxFailures: 10,
}
}
func (p *ParkPieceTask) taskToRef(id harmonytask.TaskID) (ffi.SectorRef, error) {
var pieceIDs []struct {
ID storiface.PieceNumber `db:"id"`
}
err := p.db.Select(context.Background(), &pieceIDs, `SELECT id FROM parked_pieces WHERE task_id = $1`, id)
if err != nil {
return ffi.SectorRef{}, xerrors.Errorf("getting piece id: %w", err)
}
if len(pieceIDs) != 1 {
return ffi.SectorRef{}, xerrors.Errorf("expected 1 piece id, got %d", len(pieceIDs))
}
pref := pieceIDs[0].ID.Ref()
return ffi.SectorRef{
SpID: int64(pref.ID.Miner),
SectorNumber: int64(pref.ID.Number),
RegSealProof: pref.ProofType,
}, nil
}
func (p *ParkPieceTask) Adder(taskFunc harmonytask.AddTaskFunc) {
p.TF.Set(taskFunc)
}
var _ harmonytask.TaskInterface = &ParkPieceTask{}

View File

@ -0,0 +1,51 @@
package seal
import (
"context"
"net/url"
"strconv"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)
func DropSectorPieceRefs(ctx context.Context, db *harmonydb.DB, sid abi.SectorID) error {
//_, err := db.Exec(ctx, `SELECT FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number)
var PieceURL []struct {
URL string `db:"data_url"`
}
err := db.Select(ctx, &PieceURL, `SELECT data_url FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sid.Miner, sid.Number)
if err != nil {
return xerrors.Errorf("getting piece url: %w", err)
}
for _, pu := range PieceURL {
gourl, err := url.Parse(pu.URL)
if err != nil {
log.Errorw("failed to parse piece url", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number)
continue
}
if gourl.Scheme == "pieceref" {
refID, err := strconv.ParseInt(gourl.Opaque, 10, 64)
if err != nil {
log.Errorw("failed to parse piece ref id", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number)
continue
}
n, err := db.Exec(ctx, `DELETE FROM parked_piece_refs WHERE ref_id = $1`, refID)
if err != nil {
log.Errorw("failed to delete piece ref", "url", pu.URL, "error", err, "miner", sid.Miner, "sector", sid.Number)
}
log.Debugw("deleted piece ref", "url", pu.URL, "miner", sid.Miner, "sector", sid.Number, "rows", n)
}
}
return err
}

View File

@ -69,6 +69,10 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
return false, xerrors.Errorf("finalizing sector: %w", err)
}
if err := DropSectorPieceRefs(ctx, f.db, sector.ID); err != nil {
return false, xerrors.Errorf("dropping sector piece refs: %w", err)
}
// set after_finalize
_, err = f.db.Exec(ctx, `update sectors_sdr_pipeline set after_finalize=true where task_id_finalize=$1`, taskID)
if err != nil {

View File

@ -187,8 +187,6 @@ func (s *SDRTask) getTicket(ctx context.Context, maddr address.Address) (abi.Sea
}
func (s *SDRTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
// todo check storage (reserve too?)
id := ids[0]
return &id, nil
}

View File

@ -9,12 +9,15 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
actorstypes "github.com/filecoin-project/go-state-types/actors"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
miner12 "github.com/filecoin-project/go-state-types/builtin/v12/miner"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/curiosrc/message"
"github.com/filecoin-project/lotus/curiosrc/multictladdr"
@ -27,6 +30,7 @@ import (
type SubmitPrecommitTaskApi interface {
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (big.Int, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
ctladdr.NodeApi
}
@ -136,6 +140,23 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo
}
}
nv, err := s.api.StateNetworkVersion(ctx, types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("getting network version: %w", err)
}
av, err := actorstypes.VersionForNetwork(nv)
if err != nil {
return false, xerrors.Errorf("failed to get actors version: %w", err)
}
msd, err := policy.GetMaxProveCommitDuration(av, sectorParams.RegSealProof)
if err != nil {
return false, xerrors.Errorf("failed to get max prove commit duration: %w", err)
}
if minExpiration := sectorParams.TicketEpoch + policy.MaxPreCommitRandomnessLookback + msd + miner.MinSectorExpiration; params.Sectors[0].Expiration < minExpiration {
params.Sectors[0].Expiration = minExpiration
}
var pbuf bytes.Buffer
if err := params.MarshalCBOR(&pbuf); err != nil {
return false, xerrors.Errorf("serializing params: %w", err)

View File

@ -4,6 +4,8 @@ import (
"context"
"io"
"net/http"
"net/url"
"strconv"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -88,6 +90,15 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
var dataReader io.Reader
var unpaddedData bool
var closers []io.Closer
defer func() {
for _, c := range closers {
if err := c.Close(); err != nil {
log.Errorw("error closing piece reader", "error", err)
}
}
}()
if len(pieces) > 0 {
pieceInfos := make([]abi.PieceInfo, len(pieces))
pieceReaders := make([]io.Reader, len(pieces))
@ -106,10 +117,49 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
// make pieceReader
if p.DataUrl != nil {
pieceReaders[i], _ = padreader.New(&UrlPieceReader{
Url: *p.DataUrl,
RawSize: *p.DataRawSize,
}, uint64(*p.DataRawSize))
dataUrl := *p.DataUrl
goUrl, err := url.Parse(dataUrl)
if err != nil {
return false, xerrors.Errorf("parsing data URL: %w", err)
}
if goUrl.Scheme == "pieceref" {
// url is to a piece reference
refNum, err := strconv.ParseInt(goUrl.Opaque, 10, 64)
if err != nil {
return false, xerrors.Errorf("parsing piece reference number: %w", err)
}
// get pieceID
var pieceID []struct {
PieceID storiface.PieceNumber `db:"piece_id"`
}
err = t.db.Select(ctx, &pieceID, `SELECT piece_id FROM parked_piece_refs WHERE ref_id = $1`, refNum)
if err != nil {
return false, xerrors.Errorf("getting pieceID: %w", err)
}
if len(pieceID) != 1 {
return false, xerrors.Errorf("expected 1 pieceID, got %d", len(pieceID))
}
pr, err := t.sc.PieceReader(ctx, pieceID[0].PieceID)
if err != nil {
return false, xerrors.Errorf("getting piece reader: %w", err)
}
closers = append(closers, pr)
pieceReaders[i], _ = padreader.New(pr, uint64(*p.DataRawSize))
} else {
pieceReaders[i], _ = padreader.New(&UrlPieceReader{
Url: dataUrl,
RawSize: *p.DataRawSize,
}, uint64(*p.DataRawSize))
}
} else { // padding piece (w/o fr32 padding, added in TreeD)
pieceReaders[i] = nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded())
}
@ -200,6 +250,7 @@ type UrlPieceReader struct {
RawSize int64 // the exact number of bytes read, if we read more or less that's an error
readSoFar int64
closed bool
active io.ReadCloser // auto-closed on EOF
}
@ -239,6 +290,7 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) {
// If EOF is reached, close the reader
if err == io.EOF {
cerr := u.active.Close()
u.closed = true
if cerr != nil {
log.Errorf("error closing http piece reader: %s", cerr)
}
@ -253,4 +305,13 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) {
return n, err
}
func (u *UrlPieceReader) Close() error {
	if !u.closed {
		u.closed = true
		if u.active == nil {
			// Read was never called, so there is no active reader to close
			return nil
		}
		return u.active.Close()
	}
	return nil
}
var _ harmonytask.TaskInterface = &TreesTask{}
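
UrlPieceReader now closes its underlying reader as soon as EOF is seen, and its new Close method is safe to call afterwards; that is what lets the Trees task collect every piece reader in a closers slice and close them all in one deferred loop. A tiny stand-alone reader with the same close discipline (onceCloser is an illustrative type, not the PR's):

package main

import (
	"fmt"
	"io"
	"strings"
)

// onceCloser closes its source as soon as EOF is observed and makes any later
// Close a no-op, so a deferred "close everything" loop never double-closes.
type onceCloser struct {
	r      io.ReadCloser
	closed bool
}

func (o *onceCloser) Read(p []byte) (int, error) {
	n, err := o.r.Read(p)
	if err == io.EOF && !o.closed {
		o.closed = true
		_ = o.r.Close()
	}
	return n, err
}

func (o *onceCloser) Close() error {
	if o.closed {
		return nil
	}
	o.closed = true
	return o.r.Close()
}

func main() {
	oc := &onceCloser{r: io.NopCloser(strings.NewReader("piece data"))}
	_, _ = io.Copy(io.Discard, oc) // hits EOF, source is closed here
	fmt.Println(oc.Close())        // <nil>: the explicit Close is now a no-op
}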

View File

@ -3954,6 +3954,7 @@ Response:
0,
1,
0,
0,
0
],
"Read": [
@ -3961,6 +3962,7 @@ Response:
3,
0,
0,
0,
0
]
}

View File

@ -25,6 +25,18 @@
# type: int
#WinningPostMaxTasks = 0
# EnableParkPiece enables the "piece parking" task to run on this node. This task is responsible for fetching
# pieces from the network and storing them in the storage subsystem until sectors are sealed. This task is
# only applicable when integrating with boost, and should be enabled on nodes which will hold deal data
# from boost until sectors containing the related pieces have the TreeD/TreeR constructed.
# Note that future Curio implementations will have a separate task type for fetching pieces from the internet.
#
# type: bool
#EnableParkPiece = false
# type: int
#ParkPieceMaxTasks = 0
# EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation
# creating 11 layer files in sector cache directory.
#

View File

@ -0,0 +1,35 @@
create table parked_pieces (
id bigserial primary key,
created_at timestamp default current_timestamp,
piece_cid text not null,
piece_padded_size bigint not null,
piece_raw_size bigint not null,
complete boolean not null default false,
task_id bigint default null,
cleanup_task_id bigint default null,
foreign key (task_id) references harmony_task (id) on delete set null,
foreign key (cleanup_task_id) references harmony_task (id) on delete set null,
unique (piece_cid)
);
/*
* This table is used to keep track of the references to the parked pieces
* so that we can delete them when they are no longer needed.
*
* All references into the parked_pieces table should be done through this table.
*
* data_url is optional for refs which also act as data sources.
*/
create table parked_piece_refs (
ref_id bigserial primary key,
piece_id bigint not null,
data_url text,
data_headers jsonb not null default '{}',
foreign key (piece_id) references parked_pieces(id) on delete cascade
);
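
Every consumer of a parked piece is expected to hold a row in parked_piece_refs, and a piece only becomes a cleanup candidate once its ref count reaches zero (the ON DELETE CASCADE means dropping a piece also drops any leftover refs). An in-memory sketch of that ref-counting contract (parkStore below is illustrative, not the real schema or DB layer):

package main

import "fmt"

// parkStore is an in-memory stand-in for the two tables: refs maps ref_id to
// piece_id, and a piece with zero refs is what the DropPiece poller looks for.
type parkStore struct {
	nextRef int64
	refs    map[int64]int64
}

func (s *parkStore) addRef(pieceID int64) int64 {
	s.nextRef++
	s.refs[s.nextRef] = pieceID
	return s.nextRef
}

func (s *parkStore) dropRef(refID int64) {
	delete(s.refs, refID)
}

func (s *parkStore) refCount(pieceID int64) int {
	n := 0
	for _, p := range s.refs {
		if p == pieceID {
			n++
		}
	}
	return n
}

func main() {
	s := &parkStore{refs: map[int64]int64{}}
	r1 := s.addRef(7) // e.g. the boost proxy's deal reference
	r2 := s.addRef(7) // e.g. a second deal sharing the same piece CID
	s.dropRef(r1)
	fmt.Println(s.refCount(7)) // 1: still referenced, not collectable
	s.dropRef(r2)
	fmt.Println(s.refCount(7)) // 0: now a DropPiece candidate
}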

View File

@ -4,12 +4,23 @@
package resources
import (
"os"
"strconv"
"strings"
ffi "github.com/filecoin-project/filecoin-ffi"
)
func getGPUDevices() float64 { // GPU boolean
if nstr := os.Getenv("HARMONY_OVERRIDE_GPUS"); nstr != "" {
n, err := strconv.ParseFloat(nstr, 64)
if err != nil {
logger.Errorf("parsing HARMONY_OVERRIDE_GPUS failed: %+v", err)
} else {
return n
}
}
gpus, err := ffi.GetGPUDevices()
logger.Infow("GPUs", "list", gpus)
if err != nil {
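
The resources change lets HARMONY_OVERRIDE_GPUS override detection: a parsable float wins, anything else is logged and normal detection is used. A stand-alone sketch of that override-then-fallback logic (gpuCount below is an illustrative helper, not the real function):

package main

import (
	"fmt"
	"os"
	"strconv"
)

// gpuCount returns the HARMONY_OVERRIDE_GPUS value when it parses as a float,
// and otherwise falls back to detection (stubbed out here).
func gpuCount(detect func() float64) float64 {
	if nstr := os.Getenv("HARMONY_OVERRIDE_GPUS"); nstr != "" {
		if n, err := strconv.ParseFloat(nstr, 64); err == nil {
			return n
		}
		fmt.Fprintln(os.Stderr, "ignoring unparsable HARMONY_OVERRIDE_GPUS:", nstr)
	}
	return detect()
}

func main() {
	_ = os.Setenv("HARMONY_OVERRIDE_GPUS", "0.5")
	fmt.Println(gpuCount(func() float64 { return 1 })) // 0.5
}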

View File

@ -330,6 +330,22 @@ documentation.`,
Comment: ``,
},
{
Name: "EnableParkPiece",
Type: "bool",
Comment: `EnableParkPiece enables the "piece parking" task to run on this node. This task is responsible for fetching
pieces from the network and storing them in the storage subsystem until sectors are sealed. This task is
only applicable when integrating with boost, and should be enabled on nodes which will hold deal data
from boost until sectors containing the related pieces have the TreeD/TreeR constructed.
Note that future Curio implementations will have a separate task type for fetching pieces from the internet.`,
},
{
Name: "ParkPieceMaxTasks",
Type: "int",
Comment: ``,
},
{
Name: "EnableSealSDR",
Type: "bool",

View File

@ -112,6 +112,14 @@ type CurioSubsystemsConfig struct {
EnableWinningPost bool
WinningPostMaxTasks int
// EnableParkPiece enables the "piece parking" task to run on this node. This task is responsible for fetching
// pieces from the network and storing them in the storage subsystem until sectors are sealed. This task is
// only applicable when integrating with boost, and should be enabled on nodes which will hold deal data
// from boost until sectors containing the related pieces have the TreeD/TreeR constructed.
// Note that future Curio implementations will have a separate task type for fetching pieces from the internet.
EnableParkPiece bool
ParkPieceMaxTasks int
// EnableSealSDR enables SDR tasks to run. SDR is the long sequential computation
// creating 11 layer files in sector cache directory.
//

View File

@ -12,7 +12,6 @@ import (
"github.com/gorilla/mux"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
@ -340,18 +339,5 @@ func (handler *FetchHandler) generatePoRepVanillaProof(w http.ResponseWriter, r
}
func FileTypeFromString(t string) (storiface.SectorFileType, error) {
switch t {
case storiface.FTUnsealed.String():
return storiface.FTUnsealed, nil
case storiface.FTSealed.String():
return storiface.FTSealed, nil
case storiface.FTCache.String():
return storiface.FTCache, nil
case storiface.FTUpdate.String():
return storiface.FTUpdate, nil
case storiface.FTUpdateCache.String():
return storiface.FTUpdateCache, nil
default:
return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
}
return storiface.TypeFromString(t)
}

View File

@ -747,6 +747,48 @@ func (r *Remote) Reader(ctx context.Context, s storiface.SectorRef, offset, size
return nil, nil
}
// ReaderSeq creates a simple sequential reader for a file. It does not work for
// file types which are directories (e.g. FTCache).
func (r *Remote) ReaderSeq(ctx context.Context, s storiface.SectorRef, ft storiface.SectorFileType) (io.ReadCloser, error) {
paths, _, err := r.local.AcquireSector(ctx, s, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
return nil, xerrors.Errorf("acquire local: %w", err)
}
path := storiface.PathByType(paths, ft)
if path != "" {
return os.Open(path)
}
si, err := r.index.StorageFindSector(ctx, s.ID, ft, 0, false)
if err != nil {
log.Debugf("Reader, did not find file on any of the workers %s (%s)", path, ft.String())
return nil, err
}
if len(si) == 0 {
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
sort.Slice(si, func(i, j int) bool {
return si[i].Weight > si[j].Weight
})
for _, info := range si {
for _, url := range info.URLs {
rd, err := r.readRemote(ctx, url, 0, 0)
if err != nil {
log.Warnw("reading from remote", "url", url, "error", err)
continue
}
return rd, err
}
}
return nil, xerrors.Errorf("failed to read sector %v from remote(%d): %w", s, ft, storiface.ErrSectorNotFound)
}
func (r *Remote) Reserve(ctx context.Context, sid storiface.SectorRef, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
log.Warnf("reserve called on remote store, sectorID: %v", sid.ID)
return func() {
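
ReaderSeq above prefers a local copy of the file and otherwise walks the index results best-weight-first until a remote URL opens. A small sketch of that selection order with stubbed candidates (pick and candidate are illustrative; the real code opens HTTP readers via readRemote):

package main

import (
	"fmt"
	"sort"
)

// candidate stands in for one index entry: a URL, its weight, and (stubbed)
// whether opening it would succeed.
type candidate struct {
	url    string
	weight uint64
	ok     bool
}

// pick mirrors ReaderSeq's order: a local path wins outright, otherwise the
// candidates are tried highest weight first until one opens.
func pick(localPath string, remotes []candidate) (string, error) {
	if localPath != "" {
		return "local:" + localPath, nil
	}
	sort.Slice(remotes, func(i, j int) bool { return remotes[i].weight > remotes[j].weight })
	for _, c := range remotes {
		if c.ok {
			return c.url, nil
		}
	}
	return "", fmt.Errorf("piece not found on any storage")
}

func main() {
	got, err := pick("", []candidate{
		{url: "http://store-a/piece/1", weight: 10, ok: false},
		{url: "http://store-b/piece/1", weight: 5, ok: true},
	})
	fmt.Println(got, err) // http://store-b/piece/1 <nil>
}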

View File

@ -39,6 +39,9 @@ func (b *Provider) AcquireSector(ctx context.Context, id storiface.SectorRef, ex
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUpdateCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
if err := os.Mkdir(filepath.Join(b.Root, storiface.FTPiece.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return storiface.SectorPaths{}, nil, err
}
done := func() {}

View File

@ -9,16 +9,22 @@ import (
)
const (
// "regular" sectors
FTUnsealed SectorFileType = 1 << iota
FTSealed
FTCache
// snap
FTUpdate
FTUpdateCache
// Piece Park
FTPiece
FileTypes = iota
)
var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache}
var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache, FTPiece}
const (
FTNone SectorFileType = 0
@ -39,6 +45,7 @@ var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
FTUpdate: FSOverheadDen,
FTUpdateCache: FSOverheadDen*2 + 1,
FTCache: 141, // 11 layers + D(2x ssize) + C + R'
FTPiece: FSOverheadDen,
}
// sector size * disk / fs overhead. FSOverheadDen is like the unit of sector size
@ -49,6 +56,7 @@ var FsOverheadFinalized = map[SectorFileType]int{
FTUpdate: FSOverheadDen,
FTUpdateCache: 1,
FTCache: 1,
FTPiece: FSOverheadDen,
}
type SectorFileType int
@ -65,6 +73,8 @@ func TypeFromString(s string) (SectorFileType, error) {
return FTUpdate, nil
case "update-cache":
return FTUpdateCache, nil
case "piece":
return FTPiece, nil
default:
return 0, xerrors.Errorf("unknown sector file type '%s'", s)
}
@ -82,6 +92,8 @@ func (t SectorFileType) String() string {
return "update"
case FTUpdateCache:
return "update-cache"
case FTPiece:
return "piece"
default:
return fmt.Sprintf("<unknown %d %v>", t, (t & ((1 << FileTypes) - 1)).Strings())
}
@ -206,6 +218,7 @@ type SectorPaths struct {
Cache string
Update string
UpdateCache string
Piece string
}
func ParseSectorID(baseName string) (abi.SectorID, error) {
@ -242,6 +255,8 @@ func PathByType(sps SectorPaths, fileType SectorFileType) string {
return sps.Update
case FTUpdateCache:
return sps.UpdateCache
case FTPiece:
return sps.Piece
}
panic("requested unknown path type")
@ -259,5 +274,7 @@ func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) {
sps.Update = p
case FTUpdateCache:
sps.UpdateCache = p
case FTPiece:
sps.Piece = p
}
}
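
FTPiece joins the SectorFileType bit flags, which is also why FileTypes (and the maxItems/minItems counts in the JSON schema diff at the top of this commit) move from 5 to 6. A minimal sketch of the flag arithmetic (constant names mirror the real ones; the values are shown only for illustration):

package main

import "fmt"

const (
	FTUnsealed = 1 << iota
	FTSealed
	FTCache
	FTUpdate
	FTUpdateCache
	FTPiece

	FileTypes = iota // 6 now that FTPiece exists
)

func main() {
	want := FTPiece | FTCache
	fmt.Println(want&FTPiece != 0)  // true: set membership is a mask test
	fmt.Println(want&FTSealed != 0) // false
	fmt.Println("file types:", FileTypes)
}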

View File

@ -20,6 +20,17 @@ type SectorRef struct {
var NoSectorRef = SectorRef{}
// PieceNumber is a reference to a piece in the storage system; mapping between
// pieces in the storage system and piece CIDs is maintained by the storage index
type PieceNumber uint64
func (pn PieceNumber) Ref() SectorRef {
return SectorRef{
ID: abi.SectorID{Miner: 0, Number: abi.SectorNumber(pn)},
ProofType: abi.RegisteredSealProof_StackedDrg64GiBV1, // This only cares about TreeD which is the same for all sizes
}
}
type ProverPoSt interface {
GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error)
GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, ppt abi.RegisteredPoStProof, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error)