Merge pull request #6090 from filecoin-project/feat/rtvl-prov-unseal-logging
Add more debug logging for unsealing
This commit is contained in:
commit
ad3c588582
17
extern/sector-storage/manager.go
vendored
17
extern/sector-storage/manager.go
vendored
@ -208,6 +208,7 @@ func (m *Manager) schedFetch(sector storage.SectorRef, ft storiface.SectorFileTy
|
||||
|
||||
func (m *Manager) readPiece(sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error {
|
||||
return func(ctx context.Context, w Worker) error {
|
||||
log.Debugf("read piece data from sector %d, offset %d, size %d", sector.ID, offset, size)
|
||||
r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size))
|
||||
if err != nil {
|
||||
return err
|
||||
@ -215,6 +216,7 @@ func (m *Manager) readPiece(sink io.Writer, sector storage.SectorRef, offset sto
|
||||
if r != nil {
|
||||
*rok = r.(bool)
|
||||
}
|
||||
log.Debugf("completed read piece data from sector %d, offset %d, size %d: read ok? %t", sector.ID, offset, size, *rok)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -225,11 +227,13 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
log.Debugf("acquire read sector lock for sector %d", sector.ID)
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil {
|
||||
returnErr = xerrors.Errorf("acquiring read sector lock: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("find unsealed sector %d", sector.ID)
|
||||
// passing 0 spt because we only need it when allowFetch is true
|
||||
best, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||
if err != nil {
|
||||
@ -240,41 +244,49 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect
|
||||
foundUnsealed = len(best) > 0
|
||||
if foundUnsealed { // append to existing
|
||||
// There is unsealed sector, see if we can read from it
|
||||
log.Debugf("found unsealed sector %d", sector.ID)
|
||||
|
||||
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
|
||||
|
||||
log.Debugf("scheduling read of unsealed sector %d", sector.ID)
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
|
||||
m.readPiece(sink, sector, offset, size, &readOk))
|
||||
if err != nil {
|
||||
returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||
}
|
||||
} else {
|
||||
log.Debugf("did not find unsealed sector %d", sector.ID)
|
||||
selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
|
||||
log.Debugf("fetch and read piece in sector %d, offset %d, size %d", sector.ID, offset, size)
|
||||
foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if readOk {
|
||||
log.Debugf("completed read of unsealed piece in sector %d, offset %d, size %d", sector.ID, offset, size)
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
log.Debugf("acquire unseal sector lock for sector %d", sector.ID)
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil {
|
||||
return xerrors.Errorf("acquiring unseal sector lock: %w", err)
|
||||
}
|
||||
|
||||
unsealFetch := func(ctx context.Context, worker Worker) error {
|
||||
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
|
||||
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
|
||||
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
||||
}
|
||||
|
||||
if foundUnsealed {
|
||||
log.Debugf("copy unsealed sector data for sector %d", sector.ID)
|
||||
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil {
|
||||
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
||||
}
|
||||
@ -291,13 +303,16 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.
|
||||
return xerrors.Errorf("getting sector size: %w", err)
|
||||
}
|
||||
|
||||
log.Debugf("schedule unseal for sector %d", sector.ID)
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
|
||||
// TODO: make restartable
|
||||
|
||||
// NOTE: we're unsealing the whole sector here as with SDR we can't really
|
||||
// unseal the sector partially. Requesting the whole sector here can
|
||||
// save us some work in case another piece is requested from here
|
||||
log.Debugf("unseal sector %d", sector.ID)
|
||||
_, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, unsealed))
|
||||
log.Debugf("completed unseal sector %d", sector.ID)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@ -306,6 +321,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.
|
||||
|
||||
selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false)
|
||||
|
||||
log.Debugf("schedule read piece for sector %d, offset %d, size %d", sector.ID, offset, size)
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
|
||||
m.readPiece(sink, sector, offset, size, &readOk))
|
||||
if err != nil {
|
||||
@ -316,6 +332,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.
|
||||
return xerrors.Errorf("failed to read unsealed piece")
|
||||
}
|
||||
|
||||
log.Debugf("completed read of piece in sector %d, offset %d, size %d", sector.ID, offset, size)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -47,6 +47,8 @@ func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, min
|
||||
}
|
||||
|
||||
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||
log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length)
|
||||
|
||||
si, err := rpn.miner.GetSectorInfo(sectorID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -73,7 +75,9 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi
|
||||
if si.CommD != nil {
|
||||
commD = *si.CommD
|
||||
}
|
||||
|
||||
// Read the piece into the pipe's writer, unsealing the piece if necessary
|
||||
log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid)
|
||||
err := rpn.sealer.ReadPiece(ctx, w, ref, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD)
|
||||
if err != nil {
|
||||
log.Errorf("failed to unseal piece from sector %d: %s", sectorID, err)
|
||||
|
Loading…
Reference in New Issue
Block a user