lotus/lib/sectorbuilder/sectorbuilder.go

791 lines
18 KiB
Go
Raw Normal View History

2019-07-27 00:45:27 +00:00
package sectorbuilder
import (
"fmt"
2019-09-23 10:50:28 +00:00
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"sync"
2019-11-21 16:10:04 +00:00
"sync/atomic"
2019-07-27 00:45:27 +00:00
sectorbuilder "github.com/filecoin-project/filecoin-ffi"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
2019-12-08 21:53:48 +00:00
dcopy "github.com/otiai10/copy"
2019-11-04 17:36:29 +00:00
"golang.org/x/xerrors"
2019-08-06 22:04:21 +00:00
2019-11-25 16:16:18 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
2019-07-27 00:45:27 +00:00
)
2019-11-04 17:36:29 +00:00
const (
	// PoStReservedWorkers is subtracted from Config.WorkerThreads in New to
	// size the local sealing rate limiter, and is reported as LocalReserved
	// by WorkerStats.
	PoStReservedWorkers = 1

	// PoRepProofPartitions is the partition count handed to the seal and
	// unseal FFI calls in this package.
	PoRepProofPartitions = 10
)
2019-11-04 17:36:29 +00:00
2019-12-01 04:02:52 +00:00
var (
	// lastSectorIdKey is the datastore key under which the most recently
	// allocated sector ID is persisted (as a decimal string).
	lastSectorIdKey = datastore.NewKey("/sectorbuilder/last")

	// log is the package logger.
	log = logging.Logger("sectorbuilder")
)
2019-11-25 04:45:13 +00:00
type SortedPublicSectorInfo = sectorbuilder.SortedPublicSectorInfo
type SortedPrivateSectorInfo = sectorbuilder.SortedPrivateSectorInfo
2019-10-27 08:56:53 +00:00
type SealTicket = sectorbuilder.SealTicket
2019-10-30 18:10:29 +00:00
type SealSeed = sectorbuilder.SealSeed
type SealPreCommitOutput = sectorbuilder.SealPreCommitOutput
2019-10-30 18:10:29 +00:00
type SealCommitOutput = sectorbuilder.SealCommitOutput
type PublicPieceInfo = sectorbuilder.PublicPieceInfo
2019-10-27 08:56:53 +00:00
2019-11-22 15:48:02 +00:00
type RawSealPreCommitOutput sectorbuilder.RawSealPreCommitOutput
2019-11-07 16:39:27 +00:00
2019-11-21 22:21:45 +00:00
type EPostCandidate = sectorbuilder.Candidate
2019-07-27 00:45:27 +00:00
const CommLen = sectorbuilder.CommitmentBytesLen
// WorkerCfg describes which sealing phases a worker opts out of.
type WorkerCfg struct {
	// NoPreCommit marks the worker as not taking pre-commit tasks.
	NoPreCommit bool
	// NoCommit marks the worker as not taking commit tasks.
	NoCommit bool

	// TODO: 'cost' info, probably in terms of sealing + transfer speed
}
2019-07-27 00:45:27 +00:00
type SectorBuilder struct {
ds dtypes.MetadataDS
idLk sync.Mutex
ssize uint64
lastID uint64
2019-11-04 17:36:29 +00:00
Miner address.Address
2019-12-01 17:58:31 +00:00
unsealLk sync.Mutex
2019-11-07 16:39:27 +00:00
noCommit bool
noPreCommit bool
rateLimit chan struct{}
2019-11-21 00:52:59 +00:00
precommitTasks chan workerCall
commitTasks chan workerCall
2019-11-21 00:52:59 +00:00
2019-11-21 16:10:04 +00:00
taskCtr uint64
2019-11-21 00:52:59 +00:00
remoteLk sync.Mutex
2019-11-21 18:38:43 +00:00
remoteCtr int
remotes map[int]*remote
2019-11-21 00:52:59 +00:00
remoteResults map[uint64]chan<- SealRes
addPieceWait int32
preCommitWait int32
commitWait int32
unsealWait int32
fsLk sync.Mutex
filesystem *fs // TODO: multi-fs support
2019-11-21 00:52:59 +00:00
stopping chan struct{}
}
2019-11-22 15:48:02 +00:00
// JsonRSPCO is the slice-based counterpart of RawSealPreCommitOutput used in
// SealRes; ToJson and rspco convert between the two forms.
type JsonRSPCO struct {
	// CommD holds the bytes of RawSealPreCommitOutput.CommD.
	CommD []byte
	// CommR holds the bytes of RawSealPreCommitOutput.CommR.
	CommR []byte
}
func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO {
return JsonRSPCO{
CommD: rspco.CommD[:],
CommR: rspco.CommR[:],
2019-11-22 15:48:02 +00:00
}
}
// rspco converts the slice-based form back into a RawSealPreCommitOutput by
// copying CommD and CommR into its fixed-size arrays. copy stops at the
// shorter of source and destination, so short input leaves trailing zeros.
func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput {
	raw := RawSealPreCommitOutput{}
	copy(raw.CommD[:], rspco.CommD)
	copy(raw.CommR[:], rspco.CommR)
	return raw
}
2019-11-21 00:52:59 +00:00
type SealRes struct {
2019-12-04 16:53:32 +00:00
Err string
2019-11-30 13:22:50 +00:00
GoErr error `json:"-"`
2019-11-21 00:52:59 +00:00
2019-11-22 15:48:02 +00:00
Proof []byte
Rspco JsonRSPCO
2019-11-21 00:52:59 +00:00
}
type remote struct {
lk sync.Mutex
sealTasks chan<- WorkerTask
busy uint64 // only for metrics
2019-07-27 00:45:27 +00:00
}
type Config struct {
SectorSize uint64
Miner address.Address
WorkerThreads uint8
FallbackLastID uint64
NoCommit bool
NoPreCommit bool
Dir string
_ struct{} // guard against nameless init
2019-07-27 00:45:27 +00:00
}
func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
2019-11-21 00:52:59 +00:00
if cfg.WorkerThreads < PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers, cfg.WorkerThreads)
2019-11-04 17:36:29 +00:00
}
var lastUsedID uint64
b, err := ds.Get(lastSectorIdKey)
switch err {
case nil:
i, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return nil, err
}
lastUsedID = uint64(i)
case datastore.ErrNotFound:
lastUsedID = cfg.FallbackLastID
default:
return nil, err
}
2019-11-21 00:52:59 +00:00
rlimit := cfg.WorkerThreads - PoStReservedWorkers
sealLocal := rlimit > 0
if rlimit == 0 {
rlimit = 1
}
sb := &SectorBuilder{
ds: ds,
ssize: cfg.SectorSize,
lastID: lastUsedID,
2019-11-06 23:09:48 +00:00
filesystem: openFs(cfg.Dir),
2019-11-07 16:39:27 +00:00
2019-11-21 00:52:59 +00:00
Miner: cfg.Miner,
noPreCommit: cfg.NoPreCommit || !sealLocal,
noCommit: cfg.NoCommit || !sealLocal,
rateLimit: make(chan struct{}, rlimit),
2019-11-21 00:52:59 +00:00
taskCtr: 1,
precommitTasks: make(chan workerCall),
commitTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{},
remotes: map[int]*remote{},
stopping: make(chan struct{}),
}
if err := sb.filesystem.init(); err != nil {
return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err)
}
return sb, nil
2019-07-27 00:45:27 +00:00
}
func NewStandalone(cfg *Config) (*SectorBuilder, error) {
sb := &SectorBuilder{
2019-12-04 16:53:32 +00:00
ds: nil,
2019-12-04 16:53:32 +00:00
ssize: cfg.SectorSize,
Miner: cfg.Miner,
filesystem: openFs(cfg.Dir),
2019-11-21 16:10:04 +00:00
taskCtr: 1,
2019-11-21 18:38:43 +00:00
remotes: map[int]*remote{},
rateLimit: make(chan struct{}, cfg.WorkerThreads),
stopping: make(chan struct{}),
}
if err := sb.filesystem.init(); err != nil {
return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err)
}
return sb, nil
}
2019-11-21 16:10:04 +00:00
func (sb *SectorBuilder) checkRateLimit() {
2019-11-04 17:36:29 +00:00
if cap(sb.rateLimit) == len(sb.rateLimit) {
2019-11-21 16:10:04 +00:00
log.Warn("rate-limiting local sectorbuilder call")
2019-11-04 17:36:29 +00:00
}
2019-11-21 16:10:04 +00:00
}
func (sb *SectorBuilder) RateLimit() func() {
sb.checkRateLimit()
2019-11-04 17:36:29 +00:00
sb.rateLimit <- struct{}{}
return func() {
<-sb.rateLimit
}
}
2019-11-21 16:23:42 +00:00
// WorkerStats is the snapshot returned by (*SectorBuilder).WorkerStats.
type WorkerStats struct {
	// LocalFree is the number of unused local rate-limit slots.
	LocalFree int
	// LocalReserved is the number of workers set aside for PoSt.
	LocalReserved int
	// LocalTotal is the rate-limit capacity plus the reserved workers.
	LocalTotal int
	// todo: post in progress

	// RemotesTotal is the number of registered remote workers.
	RemotesTotal int
	// RemotesFree is the number of remote workers not marked busy.
	RemotesFree int

	// The *Wait fields mirror the SectorBuilder wait counters.
	AddPieceWait  int
	PreCommitWait int
	CommitWait    int
	UnsealWait    int
}
func (sb *SectorBuilder) WorkerStats() WorkerStats {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
remoteFree := len(sb.remotes)
for _, r := range sb.remotes {
if r.busy > 0 {
remoteFree--
}
}
return WorkerStats{
LocalFree: cap(sb.rateLimit) - len(sb.rateLimit),
LocalReserved: PoStReservedWorkers,
LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers,
RemotesTotal: len(sb.remotes),
RemotesFree: remoteFree,
AddPieceWait: int(atomic.LoadInt32(&sb.addPieceWait)),
PreCommitWait: int(atomic.LoadInt32(&sb.preCommitWait)),
CommitWait: int(atomic.LoadInt32(&sb.commitWait)),
UnsealWait: int(atomic.LoadInt32(&sb.unsealWait)),
2019-11-21 16:23:42 +00:00
}
2019-11-08 18:15:13 +00:00
}
2019-10-21 11:58:41 +00:00
// addressToProverID copies the address payload into a 32-byte prover ID.
// Shorter payloads leave the trailing bytes zero; longer ones are truncated.
func addressToProverID(a address.Address) [32]byte {
	id := [32]byte{}
	copy(id[:], a.Payload())
	return id
}
2019-11-07 16:39:27 +00:00
func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
sb.idLk.Lock()
defer sb.idLk.Unlock()
sb.lastID++
id := sb.lastID
2019-12-01 04:02:52 +00:00
err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id)))
if err != nil {
return 0, err
}
return id, nil
2019-11-07 16:39:27 +00:00
}
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
fs := sb.filesystem
if err := fs.reserve(dataStaging, sb.ssize); err != nil {
return PublicPieceInfo{}, err
}
defer fs.free(dataStaging, sb.ssize)
atomic.AddInt32(&sb.addPieceWait, 1)
ret := sb.RateLimit()
atomic.AddInt32(&sb.addPieceWait, -1)
defer ret()
2019-09-23 10:50:28 +00:00
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
2019-11-07 16:39:27 +00:00
return PublicPieceInfo{}, err
2019-09-23 10:50:28 +00:00
}
2019-11-07 16:39:27 +00:00
stagedFile, err := sb.stagedSectorFile(sectorId)
2019-09-23 10:50:28 +00:00
if err != nil {
2019-11-07 16:39:27 +00:00
return PublicPieceInfo{}, err
}
_, _, commP, err := sectorbuilder.WriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes)
2019-11-07 16:39:27 +00:00
if err != nil {
return PublicPieceInfo{}, err
}
if err := stagedFile.Close(); err != nil {
return PublicPieceInfo{}, err
}
if err := f.Close(); err != nil {
return PublicPieceInfo{}, err
2019-09-23 10:50:28 +00:00
}
2019-11-07 16:39:27 +00:00
return PublicPieceInfo{
Size: pieceSize,
CommP: commP,
}, werr()
2019-07-27 00:45:27 +00:00
}
2019-12-01 17:58:31 +00:00
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
fs := sb.filesystem
if err := fs.reserve(dataUnsealed, sb.ssize); err != nil { // TODO: this needs to get smarter when we start supporting partial unseals
return nil, err
}
defer fs.free(dataUnsealed, sb.ssize)
atomic.AddInt32(&sb.unsealWait, 1)
// TODO: Don't wait if cached
2019-12-01 17:58:31 +00:00
ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker
2019-11-04 17:36:29 +00:00
defer ret()
atomic.AddInt32(&sb.unsealWait, -1)
2019-11-04 17:36:29 +00:00
2019-12-01 17:58:31 +00:00
sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel
defer sb.unsealLk.Unlock()
2019-12-01 17:58:31 +00:00
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
sealedPath, err := sb.SealedSectorPath(sectorID)
2019-12-01 17:58:31 +00:00
if err != nil {
return nil, err
}
unsealedPath := sb.unsealedSectorPath(sectorID)
// TODO: GC for those
// (Probably configurable count of sectors to be kept unsealed, and just
// remove last used one (or use whatever other cache policy makes sense))
f, err := os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
var commd [CommLen]byte
copy(commd[:], commD)
var tkt [CommLen]byte
copy(tkt[:], ticket)
err = sectorbuilder.Unseal(sb.ssize,
PoRepProofPartitions,
cacheDir,
sealedPath,
unsealedPath,
sectorID,
addressToProverID(sb.Miner),
tkt,
commd)
if err != nil {
return nil, xerrors.Errorf("unseal failed: %w", err)
}
f, err = os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
}
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
2019-07-27 00:45:27 +00:00
}
2019-11-21 16:10:04 +00:00
func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) {
atomic.AddInt32(&sb.preCommitWait, -1)
2019-11-21 16:10:04 +00:00
select {
case ret := <-call.ret:
2019-11-30 09:25:31 +00:00
var err error
if ret.Err != "" {
err = xerrors.New(ret.Err)
}
return ret.Rspco.rspco(), err
2019-11-21 16:10:04 +00:00
case <-sb.stopping:
return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped")
}
}
2019-11-07 16:39:27 +00:00
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
fs := sb.filesystem
if err := fs.reserve(dataCache, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataCache, sb.ssize)
if err := fs.reserve(dataSealed, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataSealed, sb.ssize)
2019-11-21 16:10:04 +00:00
call := workerCall{
task: WorkerTask{
Type: WorkerPreCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.preCommitWait, 1)
2019-11-21 16:10:04 +00:00
select { // prefer remote
case sb.precommitTasks <- call:
2019-11-21 16:10:04 +00:00
return sb.sealPreCommitRemote(call)
default:
}
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noPreCommit {
rl = make(chan struct{})
}
2019-11-21 16:10:04 +00:00
select { // use whichever is available
case sb.precommitTasks <- call:
2019-11-21 16:10:04 +00:00
return sb.sealPreCommitRemote(call)
case rl <- struct{}{}:
2019-11-21 16:10:04 +00:00
}
atomic.AddInt32(&sb.preCommitWait, -1)
2019-11-21 16:10:04 +00:00
// local
defer func() {
<-sb.rateLimit
}()
2019-11-04 17:36:29 +00:00
2019-11-07 16:39:27 +00:00
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
2019-11-22 15:48:02 +00:00
return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err)
2019-11-07 16:39:27 +00:00
}
2019-11-21 00:52:59 +00:00
sealedPath, err := sb.SealedSectorPath(sectorID)
2019-11-07 16:39:27 +00:00
if err != nil {
2019-11-22 15:48:02 +00:00
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err)
2019-11-07 16:39:27 +00:00
}
2019-11-04 17:36:29 +00:00
e, err := os.OpenFile(sealedPath, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("ensuring sealed file exists: %w", err)
}
if err := e.Close(); err != nil {
return RawSealPreCommitOutput{}, err
}
2019-11-07 19:54:24 +00:00
var sum uint64
for _, piece := range pieces {
sum += piece.Size
}
ussize := UserBytesForSectorSize(sb.ssize)
if sum != ussize {
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
2019-11-07 19:54:24 +00:00
}
2019-11-21 00:52:59 +00:00
stagedPath := sb.StagedSectorPath(sectorID)
2019-11-07 19:54:24 +00:00
rspco, err := sectorbuilder.SealPreCommit(
2019-11-07 16:39:27 +00:00
sb.ssize,
PoRepProofPartitions,
cacheDir,
2019-11-07 19:54:24 +00:00
stagedPath,
2019-11-07 16:39:27 +00:00
sealedPath,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
pieces,
)
2019-11-07 19:54:24 +00:00
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
2019-11-22 15:48:02 +00:00
return RawSealPreCommitOutput(rspco), nil
2019-07-27 00:45:27 +00:00
}
2019-11-21 19:51:48 +00:00
func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
2019-11-21 19:51:48 +00:00
select {
case ret := <-call.ret:
2019-11-30 09:25:31 +00:00
if ret.Err != "" {
err = xerrors.New(ret.Err)
}
return ret.Proof, err
2019-11-21 19:51:48 +00:00
case <-sb.stopping:
return nil, xerrors.New("sectorbuilder stopped")
}
}
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
2019-11-21 19:51:48 +00:00
defer func() {
<-sb.rateLimit
}()
2019-11-04 17:36:29 +00:00
2019-11-07 16:39:27 +00:00
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
proof, err = sectorbuilder.SealCommit(
2019-11-07 16:39:27 +00:00
sb.ssize,
PoRepProofPartitions,
cacheDir,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
seed.TicketBytes,
pieces,
2019-11-22 15:48:02 +00:00
sectorbuilder.RawSealPreCommitOutput(rspco),
2019-11-07 16:39:27 +00:00
)
if err != nil {
2019-11-22 15:48:02 +00:00
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco)
2019-11-07 16:39:27 +00:00
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
2019-11-21 19:51:48 +00:00
return proof, nil
}
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
2019-11-21 19:51:48 +00:00
call := workerCall{
task: WorkerTask{
Type: WorkerCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
SealSeed: seed,
Rspco: rspco,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.commitWait, 1)
2019-11-21 19:51:48 +00:00
select { // prefer remote
case sb.commitTasks <- call:
2019-11-21 19:51:48 +00:00
proof, err = sb.sealCommitRemote(call)
default:
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noCommit {
rl = make(chan struct{})
}
2019-11-21 19:51:48 +00:00
select { // use whichever is available
case sb.commitTasks <- call:
2019-11-21 19:51:48 +00:00
proof, err = sb.sealCommitRemote(call)
case rl <- struct{}{}:
2019-11-21 19:51:48 +00:00
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
}
}
if err != nil {
return nil, xerrors.Errorf("commit: %w", err)
}
return proof, nil
2019-11-06 23:09:48 +00:00
}
2019-11-22 15:48:02 +00:00
2019-11-25 04:45:13 +00:00
func (sb *SectorBuilder) ComputeElectionPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed []byte, winners []EPostCandidate) ([]byte, error) {
2019-11-21 22:21:45 +00:00
if len(challengeSeed) != CommLen {
return nil, xerrors.Errorf("given challenge seed was the wrong length: %d != %d", len(challengeSeed), CommLen)
2019-11-07 16:39:27 +00:00
}
2019-11-21 22:21:45 +00:00
var cseed [CommLen]byte
copy(cseed[:], challengeSeed)
2019-11-07 16:39:27 +00:00
2019-11-26 02:43:43 +00:00
privsects, err := sb.pubSectorToPriv(sectorInfo)
2019-11-07 16:39:27 +00:00
if err != nil {
return nil, err
}
2019-11-26 02:43:43 +00:00
proverID := addressToProverID(sb.Miner)
2019-11-30 23:17:50 +00:00
2019-11-27 22:34:48 +00:00
return sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsects, cseed, winners)
2019-11-21 22:21:45 +00:00
}
2019-11-25 04:45:13 +00:00
func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo)
2019-11-21 19:51:48 +00:00
if err != nil {
return nil, err
}
challengeCount := types.ElectionPostChallengeCount(uint64(len(sectorInfo.Values())))
2019-11-25 16:16:18 +00:00
2019-11-25 04:45:13 +00:00
proverID := addressToProverID(sb.Miner)
2019-11-27 22:34:48 +00:00
return sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
2019-11-25 04:45:13 +00:00
}
func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo) (SortedPrivateSectorInfo, error) {
var out []sectorbuilder.PrivateSectorInfo
2019-11-25 04:45:13 +00:00
for _, s := range sectorInfo.Values() {
cachePath, err := sb.sectorCacheDir(s.SectorID)
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting cache path for sector %d: %w", s.SectorID, err)
}
sealedPath, err := sb.SealedSectorPath(s.SectorID)
2019-11-25 04:45:13 +00:00
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed path for sector %d: %w", s.SectorID, err)
}
out = append(out, sectorbuilder.PrivateSectorInfo{
2019-11-25 04:45:13 +00:00
SectorID: s.SectorID,
CommR: s.CommR,
CacheDirPath: cachePath,
SealedSectorPath: sealedPath,
})
}
return NewSortedPrivateSectorInfo(out), nil
2019-11-21 22:21:45 +00:00
}
func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo)
2019-11-07 16:39:27 +00:00
if err != nil {
return nil, nil, err
2019-11-07 16:39:27 +00:00
}
2019-11-28 12:46:56 +00:00
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())))
proverID := addressToProverID(sb.Miner)
candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
if err != nil {
return nil, nil, err
}
proof, err := sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates)
return candidates, proof, err
}
2019-11-21 00:52:59 +00:00
// Stop closes the stopping channel, which unblocks callers waiting on remote
// seal results. It must be called at most once: closing an already-closed
// channel panics.
func (sb *SectorBuilder) Stop() {
	close(sb.stopping)
}
2019-11-25 16:16:18 +00:00
2019-11-28 12:46:56 +00:00
func fallbackPostChallengeCount(sectors uint64) uint64 {
challengeCount := types.ElectionPostChallengeCount(sectors)
2019-11-28 12:46:56 +00:00
if challengeCount > build.MaxFallbackPostChallengeCount {
return build.MaxFallbackPostChallengeCount
}
return challengeCount
}
2019-11-30 23:17:50 +00:00
func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder, symlink bool) error {
2019-12-16 18:49:32 +00:00
if err := migrate(osb.filesystem.pathFor(dataCache), sb.filesystem.pathFor(dataCache), symlink); err != nil {
2019-11-30 23:17:50 +00:00
return err
}
2019-12-16 18:49:32 +00:00
if err := migrate(osb.filesystem.pathFor(dataStaging), sb.filesystem.pathFor(dataStaging), symlink); err != nil {
2019-11-30 23:17:50 +00:00
return err
}
2019-12-16 18:49:32 +00:00
if err := migrate(osb.filesystem.pathFor(dataSealed), sb.filesystem.pathFor(dataSealed), symlink); err != nil {
2019-11-30 23:17:50 +00:00
return err
}
2019-12-01 04:02:52 +00:00
val, err := osb.ds.Get(lastSectorIdKey)
2019-11-30 23:17:50 +00:00
if err != nil {
return err
}
2019-12-01 04:02:52 +00:00
if err := sb.ds.Put(lastSectorIdKey, val); err != nil {
2019-11-30 23:17:50 +00:00
return err
}
sb.lastID = osb.lastID
return nil
}
2019-12-08 20:06:40 +00:00
// SetLastSectorID persists id as the last allocated sector ID and, once the
// write succeeds, updates the in-memory counter to match.
//
// idLk is held throughout, matching AcquireSectorId, so the update cannot
// race with a concurrent ID allocation.
func (sb *SectorBuilder) SetLastSectorID(id uint64) error {
	sb.idLk.Lock()
	defer sb.idLk.Unlock()

	if err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id))); err != nil {
		return err
	}

	sb.lastID = id
	return nil
}
// migrate moves from to to, dispatching on what from is: directories are
// handled by migrateDir, everything else by migrateFile. os.Stat follows
// symlinks, so a symlink to a directory is treated as a directory.
func migrate(from, to string, symlink bool) error {
	info, err := os.Stat(from)
	switch {
	case err != nil:
		return err
	case info.IsDir():
		return migrateDir(from, to, symlink)
	default:
		return migrateFile(from, to, symlink)
	}
}
// migrateDir recreates directory from at to and migrates every entry in it.
// The target is created (with parents, mode 0755) when missing; an existing
// non-directory at to is an error. Each entry goes back through migrate, so
// subdirectories recurse and files are copied or symlinked per symlink.
func migrateDir(from, to string, symlink bool) error {
	tost, err := os.Stat(to)
	if err != nil {
		if !os.IsNotExist(err) {
			return err
		}

		if err := os.MkdirAll(to, 0755); err != nil {
			return err
		}
	} else if !tost.IsDir() {
		// The format verb needs its argument; without it the message
		// rendered as %!q(MISSING).
		return xerrors.Errorf("target %q already exists and is a file (expected directory)", to)
	}

	dirents, err := ioutil.ReadDir(from)
	if err != nil {
		return err
	}

	for _, inf := range dirents {
		n := inf.Name()
		// migrate decides between directory and file handling itself.
		if err := migrate(filepath.Join(from, n), filepath.Join(to, n), symlink); err != nil {
			return err
		}
	}

	return nil
}
// migrateFile places from at to: as a symlink pointing at from when symlink
// is true, as a copy otherwise.
func migrateFile(from, to string, symlink bool) error {
	if !symlink {
		return dcopy.Copy(from, to)
	}
	return os.Symlink(from, to)
}