From 793d332002b17eda79a1cc38a3aedb04efde9bc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 26 May 2020 10:25:17 +0200 Subject: [PATCH] Move UnpaddedByteIndex from FFI wrapper --- ffiwrapper/partialfile.go | 10 ++++++---- ffiwrapper/sealer_cgo.go | 10 +++++----- ffiwrapper/types.go | 7 +++---- ffiwrapper/unseal_ranges.go | 4 +++- localworker.go | 4 ++-- manager.go | 8 ++++---- mock/mock.go | 3 ++- storiface/ffi.go | 2 ++ testworker_test.go | 5 ++--- 9 files changed, 29 insertions(+), 24 deletions(-) diff --git a/ffiwrapper/partialfile.go b/ffiwrapper/partialfile.go index a5b8f2548..a278e7347 100644 --- a/ffiwrapper/partialfile.go +++ b/ffiwrapper/partialfile.go @@ -10,6 +10,8 @@ import ( rlepluslazy "github.com/filecoin-project/go-bitfield/rle" "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/sector-storage/storiface" ) const veryLargeRle = 1 << 20 @@ -161,7 +163,7 @@ func (pf *partialFile) Close() error { return pf.file.Close() } -func (pf *partialFile) Writer(offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.Writer, error) { +func (pf *partialFile) Writer(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (io.Writer, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } @@ -190,7 +192,7 @@ func (pf *partialFile) Writer(offset UnpaddedByteIndex, size abi.UnpaddedPieceSi return pf.file, nil } -func (pf *partialFile) MarkAllocated(offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (pf *partialFile) MarkAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { have, err := pf.allocated.RunIterator() if err != nil { return err @@ -208,7 +210,7 @@ func (pf *partialFile) MarkAllocated(offset UnpaddedByteIndex, size abi.Unpadded return nil } -func (pf *partialFile) Reader(offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) (*os.File, error) { +func (pf *partialFile) Reader(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (*os.File, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } @@ -241,7 +243,7 @@ func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) { return pf.allocated.RunIterator() } -func pieceRun(offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) rlepluslazy.RunIterator { +func pieceRun(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) rlepluslazy.RunIterator { var runs []rlepluslazy.Run if offset > 0 { runs = append(runs, rlepluslazy.Run{ diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index 1ffc10b72..382d3853f 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -100,7 +100,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie } } - w, err := stagedFile.Writer(UnpaddedByteIndex(offset), pieceSize) + w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset), pieceSize) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err) } @@ -115,7 +115,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err) } - if err := stagedFile.MarkAllocated(UnpaddedByteIndex(offset), pieceSize); err != nil { + if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset), pieceSize); err != nil { return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err) } @@ -136,7 +136,7 @@ func (cf closerFunc) Close() error { return cf() } -func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { +func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded() // try finding existing @@ -271,7 +271,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset U return xerrors.Errorf("piping output to unsealed file: %w", perr) } - if err := pf.MarkAllocated(UnpaddedByteIndex(at), abi.UnpaddedPieceSize(piece.Len)); err != nil { + if err := pf.MarkAllocated(storiface.UnpaddedByteIndex(at), abi.UnpaddedPieceSize(piece.Len)); err != nil { return xerrors.Errorf("marking unsealed range as allocated: %w", err) } @@ -283,7 +283,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset U return nil } -func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false) if err != nil { return xerrors.Errorf("acquire unsealed sector path: %w", err) diff --git a/ffiwrapper/types.go b/ffiwrapper/types.go index 06c07b715..cf211056f 100644 --- a/ffiwrapper/types.go +++ b/ffiwrapper/types.go @@ -11,10 +11,9 @@ import ( "github.com/filecoin-project/sector-storage/ffiwrapper/basicfs" "github.com/filecoin-project/sector-storage/stores" + "github.com/filecoin-project/sector-storage/storiface" ) -type UnpaddedByteIndex uint64 - type Validator interface { CanCommit(sector stores.SectorPaths) (bool, error) CanProve(sector stores.SectorPaths) (bool, error) @@ -29,8 +28,8 @@ type Storage interface { storage.Prover StorageSealer - UnsealPiece(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error - ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) error + UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error + ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error } type Verifier interface { diff --git a/ffiwrapper/unseal_ranges.go b/ffiwrapper/unseal_ranges.go index 873ac45d0..522b58138 100644 --- a/ffiwrapper/unseal_ranges.go +++ b/ffiwrapper/unseal_ranges.go @@ -5,6 +5,8 @@ import ( "github.com/filecoin-project/go-bitfield/rle" "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/sector-storage/storiface" ) // merge gaps between ranges which are close to each other @@ -13,7 +15,7 @@ const mergeGaps = 32 << 20 // TODO const expandRuns = 16 << 20 // unseal more than requested for future requests -func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) { +func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) { todo := pieceRun(offset, size) todo, err := rlepluslazy.Subtract(todo, unsealed) if err != nil { diff --git a/localworker.go b/localworker.go index a01623bc1..710bd47fb 100644 --- a/localworker.go +++ b/localworker.go @@ -183,7 +183,7 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e return nil } -func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { +func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { sb, err := l.sb() if err != nil { return err @@ -200,7 +200,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde return nil } -func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { sb, err := l.sb() if err != nil { return err diff --git a/manager.go b/manager.go index 7f1cc4a4d..daa522c92 100644 --- a/manager.go +++ b/manager.go @@ -30,8 +30,8 @@ type Worker interface { ffiwrapper.StorageSealer Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) error - UnsealPiece(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error - ReadPiece(context.Context, io.Writer, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize) error + UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error + ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) @@ -49,7 +49,7 @@ type Worker interface { type SectorManager interface { SectorSize() abi.SectorSize - ReadPiece(context.Context, io.Writer, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error + ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error ffiwrapper.StorageSealer storage.Prover @@ -190,7 +190,7 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am } } -func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { +func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error { best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false) if err != nil { return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err) diff --git a/mock/mock.go b/mock/mock.go index 6fad6bc20..403a1cbf6 100644 --- a/mock/mock.go +++ b/mock/mock.go @@ -17,6 +17,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/sector-storage/storiface" ) var log = logging.Logger("sbmock") @@ -267,7 +268,7 @@ func generateFakePoSt(sectorInfo []abi.SectorInfo) []abi.PoStProof { } } -func (mgr *SectorMgr) ReadPiece(ctx context.Context, w io.Writer, sectorID abi.SectorID, offset ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error { +func (mgr *SectorMgr) ReadPiece(ctx context.Context, w io.Writer, sectorID abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error { if len(mgr.sectors[sectorID].pieces) > 1 { panic("implme") } diff --git a/storiface/ffi.go b/storiface/ffi.go index 354b1e9c4..6821f9b35 100644 --- a/storiface/ffi.go +++ b/storiface/ffi.go @@ -3,3 +3,5 @@ package storiface import "errors" var ErrSectorNotFound = errors.New("sector not found") + +type UnpaddedByteIndex uint64 diff --git a/testworker_test.go b/testworker_test.go index 68d70c838..e61cf96ba 100644 --- a/testworker_test.go +++ b/testworker_test.go @@ -9,7 +9,6 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" - "github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/sector-storage/mock" "github.com/filecoin-project/sector-storage/sealtasks" "github.com/filecoin-project/sector-storage/stores" @@ -50,11 +49,11 @@ func (t *testWorker) NewSector(ctx context.Context, sector abi.SectorID) error { panic("implement me") } -func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { +func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { panic("implement me") } -func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { +func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { panic("implement me") }