From 5040623a12c3b93c27c3949c74c6cb64922c3ea8 Mon Sep 17 00:00:00 2001 From: Travis Person Date: Mon, 17 Aug 2020 18:52:02 +0000 Subject: [PATCH] lotus-pcr: refund provecommit sectors --- cmd/lotus-pcr/main.go | 244 ++++++++++++++++++++++++++++++++---------- 1 file changed, 185 insertions(+), 59 deletions(-) diff --git a/cmd/lotus-pcr/main.go b/cmd/lotus-pcr/main.go index fe75349a2..440c0163b 100644 --- a/cmd/lotus-pcr/main.go +++ b/cmd/lotus-pcr/main.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "strconv" + "time" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -20,6 +21,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" @@ -70,16 +72,24 @@ func main() { EnvVars: []string{"LOTUS_PCR_PATH"}, Value: "~/.lotuspcr", // TODO: Consider XDG_DATA_HOME }, + &cli.StringFlag{ + Name: "log-level", + EnvVars: []string{"LOTUS_PCR_LOG_LEVEL"}, + Hidden: true, + Value: "info", + }, + }, + Before: func(cctx *cli.Context) error { + return logging.SetLogLevel("main", cctx.String("log-level")) }, - Commands: local, } if err := app.Run(os.Args); err != nil { - log.Warn(err) + log.Errorw("exit in error", "err", err) + os.Exit(1) return } - } var versionCmd = &cli.Command{ @@ -90,6 +100,7 @@ var versionCmd = &cli.Command{ return nil }, } + var runCmd = &cli.Command{ Name: "run", Usage: "Start message reimpursement", @@ -97,19 +108,36 @@ var runCmd = &cli.Command{ &cli.StringFlag{ Name: "from", EnvVars: []string{"LOTUS_PCR_FROM"}, + Usage: "wallet address to send refund from", }, &cli.BoolFlag{ Name: "no-sync", EnvVars: []string{"LOTUS_PCR_NO_SYNC"}, + Usage: "do not wait for chain sync to complete", }, &cli.IntFlag{ - Name: "percent-extra", - Value: 3, + Name: "percent-extra", + EnvVars: []string{"LOTUS_PCR_PERCENT_EXTRA"}, + Usage: "extra funds to send above the refund", + Value: 3, }, &cli.IntFlag{ - Name: "head-delay", - Usage: "the number of tipsets to delay message processing to smooth chain reorgs", - Value: int(build.MessageConfidence), + Name: "max-message-queue", + EnvVars: []string{"LOTUS_PCR_MAX_MESSAGE_QUEUE"}, + Usage: "set the maximum number of messages that can be queue in the mpool", + Value: 3000, + }, + &cli.BoolFlag{ + Name: "dry-run", + EnvVars: []string{"LOTUS_PCR_DRY_RUN"}, + Usage: "do not send any messages", + Value: false, + }, + &cli.IntFlag{ + Name: "head-delay", + EnvVars: []string{"LOTUS_PCR_HEAD_DELAY"}, + Usage: "the number of tipsets to delay message processing to smooth chain reorgs", + Value: int(build.MessageConfidence), }, }, Action: func(cctx *cli.Context) error { @@ -150,15 +178,52 @@ var runCmd = &cli.Command{ } percentExtra := cctx.Int("percent-extra") + maxMessageQueue := cctx.Int("max-message-queue") + dryRun := cctx.Bool("dry-run") + + rf := &refunder{ + api: api, + wallet: from, + percentExtra: percentExtra, + dryRun: dryRun, + } for tipset := range tipsetsCh { - if err := ProcessTipset(ctx, api, tipset, from, percentExtra); err != nil { + refunds, err := rf.ProcessTipset(ctx, tipset) + if err != nil { + return err + } + + if err := rf.Refund(ctx, tipset, refunds); err != nil { return err } if err := r.SetHeight(tipset.Height()); err != nil { return err } + + for { + msgs, err := api.MpoolPending(ctx, types.EmptyTSK) + if err != nil { + log.Warnw("failed to fetch pending messages", "err", err) + time.Sleep(time.Duration(int64(time.Second) * int64(build.BlockDelaySecs))) + continue + } + + count := 0 + for _, msg := range msgs { + if msg.Message.From == from { + count = count + 1 + } + } + + if count < maxMessageQueue { + break + } + + log.Warnw("messages in mpool over max message queue", "message_count", count, "max_message_queue", maxMessageQueue) + time.Sleep(time.Duration(int64(time.Second) * int64(build.BlockDelaySecs))) + } } return nil @@ -167,6 +232,7 @@ var runCmd = &cli.Command{ type MinersRefund struct { refunds map[address.Address]types.BigInt + count int } func NewMinersRefund() *MinersRefund { @@ -180,11 +246,13 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) { m.refunds[addr] = types.NewInt(0) } + m.count = m.count + 1 + m.refunds[addr] = types.BigAdd(m.refunds[addr], value) } func (m *MinersRefund) Count() int { - return len(m.refunds) + return m.count } func (m *MinersRefund) Miners() []address.Address { @@ -200,50 +268,58 @@ func (m *MinersRefund) GetRefund(addr address.Address) types.BigInt { return m.refunds[addr] } -type processTipSetApi interface { +type refunderNodeApi interface { ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) + ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error) + StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) } -func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipSet, wallet address.Address, percentExtra int) error { - log.Infow("processing tipset", "height", tipset.Height(), "key", tipset.Key().String()) +type refunder struct { + api refunderNodeApi + wallet address.Address + percentExtra int + dryRun bool +} +func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*MinersRefund, error) { cids := tipset.Cids() if len(cids) == 0 { - return fmt.Errorf("no cids in tipset") + log.Errorw("no cids in tipset", "height", tipset.Height(), "key", tipset.Key()) + return nil, fmt.Errorf("no cids in tipset") } - msgs, err := api.ChainGetParentMessages(ctx, cids[0]) + msgs, err := r.api.ChainGetParentMessages(ctx, cids[0]) if err != nil { - log.Errorw("failed to get tipset parent messages", "err", err) - return nil + log.Errorw("failed to get tipset parent messages", "err", err, "height", tipset.Height(), "key", tipset.Key()) + return nil, nil } - recps, err := api.ChainGetParentReceipts(ctx, cids[0]) + recps, err := r.api.ChainGetParentReceipts(ctx, cids[0]) if err != nil { - log.Errorw("failed to get tipset parent receipts", "err", err) - return nil + log.Errorw("failed to get tipset parent receipts", "err", err, "height", tipset.Height(), "key", tipset.Key()) + return nil, nil } if len(msgs) != len(recps) { - log.Errorw("message length does not match receipts length", "messages", len(msgs), "receipts", len(recps)) - return nil + log.Errorw("message length does not match receipts length", "height", tipset.Height(), "key", tipset.Key(), "messages", len(msgs), "receipts", len(recps)) + return nil, nil } refunds := NewMinersRefund() - count := 0 + refundValue := types.NewInt(0) for i, msg := range msgs { m := msg.Message - a, err := api.StateGetActor(ctx, m.To, tipset.Key()) + a, err := r.api.StateGetActor(ctx, m.To, tipset.Key()) if err != nil { - log.Warnw("failed to look up state actor", "actor", m.To) + log.Warnw("failed to look up state actor", "height", tipset.Height(), "key", tipset.Key(), "actor", m.To) continue } @@ -251,39 +327,87 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS continue } - // we only care to look at PreCommitSector messages - if m.Method != builtin.MethodsMiner.PreCommitSector { + var messageMethod string + + switch m.Method { + case builtin.MethodsMiner.ProveCommitSector: + messageMethod = "ProveCommitSector" + if recps[i].ExitCode != exitcode.Ok { + log.Debugw("skipping non-ok exitcode message", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "exitcode", recps[i].ExitCode) + continue + } + + var proveCommitSector miner.ProveCommitSectorParams + if err := proveCommitSector.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil { + log.Warnw("failed to decode provecommit params", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To) + continue + } + + // We use the parent tipset key because precommit information is removed when ProveCommitSector is executed + precommitChainInfo, err := r.api.StateSectorPreCommitInfo(ctx, m.To, proveCommitSector.SectorNumber, tipset.Parents()) + if err != nil { + log.Warnw("failed to get precommit info for sector", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber) + continue + } + + precommitTipset, err := r.api.ChainGetTipSetByHeight(ctx, precommitChainInfo.PreCommitEpoch, tipset.Key()) + if err != nil { + log.Warnf("failed to lookup precommit epoch", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber) + continue + } + + collateral, err := r.api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitChainInfo.Info, precommitTipset.Key()) + if err != nil { + log.Warnw("failed to get initial pledge collateral", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber) + } + + collateral = big.Sub(collateral, precommitChainInfo.PreCommitDeposit) + if collateral.LessThan(big.Zero()) { + log.Debugw("skipping zero pledge collateral difference", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber) + continue + } + + refundValue = collateral + case builtin.MethodsMiner.PreCommitSector: + messageMethod = "PreCommitSector" + + if recps[i].ExitCode != exitcode.Ok { + log.Debugw("skipping non-ok exitcode message", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "exitcode", recps[i].ExitCode) + continue + } + + var precommitInfo miner.SectorPreCommitInfo + if err := precommitInfo.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil { + log.Warnw("failed to decode precommit params", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To) + continue + } + + collateral, err := r.api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitInfo, tipset.Key()) + if err != nil { + log.Warnw("failed to calculate initial pledge collateral", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", precommitInfo.SectorNumber) + continue + } + + refundValue = collateral + default: continue } - if recps[i].ExitCode != exitcode.Ok { - log.Debugw("skipping non-ok exitcode message", "cid", msg.Cid.String(), "exitcode", recps[i].ExitCode) + if r.percentExtra > 0 { + refundValue = types.BigAdd(refundValue, types.BigDiv(types.BigMul(refundValue, types.NewInt(100)), types.NewInt(uint64(r.percentExtra)))) } - var precommitInfo miner.SectorPreCommitInfo - if err := precommitInfo.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil { - log.Warnw("failed to decode precommit params", "err", err) - continue - } + log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue) - refundValue, err := api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitInfo, tipset.Key()) - if err != nil { - log.Warnw("failed to calculate", "err", err) - continue - } - - if percentExtra > 0 { - refundValue = types.BigAdd(refundValue, types.BigDiv(refundValue, types.NewInt(100*uint64(percentExtra)))) - } - - log.Infow("processing message", "from", m.From, "to", m.To, "value", m.Value.String(), "gas_fee_cap", m.GasFeeCap.String(), "gas_premium", m.GasPremium.String(), "gas_used", fmt.Sprintf("%d", recps[i].GasUsed), "refund", refundValue.String()) - - count = count + 1 refunds.Track(m.From, refundValue) } + return refunds, nil +} + +func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) error { if refunds.Count() == 0 { - log.Debugw("no messages to refund in tipset") + log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key()) return nil } @@ -294,15 +418,15 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS refundValue := refunds.GetRefund(maddr) // We want to try and ensure these messages get mined quickly - gasPremium, err := api.GasEstimateGasPremium(ctx, 0, wallet, 0, tipset.Key()) + gasPremium, err := r.api.GasEstimateGasPremium(ctx, 0, r.wallet, 0, tipset.Key()) if err != nil { - log.Warnw("failed to estimate gas premium", "err", err) + log.Warnw("failed to estimate gas premium", "err", err, "height", tipset.Height(), "key", tipset.Key()) continue } msg := &types.Message{ Value: refundValue, - From: wallet, + From: r.wallet, To: maddr, GasPremium: gasPremium, @@ -312,32 +436,34 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS messages = append(messages, msg) } - balance, err := api.WalletBalance(ctx, wallet) + balance, err := r.api.WalletBalance(ctx, r.wallet) if err != nil { + log.Errorw("failed to get wallet balance", "err", err, "height", tipset.Height(), "key", tipset.Key()) return xerrors.Errorf("failed to get wallet balance :%w", err) } // Calculate the minimum balance as the total refund we need to issue plus 5% to cover fees minBalance := types.BigAdd(refundSum, types.BigDiv(refundSum, types.NewInt(500))) if balance.LessThan(minBalance) { - log.Errorw("not sufficent funds to cover refunds", "balance", balance.String(), "refund_sum", refundSum.String(), "minimum_required", minBalance.String()) + log.Errorw("not sufficent funds to cover refunds", "balance", balance, "refund_sum", refundSum, "minimum_required", minBalance) return xerrors.Errorf("wallet does not have enough balance to cover refund") } failures := 0 refundSum.SetUint64(0) for _, msg := range messages { - if _, err = api.MpoolPushMessage(ctx, msg, nil); err != nil { - log.Errorw("failed to MpoolPushMessage", "err", err, "msg", msg) - failures = failures + 1 - continue + if !r.dryRun { + if _, err = r.api.MpoolPushMessage(ctx, msg, nil); err != nil { + log.Errorw("failed to MpoolPushMessage", "err", err, "msg", msg) + failures = failures + 1 + continue + } } refundSum = types.BigAdd(refundSum, msg.Value) } - log.Infow("tipset stats", "messages_sent", len(messages)-failures, "refund_sum", refundSum.String(), "messages_failures", failures) - + log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) return nil }