From 4eb4af639aabc865bd4f1410a3769080cf64f97c Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Wed, 12 Apr 2023 00:30:19 -0400 Subject: [PATCH] Split PCA msg into smaller batches --- storage/pipeline/commit_batch.go | 48 +++++++++++++++---- storage/pipeline/mocks/mock_commit_batcher.go | 15 ++++++ storage/pipeline/precommit_batch_test.go | 1 + 3 files changed, 55 insertions(+), 9 deletions(-) diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 572dff808..8adb7f3cd 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -37,6 +37,7 @@ var aggFeeDen = big.NewInt(100) type CommitBatcherApi interface { MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) ChainHead(ctx context.Context) (*types.TipSet, error) @@ -234,7 +235,11 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, if individual { res, err = b.processIndividually(cfg) } else { - res, err = b.processBatch(cfg) + var sectors []abi.SectorNumber + for sn := range b.todo { + sectors = append(sectors, sn) + } + res, err = b.processBatch(cfg, sectors) } if err != nil { @@ -264,13 +269,13 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, return res, nil } -func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) { +func (b *CommitBatcher) processBatch(cfg sealiface.Config, sectors []abi.SectorNumber) ([]sealiface.CommitBatchRes, error) { ts, err := b.api.ChainHead(b.mctx) if err != nil { return nil, err } - total := len(b.todo) + total := len(sectors) res := sealiface.CommitBatchRes{ FailedSectors: map[abi.SectorNumber]string{}, @@ -284,24 +289,24 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa infos := make([]proof.AggregateSealVerifyInfo, 0, total) collateral := big.Zero() - for id, p := range b.todo { + for _, sector := range sectors { if len(infos) >= cfg.MaxCommitBatch { log.Infow("commit batch full") break } - res.Sectors = append(res.Sectors, id) + res.Sectors = append(res.Sectors, sector) - sc, err := b.getSectorCollateral(id, ts.Key()) + sc, err := b.getSectorCollateral(sector, ts.Key()) if err != nil { - res.FailedSectors[id] = err.Error() + res.FailedSectors[sector] = err.Error() continue } collateral = big.Add(collateral, sc) - params.SectorNumbers.Set(uint64(id)) - infos = append(infos, p.Info) + params.SectorNumbers.Set(uint64(sector)) + infos = append(infos, b.todo[sector].Info) } if len(infos) == 0 { @@ -318,17 +323,20 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa mid, err := address.IDFromAddress(b.maddr) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err) } nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key()) if err != nil { + res.Error = err.Error() log.Errorf("getting network version: %s", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err) } arp, err := b.aggregateProofType(nv) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err) } @@ -339,16 +347,19 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa Infos: infos, }, proofs) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err) } enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) } mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err) } @@ -356,6 +367,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), ts.MinTicketBlock().ParentBaseFee) if err != nil { + res.Error = err.Error() log.Errorf("getting aggregate commit network fee: %s", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err) } @@ -365,6 +377,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa needFunds := big.Add(collateral, aggFee) needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, err } @@ -372,9 +385,26 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds) if err != nil { + res.Error = err.Error() return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } + _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) + + if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) { + res.Error = err.Error() + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err) + } + + // If we're out of gas, split the batch in half and try again + if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) { + mid := len(sectors) / 2 + ret0, _ := b.processBatch(cfg, sectors[:mid]) + ret1, _ := b.processBatch(cfg, sectors[mid:]) + + return append(ret0, ret1...), err + } + mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) if err != nil { return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err) diff --git a/storage/pipeline/mocks/mock_commit_batcher.go b/storage/pipeline/mocks/mock_commit_batcher.go index c4e7e3eef..431a47c73 100644 --- a/storage/pipeline/mocks/mock_commit_batcher.go +++ b/storage/pipeline/mocks/mock_commit_batcher.go @@ -58,6 +58,21 @@ func (mr *MockCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainHead), arg0) } +// GasEstimateMessageGas mocks base method. +func (m *MockCommitBatcherApi) GasEstimateMessageGas(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec, arg3 types.TipSetKey) (*types.Message, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GasEstimateMessageGas", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*types.Message) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GasEstimateMessageGas indicates an expected call of GasEstimateMessageGas. +func (mr *MockCommitBatcherApiMockRecorder) GasEstimateMessageGas(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasEstimateMessageGas", reflect.TypeOf((*MockCommitBatcherApi)(nil).GasEstimateMessageGas), arg0, arg1, arg2, arg3) +} + // MpoolPushMessage mocks base method. func (m *MockCommitBatcherApi) MpoolPushMessage(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec) (*types.SignedMessage, error) { m.ctrl.T.Helper() diff --git a/storage/pipeline/precommit_batch_test.go b/storage/pipeline/precommit_batch_test.go index 1779128bd..b9c02530f 100644 --- a/storage/pipeline/precommit_batch_test.go +++ b/storage/pipeline/precommit_batch_test.go @@ -162,6 +162,7 @@ func TestPrecommitBatcher(t *testing.T) { s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil) s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil) + s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil) s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool { b := i.(*types.Message) var params miner6.PreCommitSectorBatchParams