storage: Fix FinalizeSector with sectors in stoage paths
This commit is contained in:
parent
da0920584c
commit
6610965f87
@ -826,6 +826,11 @@ workflows:
|
||||
suite: itest-sdr_upgrade
|
||||
target: "./itests/sdr_upgrade_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-sector_finalize_early
|
||||
suite: itest-sector_finalize_early
|
||||
target: "./itests/sector_finalize_early_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-sector_pledge
|
||||
suite: itest-sector_pledge
|
||||
|
@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
|
||||
}
|
||||
|
||||
if !(cfg.CanStore || cfg.CanSeal) {
|
||||
return xerrors.Errorf("must specify at least one of --store of --seal")
|
||||
return xerrors.Errorf("must specify at least one of --store or --seal")
|
||||
}
|
||||
|
||||
b, err := json.MarshalIndent(cfg, "", " ")
|
||||
|
@ -145,7 +145,7 @@ over time
|
||||
}
|
||||
|
||||
if !(cfg.CanStore || cfg.CanSeal) {
|
||||
return xerrors.Errorf("must specify at least one of --store of --seal")
|
||||
return xerrors.Errorf("must specify at least one of --store or --seal")
|
||||
}
|
||||
|
||||
b, err := json.MarshalIndent(cfg, "", " ")
|
||||
|
17
extern/sector-storage/manager.go
vendored
17
extern/sector-storage/manager.go
vendored
@ -528,10 +528,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
||||
}
|
||||
}
|
||||
|
||||
pathType := storiface.PathStorage
|
||||
{
|
||||
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding sealed sector: %w", err)
|
||||
}
|
||||
|
||||
for _, store := range sealedStores {
|
||||
if store.CanSeal {
|
||||
pathType = storiface.PathSealing
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
|
||||
|
||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
|
||||
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
|
||||
func(ctx context.Context, w Worker) error {
|
||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
||||
return err
|
||||
|
@ -14,7 +14,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {
|
||||
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
var blockTime = 1 * time.Second
|
||||
var blockTime = 50 * time.Millisecond
|
||||
|
||||
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs.
|
||||
ens.InterconnectAll().BeginMining(blockTime)
|
||||
|
@ -2,22 +2,29 @@ package kit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestMiner represents a miner enrolled in an Ensemble.
|
||||
@ -119,3 +126,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
|
||||
fmt.Printf("COMMIT BATCH: %+v\n", cb)
|
||||
}
|
||||
}
|
||||
|
||||
const metaFile = "sectorstore.json"
|
||||
|
||||
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
|
||||
p, err := ioutil.TempDir("", "lotus-testsectors-")
|
||||
require.NoError(t, err)
|
||||
|
||||
if err := os.MkdirAll(p, 0755); err != nil {
|
||||
if !os.IsExist(err) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
_, err = os.Stat(filepath.Join(p, metaFile))
|
||||
if !os.IsNotExist(err) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
cfg := &stores.LocalStorageMeta{
|
||||
ID: stores.ID(uuid.New().String()),
|
||||
Weight: weight,
|
||||
CanSeal: seal,
|
||||
CanStore: store,
|
||||
}
|
||||
|
||||
if !(cfg.CanStore || cfg.CanSeal) {
|
||||
t.Fatal("must specify at least one of CanStore or cfg.CanSeal")
|
||||
}
|
||||
|
||||
b, err := json.MarshalIndent(cfg, "", " ")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = tm.StorageAddLocal(ctx, p)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
66
itests/sector_finalize_early_test.go
Normal file
66
itests/sector_finalize_early_test.go
Normal file
@ -0,0 +1,66 @@
|
||||
package itests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
func TestDealsWithFinalizeEarly(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode")
|
||||
}
|
||||
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
var blockTime = 50 * time.Millisecond
|
||||
|
||||
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
|
||||
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
|
||||
return func() (sealiface.Config, error) {
|
||||
cf := config.DefaultStorageMiner()
|
||||
cf.Sealing.FinalizeEarly = true
|
||||
return modules.ToSealingConfig(cf), nil
|
||||
}, nil
|
||||
})))) // no mock proofs.
|
||||
ens.InterconnectAll().BeginMining(blockTime)
|
||||
dh := kit.NewDealHarness(t, client, miner)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
miner.AddStorage(ctx, t, 1000000000, true, false)
|
||||
miner.AddStorage(ctx, t, 1000000000, false, true)
|
||||
|
||||
sl, err := miner.StorageList(ctx)
|
||||
require.NoError(t, err)
|
||||
for si, d := range sl {
|
||||
i, err := miner.StorageInfo(ctx, si)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Printf("stor d:%d %+v\n", len(d), i)
|
||||
}
|
||||
|
||||
t.Run("single", func(t *testing.T) {
|
||||
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
|
||||
})
|
||||
|
||||
sl, err = miner.StorageList(ctx)
|
||||
require.NoError(t, err)
|
||||
for si, d := range sl {
|
||||
i, err := miner.StorageInfo(ctx, si)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Printf("stor d:%d %+v\n", len(d), i)
|
||||
}
|
||||
}
|
@ -882,10 +882,8 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
|
||||
return func() (out sealiface.Config, err error) {
|
||||
err = readCfg(r, func(cfg *config.StorageMiner) {
|
||||
out = sealiface.Config{
|
||||
func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
|
||||
return sealiface.Config{
|
||||
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
|
||||
MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
|
||||
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
|
||||
@ -909,6 +907,12 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
|
||||
TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
|
||||
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
|
||||
}
|
||||
}
|
||||
|
||||
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
|
||||
return func() (out sealiface.Config, err error) {
|
||||
err = readCfg(r, func(cfg *config.StorageMiner) {
|
||||
out = ToSealingConfig(cfg)
|
||||
})
|
||||
return
|
||||
}, nil
|
||||
|
Loading…
Reference in New Issue
Block a user