diff --git a/api/api_full.go b/api/api_full.go index acbae19c4..7e7cf8e15 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -192,6 +192,9 @@ type FullNode interface { // MpoolPush pushes a signed message to mempool. MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error) + // MpoolPushUntrusted pushes a signed message to mempool from untrusted sources. + MpoolPushUntrusted(context.Context, *types.SignedMessage) (cid.Cid, error) + // MpoolPushMessage atomically assigns a nonce, signs, and pushes a message // to mempool. // maxFee is only used when GasFeeCap/GasPremium fields aren't specified diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 2ae4ac08d..b074d2ef0 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -122,7 +122,9 @@ type FullNodeStruct struct { MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` MpoolClear func(context.Context, bool) error `perm:"write"` - MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` + MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` + MpoolPushUntrusted func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` + MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"` MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"` MpoolSub func(context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"` @@ -557,6 +559,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag return c.Internal.MpoolPush(ctx, smsg) } +func (c *FullNodeStruct) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { + return c.Internal.MpoolPushUntrusted(ctx, smsg) +} + func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { return c.Internal.MpoolPushMessage(ctx, msg, spec) } diff --git a/api/test/paych.go b/api/test/paych.go index 36eb2c256..15ce352bd 100644 --- a/api/test/paych.go +++ b/api/test/paych.go @@ -67,7 +67,7 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) { t.Fatal(err) } - channelAmt := int64(100000) + channelAmt := int64(7000) channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt)) if err != nil { t.Fatal(err) @@ -169,6 +169,51 @@ func TestPaymentChannels(t *testing.T, b APIBuilder, blocktime time.Duration) { t.Fatal("Timed out waiting for receiver to submit vouchers") } + // Create a new voucher now that some vouchers have already been submitted + vouchRes, err := paymentCreator.PaychVoucherCreate(ctx, channel, abi.NewTokenAmount(1000), 3) + if err != nil { + t.Fatal(err) + } + if vouchRes.Voucher == nil { + t.Fatal(fmt.Errorf("Not enough funds to create voucher: missing %d", vouchRes.Shortfall)) + } + vdelta, err := paymentReceiver.PaychVoucherAdd(ctx, channel, vouchRes.Voucher, nil, abi.NewTokenAmount(1000)) + if err != nil { + t.Fatal(err) + } + if !vdelta.Equals(abi.NewTokenAmount(1000)) { + t.Fatal("voucher didn't have the right amount") + } + + // Create a new voucher whose value would exceed the channel balance + excessAmt := abi.NewTokenAmount(1000) + vouchRes, err = paymentCreator.PaychVoucherCreate(ctx, channel, excessAmt, 4) + if err != nil { + t.Fatal(err) + } + if vouchRes.Voucher != nil { + t.Fatal("Expected not to be able to create voucher whose value would exceed channel balance") + } + if !vouchRes.Shortfall.Equals(excessAmt) { + t.Fatal(fmt.Errorf("Expected voucher shortfall of %d, got %d", excessAmt, vouchRes.Shortfall)) + } + + // Add a voucher whose value would exceed the channel balance + vouch := &paych.SignedVoucher{ChannelAddr: channel, Amount: excessAmt, Lane: 4, Nonce: 1} + vb, err := vouch.SigningBytes() + if err != nil { + t.Fatal(err) + } + sig, err := paymentCreator.WalletSign(ctx, createrAddr, vb) + if err != nil { + t.Fatal(err) + } + vouch.Signature = sig + _, err = paymentReceiver.PaychVoucherAdd(ctx, channel, vouch, nil, abi.NewTokenAmount(1000)) + if err == nil { + t.Fatal(fmt.Errorf("Expected shortfall error of %d", excessAmt)) + } + // wait for the settlement period to pass before collecting waitForBlocks(ctx, t, bm, paymentReceiver, receiverAddr, paych.SettleDelay) diff --git a/chain/actors/builtin/paych/mock/mock.go b/chain/actors/builtin/paych/mock/mock.go index c4903f3ac..3b82511ff 100644 --- a/chain/actors/builtin/paych/mock/mock.go +++ b/chain/actors/builtin/paych/mock/mock.go @@ -27,10 +27,9 @@ type mockLaneState struct { func NewMockPayChState(from address.Address, to address.Address, settlingAt abi.ChainEpoch, - toSend abi.TokenAmount, lanes map[uint64]paych.LaneState, ) paych.State { - return &mockState{from, to, settlingAt, toSend, lanes} + return &mockState{from: from, to: to, settlingAt: settlingAt, toSend: big.NewInt(0), lanes: lanes} } // NewMockLaneState constructs a state for a payment channel lane with the set fixed values diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index d54ea7164..83aa5c6b7 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -55,6 +55,7 @@ var baseFeeLowerBoundFactor = types.NewInt(10) var baseFeeLowerBoundFactorConservative = types.NewInt(100) var MaxActorPendingMessages = 1000 +var MaxUntrustedActorPendingMessages = 10 var MaxNonceGap = uint64(4) @@ -195,9 +196,17 @@ func CapGasFee(msg *types.Message, maxFee abi.TokenAmount) { msg.GasPremium = big.Min(msg.GasFeeCap, msg.GasPremium) // cap premium at FeeCap } -func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) { +func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict, untrusted bool) (bool, error) { nextNonce := ms.nextNonce nonceGap := false + + maxNonceGap := MaxNonceGap + maxActorPendingMessages := MaxActorPendingMessages + if untrusted { + maxNonceGap = 0 + maxActorPendingMessages = MaxUntrustedActorPendingMessages + } + switch { case m.Message.Nonce == nextNonce: nextNonce++ @@ -206,7 +215,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo nextNonce++ } - case strict && m.Message.Nonce > nextNonce+MaxNonceGap: + case strict && m.Message.Nonce > nextNonce+maxNonceGap: return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap) case m.Message.Nonce > nextNonce: @@ -242,7 +251,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo //ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int) } - if !has && strict && len(ms.msgs) > MaxActorPendingMessages { + if !has && strict && len(ms.msgs) >= maxActorPendingMessages { log.Errorf("too many pending messages from actor %s", m.Message.From) return false, ErrTooManyPendingMessages } @@ -484,7 +493,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { } mp.curTsLk.Lock() - publish, err := mp.addTs(m, mp.curTs, true) + publish, err := mp.addTs(m, mp.curTs, true, false) if err != nil { mp.curTsLk.Unlock() return cid.Undef, err @@ -551,7 +560,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() - _, err = mp.addTs(m, mp.curTs, false) + _, err = mp.addTs(m, mp.curTs, false, false) return err } @@ -619,7 +628,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) return nil } -func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) { +func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) { snonce, err := mp.getStateNonce(m.Message.From, curTs) if err != nil { return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) @@ -641,7 +650,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local return false, err } - return publish, mp.addLocked(m, !local) + return publish, mp.addLocked(m, !local, untrusted) } func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { @@ -676,17 +685,17 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { return err } - return mp.addLocked(m, false) + return mp.addLocked(m, false, false) } func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error { mp.lk.Lock() defer mp.lk.Unlock() - return mp.addLocked(m, false) + return mp.addLocked(m, false, false) } -func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error { +func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error { log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce) if m.Signature.Type == crypto.SigTypeBLS { mp.blsSigCache.Add(m.Cid(), m.Signature) @@ -713,7 +722,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error { mp.pending[m.Message.From] = mset } - incr, err := mset.add(m, mp, strict) + incr, err := mset.add(m, mp, strict, untrusted) if err != nil { log.Debug(err) return err @@ -793,6 +802,50 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) ( return act.Balance, nil } +// this method is provided for the gateway to push messages. +// differences from Push: +// - strict checks are enabled +// - extra strict add checks are used when adding the messages to the msgSet +// that means: no nonce gaps, at most 10 pending messages for the actor +func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) { + err := mp.checkMessage(m) + if err != nil { + return cid.Undef, err + } + + // serialize push access to reduce lock contention + mp.addSema <- struct{}{} + defer func() { + <-mp.addSema + }() + + msgb, err := m.Serialize() + if err != nil { + return cid.Undef, err + } + + mp.curTsLk.Lock() + publish, err := mp.addTs(m, mp.curTs, false, true) + if err != nil { + mp.curTsLk.Unlock() + return cid.Undef, err + } + mp.curTsLk.Unlock() + + mp.lk.Lock() + if err := mp.addLocal(m, msgb); err != nil { + mp.lk.Unlock() + return cid.Undef, err + } + mp.lk.Unlock() + + if publish { + err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) + } + + return m.Cid(), err +} + func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) { mp.lk.Lock() defer mp.lk.Unlock() diff --git a/chain/store/store.go b/chain/store/store.go index 0806fb921..0c99b9d43 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -1191,13 +1191,6 @@ func recurseLinks(bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid. } func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error { - if ts == nil { - ts = cs.GetHeaviestTipSet() - } - - seen := cid.NewSet() - walked := cid.NewSet() - h := &car.CarHeader{ Roots: ts.Cids(), Version: 1, @@ -1207,6 +1200,28 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return xerrors.Errorf("failed to write car header: %s", err) } + return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error { + blk, err := cs.bs.Get(c) + if err != nil { + return xerrors.Errorf("writing object to car, bs.Get: %w", err) + } + + if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil { + return xerrors.Errorf("failed to write block to car output: %w", err) + } + + return nil + }) +} + +func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error { + if ts == nil { + ts = cs.GetHeaviestTipSet() + } + + seen := cid.NewSet() + walked := cid.NewSet() + blocksToWalk := ts.Cids() currentMinHeight := ts.Height() @@ -1215,15 +1230,15 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return nil } + if err := cb(blk); err != nil { + return err + } + data, err := cs.bs.Get(blk) if err != nil { return xerrors.Errorf("getting block: %w", err) } - if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil { - return xerrors.Errorf("failed to write block to car output: %w", err) - } - var b types.BlockHeader if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil { return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err) @@ -1270,14 +1285,11 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo if c.Prefix().Codec != cid.DagCBOR { continue } - data, err := cs.bs.Get(c) - if err != nil { - return xerrors.Errorf("writing object to car (get %s): %w", c, err) + + if err := cb(c); err != nil { + return err } - if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil { - return xerrors.Errorf("failed to write out car object: %w", err) - } } } diff --git a/cli/client.go b/cli/client.go index 255f5a70c..7494815bf 100644 --- a/cli/client.go +++ b/cli/client.go @@ -12,13 +12,12 @@ import ( "text/tabwriter" "time" - "github.com/filecoin-project/specs-actors/actors/builtin" - tm "github.com/buger/goterm" "github.com/docker/go-units" "github.com/fatih/color" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" "github.com/libp2p/go-libp2p-core/peer" @@ -476,6 +475,7 @@ func interactiveDeal(cctx *cli.Context) error { var ask storagemarket.StorageAsk var epochPrice big.Int var epochs abi.ChainEpoch + var verified bool var a address.Address if from := cctx.String("from"); from != "" { @@ -572,6 +572,53 @@ func interactiveDeal(cctx *cli.Context) error { ask = *a // TODO: run more validation + state = "verified" + case "verified": + ts, err := api.ChainHead(ctx) + if err != nil { + return err + } + + dcap, err := api.StateVerifiedClientStatus(ctx, a, ts.Key()) + if err != nil { + return err + } + + if dcap == nil { + state = "confirm" + continue + } + + color.Blue(".. checking verified deal eligibility\n") + ds, err := api.ClientDealSize(ctx, data) + if err != nil { + return err + } + + if dcap.Uint64() < uint64(ds.PieceSize) { + color.Yellow(".. not enough DataCap available for a verified deal\n") + state = "confirm" + continue + } + + fmt.Print("\nMake this a verified deal? (yes/no): ") + + var yn string + _, err = fmt.Scan(&yn) + if err != nil { + return err + } + + switch yn { + case "yes": + verified = true + case "no": + verified = false + default: + fmt.Println("Type in full 'yes' or 'no'") + continue + } + state = "confirm" case "confirm": fromBal, err := api.WalletBalance(ctx, a) @@ -590,10 +637,15 @@ func interactiveDeal(cctx *cli.Context) error { epochs = abi.ChainEpoch(dur / (time.Duration(build.BlockDelaySecs) * time.Second)) // TODO: do some more or epochs math (round to miner PP, deal start buffer) + pricePerGib := ask.Price + if verified { + pricePerGib = ask.VerifiedPrice + } + gib := types.NewInt(1 << 30) // TODO: price is based on PaddedPieceSize, right? - epochPrice = types.BigDiv(types.BigMul(ask.Price, types.NewInt(uint64(ds.PieceSize))), gib) + epochPrice = types.BigDiv(types.BigMul(pricePerGib, types.NewInt(uint64(ds.PieceSize))), gib) totalPrice := types.BigMul(epochPrice, types.NewInt(uint64(epochs))) fmt.Printf("-----\n") @@ -603,6 +655,7 @@ func interactiveDeal(cctx *cli.Context) error { fmt.Printf("Piece size: %s (Payload size: %s)\n", units.BytesSize(float64(ds.PieceSize)), units.BytesSize(float64(ds.PayloadSize))) fmt.Printf("Duration: %s\n", dur) fmt.Printf("Total price: ~%s (%s per epoch)\n", types.FIL(totalPrice), types.FIL(epochPrice)) + fmt.Printf("Verified: %v\n", verified) state = "accept" case "accept": @@ -637,7 +690,7 @@ func interactiveDeal(cctx *cli.Context) error { MinBlocksDuration: uint64(epochs), DealStartEpoch: abi.ChainEpoch(cctx.Int64("start-epoch")), FastRetrieval: cctx.Bool("fast-retrieval"), - VerifiedDeal: false, // TODO: Allow setting + VerifiedDeal: verified, }) if err != nil { return err diff --git a/cli/mpool.go b/cli/mpool.go index 65f4ef942..a8c73b656 100644 --- a/cli/mpool.go +++ b/cli/mpool.go @@ -7,6 +7,7 @@ import ( "sort" "strconv" + cid "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -43,6 +44,10 @@ var mpoolPending = &cli.Command{ Name: "local", Usage: "print pending messages for addresses in local wallet only", }, + &cli.BoolFlag{ + Name: "cids", + Usage: "only print cids of messages in output", + }, }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) @@ -79,11 +84,15 @@ var mpoolPending = &cli.Command{ } } - out, err := json.MarshalIndent(msg, "", " ") - if err != nil { - return err + if cctx.Bool("cids") { + fmt.Println(msg.Cid()) + } else { + out, err := json.MarshalIndent(msg, "", " ") + if err != nil { + return err + } + fmt.Println(string(out)) } - fmt.Println(string(out)) } return nil @@ -308,21 +317,8 @@ var mpoolReplaceCmd = &cli.Command{ Usage: "Spend up to X FIL for this message (applicable for auto mode)", }, }, - ArgsUsage: "[from] [nonce]", + ArgsUsage: " | ", Action: func(cctx *cli.Context) error { - if cctx.Args().Len() < 2 { - return cli.ShowCommandHelp(cctx, cctx.Command.Name) - } - - from, err := address.NewFromString(cctx.Args().Get(0)) - if err != nil { - return err - } - - nonce, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64) - if err != nil { - return err - } api, closer, err := GetFullNodeAPI(cctx) if err != nil { @@ -332,6 +328,39 @@ var mpoolReplaceCmd = &cli.Command{ ctx := ReqContext(cctx) + var from address.Address + var nonce uint64 + switch cctx.Args().Len() { + case 1: + mcid, err := cid.Decode(cctx.Args().First()) + if err != nil { + return err + } + + msg, err := api.ChainGetMessage(ctx, mcid) + if err != nil { + return fmt.Errorf("could not find referenced message: %w", err) + } + + from = msg.From + nonce = msg.Nonce + case 2: + f, err := address.NewFromString(cctx.Args().Get(0)) + if err != nil { + return err + } + + n, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64) + if err != nil { + return err + } + + from = f + nonce = n + default: + return cli.ShowCommandHelp(cctx, cctx.Command.Name) + } + ts, err := api.ChainHead(ctx) if err != nil { return xerrors.Errorf("getting chain head: %w", err) diff --git a/cmd/lotus-gateway/api.go b/cmd/lotus-gateway/api.go index 42e9e4829..0a6365dbd 100644 --- a/cmd/lotus-gateway/api.go +++ b/cmd/lotus-gateway/api.go @@ -86,5 +86,5 @@ func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (ci // TODO: additional anti-spam checks - return a.api.MpoolPush(ctx, sm) + return a.api.MpoolPushUntrusted(ctx, sm) } diff --git a/cmd/lotus-pcr/main.go b/cmd/lotus-pcr/main.go index 36961a663..4ce5bbb9f 100644 --- a/cmd/lotus-pcr/main.go +++ b/cmd/lotus-pcr/main.go @@ -1,8 +1,10 @@ package main import ( + "bufio" "bytes" "context" + "encoding/csv" "fmt" "io/ioutil" "net/http" @@ -25,11 +27,12 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/exitcode" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" @@ -42,6 +45,8 @@ var log = logging.Logger("main") func main() { local := []*cli.Command{ runCmd, + recoverMinersCmd, + findMinersCmd, versionCmd, } @@ -105,6 +110,186 @@ var versionCmd = &cli.Command{ }, } +var findMinersCmd = &cli.Command{ + Name: "find-miners", + Usage: "find miners with a desired minimum balance", + Description: `Find miners returns a list of miners and their balances that are below a + threhold value. By default only the miner actor available balance is considered but other + account balances can be included by enabling them through the flags. + + Examples + Find all miners with an available balance below 100 FIL + + lotus-pcr find-miners --threshold 100 + + Find all miners with a balance below zero, which includes the owner and worker balances + + lotus-pcr find-miners --threshold 0 --owner --worker +`, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "no-sync", + EnvVars: []string{"LOTUS_PCR_NO_SYNC"}, + Usage: "do not wait for chain sync to complete", + }, + &cli.IntFlag{ + Name: "threshold", + EnvVars: []string{"LOTUS_PCR_THRESHOLD"}, + Usage: "balance below this limit will be printed", + Value: 0, + }, + &cli.BoolFlag{ + Name: "owner", + Usage: "include owner balance", + Value: false, + }, + &cli.BoolFlag{ + Name: "worker", + Usage: "include worker balance", + Value: false, + }, + &cli.BoolFlag{ + Name: "control", + Usage: "include control balance", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := context.Background() + api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path")) + if err != nil { + log.Fatal(err) + } + defer closer() + + if !cctx.Bool("no-sync") { + if err := stats.WaitForSyncComplete(ctx, api); err != nil { + log.Fatal(err) + } + } + + owner := cctx.Bool("owner") + worker := cctx.Bool("worker") + control := cctx.Bool("control") + threshold := uint64(cctx.Int("threshold")) + + rf := &refunder{ + api: api, + threshold: types.FromFil(threshold), + } + + refundTipset, err := api.ChainHead(ctx) + if err != nil { + return err + } + + balanceRefund, err := rf.FindMiners(ctx, refundTipset, NewMinersRefund(), owner, worker, control) + if err != nil { + return err + } + + for _, maddr := range balanceRefund.Miners() { + fmt.Printf("%s\t%s\n", maddr, types.FIL(balanceRefund.GetRefund(maddr))) + } + + return nil + }, +} + +var recoverMinersCmd = &cli.Command{ + Name: "recover-miners", + Usage: "Ensure all miners with a negative available balance have a FIL surplus across accounts", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "from", + EnvVars: []string{"LOTUS_PCR_FROM"}, + Usage: "wallet address to send refund from", + }, + &cli.BoolFlag{ + Name: "no-sync", + EnvVars: []string{"LOTUS_PCR_NO_SYNC"}, + Usage: "do not wait for chain sync to complete", + }, + &cli.BoolFlag{ + Name: "dry-run", + EnvVars: []string{"LOTUS_PCR_DRY_RUN"}, + Usage: "do not send any messages", + Value: false, + }, + &cli.StringFlag{ + Name: "output", + Usage: "dump data as a csv format to this file", + }, + &cli.IntFlag{ + Name: "miner-recovery-cutoff", + EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_CUTOFF"}, + Usage: "maximum amount of FIL that can be sent to any one miner before refund percent is applied", + Value: 3000, + }, + &cli.IntFlag{ + Name: "miner-recovery-bonus", + EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_BONUS"}, + Usage: "additional FIL to send to each miner", + Value: 5, + }, + &cli.IntFlag{ + Name: "miner-recovery-refund-percent", + EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_REFUND_PERCENT"}, + Usage: "percent of refund to issue", + Value: 110, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := context.Background() + api, closer, err := stats.GetFullNodeAPI(cctx.Context, cctx.String("lotus-path")) + if err != nil { + log.Fatal(err) + } + defer closer() + + from, err := address.NewFromString(cctx.String("from")) + if err != nil { + return xerrors.Errorf("parsing source address (provide correct --from flag!): %w", err) + } + + if !cctx.Bool("no-sync") { + if err := stats.WaitForSyncComplete(ctx, api); err != nil { + log.Fatal(err) + } + } + + dryRun := cctx.Bool("dry-run") + minerRecoveryRefundPercent := cctx.Int("miner-recovery-refund-percent") + minerRecoveryCutoff := uint64(cctx.Int("miner-recovery-cutoff")) + minerRecoveryBonus := uint64(cctx.Int("miner-recovery-bonus")) + + rf := &refunder{ + api: api, + wallet: from, + dryRun: dryRun, + minerRecoveryRefundPercent: minerRecoveryRefundPercent, + minerRecoveryCutoff: types.FromFil(minerRecoveryCutoff), + minerRecoveryBonus: types.FromFil(minerRecoveryBonus), + } + + refundTipset, err := api.ChainHead(ctx) + if err != nil { + return err + } + + balanceRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund(), cctx.String("output")) + if err != nil { + return err + } + + if err := rf.Refund(ctx, "refund to recover miner", refundTipset, balanceRefund, 0); err != nil { + return err + } + + return nil + }, +} + var runCmd = &cli.Command{ Name: "run", Usage: "Start message reimpursement", @@ -120,10 +305,10 @@ var runCmd = &cli.Command{ Usage: "do not wait for chain sync to complete", }, &cli.IntFlag{ - Name: "percent-extra", - EnvVars: []string{"LOTUS_PCR_PERCENT_EXTRA"}, - Usage: "extra funds to send above the refund", - Value: 3, + Name: "refund-percent", + EnvVars: []string{"LOTUS_PCR_REFUND_PERCENT"}, + Usage: "percent of refund to issue", + Value: 103, }, &cli.IntFlag{ Name: "max-message-queue", @@ -161,6 +346,36 @@ var runCmd = &cli.Command{ Usage: "the number of tipsets to delay message processing to smooth chain reorgs", Value: int(build.MessageConfidence), }, + &cli.BoolFlag{ + Name: "miner-recovery", + EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY"}, + Usage: "run the miner recovery job", + Value: false, + }, + &cli.IntFlag{ + Name: "miner-recovery-period", + EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_PERIOD"}, + Usage: "interval between running miner recovery", + Value: 2880, + }, + &cli.IntFlag{ + Name: "miner-recovery-cutoff", + EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_CUTOFF"}, + Usage: "maximum amount of FIL that can be sent to any one miner before refund percent is applied", + Value: 3000, + }, + &cli.IntFlag{ + Name: "miner-recovery-bonus", + EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_BONUS"}, + Usage: "additional FIL to send to each miner", + Value: 5, + }, + &cli.IntFlag{ + Name: "miner-recovery-refund-percent", + EnvVars: []string{"LOTUS_PCR_MINER_RECOVERY_REFUND_PERCENT"}, + Usage: "percent of refund to issue", + Value: 110, + }, }, Action: func(cctx *cli.Context) error { go func() { @@ -199,24 +414,33 @@ var runCmd = &cli.Command{ log.Fatal(err) } - percentExtra := cctx.Int("percent-extra") + refundPercent := cctx.Int("refund-percent") maxMessageQueue := cctx.Int("max-message-queue") dryRun := cctx.Bool("dry-run") preCommitEnabled := cctx.Bool("pre-commit") proveCommitEnabled := cctx.Bool("prove-commit") aggregateTipsets := cctx.Int("aggregate-tipsets") + minerRecoveryEnabled := cctx.Bool("miner-recovery") + minerRecoveryPeriod := abi.ChainEpoch(int64(cctx.Int("miner-recovery-period"))) + minerRecoveryRefundPercent := cctx.Int("miner-recovery-refund-percent") + minerRecoveryCutoff := uint64(cctx.Int("miner-recovery-cutoff")) + minerRecoveryBonus := uint64(cctx.Int("miner-recovery-bonus")) rf := &refunder{ - api: api, - wallet: from, - percentExtra: percentExtra, - dryRun: dryRun, - preCommitEnabled: preCommitEnabled, - proveCommitEnabled: proveCommitEnabled, + api: api, + wallet: from, + refundPercent: refundPercent, + minerRecoveryRefundPercent: minerRecoveryRefundPercent, + minerRecoveryCutoff: types.FromFil(minerRecoveryCutoff), + minerRecoveryBonus: types.FromFil(minerRecoveryBonus), + dryRun: dryRun, + preCommitEnabled: preCommitEnabled, + proveCommitEnabled: proveCommitEnabled, } var refunds *MinersRefund = NewMinersRefund() var rounds int = 0 + nextMinerRecovery := r.MinerRecoveryHeight() + minerRecoveryPeriod for tipset := range tipsetsCh { refunds, err = rf.ProcessTipset(ctx, tipset, refunds) @@ -224,17 +448,34 @@ var runCmd = &cli.Command{ return err } - rounds = rounds + 1 - if rounds < aggregateTipsets { - continue - } - refundTipset, err := api.ChainHead(ctx) if err != nil { return err } - if err := rf.Refund(ctx, refundTipset, refunds, rounds); err != nil { + if minerRecoveryEnabled && refundTipset.Height() >= nextMinerRecovery { + recoveryRefund, err := rf.EnsureMinerMinimums(ctx, refundTipset, NewMinersRefund(), "") + if err != nil { + return err + } + + if err := rf.Refund(ctx, "refund to recover miners", refundTipset, recoveryRefund, 0); err != nil { + return err + } + + if err := r.SetMinerRecoveryHeight(tipset.Height()); err != nil { + return err + } + + nextMinerRecovery = r.MinerRecoveryHeight() + minerRecoveryPeriod + } + + rounds = rounds + 1 + if rounds < aggregateTipsets { + continue + } + + if err := rf.Refund(ctx, "refund stats", refundTipset, refunds, rounds); err != nil { return err } @@ -293,7 +534,6 @@ func (m *MinersRefund) Track(addr address.Address, value types.BigInt) { m.count = m.count + 1 m.totalRefunds = types.BigAdd(m.totalRefunds, value) - m.refunds[addr] = types.BigAdd(m.refunds[addr], value) } @@ -322,8 +562,14 @@ type refunderNodeApi interface { ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error) ChainGetParentReceipts(ctx context.Context, blockCid cid.Cid) ([]*types.MessageReceipt, error) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) StateMinerInitialPledgeCollateral(ctx context.Context, addr address.Address, precommitInfo miner.SectorPreCommitInfo, tsk types.TipSetKey) (types.BigInt, error) + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) StateSectorPreCommitInfo(ctx context.Context, addr address.Address, sector abi.SectorNumber, tsk types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error) + StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) + StateMinerSectors(ctx context.Context, addr address.Address, filter *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error) + StateMinerFaults(ctx context.Context, addr address.Address, tsk types.TipSetKey) (bitfield.BitField, error) + StateListMiners(context.Context, types.TipSetKey) ([]address.Address, error) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) @@ -332,12 +578,241 @@ type refunderNodeApi interface { } type refunder struct { - api refunderNodeApi - wallet address.Address - percentExtra int - dryRun bool - preCommitEnabled bool - proveCommitEnabled bool + api refunderNodeApi + wallet address.Address + refundPercent int + minerRecoveryRefundPercent int + minerRecoveryCutoff big.Int + minerRecoveryBonus big.Int + dryRun bool + preCommitEnabled bool + proveCommitEnabled bool + threshold big.Int +} + +func (r *refunder) FindMiners(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, owner, worker, control bool) (*MinersRefund, error) { + miners, err := r.api.StateListMiners(ctx, tipset.Key()) + if err != nil { + return nil, err + } + + for _, maddr := range miners { + mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + if !mact.Balance.GreaterThan(big.Zero()) { + continue + } + + minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key()) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + // Look up and find all addresses associated with the miner + minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key()) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + allAddresses := []address.Address{} + + if worker { + allAddresses = append(allAddresses, minerInfo.Worker) + } + + if owner { + allAddresses = append(allAddresses, minerInfo.Owner) + } + + if control { + allAddresses = append(allAddresses, minerInfo.ControlAddresses...) + } + + // Sum the balancer of all the addresses + addrSum := big.Zero() + addrCheck := make(map[address.Address]struct{}, len(allAddresses)) + for _, addr := range allAddresses { + if _, found := addrCheck[addr]; !found { + balance, err := r.api.WalletBalance(ctx, addr) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + addrSum = big.Add(addrSum, balance) + addrCheck[addr] = struct{}{} + } + } + + totalAvailableBalance := big.Add(addrSum, minerAvailableBalance) + + if totalAvailableBalance.GreaterThanEqual(r.threshold) { + continue + } + + refunds.Track(maddr, totalAvailableBalance) + + log.Debugw("processing miner", "miner", maddr, "sectors", "available_balance", totalAvailableBalance) + } + + return refunds, nil +} + +func (r *refunder) EnsureMinerMinimums(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, output string) (*MinersRefund, error) { + miners, err := r.api.StateListMiners(ctx, tipset.Key()) + if err != nil { + return nil, err + } + + w := ioutil.Discard + if len(output) != 0 { + f, err := os.Create(output) + if err != nil { + return nil, err + } + + defer f.Close() // nolint:errcheck + + w = bufio.NewWriter(f) + } + + csvOut := csv.NewWriter(w) + defer csvOut.Flush() + if err := csvOut.Write([]string{"MinerID", "FaultedSectors", "AvailableBalance", "ProposedRefund"}); err != nil { + return nil, err + } + + for _, maddr := range miners { + mact, err := r.api.StateGetActor(ctx, maddr, types.EmptyTSK) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + if !mact.Balance.GreaterThan(big.Zero()) { + continue + } + + minerAvailableBalance, err := r.api.StateMinerAvailableBalance(ctx, maddr, tipset.Key()) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + // Look up and find all addresses associated with the miner + minerInfo, err := r.api.StateMinerInfo(ctx, maddr, tipset.Key()) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + allAddresses := []address.Address{minerInfo.Worker, minerInfo.Owner} + allAddresses = append(allAddresses, minerInfo.ControlAddresses...) + + // Sum the balancer of all the addresses + addrSum := big.Zero() + addrCheck := make(map[address.Address]struct{}, len(allAddresses)) + for _, addr := range allAddresses { + if _, found := addrCheck[addr]; !found { + balance, err := r.api.WalletBalance(ctx, addr) + if err != nil { + log.Errorw("failed", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + addrSum = big.Add(addrSum, balance) + addrCheck[addr] = struct{}{} + } + } + + faults, err := r.api.StateMinerFaults(ctx, maddr, tipset.Key()) + if err != nil { + log.Errorw("failed to look up miner faults", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + faultsCount, err := faults.Count() + if err != nil { + log.Errorw("failed to get count of faults", "err", err, "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + if faultsCount == 0 { + log.Debugw("skipping miner with zero faults", "height", tipset.Height(), "key", tipset.Key(), "miner", maddr) + continue + } + + totalAvailableBalance := big.Add(addrSum, minerAvailableBalance) + balanceCutoff := big.Mul(big.Div(big.NewIntUnsigned(faultsCount), big.NewInt(10)), big.NewIntUnsigned(build.FilecoinPrecision)) + + if totalAvailableBalance.GreaterThan(balanceCutoff) { + log.Debugw( + "skipping over miner with total available balance larger than refund", + "height", tipset.Height(), + "key", tipset.Key(), + "miner", maddr, + "available_balance", totalAvailableBalance, + "balance_cutoff", balanceCutoff, + "faults_count", faultsCount, + "available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + "balance_cutoff_fil", big.Div(balanceCutoff, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + ) + continue + } + + refundValue := big.Sub(balanceCutoff, totalAvailableBalance) + if r.minerRecoveryRefundPercent > 0 { + refundValue = types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.minerRecoveryRefundPercent))) + } + + refundValue = big.Add(refundValue, r.minerRecoveryBonus) + + if refundValue.GreaterThan(r.minerRecoveryCutoff) { + log.Infow( + "skipping over miner with refund greater than refund cutoff", + "height", tipset.Height(), + "key", tipset.Key(), + "miner", maddr, + "available_balance", totalAvailableBalance, + "balance_cutoff", balanceCutoff, + "faults_count", faultsCount, + "refund", refundValue, + "available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + "balance_cutoff_fil", big.Div(balanceCutoff, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + "refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + ) + continue + } + + refunds.Track(maddr, refundValue) + record := []string{ + maddr.String(), + fmt.Sprintf("%d", faultsCount), + big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).String(), + big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).String(), + } + if err := csvOut.Write(record); err != nil { + return nil, err + } + + log.Debugw( + "processing miner", + "miner", maddr, + "faults_count", faultsCount, + "available_balance", totalAvailableBalance, + "refund", refundValue, + "available_balance_fil", big.Div(totalAvailableBalance, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + "refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + ) + } + + return refunds, nil } func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund) (*MinersRefund, error) { @@ -458,22 +933,41 @@ func (r *refunder) ProcessTipset(ctx context.Context, tipset *types.TipSet, refu continue } - if r.percentExtra > 0 { - refundValue = types.BigAdd(refundValue, types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.percentExtra)))) + if r.refundPercent > 0 { + refundValue = types.BigMul(types.BigDiv(refundValue, types.NewInt(100)), types.NewInt(uint64(r.refundPercent))) } - log.Debugw("processing message", "method", messageMethod, "cid", msg.Cid, "from", m.From, "to", m.To, "value", m.Value, "gas_fee_cap", m.GasFeeCap, "gas_premium", m.GasPremium, "gas_used", recps[i].GasUsed, "refund", refundValue) + log.Debugw( + "processing message", + "method", messageMethod, + "cid", msg.Cid, + "from", m.From, + "to", m.To, + "value", m.Value, + "gas_fee_cap", m.GasFeeCap, + "gas_premium", m.GasPremium, + "gas_used", recps[i].GasUsed, + "refund", refundValue, + "refund_fil", big.Div(refundValue, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + ) refunds.Track(m.From, refundValue) tipsetRefunds.Track(m.From, refundValue) } - log.Infow("tipset stats", "height", tipset.Height(), "key", tipset.Key(), "total_refunds", tipsetRefunds.TotalRefunds(), "messages_processed", tipsetRefunds.Count()) + log.Infow( + "tipset stats", + "height", tipset.Height(), + "key", tipset.Key(), + "total_refunds", tipsetRefunds.TotalRefunds(), + "total_refunds_fil", big.Div(tipsetRefunds.TotalRefunds(), big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + "messages_processed", tipsetRefunds.Count(), + ) return refunds, nil } -func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *MinersRefund, rounds int) error { +func (r *refunder) Refund(ctx context.Context, name string, tipset *types.TipSet, refunds *MinersRefund, rounds int) error { if refunds.Count() == 0 { log.Debugw("no messages to refund in tipset", "height", tipset.Height(), "key", tipset.Key()) return nil @@ -531,13 +1025,24 @@ func (r *refunder) Refund(ctx context.Context, tipset *types.TipSet, refunds *Mi refundSum = types.BigAdd(refundSum, msg.Value) } - log.Infow("refund stats", "tipsets_processed", rounds, "height", tipset.Height(), "key", tipset.Key(), "messages_sent", len(messages)-failures, "refund_sum", refundSum, "messages_failures", failures, "messages_processed", refunds.Count()) + log.Infow( + name, + "tipsets_processed", rounds, + "height", tipset.Height(), + "key", tipset.Key(), + "messages_sent", len(messages)-failures, + "refund_sum", refundSum, + "refund_sum_fil", big.Div(refundSum, big.NewIntUnsigned(build.FilecoinPrecision)).Int64(), + "messages_failures", failures, + "messages_processed", refunds.Count(), + ) return nil } type Repo struct { - last abi.ChainEpoch - path string + lastHeight abi.ChainEpoch + lastMinerRecoveryHeight abi.ChainEpoch + path string } func NewRepo(path string) (*Repo, error) { @@ -547,8 +1052,9 @@ func NewRepo(path string) (*Repo, error) { } return &Repo{ - last: 0, - path: path, + lastHeight: 0, + lastMinerRecoveryHeight: 0, + path: path, }, nil } @@ -579,43 +1085,66 @@ func (r *Repo) init() error { return nil } -func (r *Repo) Open() (err error) { - if err = r.init(); err != nil { - return +func (r *Repo) Open() error { + if err := r.init(); err != nil { + return err } - var f *os.File + if err := r.loadHeight(); err != nil { + return err + } - f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR|os.O_CREATE, 0644) + if err := r.loadMinerRecoveryHeight(); err != nil { + return err + } + + return nil +} + +func loadChainEpoch(fn string) (abi.ChainEpoch, error) { + f, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0644) if err != nil { - return + return 0, err } defer func() { err = f.Close() }() - var raw []byte - - raw, err = ioutil.ReadAll(f) + raw, err := ioutil.ReadAll(f) if err != nil { - return + return 0, err } height, err := strconv.Atoi(string(bytes.TrimSpace(raw))) if err != nil { - return + return 0, err } - r.last = abi.ChainEpoch(height) - return + return abi.ChainEpoch(height), nil +} + +func (r *Repo) loadHeight() error { + var err error + r.lastHeight, err = loadChainEpoch(filepath.Join(r.path, "height")) + return err +} + +func (r *Repo) loadMinerRecoveryHeight() error { + var err error + r.lastMinerRecoveryHeight, err = loadChainEpoch(filepath.Join(r.path, "miner_recovery_height")) + return err } func (r *Repo) Height() abi.ChainEpoch { - return r.last + return r.lastHeight +} + +func (r *Repo) MinerRecoveryHeight() abi.ChainEpoch { + return r.lastMinerRecoveryHeight } func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) { - r.last = last + r.lastHeight = last var f *os.File f, err = os.OpenFile(filepath.Join(r.path, "height"), os.O_RDWR, 0644) if err != nil { @@ -626,7 +1155,26 @@ func (r *Repo) SetHeight(last abi.ChainEpoch) (err error) { err = f.Close() }() - if _, err = fmt.Fprintf(f, "%d", r.last); err != nil { + if _, err = fmt.Fprintf(f, "%d", r.lastHeight); err != nil { + return + } + + return +} + +func (r *Repo) SetMinerRecoveryHeight(last abi.ChainEpoch) (err error) { + r.lastMinerRecoveryHeight = last + var f *os.File + f, err = os.OpenFile(filepath.Join(r.path, "miner_recovery_height"), os.O_RDWR, 0644) + if err != nil { + return + } + + defer func() { + err = f.Close() + }() + + if _, err = fmt.Fprintf(f, "%d", r.lastMinerRecoveryHeight); err != nil { return } diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index 409d8928b..4542551db 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -39,6 +39,7 @@ func main() { consensusCmd, serveDealStatsCmd, syncCmd, + stateTreePruneCmd, datastoreCmd, } diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go new file mode 100644 index 000000000..79158c3a3 --- /dev/null +++ b/cmd/lotus-shed/pruning.go @@ -0,0 +1,289 @@ +package main + +import ( + "context" + "fmt" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/lib/blockstore" + "github.com/filecoin-project/lotus/node/repo" + "github.com/ipfs/bbloom" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + dshelp "github.com/ipfs/go-ipfs-ds-help" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" +) + +type cidSet interface { + Add(cid.Cid) + Has(cid.Cid) bool + HasRaw([]byte) bool + Len() int +} + +type bloomSet struct { + bloom *bbloom.Bloom +} + +func newBloomSet(size int64) (*bloomSet, error) { + b, err := bbloom.New(float64(size), 3) + if err != nil { + return nil, err + } + + return &bloomSet{bloom: b}, nil +} + +func (bs *bloomSet) Add(c cid.Cid) { + bs.bloom.Add(c.Hash()) + +} + +func (bs *bloomSet) Has(c cid.Cid) bool { + return bs.bloom.Has(c.Hash()) +} + +func (bs *bloomSet) HasRaw(b []byte) bool { + return bs.bloom.Has(b) +} + +func (bs *bloomSet) Len() int { + return int(bs.bloom.ElementsAdded()) +} + +type mapSet struct { + m map[string]struct{} +} + +func newMapSet() *mapSet { + return &mapSet{m: make(map[string]struct{})} +} + +func (bs *mapSet) Add(c cid.Cid) { + bs.m[string(c.Hash())] = struct{}{} +} + +func (bs *mapSet) Has(c cid.Cid) bool { + _, ok := bs.m[string(c.Hash())] + return ok +} + +func (bs *mapSet) HasRaw(b []byte) bool { + _, ok := bs.m[string(b)] + return ok +} + +func (bs *mapSet) Len() int { + return len(bs.m) +} + +var stateTreePruneCmd = &cli.Command{ + Name: "state-prune", + Description: "Deletes old state root data from local chainstore", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo", + Value: "~/.lotus", + }, + &cli.Int64Flag{ + Name: "keep-from-lookback", + Usage: "keep stateroots at or newer than the current height minus this lookback", + Value: 1800, // 2 x finality + }, + &cli.IntFlag{ + Name: "delete-up-to", + Usage: "delete up to the given number of objects (used to run a faster 'partial' sync)", + }, + &cli.BoolFlag{ + Name: "use-bloom-set", + Usage: "use a bloom filter for the 'good' set instead of a map, reduces memory usage but may not clean up as much", + }, + &cli.BoolFlag{ + Name: "dry-run", + Usage: "only enumerate the good set, don't do any deletions", + }, + &cli.BoolFlag{ + Name: "only-ds-gc", + Usage: "Only run datastore GC", + }, + &cli.IntFlag{ + Name: "gc-count", + Usage: "number of times to run gc", + Value: 20, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := context.TODO() + + fsrepo, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return err + } + + lkrepo, err := fsrepo.Lock(repo.FullNode) + if err != nil { + return err + } + + defer lkrepo.Close() //nolint:errcheck + + ds, err := lkrepo.Datastore("/chain") + if err != nil { + return err + } + + defer ds.Close() //nolint:errcheck + + mds, err := lkrepo.Datastore("/metadata") + if err != nil { + return err + } + defer mds.Close() //nolint:errcheck + + if cctx.Bool("only-ds-gc") { + gcds, ok := ds.(datastore.GCDatastore) + if ok { + fmt.Println("running datastore gc....") + for i := 0; i < cctx.Int("gc-count"); i++ { + if err := gcds.CollectGarbage(); err != nil { + return xerrors.Errorf("datastore GC failed: %w", err) + } + } + fmt.Println("gc complete!") + return nil + } + return fmt.Errorf("datastore doesnt support gc") + } + + bs := blockstore.NewBlockstore(ds) + + cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier)) + if err := cs.Load(); err != nil { + return fmt.Errorf("loading chainstore: %w", err) + } + + var goodSet cidSet + if cctx.Bool("use-bloom-set") { + bset, err := newBloomSet(10000000) + if err != nil { + return err + } + goodSet = bset + } else { + goodSet = newMapSet() + } + + ts := cs.GetHeaviestTipSet() + + rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback")) + + if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error { + if goodSet.Len()%20 == 0 { + fmt.Printf("\renumerating keep set: %d ", goodSet.Len()) + } + goodSet.Add(c) + return nil + }); err != nil { + return fmt.Errorf("snapshot walk failed: %w", err) + } + + fmt.Println() + fmt.Printf("Successfully marked keep set! (%d objects)\n", goodSet.Len()) + + if cctx.Bool("dry-run") { + return nil + } + + var b datastore.Batch + var batchCount int + markForRemoval := func(c cid.Cid) error { + if b == nil { + nb, err := ds.Batch() + if err != nil { + return fmt.Errorf("opening batch: %w", err) + } + + b = nb + } + batchCount++ + + if err := b.Delete(dshelp.MultihashToDsKey(c.Hash())); err != nil { + return err + } + + if batchCount > 100 { + if err := b.Commit(); err != nil { + return xerrors.Errorf("failed to commit batch deletes: %w", err) + } + b = nil + batchCount = 0 + } + return nil + } + + res, err := ds.Query(query.Query{KeysOnly: true}) + if err != nil { + return xerrors.Errorf("failed to query datastore: %w", err) + } + + dupTo := cctx.Int("delete-up-to") + + var deleteCount int + var goodHits int + for { + v, ok := res.NextSync() + if !ok { + break + } + + bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(v.Key[len("/blocks"):])) + if err != nil { + return xerrors.Errorf("failed to parse key: %w", err) + } + + if goodSet.HasRaw(bk) { + goodHits++ + continue + } + + nc := cid.NewCidV1(cid.Raw, bk) + + deleteCount++ + if err := markForRemoval(nc); err != nil { + return fmt.Errorf("failed to remove cid %s: %w", nc, err) + } + + if deleteCount%20 == 0 { + fmt.Printf("\rdeleting %d objects (good hits: %d)... ", deleteCount, goodHits) + } + + if dupTo != 0 && deleteCount > dupTo { + break + } + } + + if b != nil { + if err := b.Commit(); err != nil { + return xerrors.Errorf("failed to commit final batch delete: %w", err) + } + } + + gcds, ok := ds.(datastore.GCDatastore) + if ok { + fmt.Println("running datastore gc....") + for i := 0; i < cctx.Int("gc-count"); i++ { + if err := gcds.CollectGarbage(); err != nil { + return xerrors.Errorf("datastore GC failed: %w", err) + } + } + fmt.Println("gc complete!") + } + + return nil + }, +} diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index f74a975b0..94ea6b1da 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -74,6 +74,7 @@ * [MpoolPending](#MpoolPending) * [MpoolPush](#MpoolPush) * [MpoolPushMessage](#MpoolPushMessage) + * [MpoolPushUntrusted](#MpoolPushUntrusted) * [MpoolSelect](#MpoolSelect) * [MpoolSetConfig](#MpoolSetConfig) * [MpoolSub](#MpoolSub) @@ -1802,6 +1803,43 @@ Response: } ``` +### MpoolPushUntrusted +MpoolPushUntrusted pushes a signed message to mempool from untrusted sources. + + +Perms: write + +Inputs: +```json +[ + { + "Message": { + "Version": 42, + "To": "t01234", + "From": "t01234", + "Nonce": 42, + "Value": "0", + "GasLimit": 9, + "GasFeeCap": "0", + "GasPremium": "0", + "Method": 1, + "Params": "Ynl0ZSBhcnJheQ==" + }, + "Signature": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + } + } +] +``` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + ### MpoolSelect MpoolSelect returns a list of pending messages for inclusion in the next block diff --git a/go.mod b/go.mod index 3f2661d0e..ae5500d0f 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/golang-lru v0.5.4 github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d + github.com/ipfs/bbloom v0.0.4 github.com/ipfs/go-bitswap v0.2.20 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834 diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 066aafdc5..e0dd3ecef 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -110,6 +110,10 @@ func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (ci return a.Mpool.Push(smsg) } +func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { + return a.Mpool.PushUntrusted(smsg) +} + func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { cp := *msg msg = &cp diff --git a/paychmgr/paych.go b/paychmgr/paych.go index f856b9890..e67991911 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -205,7 +205,7 @@ func (ca *channelAccessor) checkVoucherValidUnlocked(ctx context.Context, ch add } // Check the voucher against the highest known voucher nonce / value - laneStates, err := ca.laneState(ctx, pchState, ch) + laneStates, err := ca.laneState(pchState, ch) if err != nil { return nil, err } @@ -253,16 +253,9 @@ func (ca *channelAccessor) checkVoucherValidUnlocked(ctx context.Context, ch add return nil, err } - // Total required balance = total redeemed + toSend - // Must not exceed actor balance - ts, err := pchState.ToSend() - if err != nil { - return nil, err - } - - newTotal := types.BigAdd(totalRedeemed, ts) - if act.Balance.LessThan(newTotal) { - return nil, newErrInsufficientFunds(types.BigSub(newTotal, act.Balance)) + // Total required balance must not exceed actor balance + if act.Balance.LessThan(totalRedeemed) { + return nil, newErrInsufficientFunds(types.BigSub(totalRedeemed, act.Balance)) } if len(sv.Merges) != 0 { @@ -505,7 +498,6 @@ func (ca *channelAccessor) allocateLane(ch address.Address) (uint64, error) { ca.lk.Lock() defer ca.lk.Unlock() - // TODO: should this take into account lane state? return ca.store.AllocateLane(ch) } @@ -520,7 +512,7 @@ func (ca *channelAccessor) listVouchers(ctx context.Context, ch address.Address) // laneState gets the LaneStates from chain, then applies all vouchers in // the data store over the chain state -func (ca *channelAccessor) laneState(ctx context.Context, state paych.State, ch address.Address) (map[uint64]paych.LaneState, error) { +func (ca *channelAccessor) laneState(state paych.State, ch address.Address) (map[uint64]paych.LaneState, error) { // TODO: we probably want to call UpdateChannelState with all vouchers to be fully correct // (but technically dont't need to) @@ -552,9 +544,12 @@ func (ca *channelAccessor) laneState(ctx context.Context, state paych.State, ch return nil, xerrors.Errorf("paych merges not handled yet") } - // If there's a voucher for a lane that isn't in chain state just - // create it + // Check if there is an existing laneState in the payment channel + // for this voucher's lane ls, ok := laneStates[v.Voucher.Lane] + + // If the voucher does not have a higher nonce than the existing + // laneState for this lane, ignore it if ok { n, err := ls.Nonce() if err != nil { @@ -565,6 +560,7 @@ func (ca *channelAccessor) laneState(ctx context.Context, state paych.State, ch } } + // Voucher has a higher nonce, so replace laneState with this voucher laneStates[v.Voucher.Lane] = laneState{v.Voucher.Amount, v.Voucher.Nonce} } diff --git a/paychmgr/paych_test.go b/paychmgr/paych_test.go index b27b1e540..56e7e9089 100644 --- a/paychmgr/paych_test.go +++ b/paychmgr/paych_test.go @@ -47,7 +47,6 @@ func TestCheckVoucherValid(t *testing.T) { expectError bool key []byte actorBalance big.Int - toSend big.Int voucherAmount big.Int voucherLane uint64 voucherNonce uint64 @@ -56,35 +55,30 @@ func TestCheckVoucherValid(t *testing.T) { name: "passes when voucher amount < balance", key: fromKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(0), voucherAmount: big.NewInt(5), }, { name: "fails when funds too low", expectError: true, key: fromKeyPrivate, actorBalance: big.NewInt(5), - toSend: big.NewInt(0), voucherAmount: big.NewInt(10), }, { name: "fails when invalid signature", expectError: true, key: randKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(0), voucherAmount: big.NewInt(5), }, { name: "fails when signed by channel To account (instead of From account)", expectError: true, key: toKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(0), voucherAmount: big.NewInt(5), }, { name: "fails when nonce too low", expectError: true, key: fromKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(0), voucherAmount: big.NewInt(5), voucherLane: 1, voucherNonce: 2, @@ -95,7 +89,6 @@ func TestCheckVoucherValid(t *testing.T) { name: "passes when nonce higher", key: fromKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(0), voucherAmount: big.NewInt(5), voucherLane: 1, voucherNonce: 3, @@ -106,7 +99,6 @@ func TestCheckVoucherValid(t *testing.T) { name: "passes when nonce for different lane", key: fromKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(0), voucherAmount: big.NewInt(5), voucherLane: 2, voucherNonce: 2, @@ -118,32 +110,22 @@ func TestCheckVoucherValid(t *testing.T) { expectError: true, key: fromKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(0), voucherAmount: big.NewInt(5), voucherLane: 1, voucherNonce: 3, laneStates: map[uint64]paych.LaneState{ 1: paychmock.NewMockLaneState(big.NewInt(6), 2), }, - }, { - name: "fails when voucher + ToSend > balance", - expectError: true, - key: fromKeyPrivate, - actorBalance: big.NewInt(10), - toSend: big.NewInt(9), - voucherAmount: big.NewInt(2), }, { // voucher supersedes lane 1 redeemed so // lane 1 effective redeemed = voucher amount // - // required balance = toSend + total redeemed - // = 1 + 6 (lane1) + // required balance = voucher amt // = 7 // So required balance: 7 < actor balance: 10 - name: "passes when voucher + total redeemed <= balance", + name: "passes when voucher total redeemed <= balance", key: fromKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(1), voucherAmount: big.NewInt(6), voucherLane: 1, voucherNonce: 2, @@ -152,29 +134,68 @@ func TestCheckVoucherValid(t *testing.T) { 1: paychmock.NewMockLaneState(big.NewInt(4), 1), }, }, { - // required balance = toSend + total redeemed - // = 1 + 4 (lane 2) + 6 (voucher lane 1) + // required balance = total redeemed + // = 6 (voucher lane 1) + 5 (lane 2) // = 11 // So required balance: 11 > actor balance: 10 - name: "fails when voucher + total redeemed > balance", + name: "fails when voucher total redeemed > balance", expectError: true, key: fromKeyPrivate, actorBalance: big.NewInt(10), - toSend: big.NewInt(1), voucherAmount: big.NewInt(6), voucherLane: 1, voucherNonce: 1, laneStates: map[uint64]paych.LaneState{ // Lane 2 (different from voucher lane 1) - 2: paychmock.NewMockLaneState(big.NewInt(4), 1), + 2: paychmock.NewMockLaneState(big.NewInt(5), 1), + }, + }, { + // voucher supersedes lane 1 redeemed so + // lane 1 effective redeemed = voucher amount + // + // required balance = total redeemed + // = 6 (new voucher lane 1) + 5 (lane 2) + // = 11 + // So required balance: 11 > actor balance: 10 + name: "fails when voucher total redeemed > balance", + expectError: true, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + voucherAmount: big.NewInt(6), + voucherLane: 1, + voucherNonce: 2, + laneStates: map[uint64]paych.LaneState{ + // Lane 1 (superseded by new voucher in voucher lane 1) + 1: paychmock.NewMockLaneState(big.NewInt(5), 1), + // Lane 2 (different from voucher lane 1) + 2: paychmock.NewMockLaneState(big.NewInt(5), 1), + }, + }, { + // voucher supersedes lane 1 redeemed so + // lane 1 effective redeemed = voucher amount + // + // required balance = total redeemed + // = 5 (new voucher lane 1) + 5 (lane 2) + // = 10 + // So required balance: 10 <= actor balance: 10 + name: "passes when voucher total redeemed <= balance", + expectError: false, + key: fromKeyPrivate, + actorBalance: big.NewInt(10), + voucherAmount: big.NewInt(5), + voucherLane: 1, + voucherNonce: 2, + laneStates: map[uint64]paych.LaneState{ + // Lane 1 (superseded by new voucher in voucher lane 1) + 1: paychmock.NewMockLaneState(big.NewInt(4), 1), + // Lane 2 (different from voucher lane 1) + 2: paychmock.NewMockLaneState(big.NewInt(5), 1), }, }} for _, tcase := range tcases { tcase := tcase t.Run(tcase.name, func(t *testing.T) { - store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) - // Create an actor for the channel with the test case balance act := &types.Actor{ Code: builtin.AccountActorCodeID, @@ -184,16 +205,17 @@ func TestCheckVoucherValid(t *testing.T) { } mock.setPaychState(ch, act, paychmock.NewMockPayChState( - fromAcct, toAcct, abi.ChainEpoch(0), tcase.toSend, tcase.laneStates)) + fromAcct, toAcct, abi.ChainEpoch(0), tcase.laneStates)) // Create a manager + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) mgr, err := newManager(store, mock) require.NoError(t, err) // Add channel To address to wallet mock.addWalletAddress(to) - // Create a signed voucher + // Create the test case signed voucher sv := createTestVoucher(t, ch, tcase.voucherLane, tcase.voucherNonce, tcase.voucherAmount, tcase.key) // Check the voucher's validity @@ -207,135 +229,11 @@ func TestCheckVoucherValid(t *testing.T) { } } -func TestCheckVoucherValidCountingAllLanes(t *testing.T) { - ctx := context.Background() - - fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) - - ch := tutils.NewIDAddr(t, 100) - from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic)) - to := tutils.NewSECP256K1Addr(t, "secpTo") - fromAcct := tutils.NewActorAddr(t, "fromAct") - toAcct := tutils.NewActorAddr(t, "toAct") - minDelta := big.NewInt(0) - - mock := newMockManagerAPI() - mock.setAccountAddress(fromAcct, from) - mock.setAccountAddress(toAcct, to) - - store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) - - actorBalance := big.NewInt(10) - toSend := big.NewInt(1) - laneStates := map[uint64]paych.LaneState{ - 1: paychmock.NewMockLaneState(big.NewInt(3), 1), - 2: paychmock.NewMockLaneState(big.NewInt(4), 1), - } - - act := &types.Actor{ - Code: builtin.AccountActorCodeID, - Head: cid.Cid{}, - Nonce: 0, - Balance: actorBalance, - } - - mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), toSend, laneStates)) - - mgr, err := newManager(store, mock) - require.NoError(t, err) - - // Add channel To address to wallet - mock.addWalletAddress(to) - - // - // Should not be possible to add a voucher with a value such that - // + toSend > - // - // lane 1 redeemed: 3 - // voucher amount (lane 1): 6 - // lane 1 redeemed (with voucher): 6 - // - // Lane 1: 6 - // Lane 2: 4 - // toSend: 1 - // -- - // total: 11 - // - // actor balance is 10 so total is too high. - // - voucherLane := uint64(1) - voucherNonce := uint64(2) - voucherAmount := big.NewInt(6) - sv := createTestVoucher(t, ch, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) - err = mgr.CheckVoucherValid(ctx, ch, sv) - require.Error(t, err) - - // - // lane 1 redeemed: 3 - // voucher amount (lane 1): 4 - // lane 1 redeemed (with voucher): 4 - // - // Lane 1: 4 - // Lane 2: 4 - // toSend: 1 - // -- - // total: 9 - // - // actor balance is 10 so total is ok. - // - voucherAmount = big.NewInt(4) - sv = createTestVoucher(t, ch, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) - err = mgr.CheckVoucherValid(ctx, ch, sv) - require.NoError(t, err) - - // Add voucher to lane 1, so Lane 1 effective redeemed - // (with first voucher) is now 4 - _, err = mgr.AddVoucherOutbound(ctx, ch, sv, nil, minDelta) - require.NoError(t, err) - - // - // lane 1 redeemed: 4 - // voucher amount (lane 1): 6 - // lane 1 redeemed (with voucher): 6 - // - // Lane 1: 6 - // Lane 2: 4 - // toSend: 1 - // -- - // total: 11 - // - // actor balance is 10 so total is too high. - // - voucherNonce++ - voucherAmount = big.NewInt(6) - sv = createTestVoucher(t, ch, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) - err = mgr.CheckVoucherValid(ctx, ch, sv) - require.Error(t, err) - - // - // lane 1 redeemed: 4 - // voucher amount (lane 1): 5 - // lane 1 redeemed (with voucher): 5 - // - // Lane 1: 5 - // Lane 2: 4 - // toSend: 1 - // -- - // total: 10 - // - // actor balance is 10 so total is ok. - // - voucherAmount = big.NewInt(5) - sv = createTestVoucher(t, ch, voucherLane, voucherNonce, voucherAmount, fromKeyPrivate) - err = mgr.CheckVoucherValid(ctx, ch, sv) - require.NoError(t, err) -} - func TestCreateVoucher(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - s := testSetupMgrWithChannel(ctx, t) + s := testSetupMgrWithChannel(t) // Create a voucher in lane 1 voucherLane1Amt := big.NewInt(5) @@ -400,7 +298,7 @@ func TestAddVoucherDelta(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - s := testSetupMgrWithChannel(ctx, t) + s := testSetupMgrWithChannel(t) voucherLane := uint64(1) @@ -442,7 +340,7 @@ func TestAddVoucherNextLane(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - s := testSetupMgrWithChannel(ctx, t) + s := testSetupMgrWithChannel(t) minDelta := big.NewInt(0) voucherAmount := big.NewInt(2) @@ -489,10 +387,8 @@ func TestAddVoucherNextLane(t *testing.T) { } func TestAllocateLane(t *testing.T) { - ctx := context.Background() - // Set up a manager with a single payment channel - s := testSetupMgrWithChannel(ctx, t) + s := testSetupMgrWithChannel(t) // First lane should be 0 lane, err := s.mgr.AllocateLane(s.ch) @@ -525,7 +421,6 @@ func TestAllocateLaneWithExistingLaneState(t *testing.T) { // Create a channel that will be retrieved from state actorBalance := big.NewInt(10) - toSend := big.NewInt(1) act := &types.Actor{ Code: builtin.AccountActorCodeID, @@ -534,7 +429,7 @@ func TestAllocateLaneWithExistingLaneState(t *testing.T) { Balance: actorBalance, } - mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), toSend, make(map[uint64]paych.LaneState))) + mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) mgr, err := newManager(store, mock) require.NoError(t, err) @@ -559,7 +454,7 @@ func TestAddVoucherProof(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - s := testSetupMgrWithChannel(ctx, t) + s := testSetupMgrWithChannel(t) nonce := uint64(1) voucherAmount := big.NewInt(1) @@ -622,10 +517,11 @@ func TestAddVoucherInboundWalletKey(t *testing.T) { } mock := newMockManagerAPI() + mock.setAccountAddress(fromAcct, from) mock.setAccountAddress(toAcct, to) - mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), types.NewInt(0), make(map[uint64]paych.LaneState))) + mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) // Create a manager store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) @@ -660,7 +556,7 @@ func TestBestSpendable(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - s := testSetupMgrWithChannel(ctx, t) + s := testSetupMgrWithChannel(t) // Add vouchers to lane 1 with amounts: [1, 2, 3] voucherLane := uint64(1) @@ -740,7 +636,7 @@ func TestCheckSpendable(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - s := testSetupMgrWithChannel(ctx, t) + s := testSetupMgrWithChannel(t) // Create voucher with Extra voucherLane := uint64(1) @@ -821,7 +717,7 @@ func TestSubmitVoucher(t *testing.T) { ctx := context.Background() // Set up a manager with a single payment channel - s := testSetupMgrWithChannel(ctx, t) + s := testSetupMgrWithChannel(t) // Create voucher with Extra voucherLane := uint64(1) @@ -908,7 +804,7 @@ type testScaffold struct { fromKeyPrivate []byte } -func testSetupMgrWithChannel(ctx context.Context, t *testing.T) *testScaffold { +func testSetupMgrWithChannel(t *testing.T) *testScaffold { fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) ch := tutils.NewIDAddr(t, 100) @@ -929,7 +825,7 @@ func testSetupMgrWithChannel(ctx context.Context, t *testing.T) *testScaffold { Nonce: 0, Balance: balance, } - mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), big.NewInt(0), make(map[uint64]paych.LaneState))) + mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) mgr, err := newManager(store, mock) diff --git a/paychmgr/paychget_test.go b/paychmgr/paychget_test.go index 430e66c67..e35ae8371 100644 --- a/paychmgr/paychget_test.go +++ b/paychmgr/paychget_test.go @@ -978,7 +978,7 @@ func TestPaychAvailableFunds(t *testing.T) { Nonce: 0, Balance: createAmt, } - mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), big.NewInt(0), make(map[uint64]paych.LaneState))) + mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) // Send create channel response response := testChannelResponse(t, ch) mock.receiveMsgResponse(createMsgCid, response) diff --git a/paychmgr/paychvoucherfunds_test.go b/paychmgr/paychvoucherfunds_test.go new file mode 100644 index 000000000..dcbb4acc9 --- /dev/null +++ b/paychmgr/paychvoucherfunds_test.go @@ -0,0 +1,103 @@ +package paychmgr + +import ( + "context" + "testing" + + "github.com/filecoin-project/lotus/chain/actors/builtin/paych" + paychmock "github.com/filecoin-project/lotus/chain/actors/builtin/paych/mock" + + "github.com/filecoin-project/lotus/chain/types" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/specs-actors/actors/builtin" + tutils "github.com/filecoin-project/specs-actors/support/testing" + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/require" +) + +// TestPaychAddVoucherAfterAddFunds tests adding a voucher to a channel with +// insufficient funds, then adding funds to the channel, then adding the +// voucher again +func TestPaychAddVoucherAfterAddFunds(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + fromKeyPrivate, fromKeyPublic := testGenerateKeyPair(t) + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewSECP256K1Addr(t, string(fromKeyPublic)) + to := tutils.NewSECP256K1Addr(t, "secpTo") + fromAcct := tutils.NewActorAddr(t, "fromAct") + toAcct := tutils.NewActorAddr(t, "toAct") + + mock := newMockManagerAPI() + defer mock.close() + + // Add the from signing key to the wallet + mock.setAccountAddress(fromAcct, from) + mock.setAccountAddress(toAcct, to) + mock.addSigningKey(fromKeyPrivate) + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + // Send create message for a channel with value 10 + createAmt := big.NewInt(10) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt) + require.NoError(t, err) + + // Send create channel response + response := testChannelResponse(t, ch) + mock.receiveMsgResponse(createMsgCid, response) + + // Create an actor in state for the channel with the initial channel balance + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: createAmt, + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + + // Wait for create response to be processed by manager + _, err = mgr.GetPaychWaitReady(ctx, createMsgCid) + require.NoError(t, err) + + // Create a voucher with a value equal to the channel balance + voucher := paych.SignedVoucher{Amount: createAmt, Lane: 1} + res, err := mgr.CreateVoucher(ctx, ch, voucher) + require.NoError(t, err) + require.NotNil(t, res.Voucher) + + // Create a voucher in a different lane with an amount that exceeds the + // channel balance + excessAmt := types.NewInt(5) + voucher = paych.SignedVoucher{Amount: excessAmt, Lane: 2} + res, err = mgr.CreateVoucher(ctx, ch, voucher) + require.NoError(t, err) + require.Nil(t, res.Voucher) + require.Equal(t, res.Shortfall, excessAmt) + + // Add funds so as to cover the voucher shortfall + _, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, excessAmt) + require.NoError(t, err) + + // Trigger add funds confirmation + mock.receiveMsgResponse(addFundsMsgCid, types.MessageReceipt{ExitCode: 0}) + + // Update actor test case balance to reflect added funds + act.Balance = types.BigAdd(createAmt, excessAmt) + + // Wait for add funds confirmation to be processed by manager + _, err = mgr.GetPaychWaitReady(ctx, addFundsMsgCid) + require.NoError(t, err) + + // Adding same voucher that previously exceeded channel balance + // should succeed now that the channel balance has been increased + res, err = mgr.CreateVoucher(ctx, ch, voucher) + require.NoError(t, err) + require.NotNil(t, res.Voucher) +} diff --git a/paychmgr/simple.go b/paychmgr/simple.go index d49ccafe6..ca778829f 100644 --- a/paychmgr/simple.go +++ b/paychmgr/simple.go @@ -312,7 +312,7 @@ func (ca *channelAccessor) currentAvailableFunds(channelID string, queuedAmt typ return nil, err } - laneStates, err := ca.laneState(ca.chctx, pchState, ch) + laneStates, err := ca.laneState(pchState, ch) if err != nil { return nil, err } diff --git a/tools/stats/metrics.go b/tools/stats/metrics.go index dd51ee69f..aee61b2aa 100644 --- a/tools/stats/metrics.go +++ b/tools/stats/metrics.go @@ -199,16 +199,24 @@ func RecordTipsetPoints(ctx context.Context, api api.FullNode, pl *PointList, ti return nil } -type apiIpldStore struct { +type ApiIpldStore struct { ctx context.Context - api api.FullNode + api apiIpldStoreApi } -func (ht *apiIpldStore) Context() context.Context { +type apiIpldStoreApi interface { + ChainReadObj(context.Context, cid.Cid) ([]byte, error) +} + +func NewApiIpldStore(ctx context.Context, api apiIpldStoreApi) *ApiIpldStore { + return &ApiIpldStore{ctx, api} +} + +func (ht *ApiIpldStore) Context() context.Context { return ht.ctx } -func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { +func (ht *ApiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { raw, err := ht.api.ChainReadObj(ctx, c) if err != nil { return err @@ -225,8 +233,8 @@ func (ht *apiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) err return fmt.Errorf("Object does not implement CBORUnmarshaler") } -func (ht *apiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { - return cid.Undef, fmt.Errorf("Put is not implemented on apiIpldStore") +func (ht *ApiIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { + return cid.Undef, fmt.Errorf("Put is not implemented on ApiIpldStore") } func RecordTipsetStatePoints(ctx context.Context, api api.FullNode, pl *PointList, tipset *types.TipSet) error {