lotus/extern/sector-storage/ffiwrapper/sealer_cgo.go

1116 lines
31 KiB
Go
Raw Normal View History

//go:build cgo
// +build cgo
2020-03-26 02:50:56 +00:00
package ffiwrapper
import (
2020-05-29 15:21:10 +00:00
"bufio"
2020-06-09 09:13:23 +00:00
"bytes"
2020-03-26 02:50:56 +00:00
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
2020-03-26 02:50:56 +00:00
"io"
"math/bits"
"os"
2020-05-29 15:21:10 +00:00
"runtime"
2020-03-26 02:50:56 +00:00
2022-04-20 21:34:28 +00:00
"github.com/filecoin-project/go-state-types/proof"
2022-03-18 09:59:27 +00:00
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/detailyang/go-fallocate"
2020-03-26 02:50:56 +00:00
ffi "github.com/filecoin-project/filecoin-ffi"
2020-07-03 19:52:31 +00:00
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
2022-03-18 09:59:27 +00:00
commpffi "github.com/filecoin-project/go-commp-utils/ffiwrapper"
"github.com/filecoin-project/go-commp-utils/zerocomm"
2020-06-09 09:13:23 +00:00
commcid "github.com/filecoin-project/go-fil-commcid"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-03-26 02:50:56 +00:00
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/fr32"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
2020-03-26 02:50:56 +00:00
)
2020-03-26 19:34:38 +00:00
// Compile-time check that *Sealer satisfies the Storage interface.
var _ Storage = &Sealer{}
2020-03-26 02:50:56 +00:00
func New(sectors SectorProvider) (*Sealer, error) {
2020-03-26 19:34:38 +00:00
sb := &Sealer{
2020-03-26 02:50:56 +00:00
sectors: sectors,
stopping: make(chan struct{}),
}
return sb, nil
}
func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error {
2020-03-26 02:50:56 +00:00
// TODO: Allocate the sector here instead of in addpiece
return nil
}
2022-04-26 16:22:52 +00:00
func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
maxSizeSpt := abi.RegisteredSealProof_StackedDrg64GiBV1_1
throttle := make(chan []byte, parallel)
piecePromises := make([]func() (abi.PieceInfo, error), 0)
buf := make([]byte, chunk.Unpadded())
for i := 0; i < parallel; i++ {
if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize {
break // won't use this many buffers
}
throttle <- make([]byte, chunk.Unpadded())
}
for {
var read int
for rbuf := buf; len(rbuf) > 0; {
n, err := pieceData.Read(rbuf)
if err != nil && err != io.EOF {
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
}
rbuf = rbuf[n:]
read += n
if err == io.EOF {
break
}
}
if read == 0 {
break
}
done := make(chan struct {
cid.Cid
error
}, 1)
pbuf := <-throttle
copy(pbuf, buf[:read])
go func(read int) {
defer func() {
throttle <- pbuf
}()
c, err := sb.pieceCid(maxSizeSpt, pbuf[:read])
done <- struct {
cid.Cid
error
}{c, err}
}(read)
piecePromises = append(piecePromises, func() (abi.PieceInfo, error) {
select {
case e := <-done:
if e.error != nil {
return abi.PieceInfo{}, e.error
}
return abi.PieceInfo{
2022-04-27 18:34:55 +00:00
Size: abi.UnpaddedPieceSize(read).Padded(),
2022-04-26 16:22:52 +00:00
PieceCID: e.Cid,
}, nil
case <-ctx.Done():
return abi.PieceInfo{}, ctx.Err()
}
})
}
if len(piecePromises) == 1 {
return piecePromises[0]()
}
var payloadRoundedBytes abi.PaddedPieceSize
pieceCids := make([]abi.PieceInfo, len(piecePromises))
for i, promise := range piecePromises {
pinfo, err := promise()
if err != nil {
return abi.PieceInfo{}, err
}
pieceCids[i] = pinfo
payloadRoundedBytes += pinfo.Size
}
pieceCID, err := ffi.GenerateUnsealedCID(maxSizeSpt, pieceCids)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
}
// validate that the pieceCID was properly formed
if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil {
return abi.PieceInfo{}, err
}
if payloadRoundedBytes < pieceSize.Padded() {
paddedCid, err := commpffi.ZeroPadPieceCommitment(pieceCID, payloadRoundedBytes.Unpadded(), pieceSize)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("failed to pad data: %w", err)
}
pieceCID = paddedCid
}
return abi.PieceInfo{
Size: pieceSize.Padded(),
PieceCID: pieceCID,
}, nil
}
func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
2021-01-10 21:54:05 +00:00
// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
2020-05-14 15:35:38 +00:00
var offset abi.UnpaddedPieceSize
for _, size := range existingPieceSizes {
offset += size
}
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return abi.PieceInfo{}, err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
2020-05-14 15:35:38 +00:00
if offset.Padded()+pieceSize.Padded() > maxPieceSize {
2020-05-14 15:35:38 +00:00
return abi.PieceInfo{}, xerrors.Errorf("can't add %d byte piece to sector %v with %d bytes of existing pieces", pieceSize, sector, offset)
2020-03-26 02:50:56 +00:00
}
var done func()
var stagedFile *partialfile.PartialFile
2020-03-26 02:50:56 +00:00
defer func() {
if done != nil {
done()
}
if stagedFile != nil {
if err := stagedFile.Close(); err != nil {
log.Errorf("closing staged file: %+v", err)
}
}
}()
2020-09-06 16:54:00 +00:00
var stagedPath storiface.SectorPaths
2020-03-26 02:50:56 +00:00
if len(existingPieceSizes) == 0 {
2020-09-06 16:54:00 +00:00
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, storiface.FTUnsealed, storiface.PathSealing)
2020-03-26 02:50:56 +00:00
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = partialfile.CreatePartialFile(maxPieceSize, stagedPath.Unsealed)
2020-03-26 02:50:56 +00:00
if err != nil {
2020-05-14 15:35:38 +00:00
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
2020-03-26 02:50:56 +00:00
}
} else {
2020-09-06 16:54:00 +00:00
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, 0, storiface.PathSealing)
2020-03-26 02:50:56 +00:00
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = partialfile.OpenPartialFile(maxPieceSize, stagedPath.Unsealed)
2020-03-26 02:50:56 +00:00
if err != nil {
2020-05-14 15:35:38 +00:00
return abi.PieceInfo{}, xerrors.Errorf("opening unsealed sector file: %w", err)
2020-03-26 02:50:56 +00:00
}
2020-05-14 15:35:38 +00:00
}
2020-03-26 02:50:56 +00:00
w, err := stagedFile.Writer(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded())
2020-05-14 15:35:38 +00:00
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("getting partial file writer: %w", err)
}
2020-06-09 09:13:23 +00:00
pw := fr32.NewPadWriter(w)
2020-05-29 15:21:10 +00:00
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
2020-03-26 02:50:56 +00:00
2021-01-10 21:54:05 +00:00
throttle := make(chan []byte, parallel)
piecePromises := make([]func() (abi.PieceInfo, error), 0)
2020-06-09 09:13:23 +00:00
2020-06-09 10:06:21 +00:00
buf := make([]byte, chunk.Unpadded())
2021-01-10 21:54:05 +00:00
for i := 0; i < parallel; i++ {
if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize {
break // won't use this many buffers
}
throttle <- make([]byte, chunk.Unpadded())
}
2020-06-09 09:13:23 +00:00
for {
var read int
for rbuf := buf; len(rbuf) > 0; {
n, err := pr.Read(rbuf)
if err != nil && err != io.EOF {
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
}
rbuf = rbuf[n:]
read += n
if err == io.EOF {
break
}
2020-06-09 09:13:23 +00:00
}
if read == 0 {
2020-06-09 09:13:23 +00:00
break
}
2021-01-10 21:54:21 +00:00
done := make(chan struct {
cid.Cid
error
}, 1)
2021-01-10 21:54:05 +00:00
pbuf := <-throttle
copy(pbuf, buf[:read])
go func(read int) {
defer func() {
throttle <- pbuf
}()
c, err := sb.pieceCid(sector.ProofType, pbuf[:read])
2021-01-10 21:54:21 +00:00
done <- struct {
cid.Cid
error
}{c, err}
2021-01-10 21:54:05 +00:00
}(read)
piecePromises = append(piecePromises, func() (abi.PieceInfo, error) {
select {
case e := <-done:
if e.error != nil {
return abi.PieceInfo{}, e.error
}
return abi.PieceInfo{
Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(),
PieceCID: e.Cid,
}, nil
case <-ctx.Done():
return abi.PieceInfo{}, ctx.Err()
}
2020-06-09 10:06:21 +00:00
})
2020-03-26 02:50:56 +00:00
}
2020-05-29 15:21:10 +00:00
if err := pw.Close(); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("closing padded writer: %w", err)
}
if err := stagedFile.MarkAllocated(storiface.UnpaddedByteIndex(offset).Padded(), pieceSize.Padded()); err != nil {
2020-05-14 15:35:38 +00:00
return abi.PieceInfo{}, xerrors.Errorf("marking data range as allocated: %w", err)
}
if err := stagedFile.Close(); err != nil {
2020-03-26 02:50:56 +00:00
return abi.PieceInfo{}, err
}
2020-05-14 15:35:38 +00:00
stagedFile = nil
2020-03-26 02:50:56 +00:00
2021-01-10 21:54:05 +00:00
if len(piecePromises) == 1 {
return piecePromises[0]()
}
var payloadRoundedBytes abi.PaddedPieceSize
2021-01-10 21:54:05 +00:00
pieceCids := make([]abi.PieceInfo, len(piecePromises))
for i, promise := range piecePromises {
pinfo, err := promise()
2021-01-10 21:54:05 +00:00
if err != nil {
return abi.PieceInfo{}, err
}
pieceCids[i] = pinfo
payloadRoundedBytes += pinfo.Size
2020-06-09 09:13:23 +00:00
}
pieceCID, err := ffi.GenerateUnsealedCID(sector.ProofType, pieceCids)
2020-06-09 09:13:23 +00:00
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
}
2021-04-29 17:24:16 +00:00
// validate that the pieceCID was properly formed
if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil {
return abi.PieceInfo{}, err
}
if payloadRoundedBytes < pieceSize.Padded() {
paddedCid, err := commpffi.ZeroPadPieceCommitment(pieceCID, payloadRoundedBytes.Unpadded(), pieceSize)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("failed to pad data: %w", err)
}
pieceCID = paddedCid
}
2020-03-26 02:50:56 +00:00
return abi.PieceInfo{
Size: pieceSize.Padded(),
PieceCID: pieceCID,
2020-06-09 09:13:23 +00:00
}, nil
2020-03-26 02:50:56 +00:00
}
func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, error) {
2020-11-23 15:09:09 +00:00
prf, werr, err := commpffi.ToReadableFile(bytes.NewReader(in), int64(len(in)))
2020-06-09 09:13:23 +00:00
if err != nil {
return cid.Undef, xerrors.Errorf("getting tee reader pipe: %w", err)
}
pieceCID, err := ffi.GeneratePieceCIDFromFile(spt, prf, abi.UnpaddedPieceSize(len(in)))
2020-06-09 09:13:23 +00:00
if err != nil {
return cid.Undef, xerrors.Errorf("generating piece commitment: %w", err)
}
2020-08-16 10:40:35 +00:00
_ = prf.Close()
2020-05-07 22:22:58 +00:00
2020-06-09 09:13:23 +00:00
return pieceCID, werr()
2020-05-07 22:22:58 +00:00
}
// tryDecodeUpdatedReplica attempts to decode the sector's data from its
// updated replica into the file at unsealedPath.
//
// It returns (false, nil) when the update/sealed/cache files cannot be
// acquired because the sector is not found. Once the files are acquired it
// returns true together with the result of ffi.SectorUpdate.DecodeFrom.
// Failing to resolve the update proof type returns (false, err).
func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string) (bool, error) {
	replica, release, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
	switch {
	case xerrors.Is(err, storiface.ErrSectorNotFound):
		return false, nil
	case err != nil:
		return false, xerrors.Errorf("reading updated replica: %w", err)
	}
	defer release()

	// Sector data stored in replica update
	proofType, err := sector.ProofType.RegisteredUpdateProof()
	if err != nil {
		return false, err
	}

	decodeErr := ffi.SectorUpdate.DecodeFrom(proofType, unsealedPath, replica.Update, replica.Sealed, replica.Cache, commD)
	return true, decodeErr
}
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
2020-05-18 22:08:11 +00:00
// try finding existing
2020-09-06 16:54:00 +00:00
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
var pf *partialfile.PartialFile
2020-05-18 22:08:11 +00:00
switch {
case xerrors.Is(err, storiface.ErrSectorNotFound):
2020-09-06 16:54:00 +00:00
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathStorage)
2020-05-18 22:08:11 +00:00
if err != nil {
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
}
defer done()
pf, err = partialfile.CreatePartialFile(maxPieceSize, unsealedPath.Unsealed)
2020-05-18 22:08:11 +00:00
if err != nil {
return xerrors.Errorf("create unsealed file: %w", err)
}
case err == nil:
defer done()
pf, err = partialfile.OpenPartialFile(maxPieceSize, unsealedPath.Unsealed)
2020-05-18 22:08:11 +00:00
if err != nil {
return xerrors.Errorf("opening partial file: %w", err)
}
default:
return xerrors.Errorf("acquire unsealed sector path (existing): %w", err)
}
2020-08-16 10:40:35 +00:00
defer pf.Close() // nolint
2020-05-18 22:08:11 +00:00
allocated, err := pf.Allocated()
if err != nil {
return xerrors.Errorf("getting bitruns of allocated data: %w", err)
}
toUnseal, err := computeUnsealRanges(allocated, offset, size)
if err != nil {
return xerrors.Errorf("computing unseal ranges: %w", err)
}
if !toUnseal.HasNext() {
return nil
}
// If piece data stored in updated replica decode whole sector
decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("decoding sector from replica: %w", err)
}
if decoded {
return pf.MarkAllocated(0, maxPieceSize)
}
// Piece data sealed in sector
2020-09-06 16:54:00 +00:00
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathStorage)
2020-05-18 22:08:11 +00:00
if err != nil {
return xerrors.Errorf("acquire sealed sector paths: %w", err)
}
defer srcDone()
2020-08-16 10:40:35 +00:00
sealed, err := os.OpenFile(srcPaths.Sealed, os.O_RDONLY, 0644) // nolint:gosec
2020-06-08 21:53:31 +00:00
if err != nil {
return xerrors.Errorf("opening sealed file: %w", err)
}
2020-08-16 10:40:35 +00:00
defer sealed.Close() // nolint
2020-06-08 21:53:31 +00:00
var at, nextat abi.PaddedPieceSize
2020-07-08 17:51:26 +00:00
first := true
for first || toUnseal.HasNext() {
first = false
2020-05-18 22:08:11 +00:00
piece, err := toUnseal.NextRun()
if err != nil {
return xerrors.Errorf("getting next range to unseal: %w", err)
}
at = nextat
nextat += abi.PaddedPieceSize(piece.Len)
2020-05-18 22:08:11 +00:00
if !piece.Val {
continue
}
out, err := pf.Writer(offset.Padded(), size.Padded())
2020-05-18 22:08:11 +00:00
if err != nil {
return xerrors.Errorf("getting partial file writer: %w", err)
}
// <eww>
2020-06-08 21:53:31 +00:00
opr, opw, err := os.Pipe()
2020-05-18 22:08:11 +00:00
if err != nil {
2020-06-08 21:53:31 +00:00
return xerrors.Errorf("creating out pipe: %w", err)
2020-05-18 22:08:11 +00:00
}
2020-06-08 21:53:31 +00:00
2020-05-18 22:08:11 +00:00
var perr error
outWait := make(chan struct{})
{
go func() {
defer close(outWait)
2020-08-16 10:40:35 +00:00
defer opr.Close() // nolint
2020-05-18 23:03:42 +00:00
2020-06-09 09:13:23 +00:00
padwriter := fr32.NewPadWriter(out)
2020-05-29 15:21:10 +00:00
bsize := uint64(size.Padded())
if bsize > uint64(runtime.NumCPU())*fr32.MTTresh {
bsize = uint64(runtime.NumCPU()) * fr32.MTTresh
}
2020-06-09 10:06:21 +00:00
bw := bufio.NewWriterSize(padwriter, int(abi.PaddedPieceSize(bsize).Unpadded()))
2020-06-09 09:13:23 +00:00
_, err := io.CopyN(bw, opr, int64(size))
2020-06-09 09:13:23 +00:00
if err != nil {
perr = xerrors.Errorf("copying data: %w", err)
return
}
2020-05-29 15:21:10 +00:00
2020-06-09 09:13:23 +00:00
if err := bw.Flush(); err != nil {
perr = xerrors.Errorf("flushing unpadded data: %w", err)
return
}
if err := padwriter.Close(); err != nil {
perr = xerrors.Errorf("closing padwriter: %w", err)
return
}
2020-05-18 22:08:11 +00:00
}()
}
// </eww>
// TODO: This may be possible to do in parallel
err = ffi.UnsealRange(sector.ProofType,
2020-05-18 22:08:11 +00:00
srcPaths.Cache,
2020-06-08 21:53:31 +00:00
sealed,
opw,
sector.ID.Number,
sector.ID.Miner,
2020-05-18 22:08:11 +00:00
randomness,
2020-05-18 23:03:42 +00:00
commd,
uint64(at.Unpadded()),
uint64(abi.PaddedPieceSize(piece.Len).Unpadded()))
2020-06-08 21:53:31 +00:00
_ = opw.Close()
2020-06-08 21:53:31 +00:00
2020-05-18 22:08:11 +00:00
if err != nil {
return xerrors.Errorf("unseal range: %w", err)
}
select {
case <-outWait:
case <-ctx.Done():
return ctx.Err()
}
if perr != nil {
return xerrors.Errorf("piping output to unsealed file: %w", perr)
}
if err := pf.MarkAllocated(storiface.PaddedByteIndex(at), abi.PaddedPieceSize(piece.Len)); err != nil {
2020-05-18 22:08:11 +00:00
return xerrors.Errorf("marking unsealed range as allocated: %w", err)
}
if !toUnseal.HasNext() {
break
}
}
return nil
}
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
2020-09-06 16:54:00 +00:00
path, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
2020-05-18 22:08:11 +00:00
if err != nil {
return false, xerrors.Errorf("acquire unsealed sector path: %w", err)
2020-05-18 22:08:11 +00:00
}
defer done()
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return false, err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
2020-05-18 22:08:11 +00:00
pf, err := partialfile.OpenPartialFile(maxPieceSize, path.Unsealed)
if err != nil {
if xerrors.Is(err, os.ErrNotExist) {
return false, nil
}
return false, xerrors.Errorf("opening partial file: %w", err)
}
ok, err := pf.HasAllocated(offset, size)
if err != nil {
2020-08-16 10:40:35 +00:00
_ = pf.Close()
return false, err
}
if !ok {
2020-08-16 10:40:35 +00:00
_ = pf.Close()
return false, nil
2020-05-18 22:08:11 +00:00
}
f, err := pf.Reader(offset.Padded(), size.Padded())
2020-05-18 22:08:11 +00:00
if err != nil {
2020-08-16 10:40:35 +00:00
_ = pf.Close()
return false, xerrors.Errorf("getting partial file reader: %w", err)
2020-05-18 22:08:11 +00:00
}
upr, err := fr32.NewUnpadReader(f, size.Padded())
if err != nil {
return false, xerrors.Errorf("creating unpadded reader: %w", err)
}
if _, err := io.CopyN(writer, upr, int64(size)); err != nil {
2020-08-16 10:40:35 +00:00
_ = pf.Close()
return false, xerrors.Errorf("reading unsealed file: %w", err)
2020-05-18 22:08:11 +00:00
}
if err := pf.Close(); err != nil {
return false, xerrors.Errorf("closing partial file: %w", err)
2020-05-18 22:08:11 +00:00
}
return true, nil
2020-05-18 22:08:11 +00:00
}
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
2020-09-06 16:54:00 +00:00
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing)
2020-03-26 02:50:56 +00:00
if err != nil {
return nil, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
2020-08-16 10:40:35 +00:00
e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
2020-03-26 02:50:56 +00:00
if err != nil {
return nil, xerrors.Errorf("ensuring sealed file exists: %w", err)
}
if err := e.Close(); err != nil {
return nil, err
}
2020-08-16 10:40:35 +00:00
if err := os.Mkdir(paths.Cache, 0755); err != nil { // nolint
2020-03-26 02:50:56 +00:00
if os.IsExist(err) {
log.Warnf("existing cache in %s; removing", paths.Cache)
if err := os.RemoveAll(paths.Cache); err != nil {
return nil, xerrors.Errorf("remove existing sector cache from %s (sector %d): %w", paths.Cache, sector, err)
}
2020-08-16 10:40:35 +00:00
if err := os.Mkdir(paths.Cache, 0755); err != nil { // nolint:gosec
2020-03-26 02:50:56 +00:00
return nil, xerrors.Errorf("mkdir cache path after cleanup: %w", err)
}
} else {
return nil, err
}
}
var sum abi.UnpaddedPieceSize
for _, piece := range pieces {
sum += piece.Size.Unpadded()
}
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return nil, err
}
ussize := abi.PaddedPieceSize(ssize).Unpadded()
2020-03-26 02:50:56 +00:00
if sum != ussize {
return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
// TODO: context cancellation respect
p1o, err := ffi.SealPreCommitPhase1(
sector.ProofType,
2020-03-26 02:50:56 +00:00
paths.Cache,
paths.Unsealed,
2020-03-26 02:50:56 +00:00
paths.Sealed,
sector.ID.Number,
sector.ID.Miner,
2020-03-26 02:50:56 +00:00
ticket,
pieces,
)
if err != nil {
return nil, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
2020-03-26 02:50:56 +00:00
}
p1odec := map[string]interface{}{}
if err := json.Unmarshal(p1o, &p1odec); err != nil {
return nil, xerrors.Errorf("unmarshaling pc1 output: %w", err)
}
p1odec["_lotus_SealRandomness"] = ticket
return json.Marshal(&p1odec)
2020-03-26 02:50:56 +00:00
}
// PC2CheckRounds is the number of times SealPreCommit2 runs
// ffi.SealCommitPhase1 with a fresh random seed to check its output.
var PC2CheckRounds = 3
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
2020-09-06 16:54:00 +00:00
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing)
2020-03-26 02:50:56 +00:00
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
sealedCID, unsealedCID, err := ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.ID.Number, paths.Unsealed, err)
2020-03-26 02:50:56 +00:00
}
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("get ssize: %w", err)
}
p1odec := map[string]interface{}{}
if err := json.Unmarshal(phase1Out, &p1odec); err != nil {
return storage.SectorCids{}, xerrors.Errorf("unmarshaling pc1 output: %w", err)
}
var ticket abi.SealRandomness
ti, found := p1odec["_lotus_SealRandomness"]
if found {
ticket, err = base64.StdEncoding.DecodeString(ti.(string))
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("decoding ticket: %w", err)
}
for i := 0; i < PC2CheckRounds; i++ {
var sd [32]byte
_, _ = rand.Read(sd[:])
_, err := ffi.SealCommitPhase1(
sector.ProofType,
sealedCID,
unsealedCID,
paths.Cache,
paths.Sealed,
sector.ID.Number,
sector.ID.Miner,
ticket,
sd[:],
[]abi.PieceInfo{{Size: abi.PaddedPieceSize(ssize), PieceCID: unsealedCID}},
)
if err != nil {
log.Warn("checking PreCommit failed: ", err)
log.Warnf("num:%d tkt:%v seed:%v sealedCID:%v, unsealedCID:%v", sector.ID.Number, ticket, sd[:], sealedCID, unsealedCID)
return storage.SectorCids{}, xerrors.Errorf("checking PreCommit failed: %w", err)
}
}
}
2020-03-26 02:50:56 +00:00
return storage.SectorCids{
Unsealed: unsealedCID,
Sealed: sealedCID,
}, nil
}
func (sb *Sealer) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
2020-09-06 16:54:00 +00:00
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing)
2020-03-26 02:50:56 +00:00
if err != nil {
return nil, xerrors.Errorf("acquire sector paths: %w", err)
}
defer done()
output, err := ffi.SealCommitPhase1(
sector.ProofType,
2020-03-26 02:50:56 +00:00
cids.Sealed,
cids.Unsealed,
paths.Cache,
paths.Sealed,
sector.ID.Number,
sector.ID.Miner,
2020-03-26 02:50:56 +00:00
ticket,
seed,
pieces,
)
if err != nil {
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("num:%d tkt:%v seed:%v, pi:%v sealedCID:%v, unsealedCID:%v", sector.ID.Number, ticket, seed, pieces, cids.Sealed, cids.Unsealed)
2020-03-26 02:50:56 +00:00
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return output, nil
}
func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storage.Proof, error) {
return ffi.SealCommitPhase2(phase1Out, sector.ID.Number, sector.ID.Miner)
2020-03-26 02:50:56 +00:00
}
// ReplicaUpdate encodes new piece data into an existing sealed sector via
// ffi.SectorUpdate.EncodeInto and returns the new sealed and unsealed CIDs.
//
// Before calling into the FFI it creates the update file, allocates space for
// it equal to the size of the current sealed file, and creates a fresh update
// cache directory (removing an existing one). An unsupported seal proof type
// is reported as an error.
func (sb *Sealer) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storage.ReplicaUpdateOut, error) {
	empty := storage.ReplicaUpdateOut{}

	paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed|storiface.FTSealed|storiface.FTCache, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing)
	if err != nil {
		return empty, xerrors.Errorf("failed to acquire sector paths: %w", err)
	}
	defer done()

	// Resolve the update proof through the checked accessor (as
	// tryDecodeUpdatedReplica does) instead of indexing abi.SealProofInfos
	// without an existence check.
	updateProofType, err := sector.ProofType.RegisteredUpdateProof()
	if err != nil {
		return empty, xerrors.Errorf("getting update proof type: %w", err)
	}

	s, err := os.Stat(paths.Sealed)
	if err != nil {
		return empty, err
	}
	sealedSize := s.Size()

	u, err := os.OpenFile(paths.Update, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
	if err != nil {
		return empty, xerrors.Errorf("ensuring updated replica file exists: %w", err)
	}
	if err := fallocate.Fallocate(u, 0, sealedSize); err != nil {
		// Do not leak the file handle when allocation fails.
		_ = u.Close()
		return empty, xerrors.Errorf("allocating space for replica update file: %w", err)
	}
	if err := u.Close(); err != nil {
		return empty, err
	}

	if err := os.Mkdir(paths.UpdateCache, 0755); err != nil { // nolint
		if os.IsExist(err) {
			log.Warnf("existing cache in %s; removing", paths.UpdateCache)

			if err := os.RemoveAll(paths.UpdateCache); err != nil {
				return empty, xerrors.Errorf("remove existing sector cache from %s (sector %d): %w", paths.UpdateCache, sector, err)
			}

			if err := os.Mkdir(paths.UpdateCache, 0755); err != nil { // nolint:gosec
				return empty, xerrors.Errorf("mkdir cache path after cleanup: %w", err)
			}
		} else {
			return empty, err
		}
	}

	sealed, unsealed, err := ffi.SectorUpdate.EncodeInto(updateProofType, paths.Update, paths.UpdateCache, paths.Sealed, paths.Cache, paths.Unsealed, pieces)
	if err != nil {
		return empty, xerrors.Errorf("failed to update replica %d with new deal data: %w", sector.ID.Number, err)
	}

	return storage.ReplicaUpdateOut{NewSealed: sealed, NewUnsealed: unsealed}, nil
}
// ProveReplicaUpdate1 generates the vanilla proofs for a replica update by
// calling ffi.SectorUpdate.GenerateUpdateVanillaProofs on the sector's
// sealed, cache, update and update-cache files. An unsupported seal proof
// type is reported as an error.
func (sb *Sealer) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storage.ReplicaVanillaProofs, error) {
	paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathSealing)
	if err != nil {
		return nil, xerrors.Errorf("failed to acquire sector paths: %w", err)
	}
	defer done()

	// Use the checked accessor (as tryDecodeUpdatedReplica does) rather than
	// indexing abi.SealProofInfos without an existence check.
	updateProofType, err := sector.ProofType.RegisteredUpdateProof()
	if err != nil {
		return nil, xerrors.Errorf("getting update proof type: %w", err)
	}

	vanillaProofs, err := ffi.SectorUpdate.GenerateUpdateVanillaProofs(updateProofType, sectorKey, newSealed, newUnsealed, paths.Update, paths.UpdateCache, paths.Sealed, paths.Cache)
	if err != nil {
		return nil, xerrors.Errorf("failed to generate proof of replica update for sector %d: %w", sector.ID.Number, err)
	}

	return vanillaProofs, nil
}
// ProveReplicaUpdate2 turns the vanilla proofs produced by
// ProveReplicaUpdate1 into the final replica update proof using
// ffi.SectorUpdate.GenerateUpdateProofWithVanilla. An unsupported seal proof
// type is reported as an error.
func (sb *Sealer) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storage.ReplicaUpdateProof, error) {
	// Use the checked accessor (as tryDecodeUpdatedReplica does) rather than
	// indexing abi.SealProofInfos without an existence check.
	updateProofType, err := sector.ProofType.RegisteredUpdateProof()
	if err != nil {
		return nil, xerrors.Errorf("getting update proof type: %w", err)
	}

	return ffi.SectorUpdate.GenerateUpdateProofWithVanilla(updateProofType, sectorKey, newSealed, newUnsealed, vanillaProofs)
}
// GenerateSectorKeyFromData regenerates the sector key (the sealed file) from
// an updated replica by calling ffi.SectorUpdate.RemoveData.
//
// Before calling into the FFI it creates the sealed file and allocates space
// for it equal to the size of the update file. An unsupported seal proof type
// is reported as an error.
func (sb *Sealer) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) error {
	paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTSealed, storiface.PathSealing)
	if err != nil {
		return xerrors.Errorf("failed to acquire sector paths: %w", err)
	}
	// Register the release only after the error check, so it is never
	// deferred for a failed acquisition.
	defer done()

	s, err := os.Stat(paths.Update)
	if err != nil {
		return xerrors.Errorf("measuring update file size: %w", err)
	}
	sealedSize := s.Size()

	e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
	if err != nil {
		return xerrors.Errorf("ensuring sector key file exists: %w", err)
	}
	if err := fallocate.Fallocate(e, 0, sealedSize); err != nil {
		// Do not leak the file handle when allocation fails.
		_ = e.Close()
		return xerrors.Errorf("allocating space for sector key file: %w", err)
	}
	if err := e.Close(); err != nil {
		return err
	}

	// Use the checked accessor (as tryDecodeUpdatedReplica does) rather than
	// indexing abi.SealProofInfos without an existence check.
	updateProofType, err := sector.ProofType.RegisteredUpdateProof()
	if err != nil {
		return xerrors.Errorf("getting update proof type: %w", err)
	}

	return ffi.SectorUpdate.RemoveData(updateProofType, paths.Sealed, paths.Cache, paths.Update, paths.UpdateCache, paths.Unsealed, commD)
}
// ReleaseSealed is not implemented by Sealer; it always returns a
// "not supported at this layer" error.
func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storage.SectorRef) error {
	return xerrors.Errorf("not supported at this layer")
}
2022-02-03 12:51:59 +00:00
func (sb *Sealer) freeUnsealed(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
maxPieceSize := abi.PaddedPieceSize(ssize)
if len(keepUnsealed) > 0 {
2020-07-03 19:52:31 +00:00
sr := partialfile.PieceRun(0, maxPieceSize)
2020-07-03 19:52:31 +00:00
for _, s := range keepUnsealed {
si := &rlepluslazy.RunSliceIterator{}
if s.Offset != 0 {
si.Runs = append(si.Runs, rlepluslazy.Run{Val: false, Len: uint64(s.Offset)})
}
si.Runs = append(si.Runs, rlepluslazy.Run{Val: true, Len: uint64(s.Size)})
var err error
sr, err = rlepluslazy.Subtract(sr, si)
if err != nil {
return err
}
}
2020-09-06 16:54:00 +00:00
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, 0, storiface.PathStorage)
2020-07-03 19:52:31 +00:00
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
pf, err := partialfile.OpenPartialFile(maxPieceSize, paths.Unsealed)
if err == nil {
var at uint64
for sr.HasNext() {
r, err := sr.NextRun()
if err != nil {
_ = pf.Close()
return err
}
2020-07-03 19:52:31 +00:00
offset := at
at += r.Len
if !r.Val {
continue
}
2020-07-03 19:52:31 +00:00
err = pf.Free(storiface.PaddedByteIndex(abi.UnpaddedPieceSize(offset).Padded()), abi.UnpaddedPieceSize(r.Len).Padded())
if err != nil {
_ = pf.Close()
return xerrors.Errorf("free partial file range: %w", err)
}
2020-07-03 19:52:31 +00:00
}
if err := pf.Close(); err != nil {
return err
}
} else {
if !xerrors.Is(err, os.ErrNotExist) {
return xerrors.Errorf("opening partial file: %w", err)
2020-07-03 19:52:31 +00:00
}
}
}
2022-02-03 12:51:59 +00:00
return nil
}
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
return err
}
2020-09-06 16:54:00 +00:00
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
2020-03-26 02:50:56 +00:00
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
return ffi.ClearCache(uint64(ssize), paths.Cache)
2020-03-26 02:50:56 +00:00
}
2022-02-02 20:23:35 +00:00
func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
2022-02-03 12:51:59 +00:00
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
return err
}
2022-02-02 20:23:35 +00:00
2022-02-03 12:51:59 +00:00
{
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
2022-02-02 20:23:35 +00:00
2022-02-03 12:51:59 +00:00
if err := ffi.ClearCache(uint64(ssize), paths.Cache); err != nil {
return xerrors.Errorf("clear cache: %w", err)
2022-02-02 20:23:35 +00:00
}
2022-02-03 12:51:59 +00:00
}
2022-02-02 20:23:35 +00:00
2022-02-03 12:51:59 +00:00
{
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdateCache, 0, storiface.PathStorage)
2022-02-02 20:23:35 +00:00
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
if err := ffi.ClearCache(uint64(ssize), paths.UpdateCache); err != nil {
return xerrors.Errorf("clear cache: %w", err)
2022-02-02 20:23:35 +00:00
}
}
2022-02-03 12:51:59 +00:00
return nil
2022-02-02 20:23:35 +00:00
}
// ReleaseUnsealed is not implemented by Sealer and always returns an error.
//
// The call is meant to mark storage as 'freeable'. Because unsealing is very
// expensive, data is not removed as soon as it could be; it is only removed
// when there is no free space left for data that really needs it. That
// bookkeeping is expected to be handled entirely in localworker, so this
// method should never be reached at this layer.
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
	return xerrors.Errorf("not supported at this layer")
}
// ReleaseReplicaUpgrade is not implemented by Sealer: it ignores its
// arguments and always returns a "not supported at this layer" error.
func (sb *Sealer) ReleaseReplicaUpgrade(ctx context.Context, sector storage.SectorRef) error {
	return xerrors.Errorf("not supported at this layer")
}
// ReleaseSectorKey is not implemented by Sealer: it ignores its arguments
// and always returns a "not supported at this layer" error.
func (sb *Sealer) ReleaseSectorKey(ctx context.Context, sector storage.SectorRef) error {
	return xerrors.Errorf("not supported at this layer")
}
// Remove is not implemented by Sealer and always returns an error; sector
// removal happens in localworker.
func (sb *Sealer) Remove(ctx context.Context, sector storage.SectorRef) error {
	return xerrors.Errorf("not supported at this layer")
}
2020-07-12 01:30:16 +00:00
func GetRequiredPadding(oldLength abi.PaddedPieceSize, newPieceLength abi.PaddedPieceSize) ([]abi.PaddedPieceSize, abi.PaddedPieceSize) {
2020-03-26 02:50:56 +00:00
2020-07-12 01:30:16 +00:00
padPieces := make([]abi.PaddedPieceSize, 0)
2020-07-12 01:30:16 +00:00
toFill := uint64(-oldLength % newPieceLength)
2020-03-26 02:50:56 +00:00
2020-07-12 01:30:16 +00:00
n := bits.OnesCount64(toFill)
var sum abi.PaddedPieceSize
for i := 0; i < n; i++ {
next := bits.TrailingZeros64(toFill)
psize := uint64(1) << uint(next)
toFill ^= psize
2020-07-12 01:30:16 +00:00
padded := abi.PaddedPieceSize(psize)
padPieces = append(padPieces, padded)
sum += padded
}
2020-07-12 01:30:16 +00:00
return padPieces, sum
}
func GenerateUnsealedCID(proofType abi.RegisteredSealProof, pieces []abi.PieceInfo) (cid.Cid, error) {
ssize, err := proofType.SectorSize()
if err != nil {
return cid.Undef, err
}
2020-07-12 01:30:16 +00:00
pssize := abi.PaddedPieceSize(ssize)
allPieces := make([]abi.PieceInfo, 0, len(pieces))
if len(pieces) == 0 {
allPieces = append(allPieces, abi.PieceInfo{
Size: pssize,
PieceCID: zerocomm.ZeroPieceCommitment(pssize.Unpadded()),
})
} else {
var sum abi.PaddedPieceSize
padTo := func(pads []abi.PaddedPieceSize) {
for _, p := range pads {
allPieces = append(allPieces, abi.PieceInfo{
Size: p,
PieceCID: zerocomm.ZeroPieceCommitment(p.Unpadded()),
})
sum += p
}
}
for _, p := range pieces {
ps, _ := GetRequiredPadding(sum, p.Size)
padTo(ps)
allPieces = append(allPieces, p)
sum += p.Size
}
ps, _ := GetRequiredPadding(sum, pssize)
padTo(ps)
}
return ffi.GenerateUnsealedCID(proofType, allPieces)
2020-03-26 02:50:56 +00:00
}
2022-01-14 13:11:04 +00:00
2022-04-20 21:34:28 +00:00
func (sb *Sealer) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, vanillas [][]byte) ([]proof.PoStProof, error) {
2022-01-14 13:11:04 +00:00
return ffi.GenerateWinningPoStWithVanilla(proofType, minerID, randomness, vanillas)
}
2022-04-20 21:34:28 +00:00
func (sb *Sealer) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (proof.PoStProof, error) {
2022-01-14 13:11:04 +00:00
pp, err := ffi.GenerateSinglePartitionWindowPoStWithVanilla(proofType, minerID, randomness, proofs, uint(partitionIdx))
2022-01-21 09:11:04 +00:00
if err != nil {
2022-04-20 21:34:28 +00:00
return proof.PoStProof{}, err
2022-01-14 13:11:04 +00:00
}
2022-01-21 09:11:04 +00:00
if pp == nil {
// should be impossible, but just in case do not panic
2022-04-20 21:34:28 +00:00
return proof.PoStProof{}, xerrors.New("postproof was nil")
2022-01-21 09:11:04 +00:00
}
2022-04-20 21:34:28 +00:00
return proof.PoStProof{
2022-01-14 13:11:04 +00:00
PoStProof: pp.PoStProof,
ProofBytes: pp.ProofBytes,
}, nil
}