lotus/lib/sectorbuilder/sectorbuilder.go

463 lines
12 KiB
Go
Raw Normal View History

2019-07-27 00:45:27 +00:00
package sectorbuilder
import (
2019-11-12 19:54:25 +00:00
"context"
"fmt"
2019-09-23 10:50:28 +00:00
"io"
"os"
"sort"
"strconv"
"sync"
2019-07-27 00:45:27 +00:00
"unsafe"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
2019-11-12 19:54:25 +00:00
"go.opencensus.io/trace"
2019-11-04 17:36:29 +00:00
"golang.org/x/xerrors"
2019-08-06 22:04:21 +00:00
2019-11-25 16:16:18 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/node/modules/dtypes"
2019-07-27 00:45:27 +00:00
)
2019-11-04 17:36:29 +00:00
const PoStReservedWorkers = 1
2019-11-07 16:39:27 +00:00
const PoRepProofPartitions = 2
2019-11-04 17:36:29 +00:00
var lastSectorIdKey = datastore.NewKey("/sectorbuilder/last")
var log = logging.Logger("sectorbuilder")
type SectorSealingStatus = sectorbuilder.SectorSealingStatus
type StagedSectorMetadata = sectorbuilder.StagedSectorMetadata
2019-11-25 04:45:13 +00:00
type SortedPublicSectorInfo = sectorbuilder.SortedPublicSectorInfo
type SortedPrivateSectorInfo = sectorbuilder.SortedPrivateSectorInfo
2019-09-18 03:32:52 +00:00
2019-11-25 04:45:13 +00:00
type PrivateSectorInfo = sectorbuilder.SectorPrivateInfo
type PublicSectorInfo = sectorbuilder.SectorPublicInfo
2019-10-27 08:56:53 +00:00
type SealTicket = sectorbuilder.SealTicket
2019-10-30 18:10:29 +00:00
type SealSeed = sectorbuilder.SealSeed
type SealPreCommitOutput = sectorbuilder.SealPreCommitOutput
2019-10-30 18:10:29 +00:00
type SealCommitOutput = sectorbuilder.SealCommitOutput
type PublicPieceInfo = sectorbuilder.PublicPieceInfo
2019-10-27 08:56:53 +00:00
2019-11-07 16:39:27 +00:00
type RawSealPreCommitOutput = sectorbuilder.RawSealPreCommitOutput
2019-11-21 22:21:45 +00:00
type EPostCandidate = sectorbuilder.Candidate
2019-07-27 00:45:27 +00:00
// CommLen is the length in bytes of a commitment, as defined by
// sectorbuilder.CommitmentBytesLen. It is used as the array length for
// commitments and challenge seeds in this package.
const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct {
handle unsafe.Pointer
ds dtypes.MetadataDS
idLk sync.Mutex
ssize uint64
2019-11-04 17:36:29 +00:00
Miner address.Address
2019-11-07 16:39:27 +00:00
stagedDir string
sealedDir string
cacheDir string
2019-11-04 17:36:29 +00:00
rateLimit chan struct{}
2019-07-27 00:45:27 +00:00
}
type Config struct {
SectorSize uint64
Miner address.Address
WorkerThreads uint8
CacheDir string
2019-07-27 00:45:27 +00:00
SealedDir string
StagedDir string
MetadataDir string
}
func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
2019-11-04 17:36:29 +00:00
if cfg.WorkerThreads <= PoStReservedWorkers {
2019-11-05 14:03:59 +00:00
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads)
2019-11-04 17:36:29 +00:00
}
proverId := addressToProverID(cfg.Miner)
2019-07-27 00:45:27 +00:00
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} {
if err := os.Mkdir(dir, 0755); err != nil {
if os.IsExist(err) {
continue
}
return nil, err
}
}
var lastUsedID uint64
b, err := ds.Get(lastSectorIdKey)
switch err {
case nil:
i, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return nil, err
}
lastUsedID = uint64(i)
case datastore.ErrNotFound:
default:
return nil, err
}
sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, PoRepProofPartitions, lastUsedID, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads)
2019-07-27 00:45:27 +00:00
if err != nil {
return nil, err
}
sb := &SectorBuilder{
2019-11-06 23:09:48 +00:00
handle: sbp,
ds: ds,
ssize: cfg.SectorSize,
2019-11-06 23:09:48 +00:00
2019-11-07 16:39:27 +00:00
stagedDir: cfg.StagedDir,
sealedDir: cfg.SealedDir,
cacheDir: cfg.CacheDir,
2019-11-05 18:40:51 +00:00
Miner: cfg.Miner,
2019-11-05 14:03:59 +00:00
rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
}
return sb, nil
2019-07-27 00:45:27 +00:00
}
func (sb *SectorBuilder) RateLimit() func() {
2019-11-04 17:36:29 +00:00
if cap(sb.rateLimit) == len(sb.rateLimit) {
log.Warn("rate-limiting sectorbuilder call")
}
sb.rateLimit <- struct{}{}
return func() {
<-sb.rateLimit
}
}
2019-11-08 18:15:13 +00:00
// WorkerStats reports how many rate-limited slots are currently free, how
// many workers are reserved (PoStReservedWorkers), and the total of
// rate-limited plus reserved workers.
func (sb *SectorBuilder) WorkerStats() (free, reserved, total int) {
	limit := cap(sb.rateLimit)
	inUse := len(sb.rateLimit)

	return limit - inUse, PoStReservedWorkers, limit + PoStReservedWorkers
}
2019-10-21 11:58:41 +00:00
// addressToProverID copies the payload of a into a 32-byte array. A payload
// shorter than 32 bytes leaves the remaining bytes zero; a longer one is
// truncated.
func addressToProverID(a address.Address) [32]byte {
	id := [32]byte{}
	copy(id[:], a.Payload())
	return id
}
2019-07-27 00:45:27 +00:00
// Destroy passes the builder's handle to sectorbuilder.DestroySectorBuilder.
// NOTE(review): presumably the handle must not be used afterwards — confirm
// against go-sectorbuilder.
func (sb *SectorBuilder) Destroy() {
sectorbuilder.DestroySectorBuilder(sb.handle)
}
2019-11-07 16:39:27 +00:00
func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
sb.idLk.Lock()
defer sb.idLk.Unlock()
id, err := sectorbuilder.AcquireSectorId(sb.handle)
if err != nil {
return 0, err
}
err = sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id)))
if err != nil {
return 0, err
}
return id, nil
2019-11-07 16:39:27 +00:00
}
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
2019-09-23 10:50:28 +00:00
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
2019-11-07 16:39:27 +00:00
return PublicPieceInfo{}, err
2019-09-23 10:50:28 +00:00
}
ret := sb.RateLimit()
2019-11-04 17:36:29 +00:00
defer ret()
2019-11-07 16:39:27 +00:00
stagedFile, err := sb.stagedSectorFile(sectorId)
2019-09-23 10:50:28 +00:00
if err != nil {
2019-11-07 16:39:27 +00:00
return PublicPieceInfo{}, err
}
_, _, commP, err := sectorbuilder.StandaloneWriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes)
2019-11-07 16:39:27 +00:00
if err != nil {
return PublicPieceInfo{}, err
}
if err := stagedFile.Close(); err != nil {
return PublicPieceInfo{}, err
}
if err := f.Close(); err != nil {
return PublicPieceInfo{}, err
2019-09-23 10:50:28 +00:00
}
2019-11-07 16:39:27 +00:00
return PublicPieceInfo{
Size: pieceSize,
CommP: commP,
}, werr()
2019-07-27 00:45:27 +00:00
}
// TODO: should *really really* return an io.ReadCloser
func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) {
ret := sb.RateLimit()
2019-11-04 17:36:29 +00:00
defer ret()
2019-07-27 00:45:27 +00:00
return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey)
}
2019-11-07 16:39:27 +00:00
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
ret := sb.RateLimit()
2019-11-04 17:36:29 +00:00
defer ret()
2019-11-07 16:39:27 +00:00
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
}
2019-11-07 16:39:27 +00:00
sealedPath, err := sb.sealedSectorPath(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, err
}
2019-11-04 17:36:29 +00:00
2019-11-07 19:54:24 +00:00
var sum uint64
for _, piece := range pieces {
sum += piece.Size
}
ussize := UserBytesForSectorSize(sb.ssize)
if sum != ussize {
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
2019-11-07 19:54:24 +00:00
}
stagedPath := sb.stagedSectorPath(sectorID)
rspco, err := sectorbuilder.StandaloneSealPreCommit(
2019-11-07 16:39:27 +00:00
sb.ssize,
PoRepProofPartitions,
cacheDir,
2019-11-07 19:54:24 +00:00
stagedPath,
2019-11-07 16:39:27 +00:00
sealedPath,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
pieces,
)
2019-11-07 19:54:24 +00:00
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
return rspco, nil
2019-07-27 00:45:27 +00:00
}
2019-11-07 16:39:27 +00:00
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, pieceKeys []string, rspco RawSealPreCommitOutput) (proof []byte, err error) {
ret := sb.RateLimit()
2019-11-04 17:36:29 +00:00
defer ret()
2019-11-07 16:39:27 +00:00
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
proof, err = sectorbuilder.StandaloneSealCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
seed.TicketBytes,
pieces,
rspco,
)
if err != nil {
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
pmeta := make([]sectorbuilder.PieceMetadata, len(pieces))
for i, piece := range pieces {
pmeta[i] = sectorbuilder.PieceMetadata{
Key: pieceKeys[i],
Size: piece.Size,
CommP: piece.CommP,
}
}
2019-11-26 02:43:43 +00:00
/* sealedPath, err := sb.sealedSectorPath(sectorID)
if err != nil {
return nil, err
}
2019-11-07 16:39:27 +00:00
2019-11-26 02:43:43 +00:00
err = sectorbuilder.ImportSealedSector(
sb.handle,
sectorID,
cacheDir,
sealedPath,
ticket,
seed,
rspco.CommR,
rspco.CommD,
rspco.CommC,
rspco.CommRLast,
proof,
pmeta,
)
if err != nil {
return nil, xerrors.Errorf("ImportSealedSector: %w", err)
}*/
2019-11-07 16:39:27 +00:00
return proof, nil
}
func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) {
2019-07-27 00:45:27 +00:00
return sectorbuilder.GetSectorSealingStatusByID(sb.handle, sector)
}
// GetAllStagedSectors returns the IDs of all sectors reported by
// sectorbuilder.GetAllStagedSectors, sorted in ascending order.
func (sb *SectorBuilder) GetAllStagedSectors() ([]uint64, error) {
	staged, err := sectorbuilder.GetAllStagedSectors(sb.handle)
	if err != nil {
		return nil, err
	}

	ids := make([]uint64, 0, len(staged))
	for i := range staged {
		ids = append(ids, staged[i].SectorID)
	}

	sort.Slice(ids, func(a, b int) bool {
		return ids[a] < ids[b]
	})

	return ids, nil
}
2019-11-21 22:21:45 +00:00
/*
2019-09-18 03:32:52 +00:00
func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
2019-07-27 00:45:27 +00:00
// Wait, this is a blocking method with no way of interrupting it?
// does it checkpoint itself?
return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults)
2019-07-27 00:45:27 +00:00
}
2019-11-21 22:21:45 +00:00
*/
2019-07-27 00:45:27 +00:00
2019-11-06 23:09:48 +00:00
// SectorSize returns the sector size this builder was configured with
// (Config.SectorSize at construction time).
func (sb *SectorBuilder) SectorSize() uint64 {
return sb.ssize
}
2019-11-25 04:45:13 +00:00
func (sb *SectorBuilder) ComputeElectionPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed []byte, winners []EPostCandidate) ([]byte, error) {
2019-11-21 22:21:45 +00:00
if len(challengeSeed) != CommLen {
return nil, xerrors.Errorf("given challenge seed was the wrong length: %d != %d", len(challengeSeed), CommLen)
}
var cseed [CommLen]byte
copy(cseed[:], challengeSeed)
2019-11-26 02:43:43 +00:00
privsects, err := sb.pubSectorToPriv(sectorInfo)
if err != nil {
return nil, err
}
2019-11-25 16:16:18 +00:00
2019-11-26 02:43:43 +00:00
proverID := addressToProverID(sb.Miner)
return sectorbuilder.StandaloneGeneratePoSt(sb.ssize, proverID, privsects, cseed, winners)
2019-11-21 22:21:45 +00:00
}
2019-11-25 04:45:13 +00:00
func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo)
if err != nil {
return nil, err
}
2019-11-25 16:16:18 +00:00
challengeCount := challangeCount(uint64(len(sectorInfo.Values())))
2019-11-25 04:45:13 +00:00
proverID := addressToProverID(sb.Miner)
2019-11-25 16:16:18 +00:00
return sectorbuilder.StandaloneGenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
2019-11-25 04:45:13 +00:00
}
func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo) (SortedPrivateSectorInfo, error) {
var out []PrivateSectorInfo
for _, s := range sectorInfo.Values() {
cachePath, err := sb.sectorCacheDir(s.SectorID)
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting cache path for sector %d: %w", s.SectorID, err)
}
sealedPath, err := sb.sealedSectorPath(s.SectorID)
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed path for sector %d: %w", s.SectorID, err)
}
out = append(out, PrivateSectorInfo{
SectorID: s.SectorID,
CommR: s.CommR,
CacheDirPath: cachePath,
SealedSectorPath: sealedPath,
})
}
return NewSortedPrivateSectorInfo(out), nil
2019-11-21 22:21:45 +00:00
}
2019-11-25 04:45:13 +00:00
func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPrivateSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
2019-11-21 22:21:45 +00:00
panic("NYI")
}
// UserBytesForSectorSize is sectorbuilder.GetMaxUserBytesPerStagedSector.
// SealPreCommit requires the summed piece sizes to equal its result for the
// builder's sector size.
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) {
2019-10-30 18:10:29 +00:00
var commRa, commDa, ticketa, seeda [32]byte
copy(commRa[:], commR)
copy(commDa[:], commD)
2019-10-21 11:58:41 +00:00
copy(ticketa[:], ticket)
2019-10-30 18:10:29 +00:00
copy(seeda[:], seed)
proverIDa := addressToProverID(proverID)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof)
}
2019-11-25 04:45:13 +00:00
// NewSortedPrivateSectorInfo forwards the given sectors to
// sectorbuilder.NewSortedSectorPrivateInfo and returns its result.
func NewSortedPrivateSectorInfo(sectors []PrivateSectorInfo) SortedPrivateSectorInfo {
return sectorbuilder.NewSortedSectorPrivateInfo(sectors...)
}
// NewSortedPublicSectorInfo forwards the given sectors to
// sectorbuilder.NewSortedSectorPublicInfo and returns its result.
func NewSortedPublicSectorInfo(sectors []PublicSectorInfo) SortedPublicSectorInfo {
return sectorbuilder.NewSortedSectorPublicInfo(sectors...)
}
2019-11-25 04:45:13 +00:00
func VerifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, winners []EPostCandidate, proverID address.Address) (bool, error) {
2019-11-21 22:21:45 +00:00
var challengeSeeda [CommLen]byte
copy(challengeSeeda[:], challengeSeed)
2019-11-25 16:16:18 +00:00
challengeCount := challangeCount(uint64(len(sectorInfo.Values())))
2019-11-12 19:54:25 +00:00
_, span := trace.StartSpan(ctx, "VerifyPoSt")
defer span.End()
2019-11-21 22:21:45 +00:00
prover := addressToProverID(proverID)
2019-11-25 16:16:18 +00:00
return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeeda, challengeCount, proof, winners, prover)
2019-07-27 00:45:27 +00:00
}
2019-09-23 10:50:28 +00:00
// GeneratePieceCommitment computes the piece commitment for pieceSize bytes
// of piece via sectorbuilder.GeneratePieceCommitmentFromFile. On success the
// error reported by toReadableFile's completion function is returned
// alongside the commitment.
//
// NOTE(review): unlike AddPiece, the file returned by toReadableFile is
// never closed here — confirm whether that leaks a descriptor.
func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) {
	src, wait, err := toReadableFile(piece, int64(pieceSize))
	if err != nil {
		return [32]byte{}, err
	}

	if commP, err = sectorbuilder.GeneratePieceCommitmentFromFile(src, pieceSize); err != nil {
		return [32]byte{}, err
	}

	return commP, wait()
}
// GenerateDataCommitment forwards the sector size and pieces to
// sectorbuilder.GenerateDataCommitment and returns its result.
func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) {
return sectorbuilder.GenerateDataCommitment(ssize, pieces)
}
2019-11-25 16:16:18 +00:00
// challangeCount returns ceil(sectors / build.SectorChallengeRatioDiv),
// computed with integer arithmetic.
func challangeCount(sectors uint64) uint64 {
	div := uint64(build.SectorChallengeRatioDiv)

	// Adding div-1 before dividing rounds the quotient up.
	return (sectors + div - 1) / div
}