Merge pull request #34 from filecoin-project/feat/unseal

Unseal support
This commit is contained in:
Łukasz Magiera 2020-05-29 19:52:41 +02:00 committed by GitHub
commit 9df0cdf193
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1809 additions and 189 deletions

View File

@ -21,7 +21,7 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredProof, se
// TODO: More better checks // TODO: More better checks
for _, sector := range sectors { for _, sector := range sectors {
err := func() error { err := func() error {
lp, _, done, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false) lp, _, done, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire sector in checkProvable: %w", err) return xerrors.Errorf("acquire sector in checkProvable: %w", err)
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/sector-storage/stores" "github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/sector-storage/storiface"
) )
type sectorFile struct { type sectorFile struct {
@ -63,13 +64,22 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing
return stores.SectorPaths{}, nil, ctx.Err() return stores.SectorPaths{}, nil, ctx.Err()
} }
path := filepath.Join(b.Root, fileType.String(), stores.SectorName(id))
prevDone := done prevDone := done
done = func() { done = func() {
prevDone() prevDone()
<-ch <-ch
} }
stores.SetPathByType(&out, fileType, filepath.Join(b.Root, fileType.String(), stores.SectorName(id))) if !allocate.Has(fileType) {
if _, err := os.Stat(path); os.IsNotExist(err) {
done()
return stores.SectorPaths{}, nil, storiface.ErrSectorNotFound
}
}
stores.SetPathByType(&out, fileType, path)
} }
return out, done, nil return out, done, nil

View File

@ -8,7 +8,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) { func ToReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File) f, ok := r.(*os.File)
if ok { if ok {
return f, func() error { return nil }, nil return f, func() error { return nil }, nil

262
ffiwrapper/partialfile.go Normal file
View File

@ -0,0 +1,262 @@
package ffiwrapper
import (
"encoding/binary"
"io"
"os"
"github.com/detailyang/go-fallocate"
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/sector-storage/storiface"
)
// veryLargeRle is a sanity threshold: trailers larger than this are legal
// but suspicious, and trigger a warning when the file is opened.
const veryLargeRle = 1 << 20

// Sectors can be partially unsealed. We support this by appending a small
// trailer to each unsealed sector file containing an RLE+ marking which bytes
// in a sector are unsealed, and which are not (holes)

// unsealed sector files internally have this structure
// [unpadded (raw) data][rle+][4B LE length of the rle+ field]

// partialFile wraps an unsealed sector file together with the decoded
// RLE+ bitfield describing which padded byte ranges hold real data.
type partialFile struct {
	maxPiece abi.PaddedPieceSize // size of the data area; also the trailer start offset

	path      string
	allocated rlepluslazy.RLE // padded byte ranges that contain unsealed data

	file *os.File
}
// writeTrailer serializes the allocation run iterator as an RLE+ trailer,
// writes it immediately after the sector data area, appends its 4-byte
// little-endian length, and truncates the file to exactly that size.
func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) error {
	enc, err := rlepluslazy.EncodeRuns(r, nil)
	if err != nil {
		return xerrors.Errorf("encoding trailer: %w", err)
	}

	// maxPieceSize == unpadded(sectorSize) == trailer start
	if _, err := w.Seek(maxPieceSize, io.SeekStart); err != nil {
		return xerrors.Errorf("seek to trailer start: %w", err)
	}

	written, err := w.Write(enc)
	if err != nil {
		return xerrors.Errorf("writing trailer data: %w", err)
	}

	if err := binary.Write(w, binary.LittleEndian, uint32(len(enc))); err != nil {
		return xerrors.Errorf("writing trailer length: %w", err)
	}

	// data + trailer + 4-byte length field
	return w.Truncate(maxPieceSize + int64(written) + 4)
}
// createPartialFile creates a new unsealed sector file at path: it reserves
// the full data area with fallocate, writes an empty allocation trailer
// (no bytes unsealed yet), then re-opens the file via openPartialFile.
func createPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		// fix: "openning" -> "opening" in the error message
		return nil, xerrors.Errorf("opening partial file '%s': %w", path, err)
	}

	err = func() error {
		// Reserve the whole data area up-front.
		err := fallocate.Fallocate(f, 0, int64(maxPieceSize))
		if err != nil {
			return xerrors.Errorf("fallocate '%s': %w", path, err)
		}

		// An empty run iterator marks the entire file as unallocated.
		if err := writeTrailer(int64(maxPieceSize), f, &rlepluslazy.RunSliceIterator{}); err != nil {
			return xerrors.Errorf("writing trailer: %w", err)
		}

		return nil
	}()
	if err != nil {
		f.Close() // best-effort; the setup error is the one worth returning
		return nil, err
	}
	if err := f.Close(); err != nil {
		return nil, xerrors.Errorf("close empty partial file: %w", err)
	}

	return openPartialFile(maxPieceSize, path)
}
// openPartialFile opens an existing partial unsealed sector file and decodes
// its allocation trailer, sanity-checking the file size, the trailer
// placement, and the highest allocated byte index against maxPieceSize.
func openPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*partialFile, error) {
	f, err := os.OpenFile(path, os.O_RDWR, 0644)
	if err != nil {
		// fix: "openning" -> "opening" in the error message
		return nil, xerrors.Errorf("opening partial file '%s': %w", path, err)
	}

	var rle rlepluslazy.RLE
	err = func() error {
		st, err := f.Stat()
		if err != nil {
			return xerrors.Errorf("stat '%s': %w", path, err)
		}
		if st.Size() < int64(maxPieceSize) {
			return xerrors.Errorf("sector file '%s' was smaller than the sector size %d < %d", path, st.Size(), maxPieceSize)
		}

		// read trailer
		var tlen [4]byte
		_, err = f.ReadAt(tlen[:], st.Size()-int64(len(tlen)))
		if err != nil {
			return xerrors.Errorf("reading trailer length: %w", err)
		}

		// sanity-check the length
		trailerLen := binary.LittleEndian.Uint32(tlen[:])
		expectLen := int64(trailerLen) + int64(len(tlen)) + int64(maxPieceSize)
		if expectLen != st.Size() {
			// fix: path is a string, so the verb must be %s, not %d
			return xerrors.Errorf("file '%s' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen)+int64(len(tlen)), maxPieceSize)
		}
		if trailerLen > veryLargeRle {
			log.Warnf("Partial file '%s' has a VERY large trailer with %d bytes", path, trailerLen)
		}

		trailerStart := st.Size() - int64(len(tlen)) - int64(trailerLen)
		if trailerStart != int64(maxPieceSize) {
			return xerrors.Errorf("expected sector size to equal trailer start index")
		}

		trailerBytes := make([]byte, trailerLen)
		_, err = f.ReadAt(trailerBytes, trailerStart)
		if err != nil {
			return xerrors.Errorf("reading trailer: %w", err)
		}

		rle, err = rlepluslazy.FromBuf(trailerBytes)
		if err != nil {
			return xerrors.Errorf("decoding trailer: %w", err)
		}

		it, err := rle.RunIterator()
		if err != nil {
			return xerrors.Errorf("getting trailer run iterator: %w", err)
		}

		// The bitfield must not mark bytes past the end of the data area.
		lastSet, err := rlepluslazy.LastIndex(it, true)
		if err != nil {
			return xerrors.Errorf("finding last set byte index: %w", err)
		}
		if lastSet > uint64(maxPieceSize) {
			return xerrors.Errorf("last set byte at index higher than sector size: %d > %d", lastSet, maxPieceSize)
		}

		return nil
	}()
	if err != nil {
		f.Close() // best-effort; the validation error is the one worth returning
		return nil, err
	}

	return &partialFile{
		maxPiece:  maxPieceSize,
		path:      path,
		allocated: rle,
		file:      f,
	}, nil
}
// Close closes the underlying sector file.
func (pf *partialFile) Close() error {
	return pf.file.Close()
}
// Writer positions the underlying file at the given padded offset and
// returns it as an io.Writer for writing size bytes. A warning is logged
// if the target range overlaps bytes already marked allocated.
func (pf *partialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
	if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
		return nil, xerrors.Errorf("seek piece start: %w", err)
	}

	{
		have, err := pf.allocated.RunIterator()
		if err != nil {
			return nil, err
		}

		overlap, err := rlepluslazy.And(have, pieceRun(offset, size))
		if err != nil {
			return nil, err
		}

		n, err := rlepluslazy.Count(overlap)
		if err != nil {
			return nil, err
		}

		if n > 0 {
			log.Warnf("getting partial file writer overwriting %d allocated bytes", n)
		}
	}

	return pf.file, nil
}
// MarkAllocated records [offset, offset+size) as containing unsealed data
// by OR-ing it into the allocation bitfield and persisting the updated
// trailer to disk.
// NOTE(review): the in-memory pf.allocated field is not refreshed here, so
// a subsequent Allocated()/Writer()/Reader() on the same handle still sees
// the old bitfield — confirm callers reopen the file or tolerate this.
func (pf *partialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
	have, err := pf.allocated.RunIterator()
	if err != nil {
		return err
	}

	merged, err := rlepluslazy.Or(have, pieceRun(offset, size))
	if err != nil {
		return err
	}

	if err := writeTrailer(int64(pf.maxPiece), pf.file, merged); err != nil {
		return xerrors.Errorf("writing trailer: %w", err)
	}

	return nil
}
// Reader positions the underlying file at the given padded offset and
// returns it for reading size bytes. A warning is logged if part of the
// requested range was never marked allocated.
func (pf *partialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
	if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
		return nil, xerrors.Errorf("seek piece start: %w", err)
	}

	{
		have, err := pf.allocated.RunIterator()
		if err != nil {
			return nil, err
		}

		overlap, err := rlepluslazy.And(have, pieceRun(offset, size))
		if err != nil {
			return nil, err
		}

		n, err := rlepluslazy.Count(overlap)
		if err != nil {
			return nil, err
		}

		if n != uint64(size) {
			log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size)-n)
		}
	}

	return pf.file, nil
}
// Allocated returns a run iterator over the padded byte ranges currently
// marked as containing unsealed data.
func (pf *partialFile) Allocated() (rlepluslazy.RunIterator, error) {
	return pf.allocated.RunIterator()
}
// pieceRun builds a run iterator with [offset, offset+size) set, preceded
// by an unset run covering the leading gap when offset > 0.
func pieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
	runs := make([]rlepluslazy.Run, 0, 2)

	if offset > 0 {
		runs = append(runs, rlepluslazy.Run{
			Val: false,
			Len: uint64(offset),
		})
	}

	runs = append(runs, rlepluslazy.Run{
		Val: true,
		Len: uint64(size),
	})

	return &rlepluslazy.RunSliceIterator{Runs: runs}
}

View File

@ -3,10 +3,14 @@
package ffiwrapper package ffiwrapper
import ( import (
"bufio"
"context" "context"
"io" "io"
"io/ioutil"
"math/bits" "math/bits"
"os" "os"
"runtime"
"syscall"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -15,7 +19,9 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/sector-storage/fr32"
"github.com/filecoin-project/sector-storage/stores" "github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/sector-storage/storiface"
"github.com/filecoin-project/sector-storage/zerocomm" "github.com/filecoin-project/sector-storage/zerocomm"
) )
@ -46,13 +52,20 @@ func (sb *Sealer) NewSector(ctx context.Context, sector abi.SectorID) error {
} }
func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) { func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
f, werr, err := toReadableFile(file, int64(pieceSize)) var offset abi.UnpaddedPieceSize
if err != nil { for _, size := range existingPieceSizes {
return abi.PieceInfo{}, err offset += size
} }
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
if offset.Padded()+pieceSize.Padded() > maxPieceSize {
return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset)
}
var err error
var done func() var done func()
var stagedFile *os.File var stagedFile *partialFile
defer func() { defer func() {
if done != nil { if done != nil {
@ -73,9 +86,9 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err) return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
} }
stagedFile, err = os.Create(stagedPath.Unsealed) stagedFile, err = createPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening sector file: %w", err) return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
} }
} else { } else {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, true) stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, true)
@ -83,24 +96,45 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err) return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
} }
stagedFile, err = os.OpenFile(stagedPath.Unsealed, os.O_RDWR, 0644) stagedFile, err = openPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening sector file: %w", err) return abi.PieceInfo{}, xerrors.Errorf("opening unsealed sector file: %w", err)
}
if _, err := stagedFile.Seek(0, io.SeekEnd); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("seek end: %w", err)
} }
} }
_, _, pieceCID, err := ffi.WriteWithAlignment(sb.sealProofType, f, pieceSize, stagedFile, existingPieceSizes) w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded())
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err)
}
pw, err := fr32.NewPadWriter(w)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("creating padded reader: %w", err)
}
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
prf, werr, err := ToReadableFile(pr, int64(pieceSize))
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("getting tee reader pipe: %w", err)
}
pieceCID, err := ffi.GeneratePieceCIDFromFile(sb.sealProofType, prf, pieceSize)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err)
}
if err := pw.Close(); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("closing padded writer: %w", err)
}
if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err)
}
if err := stagedFile.Close(); err != nil {
return abi.PieceInfo{}, err return abi.PieceInfo{}, err
} }
stagedFile = nil
if err := f.Close(); err != nil {
return abi.PieceInfo{}, err
}
return abi.PieceInfo{ return abi.PieceInfo{
Size: pieceSize.Padded(), Size: pieceSize.Padded(),
@ -114,82 +148,201 @@ func (cf closerFunc) Close() error {
return cf() return cf()
} }
func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealedCID cid.Cid) (io.ReadCloser, error) { func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
// try finding existing
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
var pf *partialFile
switch {
case xerrors.Is(err, storiface.ErrSectorNotFound):
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTNone, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
}
defer done()
pf, err = createPartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("create unsealed file: %w", err)
}
case err == nil:
defer done()
pf, err = openPartialFile(maxPieceSize, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("opening partial file: %w", err)
}
default:
return xerrors.Errorf("acquire unsealed sector path (existing): %w", err)
}
defer pf.Close()
allocated, err := pf.Allocated()
if err != nil {
return xerrors.Errorf("getting bitruns of allocated data: %w", err)
}
toUnseal, err := computeUnsealRanges(allocated, offset, size)
if err != nil {
return xerrors.Errorf("computing unseal ranges: %w", err)
}
if !toUnseal.HasNext() {
return nil
}
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache|stores.FTSealed, stores.FTNone, false)
if err != nil {
return xerrors.Errorf("acquire sealed sector paths: %w", err)
}
defer srcDone()
var at, nextat abi.PaddedPieceSize
for {
piece, err := toUnseal.NextRun()
if err != nil {
return xerrors.Errorf("getting next range to unseal: %w", err)
}
at = nextat
nextat += abi.PaddedPieceSize(piece.Len)
if !piece.Val {
continue
}
out, err := pf.Writer(offset.Padded(), size.Padded())
if err != nil {
return xerrors.Errorf("getting partial file writer: %w", err)
}
// <eww>
outpipe, err := ioutil.TempFile(os.TempDir(), "sector-storage-unseal-")
if err != nil {
return xerrors.Errorf("creating temp pipe file: %w", err)
}
var outpath string
var perr error
outWait := make(chan struct{})
{ {
path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false) outpath = outpipe.Name()
if err := outpipe.Close(); err != nil {
return xerrors.Errorf("close pipe temp: %w", err)
}
if err := os.Remove(outpath); err != nil {
return xerrors.Errorf("rm pipe temp: %w", err)
}
// TODO: Make UnsealRange write to an FD
if err := syscall.Mkfifo(outpath, 0600); err != nil {
return xerrors.Errorf("mk temp fifo: %w", err)
}
go func() {
defer close(outWait)
defer os.Remove(outpath)
outpipe, err = os.OpenFile(outpath, os.O_RDONLY, 0600)
if err != nil { if err != nil {
return nil, xerrors.Errorf("acquire unsealed sector path: %w", err) perr = xerrors.Errorf("open temp pipe: %w", err)
return
} }
defer outpipe.Close()
f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644) padreader, err := fr32.NewPadReader(outpipe, abi.PaddedPieceSize(piece.Len).Unpadded())
if err == nil {
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
doneUnsealed()
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: closerFunc(func() error {
doneUnsealed()
return f.Close()
}),
}, nil
}
doneUnsealed()
if !os.IsNotExist(err) {
return nil, err
}
}
paths, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed, false)
if err != nil { if err != nil {
return nil, xerrors.Errorf("acquire sealed/cache sector path: %w", err) perr = xerrors.Errorf("creating new padded reader: %w", err)
return
} }
defer doneSealed()
// TODO: GC for those bsize := uint64(size.Padded())
// (Probably configurable count of sectors to be kept unsealed, and just if bsize > uint64(runtime.NumCPU())*fr32.MTTresh {
// remove last used one (or use whatever other cache policy makes sense)) bsize = uint64(runtime.NumCPU()) * fr32.MTTresh
err = ffi.Unseal( }
sb.sealProofType,
paths.Cache, padreader = bufio.NewReaderSize(padreader, int(bsize))
paths.Sealed,
paths.Unsealed, _, perr = io.CopyN(out, padreader, int64(size.Padded()))
}()
}
// </eww>
// TODO: This may be possible to do in parallel
err = ffi.UnsealRange(sb.sealProofType,
srcPaths.Cache,
srcPaths.Sealed,
outpath,
sector.Number, sector.Number,
sector.Miner, sector.Miner,
ticket, randomness,
unsealedCID, commd,
) uint64(at.Unpadded()),
uint64(abi.PaddedPieceSize(piece.Len).Unpadded()))
if err != nil { if err != nil {
return nil, xerrors.Errorf("unseal failed: %w", err) return xerrors.Errorf("unseal range: %w", err)
} }
f, err := os.OpenFile(paths.Unsealed, os.O_RDONLY, 0644) select {
case <-outWait:
case <-ctx.Done():
return ctx.Err()
}
if perr != nil {
return xerrors.Errorf("piping output to unsealed file: %w", perr)
}
if err := pf.MarkAllocated(storiface.PaddedByteIndex(at), abi.PaddedPieceSize(piece.Len)); err != nil {
return xerrors.Errorf("marking unsealed range as allocated: %w", err)
}
if !toUnseal.HasNext() {
break
}
}
return nil
}
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
if err != nil { if err != nil {
return nil, err return xerrors.Errorf("acquire unsealed sector path: %w", err)
}
defer done()
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
pf, err := openPartialFile(maxPieceSize, path.Unsealed)
if xerrors.Is(err, os.ErrNotExist) {
return xerrors.Errorf("opening partial file: %w", err)
} }
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { f, err := pf.Reader(offset.Padded(), size.Padded())
return nil, xerrors.Errorf("seek: %w", err) if err != nil {
pf.Close()
return xerrors.Errorf("getting partial file reader: %w", err)
} }
lr := io.LimitReader(f, int64(size)) upr, err := fr32.NewUnpadReader(f, size.Padded())
if err != nil {
return xerrors.Errorf("creating unpadded reader: %w", err)
}
return &struct { if _, err := io.CopyN(writer, upr, int64(size)); err != nil {
io.Reader pf.Close()
io.Closer return xerrors.Errorf("reading unsealed file: %w", err)
}{ }
Reader: lr,
Closer: f, if err := pf.Close(); err != nil {
}, nil return xerrors.Errorf("closing partial file: %w", err)
}
return nil
} }
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
@ -309,7 +462,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID) error
} }
func GeneratePieceCIDFromFile(proofType abi.RegisteredProof, piece io.Reader, pieceSize abi.UnpaddedPieceSize) (cid.Cid, error) { func GeneratePieceCIDFromFile(proofType abi.RegisteredProof, piece io.Reader, pieceSize abi.UnpaddedPieceSize) (cid.Cid, error) {
f, werr, err := toReadableFile(piece, int64(pieceSize)) f, werr, err := ToReadableFile(piece, int64(pieceSize))
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
} }

View File

@ -1,6 +1,7 @@
package ffiwrapper package ffiwrapper
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io" "io"
@ -13,20 +14,26 @@ import (
"time" "time"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors" "golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
paramfetch "github.com/filecoin-project/go-paramfetch" paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/sector-storage/stores"
) )
func init() { func init() {
logging.SetLogLevel("*", "INFO") //nolint: errcheck logging.SetLogLevel("*", "DEBUG") //nolint: errcheck
} }
var sectorSize = abi.SectorSize(2048)
var sealProofType = abi.RegisteredProof_StackedDRG2KiBSeal var sealProofType = abi.RegisteredProof_StackedDRG2KiBSeal
var sectorSize, _ = sealProofType.SectorSize()
var sealRand = abi.SealRandomness{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2}
type seal struct { type seal struct {
id abi.SectorID id abi.SectorID
@ -35,18 +42,22 @@ type seal struct {
ticket abi.SealRandomness ticket abi.SealRandomness
} }
// data returns a deterministic pseudo-random reader of dlen bytes seeded by
// the sector number, so tests can re-derive the exact same bytes later.
func data(sn abi.SectorNumber, dlen abi.UnpaddedPieceSize) io.Reader {
	seed := 42 + int64(sn)
	return io.LimitReader(rand.New(rand.NewSource(seed)), int64(dlen))
}
func (s *seal) precommit(t *testing.T, sb *Sealer, id abi.SectorID, done func()) { func (s *seal) precommit(t *testing.T, sb *Sealer, id abi.SectorID, done func()) {
defer done() defer done()
dlen := abi.PaddedPieceSize(sectorSize).Unpadded() dlen := abi.PaddedPieceSize(sectorSize).Unpadded()
var err error var err error
r := io.LimitReader(rand.New(rand.NewSource(42+int64(id.Number))), int64(dlen)) r := data(id.Number, dlen)
s.pi, err = sb.AddPiece(context.TODO(), id, []abi.UnpaddedPieceSize{}, dlen, r) s.pi, err = sb.AddPiece(context.TODO(), id, []abi.UnpaddedPieceSize{}, dlen, r)
if err != nil { if err != nil {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
s.ticket = abi.SealRandomness{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2} s.ticket = sealRand
p1, err := sb.SealPreCommit1(context.TODO(), id, s.ticket, []abi.PieceInfo{s.pi}) p1, err := sb.SealPreCommit1(context.TODO(), id, s.ticket, []abi.PieceInfo{s.pi})
if err != nil { if err != nil {
@ -90,6 +101,60 @@ func (s *seal) commit(t *testing.T, sb *Sealer, done func()) {
} }
} }
// unseal exercises the ReadPiece/UnsealPiece round-trip:
// read from the existing unsealed file, delete it, verify reads then fail,
// unseal from the sealed sector, and verify the data matches again.
func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.SectorID, done func()) {
	defer done()

	// Read the first piece from the existing unsealed file and compare it
	// against the deterministic source data.
	var b bytes.Buffer
	err := sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
	if err != nil {
		t.Fatal(err)
	}

	expect, _ := ioutil.ReadAll(data(si.Number, 1016))
	if !bytes.Equal(b.Bytes(), expect) {
		t.Fatal("read wrong bytes")
	}

	// Delete the unsealed file so the next read has nothing to serve from.
	p, sd, err := sp.AcquireSector(context.TODO(), si, stores.FTUnsealed, stores.FTNone, false)
	if err != nil {
		t.Fatal(err)
	}
	if err := os.Remove(p.Unsealed); err != nil {
		t.Fatal(err)
	}
	sd()

	// With the file gone, ReadPiece must fail.
	err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
	if err == nil {
		t.Fatal("HOW?!")
	}
	log.Info("this is what we expect: ", err)

	// Recreate the unsealed data from the sealed sector + cache.
	if err := sb.UnsealPiece(context.TODO(), si, 0, 1016, sealRand, s.cids.Unsealed); err != nil {
		t.Fatal(err)
	}

	// The unsealed data must match the original source bytes again.
	b.Reset()
	err = sb.ReadPiece(context.TODO(), &b, si, 0, 1016)
	if err != nil {
		t.Fatal(err)
	}

	expect, _ = ioutil.ReadAll(data(si.Number, 1016))
	require.Equal(t, expect, b.Bytes())

	// Reading past the unsealed range: the tail comes back zero-filled
	// (2032 = full 2KiB sector unpadded; only the first 1016 bytes were
	// written/unsealed).
	b.Reset()
	err = sb.ReadPiece(context.TODO(), &b, si, 0, 2032)
	if err != nil {
		t.Fatal(err)
	}

	expect = append(expect, bytes.Repeat([]byte{0}, 1016)...)
	if !bytes.Equal(b.Bytes(), expect) {
		t.Fatal("read wrong bytes")
	}
}
func post(t *testing.T, sealer *Sealer, seals ...seal) time.Time { func post(t *testing.T, sealer *Sealer, seals ...seal) time.Time {
/*randomness := abi.PoStRandomness{0, 9, 2, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 7} /*randomness := abi.PoStRandomness{0, 9, 2, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 7}
@ -227,6 +292,8 @@ func TestSealAndVerify(t *testing.T) {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
s.unseal(t, sb, sp, si, func() {})
fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String()) fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String())
fmt.Printf("Commit: %s\n", commit.Sub(precommit).String()) fmt.Printf("Commit: %s\n", commit.Sub(precommit).String())
fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(commit).String()) fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(commit).String())
@ -348,3 +415,18 @@ func TestSealAndVerify2(t *testing.T) {
post(t, sb, s1, s2) post(t, sb, s1, s2)
} }
// BenchmarkWriteWithAlignment measures ffi.WriteWithAlignment throughput on
// a 254 MiB (2 * 127 MiB unpadded) piece of alternating 0xff/0x00 bytes.
// Input/temp-file setup is excluded from timing via StopTimer/StartTimer.
// NOTE(review): temp files created per iteration are not removed — confirm
// this is acceptable for the benchmark environment.
func BenchmarkWriteWithAlignment(b *testing.B) {
	bt := abi.UnpaddedPieceSize(2 * 127 * 1024 * 1024)
	b.SetBytes(int64(bt))

	for i := 0; i < b.N; i++ {
		b.StopTimer()
		rf, w, _ := ToReadableFile(bytes.NewReader(bytes.Repeat([]byte{0xff, 0}, int(bt/2))), int64(bt))
		tf, _ := ioutil.TempFile("/tmp/", "scrb-")
		b.StartTimer()

		// errors deliberately ignored; only throughput is measured here
		ffi.WriteWithAlignment(abi.RegisteredProof_StackedDRG2KiBSeal, rf, bt, tf, nil)
		w()
	}
}

View File

@ -2,7 +2,6 @@ package ffiwrapper
import ( import (
"context" "context"
"errors"
"io" "io"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -12,10 +11,9 @@ import (
"github.com/filecoin-project/sector-storage/ffiwrapper/basicfs" "github.com/filecoin-project/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/sector-storage/stores" "github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/sector-storage/storiface"
) )
type UnpaddedByteIndex uint64
type Validator interface { type Validator interface {
CanCommit(sector stores.SectorPaths) (bool, error) CanCommit(sector stores.SectorPaths) (bool, error)
CanProve(sector stores.SectorPaths) (bool, error) CanProve(sector stores.SectorPaths) (bool, error)
@ -30,7 +28,8 @@ type Storage interface {
storage.Prover storage.Prover
StorageSealer StorageSealer
ReadPieceFromSealedSector(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error
ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error
} }
type Verifier interface { type Verifier interface {
@ -41,10 +40,8 @@ type Verifier interface {
GenerateWinningPoStSectorChallenge(context.Context, abi.RegisteredProof, abi.ActorID, abi.PoStRandomness, uint64) ([]uint64, error) GenerateWinningPoStSectorChallenge(context.Context, abi.RegisteredProof, abi.ActorID, abi.PoStRandomness, uint64) ([]uint64, error)
} }
var ErrSectorNotFound = errors.New("sector not found")
type SectorProvider interface { type SectorProvider interface {
// * returns ErrSectorNotFound if a requested existing sector doesn't exist // * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists // * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error)
} }

View File

@ -0,0 +1,26 @@
package ffiwrapper
import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/sector-storage/storiface"
)
// merge gaps between ranges which are close to each other
// TODO: more benchmarking to come up with more optimal number
const mergeGaps = 32 << 20

// TODO const expandRuns = 16 << 20 // unseal more than requested for future requests

// computeUnsealRanges returns the padded byte ranges that still need
// unsealing to serve [offset, offset+size): the requested range minus what
// is already unsealed, with nearby runs (closer than mergeGaps) joined.
func computeUnsealRanges(unsealed rlepluslazy.RunIterator, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (rlepluslazy.RunIterator, error) {
	want := pieceRun(offset.Padded(), size.Padded())

	todo, err := rlepluslazy.Subtract(want, unsealed)
	if err != nil {
		return nil, xerrors.Errorf("compute todo-unsealed: %w", err)
	}

	return rlepluslazy.JoinClose(todo, mergeGaps)
}

157
fr32/fr32.go Normal file
View File

@ -0,0 +1,157 @@
package fr32
import (
"math/bits"
"runtime"
"sync"
"github.com/filecoin-project/specs-actors/actors/abi"
)
// MTTresh is the per-goroutine work threshold: pad/unpad jobs larger than
// this are split across multiple goroutines.
var MTTresh = uint64(32 << 20)

// mtChunkCount picks how many chunks (worker goroutines) to split usz bytes
// of padded data into; the result is always in [1, 64].
func mtChunkCount(usz abi.PaddedPieceSize) uint64 {
	n := uint64(usz) / MTTresh
	if n > uint64(runtime.NumCPU()) {
		// cap at 1 << bit-length(NumCPU), i.e. a power of two based on the CPU count
		n = 1 << (32 - bits.LeadingZeros32(uint32(runtime.NumCPU())))
	}
	switch {
	case n == 0:
		return 1
	case n > 64:
		return 64 // avoid too large buffers
	default:
		return n
	}
}
// mt runs op concurrently over equal-size chunks of the padded region
// [0, padLen), pairing each padded chunk with the corresponding unpadded
// sub-slice, and waits for all workers to finish.
// NOTE(review): assumes padLen divides evenly by the chunk count and that
// chunk boundaries fall on valid padded-block boundaries (so .Unpadded()
// is exact) — confirm with callers (Pad/Unpad).
func mt(in, out []byte, padLen int, op func(unpadded, padded []byte)) {
	threads := mtChunkCount(abi.PaddedPieceSize(padLen))
	threadBytes := abi.PaddedPieceSize(padLen / int(threads))

	var wg sync.WaitGroup
	wg.Add(int(threads))

	for i := 0; i < int(threads); i++ {
		go func(thread int) {
			defer wg.Done()

			// each worker handles its own disjoint [start, end) slice pair
			start := threadBytes * abi.PaddedPieceSize(thread)
			end := start + threadBytes

			op(in[start.Unpadded():end.Unpadded()], out[start:end])
		}(i)
	}
	wg.Wait()
}
// Pad converts unpadded data in `in` to Fr32-padded data in `out`,
// splitting the work across goroutines for inputs above MTTresh.
// Assumes len(in)%127==0 and len(out)%128==0
func Pad(in, out []byte) {
	if len(out) <= int(MTTresh) {
		pad(in, out)
		return
	}
	mt(in, out, len(out), pad)
}
// pad performs Fr32 padding chunk by chunk: every 127 input bytes expand to
// 128 output bytes, with the top two bits of each 32-byte word cleared
// (two zero bits inserted after every 254 data bits).
func pad(in, out []byte) {
	chunks := len(out) / 128
	for chunk := 0; chunk < chunks; chunk++ {
		inOff := chunk * 127
		outOff := chunk * 128

		// bytes 0..30 pass through unchanged
		copy(out[outOff:outOff+31], in[inOff:inOff+31])

		// byte 31: keep the low 6 bits; carry the top 2 into the next lane
		carry := in[inOff+31] >> 6
		out[outOff+31] = in[inOff+31] & 0x3f

		var cur byte
		for i := 32; i < 64; i++ {
			cur = in[inOff+i]
			out[outOff+i] = (cur << 2) | carry
			carry = cur >> 6
		}

		carry = cur >> 4
		out[outOff+63] &= 0x3f

		for i := 64; i < 96; i++ {
			cur = in[inOff+i]
			out[outOff+i] = (cur << 4) | carry
			carry = cur >> 4
		}

		carry = cur >> 2
		out[outOff+95] &= 0x3f

		for i := 96; i < 127; i++ {
			cur = in[inOff+i]
			out[outOff+i] = (cur << 6) | carry
			carry = cur >> 2
		}

		out[outOff+127] = carry & 0x3f
	}
}
// Unpad converts Fr32-padded data in `in` back to unpadded data in `out`,
// splitting the work across goroutines for inputs above MTTresh.
// Assumes len(in)%128==0 and len(out)%127==0
func Unpad(in []byte, out []byte) {
	if len(in) <= int(MTTresh) {
		unpad(out, in)
		return
	}
	mt(out, in, len(in), unpad)
}
// unpad reverses pad one 128-byte chunk at a time, single-threaded. Note
// the (out, in) argument order, mirroring how mt invokes it.
func unpad(out, in []byte) {
	for chunk := 0; chunk < len(in)/128; chunk++ {
		src := in[chunk*128:]
		dst := out[chunk*127:]

		carry := src[0]

		for i := 0; i < 32; i++ {
			next := src[i+1]
			dst[i] = carry
			carry = next
		}
		dst[31] |= carry << 6

		for i := 32; i < 64; i++ {
			next := src[i+1]
			dst[i] = carry>>2 | next<<6
			carry = next
		}
		// Replace the two padding bits folded into the top of dst[63].
		dst[63] ^= (carry << 6) ^ (carry << 4)

		for i := 64; i < 96; i++ {
			next := src[i+1]
			dst[i] = carry>>4 | next<<4
			carry = next
		}
		dst[95] ^= (carry << 4) ^ (carry << 2)

		for i := 96; i < 127; i++ {
			next := src[i+1]
			dst[i] = carry>>6 | next<<2
			carry = next
		}
	}
}

66
fr32/fr32_ffi_cmp_test.go Normal file
View File

@ -0,0 +1,66 @@
package fr32_test
import (
"bytes"
"github.com/filecoin-project/sector-storage/fr32"
"io"
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/sector-storage/ffiwrapper"
)
// TestWriteTwoPcs pads two pieces through the FFI reference path and
// verifies fr32.Pad / fr32.Unpad produce byte-identical results.
func TestWriteTwoPcs(t *testing.T) {
	tf, err := ioutil.TempFile("/tmp/", "scrb-")
	if err != nil {
		t.Fatal(err)
	}

	paddedSize := abi.PaddedPieceSize(16 << 20)
	n := 2

	var rawBytes []byte

	for i := 0; i < n; i++ {
		buf := bytes.Repeat([]byte{0xab * byte(i)}, int(paddedSize.Unpadded()))
		rawBytes = append(rawBytes, buf...)

		rf, w, _ := ffiwrapper.ToReadableFile(bytes.NewReader(buf), int64(len(buf)))

		_, _, _, err := ffi.WriteWithAlignment(abi.RegisteredProof_StackedDRG32GiBSeal, rf, abi.UnpaddedPieceSize(len(buf)), tf, nil)
		if err != nil {
			panic(err)
		}
		if err := w(); err != nil {
			panic(err)
		}
	}

	// BUG FIX: os.File.Seek takes (offset, whence); the original call had
	// them swapped and only worked because io.SeekStart == 0.
	if _, err := tf.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}

	ffiBytes, err := ioutil.ReadAll(tf)
	if err != nil {
		panic(err)
	}

	if err := tf.Close(); err != nil {
		panic(err)
	}

	if err := os.Remove(tf.Name()); err != nil {
		panic(err)
	}

	outBytes := make([]byte, int(paddedSize)*n)
	fr32.Pad(rawBytes, outBytes)
	require.Equal(t, ffiBytes, outBytes)

	unpadBytes := make([]byte, int(paddedSize.Unpadded())*n)
	fr32.Unpad(ffiBytes, unpadBytes)
	require.Equal(t, rawBytes, unpadBytes)
}

250
fr32/fr32_test.go Normal file
View File

@ -0,0 +1,250 @@
package fr32_test
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/sector-storage/fr32"
)
// padFFI produces the FFI reference padding of buf, used as ground truth
// for the pure-Go fr32 implementation in the tests below.
func padFFI(buf []byte) []byte {
	rf, w, _ := ffiwrapper.ToReadableFile(bytes.NewReader(buf), int64(len(buf)))

	tf, err := ioutil.TempFile("/tmp/", "scrb-")
	if err != nil {
		panic(err)
	}

	_, _, _, err = ffi.WriteWithAlignment(abi.RegisteredProof_StackedDRG32GiBSeal, rf, abi.UnpaddedPieceSize(len(buf)), tf, nil)
	if err != nil {
		panic(err)
	}
	if err := w(); err != nil {
		panic(err)
	}

	// BUG FIX: Seek arguments were swapped (offset, whence); the original
	// only worked because io.SeekStart == 0.
	if _, err := tf.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}

	padded, err := ioutil.ReadAll(tf)
	if err != nil {
		panic(err)
	}

	if err := tf.Close(); err != nil {
		panic(err)
	}

	if err := os.Remove(tf.Name()); err != nil {
		panic(err)
	}

	return padded
}
// TestPadChunkFFI checks single-chunk in-place padding against the FFI
// reference for a set of representative byte patterns.
func TestPadChunkFFI(t *testing.T) {
	cases := []struct {
		name string
		b    byte
	}{
		{"ones", 0xff},
		{"lsb1", 0x01},
		{"msb1", 0x80},
		{"zero", 0x0},
		{"mid", 0x3c},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			var buf [128]byte
			copy(buf[:], bytes.Repeat([]byte{tc.b}, 127))

			fr32.Pad(buf[:], buf[:])

			require.Equal(t, padFFI(bytes.Repeat([]byte{tc.b}, 127)), buf[:])
		})
	}
}
// TestPadChunkRandEqFFI pads random single chunks and compares the result
// with the FFI reference implementation.
func TestPadChunkRandEqFFI(t *testing.T) {
	for trial := 0; trial < 200; trial++ {
		var raw [127]byte
		rand.Read(raw[:])

		var padded [128]byte
		fr32.Pad(raw[:], padded[:])

		require.Equal(t, padFFI(raw[:]), padded[:])
	}
}
// TestRoundtrip verifies Pad followed by Unpad restores the input for a
// set of representative byte patterns.
func TestRoundtrip(t *testing.T) {
	testByteChunk := func(b byte) func(*testing.T) {
		return func(t *testing.T) {
			var buf [128]byte
			// BUG FIX: the original ignored the b parameter and tested
			// the 0x01 pattern five times.
			input := bytes.Repeat([]byte{b}, 127)

			fr32.Pad(input, buf[:])

			var out [127]byte
			fr32.Unpad(buf[:], out[:])

			require.Equal(t, input, out[:])
		}
	}

	t.Run("ones", testByteChunk(0xff))
	t.Run("lsb1", testByteChunk(0x01))
	t.Run("msb1", testByteChunk(0x80))
	t.Run("zero", testByteChunk(0x0))
	t.Run("mid", testByteChunk(0x3c))
}
// TestRoundtripChunkRand round-trips random single chunks, padding
// in-place over a copy of the input.
func TestRoundtripChunkRand(t *testing.T) {
	for trial := 0; trial < 200; trial++ {
		var want [127]byte
		rand.Read(want[:])

		var work [128]byte
		copy(work[:], want[:])
		fr32.Pad(work[:], work[:])

		var got [127]byte
		fr32.Unpad(work[:], got[:])

		require.Equal(t, want[:], got[:])
	}
}
// TestRoundtrip16MRand round-trips 16MiB of random data through Pad/Unpad
// and cross-checks the padded form against the FFI reference.
func TestRoundtrip16MRand(t *testing.T) {
	up := abi.PaddedPieceSize(16 << 20).Unpadded()

	input := make([]byte, up)
	rand.Read(input)

	buf := make([]byte, 16<<20)
	fr32.Pad(input, buf)

	out := make([]byte, up)
	fr32.Unpad(buf, out)

	require.Equal(t, input, out)

	// FIX: the local was named `ffi`, shadowing the imported ffi package.
	ffiPadded := padFFI(input)
	require.Equal(t, ffiPadded, buf)
}
// BenchmarkPadChunk measures single 127B→128B pad throughput.
func BenchmarkPadChunk(b *testing.B) {
	var dst [128]byte
	src := bytes.Repeat([]byte{0xff}, 127)

	b.SetBytes(127)
	for n := 0; n < b.N; n++ {
		fr32.Pad(src, dst[:])
	}
}
// BenchmarkChunkRoundtrip measures one in-place pad plus unpad cycle on a
// single chunk.
func BenchmarkChunkRoundtrip(b *testing.B) {
	var work [128]byte
	copy(work[:], bytes.Repeat([]byte{0xff}, 127))

	var unpadded [127]byte

	b.SetBytes(127)
	for n := 0; n < b.N; n++ {
		fr32.Pad(work[:], work[:])
		fr32.Unpad(work[:], unpadded[:])
	}
}
// BenchmarkUnpadChunk measures single 128B→127B unpad throughput.
func BenchmarkUnpadChunk(b *testing.B) {
	var padded [128]byte
	copy(padded[:], bytes.Repeat([]byte{0xff}, 127))
	fr32.Pad(padded[:], padded[:])

	var out [127]byte

	b.SetBytes(127)
	b.ReportAllocs()

	src := padded[:]
	for n := 0; n < b.N; n++ {
		fr32.Unpad(src, out[:])
	}
}
// BenchmarkUnpad16MChunk measures unpadding a full 16MiB buffer.
func BenchmarkUnpad16MChunk(b *testing.B) {
	up := abi.PaddedPieceSize(16 << 20).Unpadded()

	var padded [16 << 20]byte
	fr32.Pad(bytes.Repeat([]byte{0xff}, int(up)), padded[:])

	var out [16 << 20]byte

	b.SetBytes(16 << 20)
	b.ReportAllocs()
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		fr32.Unpad(padded[:], out[:])
	}
}
// BenchmarkPad16MChunk measures padding a full 16MiB buffer.
func BenchmarkPad16MChunk(b *testing.B) {
	up := abi.PaddedPieceSize(16 << 20).Unpadded()

	var dst [16 << 20]byte
	src := bytes.Repeat([]byte{0xff}, int(up))

	b.SetBytes(16 << 20)
	b.ReportAllocs()
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		fr32.Pad(src, dst[:])
	}
}
// BenchmarkPad1GChunk measures padding a full 1GiB buffer (exercises the
// multi-threaded path).
func BenchmarkPad1GChunk(b *testing.B) {
	up := abi.PaddedPieceSize(1 << 30).Unpadded()

	var dst [1 << 30]byte
	src := bytes.Repeat([]byte{0xff}, int(up))

	b.SetBytes(1 << 30)
	b.ReportAllocs()
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		fr32.Pad(src, dst[:])
	}
}
// BenchmarkUnpad1GChunk measures unpadding a full 1GiB buffer (exercises
// the multi-threaded path).
func BenchmarkUnpad1GChunk(b *testing.B) {
	up := abi.PaddedPieceSize(1 << 30).Unpadded()

	var padded [1 << 30]byte
	fr32.Pad(bytes.Repeat([]byte{0xff}, int(up)), padded[:])

	var out [1 << 30]byte

	b.SetBytes(1 << 30)
	b.ReportAllocs()
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		fr32.Unpad(padded[:], out[:])
	}
}

183
fr32/readers.go Normal file
View File

@ -0,0 +1,183 @@
package fr32
import (
"io"
"math/bits"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
)
// padReader produces the Fr32-padded form of data read from src.
type padReader struct {
	src io.Reader // unpadded input stream

	left uint64 // padded bytes remaining to be produced
	work []byte // scratch buffer holding unpadded input
}
// NewPadReader returns a reader producing the Fr32-padded form of the sz
// unpadded bytes available from src.
func NewPadReader(src io.Reader, sz abi.UnpaddedPieceSize) (io.Reader, error) {
	if err := sz.Validate(); err != nil {
		return nil, xerrors.Errorf("bad piece size: %w", err)
	}

	return &padReader{
		src:  src,
		left: uint64(sz.Padded()),
		work: make([]byte, MTTresh*mtChunkCount(sz.Padded())),
	}, nil
}
// Read fills out with padded data. The amount produced per call is the
// largest valid padded piece size fitting in both out and the remaining
// data, so out must be at least one minimum piece size long.
//
// NOTE(review): the byte count from r.src.Read is not compared against
// todo — a short read (legal for io.Reader) would pad stale bytes from
// r.work. Confirm src always fills the buffer, or use io.ReadFull.
func (r *padReader) Read(out []byte) (int, error) {
	if r.left == 0 {
		return 0, io.EOF
	}

	// Largest power of two that fits in out.
	outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(len(out))))

	if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil {
		return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err)
	}

	todo := abi.PaddedPieceSize(outTwoPow).Unpadded()
	if r.left < uint64(todo.Padded()) {
		// Less data remains than out can hold; clamp to the remainder.
		todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left))).Unpadded()
	}

	r.left -= uint64(todo.Padded())

	n, err := r.src.Read(r.work[:todo])
	if err != nil && err != io.EOF {
		return n, err
	}

	Pad(r.work[:todo], out[:todo.Padded()])

	// err may be io.EOF here; it is passed through with the data.
	return int(todo.Padded()), err
}
// unpadReader yields the unpadded form of Fr32-padded data read from src.
type unpadReader struct {
	src io.Reader // padded input stream

	left uint64 // padded bytes remaining to be consumed
	work []byte // scratch buffer holding padded input
}
// NewUnpadReader returns a reader yielding the unpadded form of the sz
// padded bytes available from src.
func NewUnpadReader(src io.Reader, sz abi.PaddedPieceSize) (io.Reader, error) {
	if err := sz.Validate(); err != nil {
		return nil, xerrors.Errorf("bad piece size: %w", err)
	}

	return &unpadReader{
		src:  src,
		left: uint64(sz),
		work: make([]byte, MTTresh*mtChunkCount(sz)),
	}, nil
}
// Read fills out with unpadded data. out must hold at least one whole
// 127-byte quantum; the amount produced per call is the largest valid
// padded piece size fitting in both out and the remaining data.
func (r *unpadReader) Read(out []byte) (int, error) {
	if r.left == 0 {
		return 0, io.EOF
	}

	chunks := len(out) / 127

	// Largest power-of-two padded size that fits in out.
	outTwoPow := 1 << (63 - bits.LeadingZeros64(uint64(chunks*128)))

	if err := abi.PaddedPieceSize(outTwoPow).Validate(); err != nil {
		return 0, xerrors.Errorf("output must be of valid padded piece size: %w", err)
	}

	todo := abi.PaddedPieceSize(outTwoPow)
	if r.left < uint64(todo) {
		// Less data remains than out can hold; clamp to the remainder.
		todo = abi.PaddedPieceSize(1 << (63 - bits.LeadingZeros64(r.left)))
	}

	r.left -= uint64(todo)

	// BUG FIX: a bare src.Read may legally return fewer than todo bytes,
	// which the original treated as a hard failure (and wrapped a
	// possibly-nil err with %w). io.ReadFull retries until the buffer is
	// full or the stream genuinely ends.
	n, err := io.ReadFull(r.src, r.work[:todo])
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return n, xerrors.Errorf("reading padded data: %w", err)
	}
	if n != int(todo) {
		return 0, xerrors.Errorf("didn't read enough: read %d bytes, expected %d", n, todo)
	}

	Unpad(r.work[:todo], out[:todo.Unpadded()])

	return int(todo.Unpadded()), nil
}
// padWriter pads written data and forwards it to dst, buffering any
// trailing partial 127-byte quantum between Write calls.
type padWriter struct {
	dst io.Writer // destination for padded output

	stash []byte // carry-over bytes (<127) awaiting more data
	work  []byte // scratch buffer for padded output
}
// NewPadWriter returns a WriteCloser that Fr32-pads written data into dst.
func NewPadWriter(dst io.Writer) (io.WriteCloser, error) {
	w := &padWriter{dst: dst}
	return w, nil
}
// Write pads p (together with any previously stashed bytes) and forwards
// the padded form to w.dst, keeping any incomplete trailing 127-byte
// quantum in w.stash. Returns len(p) on success, per the io.Writer
// contract.
//
// NOTE(review): on a downstream write error the return value is
// abi.PaddedPieceSize(n).Unpadded(), which is only meaningful when n is a
// valid padded size and may under-report — confirm callers treat a
// non-nil error as fatal.
func (w *padWriter) Write(p []byte) (int, error) {
	in := p

	// Not enough for a full quantum yet; just buffer.
	if len(p)+len(w.stash) < 127 {
		w.stash = append(w.stash, p...)
		return len(p), nil
	}

	if len(w.stash) != 0 {
		in = append(w.stash, in...)
	}

	for {
		// Decompose the pending data into power-of-two pieces and emit
		// the largest one (last element) each iteration.
		pieces := subPieces(abi.UnpaddedPieceSize(len(in)))
		biggest := pieces[len(pieces)-1]

		if abi.PaddedPieceSize(cap(w.work)) < biggest.Padded() {
			w.work = make([]byte, 0, biggest.Padded())
		}

		Pad(in[:int(biggest)], w.work[:int(biggest.Padded())])

		n, err := w.dst.Write(w.work[:int(biggest.Padded())])
		if err != nil {
			return int(abi.PaddedPieceSize(n).Unpadded()), err
		}

		in = in[biggest:]

		if len(in) < 127 {
			// Keep the remainder for the next Write.
			if cap(w.stash) < len(in) {
				w.stash = make([]byte, 0, len(in))
			}
			w.stash = w.stash[:len(in)]
			copy(w.stash, in)

			return len(p), nil
		}
	}
}
// Close fails if unflushed stash bytes remain, otherwise drops internal
// references so buffers can be garbage-collected.
func (w *padWriter) Close() error {
	if n := len(w.stash); n > 0 {
		return xerrors.Errorf("still have %d unprocessed bytes", n)
	}

	// allow gc
	w.dst = nil
	w.work = nil
	w.stash = nil

	return nil
}

55
fr32/readers_test.go Normal file
View File

@ -0,0 +1,55 @@
package fr32_test
import (
"bytes"
"io/ioutil"
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/sector-storage/fr32"
)
// TestPadReader checks that NewPadReader streams the same bytes that a
// direct fr32.Pad call produces.
func TestPadReader(t *testing.T) {
	ps := abi.PaddedPieceSize(64 << 20).Unpadded()

	raw := bytes.Repeat([]byte{0x55}, int(ps))

	r, err := fr32.NewPadReader(bytes.NewReader(raw), ps)
	if err != nil {
		t.Fatal(err)
	}

	streamed, err := ioutil.ReadAll(r)
	if err != nil {
		t.Fatal(err)
	}

	want := make([]byte, ps.Padded())
	fr32.Pad(raw, want)

	require.Equal(t, want, streamed)
}
// TestUnpadReader checks that NewUnpadReader recovers the raw bytes from
// data padded with fr32.Pad.
func TestUnpadReader(t *testing.T) {
	ps := abi.PaddedPieceSize(64 << 20).Unpadded()

	raw := bytes.Repeat([]byte{0x77}, int(ps))

	padded := make([]byte, ps.Padded())
	fr32.Pad(raw, padded)

	r, err := fr32.NewUnpadReader(bytes.NewReader(padded), ps.Padded())
	if err != nil {
		t.Fatal(err)
	}

	recovered, err := ioutil.ReadAll(r)
	if err != nil {
		t.Fatal(err)
	}

	require.Equal(t, raw, recovered)
}

31
fr32/utils.go Normal file
View File

@ -0,0 +1,31 @@
package fr32
import (
"math/bits"
"github.com/filecoin-project/specs-actors/actors/abi"
)
// subPieces decomposes in into power-of-two-sized unpadded pieces
// (smallest first) whose padded sizes sum to in.Padded().
func subPieces(in abi.UnpaddedPieceSize) []abi.UnpaddedPieceSize {
	// Work in padded (in-sector) bytes, which are round binary numbers.
	rest := uint64(in.Padded())

	out := make([]abi.UnpaddedPieceSize, 0, bits.OnesCount64(rest))
	for rest > 0 {
		// Peel off the lowest set bit; that is the next piece size.
		psize := uint64(1) << uint(bits.TrailingZeros64(rest))
		rest ^= psize

		out = append(out, abi.PaddedPieceSize(psize).Unpadded())
	}

	return out
}

2
go.mod
View File

@ -3,8 +3,10 @@ module github.com/filecoin-project/sector-storage
go 1.13 go 1.13
require ( require (
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/elastic/go-sysinfo v1.3.0 github.com/elastic/go-sysinfo v1.3.0
github.com/filecoin-project/filecoin-ffi v0.0.0-20200326153646-e899cc1dd072 github.com/filecoin-project/filecoin-ffi v0.0.0-20200326153646-e899cc1dd072
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
github.com/filecoin-project/go-paramfetch v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.1
github.com/filecoin-project/specs-actors v0.5.4-0.20200521014528-0df536f7e461 github.com/filecoin-project/specs-actors v0.5.4-0.20200521014528-0df536f7e461

4
go.sum
View File

@ -19,6 +19,8 @@ github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e h1:lj77EKYUpYXTd8CD/+QMIf8b6OIOTsfEBSXiAzuEHTU=
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e/go.mod h1:3ZQK6DMPSz/QZ73jlWxBtUhNA8xZx7LzUFSq/OfP8vk=
github.com/elastic/go-sysinfo v1.3.0 h1:eb2XFGTMlSwG/yyU9Y8jVAYLIzU2sFzWXwo2gmetyrE= github.com/elastic/go-sysinfo v1.3.0 h1:eb2XFGTMlSwG/yyU9Y8jVAYLIzU2sFzWXwo2gmetyrE=
github.com/elastic/go-sysinfo v1.3.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.3.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY= github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY=
@ -33,6 +35,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200309034705-8c7ac40bd550 h1:ao
github.com/filecoin-project/go-bitfield v0.0.0-20200309034705-8c7ac40bd550/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw= github.com/filecoin-project/go-bitfield v0.0.0-20200309034705-8c7ac40bd550/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw=
github.com/filecoin-project/go-bitfield v0.0.1 h1:Xg/JnrqqE77aJVKdbEyR04n9FZQWhwrN+buDgQCVpZU= github.com/filecoin-project/go-bitfield v0.0.1 h1:Xg/JnrqqE77aJVKdbEyR04n9FZQWhwrN+buDgQCVpZU=
github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY= github.com/filecoin-project/go-bitfield v0.0.1/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e h1:gkG/7G+iKy4He+IiQNeQn+nndFznb/vCoOR8iRQsm60=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518150651-562fdb554b6e/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo= github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo=

View File

@ -7,6 +7,7 @@ import (
"runtime" "runtime"
"github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
@ -55,10 +56,11 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local,
type localWorkerPathProvider struct { type localWorkerPathProvider struct {
w *LocalWorker w *LocalWorker
op stores.AcquireMode
} }
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing) paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, stores.PathType(sealing), l.op)
if err != nil { if err != nil {
return stores.SectorPaths{}, nil, err return stores.SectorPaths{}, nil, err
} }
@ -75,7 +77,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
sid := stores.PathByType(storageIDs, fileType) sid := stores.PathByType(storageIDs, fileType)
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil { if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType, l.op == stores.AcquireMove); err != nil {
log.Errorf("declare sector error: %+v", err) log.Errorf("declare sector error: %+v", err)
} }
} }
@ -104,8 +106,8 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
return sb.AddPiece(ctx, sector, epcs, sz, r) return sb.AddPiece(ctx, sector, epcs, sz, r)
} }
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool) error { func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool, am stores.AcquireMode) error {
_, done, err := (&localWorkerPathProvider{w: l}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing) _, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing)
if err != nil { if err != nil {
return err return err
} }
@ -181,6 +183,36 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
return nil return nil
} }
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
sb, err := l.sb()
if err != nil {
return err
}
if err := sb.UnsealPiece(ctx, sector, index, size, randomness, cid); err != nil {
return xerrors.Errorf("unsealing sector: %w", err)
}
if err := l.storage.RemoveCopies(ctx, sector, stores.FTSealed); err != nil {
return xerrors.Errorf("removing source data: %w", err)
}
if err := l.storage.RemoveCopies(ctx, sector, stores.FTCache); err != nil {
return xerrors.Errorf("removing source data: %w", err)
}
return nil
}
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
sb, err := l.sb()
if err != nil {
return err
}
return sb.ReadPiece(ctx, writer, sector, index, size)
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) { func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
return l.acceptTasks, nil return l.acceptTasks, nil
} }

View File

@ -28,7 +28,10 @@ type URLs []string
type Worker interface { type Worker interface {
ffiwrapper.StorageSealer ffiwrapper.StorageSealer
Fetch(context.Context, abi.SectorID, stores.SectorFileType, bool) error
Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) error
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) error
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
@ -46,7 +49,7 @@ type Worker interface {
type SectorManager interface { type SectorManager interface {
SectorSize() abi.SectorSize SectorSize() abi.SectorSize
ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ffiwrapper.StorageSealer ffiwrapper.StorageSealer
storage.Prover storage.Prover
@ -74,6 +77,7 @@ type SealerConfig struct {
AllowPreCommit1 bool AllowPreCommit1 bool
AllowPreCommit2 bool AllowPreCommit2 bool
AllowCommit bool AllowCommit bool
AllowUnseal bool
} }
type StorageAuth http.Header type StorageAuth http.Header
@ -108,7 +112,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
go m.sched.runSched() go m.sched.runSched()
localTasks := []sealtasks.TaskType{ localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTReadUnsealed,
} }
if sc.AllowPreCommit1 { if sc.AllowPreCommit1 {
localTasks = append(localTasks, sealtasks.TTPreCommit1) localTasks = append(localTasks, sealtasks.TTPreCommit1)
@ -119,6 +123,9 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
if sc.AllowCommit { if sc.AllowCommit {
localTasks = append(localTasks, sealtasks.TTCommit2) localTasks = append(localTasks, sealtasks.TTCommit2)
} }
if sc.AllowUnseal {
localTasks = append(localTasks, sealtasks.TTUnseal)
}
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType, SealProof: cfg.SealProofType,
@ -173,20 +180,69 @@ func (m *Manager) SectorSize() abi.SectorSize {
return sz return sz
} }
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
func schedNop(context.Context, Worker) error { func schedNop(context.Context, Worker) error {
return nil return nil
} }
func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool) func(context.Context, Worker) error { func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool, am stores.AcquireMode) func(context.Context, Worker) error {
return func(ctx context.Context, worker Worker) error { return func(ctx context.Context, worker Worker) error {
return worker.Fetch(ctx, sector, ft, sealing) return worker.Fetch(ctx, sector, ft, sealing, am)
} }
} }
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
}
var selector WorkerSelector
if len(best) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed)
} else { // append to existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
if err != nil {
return xerrors.Errorf("creating unsealPiece selector: %w", err)
}
// TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed
unsealFetch := func(ctx context.Context, worker Worker) error {
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, true, stores.AcquireCopy); err != nil {
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
}
if len(best) > 0 {
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, true, stores.AcquireMove); err != nil {
return xerrors.Errorf("copy unsealed sector data: %w", err)
}
}
return nil
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
return w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)
})
if err != nil {
return err
}
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("creating readPiece selector: %w", err)
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
return w.ReadPiece(ctx, sink, sector, offset, size)
})
if err != nil {
return xerrors.Errorf("reading piece from sealed sector: %w", err)
}
return nil
}
func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error { func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
log.Warnf("stub NewSector") log.Warnf("stub NewSector")
return nil return nil
@ -197,7 +253,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
var err error var err error
if len(existingPieces) == 0 { // new if len(existingPieces) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed) selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed)
} else { // append to existing } else { // use existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
} }
if err != nil { if err != nil {
@ -225,7 +281,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return nil, xerrors.Errorf("creating path selector: %w", err) return nil, xerrors.Errorf("creating path selector: %w", err)
} }
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
if err != nil { if err != nil {
return err return err
@ -243,7 +299,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err) return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
} }
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit2(ctx, sector, phase1Out) p, err := w.SealPreCommit2(ctx, sector, phase1Out)
if err != nil { if err != nil {
return err return err
@ -264,7 +320,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
// (except, don't.. for now at least - we are using this step to bring data // (except, don't.. for now at least - we are using this step to bring data
// into 'provable' storage. Optimally we'd do that in commit2, in parallel // into 'provable' storage. Optimally we'd do that in commit2, in parallel
// with snark compute) // with snark compute)
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids) p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
if err != nil { if err != nil {
return err return err
@ -297,7 +353,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
} }
return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false), schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false, stores.AcquireMove),
func(ctx context.Context, w Worker) error { func(ctx context.Context, w Worker) error {
return w.FinalizeSector(ctx, sector) return w.FinalizeSector(ctx, sector)
}) })

View File

@ -65,6 +65,10 @@ func (t *testStorage) SetStorage(f func(*stores.StorageConfig)) error {
return nil return nil
} }
func (t *testStorage) Stat(path string) (stores.FsStat, error) {
return stores.Stat(path)
}
var _ stores.LocalStorage = &testStorage{} var _ stores.LocalStorage = &testStorage{}
func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *stores.Remote, *stores.Index) { func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *stores.Remote, *stores.Index) {

View File

@ -5,7 +5,6 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"math" "math"
"math/rand" "math/rand"
"sync" "sync"
@ -18,12 +17,14 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/sector-storage/ffiwrapper" "github.com/filecoin-project/sector-storage/ffiwrapper"
"github.com/filecoin-project/sector-storage/storiface"
) )
var log = logging.Logger("sbmock") var log = logging.Logger("sbmock")
type SectorMgr struct { type SectorMgr struct {
sectors map[abi.SectorID]*sectorState sectors map[abi.SectorID]*sectorState
pieces map[cid.Cid][]byte
sectorSize abi.SectorSize sectorSize abi.SectorSize
nextSectorID abi.SectorNumber nextSectorID abi.SectorNumber
proofType abi.RegisteredProof proofType abi.RegisteredProof
@ -41,6 +42,7 @@ func NewMockSectorMgr(ssize abi.SectorSize) *SectorMgr {
return &SectorMgr{ return &SectorMgr{
sectors: make(map[abi.SectorID]*sectorState), sectors: make(map[abi.SectorID]*sectorState),
pieces: map[cid.Cid][]byte{},
sectorSize: ssize, sectorSize: ssize,
nextSectorID: 5, nextSectorID: 5,
proofType: rt, proofType: rt,
@ -80,13 +82,17 @@ func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, exist
ss.lk.Lock() ss.lk.Lock()
defer ss.lk.Unlock() defer ss.lk.Unlock()
c, err := ffiwrapper.GeneratePieceCIDFromFile(mgr.proofType, r, size) var b bytes.Buffer
tr := io.TeeReader(r, &b)
c, err := ffiwrapper.GeneratePieceCIDFromFile(mgr.proofType, tr, size)
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("failed to generate piece cid: %w", err) return abi.PieceInfo{}, xerrors.Errorf("failed to generate piece cid: %w", err)
} }
log.Warn("Generated Piece CID: ", c) log.Warn("Generated Piece CID: ", c)
mgr.pieces[c] = b.Bytes()
ss.pieces = append(ss.pieces, c) ss.pieces = append(ss.pieces, c)
return abi.PieceInfo{ return abi.PieceInfo{
Size: size.Padded(), Size: size.Padded(),
@ -268,11 +274,13 @@ func generateFakePoSt(sectorInfo []abi.SectorInfo) []abi.PoStProof {
} }
} }
func (mgr *SectorMgr) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) { func (mgr *SectorMgr) ReadPiece(ctx context.Context, w io.Writer, sectorID abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error {
if len(mgr.sectors[sectorID].pieces) > 1 { if len(mgr.sectors[sectorID].pieces) > 1 || offset != 0 {
panic("implme") panic("implme")
} }
return ioutil.NopCloser(io.LimitReader(bytes.NewReader(mgr.sectors[sectorID].pieces[0].Bytes()[offset:]), int64(size))), nil
_, err := io.CopyN(w, bytes.NewReader(mgr.pieces[mgr.sectors[sectorID].pieces[0]]), int64(size))
return err
} }
func (mgr *SectorMgr) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceInfo, error) { func (mgr *SectorMgr) StageFakeData(mid abi.ActorID) (abi.SectorID, []abi.PieceInfo, error) {

View File

@ -288,3 +288,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
}, },
}, },
} }
// init registers resource requirements for the new unseal-related task
// types: unsealing is assumed to cost about as much as PreCommit1, and
// reading unsealed data as much as a fetch.
func init() {
	ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately
	ResourceTable[sealtasks.TTReadUnsealed] = ResourceTable[sealtasks.TTFetch]
}

View File

@ -20,7 +20,7 @@ func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, e
return stores.SectorPaths{}, nil, xerrors.New("read-only storage") return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
} }
p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing) p, _, done, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, stores.PathType(sealing), stores.AcquireMove)
return p, done, err return p, done, err
} }

View File

@ -12,6 +12,8 @@ const (
TTFinalize TaskType = "seal/v0/finalize" TTFinalize TaskType = "seal/v0/finalize"
TTFetch TaskType = "seal/v0/fetch" TTFetch TaskType = "seal/v0/fetch"
TTUnseal TaskType = "seal/v0/unseal"
TTReadUnsealed TaskType = "seal/v0/unsealread"
) )
var order = map[TaskType]int{ var order = map[TaskType]int{
@ -22,6 +24,8 @@ var order = map[TaskType]int{
TTCommit1: 3, TTCommit1: 3,
TTFetch: 2, TTFetch: 2,
TTFinalize: 1, TTFinalize: 1,
TTUnseal: 0,
TTReadUnsealed: 0,
} }
func (a TaskType) Less(b TaskType) bool { func (a TaskType) Less(b TaskType) bool {

View File

@ -12,7 +12,7 @@ import (
) )
type existingSelector struct { type existingSelector struct {
best []stores.StorageInfo best []stores.SectorStorageInfo
} }
func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) (*existingSelector, error) { func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) (*existingSelector, error) {

View File

@ -70,7 +70,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
} }
// passing 0 spt because we don't allocate anything // passing 0 spt because we don't allocate anything
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false) paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
if err != nil { if err != nil {
log.Error("%+v", err) log.Error("%+v", err)
w.WriteHeader(500) w.WriteHeader(500)

View File

@ -38,16 +38,27 @@ type HealthReport struct {
Err error Err error
} }
// SectorStorageInfo describes one storage path holding (or able to serve)
// files of a specific sector, as returned by StorageFindSector.
type SectorStorageInfo struct {
	ID   ID
	URLs []string // TODO: Support non-http transports

	// Weight is the path's configured weight scaled by how many of the
	// requested file types the path holds.
	Weight uint64

	CanSeal  bool
	CanStore bool

	// Primary reports whether this path holds a primary copy of the
	// sector (non-primary copies may be dropped by RemoveCopies).
	Primary bool
}
type SectorIndex interface { // part of storage-miner api type SectorIndex interface { // part of storage-miner api
StorageAttach(context.Context, StorageInfo, FsStat) error StorageAttach(context.Context, StorageInfo, FsStat) error
StorageInfo(context.Context, ID) (StorageInfo, error) StorageInfo(context.Context, ID) (StorageInfo, error)
StorageReportHealth(context.Context, ID, HealthReport) error StorageReportHealth(context.Context, ID, HealthReport) error
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error)
} }
type Decl struct { type Decl struct {
@ -55,6 +66,11 @@ type Decl struct {
SectorFileType SectorFileType
} }
// declMeta records, for a single sector-file declaration, which storage
// path holds the file and whether that copy is the primary one.
type declMeta struct {
	storage ID
	primary bool
}
type storageEntry struct { type storageEntry struct {
info *StorageInfo info *StorageInfo
fsi FsStat fsi FsStat
@ -66,13 +82,13 @@ type storageEntry struct {
type Index struct { type Index struct {
lk sync.RWMutex lk sync.RWMutex
sectors map[Decl][]ID sectors map[Decl][]*declMeta
stores map[ID]*storageEntry stores map[ID]*storageEntry
} }
func NewIndex() *Index { func NewIndex() *Index {
return &Index{ return &Index{
sectors: map[Decl][]ID{}, sectors: map[Decl][]*declMeta{},
stores: map[ID]*storageEntry{}, stores: map[ID]*storageEntry{},
} }
} }
@ -88,7 +104,7 @@ func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
} }
for decl, ids := range i.sectors { for decl, ids := range i.sectors {
for _, id := range ids { for _, id := range ids {
byID[id][decl.SectorID] |= decl.SectorFileType byID[id.storage][decl.SectorID] |= decl.SectorFileType
} }
} }
@ -157,10 +173,11 @@ func (i *Index) StorageReportHealth(ctx context.Context, id ID, report HealthRep
return nil return nil
} }
func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error { func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType, primary bool) error {
i.lk.Lock() i.lk.Lock()
defer i.lk.Unlock() defer i.lk.Unlock()
loop:
for _, fileType := range PathTypes { for _, fileType := range PathTypes {
if fileType&ft == 0 { if fileType&ft == 0 {
continue continue
@ -169,13 +186,20 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
d := Decl{s, fileType} d := Decl{s, fileType}
for _, sid := range i.sectors[d] { for _, sid := range i.sectors[d] {
if sid == storageId { if sid.storage == storageId {
if !sid.primary && primary {
sid.primary = true
} else {
log.Warnf("sector %v redeclared in %s", s, storageId) log.Warnf("sector %v redeclared in %s", s, storageId)
return nil }
continue loop
} }
} }
i.sectors[d] = append(i.sectors[d], storageId) i.sectors[d] = append(i.sectors[d], &declMeta{
storage: storageId,
primary: primary,
})
} }
return nil return nil
@ -196,9 +220,9 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto
return nil return nil
} }
rewritten := make([]ID, 0, len(i.sectors[d])-1) rewritten := make([]*declMeta, 0, len(i.sectors[d])-1)
for _, sid := range i.sectors[d] { for _, sid := range i.sectors[d] {
if sid == storageId { if sid.storage == storageId {
continue continue
} }
@ -215,11 +239,12 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto
return nil return nil
} }
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) { func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]SectorStorageInfo, error) {
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()
storageIDs := map[ID]uint64{} storageIDs := map[ID]uint64{}
isprimary := map[ID]bool{}
for _, pathType := range PathTypes { for _, pathType := range PathTypes {
if ft&pathType == 0 { if ft&pathType == 0 {
@ -227,11 +252,12 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
} }
for _, id := range i.sectors[Decl{s, pathType}] { for _, id := range i.sectors[Decl{s, pathType}] {
storageIDs[id]++ storageIDs[id.storage]++
isprimary[id.storage] = isprimary[id.storage] || id.primary
} }
} }
out := make([]StorageInfo, 0, len(storageIDs)) out := make([]SectorStorageInfo, 0, len(storageIDs))
for id, n := range storageIDs { for id, n := range storageIDs {
st, ok := i.stores[id] st, ok := i.stores[id]
@ -251,12 +277,15 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
urls[k] = rl.String() urls[k] = rl.String()
} }
out = append(out, StorageInfo{ out = append(out, SectorStorageInfo{
ID: id, ID: id,
URLs: urls, URLs: urls,
Weight: st.info.Weight * n, // storage with more sector types is better Weight: st.info.Weight * n, // storage with more sector types is better
CanSeal: st.info.CanSeal, CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore, CanStore: st.info.CanStore,
Primary: isprimary[id],
}) })
} }
@ -277,12 +306,15 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
urls[k] = rl.String() urls[k] = rl.String()
} }
out = append(out, StorageInfo{ out = append(out, SectorStorageInfo{
ID: id, ID: id,
URLs: urls, URLs: urls,
Weight: st.info.Weight * 0, // TODO: something better than just '0' Weight: st.info.Weight * 0, // TODO: something better than just '0'
CanSeal: st.info.CanSeal, CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore, CanStore: st.info.CanStore,
Primary: false,
}) })
} }
} }
@ -302,7 +334,7 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
return *si.info, nil return *si.info, nil
} }
func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, sealing bool) ([]StorageInfo, error) { func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredProof, pathType PathType) ([]StorageInfo, error) {
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()
@ -314,10 +346,10 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
} }
for _, p := range i.stores { for _, p := range i.stores {
if sealing && !p.info.CanSeal { if (pathType == PathSealing) && !p.info.CanSeal {
continue continue
} }
if !sealing && !p.info.CanStore { if (pathType == PathStorage) && !p.info.CanStore {
continue continue
} }
@ -362,10 +394,19 @@ func (i *Index) FindSector(id abi.SectorID, typ SectorFileType) ([]ID, error) {
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()
return i.sectors[Decl{ f, ok := i.sectors[Decl{
SectorID: id, SectorID: id,
SectorFileType: typ, SectorFileType: typ,
}], nil }]
if !ok {
return nil, nil
}
out := make([]ID, 0, len(f))
for _, meta := range f {
out = append(out, meta.storage)
}
return out, nil
} }
var _ SectorIndex = &Index{} var _ SectorIndex = &Index{}

View File

@ -9,10 +9,28 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
) )
// PathType selects between sealing (scratch) paths and long-term storage
// paths when deciding where to allocate or look for sector files.
type PathType bool

const (
	// PathStorage selects paths intended for long-term storage (CanStore).
	PathStorage PathType = false
	// PathSealing selects paths intended for the sealing process (CanSeal).
	PathSealing PathType = true
)

// AcquireMode tells AcquireSector what to do with a sector copy fetched
// from a remote: move it (delete the remote copy) or copy it (leave the
// remote copy in place).
type AcquireMode string

const (
	AcquireMove AcquireMode = "move"
	AcquireCopy AcquireMode = "copy"
)
type Store interface { type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, done func(), err error)
Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error
// like remove, but doesn't remove the primary sector copy, nor the last
// non-primary copy if there are no primary copies
RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error
// move sectors into storage // move sectors into storage
MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error

View File

@ -47,6 +47,8 @@ type LocalPath struct {
type LocalStorage interface { type LocalStorage interface {
GetStorage() (StorageConfig, error) GetStorage() (StorageConfig, error)
SetStorage(func(*StorageConfig)) error SetStorage(func(*StorageConfig)) error
Stat(path string) (FsStat, error)
} }
const MetaFile = "sectorstore.json" const MetaFile = "sectorstore.json"
@ -98,7 +100,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
local: p, local: p,
} }
fst, err := Stat(p) fst, err := st.localStorage.Stat(p)
if err != nil { if err != nil {
return err return err
} }
@ -133,7 +135,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err) return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
} }
if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t); err != nil { if err := st.index.StorageDeclareSector(ctx, meta.ID, sid, t, meta.CanStore); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, meta.ID, err) return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, meta.ID, err)
} }
} }
@ -177,7 +179,7 @@ func (st *Local) reportHealth(ctx context.Context) {
toReport := map[ID]HealthReport{} toReport := map[ID]HealthReport{}
for id, p := range st.paths { for id, p := range st.paths {
stat, err := Stat(p.local) stat, err := st.localStorage.Stat(p.local)
toReport[id] = HealthReport{ toReport[id] = HealthReport{
Stat: stat, Stat: stat,
@ -195,7 +197,7 @@ func (st *Local) reportHealth(ctx context.Context) {
} }
} }
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
} }
@ -240,7 +242,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
continue continue
} }
sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, sealing) sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType)
if err != nil { if err != nil {
st.localLk.RUnlock() st.localLk.RUnlock()
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err) return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err)
@ -259,11 +261,11 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
continue continue
} }
if sealing && !si.CanSeal { if (pathType == PathSealing) && !si.CanSeal {
continue continue
} }
if !sealing && !si.CanStore { if (pathType == PathStorage) && !si.CanStore {
continue continue
} }
@ -328,16 +330,61 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
} }
for _, info := range si { for _, info := range si {
p, ok := st.paths[info.ID] if err := st.removeSector(ctx, sid, typ, info.ID); err != nil {
if !ok { return err
}
}
return nil
}
func (st *Local) RemoveCopies(ctx context.Context, sid abi.SectorID, typ SectorFileType) error {
if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("delete expects one file type")
}
si, err := st.index.StorageFindSector(ctx, sid, typ, false)
if err != nil {
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", sid, typ, err)
}
var hasPrimary bool
for _, info := range si {
if info.Primary {
hasPrimary = true
break
}
}
if !hasPrimary {
log.Warnf("RemoveCopies: no primary copies of sector %v (%s), not removing anything", sid, typ)
return nil
}
for _, info := range si {
if info.Primary {
continue continue
} }
if err := st.removeSector(ctx, sid, typ, info.ID); err != nil {
return err
}
}
return nil
}
func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorFileType, storage ID) error {
p, ok := st.paths[storage]
if !ok {
return nil
}
if p.local == "" { // TODO: can that even be the case? if p.local == "" { // TODO: can that even be the case?
continue return nil
} }
if err := st.index.StorageDropSector(ctx, info.ID, sid, typ); err != nil { if err := st.index.StorageDropSector(ctx, storage, sid, typ); err != nil {
return xerrors.Errorf("dropping sector from index: %w", err) return xerrors.Errorf("dropping sector from index: %w", err)
} }
@ -347,19 +394,18 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
if err := os.RemoveAll(spath); err != nil { if err := os.RemoveAll(spath); err != nil {
log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err) log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err)
} }
}
return nil return nil
} }
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false) dest, destIds, sdone, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire dest storage: %w", err) return xerrors.Errorf("acquire dest storage: %w", err)
} }
defer sdone() defer sdone()
src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false) src, srcIds, ddone, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire src storage: %w", err) return xerrors.Errorf("acquire src storage: %w", err)
} }
@ -401,7 +447,7 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.Regist
return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err) return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err)
} }
if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType); err != nil { if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType, true); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(PathByType(destIds, fileType)), err) return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(PathByType(destIds, fileType)), err)
} }
} }
@ -420,7 +466,7 @@ func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
return FsStat{}, errPathNotFound return FsStat{}, errPathNotFound
} }
return Stat(p.local) return st.localStorage.Stat(p.local)
} }
var _ Store = &Local{} var _ Store = &Local{}

90
stores/local_test.go Normal file
View File

@ -0,0 +1,90 @@
package stores
import (
"context"
"encoding/json"
"github.com/google/uuid"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
)
const pathSize = 16 << 20
// TestingLocalStorage is an in-memory LocalStorage implementation for
// tests: the storage config lives in a struct field and Stat reports a
// fixed fake filesystem size.
type TestingLocalStorage struct {
	root string // directory under which test storage paths are created
	c    StorageConfig
}
// GetStorage returns the in-memory storage config.
func (t *TestingLocalStorage) GetStorage() (StorageConfig, error) {
	return t.c, nil
}
// SetStorage applies f to the in-memory storage config.
func (t *TestingLocalStorage) SetStorage(f func(*StorageConfig)) error {
	f(&t.c)
	return nil
}
// Stat reports a fixed fake filesystem status — pathSize capacity with all
// of it available — regardless of the path queried.
func (t *TestingLocalStorage) Stat(path string) (FsStat, error) {
	stat := FsStat{
		Capacity:  pathSize,
		Available: pathSize,
		Used:      0,
	}
	return stat, nil
}
// init creates the storage directory <root>/<subpath> and writes a
// sectorstore.json metadata file into it, so that OpenPath will accept the
// directory as a storage path.
func (t *TestingLocalStorage) init(subpath string) error {
	dir := filepath.Join(t.root, subpath)
	if err := os.Mkdir(dir, 0755); err != nil {
		return err
	}

	meta := LocalStorageMeta{
		ID:       ID(uuid.New().String()),
		Weight:   1,
		CanSeal:  true,
		CanStore: true,
	}

	mb, err := json.MarshalIndent(&meta, "", "  ")
	if err != nil {
		return err
	}

	return ioutil.WriteFile(filepath.Join(dir, MetaFile), mb, 0644)
}
var _ LocalStorage = &TestingLocalStorage{}
// TestLocalStorage checks that a Local store can be constructed against the
// in-memory test storage and that OpenPath accepts a path initialized with
// a sectorstore.json metadata file.
func TestLocalStorage(t *testing.T) {
	ctx := context.TODO()

	root, err := ioutil.TempDir("", "sector-storage-teststorage-")
	require.NoError(t, err)

	tstor := &TestingLocalStorage{root: root}

	index := NewIndex()
	st, err := NewLocal(ctx, tstor, index, nil)
	require.NoError(t, err)

	const p1 = "1"
	require.NoError(t, tstor.init(p1))

	require.NoError(t, st.OpenPath(ctx, filepath.Join(tstor.root, p1)))

	// TODO: put more things here
}

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/sector-storage/storiface"
"github.com/filecoin-project/sector-storage/tarutil" "github.com/filecoin-project/sector-storage/tarutil"
) )
@ -31,6 +32,14 @@ type Remote struct {
fetching map[abi.SectorID]chan struct{} fetching map[abi.SectorID]chan struct{}
} }
// RemoveCopies removes non-primary local copies of the given sector file
// types by delegating to the local store. Remote copies are left alone for
// now (see TODO).
func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error {
	// TODO: do this on remotes too
	// (not that we really need to do that since it's always called by the
	// worker which pulled the copy)

	return r.local.RemoveCopies(ctx, s, types)
}
func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote { func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
return &Remote{ return &Remote{
local: local, local: local,
@ -41,7 +50,7 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
} }
} }
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, func(), error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
} }
@ -73,7 +82,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
r.fetchLk.Unlock() r.fetchLk.Unlock()
}() }()
paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, sealing) paths, stores, done, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op)
if err != nil { if err != nil {
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
} }
@ -87,7 +96,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
continue continue
} }
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, sealing) ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, spt, fileType, pathType, op)
if err != nil { if err != nil {
done() done()
return SectorPaths{}, SectorPaths{}, nil, err return SectorPaths{}, SectorPaths{}, nil, err
@ -97,35 +106,36 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
SetPathByType(&paths, fileType, ap) SetPathByType(&paths, fileType, ap)
SetPathByType(&stores, fileType, string(storageID)) SetPathByType(&stores, fileType, string(storageID))
if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil { if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType, op == AcquireMove); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
continue continue
} }
// TODO: some way to allow having duplicated sectors in the system for perf if op == AcquireMove {
if err := r.deleteFromRemote(ctx, url); err != nil { if err := r.deleteFromRemote(ctx, url); err != nil {
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
} }
} }
}
return paths, stores, done, nil return paths, stores, done, nil
} }
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) { func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, fileType SectorFileType, pathType PathType, op AcquireMode) (string, ID, string, func(), error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, false) si, err := r.index.StorageFindSector(ctx, s, fileType, false)
if err != nil { if err != nil {
return "", "", "", nil, err return "", "", "", nil, err
} }
if len(si) == 0 { if len(si) == 0 {
return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): not found", s, fileType) return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): %w", s, fileType, storiface.ErrSectorNotFound)
} }
sort.Slice(si, func(i, j int) bool { sort.Slice(si, func(i, j int) bool {
return si[i].Weight < si[j].Weight return si[i].Weight < si[j].Weight
}) })
apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, sealing) apaths, ids, done, err := r.local.AcquireSector(ctx, s, spt, FTNone, fileType, pathType, op)
if err != nil { if err != nil {
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
} }
@ -205,7 +215,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error { func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredProof, types SectorFileType) error {
// Make sure we have the data local // Make sure we have the data local
_, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, false) _, _, ddone, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire src storage (remote): %w", err) return xerrors.Errorf("acquire src storage (remote): %w", err)
} }

17
storiface/ffi.go Normal file
View File

@ -0,0 +1,17 @@
package storiface
import (
"errors"
"github.com/filecoin-project/specs-actors/actors/abi"
)
// ErrSectorNotFound is returned when a requested sector file cannot be
// located in any known storage path.
var ErrSectorNotFound = errors.New("sector not found")

// UnpaddedByteIndex is a byte offset into unpadded (user-visible) sector data.
type UnpaddedByteIndex uint64

// Padded converts the unpadded byte offset into the corresponding offset in
// the padded sector representation.
func (i UnpaddedByteIndex) Padded() PaddedByteIndex {
	return PaddedByteIndex(abi.UnpaddedPieceSize(i).Padded())
}

// PaddedByteIndex is a byte offset into padded sector data.
type PaddedByteIndex uint64

View File

@ -2,6 +2,9 @@ package sectorstorage
import ( import (
"context" "context"
"io"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
@ -46,6 +49,14 @@ func (t *testWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
panic("implement me") panic("implement me")
} }
// UnsealPiece is not implemented by testWorker; calling it panics.
func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
	panic("implement me")
}
// ReadPiece is not implemented by testWorker; calling it panics.
func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
	panic("implement me")
}
func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
return t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) return t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
} }
@ -66,7 +77,7 @@ func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) er
panic("implement me") panic("implement me")
} }
func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, b bool) error { func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, b bool, am stores.AcquireMode) error {
return nil return nil
} }