keep track of last used sector id across restarts

This commit is contained in:
Łukasz Magiera 2019-11-08 19:49:36 +01:00
parent 77dbb67651
commit c729cabef1
3 changed files with 98 additions and 10 deletions

View File

@ -6,9 +6,10 @@ import (
"path/filepath" "path/filepath"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/node/modules/dtypes"
) )
func TempSectorbuilder(sectorSize uint64) (*SectorBuilder, func(), error) { func TempSectorbuilder(sectorSize uint64, ds dtypes.MetadataDS) (*SectorBuilder, func(), error) {
dir, err := ioutil.TempDir("", "sbtest") dir, err := ioutil.TempDir("", "sbtest")
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -34,12 +35,14 @@ func TempSectorbuilder(sectorSize uint64) (*SectorBuilder, func(), error) {
WorkerThreads: 2, WorkerThreads: 2,
Miner: addr, Miner: addr,
}) }, ds, nil)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
return sb, func() { return sb, func() {
sb.Destroy()
if err := os.RemoveAll(dir); err != nil { if err := os.RemoveAll(dir); err != nil {
log.Warn("failed to clean up temp sectorbuilder: ", err) log.Warn("failed to clean up temp sectorbuilder: ", err)
} }

View File

@ -1,21 +1,30 @@
package sectorbuilder package sectorbuilder
import ( import (
"context"
"fmt"
"io" "io"
"os" "os"
"sort" "sort"
"strconv"
"sync"
"unsafe" "unsafe"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder" sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/node/modules/dtypes"
) )
const PoStReservedWorkers = 1 const PoStReservedWorkers = 1
const PoRepProofPartitions = 2 const PoRepProofPartitions = 2
var lastSectorIdKey = datastore.NewKey("/sectorbuilder/last")
var log = logging.Logger("sectorbuilder") var log = logging.Logger("sectorbuilder")
type SectorSealingStatus = sectorbuilder.SectorSealingStatus type SectorSealingStatus = sectorbuilder.SectorSealingStatus
@ -42,6 +51,9 @@ const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct { type SectorBuilder struct {
handle unsafe.Pointer handle unsafe.Pointer
ds dtypes.MetadataDS
idLk sync.Mutex
ssize uint64 ssize uint64
Miner address.Address Miner address.Address
@ -65,7 +77,7 @@ type Config struct {
MetadataDir string MetadataDir string
} }
func New(cfg *Config) (*SectorBuilder, error) { func New(cfg *Config, ds dtypes.MetadataDS, lc fx.Lifecycle) (*SectorBuilder, error) {
if cfg.WorkerThreads <= PoStReservedWorkers { if cfg.WorkerThreads <= PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads) return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads)
} }
@ -81,13 +93,29 @@ func New(cfg *Config) (*SectorBuilder, error) {
} }
} }
sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, PoRepProofPartitions, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads) var lastUsedID uint64
b, err := ds.Get(lastSectorIdKey)
switch err {
case nil:
i, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return nil, err
}
lastUsedID = uint64(i)
case datastore.ErrNotFound:
default:
return nil, err
}
sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, PoRepProofPartitions, lastUsedID, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &SectorBuilder{ sb := &SectorBuilder{
handle: sbp, handle: sbp,
ds: ds,
ssize: cfg.SectorSize, ssize: cfg.SectorSize,
stagedDir: cfg.StagedDir, stagedDir: cfg.StagedDir,
@ -96,7 +124,18 @@ func New(cfg *Config) (*SectorBuilder, error) {
Miner: cfg.Miner, Miner: cfg.Miner,
rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers), rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
}, nil }
if lc != nil {
lc.Append(fx.Hook{
OnStop: func(context.Context) error {
sb.Destroy()
return nil
},
})
}
return sb, nil
} }
func (sb *SectorBuilder) rlimit() func() { func (sb *SectorBuilder) rlimit() func() {
@ -125,7 +164,18 @@ func (sb *SectorBuilder) Destroy() {
} }
func (sb *SectorBuilder) AcquireSectorId() (uint64, error) { func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
return sectorbuilder.AcquireSectorId(sb.handle) sb.idLk.Lock()
defer sb.idLk.Unlock()
id, err := sectorbuilder.AcquireSectorId(sb.handle)
if err != nil {
return 0, err
}
err = sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id)))
if err != nil {
return 0, err
}
return id, nil
} }
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) { func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {

View File

@ -6,6 +6,10 @@ import (
"os" "os"
"testing" "testing"
"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
@ -22,12 +26,11 @@ func TestSealAndVerify(t *testing.T) {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize) sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize, datastore.NewMapDatastore())
if err != nil { if err != nil {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
_ = cleanup defer cleanup()
//defer cleanup()
dlen := sectorbuilder.UserBytesForSectorSize(sectorSize) dlen := sectorbuilder.UserBytesForSectorSize(sectorSize)
@ -91,3 +94,35 @@ func TestSealAndVerify(t *testing.T) {
t.Fatal("bad post") t.Fatal("bad post")
} }
} }
func TestAcquireID(t *testing.T) {
ds := datastore.NewMapDatastore()
sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
assertAcquire := func(expect uint64) {
id, err := sb.AcquireSectorId()
require.NoError(t, err)
assert.Equal(t, expect, id)
}
assertAcquire(1)
assertAcquire(2)
assertAcquire(3)
cleanup()
sb, cleanup, err = sectorbuilder.TempSectorbuilder(sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
assertAcquire(4)
assertAcquire(5)
assertAcquire(6)
cleanup()
}