Merge pull request #3118 from filecoin-project/feat/improve-pcr

lotus-pcr: refund provecommit sectors
This commit is contained in:
Łukasz Magiera 2020-08-18 21:18:45 +02:00 committed by GitHub
commit d1cb60bdee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -10,6 +10,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -20,6 +21,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/runtime/exitcode" "github.com/filecoin-project/specs-actors/actors/runtime/exitcode"
@ -70,16 +72,24 @@ func main() {
EnvVars: []string{"LOTUS_PCR_PATH"}, EnvVars: []string{"LOTUS_PCR_PATH"},
Value: "~/.lotuspcr", // TODO: Consider XDG_DATA_HOME Value: "~/.lotuspcr", // TODO: Consider XDG_DATA_HOME
}, },
&cli.StringFlag{
Name: "log-level",
EnvVars: []string{"LOTUS_PCR_LOG_LEVEL"},
Hidden: true,
Value: "info",
},
},
Before: func(cctx *cli.Context) error {
return logging.SetLogLevel("main", cctx.String("log-level"))
}, },
Commands: local, Commands: local,
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
log.Warn(err) log.Errorw("exit in error", "err", err)
os.Exit(1)
return return
} }
} }
var versionCmd = &cli.Command{ var versionCmd = &cli.Command{
@ -90,6 +100,7 @@ var versionCmd = &cli.Command{
return nil return nil
}, },
} }
var runCmd = &cli.Command{ var runCmd = &cli.Command{
Name: "run", Name: "run",
Usage: "Start message reimpursement", Usage: "Start message reimpursement",
@ -97,19 +108,36 @@ var runCmd = &cli.Command{
&cli.StringFlag{ &cli.StringFlag{
Name: "from", Name: "from",
EnvVars: []string{"LOTUS_PCR_FROM"}, EnvVars: []string{"LOTUS_PCR_FROM"},
Usage: "wallet address to send refund from",
}, },
&cli.BoolFlag{ &cli.BoolFlag{
Name: "no-sync", Name: "no-sync",
EnvVars: []string{"LOTUS_PCR_NO_SYNC"}, EnvVars: []string{"LOTUS_PCR_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
}, },
&cli.IntFlag{ &cli.IntFlag{
Name: "percent-extra", Name: "percent-extra",
Value: 3, EnvVars: []string{"LOTUS_PCR_PERCENT_EXTRA"},
Usage: "extra funds to send above the refund",
Value: 3,
}, },
&cli.IntFlag{ &cli.IntFlag{
Name: "head-delay", Name: "max-message-queue",
Usage: "the number of tipsets to delay message processing to smooth chain reorgs", EnvVars: []string{"LOTUS_PCR_MAX_MESSAGE_QUEUE"},
Value: int(build.MessageConfidence), Usage: "set the maximum number of messages that can be queue in the mpool",
Value: 3000,
},
&cli.BoolFlag{
Name: "dry-run",
EnvVars: []string{"LOTUS_PCR_DRY_RUN"},
Usage: "do not send any messages",
Value: false,
},
&cli.IntFlag{
Name: "head-delay",
EnvVars: []string{"LOTUS_PCR_HEAD_DELAY"},
Usage: "the number of tipsets to delay message processing to smooth chain reorgs",
Value: int(build.MessageConfidence),
}, },
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
@ -150,15 +178,52 @@ var runCmd = &cli.Command{
} }
percentExtra := cctx.Int("percent-extra") percentExtra := cctx.Int("percent-extra")
maxMessageQueue := cctx.Int("max-message-queue")
dryRun := cctx.Bool("dry-run")
rf := &refunder{
api: api,
wallet: from,
percentExtra: percentExtra,
dryRun: dryRun,
}
for tipset := range tipsetsCh { for tipset := range tipsetsCh {
if err := ProcessTipset(ctx, api, tipset, from, percentExtra); err != nil { refunds, err := rf.ProcessTipset(ctx, tipset)
if err != nil {
return err
}
if err := rf.Refund(ctx, tipset, refunds); err != nil {
return err return err
} }
if err := r.SetHeight(tipset.Height()); err != nil { if err := r.SetHeight(tipset.Height()); err != nil {
return err return err
} }
for {
msgs, err := api.MpoolPending(ctx, types.EmptyTSK)
if err != nil {
log.Warnw("failed to fetch pending messages", "err", err)
time.Sleep(time.Duration(int64(time.Second) * int64(build.BlockDelaySecs)))
continue
}
count := 0
for _, msg := range msgs {
if msg.Message.From == from {
count = count + 1
}
}
if count < maxMessageQueue {
break
}
log.Warnw("messages in mpool over max message queue", "message_count", count, "max_message_queue", maxMessageQueue)
time.Sleep(time.Duration(int64(time.Second) * int64(build.BlockDelaySecs)))
}
} }
return nil return nil
@ -167,6 +232,7 @@ var runCmd = &cli.Command{
type MinersRefund struct { type MinersRefund struct {
refunds map[address.Address]types.BigInt refunds map[address.Address]types.BigInt
count int
} }
func NewMinersRefund() *MinersRefund { func NewMinersRefund() *MinersRefund {
@ -180,11 +246,13 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) {
m.refunds[addr] = types.NewInt(0) m.refunds[addr] = types.NewInt(0)
} }
m.count = m.count + 1
m.refunds[addr] = types.BigAdd(m.refunds[addr], value) m.refunds[addr] = types.BigAdd(m.refunds[addr], value)
} }
func (m *MinersRefund) Count() int { func (m *MinersRefund) Count() int {
return len(m.refunds) return m.count
} }
func (m *MinersRefund) Miners() []address.Address { func (m *MinersRefund) Miners() []address.Address {
@ -200,50 +268,58 @@ func (m *MinersRefund) GetRefund(addr address.Address) types.BigInt {
return m.refunds[addr] return m.refunds[addr]
} }
type processTipSetApi interface { type refunderNodeApi interface {
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error) StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error)
StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error)
} }
func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipSet, wallet address.Address, percentExtra int) error { type refunder struct {
log.Infow("processing tipset", "height", tipset.Height(), "key", tipset.Key().String()) api refunderNodeApi
wallet address.Address
percentExtra int
dryRun bool
}
func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet) (*MinersRefund, error) {
cids := tipset.Cids() cids := tipset.Cids()
if len(cids) == 0 { if len(cids) == 0 {
return fmt.Errorf("no cids in tipset") log.Errorw("no cids in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil, fmt.Errorf("no cids in tipset")
} }
msgs, err := api.ChainGetParentMessages(ctx, cids[0]) msgs, err := r.api.ChainGetParentMessages(ctx, cids[0])
if err != nil { if err != nil {
log.Errorw("failed to get tipset parent messages", "err", err) log.Errorw("failed to get tipset parent messages", "err", err, "height", tipset.Height(), "key", tipset.Key())
return nil return nil, nil
} }
recps, err := api.ChainGetParentReceipts(ctx, cids[0]) recps, err := r.api.ChainGetParentReceipts(ctx, cids[0])
if err != nil { if err != nil {
log.Errorw("failed to get tipset parent receipts", "err", err) log.Errorw("failed to get tipset parent receipts", "err", err, "height", tipset.Height(), "key", tipset.Key())
return nil return nil, nil
} }
if len(msgs) != len(recps) { if len(msgs) != len(recps) {
log.Errorw("message length does not match receipts length", "messages", len(msgs), "receipts", len(recps)) log.Errorw("message length does not match receipts length", "height", tipset.Height(), "key", tipset.Key(), "messages", len(msgs), "receipts", len(recps))
return nil return nil, nil
} }
refunds := NewMinersRefund() refunds := NewMinersRefund()
count := 0 refundValue := types.NewInt(0)
for i, msg := range msgs { for i, msg := range msgs {
m := msg.Message m := msg.Message
a, err := api.StateGetActor(ctx, m.To, tipset.Key()) a, err := r.api.StateGetActor(ctx, m.To, tipset.Key())
if err != nil { if err != nil {
log.Warnw("failed to look up state actor", "actor", m.To) log.Warnw("failed to look up state actor", "height", tipset.Height(), "key", tipset.Key(), "actor", m.To)
continue continue
} }
@ -251,39 +327,87 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS
continue continue
} }
// we only care to look at PreCommitSector messages var messageMethod string
if m.Method != builtin.MethodsMiner.PreCommitSector {
switch m.Method {
case builtin.MethodsMiner.ProveCommitSector:
messageMethod = "ProveCommitSector"
if recps[i].ExitCode != exitcode.Ok {
log.Debugw("skipping non-ok exitcode message", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "exitcode", recps[i].ExitCode)
continue
}
var proveCommitSector miner.ProveCommitSectorParams
if err := proveCommitSector.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil {
log.Warnw("failed to decode provecommit params", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To)
continue
}
// We use the parent tipset key because precommit information is removed when ProveCommitSector is executed
precommitChainInfo, err := r.api.StateSectorPreCommitInfo(ctx, m.To, proveCommitSector.SectorNumber, tipset.Parents())
if err != nil {
log.Warnw("failed to get precommit info for sector", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber)
continue
}
precommitTipset, err := r.api.ChainGetTipSetByHeight(ctx, precommitChainInfo.PreCommitEpoch, tipset.Key())
if err != nil {
log.Warnf("failed to lookup precommit epoch", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber)
continue
}
collateral, err := r.api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitChainInfo.Info, precommitTipset.Key())
if err != nil {
log.Warnw("failed to get initial pledge collateral", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber)
}
collateral = big.Sub(collateral, precommitChainInfo.PreCommitDeposit)
if collateral.LessThan(big.Zero()) {
log.Debugw("skipping zero pledge collateral difference", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", proveCommitSector.SectorNumber)
continue
}
refundValue = collateral
case builtin.MethodsMiner.PreCommitSector:
messageMethod = "PreCommitSector"
if recps[i].ExitCode != exitcode.Ok {
log.Debugw("skipping non-ok exitcode message", "method", messageMethod, "cid", msg.Cid, "miner", m.To, "exitcode", recps[i].ExitCode)
continue
}
var precommitInfo miner.SectorPreCommitInfo
if err := precommitInfo.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil {
log.Warnw("failed to decode precommit params", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To)
continue
}
collateral, err := r.api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitInfo, tipset.Key())
if err != nil {
log.Warnw("failed to calculate initial pledge collateral", "err", err, "method", messageMethod, "cid", msg.Cid, "miner", m.To, "sector_number", precommitInfo.SectorNumber)
continue
}
refundValue = collateral
default:
continue continue
} }
if recps[i].ExitCode != exitcode.Ok { if r.percentExtra > 0 {
log.Debugw("skipping non-ok exitcode message", "cid", msg.Cid.String(), "exitcode", recps[i].ExitCode) refundValue = types.BigAdd(refundValue, types.BigDiv(types.BigMul(refundValue, types.NewInt(100)), types.NewInt(uint64(r.percentExtra))))
} }
var precommitInfo miner.SectorPreCommitInfo log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue)
if err := precommitInfo.UnmarshalCBOR(bytes.NewBuffer(m.Params)); err != nil {
log.Warnw("failed to decode precommit params", "err", err)
continue
}
refundValue, err := api.StateMinerInitialPledgeCollateral(ctx, m.To, precommitInfo, tipset.Key())
if err != nil {
log.Warnw("failed to calculate", "err", err)
continue
}
if percentExtra > 0 {
refundValue = types.BigAdd(refundValue, types.BigDiv(refundValue, types.NewInt(100*uint64(percentExtra))))
}
log.Infow("processing message", "from", m.From, "to", m.To, "value", m.Value.String(), "gas_fee_cap", m.GasFeeCap.String(), "gas_premium", m.GasPremium.String(), "gas_used", fmt.Sprintf("%d", recps[i].GasUsed), "refund", refundValue.String())
count = count + 1
refunds.Track(m.From, refundValue) refunds.Track(m.From, refundValue)
} }
return refunds, nil
}
func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) error {
if refunds.Count() == 0 { if refunds.Count() == 0 {
log.Debugw("no messages to refund in tipset") log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil return nil
} }
@ -294,15 +418,15 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS
refundValue := refunds.GetRefund(maddr) refundValue := refunds.GetRefund(maddr)
// We want to try and ensure these messages get mined quickly // We want to try and ensure these messages get mined quickly
gasPremium, err := api.GasEstimateGasPremium(ctx, 0, wallet, 0, tipset.Key()) gasPremium, err := r.api.GasEstimateGasPremium(ctx, 0, r.wallet, 0, tipset.Key())
if err != nil { if err != nil {
log.Warnw("failed to estimate gas premium", "err", err) log.Warnw("failed to estimate gas premium", "err", err, "height", tipset.Height(), "key", tipset.Key())
continue continue
} }
msg := &types.Message{ msg := &types.Message{
Value: refundValue, Value: refundValue,
From: wallet, From: r.wallet,
To: maddr, To: maddr,
GasPremium: gasPremium, GasPremium: gasPremium,
@ -312,32 +436,34 @@ func ProcessTipset(ctx context.Context, api processTipSetApi, tipset *types.TipS
messages = append(messages, msg) messages = append(messages, msg)
} }
balance, err := api.WalletBalance(ctx, wallet) balance, err := r.api.WalletBalance(ctx, r.wallet)
if err != nil { if err != nil {
log.Errorw("failed to get wallet balance", "err", err, "height", tipset.Height(), "key", tipset.Key())
return xerrors.Errorf("failed to get wallet balance :%w", err) return xerrors.Errorf("failed to get wallet balance :%w", err)
} }
// Calculate the minimum balance as the total refund we need to issue plus 5% to cover fees // Calculate the minimum balance as the total refund we need to issue plus 5% to cover fees
minBalance := types.BigAdd(refundSum, types.BigDiv(refundSum, types.NewInt(500))) minBalance := types.BigAdd(refundSum, types.BigDiv(refundSum, types.NewInt(500)))
if balance.LessThan(minBalance) { if balance.LessThan(minBalance) {
log.Errorw("not sufficent funds to cover refunds", "balance", balance.String(), "refund_sum", refundSum.String(), "minimum_required", minBalance.String()) log.Errorw("not sufficent funds to cover refunds", "balance", balance, "refund_sum", refundSum, "minimum_required", minBalance)
return xerrors.Errorf("wallet does not have enough balance to cover refund") return xerrors.Errorf("wallet does not have enough balance to cover refund")
} }
failures := 0 failures := 0
refundSum.SetUint64(0) refundSum.SetUint64(0)
for _, msg := range messages { for _, msg := range messages {
if _, err = api.MpoolPushMessage(ctx, msg, nil); err != nil { if !r.dryRun {
log.Errorw("failed to MpoolPushMessage", "err", err, "msg", msg) if _, err = r.api.MpoolPushMessage(ctx, msg, nil); err != nil {
failures = failures + 1 log.Errorw("failed to MpoolPushMessage", "err", err, "msg", msg)
continue failures = failures + 1
continue
}
} }
refundSum = types.BigAdd(refundSum, msg.Value) refundSum = types.BigAdd(refundSum, msg.Value)
} }
log.Infow("tipset stats", "messages_sent", len(messages)-failures, "refund_sum", refundSum.String(), "messages_failures", failures) log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count())
return nil return nil
} }