From 3e39d6e4453b8a4a7d5b8b14d150e1bafa4c0ae4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 7 Nov 2019 17:39:27 +0100 Subject: [PATCH] sectorbuilder: use standalone methods --- lib/sectorbuilder/files.go | 88 ++++++++++++ lib/sectorbuilder/mock.go | 9 +- lib/sectorbuilder/sectorbuilder.go | 173 ++++++++++++++++-------- lib/sectorbuilder/sectorbuilder_test.go | 35 +++-- 4 files changed, 230 insertions(+), 75 deletions(-) create mode 100644 lib/sectorbuilder/files.go diff --git a/lib/sectorbuilder/files.go b/lib/sectorbuilder/files.go new file mode 100644 index 000000000..c0af2b6a7 --- /dev/null +++ b/lib/sectorbuilder/files.go @@ -0,0 +1,88 @@ +package sectorbuilder + +import ( + "fmt" + "io" + "os" + "path/filepath" + "sync" + + "golang.org/x/xerrors" +) + +func (sb *SectorBuilder) sectorName(sectorID uint64) string { + return fmt.Sprintf("s-%s-%d", sb.Miner, sectorID) +} + +func (sb *SectorBuilder) stagedSectorPath(sectorID uint64) string { + return filepath.Join(sb.stagedDir, sb.sectorName(sectorID)) +} + +func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) { + return os.OpenFile(sb.stagedSectorPath(sectorID), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) +} + +func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) { + path := filepath.Join(sb.sealedDir, sb.sectorName(sectorID)) + + e, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return "", err + } + + return path, e.Close() +} + +func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) { + dir := filepath.Join(sb.cacheDir, sb.sectorName(sectorID)) + + err := os.Mkdir(dir, 0755) + if os.IsExist(err) { + err = nil + } + + return dir, err +} + +func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) { + f, ok := r.(*os.File) + if ok { + return f, func() error { return nil }, nil + } + + var w *os.File + + f, w, err := os.Pipe() + if err != nil { + return nil, nil, err + } + + var wait sync.Mutex + var werr error + + wait.Lock() + go func() { + defer wait.Unlock() + + copied, werr := io.CopyN(w, r, n) + if werr != nil { + log.Warnf("toReadableFile: copy error: %+v", werr) + } + + err := w.Close() + if werr == nil && err != nil { + werr = err + log.Warnf("toReadableFile: close error: %+v", err) + return + } + if copied != n { + log.Warnf("copied different amount than expected: %d != %d", copied, n) + werr = xerrors.Errorf("copied different amount than expected: %d != %d", copied, n) + } + }() + + return f, func() error { + wait.Lock() + return werr + }, nil +} diff --git a/lib/sectorbuilder/mock.go b/lib/sectorbuilder/mock.go index 451757b17..1433d6a2d 100644 --- a/lib/sectorbuilder/mock.go +++ b/lib/sectorbuilder/mock.go @@ -24,8 +24,15 @@ func TempSectorbuilder(sectorSize uint64) (*SectorBuilder, func(), error) { staging := filepath.Join(dir, "staging") cache := filepath.Join(dir, "cache") + for _, dir := range []string{metadata, sealed, staging, cache} { + if err := os.Mkdir(dir, 0755); err != nil { + return nil, nil, err + } + } + sb, err := New(&Config{ - SectorSize: sectorSize, + SectorSize: sectorSize, + SealedDir: sealed, StagedDir: staging, MetadataDir: metadata, diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 7f09fa81b..9944429ec 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -2,9 +2,7 @@ package sectorbuilder import ( "io" - "os" "sort" - "sync" "unsafe" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" @@ -15,6 +13,7 @@ import ( ) const PoStReservedWorkers = 1 +const PoRepProofPartitions = 2 var log = logging.Logger("sectorbuilder") @@ -36,6 +35,8 @@ type SealCommitOutput = sectorbuilder.SealCommitOutput type PublicPieceInfo = sectorbuilder.PublicPieceInfo +type RawSealPreCommitOutput = sectorbuilder.RawSealPreCommitOutput + const CommLen = sectorbuilder.CommitmentBytesLen type SectorBuilder struct { @@ -44,6 +45,10 @@ type SectorBuilder struct { Miner address.Address + stagedDir string + sealedDir string + cacheDir string + rateLimit chan struct{} } @@ -66,7 +71,7 @@ func New(cfg *Config) (*SectorBuilder, error) { proverId := addressToProverID(cfg.Miner) - sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads) + sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, PoRepProofPartitions, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads) if err != nil { return nil, err } @@ -75,6 +80,10 @@ func New(cfg *Config) (*SectorBuilder, error) { handle: sbp, ssize: cfg.SectorSize, + stagedDir: cfg.StagedDir, + sealedDir: cfg.SealedDir, + cacheDir: cfg.CacheDir, + Miner: cfg.Miner, rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers), }, nil @@ -101,21 +110,44 @@ func (sb *SectorBuilder) Destroy() { sectorbuilder.DestroySectorBuilder(sb.handle) } -func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, file io.Reader) (uint64, error) { +func (sb *SectorBuilder) AcquireSectorId() (uint64, error) { + return sectorbuilder.AcquireSectorId(sb.handle) +} + +func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader) (PublicPieceInfo, error) { f, werr, err := toReadableFile(file, int64(pieceSize)) if err != nil { - return 0, err + return PublicPieceInfo{}, err } ret := sb.rlimit() defer ret() - sectorID, err := sectorbuilder.AddPieceFromFile(sb.handle, pieceKey, pieceSize, f) + stagedFile, err := sb.stagedSectorFile(sectorId) if err != nil { - return 0, err + return PublicPieceInfo{}, err } - return sectorID, werr() + writeUnpadded, commP, err := sectorbuilder.StandaloneWriteWithoutAlignment(f, pieceSize, stagedFile) + if err != nil { + return PublicPieceInfo{}, err + } + if writeUnpadded != pieceSize { + return PublicPieceInfo{}, xerrors.Errorf("writeUnpadded != pieceSize: %d != %d", writeUnpadded, pieceSize) + } + + if err := stagedFile.Close(); err != nil { + return PublicPieceInfo{}, err + } + + if err := f.Close(); err != nil { + return PublicPieceInfo{}, err + } + + return PublicPieceInfo{ + Size: pieceSize, + CommP: commP, + }, werr() } // TODO: should *really really* return an io.ReadCloser @@ -126,25 +158,89 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey) } -func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket) (SealPreCommitOutput, error) { +func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) { ret := sb.rlimit() defer ret() - return sectorbuilder.SealPreCommit(sb.handle, sectorID, ticket) + cacheDir, err := sb.sectorCacheDir(sectorID) + if err != nil { + return RawSealPreCommitOutput{}, err + } + + sealedPath, err := sb.sealedSectorPath(sectorID) + if err != nil { + return RawSealPreCommitOutput{}, err + } + + return sectorbuilder.StandaloneSealPreCommit( + sb.ssize, + PoRepProofPartitions, + cacheDir, + sb.stagedSectorPath(sectorID), + sealedPath, + sectorID, + addressToProverID(sb.Miner), + ticket.TicketBytes, + pieces, + ) } -func (sb *SectorBuilder) SealCommit(sectorID uint64, seed SealSeed) (SealCommitOutput, error) { +func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) { ret := sb.rlimit() defer ret() - return sectorbuilder.SealCommit(sb.handle, sectorID, seed) -} + cacheDir, err := sb.sectorCacheDir(sectorID) + if err != nil { + return nil, err + } -func (sb *SectorBuilder) ResumeSealCommit(sectorID uint64) (SealCommitOutput, error) { - ret := sb.rlimit() - defer ret() + proof, err = sectorbuilder.StandaloneSealCommit( + sb.ssize, + PoRepProofPartitions, + cacheDir, + sectorID, + addressToProverID(sb.Miner), + ticket.TicketBytes, + seed.TicketBytes, + pieces, + rspco, + ) + if err != nil { + return nil, xerrors.Errorf("StandaloneSealCommit: %w", err) + } - return sectorbuilder.ResumeSealCommit(sb.handle, sectorID) + pmeta := make([]sectorbuilder.PieceMetadata, len(pieces)) + for i, piece := range pieces { + pmeta[i] = sectorbuilder.PieceMetadata{ + Key: pieceKeys[i], + Size: piece.Size, + CommP: piece.CommP, + } + } + + sealedPath, err := sb.sealedSectorPath(sectorID) + if err != nil { + return nil, err + } + + err = sectorbuilder.ImportSealedSector( + sb.handle, + sectorID, + cacheDir, + sealedPath, + ticket, + seed, + rspco.CommR, + rspco.CommD, + rspco.CommC, + rspco.CommRLast, + proof, + pmeta, + ) + if err != nil { + return nil, xerrors.Errorf("ImportSealedSector: %w", err) + } + return proof, nil } func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) { @@ -217,46 +313,3 @@ func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen] func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) { return sectorbuilder.GenerateDataCommitment(ssize, pieces) } - -func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) { - f, ok := r.(*os.File) - if ok { - return f, func() error { return nil }, nil - } - - var w *os.File - - f, w, err := os.Pipe() - if err != nil { - return nil, nil, err - } - - var wait sync.Mutex - var werr error - - wait.Lock() - go func() { - defer wait.Unlock() - - copied, werr := io.CopyN(w, r, n) - if werr != nil { - log.Warnf("toReadableFile: copy error: %+v", werr) - } - - err := w.Close() - if werr == nil && err != nil { - werr = err - log.Warnf("toReadableFile: close error: %+v", err) - return - } - if copied != n { - log.Warnf("copied different amount than expected: %d != %d", copied, n) - werr = xerrors.Errorf("copied different amount than expected: %d != %d", copied, n) - } - }() - - return f, func() error { - wait.Lock() - return werr - }, nil -} diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index 5ce934f04..c83f2c97b 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -3,6 +3,7 @@ package sectorbuilder_test import ( "io" "math/rand" + "os" "testing" "github.com/filecoin-project/lotus/build" @@ -12,27 +13,33 @@ import ( const sectorSize = 1024 func TestSealAndVerify(t *testing.T) { - t.Skip("this is slow") - //os.Setenv("BELLMAN_NO_GPU", "1") + //t.Skip("this is slow") + os.Setenv("BELLMAN_NO_GPU", "1") build.SectorSizes = []uint64{sectorSize} if err := build.GetParams(true); err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize) if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } - defer cleanup() + _ = cleanup + //defer cleanup() dlen := sectorbuilder.UserBytesForSectorSize(sectorSize) - r := io.LimitReader(rand.New(rand.NewSource(42)), int64(dlen)) - sid, err := sb.AddPiece("foo", dlen, r) + sid, err := sb.AcquireSectorId() if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) + } + + r := io.LimitReader(rand.New(rand.NewSource(42)), int64(dlen)) + ppi, err := sb.AddPiece(dlen, sid, r) + if err != nil { + t.Fatalf("%+v", err) } ticket := sectorbuilder.SealTicket{ @@ -40,9 +47,9 @@ func TestSealAndVerify(t *testing.T) { TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2}, } - pco, err := sb.SealPreCommit(sid, ticket) + pco, err := sb.SealPreCommit(sid, ticket, []sectorbuilder.PublicPieceInfo{ppi}) if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } seed := sectorbuilder.SealSeed{ @@ -50,14 +57,14 @@ func TestSealAndVerify(t *testing.T) { TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}, } - sco, err := sb.SealCommit(sid, seed) + proof, err := sb.SealCommit(sid, ticket, seed, []sectorbuilder.PublicPieceInfo{ppi}, []string{"foo"}, pco) if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } - ok, err := sectorbuilder.VerifySeal(sectorSize, pco.CommR[:], pco.CommD[:], sb.Miner, ticket.TicketBytes[:], seed.TicketBytes[:], sid, sco.Proof) + ok, err := sectorbuilder.VerifySeal(sectorSize, pco.CommR[:], pco.CommD[:], sb.Miner, ticket.TicketBytes[:], seed.TicketBytes[:], sid, proof) if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } if !ok {