Merge remote-tracking branch 'origin/master' into feat/backup

This commit is contained in:
Łukasz Magiera 2020-10-03 00:08:23 +02:00
commit 6f33706025
21 changed files with 1381 additions and 297 deletions

View File

@ -192,6 +192,9 @@ type FullNode interface {
// MpoolPush pushes a signed message to mempool.
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
// MpoolPushUntrusted pushes a signed message to mempool from untrusted sources.
MpoolPushUntrusted(context.Context, *types.SignedMessage) (cid.Cid, error)
// MpoolPushMessage atomically assigns a nonce, signs, and pushes a message
// to mempool.
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified

View File

@ -123,6 +123,8 @@ type FullNodeStruct struct {
MpoolClear func(context.Context, bool) error `perm:"write"`
MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushUntrusted func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
MpoolSub func(context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"`
@ -557,6 +559,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag
return c.Internal.MpoolPush(ctx, smsg)
}
func (c *FullNodeStruct) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return c.Internal.MpoolPushUntrusted(ctx, smsg)
}
func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
return c.Internal.MpoolPushMessage(ctx, msg, spec)
}

View File

@ -67,7 +67,7 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Fatal(err)
}
channelAmt := int64(100000)
channelAmt := int64(7000)
channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt))
if err != nil {
t.Fatal(err)
@ -169,6 +169,51 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) {
t.Fatal("Timed out waiting for receiver to submit vouchers")
}
// Create a new voucher now that some vouchers have already been submitted
vouchRes, err := paymentCreator.PaychVoucherCreate(ctx, channel, abi.NewTokenAmount(1000), 3)
if err != nil {
t.Fatal(err)
}
if vouchRes.Voucher == nil {
t.Fatal(fmt.Errorf("Not enough funds to create voucher: missing %d", vouchRes.Shortfall))
}
vdelta, err := paymentReceiver.PaychVoucherAdd(ctx, channel, vouchRes.Voucher, nil, abi.NewTokenAmount(1000))
if err != nil {
t.Fatal(err)
}
if !vdelta.Equals(abi.NewTokenAmount(1000)) {
t.Fatal("voucher didn't have the right amount")
}
// Create a new voucher whose value would exceed the channel balance
excessAmt := abi.NewTokenAmount(1000)
vouchRes, err = paymentCreator.PaychVoucherCreate(ctx, channel, excessAmt, 4)
if err != nil {
t.Fatal(err)
}
if vouchRes.Voucher != nil {
t.Fatal("Expected not to be able to create voucher whose value would exceed channel balance")
}
if !vouchRes.Shortfall.Equals(excessAmt) {
t.Fatal(fmt.Errorf("Expected voucher shortfall of %d, got %d", excessAmt, vouchRes.Shortfall))
}
// Add a voucher whose value would exceed the channel balance
vouch := &paych.SignedVoucher{ChannelAddr: channel, Amount: excessAmt, Lane: 4, Nonce: 1}
vb, err := vouch.SigningBytes()
if err != nil {
t.Fatal(err)
}
sig, err := paymentCreator.WalletSign(ctx, createrAddr, vb)
if err != nil {
t.Fatal(err)
}
vouch.Signature = sig
_, err = paymentReceiver.PaychVoucherAdd(ctx, channel, vouch, nil, abi.NewTokenAmount(1000))
if err == nil {
t.Fatal(fmt.Errorf("Expected shortfall error of %d", excessAmt))
}
// wait for the settlement period to pass before collecting
waitForBlocks(ctx, t, bm, paymentReceiver, receiverAddr, paych.SettleDelay)

View File

@ -27,10 +27,9 @@ type mockLaneState struct {
func NewMockPayChState(from address.Address,
to address.Address,
settlingAt abi.ChainEpoch,
toSend abi.TokenAmount,
lanes map[uint64]paych.LaneState,
) paych.State {
return &mockState{from, to, settlingAt, toSend, lanes}
return &mockState{from: from, to: to, settlingAt: settlingAt, toSend: big.NewInt(0), lanes: lanes}
}
// NewMockLaneState constructs a state for a payment channel lane with the set fixed values

View File

@ -55,6 +55,7 @@ var baseFeeLowerBoundFactor = types.NewInt(10)
var baseFeeLowerBoundFactorConservative = types.NewInt(100)
var MaxActorPendingMessages = 1000
var MaxUntrustedActorPendingMessages = 10
var MaxNonceGap = uint64(4)
@ -195,9 +196,17 @@ func CapGasFee(msg *types.Message, maxFee abi.TokenAmount) {
msg.GasPremium = big.Min(msg.GasFeeCap, msg.GasPremium) // cap premium at FeeCap
}
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) {
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict, untrusted bool) (bool, error) {
nextNonce := ms.nextNonce
nonceGap := false
maxNonceGap := MaxNonceGap
maxActorPendingMessages := MaxActorPendingMessages
if untrusted {
maxNonceGap = 0
maxActorPendingMessages = MaxUntrustedActorPendingMessages
}
switch {
case m.Message.Nonce == nextNonce:
nextNonce++
@ -206,7 +215,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo
nextNonce++
}
case strict && m.Message.Nonce > nextNonce+MaxNonceGap:
case strict && m.Message.Nonce > nextNonce+maxNonceGap:
return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)
case m.Message.Nonce > nextNonce:
@ -242,7 +251,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
}
if !has && strict && len(ms.msgs) > MaxActorPendingMessages {
if !has && strict && len(ms.msgs) >= maxActorPendingMessages {
log.Errorf("too many pending messages from actor %s", m.Message.From)
return false, ErrTooManyPendingMessages
}
@ -484,7 +493,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
}
mp.curTsLk.Lock()
publish, err := mp.addTs(m, mp.curTs, true)
publish, err := mp.addTs(m, mp.curTs, true, false)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err
@ -551,7 +560,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
_, err = mp.addTs(m, mp.curTs, false)
_, err = mp.addTs(m, mp.curTs, false, false)
return err
}
@ -619,7 +628,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet)
return nil
}
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) {
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) {
snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil {
return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
@ -641,7 +650,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local
return false, err
}
return publish, mp.addLocked(m, !local)
return publish, mp.addLocked(m, !local, untrusted)
}
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
@ -676,17 +685,17 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
return err
}
return mp.addLocked(m, false)
return mp.addLocked(m, false, false)
}
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.addLocked(m, false)
return mp.addLocked(m, false, false)
}
func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error {
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
if m.Signature.Type == crypto.SigTypeBLS {
mp.blsSigCache.Add(m.Cid(), m.Signature)
@ -713,7 +722,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
mp.pending[m.Message.From] = mset
}
incr, err := mset.add(m, mp, strict)
incr, err := mset.add(m, mp, strict, untrusted)
if err != nil {
log.Debug(err)
return err
@ -793,6 +802,50 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (
return act.Balance, nil
}
// this method is provided for the gateway to push messages.
// differences from Push:
// - strict checks are enabled
// - extra strict add checks are used when adding the messages to the msgSet
// that means: no nonce gaps, at most 10 pending messages for the actor
func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
err := mp.checkMessage(m)
if err != nil {
return cid.Undef, err
}
// serialize push access to reduce lock contention
mp.addSema <- struct{}{}
defer func() {
<-mp.addSema
}()
msgb, err := m.Serialize()
if err != nil {
return cid.Undef, err
}
mp.curTsLk.Lock()
publish, err := mp.addTs(m, mp.curTs, false, true)
if err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err
}
mp.curTsLk.Unlock()
mp.lk.Lock()
if err := mp.addLocal(m, msgb); err != nil {
mp.lk.Unlock()
return cid.Undef, err
}
mp.lk.Unlock()
if publish {
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}
return m.Cid(), err
}
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
mp.lk.Lock()
defer mp.lk.Unlock()

View File

@ -1191,13 +1191,6 @@ func recurseLinks(bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.
}
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
seen := cid.NewSet()
walked := cid.NewSet()
h := &car.CarHeader{
Roots: ts.Cids(),
Version: 1,
@ -1207,6 +1200,28 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return xerrors.Errorf("failed to write car header: %s", err)
}
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error {
blk, err := cs.bs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}
if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}
return nil
})
}
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
seen := cid.NewSet()
walked := cid.NewSet()
blocksToWalk := ts.Cids()
currentMinHeight := ts.Height()
@ -1215,15 +1230,15 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return nil
}
if err := cb(blk); err != nil {
return err
}
data, err := cs.bs.Get(blk)
if err != nil {
return xerrors.Errorf("getting block: %w", err)
}
if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
@ -1270,14 +1285,11 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
if c.Prefix().Codec != cid.DagCBOR {
continue
}
data, err := cs.bs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car (get %s): %w", c, err)
if err := cb(c); err != nil {
return err
}
if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write out car object: %w", err)
}
}
}

View File

@ -12,13 +12,12 @@ import (
"text/tabwriter"
"time"
"github.com/filecoin-project/specs-actors/actors/builtin"
tm "github.com/buger/goterm"
"github.com/docker/go-units"
"github.com/fatih/color"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
"github.com/libp2p/go-libp2p-core/peer"
@ -476,6 +475,7 @@ func interactiveDeal(cctx *cli.Context) error {
var ask storagemarket.StorageAsk
var epochPrice big.Int
var epochs abi.ChainEpoch
var verified bool
var a address.Address
if from := cctx.String("from"); from != "" {
@ -572,6 +572,53 @@ func interactiveDeal(cctx *cli.Context) error {
ask = *a
// TODO: run more validation
state = "verified"
case "verified":
ts, err := api.ChainHead(ctx)
if err != nil {
return err
}
dcap, err := api.StateVerifiedClientStatus(ctx, a, ts.Key())
if err != nil {
return err
}
if dcap == nil {
state = "confirm"
continue
}
color.Blue(".. checking verified deal eligibility\n")
ds, err := api.ClientDealSize(ctx, data)
if err != nil {
return err
}
if dcap.Uint64() < uint64(ds.PieceSize) {
color.Yellow(".. not enough DataCap available for a verified deal\n")
state = "confirm"
continue
}
fmt.Print("\nMake this a verified deal? (yes/no): ")
var yn string
_, err = fmt.Scan(&yn)
if err != nil {
return err
}
switch yn {
case "yes":
verified = true
case "no":
verified = false
default:
fmt.Println("Type in full 'yes' or 'no'")
continue
}
state = "confirm"
case "confirm":
fromBal, err := api.WalletBalance(ctx, a)
@ -590,10 +637,15 @@ func interactiveDeal(cctx *cli.Context) error {
epochs = abi.ChainEpoch(dur / (time.Duration(build.BlockDelaySecs) * time.Second))
// TODO: do some more or epochs math (round to miner PP, deal start buffer)
pricePerGib := ask.Price
if verified {
pricePerGib = ask.VerifiedPrice
}
gib := types.NewInt(1 << 30)
// TODO: price is based on PaddedPieceSize, right?
epochPrice = types.BigDiv(types.BigMul(ask.Price, types.NewInt(uint64(ds.PieceSize))), gib)
epochPrice = types.BigDiv(types.BigMul(pricePerGib, types.NewInt(uint64(ds.PieceSize))), gib)
totalPrice := types.BigMul(epochPrice, types.NewInt(uint64(epochs)))
fmt.Printf("-----\n")
@ -603,6 +655,7 @@ func interactiveDeal(cctx *cli.Context) error {
fmt.Printf("Piece size: %s (Payload size: %s)\n", units.BytesSize(float64(ds.PieceSize)), units.BytesSize(float64(ds.PayloadSize)))
fmt.Printf("Duration: %s\n", dur)
fmt.Printf("Total price: ~%s (%s per epoch)\n", types.FIL(totalPrice), types.FIL(epochPrice))
fmt.Printf("Verified: %v\n", verified)
state = "accept"
case "accept":
@ -637,7 +690,7 @@ func interactiveDeal(cctx *cli.Context) error {
MinBlocksDuration: uint64(epochs),
DealStartEpoch: abi.ChainEpoch(cctx.Int64("start-epoch")),
FastRetrieval: cctx.Bool("fast-retrieval"),
VerifiedDeal: false, // TODO: Allow setting
VerifiedDeal: verified,
})
if err != nil {
return err

View File

@ -7,6 +7,7 @@ import (
"sort"
"strconv"
cid "github.com/ipfs/go-cid"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
@ -43,6 +44,10 @@ var mpoolPending = &cli.Command{
Name: "local",
Usage: "print pending messages for addresses in local wallet only",
},
&cli.BoolFlag{
Name: "cids",
Usage: "only print cids of messages in output",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
@ -79,12 +84,16 @@ var mpoolPending = &cli.Command{
}
}
if cctx.Bool("cids") {
fmt.Println(msg.Cid())
} else {
out, err := json.MarshalIndent(msg, "", " ")
if err != nil {
return err
}
fmt.Println(string(out))
}
}
return nil
},
@ -308,21 +317,8 @@ var mpoolReplaceCmd = &cli.Command{
Usage: "Spend up to X FIL for this message (applicable for auto mode)",
},
},
ArgsUsage: "[from] [nonce]",
ArgsUsage: "<from nonce> | <message-cid>",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() < 2 {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
from, err := address.NewFromString(cctx.Args().Get(0))
if err != nil {
return err
}
nonce, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
if err != nil {
return err
}
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
@ -332,6 +328,39 @@ var mpoolReplaceCmd = &cli.Command{
ctx := ReqContext(cctx)
var from address.Address
var nonce uint64
switch cctx.Args().Len() {
case 1:
mcid, err := cid.Decode(cctx.Args().First())
if err != nil {
return err
}
msg, err := api.ChainGetMessage(ctx, mcid)
if err != nil {
return fmt.Errorf("could not find referenced message: %w", err)
}
from = msg.From
nonce = msg.Nonce
case 2:
f, err := address.NewFromString(cctx.Args().Get(0))
if err != nil {
return err
}
n, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
if err != nil {
return err
}
from = f
nonce = n
default:
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
ts, err := api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting chain head: %w", err)

View File

@ -86,5 +86,5 @@ func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (ci
// TODO: additional anti-spam checks
return a.api.MpoolPush(ctx, sm)
return a.api.MpoolPushUntrusted(ctx, sm)
}

View File

@ -1,8 +1,10 @@
package main
import (
"bufio"
"bytes"
"context"
"encoding/csv"
"fmt"
"io/ioutil"
"net/http"
@ -25,11 +27,12 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@ -42,6 +45,8 @@ var log = logging.Logger("main")
func main() {
local := []*cli.Command{
runCmd,
recoverMinersCmd,
findMinersCmd,
versionCmd,
}
@ -105,6 +110,186 @@ var versionCmd = &cli.Command{
},
}
var findMinersCmd = &cli.Command{
Name: "find-miners",
Usage: "find miners with a desired minimum balance",
Description: `Find miners returns a list of miners and their balances that are below a
threhold value. By default only the miner actor available balance is considered but other
account balances can be included by enabling them through the flags.
Examples
Find all miners with an available balance below 100 FIL
lotus-pcr find-miners --threshold 100
Find all miners with a balance below zero, which includes the owner and worker balances
lotus-pcr find-miners --threshold 0 --owner --worker
`,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "no-sync",
EnvVars: []string{"LOTUS_PCR_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
},
&cli.IntFlag{
Name: "threshold",
EnvVars: []string{"LOTUS_PCR_THRESHOLD"},
Usage: "balance below this limit will be printed",
Value: 0,
},
&cli.BoolFlag{
Name: "owner",
Usage: "include owner balance",
Value: false,
},
&cli.BoolFlag{
Name: "worker",
Usage: "include worker balance",
Value: false,
},
&cli.BoolFlag{
Name: "control",
Usage: "include control balance",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path"))
if err != nil {
log.Fatal(err)
}
defer closer()
if !cctx.Bool("no-sync") {
if err := stats.WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}
owner := cctx.Bool("owner")
worker := cctx.Bool("worker")
control := cctx.Bool("control")
threshold := uint64(cctx.Int("threshold"))
rf := &refunder{
api: api,
threshold: types.FromFil(threshold),
}
refundTipset, err := api.ChainHead(ctx)
if err != nil {
return err
}
balanceRefund, err := rf.FindMiners(ctx, refundTipset, NewMinersRefund(), owner, worker, control)
if err != nil {
return err
}
for _, maddr := range balanceRefund.Miners() {
fmt.Printf("%s\t%s\n", maddr, types.FIL(balanceRefund.GetRefund(maddr)))
}
return nil
},
}
var recoverMinersCmd = &cli.Command{
Name: "recover-miners",
Usage: "Ensure all miners with a negative available balance have a FIL surplus across accounts",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "from",
EnvVars: []string{"LOTUS_PCR_FROM"},
Usage: "wallet address to send refund from",
},
&cli.BoolFlag{
Name: "no-sync",
EnvVars: []string{"LOTUS_PCR_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
},
&cli.BoolFlag{
Name: "dry-run",
EnvVars: []string{"LOTUS_PCR_DRY_RUN"},
Usage: "do not send any messages",
Value: false,
},
&cli.StringFlag{
Name: "output",
Usage: "dump data as a csv format to this file",
},
&cli.IntFlag{
Name: "miner-recovery-cutoff",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_CUTOFF"},
Usage: "maximum amount of FIL that can be sent to any one miner before refund percent is applied",
Value: 3000,
},
&cli.IntFlag{
Name: "miner-recovery-bonus",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_BONUS"},
Usage: "additional FIL to send to each miner",
Value: 5,
},
&cli.IntFlag{
Name: "miner-recovery-refund-percent",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_REFUND_PERCENT"},
Usage: "percent of refund to issue",
Value: 110,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path"))
if err != nil {
log.Fatal(err)
}
defer closer()
from, err := address.NewFromString(cctx.String("from"))
if err != nil {
return xerrors.Errorf("parsing source address (provide correct --from flag!): %w", err)
}
if !cctx.Bool("no-sync") {
if err := stats.WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}
dryRun := cctx.Bool("dry-run")
minerRecoveryRefundPercent := cctx.Int("miner-recovery-refund-percent")
minerRecoveryCutoff := uint64(cctx.Int("miner-recovery-cutoff"))
minerRecoveryBonus := uint64(cctx.Int("miner-recovery-bonus"))
rf := &refunder{
api: api,
wallet: from,
dryRun: dryRun,
minerRecoveryRefundPercent: minerRecoveryRefundPercent,
minerRecoveryCutoff: types.FromFil(minerRecoveryCutoff),
minerRecoveryBonus: types.FromFil(minerRecoveryBonus),
}
refundTipset, err := api.ChainHead(ctx)
if err != nil {
return err
}
balanceRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund(), cctx.String("output"))
if err != nil {
return err
}
if err := rf.Refund(ctx, "refund to recover miner", refundTipset, balanceRefund, 0); err != nil {
return err
}
return nil
},
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start message reimpursement",
@ -120,10 +305,10 @@ var runCmd = &cli.Command{
Usage: "do not wait for chain sync to complete",
},
&cli.IntFlag{
Name: "percent-extra",
EnvVars: []string{"LOTUS_PCR_PERCENT_EXTRA"},
Usage: "extra funds to send above the refund",
Value: 3,
Name: "refund-percent",
EnvVars: []string{"LOTUS_PCR_REFUND_PERCENT"},
Usage: "percent of refund to issue",
Value: 103,
},
&cli.IntFlag{
Name: "max-message-queue",
@ -161,6 +346,36 @@ var runCmd = &cli.Command{
Usage: "the number of tipsets to delay message processing to smooth chain reorgs",
Value: int(build.MessageConfidence),
},
&cli.BoolFlag{
Name: "miner-recovery",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY"},
Usage: "run the miner recovery job",
Value: false,
},
&cli.IntFlag{
Name: "miner-recovery-period",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_PERIOD"},
Usage: "interval between running miner recovery",
Value: 2880,
},
&cli.IntFlag{
Name: "miner-recovery-cutoff",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_CUTOFF"},
Usage: "maximum amount of FIL that can be sent to any one miner before refund percent is applied",
Value: 3000,
},
&cli.IntFlag{
Name: "miner-recovery-bonus",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_BONUS"},
Usage: "additional FIL to send to each miner",
Value: 5,
},
&cli.IntFlag{
Name: "miner-recovery-refund-percent",
EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_REFUND_PERCENT"},
Usage: "percent of refund to issue",
Value: 110,
},
},
Action: func(cctx *cli.Context) error {
go func() {
@ -199,17 +414,25 @@ var runCmd = &cli.Command{
log.Fatal(err)
}
percentExtra := cctx.Int("percent-extra")
refundPercent := cctx.Int("refund-percent")
maxMessageQueue := cctx.Int("max-message-queue")
dryRun := cctx.Bool("dry-run")
preCommitEnabled := cctx.Bool("pre-commit")
proveCommitEnabled := cctx.Bool("prove-commit")
aggregateTipsets := cctx.Int("aggregate-tipsets")
minerRecoveryEnabled := cctx.Bool("miner-recovery")
minerRecoveryPeriod := abi.ChainEpoch(int64(cctx.Int("miner-recovery-period")))
minerRecoveryRefundPercent := cctx.Int("miner-recovery-refund-percent")
minerRecoveryCutoff := uint64(cctx.Int("miner-recovery-cutoff"))
minerRecoveryBonus := uint64(cctx.Int("miner-recovery-bonus"))
rf := &refunder{
api: api,
wallet: from,
percentExtra: percentExtra,
refundPercent: refundPercent,
minerRecoveryRefundPercent: minerRecoveryRefundPercent,
minerRecoveryCutoff: types.FromFil(minerRecoveryCutoff),
minerRecoveryBonus: types.FromFil(minerRecoveryBonus),
dryRun: dryRun,
preCommitEnabled: preCommitEnabled,
proveCommitEnabled: proveCommitEnabled,
@ -217,6 +440,7 @@ var runCmd = &cli.Command{
var refunds *MinersRefund = NewMinersRefund()
var rounds int = 0
nextMinerRecovery := r.MinerRecoveryHeight() + minerRecoveryPeriod
for tipset := range tipsetsCh {
refunds, err = rf.ProcessTipset(ctx, tipset, refunds)
@ -224,17 +448,34 @@ var runCmd = &cli.Command{
return err
}
rounds = rounds + 1
if rounds < aggregateTipsets {
continue
}
refundTipset, err := api.ChainHead(ctx)
if err != nil {
return err
}
if err := rf.Refund(ctx, refundTipset, refunds, rounds); err != nil {
if minerRecoveryEnabled && refundTipset.Height() >= nextMinerRecovery {
recoveryRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund(), "")
if err != nil {
return err
}
if err := rf.Refund(ctx, "refund to recover miners", refundTipset, recoveryRefund, 0); err != nil {
return err
}
if err := r.SetMinerRecoveryHeight(tipset.Height()); err != nil {
return err
}
nextMinerRecovery = r.MinerRecoveryHeight() + minerRecoveryPeriod
}
rounds = rounds + 1
if rounds < aggregateTipsets {
continue
}
if err := rf.Refund(ctx, "refund stats", refundTipset, refunds, rounds); err != nil {
return err
}
@ -293,7 +534,6 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) {
m.count = m.count + 1
m.totalRefunds = types.BigAdd(m.totalRefunds, value)
m.refunds[addr] = types.BigAdd(m.refunds[addr], value)
}
@ -322,8 +562,14 @@ type refunderNodeApi interface {
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error)
ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error)
StateMinerSectors(ctx context.Context, addr address.Address, filter *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
StateMinerFaults(ctx context.Context, addr address.Address, tsk types.TipSetKey) (bitfield.BitField, error)
StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error)
StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
@ -334,10 +580,239 @@ type refunderNodeApi interface {
type refunder struct {
api refunderNodeApi
wallet address.Address
percentExtra int
refundPercent int
minerRecoveryRefundPercent int
minerRecoveryCutoff big.Int
minerRecoveryBonus big.Int
dryRun bool
preCommitEnabled bool
proveCommitEnabled bool
threshold big.Int
}
func (r *refunder) FindMiners(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, owner, worker, control bool) (*MinersRefund, error) {
miners, err := r.api.StateListMiners(ctx, tipset.Key())
if err != nil {
return nil, err
}
for _, maddr := range miners {
mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if !mact.Balance.GreaterThan(big.Zero()) {
continue
}
minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
// Look up and find all addresses associated with the miner
minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
allAddresses := []address.Address{}
if worker {
allAddresses = append(allAddresses, minerInfo.Worker)
}
if owner {
allAddresses = append(allAddresses, minerInfo.Owner)
}
if control {
allAddresses = append(allAddresses, minerInfo.ControlAddresses...)
}
// Sum the balancer of all the addresses
addrSum := big.Zero()
addrCheck := make(map[address.Address]struct{}, len(allAddresses))
for _, addr := range allAddresses {
if _, found := addrCheck[addr]; !found {
balance, err := r.api.WalletBalance(ctx, addr)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
addrSum = big.Add(addrSum, balance)
addrCheck[addr] = struct{}{}
}
}
totalAvailableBalance := big.Add(addrSum, minerAvailableBalance)
if totalAvailableBalance.GreaterThanEqual(r.threshold) {
continue
}
refunds.Track(maddr, totalAvailableBalance)
log.Debugw("processing miner", "miner", maddr, "sectors", "available_balance", totalAvailableBalance)
}
return refunds, nil
}
func (r *refunder) EnsureMinerMinimums(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, output string) (*MinersRefund, error) {
miners, err := r.api.StateListMiners(ctx, tipset.Key())
if err != nil {
return nil, err
}
w := ioutil.Discard
if len(output) != 0 {
f, err := os.Create(output)
if err != nil {
return nil, err
}
defer f.Close() // nolint:errcheck
w = bufio.NewWriter(f)
}
csvOut := csv.NewWriter(w)
defer csvOut.Flush()
if err := csvOut.Write([]string{"MinerID", "FaultedSectors", "AvailableBalance", "ProposedRefund"}); err != nil {
return nil, err
}
for _, maddr := range miners {
mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if !mact.Balance.GreaterThan(big.Zero()) {
continue
}
minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
// Look up and find all addresses associated with the miner
minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
allAddresses := []address.Address{minerInfo.Worker, minerInfo.Owner}
allAddresses = append(allAddresses, minerInfo.ControlAddresses...)
// Sum the balancer of all the addresses
addrSum := big.Zero()
addrCheck := make(map[address.Address]struct{}, len(allAddresses))
for _, addr := range allAddresses {
if _, found := addrCheck[addr]; !found {
balance, err := r.api.WalletBalance(ctx, addr)
if err != nil {
log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
addrSum = big.Add(addrSum, balance)
addrCheck[addr] = struct{}{}
}
}
faults, err := r.api.StateMinerFaults(ctx, maddr, tipset.Key())
if err != nil {
log.Errorw("failed to look up miner faults", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
faultsCount, err := faults.Count()
if err != nil {
log.Errorw("failed to get count of faults", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
if faultsCount == 0 {
log.Debugw("skipping miner with zero faults", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr)
continue
}
totalAvailableBalance := big.Add(addrSum, minerAvailableBalance)
balanceCutoff := big.Mul(big.Div(big.NewIntUnsigned(faultsCount), big.NewInt(10)), big.NewIntUnsigned(build.FilecoinPrecision))
if totalAvailableBalance.GreaterThan(balanceCutoff) {
log.Debugw(
"skipping over miner with total available balance larger than refund",
"height", tipset.Height(),
"key", tipset.Key(),
"miner", maddr,
"available_balance", totalAvailableBalance,
"balance_cutoff", balanceCutoff,
"faults_count", faultsCount,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"balance_cutoff_fil", big.Div(balanceCutoff, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
continue
}
refundValue := big.Sub(balanceCutoff, totalAvailableBalance)
if r.minerRecoveryRefundPercent > 0 {
refundValue = types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.minerRecoveryRefundPercent)))
}
refundValue = big.Add(refundValue, r.minerRecoveryBonus)
if refundValue.GreaterThan(r.minerRecoveryCutoff) {
log.Infow(
"skipping over miner with refund greater than refund cutoff",
"height", tipset.Height(),
"key", tipset.Key(),
"miner", maddr,
"available_balance", totalAvailableBalance,
"balance_cutoff", balanceCutoff,
"faults_count", faultsCount,
"refund", refundValue,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"balance_cutoff_fil", big.Div(balanceCutoff, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
continue
}
refunds.Track(maddr, refundValue)
record := []string{
maddr.String(),
fmt.Sprintf("%d", faultsCount),
big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).String(),
big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).String(),
}
if err := csvOut.Write(record); err != nil {
return nil, err
}
log.Debugw(
"processing miner",
"miner", maddr,
"faults_count", faultsCount,
"available_balance", totalAvailableBalance,
"refund", refundValue,
"available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
}
return refunds, nil
}
func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) {
@ -458,22 +933,41 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refu
continue
}
if r.percentExtra > 0 {
refundValue = types.BigAdd(refundValue, types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.percentExtra))))
if r.refundPercent > 0 {
refundValue = types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.refundPercent)))
}
log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue)
log.Debugw(
"processing message",
"method", messageMethod,
"cid", msg.Cid,
"from", m.From,
"to", m.To,
"value", m.Value,
"gas_fee_cap", m.GasFeeCap,
"gas_premium", m.GasPremium,
"gas_used", recps[i].GasUsed,
"refund", refundValue,
"refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
)
refunds.Track(m.From, refundValue)
tipsetRefunds.Track(m.From, refundValue)
}
log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "total_refunds", tipsetRefunds.TotalRefunds(), "messages_processed", tipsetRefunds.Count())
log.Infow(
"tipset stats",
"height", tipset.Height(),
"key", tipset.Key(),
"total_refunds", tipsetRefunds.TotalRefunds(),
"total_refunds_fil", big.Div(tipsetRefunds.TotalRefunds(), big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"messages_processed", tipsetRefunds.Count(),
)
return refunds, nil
}
func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, rounds int) error {
func (r *refunder) Refund(ctx context.Context, name string, tipset *types.TipSet, refunds *MinersRefund, rounds int) error {
if refunds.Count() == 0 {
log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key())
return nil
@ -531,12 +1025,23 @@ func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *Mi
refundSum = types.BigAdd(refundSum, msg.Value)
}
log.Infow("refund stats", "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count())
log.Infow(
name,
"tipsets_processed", rounds,
"height", tipset.Height(),
"key", tipset.Key(),
"messages_sent", len(messages)-failures,
"refund_sum", refundSum,
"refund_sum_fil", big.Div(refundSum, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(),
"messages_failures", failures,
"messages_processed", refunds.Count(),
)
return nil
}
type Repo struct {
last abi.ChainEpoch
lastHeight abi.ChainEpoch
lastMinerRecoveryHeight abi.ChainEpoch
path string
}
@ -547,7 +1052,8 @@ func NewRepo(path string) (*Repo, error) {
}
return &Repo{
last: 0,
lastHeight: 0,
lastMinerRecoveryHeight: 0,
path: path,
}, nil
}
@ -579,43 +1085,66 @@ func (r *Repo) init() error {
return nil
}
func (r *Repo) Open() (err error) {
if err = r.init(); err != nil {
return
func (r *Repo) Open() error {
if err := r.init(); err != nil {
return err
}
var f *os.File
if err := r.loadHeight(); err != nil {
return err
}
f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR|os.O_CREATE, 0644)
if err := r.loadMinerRecoveryHeight(); err != nil {
return err
}
return nil
}
func loadChainEpoch(fn string) (abi.ChainEpoch, error) {
f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return
return 0, err
}
defer func() {
err = f.Close()
}()
var raw []byte
raw, err = ioutil.ReadAll(f)
raw, err := ioutil.ReadAll(f)
if err != nil {
return
return 0, err
}
height, err := strconv.Atoi(string(bytes.TrimSpace(raw)))
if err != nil {
return
return 0, err
}
r.last = abi.ChainEpoch(height)
return
return abi.ChainEpoch(height), nil
}
func (r *Repo) loadHeight() error {
var err error
r.lastHeight, err = loadChainEpoch(filepath.Join(r.path, "height"))
return err
}
func (r *Repo) loadMinerRecoveryHeight() error {
var err error
r.lastMinerRecoveryHeight, err = loadChainEpoch(filepath.Join(r.path, "miner_recovery_height"))
return err
}
func (r *Repo) Height() abi.ChainEpoch {
return r.last
return r.lastHeight
}
func (r *Repo) MinerRecoveryHeight() abi.ChainEpoch {
return r.lastMinerRecoveryHeight
}
func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) {
r.last = last
r.lastHeight = last
var f *os.File
f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR, 0644)
if err != nil {
@ -626,7 +1155,26 @@ func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) {
err = f.Close()
}()
if _, err = fmt.Fprintf(f, "%d", r.last); err != nil {
if _, err = fmt.Fprintf(f, "%d", r.lastHeight); err != nil {
return
}
return
}
func (r *Repo) SetMinerRecoveryHeight(last abi.ChainEpoch) (err error) {
r.lastMinerRecoveryHeight = last
var f *os.File
f, err = os.OpenFile(filepath.Join(r.path, "miner_recovery_height"), os.O_RDWR, 0644)
if err != nil {
return
}
defer func() {
err = f.Close()
}()
if _, err = fmt.Fprintf(f, "%d", r.lastMinerRecoveryHeight); err != nil {
return
}

View File

@ -39,6 +39,7 @@ func main() {
consensusCmd,
serveDealStatsCmd,
syncCmd,
stateTreePruneCmd,
datastoreCmd,
}

289
cmd/lotus-shed/pruning.go Normal file
View File

@ -0,0 +1,289 @@
package main
import (
"context"
"fmt"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo"
"github.com/ipfs/bbloom"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
)
type cidSet interface {
Add(cid.Cid)
Has(cid.Cid) bool
HasRaw([]byte) bool
Len() int
}
type bloomSet struct {
bloom *bbloom.Bloom
}
func newBloomSet(size int64) (*bloomSet, error) {
b, err := bbloom.New(float64(size), 3)
if err != nil {
return nil, err
}
return &bloomSet{bloom: b}, nil
}
func (bs *bloomSet) Add(c cid.Cid) {
bs.bloom.Add(c.Hash())
}
func (bs *bloomSet) Has(c cid.Cid) bool {
return bs.bloom.Has(c.Hash())
}
func (bs *bloomSet) HasRaw(b []byte) bool {
return bs.bloom.Has(b)
}
func (bs *bloomSet) Len() int {
return int(bs.bloom.ElementsAdded())
}
type mapSet struct {
m map[string]struct{}
}
func newMapSet() *mapSet {
return &mapSet{m: make(map[string]struct{})}
}
func (bs *mapSet) Add(c cid.Cid) {
bs.m[string(c.Hash())] = struct{}{}
}
func (bs *mapSet) Has(c cid.Cid) bool {
_, ok := bs.m[string(c.Hash())]
return ok
}
func (bs *mapSet) HasRaw(b []byte) bool {
_, ok := bs.m[string(b)]
return ok
}
func (bs *mapSet) Len() int {
return len(bs.m)
}
var stateTreePruneCmd = &cli.Command{
Name: "state-prune",
Description: "Deletes old state root data from local chainstore",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
&cli.Int64Flag{
Name: "keep-from-lookback",
Usage: "keep stateroots at or newer than the current height minus this lookback",
Value: 1800, // 2 x finality
},
&cli.IntFlag{
Name: "delete-up-to",
Usage: "delete up to the given number of objects (used to run a faster 'partial' sync)",
},
&cli.BoolFlag{
Name: "use-bloom-set",
Usage: "use a bloom filter for the 'good' set instead of a map, reduces memory usage but may not clean up as much",
},
&cli.BoolFlag{
Name: "dry-run",
Usage: "only enumerate the good set, don't do any deletions",
},
&cli.BoolFlag{
Name: "only-ds-gc",
Usage: "Only run datastore GC",
},
&cli.IntFlag{
Name: "gc-count",
Usage: "number of times to run gc",
Value: 20,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.TODO()
fsrepo, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
if err != nil {
return err
}
defer lkrepo.Close() //nolint:errcheck
ds, err := lkrepo.Datastore("/chain")
if err != nil {
return err
}
defer ds.Close() //nolint:errcheck
mds, err := lkrepo.Datastore("/metadata")
if err != nil {
return err
}
defer mds.Close() //nolint:errcheck
if cctx.Bool("only-ds-gc") {
gcds, ok := ds.(datastore.GCDatastore)
if ok {
fmt.Println("running datastore gc....")
for i := 0; i < cctx.Int("gc-count"); i++ {
if err := gcds.CollectGarbage(); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
}
fmt.Println("gc complete!")
return nil
}
return fmt.Errorf("datastore doesnt support gc")
}
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err)
}
var goodSet cidSet
if cctx.Bool("use-bloom-set") {
bset, err := newBloomSet(10000000)
if err != nil {
return err
}
goodSet = bset
} else {
goodSet = newMapSet()
}
ts := cs.GetHeaviestTipSet()
rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback"))
if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error {
if goodSet.Len()%20 == 0 {
fmt.Printf("\renumerating keep set: %d ", goodSet.Len())
}
goodSet.Add(c)
return nil
}); err != nil {
return fmt.Errorf("snapshot walk failed: %w", err)
}
fmt.Println()
fmt.Printf("Successfully marked keep set! (%d objects)\n", goodSet.Len())
if cctx.Bool("dry-run") {
return nil
}
var b datastore.Batch
var batchCount int
markForRemoval := func(c cid.Cid) error {
if b == nil {
nb, err := ds.Batch()
if err != nil {
return fmt.Errorf("opening batch: %w", err)
}
b = nb
}
batchCount++
if err := b.Delete(dshelp.MultihashToDsKey(c.Hash())); err != nil {
return err
}
if batchCount > 100 {
if err := b.Commit(); err != nil {
return xerrors.Errorf("failed to commit batch deletes: %w", err)
}
b = nil
batchCount = 0
}
return nil
}
res, err := ds.Query(query.Query{KeysOnly: true})
if err != nil {
return xerrors.Errorf("failed to query datastore: %w", err)
}
dupTo := cctx.Int("delete-up-to")
var deleteCount int
var goodHits int
for {
v, ok := res.NextSync()
if !ok {
break
}
bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(v.Key[len("/blocks"):]))
if err != nil {
return xerrors.Errorf("failed to parse key: %w", err)
}
if goodSet.HasRaw(bk) {
goodHits++
continue
}
nc := cid.NewCidV1(cid.Raw, bk)
deleteCount++
if err := markForRemoval(nc); err != nil {
return fmt.Errorf("failed to remove cid %s: %w", nc, err)
}
if deleteCount%20 == 0 {
fmt.Printf("\rdeleting %d objects (good hits: %d)... ", deleteCount, goodHits)
}
if dupTo != 0 && deleteCount > dupTo {
break
}
}
if b != nil {
if err := b.Commit(); err != nil {
return xerrors.Errorf("failed to commit final batch delete: %w", err)
}
}
gcds, ok := ds.(datastore.GCDatastore)
if ok {
fmt.Println("running datastore gc....")
for i := 0; i < cctx.Int("gc-count"); i++ {
if err := gcds.CollectGarbage(); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
}
fmt.Println("gc complete!")
}
return nil
},
}

View File

@ -74,6 +74,7 @@
* [MpoolPending](#MpoolPending)
* [MpoolPush](#MpoolPush)
* [MpoolPushMessage](#MpoolPushMessage)
* [MpoolPushUntrusted](#MpoolPushUntrusted)
* [MpoolSelect](#MpoolSelect)
* [MpoolSetConfig](#MpoolSetConfig)
* [MpoolSub](#MpoolSub)
@ -1802,6 +1803,43 @@ Response:
}
```
### MpoolPushUntrusted
MpoolPushUntrusted pushes a signed message to mempool from untrusted sources.
Perms: write
Inputs:
```json
[
{
"Message": {
"Version": 42,
"To": "t01234",
"From": "t01234",
"Nonce": 42,
"Value": "0",
"GasLimit": 9,
"GasFeeCap": "0",
"GasPremium": "0",
"Method": 1,
"Params": "Ynl0ZSBhcnJheQ=="
},
"Signature": {
"Type": 2,
"Data": "Ynl0ZSBhcnJheQ=="
}
}
]
```
Response:
```json
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
```
### MpoolSelect
MpoolSelect returns a list of pending messages for inclusion in the next block

1
go.mod
View File

@ -51,6 +51,7 @@ require (
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/golang-lru v0.5.4
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
github.com/ipfs/bbloom v0.0.4
github.com/ipfs/go-bitswap v0.2.20
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834

View File

@ -110,6 +110,10 @@ func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (ci
return a.Mpool.Push(smsg)
}
func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return a.Mpool.PushUntrusted(smsg)
}
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
cp := *msg
msg = &cp

View File

@ -205,7 +205,7 @@ func (ca *channelAccessor) checkVoucherValidUnlocked(ctx context.Context, ch add
}
// Check the voucher against the highest known voucher nonce / value
laneStates, err := ca.laneState(ctx, pchState, ch)
laneStates, err := ca.laneState(pchState, ch)
if err != nil {
return nil, err
}
@ -253,16 +253,9 @@ func (ca *channelAccessor) checkVoucherValidUnlocked(ctx context.Context, ch add
return nil, err
}
// Total required balance = total redeemed + toSend
// Must not exceed actor balance
ts, err := pchState.ToSend()
if err != nil {
return nil, err
}
newTotal := types.BigAdd(totalRedeemed, ts)
if act.Balance.LessThan(newTotal) {
return nil, newErrInsufficientFunds(types.BigSub(newTotal, act.Balance))
// Total required balance must not exceed actor balance
if act.Balance.LessThan(totalRedeemed) {
return nil, newErrInsufficientFunds(types.BigSub(totalRedeemed, act.Balance))
}
if len(sv.Merges) != 0 {
@ -505,7 +498,6 @@ func (ca *channelAccessor) allocateLane(ch address.Address) (uint64, error) {
ca.lk.Lock()
defer ca.lk.Unlock()
// TODO: should this take into account lane state?
return ca.store.AllocateLane(ch)
}
@ -520,7 +512,7 @@ func (ca *channelAccessor) listVouchers(ctx context.Context, ch address.Address)
// laneState gets the LaneStates from chain, then applies all vouchers in
// the data store over the chain state
func (ca *channelAccessor) laneState(ctx context.Context, state paych.State, ch address.Address) (map[uint64]paych.LaneState, error) {
func (ca *channelAccessor) laneState(state paych.State, ch address.Address) (map[uint64]paych.LaneState, error) {
// TODO: we probably want to call UpdateChannelState with all vouchers to be fully correct
// (but technically dont't need to)
@ -552,9 +544,12 @@ func (ca *channelAccessor) laneState(ctx context.Context, state paych.State, ch
return nil, xerrors.Errorf("paych merges not handled yet")
}
// If there's a voucher for a lane that isn't in chain state just
// create it
// Check if there is an existing laneState in the payment channel
// for this voucher's lane
ls, ok := laneStates[v.Voucher.Lane]
// If the voucher does not have a higher nonce than the existing
// laneState for this lane, ignore it
if ok {
n, err := ls.Nonce()
if err != nil {
@ -565,6 +560,7 @@ func (ca *channelAccessor) laneState(ctx context.Context, state paych.State, ch
}
}
// Voucher has a higher nonce, so replace laneState with this voucher
laneStates[v.Voucher.Lane] = laneState{v.Voucher.Amount, v.Voucher.Nonce}
}

View File

@ -47,7 +47,6 @@ func TestCheckVoucherValid(t *testing.T) {
expectError bool
key []byte
actorBalance big.Int
toSend big.Int
voucherAmount big.Int
voucherLane uint64
voucherNonce uint64
@ -56,35 +55,30 @@ func TestCheckVoucherValid(t *testing.T) {
name: "passes when voucher amount < balance",
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
}, {
name: "fails when funds too low",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(5),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(10),
}, {
name: "fails when invalid signature",
expectError: true,
key: randKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
}, {
name: "fails when signed by channel To account (instead of From account)",
expectError: true,
key: toKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
}, {
name: "fails when nonce too low",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
voucherLane: 1,
voucherNonce: 2,
@ -95,7 +89,6 @@ func TestCheckVoucherValid(t *testing.T) {
name: "passes when nonce higher",
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
voucherLane: 1,
voucherNonce: 3,
@ -106,7 +99,6 @@ func TestCheckVoucherValid(t *testing.T) {
name: "passes when nonce for different lane",
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
voucherLane: 2,
voucherNonce: 2,
@ -118,32 +110,22 @@ func TestCheckVoucherValid(t *testing.T) {
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(0),
voucherAmount: big.NewInt(5),
voucherLane: 1,
voucherNonce: 3,
laneStates: map[uint64]paych.LaneState{
1: paychmock.NewMockLaneState(big.NewInt(6), 2),
},
}, {
name: "fails when voucher + ToSend > balance",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(9),
voucherAmount: big.NewInt(2),
}, {
// voucher supersedes lane 1 redeemed so
// lane 1 effective redeemed = voucher amount
//
// required balance = toSend + total redeemed
// = 1 + 6 (lane1)
// required balance = voucher amt
// = 7
// So required balance: 7 < actor balance: 10
name: "passes when voucher + total redeemed <= balance",
name: "passes when voucher total redeemed <= balance",
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(1),
voucherAmount: big.NewInt(6),
voucherLane: 1,
voucherNonce: 2,
@ -152,29 +134,68 @@ func TestCheckVoucherValid(t *testing.T) {
1: paychmock.NewMockLaneState(big.NewInt(4), 1),
},
}, {
// required balance = toSend + total redeemed
// = 1 + 4 (lane 2) + 6 (voucher lane 1)
// required balance = total redeemed
// = 6 (voucher lane 1) + 5 (lane 2)
// = 11
// So required balance: 11 > actor balance: 10
name: "fails when voucher + total redeemed > balance",
name: "fails when voucher total redeemed > balance",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
toSend: big.NewInt(1),
voucherAmount: big.NewInt(6),
voucherLane: 1,
voucherNonce: 1,
laneStates: map[uint64]paych.LaneState{
// Lane 2 (different from voucher lane 1)
2: paychmock.NewMockLaneState(big.NewInt(4), 1),
2: paychmock.NewMockLaneState(big.NewInt(5), 1),
},
}, {
// voucher supersedes lane 1 redeemed so
// lane 1 effective redeemed = voucher amount
//
// required balance = total redeemed
// = 6 (new voucher lane 1) + 5 (lane 2)
// = 11
// So required balance: 11 > actor balance: 10
name: "fails when voucher total redeemed > balance",
expectError: true,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
voucherAmount: big.NewInt(6),
voucherLane: 1,
voucherNonce: 2,
laneStates: map[uint64]paych.LaneState{
// Lane 1 (superseded by new voucher in voucher lane 1)
1: paychmock.NewMockLaneState(big.NewInt(5), 1),
// Lane 2 (different from voucher lane 1)
2: paychmock.NewMockLaneState(big.NewInt(5), 1),
},
}, {
// voucher supersedes lane 1 redeemed so
// lane 1 effective redeemed = voucher amount
//
// required balance = total redeemed
// = 5 (new voucher lane 1) + 5 (lane 2)
// = 10
// So required balance: 10 <= actor balance: 10
name: "passes when voucher total redeemed <= balance",
expectError: false,
key: fromKeyPrivate,
actorBalance: big.NewInt(10),
voucherAmount: big.NewInt(5),
voucherLane: 1,
voucherNonce: 2,
laneStates: map[uint64]paych.LaneState{
// Lane 1 (superseded by new voucher in voucher lane 1)
1: paychmock.NewMockLaneState(big.NewInt(4), 1),
// Lane 2 (different from voucher lane 1)
2: paychmock.NewMockLaneState(big.NewInt(5), 1),
},
}}
for _, tcase := range tcases {
tcase := tcase
t.Run(tcase.name, func(t *testing.T) {
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
// Create an actor for the channel with the test case balance
act := &types.Actor{
Code: builtin.AccountActorCodeID,
@ -184,16 +205,17 @@ func TestCheckVoucherValid(t *testing.T) {
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(
fromAcct, toAcct, abi.ChainEpoch(0), tcase.toSend, tcase.laneStates))
fromAcct, toAcct, abi.ChainEpoch(0), tcase.laneStates))
// Create a manager
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Add channel To address to wallet
mock.addWalletAddress(to)
// Create a signed voucher
// Create the test case signed voucher
sv := createTestVoucher(t, ch, tcase.voucherLane, tcase.voucherNonce, tcase.voucherAmount, tcase.key)
// Check the voucher's validity
@ -207,135 +229,11 @@ func TestCheckVoucherValid(t *testing.T) {
}
}
func TestCheckVoucherValidCountingAllLanes(t *testing.T) {
ctx := context.Background()
fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t)
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic))
to := tutils.NewSECP256K1Addr(t, "secpTo")
fromAcct := tutils.NewActorAddr(t, "fromAct")
toAcct := tutils.NewActorAddr(t, "toAct")
minDelta := big.NewInt(0)
mock := newMockManagerAPI()
mock.setAccountAddress(fromAcct, from)
mock.setAccountAddress(toAcct, to)
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
actorBalance := big.NewInt(10)
toSend := big.NewInt(1)
laneStates := map[uint64]paych.LaneState{
1: paychmock.NewMockLaneState(big.NewInt(3), 1),
2: paychmock.NewMockLaneState(big.NewInt(4), 1),
}
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: actorBalance,
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), toSend, laneStates))
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Add channel To address to wallet
mock.addWalletAddress(to)
//
// Should not be possible to add a voucher with a value such that
// <total lane Redeemed> + toSend > <actor balance>
//
// lane 1 redeemed: 3
// voucher amount (lane 1): 6
// lane 1 redeemed (with voucher): 6
//
// Lane 1: 6
// Lane 2: 4
// toSend: 1
// --
// total: 11
//
// actor balance is 10 so total is too high.
//
voucherLane := uint64(1)
voucherNonce := uint64(2)
voucherAmount := big.NewInt(6)
sv := createTestVoucher(t, ch, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate)
err = mgr.CheckVoucherValid(ctx, ch, sv)
require.Error(t, err)
//
// lane 1 redeemed: 3
// voucher amount (lane 1): 4
// lane 1 redeemed (with voucher): 4
//
// Lane 1: 4
// Lane 2: 4
// toSend: 1
// --
// total: 9
//
// actor balance is 10 so total is ok.
//
voucherAmount = big.NewInt(4)
sv = createTestVoucher(t, ch, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate)
err = mgr.CheckVoucherValid(ctx, ch, sv)
require.NoError(t, err)
// Add voucher to lane 1, so Lane 1 effective redeemed
// (with first voucher) is now 4
_, err = mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta)
require.NoError(t, err)
//
// lane 1 redeemed: 4
// voucher amount (lane 1): 6
// lane 1 redeemed (with voucher): 6
//
// Lane 1: 6
// Lane 2: 4
// toSend: 1
// --
// total: 11
//
// actor balance is 10 so total is too high.
//
voucherNonce++
voucherAmount = big.NewInt(6)
sv = createTestVoucher(t, ch, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate)
err = mgr.CheckVoucherValid(ctx, ch, sv)
require.Error(t, err)
//
// lane 1 redeemed: 4
// voucher amount (lane 1): 5
// lane 1 redeemed (with voucher): 5
//
// Lane 1: 5
// Lane 2: 4
// toSend: 1
// --
// total: 10
//
// actor balance is 10 so total is ok.
//
voucherAmount = big.NewInt(5)
sv = createTestVoucher(t, ch, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate)
err = mgr.CheckVoucherValid(ctx, ch, sv)
require.NoError(t, err)
}
func TestCreateVoucher(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
s := testSetupMgrWithChannel(t)
// Create a voucher in lane 1
voucherLane1Amt := big.NewInt(5)
@ -400,7 +298,7 @@ func TestAddVoucherDelta(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
s := testSetupMgrWithChannel(t)
voucherLane := uint64(1)
@ -442,7 +340,7 @@ func TestAddVoucherNextLane(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
s := testSetupMgrWithChannel(t)
minDelta := big.NewInt(0)
voucherAmount := big.NewInt(2)
@ -489,10 +387,8 @@ func TestAddVoucherNextLane(t *testing.T) {
}
func TestAllocateLane(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
s := testSetupMgrWithChannel(t)
// First lane should be 0
lane, err := s.mgr.AllocateLane(s.ch)
@ -525,7 +421,6 @@ func TestAllocateLaneWithExistingLaneState(t *testing.T) {
// Create a channel that will be retrieved from state
actorBalance := big.NewInt(10)
toSend := big.NewInt(1)
act := &types.Actor{
Code: builtin.AccountActorCodeID,
@ -534,7 +429,7 @@ func TestAllocateLaneWithExistingLaneState(t *testing.T) {
Balance: actorBalance,
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), toSend, make(map[uint64]paych.LaneState)))
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
mgr, err := newManager(store, mock)
require.NoError(t, err)
@ -559,7 +454,7 @@ func TestAddVoucherProof(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
s := testSetupMgrWithChannel(t)
nonce := uint64(1)
voucherAmount := big.NewInt(1)
@ -622,10 +517,11 @@ func TestAddVoucherInboundWalletKey(t *testing.T) {
}
mock := newMockManagerAPI()
mock.setAccountAddress(fromAcct, from)
mock.setAccountAddress(toAcct, to)
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), types.NewInt(0), make(map[uint64]paych.LaneState)))
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
// Create a manager
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
@ -660,7 +556,7 @@ func TestBestSpendable(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
s := testSetupMgrWithChannel(t)
// Add vouchers to lane 1 with amounts: [1, 2, 3]
voucherLane := uint64(1)
@ -740,7 +636,7 @@ func TestCheckSpendable(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
s := testSetupMgrWithChannel(t)
// Create voucher with Extra
voucherLane := uint64(1)
@ -821,7 +717,7 @@ func TestSubmitVoucher(t *testing.T) {
ctx := context.Background()
// Set up a manager with a single payment channel
s := testSetupMgrWithChannel(ctx, t)
s := testSetupMgrWithChannel(t)
// Create voucher with Extra
voucherLane := uint64(1)
@ -908,7 +804,7 @@ type testScaffold struct {
fromKeyPrivate []byte
}
func testSetupMgrWithChannel(ctx context.Context, t *testing.T) *testScaffold {
func testSetupMgrWithChannel(t *testing.T) *testScaffold {
fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t)
ch := tutils.NewIDAddr(t, 100)
@ -929,7 +825,7 @@ func testSetupMgrWithChannel(ctx context.Context, t *testing.T) *testScaffold {
Nonce: 0,
Balance: balance,
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), big.NewInt(0), make(map[uint64]paych.LaneState)))
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
mgr, err := newManager(store, mock)

View File

@ -978,7 +978,7 @@ func TestPaychAvailableFunds(t *testing.T) {
Nonce: 0,
Balance: createAmt,
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), big.NewInt(0), make(map[uint64]paych.LaneState)))
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)

View File

@ -0,0 +1,103 @@
package paychmgr
import (
"context"
"testing"
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
paychmock "github.com/filecoin-project/lotus/chain/actors/builtin/paych/mock"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
tutils "github.com/filecoin-project/specs-actors/support/testing"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
)
// TestPaychAddVoucherAfterAddFunds tests adding a voucher to a channel with
// insufficient funds, then adding funds to the channel, then adding the
// voucher again
func TestPaychAddVoucherAfterAddFunds(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t)
ch := tutils.NewIDAddr(t, 100)
from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic))
to := tutils.NewSECP256K1Addr(t, "secpTo")
fromAcct := tutils.NewActorAddr(t, "fromAct")
toAcct := tutils.NewActorAddr(t, "toAct")
mock := newMockManagerAPI()
defer mock.close()
// Add the from signing key to the wallet
mock.setAccountAddress(fromAcct, from)
mock.setAccountAddress(toAcct, to)
mock.addSigningKey(fromKeyPrivate)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
createAmt := big.NewInt(10)
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt)
require.NoError(t, err)
// Send create channel response
response := testChannelResponse(t, ch)
mock.receiveMsgResponse(createMsgCid, response)
// Create an actor in state for the channel with the initial channel balance
act := &types.Actor{
Code: builtin.AccountActorCodeID,
Head: cid.Cid{},
Nonce: 0,
Balance: createAmt,
}
mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState)))
// Wait for create response to be processed by manager
_, err = mgr.GetPaychWaitReady(ctx, createMsgCid)
require.NoError(t, err)
// Create a voucher with a value equal to the channel balance
voucher := paych.SignedVoucher{Amount: createAmt, Lane: 1}
res, err := mgr.CreateVoucher(ctx, ch, voucher)
require.NoError(t, err)
require.NotNil(t, res.Voucher)
// Create a voucher in a different lane with an amount that exceeds the
// channel balance
excessAmt := types.NewInt(5)
voucher = paych.SignedVoucher{Amount: excessAmt, Lane: 2}
res, err = mgr.CreateVoucher(ctx, ch, voucher)
require.NoError(t, err)
require.Nil(t, res.Voucher)
require.Equal(t, res.Shortfall, excessAmt)
// Add funds so as to cover the voucher shortfall
_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, excessAmt)
require.NoError(t, err)
// Trigger add funds confirmation
mock.receiveMsgResponse(addFundsMsgCid, types.MessageReceipt{ExitCode: 0})
// Update actor test case balance to reflect added funds
act.Balance = types.BigAdd(createAmt, excessAmt)
// Wait for add funds confirmation to be processed by manager
_, err = mgr.GetPaychWaitReady(ctx, addFundsMsgCid)
require.NoError(t, err)
// Adding same voucher that previously exceeded channel balance
// should succeed now that the channel balance has been increased
res, err = mgr.CreateVoucher(ctx, ch, voucher)
require.NoError(t, err)
require.NotNil(t, res.Voucher)
}

View File

@ -312,7 +312,7 @@ func (ca *channelAccessor) currentAvailableFunds(channelID string, queuedAmt typ
return nil, err
}
laneStates, err := ca.laneState(ca.chctx, pchState, ch)
laneStates, err := ca.laneState(pchState, ch)
if err != nil {
return nil, err
}

View File

@ -199,16 +199,24 @@ func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, ti
return nil
}
type apiIpldStore struct {
type ApiIpldStore struct {
ctx context.Context
api api.FullNode
api apiIpldStoreApi
}
func (ht *apiIpldStore) Context() context.Context {
type apiIpldStoreApi interface {
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
}
func NewApiIpldStore(ctx context.Context, api apiIpldStoreApi) *ApiIpldStore {
return &ApiIpldStore{ctx, api}
}
func (ht *ApiIpldStore) Context() context.Context {
return ht.ctx
}
func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
func (ht *ApiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
raw, err := ht.api.ChainReadObj(ctx, c)
if err != nil {
return err
@ -225,8 +233,8 @@ func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) err
return fmt.Errorf("Object does not implement CBORUnmarshaler")
}
func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore")
func (ht *ApiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cid.Undef, fmt.Errorf("Put is not implemented on ApiIpldStore")
}
func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {