lotus-pcr: update miner recovery

This commit is contained in:
Travis Person 2020-09-19 00:51:49 +00:00
parent e39036dd49
commit e6bbc03ca8

View File

@ -7,7 +7,6 @@ import (
"encoding/csv" "encoding/csv"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
@ -213,16 +212,28 @@ var recoverMinersCmd = &cli.Command{
Usage: "do not send any messages", Usage: "do not send any messages",
Value: false, Value: false,
}, },
&cli.IntFlag{
Name: "threshold",
EnvVars: []string{"LOTUS_PCR_THRESHOLD"},
Usage: "total filecoin across all accounts that should be met, if the miner balance drops below zero",
Value: 0,
},
&cli.StringFlag{ &cli.StringFlag{
Name: "output", Name: "output",
Usage: "dump data as a csv format to this file", Usage: "dump data as a csv format to this file",
}, },
&cli.IntFlag{
Name: "miner-recovery-cutoff",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_CUTOFF"},
Usage: "maximum amount of FIL that can be sent to any one miner before refund percent is applied",
Value: 3000,
},
&cli.IntFlag{
Name: "miner-recovery-bonus",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_BONUS"},
Usage: "additional FIL to send to each miner",
Value: 5,
},
&cli.IntFlag{
Name: "miner-recovery-refund-percent",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_REFUND_PERCENT"},
Usage: "percent of refund to issue",
Value: 110,
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
ctx := context.Background() ctx := context.Background()
@ -244,13 +255,17 @@ var recoverMinersCmd = &cli.Command{
} }
dryRun := cctx.Bool("dry-run") dryRun := cctx.Bool("dry-run")
threshold := uint64(cctx.Int("threshold")) minerRecoveryRefundPercent := cctx.Int("miner-recovery-refund-percent")
minerRecoveryCutoff := uint64(cctx.Int("miner-recovery-cutoff"))
minerRecoveryBonus := uint64(cctx.Int("miner-recovery-bonus"))
rf := &refunder{ rf := &refunder{
api: api, api: api,
wallet: from, wallet: from,
dryRun: dryRun, dryRun: dryRun,
threshold: types.FromFil(threshold), minerRecoveryRefundPercent: minerRecoveryRefundPercent,
minerRecoveryCutoff: types.FromFil(minerRecoveryCutoff),
minerRecoveryBonus: types.FromFil(minerRecoveryBonus),
} }
refundTipset, err := api.ChainHead(ctx) refundTipset, err := api.ChainHead(ctx)
@ -286,10 +301,10 @@ var runCmd = &cli.Command{
Usage: "do not wait for chain sync to complete", Usage: "do not wait for chain sync to complete",
}, },
&cli.IntFlag{ &cli.IntFlag{
Name: "percent-extra", Name: "refund-percent",
EnvVars: []string{"LOTUS_PCR_PERCENT_EXTRA"}, EnvVars: []string{"LOTUS_PCR_REFUND_PERCENT"},
Usage: "extra funds to send above the refund", Usage: "percent of refund to issue",
Value: 3, Value: 103,
}, },
&cli.IntFlag{ &cli.IntFlag{
Name: "max-message-queue", Name: "max-message-queue",
@ -327,6 +342,36 @@ var runCmd = &cli.Command{
Usage: "the number of tipsets to delay message processing to smooth chain reorgs", Usage: "the number of tipsets to delay message processing to smooth chain reorgs",
Value: int(build.MessageConfidence), Value: int(build.MessageConfidence),
}, },
&cli.BoolFlag{
Name: "miner-recovery",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY"},
Usage: "run the miner recovery job",
Value: false,
},
&cli.IntFlag{
Name: "miner-recovery-period",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_PERIOD"},
Usage: "interval between running miner recovery",
Value: 2880,
},
&cli.IntFlag{
Name: "miner-recovery-cutoff",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_CUTOFF"},
Usage: "maximum amount of FIL that can be sent to any one miner before refund percent is applied",
Value: 3000,
},
&cli.IntFlag{
Name: "miner-recovery-bonus",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_BONUS"},
Usage: "additional FIL to send to each miner",
Value: 5,
},
&cli.IntFlag{
Name: "miner-recovery-refund-percent",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_REFUND_PERCENT"},
Usage: "percent of refund to issue",
Value: 110,
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
go func() { go func() {
@ -365,24 +410,33 @@ var runCmd = &cli.Command{
log.Fatal(err) log.Fatal(err)
} }
percentExtra := cctx.Int("percent-extra") refundPercent := cctx.Int("refund-percent")
maxMessageQueue := cctx.Int("max-message-queue") maxMessageQueue := cctx.Int("max-message-queue")
dryRun := cctx.Bool("dry-run") dryRun := cctx.Bool("dry-run")
preCommitEnabled := cctx.Bool("pre-commit") preCommitEnabled := cctx.Bool("pre-commit")
proveCommitEnabled := cctx.Bool("prove-commit") proveCommitEnabled := cctx.Bool("prove-commit")
aggregateTipsets := cctx.Int("aggregate-tipsets") aggregateTipsets := cctx.Int("aggregate-tipsets")
minerRecoveryEnabled := cctx.Bool("miner-recovery")
minerRecoveryPeriod := abi.ChainEpoch(int64(cctx.Int("miner-recovery-period")))
minerRecoveryRefundPercent := cctx.Int("miner-recovery-refund-percent")
minerRecoveryCutoff := uint64(cctx.Int("miner-recovery-cutoff"))
minerRecoveryBonus := uint64(cctx.Int("miner-recovery-bonus"))
rf := &refunder{ rf := &refunder{
api: api, api: api,
wallet: from, wallet: from,
percentExtra: percentExtra, refundPercent: refundPercent,
dryRun: dryRun, minerRecoveryRefundPercent: minerRecoveryRefundPercent,
preCommitEnabled: preCommitEnabled, minerRecoveryCutoff: types.FromFil(minerRecoveryCutoff),
proveCommitEnabled: proveCommitEnabled, minerRecoveryBonus: types.FromFil(minerRecoveryBonus),
dryRun: dryRun,
preCommitEnabled: preCommitEnabled,
proveCommitEnabled: proveCommitEnabled,
} }
var refunds *MinersRefund = NewMinersRefund() var refunds *MinersRefund = NewMinersRefund()
var rounds int = 0 var rounds int = 0
nextMinerRecovery := r.MinerRecoveryHeight() + minerRecoveryPeriod
for tipset := range tipsetsCh { for tipset := range tipsetsCh {
refunds, err = rf.ProcessTipset(ctx, tipset, refunds) refunds, err = rf.ProcessTipset(ctx, tipset, refunds)
@ -390,16 +444,33 @@ var runCmd = &cli.Command{
return err return err
} }
rounds = rounds + 1
if rounds < aggregateTipsets {
continue
}
refundTipset, err := api.ChainHead(ctx) refundTipset, err := api.ChainHead(ctx)
if err != nil { if err != nil {
return err return err
} }
if minerRecoveryEnabled && refundTipset.Height() >= nextMinerRecovery {
recoveryRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund(), "")
if err != nil {
return err
}
if err := rf.Refund(ctx, "refund to recover miners", refundTipset, recoveryRefund, 0); err != nil {
return err
}
if err := r.SetMinerRecoveryHeight(tipset.Height()); err != nil {
return err
}
nextMinerRecovery = r.MinerRecoveryHeight() + minerRecoveryPeriod
}
rounds = rounds + 1
if rounds < aggregateTipsets {
continue
}
if err := rf.Refund(ctx, "refund stats", refundTipset, refunds, rounds); err != nil { if err := rf.Refund(ctx, "refund stats", refundTipset, refunds, rounds); err != nil {
return err return err
} }
@ -502,13 +573,16 @@ type refunderNodeApi interface {
} }
type refunder struct { type refunder struct {
api refunderNodeApi api refunderNodeApi
wallet address.Address wallet address.Address
percentExtra int refundPercent int
dryRun bool minerRecoveryRefundPercent int
preCommitEnabled bool minerRecoveryCutoff big.Int
proveCommitEnabled bool minerRecoveryBonus big.Int
threshold big.Int dryRun bool
preCommitEnabled bool
proveCommitEnabled bool
threshold big.Int
} }
func (r *refunder) FindMiners(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, owner, worker, control bool) (*MinersRefund, error) { func (r *refunder) FindMiners(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, owner, worker, control bool) (*MinersRefund, error) {
@ -601,7 +675,7 @@ func (r *refunder) EnsureMinerMinimums(ctx context.Context, tipset *types.TipSet
csvOut := csv.NewWriter(w) csvOut := csv.NewWriter(w)
defer csvOut.Flush() defer csvOut.Flush()
if err := csvOut.Write([]string{"MinerID", "Sectors", "CombinedBalance", "ProposedSend"}); err != nil { if err := csvOut.Write([]string{"MinerID", "FaultedSectors", "AvailableBalance", "ProposedRefund"}); err != nil {
return nil, err return nil, err
} }
@ -644,37 +718,85 @@ func (r *refunder) EnsureMinerMinimums(ctx context.Context, tipset *types.TipSet
} }
} }
sectorInfo, err := r.api.StateMinerSectors(ctx, maddr, nil, false, tipset.Key()) faults, err := r.api.StateMinerFaults(ctx, maddr, tipset.Key())
if err != nil { if err != nil {
log.Errorw("failed to look up miner sectors", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) log.Errorw("failed to look up miner faults", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue continue
} }
if len(sectorInfo) == 0 { faultsCount, err := faults.Count()
log.Debugw("skipping miner with zero sectors", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) if err != nil {
log.Errorw("failed to get count of faults", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if faultsCount == 0 {
log.Debugw("skipping miner with zero faults", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue continue
} }
totalAvailableBalance := big.Add(addrSum, minerAvailableBalance) totalAvailableBalance := big.Add(addrSum, minerAvailableBalance)
balanceCutoff := big.Mul(big.Div(big.NewIntUnsigned(faultsCount), big.NewInt(10)), big.NewIntUnsigned(build.FilecoinPrecision))
numSectorInfo := float64(len(sectorInfo)) if totalAvailableBalance.GreaterThan(balanceCutoff) {
filAmount := uint64(math.Ceil(math.Max(math.Log(numSectorInfo)*math.Sqrt(numSectorInfo)/4, 20))) log.Debugw(
attoFilAmount := big.Mul(big.NewIntUnsigned(filAmount), big.NewIntUnsigned(build.FilecoinPrecision)) "skipping over miner with total available balance larger than refund",
"height", tipset.Height(),
if totalAvailableBalance.GreaterThanEqual(attoFilAmount) { "key", tipset.Key(),
log.Debugw("skipping over miner with total available balance larger than refund", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr, "available_balance", totalAvailableBalance, "possible_refund", attoFilAmount) "miner", maddr,
"available_balance", totalAvailableBalance,
"balance_cutoff", balanceCutoff,
"faults_count", faultsCount,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"balance_cutoff_fil", big.Div(balanceCutoff, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
continue continue
} }
refundValue := big.Sub(attoFilAmount, totalAvailableBalance) refundValue := big.Sub(balanceCutoff, totalAvailableBalance)
if r.minerRecoveryRefundPercent > 0 {
refundValue = types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.minerRecoveryRefundPercent)))
}
refundValue = big.Add(refundValue, r.minerRecoveryBonus)
if refundValue.GreaterThan(r.minerRecoveryCutoff) {
log.Infow(
"skipping over miner with refund greater than refund cutoff",
"height", tipset.Height(),
"key", tipset.Key(),
"miner", maddr,
"available_balance", totalAvailableBalance,
"balance_cutoff", balanceCutoff,
"faults_count", faultsCount,
"refund", refundValue,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"balance_cutoff_fil", big.Div(balanceCutoff, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
continue
}
refunds.Track(maddr, refundValue) refunds.Track(maddr, refundValue)
record := []string{maddr.String(), fmt.Sprintf("%d", len(sectorInfo)), totalAvailableBalance.String(), refundValue.String()} record := []string{
maddr.String(),
fmt.Sprintf("%d", faultsCount),
big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).String(),
big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).String(),
}
if err := csvOut.Write(record); err != nil { if err := csvOut.Write(record); err != nil {
return nil, err return nil, err
} }
log.Debugw("processing miner", "miner", maddr, "sectors", len(sectorInfo), "available_balance", totalAvailableBalance, "refund", refundValue) log.Debugw(
"processing miner",
"miner", maddr,
"faults_count", faultsCount,
"available_balance", totalAvailableBalance,
"refund", refundValue,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
} }
return refunds, nil return refunds, nil
@ -794,17 +916,36 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refu
continue continue
} }
if r.percentExtra > 0 { if r.refundPercent > 0 {
refundValue = types.BigAdd(refundValue, types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.percentExtra)))) refundValue = types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.refundPercent)))
} }
log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue) log.Debugw(
"processing message",
"method", messageMethod,
"cid", msg.Cid,
"from", m.From,
"to", m.To,
"value", m.Value,
"gas_fee_cap", m.GasFeeCap,
"gas_premium", m.GasPremium,
"gas_used", recps[i].GasUsed,
"refund", refundValue,
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
refunds.Track(m.From, refundValue) refunds.Track(m.From, refundValue)
tipsetRefunds.Track(m.From, refundValue) tipsetRefunds.Track(m.From, refundValue)
} }
log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "total_refunds", tipsetRefunds.TotalRefunds(), "messages_processed", tipsetRefunds.Count()) log.Infow(
"tipset stats",
"height", tipset.Height(),
"key", tipset.Key(),
"total_refunds", tipsetRefunds.TotalRefunds(),
"total_refunds_fil", big.Div(tipsetRefunds.TotalRefunds(), big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"messages_processed", tipsetRefunds.Count(),
)
return refunds, nil return refunds, nil
} }
@ -867,13 +1008,24 @@ func (r *refunder) Refund(ctx context.Context, name string, tipset *types.TipSet
refundSum = types.BigAdd(refundSum, msg.Value) refundSum = types.BigAdd(refundSum, msg.Value)
} }
log.Infow(name, "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) log.Infow(
name,
"tipsets_processed", rounds,
"height", tipset.Height(),
"key", tipset.Key(),
"messages_sent", len(messages)-failures,
"refund_sum", refundSum,
"refund_sum_fil", big.Div(refundSum, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"messages_failures", failures,
"messages_processed", refunds.Count(),
)
return nil return nil
} }
type Repo struct { type Repo struct {
last abi.ChainEpoch lastHeight abi.ChainEpoch
path string lastMinerRecoveryHeight abi.ChainEpoch
path string
} }
func NewRepo(path string) (*Repo, error) { func NewRepo(path string) (*Repo, error) {
@ -883,8 +1035,9 @@ func NewRepo(path string) (*Repo, error) {
} }
return &Repo{ return &Repo{
last: 0, lastHeight: 0,
path: path, lastMinerRecoveryHeight: 0,
path: path,
}, nil }, nil
} }
@ -915,43 +1068,66 @@ func (r *Repo) init() error {
return nil return nil
} }
func (r *Repo) Open() (err error) { func (r *Repo) Open() error {
if err = r.init(); err != nil { if err := r.init(); err != nil {
return return err
} }
var f *os.File if err := r.loadHeight(); err != nil {
return err
}
f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR|os.O_CREATE, 0644) if err := r.loadMinerRecoveryHeight(); err != nil {
return err
}
return nil
}
func loadChainEpoch(fn string) (abi.ChainEpoch, error) {
f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0644)
if err != nil { if err != nil {
return return 0, err
} }
defer func() { defer func() {
err = f.Close() err = f.Close()
}() }()
var raw []byte raw, err := ioutil.ReadAll(f)
raw, err = ioutil.ReadAll(f)
if err != nil { if err != nil {
return return 0, err
} }
height, err := strconv.Atoi(string(bytes.TrimSpace(raw))) height, err := strconv.Atoi(string(bytes.TrimSpace(raw)))
if err != nil { if err != nil {
return return 0, err
} }
r.last = abi.ChainEpoch(height) return abi.ChainEpoch(height), nil
return }
func (r *Repo) loadHeight() error {
var err error
r.lastHeight, err = loadChainEpoch(filepath.Join(r.path, "height"))
return err
}
func (r *Repo) loadMinerRecoveryHeight() error {
var err error
r.lastMinerRecoveryHeight, err = loadChainEpoch(filepath.Join(r.path, "miner_recovery_height"))
return err
} }
func (r *Repo) Height() abi.ChainEpoch { func (r *Repo) Height() abi.ChainEpoch {
return r.last return r.lastHeight
}
func (r *Repo) MinerRecoveryHeight() abi.ChainEpoch {
return r.lastMinerRecoveryHeight
} }
func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) { func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) {
r.last = last r.lastHeight = last
var f *os.File var f *os.File
f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR, 0644) f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR, 0644)
if err != nil { if err != nil {
@ -962,7 +1138,26 @@ func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) {
err = f.Close() err = f.Close()
}() }()
if _, err = fmt.Fprintf(f, "%d", r.last); err != nil { if _, err = fmt.Fprintf(f, "%d", r.lastHeight); err != nil {
return
}
return
}
func (r *Repo) SetMinerRecoveryHeight(last abi.ChainEpoch) (err error) {
r.lastMinerRecoveryHeight = last
var f *os.File
f, err = os.OpenFile(filepath.Join(r.path, "miner_recovery_height"), os.O_RDWR, 0644)
if err != nil {
return
}
defer func() {
err = f.Close()
}()
if _, err = fmt.Fprintf(f, "%d", r.lastMinerRecoveryHeight); err != nil {
return return
} }