diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 3db7ac9ec..d3fef8533 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -208,6 +208,7 @@ func (m *Manager) schedFetch(sector storage.SectorRef, ft storiface.SectorFileTy func (m *Manager) readPiece(sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error { return func(ctx context.Context, w Worker) error { + log.Debugf("read piece data from sector %d, offset %d, size %d", sector.ID, offset, size) r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size)) if err != nil { return err @@ -215,6 +216,7 @@ func (m *Manager) readPiece(sink io.Writer, sector storage.SectorRef, offset sto if r != nil { *rok = r.(bool) } + log.Debugf("completed read piece data from sector %d, offset %d, size %d: read ok? %t", sector.ID, offset, size, *rok) return nil } } @@ -225,11 +227,13 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect ctx, cancel := context.WithCancel(ctx) defer cancel() + log.Debugf("acquire read sector lock for sector %d", sector.ID) if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed, storiface.FTNone); err != nil { returnErr = xerrors.Errorf("acquiring read sector lock: %w", err) return } + log.Debugf("find unsealed sector %d", sector.ID) // passing 0 spt because we only need it when allowFetch is true best, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) if err != nil { @@ -240,41 +244,49 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect foundUnsealed = len(best) > 0 if foundUnsealed { // append to existing // There is unsealed sector, see if we can read from it + log.Debugf("found unsealed sector %d", sector.ID) selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false) + log.Debugf("scheduling read of unsealed sector %d", sector.ID) err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) if err != nil { returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err) } } else { + log.Debugf("did not find unsealed sector %d", sector.ID) selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing) } return } func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { + log.Debugf("fetch and read piece in sector %d, offset %d, size %d", sector.ID, offset, size) foundUnsealed, readOk, selector, err := m.tryReadUnsealedPiece(ctx, sink, sector, offset, size) if err != nil { return err } if readOk { + log.Debugf("completed read of unsealed piece in sector %d, offset %d, size %d", sector.ID, offset, size) return nil } ctx, cancel := context.WithCancel(ctx) defer cancel() + log.Debugf("acquire unseal sector lock for sector %d", sector.ID) if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil { return xerrors.Errorf("acquiring unseal sector lock: %w", err) } unsealFetch := func(ctx context.Context, worker Worker) error { + log.Debugf("copy sealed/cache sector data for sector %d", sector.ID) if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { return xerrors.Errorf("copy sealed/cache sector data: %w", err) } if foundUnsealed { + log.Debugf("copy unsealed sector data for sector %d", sector.ID) if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil { return xerrors.Errorf("copy unsealed sector data: %w", err) } @@ -291,13 +303,16 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage. return xerrors.Errorf("getting sector size: %w", err) } + log.Debugf("schedule unseal for sector %d", sector.ID) err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error { // TODO: make restartable // NOTE: we're unsealing the whole sector here as with SDR we can't really // unseal the sector partially. Requesting the whole sector here can // save us some work in case another piece is requested from here + log.Debugf("unseal sector %d", sector.ID) _, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, 0, abi.PaddedPieceSize(ssize).Unpadded(), ticket, unsealed)) + log.Debugf("completed unseal sector %d", sector.ID) return err }) if err != nil { @@ -306,6 +321,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage. selector = newExistingSelector(m.index, sector.ID, storiface.FTUnsealed, false) + log.Debugf("schedule read piece for sector %d, offset %d, size %d", sector.ID, offset, size) err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) if err != nil { @@ -316,6 +332,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage. return xerrors.Errorf("failed to read unsealed piece") } + log.Debugf("completed read of piece in sector %d, offset %d, size %d", sector.ID, offset, size) return nil } diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index 557dd3b6d..e58257c8a 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -47,6 +47,8 @@ func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, min } func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length) + si, err := rpn.miner.GetSectorInfo(sectorID) if err != nil { return nil, err @@ -73,7 +75,9 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi if si.CommD != nil { commD = *si.CommD } + // Read the piece into the pipe's writer, unsealing the piece if necessary + log.Debugf("read piece in sector %d, offset %d, length %d from miner %d", sectorID, offset, length, mid) err := rpn.sealer.ReadPiece(ctx, w, ref, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD) if err != nil { log.Errorf("failed to unseal piece from sector %d: %s", sectorID, err)