lotus/lib/sectorbuilder/sectorbuilder.go

263 lines
6.4 KiB
Go
Raw Normal View History

2019-07-27 00:45:27 +00:00
package sectorbuilder
import (
2019-09-23 10:50:28 +00:00
"io"
"os"
"sort"
2019-09-23 10:50:28 +00:00
"sync"
2019-07-27 00:45:27 +00:00
"unsafe"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
logging "github.com/ipfs/go-log"
2019-11-04 17:36:29 +00:00
"golang.org/x/xerrors"
2019-08-06 22:04:21 +00:00
"github.com/filecoin-project/lotus/chain/address"
2019-07-27 00:45:27 +00:00
)
2019-11-04 17:36:29 +00:00
const PoStReservedWorkers = 1
var log = logging.Logger("sectorbuilder")
type SectorSealingStatus = sectorbuilder.SectorSealingStatus
type StagedSectorMetadata = sectorbuilder.StagedSectorMetadata
2019-09-18 03:32:52 +00:00
type SortedSectorInfo = sectorbuilder.SortedSectorInfo
type SectorInfo = sectorbuilder.SectorInfo
2019-10-27 08:56:53 +00:00
type SealTicket = sectorbuilder.SealTicket
2019-10-30 18:10:29 +00:00
type SealSeed = sectorbuilder.SealSeed
type SealPreCommitOutput = sectorbuilder.SealPreCommitOutput
2019-10-30 18:10:29 +00:00
type SealCommitOutput = sectorbuilder.SealCommitOutput
type PublicPieceInfo = sectorbuilder.PublicPieceInfo
2019-10-27 08:56:53 +00:00
2019-07-27 00:45:27 +00:00
const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct {
handle unsafe.Pointer
2019-11-06 23:09:48 +00:00
ssize uint64
2019-11-04 17:36:29 +00:00
Miner address.Address
2019-11-04 17:36:29 +00:00
rateLimit chan struct{}
2019-07-27 00:45:27 +00:00
}
type Config struct {
SectorSize uint64
Miner address.Address
WorkerThreads uint8
CacheDir string
2019-07-27 00:45:27 +00:00
SealedDir string
StagedDir string
MetadataDir string
}
func New(cfg *Config) (*SectorBuilder, error) {
2019-11-04 17:36:29 +00:00
if cfg.WorkerThreads <= PoStReservedWorkers {
2019-11-05 14:03:59 +00:00
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads)
2019-11-04 17:36:29 +00:00
}
proverId := addressToProverID(cfg.Miner)
2019-07-27 00:45:27 +00:00
sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads)
2019-07-27 00:45:27 +00:00
if err != nil {
return nil, err
}
return &SectorBuilder{
2019-11-06 23:09:48 +00:00
handle: sbp,
ssize: cfg.SectorSize,
2019-11-05 18:40:51 +00:00
Miner: cfg.Miner,
2019-11-05 14:03:59 +00:00
rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
2019-07-27 00:45:27 +00:00
}, nil
}
2019-11-04 17:36:29 +00:00
// rlimit takes one slot in the rate-limit channel, blocking while the channel
// is full, and returns the function that gives the slot back. A warning is
// logged first when the channel is already at capacity.
func (sb *SectorBuilder) rlimit() func() {
	if len(sb.rateLimit) == cap(sb.rateLimit) {
		log.Warn("rate-limiting sectorbuilder call")
	}

	sb.rateLimit <- struct{}{}

	release := func() {
		<-sb.rateLimit
	}
	return release
}
2019-10-21 11:58:41 +00:00
// addressToProverID copies the payload of a into a zero-initialised 32-byte
// array. Payload bytes beyond the 32nd are dropped by copy; a shorter payload
// leaves the trailing bytes zero.
func addressToProverID(a address.Address) [32]byte {
	var id [32]byte
	payload := a.Payload()
	copy(id[:], payload)
	return id
}
2019-07-27 00:45:27 +00:00
// Destroy passes the builder's handle to sectorbuilder.DestroySectorBuilder.
func (sb *SectorBuilder) Destroy() {
	h := sb.handle
	sectorbuilder.DestroySectorBuilder(h)
}
2019-09-23 10:50:28 +00:00
func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, file io.Reader) (uint64, error) {
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
return 0, err
}
2019-11-04 17:36:29 +00:00
ret := sb.rlimit()
defer ret()
2019-09-23 10:50:28 +00:00
sectorID, err := sectorbuilder.AddPieceFromFile(sb.handle, pieceKey, pieceSize, f)
if err != nil {
return 0, err
}
return sectorID, werr()
2019-07-27 00:45:27 +00:00
}
// TODO: should *really really* return an io.ReadCloser
func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) {
2019-11-04 17:36:29 +00:00
ret := sb.rlimit()
defer ret()
2019-07-27 00:45:27 +00:00
return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey)
}
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket) (SealPreCommitOutput, error) {
2019-11-04 17:36:29 +00:00
ret := sb.rlimit()
defer ret()
return sectorbuilder.SealPreCommit(sb.handle, sectorID, ticket)
}
func (sb *SectorBuilder) SealCommit(sectorID uint64, seed SealSeed) (SealCommitOutput, error) {
2019-11-04 17:36:29 +00:00
ret := sb.rlimit()
defer ret()
2019-10-30 18:10:29 +00:00
return sectorbuilder.SealCommit(sb.handle, sectorID, seed)
2019-07-27 00:45:27 +00:00
}
2019-10-30 18:10:29 +00:00
func (sb *SectorBuilder) ResumeSealCommit(sectorID uint64) (SealCommitOutput, error) {
2019-11-04 17:36:29 +00:00
ret := sb.rlimit()
defer ret()
2019-10-30 18:10:29 +00:00
return sectorbuilder.ResumeSealCommit(sb.handle, sectorID)
}
func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) {
2019-07-27 00:45:27 +00:00
return sectorbuilder.GetSectorSealingStatusByID(sb.handle, sector)
}
// GetAllStagedSectors returns the SectorID of every entry reported by
// sectorbuilder.GetAllStagedSectors, sorted in ascending order. On failure it
// returns a nil slice and the underlying error.
func (sb *SectorBuilder) GetAllStagedSectors() ([]uint64, error) {
	staged, err := sectorbuilder.GetAllStagedSectors(sb.handle)
	if err != nil {
		return nil, err
	}

	ids := make([]uint64, 0, len(staged))
	for i := range staged {
		ids = append(ids, staged[i].SectorID)
	}

	ascending := func(a, b int) bool { return ids[a] < ids[b] }
	sort.Slice(ids, ascending)

	return ids, nil
}
2019-09-18 03:32:52 +00:00
func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
2019-07-27 00:45:27 +00:00
// Wait, this is a blocking method with no way of interrupting it?
// does it checkpoint itself?
return sectorbuilder.GeneratePoSt(sb.handle, sectorInfo, challengeSeed, faults)
2019-07-27 00:45:27 +00:00
}
2019-11-06 23:09:48 +00:00
// SectorSize reports the sector size stored on the builder.
func (sb *SectorBuilder) SectorSize() uint64 {
	size := sb.ssize
	return size
}
// UserBytesForSectorSize is go-sectorbuilder's GetMaxUserBytesPerStagedSector
// function, re-exported under a local name.
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) {
2019-10-30 18:10:29 +00:00
var commRa, commDa, ticketa, seeda [32]byte
copy(commRa[:], commR)
copy(commDa[:], commD)
2019-10-21 11:58:41 +00:00
copy(ticketa[:], ticket)
2019-10-30 18:10:29 +00:00
copy(seeda[:], seed)
proverIDa := addressToProverID(proverID)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof)
}
// NewSortedSectorInfo passes the elements of sectors as variadic arguments to
// sectorbuilder.NewSortedSectorInfo and returns its result.
func NewSortedSectorInfo(sectors []SectorInfo) SortedSectorInfo {
	sorted := sectorbuilder.NewSortedSectorInfo(sectors...)
	return sorted
}
func VerifyPost(sectorSize uint64, sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, proof []byte, faults []uint64) (bool, error) {
return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeed, proof, faults)
2019-07-27 00:45:27 +00:00
}
2019-09-23 10:50:28 +00:00
// GeneratePieceCommitment exposes piece as an *os.File via toReadableFile and
// passes it, with pieceSize, to sectorbuilder.GeneratePieceCommitmentFromFile.
//
// If converting the reader or generating the commitment fails, a zero
// commitment and that error are returned. Otherwise the commitment is
// returned together with whatever the toReadableFile wait function reports.
func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) {
	src, waitCopy, err := toReadableFile(piece, int64(pieceSize))
	if err != nil {
		return [CommLen]byte{}, err
	}

	digest, err := sectorbuilder.GeneratePieceCommitmentFromFile(src, pieceSize)
	if err != nil {
		return [CommLen]byte{}, err
	}

	return digest, waitCopy()
}
func GenerateDataCommitment(ssize uint64, pieces []PublicPieceInfo) ([CommLen]byte, error) {
return sectorbuilder.GenerateDataCommitment(ssize, pieces)
}
2019-09-23 10:50:28 +00:00
func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File)
if ok {
return f, func() error { return nil }, nil
}
var w *os.File
f, w, err := os.Pipe()
if err != nil {
return nil, nil, err
}
2019-09-23 11:43:32 +00:00
var wait sync.Mutex
2019-09-23 10:50:28 +00:00
var werr error
2019-09-23 11:43:32 +00:00
wait.Lock()
2019-09-23 10:50:28 +00:00
go func() {
2019-09-23 11:43:32 +00:00
defer wait.Unlock()
2019-09-23 10:50:28 +00:00
2019-11-06 12:04:33 +00:00
copied, werr := io.CopyN(w, r, n)
if werr != nil {
log.Warnf("toReadableFile: copy error: %+v", werr)
}
2019-09-23 10:50:28 +00:00
err := w.Close()
2019-11-06 23:09:48 +00:00
if werr == nil && err != nil {
2019-09-23 10:50:28 +00:00
werr = err
2019-11-06 12:04:33 +00:00
log.Warnf("toReadableFile: close error: %+v", err)
return
}
if copied != n {
log.Warnf("copied different amount than expected: %d != %d", copied, n)
werr = xerrors.Errorf("copied different amount than expected: %d != %d", copied, n)
2019-09-23 10:50:28 +00:00
}
}()
return f, func() error {
2019-09-23 11:43:32 +00:00
wait.Lock()
return werr
2019-09-23 10:50:28 +00:00
}, nil
}