refactor: remove FundManager.Wait

This commit is contained in:
Dirk McCormick 2020-11-10 09:55:38 +01:00 committed by hannahhoward
parent a51a62cb42
commit 0e0ffc9c10
2 changed files with 35 additions and 61 deletions

View File

@ -52,10 +52,6 @@ type FundManager struct {
fundedAddrs map[address.Address]*fundedAddress fundedAddrs map[address.Address]*fundedAddress
} }
type WaitSentinel cid.Cid
var WaitSentinelUndef = WaitSentinel(cid.Undef)
func NewFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager { func NewFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &FundManager{ return &FundManager{
@ -103,7 +99,9 @@ func (fm *FundManager) getFundedAddress(addr address.Address) *fundedAddress {
// Reserve adds amt to `reserved`. If there are not enough available funds for // Reserve adds amt to `reserved`. If there are not enough available funds for
// the address, submits a message on chain to top up available funds. // the address, submits a message on chain to top up available funds.
func (fm *FundManager) Reserve(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (WaitSentinel, error) { // Returns the cid of the message that was submitted on chain, or cid.Undef if
// the required funds were already available.
func (fm *FundManager) Reserve(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return fm.getFundedAddress(addr).reserve(ctx, wallet, amt) return fm.getFundedAddress(addr).reserve(ctx, wallet, amt)
} }
@ -114,16 +112,11 @@ func (fm *FundManager) Release(addr address.Address, amt abi.TokenAmount) error
// Withdraw unreserved funds. Only succeeds if there are enough unreserved // Withdraw unreserved funds. Only succeeds if there are enough unreserved
// funds for the address. // funds for the address.
func (fm *FundManager) Withdraw(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (WaitSentinel, error) { // Returns the cid of the message that was submitted on chain.
func (fm *FundManager) Withdraw(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return fm.getFundedAddress(addr).withdraw(ctx, wallet, amt) return fm.getFundedAddress(addr).withdraw(ctx, wallet, amt)
} }
// Waits for a reserve or withdraw to complete.
func (fm *FundManager) Wait(ctx context.Context, sentinel WaitSentinel) error {
_, err := fm.api.StateWaitMsg(ctx, cid.Cid(sentinel), build.MessageConfidence)
return err
}
// FundedAddressState keeps track of the state of an address with funds in the // FundedAddressState keeps track of the state of an address with funds in the
// datastore // datastore
type FundedAddressState struct { type FundedAddressState struct {
@ -178,7 +171,7 @@ func (a *fundedAddress) start() {
} }
} }
func (a *fundedAddress) reserve(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (WaitSentinel, error) { func (a *fundedAddress) reserve(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return a.requestAndWait(ctx, wallet, amt, &a.reservations) return a.requestAndWait(ctx, wallet, amt, &a.reservations)
} }
@ -187,11 +180,11 @@ func (a *fundedAddress) release(amt abi.TokenAmount) error {
return err return err
} }
func (a *fundedAddress) withdraw(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (WaitSentinel, error) { func (a *fundedAddress) withdraw(ctx context.Context, wallet address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return a.requestAndWait(ctx, wallet, amt, &a.withdrawals) return a.requestAndWait(ctx, wallet, amt, &a.withdrawals)
} }
func (a *fundedAddress) requestAndWait(ctx context.Context, wallet address.Address, amt abi.TokenAmount, reqs *[]*fundRequest) (WaitSentinel, error) { func (a *fundedAddress) requestAndWait(ctx context.Context, wallet address.Address, amt abi.TokenAmount, reqs *[]*fundRequest) (cid.Cid, error) {
// Create a request and add it to the request queue // Create a request and add it to the request queue
req := newFundRequest(ctx, wallet, amt) req := newFundRequest(ctx, wallet, amt)
@ -205,9 +198,9 @@ func (a *fundedAddress) requestAndWait(ctx context.Context, wallet address.Addre
// Wait for the results // Wait for the results
select { select {
case <-ctx.Done(): case <-ctx.Done():
return WaitSentinelUndef, ctx.Err() return cid.Undef, ctx.Err()
case r := <-req.Result: case r := <-req.Result:
return WaitSentinel(r.msgCid), r.err return r.msgCid, r.err
} }
} }

View File

@ -41,9 +41,7 @@ func TestFundManagerBasic(t *testing.T) {
msg := s.mockApi.getSentMessage(sentinel) msg := s.mockApi.getSentMessage(sentinel)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel)) s.mockApi.completeMsg(sentinel)
err = s.fm.Wait(s.ctx, sentinel)
require.NoError(t, err)
// Reserve 7 // Reserve 7
// balance: 10 -> 17 // balance: 10 -> 17
@ -55,9 +53,7 @@ func TestFundManagerBasic(t *testing.T) {
msg = s.mockApi.getSentMessage(sentinel) msg = s.mockApi.getSentMessage(sentinel)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel)) s.mockApi.completeMsg(sentinel)
err = s.fm.Wait(s.ctx, sentinel)
require.NoError(t, err)
// Release 5 // Release 5
// balance: 17 // balance: 17
@ -76,9 +72,7 @@ func TestFundManagerBasic(t *testing.T) {
msg = s.mockApi.getSentMessage(sentinel) msg = s.mockApi.getSentMessage(sentinel)
checkWithdrawMessageFields(t, msg, s.walletAddr, s.acctAddr, amt) checkWithdrawMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel)) s.mockApi.completeMsg(sentinel)
err = s.fm.Wait(s.ctx, sentinel)
require.NoError(t, err)
// Reserve 3 // Reserve 3
// balance: 15 // balance: 15
@ -90,7 +84,7 @@ func TestFundManagerBasic(t *testing.T) {
sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, msgCount, s.mockApi.messageCount()) require.Equal(t, msgCount, s.mockApi.messageCount())
require.Equal(t, sentinel, WaitSentinelUndef) require.Equal(t, sentinel, cid.Undef)
// Reserve 1 // Reserve 1
// balance: 15 -> 16 // balance: 15 -> 16
@ -102,7 +96,7 @@ func TestFundManagerBasic(t *testing.T) {
sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt) sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
require.NoError(t, err) require.NoError(t, err)
s.mockApi.completeMsg(cid.Cid(sentinel)) s.mockApi.completeMsg(sentinel)
msg = s.mockApi.getSentMessage(sentinel) msg = s.mockApi.getSentMessage(sentinel)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp)
@ -145,7 +139,7 @@ func TestFundManagerParallel(t *testing.T) {
withdrawReady <- err withdrawReady <- err
}() }()
reserveSentinels := make(chan WaitSentinel) reserveSentinels := make(chan cid.Cid)
// Reserve 3 // Reserve 3
go func() { go func() {
@ -174,7 +168,7 @@ func TestFundManagerParallel(t *testing.T) {
<-queueReady <-queueReady
// Complete the "Reserve 10" message // Complete the "Reserve 10" message
s.mockApi.completeMsg(cid.Cid(sentinelReserve10)) s.mockApi.completeMsg(sentinelReserve10)
msg := s.mockApi.getSentMessage(sentinelReserve10) msg := s.mockApi.getSentMessage(sentinelReserve10)
checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10)) checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10))
@ -193,7 +187,7 @@ func TestFundManagerParallel(t *testing.T) {
} }
// Complete the message // Complete the message
s.mockApi.completeMsg(cid.Cid(rs1)) s.mockApi.completeMsg(rs1)
msg = s.mockApi.getSentMessage(rs1) msg = s.mockApi.getSentMessage(rs1)
// "Reserve 3" +3 // "Reserve 3" +3
@ -233,7 +227,7 @@ func TestFundManagerReserveByWallet(t *testing.T) {
}) })
type reserveResult struct { type reserveResult struct {
ws WaitSentinel ws cid.Cid
err error err error
} }
results := make(chan *reserveResult) results := make(chan *reserveResult)
@ -282,9 +276,7 @@ func TestFundManagerReserveByWallet(t *testing.T) {
checkAddMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1) checkAddMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1)
// Complete wallet A message // Complete wallet A message
s.mockApi.completeMsg(cid.Cid(sentinelA1)) s.mockApi.completeMsg(sentinelA1)
err = s.fm.Wait(s.ctx, sentinelA1)
require.NoError(t, err)
resB1 := <-results resB1 := <-results
resB2 := <-results resB2 := <-results
@ -313,7 +305,7 @@ func TestFundManagerWithdrawalLimit(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Complete the "Reserve 10" message // Complete the "Reserve 10" message
s.mockApi.completeMsg(cid.Cid(sentinelReserve10)) s.mockApi.completeMsg(sentinelReserve10)
// Release 10 // Release 10
err = s.fm.Release(s.acctAddr, amt) err = s.fm.Release(s.acctAddr, amt)
@ -338,7 +330,7 @@ func TestFundManagerWithdrawalLimit(t *testing.T) {
type withdrawResult struct { type withdrawResult struct {
reqIndex int reqIndex int
ws WaitSentinel ws cid.Cid
err error err error
} }
withdrawRes := make(chan *withdrawResult) withdrawRes := make(chan *withdrawResult)
@ -379,10 +371,6 @@ func TestFundManagerWithdrawalLimit(t *testing.T) {
// Expect withdrawal requests that fit under reserved amount to be combined // Expect withdrawal requests that fit under reserved amount to be combined
// into a single message on-chain // into a single message on-chain
require.Equal(t, results[0].ws, results[1].ws) require.Equal(t, results[0].ws, results[1].ws)
s.mockApi.completeMsg(cid.Cid(results[0].ws))
err = s.fm.Wait(s.ctx, results[0].ws)
require.NoError(t, err)
} }
// TestFundManagerWithdrawByWallet verifies that withdraw requests are grouped by wallet // TestFundManagerWithdrawByWallet verifies that withdraw requests are grouped by wallet
@ -399,7 +387,7 @@ func TestFundManagerWithdrawByWallet(t *testing.T) {
reserveAmt := abi.NewTokenAmount(10) reserveAmt := abi.NewTokenAmount(10)
sentinelReserve, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, reserveAmt) sentinelReserve, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, reserveAmt)
require.NoError(t, err) require.NoError(t, err)
s.mockApi.completeMsg(cid.Cid(sentinelReserve)) s.mockApi.completeMsg(sentinelReserve)
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
@ -408,7 +396,7 @@ func TestFundManagerWithdrawByWallet(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
type withdrawResult struct { type withdrawResult struct {
ws WaitSentinel ws cid.Cid
err error err error
} }
results := make(chan *withdrawResult) results := make(chan *withdrawResult)
@ -476,9 +464,7 @@ func TestFundManagerWithdrawByWallet(t *testing.T) {
checkWithdrawMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1) checkWithdrawMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1)
// Complete wallet A message // Complete wallet A message
s.mockApi.completeMsg(cid.Cid(sentinelA1)) s.mockApi.completeMsg(sentinelA1)
err = s.fm.Wait(s.ctx, sentinelA1)
require.NoError(t, err)
resB1 := <-results resB1 := <-results
resB2 := <-results resB2 := <-results
@ -520,9 +506,7 @@ func TestFundManagerRestart(t *testing.T) {
checkAddMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2) checkAddMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2)
// Complete "Address 1: Reserve 10" // Complete "Address 1: Reserve 10"
s.mockApi.completeMsg(cid.Cid(sentinelAddr1)) s.mockApi.completeMsg(sentinelAddr1)
err = s.fm.Wait(s.ctx, sentinelAddr1)
require.NoError(t, err)
// Give the completed state a moment to be stored before restart // Give the completed state a moment to be stored before restart
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
@ -534,7 +518,7 @@ func TestFundManagerRestart(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
amt3 := abi.NewTokenAmount(9) amt3 := abi.NewTokenAmount(9)
reserveSentinel := make(chan WaitSentinel) reserveSentinel := make(chan cid.Cid)
go func() { go func() {
// Address 2: Reserve 9 // Address 2: Reserve 9
sentinel3, err := fmAfter.Reserve(s.ctx, s.walletAddr, acctAddr2, amt3) sentinel3, err := fmAfter.Reserve(s.ctx, s.walletAddr, acctAddr2, amt3)
@ -551,9 +535,7 @@ func TestFundManagerRestart(t *testing.T) {
} }
// Complete "Address 2: Reserve 7" // Complete "Address 2: Reserve 7"
mockApiAfter.completeMsg(cid.Cid(sentinelAddr2Res7)) mockApiAfter.completeMsg(sentinelAddr2Res7)
err = fmAfter.Wait(s.ctx, sentinelAddr2Res7)
require.NoError(t, err)
// Expect waiting message to now be sent // Expect waiting message to now be sent
sentinel3 := <-reserveSentinel sentinel3 := <-reserveSentinel
@ -574,12 +556,12 @@ type scaffold struct {
func setup(t *testing.T) *scaffold { func setup(t *testing.T) *scaffold {
ctx := context.Background() ctx := context.Background()
wallet, err := wallet.NewWallet(wallet.NewMemKeyStore()) wllt, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
walletAddr, err := wallet.WalletNew(context.Background(), types.KTSecp256k1) walletAddr, err := wllt.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -587,12 +569,12 @@ func setup(t *testing.T) *scaffold {
acctAddr := tutils.NewActorAddr(t, "addr") acctAddr := tutils.NewActorAddr(t, "addr")
mockApi := newMockFundManagerAPI(walletAddr) mockApi := newMockFundManagerAPI(walletAddr)
ds := ds_sync.MutexWrap(ds.NewMapDatastore()) dstore := ds_sync.MutexWrap(ds.NewMapDatastore())
fm := NewFundManager(mockApi, ds) fm := NewFundManager(mockApi, dstore)
return &scaffold{ return &scaffold{
ctx: ctx, ctx: ctx,
ds: ds, ds: dstore,
wllt: wallet, wllt: wllt,
walletAddr: walletAddr, walletAddr: walletAddr,
acctAddr: acctAddr, acctAddr: acctAddr,
mockApi: mockApi, mockApi: mockApi,
@ -658,11 +640,10 @@ func (mapi *mockFundManagerAPI) MpoolPushMessage(ctx context.Context, message *t
return smsg, nil return smsg, nil
} }
func (mapi *mockFundManagerAPI) getSentMessage(ws WaitSentinel) *types.Message { func (mapi *mockFundManagerAPI) getSentMessage(c cid.Cid) *types.Message {
mapi.lk.Lock() mapi.lk.Lock()
defer mapi.lk.Unlock() defer mapi.lk.Unlock()
c := cid.Cid(ws)
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
if pending, ok := mapi.sentMsgs[c]; ok { if pending, ok := mapi.sentMsgs[c]; ok {
return &pending.msg.Message return &pending.msg.Message