lotus/storage/sealer/partialfile/partialfile.go
2024-03-14 15:32:50 +11:00

346 lines
9.0 KiB
Go

package partialfile
import (
"encoding/binary"
"io"
"os"
"syscall"
"github.com/detailyang/go-fallocate"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/lib/readerutil"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var log = logging.Logger("partialfile")
const veryLargeRle = 1 << 20
// Sectors can be partially unsealed. We support this by appending a small
// trailer to each unsealed sector file containing an RLE+ marking which bytes
// in a sector are unsealed, and which are not (holes)
// unsealed sector files internally have this structure
// [unpadded (raw) data][rle+][4B LE length fo the rle+ field]
type PartialFile struct {
maxPiece abi.PaddedPieceSize
path string
allocated rlepluslazy.RLE
file *os.File
}
func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) error {
trailer, err := rlepluslazy.EncodeRuns(r, nil)
if err != nil {
return xerrors.Errorf("encoding trailer: %w", err)
}
// maxPieceSize == unpadded(sectorSize) == trailer start
if _, err := w.Seek(maxPieceSize, io.SeekStart); err != nil {
return xerrors.Errorf("seek to trailer start: %w", err)
}
rb, err := w.Write(trailer)
if err != nil {
return xerrors.Errorf("writing trailer data: %w", err)
}
if err := binary.Write(w, binary.LittleEndian, uint32(len(trailer))); err != nil {
return xerrors.Errorf("writing trailer length: %w", err)
}
return w.Truncate(maxPieceSize + int64(rb) + 4)
}
func CreatePartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) // nolint
if err != nil {
return nil, xerrors.Errorf("opening partial file '%s': %w", path, err)
}
err = func() error {
err := fallocate.Fallocate(f, 0, int64(maxPieceSize))
if errno, ok := err.(syscall.Errno); ok {
if errno == syscall.EOPNOTSUPP || errno == syscall.ENOSYS {
log.Warnf("could not allocate space, ignoring: %v", errno)
err = nil // log and ignore
}
}
if err != nil {
return xerrors.Errorf("fallocate '%s': %w", path, err)
}
if err := writeTrailer(int64(maxPieceSize), f, &rlepluslazy.RunSliceIterator{}); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}()
if err != nil {
_ = f.Close()
return nil, err
}
if err := f.Close(); err != nil {
return nil, xerrors.Errorf("close empty partial file: %w", err)
}
return OpenPartialFile(maxPieceSize, path)
}
func OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) {
f, err := os.OpenFile(path, os.O_RDWR, 0644) // nolint
if err != nil {
return nil, xerrors.Errorf("opening partial file '%s': %w", path, err)
}
st, err := f.Stat()
if err != nil {
return nil, xerrors.Errorf("stat '%s': %w", path, err)
}
if st.Size() < int64(maxPieceSize) {
return nil, xerrors.Errorf("sector file '%s' was smaller than the sector size %d < %d", path, st.Size(), maxPieceSize)
}
if st.Size() == int64(maxPieceSize) {
log.Debugw("no partial file trailer, assuming fully allocated", "path", path)
allAlloc := &rlepluslazy.RunSliceIterator{Runs: []rlepluslazy.Run{{Val: true, Len: uint64(maxPieceSize)}}}
enc, err := rlepluslazy.EncodeRuns(allAlloc, []byte{})
if err != nil {
return nil, xerrors.Errorf("encoding full allocation: %w", err)
}
rle, err := rlepluslazy.FromBuf(enc)
if err != nil {
return nil, xerrors.Errorf("decoding full allocation: %w", err)
}
return &PartialFile{
maxPiece: maxPieceSize,
path: path,
allocated: rle,
file: f,
}, nil
}
var rle rlepluslazy.RLE
err = func() error {
// read trailer
var tlen [4]byte
_, err = f.ReadAt(tlen[:], st.Size()-int64(len(tlen)))
if err != nil {
return xerrors.Errorf("reading trailer length: %w", err)
}
// sanity-check the length
trailerLen := binary.LittleEndian.Uint32(tlen[:])
expectLen := int64(trailerLen) + int64(len(tlen)) + int64(maxPieceSize)
if expectLen != st.Size() {
return xerrors.Errorf("file '%s' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen)+int64(len(tlen)), maxPieceSize)
}
if trailerLen > veryLargeRle {
log.Warnf("Partial file '%s' has a VERY large trailer with %d bytes", path, trailerLen)
}
trailerStart := st.Size() - int64(len(tlen)) - int64(trailerLen)
if trailerStart != int64(maxPieceSize) {
return xerrors.Errorf("expected sector size to equal trailer start index")
}
trailerBytes := make([]byte, trailerLen)
_, err = f.ReadAt(trailerBytes, trailerStart)
if err != nil {
return xerrors.Errorf("reading trailer: %w", err)
}
rle, err = rlepluslazy.FromBuf(trailerBytes)
if err != nil {
return xerrors.Errorf("decoding trailer: %w", err)
}
it, err := rle.RunIterator()
if err != nil {
return xerrors.Errorf("getting trailer run iterator: %w", err)
}
f, err := rlepluslazy.Fill(it)
if err != nil {
return xerrors.Errorf("filling bitfield: %w", err)
}
lastSet, err := rlepluslazy.Count(f)
if err != nil {
return xerrors.Errorf("finding last set byte index: %w", err)
}
if lastSet > uint64(maxPieceSize) {
return xerrors.Errorf("last set byte at index higher than sector size: %d > %d", lastSet, maxPieceSize)
}
return nil
}()
if err != nil {
_ = f.Close()
return nil, err
}
return &PartialFile{
maxPiece: maxPieceSize,
path: path,
allocated: rle,
file: f,
}, nil
}
func (pf *PartialFile) Close() error {
return pf.file.Close()
}
func (pf *PartialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
{
have, err := pf.allocated.RunIterator()
if err != nil {
return nil, err
}
and, err := rlepluslazy.And(have, PieceRun(offset, size))
if err != nil {
return nil, err
}
c, err := rlepluslazy.Count(and)
if err != nil {
return nil, err
}
if c > 0 {
log.Warnf("getting partial file writer overwriting %d allocated bytes", c)
}
}
return pf.file, nil
}
func (pf *PartialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
}
ored, err := rlepluslazy.Or(have, PieceRun(offset, size))
if err != nil {
return err
}
if err := writeTrailer(int64(pf.maxPiece), pf.file, ored); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}
func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
have, err := pf.allocated.RunIterator()
if err != nil {
return err
}
if err := fsutil.Deallocate(pf.file, int64(offset), int64(size)); err != nil {
return xerrors.Errorf("deallocating: %w", err)
}
s, err := rlepluslazy.Subtract(have, PieceRun(offset, size))
if err != nil {
return err
}
if err := writeTrailer(int64(pf.maxPiece), pf.file, s); err != nil {
return xerrors.Errorf("writing trailer: %w", err)
}
return nil
}
// Reader forks off a new reader from the underlying file, and returns a reader
// starting at the given offset and reading the given size. Safe for concurrent
// use.
func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Reader, error) {
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek piece start: %w", err)
}
{
have, err := pf.allocated.RunIterator()
if err != nil {
return nil, err
}
and, err := rlepluslazy.And(have, PieceRun(offset, size))
if err != nil {
return nil, err
}
c, err := rlepluslazy.Count(and)
if err != nil {
return nil, err
}
if c != uint64(size) {
log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size)-c)
}
}
return io.LimitReader(readerutil.NewReadSeekerFromReaderAt(pf.file, int64(offset)), int64(size)), nil
}
func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) {
return pf.allocated.RunIterator()
}
func (pf *PartialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
have, err := pf.Allocated()
if err != nil {
return false, err
}
u, err := rlepluslazy.And(have, PieceRun(offset.Padded(), size.Padded()))
if err != nil {
return false, err
}
uc, err := rlepluslazy.Count(u)
if err != nil {
return false, err
}
return abi.PaddedPieceSize(uc) == size.Padded(), nil
}
func PieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
var runs []rlepluslazy.Run
if offset > 0 {
runs = append(runs, rlepluslazy.Run{
Val: false,
Len: uint64(offset),
})
}
runs = append(runs, rlepluslazy.Run{
Val: true,
Len: uint64(size),
})
return &rlepluslazy.RunSliceIterator{Runs: runs}
}