Merge pull request #6653 from filecoin-project/fix/finalize-in-storage

storage: Fix FinalizeSector with sectors in stoage paths
This commit is contained in:
Łukasz Magiera 2021-07-02 20:38:25 +02:00 committed by GitHub
commit e2f48b21ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 164 additions and 29 deletions

View File

@ -826,6 +826,11 @@ workflows:
suite: itest-sdr_upgrade suite: itest-sdr_upgrade
target: "./itests/sdr_upgrade_test.go" target: "./itests/sdr_upgrade_test.go"
- test:
name: test-itest-sector_finalize_early
suite: itest-sector_finalize_early
target: "./itests/sector_finalize_early_test.go"
- test: - test:
name: test-itest-sector_pledge name: test-itest-sector_pledge
suite: itest-sector_pledge suite: itest-sector_pledge

View File

@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
} }
if !(cfg.CanStore || cfg.CanSeal) { if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal") return xerrors.Errorf("must specify at least one of --store or --seal")
} }
b, err := json.MarshalIndent(cfg, "", " ") b, err := json.MarshalIndent(cfg, "", " ")

View File

@ -145,7 +145,7 @@ over time
} }
if !(cfg.CanStore || cfg.CanSeal) { if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal") return xerrors.Errorf("must specify at least one of --store or --seal")
} }
b, err := json.MarshalIndent(cfg, "", " ") b, err := json.MarshalIndent(cfg, "", " ")

View File

@ -528,10 +528,25 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
} }
} }
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
for _, store := range sealedStores {
if store.CanSeal {
pathType = storiface.PathSealing
break
}
}
}
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false) selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, pathType, storiface.AcquireMove),
func(ctx context.Context, w Worker) error { func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
return err return err

View File

@ -14,7 +14,7 @@ func TestDealsWithSealingAndRPC(t *testing.T) {
kit.QuietMiningLogs() kit.QuietMiningLogs()
var blockTime = 1 * time.Second var blockTime = 50 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs. client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) // no mock proofs.
ens.InterconnectAll().BeginMining(blockTime) ens.InterconnectAll().BeginMining(blockTime)

View File

@ -2,22 +2,29 @@ package kit
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io/ioutil"
"os"
"path/filepath"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
) )
// TestMiner represents a miner enrolled in an Ensemble. // TestMiner represents a miner enrolled in an Ensemble.
@ -119,3 +126,41 @@ func (tm *TestMiner) FlushSealingBatches(ctx context.Context) {
fmt.Printf("COMMIT BATCH: %+v\n", cb) fmt.Printf("COMMIT BATCH: %+v\n", cb)
} }
} }
const metaFile = "sectorstore.json"
func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64, seal, store bool) {
p, err := ioutil.TempDir("", "lotus-testsectors-")
require.NoError(t, err)
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
require.NoError(t, err)
}
}
_, err = os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
require.NoError(t, err)
}
cfg := &stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: weight,
CanSeal: seal,
CanStore: store,
}
if !(cfg.CanStore || cfg.CanSeal) {
t.Fatal("must specify at least one of CanStore or cfg.CanSeal")
}
b, err := json.MarshalIndent(cfg, "", " ")
require.NoError(t, err)
err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
require.NoError(t, err)
err = tm.StorageAddLocal(ctx, p)
require.NoError(t, err)
}

View File

@ -0,0 +1,66 @@
package itests
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
func TestDealsWithFinalizeEarly(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
kit.QuietMiningLogs()
var blockTime = 50 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
cf := config.DefaultStorageMiner()
cf.Sealing.FinalizeEarly = true
return modules.ToSealingConfig(cf), nil
}, nil
})))) // no mock proofs.
ens.InterconnectAll().BeginMining(blockTime)
dh := kit.NewDealHarness(t, client, miner)
ctx := context.Background()
miner.AddStorage(ctx, t, 1000000000, true, false)
miner.AddStorage(ctx, t, 1000000000, false, true)
sl, err := miner.StorageList(ctx)
require.NoError(t, err)
for si, d := range sl {
i, err := miner.StorageInfo(ctx, si)
require.NoError(t, err)
fmt.Printf("stor d:%d %+v\n", len(d), i)
}
t.Run("single", func(t *testing.T) {
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{N: 1})
})
sl, err = miner.StorageList(ctx)
require.NoError(t, err)
for si, d := range sl {
i, err := miner.StorageInfo(ctx, si)
require.NoError(t, err)
fmt.Printf("stor d:%d %+v\n", len(d), i)
}
}

View File

@ -882,10 +882,8 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
}, nil }, nil
} }
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) { func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
return func() (out sealiface.Config, err error) { return sealiface.Config{
err = readCfg(r, func(cfg *config.StorageMiner) {
out = sealiface.Config{
MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
MaxSealingSectors: cfg.Sealing.MaxSealingSectors, MaxSealingSectors: cfg.Sealing.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals,
@ -909,6 +907,12 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error
TerminateBatchMin: cfg.Sealing.TerminateBatchMin, TerminateBatchMin: cfg.Sealing.TerminateBatchMin,
TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait), TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait),
} }
}
func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
return func() (out sealiface.Config, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) {
out = ToSealingConfig(cfg)
}) })
return return
}, nil }, nil