storage: Fix FinalizeSector with sectors in stoage paths
This commit is contained in:
parent
20c8250872
commit
8a94ab676e
@ -826,6 +826,11 @@ workflows:
|
|||||||
suite: itest-sdr_upgrade
|
suite: itest-sdr_upgrade
|
||||||
target: "./itests/sdr_upgrade_test.go"
|
target: "./itests/sdr_upgrade_test.go"
|
||||||
|
|
||||||
|
- test:
|
||||||
|
name: test-itest-sector_finalize_early
|
||||||
|
suite: itest-sector_finalize_early
|
||||||
|
target: "./itests/sector_finalize_early_test.go"
|
||||||
|
|
||||||
- test:
|
- test:
|
||||||
name: test-itest-sector_pledge
|
name: test-itest-sector_pledge
|
||||||
suite: itest-sector_pledge
|
suite: itest-sector_pledge
|
||||||
|
@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !(cfg.CanStore || cfg.CanSeal) {
|
if !(cfg.CanStore || cfg.CanSeal) {
|
||||||
return xerrors.Errorf("must specify at least one of --store of --seal")
|
return xerrors.Errorf("must specify at least one of --store or --seal")
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := json.MarshalIndent(cfg, "", " ")
|
b, err := json.MarshalIndent(cfg, "", " ")
|
||||||
|
@ -145,7 +145,7 @@ over time
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !(cfg.CanStore || cfg.CanSeal) {
|
if !(cfg.CanStore || cfg.CanSeal) {
|
||||||
return xerrors.Errorf("must specify at least one of --store of --seal")
|
return xerrors.Errorf("must specify at least one of --store or --seal")
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := json.MarshalIndent(cfg, "", " ")
|
b, err := json.MarshalIndent(cfg, "", " ")
|
||||||
|
17
extern/sector-storage/manager.go
vendored
17
extern/sector-storage/manager.go
vendored
@ -528,10 +528,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pathType := storiface.PathStorage
|
||||||
|
{
|
||||||
|
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("finding sealed sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, store := range sealedStores {
|
||||||
|
if store.CanSeal {
|
||||||
|
pathType = storiface.PathSealing
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
||||||
return err
|
return err
|
||||||
|
@ -14,7 +14,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {
|
|||||||
|
|
||||||
kit.QuietMiningLogs()
|
kit.QuietMiningLogs()
|
||||||
|
|
||||||
var blockTime = 1 * time.Second
|
var blockTime = 50 * time.Millisecond
|
||||||
|
|
||||||
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs.
|
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs.
|
||||||
ens.InterconnectAll().BeginMining(blockTime)
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
@ -2,22 +2,29 @@ package kit
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/wallet"
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||||
"github.com/filecoin-project/lotus/miner"
|
"github.com/filecoin-project/lotus/miner"
|
||||||
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
|
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestMiner represents a miner enrolled in an Ensemble.
|
// TestMiner represents a miner enrolled in an Ensemble.
|
||||||
@ -119,3 +126,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
|
|||||||
fmt.Printf("COMMIT BATCH: %+v\n", cb)
|
fmt.Printf("COMMIT BATCH: %+v\n", cb)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const metaFile = "sectorstore.json"
|
||||||
|
|
||||||
|
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
|
||||||
|
p, err := ioutil.TempDir("", "lotus-testsectors-")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
if err := os.MkdirAll(p, 0755); err != nil {
|
||||||
|
if !os.IsExist(err) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = os.Stat(filepath.Join(p, metaFile))
|
||||||
|
if !os.IsNotExist(err) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := &stores.LocalStorageMeta{
|
||||||
|
ID: stores.ID(uuid.New().String()),
|
||||||
|
Weight: weight,
|
||||||
|
CanSeal: seal,
|
||||||
|
CanStore: store,
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(cfg.CanStore || cfg.CanSeal) {
|
||||||
|
t.Fatal("must specify at least one of CanStore or cfg.CanSeal")
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := json.MarshalIndent(cfg, "", " ")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = tm.StorageAddLocal(ctx, p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
66
itests/sector_finalize_early_test.go
Normal file
66
itests/sector_finalize_early_test.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package itests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
|
"github.com/filecoin-project/lotus/node"
|
||||||
|
"github.com/filecoin-project/lotus/node/config"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDealsWithFinalizeEarly(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
kit.QuietMiningLogs()
|
||||||
|
|
||||||
|
var blockTime = 50 * time.Millisecond
|
||||||
|
|
||||||
|
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
|
||||||
|
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
|
||||||
|
return func() (sealiface.Config, error) {
|
||||||
|
cf := config.DefaultStorageMiner()
|
||||||
|
cf.Sealing.FinalizeEarly = true
|
||||||
|
return modules.ToSealingConfig(cf), nil
|
||||||
|
}, nil
|
||||||
|
})))) // no mock proofs.
|
||||||
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
dh := kit.NewDealHarness(t, client, miner)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
miner.AddStorage(ctx, t, 1000000000, true, false)
|
||||||
|
miner.AddStorage(ctx, t, 1000000000, false, true)
|
||||||
|
|
||||||
|
sl, err := miner.StorageList(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for si, d := range sl {
|
||||||
|
i, err := miner.StorageInfo(ctx, si)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fmt.Printf("stor d:%d %+v\n", len(d), i)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("single", func(t *testing.T) {
|
||||||
|
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
|
||||||
|
})
|
||||||
|
|
||||||
|
sl, err = miner.StorageList(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
for si, d := range sl {
|
||||||
|
i, err := miner.StorageInfo(ctx, si)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fmt.Printf("stor d:%d %+v\n", len(d), i)
|
||||||
|
}
|
||||||
|
}
|
@ -882,33 +882,37 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
|
||||||
|
return sealiface.Config{
|
||||||
|
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
|
||||||
|
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
|
||||||
|
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
|
||||||
|
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
|
||||||
|
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
|
||||||
|
FinalizeEarly: cfg.Sealing.FinalizeEarly,
|
||||||
|
|
||||||
|
BatchPreCommits: cfg.Sealing.BatchPreCommits,
|
||||||
|
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
|
||||||
|
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
|
||||||
|
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),
|
||||||
|
|
||||||
|
AggregateCommits: cfg.Sealing.AggregateCommits,
|
||||||
|
MinCommitBatch: cfg.Sealing.MinCommitBatch,
|
||||||
|
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
|
||||||
|
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
|
||||||
|
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
|
||||||
|
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
|
||||||
|
|
||||||
|
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
|
||||||
|
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
|
||||||
|
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
|
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
|
||||||
return func() (out sealiface.Config, err error) {
|
return func() (out sealiface.Config, err error) {
|
||||||
err = readCfg(r, func(cfg *config.StorageMiner) {
|
err = readCfg(r, func(cfg *config.StorageMiner) {
|
||||||
out = sealiface.Config{
|
out = ToSealingConfig(cfg)
|
||||||
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
|
|
||||||
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
|
|
||||||
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
|
|
||||||
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
|
|
||||||
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
|
|
||||||
FinalizeEarly: cfg.Sealing.FinalizeEarly,
|
|
||||||
|
|
||||||
BatchPreCommits: cfg.Sealing.BatchPreCommits,
|
|
||||||
MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch,
|
|
||||||
PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait),
|
|
||||||
PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack),
|
|
||||||
|
|
||||||
AggregateCommits: cfg.Sealing.AggregateCommits,
|
|
||||||
MinCommitBatch: cfg.Sealing.MinCommitBatch,
|
|
||||||
MaxCommitBatch: cfg.Sealing.MaxCommitBatch,
|
|
||||||
CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait),
|
|
||||||
CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack),
|
|
||||||
AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee),
|
|
||||||
|
|
||||||
TerminateBatchMax: cfg.Sealing.TerminateBatchMax,
|
|
||||||
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
|
|
||||||
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}, nil
|
}, nil
|
||||||
|
Loading…
Reference in New Issue
Block a user