From 26de0db75756b85f5135017fa43282b21b3255eb Mon Sep 17 00:00:00 2001 From: Travis Person Date: Wed, 9 Sep 2020 18:45:09 +0000 Subject: [PATCH] lotus-pcr: add recover-miners command --- cmd/lotus-pcr/main.go | 167 ++++++++++++++++++++++++++++++++++++++--- tools/stats/metrics.go | 22 ++++-- 2 files changed, 172 insertions(+), 17 deletions(-) diff --git a/cmd/lotus-pcr/main.go b/cmd/lotus-pcr/main.go index dc12693ca..473fc7ff1 100644 --- a/cmd/lotus-pcr/main.go +++ b/cmd/lotus-pcr/main.go @@ -38,6 +38,7 @@ var log = logging.Logger("main") func main() { local := []*cli.Command{ runCmd, + recoverMinersCmd, versionCmd, } @@ -101,6 +102,80 @@ var versionCmd = &cli.Command{ }, } +var recoverMinersCmd = &cli.Command{ + Name: "recover-miners", + Usage: "Ensure all miners with a negative available balance have a FIL surplus across accounts", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "from", + EnvVars: []string{"LOTUS_PCR_FROM"}, + Usage: "wallet address to send refund from", + }, + &cli.BoolFlag{ + Name: "no-sync", + EnvVars: []string{"LOTUS_PCR_NO_SYNC"}, + Usage: "do not wait for chain sync to complete", + }, + &cli.BoolFlag{ + Name: "dry-run", + EnvVars: []string{"LOTUS_PCR_DRY_RUN"}, + Usage: "do not send any messages", + Value: false, + }, + &cli.IntFlag{ + Name: "miner-total-funds-threashold", + EnvVars: []string{"LOTUS_PCR_MINER_TOTAL_FUNDS_THREASHOLD"}, + Usage: "total filecoin across all accounts that should be met, if the miner balancer drops below zero", + Value: 0, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := context.Background() + api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path")) + if err != nil { + log.Fatal(err) + } + defer closer() + + from, err := address.NewFromString(cctx.String("from")) + if err != nil { + return xerrors.Errorf("parsing source address (provide correct --from flag!): %w", err) + } + + if !cctx.Bool("no-sync") { + if err := stats.WaitForSyncComplete(ctx, api); err != nil { + log.Fatal(err) + } + } + + dryRun := cctx.Bool("dry-run") + minerTotalFundsThreashold := uint64(cctx.Int("miner-total-funds-threashold")) + + rf := &refunder{ + api: api, + wallet: from, + dryRun: dryRun, + minerTotalFundsThreashold: types.FromFil(minerTotalFundsThreashold), + } + + refundTipset, err := api.ChainHead(ctx) + if err != nil { + return err + } + + negativeBalancerRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund()) + if err != nil { + return err + } + + if err := rf.Refund(ctx, "refund negative balancer miner", refundTipset, negativeBalancerRefund, 0); err != nil { + return err + } + + return nil + }, +} + var runCmd = &cli.Command{ Name: "run", Usage: "Start message reimpursement", @@ -230,7 +305,7 @@ var runCmd = &cli.Command{ return err } - if err := rf.Refund(ctx, refundTipset, refunds, rounds); err != nil { + if err := rf.Refund(ctx, "refund stats", refundTipset, refunds, rounds); err != nil { return err } @@ -289,7 +364,6 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) { m.count = m.count + 1 m.totalRefunds = types.BigAdd(m.totalRefunds, value) - m.refunds[addr] = types.BigAdd(m.refunds[addr], value) } @@ -318,8 +392,12 @@ type refunderNodeApi interface { ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error) + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) + StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) + StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) @@ -327,12 +405,81 @@ type refunderNodeApi interface { } type refunder struct { - api refunderNodeApi - wallet address.Address - percentExtra int - dryRun bool - preCommitEnabled bool - proveCommitEnabled bool + api refunderNodeApi + wallet address.Address + percentExtra int + dryRun bool + preCommitEnabled bool + proveCommitEnabled bool + minerTotalFundsThreashold big.Int +} + +func (r *refunder) EnsureMinerMinimums(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) { + miners, err := r.api.StateListMiners(ctx, tipset.Key()) + if err != nil { + return nil, err + } + + for _, maddr := range miners { + mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + if !mact.Balance.GreaterThan(big.Zero()) { + continue + } + + minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key()) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + if minerAvailableBalance.GreaterThanEqual(big.Zero()) { + log.Debugw("skipping over miner with positive balance", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + // Look up and find all addresses associated with the miner + minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key()) + + allAddresses := []address.Address{minerInfo.Worker, minerInfo.Owner} + allAddresses = append(allAddresses, minerInfo.ControlAddresses...) + + // Sum the balancer of all the addresses + addrSum := big.Zero() + addrCheck := make(map[address.Address]struct{}, len(allAddresses)) + for _, addr := range allAddresses { + if _, found := addrCheck[addr]; !found { + balance, err := r.api.WalletBalance(ctx, addr) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + addrSum = big.Add(addrSum, balance) + addrCheck[addr] = struct{}{} + } + } + + totalAvailableBalance := big.Add(addrSum, minerAvailableBalance) + + // If the miner has available balance they should use it + if totalAvailableBalance.GreaterThanEqual(r.minerTotalFundsThreashold) { + log.Debugw("skipping over miner with enough funds cross all accounts", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr, "miner_available_balance", minerAvailableBalance, "wallet_total_balances", addrSum) + continue + } + + // Calculate the required FIL to bring the miner up to the minimum across all of their accounts + refundValue := big.Add(totalAvailableBalance.Abs(), r.minerTotalFundsThreashold) + refunds.Track(maddr, refundValue) + + log.Debugw("processing negative balance miner", "miner", maddr, "refund", refundValue) + } + + return refunds, nil } func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) { @@ -464,7 +611,7 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refu return refunds, nil } -func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, rounds int) error { +func (r *refunder) Refund(ctx context.Context, name string, tipset *types.TipSet, refunds *MinersRefund, rounds int) error { if refunds.Count() == 0 { log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key()) return nil @@ -522,7 +669,7 @@ func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *Mi refundSum = types.BigAdd(refundSum, msg.Value) } - log.Infow("refund stats", "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) + log.Infow(name, "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) return nil } diff --git a/tools/stats/metrics.go b/tools/stats/metrics.go index e50ac953f..eeef72e8e 100644 --- a/tools/stats/metrics.go +++ b/tools/stats/metrics.go @@ -201,16 +201,24 @@ func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, ti return nil } -type apiIpldStore struct { +type ApiIpldStore struct { ctx context.Context - api api.FullNode + api apiIpldStoreApi } -func (ht *apiIpldStore) Context() context.Context { +type apiIpldStoreApi interface { + ChainReadObj(context.Context, cid.Cid) ([]byte, error) +} + +func NewApiIpldStore(ctx context.Context, api apiIpldStoreApi) *ApiIpldStore { + return &ApiIpldStore{ctx, api} +} + +func (ht *ApiIpldStore) Context() context.Context { return ht.ctx } -func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { +func (ht *ApiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { raw, err := ht.api.ChainReadObj(ctx, c) if err != nil { return err @@ -227,8 +235,8 @@ func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) err return fmt.Errorf("Object does not implement CBORUnmarshaler") } -func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { - return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore") +func (ht *ApiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { + return cid.Undef, fmt.Errorf("Put is not implemented on ApiIpldStore") } func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { @@ -279,7 +287,7 @@ func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointLis return fmt.Errorf("failed to unmarshal power actor state: %w", err) } - s := &apiIpldStore{ctx, api} + s := NewApiIpldStore(ctx, api) mp, err := adt.AsMap(s, powerActorState.Claims) if err != nil { return err