From 2d80e75e19d0a38f6793ace929c2f5a4b28c3ad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 12 Mar 2024 17:11:50 +0100 Subject: [PATCH] fix: commit batch: Always go through commit batcher (#11704) * fix: commit batch: Always go through commit batcher * fix sealing fsm tests --- storage/pipeline/commit_batch.go | 6 +- storage/pipeline/fsm.go | 6 +- storage/pipeline/fsm_test.go | 23 ++++---- storage/pipeline/sector_state.go | 2 +- storage/pipeline/states_sealing.go | 88 ++---------------------------- 5 files changed, 21 insertions(+), 104 deletions(-) diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index a4688b741..d702d3078 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -209,7 +209,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, return nil, xerrors.Errorf("getting config: %w", err) } - if notif && total < cfg.MaxCommitBatch { + if notif && total < cfg.MaxCommitBatch && cfg.AggregateCommits { return nil, nil } @@ -233,7 +233,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, return false } - individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut() + individual := (total < cfg.MinCommitBatch) || (total < miner.MinAggregatedSectors) || blackedOut() || !cfg.AggregateCommits if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) { if ts.MinTicketBlock().ParentBaseFee.LessThan(cfg.AggregateAboveBaseFee) { @@ -443,7 +443,7 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors2Params: %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitSectors3Params: %w", err) } _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSectors3, needFunds, maxFee, enc.Bytes()) diff --git a/storage/pipeline/fsm.go b/storage/pipeline/fsm.go index ac3dafa86..6829f5210 100644 --- a/storage/pipeline/fsm.go +++ b/storage/pipeline/fsm.go @@ -127,8 +127,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto ), Committing: planCommitting, CommitFinalize: planOne( - on(SectorFinalized{}, SubmitCommit), - on(SectorFinalizedAvailable{}, SubmitCommit), + on(SectorFinalized{}, SubmitCommitAggregate), + on(SectorFinalizedAvailable{}, SubmitCommitAggregate), on(SectorFinalizeFailed{}, CommitFinalizeFailed), ), SubmitCommit: planOne( @@ -674,7 +674,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err } case SectorCommitted: // the normal case e.apply(state) - state.State = SubmitCommit + state.State = SubmitCommitAggregate case SectorProofReady: // early finalize e.apply(state) state.State = CommitFinalize diff --git a/storage/pipeline/fsm_test.go b/storage/pipeline/fsm_test.go index 7d7201953..c403fb129 100644 --- a/storage/pipeline/fsm_test.go +++ b/storage/pipeline/fsm_test.go @@ -70,10 +70,10 @@ func TestHappyPath(t *testing.T) { require.Equal(m.t, m.state.State, Committing) m.planSingle(SectorCommitted{}) - require.Equal(m.t, m.state.State, SubmitCommit) + require.Equal(m.t, m.state.State, SubmitCommitAggregate) - m.planSingle(SectorCommitSubmitted{}) - require.Equal(m.t, m.state.State, CommitWait) + m.planSingle(SectorCommitAggregateSent{}) + require.Equal(m.t, m.state.State, CommitAggregateWait) m.planSingle(SectorProving{}) require.Equal(m.t, m.state.State, FinalizeSector) @@ -81,7 +81,7 @@ func TestHappyPath(t *testing.T) { m.planSingle(SectorFinalized{}) require.Equal(m.t, m.state.State, Proving) - expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector, Proving} + expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving} for i, n := range notif { if n.before.State != expected[i] { t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State) @@ -135,9 +135,6 @@ func TestHappyPathFinalizeEarly(t *testing.T) { require.Equal(m.t, m.state.State, CommitFinalize) m.planSingle(SectorFinalized{}) - require.Equal(m.t, m.state.State, SubmitCommit) - - m.planSingle(SectorSubmitCommitAggregate{}) require.Equal(m.t, m.state.State, SubmitCommitAggregate) m.planSingle(SectorCommitAggregateSent{}) @@ -149,7 +146,7 @@ func TestHappyPathFinalizeEarly(t *testing.T) { m.planSingle(SectorFinalized{}) require.Equal(m.t, m.state.State, Proving) - expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommit, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving} + expected := []SectorState{Packing, GetTicket, PreCommit1, PreCommit2, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector, Proving} for i, n := range notif { if n.before.State != expected[i] { t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State) @@ -188,9 +185,9 @@ func TestCommitFinalizeFailed(t *testing.T) { require.Equal(m.t, m.state.State, CommitFinalize) m.planSingle(SectorFinalized{}) - require.Equal(m.t, m.state.State, SubmitCommit) + require.Equal(m.t, m.state.State, SubmitCommitAggregate) - expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommit} + expected := []SectorState{Committing, CommitFinalize, CommitFinalizeFailed, CommitFinalize, SubmitCommitAggregate} for i, n := range notif { if n.before.State != expected[i] { t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State) @@ -242,10 +239,10 @@ func TestSeedRevert(t *testing.T) { // not changing the seed this time _, _, err = m.s.plan([]statemachine.Event{{User: SectorSeedReady{SeedValue: nil, SeedEpoch: 5}}, {User: SectorCommitted{}}}, m.state) require.NoError(t, err) - require.Equal(m.t, m.state.State, SubmitCommit) + require.Equal(m.t, m.state.State, SubmitCommitAggregate) - m.planSingle(SectorCommitSubmitted{}) - require.Equal(m.t, m.state.State, CommitWait) + m.planSingle(SectorCommitAggregateSent{}) + require.Equal(m.t, m.state.State, CommitAggregateWait) m.planSingle(SectorProving{}) require.Equal(m.t, m.state.State, FinalizeSector) diff --git a/storage/pipeline/sector_state.go b/storage/pipeline/sector_state.go index e1f5bfd69..9e7f75171 100644 --- a/storage/pipeline/sector_state.go +++ b/storage/pipeline/sector_state.go @@ -94,7 +94,7 @@ const ( CommitFinalizeFailed SectorState = "CommitFinalizeFailed" // single commit - SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain + SubmitCommit SectorState = "SubmitCommit" // send commit message to the chain (deprecated) CommitWait SectorState = "CommitWait" // wait for the commit message to land on chain SubmitCommitAggregate SectorState = "SubmitCommitAggregate" diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index aef394789..4f40ac7c7 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -18,7 +18,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" actorstypes "github.com/filecoin-project/go-state-types/actors" "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/go-state-types/builtin" miner2 "github.com/filecoin-project/go-state-types/builtin/v13/miner" verifreg13 "github.com/filecoin-project/go-state-types/builtin/v13/verifreg" "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" @@ -740,89 +739,10 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) } func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error { - // TODO: Deprecate this path, always go through batcher, just respect the AggregateCommits config in there - - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting config: %w", err) - } - - if cfg.AggregateCommits { - nv, err := m.Api.StateNetworkVersion(ctx.Context(), types.EmptyTSK) - if err != nil { - return xerrors.Errorf("getting network version: %w", err) - } - - if nv >= network.Version13 { - return ctx.Send(SectorSubmitCommitAggregate{}) - } - } - - ts, err := m.Api.ChainHead(ctx.Context()) - if err != nil { - log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err) - return nil - } - - if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) - } - - enc := new(bytes.Buffer) - params := &miner.ProveCommitSectorParams{ - SectorNumber: sector.SectorNumber, - Proof: sector.Proof, - } - - if err := params.MarshalCBOR(enc); err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)}) - } - - mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, ts.Key()) - if err != nil { - log.Errorf("handleCommitting: api error, not proceeding: %+v", err) - return nil - } - - pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) - if err != nil { - return xerrors.Errorf("getting precommit info: %w", err) - } - if pci == nil { - return ctx.Send(SectorCommitFailed{error: xerrors.Errorf("precommit info not found on chain")}) - } - - collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, ts.Key()) - if err != nil { - return xerrors.Errorf("getting initial pledge collateral: %w", err) - } - - collateral = big.Sub(collateral, pci.PreCommitDeposit) - if collateral.LessThan(big.Zero()) { - collateral = big.Zero() - } - - collateral, err = collateralSendAmount(ctx.Context(), m.Api, m.maddr, cfg, collateral) - if err != nil { - return err - } - - goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee)) - - from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.CommitAddr, goodFunds, collateral) - if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("no good address to send commit message from: %w", err)}) - } - - // TODO: check seed / ticket / deals are up to date - mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) - if err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) - } - - return ctx.Send(SectorCommitSubmitted{ - Message: mcid, - }) + // like precommit this is a deprecated state, but we keep it around for + // existing state machines + // todo: drop after nv21 + return ctx.Send(SectorSubmitCommitAggregate{}) } // processPieces returns either: