Merge pull request #4736 from filecoin-project/refactor/fund-mgr
Refactor FundManager
This commit is contained in:
commit
e53b0ef068
@ -81,6 +81,7 @@ type DealProposals interface {
|
||||
type PublishStorageDealsParams = market0.PublishStorageDealsParams
|
||||
type PublishStorageDealsReturn = market0.PublishStorageDealsReturn
|
||||
type VerifyDealsForActivationParams = market0.VerifyDealsForActivationParams
|
||||
type WithdrawBalanceParams = market0.WithdrawBalanceParams
|
||||
|
||||
type ClientDealProposal = market0.ClientDealProposal
|
||||
|
||||
|
112
chain/market/cbor_gen.go
Normal file
112
chain/market/cbor_gen.go
Normal file
@ -0,0 +1,112 @@
|
||||
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
|
||||
|
||||
package market
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
xerrors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var _ = xerrors.Errorf
|
||||
|
||||
var lengthBufFundedAddressState = []byte{131}
|
||||
|
||||
func (t *FundedAddressState) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write(lengthBufFundedAddressState); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
scratch := make([]byte, 9)
|
||||
|
||||
// t.Addr (address.Address) (struct)
|
||||
if err := t.Addr.MarshalCBOR(w); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.AmtReserved (big.Int) (struct)
|
||||
if err := t.AmtReserved.MarshalCBOR(w); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.MsgCid (cid.Cid) (struct)
|
||||
|
||||
if t.MsgCid == nil {
|
||||
if _, err := w.Write(cbg.CborNull); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := cbg.WriteCidBuf(scratch, w, *t.MsgCid); err != nil {
|
||||
return xerrors.Errorf("failed to write cid field t.MsgCid: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *FundedAddressState) UnmarshalCBOR(r io.Reader) error {
|
||||
*t = FundedAddressState{}
|
||||
|
||||
br := cbg.GetPeeker(r)
|
||||
scratch := make([]byte, 8)
|
||||
|
||||
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 3 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Addr (address.Address) (struct)
|
||||
|
||||
{
|
||||
|
||||
if err := t.Addr.UnmarshalCBOR(br); err != nil {
|
||||
return xerrors.Errorf("unmarshaling t.Addr: %w", err)
|
||||
}
|
||||
|
||||
}
|
||||
// t.AmtReserved (big.Int) (struct)
|
||||
|
||||
{
|
||||
|
||||
if err := t.AmtReserved.UnmarshalCBOR(br); err != nil {
|
||||
return xerrors.Errorf("unmarshaling t.AmtReserved: %w", err)
|
||||
}
|
||||
|
||||
}
|
||||
// t.MsgCid (cid.Cid) (struct)
|
||||
|
||||
{
|
||||
|
||||
b, err := br.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b != cbg.CborNull[0] {
|
||||
if err := br.UnreadByte(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := cbg.ReadCid(br)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to read cid field t.MsgCid: %w", err)
|
||||
}
|
||||
|
||||
t.MsgCid = &c
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
695
chain/market/fundmanager.go
Normal file
695
chain/market/fundmanager.go
Normal file
@ -0,0 +1,695 @@
|
||||
package market
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
"go.uber.org/fx"
|
||||
)
|
||||
|
||||
var log = logging.Logger("market_adapter")
|
||||
|
||||
// API is the fx dependencies need to run a fund manager
|
||||
type FundManagerAPI struct {
|
||||
fx.In
|
||||
|
||||
full.StateAPI
|
||||
full.MpoolAPI
|
||||
}
|
||||
|
||||
// fundManagerAPI is the specific methods called by the FundManager
|
||||
type fundManagerAPI interface {
|
||||
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
|
||||
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
|
||||
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error)
|
||||
}
|
||||
|
||||
// FundManager keeps track of funds in a set of addresses
|
||||
type FundManager struct {
|
||||
ctx context.Context
|
||||
shutdown context.CancelFunc
|
||||
api fundManagerAPI
|
||||
str *Store
|
||||
|
||||
lk sync.Mutex
|
||||
fundedAddrs map[address.Address]*fundedAddress
|
||||
}
|
||||
|
||||
func NewFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &FundManager{
|
||||
ctx: ctx,
|
||||
shutdown: cancel,
|
||||
api: api,
|
||||
str: newStore(ds),
|
||||
fundedAddrs: make(map[address.Address]*fundedAddress),
|
||||
}
|
||||
}
|
||||
|
||||
func (fm *FundManager) Stop() {
|
||||
fm.shutdown()
|
||||
}
|
||||
|
||||
func (fm *FundManager) Start() error {
|
||||
fm.lk.Lock()
|
||||
defer fm.lk.Unlock()
|
||||
|
||||
// TODO:
|
||||
// To save memory:
|
||||
// - in State() only load addresses with in-progress messages
|
||||
// - load the others just-in-time from getFundedAddress
|
||||
// - delete(fm.fundedAddrs, addr) when the queue has been processed
|
||||
return fm.str.forEach(func(state *FundedAddressState) {
|
||||
fa := newFundedAddress(fm, state.Addr)
|
||||
fa.state = state
|
||||
fm.fundedAddrs[fa.state.Addr] = fa
|
||||
fa.start()
|
||||
})
|
||||
}
|
||||
|
||||
// Creates a fundedAddress if it doesn't already exist, and returns it
|
||||
func (fm *FundManager) getFundedAddress(addr address.Address) *fundedAddress {
|
||||
fm.lk.Lock()
|
||||
defer fm.lk.Unlock()
|
||||
|
||||
fa, ok := fm.fundedAddrs[addr]
|
||||
if !ok {
|
||||
fa = newFundedAddress(fm, addr)
|
||||
fm.fundedAddrs[addr] = fa
|
||||
}
|
||||
return fa
|
||||
}
|
||||
|
||||
// Reserve adds amt to `reserved`. If there are not enough available funds for
|
||||
// the address, submits a message on chain to top up available funds.
|
||||
// Returns the cid of the message that was submitted on chain, or cid.Undef if
|
||||
// the required funds were already available.
|
||||
func (fm *FundManager) Reserve(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
||||
return fm.getFundedAddress(addr).reserve(ctx, wallet, amt)
|
||||
}
|
||||
|
||||
// Subtract from `reserved`.
|
||||
func (fm *FundManager) Release(addr address.Address, amt abi.TokenAmount) error {
|
||||
return fm.getFundedAddress(addr).release(amt)
|
||||
}
|
||||
|
||||
// Withdraw unreserved funds. Only succeeds if there are enough unreserved
|
||||
// funds for the address.
|
||||
// Returns the cid of the message that was submitted on chain.
|
||||
func (fm *FundManager) Withdraw(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
||||
return fm.getFundedAddress(addr).withdraw(ctx, wallet, amt)
|
||||
}
|
||||
|
||||
// FundedAddressState keeps track of the state of an address with funds in the
|
||||
// datastore
|
||||
type FundedAddressState struct {
|
||||
Addr address.Address
|
||||
// AmtReserved is the amount that must be kept in the address (cannot be
|
||||
// withdrawn)
|
||||
AmtReserved abi.TokenAmount
|
||||
// MsgCid is the cid of an in-progress on-chain message
|
||||
MsgCid *cid.Cid
|
||||
}
|
||||
|
||||
// fundedAddress keeps track of the state and request queues for a
|
||||
// particular address
|
||||
type fundedAddress struct {
|
||||
ctx context.Context
|
||||
env *fundManagerEnvironment
|
||||
str *Store
|
||||
|
||||
lk sync.Mutex
|
||||
state *FundedAddressState
|
||||
|
||||
// Note: These request queues are ephemeral, they are not saved to store
|
||||
reservations []*fundRequest
|
||||
releases []*fundRequest
|
||||
withdrawals []*fundRequest
|
||||
|
||||
// Used by the tests
|
||||
onProcessStartListener func() bool
|
||||
}
|
||||
|
||||
func newFundedAddress(fm *FundManager, addr address.Address) *fundedAddress {
|
||||
return &fundedAddress{
|
||||
ctx: fm.ctx,
|
||||
env: &fundManagerEnvironment{api: fm.api},
|
||||
str: fm.str,
|
||||
state: &FundedAddressState{
|
||||
Addr: addr,
|
||||
AmtReserved: abi.NewTokenAmount(0),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// If there is an in-progress on-chain message, don't submit any more messages
|
||||
// on chain until it completes
|
||||
func (a *fundedAddress) start() {
|
||||
a.lk.Lock()
|
||||
defer a.lk.Unlock()
|
||||
|
||||
if a.state.MsgCid != nil {
|
||||
a.debugf("restart: wait for %s", a.state.MsgCid)
|
||||
a.startWaitForResults(*a.state.MsgCid)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *fundedAddress) reserve(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
||||
return a.requestAndWait(ctx, wallet, amt, &a.reservations)
|
||||
}
|
||||
|
||||
func (a *fundedAddress) release(amt abi.TokenAmount) error {
|
||||
_, err := a.requestAndWait(context.Background(), address.Undef, amt, &a.releases)
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *fundedAddress) withdraw(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
||||
return a.requestAndWait(ctx, wallet, amt, &a.withdrawals)
|
||||
}
|
||||
|
||||
func (a *fundedAddress) requestAndWait(ctx context.Context, wallet address.Address, amt abi.TokenAmount, reqs *[]*fundRequest) (cid.Cid, error) {
|
||||
// Create a request and add it to the request queue
|
||||
req := newFundRequest(ctx, wallet, amt)
|
||||
|
||||
a.lk.Lock()
|
||||
*reqs = append(*reqs, req)
|
||||
a.lk.Unlock()
|
||||
|
||||
// Process the queue
|
||||
go a.process()
|
||||
|
||||
// Wait for the results
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, ctx.Err()
|
||||
case r := <-req.Result:
|
||||
return r.msgCid, r.err
|
||||
}
|
||||
}
|
||||
|
||||
// Used by the tests
|
||||
func (a *fundedAddress) onProcessStart(fn func() bool) {
|
||||
a.lk.Lock()
|
||||
defer a.lk.Unlock()
|
||||
|
||||
a.onProcessStartListener = fn
|
||||
}
|
||||
|
||||
// Process queued requests
|
||||
func (a *fundedAddress) process() {
|
||||
a.lk.Lock()
|
||||
defer a.lk.Unlock()
|
||||
|
||||
// Used by the tests
|
||||
if a.onProcessStartListener != nil {
|
||||
done := a.onProcessStartListener()
|
||||
if !done {
|
||||
return
|
||||
}
|
||||
a.onProcessStartListener = nil
|
||||
}
|
||||
|
||||
// Check if we're still waiting for the response to a message
|
||||
if a.state.MsgCid != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Check if there's anything to do
|
||||
haveReservations := len(a.reservations) > 0 || len(a.releases) > 0
|
||||
haveWithdrawals := len(a.withdrawals) > 0
|
||||
if !haveReservations && !haveWithdrawals {
|
||||
return
|
||||
}
|
||||
|
||||
// Process reservations / releases
|
||||
if haveReservations {
|
||||
res, err := a.processReservations(a.reservations, a.releases)
|
||||
if err == nil {
|
||||
a.applyStateChange(res.msgCid, res.amtReserved)
|
||||
}
|
||||
a.reservations = filterOutProcessedReqs(a.reservations)
|
||||
a.releases = filterOutProcessedReqs(a.releases)
|
||||
}
|
||||
|
||||
// If there was no message sent on chain by adding reservations, and all
|
||||
// reservations have completed processing, process withdrawals
|
||||
if haveWithdrawals && a.state.MsgCid == nil && len(a.reservations) == 0 {
|
||||
withdrawalCid, err := a.processWithdrawals(a.withdrawals)
|
||||
if err == nil && withdrawalCid != cid.Undef {
|
||||
a.applyStateChange(&withdrawalCid, types.EmptyInt)
|
||||
}
|
||||
a.withdrawals = filterOutProcessedReqs(a.withdrawals)
|
||||
}
|
||||
|
||||
// If a message was sent on-chain
|
||||
if a.state.MsgCid != nil {
|
||||
// Start waiting for results of message (async)
|
||||
a.startWaitForResults(*a.state.MsgCid)
|
||||
}
|
||||
|
||||
// Process any remaining queued requests
|
||||
go a.process()
|
||||
}
|
||||
|
||||
// Filter out completed requests
|
||||
func filterOutProcessedReqs(reqs []*fundRequest) []*fundRequest {
|
||||
filtered := make([]*fundRequest, 0, len(reqs))
|
||||
for _, req := range reqs {
|
||||
if !req.Completed() {
|
||||
filtered = append(filtered, req)
|
||||
}
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
// Apply the results of processing queues and save to the datastore
|
||||
func (a *fundedAddress) applyStateChange(msgCid *cid.Cid, amtReserved abi.TokenAmount) {
|
||||
a.state.MsgCid = msgCid
|
||||
if !amtReserved.Nil() {
|
||||
a.state.AmtReserved = amtReserved
|
||||
}
|
||||
a.saveState()
|
||||
}
|
||||
|
||||
// Clear the pending message cid so that a new message can be sent
|
||||
func (a *fundedAddress) clearWaitState() {
|
||||
a.state.MsgCid = nil
|
||||
a.saveState()
|
||||
}
|
||||
|
||||
// Save state to datastore
|
||||
func (a *fundedAddress) saveState() {
|
||||
// Not much we can do if saving to the datastore fails, just log
|
||||
err := a.str.save(a.state)
|
||||
if err != nil {
|
||||
log.Errorf("saving state to store for addr %s: %w", a.state.Addr, err)
|
||||
}
|
||||
}
|
||||
|
||||
// The result of processing the reservation / release queues
|
||||
type processResult struct {
|
||||
// Requests that completed without adding funds
|
||||
covered []*fundRequest
|
||||
// Requests that added funds
|
||||
added []*fundRequest
|
||||
|
||||
// The new reserved amount
|
||||
amtReserved abi.TokenAmount
|
||||
// The message cid, if a message was submitted on-chain
|
||||
msgCid *cid.Cid
|
||||
}
|
||||
|
||||
// process reservations and releases, and return the resulting changes to state
|
||||
func (a *fundedAddress) processReservations(reservations []*fundRequest, releases []*fundRequest) (pr *processResult, prerr error) {
|
||||
// When the function returns
|
||||
defer func() {
|
||||
// If there's an error, mark all requests as errored
|
||||
if prerr != nil {
|
||||
for _, req := range append(reservations, releases...) {
|
||||
req.Complete(cid.Undef, prerr)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Complete all release requests
|
||||
for _, req := range releases {
|
||||
req.Complete(cid.Undef, nil)
|
||||
}
|
||||
|
||||
// Complete all requests that were covered by released amounts
|
||||
for _, req := range pr.covered {
|
||||
req.Complete(cid.Undef, nil)
|
||||
}
|
||||
|
||||
// If a message was sent
|
||||
if pr.msgCid != nil {
|
||||
// Complete all add funds requests
|
||||
for _, req := range pr.added {
|
||||
req.Complete(*pr.msgCid, nil)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Split reservations into those that are covered by released amounts,
|
||||
// and those to add to the reserved amount.
|
||||
// Note that we process requests from the same wallet in batches. So some
|
||||
// requests may not be included in covered if they don't match the first
|
||||
// covered request's wallet. These will be processed on a subsequent
|
||||
// invocation of processReservations.
|
||||
toCancel, toAdd, reservedDelta := splitReservations(reservations, releases)
|
||||
|
||||
// Apply the reserved delta to the reserved amount
|
||||
reserved := types.BigAdd(a.state.AmtReserved, reservedDelta)
|
||||
if reserved.LessThan(abi.NewTokenAmount(0)) {
|
||||
reserved = abi.NewTokenAmount(0)
|
||||
}
|
||||
res := &processResult{
|
||||
amtReserved: reserved,
|
||||
covered: toCancel,
|
||||
}
|
||||
|
||||
// Work out the amount to add to the balance
|
||||
amtToAdd := abi.NewTokenAmount(0)
|
||||
if reserved.GreaterThan(abi.NewTokenAmount(0)) {
|
||||
// Get available funds for address
|
||||
avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
||||
// amount to add = new reserved amount - available
|
||||
amtToAdd = types.BigSub(reserved, avail)
|
||||
a.debugf("reserved %d - avail %d = to add %d", reserved, avail, amtToAdd)
|
||||
}
|
||||
|
||||
// If there's nothing to add to the balance, bail out
|
||||
if amtToAdd.LessThanEqual(abi.NewTokenAmount(0)) {
|
||||
res.covered = append(res.covered, toAdd...)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Add funds to address
|
||||
a.debugf("add funds %d", amtToAdd)
|
||||
addFundsCid, err := a.env.AddFunds(a.ctx, toAdd[0].Wallet, a.state.Addr, amtToAdd)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
||||
// Mark reservation requests as complete
|
||||
res.added = toAdd
|
||||
|
||||
// Save the message CID to state
|
||||
res.msgCid = &addFundsCid
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Split reservations into those that are under the total release amount
|
||||
// (covered) and those that exceed it (to add).
|
||||
// Note that we process requests from the same wallet in batches. So some
|
||||
// requests may not be included in covered if they don't match the first
|
||||
// covered request's wallet.
|
||||
func splitReservations(reservations []*fundRequest, releases []*fundRequest) ([]*fundRequest, []*fundRequest, abi.TokenAmount) {
|
||||
toCancel := make([]*fundRequest, 0, len(reservations))
|
||||
toAdd := make([]*fundRequest, 0, len(reservations))
|
||||
toAddAmt := abi.NewTokenAmount(0)
|
||||
|
||||
// Sum release amounts
|
||||
releaseAmt := abi.NewTokenAmount(0)
|
||||
for _, req := range releases {
|
||||
releaseAmt = types.BigAdd(releaseAmt, req.Amount())
|
||||
}
|
||||
|
||||
// We only want to combine requests that come from the same wallet
|
||||
batchWallet := address.Undef
|
||||
for _, req := range reservations {
|
||||
amt := req.Amount()
|
||||
|
||||
// If the amount to add to the reserve is cancelled out by a release
|
||||
if amt.LessThanEqual(releaseAmt) {
|
||||
// Cancel the request and update the release total
|
||||
releaseAmt = types.BigSub(releaseAmt, amt)
|
||||
toCancel = append(toCancel, req)
|
||||
continue
|
||||
}
|
||||
|
||||
// The amount to add is greater that the release total so we want
|
||||
// to send an add funds request
|
||||
|
||||
// The first time the wallet will be undefined
|
||||
if batchWallet == address.Undef {
|
||||
batchWallet = req.Wallet
|
||||
}
|
||||
// If this request's wallet is the same as the batch wallet,
|
||||
// the requests will be combined
|
||||
if batchWallet == req.Wallet {
|
||||
delta := types.BigSub(amt, releaseAmt)
|
||||
toAddAmt = types.BigAdd(toAddAmt, delta)
|
||||
releaseAmt = abi.NewTokenAmount(0)
|
||||
toAdd = append(toAdd, req)
|
||||
}
|
||||
}
|
||||
|
||||
// The change in the reserved amount is "amount to add" - "amount to release"
|
||||
reservedDelta := types.BigSub(toAddAmt, releaseAmt)
|
||||
|
||||
return toCancel, toAdd, reservedDelta
|
||||
}
|
||||
|
||||
// process withdrawal queue
|
||||
func (a *fundedAddress) processWithdrawals(withdrawals []*fundRequest) (msgCid cid.Cid, prerr error) {
|
||||
// If there's an error, mark all withdrawal requests as errored
|
||||
defer func() {
|
||||
if prerr != nil {
|
||||
for _, req := range withdrawals {
|
||||
req.Complete(cid.Undef, prerr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Get the net available balance
|
||||
avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
netAvail := types.BigSub(avail, a.state.AmtReserved)
|
||||
|
||||
// Fit as many withdrawals as possible into the available balance, and fail
|
||||
// the rest
|
||||
withdrawalAmt := abi.NewTokenAmount(0)
|
||||
allowedAmt := abi.NewTokenAmount(0)
|
||||
allowed := make([]*fundRequest, 0, len(a.withdrawals))
|
||||
var batchWallet address.Address
|
||||
for _, req := range a.withdrawals {
|
||||
amt := req.Amount()
|
||||
if amt.IsZero() {
|
||||
// If the context for the request was cancelled, bail out
|
||||
req.Complete(cid.Undef, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// If the amount would exceed the available amount, complete the
|
||||
// request with an error
|
||||
newWithdrawalAmt := types.BigAdd(withdrawalAmt, amt)
|
||||
if newWithdrawalAmt.GreaterThan(netAvail) {
|
||||
err := xerrors.Errorf("insufficient funds for withdrawal of %d", amt)
|
||||
a.debugf("%s", err)
|
||||
req.Complete(cid.Undef, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// If this is the first allowed withdrawal request in this batch, save
|
||||
// its wallet address
|
||||
if batchWallet == address.Undef {
|
||||
batchWallet = req.Wallet
|
||||
}
|
||||
// If the request wallet doesn't match the batch wallet, bail out
|
||||
// (the withdrawal will be processed after the current batch has
|
||||
// completed)
|
||||
if req.Wallet != batchWallet {
|
||||
continue
|
||||
}
|
||||
|
||||
// Include this withdrawal request in the batch
|
||||
withdrawalAmt = newWithdrawalAmt
|
||||
a.debugf("withdraw %d", amt)
|
||||
allowed = append(allowed, req)
|
||||
allowedAmt = types.BigAdd(allowedAmt, amt)
|
||||
}
|
||||
|
||||
// Check if there is anything to withdraw.
|
||||
// Note that if the context for a request is cancelled,
|
||||
// req.Amount() returns zero
|
||||
if allowedAmt.Equals(abi.NewTokenAmount(0)) {
|
||||
// Mark allowed requests as complete
|
||||
for _, req := range allowed {
|
||||
req.Complete(cid.Undef, nil)
|
||||
}
|
||||
return cid.Undef, nil
|
||||
}
|
||||
|
||||
// Withdraw funds
|
||||
a.debugf("withdraw funds %d", allowedAmt)
|
||||
withdrawFundsCid, err := a.env.WithdrawFunds(a.ctx, allowed[0].Wallet, a.state.Addr, allowedAmt)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
// Mark allowed requests as complete
|
||||
for _, req := range allowed {
|
||||
req.Complete(withdrawFundsCid, nil)
|
||||
}
|
||||
|
||||
// Save the message CID to state
|
||||
return withdrawFundsCid, nil
|
||||
}
|
||||
|
||||
// asynchonously wait for results of message
|
||||
func (a *fundedAddress) startWaitForResults(msgCid cid.Cid) {
|
||||
go func() {
|
||||
err := a.env.WaitMsg(a.ctx, msgCid)
|
||||
if err != nil {
|
||||
// We don't really care about the results here, we're just waiting
|
||||
// so as to only process one on-chain message at a time
|
||||
log.Errorf("waiting for results of message %s for addr %s: %w", msgCid, a.state.Addr, err)
|
||||
}
|
||||
|
||||
a.lk.Lock()
|
||||
a.debugf("complete wait")
|
||||
a.clearWaitState()
|
||||
a.lk.Unlock()
|
||||
|
||||
a.process()
|
||||
}()
|
||||
}
|
||||
|
||||
func (a *fundedAddress) debugf(args ...interface{}) {
|
||||
fmtStr := args[0].(string)
|
||||
args = args[1:]
|
||||
log.Debugf(a.state.Addr.String()+": "+fmtStr, args...)
|
||||
}
|
||||
|
||||
// The result of a fund request
|
||||
type reqResult struct {
|
||||
msgCid cid.Cid
|
||||
err error
|
||||
}
|
||||
|
||||
// A request to change funds
|
||||
type fundRequest struct {
|
||||
ctx context.Context
|
||||
amt abi.TokenAmount
|
||||
completed chan struct{}
|
||||
Wallet address.Address
|
||||
Result chan reqResult
|
||||
}
|
||||
|
||||
func newFundRequest(ctx context.Context, wallet address.Address, amt abi.TokenAmount) *fundRequest {
|
||||
return &fundRequest{
|
||||
ctx: ctx,
|
||||
amt: amt,
|
||||
Wallet: wallet,
|
||||
Result: make(chan reqResult),
|
||||
completed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Amount returns zero if the context has expired
|
||||
func (frp *fundRequest) Amount() abi.TokenAmount {
|
||||
if frp.ctx.Err() != nil {
|
||||
return abi.NewTokenAmount(0)
|
||||
}
|
||||
return frp.amt
|
||||
}
|
||||
|
||||
// Complete is called with the message CID when the funds request has been
|
||||
// started or with the error if there was an error
|
||||
func (frp *fundRequest) Complete(msgCid cid.Cid, err error) {
|
||||
select {
|
||||
case <-frp.completed:
|
||||
case <-frp.ctx.Done():
|
||||
case frp.Result <- reqResult{msgCid: msgCid, err: err}:
|
||||
}
|
||||
close(frp.completed)
|
||||
}
|
||||
|
||||
// Completed indicates if Complete has already been called
|
||||
func (frp *fundRequest) Completed() bool {
|
||||
select {
|
||||
case <-frp.completed:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// fundManagerEnvironment simplifies some API calls
|
||||
type fundManagerEnvironment struct {
|
||||
api fundManagerAPI
|
||||
}
|
||||
|
||||
func (env *fundManagerEnvironment) AvailableFunds(ctx context.Context, addr address.Address) (abi.TokenAmount, error) {
|
||||
bal, err := env.api.StateMarketBalance(ctx, addr, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return abi.NewTokenAmount(0), err
|
||||
}
|
||||
|
||||
return types.BigSub(bal.Escrow, bal.Locked), nil
|
||||
}
|
||||
|
||||
func (env *fundManagerEnvironment) AddFunds(
|
||||
ctx context.Context,
|
||||
wallet address.Address,
|
||||
addr address.Address,
|
||||
amt abi.TokenAmount,
|
||||
) (cid.Cid, error) {
|
||||
params, err := actors.SerializeParams(&addr)
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{
|
||||
To: market.Address,
|
||||
From: wallet,
|
||||
Value: amt,
|
||||
Method: market.Methods.AddBalance,
|
||||
Params: params,
|
||||
}, nil)
|
||||
|
||||
if aerr != nil {
|
||||
return cid.Undef, aerr
|
||||
}
|
||||
|
||||
return smsg.Cid(), nil
|
||||
}
|
||||
|
||||
func (env *fundManagerEnvironment) WithdrawFunds(
|
||||
ctx context.Context,
|
||||
wallet address.Address,
|
||||
addr address.Address,
|
||||
amt abi.TokenAmount,
|
||||
) (cid.Cid, error) {
|
||||
params, err := actors.SerializeParams(&market.WithdrawBalanceParams{
|
||||
ProviderOrClientAddress: addr,
|
||||
Amount: amt,
|
||||
})
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("serializing params: %w", err)
|
||||
}
|
||||
|
||||
smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{
|
||||
To: market.Address,
|
||||
From: wallet,
|
||||
Value: types.NewInt(0),
|
||||
Method: market.Methods.WithdrawBalance,
|
||||
Params: params,
|
||||
}, nil)
|
||||
|
||||
if aerr != nil {
|
||||
return cid.Undef, aerr
|
||||
}
|
||||
|
||||
return smsg.Cid(), nil
|
||||
}
|
||||
|
||||
func (env *fundManagerEnvironment) WaitMsg(ctx context.Context, c cid.Cid) error {
|
||||
_, err := env.api.StateWaitMsg(ctx, c, build.MessageConfidence)
|
||||
return err
|
||||
}
|
764
chain/market/fundmanager_test.go
Normal file
764
chain/market/fundmanager_test.go
Normal file
@ -0,0 +1,764 @@
|
||||
package market
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
ds_sync "github.com/ipfs/go-datastore/sync"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// TestFundManagerBasic verifies that the basic fund manager operations work
|
||||
func TestFundManagerBasic(t *testing.T) {
|
||||
s := setup(t)
|
||||
defer s.fm.Stop()
|
||||
|
||||
// Reserve 10
|
||||
// balance: 0 -> 10
|
||||
// reserved: 0 -> 10
|
||||
amt := abi.NewTokenAmount(10)
|
||||
sentinel, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg := s.mockApi.getSentMessage(sentinel)
|
||||
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
|
||||
|
||||
s.mockApi.completeMsg(sentinel)
|
||||
|
||||
// Reserve 7
|
||||
// balance: 10 -> 17
|
||||
// reserved: 10 -> 17
|
||||
amt = abi.NewTokenAmount(7)
|
||||
sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg = s.mockApi.getSentMessage(sentinel)
|
||||
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
|
||||
|
||||
s.mockApi.completeMsg(sentinel)
|
||||
|
||||
// Release 5
|
||||
// balance: 17
|
||||
// reserved: 17 -> 12
|
||||
amt = abi.NewTokenAmount(5)
|
||||
err = s.fm.Release(s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Withdraw 2
|
||||
// balance: 17 -> 15
|
||||
// reserved: 12
|
||||
amt = abi.NewTokenAmount(2)
|
||||
sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg = s.mockApi.getSentMessage(sentinel)
|
||||
checkWithdrawMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
|
||||
|
||||
s.mockApi.completeMsg(sentinel)
|
||||
|
||||
// Reserve 3
|
||||
// balance: 15
|
||||
// reserved: 12 -> 15
|
||||
// Note: reserved (15) is <= balance (15) so should not send on-chain
|
||||
// message
|
||||
msgCount := s.mockApi.messageCount()
|
||||
amt = abi.NewTokenAmount(3)
|
||||
sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, msgCount, s.mockApi.messageCount())
|
||||
require.Equal(t, sentinel, cid.Undef)
|
||||
|
||||
// Reserve 1
|
||||
// balance: 15 -> 16
|
||||
// reserved: 15 -> 16
|
||||
// Note: reserved (16) is above balance (15) so *should* send on-chain
|
||||
// message to top up balance
|
||||
amt = abi.NewTokenAmount(1)
|
||||
topUp := abi.NewTokenAmount(1)
|
||||
sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.mockApi.completeMsg(sentinel)
|
||||
msg = s.mockApi.getSentMessage(sentinel)
|
||||
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp)
|
||||
|
||||
// Withdraw 1
|
||||
// balance: 16
|
||||
// reserved: 16
|
||||
// Note: Expect failure because there is no available balance to withdraw:
|
||||
// balance - reserved = 16 - 16 = 0
|
||||
amt = abi.NewTokenAmount(1)
|
||||
sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
// TestFundManagerParallel verifies that operations can be run in parallel
|
||||
func TestFundManagerParallel(t *testing.T) {
|
||||
s := setup(t)
|
||||
defer s.fm.Stop()
|
||||
|
||||
// Reserve 10
|
||||
amt := abi.NewTokenAmount(10)
|
||||
sentinelReserve10, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait until all the subsequent requests are queued up
|
||||
queueReady := make(chan struct{})
|
||||
fa := s.fm.getFundedAddress(s.acctAddr)
|
||||
fa.onProcessStart(func() bool {
|
||||
if len(fa.withdrawals) == 1 && len(fa.reservations) == 2 && len(fa.releases) == 1 {
|
||||
close(queueReady)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
// Withdraw 5 (should not run until after reserves / releases)
|
||||
withdrawReady := make(chan error)
|
||||
go func() {
|
||||
amt = abi.NewTokenAmount(5)
|
||||
_, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
withdrawReady <- err
|
||||
}()
|
||||
|
||||
reserveSentinels := make(chan cid.Cid)
|
||||
|
||||
// Reserve 3
|
||||
go func() {
|
||||
amt := abi.NewTokenAmount(3)
|
||||
sentinelReserve3, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
reserveSentinels <- sentinelReserve3
|
||||
}()
|
||||
|
||||
// Reserve 5
|
||||
go func() {
|
||||
amt := abi.NewTokenAmount(5)
|
||||
sentinelReserve5, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
reserveSentinels <- sentinelReserve5
|
||||
}()
|
||||
|
||||
// Release 2
|
||||
go func() {
|
||||
amt := abi.NewTokenAmount(2)
|
||||
err = s.fm.Release(s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
// Everything is queued up
|
||||
<-queueReady
|
||||
|
||||
// Complete the "Reserve 10" message
|
||||
s.mockApi.completeMsg(sentinelReserve10)
|
||||
msg := s.mockApi.getSentMessage(sentinelReserve10)
|
||||
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10))
|
||||
|
||||
// The other requests should now be combined and be submitted on-chain as
|
||||
// a single message
|
||||
rs1 := <-reserveSentinels
|
||||
rs2 := <-reserveSentinels
|
||||
require.Equal(t, rs1, rs2)
|
||||
|
||||
// Withdraw should not have been called yet, because reserve / release
|
||||
// requests run first
|
||||
select {
|
||||
case <-withdrawReady:
|
||||
require.Fail(t, "Withdraw should run after reserve / release")
|
||||
default:
|
||||
}
|
||||
|
||||
// Complete the message
|
||||
s.mockApi.completeMsg(rs1)
|
||||
msg = s.mockApi.getSentMessage(rs1)
|
||||
|
||||
// "Reserve 3" +3
|
||||
// "Reserve 5" +5
|
||||
// "Release 2" -2
|
||||
// Result: 6
|
||||
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(6))
|
||||
|
||||
// Expect withdraw to fail because not enough available funds
|
||||
err = <-withdrawReady
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
// TestFundManagerReserveByWallet verifies that reserve requests are grouped by wallet
|
||||
func TestFundManagerReserveByWallet(t *testing.T) {
|
||||
s := setup(t)
|
||||
defer s.fm.Stop()
|
||||
|
||||
walletAddrA, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
require.NoError(t, err)
|
||||
walletAddrB, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait until all the reservation requests are queued up
|
||||
walletAQueuedUp := make(chan struct{})
|
||||
queueReady := make(chan struct{})
|
||||
fa := s.fm.getFundedAddress(s.acctAddr)
|
||||
fa.onProcessStart(func() bool {
|
||||
if len(fa.reservations) == 1 {
|
||||
close(walletAQueuedUp)
|
||||
}
|
||||
if len(fa.reservations) == 3 {
|
||||
close(queueReady)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
type reserveResult struct {
|
||||
ws cid.Cid
|
||||
err error
|
||||
}
|
||||
results := make(chan *reserveResult)
|
||||
|
||||
amtA1 := abi.NewTokenAmount(1)
|
||||
go func() {
|
||||
// Wallet A: Reserve 1
|
||||
sentinelA1, err := s.fm.Reserve(s.ctx, walletAddrA, s.acctAddr, amtA1)
|
||||
results <- &reserveResult{
|
||||
ws: sentinelA1,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
amtB1 := abi.NewTokenAmount(2)
|
||||
amtB2 := abi.NewTokenAmount(3)
|
||||
go func() {
|
||||
// Wait for reservation for wallet A to be queued up
|
||||
<-walletAQueuedUp
|
||||
|
||||
// Wallet B: Reserve 2
|
||||
go func() {
|
||||
sentinelB1, err := s.fm.Reserve(s.ctx, walletAddrB, s.acctAddr, amtB1)
|
||||
results <- &reserveResult{
|
||||
ws: sentinelB1,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
// Wallet B: Reserve 3
|
||||
sentinelB2, err := s.fm.Reserve(s.ctx, walletAddrB, s.acctAddr, amtB2)
|
||||
results <- &reserveResult{
|
||||
ws: sentinelB2,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
// All reservation requests are queued up
|
||||
<-queueReady
|
||||
|
||||
resA := <-results
|
||||
sentinelA1 := resA.ws
|
||||
|
||||
// Should send to wallet A
|
||||
msg := s.mockApi.getSentMessage(sentinelA1)
|
||||
checkAddMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1)
|
||||
|
||||
// Complete wallet A message
|
||||
s.mockApi.completeMsg(sentinelA1)
|
||||
|
||||
resB1 := <-results
|
||||
resB2 := <-results
|
||||
require.NoError(t, resB1.err)
|
||||
require.NoError(t, resB2.err)
|
||||
sentinelB1 := resB1.ws
|
||||
sentinelB2 := resB2.ws
|
||||
|
||||
// Should send different message to wallet B
|
||||
require.NotEqual(t, sentinelA1, sentinelB1)
|
||||
// Should be single message combining amount 1 and 2
|
||||
require.Equal(t, sentinelB1, sentinelB2)
|
||||
msg = s.mockApi.getSentMessage(sentinelB1)
|
||||
checkAddMessageFields(t, msg, walletAddrB, s.acctAddr, types.BigAdd(amtB1, amtB2))
|
||||
}
|
||||
|
||||
// TestFundManagerWithdrawal verifies that as many withdraw operations as
|
||||
// possible are processed
|
||||
func TestFundManagerWithdrawalLimit(t *testing.T) {
|
||||
s := setup(t)
|
||||
defer s.fm.Stop()
|
||||
|
||||
// Reserve 10
|
||||
amt := abi.NewTokenAmount(10)
|
||||
sentinelReserve10, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Complete the "Reserve 10" message
|
||||
s.mockApi.completeMsg(sentinelReserve10)
|
||||
|
||||
// Release 10
|
||||
err = s.fm.Release(s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Queue up withdraw requests
|
||||
queueReady := make(chan struct{})
|
||||
fa := s.fm.getFundedAddress(s.acctAddr)
|
||||
withdrawalReqTotal := 3
|
||||
withdrawalReqEnqueued := 0
|
||||
withdrawalReqQueue := make(chan func(), withdrawalReqTotal)
|
||||
fa.onProcessStart(func() bool {
|
||||
// If a new withdrawal request was enqueued
|
||||
if len(fa.withdrawals) > withdrawalReqEnqueued {
|
||||
withdrawalReqEnqueued++
|
||||
|
||||
// Pop the next request and run it
|
||||
select {
|
||||
case fn := <-withdrawalReqQueue:
|
||||
go fn()
|
||||
default:
|
||||
}
|
||||
}
|
||||
// Once all the requests have arrived, we're ready to process the queue
|
||||
if withdrawalReqEnqueued == withdrawalReqTotal {
|
||||
close(queueReady)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
type withdrawResult struct {
|
||||
reqIndex int
|
||||
ws cid.Cid
|
||||
err error
|
||||
}
|
||||
withdrawRes := make(chan *withdrawResult)
|
||||
|
||||
// Queue up three "Withdraw 5" requests
|
||||
enqueuedCount := 0
|
||||
for i := 0; i < withdrawalReqTotal; i++ {
|
||||
withdrawalReqQueue <- func() {
|
||||
idx := enqueuedCount
|
||||
enqueuedCount++
|
||||
|
||||
amt := abi.NewTokenAmount(5)
|
||||
ws, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
withdrawRes <- &withdrawResult{reqIndex: idx, ws: ws, err: err}
|
||||
}
|
||||
}
|
||||
// Start the first request
|
||||
fn := <-withdrawalReqQueue
|
||||
go fn()
|
||||
|
||||
// All withdrawal requests are queued up and ready to be processed
|
||||
<-queueReady
|
||||
|
||||
// Organize results in request order
|
||||
results := make([]*withdrawResult, withdrawalReqTotal)
|
||||
for i := 0; i < 3; i++ {
|
||||
res := <-withdrawRes
|
||||
results[res.reqIndex] = res
|
||||
}
|
||||
|
||||
// Available 10
|
||||
// Withdraw 5
|
||||
// Expect Success
|
||||
require.NoError(t, results[0].err)
|
||||
// Available 5
|
||||
// Withdraw 5
|
||||
// Expect Success
|
||||
require.NoError(t, results[1].err)
|
||||
// Available 0
|
||||
// Withdraw 5
|
||||
// Expect FAIL
|
||||
require.Error(t, results[2].err)
|
||||
|
||||
// Expect withdrawal requests that fit under reserved amount to be combined
|
||||
// into a single message on-chain
|
||||
require.Equal(t, results[0].ws, results[1].ws)
|
||||
}
|
||||
|
||||
// TestFundManagerWithdrawByWallet verifies that withdraw requests are grouped by wallet
|
||||
func TestFundManagerWithdrawByWallet(t *testing.T) {
|
||||
s := setup(t)
|
||||
defer s.fm.Stop()
|
||||
|
||||
walletAddrA, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
require.NoError(t, err)
|
||||
walletAddrB, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Reserve 10
|
||||
reserveAmt := abi.NewTokenAmount(10)
|
||||
sentinelReserve, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, reserveAmt)
|
||||
require.NoError(t, err)
|
||||
s.mockApi.completeMsg(sentinelReserve)
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Release 10
|
||||
err = s.fm.Release(s.acctAddr, reserveAmt)
|
||||
require.NoError(t, err)
|
||||
|
||||
type withdrawResult struct {
|
||||
ws cid.Cid
|
||||
err error
|
||||
}
|
||||
results := make(chan *withdrawResult)
|
||||
|
||||
// Wait until withdrawals are queued up
|
||||
walletAQueuedUp := make(chan struct{})
|
||||
queueReady := make(chan struct{})
|
||||
withdrawalCount := 0
|
||||
fa := s.fm.getFundedAddress(s.acctAddr)
|
||||
fa.onProcessStart(func() bool {
|
||||
if len(fa.withdrawals) == withdrawalCount {
|
||||
return false
|
||||
}
|
||||
withdrawalCount = len(fa.withdrawals)
|
||||
|
||||
if withdrawalCount == 1 {
|
||||
close(walletAQueuedUp)
|
||||
} else if withdrawalCount == 3 {
|
||||
close(queueReady)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
amtA1 := abi.NewTokenAmount(1)
|
||||
go func() {
|
||||
// Wallet A: Withdraw 1
|
||||
sentinelA1, err := s.fm.Withdraw(s.ctx, walletAddrA, s.acctAddr, amtA1)
|
||||
results <- &withdrawResult{
|
||||
ws: sentinelA1,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
amtB1 := abi.NewTokenAmount(2)
|
||||
amtB2 := abi.NewTokenAmount(3)
|
||||
go func() {
|
||||
// Wait until withdraw for wallet A is queued up
|
||||
<-walletAQueuedUp
|
||||
|
||||
// Wallet B: Withdraw 2
|
||||
go func() {
|
||||
sentinelB1, err := s.fm.Withdraw(s.ctx, walletAddrB, s.acctAddr, amtB1)
|
||||
results <- &withdrawResult{
|
||||
ws: sentinelB1,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
// Wallet B: Withdraw 3
|
||||
sentinelB2, err := s.fm.Withdraw(s.ctx, walletAddrB, s.acctAddr, amtB2)
|
||||
results <- &withdrawResult{
|
||||
ws: sentinelB2,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
// Withdrawals are queued up
|
||||
<-queueReady
|
||||
|
||||
// Should withdraw from wallet A first
|
||||
resA1 := <-results
|
||||
sentinelA1 := resA1.ws
|
||||
msg := s.mockApi.getSentMessage(sentinelA1)
|
||||
checkWithdrawMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1)
|
||||
|
||||
// Complete wallet A message
|
||||
s.mockApi.completeMsg(sentinelA1)
|
||||
|
||||
resB1 := <-results
|
||||
resB2 := <-results
|
||||
require.NoError(t, resB1.err)
|
||||
require.NoError(t, resB2.err)
|
||||
sentinelB1 := resB1.ws
|
||||
sentinelB2 := resB2.ws
|
||||
|
||||
// Should send different message for wallet B from wallet A
|
||||
require.NotEqual(t, sentinelA1, sentinelB1)
|
||||
// Should be single message combining amount 1 and 2
|
||||
require.Equal(t, sentinelB1, sentinelB2)
|
||||
msg = s.mockApi.getSentMessage(sentinelB1)
|
||||
checkWithdrawMessageFields(t, msg, walletAddrB, s.acctAddr, types.BigAdd(amtB1, amtB2))
|
||||
}
|
||||
|
||||
// TestFundManagerRestart verifies that waiting for incomplete requests resumes
|
||||
// on restart
|
||||
func TestFundManagerRestart(t *testing.T) {
|
||||
s := setup(t)
|
||||
defer s.fm.Stop()
|
||||
|
||||
acctAddr2 := tutils.NewActorAddr(t, "addr2")
|
||||
|
||||
// Address 1: Reserve 10
|
||||
amt := abi.NewTokenAmount(10)
|
||||
sentinelAddr1, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg := s.mockApi.getSentMessage(sentinelAddr1)
|
||||
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
|
||||
|
||||
// Address 2: Reserve 7
|
||||
amt2 := abi.NewTokenAmount(7)
|
||||
sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, s.walletAddr, acctAddr2, amt2)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg2 := s.mockApi.getSentMessage(sentinelAddr2Res7)
|
||||
checkAddMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2)
|
||||
|
||||
// Complete "Address 1: Reserve 10"
|
||||
s.mockApi.completeMsg(sentinelAddr1)
|
||||
|
||||
// Give the completed state a moment to be stored before restart
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
|
||||
// Restart
|
||||
mockApiAfter := s.mockApi
|
||||
fmAfter := NewFundManager(mockApiAfter, s.ds)
|
||||
err = fmAfter.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
amt3 := abi.NewTokenAmount(9)
|
||||
reserveSentinel := make(chan cid.Cid)
|
||||
go func() {
|
||||
// Address 2: Reserve 9
|
||||
sentinel3, err := fmAfter.Reserve(s.ctx, s.walletAddr, acctAddr2, amt3)
|
||||
require.NoError(t, err)
|
||||
reserveSentinel <- sentinel3
|
||||
}()
|
||||
|
||||
// Expect no message to be sent, because still waiting for previous
|
||||
// message "Address 2: Reserve 7" to complete on-chain
|
||||
select {
|
||||
case <-reserveSentinel:
|
||||
require.Fail(t, "Expected no message to be sent")
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
}
|
||||
|
||||
// Complete "Address 2: Reserve 7"
|
||||
mockApiAfter.completeMsg(sentinelAddr2Res7)
|
||||
|
||||
// Expect waiting message to now be sent
|
||||
sentinel3 := <-reserveSentinel
|
||||
msg3 := mockApiAfter.getSentMessage(sentinel3)
|
||||
checkAddMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3)
|
||||
}
|
||||
|
||||
type scaffold struct {
|
||||
ctx context.Context
|
||||
ds *ds_sync.MutexDatastore
|
||||
wllt *wallet.LocalWallet
|
||||
walletAddr address.Address
|
||||
acctAddr address.Address
|
||||
mockApi *mockFundManagerAPI
|
||||
fm *FundManager
|
||||
}
|
||||
|
||||
func setup(t *testing.T) *scaffold {
|
||||
ctx := context.Background()
|
||||
|
||||
wllt, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
walletAddr, err := wllt.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
acctAddr := tutils.NewActorAddr(t, "addr")
|
||||
|
||||
mockApi := newMockFundManagerAPI(walletAddr)
|
||||
dstore := ds_sync.MutexWrap(ds.NewMapDatastore())
|
||||
fm := NewFundManager(mockApi, dstore)
|
||||
return &scaffold{
|
||||
ctx: ctx,
|
||||
ds: dstore,
|
||||
wllt: wllt,
|
||||
walletAddr: walletAddr,
|
||||
acctAddr: acctAddr,
|
||||
mockApi: mockApi,
|
||||
fm: fm,
|
||||
}
|
||||
}
|
||||
|
||||
func checkAddMessageFields(t *testing.T, msg *types.Message, from address.Address, to address.Address, amt abi.TokenAmount) {
|
||||
require.Equal(t, from, msg.From)
|
||||
require.Equal(t, market.Address, msg.To)
|
||||
require.Equal(t, amt, msg.Value)
|
||||
|
||||
var paramsTo address.Address
|
||||
err := paramsTo.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, to, paramsTo)
|
||||
}
|
||||
|
||||
func checkWithdrawMessageFields(t *testing.T, msg *types.Message, from address.Address, addr address.Address, amt abi.TokenAmount) {
|
||||
require.Equal(t, from, msg.From)
|
||||
require.Equal(t, market.Address, msg.To)
|
||||
require.Equal(t, abi.NewTokenAmount(0), msg.Value)
|
||||
|
||||
var params market.WithdrawBalanceParams
|
||||
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, addr, params.ProviderOrClientAddress)
|
||||
require.Equal(t, amt, params.Amount)
|
||||
}
|
||||
|
||||
type sentMsg struct {
|
||||
msg *types.SignedMessage
|
||||
ready chan struct{}
|
||||
}
|
||||
|
||||
type mockFundManagerAPI struct {
|
||||
wallet address.Address
|
||||
|
||||
lk sync.Mutex
|
||||
escrow map[address.Address]abi.TokenAmount
|
||||
sentMsgs map[cid.Cid]*sentMsg
|
||||
completedMsgs map[cid.Cid]struct{}
|
||||
waitingFor map[cid.Cid]chan struct{}
|
||||
}
|
||||
|
||||
func newMockFundManagerAPI(wallet address.Address) *mockFundManagerAPI {
|
||||
return &mockFundManagerAPI{
|
||||
wallet: wallet,
|
||||
escrow: make(map[address.Address]abi.TokenAmount),
|
||||
sentMsgs: make(map[cid.Cid]*sentMsg),
|
||||
completedMsgs: make(map[cid.Cid]struct{}),
|
||||
waitingFor: make(map[cid.Cid]chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (mapi *mockFundManagerAPI) MpoolPushMessage(ctx context.Context, message *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
||||
mapi.lk.Lock()
|
||||
defer mapi.lk.Unlock()
|
||||
|
||||
smsg := &types.SignedMessage{Message: *message}
|
||||
mapi.sentMsgs[smsg.Cid()] = &sentMsg{msg: smsg, ready: make(chan struct{})}
|
||||
|
||||
return smsg, nil
|
||||
}
|
||||
|
||||
func (mapi *mockFundManagerAPI) getSentMessage(c cid.Cid) *types.Message {
|
||||
mapi.lk.Lock()
|
||||
defer mapi.lk.Unlock()
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
if pending, ok := mapi.sentMsgs[c]; ok {
|
||||
return &pending.msg.Message
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
panic("expected message to be sent")
|
||||
}
|
||||
|
||||
func (mapi *mockFundManagerAPI) messageCount() int {
|
||||
mapi.lk.Lock()
|
||||
defer mapi.lk.Unlock()
|
||||
|
||||
return len(mapi.sentMsgs)
|
||||
}
|
||||
|
||||
func (mapi *mockFundManagerAPI) completeMsg(msgCid cid.Cid) {
|
||||
mapi.lk.Lock()
|
||||
|
||||
pmsg, ok := mapi.sentMsgs[msgCid]
|
||||
if ok {
|
||||
if pmsg.msg.Message.Method == market.Methods.AddBalance {
|
||||
var escrowAcct address.Address
|
||||
err := escrowAcct.UnmarshalCBOR(bytes.NewReader(pmsg.msg.Message.Params))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
escrow := mapi.getEscrow(escrowAcct)
|
||||
before := escrow
|
||||
escrow = types.BigAdd(escrow, pmsg.msg.Message.Value)
|
||||
mapi.escrow[escrowAcct] = escrow
|
||||
log.Debugf("%s: escrow %d -> %d", escrowAcct, before, escrow)
|
||||
} else {
|
||||
var params market.WithdrawBalanceParams
|
||||
err := params.UnmarshalCBOR(bytes.NewReader(pmsg.msg.Message.Params))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
escrowAcct := params.ProviderOrClientAddress
|
||||
|
||||
escrow := mapi.getEscrow(escrowAcct)
|
||||
before := escrow
|
||||
escrow = types.BigSub(escrow, params.Amount)
|
||||
mapi.escrow[escrowAcct] = escrow
|
||||
log.Debugf("%s: escrow %d -> %d", escrowAcct, before, escrow)
|
||||
}
|
||||
}
|
||||
|
||||
mapi.completedMsgs[msgCid] = struct{}{}
|
||||
|
||||
ready, ok := mapi.waitingFor[msgCid]
|
||||
|
||||
mapi.lk.Unlock()
|
||||
|
||||
if ok {
|
||||
close(ready)
|
||||
}
|
||||
}
|
||||
|
||||
func (mapi *mockFundManagerAPI) StateMarketBalance(ctx context.Context, a address.Address, key types.TipSetKey) (api.MarketBalance, error) {
|
||||
mapi.lk.Lock()
|
||||
defer mapi.lk.Unlock()
|
||||
|
||||
return api.MarketBalance{
|
||||
Locked: abi.NewTokenAmount(0),
|
||||
Escrow: mapi.getEscrow(a),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (mapi *mockFundManagerAPI) getEscrow(a address.Address) abi.TokenAmount {
|
||||
escrow := mapi.escrow[a]
|
||||
if escrow.Nil() {
|
||||
return abi.NewTokenAmount(0)
|
||||
}
|
||||
return escrow
|
||||
}
|
||||
|
||||
func (mapi *mockFundManagerAPI) StateWaitMsg(ctx context.Context, c cid.Cid, confidence uint64) (*api.MsgLookup, error) {
|
||||
res := &api.MsgLookup{
|
||||
Message: c,
|
||||
Receipt: types.MessageReceipt{
|
||||
ExitCode: 0,
|
||||
Return: nil,
|
||||
},
|
||||
}
|
||||
ready := make(chan struct{})
|
||||
|
||||
mapi.lk.Lock()
|
||||
_, ok := mapi.completedMsgs[c]
|
||||
if !ok {
|
||||
mapi.waitingFor[c] = ready
|
||||
}
|
||||
mapi.lk.Unlock()
|
||||
|
||||
if !ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-ready:
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
@ -8,7 +8,6 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -20,8 +19,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
)
|
||||
|
||||
var log = logging.Logger("market_adapter")
|
||||
|
||||
// API is the dependencies need to run a fund manager
|
||||
type API struct {
|
||||
fx.In
|
||||
|
73
chain/market/store.go
Normal file
73
chain/market/store.go
Normal file
@ -0,0 +1,73 @@
|
||||
package market
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
cborrpc "github.com/filecoin-project/go-cbor-util"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
dsq "github.com/ipfs/go-datastore/query"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
|
||||
const dsKeyAddr = "Addr"
|
||||
|
||||
type Store struct {
|
||||
ds datastore.Batching
|
||||
}
|
||||
|
||||
func newStore(ds dtypes.MetadataDS) *Store {
|
||||
ds = namespace.Wrap(ds, datastore.NewKey("/fundmgr/"))
|
||||
return &Store{
|
||||
ds: ds,
|
||||
}
|
||||
}
|
||||
|
||||
// save the state to the datastore
|
||||
func (ps *Store) save(state *FundedAddressState) error {
|
||||
k := dskeyForAddr(state.Addr)
|
||||
|
||||
b, err := cborrpc.Dump(state)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ps.ds.Put(k, b)
|
||||
}
|
||||
|
||||
// forEach calls iter with each address in the datastore
|
||||
func (ps *Store) forEach(iter func(*FundedAddressState)) error {
|
||||
res, err := ps.ds.Query(dsq.Query{Prefix: dsKeyAddr})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Close() //nolint:errcheck
|
||||
|
||||
for {
|
||||
res, ok := res.NextSync()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
if res.Error != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var stored FundedAddressState
|
||||
if err := stored.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
iter(&stored)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// The datastore key used to identify the address state
|
||||
func dskeyForAddr(addr address.Address) datastore.Key {
|
||||
return datastore.KeyWithNamespaces([]string{dsKeyAddr, addr.String()})
|
||||
}
|
10
gen/main.go
10
gen/main.go
@ -4,6 +4,8 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/market"
|
||||
|
||||
gen "github.com/whyrusleeping/cbor-gen"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -67,6 +69,14 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
err = gen.WriteTupleEncodersToFile("./chain/market/cbor_gen.go", "market",
|
||||
market.FundedAddressState{},
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange",
|
||||
exchange.Request{},
|
||||
exchange.Response{},
|
||||
|
Loading…
Reference in New Issue
Block a user