Split PCA msg into smaller batches

This commit is contained in:
Shrenuj Bansal 2023-04-12 00:30:19 -04:00
parent 6f91dc7c5b
commit 4eb4af639a
3 changed files with 55 additions and 9 deletions

View File

@ -37,6 +37,7 @@ var aggFeeDen = big.NewInt(100)
type CommitBatcherApi interface {
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
@ -234,7 +235,11 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
if individual {
res, err = b.processIndividually(cfg)
} else {
res, err = b.processBatch(cfg)
var sectors []abi.SectorNumber
for sn := range b.todo {
sectors = append(sectors, sn)
}
res, err = b.processBatch(cfg, sectors)
}
if err != nil {
@ -264,13 +269,13 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
return res, nil
}
func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
func (b *CommitBatcher) processBatch(cfg sealiface.Config, sectors []abi.SectorNumber) ([]sealiface.CommitBatchRes, error) {
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}
total := len(b.todo)
total := len(sectors)
res := sealiface.CommitBatchRes{
FailedSectors: map[abi.SectorNumber]string{},
@ -284,24 +289,24 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
infos := make([]proof.AggregateSealVerifyInfo, 0, total)
collateral := big.Zero()
for id, p := range b.todo {
for _, sector := range sectors {
if len(infos) >= cfg.MaxCommitBatch {
log.Infow("commit batch full")
break
}
res.Sectors = append(res.Sectors, id)
res.Sectors = append(res.Sectors, sector)
sc, err := b.getSectorCollateral(id, ts.Key())
sc, err := b.getSectorCollateral(sector, ts.Key())
if err != nil {
res.FailedSectors[id] = err.Error()
res.FailedSectors[sector] = err.Error()
continue
}
collateral = big.Add(collateral, sc)
params.SectorNumbers.Set(uint64(id))
infos = append(infos, p.Info)
params.SectorNumbers.Set(uint64(sector))
infos = append(infos, b.todo[sector].Info)
}
if len(infos) == 0 {
@ -318,17 +323,20 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
mid, err := address.IDFromAddress(b.maddr)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err)
}
nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key())
if err != nil {
res.Error = err.Error()
log.Errorf("getting network version: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
}
arp, err := b.aggregateProofType(nv)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate proof type: %w", err)
}
@ -339,16 +347,19 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
Infos: infos,
}, proofs)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err)
}
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
}
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, types.EmptyTSK)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}
@ -356,6 +367,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), ts.MinTicketBlock().ParentBaseFee)
if err != nil {
res.Error = err.Error()
log.Errorf("getting aggregate commit network fee: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting aggregate commit network fee: %s", err)
}
@ -365,6 +377,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
needFunds := big.Add(collateral, aggFee)
needFunds, err = collateralSendAmount(b.mctx, b.api, b.maddr, cfg, needFunds)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, err
}
@ -372,9 +385,26 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds)
if err != nil {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}
_, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) {
res.Error = err.Error()
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err)
}
// If we're out of gas, split the batch in half and try again
if api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) {
mid := len(sectors) / 2
ret0, _ := b.processBatch(cfg, sectors[:mid])
ret1, _ := b.processBatch(cfg, sectors[mid:])
return append(ret0, ret1...), err
}
mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)

View File

@ -58,6 +58,21 @@ func (mr *MockCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainHead), arg0)
}
// GasEstimateMessageGas mocks base method.
func (m *MockCommitBatcherApi) GasEstimateMessageGas(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec, arg3 types.TipSetKey) (*types.Message, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GasEstimateMessageGas", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(*types.Message)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GasEstimateMessageGas indicates an expected call of GasEstimateMessageGas.
func (mr *MockCommitBatcherApiMockRecorder) GasEstimateMessageGas(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GasEstimateMessageGas", reflect.TypeOf((*MockCommitBatcherApi)(nil).GasEstimateMessageGas), arg0, arg1, arg2, arg3)
}
// MpoolPushMessage mocks base method.
func (m *MockCommitBatcherApi) MpoolPushMessage(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec) (*types.SignedMessage, error) {
m.ctrl.T.Helper()

View File

@ -162,6 +162,7 @@ func TestPrecommitBatcher(t *testing.T) {
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil)
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil)
s.EXPECT().GasEstimateMessageGas(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&types.Message{GasLimit: 100000}, nil)
s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.(*types.Message)
var params miner6.PreCommitSectorBatchParams