321 lines
8.1 KiB
Go
321 lines
8.1 KiB
Go
package partialfile
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"io"
|
|
"os"
|
|
"syscall"
|
|
|
|
"github.com/detailyang/go-fallocate"
|
|
"golang.org/x/xerrors"
|
|
|
|
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
|
|
|
logging "github.com/ipfs/go-log/v2"
|
|
)
|
|
|
|
var log = logging.Logger("partialfile")
|
|
|
|
const veryLargeRle = 1 << 20
|
|
|
|
// Sectors can be partially unsealed. We support this by appending a small
|
|
// trailer to each unsealed sector file containing an RLE+ marking which bytes
|
|
// in a sector are unsealed, and which are not (holes)
|
|
|
|
// unsealed sector files internally have this structure
|
|
// [unpadded (raw) data][rle+][4B LE length fo the rle+ field]
|
|
|
|
type PartialFile struct {
|
|
maxPiece abi.PaddedPieceSize
|
|
|
|
path string
|
|
allocated rlepluslazy.RLE
|
|
|
|
file *os.File
|
|
}
|
|
|
|
func writeTrailer(maxPieceSize int64, w *os.File, r rlepluslazy.RunIterator) error {
|
|
trailer, err := rlepluslazy.EncodeRuns(r, nil)
|
|
if err != nil {
|
|
return xerrors.Errorf("encoding trailer: %w", err)
|
|
}
|
|
|
|
// maxPieceSize == unpadded(sectorSize) == trailer start
|
|
if _, err := w.Seek(maxPieceSize, io.SeekStart); err != nil {
|
|
return xerrors.Errorf("seek to trailer start: %w", err)
|
|
}
|
|
|
|
rb, err := w.Write(trailer)
|
|
if err != nil {
|
|
return xerrors.Errorf("writing trailer data: %w", err)
|
|
}
|
|
|
|
if err := binary.Write(w, binary.LittleEndian, uint32(len(trailer))); err != nil {
|
|
return xerrors.Errorf("writing trailer length: %w", err)
|
|
}
|
|
|
|
return w.Truncate(maxPieceSize + int64(rb) + 4)
|
|
}
|
|
|
|
func CreatePartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) {
|
|
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) // nolint
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
|
|
}
|
|
|
|
err = func() error {
|
|
err := fallocate.Fallocate(f, 0, int64(maxPieceSize))
|
|
if errno, ok := err.(syscall.Errno); ok {
|
|
if errno == syscall.EOPNOTSUPP || errno == syscall.ENOSYS {
|
|
log.Warnf("could not allocated space, ignoring: %v", errno)
|
|
err = nil // log and ignore
|
|
}
|
|
}
|
|
if err != nil {
|
|
return xerrors.Errorf("fallocate '%s': %w", path, err)
|
|
}
|
|
|
|
if err := writeTrailer(int64(maxPieceSize), f, &rlepluslazy.RunSliceIterator{}); err != nil {
|
|
return xerrors.Errorf("writing trailer: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
_ = f.Close()
|
|
return nil, err
|
|
}
|
|
if err := f.Close(); err != nil {
|
|
return nil, xerrors.Errorf("close empty partial file: %w", err)
|
|
}
|
|
|
|
return OpenPartialFile(maxPieceSize, path)
|
|
}
|
|
|
|
func OpenPartialFile(maxPieceSize abi.PaddedPieceSize, path string) (*PartialFile, error) {
|
|
f, err := os.OpenFile(path, os.O_RDWR, 0644) // nolint
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("openning partial file '%s': %w", path, err)
|
|
}
|
|
|
|
var rle rlepluslazy.RLE
|
|
err = func() error {
|
|
st, err := f.Stat()
|
|
if err != nil {
|
|
return xerrors.Errorf("stat '%s': %w", path, err)
|
|
}
|
|
if st.Size() < int64(maxPieceSize) {
|
|
return xerrors.Errorf("sector file '%s' was smaller than the sector size %d < %d", path, st.Size(), maxPieceSize)
|
|
}
|
|
// read trailer
|
|
var tlen [4]byte
|
|
_, err = f.ReadAt(tlen[:], st.Size()-int64(len(tlen)))
|
|
if err != nil {
|
|
return xerrors.Errorf("reading trailer length: %w", err)
|
|
}
|
|
|
|
// sanity-check the length
|
|
trailerLen := binary.LittleEndian.Uint32(tlen[:])
|
|
expectLen := int64(trailerLen) + int64(len(tlen)) + int64(maxPieceSize)
|
|
if expectLen != st.Size() {
|
|
return xerrors.Errorf("file '%s' has inconsistent length; has %d bytes; expected %d (%d trailer, %d sector data)", path, st.Size(), expectLen, int64(trailerLen)+int64(len(tlen)), maxPieceSize)
|
|
}
|
|
if trailerLen > veryLargeRle {
|
|
log.Warnf("Partial file '%s' has a VERY large trailer with %d bytes", path, trailerLen)
|
|
}
|
|
|
|
trailerStart := st.Size() - int64(len(tlen)) - int64(trailerLen)
|
|
if trailerStart != int64(maxPieceSize) {
|
|
return xerrors.Errorf("expected sector size to equal trailer start index")
|
|
}
|
|
|
|
trailerBytes := make([]byte, trailerLen)
|
|
_, err = f.ReadAt(trailerBytes, trailerStart)
|
|
if err != nil {
|
|
return xerrors.Errorf("reading trailer: %w", err)
|
|
}
|
|
|
|
rle, err = rlepluslazy.FromBuf(trailerBytes)
|
|
if err != nil {
|
|
return xerrors.Errorf("decoding trailer: %w", err)
|
|
}
|
|
|
|
it, err := rle.RunIterator()
|
|
if err != nil {
|
|
return xerrors.Errorf("getting trailer run iterator: %w", err)
|
|
}
|
|
|
|
f, err := rlepluslazy.Fill(it)
|
|
if err != nil {
|
|
return xerrors.Errorf("filling bitfield: %w", err)
|
|
}
|
|
lastSet, err := rlepluslazy.Count(f)
|
|
if err != nil {
|
|
return xerrors.Errorf("finding last set byte index: %w", err)
|
|
}
|
|
|
|
if lastSet > uint64(maxPieceSize) {
|
|
return xerrors.Errorf("last set byte at index higher than sector size: %d > %d", lastSet, maxPieceSize)
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
_ = f.Close()
|
|
return nil, err
|
|
}
|
|
|
|
return &PartialFile{
|
|
maxPiece: maxPieceSize,
|
|
path: path,
|
|
allocated: rle,
|
|
file: f,
|
|
}, nil
|
|
}
|
|
|
|
func (pf *PartialFile) Close() error {
|
|
return pf.file.Close()
|
|
}
|
|
|
|
func (pf *PartialFile) Writer(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (io.Writer, error) {
|
|
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
|
|
return nil, xerrors.Errorf("seek piece start: %w", err)
|
|
}
|
|
|
|
{
|
|
have, err := pf.allocated.RunIterator()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
and, err := rlepluslazy.And(have, PieceRun(offset, size))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c, err := rlepluslazy.Count(and)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if c > 0 {
|
|
log.Warnf("getting partial file writer overwriting %d allocated bytes", c)
|
|
}
|
|
}
|
|
|
|
return pf.file, nil
|
|
}
|
|
|
|
func (pf *PartialFile) MarkAllocated(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
|
|
have, err := pf.allocated.RunIterator()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ored, err := rlepluslazy.Or(have, PieceRun(offset, size))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := writeTrailer(int64(pf.maxPiece), pf.file, ored); err != nil {
|
|
return xerrors.Errorf("writing trailer: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (pf *PartialFile) Free(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) error {
|
|
have, err := pf.allocated.RunIterator()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := fsutil.Deallocate(pf.file, int64(offset), int64(size)); err != nil {
|
|
return xerrors.Errorf("deallocating: %w", err)
|
|
}
|
|
|
|
s, err := rlepluslazy.Subtract(have, PieceRun(offset, size))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := writeTrailer(int64(pf.maxPiece), pf.file, s); err != nil {
|
|
return xerrors.Errorf("writing trailer: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (pf *PartialFile) Reader(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) (*os.File, error) {
|
|
if _, err := pf.file.Seek(int64(offset), io.SeekStart); err != nil {
|
|
return nil, xerrors.Errorf("seek piece start: %w", err)
|
|
}
|
|
|
|
{
|
|
have, err := pf.allocated.RunIterator()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
and, err := rlepluslazy.And(have, PieceRun(offset, size))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c, err := rlepluslazy.Count(and)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if c != uint64(size) {
|
|
log.Warnf("getting partial file reader reading %d unallocated bytes", uint64(size)-c)
|
|
}
|
|
}
|
|
|
|
return pf.file, nil
|
|
}
|
|
|
|
func (pf *PartialFile) Allocated() (rlepluslazy.RunIterator, error) {
|
|
return pf.allocated.RunIterator()
|
|
}
|
|
|
|
func (pf *PartialFile) HasAllocated(offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
|
|
have, err := pf.Allocated()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
u, err := rlepluslazy.And(have, PieceRun(offset.Padded(), size.Padded()))
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
uc, err := rlepluslazy.Count(u)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return abi.PaddedPieceSize(uc) == size.Padded(), nil
|
|
}
|
|
|
|
func PieceRun(offset storiface.PaddedByteIndex, size abi.PaddedPieceSize) rlepluslazy.RunIterator {
|
|
var runs []rlepluslazy.Run
|
|
if offset > 0 {
|
|
runs = append(runs, rlepluslazy.Run{
|
|
Val: false,
|
|
Len: uint64(offset),
|
|
})
|
|
}
|
|
|
|
runs = append(runs, rlepluslazy.Run{
|
|
Val: true,
|
|
Len: uint64(size),
|
|
})
|
|
|
|
return &rlepluslazy.RunSliceIterator{Runs: runs}
|
|
}
|