Merge pull request #934 from filecoin-project/feat/sb-diskmgr
sectorbuilder: Check free space before creating sectors
This commit is contained in:
commit
3fda442bb3
@ -3,7 +3,7 @@ package build
|
||||
var CurrentCommit string
|
||||
|
||||
// Version is the local build version, set by build system
|
||||
const Version = "0.1.1"
|
||||
const Version = "0.1.2"
|
||||
|
||||
var UserVersion = Version + CurrentCommit
|
||||
|
||||
@ -16,7 +16,7 @@ var UserVersion = Version + CurrentCommit
|
||||
// R R H
|
||||
// |\vv/|
|
||||
// vv vv
|
||||
const APIVersion = 0x000101
|
||||
const APIVersion = 0x000102
|
||||
|
||||
const (
|
||||
MajorMask = 0xff0000
|
||||
|
@ -124,6 +124,22 @@ func (bi *BigInt) UnmarshalJSON(b []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var sizeUnits = []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"}
|
||||
|
||||
func (bi BigInt) SizeStr() string {
|
||||
r := new(big.Rat).SetInt(bi.Int)
|
||||
den := big.NewRat(1, 1024)
|
||||
|
||||
var i int
|
||||
for f, _ := r.Float64(); f >= 1024 && 1 < len(sizeUnits); f, _ = r.Float64() {
|
||||
i++
|
||||
r = r.Mul(r, den)
|
||||
}
|
||||
|
||||
f, _ := r.Float64()
|
||||
return fmt.Sprintf("%.3g %s", f, sizeUnits[i])
|
||||
}
|
||||
|
||||
func (bi *BigInt) Scan(value interface{}) error {
|
||||
switch value := value.(type) {
|
||||
case string:
|
||||
|
@ -3,6 +3,8 @@ package types
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestBigIntSerializationRoundTrip(t *testing.T) {
|
||||
@ -49,3 +51,21 @@ func TestFilRoundTrip(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSizeStr(t *testing.T) {
|
||||
cases := []struct {
|
||||
in uint64
|
||||
out string
|
||||
}{
|
||||
{0, "0 B"},
|
||||
{1, "1 B"},
|
||||
{1024, "1 KiB"},
|
||||
{2000, "1.95 KiB"},
|
||||
{5 << 20, "5 MiB"},
|
||||
{11 << 60, "11 EiB"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
assert.Equal(t, c.out, NewInt(c.in).SizeStr(), "input %+v, produced wrong result", c)
|
||||
}
|
||||
}
|
||||
|
24
cli/utils.go
24
cli/utils.go
@ -1,24 +0,0 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
var Units = []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"}
|
||||
|
||||
func SizeStr(size types.BigInt) string {
|
||||
r := new(big.Rat).SetInt(size.Int)
|
||||
den := big.NewRat(1, 1024)
|
||||
|
||||
var i int
|
||||
for f, _ := r.Float64(); f >= 1024 && 1 < len(Units); f, _ = r.Float64() {
|
||||
i++
|
||||
r = r.Mul(r, den)
|
||||
}
|
||||
|
||||
f, _ := r.Float64()
|
||||
return fmt.Sprintf("%.3g %s", f, Units[i])
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
types "github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestSizeStr(t *testing.T) {
|
||||
cases := []struct {
|
||||
in uint64
|
||||
out string
|
||||
}{
|
||||
{0, "0 B"},
|
||||
{1, "1 B"},
|
||||
{1024, "1 KiB"},
|
||||
{2000, "1.95 KiB"},
|
||||
{5 << 20, "5 MiB"},
|
||||
{11 << 60, "11 EiB"},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
assert.Equal(t, c.out, SizeStr(types.NewInt(c.in)), "input %+v, produced wrong result", c)
|
||||
}
|
||||
|
||||
}
|
@ -6,7 +6,6 @@ import (
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/docker/go-units"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
@ -14,8 +13,8 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/docker/go-units"
|
||||
ffi "github.com/filecoin-project/filecoin-ffi"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/mitchellh/go-homedir"
|
||||
@ -140,17 +139,12 @@ func main() {
|
||||
Miner: maddr,
|
||||
SectorSize: sectorSize,
|
||||
WorkerThreads: 2,
|
||||
CacheDir: filepath.Join(sbdir, "cache"),
|
||||
SealedDir: filepath.Join(sbdir, "sealed"),
|
||||
StagedDir: filepath.Join(sbdir, "staged"),
|
||||
UnsealedDir: filepath.Join(sbdir, "unsealed"),
|
||||
Dir: sbdir,
|
||||
}
|
||||
|
||||
if robench == "" {
|
||||
for _, d := range []string{cfg.CacheDir, cfg.SealedDir, cfg.StagedDir, cfg.UnsealedDir} {
|
||||
if err := os.MkdirAll(d, 0775); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(sbdir, 0775); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@ -371,5 +365,5 @@ func bps(data uint64, d time.Duration) string {
|
||||
bdata := new(big.Int).SetUint64(data)
|
||||
bdata = bdata.Mul(bdata, big.NewInt(time.Second.Nanoseconds()))
|
||||
bps := bdata.Div(bdata, big.NewInt(d.Nanoseconds()))
|
||||
return lcli.SizeStr(types.BigInt{bps}) + "/s"
|
||||
return (types.BigInt{bps}).SizeStr() + "/s"
|
||||
}
|
||||
|
@ -2,10 +2,8 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
"net/http"
|
||||
|
||||
lapi "github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
@ -35,10 +33,7 @@ func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, aut
|
||||
SectorSize: ssize,
|
||||
Miner: act,
|
||||
WorkerThreads: 1,
|
||||
CacheDir: filepath.Join(repo, "cache"),
|
||||
SealedDir: filepath.Join(repo, "sealed"),
|
||||
StagedDir: filepath.Join(repo, "staged"),
|
||||
UnsealedDir: filepath.Join(repo, "unsealed"),
|
||||
Dir: repo,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -132,7 +132,7 @@ func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType)
|
||||
var err error
|
||||
switch typ {
|
||||
case sectorbuilder.WorkerPreCommit:
|
||||
err = w.fetch("staged", sectorID)
|
||||
err = w.fetch("staging", sectorID)
|
||||
case sectorbuilder.WorkerCommit:
|
||||
err = w.fetch("sealed", sectorID)
|
||||
if err != nil {
|
||||
|
@ -194,10 +194,7 @@ var aggregateSectorDirsCmd = &cli.Command{
|
||||
agsb, err := sectorbuilder.New(§orbuilder.Config{
|
||||
Miner: maddr,
|
||||
SectorSize: ssize,
|
||||
CacheDir: filepath.Join(destdir, "cache"),
|
||||
SealedDir: filepath.Join(destdir, "sealed"),
|
||||
StagedDir: filepath.Join(destdir, "staging"),
|
||||
UnsealedDir: filepath.Join(destdir, "unsealed"),
|
||||
Dir: destdir,
|
||||
WorkerThreads: 2,
|
||||
}, agmds)
|
||||
if err != nil {
|
||||
@ -258,10 +255,7 @@ var aggregateSectorDirsCmd = &cli.Command{
|
||||
sb, err := sectorbuilder.New(§orbuilder.Config{
|
||||
Miner: maddr,
|
||||
SectorSize: genm.SectorSize,
|
||||
CacheDir: filepath.Join(dir, "cache"),
|
||||
SealedDir: filepath.Join(dir, "sealed"),
|
||||
StagedDir: filepath.Join(dir, "staging"),
|
||||
UnsealedDir: filepath.Join(dir, "unsealed"),
|
||||
Dir: dir,
|
||||
WorkerThreads: 2,
|
||||
}, mds)
|
||||
if err != nil {
|
||||
|
@ -30,17 +30,12 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb
|
||||
Miner: maddr,
|
||||
SectorSize: ssize,
|
||||
FallbackLastID: offset,
|
||||
CacheDir: filepath.Join(sbroot, "cache"),
|
||||
SealedDir: filepath.Join(sbroot, "sealed"),
|
||||
StagedDir: filepath.Join(sbroot, "staging"),
|
||||
UnsealedDir: filepath.Join(sbroot, "unsealed"),
|
||||
Dir: sbroot,
|
||||
WorkerThreads: 2,
|
||||
}
|
||||
|
||||
for _, d := range []string{cfg.CacheDir, cfg.SealedDir, cfg.StagedDir, cfg.UnsealedDir} {
|
||||
if err := os.MkdirAll(d, 0775); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := os.MkdirAll(sbroot, 0775); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mds, err := badger.NewDatastore(filepath.Join(sbroot, "badger"), nil)
|
||||
|
@ -43,7 +43,7 @@ var infoCmd = &cli.Command{
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Printf("Sector Size: %s\n", lcli.SizeStr(types.NewInt(sizeByte)))
|
||||
fmt.Printf("Sector Size: %s\n", types.NewInt(sizeByte).SizeStr())
|
||||
|
||||
pow, err := api.StateMinerPower(ctx, maddr, nil)
|
||||
if err != nil {
|
||||
@ -51,14 +51,14 @@ var infoCmd = &cli.Command{
|
||||
}
|
||||
|
||||
percI := types.BigDiv(types.BigMul(pow.MinerPower, types.NewInt(1000)), pow.TotalPower)
|
||||
fmt.Printf("Power: %s / %s (%0.4f%%)\n", lcli.SizeStr(pow.MinerPower), lcli.SizeStr(pow.TotalPower), float64(percI.Int64())/100000*10000)
|
||||
fmt.Printf("Power: %s / %s (%0.4f%%)\n", pow.MinerPower.SizeStr(), pow.TotalPower.SizeStr(), float64(percI.Int64())/100000*10000)
|
||||
|
||||
secCounts, err := api.StateMinerSectorCount(ctx, maddr, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("\tCommitted: %s\n", lcli.SizeStr(types.BigMul(types.NewInt(secCounts.Sset), types.NewInt(sizeByte))))
|
||||
fmt.Printf("\tProving: %s\n", lcli.SizeStr(types.BigMul(types.NewInt(secCounts.Pset), types.NewInt(sizeByte))))
|
||||
fmt.Printf("\tCommitted: %s\n", types.BigMul(types.NewInt(secCounts.Sset), types.NewInt(sizeByte)).SizeStr())
|
||||
fmt.Printf("\tProving: %s\n", types.BigMul(types.NewInt(secCounts.Pset), types.NewInt(sizeByte)).SizeStr())
|
||||
|
||||
// TODO: indicate whether the post worker is in use
|
||||
wstat, err := nodeApi.WorkerStats(ctx)
|
||||
|
@ -172,10 +172,7 @@ var initCmd = &cli.Command{
|
||||
oldsb, err := sectorbuilder.New(§orbuilder.Config{
|
||||
SectorSize: ssize,
|
||||
WorkerThreads: 2,
|
||||
SealedDir: filepath.Join(pssb, "sealed"),
|
||||
CacheDir: filepath.Join(pssb, "cache"),
|
||||
StagedDir: filepath.Join(pssb, "staging"),
|
||||
UnsealedDir: filepath.Join(pssb, "unsealed"),
|
||||
Dir: pssb,
|
||||
}, oldmds)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err)
|
||||
@ -184,10 +181,7 @@ var initCmd = &cli.Command{
|
||||
nsb, err := sectorbuilder.New(§orbuilder.Config{
|
||||
SectorSize: ssize,
|
||||
WorkerThreads: 2,
|
||||
SealedDir: filepath.Join(lr.Path(), "sealed"),
|
||||
CacheDir: filepath.Join(lr.Path(), "cache"),
|
||||
StagedDir: filepath.Join(lr.Path(), "staging"),
|
||||
UnsealedDir: filepath.Join(lr.Path(), "unsealed"),
|
||||
Dir: lr.Path(),
|
||||
}, mds)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to open up sectorbuilder: %w", err)
|
||||
|
@ -61,7 +61,7 @@ var runCmd = &cli.Command{
|
||||
}
|
||||
|
||||
if v.APIVersion != build.APIVersion {
|
||||
return xerrors.Errorf("lotus-daemon API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
|
||||
return xerrors.Errorf("lotus-daemon API version doesn't match: local: %s", api.Version{APIVersion: build.APIVersion})
|
||||
}
|
||||
|
||||
log.Info("Checking full node sync status")
|
||||
|
@ -17,11 +17,11 @@ func (sb *SectorBuilder) SectorName(sectorID uint64) string {
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) StagedSectorPath(sectorID uint64) string {
|
||||
return filepath.Join(sb.stagedDir, sb.SectorName(sectorID))
|
||||
return filepath.Join(sb.filesystem.pathFor(dataStaging), sb.SectorName(sectorID))
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) unsealedSectorPath(sectorID uint64) string {
|
||||
return filepath.Join(sb.unsealedDir, sb.SectorName(sectorID))
|
||||
return filepath.Join(sb.filesystem.pathFor(dataUnsealed), sb.SectorName(sectorID))
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) {
|
||||
@ -29,13 +29,13 @@ func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) {
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) SealedSectorPath(sectorID uint64) (string, error) {
|
||||
path := filepath.Join(sb.sealedDir, sb.SectorName(sectorID))
|
||||
path := filepath.Join(sb.filesystem.pathFor(dataSealed), sb.SectorName(sectorID))
|
||||
|
||||
return path, nil
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) {
|
||||
dir := filepath.Join(sb.cacheDir, sb.SectorName(sectorID))
|
||||
dir := filepath.Join(sb.filesystem.pathFor(dataCache), sb.SectorName(sectorID))
|
||||
|
||||
err := os.Mkdir(dir, 0755)
|
||||
if os.IsExist(err) {
|
||||
@ -46,16 +46,12 @@ func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) {
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) GetPath(typ string, sectorName string) (string, error) {
|
||||
switch typ {
|
||||
case "staged":
|
||||
return filepath.Join(sb.stagedDir, sectorName), nil
|
||||
case "sealed":
|
||||
return filepath.Join(sb.sealedDir, sectorName), nil
|
||||
case "cache":
|
||||
return filepath.Join(sb.cacheDir, sectorName), nil
|
||||
default:
|
||||
return "", xerrors.Errorf("unknown sector type for write: %s", typ)
|
||||
_, found := overheadMul[dataType(typ)]
|
||||
if !found {
|
||||
return "", xerrors.Errorf("unknown sector type: %s", typ)
|
||||
}
|
||||
|
||||
return sb.filesystem.pathFor(dataType(typ)), nil
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) TrimCache(sectorID uint64) error {
|
||||
|
111
lib/sectorbuilder/fs.go
Normal file
111
lib/sectorbuilder/fs.go
Normal file
@ -0,0 +1,111 @@
|
||||
package sectorbuilder
|
||||
|
||||
import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"golang.org/x/xerrors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type dataType string
|
||||
|
||||
const (
|
||||
dataCache dataType = "cache"
|
||||
dataStaging dataType = "staging"
|
||||
dataSealed dataType = "sealed"
|
||||
dataUnsealed dataType = "unsealed"
|
||||
)
|
||||
|
||||
var overheadMul = map[dataType]uint64{ // * sectorSize
|
||||
dataCache: 11, // TODO: check if true for 32G sectors
|
||||
dataStaging: 1,
|
||||
dataSealed: 1,
|
||||
dataUnsealed: 1,
|
||||
}
|
||||
|
||||
type fs struct {
|
||||
path string
|
||||
|
||||
// in progress actions
|
||||
|
||||
reserved map[dataType]uint64
|
||||
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
func openFs(dir string) *fs {
|
||||
return &fs{
|
||||
path: dir,
|
||||
reserved: map[dataType]uint64{},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fs) init() error {
|
||||
for _, dir := range []string{f.path,
|
||||
f.pathFor(dataCache),
|
||||
f.pathFor(dataStaging),
|
||||
f.pathFor(dataSealed),
|
||||
f.pathFor(dataUnsealed)} {
|
||||
if err := os.Mkdir(dir, 0755); err != nil {
|
||||
if os.IsExist(err) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fs) pathFor(typ dataType) string {
|
||||
_, found := overheadMul[typ]
|
||||
if !found {
|
||||
panic("unknown data path requested")
|
||||
}
|
||||
|
||||
return filepath.Join(f.path, string(typ))
|
||||
}
|
||||
|
||||
func (f *fs) reservedBytes() int64 {
|
||||
var out int64
|
||||
for _, r := range f.reserved {
|
||||
out += int64(r)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (f *fs) reserve(typ dataType, size uint64) error {
|
||||
f.lk.Lock()
|
||||
defer f.lk.Unlock()
|
||||
|
||||
var fsstat syscall.Statfs_t
|
||||
|
||||
if err := syscall.Statfs(f.pathFor(typ), &fsstat); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
avail := int64(fsstat.Bavail) * fsstat.Bsize
|
||||
|
||||
avail -= f.reservedBytes()
|
||||
|
||||
need := overheadMul[typ] * size
|
||||
|
||||
if int64(need) > avail {
|
||||
return xerrors.Errorf("not enough space in '%s', need %s, available %s", f.path, types.NewInt(need).SizeStr(), types.NewInt(uint64(avail)).SizeStr())
|
||||
}
|
||||
|
||||
f.reserved[typ] += need
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fs) free(typ dataType, sectorSize uint64) {
|
||||
f.lk.Lock()
|
||||
defer f.lk.Unlock()
|
||||
|
||||
f.reserved[typ] -= overheadMul[typ] * sectorSize
|
||||
|
||||
return
|
||||
}
|
@ -1,8 +1,6 @@
|
||||
package sectorbuilder
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
@ -13,18 +11,10 @@ func TempSectorbuilderDir(dir string, sectorSize uint64, ds dtypes.MetadataDS) (
|
||||
return nil, err
|
||||
}
|
||||
|
||||
unsealed := filepath.Join(dir, "unsealed")
|
||||
sealed := filepath.Join(dir, "sealed")
|
||||
staging := filepath.Join(dir, "staging")
|
||||
cache := filepath.Join(dir, "cache")
|
||||
|
||||
sb, err := New(&Config{
|
||||
SectorSize: sectorSize,
|
||||
|
||||
SealedDir: sealed,
|
||||
StagedDir: staging,
|
||||
UnsealedDir: unsealed,
|
||||
CacheDir: cache,
|
||||
Dir: dir,
|
||||
|
||||
WorkerThreads: 2,
|
||||
Miner: addr,
|
||||
|
@ -64,11 +64,6 @@ type SectorBuilder struct {
|
||||
|
||||
Miner address.Address
|
||||
|
||||
stagedDir string
|
||||
sealedDir string
|
||||
cacheDir string
|
||||
unsealedDir string
|
||||
|
||||
unsealLk sync.Mutex
|
||||
|
||||
noCommit bool
|
||||
@ -89,6 +84,9 @@ type SectorBuilder struct {
|
||||
commitWait int32
|
||||
unsealWait int32
|
||||
|
||||
fsLk sync.Mutex
|
||||
filesystem *fs // TODO: multi-fs support
|
||||
|
||||
stopping chan struct{}
|
||||
}
|
||||
|
||||
@ -135,11 +133,8 @@ type Config struct {
|
||||
NoCommit bool
|
||||
NoPreCommit bool
|
||||
|
||||
CacheDir string
|
||||
SealedDir string
|
||||
StagedDir string
|
||||
UnsealedDir string
|
||||
_ struct{} // guard against nameless init
|
||||
Dir string
|
||||
_ struct{} // guard against nameless init
|
||||
}
|
||||
|
||||
func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
|
||||
@ -147,15 +142,6 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
|
||||
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers, cfg.WorkerThreads)
|
||||
}
|
||||
|
||||
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.UnsealedDir} {
|
||||
if err := os.Mkdir(dir, 0755); err != nil {
|
||||
if os.IsExist(err) {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var lastUsedID uint64
|
||||
b, err := ds.Get(lastSectorIdKey)
|
||||
switch err {
|
||||
@ -185,10 +171,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
|
||||
ssize: cfg.SectorSize,
|
||||
lastID: lastUsedID,
|
||||
|
||||
stagedDir: cfg.StagedDir,
|
||||
sealedDir: cfg.SealedDir,
|
||||
cacheDir: cfg.CacheDir,
|
||||
unsealedDir: cfg.UnsealedDir,
|
||||
filesystem: openFs(cfg.Dir),
|
||||
|
||||
Miner: cfg.Miner,
|
||||
|
||||
@ -205,35 +188,33 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
|
||||
stopping: make(chan struct{}),
|
||||
}
|
||||
|
||||
if err := sb.filesystem.init(); err != nil {
|
||||
return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err)
|
||||
}
|
||||
|
||||
return sb, nil
|
||||
}
|
||||
|
||||
func NewStandalone(cfg *Config) (*SectorBuilder, error) {
|
||||
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.UnsealedDir} {
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
if os.IsExist(err) {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &SectorBuilder{
|
||||
sb := &SectorBuilder{
|
||||
ds: nil,
|
||||
|
||||
ssize: cfg.SectorSize,
|
||||
|
||||
Miner: cfg.Miner,
|
||||
stagedDir: cfg.StagedDir,
|
||||
sealedDir: cfg.SealedDir,
|
||||
cacheDir: cfg.CacheDir,
|
||||
unsealedDir: cfg.UnsealedDir,
|
||||
Miner: cfg.Miner,
|
||||
filesystem: openFs(cfg.Dir),
|
||||
|
||||
taskCtr: 1,
|
||||
remotes: map[int]*remote{},
|
||||
rateLimit: make(chan struct{}, cfg.WorkerThreads),
|
||||
stopping: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
if err := sb.filesystem.init(); err != nil {
|
||||
return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err)
|
||||
}
|
||||
|
||||
return sb, nil
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) checkRateLimit() {
|
||||
@ -312,6 +293,13 @@ func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
|
||||
fs := sb.filesystem
|
||||
|
||||
if err := fs.reserve(dataStaging, sb.ssize); err != nil {
|
||||
return PublicPieceInfo{}, err
|
||||
}
|
||||
defer fs.free(dataStaging, sb.ssize)
|
||||
|
||||
atomic.AddInt32(&sb.addPieceWait, 1)
|
||||
ret := sb.RateLimit()
|
||||
atomic.AddInt32(&sb.addPieceWait, -1)
|
||||
@ -347,6 +335,13 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
|
||||
fs := sb.filesystem
|
||||
|
||||
if err := fs.reserve(dataUnsealed, sb.ssize); err != nil { // TODO: this needs to get smarter when we start supporting partial unseals
|
||||
return nil, err
|
||||
}
|
||||
defer fs.free(dataUnsealed, sb.ssize)
|
||||
|
||||
atomic.AddInt32(&sb.unsealWait, 1)
|
||||
// TODO: Don't wait if cached
|
||||
ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker
|
||||
@ -433,6 +428,18 @@ func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitO
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
|
||||
fs := sb.filesystem
|
||||
|
||||
if err := fs.reserve(dataCache, sb.ssize); err != nil {
|
||||
return RawSealPreCommitOutput{}, err
|
||||
}
|
||||
defer fs.free(dataCache, sb.ssize)
|
||||
|
||||
if err := fs.reserve(dataSealed, sb.ssize); err != nil {
|
||||
return RawSealPreCommitOutput{}, err
|
||||
}
|
||||
defer fs.free(dataSealed, sb.ssize)
|
||||
|
||||
call := workerCall{
|
||||
task: WorkerTask{
|
||||
Type: WorkerPreCommit,
|
||||
@ -692,15 +699,15 @@ func fallbackPostChallengeCount(sectors uint64) uint64 {
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder, symlink bool) error {
|
||||
if err := migrate(osb.cacheDir, sb.cacheDir, symlink); err != nil {
|
||||
if err := migrate(osb.filesystem.pathFor(dataCache), sb.filesystem.pathFor(dataCache), symlink); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := migrate(osb.sealedDir, sb.sealedDir, symlink); err != nil {
|
||||
if err := migrate(osb.filesystem.pathFor(dataStaging), sb.filesystem.pathFor(dataStaging), symlink); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := migrate(osb.stagedDir, sb.stagedDir, symlink); err != nil {
|
||||
if err := migrate(osb.filesystem.pathFor(dataSealed), sb.filesystem.pathFor(dataSealed), symlink); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,6 @@ package modules
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
|
||||
"github.com/ipfs/go-bitswap"
|
||||
@ -73,11 +72,6 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit
|
||||
return nil, xerrors.Errorf("too many sectorbuilder threads specified: %d, max allowed: %d", threads, math.MaxUint8)
|
||||
}
|
||||
|
||||
cache := filepath.Join(sp, "cache")
|
||||
unsealed := filepath.Join(sp, "unsealed")
|
||||
sealed := filepath.Join(sp, "sealed")
|
||||
staging := filepath.Join(sp, "staging")
|
||||
|
||||
sb := §orbuilder.Config{
|
||||
Miner: minerAddr,
|
||||
SectorSize: ssize,
|
||||
@ -86,10 +80,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit
|
||||
NoPreCommit: noprecommit,
|
||||
NoCommit: nocommit,
|
||||
|
||||
CacheDir: cache,
|
||||
UnsealedDir: unsealed,
|
||||
SealedDir: sealed,
|
||||
StagedDir: staging,
|
||||
Dir: sp,
|
||||
}
|
||||
|
||||
return sb, nil
|
||||
|
@ -229,10 +229,7 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te
|
||||
SectorSize: 1024,
|
||||
WorkerThreads: 2,
|
||||
Miner: genMiner,
|
||||
CacheDir: filepath.Join(psd, "cache"),
|
||||
StagedDir: filepath.Join(psd, "staging"),
|
||||
SealedDir: filepath.Join(psd, "sealed"),
|
||||
UnsealedDir: filepath.Join(psd, "unsealed"),
|
||||
Dir: psd,
|
||||
}, mds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
Loading…
Reference in New Issue
Block a user