lotus/chain/market/fundmanager.go

730 lines
20 KiB
Go
Raw Normal View History

2020-11-05 16:50:40 +00:00
package market
import (
"context"
2021-01-05 15:58:12 +00:00
"fmt"
2020-11-05 16:50:40 +00:00
"sync"
2022-06-14 15:00:51 +00:00
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"
"golang.org/x/xerrors"
2020-11-10 15:45:48 +00:00
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
2022-04-20 21:34:28 +00:00
"github.com/filecoin-project/go-state-types/builtin"
build: release: v1.18.0 (#9652) * build: Bump version to v1.17.3-dev * build: set version to v1.18.0-dev * chore: actors: Allow builtin-actors to return a map of methods (#9342) * Allow builtin-actors to return a map of methods * go mod * Fix tests * Fix tests, check carefully please * Delete lotus-pond (#9352) * feat: add StateNetworkVersion to mpool API * chore: refactor: rename NewestNetworkVersion * feat: actors: Integrate datacap actor into lotus (#9348) * Integrate datacap actor * Implement datacap actor in chain/builtin * feat: support typed errors over RPC * chore: deps: update to go-jsonrpc 0.1.8 * remove duplicate import * fix: itest: check for closed connection * chore: refactor: move retry test to API * address magik supernit * Add ability to only have single partition per msg for partitions with recovery sectors * doc gen * Address comments * Return beneficiary info from miner state Info() * Update builtin-actors to dev/20220922-v9 which includes FIP-0045 changes in progress * Integrate verifreg changes to lotus * Setup datacap actor * Update builtin-actors to dev/20220922-v9-1 * Update datacap actor to query datacap instead of verifreg * update gst * update markets * update actors with hamt fix * update gst * Update datacap to parse tokens * Update bundles * datacap and verifreg actors use ID addresses without protocol byte * update builtin-actors to rc1 * update go-fil-markets * Update bundles to rc2 * Integrate the v9 migration * Add api for getting allocation * Add upgrade epoch for butterfly * Tweak PreSeal struct to be infra-friendly * docsgen * More tweaking of PreSeal for genesis * review fixes * Use fake cid for test * add butterfly artifacts for oct 5 upgrade * check datacaps for v8 verifreg match v9 datacap actor * Remove print statements * Update to go-state-types master * Update to go-state-types v0.9.0-rc1 * review fixes * use go-fil-markets v1.24.0-v17 * Add accessors for allocations and claims maps * fix: missing permissions tag * 
butterfly * update butterfly artifacts * sealing pipeline: Prepare deal assigning logic for FIP-45 * sealing pipeline: Get allocationId with StateApi * use NoAllocationID instead of nil AllocationId * address review * Add datacap actor to registry.go * Add cli for listing allocations and removing expired allocations * Update to go-state-types master * deps: upgrade go-merkledag to 0.8.0 * shark params * Update cli/filplus.go Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * revert change to verifreg util * docsgen-cli * miss the stuff * Update FFI * Update go-state-types to v0.9.0 * Update builtin-actors to v9.0.0 * add calib upgrade epcoh * update the upgrade envvar * kill shark * Remove fvm splash banner from nv17 upgrade * check invariance for pending deals and allocations * check pending verified deal proposal migrated to allocation * Add check for unsealed CID in precommit sectors * Fix counting of allocations in nv17 migration test * make gen * pass state trees as pointers * Add assertion that migrations with & without cache are the same * compare allocation to verified deal proposal * Fix miner state precommit info * fix migration test tool * add changelog * Update to go-state-types v0.9.1 * Integrate builtin-actors v9.0.1 * chore: ver: bump version for rc3 (#9512) * Bump version to 1.18.0-rc3 * Update CHANGELOG.md * Update CHANGELOG.md Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * Update CHANGELOG.md Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * Migration: Use autobatch bs * Fix autobatch Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai> * Invoker: Use MethodMeta from go-state-types * Add a second premigration for nv17 * Add more shed tools for migration checking * address review * Lotus release v1.18.0-rc4 * fix: ci: fix app-image build on ci (#9527) * Remove old go version first * Add 
GO_VERSION file * Use GO_VERSION to set / verify go version * mv GO_VERSION GO_VERSION_MIN * Use GO_VERSION_MIN in Makefile check Co-authored-by: Ian Davis <jungziege@gmail.com> * Update to latest go-state-types for migration fixes * go mod tidy * fix: use api.ErrActorNotFound instead of types.ErrActorNotFound * fix: add fields to ForkUpgradeParams * docs: update actors_version_checklist.md * chore: fix lint * update to go state type v0.9.6 with market migration fix (#9545) * update go-state-types to v-0.9.7 * Add invariant checks to migration * fix invariant check: number of entries in datacap actor should include verifreg * Invariant checks: Only include not-activated deals * test: nv17 migration * Address review * add lotus-shed invariance method * Migration cli takes a stateroot cid and a height * make gen * Update to builtin-actors v9.0.2 * Failing test that shows that notaries can remove datacap from the verifreg actor * Test that should pass when the problem is solved * make gen * Review fixes * statemanager call function will return call information even if call errors * update go-state-types * update builtin-actors * bubble up errors properly from ApplyImplicitMessage * bump to rc5 * set new upgrade heights for calibnet * set new upgrade height for butterfly * tweak calibnet upgrade schedule * clarify changelog note about calibnet * butterfly * update calibnet artifacts * Allow setting local bundles for Debug FVM for av 9+ * fix: autobatch: remove potential deadlock when a block is missing Check the _underlying_ blockstore instead of recursing. Also, drop the lock before we do that. 
* fix imports * build: set shark mainnet epoch (#9640) * chore: build: Lotus release v1.18.0 (#9641) * Lotus release v1.18.0 * add changelog * address review * changelog improvement Co-authored-by: Jennifer Wang <jiayingw703@gmail.com> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai> Co-authored-by: Łukasz Magiera <magik6k@gmail.com> Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com> Co-authored-by: Aayush <arajasek94@gmail.com> Co-authored-by: Geoff Stuart <geoff.vball@gmail.com> Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai> Co-authored-by: simlecode <69969590+simlecode@users.noreply.github.com> Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: Jakub Sztandera <kubuxu@protocol.ai> Co-authored-by: Ian Davis <jungziege@gmail.com> Co-authored-by: zenground0 <ZenGround0@users.noreply.github.com> Co-authored-by: Steven Allen <steven@stebalien.com>
2022-11-16 01:57:23 +00:00
"github.com/filecoin-project/go-state-types/builtin/v9/market"
2022-06-14 15:00:51 +00:00
2020-11-10 15:45:48 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
2020-11-05 16:50:40 +00:00
"github.com/filecoin-project/lotus/chain/actors"
2020-11-10 15:45:48 +00:00
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"
2020-11-05 16:50:40 +00:00
)
// log is the package-level logger, registered under the name "market_adapter".
var log = logging.Logger("market_adapter")
// FundManagerAPI bundles the fx dependencies needed to run a fund manager.
// It embeds fx.In together with the full-node state and message-pool APIs.
type FundManagerAPI struct {
	fx.In

	full.StateAPI
	full.MpoolAPI
}
// fundManagerAPI is the specific methods called by the FundManager
2020-11-10 15:45:48 +00:00
// (used by the tests)
2020-11-05 16:50:40 +00:00
type fundManagerAPI interface {
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
2021-04-05 11:23:46 +00:00
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
2020-11-05 16:50:40 +00:00
}
// FundManager keeps track of funds in a set of addresses.
type FundManager struct {
	// ctx is passed on to every fundedAddress; shutdown cancels it.
	ctx      context.Context
	shutdown context.CancelFunc

	// api is used to push messages and query chain state.
	api fundManagerAPI
	// str persists the per-address state.
	str *Store

	// lk guards fundedAddrs.
	lk          sync.Mutex
	fundedAddrs map[address.Address]*fundedAddress
}
func NewFundManager(lc fx.Lifecycle, api FundManagerAPI, ds dtypes.MetadataDS) *FundManager {
2020-11-10 15:45:48 +00:00
fm := newFundManager(&api, ds)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return fm.Start()
},
OnStop: func(ctx context.Context) error {
fm.Stop()
return nil
},
})
return fm
}
// newFundManager is used by the tests
func newFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager {
2020-11-05 16:50:40 +00:00
ctx, cancel := context.WithCancel(context.Background())
return &FundManager{
ctx: ctx,
shutdown: cancel,
api: api,
str: newStore(ds),
fundedAddrs: make(map[address.Address]*fundedAddress),
}
}
// Stop cancels the manager's context, which is the context shared with every
// fundedAddress created by this manager.
func (fm *FundManager) Stop() {
	fm.shutdown()
}
func (fm *FundManager) Start() error {
fm.lk.Lock()
defer fm.lk.Unlock()
// TODO:
// To save memory:
// - in State() only load addresses with in-progress messages
// - load the others just-in-time from getFundedAddress
// - delete(fm.fundedAddrs, addr) when the queue has been processed
2021-12-11 21:03:00 +00:00
return fm.str.forEach(fm.ctx, func(state *FundedAddressState) {
2020-11-05 16:50:40 +00:00
fa := newFundedAddress(fm, state.Addr)
fa.state = state
fm.fundedAddrs[fa.state.Addr] = fa
fa.start()
})
}
// getFundedAddress returns the fundedAddress tracked for addr, creating and
// registering a new one if none exists yet.
func (fm *FundManager) getFundedAddress(addr address.Address) *fundedAddress {
	fm.lk.Lock()
	defer fm.lk.Unlock()

	if existing, found := fm.fundedAddrs[addr]; found {
		return existing
	}

	created := newFundedAddress(fm, addr)
	fm.fundedAddrs[addr] = created
	return created
}
// Reserve adds amt to `reserved`. If there are not enough available funds for
2020-11-05 16:50:40 +00:00
// the address, submits a message on chain to top up available funds.
2020-11-10 08:55:38 +00:00
// Returns the cid of the message that was submitted on chain, or cid.Undef if
// the required funds were already available.
func (fm *FundManager) Reserve(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return fm.getFundedAddress(addr).reserve(ctx, wallet, amt)
2020-11-05 16:50:40 +00:00
}
// Subtract from `reserved`.
func (fm *FundManager) Release(addr address.Address, amt abi.TokenAmount) error {
return fm.getFundedAddress(addr).release(amt)
2020-11-05 16:50:40 +00:00
}
// Withdraw unreserved funds. Only succeeds if there are enough unreserved
// funds for the address.
2020-11-10 08:55:38 +00:00
// Returns the cid of the message that was submitted on chain.
func (fm *FundManager) Withdraw(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return fm.getFundedAddress(addr).withdraw(ctx, wallet, amt)
2020-11-05 16:50:40 +00:00
}
// GetReserved reports the amount that is currently reserved for addr.
// An address that is not tracked yet is registered as a side effect of the
// lookup.
func (fm *FundManager) GetReserved(addr address.Address) abi.TokenAmount {
	fa := fm.getFundedAddress(addr)
	return fa.getReserved()
}
2020-11-05 16:50:40 +00:00
// FundedAddressState keeps track of the state of an address with funds in the
// datastore
type FundedAddressState struct {
Addr address.Address
2020-11-05 16:50:40 +00:00
// AmtReserved is the amount that must be kept in the address (cannot be
// withdrawn)
AmtReserved abi.TokenAmount
// MsgCid is the cid of an in-progress on-chain message
MsgCid *cid.Cid
}
// fundedAddress keeps track of the state and request queues for a
// particular address
type fundedAddress struct {
ctx context.Context
env *fundManagerEnvironment
str *Store
lk sync.RWMutex
2020-11-05 16:50:40 +00:00
state *FundedAddressState
// Note: These request queues are ephemeral, they are not saved to store
reservations []*fundRequest
releases []*fundRequest
withdrawals []*fundRequest
// Used by the tests
onProcessStartListener func() bool
}
// newFundedAddress creates the tracker for addr. It shares the manager's
// context and store, wraps the manager's API in a fundManagerEnvironment and
// starts out with a zero reserved amount and no in-progress message.
func newFundedAddress(fm *FundManager, addr address.Address) *fundedAddress {
	initial := &FundedAddressState{
		Addr:        addr,
		AmtReserved: abi.NewTokenAmount(0),
	}
	env := &fundManagerEnvironment{api: fm.api}

	return &fundedAddress{
		ctx:   fm.ctx,
		env:   env,
		str:   fm.str,
		state: initial,
	}
}
// If there is an in-progress on-chain message, don't submit any more messages
2020-11-05 16:50:40 +00:00
// on chain until it completes
func (a *fundedAddress) start() {
a.lk.Lock()
defer a.lk.Unlock()
if a.state.MsgCid != nil {
a.debugf("restart: wait for %s", a.state.MsgCid)
a.startWaitForResults(*a.state.MsgCid)
}
}
// getReserved reads the currently reserved amount under the read lock.
func (a *fundedAddress) getReserved() abi.TokenAmount {
	a.lk.RLock()
	reserved := a.state.AmtReserved
	a.lk.RUnlock()
	return reserved
}
2020-11-10 08:55:38 +00:00
func (a *fundedAddress) reserve(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return a.requestAndWait(ctx, wallet, amt, &a.reservations)
2020-11-05 16:50:40 +00:00
}
func (a *fundedAddress) release(amt abi.TokenAmount) error {
_, err := a.requestAndWait(context.Background(), address.Undef, amt, &a.releases)
2020-11-05 16:50:40 +00:00
return err
}
2020-11-10 08:55:38 +00:00
func (a *fundedAddress) withdraw(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return a.requestAndWait(ctx, wallet, amt, &a.withdrawals)
2020-11-05 16:50:40 +00:00
}
2020-11-10 08:55:38 +00:00
func (a *fundedAddress) requestAndWait(ctx context.Context, wallet address.Address, amt abi.TokenAmount, reqs *[]*fundRequest) (cid.Cid, error) {
2020-11-05 16:50:40 +00:00
// Create a request and add it to the request queue
req := newFundRequest(ctx, wallet, amt)
2020-11-05 16:50:40 +00:00
a.lk.Lock()
*reqs = append(*reqs, req)
a.lk.Unlock()
// Process the queue
go a.process()
// Wait for the results
select {
case <-ctx.Done():
2020-11-10 08:55:38 +00:00
return cid.Undef, ctx.Err()
2020-11-05 16:50:40 +00:00
case r := <-req.Result:
2020-11-10 08:55:38 +00:00
return r.msgCid, r.err
2020-11-05 16:50:40 +00:00
}
}
// onProcessStart installs a listener that process consults before doing any
// work. Used by the tests.
func (a *fundedAddress) onProcessStart(fn func() bool) {
	a.lk.Lock()
	a.onProcessStartListener = fn
	a.lk.Unlock()
}
// Process queued requests
func (a *fundedAddress) process() {
a.lk.Lock()
defer a.lk.Unlock()
// Used by the tests
if a.onProcessStartListener != nil {
done := a.onProcessStartListener()
if !done {
return
2020-11-05 16:50:40 +00:00
}
a.onProcessStartListener = nil
2020-11-05 16:50:40 +00:00
}
// Check if we're still waiting for the response to a message
if a.state.MsgCid != nil {
return
}
// Check if there's anything to do
haveReservations := len(a.reservations) > 0 || len(a.releases) > 0
haveWithdrawals := len(a.withdrawals) > 0
if !haveReservations && !haveWithdrawals {
2020-11-05 16:50:40 +00:00
return
}
// Process reservations / releases
if haveReservations {
res, err := a.processReservations(a.reservations, a.releases)
if err == nil {
a.applyStateChange(res.msgCid, res.amtReserved)
}
a.reservations = filterOutProcessedReqs(a.reservations)
a.releases = filterOutProcessedReqs(a.releases)
}
2020-11-05 16:50:40 +00:00
// If there was no message sent on chain by adding reservations, and all
// reservations have completed processing, process withdrawals
if haveWithdrawals && a.state.MsgCid == nil && len(a.reservations) == 0 {
withdrawalCid, err := a.processWithdrawals(a.withdrawals)
if err == nil && withdrawalCid != cid.Undef {
a.applyStateChange(&withdrawalCid, types.EmptyInt)
}
a.withdrawals = filterOutProcessedReqs(a.withdrawals)
}
2020-11-05 16:50:40 +00:00
// If a message was sent on-chain
if a.state.MsgCid != nil {
// Start waiting for results of message (async)
a.startWaitForResults(*a.state.MsgCid)
}
// Process any remaining queued requests
go a.process()
2020-11-05 16:50:40 +00:00
}
// filterOutProcessedReqs returns a new slice holding only the requests that
// have not been completed yet, in their original order.
func filterOutProcessedReqs(reqs []*fundRequest) []*fundRequest {
	pending := make([]*fundRequest, 0, len(reqs))
	for i := range reqs {
		if reqs[i].Completed() {
			continue
		}
		pending = append(pending, reqs[i])
	}
	return pending
}
// Apply the results of processing queues and save to the datastore
func (a *fundedAddress) applyStateChange(msgCid *cid.Cid, amtReserved abi.TokenAmount) {
a.state.MsgCid = msgCid
if !amtReserved.Nil() {
a.state.AmtReserved = amtReserved
}
2020-11-05 16:50:40 +00:00
a.saveState()
}
// clearWaitState forgets the pending message cid, so that a new message can
// be sent, and persists the updated state.
func (a *fundedAddress) clearWaitState() {
	a.state.MsgCid = nil
	a.saveState()
}
// Save state to datastore
func (a *fundedAddress) saveState() {
// Not much we can do if saving to the datastore fails, just log
2021-12-11 21:03:00 +00:00
err := a.str.save(a.ctx, a.state)
2020-11-05 16:50:40 +00:00
if err != nil {
2021-02-11 11:00:26 +00:00
log.Errorf("saving state to store for addr %s: %v", a.state.Addr, err)
2020-11-05 16:50:40 +00:00
}
}
// The result of processing the reservation / release queues
2020-11-05 16:50:40 +00:00
type processResult struct {
// Requests that completed without adding funds
covered []*fundRequest
// Requests that added funds
added []*fundRequest
2020-11-05 16:50:40 +00:00
// The new reserved amount
amtReserved abi.TokenAmount
// The message cid, if a message was submitted on-chain
2020-11-05 16:50:40 +00:00
msgCid *cid.Cid
}
// process reservations and releases, and return the resulting changes to state
func (a *fundedAddress) processReservations(reservations []*fundRequest, releases []*fundRequest) (pr *processResult, prerr error) {
// When the function returns
2020-11-05 16:50:40 +00:00
defer func() {
// If there's an error, mark all requests as errored
2020-11-05 16:50:40 +00:00
if prerr != nil {
for _, req := range append(reservations, releases...) {
2020-11-05 16:50:40 +00:00
req.Complete(cid.Undef, prerr)
}
return
2020-11-05 16:50:40 +00:00
}
// Complete all release requests
for _, req := range releases {
req.Complete(cid.Undef, nil)
}
2020-11-05 16:50:40 +00:00
// Complete all requests that were covered by released amounts
for _, req := range pr.covered {
req.Complete(cid.Undef, nil)
}
2020-11-05 16:50:40 +00:00
// If a message was sent
if pr.msgCid != nil {
// Complete all add funds requests
for _, req := range pr.added {
req.Complete(*pr.msgCid, nil)
}
}
}()
2020-11-05 16:50:40 +00:00
// Split reservations into those that are covered by released amounts,
// and those to add to the reserved amount.
// Note that we process requests from the same wallet in batches. So some
// requests may not be included in covered if they don't match the first
// covered request's wallet. These will be processed on a subsequent
// invocation of processReservations.
toCancel, toAdd, reservedDelta := splitReservations(reservations, releases)
2020-11-05 16:50:40 +00:00
// Apply the reserved delta to the reserved amount
reserved := types.BigAdd(a.state.AmtReserved, reservedDelta)
2020-11-05 16:50:40 +00:00
if reserved.LessThan(abi.NewTokenAmount(0)) {
reserved = abi.NewTokenAmount(0)
}
res := &processResult{
amtReserved: reserved,
covered: toCancel,
}
2020-11-05 16:50:40 +00:00
// Work out the amount to add to the balance
amtToAdd := abi.NewTokenAmount(0)
2020-11-11 19:32:26 +00:00
if len(toAdd) > 0 && reserved.GreaterThan(abi.NewTokenAmount(0)) {
2020-11-05 16:50:40 +00:00
// Get available funds for address
avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr)
if err != nil {
return res, err
}
// amount to add = new reserved amount - available
amtToAdd = types.BigSub(reserved, avail)
a.debugf("reserved %d - avail %d = to add %d", reserved, avail, amtToAdd)
2020-11-05 16:50:40 +00:00
}
// If there's nothing to add to the balance, bail out
if amtToAdd.LessThanEqual(abi.NewTokenAmount(0)) {
res.covered = append(res.covered, toAdd...)
return res, nil
2020-11-05 16:50:40 +00:00
}
// Add funds to address
a.debugf("add funds %d", amtToAdd)
addFundsCid, err := a.env.AddFunds(a.ctx, toAdd[0].Wallet, a.state.Addr, amtToAdd)
2020-11-05 16:50:40 +00:00
if err != nil {
return res, err
}
// Mark reservation requests as complete
res.added = toAdd
2020-11-05 16:50:40 +00:00
// Save the message CID to state
res.msgCid = &addFundsCid
return res, nil
}
// splitReservations partitions the reservations into those whose amount is
// offset by the total of the releases (first return value) and those that
// need funds to be added (second return value). The third return value is the
// net change to the reserved amount: the amount to add minus whatever part of
// the release total was not used up.
//
// Requests that need funds are batched by wallet: the first such request
// fixes the batch wallet, and a request from a different wallet ends up in
// neither list.
func splitReservations(reservations []*fundRequest, releases []*fundRequest) ([]*fundRequest, []*fundRequest, abi.TokenAmount) {
	// Total of all releases; reservations are offset against what remains
	remainingRelease := abi.NewTokenAmount(0)
	for i := range releases {
		remainingRelease = types.BigAdd(remainingRelease, releases[i].Amount())
	}

	covered := make([]*fundRequest, 0, len(reservations))
	batch := make([]*fundRequest, 0, len(reservations))
	batchTotal := abi.NewTokenAmount(0)
	wallet := address.Undef

	for _, r := range reservations {
		want := r.Amount()

		switch {
		case want.LessThanEqual(remainingRelease):
			// Fully offset by released funds: no add-funds needed
			remainingRelease = types.BigSub(remainingRelease, want)
			covered = append(covered, r)

		case wallet == address.Undef || wallet == r.Wallet:
			// The first request needing funds determines the batch wallet;
			// later requests join only if their wallet matches
			wallet = r.Wallet
			shortfall := types.BigSub(want, remainingRelease)
			batchTotal = types.BigAdd(batchTotal, shortfall)
			remainingRelease = abi.NewTokenAmount(0)
			batch = append(batch, r)
		}
	}

	// Net change = "amount to add" - "amount still to release"
	return covered, batch, types.BigSub(batchTotal, remainingRelease)
}
2020-11-05 16:50:40 +00:00
// process withdrawal queue
func (a *fundedAddress) processWithdrawals(withdrawals []*fundRequest) (msgCid cid.Cid, prerr error) {
// If there's an error, mark all withdrawal requests as errored
2020-11-05 16:50:40 +00:00
defer func() {
if prerr != nil {
for _, req := range withdrawals {
2020-11-05 16:50:40 +00:00
req.Complete(cid.Undef, prerr)
}
}
}()
// Get the net available balance
avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr)
if err != nil {
return cid.Undef, err
2020-11-05 16:50:40 +00:00
}
netAvail := types.BigSub(avail, a.state.AmtReserved)
2020-11-05 16:50:40 +00:00
// Fit as many withdrawals as possible into the available balance, and fail
// the rest
withdrawalAmt := abi.NewTokenAmount(0)
allowedAmt := abi.NewTokenAmount(0)
2020-11-11 19:32:26 +00:00
allowed := make([]*fundRequest, 0, len(withdrawals))
var batchWallet address.Address
2020-11-11 19:32:26 +00:00
for _, req := range withdrawals {
2020-11-05 16:50:40 +00:00
amt := req.Amount()
if amt.IsZero() {
// If the context for the request was cancelled, bail out
req.Complete(cid.Undef, err)
continue
}
// If the amount would exceed the available amount, complete the
// request with an error
newWithdrawalAmt := types.BigAdd(withdrawalAmt, amt)
if newWithdrawalAmt.GreaterThan(netAvail) {
msg := fmt.Sprintf("insufficient funds for withdrawal of %s: ", types.FIL(amt))
msg += fmt.Sprintf("net available (%s) = available (%s) - reserved (%s)",
types.FIL(types.BigSub(netAvail, withdrawalAmt)), types.FIL(avail), types.FIL(a.state.AmtReserved))
2021-01-05 15:58:12 +00:00
if !withdrawalAmt.IsZero() {
msg += fmt.Sprintf(" - queued withdrawals (%s)", types.FIL(withdrawalAmt))
2021-01-05 15:58:12 +00:00
}
err := xerrors.Errorf(msg)
2020-11-05 16:50:40 +00:00
a.debugf("%s", err)
req.Complete(cid.Undef, err)
continue
}
// If this is the first allowed withdrawal request in this batch, save
// its wallet address
if batchWallet == address.Undef {
batchWallet = req.Wallet
}
// If the request wallet doesn't match the batch wallet, bail out
// (the withdrawal will be processed after the current batch has
// completed)
if req.Wallet != batchWallet {
continue
2020-11-05 16:50:40 +00:00
}
// Include this withdrawal request in the batch
withdrawalAmt = newWithdrawalAmt
a.debugf("withdraw %d", amt)
allowed = append(allowed, req)
allowedAmt = types.BigAdd(allowedAmt, amt)
2020-11-05 16:50:40 +00:00
}
// Check if there is anything to withdraw.
// Note that if the context for a request is cancelled,
// req.Amount() returns zero
2020-11-05 16:50:40 +00:00
if allowedAmt.Equals(abi.NewTokenAmount(0)) {
// Mark allowed requests as complete
2020-11-05 16:50:40 +00:00
for _, req := range allowed {
req.Complete(cid.Undef, nil)
}
return cid.Undef, nil
2020-11-05 16:50:40 +00:00
}
// Withdraw funds
a.debugf("withdraw funds %d", allowedAmt)
withdrawFundsCid, err := a.env.WithdrawFunds(a.ctx, allowed[0].Wallet, a.state.Addr, allowedAmt)
2020-11-05 16:50:40 +00:00
if err != nil {
return cid.Undef, err
2020-11-05 16:50:40 +00:00
}
// Mark allowed requests as complete
for _, req := range allowed {
req.Complete(withdrawFundsCid, nil)
}
// Save the message CID to state
return withdrawFundsCid, nil
2020-11-05 16:50:40 +00:00
}
// asynchonously wait for results of message
func (a *fundedAddress) startWaitForResults(msgCid cid.Cid) {
go func() {
err := a.env.WaitMsg(a.ctx, msgCid)
if err != nil {
// We don't really care about the results here, we're just waiting
// so as to only process one on-chain message at a time
2021-02-11 11:00:26 +00:00
log.Errorf("waiting for results of message %s for addr %s: %v", msgCid, a.state.Addr, err)
2020-11-05 16:50:40 +00:00
}
a.lk.Lock()
a.debugf("complete wait")
a.clearWaitState()
a.lk.Unlock()
a.process()
}()
}
// debugf logs a debug message prefixed with the address. The first argument
// must be the format string (the type assertion panics otherwise); the
// remaining arguments are the format operands.
func (a *fundedAddress) debugf(args ...interface{}) {
	format := args[0].(string)
	operands := args[1:]
	prefix := a.state.Addr.String() + ": "
	log.Debugf(prefix+format, operands...)
}
// reqResult carries the outcome of a fund request back to the requester.
type reqResult struct {
	// msgCid is the cid passed to Complete for the request
	msgCid cid.Cid
	// err is the error passed to Complete for the request
	err error
}
// A request to change funds
type fundRequest struct {
ctx context.Context
amt abi.TokenAmount
completed chan struct{}
Wallet address.Address
2020-11-05 16:50:40 +00:00
Result chan reqResult
}
func newFundRequest(ctx context.Context, wallet address.Address, amt abi.TokenAmount) *fundRequest {
2020-11-05 16:50:40 +00:00
return &fundRequest{
ctx: ctx,
amt: amt,
Wallet: wallet,
2020-11-05 16:50:40 +00:00
Result: make(chan reqResult),
completed: make(chan struct{}),
}
}
// Amount returns the requested amount, or zero once the request's context
// has expired.
func (frp *fundRequest) Amount() abi.TokenAmount {
	if frp.ctx.Err() == nil {
		return frp.amt
	}
	return abi.NewTokenAmount(0)
}
// Complete is called with the message CID when the funds request has been
// started or with the error if there was an error.
//
// It blocks until the result has been handed to the requester or the
// request's context is done, and then marks the request as completed.
// Calling Complete on a request that is already completed is a no-op:
// closing the completed channel a second time would panic.
//
// Complete is not safe for concurrent calls on the same request; the callers
// in this file run while holding the fundedAddress lock.
func (frp *fundRequest) Complete(msgCid cid.Cid, err error) {
	// Already completed: nothing more to deliver
	select {
	case <-frp.completed:
		return
	default:
	}

	select {
	case <-frp.ctx.Done():
	case frp.Result <- reqResult{msgCid: msgCid, err: err}:
	}
	close(frp.completed)
}
// Completed reports whether Complete has already been called, by checking
// without blocking whether the completed channel has been closed.
func (frp *fundRequest) Completed() bool {
	done := false
	select {
	case <-frp.completed:
		done = true
	default:
	}
	return done
}
// fundManagerEnvironment wraps the fundManagerAPI with helpers that simplify
// some API calls.
type fundManagerEnvironment struct {
	// api is the underlying API the helpers call into
	api fundManagerAPI
}
// AvailableFunds returns the market escrow balance of addr minus its locked
// balance, queried with the empty tipset key. On error it returns a zero
// amount together with the error.
func (env *fundManagerEnvironment) AvailableFunds(ctx context.Context, addr address.Address) (abi.TokenAmount, error) {
	balance, err := env.api.StateMarketBalance(ctx, addr, types.EmptyTSK)
	if err == nil {
		return types.BigSub(balance.Escrow, balance.Locked), nil
	}
	return abi.NewTokenAmount(0), err
}
func (env *fundManagerEnvironment) AddFunds(
ctx context.Context,
wallet address.Address,
addr address.Address,
amt abi.TokenAmount,
) (cid.Cid, error) {
params, err := actors.SerializeParams(&addr)
if err != nil {
return cid.Undef, err
}
smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{
2022-04-20 21:34:28 +00:00
To: builtin.StorageMarketActorAddr,
From: wallet,
Value: amt,
2022-04-20 21:34:28 +00:00
Method: builtin.MethodsMarket.AddBalance,
Params: params,
}, nil)
if aerr != nil {
return cid.Undef, aerr
}
return smsg.Cid(), nil
2020-11-05 16:50:40 +00:00
}
func (env *fundManagerEnvironment) WithdrawFunds(
ctx context.Context,
wallet address.Address,
addr address.Address,
amt abi.TokenAmount,
) (cid.Cid, error) {
params, err := actors.SerializeParams(&market.WithdrawBalanceParams{
ProviderOrClientAddress: addr,
Amount: amt,
})
2020-11-05 16:50:40 +00:00
if err != nil {
return cid.Undef, xerrors.Errorf("serializing params: %w", err)
2020-11-05 16:50:40 +00:00
}
smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{
2022-04-20 21:34:28 +00:00
To: builtin.StorageMarketActorAddr,
From: wallet,
Value: types.NewInt(0),
2022-04-20 21:34:28 +00:00
Method: builtin.MethodsMarket.WithdrawBalance,
2020-11-05 16:50:40 +00:00
Params: params,
}, nil)
if aerr != nil {
return cid.Undef, aerr
}
return smsg.Cid(), nil
}
func (env *fundManagerEnvironment) WaitMsg(ctx context.Context, c cid.Cid) error {
2021-04-05 11:23:46 +00:00
_, err := env.api.StateWaitMsg(ctx, c, build.MessageConfidence, api.LookbackNoLimit, true)
2020-11-05 16:50:40 +00:00
return err
}