From c729cabef156fdd6e9085e41ee0566b3e1e580d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 8 Nov 2019 19:49:36 +0100 Subject: [PATCH] keep track of last used sector id across restarts --- lib/sectorbuilder/mock.go | 7 ++- lib/sectorbuilder/sectorbuilder.go | 60 ++++++++++++++++++++++--- lib/sectorbuilder/sectorbuilder_test.go | 41 +++++++++++++++-- 3 files changed, 98 insertions(+), 10 deletions(-) diff --git a/lib/sectorbuilder/mock.go b/lib/sectorbuilder/mock.go index 20ec4ac51..58db95d09 100644 --- a/lib/sectorbuilder/mock.go +++ b/lib/sectorbuilder/mock.go @@ -6,9 +6,10 @@ import ( "path/filepath" "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) -func TempSectorbuilder(sectorSize uint64) (*SectorBuilder, func(), error) { +func TempSectorbuilder(sectorSize uint64, ds dtypes.MetadataDS) (*SectorBuilder, func(), error) { dir, err := ioutil.TempDir("", "sbtest") if err != nil { return nil, nil, err @@ -34,12 +35,14 @@ func TempSectorbuilder(sectorSize uint64) (*SectorBuilder, func(), error) { WorkerThreads: 2, Miner: addr, - }) + }, ds, nil) if err != nil { return nil, nil, err } return sb, func() { + sb.Destroy() + if err := os.RemoveAll(dir); err != nil { log.Warn("failed to clean up temp sectorbuilder: ", err) } diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 27b804015..bdc8d5a66 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -1,21 +1,30 @@ package sectorbuilder import ( + "context" + "fmt" "io" "os" "sort" + "strconv" + "sync" "unsafe" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" + "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) const PoStReservedWorkers = 1 const PoRepProofPartitions = 2 +var lastSectorIdKey = datastore.NewKey("/sectorbuilder/last") + var log = logging.Logger("sectorbuilder") type SectorSealingStatus = sectorbuilder.SectorSealingStatus @@ -42,6 +51,9 @@ const CommLen = sectorbuilder.CommitmentBytesLen type SectorBuilder struct { handle unsafe.Pointer + ds dtypes.MetadataDS + idLk sync.Mutex + ssize uint64 Miner address.Address @@ -65,7 +77,7 @@ type Config struct { MetadataDir string } -func New(cfg *Config) (*SectorBuilder, error) { +func New(cfg *Config, ds dtypes.MetadataDS, lc fx.Lifecycle) (*SectorBuilder, error) { if cfg.WorkerThreads <= PoStReservedWorkers { return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads) } @@ -81,13 +93,29 @@ func New(cfg *Config) (*SectorBuilder, error) { } } - sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, PoRepProofPartitions, 0, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads) + var lastUsedID uint64 + b, err := ds.Get(lastSectorIdKey) + switch err { + case nil: + i, err := strconv.ParseInt(string(b), 10, 64) + if err != nil { + return nil, err + } + lastUsedID = uint64(i) + case datastore.ErrNotFound: + default: + return nil, err + } + + sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, PoRepProofPartitions, lastUsedID, cfg.MetadataDir, proverId, cfg.SealedDir, cfg.StagedDir, cfg.CacheDir, 16, cfg.WorkerThreads) if err != nil { return nil, err } - return &SectorBuilder{ + sb := &SectorBuilder{ handle: sbp, + ds: ds, + ssize: cfg.SectorSize, stagedDir: cfg.StagedDir, @@ -96,7 +124,18 @@ func New(cfg *Config) (*SectorBuilder, error) { Miner: cfg.Miner, rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers), - }, nil + } + + if lc != nil { + lc.Append(fx.Hook{ + OnStop: func(context.Context) error { + sb.Destroy() + return nil + }, + }) + } + + return sb, nil } func (sb *SectorBuilder) rlimit() func() { @@ -125,7 +164,18 @@ func (sb *SectorBuilder) Destroy() { } func (sb *SectorBuilder) AcquireSectorId() (uint64, error) { - return sectorbuilder.AcquireSectorId(sb.handle) + sb.idLk.Lock() + defer sb.idLk.Unlock() + + id, err := sectorbuilder.AcquireSectorId(sb.handle) + if err != nil { + return 0, err + } + err = sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id))) + if err != nil { + return 0, err + } + return id, nil } func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) { diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index 0e399be66..138cd939c 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -6,6 +6,10 @@ import ( "os" "testing" + "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) @@ -22,12 +26,11 @@ func TestSealAndVerify(t *testing.T) { t.Fatalf("%+v", err) } - sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize) + sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize, datastore.NewMapDatastore()) if err != nil { t.Fatalf("%+v", err) } - _ = cleanup - //defer cleanup() + defer cleanup() dlen := sectorbuilder.UserBytesForSectorSize(sectorSize) @@ -91,3 +94,35 @@ func TestSealAndVerify(t *testing.T) { t.Fatal("bad post") } } + +func TestAcquireID(t *testing.T) { + ds := datastore.NewMapDatastore() + + sb, cleanup, err := sectorbuilder.TempSectorbuilder(sectorSize, ds) + if err != nil { + t.Fatalf("%+v", err) + } + + assertAcquire := func(expect uint64) { + id, err := sb.AcquireSectorId() + require.NoError(t, err) + assert.Equal(t, expect, id) + } + + assertAcquire(1) + assertAcquire(2) + assertAcquire(3) + + cleanup() + + sb, cleanup, err = sectorbuilder.TempSectorbuilder(sectorSize, ds) + if err != nil { + t.Fatalf("%+v", err) + } + + assertAcquire(4) + assertAcquire(5) + assertAcquire(6) + + cleanup() +}