sectorbuilder: use standalone methods

This commit is contained in:
Łukasz Magiera 2019-11-07 17:39:27 +01:00
parent e5f90371c7
commit 3e39d6e445
4 changed files with 230 additions and 75 deletions

View File

@ -0,0 +1,88 @@
package sectorbuilder
import (
"fmt"
"io"
"os"
"path/filepath"
"sync"
"golang.org/x/xerrors"
)
func (sb *SectorBuilder) sectorName(sectorID uint64) string {
return fmt.Sprintf("s-%s-%d", sb.Miner, sectorID)
}
func (sb *SectorBuilder) stagedSectorPath(sectorID uint64) string {
return filepath.Join(sb.stagedDir, sb.sectorName(sectorID))
}
func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) {
return os.OpenFile(sb.stagedSectorPath(sectorID), os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
}
func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) {
path := filepath.Join(sb.sealedDir, sb.sectorName(sectorID))
e, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return "", err
}
return path, e.Close()
}
func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) {
dir := filepath.Join(sb.cacheDir, sb.sectorName(sectorID))
err := os.Mkdir(dir, 0755)
if os.IsExist(err) {
err = nil
}
return dir, err
}
func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File)
if ok {
return f, func() error { return nil }, nil
}
var w *os.File
f, w, err := os.Pipe()
if err != nil {
return nil, nil, err
}
var wait sync.Mutex
var werr error
wait.Lock()
go func() {
defer wait.Unlock()
copied, werr := io.CopyN(w, r, n)
if werr != nil {
log.Warnf("toReadableFile: copy error: %+v", werr)
}
err := w.Close()
if werr == nil && err != nil {
werr = err
log.Warnf("toReadableFile: close error: %+v", err)
return
}
if copied != n {
log.Warnf("copied different amount than expected: %d != %d", copied, n)
werr = xerrors.Errorf("copied different amount than expected: %d != %d", copied, n)
}
}()
return f, func() error {
wait.Lock()
return werr
}, nil
}

View File

@ -24,8 +24,15 @@ func TempSectorbuilder(sectorSize uint64) (*SectorBuilder, func(), error) {
staging := filepath.Join(dir, "staging")
cache := filepath.Join(dir, "cache")
for _, dir := range []string{metadata, sealed, staging, cache} {
if err := os.Mkdir(dir, 0755); err != nil {
return nil, nil, err
}
}
sb, err := New(&Config{
SectorSize: sectorSize,
SectorSize: sectorSize,
SealedDir: sealed,
StagedDir: staging,
MetadataDir: metadata,

View File

@ -2,9 +2,7 @@ package sectorbuilder
import (
"io"
"os"
"sort"
"sync"
"unsafe"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
@ -15,6 +13,7 @@ import (
)
const PoStReservedWorkers = 1
const PoRepProofPartitions = 2
var log = logging.Logger("sectorbuilder")
@ -36,6 +35,8 @@ type SealCommitOutput = sectorbuilder.SealCommitOutput
type PublicPieceInfo = sectorbuilder.PublicPieceInfo
type RawSealPreCommitOutput = sectorbuilder.RawSealPreCommitOutput
const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct {
@ -44,6 +45,10 @@ type SectorBuilder struct {
Miner address.Address
stagedDir string
sealedDir string
cacheDir string
rateLimit chan struct{}
}
@ -66,7 +71,7 @@ func New(cfg *Config) (*SectorBuilder, error) {
proverId := addressToProverID(cfg.Miner)
sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads)
sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, PoRepProofPartitions, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads)
if err != nil {
return nil, err
}
@ -75,6 +80,10 @@ func New(cfg *Config) (*SectorBuilder, error) {
handle: sbp,
ssize: cfg.SectorSize,
stagedDir: cfg.StagedDir,
sealedDir: cfg.SealedDir,
cacheDir: cfg.CacheDir,
Miner: cfg.Miner,
rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
}, nil
@ -101,21 +110,44 @@ func (sb *SectorBuilder) Destroy() {
sectorbuilder.DestroySectorBuilder(sb.handle)
}
func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, file io.Reader) (uint64, error) {
func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
return sectorbuilder.AcquireSectorId(sb.handle)
}
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader) (PublicPieceInfo, error) {
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
return 0, err
return PublicPieceInfo{}, err
}
ret := sb.rlimit()
defer ret()
sectorID, err := sectorbuilder.AddPieceFromFile(sb.handle, pieceKey, pieceSize, f)
stagedFile, err := sb.stagedSectorFile(sectorId)
if err != nil {
return 0, err
return PublicPieceInfo{}, err
}
return sectorID, werr()
writeUnpadded, commP, err := sectorbuilder.StandaloneWriteWithoutAlignment(f, pieceSize, stagedFile)
if err != nil {
return PublicPieceInfo{}, err
}
if writeUnpadded != pieceSize {
return PublicPieceInfo{}, xerrors.Errorf("writeUnpadded != pieceSize: %d != %d", writeUnpadded, pieceSize)
}
if err := stagedFile.Close(); err != nil {
return PublicPieceInfo{}, err
}
if err := f.Close(); err != nil {
return PublicPieceInfo{}, err
}
return PublicPieceInfo{
Size: pieceSize,
CommP: commP,
}, werr()
}
// TODO: should *really really* return an io.ReadCloser
@ -126,25 +158,89 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err
return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey)
}
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket) (SealPreCommitOutput, error) {
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
ret := sb.rlimit()
defer ret()
return sectorbuilder.SealPreCommit(sb.handle, sectorID, ticket)
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
}
sealedPath, err := sb.sealedSectorPath(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
}
return sectorbuilder.StandaloneSealPreCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
sb.stagedSectorPath(sectorID),
sealedPath,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
pieces,
)
}
func (sb *SectorBuilder) SealCommit(sectorID uint64, seed SealSeed) (SealCommitOutput, error) {
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) {
ret := sb.rlimit()
defer ret()
return sectorbuilder.SealCommit(sb.handle, sectorID, seed)
}
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
func (sb *SectorBuilder) ResumeSealCommit(sectorID uint64) (SealCommitOutput, error) {
ret := sb.rlimit()
defer ret()
proof, err = sectorbuilder.StandaloneSealCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
seed.TicketBytes,
pieces,
rspco,
)
if err != nil {
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return sectorbuilder.ResumeSealCommit(sb.handle, sectorID)
pmeta := make([]sectorbuilder.PieceMetadata, len(pieces))
for i, piece := range pieces {
pmeta[i] = sectorbuilder.PieceMetadata{
Key: pieceKeys[i],
Size: piece.Size,
CommP: piece.CommP,
}
}
sealedPath, err := sb.sealedSectorPath(sectorID)
if err != nil {
return nil, err
}
err = sectorbuilder.ImportSealedSector(
sb.handle,
sectorID,
cacheDir,
sealedPath,
ticket,
seed,
rspco.CommR,
rspco.CommD,
rspco.CommC,
rspco.CommRLast,
proof,
pmeta,
)
if err != nil {
return nil, xerrors.Errorf("ImportSealedSector: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) {
@ -217,46 +313,3 @@ func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]
func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) {
return sectorbuilder.GenerateDataCommitment(ssize, pieces)
}
func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File)
if ok {
return f, func() error { return nil }, nil
}
var w *os.File
f, w, err := os.Pipe()
if err != nil {
return nil, nil, err
}
var wait sync.Mutex
var werr error
wait.Lock()
go func() {
defer wait.Unlock()
copied, werr := io.CopyN(w, r, n)
if werr != nil {
log.Warnf("toReadableFile: copy error: %+v", werr)
}
err := w.Close()
if werr == nil && err != nil {
werr = err
log.Warnf("toReadableFile: close error: %+v", err)
return
}
if copied != n {
log.Warnf("copied different amount than expected: %d != %d", copied, n)
werr = xerrors.Errorf("copied different amount than expected: %d != %d", copied, n)
}
}()
return f, func() error {
wait.Lock()
return werr
}, nil
}

View File

@ -3,6 +3,7 @@ package sectorbuilder_test
import (
"io"
"math/rand"
"os"
"testing"
"github.com/filecoin-project/lotus/build"
@ -12,27 +13,33 @@ import (
const sectorSize = 1024
func TestSealAndVerify(t *testing.T) {
t.Skip("this is slow")
//os.Setenv("BELLMAN_NO_GPU", "1")
//t.Skip("this is slow")
os.Setenv("BELLMAN_NO_GPU", "1")
build.SectorSizes = []uint64{sectorSize}
if err := build.GetParams(true); err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize)
if err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
defer cleanup()
_ = cleanup
//defer cleanup()
dlen := sectorbuilder.UserBytesForSectorSize(sectorSize)
r := io.LimitReader(rand.New(rand.NewSource(42)), int64(dlen))
sid, err := sb.AddPiece("foo", dlen, r)
sid, err := sb.AcquireSectorId()
if err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
r := io.LimitReader(rand.New(rand.NewSource(42)), int64(dlen))
ppi, err := sb.AddPiece(dlen, sid, r)
if err != nil {
t.Fatalf("%+v", err)
}
ticket := sectorbuilder.SealTicket{
@ -40,9 +47,9 @@ func TestSealAndVerify(t *testing.T) {
TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2},
}
pco, err := sb.SealPreCommit(sid, ticket)
pco, err := sb.SealPreCommit(sid, ticket, []sectorbuilder.PublicPieceInfo{ppi})
if err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
seed := sectorbuilder.SealSeed{
@ -50,14 +57,14 @@ func TestSealAndVerify(t *testing.T) {
TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9},
}
sco, err := sb.SealCommit(sid, seed)
proof, err := sb.SealCommit(sid, ticket, seed, []sectorbuilder.PublicPieceInfo{ppi}, []string{"foo"}, pco)
if err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
ok, err := sectorbuilder.VerifySeal(sectorSize, pco.CommR[:], pco.CommD[:], sb.Miner, ticket.TicketBytes[:], seed.TicketBytes[:], sid, sco.Proof)
ok, err := sectorbuilder.VerifySeal(sectorSize, pco.CommR[:], pco.CommD[:], sb.Miner, ticket.TicketBytes[:], seed.TicketBytes[:], sid, proof)
if err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
if !ok {