diff --git a/ffiwrapper/basicfs/fs.go b/ffiwrapper/basicfs/fs.go new file mode 100644 index 000000000..cd85a1a0b --- /dev/null +++ b/ffiwrapper/basicfs/fs.go @@ -0,0 +1,70 @@ +package basicfs + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" +) + +type sectorFile struct { + abi.SectorID + stores.SectorFileType +} + +type Provider struct { + Root string + + lk sync.Mutex + waitSector map[sectorFile]chan struct{} +} + +func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { + os.Mkdir(filepath.Join(b.Root, stores.FTUnsealed.String()), 0755) + os.Mkdir(filepath.Join(b.Root, stores.FTSealed.String()), 0755) + os.Mkdir(filepath.Join(b.Root, stores.FTCache.String()), 0755) + + done := func() {} + + for i := 0; i < 3; i++ { + if (existing|allocate)&(1< MaxFallbackPostChallengeCount { + return MaxFallbackPostChallengeCount + } + return challengeCount +} + +func (sb *SectorBuilder) Stop() { + close(sb.stopping) +} + +func (sb *SectorBuilder) SectorSize() abi.SectorSize { + return sb.ssize +} + +func (sb *SectorBuilder) SealProofType() abi.RegisteredProof { + return sb.sealProofType +} + +func (sb *SectorBuilder) PoStProofType() abi.RegisteredProof { + return sb.postProofType +} diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go new file mode 100644 index 000000000..cc871c835 --- /dev/null +++ b/ffiwrapper/sealer_cgo.go @@ -0,0 +1,400 @@ +//+build cgo + +package ffiwrapper + +import ( + "context" + "io" + "math/bits" + "os" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" + "github.com/filecoin-project/lotus/storage/sectorstorage/zerocomm" +) + +var _ Basic = &SectorBuilder{} + +func New(sectors SectorProvider, cfg *Config) (*SectorBuilder, error) { + sectorSize, err := sizeFromConfig(*cfg) + if err != nil { + return nil, err + } + + sb := &SectorBuilder{ + sealProofType: cfg.SealProofType, + postProofType: cfg.PoStProofType, + ssize: sectorSize, + + sectors: sectors, + + stopping: make(chan struct{}), + } + + return sb, nil +} + +func (sb *SectorBuilder) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, faults []abi.SectorNumber) (ffi.SortedPrivateSectorInfo, error) { + fmap := map[abi.SectorNumber]struct{}{} + for _, fault := range faults { + fmap[fault] = struct{}{} + } + + var out []ffi.PrivateSectorInfo + for _, s := range sectorInfo { + if _, faulty := fmap[s.SectorNumber]; faulty { + continue + } + + paths, done, err := sb.sectors.AcquireSector(ctx, abi.SectorID{Miner: mid, Number: s.SectorNumber}, stores.FTCache|stores.FTSealed, 0, false) + if err != nil { + return ffi.SortedPrivateSectorInfo{}, xerrors.Errorf("acquire sector paths: %w", err) + } + done() // TODO: This is a tiny bit suboptimal + + postProofType, err := s.RegisteredProof.RegisteredPoStProof() + if err != nil { + return ffi.SortedPrivateSectorInfo{}, xerrors.Errorf("acquiring registered PoSt proof from sector info %+v: %w", s, err) + } + + out = append(out, ffi.PrivateSectorInfo{ + CacheDirPath: paths.Cache, + PoStProofType: postProofType, + SealedSectorPath: paths.Sealed, + SectorInfo: s, + }) + } + + return ffi.NewSortedPrivateSectorInfo(out...), nil +} + +func (sb *SectorBuilder) NewSector(ctx context.Context, sector abi.SectorID) error { + // TODO: Allocate the sector here instead of in addpiece + + return nil +} + +func (sb *SectorBuilder) AddPiece(ctx context.Context, sector abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) { + f, werr, err := toReadableFile(file, int64(pieceSize)) + if err != nil { + return abi.PieceInfo{}, err + } + + var done func() + var stagedFile *os.File + + defer func() { + if done != nil { + done() + } + + if stagedFile != nil { + if err := stagedFile.Close(); err != nil { + log.Errorf("closing staged file: %+v", err) + } + } + }() + + var stagedPath stores.SectorPaths + if len(existingPieceSizes) == 0 { + stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, true) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err) + } + + stagedFile, err = os.Create(stagedPath.Unsealed) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("opening sector file: %w", err) + } + } else { + stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, true) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err) + } + + stagedFile, err = os.OpenFile(stagedPath.Unsealed, os.O_RDWR, 0644) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("opening sector file: %w", err) + } + + if _, err := stagedFile.Seek(0, io.SeekEnd); err != nil { + return abi.PieceInfo{}, xerrors.Errorf("seek end: %w", err) + } + } + + _, _, pieceCID, err := ffi.WriteWithAlignment(sb.sealProofType, f, pieceSize, stagedFile, existingPieceSizes) + if err != nil { + return abi.PieceInfo{}, err + } + + if err := f.Close(); err != nil { + return abi.PieceInfo{}, err + } + + return abi.PieceInfo{ + Size: pieceSize.Padded(), + PieceCID: pieceCID, + }, werr() +} + +func (sb *SectorBuilder) ReadPieceFromSealedSector(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealedCID cid.Cid) (io.ReadCloser, error) { + path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTUnsealed, false) + if err != nil { + return nil, xerrors.Errorf("acquire unsealed sector path: %w", err) + } + defer doneUnsealed() + f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644) + if err == nil { + if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { + return nil, xerrors.Errorf("seek: %w", err) + } + + lr := io.LimitReader(f, int64(size)) + + return &struct { + io.Reader + io.Closer + }{ + Reader: lr, + Closer: f, + }, nil + } + if !os.IsNotExist(err) { + return nil, err + } + + sealed, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed|stores.FTCache, 0, false) + if err != nil { + return nil, xerrors.Errorf("acquire sealed/cache sector path: %w", err) + } + defer doneSealed() + + // TODO: GC for those + // (Probably configurable count of sectors to be kept unsealed, and just + // remove last used one (or use whatever other cache policy makes sense)) + err = ffi.Unseal( + sb.sealProofType, + sealed.Cache, + sealed.Sealed, + path.Unsealed, + sector.Number, + sector.Miner, + ticket, + unsealedCID, + ) + if err != nil { + return nil, xerrors.Errorf("unseal failed: %w", err) + } + + f, err = os.OpenFile(string(path.Unsealed), os.O_RDONLY, 0644) + if err != nil { + return nil, err + } + + if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { + return nil, xerrors.Errorf("seek: %w", err) + } + + lr := io.LimitReader(f, int64(size)) + + return &struct { + io.Reader + io.Closer + }{ + Reader: lr, + Closer: f, + }, nil +} + +func (sb *SectorBuilder) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { + paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, true) + if err != nil { + return nil, xerrors.Errorf("acquiring sector paths: %w", err) + } + defer done() + + e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, xerrors.Errorf("ensuring sealed file exists: %w", err) + } + if err := e.Close(); err != nil { + return nil, err + } + + if err := os.Mkdir(paths.Cache, 0755); err != nil { + if os.IsExist(err) { + log.Warnf("existing cache in %s; removing", paths.Cache) + + if err := os.RemoveAll(paths.Cache); err != nil { + return nil, xerrors.Errorf("remove existing sector cache from %s (sector %d): %w", paths.Cache, sector, err) + } + + if err := os.Mkdir(paths.Cache, 0755); err != nil { + return nil, xerrors.Errorf("mkdir cache path after cleanup: %w", err) + } + } else { + return nil, err + } + } + + var sum abi.UnpaddedPieceSize + for _, piece := range pieces { + sum += piece.Size.Unpadded() + } + ussize := abi.PaddedPieceSize(sb.ssize).Unpadded() + if sum != ussize { + return nil, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum)) + } + + // TODO: context cancellation respect + p1o, err := ffi.SealPreCommitPhase1( + sb.sealProofType, + paths.Cache, + paths.Unsealed, + paths.Sealed, + sector.Number, + sector.Miner, + ticket, + pieces, + ) + if err != nil { + return nil, xerrors.Errorf("presealing sector %d (%s): %w", sector.Number, paths.Unsealed, err) + } + return p1o, nil +} + +func (sb *SectorBuilder) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) { + paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true) + if err != nil { + return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err) + } + defer done() + + sealedCID, unsealedCID, err := ffi.SealPreCommitPhase2(phase1Out, paths.Cache, paths.Sealed) + if err != nil { + return storage.SectorCids{}, xerrors.Errorf("presealing sector %d (%s): %w", sector.Number, paths.Unsealed, err) + } + + return storage.SectorCids{ + Unsealed: unsealedCID, + Sealed: sealedCID, + }, nil +} + +func (sb *SectorBuilder) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { + paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true) + if err != nil { + return nil, xerrors.Errorf("acquire sector paths: %w", err) + } + defer done() + output, err := ffi.SealCommitPhase1( + sb.sealProofType, + cids.Sealed, + cids.Unsealed, + paths.Cache, + paths.Sealed, + sector.Number, + sector.Miner, + ticket, + seed, + pieces, + ) + if err != nil { + log.Warn("StandaloneSealCommit error: ", err) + log.Warnf("num:%d tkt:%v seed:%v, pi:%v sealedCID:%v, unsealedCID:%v", sector.Number, ticket, seed, pieces, cids.Sealed, cids.Unsealed) + + return nil, xerrors.Errorf("StandaloneSealCommit: %w", err) + } + return output, nil +} + +func (sb *SectorBuilder) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (storage.Proof, error) { + return ffi.SealCommitPhase2(phase1Out, sector.Number, sector.Miner) +} + +func (sb *SectorBuilder) GenerateFallbackPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) (storage.FallbackPostOut, error) { + privsectors, err := sb.pubSectorToPriv(ctx, miner, sectorInfo, faults) + if err != nil { + return storage.FallbackPostOut{}, err + } + + challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults))) + challengeSeed[31] = 0 + + candidates, err := ffi.GenerateCandidates(miner, challengeSeed, challengeCount, privsectors) + if err != nil { + return storage.FallbackPostOut{}, err + } + + winners := make([]abi.PoStCandidate, len(candidates)) + for idx := range winners { + winners[idx] = candidates[idx].Candidate + } + + proof, err := ffi.GeneratePoSt(miner, privsectors, challengeSeed, winners) + return storage.FallbackPostOut{ + PoStInputs: ffiToStorageCandidates(candidates), + Proof: proof, + }, err +} + +func (sb *SectorBuilder) FinalizeSector(ctx context.Context, sector abi.SectorID) error { + paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, false) + if err != nil { + return xerrors.Errorf("acquiring sector cache path: %w", err) + } + defer done() + + return ffi.ClearCache(paths.Cache) +} + +func GeneratePieceCIDFromFile(proofType abi.RegisteredProof, piece io.Reader, pieceSize abi.UnpaddedPieceSize) (cid.Cid, error) { + f, werr, err := toReadableFile(piece, int64(pieceSize)) + if err != nil { + return cid.Undef, err + } + + pieceCID, err := ffi.GeneratePieceCIDFromFile(proofType, f, pieceSize) + if err != nil { + return cid.Undef, err + } + + return pieceCID, werr() +} + +func GenerateUnsealedCID(proofType abi.RegisteredProof, pieces []abi.PieceInfo) (cid.Cid, error) { + var sum abi.PaddedPieceSize + for _, p := range pieces { + sum += p.Size + } + + ssize, err := SectorSizeForRegisteredProof(proofType) + if err != nil { + return cid.Undef, err + } + + { + // pad remaining space with 0 CommPs + toFill := uint64(abi.PaddedPieceSize(ssize) - sum) + n := bits.OnesCount64(toFill) + for i := 0; i < n; i++ { + next := bits.TrailingZeros64(toFill) + psize := uint64(1) << uint(next) + toFill ^= psize + + unpadded := abi.PaddedPieceSize(psize).Unpadded() + pieces = append(pieces, abi.PieceInfo{ + Size: unpadded.Padded(), + PieceCID: zerocomm.ZeroPieceCommitment(unpadded), + }) + } + } + + return ffi.GenerateUnsealedCID(proofType, pieces) +} diff --git a/ffiwrapper/sealer_test.go b/ffiwrapper/sealer_test.go new file mode 100644 index 000000000..78d4a4898 --- /dev/null +++ b/ffiwrapper/sealer_test.go @@ -0,0 +1,356 @@ +package ffiwrapper + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "runtime" + "sync" + "testing" + "time" + + logging "github.com/ipfs/go-log" + "golang.org/x/xerrors" + + paramfetch "github.com/filecoin-project/go-paramfetch" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper/basicfs" +) + +func init() { + logging.SetLogLevel("*", "INFO") //nolint: errcheck +} + +var sectorSize = abi.SectorSize(2048) +var sealProofType = abi.RegisteredProof_StackedDRG2KiBSeal +var postProofType = abi.RegisteredProof_StackedDRG2KiBPoSt + +type seal struct { + id abi.SectorID + cids storage.SectorCids + pi abi.PieceInfo + ticket abi.SealRandomness +} + +func (s *seal) precommit(t *testing.T, sb *SectorBuilder, id abi.SectorID, done func()) { + defer done() + dlen := abi.PaddedPieceSize(sectorSize).Unpadded() + + var err error + r := io.LimitReader(rand.New(rand.NewSource(42+int64(id.Number))), int64(dlen)) + s.pi, err = sb.AddPiece(context.TODO(), id, []abi.UnpaddedPieceSize{}, dlen, r) + if err != nil { + t.Fatalf("%+v", err) + } + + s.ticket = abi.SealRandomness{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2} + + p1, err := sb.SealPreCommit1(context.TODO(), id, s.ticket, []abi.PieceInfo{s.pi}) + if err != nil { + t.Fatalf("%+v", err) + } + cids, err := sb.SealPreCommit2(context.TODO(), id, p1) + if err != nil { + t.Fatalf("%+v", err) + } + s.cids = cids +} + +func (s *seal) commit(t *testing.T, sb *SectorBuilder, done func()) { + defer done() + seed := abi.InteractiveSealRandomness{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9} + + pc1, err := sb.SealCommit1(context.TODO(), s.id, s.ticket, seed, []abi.PieceInfo{s.pi}, s.cids) + if err != nil { + t.Fatalf("%+v", err) + } + proof, err := sb.SealCommit2(context.TODO(), s.id, pc1) + if err != nil { + t.Fatalf("%+v", err) + } + + ok, err := ProofVerifier.VerifySeal(abi.SealVerifyInfo{ + SectorID: s.id, + OnChain: abi.OnChainSealVerifyInfo{ + SealedCID: s.cids.Sealed, + RegisteredProof: sealProofType, + Proof: proof, + SectorNumber: s.id.Number, + }, + Randomness: s.ticket, + InteractiveRandomness: seed, + UnsealedCID: s.cids.Unsealed, + }) + if err != nil { + t.Fatalf("%+v", err) + } + + if !ok { + t.Fatal("proof failed to validate") + } +} + +func post(t *testing.T, sb *SectorBuilder, seals ...seal) time.Time { + randomness := abi.PoStRandomness{0, 9, 2, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 7} + + sis := make([]abi.SectorInfo, len(seals)) + for i, s := range seals { + sis[i] = abi.SectorInfo{ + RegisteredProof: sealProofType, + SectorNumber: s.id.Number, + SealedCID: s.cids.Sealed, + } + } + + candidates, err := sb.GenerateEPostCandidates(context.TODO(), seals[0].id.Miner, sis, randomness, []abi.SectorNumber{}) + if err != nil { + t.Fatalf("%+v", err) + } + + genCandidates := time.Now() + + if len(candidates) != 1 { + t.Fatal("expected 1 candidate") + } + + candidatesPrime := make([]abi.PoStCandidate, len(candidates)) + for idx := range candidatesPrime { + candidatesPrime[idx] = candidates[idx].Candidate + } + + proofs, err := sb.ComputeElectionPoSt(context.TODO(), seals[0].id.Miner, sis, randomness, candidatesPrime) + if err != nil { + t.Fatalf("%+v", err) + } + + ePoStChallengeCount := ElectionPostChallengeCount(uint64(len(sis)), 0) + + ok, err := ProofVerifier.VerifyElectionPost(context.TODO(), abi.PoStVerifyInfo{ + Randomness: randomness, + Candidates: candidatesPrime, + Proofs: proofs, + EligibleSectors: sis, + Prover: seals[0].id.Miner, + ChallengeCount: ePoStChallengeCount, + }) + if err != nil { + t.Fatalf("%+v", err) + } + if !ok { + t.Fatal("bad post") + } + + return genCandidates +} + +func getGrothParamFileAndVerifyingKeys(s abi.SectorSize) { + dat, err := ioutil.ReadFile("./parameters.json") + if err != nil { + panic(xerrors.Errorf("failed to read contents of ./parameters.json: %w", err)) + } + + err = paramfetch.GetParams(dat, uint64(s)) + if err != nil { + panic(xerrors.Errorf("failed to acquire Groth parameters for 2KiB sectors: %w", err)) + } +} + +// TestDownloadParams exists only so that developers and CI can pre-download +// Groth parameters and verifying keys before running the tests which rely on +// those parameters and keys. To do this, run the following command: +// +// go test -run=^TestDownloadParams +// +func TestDownloadParams(t *testing.T) { + getGrothParamFileAndVerifyingKeys(sectorSize) +} + +func TestSealAndVerify(t *testing.T) { + if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware + t.Skip("this is slow") + } + _ = os.Setenv("RUST_LOG", "info") + + getGrothParamFileAndVerifyingKeys(sectorSize) + + cdir, err := ioutil.TempDir("", "sbtest-c-") + if err != nil { + t.Fatal(err) + } + miner := abi.ActorID(123) + + cfg := &Config{ + SealProofType: sealProofType, + PoStProofType: postProofType, + } + + sp := &basicfs.Provider{ + Root: cdir, + } + sb, err := New(sp, cfg) + if err != nil { + t.Fatalf("%+v", err) + } + cleanup := func() { + if t.Failed() { + fmt.Printf("not removing %s\n", cdir) + return + } + if err := os.RemoveAll(cdir); err != nil { + t.Error(err) + } + } + defer cleanup() + + si := abi.SectorID{Miner: miner, Number: 1} + + s := seal{id: si} + + start := time.Now() + + s.precommit(t, sb, si, func() {}) + + precommit := time.Now() + + s.commit(t, sb, func() {}) + + commit := time.Now() + + genCandidiates := post(t, sb, s) + + epost := time.Now() + + post(t, sb, s) + + if err := sb.FinalizeSector(context.TODO(), si); err != nil { + t.Fatalf("%+v", err) + } + + fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String()) + fmt.Printf("Commit: %s\n", commit.Sub(precommit).String()) + fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(commit).String()) + fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String()) +} + +func TestSealPoStNoCommit(t *testing.T) { + if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware + t.Skip("this is slow") + } + _ = os.Setenv("RUST_LOG", "info") + + getGrothParamFileAndVerifyingKeys(sectorSize) + + dir, err := ioutil.TempDir("", "sbtest") + if err != nil { + t.Fatal(err) + } + + miner := abi.ActorID(123) + + cfg := &Config{ + SealProofType: sealProofType, + PoStProofType: postProofType, + } + sp := &basicfs.Provider{ + Root: dir, + } + sb, err := New(sp, cfg) + if err != nil { + t.Fatalf("%+v", err) + } + + cleanup := func() { + if t.Failed() { + fmt.Printf("not removing %s\n", dir) + return + } + if err := os.RemoveAll(dir); err != nil { + t.Error(err) + } + } + defer cleanup() + + si := abi.SectorID{Miner: miner, Number: 1} + + s := seal{id: si} + + start := time.Now() + + s.precommit(t, sb, si, func() {}) + + precommit := time.Now() + + if err := sb.FinalizeSector(context.TODO(), si); err != nil { + t.Fatal(err) + } + + genCandidiates := post(t, sb, s) + + epost := time.Now() + + fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String()) + fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(precommit).String()) + fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String()) +} + +func TestSealAndVerify2(t *testing.T) { + if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware + t.Skip("this is slow") + } + _ = os.Setenv("RUST_LOG", "trace") + + getGrothParamFileAndVerifyingKeys(sectorSize) + + dir, err := ioutil.TempDir("", "sbtest") + if err != nil { + t.Fatal(err) + } + + miner := abi.ActorID(123) + + cfg := &Config{ + SealProofType: sealProofType, + PoStProofType: postProofType, + } + sp := &basicfs.Provider{ + Root: dir, + } + sb, err := New(sp, cfg) + if err != nil { + t.Fatalf("%+v", err) + } + + cleanup := func() { + if err := os.RemoveAll(dir); err != nil { + t.Error(err) + } + } + + defer cleanup() + + var wg sync.WaitGroup + + si1 := abi.SectorID{Miner: miner, Number: 1} + si2 := abi.SectorID{Miner: miner, Number: 2} + + s1 := seal{id: si1} + s2 := seal{id: si2} + + wg.Add(2) + go s1.precommit(t, sb, si1, wg.Done) //nolint: staticcheck + time.Sleep(100 * time.Millisecond) + go s2.precommit(t, sb, si2, wg.Done) //nolint: staticcheck + wg.Wait() + + wg.Add(2) + go s1.commit(t, sb, wg.Done) //nolint: staticcheck + go s2.commit(t, sb, wg.Done) //nolint: staticcheck + wg.Wait() + + post(t, sb, s1, s2) +} diff --git a/ffiwrapper/types.go b/ffiwrapper/types.go new file mode 100644 index 000000000..ea113fbbb --- /dev/null +++ b/ffiwrapper/types.go @@ -0,0 +1,49 @@ +package ffiwrapper + +import ( + "context" + "errors" + "github.com/ipfs/go-cid" + "io" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper/basicfs" + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" +) + +type UnpaddedByteIndex uint64 + +type Validator interface { + CanCommit(sector stores.SectorPaths) (bool, error) + CanProve(sector stores.SectorPaths) (bool, error) +} + +type Sealer interface { + storage.Sealer + storage.Storage +} + +type Basic interface { + storage.Prover + Sealer + + ReadPieceFromSealedSector(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) +} + +type Verifier interface { + VerifySeal(abi.SealVerifyInfo) (bool, error) + VerifyElectionPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error) + VerifyFallbackPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error) +} + +var ErrSectorNotFound = errors.New("sector not found") + +type SectorProvider interface { + // * returns ErrSectorNotFound if a requested existing sector doesn't exist + // * returns an error when allocate is set, and existing isn't, and the sector exists + AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) +} + +var _ SectorProvider = &basicfs.Provider{} diff --git a/ffiwrapper/varifier_cgo.go b/ffiwrapper/varifier_cgo.go new file mode 100644 index 000000000..6c01470ce --- /dev/null +++ b/ffiwrapper/varifier_cgo.go @@ -0,0 +1,80 @@ +//+build cgo + +package ffiwrapper + +import ( + "context" + + "go.opencensus.io/trace" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" +) + +func (sb *SectorBuilder) ComputeElectionPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) { + challengeSeed[31] = 0 + + privsects, err := sb.pubSectorToPriv(ctx, miner, sectorInfo, nil) // TODO: faults + if err != nil { + return nil, err + } + + return ffi.GeneratePoSt(miner, privsects, challengeSeed, winners) +} + +func (sb *SectorBuilder) GenerateEPostCandidates(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) { + privsectors, err := sb.pubSectorToPriv(ctx, miner, sectorInfo, faults) + if err != nil { + return nil, err + } + + challengeSeed[31] = 0 + + challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults))) + pc, err := ffi.GenerateCandidates(miner, challengeSeed, challengeCount, privsectors) + if err != nil { + return nil, err + } + + return ffiToStorageCandidates(pc), nil +} + +func ffiToStorageCandidates(pc []ffi.PoStCandidateWithTicket) []storage.PoStCandidateWithTicket { + out := make([]storage.PoStCandidateWithTicket, len(pc)) + for i := range out { + out[i] = storage.PoStCandidateWithTicket{ + Candidate: pc[i].Candidate, + Ticket: pc[i].Ticket, + } + } + + return out +} + +var _ Verifier = ProofVerifier + +type proofVerifier struct{} + +var ProofVerifier = proofVerifier{} + +func (proofVerifier) VerifySeal(info abi.SealVerifyInfo) (bool, error) { + return ffi.VerifySeal(info) +} + +func (proofVerifier) VerifyElectionPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error) { + return verifyPost(ctx, info) +} + +func (proofVerifier) VerifyFallbackPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error) { + return verifyPost(ctx, info) +} + +func verifyPost(ctx context.Context, info abi.PoStVerifyInfo) (bool, error) { + _, span := trace.StartSpan(ctx, "VerifyPoSt") + defer span.End() + + info.Randomness[31] = 0 + + return ffi.VerifyPoSt(info) +} diff --git a/manager.go b/manager.go index 6f4bd58e2..43438385f 100644 --- a/manager.go +++ b/manager.go @@ -4,6 +4,7 @@ import ( "container/list" "context" "errors" + "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper" "io" "net/http" "sync" @@ -13,7 +14,6 @@ import ( "github.com/mitchellh/go-homedir" "golang.org/x/xerrors" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" @@ -30,7 +30,7 @@ var ErrNoWorkers = errors.New("no suitable workers found") type URLs []string type Worker interface { - sectorbuilder.Sealer + ffiwrapper.Sealer TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) @@ -45,16 +45,16 @@ type Worker interface { type SectorManager interface { SectorSize() abi.SectorSize - ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) + ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) - sectorbuilder.Sealer + ffiwrapper.Sealer storage.Prover } type WorkerID uint64 type Manager struct { - scfg *sectorbuilder.Config + scfg *ffiwrapper.Config ls stores.LocalStorage storage *stores.Remote @@ -76,13 +76,13 @@ type Manager struct { schedQueue *list.List // List[*workerRequest] } -func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, sc config.Storage, urls URLs, ca api.Common) (*Manager, error) { +func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc config.Storage, urls URLs, ca api.Common) (*Manager, error) { lstor, err := stores.NewLocal(ctx, ls, si, urls) if err != nil { return nil, err } - prover, err := sectorbuilder.New(&readonlyProvider{stor: lstor}, cfg) + prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg) if err != nil { return nil, xerrors.Errorf("creating prover instance: %w", err) } @@ -180,7 +180,7 @@ func (m *Manager) SectorSize() abi.SectorSize { return sz } -func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) { +func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwrapper.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) { panic("implement me") } @@ -273,9 +273,9 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie var best []stores.StorageInfo var err error if len(existingPieces) == 0 { // new - best, err = m.index.StorageBestAlloc(ctx, sectorbuilder.FTUnsealed, true) + best, err = m.index.StorageBestAlloc(ctx, stores.FTUnsealed, true) } else { // append to existing - best, err = m.index.StorageFindSector(ctx, sector, sectorbuilder.FTUnsealed, false) + best, err = m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false) } if err != nil { return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err) @@ -302,7 +302,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { // TODO: also consider where the unsealed data sits - best, err := m.index.StorageBestAlloc(ctx, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) + best, err := m.index.StorageBestAlloc(ctx, stores.FTCache|stores.FTSealed, true) if err != nil { return nil, xerrors.Errorf("finding path for sector sealing: %w", err) } @@ -326,7 +326,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) { // TODO: allow workers to fetch the sectors - best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) + best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed, true) if err != nil { return storage.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err) } @@ -348,7 +348,7 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase } func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) { - best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed, true) + best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed, true) if err != nil { return nil, xerrors.Errorf("finding path for sector sealing: %w", err) } @@ -400,7 +400,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou } func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error { - best, err := m.index.StorageFindSector(ctx, sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed, true) + best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, true) if err != nil { return xerrors.Errorf("finding sealed sector: %w", err) } diff --git a/mock/mock.go b/mock/mock.go index ca50ed189..e86d2b3e7 100644 --- a/mock/mock.go +++ b/mock/mock.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "github.com/filecoin-project/lotus/storage/sectorstorage" + "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper" "io" "io/ioutil" "math/big" @@ -12,7 +13,6 @@ import ( "sync" commcid "github.com/filecoin-project/go-fil-commcid" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-storage/storage" "github.com/ipfs/go-cid" @@ -93,7 +93,7 @@ func (sb *SectorMgr) AddPiece(ctx context.Context, sectorId abi.SectorID, existi ss.lk.Lock() defer ss.lk.Unlock() - c, err := sectorbuilder.GeneratePieceCIDFromFile(sb.proofType, r, size) + c, err := ffiwrapper.GeneratePieceCIDFromFile(sb.proofType, r, size) if err != nil { return abi.PieceInfo{}, xerrors.Errorf("failed to generate piece cid: %w", err) } @@ -273,7 +273,7 @@ func (sb *SectorMgr) GenerateEPostCandidates(ctx context.Context, mid abi.ActorI panic("todo") } - n := sectorbuilder.ElectionPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults))) + n := ffiwrapper.ElectionPostChallengeCount(uint64(len(sectorInfo)), uint64(len(faults))) if n > uint64(len(sectorInfo)) { n = uint64(len(sectorInfo)) } @@ -298,7 +298,7 @@ func (sb *SectorMgr) GenerateEPostCandidates(ctx context.Context, mid abi.ActorI return out, nil } -func (sb *SectorMgr) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset sectorbuilder.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) { +func (sb *SectorMgr) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset ffiwrapper.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) { if len(sb.sectors[sectorID].pieces) > 1 { panic("implme") } @@ -355,10 +355,10 @@ func (m mockVerif) VerifySeal(svi abi.SealVerifyInfo) (bool, error) { } func (m mockVerif) GenerateDataCommitment(pt abi.RegisteredProof, pieces []abi.PieceInfo) (cid.Cid, error) { - return sectorbuilder.GenerateUnsealedCID(pt, pieces) + return ffiwrapper.GenerateUnsealedCID(pt, pieces) } var MockVerifier = mockVerif{} -var _ sectorbuilder.Verifier = MockVerifier +var _ ffiwrapper.Verifier = MockVerifier var _ sectorstorage.SectorManager = &SectorMgr{} diff --git a/mock/preseal.go b/mock/preseal.go index 6bac0aaea..20a4377cd 100644 --- a/mock/preseal.go +++ b/mock/preseal.go @@ -3,7 +3,7 @@ package mock import ( "github.com/filecoin-project/go-address" commcid "github.com/filecoin-project/go-fil-commcid" - "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/lotus/storage/sectorstorage/zerocomm" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/market" @@ -39,7 +39,7 @@ func PreSeal(ssize abi.SectorSize, maddr address.Address, sectors int) (*genesis preseal := &genesis.PreSeal{} preseal.ProofType = st - preseal.CommD = sectorbuilder.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) + preseal.CommD = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) d, _ := commcid.CIDToPieceCommitmentV1(preseal.CommD) r := commDR(d) preseal.CommR = commcid.ReplicaCommitmentV1ToCID(r[:]) diff --git a/resources.go b/resources.go index ebb32f165..4aafb5962 100644 --- a/resources.go +++ b/resources.go @@ -1,21 +1,21 @@ package sectorstorage import ( - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" + "github.com/filecoin-project/lotus/storage/sectorstorage/stores" "github.com/filecoin-project/specs-actors/actors/abi" ) -var FSOverheadSeal = map[sectorbuilder.SectorFileType]int{ // 10x overheads - sectorbuilder.FTUnsealed: 10, - sectorbuilder.FTSealed: 10, - sectorbuilder.FTCache: 70, // TODO: confirm for 32G +var FSOverheadSeal = map[stores.SectorFileType]int{ // 10x overheads + stores.FTUnsealed: 10, + stores.FTSealed: 10, + stores.FTCache: 70, // TODO: confirm for 32G } -var FsOverheadFinalized = map[sectorbuilder.SectorFileType]int{ - sectorbuilder.FTUnsealed: 10, - sectorbuilder.FTSealed: 10, - sectorbuilder.FTCache: 2, +var FsOverheadFinalized = map[stores.SectorFileType]int{ + stores.FTUnsealed: 10, + stores.FTSealed: 10, + stores.FTCache: 2, } type Resources struct { diff --git a/roprov.go b/roprov.go index 8355500b5..dfab863ff 100644 --- a/roprov.go +++ b/roprov.go @@ -3,7 +3,6 @@ package sectorstorage import ( "context" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/storage/sectorstorage/stores" "github.com/filecoin-project/specs-actors/actors/abi" @@ -14,9 +13,9 @@ type readonlyProvider struct { stor *stores.Local } -func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { +func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { if allocate != stores.FTNone { - return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage") + return stores.SectorPaths{}, nil, xerrors.New("read-only storage") } p, _, done, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing) diff --git a/sectorutil/utils.go b/sectorutil/utils.go deleted file mode 100644 index ede59410b..000000000 --- a/sectorutil/utils.go +++ /dev/null @@ -1,56 +0,0 @@ -package sectorutil - -import ( - "fmt" - "github.com/filecoin-project/go-sectorbuilder" - - "golang.org/x/xerrors" - - "github.com/filecoin-project/specs-actors/actors/abi" -) - -func ParseSectorID(baseName string) (abi.SectorID, error) { - var n abi.SectorNumber - var mid abi.ActorID - read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n) - if err != nil { - return abi.SectorID{}, xerrors.Errorf("sscanf sector name ('%s'): %w", baseName, err) - } - - if read != 2 { - return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read) - } - - return abi.SectorID{ - Miner: mid, - Number: n, - }, nil -} - -func SectorName(sid abi.SectorID) string { - return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number) -} - -func PathByType(sps sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType) string { - switch fileType { - case sectorbuilder.FTUnsealed: - return sps.Unsealed - case sectorbuilder.FTSealed: - return sps.Sealed - case sectorbuilder.FTCache: - return sps.Cache - } - - panic("requested unknown path type") -} - -func SetPathByType(sps *sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType, p string) { - switch fileType { - case sectorbuilder.FTUnsealed: - sps.Unsealed = p - case sectorbuilder.FTSealed: - sps.Sealed = p - case sectorbuilder.FTCache: - sps.Cache = p - } -} diff --git a/stores/filetype.go b/stores/filetype.go index e85db1e53..ddd8cf45e 100644 --- a/stores/filetype.go +++ b/stores/filetype.go @@ -1,8 +1,87 @@ package stores -import "github.com/filecoin-project/go-sectorbuilder" +import ( + "fmt" + "golang.org/x/xerrors" + + "github.com/filecoin-project/specs-actors/actors/abi" +) const ( - // TODO: move the other types here after we drop go-sectorbuilder - FTNone sectorbuilder.SectorFileType = 0 + FTUnsealed SectorFileType = 1 << iota + FTSealed + FTCache ) + +const ( + FTNone SectorFileType = 0 +) + +type SectorFileType int + +func (t SectorFileType) String() string { + switch t { + case FTUnsealed: + return "unsealed" + case FTSealed: + return "sealed" + case FTCache: + return "cache" + default: + return fmt.Sprintf("", t) + } +} + +type SectorPaths struct { + Id abi.SectorID + + Unsealed string + Sealed string + Cache string +} + +func ParseSectorID(baseName string) (abi.SectorID, error) { + var n abi.SectorNumber + var mid abi.ActorID + read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n) + if err != nil { + return abi.SectorID{}, xerrors.Errorf("sscanf sector name ('%s'): %w", baseName, err) + } + + if read != 2 { + return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read) + } + + return abi.SectorID{ + Miner: mid, + Number: n, + }, nil +} + +func SectorName(sid abi.SectorID) string { + return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number) +} + +func PathByType(sps SectorPaths, fileType SectorFileType) string { + switch fileType { + case FTUnsealed: + return sps.Unsealed + case FTSealed: + return sps.Sealed + case FTCache: + return sps.Cache + } + + panic("requested unknown path type") +} + +func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) { + switch fileType { + case FTUnsealed: + sps.Unsealed = p + case FTSealed: + sps.Sealed = p + case FTCache: + sps.Cache = p + } +} diff --git a/stores/http_handler.go b/stores/http_handler.go index bbc9b2b04..21903494b 100644 --- a/stores/http_handler.go +++ b/stores/http_handler.go @@ -10,9 +10,7 @@ import ( logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/lib/tarutil" - "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" ) var log = logging.Logger("stores") @@ -57,7 +55,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ log.Infof("SERVE GET %s", r.URL) vars := mux.Vars(r) - id, err := sectorutil.ParseSectorID(vars["id"]) + id, err := ParseSectorID(vars["id"]) if err != nil { log.Error("%+v", err) w.WriteHeader(500) @@ -78,7 +76,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ } defer done() - path := sectorutil.PathByType(paths, ft) + path := PathByType(paths, ft) if path == "" { log.Error("acquired path was empty") w.WriteHeader(500) @@ -117,7 +115,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R log.Infof("SERVE DELETE %s", r.URL) vars := mux.Vars(r) - id, err := sectorutil.ParseSectorID(vars["id"]) + id, err := ParseSectorID(vars["id"]) if err != nil { log.Error("%+v", err) w.WriteHeader(500) @@ -138,14 +136,14 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R } } -func ftFromString(t string) (sectorbuilder.SectorFileType, error) { +func ftFromString(t string) (SectorFileType, error) { switch t { - case sectorbuilder.FTUnsealed.String(): - return sectorbuilder.FTUnsealed, nil - case sectorbuilder.FTSealed.String(): - return sectorbuilder.FTSealed, nil - case sectorbuilder.FTCache.String(): - return sectorbuilder.FTCache, nil + case FTUnsealed.String(): + return FTUnsealed, nil + case FTSealed.String(): + return FTSealed, nil + case FTCache.String(): + return FTCache, nil default: return 0, xerrors.Errorf("unknown sector file type: '%s'", t) } diff --git a/stores/index.go b/stores/index.go index ccad8ba7f..5e2fb81fb 100644 --- a/stores/index.go +++ b/stores/index.go @@ -9,11 +9,8 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" - - "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" ) // ID identifies sector storage by UUID. One sector storage should map to one @@ -34,16 +31,16 @@ type SectorIndex interface { // part of storage-miner api StorageInfo(context.Context, ID) (StorageInfo, error) // TODO: StorageUpdateStats(FsStat) - StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error - StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error - StorageFindSector(ctx context.Context, sector abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) + StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error + StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error + StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) - StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error) + StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error) } type Decl struct { abi.SectorID - sectorbuilder.SectorFileType + SectorFileType } type storageEntry struct { @@ -66,10 +63,10 @@ func NewIndex() *Index { } func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) { - byID := map[ID]map[abi.SectorID]sectorbuilder.SectorFileType{} + byID := map[ID]map[abi.SectorID]SectorFileType{} for id := range i.stores { - byID[id] = map[abi.SectorID]sectorbuilder.SectorFileType{} + byID[id] = map[abi.SectorID]SectorFileType{} } for decl, ids := range i.sectors { for _, id := range ids { @@ -124,7 +121,7 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) er return nil } -func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { +func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error { i.lk.Lock() defer i.lk.Unlock() @@ -148,7 +145,7 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se return nil } -func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft sectorbuilder.SectorFileType) error { +func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error { i.lk.Lock() defer i.lk.Unlock() @@ -182,7 +179,7 @@ func (i *Index) StorageDropSector(ctx context.Context, storageId ID, s abi.Secto return nil } -func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType, allowFetch bool) ([]StorageInfo, error) { +func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, allowFetch bool) ([]StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -214,7 +211,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector return nil, xerrors.Errorf("failed to parse url: %w", err) } - rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s)) + rl.Path = gopath.Join(rl.Path, ft.String(), SectorName(s)) urls[k] = rl.String() } @@ -240,7 +237,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector return nil, xerrors.Errorf("failed to parse url: %w", err) } - rl.Path = gopath.Join(rl.Path, ft.String(), sectorutil.SectorName(s)) + rl.Path = gopath.Join(rl.Path, ft.String(), SectorName(s)) urls[k] = rl.String() } @@ -269,7 +266,7 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) { return *si.info, nil } -func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.SectorFileType, sealing bool) ([]StorageInfo, error) { +func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, sealing bool) ([]StorageInfo, error) { i.lk.RLock() defer i.lk.RUnlock() @@ -309,7 +306,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate sectorbuilder.Sec return out, nil } -func (i *Index) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]ID, error) { +func (i *Index) FindSector(id abi.SectorID, typ SectorFileType) ([]ID, error) { i.lk.RLock() defer i.lk.RUnlock() diff --git a/stores/interface.go b/stores/interface.go index 45e371fb7..556cd4dbf 100644 --- a/stores/interface.go +++ b/stores/interface.go @@ -6,16 +6,15 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" ) type Store interface { - AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error) - Remove(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error + AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (paths SectorPaths, stores SectorPaths, done func(), err error) + Remove(ctx context.Context, s abi.SectorID, types SectorFileType) error // move sectors into storage - MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error + MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error FsStat(ctx context.Context, id ID) (FsStat, error) } diff --git a/stores/local.go b/stores/local.go index bc2e56a69..4ed0f5b3e 100644 --- a/stores/local.go +++ b/stores/local.go @@ -12,9 +12,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi" "golang.org/x/xerrors" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" ) type StoragePath struct { @@ -43,7 +41,7 @@ type LocalStorage interface { const MetaFile = "sectorstore.json" -var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache} +var pathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache} type Local struct { localStorage LocalStorage @@ -120,7 +118,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { } for _, ent := range ents { - sid, err := sectorutil.ParseSectorID(ent.Name()) + sid, err := ParseSectorID(ent.Name()) if err != nil { return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err) } @@ -152,15 +150,15 @@ func (st *Local) open(ctx context.Context) error { return nil } -func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) { +func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { if existing|allocate != existing^allocate { - return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") + return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } st.localLk.RLock() - var out sectorbuilder.SectorPaths - var storageIDs sectorbuilder.SectorPaths + var out SectorPaths + var storageIDs SectorPaths for _, fileType := range pathTypes { if fileType&existing == 0 { @@ -183,9 +181,9 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s continue } - spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid)) - sectorutil.SetPathByType(&out, fileType, spath) - sectorutil.SetPathByType(&storageIDs, fileType, string(info.ID)) + spath := filepath.Join(p.local, fileType.String(), SectorName(sid)) + SetPathByType(&out, fileType, spath) + SetPathByType(&storageIDs, fileType, string(info.ID)) existing ^= fileType break @@ -200,7 +198,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s sis, err := st.index.StorageBestAlloc(ctx, fileType, sealing) if err != nil { st.localLk.RUnlock() - return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err) + return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("finding best storage for allocating : %w", err) } var best string @@ -226,17 +224,17 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing s // TODO: Check free space - best = filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid)) + best = filepath.Join(p.local, fileType.String(), SectorName(sid)) bestID = si.ID } if best == "" { st.localLk.RUnlock() - return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector") + return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector") } - sectorutil.SetPathByType(&out, fileType, best) - sectorutil.SetPathByType(&storageIDs, fileType, string(bestID)) + SetPathByType(&out, fileType, best) + SetPathByType(&storageIDs, fileType, string(bestID)) allocate ^= fileType } @@ -270,7 +268,7 @@ func (st *Local) Local(ctx context.Context) ([]StoragePath, error) { return out, nil } -func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error { +func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType) error { if bits.OnesCount(uint(typ)) != 1 { return xerrors.New("delete expects one file type") } @@ -298,7 +296,7 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder return xerrors.Errorf("dropping sector from index: %w", err) } - spath := filepath.Join(p.local, typ.String(), sectorutil.SectorName(sid)) + spath := filepath.Join(p.local, typ.String(), SectorName(sid)) log.Infof("remove %s", spath) if err := os.RemoveAll(spath); err != nil { @@ -309,7 +307,7 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder return nil } -func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error { +func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error { dest, destIds, sdone, err := st.AcquireSector(ctx, s, FTNone, types, false) if err != nil { return xerrors.Errorf("acquire dest storage: %w", err) @@ -327,12 +325,12 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbu continue } - sst, err := st.index.StorageInfo(ctx, ID(sectorutil.PathByType(srcIds, fileType))) + sst, err := st.index.StorageInfo(ctx, ID(PathByType(srcIds, fileType))) if err != nil { return xerrors.Errorf("failed to get source storage info: %w", err) } - dst, err := st.index.StorageInfo(ctx, ID(sectorutil.PathByType(destIds, fileType))) + dst, err := st.index.StorageInfo(ctx, ID(PathByType(destIds, fileType))) if err != nil { return xerrors.Errorf("failed to get source storage info: %w", err) } @@ -349,17 +347,17 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbu log.Debugf("moving %v(%d) to storage: %s(se:%t; st:%t) -> %s(se:%t; st:%t)", s, fileType, sst.ID, sst.CanSeal, sst.CanStore, dst.ID, dst.CanSeal, dst.CanStore) - if err := st.index.StorageDropSector(ctx, ID(sectorutil.PathByType(srcIds, fileType)), s, fileType); err != nil { + if err := st.index.StorageDropSector(ctx, ID(PathByType(srcIds, fileType)), s, fileType); err != nil { return xerrors.Errorf("dropping source sector from index: %w", err) } - if err := move(sectorutil.PathByType(src, fileType), sectorutil.PathByType(dest, fileType)); err != nil { + if err := move(PathByType(src, fileType), PathByType(dest, fileType)); err != nil { // TODO: attempt some recovery (check if src is still there, re-declare) return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err) } - if err := st.index.StorageDeclareSector(ctx, ID(sectorutil.PathByType(destIds, fileType)), s, fileType); err != nil { - return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(sectorutil.PathByType(destIds, fileType)), err) + if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType); err != nil { + return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(PathByType(destIds, fileType)), err) } } diff --git a/stores/remote.go b/stores/remote.go index e44b8cfec..e919baa1d 100644 --- a/stores/remote.go +++ b/stores/remote.go @@ -17,11 +17,9 @@ import ( files "github.com/ipfs/go-ipfs-files" "golang.org/x/xerrors" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/lotus/lib/tarutil" - "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" ) type Remote struct { @@ -42,9 +40,9 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote { } } -func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, sectorbuilder.SectorPaths, func(), error) { +func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) { if existing|allocate != existing^allocate { - return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") + return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector") } r.fetchLk.Lock() @@ -52,7 +50,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing) if err != nil { - return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) + return SectorPaths{}, SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) } for _, fileType := range pathTypes { @@ -60,19 +58,19 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec continue } - if sectorutil.PathByType(paths, fileType) != "" { + if PathByType(paths, fileType) != "" { continue } ap, storageID, url, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing) if err != nil { done() - return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err + return SectorPaths{}, SectorPaths{}, nil, err } done = mergeDone(done, rdone) - sectorutil.SetPathByType(&paths, fileType, ap) - sectorutil.SetPathByType(&stores, fileType, string(storageID)) + SetPathByType(&paths, fileType, ap) + SetPathByType(&stores, fileType, string(storageID)) if err := r.index.StorageDeclareSector(ctx, storageID, s, fileType); err != nil { log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) @@ -88,7 +86,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec return paths, stores, done, nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, ID, string, func(), error) { +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, sealing bool) (string, ID, string, func(), error) { si, err := r.index.StorageFindSector(ctx, s, fileType, false) if err != nil { return "", "", "", nil, err @@ -102,8 +100,8 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType if err != nil { return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) } - dest := sectorutil.PathByType(apaths, fileType) - storageID := sectorutil.PathByType(ids, fileType) + dest := PathByType(apaths, fileType) + storageID := PathByType(ids, fileType) var merr error for _, info := range si { @@ -176,7 +174,7 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error { } } -func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error { +func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types SectorFileType) error { // Make sure we have the data local _, _, ddone, err := r.AcquireSector(ctx, s, types, FTNone, false) if err != nil { @@ -187,7 +185,7 @@ func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbu return r.local.MoveStorage(ctx, s, types) } -func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error { +func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType) error { if bits.OnesCount(uint(typ)) != 1 { return xerrors.New("delete expects one file type") } diff --git a/worker_local.go b/worker_local.go index de3f19c89..3547a8a03 100644 --- a/worker_local.go +++ b/worker_local.go @@ -2,6 +2,7 @@ package sectorstorage import ( "context" + "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper" "io" "os" @@ -9,17 +10,15 @@ import ( "golang.org/x/xerrors" ffi "github.com/filecoin-project/filecoin-ffi" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" storage2 "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/storage/sectorstorage/sealtasks" - "github.com/filecoin-project/lotus/storage/sectorstorage/sectorutil" "github.com/filecoin-project/lotus/storage/sectorstorage/stores" ) -var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache} +var pathTypes = []stores.SectorFileType{stores.FTUnsealed, stores.FTSealed, stores.FTCache} type WorkerConfig struct { SealProof abi.RegisteredProof @@ -27,7 +26,7 @@ type WorkerConfig struct { } type LocalWorker struct { - scfg *sectorbuilder.Config + scfg *ffiwrapper.Config storage stores.Store localStore *stores.Local sindex stores.SectorIndex @@ -47,7 +46,7 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, } return &LocalWorker{ - scfg: §orbuilder.Config{ + scfg: &ffiwrapper.Config{ SealProofType: wcfg.SealProof, PoStProofType: ppt, }, @@ -63,10 +62,10 @@ type localWorkerPathProvider struct { w *LocalWorker } -func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { +func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing bool) (stores.SectorPaths, func(), error) { paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing) if err != nil { - return sectorbuilder.SectorPaths{}, nil, err + return stores.SectorPaths{}, nil, err } log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) @@ -79,7 +78,7 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi. continue } - sid := sectorutil.PathByType(storageIDs, fileType) + sid := stores.PathByType(storageIDs, fileType) if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType); err != nil { log.Errorf("declare sector error: %+v", err) @@ -88,8 +87,8 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi. }, nil } -func (l *LocalWorker) sb() (sectorbuilder.Basic, error) { - return sectorbuilder.New(&localWorkerPathProvider{w: l}, l.scfg) +func (l *LocalWorker) sb() (ffiwrapper.Basic, error) { + return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg) } func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error { @@ -156,11 +155,11 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e return xerrors.Errorf("finalizing sector: %w", err) } - if err := l.storage.Remove(ctx, sector, sectorbuilder.FTUnsealed); err != nil { + if err := l.storage.Remove(ctx, sector, stores.FTUnsealed); err != nil { return xerrors.Errorf("removing unsealed data: %w", err) } - if err := l.storage.MoveStorage(ctx, sector, sectorbuilder.FTSealed|sectorbuilder.FTCache); err != nil { + if err := l.storage.MoveStorage(ctx, sector, stores.FTSealed|stores.FTCache); err != nil { return xerrors.Errorf("moving sealed data to storage: %w", err) } diff --git a/zerocomm/zerocomm.go b/zerocomm/zerocomm.go new file mode 100644 index 000000000..7d6308549 --- /dev/null +++ b/zerocomm/zerocomm.go @@ -0,0 +1,55 @@ +package zerocomm + +import ( + "math/bits" + + commcid "github.com/filecoin-project/go-fil-commcid" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/ipfs/go-cid" +) + +const Levels = 37 +const Skip = 2 // can't generate for 32, 64b + +var PieceComms = [Levels - Skip][32]byte{ + {0x37, 0x31, 0xbb, 0x99, 0xac, 0x68, 0x9f, 0x66, 0xee, 0xf5, 0x97, 0x3e, 0x4a, 0x94, 0xda, 0x18, 0x8f, 0x4d, 0xdc, 0xae, 0x58, 0x7, 0x24, 0xfc, 0x6f, 0x3f, 0xd6, 0xd, 0xfd, 0x48, 0x83, 0x33}, + {0x64, 0x2a, 0x60, 0x7e, 0xf8, 0x86, 0xb0, 0x4, 0xbf, 0x2c, 0x19, 0x78, 0x46, 0x3a, 0xe1, 0xd4, 0x69, 0x3a, 0xc0, 0xf4, 0x10, 0xeb, 0x2d, 0x1b, 0x7a, 0x47, 0xfe, 0x20, 0x5e, 0x5e, 0x75, 0xf}, + {0x57, 0xa2, 0x38, 0x1a, 0x28, 0x65, 0x2b, 0xf4, 0x7f, 0x6b, 0xef, 0x7a, 0xca, 0x67, 0x9b, 0xe4, 0xae, 0xde, 0x58, 0x71, 0xab, 0x5c, 0xf3, 0xeb, 0x2c, 0x8, 0x11, 0x44, 0x88, 0xcb, 0x85, 0x26}, + {0x1f, 0x7a, 0xc9, 0x59, 0x55, 0x10, 0xe0, 0x9e, 0xa4, 0x1c, 0x46, 0xb, 0x17, 0x64, 0x30, 0xbb, 0x32, 0x2c, 0xd6, 0xfb, 0x41, 0x2e, 0xc5, 0x7c, 0xb1, 0x7d, 0x98, 0x9a, 0x43, 0x10, 0x37, 0x2f}, + {0xfc, 0x7e, 0x92, 0x82, 0x96, 0xe5, 0x16, 0xfa, 0xad, 0xe9, 0x86, 0xb2, 0x8f, 0x92, 0xd4, 0x4a, 0x4f, 0x24, 0xb9, 0x35, 0x48, 0x52, 0x23, 0x37, 0x6a, 0x79, 0x90, 0x27, 0xbc, 0x18, 0xf8, 0x33}, + {0x8, 0xc4, 0x7b, 0x38, 0xee, 0x13, 0xbc, 0x43, 0xf4, 0x1b, 0x91, 0x5c, 0xe, 0xed, 0x99, 0x11, 0xa2, 0x60, 0x86, 0xb3, 0xed, 0x62, 0x40, 0x1b, 0xf9, 0xd5, 0x8b, 0x8d, 0x19, 0xdf, 0xf6, 0x24}, + {0xb2, 0xe4, 0x7b, 0xfb, 0x11, 0xfa, 0xcd, 0x94, 0x1f, 0x62, 0xaf, 0x5c, 0x75, 0xf, 0x3e, 0xa5, 0xcc, 0x4d, 0xf5, 0x17, 0xd5, 0xc4, 0xf1, 0x6d, 0xb2, 0xb4, 0xd7, 0x7b, 0xae, 0xc1, 0xa3, 0x2f}, + {0xf9, 0x22, 0x61, 0x60, 0xc8, 0xf9, 0x27, 0xbf, 0xdc, 0xc4, 0x18, 0xcd, 0xf2, 0x3, 0x49, 0x31, 0x46, 0x0, 0x8e, 0xae, 0xfb, 0x7d, 0x2, 0x19, 0x4d, 0x5e, 0x54, 0x81, 0x89, 0x0, 0x51, 0x8}, + {0x2c, 0x1a, 0x96, 0x4b, 0xb9, 0xb, 0x59, 0xeb, 0xfe, 0xf, 0x6d, 0xa2, 0x9a, 0xd6, 0x5a, 0xe3, 0xe4, 0x17, 0x72, 0x4a, 0x8f, 0x7c, 0x11, 0x74, 0x5a, 0x40, 0xca, 0xc1, 0xe5, 0xe7, 0x40, 0x11}, + {0xfe, 0xe3, 0x78, 0xce, 0xf1, 0x64, 0x4, 0xb1, 0x99, 0xed, 0xe0, 0xb1, 0x3e, 0x11, 0xb6, 0x24, 0xff, 0x9d, 0x78, 0x4f, 0xbb, 0xed, 0x87, 0x8d, 0x83, 0x29, 0x7e, 0x79, 0x5e, 0x2, 0x4f, 0x2}, + {0x8e, 0x9e, 0x24, 0x3, 0xfa, 0x88, 0x4c, 0xf6, 0x23, 0x7f, 0x60, 0xdf, 0x25, 0xf8, 0x3e, 0xe4, 0xd, 0xca, 0x9e, 0xd8, 0x79, 0xeb, 0x6f, 0x63, 0x52, 0xd1, 0x50, 0x84, 0xf5, 0xad, 0xd, 0x3f}, + {0x75, 0x2d, 0x96, 0x93, 0xfa, 0x16, 0x75, 0x24, 0x39, 0x54, 0x76, 0xe3, 0x17, 0xa9, 0x85, 0x80, 0xf0, 0x9, 0x47, 0xaf, 0xb7, 0xa3, 0x5, 0x40, 0xd6, 0x25, 0xa9, 0x29, 0x1c, 0xc1, 0x2a, 0x7}, + {0x70, 0x22, 0xf6, 0xf, 0x7e, 0xf6, 0xad, 0xfa, 0x17, 0x11, 0x7a, 0x52, 0x61, 0x9e, 0x30, 0xce, 0xa8, 0x2c, 0x68, 0x7, 0x5a, 0xdf, 0x1c, 0x66, 0x77, 0x86, 0xec, 0x50, 0x6e, 0xef, 0x2d, 0x19}, + {0xd9, 0x98, 0x87, 0xb9, 0x73, 0x57, 0x3a, 0x96, 0xe1, 0x13, 0x93, 0x64, 0x52, 0x36, 0xc1, 0x7b, 0x1f, 0x4c, 0x70, 0x34, 0xd7, 0x23, 0xc7, 0xa9, 0x9f, 0x70, 0x9b, 0xb4, 0xda, 0x61, 0x16, 0x2b}, + {0xd0, 0xb5, 0x30, 0xdb, 0xb0, 0xb4, 0xf2, 0x5c, 0x5d, 0x2f, 0x2a, 0x28, 0xdf, 0xee, 0x80, 0x8b, 0x53, 0x41, 0x2a, 0x2, 0x93, 0x1f, 0x18, 0xc4, 0x99, 0xf5, 0xa2, 0x54, 0x8, 0x6b, 0x13, 0x26}, + {0x84, 0xc0, 0x42, 0x1b, 0xa0, 0x68, 0x5a, 0x1, 0xbf, 0x79, 0x5a, 0x23, 0x44, 0x6, 0x4f, 0xe4, 0x24, 0xbd, 0x52, 0xa9, 0xd2, 0x43, 0x77, 0xb3, 0x94, 0xff, 0x4c, 0x4b, 0x45, 0x68, 0xe8, 0x11}, + {0x65, 0xf2, 0x9e, 0x5d, 0x98, 0xd2, 0x46, 0xc3, 0x8b, 0x38, 0x8c, 0xfc, 0x6, 0xdb, 0x1f, 0x6b, 0x2, 0x13, 0x3, 0xc5, 0xa2, 0x89, 0x0, 0xb, 0xdc, 0xe8, 0x32, 0xa9, 0xc3, 0xec, 0x42, 0x1c}, + {0xa2, 0x24, 0x75, 0x8, 0x28, 0x58, 0x50, 0x96, 0x5b, 0x7e, 0x33, 0x4b, 0x31, 0x27, 0xb0, 0xc0, 0x42, 0xb1, 0xd0, 0x46, 0xdc, 0x54, 0x40, 0x21, 0x37, 0x62, 0x7c, 0xd8, 0x79, 0x9c, 0xe1, 0x3a}, + {0xda, 0xfd, 0xab, 0x6d, 0xa9, 0x36, 0x44, 0x53, 0xc2, 0x6d, 0x33, 0x72, 0x6b, 0x9f, 0xef, 0xe3, 0x43, 0xbe, 0x8f, 0x81, 0x64, 0x9e, 0xc0, 0x9, 0xaa, 0xd3, 0xfa, 0xff, 0x50, 0x61, 0x75, 0x8}, + {0xd9, 0x41, 0xd5, 0xe0, 0xd6, 0x31, 0x4a, 0x99, 0x5c, 0x33, 0xff, 0xbd, 0x4f, 0xbe, 0x69, 0x11, 0x8d, 0x73, 0xd4, 0xe5, 0xfd, 0x2c, 0xd3, 0x1f, 0xf, 0x7c, 0x86, 0xeb, 0xdd, 0x14, 0xe7, 0x6}, + {0x51, 0x4c, 0x43, 0x5c, 0x3d, 0x4, 0xd3, 0x49, 0xa5, 0x36, 0x5f, 0xbd, 0x59, 0xff, 0xc7, 0x13, 0x62, 0x91, 0x11, 0x78, 0x59, 0x91, 0xc1, 0xa3, 0xc5, 0x3a, 0xf2, 0x20, 0x79, 0x74, 0x1a, 0x2f}, + {0xad, 0x6, 0x85, 0x39, 0x69, 0xd3, 0x7d, 0x34, 0xff, 0x8, 0xe0, 0x9f, 0x56, 0x93, 0xa, 0x4a, 0xd1, 0x9a, 0x89, 0xde, 0xf6, 0xc, 0xbf, 0xee, 0x7e, 0x1d, 0x33, 0x81, 0xc1, 0xe7, 0x1c, 0x37}, + {0x39, 0x56, 0xe, 0x7b, 0x13, 0xa9, 0x3b, 0x7, 0xa2, 0x43, 0xfd, 0x27, 0x20, 0xff, 0xa7, 0xcb, 0x3e, 0x1d, 0x2e, 0x50, 0x5a, 0xb3, 0x62, 0x9e, 0x79, 0xf4, 0x63, 0x13, 0x51, 0x2c, 0xda, 0x6}, + {0xcc, 0xc3, 0xc0, 0x12, 0xf5, 0xb0, 0x5e, 0x81, 0x1a, 0x2b, 0xbf, 0xdd, 0xf, 0x68, 0x33, 0xb8, 0x42, 0x75, 0xb4, 0x7b, 0xf2, 0x29, 0xc0, 0x5, 0x2a, 0x82, 0x48, 0x4f, 0x3c, 0x1a, 0x5b, 0x3d}, + {0x7d, 0xf2, 0x9b, 0x69, 0x77, 0x31, 0x99, 0xe8, 0xf2, 0xb4, 0xb, 0x77, 0x91, 0x9d, 0x4, 0x85, 0x9, 0xee, 0xd7, 0x68, 0xe2, 0xc7, 0x29, 0x7b, 0x1f, 0x14, 0x37, 0x3, 0x4f, 0xc3, 0xc6, 0x2c}, + {0x66, 0xce, 0x5, 0xa3, 0x66, 0x75, 0x52, 0xcf, 0x45, 0xc0, 0x2b, 0xcc, 0x4e, 0x83, 0x92, 0x91, 0x9b, 0xde, 0xac, 0x35, 0xde, 0x2f, 0xf5, 0x62, 0x71, 0x84, 0x8e, 0x9f, 0x7b, 0x67, 0x51, 0x7}, + {0xd8, 0x61, 0x2, 0x18, 0x42, 0x5a, 0xb5, 0xe9, 0x5b, 0x1c, 0xa6, 0x23, 0x9d, 0x29, 0xa2, 0xe4, 0x20, 0xd7, 0x6, 0xa9, 0x6f, 0x37, 0x3e, 0x2f, 0x9c, 0x9a, 0x91, 0xd7, 0x59, 0xd1, 0x9b, 0x1}, + {0x6d, 0x36, 0x4b, 0x1e, 0xf8, 0x46, 0x44, 0x1a, 0x5a, 0x4a, 0x68, 0x86, 0x23, 0x14, 0xac, 0xc0, 0xa4, 0x6f, 0x1, 0x67, 0x17, 0xe5, 0x34, 0x43, 0xe8, 0x39, 0xee, 0xdf, 0x83, 0xc2, 0x85, 0x3c}, + {0x7, 0x7e, 0x5f, 0xde, 0x35, 0xc5, 0xa, 0x93, 0x3, 0xa5, 0x50, 0x9, 0xe3, 0x49, 0x8a, 0x4e, 0xbe, 0xdf, 0xf3, 0x9c, 0x42, 0xb7, 0x10, 0xb7, 0x30, 0xd8, 0xec, 0x7a, 0xc7, 0xaf, 0xa6, 0x3e}, + {0xe6, 0x40, 0x5, 0xa6, 0xbf, 0xe3, 0x77, 0x79, 0x53, 0xb8, 0xad, 0x6e, 0xf9, 0x3f, 0xf, 0xca, 0x10, 0x49, 0xb2, 0x4, 0x16, 0x54, 0xf2, 0xa4, 0x11, 0xf7, 0x70, 0x27, 0x99, 0xce, 0xce, 0x2}, + {0x25, 0x9d, 0x3d, 0x6b, 0x1f, 0x4d, 0x87, 0x6d, 0x11, 0x85, 0xe1, 0x12, 0x3a, 0xf6, 0xf5, 0x50, 0x1a, 0xf0, 0xf6, 0x7c, 0xf1, 0x5b, 0x52, 0x16, 0x25, 0x5b, 0x7b, 0x17, 0x8d, 0x12, 0x5, 0x1d}, + {0x3f, 0x9a, 0x4d, 0x41, 0x1d, 0xa4, 0xef, 0x1b, 0x36, 0xf3, 0x5f, 0xf0, 0xa1, 0x95, 0xae, 0x39, 0x2a, 0xb2, 0x3f, 0xee, 0x79, 0x67, 0xb7, 0xc4, 0x1b, 0x3, 0xd1, 0x61, 0x3f, 0xc2, 0x92, 0x39}, + {0xfe, 0x4e, 0xf3, 0x28, 0xc6, 0x1a, 0xa3, 0x9c, 0xfd, 0xb2, 0x48, 0x4e, 0xaa, 0x32, 0xa1, 0x51, 0xb1, 0xfe, 0x3d, 0xfd, 0x1f, 0x96, 0xdd, 0x8c, 0x97, 0x11, 0xfd, 0x86, 0xd6, 0xc5, 0x81, 0x13}, + {0xf5, 0x5d, 0x68, 0x90, 0xe, 0x2d, 0x83, 0x81, 0xec, 0xcb, 0x81, 0x64, 0xcb, 0x99, 0x76, 0xf2, 0x4b, 0x2d, 0xe0, 0xdd, 0x61, 0xa3, 0x1b, 0x97, 0xce, 0x6e, 0xb2, 0x38, 0x50, 0xd5, 0xe8, 0x19}, + {0xaa, 0xaa, 0x8c, 0x4c, 0xb4, 0xa, 0xac, 0xee, 0x1e, 0x2, 0xdc, 0x65, 0x42, 0x4b, 0x2a, 0x6c, 0x8e, 0x99, 0xf8, 0x3, 0xb7, 0x2f, 0x79, 0x29, 0xc4, 0x10, 0x1d, 0x7f, 0xae, 0x6b, 0xff, 0x32}, +} + +func ZeroPieceCommitment(sz abi.UnpaddedPieceSize) cid.Cid { + level := bits.TrailingZeros64(uint64(sz.Padded())) - Skip - 5 // 2^5 = 32 + return commcid.PieceCommitmentV1ToCID(PieceComms[level][:]) +} diff --git a/zerocomm/zerocomm_test.go b/zerocomm/zerocomm_test.go new file mode 100644 index 000000000..ef3f11d88 --- /dev/null +++ b/zerocomm/zerocomm_test.go @@ -0,0 +1,115 @@ +package zerocomm_test + +import ( + "bytes" + "fmt" + "github.com/filecoin-project/lotus/storage/sectorstorage/zerocomm" + "io" + "testing" + + commcid "github.com/filecoin-project/go-fil-commcid" + abi "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/storage/sectorstorage/ffiwrapper" +) + +func TestComms(t *testing.T) { + t.Skip("don't have enough ram") // no, but seriously, currently this needs like 3tb of /tmp + + var expPieceComms [zerocomm.Levels - zerocomm.Skip]cid.Cid + + { + l2, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, bytes.NewReader(make([]byte, 127)), 127) + if err != nil { + t.Fatal(err) + } + expPieceComms[0] = l2 + } + + for i := 1; i < zerocomm.Levels-2; i++ { + var err error + sz := abi.UnpaddedPieceSize(127 << uint(i)) + fmt.Println(i, sz) + r := io.LimitReader(&NullReader{}, int64(sz)) + + expPieceComms[i], err = ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, r, sz) + if err != nil { + t.Fatal(err) + } + } + + for i, comm := range expPieceComms { + c, err := commcid.CIDToPieceCommitmentV1(comm) + if err != nil { + t.Fatal(err) + } + if string(c) != string(zerocomm.PieceComms[i][:]) { + t.Errorf("zero commitment %d didn't match", i) + } + } + + for _, comm := range expPieceComms { // Could do codegen, but this is good enough + fmt.Printf("%#v,\n", comm) + } +} + +func TestCommsSmall(t *testing.T) { + var expPieceComms [8]cid.Cid + lvls := len(expPieceComms) + zerocomm.Skip + + { + l2, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, bytes.NewReader(make([]byte, 127)), 127) + if err != nil { + t.Fatal(err) + } + expPieceComms[0] = l2 + } + + for i := 1; i < lvls-2; i++ { + var err error + sz := abi.UnpaddedPieceSize(127 << uint(i)) + fmt.Println(i, sz) + r := io.LimitReader(&NullReader{}, int64(sz)) + + expPieceComms[i], err = ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, r, sz) + if err != nil { + t.Fatal(err) + } + } + + for i, comm := range expPieceComms { + c, err := commcid.CIDToPieceCommitmentV1(comm) + if err != nil { + t.Fatal(err) + } + if string(c) != string(zerocomm.PieceComms[i][:]) { + t.Errorf("zero commitment %d didn't match", i) + } + } + + for _, comm := range expPieceComms { // Could do codegen, but this is good enough + fmt.Printf("%#v,\n", comm) + } +} + +func TestForSise(t *testing.T) { + exp, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredProof_StackedDRG2KiBPoSt, bytes.NewReader(make([]byte, 1016)), 1016) + if err != nil { + return + } + + actual := zerocomm.ZeroPieceCommitment(1016) + if !exp.Equals(actual) { + t.Errorf("zero commitment didn't match") + } +} + +type NullReader struct{} + +func (NullReader) Read(out []byte) (int, error) { + for i := range out { + out[i] = 0 + } + return len(out), nil +}