727 lines
20 KiB
Go
727 lines
20 KiB
Go
package market
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/actors"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/node/impl/full"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/ipfs/go-cid"
|
|
"github.com/ipfs/go-datastore"
|
|
logging "github.com/ipfs/go-log"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
)
|
|
|
|
var log = logging.Logger("market_adapter")
|
|
|
|
// API is the fx dependencies need to run a fund manager
|
|
type FundManagerAPI struct {
|
|
fx.In
|
|
|
|
full.StateAPI
|
|
full.MpoolAPI
|
|
}
|
|
|
|
// fundManagerAPI is the specific methods called by the FundManager
|
|
// (used by the tests)
|
|
type fundManagerAPI interface {
|
|
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
|
|
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
|
|
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error)
|
|
}
|
|
|
|
// FundManager keeps track of funds in a set of addresses
|
|
type FundManager struct {
|
|
ctx context.Context
|
|
shutdown context.CancelFunc
|
|
api fundManagerAPI
|
|
str *Store
|
|
|
|
lk sync.Mutex
|
|
fundedAddrs map[address.Address]*fundedAddress
|
|
}
|
|
|
|
func NewFundManager(lc fx.Lifecycle, api FundManagerAPI, ds dtypes.MetadataDS) *FundManager {
|
|
fm := newFundManager(&api, ds)
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
return fm.Start()
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
fm.Stop()
|
|
return nil
|
|
},
|
|
})
|
|
return fm
|
|
}
|
|
|
|
// newFundManager is used by the tests
|
|
func newFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &FundManager{
|
|
ctx: ctx,
|
|
shutdown: cancel,
|
|
api: api,
|
|
str: newStore(ds),
|
|
fundedAddrs: make(map[address.Address]*fundedAddress),
|
|
}
|
|
}
|
|
|
|
func (fm *FundManager) Stop() {
|
|
fm.shutdown()
|
|
}
|
|
|
|
func (fm *FundManager) Start() error {
|
|
fm.lk.Lock()
|
|
defer fm.lk.Unlock()
|
|
|
|
// TODO:
|
|
// To save memory:
|
|
// - in State() only load addresses with in-progress messages
|
|
// - load the others just-in-time from getFundedAddress
|
|
// - delete(fm.fundedAddrs, addr) when the queue has been processed
|
|
return fm.str.forEach(func(state *FundedAddressState) {
|
|
fa := newFundedAddress(fm, state.Addr)
|
|
fa.state = state
|
|
fm.fundedAddrs[fa.state.Addr] = fa
|
|
fa.start()
|
|
})
|
|
}
|
|
|
|
// Creates a fundedAddress if it doesn't already exist, and returns it
|
|
func (fm *FundManager) getFundedAddress(addr address.Address) *fundedAddress {
|
|
fm.lk.Lock()
|
|
defer fm.lk.Unlock()
|
|
|
|
fa, ok := fm.fundedAddrs[addr]
|
|
if !ok {
|
|
fa = newFundedAddress(fm, addr)
|
|
fm.fundedAddrs[addr] = fa
|
|
}
|
|
return fa
|
|
}
|
|
|
|
// Reserve adds amt to `reserved`. If there are not enough available funds for
|
|
// the address, submits a message on chain to top up available funds.
|
|
// Returns the cid of the message that was submitted on chain, or cid.Undef if
|
|
// the required funds were already available.
|
|
func (fm *FundManager) Reserve(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
|
return fm.getFundedAddress(addr).reserve(ctx, wallet, amt)
|
|
}
|
|
|
|
// Subtract from `reserved`.
|
|
func (fm *FundManager) Release(addr address.Address, amt abi.TokenAmount) error {
|
|
return fm.getFundedAddress(addr).release(amt)
|
|
}
|
|
|
|
// Withdraw unreserved funds. Only succeeds if there are enough unreserved
|
|
// funds for the address.
|
|
// Returns the cid of the message that was submitted on chain.
|
|
func (fm *FundManager) Withdraw(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
|
return fm.getFundedAddress(addr).withdraw(ctx, wallet, amt)
|
|
}
|
|
|
|
// GetReserved returns the amount that is currently reserved for the address
|
|
func (fm *FundManager) GetReserved(addr address.Address) abi.TokenAmount {
|
|
return fm.getFundedAddress(addr).getReserved()
|
|
}
|
|
|
|
// FundedAddressState keeps track of the state of an address with funds in the
|
|
// datastore
|
|
type FundedAddressState struct {
|
|
Addr address.Address
|
|
// AmtReserved is the amount that must be kept in the address (cannot be
|
|
// withdrawn)
|
|
AmtReserved abi.TokenAmount
|
|
// MsgCid is the cid of an in-progress on-chain message
|
|
MsgCid *cid.Cid
|
|
}
|
|
|
|
// fundedAddress keeps track of the state and request queues for a
|
|
// particular address
|
|
type fundedAddress struct {
|
|
ctx context.Context
|
|
env *fundManagerEnvironment
|
|
str *Store
|
|
|
|
lk sync.RWMutex
|
|
state *FundedAddressState
|
|
|
|
// Note: These request queues are ephemeral, they are not saved to store
|
|
reservations []*fundRequest
|
|
releases []*fundRequest
|
|
withdrawals []*fundRequest
|
|
|
|
// Used by the tests
|
|
onProcessStartListener func() bool
|
|
}
|
|
|
|
func newFundedAddress(fm *FundManager, addr address.Address) *fundedAddress {
|
|
return &fundedAddress{
|
|
ctx: fm.ctx,
|
|
env: &fundManagerEnvironment{api: fm.api},
|
|
str: fm.str,
|
|
state: &FundedAddressState{
|
|
Addr: addr,
|
|
AmtReserved: abi.NewTokenAmount(0),
|
|
},
|
|
}
|
|
}
|
|
|
|
// If there is an in-progress on-chain message, don't submit any more messages
|
|
// on chain until it completes
|
|
func (a *fundedAddress) start() {
|
|
a.lk.Lock()
|
|
defer a.lk.Unlock()
|
|
|
|
if a.state.MsgCid != nil {
|
|
a.debugf("restart: wait for %s", a.state.MsgCid)
|
|
a.startWaitForResults(*a.state.MsgCid)
|
|
}
|
|
}
|
|
|
|
func (a *fundedAddress) getReserved() abi.TokenAmount {
|
|
a.lk.RLock()
|
|
defer a.lk.RUnlock()
|
|
|
|
return a.state.AmtReserved
|
|
}
|
|
|
|
func (a *fundedAddress) reserve(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
|
return a.requestAndWait(ctx, wallet, amt, &a.reservations)
|
|
}
|
|
|
|
func (a *fundedAddress) release(amt abi.TokenAmount) error {
|
|
_, err := a.requestAndWait(context.Background(), address.Undef, amt, &a.releases)
|
|
return err
|
|
}
|
|
|
|
func (a *fundedAddress) withdraw(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
|
return a.requestAndWait(ctx, wallet, amt, &a.withdrawals)
|
|
}
|
|
|
|
func (a *fundedAddress) requestAndWait(ctx context.Context, wallet address.Address, amt abi.TokenAmount, reqs *[]*fundRequest) (cid.Cid, error) {
|
|
// Create a request and add it to the request queue
|
|
req := newFundRequest(ctx, wallet, amt)
|
|
|
|
a.lk.Lock()
|
|
*reqs = append(*reqs, req)
|
|
a.lk.Unlock()
|
|
|
|
// Process the queue
|
|
go a.process()
|
|
|
|
// Wait for the results
|
|
select {
|
|
case <-ctx.Done():
|
|
return cid.Undef, ctx.Err()
|
|
case r := <-req.Result:
|
|
return r.msgCid, r.err
|
|
}
|
|
}
|
|
|
|
// Used by the tests
|
|
func (a *fundedAddress) onProcessStart(fn func() bool) {
|
|
a.lk.Lock()
|
|
defer a.lk.Unlock()
|
|
|
|
a.onProcessStartListener = fn
|
|
}
|
|
|
|
// Process queued requests
|
|
func (a *fundedAddress) process() {
|
|
a.lk.Lock()
|
|
defer a.lk.Unlock()
|
|
|
|
// Used by the tests
|
|
if a.onProcessStartListener != nil {
|
|
done := a.onProcessStartListener()
|
|
if !done {
|
|
return
|
|
}
|
|
a.onProcessStartListener = nil
|
|
}
|
|
|
|
// Check if we're still waiting for the response to a message
|
|
if a.state.MsgCid != nil {
|
|
return
|
|
}
|
|
|
|
// Check if there's anything to do
|
|
haveReservations := len(a.reservations) > 0 || len(a.releases) > 0
|
|
haveWithdrawals := len(a.withdrawals) > 0
|
|
if !haveReservations && !haveWithdrawals {
|
|
return
|
|
}
|
|
|
|
// Process reservations / releases
|
|
if haveReservations {
|
|
res, err := a.processReservations(a.reservations, a.releases)
|
|
if err == nil {
|
|
a.applyStateChange(res.msgCid, res.amtReserved)
|
|
}
|
|
a.reservations = filterOutProcessedReqs(a.reservations)
|
|
a.releases = filterOutProcessedReqs(a.releases)
|
|
}
|
|
|
|
// If there was no message sent on chain by adding reservations, and all
|
|
// reservations have completed processing, process withdrawals
|
|
if haveWithdrawals && a.state.MsgCid == nil && len(a.reservations) == 0 {
|
|
withdrawalCid, err := a.processWithdrawals(a.withdrawals)
|
|
if err == nil && withdrawalCid != cid.Undef {
|
|
a.applyStateChange(&withdrawalCid, types.EmptyInt)
|
|
}
|
|
a.withdrawals = filterOutProcessedReqs(a.withdrawals)
|
|
}
|
|
|
|
// If a message was sent on-chain
|
|
if a.state.MsgCid != nil {
|
|
// Start waiting for results of message (async)
|
|
a.startWaitForResults(*a.state.MsgCid)
|
|
}
|
|
|
|
// Process any remaining queued requests
|
|
go a.process()
|
|
}
|
|
|
|
// Filter out completed requests
|
|
func filterOutProcessedReqs(reqs []*fundRequest) []*fundRequest {
|
|
filtered := make([]*fundRequest, 0, len(reqs))
|
|
for _, req := range reqs {
|
|
if !req.Completed() {
|
|
filtered = append(filtered, req)
|
|
}
|
|
}
|
|
return filtered
|
|
}
|
|
|
|
// Apply the results of processing queues and save to the datastore
|
|
func (a *fundedAddress) applyStateChange(msgCid *cid.Cid, amtReserved abi.TokenAmount) {
|
|
a.state.MsgCid = msgCid
|
|
if !amtReserved.Nil() {
|
|
a.state.AmtReserved = amtReserved
|
|
}
|
|
a.saveState()
|
|
}
|
|
|
|
// Clear the pending message cid so that a new message can be sent
|
|
func (a *fundedAddress) clearWaitState() {
|
|
a.state.MsgCid = nil
|
|
a.saveState()
|
|
}
|
|
|
|
// Save state to datastore
|
|
func (a *fundedAddress) saveState() {
|
|
// Not much we can do if saving to the datastore fails, just log
|
|
err := a.str.save(a.state)
|
|
if err != nil {
|
|
log.Errorf("saving state to store for addr %s: %v", a.state.Addr, err)
|
|
}
|
|
}
|
|
|
|
// The result of processing the reservation / release queues
|
|
type processResult struct {
|
|
// Requests that completed without adding funds
|
|
covered []*fundRequest
|
|
// Requests that added funds
|
|
added []*fundRequest
|
|
|
|
// The new reserved amount
|
|
amtReserved abi.TokenAmount
|
|
// The message cid, if a message was submitted on-chain
|
|
msgCid *cid.Cid
|
|
}
|
|
|
|
// process reservations and releases, and return the resulting changes to state
|
|
func (a *fundedAddress) processReservations(reservations []*fundRequest, releases []*fundRequest) (pr *processResult, prerr error) {
|
|
// When the function returns
|
|
defer func() {
|
|
// If there's an error, mark all requests as errored
|
|
if prerr != nil {
|
|
for _, req := range append(reservations, releases...) {
|
|
req.Complete(cid.Undef, prerr)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Complete all release requests
|
|
for _, req := range releases {
|
|
req.Complete(cid.Undef, nil)
|
|
}
|
|
|
|
// Complete all requests that were covered by released amounts
|
|
for _, req := range pr.covered {
|
|
req.Complete(cid.Undef, nil)
|
|
}
|
|
|
|
// If a message was sent
|
|
if pr.msgCid != nil {
|
|
// Complete all add funds requests
|
|
for _, req := range pr.added {
|
|
req.Complete(*pr.msgCid, nil)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Split reservations into those that are covered by released amounts,
|
|
// and those to add to the reserved amount.
|
|
// Note that we process requests from the same wallet in batches. So some
|
|
// requests may not be included in covered if they don't match the first
|
|
// covered request's wallet. These will be processed on a subsequent
|
|
// invocation of processReservations.
|
|
toCancel, toAdd, reservedDelta := splitReservations(reservations, releases)
|
|
|
|
// Apply the reserved delta to the reserved amount
|
|
reserved := types.BigAdd(a.state.AmtReserved, reservedDelta)
|
|
if reserved.LessThan(abi.NewTokenAmount(0)) {
|
|
reserved = abi.NewTokenAmount(0)
|
|
}
|
|
res := &processResult{
|
|
amtReserved: reserved,
|
|
covered: toCancel,
|
|
}
|
|
|
|
// Work out the amount to add to the balance
|
|
amtToAdd := abi.NewTokenAmount(0)
|
|
if len(toAdd) > 0 && reserved.GreaterThan(abi.NewTokenAmount(0)) {
|
|
// Get available funds for address
|
|
avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr)
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
|
|
// amount to add = new reserved amount - available
|
|
amtToAdd = types.BigSub(reserved, avail)
|
|
a.debugf("reserved %d - avail %d = to add %d", reserved, avail, amtToAdd)
|
|
}
|
|
|
|
// If there's nothing to add to the balance, bail out
|
|
if amtToAdd.LessThanEqual(abi.NewTokenAmount(0)) {
|
|
res.covered = append(res.covered, toAdd...)
|
|
return res, nil
|
|
}
|
|
|
|
// Add funds to address
|
|
a.debugf("add funds %d", amtToAdd)
|
|
addFundsCid, err := a.env.AddFunds(a.ctx, toAdd[0].Wallet, a.state.Addr, amtToAdd)
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
|
|
// Mark reservation requests as complete
|
|
res.added = toAdd
|
|
|
|
// Save the message CID to state
|
|
res.msgCid = &addFundsCid
|
|
return res, nil
|
|
}
|
|
|
|
// Split reservations into those that are under the total release amount
|
|
// (covered) and those that exceed it (to add).
|
|
// Note that we process requests from the same wallet in batches. So some
|
|
// requests may not be included in covered if they don't match the first
|
|
// covered request's wallet.
|
|
func splitReservations(reservations []*fundRequest, releases []*fundRequest) ([]*fundRequest, []*fundRequest, abi.TokenAmount) {
|
|
toCancel := make([]*fundRequest, 0, len(reservations))
|
|
toAdd := make([]*fundRequest, 0, len(reservations))
|
|
toAddAmt := abi.NewTokenAmount(0)
|
|
|
|
// Sum release amounts
|
|
releaseAmt := abi.NewTokenAmount(0)
|
|
for _, req := range releases {
|
|
releaseAmt = types.BigAdd(releaseAmt, req.Amount())
|
|
}
|
|
|
|
// We only want to combine requests that come from the same wallet
|
|
batchWallet := address.Undef
|
|
for _, req := range reservations {
|
|
amt := req.Amount()
|
|
|
|
// If the amount to add to the reserve is cancelled out by a release
|
|
if amt.LessThanEqual(releaseAmt) {
|
|
// Cancel the request and update the release total
|
|
releaseAmt = types.BigSub(releaseAmt, amt)
|
|
toCancel = append(toCancel, req)
|
|
continue
|
|
}
|
|
|
|
// The amount to add is greater that the release total so we want
|
|
// to send an add funds request
|
|
|
|
// The first time the wallet will be undefined
|
|
if batchWallet == address.Undef {
|
|
batchWallet = req.Wallet
|
|
}
|
|
// If this request's wallet is the same as the batch wallet,
|
|
// the requests will be combined
|
|
if batchWallet == req.Wallet {
|
|
delta := types.BigSub(amt, releaseAmt)
|
|
toAddAmt = types.BigAdd(toAddAmt, delta)
|
|
releaseAmt = abi.NewTokenAmount(0)
|
|
toAdd = append(toAdd, req)
|
|
}
|
|
}
|
|
|
|
// The change in the reserved amount is "amount to add" - "amount to release"
|
|
reservedDelta := types.BigSub(toAddAmt, releaseAmt)
|
|
|
|
return toCancel, toAdd, reservedDelta
|
|
}
|
|
|
|
// process withdrawal queue
|
|
func (a *fundedAddress) processWithdrawals(withdrawals []*fundRequest) (msgCid cid.Cid, prerr error) {
|
|
// If there's an error, mark all withdrawal requests as errored
|
|
defer func() {
|
|
if prerr != nil {
|
|
for _, req := range withdrawals {
|
|
req.Complete(cid.Undef, prerr)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Get the net available balance
|
|
avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr)
|
|
if err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
|
|
netAvail := types.BigSub(avail, a.state.AmtReserved)
|
|
|
|
// Fit as many withdrawals as possible into the available balance, and fail
|
|
// the rest
|
|
withdrawalAmt := abi.NewTokenAmount(0)
|
|
allowedAmt := abi.NewTokenAmount(0)
|
|
allowed := make([]*fundRequest, 0, len(withdrawals))
|
|
var batchWallet address.Address
|
|
for _, req := range withdrawals {
|
|
amt := req.Amount()
|
|
if amt.IsZero() {
|
|
// If the context for the request was cancelled, bail out
|
|
req.Complete(cid.Undef, err)
|
|
continue
|
|
}
|
|
|
|
// If the amount would exceed the available amount, complete the
|
|
// request with an error
|
|
newWithdrawalAmt := types.BigAdd(withdrawalAmt, amt)
|
|
if newWithdrawalAmt.GreaterThan(netAvail) {
|
|
msg := fmt.Sprintf("insufficient funds for withdrawal of %s: ", types.FIL(amt))
|
|
msg += fmt.Sprintf("net available (%s) = available (%s) - reserved (%s)",
|
|
types.FIL(types.BigSub(netAvail, withdrawalAmt)), types.FIL(avail), types.FIL(a.state.AmtReserved))
|
|
if !withdrawalAmt.IsZero() {
|
|
msg += fmt.Sprintf(" - queued withdrawals (%s)", types.FIL(withdrawalAmt))
|
|
}
|
|
err := xerrors.Errorf(msg)
|
|
a.debugf("%s", err)
|
|
req.Complete(cid.Undef, err)
|
|
continue
|
|
}
|
|
|
|
// If this is the first allowed withdrawal request in this batch, save
|
|
// its wallet address
|
|
if batchWallet == address.Undef {
|
|
batchWallet = req.Wallet
|
|
}
|
|
// If the request wallet doesn't match the batch wallet, bail out
|
|
// (the withdrawal will be processed after the current batch has
|
|
// completed)
|
|
if req.Wallet != batchWallet {
|
|
continue
|
|
}
|
|
|
|
// Include this withdrawal request in the batch
|
|
withdrawalAmt = newWithdrawalAmt
|
|
a.debugf("withdraw %d", amt)
|
|
allowed = append(allowed, req)
|
|
allowedAmt = types.BigAdd(allowedAmt, amt)
|
|
}
|
|
|
|
// Check if there is anything to withdraw.
|
|
// Note that if the context for a request is cancelled,
|
|
// req.Amount() returns zero
|
|
if allowedAmt.Equals(abi.NewTokenAmount(0)) {
|
|
// Mark allowed requests as complete
|
|
for _, req := range allowed {
|
|
req.Complete(cid.Undef, nil)
|
|
}
|
|
return cid.Undef, nil
|
|
}
|
|
|
|
// Withdraw funds
|
|
a.debugf("withdraw funds %d", allowedAmt)
|
|
withdrawFundsCid, err := a.env.WithdrawFunds(a.ctx, allowed[0].Wallet, a.state.Addr, allowedAmt)
|
|
if err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
|
|
// Mark allowed requests as complete
|
|
for _, req := range allowed {
|
|
req.Complete(withdrawFundsCid, nil)
|
|
}
|
|
|
|
// Save the message CID to state
|
|
return withdrawFundsCid, nil
|
|
}
|
|
|
|
// asynchonously wait for results of message
|
|
func (a *fundedAddress) startWaitForResults(msgCid cid.Cid) {
|
|
go func() {
|
|
err := a.env.WaitMsg(a.ctx, msgCid)
|
|
if err != nil {
|
|
// We don't really care about the results here, we're just waiting
|
|
// so as to only process one on-chain message at a time
|
|
log.Errorf("waiting for results of message %s for addr %s: %v", msgCid, a.state.Addr, err)
|
|
}
|
|
|
|
a.lk.Lock()
|
|
a.debugf("complete wait")
|
|
a.clearWaitState()
|
|
a.lk.Unlock()
|
|
|
|
a.process()
|
|
}()
|
|
}
|
|
|
|
func (a *fundedAddress) debugf(args ...interface{}) {
|
|
fmtStr := args[0].(string)
|
|
args = args[1:]
|
|
log.Debugf(a.state.Addr.String()+": "+fmtStr, args...)
|
|
}
|
|
|
|
// The result of a fund request
|
|
type reqResult struct {
|
|
msgCid cid.Cid
|
|
err error
|
|
}
|
|
|
|
// A request to change funds
|
|
type fundRequest struct {
|
|
ctx context.Context
|
|
amt abi.TokenAmount
|
|
completed chan struct{}
|
|
Wallet address.Address
|
|
Result chan reqResult
|
|
}
|
|
|
|
func newFundRequest(ctx context.Context, wallet address.Address, amt abi.TokenAmount) *fundRequest {
|
|
return &fundRequest{
|
|
ctx: ctx,
|
|
amt: amt,
|
|
Wallet: wallet,
|
|
Result: make(chan reqResult),
|
|
completed: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Amount returns zero if the context has expired
|
|
func (frp *fundRequest) Amount() abi.TokenAmount {
|
|
if frp.ctx.Err() != nil {
|
|
return abi.NewTokenAmount(0)
|
|
}
|
|
return frp.amt
|
|
}
|
|
|
|
// Complete is called with the message CID when the funds request has been
|
|
// started or with the error if there was an error
|
|
func (frp *fundRequest) Complete(msgCid cid.Cid, err error) {
|
|
select {
|
|
case <-frp.completed:
|
|
case <-frp.ctx.Done():
|
|
case frp.Result <- reqResult{msgCid: msgCid, err: err}:
|
|
}
|
|
close(frp.completed)
|
|
}
|
|
|
|
// Completed indicates if Complete has already been called
|
|
func (frp *fundRequest) Completed() bool {
|
|
select {
|
|
case <-frp.completed:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// fundManagerEnvironment simplifies some API calls
|
|
type fundManagerEnvironment struct {
|
|
api fundManagerAPI
|
|
}
|
|
|
|
func (env *fundManagerEnvironment) AvailableFunds(ctx context.Context, addr address.Address) (abi.TokenAmount, error) {
|
|
bal, err := env.api.StateMarketBalance(ctx, addr, types.EmptyTSK)
|
|
if err != nil {
|
|
return abi.NewTokenAmount(0), err
|
|
}
|
|
|
|
return types.BigSub(bal.Escrow, bal.Locked), nil
|
|
}
|
|
|
|
func (env *fundManagerEnvironment) AddFunds(
|
|
ctx context.Context,
|
|
wallet address.Address,
|
|
addr address.Address,
|
|
amt abi.TokenAmount,
|
|
) (cid.Cid, error) {
|
|
params, err := actors.SerializeParams(&addr)
|
|
if err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
|
|
smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{
|
|
To: market.Address,
|
|
From: wallet,
|
|
Value: amt,
|
|
Method: market.Methods.AddBalance,
|
|
Params: params,
|
|
}, nil)
|
|
|
|
if aerr != nil {
|
|
return cid.Undef, aerr
|
|
}
|
|
|
|
return smsg.Cid(), nil
|
|
}
|
|
|
|
func (env *fundManagerEnvironment) WithdrawFunds(
|
|
ctx context.Context,
|
|
wallet address.Address,
|
|
addr address.Address,
|
|
amt abi.TokenAmount,
|
|
) (cid.Cid, error) {
|
|
params, err := actors.SerializeParams(&market.WithdrawBalanceParams{
|
|
ProviderOrClientAddress: addr,
|
|
Amount: amt,
|
|
})
|
|
if err != nil {
|
|
return cid.Undef, xerrors.Errorf("serializing params: %w", err)
|
|
}
|
|
|
|
smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{
|
|
To: market.Address,
|
|
From: wallet,
|
|
Value: types.NewInt(0),
|
|
Method: market.Methods.WithdrawBalance,
|
|
Params: params,
|
|
}, nil)
|
|
|
|
if aerr != nil {
|
|
return cid.Undef, aerr
|
|
}
|
|
|
|
return smsg.Cid(), nil
|
|
}
|
|
|
|
func (env *fundManagerEnvironment) WaitMsg(ctx context.Context, c cid.Cid) error {
|
|
_, err := env.api.StateWaitMsg(ctx, c, build.MessageConfidence)
|
|
return err
|
|
}
|