From ad66ad4f7d5fcb1533f5c2a727fe2065ea6196fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 10 Mar 2022 20:24:26 +0100 Subject: [PATCH] fix: sealing fsm: Handle inputLk correctly --- extern/sector-storage/mock/mock.go | 16 +++++++--- extern/storage-sealing/input.go | 24 +++++++-------- itests/batch_deal_test.go | 27 ++++++++++------- itests/sector_miner_collateral_test.go | 42 ++++++++++++++------------ 4 files changed, 62 insertions(+), 47 deletions(-) diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index 20abad309..ecaeaa168 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -426,11 +426,19 @@ func generateFakePoSt(sectorInfo []proof.SectorInfo, rpt func(abi.RegisteredSeal } func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) { - if uint64(offset) != 0 { - panic("implme") + off := storiface.UnpaddedByteIndex(0) + var piece cid.Cid + for _, c := range mgr.sectors[sector.ID].pieces { + piece = c + if off >= offset { + break + } + off += storiface.UnpaddedByteIndex(len(mgr.pieces[piece])) } - - br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size]) + if off > offset { + panic("non-aligned offset todo") + } + br := bytes.NewReader(mgr.pieces[piece][:size]) return struct { io.ReadCloser diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index c999badfd..e644cd848 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -315,25 +315,21 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec m.inputLk.Unlock() // we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful - for { - res, err := waitAddPieceResp(ctx, pp) - if err != nil { - return api.SectorOffset{}, err - } - // there was an error waiting for a pre-existing add piece call, let's retry - if res.err != nil { - m.inputLk.Lock() - pp = m.addPendingPiece(ctx, size, data, deal, sp) - m.inputLk.Unlock() - continue - } + res, err := waitAddPieceResp(ctx, pp) + if err != nil { + return api.SectorOffset{}, err + } + if res.err == nil { // all good, return the response return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err } + // if there was an error waiting for a pre-existing add piece call, let's retry + m.inputLk.Lock() } + // addPendingPiece takes over m.inputLk pp := m.addPendingPiece(ctx, size, data, deal, sp) - m.inputLk.Unlock() + res, err := waitAddPieceResp(ctx, pp) if err != nil { return api.SectorOffset{}, err @@ -341,6 +337,7 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err } +// called with m.inputLk; transfers the lock to another goroutine! func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece { doneCh := make(chan struct{}) pp := &pendingPiece{ @@ -357,6 +354,7 @@ func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSiz m.pendingPieces[proposalCID(deal)] = pp go func() { + defer m.inputLk.Unlock() if err := m.updateInput(ctx, sp); err != nil { log.Errorf("%+v", err) } diff --git a/itests/batch_deal_test.go b/itests/batch_deal_test.go index 93d338e53..0db90df23 100644 --- a/itests/batch_deal_test.go +++ b/itests/batch_deal_test.go @@ -9,16 +9,18 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/stretchr/testify/require" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/stretchr/testify/require" ) func TestBatchDealInput(t *testing.T) { - t.Skip("this test is disabled as it's flaky: #4611") kit.QuietMiningLogs() var ( @@ -47,17 +49,20 @@ func TestBatchDealInput(t *testing.T) { })), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { return func() (sealiface.Config, error) { - return sealiface.Config{ - MaxWaitDealsSectors: 2, - MaxSealingSectors: 1, - MaxSealingSectorsForDeals: 3, - AlwaysKeepUnsealedCopy: true, - WaitDealsDelay: time.Hour, - }, nil + sc := modules.ToSealingConfig(config.DefaultStorageMiner()) + sc.MaxWaitDealsSectors = 2 + sc.MaxSealingSectors = 1 + sc.MaxSealingSectorsForDeals = 3 + sc.AlwaysKeepUnsealedCopy = true + sc.WaitDealsDelay = time.Hour + sc.BatchPreCommits = false + sc.AggregateCommits = false + + return sc, nil }, nil }), )) - client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts) + client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts, kit.ThroughRPC()) ens.InterconnectAll().BeginMining(blockTime) dh := kit.NewDealHarness(t, client, miner, miner) @@ -126,9 +131,9 @@ func TestBatchDealInput(t *testing.T) { t.Run("4-p513B", run(513, 4, 2)) if !testing.Short() { t.Run("32-p257B", run(257, 32, 8)) - t.Run("32-p10B", run(10, 32, 2)) // fixme: this appears to break data-transfer / markets in some really creative ways + //t.Run("32-p10B", run(10, 32, 2)) // t.Run("128-p10B", run(10, 128, 8)) } } diff --git a/itests/sector_miner_collateral_test.go b/itests/sector_miner_collateral_test.go index af67b132b..81aefc3ad 100644 --- a/itests/sector_miner_collateral_test.go +++ b/itests/sector_miner_collateral_test.go @@ -17,6 +17,8 @@ import ( "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" ) @@ -40,29 +42,31 @@ func TestMinerBalanceCollateral(t *testing.T) { opts := kit.ConstructorOpts( node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { return func() (sealiface.Config, error) { - return sealiface.Config{ - MaxWaitDealsSectors: 4, - MaxSealingSectors: 4, - MaxSealingSectorsForDeals: 4, - AlwaysKeepUnsealedCopy: true, - WaitDealsDelay: time.Hour, + sc := modules.ToSealingConfig(config.DefaultStorageMiner()) - BatchPreCommits: batching, - AggregateCommits: batching, + sc.MaxWaitDealsSectors = 4 + sc.MaxSealingSectors = 4 + sc.MaxSealingSectorsForDeals = 4 + sc.AlwaysKeepUnsealedCopy = true + sc.WaitDealsDelay = time.Hour - PreCommitBatchWait: time.Hour, - CommitBatchWait: time.Hour, + sc.BatchPreCommits = batching + sc.AggregateCommits = batching - MinCommitBatch: nSectors, - MaxPreCommitBatch: nSectors, - MaxCommitBatch: nSectors, + sc.PreCommitBatchWait = time.Hour + sc.CommitBatchWait = time.Hour - CollateralFromMinerBalance: enabled, - AvailableBalanceBuffer: big.Zero(), - DisableCollateralFallback: false, - AggregateAboveBaseFee: big.Zero(), - BatchPreCommitAboveBaseFee: big.Zero(), - }, nil + sc.MinCommitBatch = nSectors + sc.MaxPreCommitBatch = nSectors + sc.MaxCommitBatch = nSectors + + sc.CollateralFromMinerBalance = enabled + sc.AvailableBalanceBuffer = big.Zero() + sc.DisableCollateralFallback = false + sc.AggregateAboveBaseFee = big.Zero() + sc.BatchPreCommitAboveBaseFee = big.Zero() + + return sc, nil }, nil })), )