diff --git a/chain/actors/builtin/market/market.go b/chain/actors/builtin/market/market.go index 195ca40b9..8bb31f2b4 100644 --- a/chain/actors/builtin/market/market.go +++ b/chain/actors/builtin/market/market.go @@ -81,6 +81,7 @@ type DealProposals interface { type PublishStorageDealsParams = market0.PublishStorageDealsParams type PublishStorageDealsReturn = market0.PublishStorageDealsReturn type VerifyDealsForActivationParams = market0.VerifyDealsForActivationParams +type WithdrawBalanceParams = market0.WithdrawBalanceParams type ClientDealProposal = market0.ClientDealProposal diff --git a/chain/market/cbor_gen.go b/chain/market/cbor_gen.go index 85f65121b..397b5477a 100644 --- a/chain/market/cbor_gen.go +++ b/chain/market/cbor_gen.go @@ -12,7 +12,7 @@ import ( var _ = xerrors.Errorf -var lengthBufFundedAddressState = []byte{132} +var lengthBufFundedAddressState = []byte{131} func (t *FundedAddressState) MarshalCBOR(w io.Writer) error { if t == nil { @@ -25,11 +25,6 @@ func (t *FundedAddressState) MarshalCBOR(w io.Writer) error { scratch := make([]byte, 9) - // t.Wallet (address.Address) (struct) - if err := t.Wallet.MarshalCBOR(w); err != nil { - return err - } - // t.Addr (address.Address) (struct) if err := t.Addr.MarshalCBOR(w); err != nil { return err @@ -69,19 +64,10 @@ func (t *FundedAddressState) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input should be of type array") } - if extra != 4 { + if extra != 3 { return fmt.Errorf("cbor input had wrong number of fields") } - // t.Wallet (address.Address) (struct) - - { - - if err := t.Wallet.UnmarshalCBOR(br); err != nil { - return xerrors.Errorf("unmarshaling t.Wallet: %w", err) - } - - } // t.Addr (address.Address) (struct) { diff --git a/chain/market/fundmanager.go b/chain/market/fundmanager.go index 1834abf31..554d6e933 100644 --- a/chain/market/fundmanager.go +++ b/chain/market/fundmanager.go @@ -46,24 +46,22 @@ type FundManager struct { ctx context.Context shutdown context.CancelFunc api fundManagerAPI - wallet address.Address str *Store lk sync.Mutex fundedAddrs map[address.Address]*fundedAddress } -type waitSentinel cid.Cid +type WaitSentinel cid.Cid -var waitSentinelUndef = waitSentinel(cid.Undef) +var WaitSentinelUndef = WaitSentinel(cid.Undef) -func NewFundManager(api fundManagerAPI, ds datastore.Batching, wallet address.Address) *FundManager { +func NewFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager { ctx, cancel := context.WithCancel(context.Background()) return &FundManager{ ctx: ctx, shutdown: cancel, api: api, - wallet: wallet, str: newStore(ds), fundedAddrs: make(map[address.Address]*fundedAddress), } @@ -103,25 +101,25 @@ func (fm *FundManager) getFundedAddress(addr address.Address) *fundedAddress { return fa } -// Reserve adds amt to `reserved`. If there is not enough available funds for +// Reserve adds amt to `reserved`. If there are not enough available funds for // the address, submits a message on chain to top up available funds. -func (fm *FundManager) Reserve(ctx context.Context, addr address.Address, amt abi.TokenAmount) (waitSentinel, error) { - return fm.getFundedAddress(addr).reserve(ctx, amt) +func (fm *FundManager) Reserve(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (WaitSentinel, error) { + return fm.getFundedAddress(addr).reserve(ctx, wallet, amt) } // Subtract from `reserved`. -func (fm *FundManager) Release(ctx context.Context, addr address.Address, amt abi.TokenAmount) error { - return fm.getFundedAddress(addr).release(ctx, amt) +func (fm *FundManager) Release(addr address.Address, amt abi.TokenAmount) error { + return fm.getFundedAddress(addr).release(amt) } // Withdraw unreserved funds. Only succeeds if there are enough unreserved // funds for the address. -func (fm *FundManager) Withdraw(ctx context.Context, addr address.Address, amt abi.TokenAmount) (waitSentinel, error) { - return fm.getFundedAddress(addr).withdraw(ctx, amt) +func (fm *FundManager) Withdraw(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (WaitSentinel, error) { + return fm.getFundedAddress(addr).withdraw(ctx, wallet, amt) } // Waits for a reserve or withdraw to complete. -func (fm *FundManager) Wait(ctx context.Context, sentinel waitSentinel) error { +func (fm *FundManager) Wait(ctx context.Context, sentinel WaitSentinel) error { _, err := fm.api.StateWaitMsg(ctx, cid.Cid(sentinel), build.MessageConfidence) return err } @@ -129,9 +127,7 @@ func (fm *FundManager) Wait(ctx context.Context, sentinel waitSentinel) error { // FundedAddressState keeps track of the state of an address with funds in the // datastore type FundedAddressState struct { - // Wallet is the wallet from which funds are added to the address - Wallet address.Address - Addr address.Address + Addr address.Address // AmtReserved is the amount that must be kept in the address (cannot be // withdrawn) AmtReserved abi.TokenAmount @@ -164,14 +160,13 @@ func newFundedAddress(fm *FundManager, addr address.Address) *fundedAddress { env: &fundManagerEnvironment{api: fm.api}, str: fm.str, state: &FundedAddressState{ - Wallet: fm.wallet, Addr: addr, AmtReserved: abi.NewTokenAmount(0), }, } } -// If there is a in-progress on-chain message, don't submit any more messages +// If there is an in-progress on-chain message, don't submit any more messages // on chain until it completes func (a *fundedAddress) start() { a.lk.Lock() @@ -183,22 +178,22 @@ func (a *fundedAddress) start() { } } -func (a *fundedAddress) reserve(ctx context.Context, amt abi.TokenAmount) (waitSentinel, error) { - return a.requestAndWait(ctx, amt, &a.reservations) +func (a *fundedAddress) reserve(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (WaitSentinel, error) { + return a.requestAndWait(ctx, wallet, amt, &a.reservations) } -func (a *fundedAddress) release(ctx context.Context, amt abi.TokenAmount) error { - _, err := a.requestAndWait(ctx, amt, &a.releases) +func (a *fundedAddress) release(amt abi.TokenAmount) error { + _, err := a.requestAndWait(context.Background(), address.Undef, amt, &a.releases) return err } -func (a *fundedAddress) withdraw(ctx context.Context, amt abi.TokenAmount) (waitSentinel, error) { - return a.requestAndWait(ctx, amt, &a.withdrawals) +func (a *fundedAddress) withdraw(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (WaitSentinel, error) { + return a.requestAndWait(ctx, wallet, amt, &a.withdrawals) } -func (a *fundedAddress) requestAndWait(ctx context.Context, amt abi.TokenAmount, reqs *[]*fundRequest) (waitSentinel, error) { +func (a *fundedAddress) requestAndWait(ctx context.Context, wallet address.Address, amt abi.TokenAmount, reqs *[]*fundRequest) (WaitSentinel, error) { // Create a request and add it to the request queue - req := newFundRequest(ctx, amt) + req := newFundRequest(ctx, wallet, amt) a.lk.Lock() *reqs = append(*reqs, req) @@ -210,9 +205,9 @@ func (a *fundedAddress) requestAndWait(ctx context.Context, amt abi.TokenAmount, // Wait for the results select { case <-ctx.Done(): - return waitSentinelUndef, ctx.Err() + return WaitSentinelUndef, ctx.Err() case r := <-req.Result: - return waitSentinel(r.msgCid), r.err + return WaitSentinel(r.msgCid), r.err } } @@ -243,17 +238,40 @@ func (a *fundedAddress) process() { } // Check if there's anything to do - if len(a.reservations) == 0 && len(a.releases) == 0 && len(a.withdrawals) == 0 { + haveReservations := len(a.reservations) > 0 || len(a.releases) > 0 + haveWithdrawals := len(a.withdrawals) > 0 + if !haveReservations && !haveWithdrawals { return } - res, _ := a.processRequests() + // Process reservations / releases + if haveReservations { + res, err := a.processReservations(a.reservations, a.releases) + if err == nil { + a.applyStateChange(res.msgCid, res.amtReserved) + } + a.reservations = filterOutProcessedReqs(a.reservations) + a.releases = filterOutProcessedReqs(a.releases) + } - a.reservations = filterOutProcessedReqs(a.reservations) - a.releases = filterOutProcessedReqs(a.releases) - a.withdrawals = filterOutProcessedReqs(a.withdrawals) + // If there was no message sent on chain by adding reservations, and all + // reservations have completed processing, process withdrawals + if haveWithdrawals && a.state.MsgCid == nil && len(a.reservations) == 0 { + withdrawalCid, err := a.processWithdrawals(a.withdrawals) + if err == nil && withdrawalCid != cid.Undef { + a.applyStateChange(&withdrawalCid, types.EmptyInt) + } + a.withdrawals = filterOutProcessedReqs(a.withdrawals) + } - a.applyStateChange(res) + // If a message was sent on-chain + if a.state.MsgCid != nil { + // Start waiting for results of message (async) + a.startWaitForResults(*a.state.MsgCid) + } + + // Process any remaining queued requests + go a.process() } // Filter out completed requests @@ -268,9 +286,11 @@ func filterOutProcessedReqs(reqs []*fundRequest) []*fundRequest { } // Apply the results of processing queues and save to the datastore -func (a *fundedAddress) applyStateChange(res *processResult) { - a.state.MsgCid = res.msgCid - a.state.AmtReserved = res.amtReserved +func (a *fundedAddress) applyStateChange(msgCid *cid.Cid, amtReserved abi.TokenAmount) { + a.state.MsgCid = msgCid + if !amtReserved.Nil() { + a.state.AmtReserved = amtReserved + } a.saveState() } @@ -289,59 +309,67 @@ func (a *fundedAddress) saveState() { } } -// The result of processing the request queues +// The result of processing the reservation / release queues type processResult struct { + // Requests that completed without adding funds + cancelled []*fundRequest + // Requests that added funds + added []*fundRequest + // The new reserved amount amtReserved abi.TokenAmount - // The message cid, if a message was pushed + // The message cid, if a message was submitted on-chain msgCid *cid.Cid } -// process request queues and return the resulting changes to state -func (a *fundedAddress) processRequests() (pr *processResult, prerr error) { - // If there's an error, mark reserve requests as errored +// process reservations and releases, and return the resulting changes to state +func (a *fundedAddress) processReservations(reservations []*fundRequest, releases []*fundRequest) (pr *processResult, prerr error) { + // When the function returns defer func() { + // If there's an error, mark all requests as errored if prerr != nil { - for _, req := range a.reservations { + for _, req := range append(reservations, releases...) { req.Complete(cid.Undef, prerr) } + return + } + + // Complete all release requests + for _, req := range releases { + req.Complete(cid.Undef, nil) + } + + // Complete all cancelled requests + for _, req := range pr.cancelled { + req.Complete(cid.Undef, nil) + } + + // If a message was sent + if pr.msgCid != nil { + // Complete all add funds requests + for _, req := range pr.added { + req.Complete(*pr.msgCid, nil) + } } }() - // Start with the reserved amount in state - reserved := a.state.AmtReserved + // Split reservations into those to cancel (because they are covered by + // released amounts) and those to add + toCancel, toAdd, reservedDelta := splitReservations(reservations, releases) - // Add the amount of each reserve request - for _, req := range a.reservations { - amt := req.Amount() - a.debugf("reserve %d", amt) - reserved = types.BigAdd(reserved, amt) - } - - // Subtract the amount of each release request - for _, req := range a.releases { - amt := req.Amount() - a.debugf("release %d", amt) - reserved = types.BigSub(reserved, amt) - - // Mark release as complete - req.Complete(cid.Undef, nil) - } - - // If reserved amount is negative, set it to zero + // Apply the reserved delta to the reserved amount + reserved := types.BigAdd(a.state.AmtReserved, reservedDelta) if reserved.LessThan(abi.NewTokenAmount(0)) { reserved = abi.NewTokenAmount(0) } - - res := &processResult{amtReserved: reserved} + res := &processResult{ + amtReserved: reserved, + cancelled: toCancel, + } // Work out the amount to add to the balance - toAdd := abi.NewTokenAmount(0) - - // If the new reserved amount is greater than the existing amount - if reserved.GreaterThan(a.state.AmtReserved) { - a.debugf("reserved %d > state.AmtReserved %d", reserved, a.state.AmtReserved) - + amtToAdd := abi.NewTokenAmount(0) + if reserved.GreaterThan(abi.NewTokenAmount(0)) { // Get available funds for address avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr) if err != nil { @@ -349,63 +377,98 @@ func (a *fundedAddress) processRequests() (pr *processResult, prerr error) { } // amount to add = new reserved amount - available - toAdd = types.BigSub(reserved, avail) - a.debugf("reserved %d - avail %d = %d", reserved, avail, toAdd) + amtToAdd = types.BigSub(reserved, avail) + a.debugf("reserved %d - avail %d = to add %d", reserved, avail, amtToAdd) } - // If there's nothing to add to the balance - if toAdd.LessThanEqual(abi.NewTokenAmount(0)) { - // Mark reserve requests as complete - for _, req := range a.reservations { - req.Complete(cid.Undef, nil) - } - - // Process withdrawals - return a.processWithdrawals(reserved) + // If there's nothing to add to the balance, bail out + if amtToAdd.LessThanEqual(abi.NewTokenAmount(0)) { + a.debugf(" queued for cancel %d", len(toAdd)) + res.cancelled = append(res.cancelled, toAdd...) + return res, nil } // Add funds to address - a.debugf("add funds %d", toAdd) - addFundsCid, err := a.env.AddFunds(a.ctx, a.state.Wallet, a.state.Addr, toAdd) + a.debugf("add funds %d", amtToAdd) + addFundsCid, err := a.env.AddFunds(a.ctx, toAdd[0].Wallet, a.state.Addr, amtToAdd) if err != nil { return res, err } // Mark reserve requests as complete - for _, req := range a.reservations { - req.Complete(addFundsCid, nil) - } - - // Start waiting for results (async) - defer a.startWaitForResults(addFundsCid) + res.added = toAdd // Save the message CID to state res.msgCid = &addFundsCid return res, nil } +// Split reservations into those that are under the total release amount and +// those that exceed it +func splitReservations(reservations []*fundRequest, releases []*fundRequest) ([]*fundRequest, []*fundRequest, abi.TokenAmount) { + toCancel := make([]*fundRequest, 0, len(reservations)) + toAdd := make([]*fundRequest, 0, len(reservations)) + toAddAmt := abi.NewTokenAmount(0) + + // Sum release amounts + releaseAmt := abi.NewTokenAmount(0) + for _, req := range releases { + releaseAmt = types.BigAdd(releaseAmt, req.Amount()) + } + + // We only want to combine requests that come from the same wallet + wallet := address.Undef + for _, req := range reservations { + amt := req.Amount() + + // If the amount to add to the reserve is cancelled out by a release + if amt.LessThanEqual(releaseAmt) { + // Cancel the request and update the release total + releaseAmt = types.BigSub(releaseAmt, amt) + toCancel = append(toCancel, req) + } else { + // The amount to add is greater that the release total so we want + // to send an add funds request + + // The first time the wallet will be undefined + if wallet == address.Undef { + wallet = req.Wallet + } + // If this request's wallet is the same as the first request's + // wallet, the requests will be combined + if wallet == req.Wallet { + delta := types.BigSub(amt, releaseAmt) + toAddAmt = types.BigAdd(toAddAmt, delta) + releaseAmt = abi.NewTokenAmount(0) + toAdd = append(toAdd, req) + } + } + } + + // The change in the reserved amount is "amount to add" - "amount to release" + reservedDelta := types.BigSub(toAddAmt, releaseAmt) + + return toCancel, toAdd, reservedDelta +} + // process withdrawal queue -func (a *fundedAddress) processWithdrawals(reserved abi.TokenAmount) (pr *processResult, prerr error) { - // If there's an error, mark withdrawal requests as errored +func (a *fundedAddress) processWithdrawals(withdrawals []*fundRequest) (msgCid cid.Cid, prerr error) { + // If there's an error, mark all withdrawal requests as errored defer func() { if prerr != nil { - for _, req := range a.withdrawals { + for _, req := range withdrawals { req.Complete(cid.Undef, prerr) } } }() - res := &processResult{ - amtReserved: reserved, - } - // Get the net available balance avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr) if err != nil { - return res, err + return cid.Undef, err } - netAvail := types.BigSub(avail, reserved) + netAvail := types.BigSub(avail, a.state.AmtReserved) // Fit as many withdrawals as possible into the available balance, and fail // the rest @@ -428,18 +491,18 @@ func (a *fundedAddress) processWithdrawals(reserved abi.TokenAmount) (pr *proces // Check if there is anything to withdraw if allowedAmt.Equals(abi.NewTokenAmount(0)) { - // Mark allowed requests as complete + // Mark allowed requests as cancelled for _, req := range allowed { req.Complete(cid.Undef, nil) } - return res, nil + return cid.Undef, nil } // Withdraw funds a.debugf("withdraw funds %d", allowedAmt) - withdrawFundsCid, err := a.env.WithdrawFunds(a.ctx, a.state.Wallet, a.state.Addr, allowedAmt) + withdrawFundsCid, err := a.env.WithdrawFunds(a.ctx, allowed[0].Wallet, a.state.Addr, allowedAmt) if err != nil { - return res, err + return cid.Undef, err } // Mark allowed requests as complete @@ -447,12 +510,8 @@ func (a *fundedAddress) processWithdrawals(reserved abi.TokenAmount) (pr *proces req.Complete(withdrawFundsCid, nil) } - // Start waiting for results of message (async) - defer a.startWaitForResults(withdrawFundsCid) - // Save the message CID to state - res.msgCid = &withdrawFundsCid - return res, nil + return withdrawFundsCid, nil } // asynchonously wait for results of message @@ -491,13 +550,15 @@ type fundRequest struct { ctx context.Context amt abi.TokenAmount completed chan struct{} + Wallet address.Address Result chan reqResult } -func newFundRequest(ctx context.Context, amt abi.TokenAmount) *fundRequest { +func newFundRequest(ctx context.Context, wallet address.Address, amt abi.TokenAmount) *fundRequest { return &fundRequest{ ctx: ctx, amt: amt, + Wallet: wallet, Result: make(chan reqResult), completed: make(chan struct{}), } @@ -532,10 +593,6 @@ func (frp *fundRequest) Completed() bool { } } -func (frp *fundRequest) Equals(other *fundRequest) bool { - return frp == other -} - // fundManagerEnvironment simplifies some API calls type fundManagerEnvironment struct { api fundManagerAPI @@ -556,7 +613,24 @@ func (env *fundManagerEnvironment) AddFunds( addr address.Address, amt abi.TokenAmount, ) (cid.Cid, error) { - return env.sendFunds(ctx, wallet, addr, amt) + params, err := actors.SerializeParams(&addr) + if err != nil { + return cid.Undef, err + } + + smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{ + To: market.Address, + From: wallet, + Value: amt, + Method: market.Methods.AddBalance, + Params: params, + }, nil) + + if aerr != nil { + return cid.Undef, aerr + } + + return smsg.Cid(), nil } func (env *fundManagerEnvironment) WithdrawFunds( @@ -565,25 +639,19 @@ func (env *fundManagerEnvironment) WithdrawFunds( addr address.Address, amt abi.TokenAmount, ) (cid.Cid, error) { - return env.sendFunds(ctx, addr, wallet, amt) -} - -func (env *fundManagerEnvironment) sendFunds( - ctx context.Context, - from address.Address, - to address.Address, - amt abi.TokenAmount, -) (cid.Cid, error) { - params, err := actors.SerializeParams(&to) + params, err := actors.SerializeParams(&market.WithdrawBalanceParams{ + ProviderOrClientAddress: addr, + Amount: amt, + }) if err != nil { - return cid.Undef, err + return cid.Undef, xerrors.Errorf("serializing params: %w", err) } smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{ To: market.Address, - From: from, - Value: amt, - Method: market.Methods.AddBalance, + From: wallet, + Value: types.NewInt(0), + Method: market.Methods.WithdrawBalance, Params: params, }, nil) diff --git a/chain/market/fundmanager_test.go b/chain/market/fundmanager_test.go index a8fabc2fc..3ad3a1669 100644 --- a/chain/market/fundmanager_test.go +++ b/chain/market/fundmanager_test.go @@ -35,11 +35,11 @@ func TestFundManagerBasic(t *testing.T) { // balance: 0 -> 10 // reserved: 0 -> 10 amt := abi.NewTokenAmount(10) - sentinel, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinel, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) msg := s.mockApi.getSentMessage(cid.Cid(sentinel)) - checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) + checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) s.mockApi.completeMsg(cid.Cid(sentinel)) err = s.fm.Wait(s.ctx, sentinel) @@ -49,11 +49,11 @@ func TestFundManagerBasic(t *testing.T) { // balance: 10 -> 17 // reserved: 10 -> 17 amt = abi.NewTokenAmount(7) - sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) - checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) + checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) s.mockApi.completeMsg(cid.Cid(sentinel)) err = s.fm.Wait(s.ctx, sentinel) @@ -63,18 +63,18 @@ func TestFundManagerBasic(t *testing.T) { // balance: 17 // reserved: 17 -> 12 amt = abi.NewTokenAmount(5) - err = s.fm.Release(s.ctx, s.acctAddr, amt) + err = s.fm.Release(s.acctAddr, amt) require.NoError(t, err) // Withdraw 2 // balance: 17 -> 15 // reserved: 12 amt = abi.NewTokenAmount(2) - sentinel, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt) + sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) - checkMessageFields(t, msg, s.acctAddr, s.walletAddr, amt) + checkWithdrawMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) s.mockApi.completeMsg(cid.Cid(sentinel)) err = s.fm.Wait(s.ctx, sentinel) @@ -87,10 +87,10 @@ func TestFundManagerBasic(t *testing.T) { // message msgCount := s.mockApi.messageCount() amt = abi.NewTokenAmount(3) - sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) require.Equal(t, msgCount, s.mockApi.messageCount()) - require.Equal(t, sentinel, waitSentinelUndef) + require.Equal(t, sentinel, WaitSentinelUndef) // Reserve 1 // balance: 15 -> 16 @@ -99,12 +99,12 @@ func TestFundManagerBasic(t *testing.T) { // message to top up balance amt = abi.NewTokenAmount(1) topUp := abi.NewTokenAmount(1) - sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) s.mockApi.completeMsg(cid.Cid(sentinel)) msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) - checkMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp) + checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp) // Withdraw 1 // balance: 16 @@ -112,7 +112,7 @@ func TestFundManagerBasic(t *testing.T) { // Note: Expect failure because there is no available balance to withdraw: // balance - reserved = 16 - 16 = 0 amt = abi.NewTokenAmount(1) - sentinel, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt) + sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) require.Error(t, err) } @@ -123,7 +123,7 @@ func TestFundManagerParallel(t *testing.T) { // Reserve 10 amt := abi.NewTokenAmount(10) - sentinelReserve10, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinelReserve10, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) // Wait until all the subsequent requests are queued up @@ -141,16 +141,16 @@ func TestFundManagerParallel(t *testing.T) { withdrawReady := make(chan error) go func() { amt = abi.NewTokenAmount(5) - _, err := s.fm.Withdraw(s.ctx, s.acctAddr, amt) + _, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) withdrawReady <- err }() - reserveSentinels := make(chan waitSentinel) + reserveSentinels := make(chan WaitSentinel) // Reserve 3 go func() { amt := abi.NewTokenAmount(3) - sentinelReserve3, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinelReserve3, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) reserveSentinels <- sentinelReserve3 }() @@ -158,7 +158,7 @@ func TestFundManagerParallel(t *testing.T) { // Reserve 5 go func() { amt := abi.NewTokenAmount(5) - sentinelReserve5, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinelReserve5, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) reserveSentinels <- sentinelReserve5 }() @@ -166,7 +166,7 @@ func TestFundManagerParallel(t *testing.T) { // Release 2 go func() { amt := abi.NewTokenAmount(2) - err = s.fm.Release(s.ctx, s.acctAddr, amt) + err = s.fm.Release(s.acctAddr, amt) require.NoError(t, err) }() @@ -176,7 +176,7 @@ func TestFundManagerParallel(t *testing.T) { // Complete the "Reserve 10" message s.mockApi.completeMsg(cid.Cid(sentinelReserve10)) msg := s.mockApi.getSentMessage(cid.Cid(sentinelReserve10)) - checkMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10)) + checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10)) // The other requests should now be combined and be submitted on-chain as // a single message @@ -200,7 +200,7 @@ func TestFundManagerParallel(t *testing.T) { // "Reserve 5" +5 // "Release 2" -2 // Result: 6 - checkMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(6)) + checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(6)) // Expect withdraw to fail because not enough available funds err = <-withdrawReady @@ -215,21 +215,21 @@ func TestFundManagerWithdrawal(t *testing.T) { // Reserve 10 amt := abi.NewTokenAmount(10) - sentinelReserve10, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinelReserve10, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) // Complete the "Reserve 10" message s.mockApi.completeMsg(cid.Cid(sentinelReserve10)) // Release 10 - err = s.fm.Release(s.ctx, s.acctAddr, amt) + err = s.fm.Release(s.acctAddr, amt) require.NoError(t, err) // Available 10 // Withdraw 6 // Expect success amt = abi.NewTokenAmount(6) - sentinelWithdraw, err := s.fm.Withdraw(s.ctx, s.acctAddr, amt) + sentinelWithdraw, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) s.mockApi.completeMsg(cid.Cid(sentinelWithdraw)) @@ -240,7 +240,7 @@ func TestFundManagerWithdrawal(t *testing.T) { // Withdraw 4 // Expect success amt = abi.NewTokenAmount(4) - sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt) + sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) s.mockApi.completeMsg(cid.Cid(sentinelWithdraw)) @@ -251,7 +251,7 @@ func TestFundManagerWithdrawal(t *testing.T) { // Withdraw 1 // Expect FAIL amt = abi.NewTokenAmount(1) - sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt) + sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) require.Error(t, err) } @@ -265,19 +265,19 @@ func TestFundManagerRestart(t *testing.T) { // Address 1: Reserve 10 amt := abi.NewTokenAmount(10) - sentinelAddr1, err := s.fm.Reserve(s.ctx, s.acctAddr, amt) + sentinelAddr1, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) msg := s.mockApi.getSentMessage(cid.Cid(sentinelAddr1)) - checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) + checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) // Address 2: Reserve 7 amt2 := abi.NewTokenAmount(7) - sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, acctAddr2, amt2) + sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, s.walletAddr, acctAddr2, amt2) require.NoError(t, err) msg2 := s.mockApi.getSentMessage(cid.Cid(sentinelAddr2Res7)) - checkMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2) + checkAddMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2) // Complete "Address 1: Reserve 10" s.mockApi.completeMsg(cid.Cid(sentinelAddr1)) @@ -289,14 +289,15 @@ func TestFundManagerRestart(t *testing.T) { // Restart mockApiAfter := s.mockApi - fmAfter := NewFundManager(mockApiAfter, s.ds, s.walletAddr) - fmAfter.Start() + fmAfter := NewFundManager(mockApiAfter, s.ds) + err = fmAfter.Start() + require.NoError(t, err) amt3 := abi.NewTokenAmount(9) - reserveSentinel := make(chan waitSentinel) + reserveSentinel := make(chan WaitSentinel) go func() { // Address 2: Reserve 9 - sentinel3, err := fmAfter.Reserve(s.ctx, acctAddr2, amt3) + sentinel3, err := fmAfter.Reserve(s.ctx, s.walletAddr, acctAddr2, amt3) require.NoError(t, err) reserveSentinel <- sentinel3 }() @@ -317,7 +318,7 @@ func TestFundManagerRestart(t *testing.T) { // Expect waiting message to now be sent sentinel3 := <-reserveSentinel msg3 := mockApiAfter.getSentMessage(cid.Cid(sentinel3)) - checkMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3) + checkAddMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3) } type scaffold struct { @@ -346,7 +347,7 @@ func setup(t *testing.T) *scaffold { mockApi := newMockFundManagerAPI(walletAddr) ds := ds_sync.MutexWrap(ds.NewMapDatastore()) - fm := NewFundManager(mockApi, ds, walletAddr) + fm := NewFundManager(mockApi, ds) return &scaffold{ ctx: ctx, ds: ds, @@ -357,7 +358,7 @@ func setup(t *testing.T) *scaffold { } } -func checkMessageFields(t *testing.T, msg *types.Message, from address.Address, to address.Address, amt abi.TokenAmount) { +func checkAddMessageFields(t *testing.T, msg *types.Message, from address.Address, to address.Address, amt abi.TokenAmount) { require.Equal(t, from, msg.From) require.Equal(t, market.Address, msg.To) require.Equal(t, amt, msg.Value) @@ -368,6 +369,18 @@ func checkMessageFields(t *testing.T, msg *types.Message, from address.Address, require.Equal(t, to, paramsTo) } +func checkWithdrawMessageFields(t *testing.T, msg *types.Message, from address.Address, addr address.Address, amt abi.TokenAmount) { + require.Equal(t, from, msg.From) + require.Equal(t, market.Address, msg.To) + require.Equal(t, abi.NewTokenAmount(0), msg.Value) + + var params market.WithdrawBalanceParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + require.Equal(t, addr, params.ProviderOrClientAddress) + require.Equal(t, amt, params.Amount) +} + type sentMsg struct { msg *types.SignedMessage ready chan struct{} @@ -428,7 +441,7 @@ func (mapi *mockFundManagerAPI) completeMsg(msgCid cid.Cid) { pmsg, ok := mapi.sentMsgs[msgCid] if ok { - if pmsg.msg.Message.From == mapi.wallet { + if pmsg.msg.Message.Method == market.Methods.AddBalance { var escrowAcct address.Address err := escrowAcct.UnmarshalCBOR(bytes.NewReader(pmsg.msg.Message.Params)) if err != nil { @@ -441,10 +454,16 @@ func (mapi *mockFundManagerAPI) completeMsg(msgCid cid.Cid) { mapi.escrow[escrowAcct] = escrow log.Debugf("%s: escrow %d -> %d", escrowAcct, before, escrow) } else { - escrowAcct := pmsg.msg.Message.From + var params market.WithdrawBalanceParams + err := params.UnmarshalCBOR(bytes.NewReader(pmsg.msg.Message.Params)) + if err != nil { + panic(err) + } + escrowAcct := params.ProviderOrClientAddress + escrow := mapi.getEscrow(escrowAcct) before := escrow - escrow = types.BigSub(escrow, pmsg.msg.Message.Value) + escrow = types.BigSub(escrow, params.Amount) mapi.escrow[escrowAcct] = escrow log.Debugf("%s: escrow %d -> %d", escrowAcct, before, escrow) }