Merge pull request #3714 from filecoin-project/feat/lotus-pcr-miner-recovery

lotus-pcr: add recover-miners command
This commit is contained in:
Łukasz Magiera 2020-10-01 21:27:42 +02:00 committed by GitHub
commit efc1b24f4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 613 additions and 57 deletions

View File

@ -1,8 +1,10 @@
package main package main
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"encoding/csv"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -25,11 +27,12 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -42,6 +45,8 @@ var log = logging.Logger("main")
func main() { func main() {
local := []*cli.Command{ local := []*cli.Command{
runCmd, runCmd,
recoverMinersCmd,
findMinersCmd,
versionCmd, versionCmd,
} }
@ -105,6 +110,186 @@ var versionCmd = &cli.Command{
}, },
} }
var findMinersCmd = &cli.Command{
Name: "find-miners",
Usage: "find miners with a desired minimum balance",
Description: `Find miners returns a list of miners and their balances that are below a
threhold value. By default only the miner actor available balance is considered but other
account balances can be included by enabling them through the flags.
Examples
Find all miners with an available balance below 100 FIL
lotus-pcr find-miners --threshold 100
Find all miners with a balance below zero, which includes the owner and worker balances
lotus-pcr find-miners --threshold 0 --owner --worker
`,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "no-sync",
EnvVars: []string{"LOTUS_PCR_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
},
&cli.IntFlag{
Name: "threshold",
EnvVars: []string{"LOTUS_PCR_THRESHOLD"},
Usage: "balance below this limit will be printed",
Value: 0,
},
&cli.BoolFlag{
Name: "owner",
Usage: "include owner balance",
Value: false,
},
&cli.BoolFlag{
Name: "worker",
Usage: "include worker balance",
Value: false,
},
&cli.BoolFlag{
Name: "control",
Usage: "include control balance",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path"))
if err != nil {
log.Fatal(err)
}
defer closer()
if !cctx.Bool("no-sync") {
if err := stats.WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}
owner := cctx.Bool("owner")
worker := cctx.Bool("worker")
control := cctx.Bool("control")
threshold := uint64(cctx.Int("threshold"))
rf := &refunder{
api: api,
threshold: types.FromFil(threshold),
}
refundTipset, err := api.ChainHead(ctx)
if err != nil {
return err
}
balanceRefund, err := rf.FindMiners(ctx, refundTipset, NewMinersRefund(), owner, worker, control)
if err != nil {
return err
}
for _, maddr := range balanceRefund.Miners() {
fmt.Printf("%s\t%s\n", maddr, types.FIL(balanceRefund.GetRefund(maddr)))
}
return nil
},
}
var recoverMinersCmd = &cli.Command{
Name: "recover-miners",
Usage: "Ensure all miners with a negative available balance have a FIL surplus across accounts",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "from",
EnvVars: []string{"LOTUS_PCR_FROM"},
Usage: "wallet address to send refund from",
},
&cli.BoolFlag{
Name: "no-sync",
EnvVars: []string{"LOTUS_PCR_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
},
&cli.BoolFlag{
Name: "dry-run",
EnvVars: []string{"LOTUS_PCR_DRY_RUN"},
Usage: "do not send any messages",
Value: false,
},
&cli.StringFlag{
Name: "output",
Usage: "dump data as a csv format to this file",
},
&cli.IntFlag{
Name: "miner-recovery-cutoff",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_CUTOFF"},
Usage: "maximum amount of FIL that can be sent to any one miner before refund percent is applied",
Value: 3000,
},
&cli.IntFlag{
Name: "miner-recovery-bonus",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_BONUS"},
Usage: "additional FIL to send to each miner",
Value: 5,
},
&cli.IntFlag{
Name: "miner-recovery-refund-percent",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_REFUND_PERCENT"},
Usage: "percent of refund to issue",
Value: 110,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path"))
if err != nil {
log.Fatal(err)
}
defer closer()
from, err := address.NewFromString(cctx.String("from"))
if err != nil {
return xerrors.Errorf("parsing source address (provide correct --from flag!): %w", err)
}
if !cctx.Bool("no-sync") {
if err := stats.WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}
dryRun := cctx.Bool("dry-run")
minerRecoveryRefundPercent := cctx.Int("miner-recovery-refund-percent")
minerRecoveryCutoff := uint64(cctx.Int("miner-recovery-cutoff"))
minerRecoveryBonus := uint64(cctx.Int("miner-recovery-bonus"))
rf := &refunder{
api: api,
wallet: from,
dryRun: dryRun,
minerRecoveryRefundPercent: minerRecoveryRefundPercent,
minerRecoveryCutoff: types.FromFil(minerRecoveryCutoff),
minerRecoveryBonus: types.FromFil(minerRecoveryBonus),
}
refundTipset, err := api.ChainHead(ctx)
if err != nil {
return err
}
balanceRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund(), cctx.String("output"))
if err != nil {
return err
}
if err := rf.Refund(ctx, "refund to recover miner", refundTipset, balanceRefund, 0); err != nil {
return err
}
return nil
},
}
var runCmd = &cli.Command{ var runCmd = &cli.Command{
Name: "run", Name: "run",
Usage: "Start message reimpursement", Usage: "Start message reimpursement",
@ -120,10 +305,10 @@ var runCmd = &cli.Command{
Usage: "do not wait for chain sync to complete", Usage: "do not wait for chain sync to complete",
}, },
&cli.IntFlag{ &cli.IntFlag{
Name: "percent-extra", Name: "refund-percent",
EnvVars: []string{"LOTUS_PCR_PERCENT_EXTRA"}, EnvVars: []string{"LOTUS_PCR_REFUND_PERCENT"},
Usage: "extra funds to send above the refund", Usage: "percent of refund to issue",
Value: 3, Value: 103,
}, },
&cli.IntFlag{ &cli.IntFlag{
Name: "max-message-queue", Name: "max-message-queue",
@ -161,6 +346,36 @@ var runCmd = &cli.Command{
Usage: "the number of tipsets to delay message processing to smooth chain reorgs", Usage: "the number of tipsets to delay message processing to smooth chain reorgs",
Value: int(build.MessageConfidence), Value: int(build.MessageConfidence),
}, },
&cli.BoolFlag{
Name: "miner-recovery",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY"},
Usage: "run the miner recovery job",
Value: false,
},
&cli.IntFlag{
Name: "miner-recovery-period",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_PERIOD"},
Usage: "interval between running miner recovery",
Value: 2880,
},
&cli.IntFlag{
Name: "miner-recovery-cutoff",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_CUTOFF"},
Usage: "maximum amount of FIL that can be sent to any one miner before refund percent is applied",
Value: 3000,
},
&cli.IntFlag{
Name: "miner-recovery-bonus",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_BONUS"},
Usage: "additional FIL to send to each miner",
Value: 5,
},
&cli.IntFlag{
Name: "miner-recovery-refund-percent",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_REFUND_PERCENT"},
Usage: "percent of refund to issue",
Value: 110,
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
go func() { go func() {
@ -199,24 +414,33 @@ var runCmd = &cli.Command{
log.Fatal(err) log.Fatal(err)
} }
percentExtra := cctx.Int("percent-extra") refundPercent := cctx.Int("refund-percent")
maxMessageQueue := cctx.Int("max-message-queue") maxMessageQueue := cctx.Int("max-message-queue")
dryRun := cctx.Bool("dry-run") dryRun := cctx.Bool("dry-run")
preCommitEnabled := cctx.Bool("pre-commit") preCommitEnabled := cctx.Bool("pre-commit")
proveCommitEnabled := cctx.Bool("prove-commit") proveCommitEnabled := cctx.Bool("prove-commit")
aggregateTipsets := cctx.Int("aggregate-tipsets") aggregateTipsets := cctx.Int("aggregate-tipsets")
minerRecoveryEnabled := cctx.Bool("miner-recovery")
minerRecoveryPeriod := abi.ChainEpoch(int64(cctx.Int("miner-recovery-period")))
minerRecoveryRefundPercent := cctx.Int("miner-recovery-refund-percent")
minerRecoveryCutoff := uint64(cctx.Int("miner-recovery-cutoff"))
minerRecoveryBonus := uint64(cctx.Int("miner-recovery-bonus"))
rf := &refunder{ rf := &refunder{
api: api, api: api,
wallet: from, wallet: from,
percentExtra: percentExtra, refundPercent: refundPercent,
dryRun: dryRun, minerRecoveryRefundPercent: minerRecoveryRefundPercent,
preCommitEnabled: preCommitEnabled, minerRecoveryCutoff: types.FromFil(minerRecoveryCutoff),
proveCommitEnabled: proveCommitEnabled, minerRecoveryBonus: types.FromFil(minerRecoveryBonus),
dryRun: dryRun,
preCommitEnabled: preCommitEnabled,
proveCommitEnabled: proveCommitEnabled,
} }
var refunds *MinersRefund = NewMinersRefund() var refunds *MinersRefund = NewMinersRefund()
var rounds int = 0 var rounds int = 0
nextMinerRecovery := r.MinerRecoveryHeight() + minerRecoveryPeriod
for tipset := range tipsetsCh { for tipset := range tipsetsCh {
refunds, err = rf.ProcessTipset(ctx, tipset, refunds) refunds, err = rf.ProcessTipset(ctx, tipset, refunds)
@ -224,17 +448,34 @@ var runCmd = &cli.Command{
return err return err
} }
rounds = rounds + 1
if rounds < aggregateTipsets {
continue
}
refundTipset, err := api.ChainHead(ctx) refundTipset, err := api.ChainHead(ctx)
if err != nil { if err != nil {
return err return err
} }
if err := rf.Refund(ctx, refundTipset, refunds, rounds); err != nil { if minerRecoveryEnabled && refundTipset.Height() >= nextMinerRecovery {
recoveryRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund(), "")
if err != nil {
return err
}
if err := rf.Refund(ctx, "refund to recover miners", refundTipset, recoveryRefund, 0); err != nil {
return err
}
if err := r.SetMinerRecoveryHeight(tipset.Height()); err != nil {
return err
}
nextMinerRecovery = r.MinerRecoveryHeight() + minerRecoveryPeriod
}
rounds = rounds + 1
if rounds < aggregateTipsets {
continue
}
if err := rf.Refund(ctx, "refund stats", refundTipset, refunds, rounds); err != nil {
return err return err
} }
@ -293,7 +534,6 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) {
m.count = m.count + 1 m.count = m.count + 1
m.totalRefunds = types.BigAdd(m.totalRefunds, value) m.totalRefunds = types.BigAdd(m.totalRefunds, value)
m.refunds[addr] = types.BigAdd(m.refunds[addr], value) m.refunds[addr] = types.BigAdd(m.refunds[addr], value)
} }
@ -322,8 +562,14 @@ type refunderNodeApi interface {
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error) StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
StateMinerSectors(ctx context.Context, addr address.Address, filter *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
StateMinerFaults(ctx context.Context, addr address.Address, tsk types.TipSetKey) (bitfield.BitField, error)
StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
@ -332,12 +578,241 @@ type refunderNodeApi interface {
} }
type refunder struct { type refunder struct {
api refunderNodeApi api refunderNodeApi
wallet address.Address wallet address.Address
percentExtra int refundPercent int
dryRun bool minerRecoveryRefundPercent int
preCommitEnabled bool minerRecoveryCutoff big.Int
proveCommitEnabled bool minerRecoveryBonus big.Int
dryRun bool
preCommitEnabled bool
proveCommitEnabled bool
threshold big.Int
}
func (r *refunder) FindMiners(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, owner, worker, control bool) (*MinersRefund, error) {
miners, err := r.api.StateListMiners(ctx, tipset.Key())
if err != nil {
return nil, err
}
for _, maddr := range miners {
mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if !mact.Balance.GreaterThan(big.Zero()) {
continue
}
minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
// Look up and find all addresses associated with the miner
minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
allAddresses := []address.Address{}
if worker {
allAddresses = append(allAddresses, minerInfo.Worker)
}
if owner {
allAddresses = append(allAddresses, minerInfo.Owner)
}
if control {
allAddresses = append(allAddresses, minerInfo.ControlAddresses...)
}
// Sum the balancer of all the addresses
addrSum := big.Zero()
addrCheck := make(map[address.Address]struct{}, len(allAddresses))
for _, addr := range allAddresses {
if _, found := addrCheck[addr]; !found {
balance, err := r.api.WalletBalance(ctx, addr)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
addrSum = big.Add(addrSum, balance)
addrCheck[addr] = struct{}{}
}
}
totalAvailableBalance := big.Add(addrSum, minerAvailableBalance)
if totalAvailableBalance.GreaterThanEqual(r.threshold) {
continue
}
refunds.Track(maddr, totalAvailableBalance)
log.Debugw("processing miner", "miner", maddr, "sectors", "available_balance", totalAvailableBalance)
}
return refunds, nil
}
func (r *refunder) EnsureMinerMinimums(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, output string) (*MinersRefund, error) {
miners, err := r.api.StateListMiners(ctx, tipset.Key())
if err != nil {
return nil, err
}
w := ioutil.Discard
if len(output) != 0 {
f, err := os.Create(output)
if err != nil {
return nil, err
}
defer f.Close() // nolint:errcheck
w = bufio.NewWriter(f)
}
csvOut := csv.NewWriter(w)
defer csvOut.Flush()
if err := csvOut.Write([]string{"MinerID", "FaultedSectors", "AvailableBalance", "ProposedRefund"}); err != nil {
return nil, err
}
for _, maddr := range miners {
mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if !mact.Balance.GreaterThan(big.Zero()) {
continue
}
minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
// Look up and find all addresses associated with the miner
minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
allAddresses := []address.Address{minerInfo.Worker, minerInfo.Owner}
allAddresses = append(allAddresses, minerInfo.ControlAddresses...)
// Sum the balancer of all the addresses
addrSum := big.Zero()
addrCheck := make(map[address.Address]struct{}, len(allAddresses))
for _, addr := range allAddresses {
if _, found := addrCheck[addr]; !found {
balance, err := r.api.WalletBalance(ctx, addr)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
addrSum = big.Add(addrSum, balance)
addrCheck[addr] = struct{}{}
}
}
faults, err := r.api.StateMinerFaults(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed to look up miner faults", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
faultsCount, err := faults.Count()
if err != nil {
log.Errorw("failed to get count of faults", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if faultsCount == 0 {
log.Debugw("skipping miner with zero faults", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
totalAvailableBalance := big.Add(addrSum, minerAvailableBalance)
balanceCutoff := big.Mul(big.Div(big.NewIntUnsigned(faultsCount), big.NewInt(10)), big.NewIntUnsigned(build.FilecoinPrecision))
if totalAvailableBalance.GreaterThan(balanceCutoff) {
log.Debugw(
"skipping over miner with total available balance larger than refund",
"height", tipset.Height(),
"key", tipset.Key(),
"miner", maddr,
"available_balance", totalAvailableBalance,
"balance_cutoff", balanceCutoff,
"faults_count", faultsCount,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"balance_cutoff_fil", big.Div(balanceCutoff, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
continue
}
refundValue := big.Sub(balanceCutoff, totalAvailableBalance)
if r.minerRecoveryRefundPercent > 0 {
refundValue = types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.minerRecoveryRefundPercent)))
}
refundValue = big.Add(refundValue, r.minerRecoveryBonus)
if refundValue.GreaterThan(r.minerRecoveryCutoff) {
log.Infow(
"skipping over miner with refund greater than refund cutoff",
"height", tipset.Height(),
"key", tipset.Key(),
"miner", maddr,
"available_balance", totalAvailableBalance,
"balance_cutoff", balanceCutoff,
"faults_count", faultsCount,
"refund", refundValue,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"balance_cutoff_fil", big.Div(balanceCutoff, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
continue
}
refunds.Track(maddr, refundValue)
record := []string{
maddr.String(),
fmt.Sprintf("%d", faultsCount),
big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).String(),
big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).String(),
}
if err := csvOut.Write(record); err != nil {
return nil, err
}
log.Debugw(
"processing miner",
"miner", maddr,
"faults_count", faultsCount,
"available_balance", totalAvailableBalance,
"refund", refundValue,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
}
return refunds, nil
} }
func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) { func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) {
@ -458,22 +933,41 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refu
continue continue
} }
if r.percentExtra > 0 { if r.refundPercent > 0 {
refundValue = types.BigAdd(refundValue, types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.percentExtra)))) refundValue = types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.refundPercent)))
} }
log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue) log.Debugw(
"processing message",
"method", messageMethod,
"cid", msg.Cid,
"from", m.From,
"to", m.To,
"value", m.Value,
"gas_fee_cap", m.GasFeeCap,
"gas_premium", m.GasPremium,
"gas_used", recps[i].GasUsed,
"refund", refundValue,
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
refunds.Track(m.From, refundValue) refunds.Track(m.From, refundValue)
tipsetRefunds.Track(m.From, refundValue) tipsetRefunds.Track(m.From, refundValue)
} }
log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "total_refunds", tipsetRefunds.TotalRefunds(), "messages_processed", tipsetRefunds.Count()) log.Infow(
"tipset stats",
"height", tipset.Height(),
"key", tipset.Key(),
"total_refunds", tipsetRefunds.TotalRefunds(),
"total_refunds_fil", big.Div(tipsetRefunds.TotalRefunds(), big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"messages_processed", tipsetRefunds.Count(),
)
return refunds, nil return refunds, nil
} }
func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, rounds int) error { func (r *refunder) Refund(ctx context.Context, name string, tipset *types.TipSet, refunds *MinersRefund, rounds int) error {
if refunds.Count() == 0 { if refunds.Count() == 0 {
log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key()) log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil return nil
@ -531,13 +1025,24 @@ func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *Mi
refundSum = types.BigAdd(refundSum, msg.Value) refundSum = types.BigAdd(refundSum, msg.Value)
} }
log.Infow("refund stats", "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) log.Infow(
name,
"tipsets_processed", rounds,
"height", tipset.Height(),
"key", tipset.Key(),
"messages_sent", len(messages)-failures,
"refund_sum", refundSum,
"refund_sum_fil", big.Div(refundSum, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"messages_failures", failures,
"messages_processed", refunds.Count(),
)
return nil return nil
} }
type Repo struct { type Repo struct {
last abi.ChainEpoch lastHeight abi.ChainEpoch
path string lastMinerRecoveryHeight abi.ChainEpoch
path string
} }
func NewRepo(path string) (*Repo, error) { func NewRepo(path string) (*Repo, error) {
@ -547,8 +1052,9 @@ func NewRepo(path string) (*Repo, error) {
} }
return &Repo{ return &Repo{
last: 0, lastHeight: 0,
path: path, lastMinerRecoveryHeight: 0,
path: path,
}, nil }, nil
} }
@ -579,43 +1085,66 @@ func (r *Repo) init() error {
return nil return nil
} }
func (r *Repo) Open() (err error) { func (r *Repo) Open() error {
if err = r.init(); err != nil { if err := r.init(); err != nil {
return return err
} }
var f *os.File if err := r.loadHeight(); err != nil {
return err
}
f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR|os.O_CREATE, 0644) if err := r.loadMinerRecoveryHeight(); err != nil {
return err
}
return nil
}
func loadChainEpoch(fn string) (abi.ChainEpoch, error) {
f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0644)
if err != nil { if err != nil {
return return 0, err
} }
defer func() { defer func() {
err = f.Close() err = f.Close()
}() }()
var raw []byte raw, err := ioutil.ReadAll(f)
raw, err = ioutil.ReadAll(f)
if err != nil { if err != nil {
return return 0, err
} }
height, err := strconv.Atoi(string(bytes.TrimSpace(raw))) height, err := strconv.Atoi(string(bytes.TrimSpace(raw)))
if err != nil { if err != nil {
return return 0, err
} }
r.last = abi.ChainEpoch(height) return abi.ChainEpoch(height), nil
return }
func (r *Repo) loadHeight() error {
var err error
r.lastHeight, err = loadChainEpoch(filepath.Join(r.path, "height"))
return err
}
func (r *Repo) loadMinerRecoveryHeight() error {
var err error
r.lastMinerRecoveryHeight, err = loadChainEpoch(filepath.Join(r.path, "miner_recovery_height"))
return err
} }
func (r *Repo) Height() abi.ChainEpoch { func (r *Repo) Height() abi.ChainEpoch {
return r.last return r.lastHeight
}
func (r *Repo) MinerRecoveryHeight() abi.ChainEpoch {
return r.lastMinerRecoveryHeight
} }
func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) { func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) {
r.last = last r.lastHeight = last
var f *os.File var f *os.File
f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR, 0644) f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR, 0644)
if err != nil { if err != nil {
@ -626,7 +1155,26 @@ func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) {
err = f.Close() err = f.Close()
}() }()
if _, err = fmt.Fprintf(f, "%d", r.last); err != nil { if _, err = fmt.Fprintf(f, "%d", r.lastHeight); err != nil {
return
}
return
}
func (r *Repo) SetMinerRecoveryHeight(last abi.ChainEpoch) (err error) {
r.lastMinerRecoveryHeight = last
var f *os.File
f, err = os.OpenFile(filepath.Join(r.path, "miner_recovery_height"), os.O_RDWR, 0644)
if err != nil {
return
}
defer func() {
err = f.Close()
}()
if _, err = fmt.Fprintf(f, "%d", r.lastMinerRecoveryHeight); err != nil {
return return
} }

View File

@ -199,16 +199,24 @@ func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, ti
return nil return nil
} }
type apiIpldStore struct { type ApiIpldStore struct {
ctx context.Context ctx context.Context
api api.FullNode api apiIpldStoreApi
} }
func (ht *apiIpldStore) Context() context.Context { type apiIpldStoreApi interface {
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
}
func NewApiIpldStore(ctx context.Context, api apiIpldStoreApi) *ApiIpldStore {
return &ApiIpldStore{ctx, api}
}
func (ht *ApiIpldStore) Context() context.Context {
return ht.ctx return ht.ctx
} }
func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { func (ht *ApiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
raw, err := ht.api.ChainReadObj(ctx, c) raw, err := ht.api.ChainReadObj(ctx, c)
if err != nil { if err != nil {
return err return err
@ -225,8 +233,8 @@ func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) err
return fmt.Errorf("Object does not implement CBORUnmarshaler") return fmt.Errorf("Object does not implement CBORUnmarshaler")
} }
func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { func (ht *ApiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore") return cid.Undef, fmt.Errorf("Put is not implemented on ApiIpldStore")
} }
func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error { func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {