diff --git a/cmd/lotus-pcr/main.go b/cmd/lotus-pcr/main.go index 80732accf..613e746dc 100644 --- a/cmd/lotus-pcr/main.go +++ b/cmd/lotus-pcr/main.go @@ -125,7 +125,13 @@ var runCmd = &cli.Command{ Name: "max-message-queue", EnvVars: []string{"LOTUS_PCR_MAX_MESSAGE_QUEUE"}, Usage: "set the maximum number of messages that can be queue in the mpool", - Value: 3000, + Value: 300, + }, + &cli.IntFlag{ + Name: "aggregate-tipsets", + EnvVars: []string{"LOTUS_PCR_AGGREGATE_TIPSETS"}, + Usage: "number of tipsets to process before sending messages", + Value: 1, }, &cli.BoolFlag{ Name: "dry-run", @@ -194,6 +200,7 @@ var runCmd = &cli.Command{ dryRun := cctx.Bool("dry-run") preCommitEnabled := cctx.Bool("pre-commit") proveCommitEnabled := cctx.Bool("prove-commit") + aggregateTipsets := cctx.Int("aggregate-tipsets") rf := &refunder{ api: api, @@ -204,16 +211,27 @@ var runCmd = &cli.Command{ proveCommitEnabled: proveCommitEnabled, } + var refunds *MinersRefund = NewMinersRefund() + var rounds int = 0 + for tipset := range tipsetsCh { - refunds, err := rf.ProcessTipset(ctx, tipset) + refunds, err = rf.ProcessTipset(ctx, tipset, refunds) if err != nil { return err } - if err := rf.Refund(ctx, tipset, refunds); err != nil { + rounds = rounds + 1 + if rounds < aggregateTipsets { + continue + } + + if err := rf.Refund(ctx, tipset, refunds, rounds); err != nil { return err } + rounds = 0 + refunds = NewMinersRefund() + if err := r.SetHeight(tipset.Height()); err != nil { return err } @@ -247,13 +265,15 @@ var runCmd = &cli.Command{ } type MinersRefund struct { - refunds map[address.Address]types.BigInt - count int + refunds map[address.Address]types.BigInt + count int + totalRefunds types.BigInt } func NewMinersRefund() *MinersRefund { return &MinersRefund{ - refunds: make(map[address.Address]types.BigInt), + refunds: make(map[address.Address]types.BigInt), + totalRefunds: types.NewInt(0), } } @@ -263,6 +283,7 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) { } m.count = m.count + 1 + m.totalRefunds = types.BigAdd(m.totalRefunds, value) m.refunds[addr] = types.BigAdd(m.refunds[addr], value) } @@ -271,6 +292,10 @@ func (m *MinersRefund) Count() int { return m.count } +func (m *MinersRefund) TotalRefunds() types.BigInt { + return m.totalRefunds +} + func (m *MinersRefund) Miners() []address.Address { miners := make([]address.Address, 0, len(m.refunds)) for addr := range m.refunds { @@ -305,7 +330,7 @@ type refunder struct { proveCommitEnabled bool } -func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*MinersRefund, error) { +func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) { cids := tipset.Cids() if len(cids) == 0 { log.Errorw("no cids in tipset", "height", tipset.Height(), "key", tipset.Key()) @@ -329,9 +354,8 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*Mi return nil, nil } - refunds := NewMinersRefund() - refundValue := types.NewInt(0) + tipsetRefunds := NewMinersRefund() for i, msg := range msgs { m := msg.Message @@ -427,12 +451,15 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*Mi log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue) refunds.Track(m.From, refundValue) + tipsetRefunds.Track(m.From, refundValue) } + log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "total_refunds", tipsetRefunds.TotalRefunds(), "messages_processed", tipsetRefunds.Count()) + return refunds, nil } -func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) error { +func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, rounds int) error { if refunds.Count() == 0 { log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key()) return nil @@ -490,7 +517,7 @@ func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *Mi refundSum = types.BigAdd(refundSum, msg.Value) } - log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) + log.Infow("refund stats", "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) return nil }