sectorbuilder: Drop stateful sectorbuilder refs

This commit is contained in:
Łukasz Magiera 2019-11-26 23:13:01 +01:00
parent 8297feb1d3
commit d5a632f644
4 changed files with 18 additions and 67 deletions

View File

@ -17,8 +17,6 @@ func TempSectorbuilder(sectorSize uint64, ds dtypes.MetadataDS) (*SectorBuilder,
sb, err := TempSectorbuilderDir(dir, sectorSize, ds) sb, err := TempSectorbuilderDir(dir, sectorSize, ds)
return sb, func() { return sb, func() {
sb.Destroy()
if err := os.RemoveAll(dir); err != nil { if err := os.RemoveAll(dir); err != nil {
log.Warn("failed to clean up temp sectorbuilder: ", err) log.Warn("failed to clean up temp sectorbuilder: ", err)
} }

View File

@ -3,18 +3,15 @@ package sectorbuilder
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"os"
"sort"
"strconv"
"sync"
"unsafe"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder" sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"io"
"os"
"strconv"
"sync"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
@ -55,11 +52,11 @@ type EPostCandidate = sectorbuilder.Candidate
const CommLen = sectorbuilder.CommitmentBytesLen const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct { type SectorBuilder struct {
handle unsafe.Pointer ds dtypes.MetadataDS
ds dtypes.MetadataDS idLk sync.Mutex
idLk sync.Mutex
ssize uint64 ssize uint64
lastID uint64
Miner address.Address Miner address.Address
@ -87,8 +84,6 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads) return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads)
} }
proverId := addressToProverID(cfg.Miner)
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} { for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} {
if err := os.Mkdir(dir, 0755); err != nil { if err := os.Mkdir(dir, 0755); err != nil {
if os.IsExist(err) { if os.IsExist(err) {
@ -112,16 +107,11 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
return nil, err return nil, err
} }
sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, PoRepProofPartitions, lastUsedID, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads)
if err != nil {
return nil, err
}
sb := &SectorBuilder{ sb := &SectorBuilder{
handle: sbp, ds: ds,
ds: ds,
ssize: cfg.SectorSize, ssize: cfg.SectorSize,
lastID: lastUsedID,
stagedDir: cfg.StagedDir, stagedDir: cfg.StagedDir,
sealedDir: cfg.SealedDir, sealedDir: cfg.SealedDir,
@ -155,19 +145,14 @@ func addressToProverID(a address.Address) [32]byte {
return proverId return proverId
} }
func (sb *SectorBuilder) Destroy() {
sectorbuilder.DestroySectorBuilder(sb.handle)
}
func (sb *SectorBuilder) AcquireSectorId() (uint64, error) { func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
sb.idLk.Lock() sb.idLk.Lock()
defer sb.idLk.Unlock() defer sb.idLk.Unlock()
id, err := sectorbuilder.AcquireSectorId(sb.handle) sb.lastID++
if err != nil { id := sb.lastID
return 0, err
} err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id)))
err = sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id)))
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -212,7 +197,8 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, err
ret := sb.RateLimit() ret := sb.RateLimit()
defer ret() defer ret()
return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey) panic("fixme")
//return sectorbuilder.StandaloneUnseal(sb.handle, pieceKey)
} }
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) { func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
@ -285,28 +271,6 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
return proof, nil return proof, nil
} }
func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) {
return sectorbuilder.GetSectorSealingStatusByID(sb.handle, sector)
}
func (sb *SectorBuilder) GetAllStagedSectors() ([]uint64, error) {
sectors, err := sectorbuilder.GetAllStagedSectors(sb.handle)
if err != nil {
return nil, err
}
out := make([]uint64, len(sectors))
for i, v := range sectors {
out[i] = v.SectorID
}
sort.Slice(out, func(i, j int) bool {
return out[i] < out[j]
})
return out, nil
}
/* /*
func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
// Wait, this is a blocking method with no way of interrupting it? // Wait, this is a blocking method with no way of interrupting it?

View File

@ -137,7 +137,6 @@ func TestSealAndVerify(t *testing.T) {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
cleanup := func() { cleanup := func() {
sb.Destroy()
if t.Failed() { if t.Failed() {
fmt.Printf("not removing %s\n", dir) fmt.Printf("not removing %s\n", dir)
return return
@ -170,7 +169,6 @@ func TestSealAndVerify(t *testing.T) {
epost := time.Now() epost := time.Now()
// Restart sectorbuilder, re-run post // Restart sectorbuilder, re-run post
sb.Destroy()
sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil { if err != nil {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
@ -209,7 +207,6 @@ func TestSealPoStNoCommit(t *testing.T) {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
cleanup := func() { cleanup := func() {
sb.Destroy()
if t.Failed() { if t.Failed() {
fmt.Printf("not removing %s\n", dir) fmt.Printf("not removing %s\n", dir)
return return
@ -234,7 +231,6 @@ func TestSealPoStNoCommit(t *testing.T) {
precommit := time.Now() precommit := time.Now()
// Restart sectorbuilder, re-run post // Restart sectorbuilder, re-run post
sb.Destroy()
sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds) sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil { if err != nil {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)

View File

@ -200,19 +200,12 @@ func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode,
return m, nil return m, nil
} }
func SectorBuilder(lc fx.Lifecycle, cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuilder.SectorBuilder, error) { func SectorBuilder(cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuilder.SectorBuilder, error) {
sb, err := sectorbuilder.New(cfg, ds) sb, err := sectorbuilder.New(cfg, ds)
if err != nil { if err != nil {
return nil, err return nil, err
} }
lc.Append(fx.Hook{
OnStop: func(context.Context) error {
sb.Destroy()
return nil
},
})
return sb, nil return sb, nil
} }