From 1e25d7b4531f8c20a28ada7d277a0f0b85e1c29b Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Mon, 10 Apr 2023 14:52:39 -0400 Subject: [PATCH 1/7] Split precommit batches if gas used exceeds block limit --- node/impl/full/gas.go | 3 ++ storage/pipeline/precommit_batch.go | 52 +++++++++++++++++++++-------- storage/pipeline/sealing.go | 1 + storage/pipeline/utils.go | 15 +++++++++ 4 files changed, 58 insertions(+), 13 deletions(-) diff --git a/node/impl/full/gas.go b/node/impl/full/gas.go index b07d658e2..33d047873 100644 --- a/node/impl/full/gas.go +++ b/node/impl/full/gas.go @@ -384,10 +384,13 @@ func gasEstimateGasLimit( func (m *GasModule) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, _ types.TipSetKey) (*types.Message, error) { if msg.GasLimit == 0 { gasLimit, err := m.GasEstimateGasLimit(ctx, msg, types.EmptyTSK) + log.Errorf("GasEstimateMessageGas GasLimit: %f", gasLimit) if err != nil { return nil, err } msg.GasLimit = int64(float64(gasLimit) * m.Mpool.GetConfig().GasLimitOverestimation) + + log.Errorf("GasEstimateMessageGas GasLimit: %f, GasLimitWithOverestimation", gasLimit, msg.GasLimit) // Gas overestimation can cause us to exceed the block gas limit, cap it. if msg.GasLimit > build.BlockGasLimit { msg.GasLimit = build.BlockGasLimit diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index 9bc624329..6817df0e5 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -31,6 +31,7 @@ import ( type PreCommitBatcherApi interface { MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) ChainHead(ctx context.Context) (*types.TipSet, error) @@ -319,17 +320,13 @@ func (b *PreCommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, return mcid, nil } -func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKey, bf abi.TokenAmount, nv network.Version) ([]sealiface.PreCommitBatchRes, error) { +func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.TokenAmount, entries []*preCommitEntry, nv network.Version) ([]sealiface.PreCommitBatchRes, error) { + log.Errorf("!!!!!!!!!!!!!!!!processPreCommitBatch: %d entries!!!!!!!!!!!!!!!!", len(entries)) params := miner.PreCommitSectorBatchParams{} deposit := big.Zero() var res sealiface.PreCommitBatchRes - for _, p := range b.todo { - if len(params.Sectors) >= cfg.MaxPreCommitBatch { - log.Infow("precommit batch full") - break - } - + for _, p := range entries { res.Sectors = append(res.Sectors, p.pci.SectorNumber) params.Sectors = append(params.Sectors, *infoToPreCommitSectorParams(p.pci)) deposit = big.Add(deposit, p.deposit) @@ -368,18 +365,47 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKe return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } - mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, needFunds, maxFee, enc.Bytes()) - if err != nil { - return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) + msg, err := simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, deposit, maxFee, enc.Bytes()) + + var gasLimit int64 + if msg != nil { + gasLimit = msg.GasLimit + } + log.Errorf("!!!!!!!!!!!!!!simulate Msg err: %w, gasLimit: %d !!!!!!!!!!!!!!!!!!!!", err, gasLimit) + if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(entries) == 1) { + res.Error = err.Error() + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("simulating PreCommitBatch message failed: %w", err) } + // If we're out of gas, split the batch in half and try again + if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { + mid := len(entries) / 2 + ret0, err := b.processPreCommitBatch(cfg, bf, entries[:mid], nv) + ret1, err := b.processPreCommitBatch(cfg, bf, entries[mid:], nv) + + return append(ret0, ret1...), err + } + + // If state call succeeds, we can send the message for real + mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, deposit, maxFee, enc.Bytes()) + log.Errorf("!!!!!!!!!!!!!!sendMsg err: %w, mcid: %v!!!!!!!!!!!!!!!!!!!!", err, mcid) + if err != nil { + res.Error = err.Error() + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("pushing message to mpool: %w", err) + } res.Msg = &mcid - - log.Infow("Sent PreCommitSectorBatch message", "cid", mcid, "from", from, "sectors", len(b.todo)) - return []sealiface.PreCommitBatchRes{res}, nil } +func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKey, bf abi.TokenAmount, nv network.Version) ([]sealiface.PreCommitBatchRes, error) { + var pcEntries []*preCommitEntry + for _, p := range b.todo { + pcEntries = append(pcEntries, p) + } + + return b.processPreCommitBatch(cfg, bf, pcEntries, nv) +} + // register PreCommit, wait for batch message, return message CID func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) { ts, err := b.api.ChainHead(b.mctx) diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index 0fadb6131..d664de1e2 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -64,6 +64,7 @@ type SealingAPI interface { StateMinerDeadlines(context.Context, address.Address, types.TipSetKey) ([]api.Deadline, error) StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tsk types.TipSetKey) ([]api.Partition, error) MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) ChainHead(ctx context.Context) (*types.TipSet, error) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) diff --git a/storage/pipeline/utils.go b/storage/pipeline/utils.go index 3f8d534cb..ce4283b6c 100644 --- a/storage/pipeline/utils.go +++ b/storage/pipeline/utils.go @@ -94,6 +94,21 @@ func collateralSendAmount(ctx context.Context, api interface { return collateral, nil } +func simulateMsgGas(ctx context.Context, sa interface { + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) +}, + from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (*types.Message, error) { + msg := types.Message{ + To: to, + From: from, + Value: value, + Method: method, + Params: params, + } + + return sa.GasEstimateMessageGas(ctx, &msg, nil, types.EmptyTSK) +} + func sendMsg(ctx context.Context, sa interface { MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) }, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) { From 8893c62a428843904621fede1914331d13ff392f Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Mon, 10 Apr 2023 15:39:42 -0400 Subject: [PATCH 2/7] make gen --- storage/pipeline/mocks/api.go | 15 +++++++++++++++ storage/pipeline/mocks/mock_precommit_batcher.go | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/storage/pipeline/mocks/api.go b/storage/pipeline/mocks/api.go index 066fe996e..5c67a1c42 100644 --- a/storage/pipeline/mocks/api.go +++ b/storage/pipeline/mocks/api.go @@ -94,6 +94,21 @@ func (mr *MockSealingAPIMockRecorder) ChainReadObj(arg0, arg1 interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainReadObj", reflect.TypeOf((*MockSealingAPI)(nil).ChainReadObj), arg0, arg1) } +// GasEstimateMessageGas mocks base method. +func (m *MockSealingAPI) GasEstimateMessageGas(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec, arg3 types.TipSetKey) (*types.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GasEstimateMessageGas", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*types.Message) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GasEstimateMessageGas indicates an expected call of GasEstimateMessageGas. +func (mr *MockSealingAPIMockRecorder) GasEstimateMessageGas(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasEstimateMessageGas", reflect.TypeOf((*MockSealingAPI)(nil).GasEstimateMessageGas), arg0, arg1, arg2, arg3) +} + // MpoolPushMessage mocks base method. func (m *MockSealingAPI) MpoolPushMessage(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec) (*types.SignedMessage, error) { m.ctrl.T.Helper() diff --git a/storage/pipeline/mocks/mock_precommit_batcher.go b/storage/pipeline/mocks/mock_precommit_batcher.go index 2d514eb7e..68cce7fb0 100644 --- a/storage/pipeline/mocks/mock_precommit_batcher.go +++ b/storage/pipeline/mocks/mock_precommit_batcher.go @@ -58,6 +58,21 @@ func (mr *MockPreCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).ChainHead), arg0) } +// GasEstimateMessageGas mocks base method. +func (m *MockPreCommitBatcherApi) GasEstimateMessageGas(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec, arg3 types.TipSetKey) (*types.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GasEstimateMessageGas", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*types.Message) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GasEstimateMessageGas indicates an expected call of GasEstimateMessageGas. +func (mr *MockPreCommitBatcherApiMockRecorder) GasEstimateMessageGas(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasEstimateMessageGas", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).GasEstimateMessageGas), arg0, arg1, arg2, arg3) +} + // MpoolPushMessage mocks base method. func (m *MockPreCommitBatcherApi) MpoolPushMessage(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec) (*types.SignedMessage, error) { m.ctrl.T.Helper() From 6f91dc7c5b2ba3cb8f25be0197be5222c965016a Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Mon, 10 Apr 2023 16:24:13 -0400 Subject: [PATCH 3/7] populate result error on exit conditions --- storage/pipeline/precommit_batch.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index 6817df0e5..dc50737a7 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -334,11 +334,13 @@ func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.To enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { + res.Error = err.Error() return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't serialize PreCommitSectorBatchParams: %w", err) } mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK) if err != nil { + res.Error = err.Error() return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) } @@ -347,6 +349,7 @@ func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.To aggFeeRaw, err := policy.AggregatePreCommitNetworkFee(nv, len(params.Sectors), bf) if err != nil { log.Errorf("getting aggregate precommit network fee: %s", err) + res.Error = err.Error() return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("getting aggregate precommit network fee: %s", err) } @@ -380,8 +383,8 @@ func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.To // If we're out of gas, split the batch in half and try again if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { mid := len(entries) / 2 - ret0, err := b.processPreCommitBatch(cfg, bf, entries[:mid], nv) - ret1, err := b.processPreCommitBatch(cfg, bf, entries[mid:], nv) + ret0, _ := b.processPreCommitBatch(cfg, bf, entries[:mid], nv) + ret1, _ := b.processPreCommitBatch(cfg, bf, entries[mid:], nv) return append(ret0, ret1...), err } From 4eb4af639aabc865bd4f1410a3769080cf64f97c Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Wed, 12 Apr 2023 00:30:19 -0400 Subject: [PATCH 4/7] Split PCA msg into smaller batches --- storage/pipeline/commit_batch.go | 48 +++++++++++++++---- storage/pipeline/mocks/mock_commit_batcher.go | 15 ++++++ storage/pipeline/precommit_batch_test.go | 1 + 3 files changed, 55 insertions(+), 9 deletions(-) diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 572dff808..8adb7f3cd 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -37,6 +37,7 @@ var aggFeeDen = big.NewInt(100) type CommitBatcherApi interface { MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) ChainHead(ctx context.Context) (*types.TipSet, error) @@ -234,7 +235,11 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, if individual { res, err = b.processIndividually(cfg) } else { - res, err = b.processBatch(cfg) + var sectors []abi.SectorNumber + for sn := range b.todo { + sectors = append(sectors, sn) + } + res, err = b.processBatch(cfg, sectors) } if err != nil { @@ -264,13 +269,13 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, return res, nil } -func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) { +func (b *CommitBatcher) processBatch(cfg sealiface.Config, sectors []abi.SectorNumber) ([]sealiface.CommitBatchRes, error) { ts, err := b.api.ChainHead(b.mctx) if err != nil { return nil, err } - total := len(b.todo) + total := len(sectors) res := sealiface.CommitBatchRes{ FailedSectors: map[abi.SectorNumber]string{}, @@ -284,24 +289,24 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa infos := make([]proof.AggregateSealVerifyInfo, 0, total) collateral := big.Zero() - for id, p := range b.todo { + for _, sector := range sectors { if len(infos) >= cfg.MaxCommitBatch { log.Infow("commit batch full") break } - res.Sectors = append(res.Sectors, id) + res.Sectors = append(res.Sectors, sector) - sc, err := b.getSectorCollateral(id, ts.Key()) + sc, err := b.getSectorCollateral(sector, ts.Key()) if err != nil { - res.FailedSectors[id] = err.Error() + res.FailedSectors[sector] = err.Error() continue } collateral = big.Add(collateral, sc) - params.SectorNumbers.Set(uint64(id)) - infos = append(infos, p.Info) + params.SectorNumbers.Set(uint64(sector)) + infos = append(infos, b.todo[sector].Info) } if len(infos) == 0 { @@ -318,17 +323,20 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa mid, err := address.IDFromAddress(b.maddr) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err) } nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key()) if err != nil { + res.Error = err.Error() log.Errorf("getting network version: %s", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err) } arp, err := b.aggregateProofType(nv) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err) } @@ -339,16 +347,19 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa Infos: infos, }, proofs) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err) } enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) } mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) } @@ -356,6 +367,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), ts.MinTicketBlock().ParentBaseFee) if err != nil { + res.Error = err.Error() log.Errorf("getting aggregate commit network fee: %s", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err) } @@ -365,6 +377,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa needFunds := big.Add(collateral, aggFee) needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, err } @@ -372,9 +385,26 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } + _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) + + if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) { + res.Error = err.Error() + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err) + } + + // If we're out of gas, split the batch in half and try again + if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { + mid := len(sectors) / 2 + ret0, _ := b.processBatch(cfg, sectors[:mid]) + ret1, _ := b.processBatch(cfg, sectors[mid:]) + + return append(ret0, ret1...), err + } + mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) if err != nil { return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) diff --git a/storage/pipeline/mocks/mock_commit_batcher.go b/storage/pipeline/mocks/mock_commit_batcher.go index c4e7e3eef..431a47c73 100644 --- a/storage/pipeline/mocks/mock_commit_batcher.go +++ b/storage/pipeline/mocks/mock_commit_batcher.go @@ -58,6 +58,21 @@ func (mr *MockCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainHead), arg0) } +// GasEstimateMessageGas mocks base method. +func (m *MockCommitBatcherApi) GasEstimateMessageGas(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec, arg3 types.TipSetKey) (*types.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GasEstimateMessageGas", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*types.Message) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GasEstimateMessageGas indicates an expected call of GasEstimateMessageGas. +func (mr *MockCommitBatcherApiMockRecorder) GasEstimateMessageGas(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasEstimateMessageGas", reflect.TypeOf((*MockCommitBatcherApi)(nil).GasEstimateMessageGas), arg0, arg1, arg2, arg3) +} + // MpoolPushMessage mocks base method. func (m *MockCommitBatcherApi) MpoolPushMessage(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec) (*types.SignedMessage, error) { m.ctrl.T.Helper() diff --git a/storage/pipeline/precommit_batch_test.go b/storage/pipeline/precommit_batch_test.go index 1779128bd..b9c02530f 100644 --- a/storage/pipeline/precommit_batch_test.go +++ b/storage/pipeline/precommit_batch_test.go @@ -162,6 +162,7 @@ func TestPrecommitBatcher(t *testing.T) { s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil) s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil) + s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil) s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool { b := i.(*types.Message) var params miner6.PreCommitSectorBatchParams From 79826447f582211560e9e71f24f7d954451c37cf Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Wed, 12 Apr 2023 21:45:43 -0400 Subject: [PATCH 5/7] fix unit and integration test breaks --- storage/pipeline/commit_batch.go | 4 +++- storage/pipeline/commit_batch_test.go | 1 + storage/pipeline/precommit_batch.go | 14 ++++---------- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 8adb7f3cd..09f1357d7 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -392,17 +392,19 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config, sectors []abi.SectorN _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) { + log.Errorf("simulating CommitBatch message failed: %s", err) res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err) } // If we're out of gas, split the batch in half and try again if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { + log.Warnf("CommitAggregate message ran out of gas, splitting batch in half and trying again (sectors: %d)", len(sectors)) mid := len(sectors) / 2 ret0, _ := b.processBatch(cfg, sectors[:mid]) ret1, _ := b.processBatch(cfg, sectors[mid:]) - return append(ret0, ret1...), err + return append(ret0, ret1...), nil } mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) diff --git a/storage/pipeline/commit_batch_test.go b/storage/pipeline/commit_batch_test.go index a8948edcf..ef45b6b71 100644 --- a/storage/pipeline/commit_batch_test.go +++ b/storage/pipeline/commit_batch_test.go @@ -201,6 +201,7 @@ func TestCommitBatcher(t *testing.T) { if batch { s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil) + s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 1000000}, nil) //s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil) } diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index dc50737a7..869c4feb5 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -321,7 +321,6 @@ func (b *PreCommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, } func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.TokenAmount, entries []*preCommitEntry, nv network.Version) ([]sealiface.PreCommitBatchRes, error) { - log.Errorf("!!!!!!!!!!!!!!!!processPreCommitBatch: %d entries!!!!!!!!!!!!!!!!", len(entries)) params := miner.PreCommitSectorBatchParams{} deposit := big.Zero() var res sealiface.PreCommitBatchRes @@ -368,13 +367,8 @@ func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.To return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } - msg, err := simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, deposit, maxFee, enc.Bytes()) + _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, needFunds, maxFee, enc.Bytes()) - var gasLimit int64 - if msg != nil { - gasLimit = msg.GasLimit - } - log.Errorf("!!!!!!!!!!!!!!simulate Msg err: %w, gasLimit: %d !!!!!!!!!!!!!!!!!!!!", err, gasLimit) if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(entries) == 1) { res.Error = err.Error() return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("simulating PreCommitBatch message failed: %w", err) @@ -382,16 +376,16 @@ func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.To // If we're out of gas, split the batch in half and try again if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { + log.Warnf("PreCommitBatch out of gas, splitting batch in half and trying again") mid := len(entries) / 2 ret0, _ := b.processPreCommitBatch(cfg, bf, entries[:mid], nv) ret1, _ := b.processPreCommitBatch(cfg, bf, entries[mid:], nv) - return append(ret0, ret1...), err + return append(ret0, ret1...), nil } // If state call succeeds, we can send the message for real - mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, deposit, maxFee, enc.Bytes()) - log.Errorf("!!!!!!!!!!!!!!sendMsg err: %w, mcid: %v!!!!!!!!!!!!!!!!!!!!", err, mcid) + mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, needFunds, maxFee, enc.Bytes()) if err != nil { res.Error = err.Error() return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("pushing message to mpool: %w", err) From 0c83781a7f1b4518ffb993198e1fa8f1cbd91bb7 Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Wed, 19 Apr 2023 18:44:32 -0400 Subject: [PATCH 6/7] Add tests for PCB/PCA batch splitting --- node/impl/full/gas.go | 2 - storage/pipeline/commit_batch.go | 5 --- storage/pipeline/commit_batch_test.go | 53 +++++++++++++++++++++++- storage/pipeline/precommit_batch_test.go | 53 +++++++++++++++++------- 4 files changed, 89 insertions(+), 24 deletions(-) diff --git a/node/impl/full/gas.go b/node/impl/full/gas.go index 33d047873..c5b22354a 100644 --- a/node/impl/full/gas.go +++ b/node/impl/full/gas.go @@ -384,13 +384,11 @@ func gasEstimateGasLimit( func (m *GasModule) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, _ types.TipSetKey) (*types.Message, error) { if msg.GasLimit == 0 { gasLimit, err := m.GasEstimateGasLimit(ctx, msg, types.EmptyTSK) - log.Errorf("GasEstimateMessageGas GasLimit: %f", gasLimit) if err != nil { return nil, err } msg.GasLimit = int64(float64(gasLimit) * m.Mpool.GetConfig().GasLimitOverestimation) - log.Errorf("GasEstimateMessageGas GasLimit: %f, GasLimitWithOverestimation", gasLimit, msg.GasLimit) // Gas overestimation can cause us to exceed the block gas limit, cap it. if msg.GasLimit > build.BlockGasLimit { msg.GasLimit = build.BlockGasLimit diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 09f1357d7..afcd2a12e 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -290,11 +290,6 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config, sectors []abi.SectorN collateral := big.Zero() for _, sector := range sectors { - if len(infos) >= cfg.MaxCommitBatch { - log.Infow("commit batch full") - break - } - res.Sectors = append(res.Sectors, sector) sc, err := b.getSectorCollateral(sector, ts.Key()) diff --git a/storage/pipeline/commit_batch_test.go b/storage/pipeline/commit_batch_test.go index ef45b6b71..15c2100cb 100644 --- a/storage/pipeline/commit_batch_test.go +++ b/storage/pipeline/commit_batch_test.go @@ -201,8 +201,7 @@ func TestCommitBatcher(t *testing.T) { if batch { s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil) - s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 1000000}, nil) - //s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil) + s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil) } s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool { @@ -225,6 +224,48 @@ func TestCommitBatcher(t *testing.T) { } } + expectProcessBatch := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool, gasOverLimit bool) action { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise { + s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil) + + ti := len(expect) + batch := false + if ti >= minBatch { + batch = true + ti = 1 + } + + if !aboveBalancer { + batch = false + ti = len(expect) + } + + pciC := len(expect) + if failOnePCI { + s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), abi.SectorNumber(1), gomock.Any()).Return(nil, nil).Times(1) // not found + pciC = len(expect) - 1 + if !batch { + ti-- + } + } + s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&minertypes.SectorPreCommitOnChainInfo{ + PreCommitDeposit: big.Zero(), + }, nil).Times(pciC) + s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(pciC) + + if batch { + s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version18, nil) + if gasOverLimit { + s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, &api.ErrOutOfGas{}) + } else { + s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil) + } + + } + return nil + } + } + flush := func(expect []abi.SectorNumber, aboveBalancer, failOnePCI bool) action { return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *pipeline.CommitBatcher) promise { _ = expectSend(expect, aboveBalancer, failOnePCI)(t, s, pcb) @@ -310,6 +351,14 @@ func TestCommitBatcher(t *testing.T) { addSectors(getSectors(maxBatch), true), }, }, + "addMax-aboveBalancer-gasAboveLimit": { + actions: []action{ + expectProcessBatch(getSectors(maxBatch), true, false, true), + expectSend(getSectors(maxBatch)[:maxBatch/2], true, false), + expectSend(getSectors(maxBatch)[maxBatch/2:], true, false), + addSectors(getSectors(maxBatch), true), + }, + }, "addSingle-belowBalancer": { actions: []action{ addSector(0, false), diff --git a/storage/pipeline/precommit_batch_test.go b/storage/pipeline/precommit_batch_test.go index b9c02530f..6951faad7 100644 --- a/storage/pipeline/precommit_batch_test.go +++ b/storage/pipeline/precommit_batch_test.go @@ -156,22 +156,34 @@ func TestPrecommitBatcher(t *testing.T) { } //stm: @CHAIN_STATE_MINER_INFO_001, @CHAIN_STATE_NETWORK_VERSION_001 - expectSend := func(expect []abi.SectorNumber) action { + expectSend := func(expect []abi.SectorNumber, gasOverLimit bool) action { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { + s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil) + if gasOverLimit { + s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, &api.ErrOutOfGas{}) + } else { + s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil) + } + + if !gasOverLimit { + s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool { + b := i.(*types.Message) + var params miner6.PreCommitSectorBatchParams + require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b.Params))) + for s, number := range expect { + require.Equal(t, number, params.Sectors[s].SectorNumber) + } + return true + }), gomock.Any()).Return(dummySmsg, nil) + } + return nil + } + } + + expectInitialCalls := func() action { return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { s.EXPECT().ChainHead(gomock.Any()).Return(makeBFTs(t, big.NewInt(10001), 1), nil) s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil) - - s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil) - s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil) - s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool { - b := i.(*types.Message) - var params miner6.PreCommitSectorBatchParams - require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b.Params))) - for s, number := range expect { - require.Equal(t, number, params.Sectors[s].SectorNumber) - } - return true - }), gomock.Any()).Return(dummySmsg, nil) return nil } } @@ -199,7 +211,8 @@ func TestPrecommitBatcher(t *testing.T) { flush := func(expect []abi.SectorNumber) action { return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { - _ = expectSend(expect)(t, s, pcb) + _ = expectInitialCalls()(t, s, pcb) + _ = expectSend(expect, false)(t, s, pcb) r, err := pcb.Flush(ctx) require.NoError(t, err) @@ -241,7 +254,17 @@ func TestPrecommitBatcher(t *testing.T) { }, "addMax": { actions: []action{ - expectSend(getSectors(maxBatch)), + expectInitialCalls(), + expectSend(getSectors(maxBatch), false), + addSectors(getSectors(maxBatch), true), + }, + }, + "addMax-gasAboveLimit": { + actions: []action{ + expectInitialCalls(), + expectSend(getSectors(maxBatch), true), + expectSend(getSectors(maxBatch)[:maxBatch/2], false), + expectSend(getSectors(maxBatch)[maxBatch/2:], false), addSectors(getSectors(maxBatch), true), }, }, From d1f3380850b3b8d8bcc7a764b3b4f2e1fca63fac Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Thu, 20 Apr 2023 12:15:51 -0400 Subject: [PATCH 7/7] change comment --- storage/pipeline/commit_batch.go | 2 +- storage/pipeline/precommit_batch.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index afcd2a12e..9948b5432 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -392,7 +392,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config, sectors []abi.SectorN return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err) } - // If we're out of gas, split the batch in half and try again + // If we're out of gas, split the batch in half and evaluate again if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { log.Warnf("CommitAggregate message ran out of gas, splitting batch in half and trying again (sectors: %d)", len(sectors)) mid := len(sectors) / 2 diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index 869c4feb5..63e263662 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -374,7 +374,7 @@ func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.To return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("simulating PreCommitBatch message failed: %w", err) } - // If we're out of gas, split the batch in half and try again + // If we're out of gas, split the batch in half and evaluate again if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { log.Warnf("PreCommitBatch out of gas, splitting batch in half and trying again") mid := len(entries) / 2