Merge pull request #2869 from filecoin-project/fix/paych-sleepy-tests

Fix paych sleepy tests
This commit is contained in:
Łukasz Magiera 2020-08-06 19:57:09 +02:00 committed by GitHub
commit 85f4bc7717
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 101 deletions

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"sync" "sync"
"testing" "testing"
"time"
cborrpc "github.com/filecoin-project/go-cbor-util" cborrpc "github.com/filecoin-project/go-cbor-util"
@ -30,31 +29,39 @@ type waitingCall struct {
response chan types.MessageReceipt response chan types.MessageReceipt
} }
type waitingResponse struct {
receipt types.MessageReceipt
done chan struct{}
}
type mockPaychAPI struct { type mockPaychAPI struct {
lk sync.Mutex lk sync.Mutex
messages map[cid.Cid]*types.SignedMessage messages map[cid.Cid]*types.SignedMessage
waitingCalls map[cid.Cid]*waitingCall waitingCalls map[cid.Cid]*waitingCall
responses map[cid.Cid]types.MessageReceipt waitingResponses map[cid.Cid]*waitingResponse
} }
func newMockPaychAPI() *mockPaychAPI { func newMockPaychAPI() *mockPaychAPI {
return &mockPaychAPI{ return &mockPaychAPI{
messages: make(map[cid.Cid]*types.SignedMessage), messages: make(map[cid.Cid]*types.SignedMessage),
waitingCalls: make(map[cid.Cid]*waitingCall), waitingCalls: make(map[cid.Cid]*waitingCall),
responses: make(map[cid.Cid]types.MessageReceipt), waitingResponses: make(map[cid.Cid]*waitingResponse),
} }
} }
func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, mcid cid.Cid, confidence uint64) (*api.MsgLookup, error) { func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, mcid cid.Cid, confidence uint64) (*api.MsgLookup, error) {
response := make(chan types.MessageReceipt)
pchapi.lk.Lock() pchapi.lk.Lock()
if receipt, ok := pchapi.responses[mcid]; ok { response := make(chan types.MessageReceipt)
defer pchapi.lk.Unlock()
delete(pchapi.responses, mcid) if response, ok := pchapi.waitingResponses[mcid]; ok {
return &api.MsgLookup{Receipt: receipt}, nil defer pchapi.lk.Unlock()
defer func() {
go close(response.done)
}()
delete(pchapi.waitingResponses, mcid)
return &api.MsgLookup{Receipt: response.receipt}, nil
} }
pchapi.waitingCalls[mcid] = &waitingCall{response: response} pchapi.waitingCalls[mcid] = &waitingCall{response: response}
@ -66,15 +73,21 @@ func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, mcid cid.Cid, conf
func (pchapi *mockPaychAPI) receiveMsgResponse(mcid cid.Cid, receipt types.MessageReceipt) { func (pchapi *mockPaychAPI) receiveMsgResponse(mcid cid.Cid, receipt types.MessageReceipt) {
pchapi.lk.Lock() pchapi.lk.Lock()
defer pchapi.lk.Unlock()
if call, ok := pchapi.waitingCalls[mcid]; ok { if call, ok := pchapi.waitingCalls[mcid]; ok {
defer pchapi.lk.Unlock()
delete(pchapi.waitingCalls, mcid) delete(pchapi.waitingCalls, mcid)
call.response <- receipt call.response <- receipt
return return
} }
pchapi.responses[mcid] = receipt done := make(chan struct{})
pchapi.waitingResponses[mcid] = &waitingResponse{receipt: receipt, done: done}
pchapi.lk.Unlock()
<-done
} }
// Send success response for any waiting calls // Send success response for any waiting calls
@ -240,9 +253,6 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
require.Nil(t, ci.AddFundsMsg) require.Nil(t, ci.AddFundsMsg)
}() }()
// Give the go routine above a moment to run
time.Sleep(time.Millisecond * 10)
// 3. Send create channel response // 3. Send create channel response
pchapi.receiveMsgResponse(createMsgCid, response) pchapi.receiveMsgResponse(createMsgCid, response)
@ -290,14 +300,13 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, address.Undef, ch2) require.Equal(t, address.Undef, ch2)
time.Sleep(time.Millisecond * 10)
// 4. Send a success response // 4. Send a success response
ch := tutils.NewIDAddr(t, 100) ch := tutils.NewIDAddr(t, 100)
successResponse := testChannelResponse(t, ch) successResponse := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(mcid2, successResponse) pchapi.receiveMsgResponse(mcid2, successResponse)
time.Sleep(time.Millisecond * 10) _, err = mgr.GetPaychWaitReady(ctx, mcid2)
require.NoError(t, err)
// Should have one channel, whose address is the channel that was created // Should have one channel, whose address is the channel that was created
cis, err := mgr.ListChannels() cis, err := mgr.ListChannels()
@ -310,9 +319,6 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
require.Equal(t, amt2, ci.Amount) require.Equal(t, amt2, ci.Amount)
}() }()
// Give the go routine above a moment to run
time.Sleep(time.Millisecond * 10)
// 3. Send error response to first channel create // 3. Send error response to first channel create
pchapi.receiveMsgResponse(mcid1, errResponse) pchapi.receiveMsgResponse(mcid1, errResponse)
@ -341,28 +347,23 @@ func TestPaychGetRecoverAfterError(t *testing.T) {
_, mcid, err := mgr.GetPaych(ctx, from, to, amt) _, mcid, err := mgr.GetPaych(ctx, from, to, amt)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 10)
// Send error create channel response // Send error create channel response
pchapi.receiveMsgResponse(mcid, types.MessageReceipt{ pchapi.receiveMsgResponse(mcid, types.MessageReceipt{
ExitCode: 1, // error ExitCode: 1, // error
Return: []byte{}, Return: []byte{},
}) })
time.Sleep(time.Millisecond * 10)
// Send create message for a channel again // Send create message for a channel again
amt2 := big.NewInt(7) amt2 := big.NewInt(7)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 10)
// Send success create channel response // Send success create channel response
response := testChannelResponse(t, ch) response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(mcid2, response) pchapi.receiveMsgResponse(mcid2, response)
time.Sleep(time.Millisecond * 10) _, err = mgr.GetPaychWaitReady(ctx, mcid2)
require.NoError(t, err)
// Should have one channel, whose address is the channel that was created // Should have one channel, whose address is the channel that was created
cis, err := mgr.ListChannels() cis, err := mgr.ListChannels()
@ -399,28 +400,23 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt) _, mcid1, err := mgr.GetPaych(ctx, from, to, amt)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 10)
// Send success create channel response // Send success create channel response
response := testChannelResponse(t, ch) response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(mcid1, response) pchapi.receiveMsgResponse(mcid1, response)
time.Sleep(time.Millisecond * 10)
// Send add funds message for channel // Send add funds message for channel
amt2 := big.NewInt(5) amt2 := big.NewInt(5)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 10)
// Send error add funds response // Send error add funds response
pchapi.receiveMsgResponse(mcid2, types.MessageReceipt{ pchapi.receiveMsgResponse(mcid2, types.MessageReceipt{
ExitCode: 1, // error ExitCode: 1, // error
Return: []byte{}, Return: []byte{},
}) })
time.Sleep(time.Millisecond * 10) _, err = mgr.GetPaychWaitReady(ctx, mcid2)
require.Error(t, err)
// Should have one channel, whose address is the channel that was created // Should have one channel, whose address is the channel that was created
cis, err := mgr.ListChannels() cis, err := mgr.ListChannels()
@ -440,15 +436,14 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
_, mcid3, err := mgr.GetPaych(ctx, from, to, amt3) _, mcid3, err := mgr.GetPaych(ctx, from, to, amt3)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 10)
// Send success add funds response // Send success add funds response
pchapi.receiveMsgResponse(mcid3, types.MessageReceipt{ pchapi.receiveMsgResponse(mcid3, types.MessageReceipt{
ExitCode: 0, ExitCode: 0,
Return: []byte{}, Return: []byte{},
}) })
time.Sleep(time.Millisecond * 10) _, err = mgr.GetPaychWaitReady(ctx, mcid3)
require.NoError(t, err)
// Should have one channel, whose address is the channel that was created // Should have one channel, whose address is the channel that was created
cis, err = mgr.ListChannels() cis, err = mgr.ListChannels()
@ -487,7 +482,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt)
require.NoError(t, err) require.NoError(t, err)
// Simulate shutting down lotus // Simulate shutting down system
pchapi.close() pchapi.close()
// Create a new manager with the same datastore // Create a new manager with the same datastore
@ -540,9 +535,6 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
require.Nil(t, ci.CreateMsg) require.Nil(t, ci.CreateMsg)
}() }()
// Give the go routine above a moment to run
time.Sleep(time.Millisecond * 10)
// 3. Send create channel response // 3. Send create channel response
pchapi2.receiveMsgResponse(createMsgCid, response) pchapi2.receiveMsgResponse(createMsgCid, response)
@ -571,22 +563,16 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
_, mcid1, err := mgr.GetPaych(ctx, from, to, amt) _, mcid1, err := mgr.GetPaych(ctx, from, to, amt)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 10)
// Send success create channel response // Send success create channel response
response := testChannelResponse(t, ch) response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(mcid1, response) pchapi.receiveMsgResponse(mcid1, response)
time.Sleep(time.Millisecond * 10)
// Send add funds message for channel // Send add funds message for channel
amt2 := big.NewInt(5) amt2 := big.NewInt(5)
_, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 10) // Simulate shutting down system
// Simulate shutting down lotus
pchapi.close() pchapi.close()
// Create a new manager with the same datastore // Create a new manager with the same datastore
@ -594,8 +580,6 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
pchapi2 := newMockPaychAPI() pchapi2 := newMockPaychAPI()
defer pchapi2.close() defer pchapi2.close()
time.Sleep(time.Millisecond * 10)
mgr2, err := newManager(sm2, store, pchapi2) mgr2, err := newManager(sm2, store, pchapi2)
require.NoError(t, err) require.NoError(t, err)
@ -605,7 +589,8 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
Return: []byte{}, Return: []byte{},
}) })
time.Sleep(time.Millisecond * 10) _, err = mgr2.GetPaychWaitReady(ctx, mcid2)
require.NoError(t, err)
// Should have one channel, whose address is the channel that was created // Should have one channel, whose address is the channel that was created
cis, err := mgr2.ListChannels() cis, err := mgr2.ListChannels()
@ -643,25 +628,16 @@ func TestPaychGetWait(t *testing.T) {
_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt) _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt)
require.NoError(t, err) require.NoError(t, err)
done := make(chan address.Address) expch := tutils.NewIDAddr(t, 100)
go func() { go func() {
// 2. Wait till ready // 3. Send response
ch, err := mgr.GetPaychWaitReady(ctx, createMsgCid) response := testChannelResponse(t, expch)
require.NoError(t, err) pchapi.receiveMsgResponse(createMsgCid, response)
done <- ch
}() }()
time.Sleep(time.Millisecond * 10) // 2. Wait till ready
ch, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
// 3. Send response require.NoError(t, err)
expch := tutils.NewIDAddr(t, 100)
response := testChannelResponse(t, expch)
pchapi.receiveMsgResponse(createMsgCid, response)
time.Sleep(time.Millisecond * 10)
ch := <-done
require.Equal(t, expch, ch) require.Equal(t, expch, ch)
// 4. Wait again - message has already been received so should // 4. Wait again - message has already been received so should
@ -675,27 +651,19 @@ func TestPaychGetWait(t *testing.T) {
_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2) _, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(time.Millisecond * 10)
go func() { go func() {
// 5. Wait for add funds // 6. Send add funds response
ch, err := mgr.GetPaychWaitReady(ctx, addFundsMsgCid) addFundsResponse := types.MessageReceipt{
require.NoError(t, err) ExitCode: 0,
require.Equal(t, expch, ch) Return: []byte{},
}
done <- ch pchapi.receiveMsgResponse(addFundsMsgCid, addFundsResponse)
}() }()
time.Sleep(time.Millisecond * 10) // 5. Wait for add funds
ch, err = mgr.GetPaychWaitReady(ctx, addFundsMsgCid)
// 6. Send add funds response require.NoError(t, err)
addFundsResponse := types.MessageReceipt{ require.Equal(t, expch, ch)
ExitCode: 0,
Return: []byte{},
}
pchapi.receiveMsgResponse(addFundsMsgCid, addFundsResponse)
<-done
} }
// TestPaychGetWaitErr tests that GetPaychWaitReady correctly handles errors // TestPaychGetWaitErr tests that GetPaychWaitReady correctly handles errors
@ -735,9 +703,6 @@ func TestPaychGetWaitErr(t *testing.T) {
require.NotNil(t, err) require.NotNil(t, err)
}() }()
// Give the wait a moment to start before sending response
time.Sleep(time.Millisecond * 10)
// 3. Send error response to create channel // 3. Send error response to create channel
response := types.MessageReceipt{ response := types.MessageReceipt{
ExitCode: 1, // error ExitCode: 1, // error
@ -770,7 +735,6 @@ func TestPaychGetWaitCtx(t *testing.T) {
// When the context is cancelled, should unblock wait // When the context is cancelled, should unblock wait
go func() { go func() {
time.Sleep(time.Millisecond * 10)
cancel() cancel()
}() }()

View File

@ -3,7 +3,6 @@ package paychmgr
import ( import (
"context" "context"
"testing" "testing"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -34,8 +33,6 @@ func TestPaychSettle(t *testing.T) {
_, mcid, err := mgr.GetPaych(ctx, from, to, amt) _, mcid, err := mgr.GetPaych(ctx, from, to, amt)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
// Send channel create response // Send channel create response
response := testChannelResponse(t, expch) response := testChannelResponse(t, expch)
pchapi.receiveMsgResponse(mcid, response) pchapi.receiveMsgResponse(mcid, response)
@ -57,14 +54,10 @@ func TestPaychSettle(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, cid.Undef, mcid2) require.NotEqual(t, cid.Undef, mcid2)
time.Sleep(10 * time.Millisecond)
// Send new channel create response // Send new channel create response
response2 := testChannelResponse(t, expch2) response2 := testChannelResponse(t, expch2)
pchapi.receiveMsgResponse(mcid2, response2) pchapi.receiveMsgResponse(mcid2, response2)
time.Sleep(10 * time.Millisecond)
// Make sure the new channel is different from the old channel // Make sure the new channel is different from the old channel
ch2, err := mgr.GetPaychWaitReady(ctx, mcid2) ch2, err := mgr.GetPaychWaitReady(ctx, mcid2)
require.NoError(t, err) require.NoError(t, err)