From a51a62cb42a5eb4ce2abf40790a15cd6eecdcaee Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Mon, 9 Nov 2020 15:33:13 +0100 Subject: [PATCH] fix: batch withdrawal requests by wallet --- chain/market/fundmanager.go | 75 +++++--- chain/market/fundmanager_test.go | 319 +++++++++++++++++++++++++++---- 2 files changed, 331 insertions(+), 63 deletions(-) diff --git a/chain/market/fundmanager.go b/chain/market/fundmanager.go index a69779612..3425a0ea8 100644 --- a/chain/market/fundmanager.go +++ b/chain/market/fundmanager.go @@ -227,9 +227,10 @@ func (a *fundedAddress) process() { // Used by the tests if a.onProcessStartListener != nil { done := a.onProcessStartListener() - if done { - a.onProcessStartListener = nil + if !done { + return } + a.onProcessStartListener = nil } // Check if we're still waiting for the response to a message @@ -387,7 +388,6 @@ func (a *fundedAddress) processReservations(reservations []*fundRequest, release // If there's nothing to add to the balance, bail out if amtToAdd.LessThanEqual(abi.NewTokenAmount(0)) { - a.debugf(" queued for cancel %d", len(toAdd)) res.covered = append(res.covered, toAdd...) return res, nil } @@ -424,7 +424,7 @@ func splitReservations(reservations []*fundRequest, releases []*fundRequest) ([] } // We only want to combine requests that come from the same wallet - wallet := address.Undef + batchWallet := address.Undef for _, req := range reservations { amt := req.Amount() @@ -433,22 +433,23 @@ func splitReservations(reservations []*fundRequest, releases []*fundRequest) ([] // Cancel the request and update the release total releaseAmt = types.BigSub(releaseAmt, amt) toCancel = append(toCancel, req) - } else { - // The amount to add is greater that the release total so we want - // to send an add funds request + continue + } - // The first time the wallet will be undefined - if wallet == address.Undef { - wallet = req.Wallet - } - // If this request's wallet is the same as the first request's - // wallet, the requests will be combined - if wallet == req.Wallet { - delta := types.BigSub(amt, releaseAmt) - toAddAmt = types.BigAdd(toAddAmt, delta) - releaseAmt = abi.NewTokenAmount(0) - toAdd = append(toAdd, req) - } + // The amount to add is greater that the release total so we want + // to send an add funds request + + // The first time the wallet will be undefined + if batchWallet == address.Undef { + batchWallet = req.Wallet + } + // If this request's wallet is the same as the batch wallet, + // the requests will be combined + if batchWallet == req.Wallet { + delta := types.BigSub(amt, releaseAmt) + toAddAmt = types.BigAdd(toAddAmt, delta) + releaseAmt = abi.NewTokenAmount(0) + toAdd = append(toAdd, req) } } @@ -482,18 +483,42 @@ func (a *fundedAddress) processWithdrawals(withdrawals []*fundRequest) (msgCid c withdrawalAmt := abi.NewTokenAmount(0) allowedAmt := abi.NewTokenAmount(0) allowed := make([]*fundRequest, 0, len(a.withdrawals)) + var batchWallet address.Address for _, req := range a.withdrawals { amt := req.Amount() - withdrawalAmt = types.BigAdd(withdrawalAmt, amt) - if withdrawalAmt.LessThanEqual(netAvail) { - a.debugf("withdraw %d", amt) - allowed = append(allowed, req) - allowedAmt = types.BigAdd(allowedAmt, amt) - } else { + if amt.IsZero() { + // If the context for the request was cancelled, bail out + req.Complete(cid.Undef, err) + continue + } + + // If the amount would exceed the available amount, complete the + // request with an error + newWithdrawalAmt := types.BigAdd(withdrawalAmt, amt) + if newWithdrawalAmt.GreaterThan(netAvail) { err := xerrors.Errorf("insufficient funds for withdrawal %d", amt) a.debugf("%s", err) req.Complete(cid.Undef, err) + continue } + + // If this is the first allowed withdrawal request in this batch, save + // its wallet address + if batchWallet == address.Undef { + batchWallet = req.Wallet + } + // If the request wallet doesn't match the batch wallet, bail out + // (the withdrawal will be processed after the current batch has + // completed) + if req.Wallet != batchWallet { + continue + } + + // Include this withdrawal request in the batch + withdrawalAmt = newWithdrawalAmt + a.debugf("withdraw %d", amt) + allowed = append(allowed, req) + allowedAmt = types.BigAdd(allowedAmt, amt) } // Check if there is anything to withdraw. diff --git a/chain/market/fundmanager_test.go b/chain/market/fundmanager_test.go index 3ad3a1669..4e97c620c 100644 --- a/chain/market/fundmanager_test.go +++ b/chain/market/fundmanager_test.go @@ -38,7 +38,7 @@ func TestFundManagerBasic(t *testing.T) { sentinel, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) - msg := s.mockApi.getSentMessage(cid.Cid(sentinel)) + msg := s.mockApi.getSentMessage(sentinel) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) s.mockApi.completeMsg(cid.Cid(sentinel)) @@ -52,7 +52,7 @@ func TestFundManagerBasic(t *testing.T) { sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) - msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) + msg = s.mockApi.getSentMessage(sentinel) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) s.mockApi.completeMsg(cid.Cid(sentinel)) @@ -73,7 +73,7 @@ func TestFundManagerBasic(t *testing.T) { sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) - msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) + msg = s.mockApi.getSentMessage(sentinel) checkWithdrawMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) s.mockApi.completeMsg(cid.Cid(sentinel)) @@ -103,7 +103,7 @@ func TestFundManagerBasic(t *testing.T) { require.NoError(t, err) s.mockApi.completeMsg(cid.Cid(sentinel)) - msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) + msg = s.mockApi.getSentMessage(sentinel) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp) // Withdraw 1 @@ -175,7 +175,7 @@ func TestFundManagerParallel(t *testing.T) { // Complete the "Reserve 10" message s.mockApi.completeMsg(cid.Cid(sentinelReserve10)) - msg := s.mockApi.getSentMessage(cid.Cid(sentinelReserve10)) + msg := s.mockApi.getSentMessage(sentinelReserve10) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10)) // The other requests should now be combined and be submitted on-chain as @@ -194,7 +194,7 @@ func TestFundManagerParallel(t *testing.T) { // Complete the message s.mockApi.completeMsg(cid.Cid(rs1)) - msg = s.mockApi.getSentMessage(cid.Cid(rs1)) + msg = s.mockApi.getSentMessage(rs1) // "Reserve 3" +3 // "Reserve 5" +5 @@ -207,9 +207,103 @@ func TestFundManagerParallel(t *testing.T) { require.Error(t, err) } +// TestFundManagerReserveByWallet verifies that reserve requests are grouped by wallet +func TestFundManagerReserveByWallet(t *testing.T) { + s := setup(t) + defer s.fm.Stop() + + walletAddrA, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1) + require.NoError(t, err) + walletAddrB, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1) + require.NoError(t, err) + + // Wait until all the reservation requests are queued up + walletAQueuedUp := make(chan struct{}) + queueReady := make(chan struct{}) + fa := s.fm.getFundedAddress(s.acctAddr) + fa.onProcessStart(func() bool { + if len(fa.reservations) == 1 { + close(walletAQueuedUp) + } + if len(fa.reservations) == 3 { + close(queueReady) + return true + } + return false + }) + + type reserveResult struct { + ws WaitSentinel + err error + } + results := make(chan *reserveResult) + + amtA1 := abi.NewTokenAmount(1) + go func() { + // Wallet A: Reserve 1 + sentinelA1, err := s.fm.Reserve(s.ctx, walletAddrA, s.acctAddr, amtA1) + results <- &reserveResult{ + ws: sentinelA1, + err: err, + } + }() + + amtB1 := abi.NewTokenAmount(2) + amtB2 := abi.NewTokenAmount(3) + go func() { + // Wait for reservation for wallet A to be queued up + <-walletAQueuedUp + + // Wallet B: Reserve 2 + go func() { + sentinelB1, err := s.fm.Reserve(s.ctx, walletAddrB, s.acctAddr, amtB1) + results <- &reserveResult{ + ws: sentinelB1, + err: err, + } + }() + + // Wallet B: Reserve 3 + sentinelB2, err := s.fm.Reserve(s.ctx, walletAddrB, s.acctAddr, amtB2) + results <- &reserveResult{ + ws: sentinelB2, + err: err, + } + }() + + // All reservation requests are queued up + <-queueReady + + resA := <-results + sentinelA1 := resA.ws + + // Should send to wallet A + msg := s.mockApi.getSentMessage(sentinelA1) + checkAddMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1) + + // Complete wallet A message + s.mockApi.completeMsg(cid.Cid(sentinelA1)) + err = s.fm.Wait(s.ctx, sentinelA1) + require.NoError(t, err) + + resB1 := <-results + resB2 := <-results + require.NoError(t, resB1.err) + require.NoError(t, resB2.err) + sentinelB1 := resB1.ws + sentinelB2 := resB2.ws + + // Should send different message to wallet B + require.NotEqual(t, sentinelA1, sentinelB1) + // Should be single message combining amount 1 and 2 + require.Equal(t, sentinelB1, sentinelB2) + msg = s.mockApi.getSentMessage(sentinelB1) + checkAddMessageFields(t, msg, walletAddrB, s.acctAddr, types.BigAdd(amtB1, amtB2)) +} + // TestFundManagerWithdrawal verifies that as many withdraw operations as // possible are processed -func TestFundManagerWithdrawal(t *testing.T) { +func TestFundManagerWithdrawalLimit(t *testing.T) { s := setup(t) defer s.fm.Stop() @@ -225,34 +319,180 @@ func TestFundManagerWithdrawal(t *testing.T) { err = s.fm.Release(s.acctAddr, amt) require.NoError(t, err) + // Wait until all the withdraw requests are queued up + queueReady := make(chan struct{}) + fa := s.fm.getFundedAddress(s.acctAddr) + withdrawalReqCount := 0 + withdrawalReqQueued := make(chan struct{}, 3) + fa.onProcessStart(func() bool { + if len(fa.withdrawals) > withdrawalReqCount { + withdrawalReqCount++ + withdrawalReqQueued <- struct{}{} + } + if withdrawalReqCount == 3 { + close(queueReady) + return true + } + return false + }) + + type withdrawResult struct { + reqIndex int + ws WaitSentinel + err error + } + withdrawRes := make(chan *withdrawResult) + for i := 0; i < 3; i++ { + i := i + go func() { + if i > 0 { + <-withdrawalReqQueued + } + amt := abi.NewTokenAmount(5) + ws, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) + withdrawRes <- &withdrawResult{reqIndex: i, ws: ws, err: err} + }() + } + + // Withdrawal requests are queued up + <-queueReady + + results := make([]*withdrawResult, 3) + for i := 0; i < 3; i++ { + res := <-withdrawRes + results[res.reqIndex] = res + } + // Available 10 - // Withdraw 6 - // Expect success - amt = abi.NewTokenAmount(6) - sentinelWithdraw, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) - require.NoError(t, err) - - s.mockApi.completeMsg(cid.Cid(sentinelWithdraw)) - err = s.fm.Wait(s.ctx, sentinelWithdraw) - require.NoError(t, err) - - // Available 4 - // Withdraw 4 - // Expect success - amt = abi.NewTokenAmount(4) - sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) - require.NoError(t, err) - - s.mockApi.completeMsg(cid.Cid(sentinelWithdraw)) - err = s.fm.Wait(s.ctx, sentinelWithdraw) - require.NoError(t, err) - + // Withdraw 5 + // Expect Success + require.NoError(t, results[0].err) + // Available 5 + // Withdraw 5 + // Expect Success + require.NoError(t, results[1].err) // Available 0 - // Withdraw 1 + // Withdraw 5 // Expect FAIL - amt = abi.NewTokenAmount(1) - sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) - require.Error(t, err) + require.Error(t, results[2].err) + + // Expect withdrawal requests that fit under reserved amount to be combined + // into a single message on-chain + require.Equal(t, results[0].ws, results[1].ws) + + s.mockApi.completeMsg(cid.Cid(results[0].ws)) + err = s.fm.Wait(s.ctx, results[0].ws) + require.NoError(t, err) +} + +// TestFundManagerWithdrawByWallet verifies that withdraw requests are grouped by wallet +func TestFundManagerWithdrawByWallet(t *testing.T) { + s := setup(t) + defer s.fm.Stop() + + walletAddrA, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1) + require.NoError(t, err) + walletAddrB, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1) + require.NoError(t, err) + + // Reserve 10 + reserveAmt := abi.NewTokenAmount(10) + sentinelReserve, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, reserveAmt) + require.NoError(t, err) + s.mockApi.completeMsg(cid.Cid(sentinelReserve)) + + time.Sleep(10 * time.Millisecond) + + // Release 10 + err = s.fm.Release(s.acctAddr, reserveAmt) + require.NoError(t, err) + + type withdrawResult struct { + ws WaitSentinel + err error + } + results := make(chan *withdrawResult) + + // Wait until withdrawals are queued up + walletAQueuedUp := make(chan struct{}) + queueReady := make(chan struct{}) + withdrawalCount := 0 + fa := s.fm.getFundedAddress(s.acctAddr) + fa.onProcessStart(func() bool { + if len(fa.withdrawals) == withdrawalCount { + return false + } + withdrawalCount = len(fa.withdrawals) + + if withdrawalCount == 1 { + close(walletAQueuedUp) + } else if withdrawalCount == 3 { + close(queueReady) + return true + } + return false + }) + + amtA1 := abi.NewTokenAmount(1) + go func() { + // Wallet A: Withdraw 1 + sentinelA1, err := s.fm.Withdraw(s.ctx, walletAddrA, s.acctAddr, amtA1) + results <- &withdrawResult{ + ws: sentinelA1, + err: err, + } + }() + + amtB1 := abi.NewTokenAmount(2) + amtB2 := abi.NewTokenAmount(3) + go func() { + // Wait until withdraw for wallet A is queued up + <-walletAQueuedUp + + // Wallet B: Withdraw 2 + go func() { + sentinelB1, err := s.fm.Withdraw(s.ctx, walletAddrB, s.acctAddr, amtB1) + results <- &withdrawResult{ + ws: sentinelB1, + err: err, + } + }() + + // Wallet B: Withdraw 3 + sentinelB2, err := s.fm.Withdraw(s.ctx, walletAddrB, s.acctAddr, amtB2) + results <- &withdrawResult{ + ws: sentinelB2, + err: err, + } + }() + + // Withdrawals are queued up + <-queueReady + + // Should withdraw from wallet A first + resA1 := <-results + sentinelA1 := resA1.ws + msg := s.mockApi.getSentMessage(sentinelA1) + checkWithdrawMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1) + + // Complete wallet A message + s.mockApi.completeMsg(cid.Cid(sentinelA1)) + err = s.fm.Wait(s.ctx, sentinelA1) + require.NoError(t, err) + + resB1 := <-results + resB2 := <-results + require.NoError(t, resB1.err) + require.NoError(t, resB2.err) + sentinelB1 := resB1.ws + sentinelB2 := resB2.ws + + // Should send different message for wallet B from wallet A + require.NotEqual(t, sentinelA1, sentinelB1) + // Should be single message combining amount 1 and 2 + require.Equal(t, sentinelB1, sentinelB2) + msg = s.mockApi.getSentMessage(sentinelB1) + checkWithdrawMessageFields(t, msg, walletAddrB, s.acctAddr, types.BigAdd(amtB1, amtB2)) } // TestFundManagerRestart verifies that waiting for incomplete requests resumes @@ -268,7 +508,7 @@ func TestFundManagerRestart(t *testing.T) { sentinelAddr1, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) require.NoError(t, err) - msg := s.mockApi.getSentMessage(cid.Cid(sentinelAddr1)) + msg := s.mockApi.getSentMessage(sentinelAddr1) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) // Address 2: Reserve 7 @@ -276,7 +516,7 @@ func TestFundManagerRestart(t *testing.T) { sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, s.walletAddr, acctAddr2, amt2) require.NoError(t, err) - msg2 := s.mockApi.getSentMessage(cid.Cid(sentinelAddr2Res7)) + msg2 := s.mockApi.getSentMessage(sentinelAddr2Res7) checkAddMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2) // Complete "Address 1: Reserve 10" @@ -317,13 +557,14 @@ func TestFundManagerRestart(t *testing.T) { // Expect waiting message to now be sent sentinel3 := <-reserveSentinel - msg3 := mockApiAfter.getSentMessage(cid.Cid(sentinel3)) + msg3 := mockApiAfter.getSentMessage(sentinel3) checkAddMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3) } type scaffold struct { ctx context.Context ds *ds_sync.MutexDatastore + wllt *wallet.LocalWallet walletAddr address.Address acctAddr address.Address mockApi *mockFundManagerAPI @@ -333,12 +574,12 @@ type scaffold struct { func setup(t *testing.T) *scaffold { ctx := context.Background() - w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + wallet, err := wallet.NewWallet(wallet.NewMemKeyStore()) if err != nil { t.Fatal(err) } - walletAddr, err := w.WalletNew(context.Background(), types.KTSecp256k1) + walletAddr, err := wallet.WalletNew(context.Background(), types.KTSecp256k1) if err != nil { t.Fatal(err) } @@ -351,6 +592,7 @@ func setup(t *testing.T) *scaffold { return &scaffold{ ctx: ctx, ds: ds, + wllt: wallet, walletAddr: walletAddr, acctAddr: acctAddr, mockApi: mockApi, @@ -416,10 +658,11 @@ func (mapi *mockFundManagerAPI) MpoolPushMessage(ctx context.Context, message *t return smsg, nil } -func (mapi *mockFundManagerAPI) getSentMessage(c cid.Cid) *types.Message { +func (mapi *mockFundManagerAPI) getSentMessage(ws WaitSentinel) *types.Message { mapi.lk.Lock() defer mapi.lk.Unlock() + c := cid.Cid(ws) for i := 0; i < 1000; i++ { if pending, ok := mapi.sentMsgs[c]; ok { return &pending.msg.Message