fix: batch withdrawal requests by wallet

This commit is contained in:
Dirk McCormick 2020-11-09 15:33:13 +01:00 committed by hannahhoward
parent 86c204d8a8
commit a51a62cb42
2 changed files with 331 additions and 63 deletions

View File

@ -227,9 +227,10 @@ func (a *fundedAddress) process() {
// Used by the tests // Used by the tests
if a.onProcessStartListener != nil { if a.onProcessStartListener != nil {
done := a.onProcessStartListener() done := a.onProcessStartListener()
if done { if !done {
a.onProcessStartListener = nil return
} }
a.onProcessStartListener = nil
} }
// Check if we're still waiting for the response to a message // Check if we're still waiting for the response to a message
@ -387,7 +388,6 @@ func (a *fundedAddress) processReservations(reservations []*fundRequest, release
// If there's nothing to add to the balance, bail out // If there's nothing to add to the balance, bail out
if amtToAdd.LessThanEqual(abi.NewTokenAmount(0)) { if amtToAdd.LessThanEqual(abi.NewTokenAmount(0)) {
a.debugf(" queued for cancel %d", len(toAdd))
res.covered = append(res.covered, toAdd...) res.covered = append(res.covered, toAdd...)
return res, nil return res, nil
} }
@ -424,7 +424,7 @@ func splitReservations(reservations []*fundRequest, releases []*fundRequest) ([]
} }
// We only want to combine requests that come from the same wallet // We only want to combine requests that come from the same wallet
wallet := address.Undef batchWallet := address.Undef
for _, req := range reservations { for _, req := range reservations {
amt := req.Amount() amt := req.Amount()
@ -433,22 +433,23 @@ func splitReservations(reservations []*fundRequest, releases []*fundRequest) ([]
// Cancel the request and update the release total // Cancel the request and update the release total
releaseAmt = types.BigSub(releaseAmt, amt) releaseAmt = types.BigSub(releaseAmt, amt)
toCancel = append(toCancel, req) toCancel = append(toCancel, req)
} else { continue
// The amount to add is greater that the release total so we want }
// to send an add funds request
// The first time the wallet will be undefined // The amount to add is greater that the release total so we want
if wallet == address.Undef { // to send an add funds request
wallet = req.Wallet
} // The first time the wallet will be undefined
// If this request's wallet is the same as the first request's if batchWallet == address.Undef {
// wallet, the requests will be combined batchWallet = req.Wallet
if wallet == req.Wallet { }
delta := types.BigSub(amt, releaseAmt) // If this request's wallet is the same as the batch wallet,
toAddAmt = types.BigAdd(toAddAmt, delta) // the requests will be combined
releaseAmt = abi.NewTokenAmount(0) if batchWallet == req.Wallet {
toAdd = append(toAdd, req) delta := types.BigSub(amt, releaseAmt)
} toAddAmt = types.BigAdd(toAddAmt, delta)
releaseAmt = abi.NewTokenAmount(0)
toAdd = append(toAdd, req)
} }
} }
@ -482,18 +483,42 @@ func (a *fundedAddress) processWithdrawals(withdrawals []*fundRequest) (msgCid c
withdrawalAmt := abi.NewTokenAmount(0) withdrawalAmt := abi.NewTokenAmount(0)
allowedAmt := abi.NewTokenAmount(0) allowedAmt := abi.NewTokenAmount(0)
allowed := make([]*fundRequest, 0, len(a.withdrawals)) allowed := make([]*fundRequest, 0, len(a.withdrawals))
var batchWallet address.Address
for _, req := range a.withdrawals { for _, req := range a.withdrawals {
amt := req.Amount() amt := req.Amount()
withdrawalAmt = types.BigAdd(withdrawalAmt, amt) if amt.IsZero() {
if withdrawalAmt.LessThanEqual(netAvail) { // If the context for the request was cancelled, bail out
a.debugf("withdraw %d", amt) req.Complete(cid.Undef, err)
allowed = append(allowed, req) continue
allowedAmt = types.BigAdd(allowedAmt, amt) }
} else {
// If the amount would exceed the available amount, complete the
// request with an error
newWithdrawalAmt := types.BigAdd(withdrawalAmt, amt)
if newWithdrawalAmt.GreaterThan(netAvail) {
err := xerrors.Errorf("insufficient funds for withdrawal %d", amt) err := xerrors.Errorf("insufficient funds for withdrawal %d", amt)
a.debugf("%s", err) a.debugf("%s", err)
req.Complete(cid.Undef, err) req.Complete(cid.Undef, err)
continue
} }
// If this is the first allowed withdrawal request in this batch, save
// its wallet address
if batchWallet == address.Undef {
batchWallet = req.Wallet
}
// If the request wallet doesn't match the batch wallet, bail out
// (the withdrawal will be processed after the current batch has
// completed)
if req.Wallet != batchWallet {
continue
}
// Include this withdrawal request in the batch
withdrawalAmt = newWithdrawalAmt
a.debugf("withdraw %d", amt)
allowed = append(allowed, req)
allowedAmt = types.BigAdd(allowedAmt, amt)
} }
// Check if there is anything to withdraw. // Check if there is anything to withdraw.

View File

@ -38,7 +38,7 @@ func TestFundManagerBasic(t *testing.T) {
sentinel, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) sentinel, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
require.NoError(t, err) require.NoError(t, err)
msg := s.mockApi.getSentMessage(cid.Cid(sentinel)) msg := s.mockApi.getSentMessage(sentinel)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel)) s.mockApi.completeMsg(cid.Cid(sentinel))
@ -52,7 +52,7 @@ func TestFundManagerBasic(t *testing.T) {
sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
require.NoError(t, err) require.NoError(t, err)
msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) msg = s.mockApi.getSentMessage(sentinel)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel)) s.mockApi.completeMsg(cid.Cid(sentinel))
@ -73,7 +73,7 @@ func TestFundManagerBasic(t *testing.T) {
sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
require.NoError(t, err) require.NoError(t, err)
msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) msg = s.mockApi.getSentMessage(sentinel)
checkWithdrawMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) checkWithdrawMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel)) s.mockApi.completeMsg(cid.Cid(sentinel))
@ -103,7 +103,7 @@ func TestFundManagerBasic(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
s.mockApi.completeMsg(cid.Cid(sentinel)) s.mockApi.completeMsg(cid.Cid(sentinel))
msg = s.mockApi.getSentMessage(cid.Cid(sentinel)) msg = s.mockApi.getSentMessage(sentinel)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp)
// Withdraw 1 // Withdraw 1
@ -175,7 +175,7 @@ func TestFundManagerParallel(t *testing.T) {
// Complete the "Reserve 10" message // Complete the "Reserve 10" message
s.mockApi.completeMsg(cid.Cid(sentinelReserve10)) s.mockApi.completeMsg(cid.Cid(sentinelReserve10))
msg := s.mockApi.getSentMessage(cid.Cid(sentinelReserve10)) msg := s.mockApi.getSentMessage(sentinelReserve10)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10)) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10))
// The other requests should now be combined and be submitted on-chain as // The other requests should now be combined and be submitted on-chain as
@ -194,7 +194,7 @@ func TestFundManagerParallel(t *testing.T) {
// Complete the message // Complete the message
s.mockApi.completeMsg(cid.Cid(rs1)) s.mockApi.completeMsg(cid.Cid(rs1))
msg = s.mockApi.getSentMessage(cid.Cid(rs1)) msg = s.mockApi.getSentMessage(rs1)
// "Reserve 3" +3 // "Reserve 3" +3
// "Reserve 5" +5 // "Reserve 5" +5
@ -207,9 +207,103 @@ func TestFundManagerParallel(t *testing.T) {
require.Error(t, err) require.Error(t, err)
} }
// TestFundManagerReserveByWallet verifies that reserve requests are grouped by wallet
func TestFundManagerReserveByWallet(t *testing.T) {
s := setup(t)
defer s.fm.Stop()
walletAddrA, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
require.NoError(t, err)
walletAddrB, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
require.NoError(t, err)
// Wait until all the reservation requests are queued up
walletAQueuedUp := make(chan struct{})
queueReady := make(chan struct{})
fa := s.fm.getFundedAddress(s.acctAddr)
fa.onProcessStart(func() bool {
if len(fa.reservations) == 1 {
close(walletAQueuedUp)
}
if len(fa.reservations) == 3 {
close(queueReady)
return true
}
return false
})
type reserveResult struct {
ws WaitSentinel
err error
}
results := make(chan *reserveResult)
amtA1 := abi.NewTokenAmount(1)
go func() {
// Wallet A: Reserve 1
sentinelA1, err := s.fm.Reserve(s.ctx, walletAddrA, s.acctAddr, amtA1)
results <- &reserveResult{
ws: sentinelA1,
err: err,
}
}()
amtB1 := abi.NewTokenAmount(2)
amtB2 := abi.NewTokenAmount(3)
go func() {
// Wait for reservation for wallet A to be queued up
<-walletAQueuedUp
// Wallet B: Reserve 2
go func() {
sentinelB1, err := s.fm.Reserve(s.ctx, walletAddrB, s.acctAddr, amtB1)
results <- &reserveResult{
ws: sentinelB1,
err: err,
}
}()
// Wallet B: Reserve 3
sentinelB2, err := s.fm.Reserve(s.ctx, walletAddrB, s.acctAddr, amtB2)
results <- &reserveResult{
ws: sentinelB2,
err: err,
}
}()
// All reservation requests are queued up
<-queueReady
resA := <-results
sentinelA1 := resA.ws
// Should send to wallet A
msg := s.mockApi.getSentMessage(sentinelA1)
checkAddMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1)
// Complete wallet A message
s.mockApi.completeMsg(cid.Cid(sentinelA1))
err = s.fm.Wait(s.ctx, sentinelA1)
require.NoError(t, err)
resB1 := <-results
resB2 := <-results
require.NoError(t, resB1.err)
require.NoError(t, resB2.err)
sentinelB1 := resB1.ws
sentinelB2 := resB2.ws
// Should send different message to wallet B
require.NotEqual(t, sentinelA1, sentinelB1)
// Should be single message combining amount 1 and 2
require.Equal(t, sentinelB1, sentinelB2)
msg = s.mockApi.getSentMessage(sentinelB1)
checkAddMessageFields(t, msg, walletAddrB, s.acctAddr, types.BigAdd(amtB1, amtB2))
}
// TestFundManagerWithdrawal verifies that as many withdraw operations as // TestFundManagerWithdrawal verifies that as many withdraw operations as
// possible are processed // possible are processed
func TestFundManagerWithdrawal(t *testing.T) { func TestFundManagerWithdrawalLimit(t *testing.T) {
s := setup(t) s := setup(t)
defer s.fm.Stop() defer s.fm.Stop()
@ -225,34 +319,180 @@ func TestFundManagerWithdrawal(t *testing.T) {
err = s.fm.Release(s.acctAddr, amt) err = s.fm.Release(s.acctAddr, amt)
require.NoError(t, err) require.NoError(t, err)
// Wait until all the withdraw requests are queued up
queueReady := make(chan struct{})
fa := s.fm.getFundedAddress(s.acctAddr)
withdrawalReqCount := 0
withdrawalReqQueued := make(chan struct{}, 3)
fa.onProcessStart(func() bool {
if len(fa.withdrawals) > withdrawalReqCount {
withdrawalReqCount++
withdrawalReqQueued <- struct{}{}
}
if withdrawalReqCount == 3 {
close(queueReady)
return true
}
return false
})
type withdrawResult struct {
reqIndex int
ws WaitSentinel
err error
}
withdrawRes := make(chan *withdrawResult)
for i := 0; i < 3; i++ {
i := i
go func() {
if i > 0 {
<-withdrawalReqQueued
}
amt := abi.NewTokenAmount(5)
ws, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
withdrawRes <- &withdrawResult{reqIndex: i, ws: ws, err: err}
}()
}
// Withdrawal requests are queued up
<-queueReady
results := make([]*withdrawResult, 3)
for i := 0; i < 3; i++ {
res := <-withdrawRes
results[res.reqIndex] = res
}
// Available 10 // Available 10
// Withdraw 6 // Withdraw 5
// Expect success // Expect Success
amt = abi.NewTokenAmount(6) require.NoError(t, results[0].err)
sentinelWithdraw, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt) // Available 5
require.NoError(t, err) // Withdraw 5
// Expect Success
s.mockApi.completeMsg(cid.Cid(sentinelWithdraw)) require.NoError(t, results[1].err)
err = s.fm.Wait(s.ctx, sentinelWithdraw)
require.NoError(t, err)
// Available 4
// Withdraw 4
// Expect success
amt = abi.NewTokenAmount(4)
sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
require.NoError(t, err)
s.mockApi.completeMsg(cid.Cid(sentinelWithdraw))
err = s.fm.Wait(s.ctx, sentinelWithdraw)
require.NoError(t, err)
// Available 0 // Available 0
// Withdraw 1 // Withdraw 5
// Expect FAIL // Expect FAIL
amt = abi.NewTokenAmount(1) require.Error(t, results[2].err)
sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
require.Error(t, err) // Expect withdrawal requests that fit under reserved amount to be combined
// into a single message on-chain
require.Equal(t, results[0].ws, results[1].ws)
s.mockApi.completeMsg(cid.Cid(results[0].ws))
err = s.fm.Wait(s.ctx, results[0].ws)
require.NoError(t, err)
}
// TestFundManagerWithdrawByWallet verifies that withdraw requests are grouped by wallet
func TestFundManagerWithdrawByWallet(t *testing.T) {
s := setup(t)
defer s.fm.Stop()
walletAddrA, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
require.NoError(t, err)
walletAddrB, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
require.NoError(t, err)
// Reserve 10
reserveAmt := abi.NewTokenAmount(10)
sentinelReserve, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, reserveAmt)
require.NoError(t, err)
s.mockApi.completeMsg(cid.Cid(sentinelReserve))
time.Sleep(10 * time.Millisecond)
// Release 10
err = s.fm.Release(s.acctAddr, reserveAmt)
require.NoError(t, err)
type withdrawResult struct {
ws WaitSentinel
err error
}
results := make(chan *withdrawResult)
// Wait until withdrawals are queued up
walletAQueuedUp := make(chan struct{})
queueReady := make(chan struct{})
withdrawalCount := 0
fa := s.fm.getFundedAddress(s.acctAddr)
fa.onProcessStart(func() bool {
if len(fa.withdrawals) == withdrawalCount {
return false
}
withdrawalCount = len(fa.withdrawals)
if withdrawalCount == 1 {
close(walletAQueuedUp)
} else if withdrawalCount == 3 {
close(queueReady)
return true
}
return false
})
amtA1 := abi.NewTokenAmount(1)
go func() {
// Wallet A: Withdraw 1
sentinelA1, err := s.fm.Withdraw(s.ctx, walletAddrA, s.acctAddr, amtA1)
results <- &withdrawResult{
ws: sentinelA1,
err: err,
}
}()
amtB1 := abi.NewTokenAmount(2)
amtB2 := abi.NewTokenAmount(3)
go func() {
// Wait until withdraw for wallet A is queued up
<-walletAQueuedUp
// Wallet B: Withdraw 2
go func() {
sentinelB1, err := s.fm.Withdraw(s.ctx, walletAddrB, s.acctAddr, amtB1)
results <- &withdrawResult{
ws: sentinelB1,
err: err,
}
}()
// Wallet B: Withdraw 3
sentinelB2, err := s.fm.Withdraw(s.ctx, walletAddrB, s.acctAddr, amtB2)
results <- &withdrawResult{
ws: sentinelB2,
err: err,
}
}()
// Withdrawals are queued up
<-queueReady
// Should withdraw from wallet A first
resA1 := <-results
sentinelA1 := resA1.ws
msg := s.mockApi.getSentMessage(sentinelA1)
checkWithdrawMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1)
// Complete wallet A message
s.mockApi.completeMsg(cid.Cid(sentinelA1))
err = s.fm.Wait(s.ctx, sentinelA1)
require.NoError(t, err)
resB1 := <-results
resB2 := <-results
require.NoError(t, resB1.err)
require.NoError(t, resB2.err)
sentinelB1 := resB1.ws
sentinelB2 := resB2.ws
// Should send different message for wallet B from wallet A
require.NotEqual(t, sentinelA1, sentinelB1)
// Should be single message combining amount 1 and 2
require.Equal(t, sentinelB1, sentinelB2)
msg = s.mockApi.getSentMessage(sentinelB1)
checkWithdrawMessageFields(t, msg, walletAddrB, s.acctAddr, types.BigAdd(amtB1, amtB2))
} }
// TestFundManagerRestart verifies that waiting for incomplete requests resumes // TestFundManagerRestart verifies that waiting for incomplete requests resumes
@ -268,7 +508,7 @@ func TestFundManagerRestart(t *testing.T) {
sentinelAddr1, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) sentinelAddr1, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
require.NoError(t, err) require.NoError(t, err)
msg := s.mockApi.getSentMessage(cid.Cid(sentinelAddr1)) msg := s.mockApi.getSentMessage(sentinelAddr1)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
// Address 2: Reserve 7 // Address 2: Reserve 7
@ -276,7 +516,7 @@ func TestFundManagerRestart(t *testing.T) {
sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, s.walletAddr, acctAddr2, amt2) sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, s.walletAddr, acctAddr2, amt2)
require.NoError(t, err) require.NoError(t, err)
msg2 := s.mockApi.getSentMessage(cid.Cid(sentinelAddr2Res7)) msg2 := s.mockApi.getSentMessage(sentinelAddr2Res7)
checkAddMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2) checkAddMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2)
// Complete "Address 1: Reserve 10" // Complete "Address 1: Reserve 10"
@ -317,13 +557,14 @@ func TestFundManagerRestart(t *testing.T) {
// Expect waiting message to now be sent // Expect waiting message to now be sent
sentinel3 := <-reserveSentinel sentinel3 := <-reserveSentinel
msg3 := mockApiAfter.getSentMessage(cid.Cid(sentinel3)) msg3 := mockApiAfter.getSentMessage(sentinel3)
checkAddMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3) checkAddMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3)
} }
type scaffold struct { type scaffold struct {
ctx context.Context ctx context.Context
ds *ds_sync.MutexDatastore ds *ds_sync.MutexDatastore
wllt *wallet.LocalWallet
walletAddr address.Address walletAddr address.Address
acctAddr address.Address acctAddr address.Address
mockApi *mockFundManagerAPI mockApi *mockFundManagerAPI
@ -333,12 +574,12 @@ type scaffold struct {
func setup(t *testing.T) *scaffold { func setup(t *testing.T) *scaffold {
ctx := context.Background() ctx := context.Background()
w, err := wallet.NewWallet(wallet.NewMemKeyStore()) wallet, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
walletAddr, err := w.WalletNew(context.Background(), types.KTSecp256k1) walletAddr, err := wallet.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -351,6 +592,7 @@ func setup(t *testing.T) *scaffold {
return &scaffold{ return &scaffold{
ctx: ctx, ctx: ctx,
ds: ds, ds: ds,
wllt: wallet,
walletAddr: walletAddr, walletAddr: walletAddr,
acctAddr: acctAddr, acctAddr: acctAddr,
mockApi: mockApi, mockApi: mockApi,
@ -416,10 +658,11 @@ func (mapi *mockFundManagerAPI) MpoolPushMessage(ctx context.Context, message *t
return smsg, nil return smsg, nil
} }
func (mapi *mockFundManagerAPI) getSentMessage(c cid.Cid) *types.Message { func (mapi *mockFundManagerAPI) getSentMessage(ws WaitSentinel) *types.Message {
mapi.lk.Lock() mapi.lk.Lock()
defer mapi.lk.Unlock() defer mapi.lk.Unlock()
c := cid.Cid(ws)
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
if pending, ok := mapi.sentMsgs[c]; ok { if pending, ok := mapi.sentMsgs[c]; ok {
return &pending.msg.Message return &pending.msg.Message