Partial Files, use for sealing

This commit is contained in:
Łukasz Magiera 2020-05-14 17:35:38 +02:00
parent 4065c94c1f
commit f577c2120c
7 changed files with 377 additions and 18 deletions

255
ffiwrapper/partialfile.go Normal file
View File

@ -0,0 +1,255 @@
package ffiwrapper
import (
"encoding/binary"
"io"
"os"
"github.com/detailyang/go-fallocate"
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/specs-actors/actors/abi"
)
const veryLargeRle = 1 << 20
// Sectors can be partially unsealed. We support this by appending a small
// trailer to each unsealed sector file containing an RLE+ marking which bytes
// in a sector are unsealed, and which are not (holes)
// unsealed sector files internally have this structure
// [unpadded (raw) data][rle+][4B LE length fo the rle+ field]
type partialFile struct {
maxPiece abi.UnpaddedPieceSize
path string
allocated rlepluslazy.RLE
file *os.File
}
func writeTrailer(psz int64, w *os.File, r rlepluslazy.RunIterator) error {
trailer, err := rlepluslazy.EncodeRuns(r, nil)
if err != nil {
return xerrors.Errorf("encoding trailer: %w", err)
}
if _, err := w.Seek(psz, io.SeekStart); err != nil {
return xerrors.Errorf("seek to trailer start: %w", err)
}
rb, err := w.Write(trailer)
if err != nil {
return xerrors.Errorf("writing trailer data: %w", err)
}
if err := binary.Write(w, binary.LittleEndian, uint32(len(trailer))); err != nil {
return xerrors.Errorf("writing trailer length: %w", err)
}
return w.Truncate(psz + int64(rb) + 4)
}
func createPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR | os.O_CREATE, 0644)
if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
}
err = func() error {
err := fallocate.Fallocate(f, 0, int64(maxPieceSize))
if err != nil {
return xerrors.Errorf("fallocate '%s': %w", path, err)
}
if err := writeTrailer(int64(maxPieceSize), f, &rlepluslazy.RunSliceIterator{}); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}()
if err != nil {
f.Close()
return nil, err
}
if err := f.Close(); err != nil {
return nil, xerrors.Errorf("close empty partial file: %w", err)
}
return openPartialFile(maxPieceSize, path)
}
func openPartialFile(maxPieceSize abi.UnpaddedPieceSize, path string) (*partialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR, 0644)
if err != nil {
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
}
var rle rlepluslazy.RLE
err = func() error {
st, err := f.Stat()
if err != nil {
return xerrors.Errorf("stat '%s': %w", path, err)
}
if st.Size() < int64(maxPieceSize) {
return xerrors.Errorf("sector file '%s' was smaller than the sector size %d < %d", path, st.Size(), maxPieceSize)
}
// read trailer
var tlen [4]byte
_, err = f.ReadAt(tlen[:], st.Size() - int64(len(tlen)))
if err != nil {
return xerrors.Errorf("reading trailer length: %w", err)
}
// sanity-check the length
trailerLen := binary.LittleEndian.Uint32(tlen[:])
expectLen := int64(trailerLen) + int64(len(tlen)) + int64(maxPieceSize)
if expectLen != st.Size() {
return xerrors.Errorf("file '%d' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen) + int64(len(tlen)), maxPieceSize)
}
if trailerLen > veryLargeRle {
log.Warnf("Partial file '%s' has a VERY large trailer with %d bytes", path, trailerLen)
}
trailerStart := st.Size() - int64(len(tlen)) - int64(trailerLen)
if trailerStart != int64(maxPieceSize) {
return xerrors.Errorf("expected sector size to equal trailer start index")
}
trailerBytes := make([]byte, trailerLen)
_, err = f.ReadAt(trailerBytes, trailerStart)
if err != nil {
return xerrors.Errorf("reading trailer: %w", err)
}
rle, err = rlepluslazy.FromBuf(trailerBytes)
if err != nil {
return xerrors.Errorf("decoding trailer: %w", err)
}
it, err := rle.RunIterator()
if err != nil {
return xerrors.Errorf("getting trailer run iterator: %w", err)
}
lastSet, err := rlepluslazy.LastIndex(it, true)
if err != nil {
return xerrors.Errorf("finding last set byte index: %w", err)
}
if lastSet > uint64(maxPieceSize) {
return xerrors.Errorf("last set byte at index higher than sector size: %d > %d", lastSet, maxPieceSize)
}
return nil
}()
if err != nil {
f.Close()
return nil, err
}
return &partialFile{
maxPiece: maxPieceSize,
path: path,
allocated: rle,
file: f,
}, nil
}
func (pf *partialFile) Close() error {
return pf.file.Close()
}
func (pf *partialFile) Writer(offset abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize) (io.Writer, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
{
have, err := pf.allocated.RunIterator()
if err != nil {
return nil, err
}
and, err := rlepluslazy.And(have, pieceRun(offset, size))
if err != nil {
return nil, err
}
c, err := rlepluslazy.Count(and)
if err != nil {
return nil, err
}
if c > 0 {
log.Warnf("getting partial file writer overwriting %d allocated bytes", c)
}
}
return pf.file, nil
}
func (pf *partialFile) MarkAllocated(offset abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
}
ored, err := rlepluslazy.Or(have, pieceRun(offset, size))
if err != nil {
return err
}
if err := writeTrailer(int64(pf.maxPiece), pf.file, ored); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}
func (pf *partialFile) Reader(offset abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize) (*os.File, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
{
have, err := pf.allocated.RunIterator()
if err != nil {
return nil, err
}
and, err := rlepluslazy.And(have, pieceRun(offset, size))
if err != nil {
return nil, err
}
c, err := rlepluslazy.Count(and)
if err != nil {
return nil, err
}
if c != uint64(size) {
log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size) - c)
}
}
return pf.file, nil
}
func pieceRun(offset abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize) rlepluslazy.RunIterator {
var runs []rlepluslazy.Run
if offset > 0 {
runs = append(runs, rlepluslazy.Run{
Val: false,
Len: uint64(offset),
})
}
runs = append(runs, rlepluslazy.Run{
Val: true,
Len: uint64(size),
})
return &rlepluslazy.RunSliceIterator{Runs: runs}
}

View File

@ -7,6 +7,7 @@ import (
"io"
"math/bits"
"os"
"path/filepath"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -46,13 +47,20 @@ func (sb *Sealer) NewSector(ctx context.Context, sector abi.SectorID) error {
}
func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
return abi.PieceInfo{}, err
var offset abi.UnpaddedPieceSize
for _, size := range existingPieceSizes {
offset += size
}
maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded()
if offset + pieceSize > maxPieceSize {
return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset)
}
var err error
var done func()
var stagedFile *os.File
var stagedFile *partialFile
defer func() {
if done != nil {
@ -73,9 +81,9 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = os.Create(stagedPath.Unsealed)
stagedFile, err = createPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening sector file: %w", err)
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
}
} else {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, true)
@ -83,24 +91,35 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = os.OpenFile(stagedPath.Unsealed, os.O_RDWR, 0644)
stagedFile, err = openPartialFile(maxPieceSize, stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening sector file: %w", err)
}
if _, err := stagedFile.Seek(0, io.SeekEnd); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("seek end: %w", err)
return abi.PieceInfo{}, xerrors.Errorf("opening unsealed sector file: %w", err)
}
}
_, _, pieceCID, err := ffi.WriteWithAlignment(sb.sealProofType, f, pieceSize, stagedFile, existingPieceSizes)
w, err := stagedFile.Writer(offset, pieceSize)
if err != nil {
return abi.PieceInfo{}, err
return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err)
}
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), w)
prf, werr, err := toReadableFile(pr, int64(pieceSize))
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("getting tee reader pipe: %w", err)
}
if err := f.Close(); err != nil {
pieceCID, err := ffi.GeneratePieceCIDFromFile(sb.sealProofType, prf, pieceSize)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generating piece commitment: %w", err)
}
if err := stagedFile.MarkAllocated(offset, pieceSize); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err)
}
if err := stagedFile.Close(); err != nil {
return abi.PieceInfo{}, err
}
stagedFile = nil
return abi.PieceInfo{
Size: pieceSize.Padded(),
@ -232,11 +251,22 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
staged := filepath.Join(paths.Cache, "staged")
if err := sb.rewriteAsPadded(paths.Unsealed, staged); err != nil {
return nil, xerrors.Errorf("rewriting sector as padded: %w", err)
}
defer func() {
if err := os.Remove(staged); err != nil {
log.Warnf("Removing staged sector file(%v): %s", sector, err)
}
}()
// TODO: context cancellation respect
p1o, err := ffi.SealPreCommitPhase1(
sb.sealProofType,
paths.Cache,
paths.Unsealed,
staged,
paths.Sealed,
sector.Number,
sector.Miner,
@ -249,6 +279,52 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
return p1o, nil
}
func (sb *Sealer) rewriteAsPadded(unsealed string, staged string) error {
maxPieceSize := abi.PaddedPieceSize(sb.ssize).Unpadded()
pf, err := openPartialFile(maxPieceSize, unsealed)
if err != nil {
return xerrors.Errorf("opening unsealed file: %w", err)
}
upr, err := pf.Reader(0, maxPieceSize)
if err != nil {
pf.Close()
return err
}
st, err := os.OpenFile(staged, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
pf.Close()
return xerrors.Errorf("openning staged file: %w", err)
}
// OPTIMIZATION: upr is a file, so it could be passed straight to
// WriteWithAlignment IF it wouldn't care about the trailer
lupr, werr, err := toReadableFile(io.LimitReader(upr, int64(maxPieceSize)), int64(maxPieceSize))
if err != nil {
return err
}
_, _, _, err = ffi.WriteWithAlignment(sb.sealProofType, lupr, maxPieceSize, st, nil)
if err != nil {
pf.Close()
st.Close()
return xerrors.Errorf("write with alignment: %w", err)
}
if err := st.Close(); err != nil {
pf.Close()
return err
}
if err := pf.Close(); err != nil {
return err
}
return werr()
}
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true)
if err != nil {

View File

@ -1,8 +1,10 @@
package ffiwrapper
import (
"bytes"
"context"
"fmt"
ffi "github.com/filecoin-project/filecoin-ffi"
"io"
"io/ioutil"
"math/rand"
@ -351,3 +353,14 @@ func TestSealAndVerify2(t *testing.T) {
post(t, sb, s1, s2)
}
func TestScribbles(t *testing.T) {
rf, w, _ := toReadableFile(bytes.NewReader(bytes.Repeat([]byte{0xff, 0}, 127)), 254)
defer w()
tf, _ := ioutil.TempFile("/tmp/", "scrb-")
fmt.Println(tf.Name())
fmt.Println(ffi.WriteWithAlignment(abi.RegisteredProof_StackedDRG2KiBSeal, rf, 254, tf, nil))
}

2
go.mod
View File

@ -3,8 +3,10 @@ module github.com/filecoin-project/sector-storage
go 1.13
require (
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/elastic/go-sysinfo v1.3.0
github.com/filecoin-project/filecoin-ffi v0.0.0-20200326153646-e899cc1dd072
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518131841-989ba5ae71af
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5
github.com/filecoin-project/go-paramfetch v0.0.1
github.com/filecoin-project/specs-actors v0.4.1-0.20200508202406-42be6629284d

4
go.sum
View File

@ -19,6 +19,8 @@ github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e h1:lj77EKYUpYXTd8CD/+QMIf8b6OIOTsfEBSXiAzuEHTU=
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e/go.mod h1:3ZQK6DMPSz/QZ73jlWxBtUhNA8xZx7LzUFSq/OfP8vk=
github.com/elastic/go-sysinfo v1.3.0 h1:eb2XFGTMlSwG/yyU9Y8jVAYLIzU2sFzWXwo2gmetyrE=
github.com/elastic/go-sysinfo v1.3.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-windows v1.0.0 h1:qLURgZFkkrYyTTkvYpsZIgf83AUsdIHfvlJaqaZ7aSY=
@ -33,6 +35,8 @@ github.com/filecoin-project/go-bitfield v0.0.0-20200309034705-8c7ac40bd550 h1:ao
github.com/filecoin-project/go-bitfield v0.0.0-20200309034705-8c7ac40bd550/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw=
github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060 h1:/3qjGMn6ukXgZJHsIbuwGL7ipla8DOV3uHZDBJkBYfU=
github.com/filecoin-project/go-bitfield v0.0.0-20200416002808-b3ee67ec9060/go.mod h1:iodsLxOFZnqKtjj2zkgqzoGNrv6vUqj69AT/J8DKXEw=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518131841-989ba5ae71af h1:g34Sk2coFzyNUv61ZLQ+yyS4Fm8aJCqEaZMKf8Dv6Hs=
github.com/filecoin-project/go-bitfield v0.0.2-0.20200518131841-989ba5ae71af/go.mod h1:Ry9/iUlWSyjPUzlAvdnfy4Gtvrq4kWmWDztCU1yEgJY=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200208005934-2b8bd03caca5 h1:yvQJCW9mmi9zy+51xA01Ea2X7/dL7r8eKDPuGUjRmbo=

View File

@ -7,6 +7,7 @@ import (
"runtime"
"github.com/elastic/go-sysinfo"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
@ -181,6 +182,14 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
return nil
}
func (l *LocalWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
panic("implement me")
}
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
panic("implement me")
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
return l.acceptTasks, nil
}

View File

@ -49,7 +49,7 @@ type Worker interface {
type SectorManager interface {
SectorSize() abi.SectorSize
ReadPieceFromSealedSector(context.Context, io.Writer, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ffiwrapper.StorageSealer
storage.Prover
@ -189,7 +189,7 @@ func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool) fun
}
}
func (m *Manager) ReadPieceFromSealedSector(ctx context.Context, sink io.Writer, sector abi.SectorID, offset ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error {
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
if err != nil {
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)