Merge sectorbuilder into sectorstorage

This commit is contained in:
Łukasz Magiera 2020-03-26 03:50:56 +01:00
parent 3081a1b9c7
commit e8e90300f0
24 changed files with 1504 additions and 177 deletions

70
ffiwrapper/basicfs/fs.go Normal file
View File

@ -0,0 +1,70 @@
package basicfs
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
)
type sectorFile struct {
abi.SectorID
stores.SectorFileType
}
type Provider struct {
Root string
lk sync.Mutex
waitSector map[sectorFile]chan struct{}
}
func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
os.Mkdir(filepath.Join(b.Root, stores.FTUnsealed.String()), 0755)
os.Mkdir(filepath.Join(b.Root, stores.FTSealed.String()), 0755)
os.Mkdir(filepath.Join(b.Root, stores.FTCache.String()), 0755)
done := func() {}
for i := 0; i < 3; i++ {
if (existing|allocate)&(1<<i) == 0 {
continue
}
b.lk.Lock()
if b.waitSector == nil {
b.waitSector = map[sectorFile]chan struct{}{}
}
ch, found := b.waitSector[sectorFile{id, 1 << i}]
if !found {
ch = make(chan struct{}, 1)
b.waitSector[sectorFile{id, 1 << i}] = ch
}
b.lk.Unlock()
select {
case ch <- struct{}{}:
case <-ctx.Done():
done()
return stores.SectorPaths{}, nil, ctx.Err()
}
prevDone := done
done = func() {
prevDone()
<-ch
}
}
return stores.SectorPaths{
Id: id,
Unsealed: filepath.Join(b.Root, stores.FTUnsealed.String(), fmt.Sprintf("s-t0%d-%d", id.Miner, id.Number)),
Sealed: filepath.Join(b.Root, stores.FTSealed.String(), fmt.Sprintf("s-t0%d-%d", id.Miner, id.Number)),
Cache: filepath.Join(b.Root, stores.FTCache.String(), fmt.Sprintf("s-t0%d-%d", id.Miner, id.Number)),
}, done, nil
}

79
ffiwrapper/config.go Normal file
View File

@ -0,0 +1,79 @@
package ffiwrapper
import (
"fmt"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
)
type Config struct {
SealProofType abi.RegisteredProof
PoStProofType abi.RegisteredProof
_ struct{} // guard against nameless init
}
func sizeFromConfig(cfg Config) (abi.SectorSize, error) {
if cfg.SealProofType == abi.RegisteredProof(0) {
return abi.SectorSize(0), xerrors.New("must specify a seal proof type from abi.RegisteredProof")
}
if cfg.PoStProofType == abi.RegisteredProof(0) {
return abi.SectorSize(0), xerrors.New("must specify a PoSt proof type from abi.RegisteredProof")
}
s1, err := sizeFromProofType(cfg.SealProofType)
if err != nil {
return abi.SectorSize(0), err
}
s2, err := sizeFromProofType(cfg.PoStProofType)
if err != nil {
return abi.SectorSize(0), err
}
if s1 != s2 {
return abi.SectorSize(0), xerrors.Errorf("seal sector size %d does not equal PoSt sector size %d", s1, s2)
}
return s1, nil
}
func sizeFromProofType(p abi.RegisteredProof) (abi.SectorSize, error) {
x, err := p.RegisteredPoStProof()
if err != nil {
return 0, err
}
// values taken from https://github.com/filecoin-project/rust-fil-proofs/blob/master/filecoin-proofs/src/constants.rs#L11
switch x {
case abi.RegisteredProof_StackedDRG32GiBPoSt:
return 1 << 35, nil
case abi.RegisteredProof_StackedDRG2KiBPoSt:
return 2048, nil
case abi.RegisteredProof_StackedDRG8MiBPoSt:
return 1 << 23, nil
case abi.RegisteredProof_StackedDRG512MiBPoSt:
return 1 << 29, nil
default:
return abi.SectorSize(0), xerrors.Errorf("unsupported proof type: %+v", p)
}
}
// TODO: remove this method after implementing it along side the registered proofs and importing it from there.
func SectorSizeForRegisteredProof(p abi.RegisteredProof) (abi.SectorSize, error) {
switch p {
case abi.RegisteredProof_StackedDRG32GiBSeal, abi.RegisteredProof_StackedDRG32GiBPoSt:
return 32 << 30, nil
case abi.RegisteredProof_StackedDRG2KiBSeal, abi.RegisteredProof_StackedDRG2KiBPoSt:
return 2 << 10, nil
case abi.RegisteredProof_StackedDRG8MiBSeal, abi.RegisteredProof_StackedDRG8MiBPoSt:
return 8 << 20, nil
case abi.RegisteredProof_StackedDRG512MiBSeal, abi.RegisteredProof_StackedDRG512MiBPoSt:
return 512 << 20, nil
default:
return 0, fmt.Errorf("unsupported registered proof %d", p)
}
}

53
ffiwrapper/files.go Normal file
View File

@ -0,0 +1,53 @@
package ffiwrapper
import (
"io"
"os"
"sync"
"golang.org/x/xerrors"
)
func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File)
if ok {
return f, func() error { return nil }, nil
}
var w *os.File
f, w, err := os.Pipe()
if err != nil {
return nil, nil, err
}
var wait sync.Mutex
var werr error
wait.Lock()
go func() {
defer wait.Unlock()
var copied int64
copied, werr = io.CopyN(w, r, n)
if werr != nil {
log.Warnf("toReadableFile: copy error: %+v", werr)
}
err := w.Close()
if werr == nil && err != nil {
werr = err
log.Warnf("toReadableFile: close error: %+v", err)
return
}
if copied != n {
log.Warnf("copied different amount than expected: %d != %d", copied, n)
werr = xerrors.Errorf("copied different amount than expected: %d != %d", copied, n)
}
}()
return f, func() error {
wait.Lock()
return werr
}, nil
}

View File

@ -0,0 +1,18 @@
package ffiwrapper
// /////
// Proofs
// 1 / n
const SectorChallengeRatioDiv = 25
const MaxFallbackPostChallengeCount = 10
// extracted from lotus/chain/types/blockheader
func ElectionPostChallengeCount(sectors uint64, faults uint64) uint64 {
if sectors-faults == 0 {
return 0
}
// ceil(sectors / SectorChallengeRatioDiv)
return (sectors-faults-1)/SectorChallengeRatioDiv + 1
}

41
ffiwrapper/sealer.go Normal file
View File

@ -0,0 +1,41 @@
package ffiwrapper
import (
"github.com/filecoin-project/specs-actors/actors/abi"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("ffiwrapper")
type SectorBuilder struct {
sealProofType abi.RegisteredProof
postProofType abi.RegisteredProof
ssize abi.SectorSize // a function of sealProofType and postProofType
sectors SectorProvider
stopping chan struct{}
}
func fallbackPostChallengeCount(sectors uint64, faults uint64) uint64 {
challengeCount := ElectionPostChallengeCount(sectors, faults)
if challengeCount > MaxFallbackPostChallengeCount {
return MaxFallbackPostChallengeCount
}
return challengeCount
}
func (sb *SectorBuilder) Stop() {
close(sb.stopping)
}
func (sb *SectorBuilder) SectorSize() abi.SectorSize {
return sb.ssize
}
func (sb *SectorBuilder) SealProofType() abi.RegisteredProof {
return sb.sealProofType
}
func (sb *SectorBuilder) PoStProofType() abi.RegisteredProof {
return sb.postProofType
}

400
ffiwrapper/sealer_cgo.go Normal file
View File

@ -0,0 +1,400 @@
//+build cgo
package ffiwrapper
import (
"context"
"io"
"math/bits"
"os"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
"github.com/filecoin-project/lotus/storage/sectorstorage/zerocomm"
)
var _ Basic = &SectorBuilder{}
func New(sectors SectorProvider, cfg *Config) (*SectorBuilder, error) {
sectorSize, err := sizeFromConfig(*cfg)
if err != nil {
return nil, err
}
sb := &SectorBuilder{
sealProofType: cfg.SealProofType,
postProofType: cfg.PoStProofType,
ssize: sectorSize,
sectors: sectors,
stopping: make(chan struct{}),
}
return sb, nil
}
func (sb *SectorBuilder) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, faults []abi.SectorNumber) (ffi.SortedPrivateSectorInfo, error) {
fmap := map[abi.SectorNumber]struct{}{}
for _, fault := range faults {
fmap[fault] = struct{}{}
}
var out []ffi.PrivateSectorInfo
for _, s := range sectorInfo {
if _, faulty := fmap[s.SectorNumber]; faulty {
continue
}
paths, done, err := sb.sectors.AcquireSector(ctx, abi.SectorID{Miner: mid, Number: s.SectorNumber}, stores.FTCache|stores.FTSealed, 0, false)
if err != nil {
return ffi.SortedPrivateSectorInfo{}, xerrors.Errorf("acquire sector paths: %w", err)
}
done() // TODO: This is a tiny bit suboptimal
postProofType, err := s.RegisteredProof.RegisteredPoStProof()
if err != nil {
return ffi.SortedPrivateSectorInfo{}, xerrors.Errorf("acquiring registered PoSt proof from sector info %+v: %w", s, err)
}
out = append(out, ffi.PrivateSectorInfo{
CacheDirPath: paths.Cache,
PoStProofType: postProofType,
SealedSectorPath: paths.Sealed,
SectorInfo: s,
})
}
return ffi.NewSortedPrivateSectorInfo(out...), nil
}
func (sb *SectorBuilder) NewSector(ctx context.Context, sector abi.SectorID) error {
// TODO: Allocate the sector here instead of in addpiece
return nil
}
func (sb *SectorBuilder) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
return abi.PieceInfo{}, err
}
var done func()
var stagedFile *os.File
defer func() {
if done != nil {
done()
}
if stagedFile != nil {
if err := stagedFile.Close(); err != nil {
log.Errorf("closing staged file: %+v", err)
}
}
}()
var stagedPath stores.SectorPaths
if len(existingPieceSizes) == 0 {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, true)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = os.Create(stagedPath.Unsealed)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening sector file: %w", err)
}
} else {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, true)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
}
stagedFile, err = os.OpenFile(stagedPath.Unsealed, os.O_RDWR, 0644)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("opening sector file: %w", err)
}
if _, err := stagedFile.Seek(0, io.SeekEnd); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("seek end: %w", err)
}
}
_, _, pieceCID, err := ffi.WriteWithAlignment(sb.sealProofType, f, pieceSize, stagedFile, existingPieceSizes)
if err != nil {
return abi.PieceInfo{}, err
}
if err := f.Close(); err != nil {
return abi.PieceInfo{}, err
}
return abi.PieceInfo{
Size: pieceSize.Padded(),
PieceCID: pieceCID,
}, werr()
}
func (sb *SectorBuilder) ReadPieceFromSealedSector(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealedCID cid.Cid) (io.ReadCloser, error) {
path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTUnsealed, false)
if err != nil {
return nil, xerrors.Errorf("acquire unsealed sector path: %w", err)
}
defer doneUnsealed()
f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644)
if err == nil {
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
}
if !os.IsNotExist(err) {
return nil, err
}
sealed, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed|stores.FTCache, 0, false)
if err != nil {
return nil, xerrors.Errorf("acquire sealed/cache sector path: %w", err)
}
defer doneSealed()
// TODO: GC for those
// (Probably configurable count of sectors to be kept unsealed, and just
// remove last used one (or use whatever other cache policy makes sense))
err = ffi.Unseal(
sb.sealProofType,
sealed.Cache,
sealed.Sealed,
path.Unsealed,
sector.Number,
sector.Miner,
ticket,
unsealedCID,
)
if err != nil {
return nil, xerrors.Errorf("unseal failed: %w", err)
}
f, err = os.OpenFile(string(path.Unsealed), os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
}
func (sb *SectorBuilder) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, true)
if err != nil {
return nil, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, xerrors.Errorf("ensuring sealed file exists: %w", err)
}
if err := e.Close(); err != nil {
return nil, err
}
if err := os.Mkdir(paths.Cache, 0755); err != nil {
if os.IsExist(err) {
log.Warnf("existing cache in %s; removing", paths.Cache)
if err := os.RemoveAll(paths.Cache); err != nil {
return nil, xerrors.Errorf("remove existing sector cache from %s (sector %d): %w", paths.Cache, sector, err)
}
if err := os.Mkdir(paths.Cache, 0755); err != nil {
return nil, xerrors.Errorf("mkdir cache path after cleanup: %w", err)
}
} else {
return nil, err
}
}
var sum abi.UnpaddedPieceSize
for _, piece := range pieces {
sum += piece.Size.Unpadded()
}
ussize := abi.PaddedPieceSize(sb.ssize).Unpadded()
if sum != ussize {
return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
// TODO: context cancellation respect
p1o, err := ffi.SealPreCommitPhase1(
sb.sealProofType,
paths.Cache,
paths.Unsealed,
paths.Sealed,
sector.Number,
sector.Miner,
ticket,
pieces,
)
if err != nil {
return nil, xerrors.Errorf("presealing sector %d (%s): %w", sector.Number, paths.Unsealed, err)
}
return p1o, nil
}
func (sb *SectorBuilder) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err)
}
defer done()
sealedCID, unsealedCID, err := ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.Number, paths.Unsealed, err)
}
return storage.SectorCids{
Unsealed: unsealedCID,
Sealed: sealedCID,
}, nil
}
func (sb *SectorBuilder) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true)
if err != nil {
return nil, xerrors.Errorf("acquire sector paths: %w", err)
}
defer done()
output, err := ffi.SealCommitPhase1(
sb.sealProofType,
cids.Sealed,
cids.Unsealed,
paths.Cache,
paths.Sealed,
sector.Number,
sector.Miner,
ticket,
seed,
pieces,
)
if err != nil {
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("num:%d tkt:%v seed:%v, pi:%v sealedCID:%v, unsealedCID:%v", sector.Number, ticket, seed, pieces, cids.Sealed, cids.Unsealed)
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return output, nil
}
func (sb *SectorBuilder) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (storage.Proof, error) {
return ffi.SealCommitPhase2(phase1Out, sector.Number, sector.Miner)
}
func (sb *SectorBuilder) GenerateFallbackPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) (storage.FallbackPostOut, error) {
privsectors, err := sb.pubSectorToPriv(ctx, miner, sectorInfo, faults)
if err != nil {
return storage.FallbackPostOut{}, err
}
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults)))
challengeSeed[31] = 0
candidates, err := ffi.GenerateCandidates(miner, challengeSeed, challengeCount, privsectors)
if err != nil {
return storage.FallbackPostOut{}, err
}
winners := make([]abi.PoStCandidate, len(candidates))
for idx := range winners {
winners[idx] = candidates[idx].Candidate
}
proof, err := ffi.GeneratePoSt(miner, privsectors, challengeSeed, winners)
return storage.FallbackPostOut{
PoStInputs: ffiToStorageCandidates(candidates),
Proof: proof,
}, err
}
func (sb *SectorBuilder) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, false)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
return ffi.ClearCache(paths.Cache)
}
func GeneratePieceCIDFromFile(proofType abi.RegisteredProof, piece io.Reader, pieceSize abi.UnpaddedPieceSize) (cid.Cid, error) {
f, werr, err := toReadableFile(piece, int64(pieceSize))
if err != nil {
return cid.Undef, err
}
pieceCID, err := ffi.GeneratePieceCIDFromFile(proofType, f, pieceSize)
if err != nil {
return cid.Undef, err
}
return pieceCID, werr()
}
func GenerateUnsealedCID(proofType abi.RegisteredProof, pieces []abi.PieceInfo) (cid.Cid, error) {
var sum abi.PaddedPieceSize
for _, p := range pieces {
sum += p.Size
}
ssize, err := SectorSizeForRegisteredProof(proofType)
if err != nil {
return cid.Undef, err
}
{
// pad remaining space with 0 CommPs
toFill := uint64(abi.PaddedPieceSize(ssize) - sum)
n := bits.OnesCount64(toFill)
for i := 0; i < n; i++ {
next := bits.TrailingZeros64(toFill)
psize := uint64(1) << uint(next)
toFill ^= psize
unpadded := abi.PaddedPieceSize(psize).Unpadded()
pieces = append(pieces, abi.PieceInfo{
Size: unpadded.Padded(),
PieceCID: zerocomm.ZeroPieceCommitment(unpadded),
})
}
}
return ffi.GenerateUnsealedCID(proofType, pieces)
}

356
ffiwrapper/sealer_test.go Normal file
View File

@ -0,0 +1,356 @@
package ffiwrapper
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"runtime"
"sync"
"testing"
"time"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper/basicfs"
)
func init() {
logging.SetLogLevel("*", "INFO") //nolint: errcheck
}
var sectorSize = abi.SectorSize(2048)
var sealProofType = abi.RegisteredProof_StackedDRG2KiBSeal
var postProofType = abi.RegisteredProof_StackedDRG2KiBPoSt
type seal struct {
id abi.SectorID
cids storage.SectorCids
pi abi.PieceInfo
ticket abi.SealRandomness
}
func (s *seal) precommit(t *testing.T, sb *SectorBuilder, id abi.SectorID, done func()) {
defer done()
dlen := abi.PaddedPieceSize(sectorSize).Unpadded()
var err error
r := io.LimitReader(rand.New(rand.NewSource(42+int64(id.Number))), int64(dlen))
s.pi, err = sb.AddPiece(context.TODO(), id, []abi.UnpaddedPieceSize{}, dlen, r)
if err != nil {
t.Fatalf("%+v", err)
}
s.ticket = abi.SealRandomness{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2}
p1, err := sb.SealPreCommit1(context.TODO(), id, s.ticket, []abi.PieceInfo{s.pi})
if err != nil {
t.Fatalf("%+v", err)
}
cids, err := sb.SealPreCommit2(context.TODO(), id, p1)
if err != nil {
t.Fatalf("%+v", err)
}
s.cids = cids
}
func (s *seal) commit(t *testing.T, sb *SectorBuilder, done func()) {
defer done()
seed := abi.InteractiveSealRandomness{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}
pc1, err := sb.SealCommit1(context.TODO(), s.id, s.ticket, seed, []abi.PieceInfo{s.pi}, s.cids)
if err != nil {
t.Fatalf("%+v", err)
}
proof, err := sb.SealCommit2(context.TODO(), s.id, pc1)
if err != nil {
t.Fatalf("%+v", err)
}
ok, err := ProofVerifier.VerifySeal(abi.SealVerifyInfo{
SectorID: s.id,
OnChain: abi.OnChainSealVerifyInfo{
SealedCID: s.cids.Sealed,
RegisteredProof: sealProofType,
Proof: proof,
SectorNumber: s.id.Number,
},
Randomness: s.ticket,
InteractiveRandomness: seed,
UnsealedCID: s.cids.Unsealed,
})
if err != nil {
t.Fatalf("%+v", err)
}
if !ok {
t.Fatal("proof failed to validate")
}
}
func post(t *testing.T, sb *SectorBuilder, seals ...seal) time.Time {
randomness := abi.PoStRandomness{0, 9, 2, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 7}
sis := make([]abi.SectorInfo, len(seals))
for i, s := range seals {
sis[i] = abi.SectorInfo{
RegisteredProof: sealProofType,
SectorNumber: s.id.Number,
SealedCID: s.cids.Sealed,
}
}
candidates, err := sb.GenerateEPostCandidates(context.TODO(), seals[0].id.Miner, sis, randomness, []abi.SectorNumber{})
if err != nil {
t.Fatalf("%+v", err)
}
genCandidates := time.Now()
if len(candidates) != 1 {
t.Fatal("expected 1 candidate")
}
candidatesPrime := make([]abi.PoStCandidate, len(candidates))
for idx := range candidatesPrime {
candidatesPrime[idx] = candidates[idx].Candidate
}
proofs, err := sb.ComputeElectionPoSt(context.TODO(), seals[0].id.Miner, sis, randomness, candidatesPrime)
if err != nil {
t.Fatalf("%+v", err)
}
ePoStChallengeCount := ElectionPostChallengeCount(uint64(len(sis)), 0)
ok, err := ProofVerifier.VerifyElectionPost(context.TODO(), abi.PoStVerifyInfo{
Randomness: randomness,
Candidates: candidatesPrime,
Proofs: proofs,
EligibleSectors: sis,
Prover: seals[0].id.Miner,
ChallengeCount: ePoStChallengeCount,
})
if err != nil {
t.Fatalf("%+v", err)
}
if !ok {
t.Fatal("bad post")
}
return genCandidates
}
func getGrothParamFileAndVerifyingKeys(s abi.SectorSize) {
dat, err := ioutil.ReadFile("./parameters.json")
if err != nil {
panic(xerrors.Errorf("failed to read contents of ./parameters.json: %w", err))
}
err = paramfetch.GetParams(dat, uint64(s))
if err != nil {
panic(xerrors.Errorf("failed to acquire Groth parameters for 2KiB sectors: %w", err))
}
}
// TestDownloadParams exists only so that developers and CI can pre-download
// Groth parameters and verifying keys before running the tests which rely on
// those parameters and keys. To do this, run the following command:
//
// go test -run=^TestDownloadParams
//
func TestDownloadParams(t *testing.T) {
getGrothParamFileAndVerifyingKeys(sectorSize)
}
func TestSealAndVerify(t *testing.T) {
if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware
t.Skip("this is slow")
}
_ = os.Setenv("RUST_LOG", "info")
getGrothParamFileAndVerifyingKeys(sectorSize)
cdir, err := ioutil.TempDir("", "sbtest-c-")
if err != nil {
t.Fatal(err)
}
miner := abi.ActorID(123)
cfg := &Config{
SealProofType: sealProofType,
PoStProofType: postProofType,
}
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp, cfg)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", cdir)
return
}
if err := os.RemoveAll(cdir); err != nil {
t.Error(err)
}
}
defer cleanup()
si := abi.SectorID{Miner: miner, Number: 1}
s := seal{id: si}
start := time.Now()
s.precommit(t, sb, si, func() {})
precommit := time.Now()
s.commit(t, sb, func() {})
commit := time.Now()
genCandidiates := post(t, sb, s)
epost := time.Now()
post(t, sb, s)
if err := sb.FinalizeSector(context.TODO(), si); err != nil {
t.Fatalf("%+v", err)
}
fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String())
fmt.Printf("Commit: %s\n", commit.Sub(precommit).String())
fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(commit).String())
fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String())
}
func TestSealPoStNoCommit(t *testing.T) {
if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware
t.Skip("this is slow")
}
_ = os.Setenv("RUST_LOG", "info")
getGrothParamFileAndVerifyingKeys(sectorSize)
dir, err := ioutil.TempDir("", "sbtest")
if err != nil {
t.Fatal(err)
}
miner := abi.ActorID(123)
cfg := &Config{
SealProofType: sealProofType,
PoStProofType: postProofType,
}
sp := &basicfs.Provider{
Root: dir,
}
sb, err := New(sp, cfg)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", dir)
return
}
if err := os.RemoveAll(dir); err != nil {
t.Error(err)
}
}
defer cleanup()
si := abi.SectorID{Miner: miner, Number: 1}
s := seal{id: si}
start := time.Now()
s.precommit(t, sb, si, func() {})
precommit := time.Now()
if err := sb.FinalizeSector(context.TODO(), si); err != nil {
t.Fatal(err)
}
genCandidiates := post(t, sb, s)
epost := time.Now()
fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String())
fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(precommit).String())
fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String())
}
func TestSealAndVerify2(t *testing.T) {
if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware
t.Skip("this is slow")
}
_ = os.Setenv("RUST_LOG", "trace")
getGrothParamFileAndVerifyingKeys(sectorSize)
dir, err := ioutil.TempDir("", "sbtest")
if err != nil {
t.Fatal(err)
}
miner := abi.ActorID(123)
cfg := &Config{
SealProofType: sealProofType,
PoStProofType: postProofType,
}
sp := &basicfs.Provider{
Root: dir,
}
sb, err := New(sp, cfg)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if err := os.RemoveAll(dir); err != nil {
t.Error(err)
}
}
defer cleanup()
var wg sync.WaitGroup
si1 := abi.SectorID{Miner: miner, Number: 1}
si2 := abi.SectorID{Miner: miner, Number: 2}
s1 := seal{id: si1}
s2 := seal{id: si2}
wg.Add(2)
go s1.precommit(t, sb, si1, wg.Done) //nolint: staticcheck
time.Sleep(100 * time.Millisecond)
go s2.precommit(t, sb, si2, wg.Done) //nolint: staticcheck
wg.Wait()
wg.Add(2)
go s1.commit(t, sb, wg.Done) //nolint: staticcheck
go s2.commit(t, sb, wg.Done) //nolint: staticcheck
wg.Wait()
post(t, sb, s1, s2)
}

49
ffiwrapper/types.go Normal file
View File

@ -0,0 +1,49 @@
package ffiwrapper
import (
"context"
"errors"
"github.com/ipfs/go-cid"
"io"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
)
type UnpaddedByteIndex uint64
type Validator interface {
CanCommit(sector stores.SectorPaths) (bool, error)
CanProve(sector stores.SectorPaths) (bool, error)
}
type Sealer interface {
storage.Sealer
storage.Storage
}
type Basic interface {
storage.Prover
Sealer
ReadPieceFromSealedSector(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
}
type Verifier interface {
VerifySeal(abi.SealVerifyInfo) (bool, error)
VerifyElectionPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error)
VerifyFallbackPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error)
}
var ErrSectorNotFound = errors.New("sector not found")
type SectorProvider interface {
// * returns ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error)
}
var _ SectorProvider = &basicfs.Provider{}

View File

@ -0,0 +1,80 @@
//+build cgo
package ffiwrapper
import (
"context"
"go.opencensus.io/trace"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
)
func (sb *SectorBuilder) ComputeElectionPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
challengeSeed[31] = 0
privsects, err := sb.pubSectorToPriv(ctx, miner, sectorInfo, nil) // TODO: faults
if err != nil {
return nil, err
}
return ffi.GeneratePoSt(miner, privsects, challengeSeed, winners)
}
func (sb *SectorBuilder) GenerateEPostCandidates(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) {
privsectors, err := sb.pubSectorToPriv(ctx, miner, sectorInfo, faults)
if err != nil {
return nil, err
}
challengeSeed[31] = 0
challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults)))
pc, err := ffi.GenerateCandidates(miner, challengeSeed, challengeCount, privsectors)
if err != nil {
return nil, err
}
return ffiToStorageCandidates(pc), nil
}
func ffiToStorageCandidates(pc []ffi.PoStCandidateWithTicket) []storage.PoStCandidateWithTicket {
out := make([]storage.PoStCandidateWithTicket, len(pc))
for i := range out {
out[i] = storage.PoStCandidateWithTicket{
Candidate: pc[i].Candidate,
Ticket: pc[i].Ticket,
}
}
return out
}
var _ Verifier = ProofVerifier
type proofVerifier struct{}
var ProofVerifier = proofVerifier{}
func (proofVerifier) VerifySeal(info abi.SealVerifyInfo) (bool, error) {
return ffi.VerifySeal(info)
}
func (proofVerifier) VerifyElectionPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error) {
return verifyPost(ctx, info)
}
func (proofVerifier) VerifyFallbackPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error) {
return verifyPost(ctx, info)
}
func verifyPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error) {
_, span := trace.StartSpan(ctx, "VerifyPoSt")
defer span.End()
info.Randomness[31] = 0
return ffi.VerifyPoSt(info)
}

View File

@ -4,6 +4,7 @@ import (
"container/list"
"context"
"errors"
"github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper"
"io"
"net/http"
"sync"
@ -13,7 +14,6 @@ import (
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
@ -30,7 +30,7 @@ var ErrNoWorkers = errors.New("no suitable workers found")
type URLs []string
type Worker interface {
sectorbuilder.Sealer
ffiwrapper.Sealer
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
@ -45,16 +45,16 @@ type Worker interface {
type SectorManager interface {
SectorSize() abi.SectorSize
ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
sectorbuilder.Sealer
ffiwrapper.Sealer
storage.Prover
}
type WorkerID uint64
type Manager struct {
scfg *sectorbuilder.Config
scfg *ffiwrapper.Config
ls stores.LocalStorage
storage *stores.Remote
@ -76,13 +76,13 @@ type Manager struct {
schedQueue *list.List // List[*workerRequest]
}
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, sc config.Storage, urls URLs, ca api.Common) (*Manager, error) {
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc config.Storage, urls URLs, ca api.Common) (*Manager, error) {
lstor, err := stores.NewLocal(ctx, ls, si, urls)
if err != nil {
return nil, err
}
prover, err := sectorbuilder.New(&readonlyProvider{stor: lstor}, cfg)
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg)
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
@ -180,7 +180,7 @@ func (m *Manager) SectorSize() abi.SectorSize {
return sz
}
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
@ -273,9 +273,9 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
var best []stores.StorageInfo
var err error
if len(existingPieces) == 0 { // new
best, err = m.index.StorageBestAlloc(ctx, sectorbuilder.FTUnsealed, true)
best, err = m.index.StorageBestAlloc(ctx, stores.FTUnsealed, true)
} else { // append to existing
best, err = m.index.StorageFindSector(ctx, sector, sectorbuilder.FTUnsealed, false)
best, err = m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
@ -302,7 +302,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
// TODO: also consider where the unsealed data sits
best, err := m.index.StorageBestAlloc(ctx, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
best, err := m.index.StorageBestAlloc(ctx, stores.FTCache|stores.FTSealed, true)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -326,7 +326,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
// TODO: allow workers to fetch the sectors
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed, true)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -348,7 +348,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
}
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed, true)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -400,7 +400,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
}
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed, true)
best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, true)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}

View File

@ -5,6 +5,7 @@ import (
"context"
"fmt"
"github.com/filecoin-project/lotus/storage/sectorstorage"
"github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper"
"io"
"io/ioutil"
"math/big"
@ -12,7 +13,6 @@ import (
"sync"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
@ -93,7 +93,7 @@ func (sb *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, existi
ss.lk.Lock()
defer ss.lk.Unlock()
c, err := sectorbuilder.GeneratePieceCIDFromFile(sb.proofType, r, size)
c, err := ffiwrapper.GeneratePieceCIDFromFile(sb.proofType, r, size)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("failed to generate piece cid: %w", err)
}
@ -273,7 +273,7 @@ func (sb *SectorMgr) GenerateEPostCandidates(ctx context.Context, mid abi.ActorI
panic("todo")
}
n := sectorbuilder.ElectionPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults)))
n := ffiwrapper.ElectionPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults)))
if n > uint64(len(sectorInfo)) {
n = uint64(len(sectorInfo))
}
@ -298,7 +298,7 @@ func (sb *SectorMgr) GenerateEPostCandidates(ctx context.Context, mid abi.ActorI
return out, nil
}
func (sb *SectorMgr) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset sectorbuilder.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) {
func (sb *SectorMgr) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) {
if len(sb.sectors[sectorID].pieces) > 1 {
panic("implme")
}
@ -355,10 +355,10 @@ func (m mockVerif) VerifySeal(svi abi.SealVerifyInfo) (bool, error) {
}
func (m mockVerif) GenerateDataCommitment(pt abi.RegisteredProof, pieces []abi.PieceInfo) (cid.Cid, error) {
return sectorbuilder.GenerateUnsealedCID(pt, pieces)
return ffiwrapper.GenerateUnsealedCID(pt, pieces)
}
var MockVerifier = mockVerif{}
var _ sectorbuilder.Verifier = MockVerifier
var _ ffiwrapper.Verifier = MockVerifier
var _ sectorstorage.SectorManager = &SectorMgr{}

View File

@ -3,7 +3,7 @@ package mock
import (
"github.com/filecoin-project/go-address"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sectorstorage/zerocomm"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
@ -39,7 +39,7 @@ func PreSeal(ssize abi.SectorSize, maddr address.Address, sectors int) (*genesis
preseal := &genesis.PreSeal{}
preseal.ProofType = st
preseal.CommD = sectorbuilder.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
preseal.CommD = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
d, _ := commcid.CIDToPieceCommitmentV1(preseal.CommD)
r := commDR(d)
preseal.CommR = commcid.ReplicaCommitmentV1ToCID(r[:])

View File

@ -1,21 +1,21 @@
package sectorstorage
import (
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
"github.com/filecoin-project/specs-actors/actors/abi"
)
var FSOverheadSeal = map[sectorbuilder.SectorFileType]int{ // 10x overheads
sectorbuilder.FTUnsealed: 10,
sectorbuilder.FTSealed: 10,
sectorbuilder.FTCache: 70, // TODO: confirm for 32G
var FSOverheadSeal = map[stores.SectorFileType]int{ // 10x overheads
stores.FTUnsealed: 10,
stores.FTSealed: 10,
stores.FTCache: 70, // TODO: confirm for 32G
}
var FsOverheadFinalized = map[sectorbuilder.SectorFileType]int{
sectorbuilder.FTUnsealed: 10,
sectorbuilder.FTSealed: 10,
sectorbuilder.FTCache: 2,
var FsOverheadFinalized = map[stores.SectorFileType]int{
stores.FTUnsealed: 10,
stores.FTSealed: 10,
stores.FTCache: 2,
}
type Resources struct {

View File

@ -3,7 +3,6 @@ package sectorstorage
import (
"context"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -14,9 +13,9 @@ type readonlyProvider struct {
stor *stores.Local
}
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
if allocate != stores.FTNone {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage")
return stores.SectorPaths{}, nil, xerrors.New("read-only storage")
}
p, _, done, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing)

View File

@ -1,56 +0,0 @@
package sectorutil
import (
"fmt"
"github.com/filecoin-project/go-sectorbuilder"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
)
func ParseSectorID(baseName string) (abi.SectorID, error) {
var n abi.SectorNumber
var mid abi.ActorID
read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n)
if err != nil {
return abi.SectorID{}, xerrors.Errorf("sscanf sector name ('%s'): %w", baseName, err)
}
if read != 2 {
return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read)
}
return abi.SectorID{
Miner: mid,
Number: n,
}, nil
}
func SectorName(sid abi.SectorID) string {
return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number)
}
func PathByType(sps sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType) string {
switch fileType {
case sectorbuilder.FTUnsealed:
return sps.Unsealed
case sectorbuilder.FTSealed:
return sps.Sealed
case sectorbuilder.FTCache:
return sps.Cache
}
panic("requested unknown path type")
}
func SetPathByType(sps *sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType, p string) {
switch fileType {
case sectorbuilder.FTUnsealed:
sps.Unsealed = p
case sectorbuilder.FTSealed:
sps.Sealed = p
case sectorbuilder.FTCache:
sps.Cache = p
}
}

View File

@ -1,8 +1,87 @@
package stores
import "github.com/filecoin-project/go-sectorbuilder"
import (
"fmt"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
)
const (
// TODO: move the other types here after we drop go-sectorbuilder
FTNone sectorbuilder.SectorFileType = 0
FTUnsealed SectorFileType = 1 << iota
FTSealed
FTCache
)
const (
FTNone SectorFileType = 0
)
type SectorFileType int
func (t SectorFileType) String() string {
switch t {
case FTUnsealed:
return "unsealed"
case FTSealed:
return "sealed"
case FTCache:
return "cache"
default:
return fmt.Sprintf("<unknown %d>", t)
}
}
type SectorPaths struct {
Id abi.SectorID
Unsealed string
Sealed string
Cache string
}
func ParseSectorID(baseName string) (abi.SectorID, error) {
var n abi.SectorNumber
var mid abi.ActorID
read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n)
if err != nil {
return abi.SectorID{}, xerrors.Errorf("sscanf sector name ('%s'): %w", baseName, err)
}
if read != 2 {
return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read)
}
return abi.SectorID{
Miner: mid,
Number: n,
}, nil
}
func SectorName(sid abi.SectorID) string {
return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number)
}
func PathByType(sps SectorPaths, fileType SectorFileType) string {
switch fileType {
case FTUnsealed:
return sps.Unsealed
case FTSealed:
return sps.Sealed
case FTCache:
return sps.Cache
}
panic("requested unknown path type")
}
func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) {
switch fileType {
case FTUnsealed:
sps.Unsealed = p
case FTSealed:
sps.Sealed = p
case FTCache:
sps.Cache = p
}
}

View File

@ -10,9 +10,7 @@ import (
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
)
var log = logging.Logger("stores")
@ -57,7 +55,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
log.Infof("SERVE GET %s", r.URL)
vars := mux.Vars(r)
id, err := sectorutil.ParseSectorID(vars["id"])
id, err := ParseSectorID(vars["id"])
if err != nil {
log.Error("%+v", err)
w.WriteHeader(500)
@ -78,7 +76,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
}
defer done()
path := sectorutil.PathByType(paths, ft)
path := PathByType(paths, ft)
if path == "" {
log.Error("acquired path was empty")
w.WriteHeader(500)
@ -117,7 +115,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
log.Infof("SERVE DELETE %s", r.URL)
vars := mux.Vars(r)
id, err := sectorutil.ParseSectorID(vars["id"])
id, err := ParseSectorID(vars["id"])
if err != nil {
log.Error("%+v", err)
w.WriteHeader(500)
@ -138,14 +136,14 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
}
}
func ftFromString(t string) (sectorbuilder.SectorFileType, error) {
func ftFromString(t string) (SectorFileType, error) {
switch t {
case sectorbuilder.FTUnsealed.String():
return sectorbuilder.FTUnsealed, nil
case sectorbuilder.FTSealed.String():
return sectorbuilder.FTSealed, nil
case sectorbuilder.FTCache.String():
return sectorbuilder.FTCache, nil
case FTUnsealed.String():
return FTUnsealed, nil
case FTSealed.String():
return FTSealed, nil
case FTCache.String():
return FTCache, nil
default:
return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
}

View File

@ -9,11 +9,8 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
)
// ID identifies sector storage by UUID. One sector storage should map to one
@ -34,16 +31,16 @@ type SectorIndex interface { // part of storage-miner api
StorageInfo(context.Context, ID) (StorageInfo, error)
// TODO: StorageUpdateStats(FsStat)
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error
StorageFindSector(ctx context.Context, sector abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error)
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error)
}
type Decl struct {
abi.SectorID
sectorbuilder.SectorFileType
SectorFileType
}
type storageEntry struct {
@ -66,10 +63,10 @@ func NewIndex() *Index {
}
func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
byID := map[ID]map[abi.SectorID]sectorbuilder.SectorFileType{}
byID := map[ID]map[abi.SectorID]SectorFileType{}
for id := range i.stores {
byID[id] = map[abi.SectorID]sectorbuilder.SectorFileType{}
byID[id] = map[abi.SectorID]SectorFileType{}
}
for decl, ids := range i.sectors {
for _, id := range ids {
@ -124,7 +121,7 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) er
return nil
}
func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error {
i.lk.Lock()
defer i.lk.Unlock()
@ -148,7 +145,7 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
return nil
}
func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error {
func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error {
i.lk.Lock()
defer i.lk.Unlock()
@ -182,7 +179,7 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto
return nil
}
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) {
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()
@ -214,7 +211,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
return nil, xerrors.Errorf("failed to parse url: %w", err)
}
rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s))
rl.Path = gopath.Join(rl.Path, ft.String(), SectorName(s))
urls[k] = rl.String()
}
@ -240,7 +237,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
return nil, xerrors.Errorf("failed to parse url: %w", err)
}
rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s))
rl.Path = gopath.Join(rl.Path, ft.String(), SectorName(s))
urls[k] = rl.String()
}
@ -269,7 +266,7 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
return *si.info, nil
}
func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error) {
func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error) {
i.lk.RLock()
defer i.lk.RUnlock()
@ -309,7 +306,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.Sec
return out, nil
}
func (i *Index) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]ID, error) {
func (i *Index) FindSector(id abi.SectorID, typ SectorFileType) ([]ID, error) {
i.lk.RLock()
defer i.lk.RUnlock()

View File

@ -6,16 +6,15 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
)
type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
Remove(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error
AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error)
Remove(ctx context.Context, s abi.SectorID, types SectorFileType) error
// move sectors into storage
MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error
MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error
FsStat(ctx context.Context, id ID) (FsStat, error)
}

View File

@ -12,9 +12,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
)
type StoragePath struct {
@ -43,7 +41,7 @@ type LocalStorage interface {
const MetaFile = "sectorstore.json"
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
var pathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache}
type Local struct {
localStorage LocalStorage
@ -120,7 +118,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
}
for _, ent := range ents {
sid, err := sectorutil.ParseSectorID(ent.Name())
sid, err := ParseSectorID(ent.Name())
if err != nil {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
}
@ -152,15 +150,15 @@ func (st *Local) open(ctx context.Context) error {
return nil
}
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) {
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
st.localLk.RLock()
var out sectorbuilder.SectorPaths
var storageIDs sectorbuilder.SectorPaths
var out SectorPaths
var storageIDs SectorPaths
for _, fileType := range pathTypes {
if fileType&existing == 0 {
@ -183,9 +181,9 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
continue
}
spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
sectorutil.SetPathByType(&out, fileType, spath)
sectorutil.SetPathByType(&storageIDs, fileType, string(info.ID))
spath := filepath.Join(p.local, fileType.String(), SectorName(sid))
SetPathByType(&out, fileType, spath)
SetPathByType(&storageIDs, fileType, string(info.ID))
existing ^= fileType
break
@ -200,7 +198,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
sis, err := st.index.StorageBestAlloc(ctx, fileType, sealing)
if err != nil {
st.localLk.RUnlock()
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err)
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err)
}
var best string
@ -226,17 +224,17 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s
// TODO: Check free space
best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
best = filepath.Join(p.local, fileType.String(), SectorName(sid))
bestID = si.ID
}
if best == "" {
st.localLk.RUnlock()
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
}
sectorutil.SetPathByType(&out, fileType, best)
sectorutil.SetPathByType(&storageIDs, fileType, string(bestID))
SetPathByType(&out, fileType, best)
SetPathByType(&storageIDs, fileType, string(bestID))
allocate ^= fileType
}
@ -270,7 +268,7 @@ func (st *Local) Local(ctx context.Context) ([]StoragePath, error) {
return out, nil
}
func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType) error {
if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("delete expects one file type")
}
@ -298,7 +296,7 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder
return xerrors.Errorf("dropping sector from index: %w", err)
}
spath := filepath.Join(p.local, typ.String(), sectorutil.SectorName(sid))
spath := filepath.Join(p.local, typ.String(), SectorName(sid))
log.Infof("remove %s", spath)
if err := os.RemoveAll(spath); err != nil {
@ -309,7 +307,7 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder
return nil
}
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error {
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error {
dest, destIds, sdone, err := st.AcquireSector(ctx, s, FTNone, types, false)
if err != nil {
return xerrors.Errorf("acquire dest storage: %w", err)
@ -327,12 +325,12 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbu
continue
}
sst, err := st.index.StorageInfo(ctx, ID(sectorutil.PathByType(srcIds, fileType)))
sst, err := st.index.StorageInfo(ctx, ID(PathByType(srcIds, fileType)))
if err != nil {
return xerrors.Errorf("failed to get source storage info: %w", err)
}
dst, err := st.index.StorageInfo(ctx, ID(sectorutil.PathByType(destIds, fileType)))
dst, err := st.index.StorageInfo(ctx, ID(PathByType(destIds, fileType)))
if err != nil {
return xerrors.Errorf("failed to get source storage info: %w", err)
}
@ -349,17 +347,17 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbu
log.Debugf("moving %v(%d) to storage: %s(se:%t; st:%t) -> %s(se:%t; st:%t)", s, fileType, sst.ID, sst.CanSeal, sst.CanStore, dst.ID, dst.CanSeal, dst.CanStore)
if err := st.index.StorageDropSector(ctx, ID(sectorutil.PathByType(srcIds, fileType)), s, fileType); err != nil {
if err := st.index.StorageDropSector(ctx, ID(PathByType(srcIds, fileType)), s, fileType); err != nil {
return xerrors.Errorf("dropping source sector from index: %w", err)
}
if err := move(sectorutil.PathByType(src, fileType), sectorutil.PathByType(dest, fileType)); err != nil {
if err := move(PathByType(src, fileType), PathByType(dest, fileType)); err != nil {
// TODO: attempt some recovery (check if src is still there, re-declare)
return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err)
}
if err := st.index.StorageDeclareSector(ctx, ID(sectorutil.PathByType(destIds, fileType)), s, fileType); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(sectorutil.PathByType(destIds, fileType)), err)
if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(PathByType(destIds, fileType)), err)
}
}

View File

@ -17,11 +17,9 @@ import (
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
)
type Remote struct {
@ -42,9 +40,9 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
}
}
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) {
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
r.fetchLk.Lock()
@ -52,7 +50,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
}
for _, fileType := range pathTypes {
@ -60,19 +58,19 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
continue
}
if sectorutil.PathByType(paths, fileType) != "" {
if PathByType(paths, fileType) != "" {
continue
}
ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
if err != nil {
done()
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err
return SectorPaths{}, SectorPaths{}, nil, err
}
done = mergeDone(done, rdone)
sectorutil.SetPathByType(&paths, fileType, ap)
sectorutil.SetPathByType(&stores, fileType, string(storageID))
SetPathByType(&paths, fileType, ap)
SetPathByType(&stores, fileType, string(storageID))
if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
@ -88,7 +86,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
return paths, stores, done, nil
}
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, string, func(), error) {
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, false)
if err != nil {
return "", "", "", nil, err
@ -102,8 +100,8 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
if err != nil {
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
}
dest := sectorutil.PathByType(apaths, fileType)
storageID := sectorutil.PathByType(ids, fileType)
dest := PathByType(apaths, fileType)
storageID := PathByType(ids, fileType)
var merr error
for _, info := range si {
@ -176,7 +174,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
}
}
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error {
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error {
// Make sure we have the data local
_, _, ddone, err := r.AcquireSector(ctx, s, types, FTNone, false)
if err != nil {
@ -187,7 +185,7 @@ func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbu
return r.local.MoveStorage(ctx, s, types)
}
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType) error {
if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("delete expects one file type")
}

View File

@ -2,6 +2,7 @@ package sectorstorage
import (
"context"
"github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper"
"io"
"os"
@ -9,17 +10,15 @@ import (
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks"
"github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil"
"github.com/filecoin-project/lotus/storage/sectorstorage/stores"
)
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
var pathTypes = []stores.SectorFileType{stores.FTUnsealed, stores.FTSealed, stores.FTCache}
type WorkerConfig struct {
SealProof abi.RegisteredProof
@ -27,7 +26,7 @@ type WorkerConfig struct {
}
type LocalWorker struct {
scfg *sectorbuilder.Config
scfg *ffiwrapper.Config
storage stores.Store
localStore *stores.Local
sindex stores.SectorIndex
@ -47,7 +46,7 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local,
}
return &LocalWorker{
scfg: &sectorbuilder.Config{
scfg: &ffiwrapper.Config{
SealProofType: wcfg.SealProof,
PoStProofType: ppt,
},
@ -63,10 +62,10 @@ type localWorkerPathProvider struct {
w *LocalWorker
}
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) {
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
return stores.SectorPaths{}, nil, err
}
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
@ -79,7 +78,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
continue
}
sid := sectorutil.PathByType(storageIDs, fileType)
sid := stores.PathByType(storageIDs, fileType)
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil {
log.Errorf("declare sector error: %+v", err)
@ -88,8 +87,8 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
}, nil
}
func (l *LocalWorker) sb() (sectorbuilder.Basic, error) {
return sectorbuilder.New(&localWorkerPathProvider{w: l}, l.scfg)
func (l *LocalWorker) sb() (ffiwrapper.Basic, error) {
return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg)
}
func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
@ -156,11 +155,11 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
return xerrors.Errorf("finalizing sector: %w", err)
}
if err := l.storage.Remove(ctx, sector, sectorbuilder.FTUnsealed); err != nil {
if err := l.storage.Remove(ctx, sector, stores.FTUnsealed); err != nil {
return xerrors.Errorf("removing unsealed data: %w", err)
}
if err := l.storage.MoveStorage(ctx, sector, sectorbuilder.FTSealed|sectorbuilder.FTCache); err != nil {
if err := l.storage.MoveStorage(ctx, sector, stores.FTSealed|stores.FTCache); err != nil {
return xerrors.Errorf("moving sealed data to storage: %w", err)
}

55
zerocomm/zerocomm.go Normal file
View File

@ -0,0 +1,55 @@
package zerocomm
import (
"math/bits"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
)
const Levels = 37
const Skip = 2 // can't generate for 32, 64b
var PieceComms = [Levels - Skip][32]byte{
{0x37, 0x31, 0xbb, 0x99, 0xac, 0x68, 0x9f, 0x66, 0xee, 0xf5, 0x97, 0x3e, 0x4a, 0x94, 0xda, 0x18, 0x8f, 0x4d, 0xdc, 0xae, 0x58, 0x7, 0x24, 0xfc, 0x6f, 0x3f, 0xd6, 0xd, 0xfd, 0x48, 0x83, 0x33},
{0x64, 0x2a, 0x60, 0x7e, 0xf8, 0x86, 0xb0, 0x4, 0xbf, 0x2c, 0x19, 0x78, 0x46, 0x3a, 0xe1, 0xd4, 0x69, 0x3a, 0xc0, 0xf4, 0x10, 0xeb, 0x2d, 0x1b, 0x7a, 0x47, 0xfe, 0x20, 0x5e, 0x5e, 0x75, 0xf},
{0x57, 0xa2, 0x38, 0x1a, 0x28, 0x65, 0x2b, 0xf4, 0x7f, 0x6b, 0xef, 0x7a, 0xca, 0x67, 0x9b, 0xe4, 0xae, 0xde, 0x58, 0x71, 0xab, 0x5c, 0xf3, 0xeb, 0x2c, 0x8, 0x11, 0x44, 0x88, 0xcb, 0x85, 0x26},
{0x1f, 0x7a, 0xc9, 0x59, 0x55, 0x10, 0xe0, 0x9e, 0xa4, 0x1c, 0x46, 0xb, 0x17, 0x64, 0x30, 0xbb, 0x32, 0x2c, 0xd6, 0xfb, 0x41, 0x2e, 0xc5, 0x7c, 0xb1, 0x7d, 0x98, 0x9a, 0x43, 0x10, 0x37, 0x2f},
{0xfc, 0x7e, 0x92, 0x82, 0x96, 0xe5, 0x16, 0xfa, 0xad, 0xe9, 0x86, 0xb2, 0x8f, 0x92, 0xd4, 0x4a, 0x4f, 0x24, 0xb9, 0x35, 0x48, 0x52, 0x23, 0x37, 0x6a, 0x79, 0x90, 0x27, 0xbc, 0x18, 0xf8, 0x33},
{0x8, 0xc4, 0x7b, 0x38, 0xee, 0x13, 0xbc, 0x43, 0xf4, 0x1b, 0x91, 0x5c, 0xe, 0xed, 0x99, 0x11, 0xa2, 0x60, 0x86, 0xb3, 0xed, 0x62, 0x40, 0x1b, 0xf9, 0xd5, 0x8b, 0x8d, 0x19, 0xdf, 0xf6, 0x24},
{0xb2, 0xe4, 0x7b, 0xfb, 0x11, 0xfa, 0xcd, 0x94, 0x1f, 0x62, 0xaf, 0x5c, 0x75, 0xf, 0x3e, 0xa5, 0xcc, 0x4d, 0xf5, 0x17, 0xd5, 0xc4, 0xf1, 0x6d, 0xb2, 0xb4, 0xd7, 0x7b, 0xae, 0xc1, 0xa3, 0x2f},
{0xf9, 0x22, 0x61, 0x60, 0xc8, 0xf9, 0x27, 0xbf, 0xdc, 0xc4, 0x18, 0xcd, 0xf2, 0x3, 0x49, 0x31, 0x46, 0x0, 0x8e, 0xae, 0xfb, 0x7d, 0x2, 0x19, 0x4d, 0x5e, 0x54, 0x81, 0x89, 0x0, 0x51, 0x8},
{0x2c, 0x1a, 0x96, 0x4b, 0xb9, 0xb, 0x59, 0xeb, 0xfe, 0xf, 0x6d, 0xa2, 0x9a, 0xd6, 0x5a, 0xe3, 0xe4, 0x17, 0x72, 0x4a, 0x8f, 0x7c, 0x11, 0x74, 0x5a, 0x40, 0xca, 0xc1, 0xe5, 0xe7, 0x40, 0x11},
{0xfe, 0xe3, 0x78, 0xce, 0xf1, 0x64, 0x4, 0xb1, 0x99, 0xed, 0xe0, 0xb1, 0x3e, 0x11, 0xb6, 0x24, 0xff, 0x9d, 0x78, 0x4f, 0xbb, 0xed, 0x87, 0x8d, 0x83, 0x29, 0x7e, 0x79, 0x5e, 0x2, 0x4f, 0x2},
{0x8e, 0x9e, 0x24, 0x3, 0xfa, 0x88, 0x4c, 0xf6, 0x23, 0x7f, 0x60, 0xdf, 0x25, 0xf8, 0x3e, 0xe4, 0xd, 0xca, 0x9e, 0xd8, 0x79, 0xeb, 0x6f, 0x63, 0x52, 0xd1, 0x50, 0x84, 0xf5, 0xad, 0xd, 0x3f},
{0x75, 0x2d, 0x96, 0x93, 0xfa, 0x16, 0x75, 0x24, 0x39, 0x54, 0x76, 0xe3, 0x17, 0xa9, 0x85, 0x80, 0xf0, 0x9, 0x47, 0xaf, 0xb7, 0xa3, 0x5, 0x40, 0xd6, 0x25, 0xa9, 0x29, 0x1c, 0xc1, 0x2a, 0x7},
{0x70, 0x22, 0xf6, 0xf, 0x7e, 0xf6, 0xad, 0xfa, 0x17, 0x11, 0x7a, 0x52, 0x61, 0x9e, 0x30, 0xce, 0xa8, 0x2c, 0x68, 0x7, 0x5a, 0xdf, 0x1c, 0x66, 0x77, 0x86, 0xec, 0x50, 0x6e, 0xef, 0x2d, 0x19},
{0xd9, 0x98, 0x87, 0xb9, 0x73, 0x57, 0x3a, 0x96, 0xe1, 0x13, 0x93, 0x64, 0x52, 0x36, 0xc1, 0x7b, 0x1f, 0x4c, 0x70, 0x34, 0xd7, 0x23, 0xc7, 0xa9, 0x9f, 0x70, 0x9b, 0xb4, 0xda, 0x61, 0x16, 0x2b},
{0xd0, 0xb5, 0x30, 0xdb, 0xb0, 0xb4, 0xf2, 0x5c, 0x5d, 0x2f, 0x2a, 0x28, 0xdf, 0xee, 0x80, 0x8b, 0x53, 0x41, 0x2a, 0x2, 0x93, 0x1f, 0x18, 0xc4, 0x99, 0xf5, 0xa2, 0x54, 0x8, 0x6b, 0x13, 0x26},
{0x84, 0xc0, 0x42, 0x1b, 0xa0, 0x68, 0x5a, 0x1, 0xbf, 0x79, 0x5a, 0x23, 0x44, 0x6, 0x4f, 0xe4, 0x24, 0xbd, 0x52, 0xa9, 0xd2, 0x43, 0x77, 0xb3, 0x94, 0xff, 0x4c, 0x4b, 0x45, 0x68, 0xe8, 0x11},
{0x65, 0xf2, 0x9e, 0x5d, 0x98, 0xd2, 0x46, 0xc3, 0x8b, 0x38, 0x8c, 0xfc, 0x6, 0xdb, 0x1f, 0x6b, 0x2, 0x13, 0x3, 0xc5, 0xa2, 0x89, 0x0, 0xb, 0xdc, 0xe8, 0x32, 0xa9, 0xc3, 0xec, 0x42, 0x1c},
{0xa2, 0x24, 0x75, 0x8, 0x28, 0x58, 0x50, 0x96, 0x5b, 0x7e, 0x33, 0x4b, 0x31, 0x27, 0xb0, 0xc0, 0x42, 0xb1, 0xd0, 0x46, 0xdc, 0x54, 0x40, 0x21, 0x37, 0x62, 0x7c, 0xd8, 0x79, 0x9c, 0xe1, 0x3a},
{0xda, 0xfd, 0xab, 0x6d, 0xa9, 0x36, 0x44, 0x53, 0xc2, 0x6d, 0x33, 0x72, 0x6b, 0x9f, 0xef, 0xe3, 0x43, 0xbe, 0x8f, 0x81, 0x64, 0x9e, 0xc0, 0x9, 0xaa, 0xd3, 0xfa, 0xff, 0x50, 0x61, 0x75, 0x8},
{0xd9, 0x41, 0xd5, 0xe0, 0xd6, 0x31, 0x4a, 0x99, 0x5c, 0x33, 0xff, 0xbd, 0x4f, 0xbe, 0x69, 0x11, 0x8d, 0x73, 0xd4, 0xe5, 0xfd, 0x2c, 0xd3, 0x1f, 0xf, 0x7c, 0x86, 0xeb, 0xdd, 0x14, 0xe7, 0x6},
{0x51, 0x4c, 0x43, 0x5c, 0x3d, 0x4, 0xd3, 0x49, 0xa5, 0x36, 0x5f, 0xbd, 0x59, 0xff, 0xc7, 0x13, 0x62, 0x91, 0x11, 0x78, 0x59, 0x91, 0xc1, 0xa3, 0xc5, 0x3a, 0xf2, 0x20, 0x79, 0x74, 0x1a, 0x2f},
{0xad, 0x6, 0x85, 0x39, 0x69, 0xd3, 0x7d, 0x34, 0xff, 0x8, 0xe0, 0x9f, 0x56, 0x93, 0xa, 0x4a, 0xd1, 0x9a, 0x89, 0xde, 0xf6, 0xc, 0xbf, 0xee, 0x7e, 0x1d, 0x33, 0x81, 0xc1, 0xe7, 0x1c, 0x37},
{0x39, 0x56, 0xe, 0x7b, 0x13, 0xa9, 0x3b, 0x7, 0xa2, 0x43, 0xfd, 0x27, 0x20, 0xff, 0xa7, 0xcb, 0x3e, 0x1d, 0x2e, 0x50, 0x5a, 0xb3, 0x62, 0x9e, 0x79, 0xf4, 0x63, 0x13, 0x51, 0x2c, 0xda, 0x6},
{0xcc, 0xc3, 0xc0, 0x12, 0xf5, 0xb0, 0x5e, 0x81, 0x1a, 0x2b, 0xbf, 0xdd, 0xf, 0x68, 0x33, 0xb8, 0x42, 0x75, 0xb4, 0x7b, 0xf2, 0x29, 0xc0, 0x5, 0x2a, 0x82, 0x48, 0x4f, 0x3c, 0x1a, 0x5b, 0x3d},
{0x7d, 0xf2, 0x9b, 0x69, 0x77, 0x31, 0x99, 0xe8, 0xf2, 0xb4, 0xb, 0x77, 0x91, 0x9d, 0x4, 0x85, 0x9, 0xee, 0xd7, 0x68, 0xe2, 0xc7, 0x29, 0x7b, 0x1f, 0x14, 0x37, 0x3, 0x4f, 0xc3, 0xc6, 0x2c},
{0x66, 0xce, 0x5, 0xa3, 0x66, 0x75, 0x52, 0xcf, 0x45, 0xc0, 0x2b, 0xcc, 0x4e, 0x83, 0x92, 0x91, 0x9b, 0xde, 0xac, 0x35, 0xde, 0x2f, 0xf5, 0x62, 0x71, 0x84, 0x8e, 0x9f, 0x7b, 0x67, 0x51, 0x7},
{0xd8, 0x61, 0x2, 0x18, 0x42, 0x5a, 0xb5, 0xe9, 0x5b, 0x1c, 0xa6, 0x23, 0x9d, 0x29, 0xa2, 0xe4, 0x20, 0xd7, 0x6, 0xa9, 0x6f, 0x37, 0x3e, 0x2f, 0x9c, 0x9a, 0x91, 0xd7, 0x59, 0xd1, 0x9b, 0x1},
{0x6d, 0x36, 0x4b, 0x1e, 0xf8, 0x46, 0x44, 0x1a, 0x5a, 0x4a, 0x68, 0x86, 0x23, 0x14, 0xac, 0xc0, 0xa4, 0x6f, 0x1, 0x67, 0x17, 0xe5, 0x34, 0x43, 0xe8, 0x39, 0xee, 0xdf, 0x83, 0xc2, 0x85, 0x3c},
{0x7, 0x7e, 0x5f, 0xde, 0x35, 0xc5, 0xa, 0x93, 0x3, 0xa5, 0x50, 0x9, 0xe3, 0x49, 0x8a, 0x4e, 0xbe, 0xdf, 0xf3, 0x9c, 0x42, 0xb7, 0x10, 0xb7, 0x30, 0xd8, 0xec, 0x7a, 0xc7, 0xaf, 0xa6, 0x3e},
{0xe6, 0x40, 0x5, 0xa6, 0xbf, 0xe3, 0x77, 0x79, 0x53, 0xb8, 0xad, 0x6e, 0xf9, 0x3f, 0xf, 0xca, 0x10, 0x49, 0xb2, 0x4, 0x16, 0x54, 0xf2, 0xa4, 0x11, 0xf7, 0x70, 0x27, 0x99, 0xce, 0xce, 0x2},
{0x25, 0x9d, 0x3d, 0x6b, 0x1f, 0x4d, 0x87, 0x6d, 0x11, 0x85, 0xe1, 0x12, 0x3a, 0xf6, 0xf5, 0x50, 0x1a, 0xf0, 0xf6, 0x7c, 0xf1, 0x5b, 0x52, 0x16, 0x25, 0x5b, 0x7b, 0x17, 0x8d, 0x12, 0x5, 0x1d},
{0x3f, 0x9a, 0x4d, 0x41, 0x1d, 0xa4, 0xef, 0x1b, 0x36, 0xf3, 0x5f, 0xf0, 0xa1, 0x95, 0xae, 0x39, 0x2a, 0xb2, 0x3f, 0xee, 0x79, 0x67, 0xb7, 0xc4, 0x1b, 0x3, 0xd1, 0x61, 0x3f, 0xc2, 0x92, 0x39},
{0xfe, 0x4e, 0xf3, 0x28, 0xc6, 0x1a, 0xa3, 0x9c, 0xfd, 0xb2, 0x48, 0x4e, 0xaa, 0x32, 0xa1, 0x51, 0xb1, 0xfe, 0x3d, 0xfd, 0x1f, 0x96, 0xdd, 0x8c, 0x97, 0x11, 0xfd, 0x86, 0xd6, 0xc5, 0x81, 0x13},
{0xf5, 0x5d, 0x68, 0x90, 0xe, 0x2d, 0x83, 0x81, 0xec, 0xcb, 0x81, 0x64, 0xcb, 0x99, 0x76, 0xf2, 0x4b, 0x2d, 0xe0, 0xdd, 0x61, 0xa3, 0x1b, 0x97, 0xce, 0x6e, 0xb2, 0x38, 0x50, 0xd5, 0xe8, 0x19},
{0xaa, 0xaa, 0x8c, 0x4c, 0xb4, 0xa, 0xac, 0xee, 0x1e, 0x2, 0xdc, 0x65, 0x42, 0x4b, 0x2a, 0x6c, 0x8e, 0x99, 0xf8, 0x3, 0xb7, 0x2f, 0x79, 0x29, 0xc4, 0x10, 0x1d, 0x7f, 0xae, 0x6b, 0xff, 0x32},
}
func ZeroPieceCommitment(sz abi.UnpaddedPieceSize) cid.Cid {
level := bits.TrailingZeros64(uint64(sz.Padded())) - Skip - 5 // 2^5 = 32
return commcid.PieceCommitmentV1ToCID(PieceComms[level][:])
}

115
zerocomm/zerocomm_test.go Normal file
View File

@ -0,0 +1,115 @@
package zerocomm_test
import (
"bytes"
"fmt"
"github.com/filecoin-project/lotus/storage/sectorstorage/zerocomm"
"io"
"testing"
commcid "github.com/filecoin-project/go-fil-commcid"
abi "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper"
)
func TestComms(t *testing.T) {
t.Skip("don't have enough ram") // no, but seriously, currently this needs like 3tb of /tmp
var expPieceComms [zerocomm.Levels - zerocomm.Skip]cid.Cid
{
l2, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, bytes.NewReader(make([]byte, 127)), 127)
if err != nil {
t.Fatal(err)
}
expPieceComms[0] = l2
}
for i := 1; i < zerocomm.Levels-2; i++ {
var err error
sz := abi.UnpaddedPieceSize(127 << uint(i))
fmt.Println(i, sz)
r := io.LimitReader(&NullReader{}, int64(sz))
expPieceComms[i], err = ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, r, sz)
if err != nil {
t.Fatal(err)
}
}
for i, comm := range expPieceComms {
c, err := commcid.CIDToPieceCommitmentV1(comm)
if err != nil {
t.Fatal(err)
}
if string(c) != string(zerocomm.PieceComms[i][:]) {
t.Errorf("zero commitment %d didn't match", i)
}
}
for _, comm := range expPieceComms { // Could do codegen, but this is good enough
fmt.Printf("%#v,\n", comm)
}
}
func TestCommsSmall(t *testing.T) {
var expPieceComms [8]cid.Cid
lvls := len(expPieceComms) + zerocomm.Skip
{
l2, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, bytes.NewReader(make([]byte, 127)), 127)
if err != nil {
t.Fatal(err)
}
expPieceComms[0] = l2
}
for i := 1; i < lvls-2; i++ {
var err error
sz := abi.UnpaddedPieceSize(127 << uint(i))
fmt.Println(i, sz)
r := io.LimitReader(&NullReader{}, int64(sz))
expPieceComms[i], err = ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, r, sz)
if err != nil {
t.Fatal(err)
}
}
for i, comm := range expPieceComms {
c, err := commcid.CIDToPieceCommitmentV1(comm)
if err != nil {
t.Fatal(err)
}
if string(c) != string(zerocomm.PieceComms[i][:]) {
t.Errorf("zero commitment %d didn't match", i)
}
}
for _, comm := range expPieceComms { // Could do codegen, but this is good enough
fmt.Printf("%#v,\n", comm)
}
}
func TestForSise(t *testing.T) {
exp, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, bytes.NewReader(make([]byte, 1016)), 1016)
if err != nil {
return
}
actual := zerocomm.ZeroPieceCommitment(1016)
if !exp.Equals(actual) {
t.Errorf("zero commitment didn't match")
}
}
type NullReader struct{}
func (NullReader) Read(out []byte) (int, error) {
for i := range out {
out[i] = 0
}
return len(out), nil
}