package partialfile import ( "encoding/binary" "io" "os" "syscall" "github.com/detailyang/go-fallocate" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" rlepluslazy "github.com/filecoin-project/go-bitfield/rle" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/lib/readerutil" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) var log = logging.Logger("partialfile") const veryLargeRle = 1 << 20 // Sectors can be partially unsealed. We support this by appending a small // trailer to each unsealed sector file containing an RLE+ marking which bytes // in a sector are unsealed, and which are not (holes) // unsealed sector files internally have this structure // [unpadded (raw) data][rle+][4B LE length fo the rle+ field] type PartialFile struct { maxPiece abi.PaddedPieceSize path string allocated rlepluslazy.RLE file *os.File } func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) error { trailer, err := rlepluslazy.EncodeRuns(r, nil) if err != nil { return xerrors.Errorf("encoding trailer: %w", err) } // maxPieceSize == unpadded(sectorSize) == trailer start if _, err := w.Seek(maxPieceSize, io.SeekStart); err != nil { return xerrors.Errorf("seek to trailer start: %w", err) } rb, err := w.Write(trailer) if err != nil { return xerrors.Errorf("writing trailer data: %w", err) } if err := binary.Write(w, binary.LittleEndian, uint32(len(trailer))); err != nil { return xerrors.Errorf("writing trailer length: %w", err) } return w.Truncate(maxPieceSize + int64(rb) + 4) } func CreatePartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) { f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) // nolint if err != nil { return nil, xerrors.Errorf("openning partial file '%s': %w", path, err) } err = func() error { err := fallocate.Fallocate(f, 0, int64(maxPieceSize)) if errno, ok := err.(syscall.Errno); ok { if errno == syscall.EOPNOTSUPP || errno == syscall.ENOSYS { log.Warnf("could not allocate space, ignoring: %v", errno) err = nil // log and ignore } } if err != nil { return xerrors.Errorf("fallocate '%s': %w", path, err) } if err := writeTrailer(int64(maxPieceSize), f, &rlepluslazy.RunSliceIterator{}); err != nil { return xerrors.Errorf("writing trailer: %w", err) } return nil }() if err != nil { _ = f.Close() return nil, err } if err := f.Close(); err != nil { return nil, xerrors.Errorf("close empty partial file: %w", err) } return OpenPartialFile(maxPieceSize, path) } func OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) { f, err := os.OpenFile(path, os.O_RDWR, 0644) // nolint if err != nil { return nil, xerrors.Errorf("openning partial file '%s': %w", path, err) } var rle rlepluslazy.RLE err = func() error { st, err := f.Stat() if err != nil { return xerrors.Errorf("stat '%s': %w", path, err) } if st.Size() < int64(maxPieceSize) { return xerrors.Errorf("sector file '%s' was smaller than the sector size %d < %d", path, st.Size(), maxPieceSize) } // read trailer var tlen [4]byte _, err = f.ReadAt(tlen[:], st.Size()-int64(len(tlen))) if err != nil { return xerrors.Errorf("reading trailer length: %w", err) } // sanity-check the length trailerLen := binary.LittleEndian.Uint32(tlen[:]) expectLen := int64(trailerLen) + int64(len(tlen)) + int64(maxPieceSize) if expectLen != st.Size() { return xerrors.Errorf("file '%s' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen)+int64(len(tlen)), maxPieceSize) } if trailerLen > veryLargeRle { log.Warnf("Partial file '%s' has a VERY large trailer with %d bytes", path, trailerLen) } trailerStart := st.Size() - int64(len(tlen)) - int64(trailerLen) if trailerStart != int64(maxPieceSize) { return xerrors.Errorf("expected sector size to equal trailer start index") } trailerBytes := make([]byte, trailerLen) _, err = f.ReadAt(trailerBytes, trailerStart) if err != nil { return xerrors.Errorf("reading trailer: %w", err) } rle, err = rlepluslazy.FromBuf(trailerBytes) if err != nil { return xerrors.Errorf("decoding trailer: %w", err) } it, err := rle.RunIterator() if err != nil { return xerrors.Errorf("getting trailer run iterator: %w", err) } f, err := rlepluslazy.Fill(it) if err != nil { return xerrors.Errorf("filling bitfield: %w", err) } lastSet, err := rlepluslazy.Count(f) if err != nil { return xerrors.Errorf("finding last set byte index: %w", err) } if lastSet > uint64(maxPieceSize) { return xerrors.Errorf("last set byte at index higher than sector size: %d > %d", lastSet, maxPieceSize) } return nil }() if err != nil { _ = f.Close() return nil, err } return &PartialFile{ maxPiece: maxPieceSize, path: path, allocated: rle, file: f, }, nil } func (pf *PartialFile) Close() error { return pf.file.Close() } func (pf *PartialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } { have, err := pf.allocated.RunIterator() if err != nil { return nil, err } and, err := rlepluslazy.And(have, PieceRun(offset, size)) if err != nil { return nil, err } c, err := rlepluslazy.Count(and) if err != nil { return nil, err } if c > 0 { log.Warnf("getting partial file writer overwriting %d allocated bytes", c) } } return pf.file, nil } func (pf *PartialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error { have, err := pf.allocated.RunIterator() if err != nil { return err } ored, err := rlepluslazy.Or(have, PieceRun(offset, size)) if err != nil { return err } if err := writeTrailer(int64(pf.maxPiece), pf.file, ored); err != nil { return xerrors.Errorf("writing trailer: %w", err) } return nil } func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error { have, err := pf.allocated.RunIterator() if err != nil { return err } if err := fsutil.Deallocate(pf.file, int64(offset), int64(size)); err != nil { return xerrors.Errorf("deallocating: %w", err) } s, err := rlepluslazy.Subtract(have, PieceRun(offset, size)) if err != nil { return err } if err := writeTrailer(int64(pf.maxPiece), pf.file, s); err != nil { return xerrors.Errorf("writing trailer: %w", err) } return nil } // Reader forks off a new reader from the underlying file, and returns a reader // starting at the given offset and reading the given size. Safe for concurrent // use. func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) { if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek piece start: %w", err) } { have, err := pf.allocated.RunIterator() if err != nil { return nil, err } and, err := rlepluslazy.And(have, PieceRun(offset, size)) if err != nil { return nil, err } c, err := rlepluslazy.Count(and) if err != nil { return nil, err } if c != uint64(size) { log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size)-c) } } return io.LimitReader(readerutil.NewReadSeekerFromReaderAt(pf.file, int64(offset)), int64(size)), nil } func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) { return pf.allocated.RunIterator() } func (pf *PartialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { have, err := pf.Allocated() if err != nil { return false, err } u, err := rlepluslazy.And(have, PieceRun(offset.Padded(), size.Padded())) if err != nil { return false, err } uc, err := rlepluslazy.Count(u) if err != nil { return false, err } return abi.PaddedPieceSize(uc) == size.Padded(), nil } func PieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator { var runs []rlepluslazy.Run if offset > 0 { runs = append(runs, rlepluslazy.Run{ Val: false, Len: uint64(offset), }) } runs = append(runs, rlepluslazy.Run{ Val: true, Len: uint64(size), }) return &rlepluslazy.RunSliceIterator{Runs: runs} }