diff --git a/.circleci/config.yml b/.circleci/config.yml index 9f8f28b86..d151026f0 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -826,6 +826,11 @@ workflows: suite: itest-sdr_upgrade target: "./itests/sdr_upgrade_test.go" + - test: + name: test-itest-sector_finalize_early + suite: itest-sector_finalize_early + target: "./itests/sector_finalize_early_test.go" + - test: name: test-itest-sector_pledge suite: itest-sector_pledge diff --git a/cmd/lotus-seal-worker/storage.go b/cmd/lotus-seal-worker/storage.go index afb566166..be662a6c3 100644 --- a/cmd/lotus-seal-worker/storage.go +++ b/cmd/lotus-seal-worker/storage.go @@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{ } if !(cfg.CanStore || cfg.CanSeal) { - return xerrors.Errorf("must specify at least one of --store of --seal") + return xerrors.Errorf("must specify at least one of --store or --seal") } b, err := json.MarshalIndent(cfg, "", " ") diff --git a/cmd/lotus-storage-miner/storage.go b/cmd/lotus-storage-miner/storage.go index b4ab26ad3..f2068ea86 100644 --- a/cmd/lotus-storage-miner/storage.go +++ b/cmd/lotus-storage-miner/storage.go @@ -145,7 +145,7 @@ over time } if !(cfg.CanStore || cfg.CanSeal) { - return xerrors.Errorf("must specify at least one of --store of --seal") + return xerrors.Errorf("must specify at least one of --store or --seal") } b, err := json.MarshalIndent(cfg, "", " ") diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 136c00252..79eca74d6 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -528,10 +528,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, } } + pathType := storiface.PathStorage + { + sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false) + if err != nil { + return xerrors.Errorf("finding sealed sector: %w", err) + } + + for _, store := range sealedStores { + if store.CanSeal { + pathType = storiface.PathSealing + break + } + } + } + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, - m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), + m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error { _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) return err diff --git a/itests/deals_test.go b/itests/deals_test.go index f8389bbd6..8aff414e0 100644 --- a/itests/deals_test.go +++ b/itests/deals_test.go @@ -14,7 +14,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) { kit.QuietMiningLogs() - var blockTime = 1 * time.Second + var blockTime = 50 * time.Millisecond client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs. ens.InterconnectAll().BeginMining(blockTime) diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index d3f0d2e3c..eea2bc0c1 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -2,22 +2,29 @@ package kit import ( "context" + "encoding/json" "fmt" + "io/ioutil" + "os" + "path/filepath" "strings" "testing" "time" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/miner" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/require" ) // TestMiner represents a miner enrolled in an Ensemble. @@ -119,3 +126,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) { fmt.Printf("COMMIT BATCH: %+v\n", cb) } } + +const metaFile = "sectorstore.json" + +func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) { + p, err := ioutil.TempDir("", "lotus-testsectors-") + require.NoError(t, err) + + if err := os.MkdirAll(p, 0755); err != nil { + if !os.IsExist(err) { + require.NoError(t, err) + } + } + + _, err = os.Stat(filepath.Join(p, metaFile)) + if !os.IsNotExist(err) { + require.NoError(t, err) + } + + cfg := &stores.LocalStorageMeta{ + ID: stores.ID(uuid.New().String()), + Weight: weight, + CanSeal: seal, + CanStore: store, + } + + if !(cfg.CanStore || cfg.CanSeal) { + t.Fatal("must specify at least one of CanStore or cfg.CanSeal") + } + + b, err := json.MarshalIndent(cfg, "", " ") + require.NoError(t, err) + + err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644) + require.NoError(t, err) + + err = tm.StorageAddLocal(ctx, p) + require.NoError(t, err) +} diff --git a/itests/sector_finalize_early_test.go b/itests/sector_finalize_early_test.go new file mode 100644 index 000000000..3eb980f9e --- /dev/null +++ b/itests/sector_finalize_early_test.go @@ -0,0 +1,66 @@ +package itests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" +) + +func TestDealsWithFinalizeEarly(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + kit.QuietMiningLogs() + + var blockTime = 50 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts( + node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { + return func() (sealiface.Config, error) { + cf := config.DefaultStorageMiner() + cf.Sealing.FinalizeEarly = true + return modules.ToSealingConfig(cf), nil + }, nil + })))) // no mock proofs. + ens.InterconnectAll().BeginMining(blockTime) + dh := kit.NewDealHarness(t, client, miner) + + ctx := context.Background() + + miner.AddStorage(ctx, t, 1000000000, true, false) + miner.AddStorage(ctx, t, 1000000000, false, true) + + sl, err := miner.StorageList(ctx) + require.NoError(t, err) + for si, d := range sl { + i, err := miner.StorageInfo(ctx, si) + require.NoError(t, err) + + fmt.Printf("stor d:%d %+v\n", len(d), i) + } + + t.Run("single", func(t *testing.T) { + dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1}) + }) + + sl, err = miner.StorageList(ctx) + require.NoError(t, err) + for si, d := range sl { + i, err := miner.StorageInfo(ctx, si) + require.NoError(t, err) + + fmt.Printf("stor d:%d %+v\n", len(d), i) + } +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 3d1d08071..8508850d3 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -882,33 +882,37 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error }, nil } +func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config { + return sealiface.Config{ + MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, + MaxSealingSectors: cfg.Sealing.MaxSealingSectors, + MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, + WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), + AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, + FinalizeEarly: cfg.Sealing.FinalizeEarly, + + BatchPreCommits: cfg.Sealing.BatchPreCommits, + MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, + PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), + PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), + + AggregateCommits: cfg.Sealing.AggregateCommits, + MinCommitBatch: cfg.Sealing.MinCommitBatch, + MaxCommitBatch: cfg.Sealing.MaxCommitBatch, + CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), + CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), + AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee), + + TerminateBatchMax: cfg.Sealing.TerminateBatchMax, + TerminateBatchMin: cfg.Sealing.TerminateBatchMin, + TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait), + } +} + func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) { return func() (out sealiface.Config, err error) { err = readCfg(r, func(cfg *config.StorageMiner) { - out = sealiface.Config{ - MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, - MaxSealingSectors: cfg.Sealing.MaxSealingSectors, - MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, - WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), - AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, - FinalizeEarly: cfg.Sealing.FinalizeEarly, - - BatchPreCommits: cfg.Sealing.BatchPreCommits, - MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, - PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), - PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), - - AggregateCommits: cfg.Sealing.AggregateCommits, - MinCommitBatch: cfg.Sealing.MinCommitBatch, - MaxCommitBatch: cfg.Sealing.MaxCommitBatch, - CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), - CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), - AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee), - - TerminateBatchMax: cfg.Sealing.TerminateBatchMax, - TerminateBatchMin: cfg.Sealing.TerminateBatchMin, - TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait), - } + out = ToSealingConfig(cfg) }) return }, nil