diff --git a/paychmgr/manager.go b/paychmgr/manager.go index e5270d9c1..79ccfe51a 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -181,7 +181,10 @@ func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt t // The returned channel address can safely be used against the Manager methods. func (pm *Manager) GetPaychWaitReady(ctx context.Context, mcid cid.Cid) (address.Address, error) { // Find the channel associated with the message CID + pm.lk.Lock() ci, err := pm.store.ByMessageCid(mcid) + pm.lk.Unlock() + if err != nil { if err == datastore.ErrNotFound { return address.Undef, xerrors.Errorf("Could not find wait msg cid %s", mcid) diff --git a/paychmgr/msglistener.go b/paychmgr/msglistener.go index 6b4cd8346..0a38cc2da 100644 --- a/paychmgr/msglistener.go +++ b/paychmgr/msglistener.go @@ -43,6 +43,9 @@ func (ml *msgListeners) fireMsgComplete(mcid cid.Cid, err error) { } func (ml *msgListeners) unsubscribe(sub string) { + ml.lk.Lock() + defer ml.lk.Unlock() + for i, l := range ml.listeners { if l.id == sub { ml.removeListener(i) diff --git a/paychmgr/paychget_test.go b/paychmgr/paychget_test.go index bdc19d9ac..3fdb8554e 100644 --- a/paychmgr/paychget_test.go +++ b/paychmgr/paychget_test.go @@ -33,27 +33,58 @@ type waitingCall struct { type mockPaychAPI struct { lk sync.Mutex messages map[cid.Cid]*types.SignedMessage - waitingCalls []*waitingCall + waitingCalls map[cid.Cid]*waitingCall + responses map[cid.Cid]types.MessageReceipt } func newMockPaychAPI() *mockPaychAPI { return &mockPaychAPI{ - messages: make(map[cid.Cid]*types.SignedMessage), + messages: make(map[cid.Cid]*types.SignedMessage), + waitingCalls: make(map[cid.Cid]*waitingCall), + responses: make(map[cid.Cid]types.MessageReceipt), } } -func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) { +func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, mcid cid.Cid, confidence uint64) (*api.MsgLookup, error) { response := make(chan types.MessageReceipt) pchapi.lk.Lock() - pchapi.waitingCalls = append(pchapi.waitingCalls, &waitingCall{response: response}) + + if receipt, ok := pchapi.responses[mcid]; ok { + defer pchapi.lk.Unlock() + + delete(pchapi.responses, mcid) + return &api.MsgLookup{Receipt: receipt}, nil + } + + pchapi.waitingCalls[mcid] = &waitingCall{response: response} pchapi.lk.Unlock() receipt := <-response - return &api.MsgLookup{Receipt: receipt}, nil } +func (pchapi *mockPaychAPI) receiveMsgResponse(mcid cid.Cid, receipt types.MessageReceipt) { + pchapi.lk.Lock() + defer pchapi.lk.Unlock() + + if call, ok := pchapi.waitingCalls[mcid]; ok { + delete(pchapi.waitingCalls, mcid) + call.response <- receipt + } + + pchapi.responses[mcid] = receipt +} + +func (pchapi *mockPaychAPI) close() { + for mcid := range pchapi.waitingCalls { + pchapi.receiveMsgResponse(mcid, types.MessageReceipt{ + ExitCode: 0, + Return: []byte{}, + }) + } +} + func (pchapi *mockPaychAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) { pchapi.lk.Lock() defer pchapi.lk.Unlock() @@ -77,23 +108,6 @@ func (pchapi *mockPaychAPI) pushedMessageCount() int { return len(pchapi.messages) } -func (pchapi *mockPaychAPI) finishWaitingCalls(receipt types.MessageReceipt) { - pchapi.lk.Lock() - defer pchapi.lk.Unlock() - - for _, call := range pchapi.waitingCalls { - call.response <- receipt - } - pchapi.waitingCalls = nil -} - -func (pchapi *mockPaychAPI) close() { - pchapi.finishWaitingCalls(types.MessageReceipt{ - ExitCode: 0, - Return: []byte{}, - }) -} - func testChannelResponse(t *testing.T, ch address.Address) types.MessageReceipt { createChannelRet := init_.ExecReturn{ IDAddress: ch, @@ -199,9 +213,11 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) { require.Nil(t, ci.CreateMsg) // Trigger add funds confirmation - pchapi.finishWaitingCalls(types.MessageReceipt{ExitCode: 0}) + pchapi.receiveMsgResponse(addFundsMsgCid, types.MessageReceipt{ExitCode: 0}) - time.Sleep(time.Millisecond * 10) + // Wait for add funds confirmation to be processed by manager + _, err = mgr.GetPaychWaitReady(ctx, addFundsMsgCid) + require.NoError(t, err) // Should still have one channel cis, err = mgr.ListChannels() @@ -221,7 +237,7 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) { time.Sleep(time.Millisecond * 10) // 3. Send create channel response - pchapi.finishWaitingCalls(response) + pchapi.receiveMsgResponse(createMsgCid, response) <-done } @@ -245,7 +261,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) { // Send create message for a channel amt := big.NewInt(10) - _, _, err = mgr.GetPaych(ctx, from, to, amt) + _, mcid1, err := mgr.GetPaych(ctx, from, to, amt) require.NoError(t, err) // 1. Set up create channel response (sent in response to WaitForMsg()) @@ -263,7 +279,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) { // Because first channel create fails, this request // should be for channel create. amt2 := big.NewInt(5) - ch2, _, err := mgr.GetPaych(ctx, from, to, amt2) + ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) require.NoError(t, err) require.Equal(t, address.Undef, ch2) @@ -272,7 +288,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) { // 4. Send a success response ch := tutils.NewIDAddr(t, 100) successResponse := testChannelResponse(t, ch) - pchapi.finishWaitingCalls(successResponse) + pchapi.receiveMsgResponse(mcid2, successResponse) time.Sleep(time.Millisecond * 10) @@ -291,7 +307,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) { time.Sleep(time.Millisecond * 10) // 3. Send error response to first channel create - pchapi.finishWaitingCalls(errResponse) + pchapi.receiveMsgResponse(mcid1, errResponse) <-done } @@ -315,13 +331,13 @@ func TestPaychGetRecoverAfterError(t *testing.T) { // Send create message for a channel amt := big.NewInt(10) - _, _, err = mgr.GetPaych(ctx, from, to, amt) + _, mcid, err := mgr.GetPaych(ctx, from, to, amt) require.NoError(t, err) time.Sleep(time.Millisecond * 10) // Send error create channel response - pchapi.finishWaitingCalls(types.MessageReceipt{ + pchapi.receiveMsgResponse(mcid, types.MessageReceipt{ ExitCode: 1, // error Return: []byte{}, }) @@ -330,14 +346,14 @@ func TestPaychGetRecoverAfterError(t *testing.T) { // Send create message for a channel again amt2 := big.NewInt(7) - _, _, err = mgr.GetPaych(ctx, from, to, amt2) + _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) require.NoError(t, err) time.Sleep(time.Millisecond * 10) // Send success create channel response response := testChannelResponse(t, ch) - pchapi.finishWaitingCalls(response) + pchapi.receiveMsgResponse(mcid2, response) time.Sleep(time.Millisecond * 10) @@ -373,26 +389,26 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) { // Send create message for a channel amt := big.NewInt(10) - _, _, err = mgr.GetPaych(ctx, from, to, amt) + _, mcid1, err := mgr.GetPaych(ctx, from, to, amt) require.NoError(t, err) time.Sleep(time.Millisecond * 10) // Send success create channel response response := testChannelResponse(t, ch) - pchapi.finishWaitingCalls(response) + pchapi.receiveMsgResponse(mcid1, response) time.Sleep(time.Millisecond * 10) // Send add funds message for channel amt2 := big.NewInt(5) - _, _, err = mgr.GetPaych(ctx, from, to, amt2) + _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) require.NoError(t, err) time.Sleep(time.Millisecond * 10) // Send error add funds response - pchapi.finishWaitingCalls(types.MessageReceipt{ + pchapi.receiveMsgResponse(mcid2, types.MessageReceipt{ ExitCode: 1, // error Return: []byte{}, }) @@ -414,13 +430,13 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) { // Send add funds message for channel again amt3 := big.NewInt(2) - _, _, err = mgr.GetPaych(ctx, from, to, amt3) + _, mcid3, err := mgr.GetPaych(ctx, from, to, amt3) require.NoError(t, err) time.Sleep(time.Millisecond * 10) // Send success add funds response - pchapi.finishWaitingCalls(types.MessageReceipt{ + pchapi.receiveMsgResponse(mcid3, types.MessageReceipt{ ExitCode: 0, Return: []byte{}, }) @@ -521,7 +537,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) { time.Sleep(time.Millisecond * 10) // 3. Send create channel response - pchapi2.finishWaitingCalls(response) + pchapi2.receiveMsgResponse(createMsgCid, response) <-done } @@ -545,20 +561,20 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) { // Send create message for a channel amt := big.NewInt(10) - _, _, err = mgr.GetPaych(ctx, from, to, amt) + _, mcid1, err := mgr.GetPaych(ctx, from, to, amt) require.NoError(t, err) time.Sleep(time.Millisecond * 10) // Send success create channel response response := testChannelResponse(t, ch) - pchapi.finishWaitingCalls(response) + pchapi.receiveMsgResponse(mcid1, response) time.Sleep(time.Millisecond * 10) // Send add funds message for channel amt2 := big.NewInt(5) - _, _, err = mgr.GetPaych(ctx, from, to, amt2) + _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) require.NoError(t, err) time.Sleep(time.Millisecond * 10) @@ -577,7 +593,7 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) { require.NoError(t, err) // Send success add funds response - pchapi2.finishWaitingCalls(types.MessageReceipt{ + pchapi2.receiveMsgResponse(mcid2, types.MessageReceipt{ ExitCode: 0, Return: []byte{}, }) @@ -617,13 +633,13 @@ func TestPaychGetWait(t *testing.T) { // 1. Get amt := big.NewInt(10) - _, mcid, err := mgr.GetPaych(ctx, from, to, amt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt) require.NoError(t, err) done := make(chan address.Address) go func() { // 2. Wait till ready - ch, err := mgr.GetPaychWaitReady(ctx, mcid) + ch, err := mgr.GetPaychWaitReady(ctx, createMsgCid) require.NoError(t, err) done <- ch @@ -634,7 +650,7 @@ func TestPaychGetWait(t *testing.T) { // 3. Send response expch := tutils.NewIDAddr(t, 100) response := testChannelResponse(t, expch) - pchapi.finishWaitingCalls(response) + pchapi.receiveMsgResponse(createMsgCid, response) time.Sleep(time.Millisecond * 10) @@ -643,7 +659,7 @@ func TestPaychGetWait(t *testing.T) { // 4. Wait again - message has already been received so should // return immediately - ch, err = mgr.GetPaychWaitReady(ctx, mcid) + ch, err = mgr.GetPaychWaitReady(ctx, createMsgCid) require.NoError(t, err) require.Equal(t, expch, ch) @@ -670,7 +686,7 @@ func TestPaychGetWait(t *testing.T) { ExitCode: 0, Return: []byte{}, } - pchapi.finishWaitingCalls(addFundsResponse) + pchapi.receiveMsgResponse(addFundsMsgCid, addFundsResponse) <-done } @@ -720,7 +736,7 @@ func TestPaychGetWaitErr(t *testing.T) { ExitCode: 1, // error Return: []byte{}, } - pchapi.finishWaitingCalls(response) + pchapi.receiveMsgResponse(mcid, response) <-done } diff --git a/paychmgr/settle_test.go b/paychmgr/settle_test.go index a60351d4d..88f44d848 100644 --- a/paychmgr/settle_test.go +++ b/paychmgr/settle_test.go @@ -38,7 +38,7 @@ func TestPaychSettle(t *testing.T) { // Send channel create response response := testChannelResponse(t, expch) - pchapi.finishWaitingCalls(response) + pchapi.receiveMsgResponse(mcid, response) // Get the channel address ch, err := mgr.GetPaychWaitReady(ctx, mcid) @@ -61,7 +61,7 @@ func TestPaychSettle(t *testing.T) { // Send new channel create response response2 := testChannelResponse(t, expch2) - pchapi.finishWaitingCalls(response2) + pchapi.receiveMsgResponse(mcid2, response2) time.Sleep(10 * time.Millisecond)