Merge pull request #8291 from filecoin-project/fix/fsm-input-locking

fix: sealing fsm: Handle inputLk correctly
This commit is contained in:
Aayush Rajasekaran 2022-03-11 10:02:26 -05:00 committed by GitHub
commit 362c73bfbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 47 deletions

View File

@ -426,11 +426,19 @@ func generateFakePoSt(sectorInfo []proof.SectorInfo, rpt func(abi.RegisteredSeal
} }
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) { func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 { off := storiface.UnpaddedByteIndex(0)
panic("implme") var piece cid.Cid
for _, c := range mgr.sectors[sector.ID].pieces {
piece = c
if off >= offset {
break
}
off += storiface.UnpaddedByteIndex(len(mgr.pieces[piece]))
} }
if off > offset {
br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size]) panic("non-aligned offset todo")
}
br := bytes.NewReader(mgr.pieces[piece][:size])
return struct { return struct {
io.ReadCloser io.ReadCloser

View File

@ -315,25 +315,21 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
m.inputLk.Unlock() m.inputLk.Unlock()
// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful // we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
for { res, err := waitAddPieceResp(ctx, pp)
res, err := waitAddPieceResp(ctx, pp) if err != nil {
if err != nil { return api.SectorOffset{}, err
return api.SectorOffset{}, err }
} if res.err == nil {
// there was an error waiting for a pre-existing add piece call, let's retry
if res.err != nil {
m.inputLk.Lock()
pp = m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
continue
}
// all good, return the response // all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
} }
// if there was an error waiting for a pre-existing add piece call, let's retry
m.inputLk.Lock()
} }
// addPendingPiece takes over m.inputLk
pp := m.addPendingPiece(ctx, size, data, deal, sp) pp := m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
res, err := waitAddPieceResp(ctx, pp) res, err := waitAddPieceResp(ctx, pp)
if err != nil { if err != nil {
return api.SectorOffset{}, err return api.SectorOffset{}, err
@ -341,6 +337,7 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
} }
// called with m.inputLk; transfers the lock to another goroutine!
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece { func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{}) doneCh := make(chan struct{})
pp := &pendingPiece{ pp := &pendingPiece{
@ -357,6 +354,7 @@ func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSiz
m.pendingPieces[proposalCID(deal)] = pp m.pendingPieces[proposalCID(deal)] = pp
go func() { go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil { if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err) log.Errorf("%+v", err)
} }

View File

@ -9,16 +9,18 @@ import (
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/stretchr/testify/require"
) )
func TestBatchDealInput(t *testing.T) { func TestBatchDealInput(t *testing.T) {
t.Skip("this test is disabled as it's flaky: #4611")
kit.QuietMiningLogs() kit.QuietMiningLogs()
var ( var (
@ -47,17 +49,20 @@ func TestBatchDealInput(t *testing.T) {
})), })),
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) { return func() (sealiface.Config, error) {
return sealiface.Config{ sc := modules.ToSealingConfig(config.DefaultStorageMiner())
MaxWaitDealsSectors: 2, sc.MaxWaitDealsSectors = 2
MaxSealingSectors: 1, sc.MaxSealingSectors = 1
MaxSealingSectorsForDeals: 3, sc.MaxSealingSectorsForDeals = 3
AlwaysKeepUnsealedCopy: true, sc.AlwaysKeepUnsealedCopy = true
WaitDealsDelay: time.Hour, sc.WaitDealsDelay = time.Hour
}, nil sc.BatchPreCommits = false
sc.AggregateCommits = false
return sc, nil
}, nil }, nil
}), }),
)) ))
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts) client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts, kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime) ens.InterconnectAll().BeginMining(blockTime)
dh := kit.NewDealHarness(t, client, miner, miner) dh := kit.NewDealHarness(t, client, miner, miner)
@ -126,9 +131,9 @@ func TestBatchDealInput(t *testing.T) {
t.Run("4-p513B", run(513, 4, 2)) t.Run("4-p513B", run(513, 4, 2))
if !testing.Short() { if !testing.Short() {
t.Run("32-p257B", run(257, 32, 8)) t.Run("32-p257B", run(257, 32, 8))
t.Run("32-p10B", run(10, 32, 2))
// fixme: this appears to break data-transfer / markets in some really creative ways // fixme: this appears to break data-transfer / markets in some really creative ways
//t.Run("32-p10B", run(10, 32, 2))
// t.Run("128-p10B", run(10, 128, 8)) // t.Run("128-p10B", run(10, 128, 8))
} }
} }

View File

@ -17,6 +17,8 @@ import (
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )
@ -40,29 +42,31 @@ func TestMinerBalanceCollateral(t *testing.T) {
opts := kit.ConstructorOpts( opts := kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) { return func() (sealiface.Config, error) {
return sealiface.Config{ sc := modules.ToSealingConfig(config.DefaultStorageMiner())
MaxWaitDealsSectors: 4,
MaxSealingSectors: 4,
MaxSealingSectorsForDeals: 4,
AlwaysKeepUnsealedCopy: true,
WaitDealsDelay: time.Hour,
BatchPreCommits: batching, sc.MaxWaitDealsSectors = 4
AggregateCommits: batching, sc.MaxSealingSectors = 4
sc.MaxSealingSectorsForDeals = 4
sc.AlwaysKeepUnsealedCopy = true
sc.WaitDealsDelay = time.Hour
PreCommitBatchWait: time.Hour, sc.BatchPreCommits = batching
CommitBatchWait: time.Hour, sc.AggregateCommits = batching
MinCommitBatch: nSectors, sc.PreCommitBatchWait = time.Hour
MaxPreCommitBatch: nSectors, sc.CommitBatchWait = time.Hour
MaxCommitBatch: nSectors,
CollateralFromMinerBalance: enabled, sc.MinCommitBatch = nSectors
AvailableBalanceBuffer: big.Zero(), sc.MaxPreCommitBatch = nSectors
DisableCollateralFallback: false, sc.MaxCommitBatch = nSectors
AggregateAboveBaseFee: big.Zero(),
BatchPreCommitAboveBaseFee: big.Zero(), sc.CollateralFromMinerBalance = enabled
}, nil sc.AvailableBalanceBuffer = big.Zero()
sc.DisableCollateralFallback = false
sc.AggregateAboveBaseFee = big.Zero()
sc.BatchPreCommitAboveBaseFee = big.Zero()
return sc, nil
}, nil }, nil
})), })),
) )