lotus-pcr: add recover-miners command

This commit is contained in:
Travis Person 2020-09-09 18:45:09 +00:00
parent 6050401652
commit 26de0db757
2 changed files with 172 additions and 17 deletions

View File

@ -38,6 +38,7 @@ var log = logging.Logger("main")
func main() { func main() {
local := []*cli.Command{ local := []*cli.Command{
runCmd, runCmd,
recoverMinersCmd,
versionCmd, versionCmd,
} }
@ -101,6 +102,80 @@ var versionCmd = &cli.Command{
}, },
} }
var recoverMinersCmd = &cli.Command{
Name: "recover-miners",
Usage: "Ensure all miners with a negative available balance have a FIL surplus across accounts",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "from",
EnvVars: []string{"LOTUS_PCR_FROM"},
Usage: "wallet address to send refund from",
},
&cli.BoolFlag{
Name: "no-sync",
EnvVars: []string{"LOTUS_PCR_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
},
&cli.BoolFlag{
Name: "dry-run",
EnvVars: []string{"LOTUS_PCR_DRY_RUN"},
Usage: "do not send any messages",
Value: false,
},
&cli.IntFlag{
Name: "miner-total-funds-threashold",
EnvVars: []string{"LOTUS_PCR_MINER_TOTAL_FUNDS_THREASHOLD"},
Usage: "total filecoin across all accounts that should be met, if the miner balancer drops below zero",
Value: 0,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path"))
if err != nil {
log.Fatal(err)
}
defer closer()
from, err := address.NewFromString(cctx.String("from"))
if err != nil {
return xerrors.Errorf("parsing source address (provide correct --from flag!): %w", err)
}
if !cctx.Bool("no-sync") {
if err := stats.WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}
dryRun := cctx.Bool("dry-run")
minerTotalFundsThreashold := uint64(cctx.Int("miner-total-funds-threashold"))
rf := &refunder{
api: api,
wallet: from,
dryRun: dryRun,
minerTotalFundsThreashold: types.FromFil(minerTotalFundsThreashold),
}
refundTipset, err := api.ChainHead(ctx)
if err != nil {
return err
}
negativeBalancerRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund())
if err != nil {
return err
}
if err := rf.Refund(ctx, "refund negative balancer miner", refundTipset, negativeBalancerRefund, 0); err != nil {
return err
}
return nil
},
}
var runCmd = &cli.Command{ var runCmd = &cli.Command{
Name: "run", Name: "run",
Usage: "Start message reimpursement", Usage: "Start message reimpursement",
@ -230,7 +305,7 @@ var runCmd = &cli.Command{
return err return err
} }
if err := rf.Refund(ctx, refundTipset, refunds, rounds); err != nil { if err := rf.Refund(ctx, "refund stats", refundTipset, refunds, rounds); err != nil {
return err return err
} }
@ -289,7 +364,6 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) {
m.count = m.count + 1 m.count = m.count + 1
m.totalRefunds = types.BigAdd(m.totalRefunds, value) m.totalRefunds = types.BigAdd(m.totalRefunds, value)
m.refunds[addr] = types.BigAdd(m.refunds[addr], value) m.refunds[addr] = types.BigAdd(m.refunds[addr], value)
} }
@ -318,8 +392,12 @@ type refunderNodeApi interface {
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error) StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
@ -327,12 +405,81 @@ type refunderNodeApi interface {
} }
type refunder struct { type refunder struct {
api refunderNodeApi api refunderNodeApi
wallet address.Address wallet address.Address
percentExtra int percentExtra int
dryRun bool dryRun bool
preCommitEnabled bool preCommitEnabled bool
proveCommitEnabled bool proveCommitEnabled bool
minerTotalFundsThreashold big.Int
}
func (r *refunder) EnsureMinerMinimums(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) {
miners, err := r.api.StateListMiners(ctx, tipset.Key())
if err != nil {
return nil, err
}
for _, maddr := range miners {
mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if !mact.Balance.GreaterThan(big.Zero()) {
continue
}
minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if minerAvailableBalance.GreaterThanEqual(big.Zero()) {
log.Debugw("skipping over miner with positive balance", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
// Look up and find all addresses associated with the miner
minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key())
allAddresses := []address.Address{minerInfo.Worker, minerInfo.Owner}
allAddresses = append(allAddresses, minerInfo.ControlAddresses...)
// Sum the balancer of all the addresses
addrSum := big.Zero()
addrCheck := make(map[address.Address]struct{}, len(allAddresses))
for _, addr := range allAddresses {
if _, found := addrCheck[addr]; !found {
balance, err := r.api.WalletBalance(ctx, addr)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
addrSum = big.Add(addrSum, balance)
addrCheck[addr] = struct{}{}
}
}
totalAvailableBalance := big.Add(addrSum, minerAvailableBalance)
// If the miner has available balance they should use it
if totalAvailableBalance.GreaterThanEqual(r.minerTotalFundsThreashold) {
log.Debugw("skipping over miner with enough funds cross all accounts", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr, "miner_available_balance", minerAvailableBalance, "wallet_total_balances", addrSum)
continue
}
// Calculate the required FIL to bring the miner up to the minimum across all of their accounts
refundValue := big.Add(totalAvailableBalance.Abs(), r.minerTotalFundsThreashold)
refunds.Track(maddr, refundValue)
log.Debugw("processing negative balance miner", "miner", maddr, "refund", refundValue)
}
return refunds, nil
} }
func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) { func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) {
@ -464,7 +611,7 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refu
return refunds, nil return refunds, nil
} }
func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, rounds int) error { func (r *refunder) Refund(ctx context.Context, name string, tipset *types.TipSet, refunds *MinersRefund, rounds int) error {
if refunds.Count() == 0 { if refunds.Count() == 0 {
log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key()) log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil return nil
@ -522,7 +669,7 @@ func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *Mi
refundSum = types.BigAdd(refundSum, msg.Value) refundSum = types.BigAdd(refundSum, msg.Value)
} }
log.Infow("refund stats", "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) log.Infow(name, "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count())
return nil return nil
} }

View File

@ -201,16 +201,24 @@ func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, ti
return nil return nil
} }
type apiIpldStore struct { type ApiIpldStore struct {
ctx context.Context ctx context.Context
api api.FullNode api apiIpldStoreApi
} }
func (ht *apiIpldStore) Context() context.Context { type apiIpldStoreApi interface {
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
}
func NewApiIpldStore(ctx context.Context, api apiIpldStoreApi) *ApiIpldStore {
return &ApiIpldStore{ctx, api}
}
func (ht *ApiIpldStore) Context() context.Context {
return ht.ctx return ht.ctx
} }
func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { func (ht *ApiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
raw, err := ht.api.ChainReadObj(ctx, c) raw, err := ht.api.ChainReadObj(ctx, c)
if err != nil { if err != nil {
return err return err
@ -227,8 +235,8 @@ func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) err
return fmt.Errorf("Object does not implement CBORUnmarshaler") return fmt.Errorf("Object does not implement CBORUnmarshaler")
} }
func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { func (ht *ApiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore") return cid.Undef, fmt.Errorf("Put is not implemented on ApiIpldStore")
} }
func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {
@ -279,7 +287,7 @@ func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointLis
return fmt.Errorf("failed to unmarshal power actor state: %w", err) return fmt.Errorf("failed to unmarshal power actor state: %w", err)
} }
s := &apiIpldStore{ctx, api} s := NewApiIpldStore(ctx, api)
mp, err := adt.AsMap(s, powerActorState.Claims) mp, err := adt.AsMap(s, powerActorState.Claims)
if err != nil { if err != nil {
return err return err