diff --git a/Makefile b/Makefile index bdcecc7be..0ef3c2264 100644 --- a/Makefile +++ b/Makefile @@ -166,6 +166,13 @@ lotus-stats: .PHONY: lotus-stats BINS+=lotus-stats +lotus-pcr: + rm -f lotus-pcr + go build $(GOFLAGS) -o lotus-pcr ./cmd/lotus-pcr + go run github.com/GeertJohan/go.rice/rice append --exec lotus-pcr -i ./build +.PHONY: lotus-pcr +BINS+=lotus-pcr + lotus-health: rm -f lotus-health go build -o lotus-health ./cmd/lotus-health diff --git a/cmd/lotus-pcr/main.go b/cmd/lotus-pcr/main.go new file mode 100644 index 000000000..881112d2b --- /dev/null +++ b/cmd/lotus-pcr/main.go @@ -0,0 +1,435 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + _ "net/http/pprof" + "os" + "path/filepath" + "strconv" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + + "github.com/mitchellh/go-homedir" + "github.com/urfave/cli/v2" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/tools/stats" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" + "github.com/filecoin-project/specs-actors/actors/runtime/exitcode" +) + +var log = logging.Logger("main") + +func main() { + local := []*cli.Command{ + runCmd, + versionCmd, + } + + app := &cli.App{ + Name: "lotus-pcr", + Usage: "Refunds precommit initial pledge for all miners", + Description: `Lotus PCR will attempt to reimbursement the initial pledge collateral of the PreCommitSector + miner actor method for all miners on the network. + + The refund is sent directly to the miner actor, and not to the worker. + + The value refunded to the miner actor is not the value in the message itself, but calculated + using StateMinerInitialPledgeCollateral of the PreCommitSector message params. This is to reduce + abuse by over send in the PreCommitSector message and receiving more funds than was actually + consumed by pledging the sector. + + No gas charges are refunded as part of this process, but a small 3% (by default) additional + funds are provided. + + A single message will be produced per miner totaling their refund for all PreCommitSector messages + in a tipset. +`, + Version: build.UserVersion(), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "lotus-path", + EnvVars: []string{"LOTUS_PATH"}, + Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME + }, + &cli.StringFlag{ + Name: "repo", + EnvVars: []string{"LOTUS_PCR_PATH"}, + Value: "~/.lotuspcr", // TODO: Consider XDG_DATA_HOME + }, + }, + + Commands: local, + } + + if err := app.Run(os.Args); err != nil { + log.Warn(err) + return + } + +} + +var versionCmd = &cli.Command{ + Name: "version", + Usage: "Print version", + Action: func(cctx *cli.Context) error { + cli.VersionPrinter(cctx) + return nil + }, +} +var runCmd = &cli.Command{ + Name: "run", + Usage: "Start message reimpursement", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "from", + EnvVars: []string{"LOTUS_PCR_FROM"}, + }, + &cli.BoolFlag{ + Name: "no-sync", + EnvVars: []string{"LOTUS_PCR_NO_SYNC"}, + }, + &cli.IntFlag{ + Name: "percent-extra", + Value: 3, + }, + &cli.IntFlag{ + Name: "head-delay", + Usage: "the number of tipsets to delay message processing to smooth chain reorgs", + Value: int(build.MessageConfidence), + }, + }, + Action: func(cctx *cli.Context) error { + go func() { + http.ListenAndServe(":6060", nil) + }() + + ctx := context.Background() + api, closer, err := stats.GetFullNodeAPI(cctx.String("lotus-path")) + if err != nil { + log.Fatal(err) + } + defer closer() + + r, err := NewRepo(cctx.String("repo")) + if err != nil { + return err + } + + if err := r.Open(); err != nil { + return err + } + + from, err := address.NewFromString(cctx.String("from")) + if err != nil { + return xerrors.Errorf("parsing source address (provide correct --from flag!): %w", err) + } + + if !cctx.Bool("no-sync") { + if err := stats.WaitForSyncComplete(ctx, api); err != nil { + log.Fatal(err) + } + } + + tipsetsCh, err := stats.GetTips(ctx, api, r.Height(), cctx.Int("head-delay")) + if err != nil { + log.Fatal(err) + } + + percentExtra := cctx.Int("percent-extra") + + for tipset := range tipsetsCh { + if err := ProcessTipset(ctx, api, tipset, from, percentExtra); err != nil { + return err + } + + if err := r.SetHeight(tipset.Height()); err != nil { + return err + } + } + + return nil + }, +} + +type MinersRefund struct { + refunds map[address.Address]types.BigInt +} + +func NewMinersRefund() *MinersRefund { + return &MinersRefund{ + refunds: make(map[address.Address]types.BigInt), + } +} + +func (m *MinersRefund) Track(addr address.Address, value types.BigInt) { + m.refunds[addr] = types.BigAdd(m.refunds[addr], value) +} + +func (m *MinersRefund) Count() int { + return len(m.refunds) +} + +func (m *MinersRefund) Miners() []address.Address { + miners := make([]address.Address, 0, len(m.refunds)) + for addr := range m.refunds { + miners = append(miners, addr) + } + + return miners +} + +func (m *MinersRefund) GetRefund(addr address.Address) types.BigInt { + return m.refunds[addr] +} + +type processTipSetApi interface { + ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) + ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) + StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error) + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) + MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) + GasEsitmateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) + WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) +} + +func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipSet, wallet address.Address, percentExtra int) error { + log.Infow("processing tipset", "height", tipset.Height(), "key", tipset.Key().String()) + + cids := tipset.Cids() + if len(cids) == 0 { + return fmt.Errorf("no cids in tipset") + } + + msgs, err := api.ChainGetParentMessages(ctx, cids[0]) + if err != nil { + log.Errorw("failed to get tipset parent messages", "err", err) + return nil + } + + recps, err := api.ChainGetParentReceipts(ctx, cids[0]) + if err != nil { + log.Errorw("failed to get tipset parent receipts", "err", err) + return nil + } + + if len(msgs) != len(recps) { + log.Errorw("message length does not match receipts length", "messages", len(msgs), "receipts", len(recps)) + return nil + } + + refunds := NewMinersRefund() + + count := 0 + for i, msg := range msgs { + m := msg.Message + + a, err := api.StateGetActor(ctx, m.To, tipset.Key()) + if err != nil { + log.Warnw("failed to look up state actor", "actor", m.To) + continue + } + + if a.Code != builtin.StorageMinerActorCodeID { + continue + } + + // we only care to look at PreCommitSector messages + if m.Method != builtin.MethodsMiner.PreCommitSector { + continue + } + + if recps[i].ExitCode != exitcode.Ok { + log.Debugw("skipping non-ok exitcode message", "cid", msg.Cid.String(), "exitcode", recps[i].ExitCode) + } + + var precommitInfo miner.SectorPreCommitInfo + if err := precommitInfo.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil { + log.Warnw("failed to decode precommit params", "err", err) + continue + } + + refundValue, err := api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitInfo, tipset.Key()) + if err != nil { + log.Warnw("failed to calculate", "err", err) + continue + } + + if percentExtra > 0 { + refundValue = types.BigAdd(refundValue, types.BigDiv(refundValue, types.NewInt(100*uint64(percentExtra)))) + } + + log.Infow("processing message", "from", m.From, "to", m.To, "value", m.Value.String(), "gas_fee_cap", m.GasFeeCap.String(), "gas_premium", m.GasPremium.String(), "gas_used", fmt.Sprintf("%d", recps[i].GasUsed), "refund", refundValue.String()) + + count = count + 1 + refunds.Track(m.To, refundValue) + } + + if refunds.Count() == 0 { + log.Debugw("no messages to refund in tipset") + return nil + } + + var messages []*types.Message + refundSum := types.NewInt(0) + + for _, maddr := range refunds.Miners() { + refundValue := refunds.GetRefund(maddr) + + // We want to try and ensure these messages get mined quickly + gasPremium, err := api.GasEsitmateGasPremium(ctx, 0, wallet, 0, tipset.Key()) + if err != nil { + log.Warnw("failed to estimate gas premium", "err", err) + continue + } + + msg := &types.Message{ + Value: refundValue, + From: wallet, + To: maddr, + + GasPremium: gasPremium, + } + + refundSum = types.BigAdd(refundSum, msg.Value) + messages = append(messages, msg) + } + + balance, err := api.WalletBalance(ctx, wallet) + if err != nil { + return xerrors.Errorf("failed to get wallet balance :%w", err) + } + + // Calculate the minimum balance as the total refund we need to issue plus 5% to cover fees + minBalance := types.BigAdd(refundSum, types.BigDiv(refundSum, types.NewInt(500))) + if balance.LessThan(minBalance) { + log.Errorw("not sufficent funds to cover refunds", "balance", balance.String(), "refund_sum", refundSum.String(), "minimum_required", minBalance.String()) + return xerrors.Errorf("wallet does not have enough balance to cover refund") + } + + failures := 0 + refundSum.SetUint64(0) + for _, msg := range messages { + if _, err = api.MpoolPushMessage(ctx, msg); err != nil { + log.Errorw("failed to MpoolPushMessage", "err", err, "msg", msg) + failures = failures + 1 + continue + } + + refundSum = types.BigAdd(refundSum, msg.Value) + } + + log.Infow("tipset stats", "messages_sent", len(messages)-failures, "refund_sum", refundSum.String(), "messages_failures", failures) + + return nil +} + +type repo struct { + last abi.ChainEpoch + path string +} + +func NewRepo(path string) (*repo, error) { + path, err := homedir.Expand(path) + if err != nil { + return nil, err + } + + return &repo{ + last: 0, + path: path, + }, nil +} + +func (r *repo) exists() (bool, error) { + _, err := os.Stat(r.path) + notexist := os.IsNotExist(err) + if notexist { + err = nil + } + return !notexist, err + +} + +func (r *repo) init() error { + exist, err := r.exists() + if err != nil { + return err + } + if exist { + return nil + } + + err = os.Mkdir(r.path, 0755) //nolint: gosec + if err != nil && !os.IsExist(err) { + return err + } + + return nil +} + +func (r *repo) Open() (err error) { + if err = r.init(); err != nil { + return + } + + var f *os.File + + f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return + } + defer func() { + err = f.Close() + }() + + var raw []byte + + raw, err = ioutil.ReadAll(f) + if err != nil { + return + } + + height, err := strconv.Atoi(string(bytes.TrimSpace(raw))) + if err != nil { + return + } + + r.last = abi.ChainEpoch(height) + return +} + +func (r *repo) Height() abi.ChainEpoch { + return r.last +} + +func (r *repo) SetHeight(last abi.ChainEpoch) (err error) { + r.last = last + var f *os.File + f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR, 0644) + if err != nil { + return + } + + defer func() { + err = f.Close() + }() + + if _, err = fmt.Fprintf(f, "%d", r.last); err != nil { + return + } + + return +}