lotus/paychmgr/paychget_test.go

757 lines
20 KiB
Go
Raw Normal View History

2020-07-28 23:16:47 +00:00
package paychmgr
import (
"context"
"sync"
"testing"
"time"
cborrpc "github.com/filecoin-project/go-cbor-util"
init_ "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi/big"
tutils "github.com/filecoin-project/specs-actors/support/testing"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
)
// waitingCall represents one StateWaitMsg call on the mock API that is still
// blocked waiting for a receipt.
type waitingCall struct {
	// response carries the receipt that unblocks the waiting StateWaitMsg call.
	response chan types.MessageReceipt
}
// mockPaychAPI is the test double passed to newManager in these tests. It
// records pushed messages and lets a test decide when blocked StateWaitMsg
// calls return.
type mockPaychAPI struct {
	// lk guards messages and waitingCalls.
	lk sync.Mutex
	// messages holds every message given to MpoolPushMessage, keyed by the
	// CID of the resulting signed message.
	messages map[cid.Cid]*types.SignedMessage
	// waitingCalls lists the StateWaitMsg calls that have not yet been handed
	// a receipt.
	waitingCalls []*waitingCall
}
// newMockPaychAPI returns a mock API with an empty (but writable) message map
// and no waiting calls.
func newMockPaychAPI() *mockPaychAPI {
	mock := new(mockPaychAPI)
	mock.messages = map[cid.Cid]*types.SignedMessage{}
	return mock
}
// StateWaitMsg registers a waiting call and blocks until a receipt is
// delivered through finishWaitingCalls, then returns that receipt wrapped in
// a MsgLookup. The ctx, msg and confidence arguments are not consulted, so
// cancelling ctx does not unblock the call.
func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
	call := &waitingCall{response: make(chan types.MessageReceipt)}

	// Register under the lock, but release it before blocking on the channel.
	func() {
		pchapi.lk.Lock()
		defer pchapi.lk.Unlock()
		pchapi.waitingCalls = append(pchapi.waitingCalls, call)
	}()

	lookup := new(api.MsgLookup)
	lookup.Receipt = <-call.response
	return lookup, nil
}
// MpoolPushMessage wraps msg in a SignedMessage (no signature is produced),
// records it under its CID so tests can look it up later, and returns it.
// ctx is not consulted.
func (pchapi *mockPaychAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
	signed := new(types.SignedMessage)
	signed.Message = *msg

	pchapi.lk.Lock()
	defer pchapi.lk.Unlock()

	pchapi.messages[signed.Cid()] = signed
	return signed, nil
}
// pushedMessages returns the message recorded under CID c, or nil when no
// message with that CID has been pushed.
func (pchapi *mockPaychAPI) pushedMessages(c cid.Cid) *types.SignedMessage {
	pchapi.lk.Lock()
	found := pchapi.messages[c]
	pchapi.lk.Unlock()

	return found
}
// pushedMessageCount reports how many distinct messages have been recorded by
// MpoolPushMessage.
func (pchapi *mockPaychAPI) pushedMessageCount() int {
	pchapi.lk.Lock()
	count := len(pchapi.messages)
	pchapi.lk.Unlock()

	return count
}
// finishWaitingCalls delivers receipt to every StateWaitMsg call that is
// currently registered and clears the list of waiting calls.
//
// The list is detached while holding the lock and the (blocking, unbuffered)
// sends happen after the lock is released, so other mock methods are not
// stalled behind a slow receiver. Calls registered after the list has been
// detached are left for the next invocation.
func (pchapi *mockPaychAPI) finishWaitingCalls(receipt types.MessageReceipt) {
	pchapi.lk.Lock()
	calls := pchapi.waitingCalls
	pchapi.waitingCalls = nil
	pchapi.lk.Unlock()

	for _, call := range calls {
		call.response <- receipt
	}
}
// close unblocks every StateWaitMsg call that is still waiting by handing it
// a receipt with exit code 0 and an empty return value.
func (pchapi *mockPaychAPI) close() {
	var final types.MessageReceipt
	final.ExitCode = 0
	final.Return = []byte{}

	pchapi.finishWaitingCalls(final)
}
// testChannelResponse builds a successful (exit code 0) receipt whose return
// value is a CBOR-encoded init actor ExecReturn carrying ch as both the ID
// address and the robust address. The test fails immediately if encoding
// fails.
func testChannelResponse(t *testing.T, ch address.Address) types.MessageReceipt {
	encoded, err := cborrpc.Dump(&init_.ExecReturn{
		IDAddress:     ch,
		RobustAddress: ch,
	})
	require.NoError(t, err)

	return types.MessageReceipt{ExitCode: 0, Return: encoded}
}
// TestPaychGetCreateChannelMsg tests that GetPaych sends a message to create
// a new channel with the correct funds
func TestPaychGetCreateChannelMsg(t *testing.T) {
	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()
	defer pchapi.close()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	amt := big.NewInt(10)
	ch, mcid, err := mgr.GetPaych(ctx, from, to, amt)
	require.NoError(t, err)
	// No channel address yet: the create message has only been sent.
	require.Equal(t, address.Undef, ch)

	pushedMsg := pchapi.pushedMessages(mcid)
	// Fail with a clear assertion rather than a nil-pointer panic if the
	// returned CID does not match any pushed message.
	require.NotNil(t, pushedMsg)
	require.Equal(t, from, pushedMsg.Message.From)
	require.Equal(t, builtin.InitActorAddr, pushedMsg.Message.To)
	require.Equal(t, amt, pushedMsg.Message.Value)
}
// TestPaychGetCreateChannelThenAddFunds tests creating a channel and then
// adding funds to it
func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	ch := tutils.NewIDAddr(t, 100)
	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()
	defer pchapi.close()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	// Send create message for a channel with value 10
	amt := big.NewInt(10)
	_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt)
	require.NoError(t, err)

	// Should have no channels yet (message sent but channel not created)
	cis, err := mgr.ListChannels()
	require.NoError(t, err)
	require.Len(t, cis, 0)

	// 1. Set up create channel response (delivered to the pending
	// StateWaitMsg call)
	response := testChannelResponse(t, ch)

	// Only the blocking GetPaych call runs in the goroutine. Its results are
	// handed back so that every require assertion executes on the test
	// goroutine: require calls t.FailNow, which must not be called from any
	// other goroutine.
	type getPaychResult struct {
		ch   address.Address
		mcid cid.Cid
		err  error
	}
	done := make(chan getPaychResult, 1)
	go func() {
		// 2. Request add funds - should block until create channel has completed
		amt2 := big.NewInt(5)
		ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2)
		done <- getPaychResult{ch: ch2, mcid: addFundsMsgCid, err: err}
	}()

	// Give the go routine above a moment to run
	time.Sleep(time.Millisecond * 10)

	// 3. Send create channel response
	pchapi.finishWaitingCalls(response)

	// 4. The second GetPaych should return after create channel from first
	// GetPaych completes
	res := <-done
	require.NoError(t, res.err)

	// Expect the channel to be the same
	require.Equal(t, ch, res.ch)
	// Expect add funds message CID to be different to create message cid
	require.NotEqual(t, createMsgCid, res.mcid)

	// Should have one channel, whose address is the channel that was created
	cis, err = mgr.ListChannels()
	require.NoError(t, err)
	require.Len(t, cis, 1)
	require.Equal(t, ch, cis[0])

	// Amount should be amount sent to first GetPaych (to create
	// channel).
	// PendingAmount should be amount sent in second GetPaych
	// (second GetPaych triggered add funds, which has not yet been confirmed)
	ci, err := mgr.GetChannelInfo(ch)
	require.NoError(t, err)
	require.EqualValues(t, 10, ci.Amount.Int64())
	require.EqualValues(t, 5, ci.PendingAmount.Int64())
	require.Nil(t, ci.CreateMsg)

	// Trigger add funds confirmation
	pchapi.finishWaitingCalls(types.MessageReceipt{ExitCode: 0})

	time.Sleep(time.Millisecond * 10)

	// Should still have one channel
	cis, err = mgr.ListChannels()
	require.NoError(t, err)
	require.Len(t, cis, 1)
	require.Equal(t, ch, cis[0])

	// Channel amount should include last amount sent to GetPaych
	ci, err = mgr.GetChannelInfo(ch)
	require.NoError(t, err)
	require.EqualValues(t, 15, ci.Amount.Int64())
	require.EqualValues(t, 0, ci.PendingAmount.Int64())
	require.Nil(t, ci.AddFundsMsg)
}
// TestPaychGetCreateChannelWithErrorThenCreateAgain tests that if an
// operation is queued up behind a create channel operation, and the create
// channel fails, then the waiting operation can succeed.
func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()
	defer pchapi.close()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	// Send create message for a channel
	amt := big.NewInt(10)
	_, _, err = mgr.GetPaych(ctx, from, to, amt)
	require.NoError(t, err)

	// 1. Set up create channel response (delivered to the pending
	// StateWaitMsg call). This response indicates an error.
	errResponse := types.MessageReceipt{
		ExitCode: 1, // error
		Return:   []byte{},
	}

	// Only the blocking GetPaych call runs in the goroutine. Its results are
	// handed back so that every require assertion executes on the test
	// goroutine: require calls t.FailNow, which must not be called from any
	// other goroutine.
	type getPaychResult struct {
		ch  address.Address
		err error
	}
	done := make(chan getPaychResult, 1)
	amt2 := big.NewInt(5)
	go func() {
		// 2. Should block until create channel has completed.
		// Because first channel create fails, this request
		// should be for channel create.
		ch2, _, err := mgr.GetPaych(ctx, from, to, amt2)
		done <- getPaychResult{ch: ch2, err: err}
	}()

	// Give the go routine above a moment to run
	time.Sleep(time.Millisecond * 10)

	// 3. Send error response to first channel create
	pchapi.finishWaitingCalls(errResponse)

	res := <-done
	require.NoError(t, res.err)
	require.Equal(t, address.Undef, res.ch)

	time.Sleep(time.Millisecond * 10)

	// 4. Send a success response
	ch := tutils.NewIDAddr(t, 100)
	successResponse := testChannelResponse(t, ch)
	pchapi.finishWaitingCalls(successResponse)

	time.Sleep(time.Millisecond * 10)

	// Should have one channel, whose address is the channel that was created
	cis, err := mgr.ListChannels()
	require.NoError(t, err)
	require.Len(t, cis, 1)
	require.Equal(t, ch, cis[0])

	ci, err := mgr.GetChannelInfo(ch)
	require.NoError(t, err)
	require.Equal(t, amt2, ci.Amount)
}
// TestPaychGetRecoverAfterError checks that a failed create-channel attempt
// does not prevent a later attempt from creating the channel.
func TestPaychGetRecoverAfterError(t *testing.T) {
	// pause gives asynchronous work a moment to run between steps.
	pause := func() { time.Sleep(time.Millisecond * 10) }

	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	wantCh := tutils.NewIDAddr(t, 100)
	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()
	defer pchapi.close()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	// First attempt: request a channel funded with 10.
	firstAmt := big.NewInt(10)
	_, _, err = mgr.GetPaych(ctx, from, to, firstAmt)
	require.NoError(t, err)
	pause()

	// Answer the first create-channel message with a failure receipt.
	failure := types.MessageReceipt{
		ExitCode: 1, // error
		Return:   []byte{},
	}
	pchapi.finishWaitingCalls(failure)
	pause()

	// Second attempt: request a channel funded with 7.
	secondAmt := big.NewInt(7)
	_, _, err = mgr.GetPaych(ctx, from, to, secondAmt)
	require.NoError(t, err)
	pause()

	// This time the create-channel message succeeds.
	pchapi.finishWaitingCalls(testChannelResponse(t, wantCh))
	pause()

	// Exactly one channel must exist, at the address from the success receipt.
	channels, err := mgr.ListChannels()
	require.NoError(t, err)
	require.Len(t, channels, 1)
	require.Equal(t, wantCh, channels[0])

	// Only the second attempt's funds count; nothing is left pending.
	info, err := mgr.GetChannelInfo(wantCh)
	require.NoError(t, err)
	require.Equal(t, secondAmt, info.Amount)
	require.EqualValues(t, 0, info.PendingAmount.Int64())
	require.Nil(t, info.CreateMsg)
}
// TestPaychGetRecoverAfterAddFundsError checks that a failed add-funds
// message leaves the channel usable, and that a later add-funds succeeds.
func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
	// pause gives asynchronous work a moment to run between steps.
	pause := func() { time.Sleep(time.Millisecond * 10) }

	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	wantCh := tutils.NewIDAddr(t, 100)
	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()
	defer pchapi.close()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	// Create the channel with 10.
	createAmt := big.NewInt(10)
	_, _, err = mgr.GetPaych(ctx, from, to, createAmt)
	require.NoError(t, err)
	pause()

	// Confirm channel creation.
	pchapi.finishWaitingCalls(testChannelResponse(t, wantCh))
	pause()

	// Ask to add 5 to the channel.
	failedAmt := big.NewInt(5)
	_, _, err = mgr.GetPaych(ctx, from, to, failedAmt)
	require.NoError(t, err)
	pause()

	// The add-funds message fails.
	failure := types.MessageReceipt{
		ExitCode: 1, // error
		Return:   []byte{},
	}
	pchapi.finishWaitingCalls(failure)
	pause()

	// The channel is still the only one listed.
	channels, err := mgr.ListChannels()
	require.NoError(t, err)
	require.Len(t, channels, 1)
	require.Equal(t, wantCh, channels[0])

	// The failed add-funds left the amount untouched and nothing pending.
	info, err := mgr.GetChannelInfo(wantCh)
	require.NoError(t, err)
	require.Equal(t, createAmt, info.Amount)
	require.EqualValues(t, 0, info.PendingAmount.Int64())
	require.Nil(t, info.CreateMsg)
	require.Nil(t, info.AddFundsMsg)

	// Ask to add 2 to the channel.
	retryAmt := big.NewInt(2)
	_, _, err = mgr.GetPaych(ctx, from, to, retryAmt)
	require.NoError(t, err)
	pause()

	// This add-funds message succeeds.
	success := types.MessageReceipt{
		ExitCode: 0,
		Return:   []byte{},
	}
	pchapi.finishWaitingCalls(success)
	pause()

	// Still exactly one channel.
	channels, err = mgr.ListChannels()
	require.NoError(t, err)
	require.Len(t, channels, 1)
	require.Equal(t, wantCh, channels[0])

	// The amount now includes the successful add-funds only.
	info, err = mgr.GetChannelInfo(wantCh)
	require.NoError(t, err)
	require.Equal(t, createAmt.Int64()+retryAmt.Int64(), info.Amount.Int64())
	require.EqualValues(t, 0, info.PendingAmount.Int64())
	require.Nil(t, info.CreateMsg)
	require.Nil(t, info.AddFundsMsg)
}
// TestPaychGetRestartAfterCreateChannelMsg tests that if the system stops
// right after the create channel message is sent, the channel will be
// created when the system restarts.
func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	ch := tutils.NewIDAddr(t, 100)
	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	// Send create message for a channel with value 10
	amt := big.NewInt(10)
	_, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt)
	require.NoError(t, err)

	// Simulate shutting down lotus
	pchapi.close()

	// Create a new manager with the same datastore
	sm2 := newMockStateManager()
	pchapi2 := newMockPaychAPI()
	defer pchapi2.close()

	mgr2, err := newManager(sm2, store, pchapi2)
	require.NoError(t, err)

	// Should have no channels yet (message sent but channel not created)
	cis, err := mgr2.ListChannels()
	require.NoError(t, err)
	require.Len(t, cis, 0)

	// 1. Set up create channel response (delivered to the pending
	// StateWaitMsg call)
	response := testChannelResponse(t, ch)

	// Only the blocking GetPaych call runs in the goroutine. Its results are
	// handed back so that every require assertion executes on the test
	// goroutine: require calls t.FailNow, which must not be called from any
	// other goroutine.
	type getPaychResult struct {
		ch   address.Address
		mcid cid.Cid
		err  error
	}
	done := make(chan getPaychResult, 1)
	go func() {
		// 2. Request add funds - should block until create channel has completed
		amt2 := big.NewInt(5)
		ch2, addFundsMsgCid, err := mgr2.GetPaych(ctx, from, to, amt2)
		done <- getPaychResult{ch: ch2, mcid: addFundsMsgCid, err: err}
	}()

	// Give the go routine above a moment to run
	time.Sleep(time.Millisecond * 10)

	// 3. Send create channel response
	pchapi2.finishWaitingCalls(response)

	// 4. The second GetPaych should return after create channel from first
	// GetPaych completes
	res := <-done
	require.NoError(t, res.err)

	// Expect the channel to have been created
	require.Equal(t, ch, res.ch)
	// Expect add funds message CID to be different to create message cid
	require.NotEqual(t, createMsgCid, res.mcid)

	// Should have one channel, whose address is the channel that was created
	cis, err = mgr2.ListChannels()
	require.NoError(t, err)
	require.Len(t, cis, 1)
	require.Equal(t, ch, cis[0])

	// Amount should be amount sent to first GetPaych (to create
	// channel).
	// PendingAmount should be amount sent in second GetPaych
	// (second GetPaych triggered add funds, which has not yet been confirmed)
	ci, err := mgr2.GetChannelInfo(ch)
	require.NoError(t, err)
	require.EqualValues(t, 10, ci.Amount.Int64())
	require.EqualValues(t, 5, ci.PendingAmount.Int64())
	require.Nil(t, ci.CreateMsg)
}
// TestPaychGetRestartAfterAddFundsMsg tests that if the system stops
// right after the add funds message is sent, the add funds will be
// processed when the system restarts.
func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	ch := tutils.NewIDAddr(t, 100)
	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	// Send create message for a channel
	amt := big.NewInt(10)
	_, _, err = mgr.GetPaych(ctx, from, to, amt)
	require.NoError(t, err)

	time.Sleep(time.Millisecond * 10)

	// Send success create channel response
	response := testChannelResponse(t, ch)
	pchapi.finishWaitingCalls(response)

	time.Sleep(time.Millisecond * 10)

	// Send add funds message for channel
	amt2 := big.NewInt(5)
	_, _, err = mgr.GetPaych(ctx, from, to, amt2)
	require.NoError(t, err)

	time.Sleep(time.Millisecond * 10)

	// Simulate shutting down lotus
	pchapi.close()

	// Create a new manager with the same datastore
	sm2 := newMockStateManager()
	pchapi2 := newMockPaychAPI()
	defer pchapi2.close()

	time.Sleep(time.Millisecond * 10)

	mgr2, err := newManager(sm2, store, pchapi2)
	require.NoError(t, err)

	// Give the restarted manager a moment before responding, as every other
	// step in this file does. finishWaitingCalls only reaches calls that are
	// already registered, so a receipt sent too early would be dropped.
	// NOTE(review): presumably the manager re-registers its wait
	// asynchronously on start - confirm against newManager.
	time.Sleep(time.Millisecond * 10)

	// Send success add funds response
	pchapi2.finishWaitingCalls(types.MessageReceipt{
		ExitCode: 0,
		Return:   []byte{},
	})

	time.Sleep(time.Millisecond * 10)

	// Should have one channel, whose address is the channel that was created
	cis, err := mgr2.ListChannels()
	require.NoError(t, err)
	require.Len(t, cis, 1)
	require.Equal(t, ch, cis[0])

	// Amount should include amount for successful add funds msg
	ci, err := mgr2.GetChannelInfo(ch)
	require.NoError(t, err)
	require.Equal(t, amt.Int64()+amt2.Int64(), ci.Amount.Int64())
	require.EqualValues(t, 0, ci.PendingAmount.Int64())
	require.Nil(t, ci.CreateMsg)
	require.Nil(t, ci.AddFundsMsg)
}
// TestPaychGetWait tests that GetPaychWaitReady correctly waits for the
// channel to be created or funds to be added
func TestPaychGetWait(t *testing.T) {
	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()
	defer pchapi.close()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	// 1. Get
	amt := big.NewInt(10)
	_, mcid, err := mgr.GetPaych(ctx, from, to, amt)
	require.NoError(t, err)

	// The goroutines below only perform the blocking wait and hand the
	// outcome back; all require assertions run on the test goroutine because
	// require calls t.FailNow, which must not be called from any other
	// goroutine (a failure there would also have left the test blocked on
	// <-done forever). The channel is buffered so a waiter can always
	// deliver its result and exit.
	type waitResult struct {
		ch  address.Address
		err error
	}
	done := make(chan waitResult, 1)
	go func() {
		// 2. Wait till ready
		ch, err := mgr.GetPaychWaitReady(ctx, mcid)
		done <- waitResult{ch: ch, err: err}
	}()

	time.Sleep(time.Millisecond * 10)

	// 3. Send response
	expch := tutils.NewIDAddr(t, 100)
	response := testChannelResponse(t, expch)
	pchapi.finishWaitingCalls(response)

	time.Sleep(time.Millisecond * 10)

	res := <-done
	require.NoError(t, res.err)
	require.Equal(t, expch, res.ch)

	// 4. Wait again - message has already been received so should
	// return immediately
	ch, err := mgr.GetPaychWaitReady(ctx, mcid)
	require.NoError(t, err)
	require.Equal(t, expch, ch)

	// Request add funds
	amt2 := big.NewInt(15)
	_, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2)
	require.NoError(t, err)

	time.Sleep(time.Millisecond * 10)

	go func() {
		// 5. Wait for add funds
		ch, err := mgr.GetPaychWaitReady(ctx, addFundsMsgCid)
		done <- waitResult{ch: ch, err: err}
	}()

	time.Sleep(time.Millisecond * 10)

	// 6. Send add funds response
	addFundsResponse := types.MessageReceipt{
		ExitCode: 0,
		Return:   []byte{},
	}
	pchapi.finishWaitingCalls(addFundsResponse)

	res = <-done
	require.NoError(t, res.err)
	require.Equal(t, expch, res.ch)
}
// TestPaychGetWaitErr tests that GetPaychWaitReady correctly handles errors
func TestPaychGetWaitErr(t *testing.T) {
	ctx := context.Background()
	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()
	defer pchapi.close()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	// 1. Create channel
	amt := big.NewInt(10)
	_, mcid, err := mgr.GetPaych(ctx, from, to, amt)
	require.NoError(t, err)

	// The goroutine only performs the blocking wait and reports its error;
	// the assertions run on the test goroutine because require calls
	// t.FailNow, which must not be called from any other goroutine.
	done := make(chan error, 1)
	go func() {
		// 2. Wait for channel to be created
		_, err := mgr.GetPaychWaitReady(ctx, mcid)
		done <- err
	}()

	// Give the wait a moment to start before sending response
	time.Sleep(time.Millisecond * 10)

	// 3. Send error response to create channel
	response := types.MessageReceipt{
		ExitCode: 1, // error
		Return:   []byte{},
	}
	pchapi.finishWaitingCalls(response)

	// 4. Channel creation should have failed
	err = <-done
	require.Error(t, err)

	// 5. Call wait again with the same message CID
	_, err = mgr.GetPaychWaitReady(ctx, mcid)

	// 6. Should return immediately with an error again
	require.Error(t, err)
}
// TestPaychGetWaitCtx tests that GetPaychWaitReady returns early if the context
// is cancelled
func TestPaychGetWaitCtx(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	// Release the context even if the test fails before the goroutine below
	// gets to cancel it; calling cancel more than once is harmless.
	defer cancel()

	store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))

	from := tutils.NewIDAddr(t, 101)
	to := tutils.NewIDAddr(t, 102)

	sm := newMockStateManager()
	pchapi := newMockPaychAPI()
	defer pchapi.close()

	mgr, err := newManager(sm, store, pchapi)
	require.NoError(t, err)

	amt := big.NewInt(10)
	_, mcid, err := mgr.GetPaych(ctx, from, to, amt)
	require.NoError(t, err)

	// When the context is cancelled, should unblock wait
	go func() {
		time.Sleep(time.Millisecond * 10)
		cancel()
	}()

	// No receipt is ever sent, so the only way for the wait to return is the
	// cancellation above. Check the error the wait itself returned: the
	// previous assertion inspected ctx.Err() and passed err merely as a
	// failure-message argument.
	_, err = mgr.GetPaychWaitReady(ctx, mcid)
	require.Error(t, err)
	require.Error(t, ctx.Err())
}