v1.27.0-a #10
@ -11,6 +11,7 @@ import (
|
||||
"github.com/KarpelesLab/reflink"
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/puzpuzpuz/xsync/v2"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
ffi "github.com/filecoin-project/filecoin-ffi"
|
||||
@ -18,6 +19,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
proof2 "github.com/filecoin-project/go-state-types/proof"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
|
||||
"github.com/filecoin-project/lotus/provider/lpproof"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/proofpaths"
|
||||
@ -46,6 +48,7 @@ func NewSealCalls(st paths.Store, ls *paths.Local, si paths.SectorIndex) *SealCa
|
||||
storage: st,
|
||||
localStore: ls,
|
||||
sindex: si,
|
||||
storageReservations: xsync.NewIntegerMapOf[harmonytask.TaskID, *StorageReservation](),
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -54,18 +57,40 @@ type storageProvider struct {
|
||||
storage paths.Store
|
||||
localStore *paths.Local
|
||||
sindex paths.SectorIndex
|
||||
storageReservations *xsync.MapOf[harmonytask.TaskID, *StorageReservation]
|
||||
}
|
||||
|
||||
func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
|
||||
paths, storageIDs, err := l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
|
||||
func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
|
||||
var paths, storageIDs storiface.SectorPaths
|
||||
var releaseStorage func()
|
||||
|
||||
var ok bool
|
||||
var resv *StorageReservation
|
||||
if taskID != nil {
|
||||
resv, ok = l.storageReservations.Load(*taskID)
|
||||
}
|
||||
if ok {
|
||||
if resv.Alloc != allocate || resv.Existing != existing {
|
||||
// this should never happen, only when task definition is wrong
|
||||
return storiface.SectorPaths{}, nil, xerrors.Errorf("storage reservation type mismatch")
|
||||
}
|
||||
|
||||
log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate)
|
||||
|
||||
paths = resv.Paths
|
||||
releaseStorage = resv.Release
|
||||
} else {
|
||||
var err error
|
||||
paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
|
||||
if err != nil {
|
||||
return storiface.SectorPaths{}, nil, err
|
||||
}
|
||||
|
||||
releaseStorage, err := l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
|
||||
releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal)
|
||||
if err != nil {
|
||||
return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
|
||||
|
||||
@ -85,8 +110,8 @@ func (l *storageProvider) AcquireSector(ctx context.Context, sector storiface.Se
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
|
||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
|
||||
func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, sector storiface.SectorRef, ticket abi.SealRandomness, commKcid cid.Cid) error {
|
||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, &taskID, sector, storiface.FTNone, storiface.FTCache, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquiring sector paths: %w", err)
|
||||
}
|
||||
@ -120,7 +145,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size
|
||||
maybeUns := storiface.FTNone
|
||||
// todo sectors with data
|
||||
|
||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, maybeUns, storiface.PathSealing)
|
||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, maybeUns, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
|
||||
}
|
||||
@ -135,7 +160,7 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns
|
||||
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
|
||||
}
|
||||
|
||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
|
||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
|
||||
}
|
||||
@ -331,7 +356,7 @@ func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.Sector
|
||||
alloc = storiface.FTUnsealed
|
||||
}
|
||||
|
||||
sectorPaths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, alloc, storiface.PathSealing)
|
||||
sectorPaths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, alloc, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquiring sector paths: %w", err)
|
||||
}
|
||||
|
241
provider/lpffi/task_storage.go
Normal file
241
provider/lpffi/task_storage.go
Normal file
@ -0,0 +1,241 @@
|
||||
package lpffi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/resources"
|
||||
"github.com/filecoin-project/lotus/lib/must"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type SectorRef struct {
|
||||
SpID int64 `db:"sp_id"`
|
||||
SectorNumber int64 `db:"sector_number"`
|
||||
RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"`
|
||||
}
|
||||
|
||||
func (sr SectorRef) ID() abi.SectorID {
|
||||
return abi.SectorID{
|
||||
Miner: abi.ActorID(sr.SpID),
|
||||
Number: abi.SectorNumber(sr.SectorNumber),
|
||||
}
|
||||
}
|
||||
|
||||
func (sr SectorRef) Ref() storiface.SectorRef {
|
||||
return storiface.SectorRef{
|
||||
ID: sr.ID(),
|
||||
ProofType: sr.RegSealProof,
|
||||
}
|
||||
}
|
||||
|
||||
type TaskStorage struct {
|
||||
sc *SealCalls
|
||||
|
||||
alloc, existing storiface.SectorFileType
|
||||
ssize abi.SectorSize
|
||||
pathType storiface.PathType
|
||||
|
||||
taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error)
|
||||
}
|
||||
|
||||
type ReleaseStorageFunc func() // free storage reservation
|
||||
|
||||
type StorageReservation struct {
|
||||
SectorRef SectorRef
|
||||
Release ReleaseStorageFunc
|
||||
Paths storiface.SectorPaths
|
||||
PathIDs storiface.SectorPaths
|
||||
|
||||
Alloc, Existing storiface.SectorFileType
|
||||
}
|
||||
|
||||
func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) *TaskStorage {
|
||||
return &TaskStorage{
|
||||
sc: sb,
|
||||
alloc: alloc,
|
||||
existing: existing,
|
||||
ssize: ssize,
|
||||
pathType: pathType,
|
||||
taskToSectorRef: taskToSectorRef,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TaskStorage) HasCapacity() bool {
|
||||
ctx := context.Background()
|
||||
|
||||
paths, err := t.sc.sectors.sindex.StorageBestAlloc(ctx, t.alloc, t.ssize, t.pathType)
|
||||
if err != nil {
|
||||
log.Errorf("finding best alloc in HasCapacity: %+v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
local, err := t.sc.sectors.localStore.Local(ctx)
|
||||
if err != nil {
|
||||
log.Errorf("getting local storage: %+v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
for _, path := range paths {
|
||||
if t.pathType == storiface.PathStorage && !path.CanStore {
|
||||
continue // we want to store, and this isn't a store path
|
||||
}
|
||||
if t.pathType == storiface.PathSealing && !path.CanSeal {
|
||||
continue // we want to seal, and this isn't a seal path
|
||||
}
|
||||
|
||||
// check if this path is on this node
|
||||
var found bool
|
||||
for _, storagePath := range local {
|
||||
if storagePath.ID == path.ID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
// this path isn't on this node
|
||||
continue
|
||||
}
|
||||
|
||||
// StorageBestAlloc already checks that there is enough space; Not atomic like reserving space, but it's
|
||||
// good enough for HasCapacity
|
||||
return true
|
||||
}
|
||||
|
||||
return false // no path found
|
||||
}
|
||||
|
||||
func (t *TaskStorage) Claim(taskID int) error {
|
||||
ctx := context.Background()
|
||||
|
||||
sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting sector ref: %w", err)
|
||||
}
|
||||
|
||||
// storage writelock sector
|
||||
lkctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
allocate := storiface.FTCache
|
||||
|
||||
lockAcquireTimuout := time.Second * 10
|
||||
lockAcquireTimer := time.NewTimer(lockAcquireTimuout)
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case <-lockAcquireTimer.C:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
|
||||
if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, allocate); err != nil {
|
||||
// timer will expire
|
||||
return xerrors.Errorf("claim StorageLock: %w", err)
|
||||
}
|
||||
|
||||
if !lockAcquireTimer.Stop() {
|
||||
// timer expired, so lkctx is done, and that means the lock was acquired and dropped..
|
||||
return xerrors.Errorf("failed to acquire lock")
|
||||
}
|
||||
defer func() {
|
||||
// make sure we release the sector lock
|
||||
lockAcquireTimer.Reset(0)
|
||||
}()
|
||||
|
||||
// find anywhere
|
||||
// if found return nil, for now
|
||||
s, err := t.sc.sectors.sindex.StorageFindSector(ctx, sectorRef.ID(), allocate, must.One(sectorRef.RegSealProof.SectorSize()), false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("claim StorageFindSector: %w", err)
|
||||
}
|
||||
|
||||
lp, err := t.sc.sectors.localStore.Local(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// see if there are any non-local sector files in storage
|
||||
for _, info := range s {
|
||||
for _, l := range lp {
|
||||
if l.ID == info.ID {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Create reservation for fetching; This will require quite a bit more refactoring, but for now we'll
|
||||
// only care about new allocations
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// acquire a path to make a reservation in
|
||||
pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, allocate, storiface.PathSealing, storiface.AcquireMove)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// reserve the space
|
||||
release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), allocate, pathIDs, storiface.FSOverheadSeal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sres := &StorageReservation{
|
||||
SectorRef: sectorRef,
|
||||
Release: release,
|
||||
Paths: pathsFs,
|
||||
PathIDs: pathIDs,
|
||||
|
||||
Alloc: t.alloc,
|
||||
Existing: t.existing,
|
||||
}
|
||||
|
||||
t.sc.sectors.storageReservations.Store(harmonytask.TaskID(taskID), sres)
|
||||
|
||||
log.Debugw("claimed storage", "task_id", taskID, "sector", sectorRef.ID(), "paths", pathsFs)
|
||||
|
||||
// note: we drop the sector writelock on return; THAT IS INTENTIONAL, this code runs in CanAccept, which doesn't
|
||||
// guarantee that the work for this sector will happen on this node; SDR CanAccept just ensures that the node can
|
||||
// run the job, harmonytask is what ensures that only one SDR runs at a time
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TaskStorage) MarkComplete(taskID int) error {
|
||||
// MarkComplete is ALWAYS called after the task is done or not scheduled
|
||||
// If Claim is called and returns without errors, MarkComplete with the same
|
||||
// taskID is guaranteed to eventually be called
|
||||
|
||||
sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting sector ref: %w", err)
|
||||
}
|
||||
|
||||
sres, ok := t.sc.sectors.storageReservations.Load(harmonytask.TaskID(taskID))
|
||||
if !ok {
|
||||
return xerrors.Errorf("no reservation found for task %d", taskID)
|
||||
}
|
||||
|
||||
if sectorRef != sres.SectorRef {
|
||||
return xerrors.Errorf("reservation sector ref doesn't match task sector ref: %+v != %+v", sectorRef, sres.SectorRef)
|
||||
}
|
||||
|
||||
log.Debugw("marking storage complete", "task_id", taskID, "sector", sectorRef.ID(), "paths", sres.Paths)
|
||||
|
||||
// remove the reservation
|
||||
t.sc.sectors.storageReservations.Delete(harmonytask.TaskID(taskID))
|
||||
|
||||
// release the reservation
|
||||
sres.Release()
|
||||
|
||||
// note: this only frees the reservation, allocated sectors are declared in AcquireSector which is aware of
|
||||
// the reservation
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ resources.Storage = &TaskStorage{}
|
@ -146,7 +146,7 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
|
||||
// Trees; After one retry, it should return the sector to the
|
||||
// SDR stage; max number of retries should be configurable
|
||||
|
||||
err = s.sc.GenerateSDR(ctx, sref, ticket, commd)
|
||||
err = s.sc.GenerateSDR(ctx, taskID, sref, ticket, commd)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("generating sdr: %w", err)
|
||||
}
|
||||
@ -194,6 +194,11 @@ func (s *SDRTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEn
|
||||
}
|
||||
|
||||
func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails {
|
||||
ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size
|
||||
if isDevnet {
|
||||
ssize = abi.SectorSize(2 << 20)
|
||||
}
|
||||
|
||||
res := harmonytask.TaskTypeDetails{
|
||||
Max: s.max,
|
||||
Name: "SDR",
|
||||
@ -201,6 +206,7 @@ func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails {
|
||||
Cpu: 4, // todo multicore sdr
|
||||
Gpu: 0,
|
||||
Ram: 54 << 30,
|
||||
Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing),
|
||||
},
|
||||
MaxFailures: 2,
|
||||
Follows: nil,
|
||||
@ -217,4 +223,19 @@ func (s *SDRTask) Adder(taskFunc harmonytask.AddTaskFunc) {
|
||||
s.sp.pollers[pollerSDR].Set(taskFunc)
|
||||
}
|
||||
|
||||
func (s *SDRTask) taskToSector(id harmonytask.TaskID) (lpffi.SectorRef, error) {
|
||||
var refs []lpffi.SectorRef
|
||||
|
||||
err := s.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_sdr = $1`, id)
|
||||
if err != nil {
|
||||
return lpffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err)
|
||||
}
|
||||
|
||||
if len(refs) != 1 {
|
||||
return lpffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs))
|
||||
}
|
||||
|
||||
return refs[0], nil
|
||||
}
|
||||
|
||||
var _ harmonytask.TaskInterface = &SDRTask{}
|
||||
|
Loading…
Reference in New Issue
Block a user