Merge pull request #2985 from filecoin-project/refactor/paych-mgr-api

Refactor paych mgr apis for easier mocking
This commit is contained in:
Łukasz Magiera 2020-08-11 19:23:52 +02:00 committed by GitHub
commit 75f5afc0da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 319 additions and 313 deletions

View File

@ -27,62 +27,71 @@ import (
var log = logging.Logger("paych")
type ManagerApi struct {
// PaychAPI is used by dependency injection to pass the consituent APIs to NewManager()
type PaychAPI struct {
fx.In
full.MpoolAPI
full.WalletAPI
full.StateAPI
}
type StateManagerApi interface {
// stateManagerAPI defines the methods needed from StateManager
type stateManagerAPI interface {
LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error)
Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error)
}
// paychAPI defines the API methods needed by the payment channel manager
type paychAPI interface {
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error)
}
// managerAPI defines all methods needed by the manager
type managerAPI interface {
stateManagerAPI
paychAPI
}
// managerAPIImpl is used to create a composite that implements managerAPI
type managerAPIImpl struct {
stateManagerAPI
paychAPI
}
type Manager struct {
// The Manager context is used to terminate wait operations on shutdown
ctx context.Context
shutdown context.CancelFunc
store *Store
sm StateManagerApi
sa *stateAccessor
pchapi paychApi
pchapi managerAPI
lk sync.RWMutex
channels map[string]*channelAccessor
mpool full.MpoolAPI
wallet full.WalletAPI
state full.StateAPI
}
func NewManager(mctx helpers.MetricsCtx, lc fx.Lifecycle, sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manager {
func NewManager(mctx helpers.MetricsCtx, lc fx.Lifecycle, sm *stmgr.StateManager, pchstore *Store, api PaychAPI) *Manager {
ctx := helpers.LifecycleCtx(mctx, lc)
ctx, shutdown := context.WithCancel(ctx)
impl := &managerAPIImpl{stateManagerAPI: sm, paychAPI: &api}
return &Manager{
ctx: ctx,
shutdown: shutdown,
store: pchstore,
sm: sm,
sa: &stateAccessor{sm: sm},
sa: &stateAccessor{sm: impl},
channels: make(map[string]*channelAccessor),
pchapi: &api,
mpool: api.MpoolAPI,
wallet: api.WalletAPI,
state: api.StateAPI,
pchapi: impl,
}
}
// newManager is used by the tests to supply mocks
func newManager(sm StateManagerApi, pchstore *Store, pchapi paychApi) (*Manager, error) {
func newManager(pchstore *Store, pchapi managerAPI) (*Manager, error) {
pm := &Manager{
store: pchstore,
sm: sm,
sa: &stateAccessor{sm: sm},
sa: &stateAccessor{sm: pchapi},
channels: make(map[string]*channelAccessor),
pchapi: pchapi,
}

180
paychmgr/mock_test.go Normal file
View File

@ -0,0 +1,180 @@
package paychmgr
import (
"context"
"fmt"
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/builtin/account"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/ipfs/go-cid"
)
type mockManagerAPI struct {
*mockStateManager
*mockPaychAPI
}
func newMockManagerAPI() *mockManagerAPI {
return &mockManagerAPI{
mockStateManager: newMockStateManager(),
mockPaychAPI: newMockPaychAPI(),
}
}
type mockPchState struct {
actor *types.Actor
state paych.State
}
type mockStateManager struct {
lk sync.Mutex
accountState map[address.Address]account.State
paychState map[address.Address]mockPchState
response *api.InvocResult
}
func newMockStateManager() *mockStateManager {
return &mockStateManager{
accountState: make(map[address.Address]account.State),
paychState: make(map[address.Address]mockPchState),
}
}
func (sm *mockStateManager) setAccountState(a address.Address, state account.State) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.accountState[a] = state
}
func (sm *mockStateManager) setPaychState(a address.Address, actor *types.Actor, state paych.State) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.paychState[a] = mockPchState{actor, state}
}
func (sm *mockStateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) {
sm.lk.Lock()
defer sm.lk.Unlock()
if outState, ok := out.(*account.State); ok {
*outState = sm.accountState[a]
return nil, nil
}
if outState, ok := out.(*paych.State); ok {
info := sm.paychState[a]
*outState = info.state
return info.actor, nil
}
panic(fmt.Sprintf("unexpected state type %v", out))
}
func (sm *mockStateManager) Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) {
return sm.response, nil
}
type waitingCall struct {
response chan types.MessageReceipt
}
type waitingResponse struct {
receipt types.MessageReceipt
done chan struct{}
}
type mockPaychAPI struct {
lk sync.Mutex
messages map[cid.Cid]*types.SignedMessage
waitingCalls map[cid.Cid]*waitingCall
waitingResponses map[cid.Cid]*waitingResponse
}
func newMockPaychAPI() *mockPaychAPI {
return &mockPaychAPI{
messages: make(map[cid.Cid]*types.SignedMessage),
waitingCalls: make(map[cid.Cid]*waitingCall),
waitingResponses: make(map[cid.Cid]*waitingResponse),
}
}
func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, mcid cid.Cid, confidence uint64) (*api.MsgLookup, error) {
pchapi.lk.Lock()
response := make(chan types.MessageReceipt)
if response, ok := pchapi.waitingResponses[mcid]; ok {
defer pchapi.lk.Unlock()
defer func() {
go close(response.done)
}()
delete(pchapi.waitingResponses, mcid)
return &api.MsgLookup{Receipt: response.receipt}, nil
}
pchapi.waitingCalls[mcid] = &waitingCall{response: response}
pchapi.lk.Unlock()
receipt := <-response
return &api.MsgLookup{Receipt: receipt}, nil
}
func (pchapi *mockPaychAPI) receiveMsgResponse(mcid cid.Cid, receipt types.MessageReceipt) {
pchapi.lk.Lock()
if call, ok := pchapi.waitingCalls[mcid]; ok {
defer pchapi.lk.Unlock()
delete(pchapi.waitingCalls, mcid)
call.response <- receipt
return
}
done := make(chan struct{})
pchapi.waitingResponses[mcid] = &waitingResponse{receipt: receipt, done: done}
pchapi.lk.Unlock()
<-done
}
// Send success response for any waiting calls
func (pchapi *mockPaychAPI) close() {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
success := types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
}
for mcid, call := range pchapi.waitingCalls {
delete(pchapi.waitingCalls, mcid)
call.response <- success
}
}
func (pchapi *mockPaychAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
smsg := &types.SignedMessage{Message: *msg}
pchapi.messages[smsg.Cid()] = smsg
return smsg, nil
}
func (pchapi *mockPaychAPI) pushedMessages(c cid.Cid) *types.SignedMessage {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
return pchapi.messages[c]
}
func (pchapi *mockPaychAPI) pushedMessageCount() int {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
return len(pchapi.messages)
}

View File

@ -24,9 +24,8 @@ type channelAccessor struct {
// waitCtx is used by processes that wait for things to be confirmed
// on chain
waitCtx context.Context
sm StateManagerApi
sa *stateAccessor
api paychApi
api managerAPI
store *Store
lk *channelLock
fundsReqQueue []*fundsReq
@ -36,8 +35,7 @@ type channelAccessor struct {
func newChannelAccessor(pm *Manager) *channelAccessor {
return &channelAccessor{
lk: &channelLock{globalLock: &pm.lk},
sm: pm.sm,
sa: &stateAccessor{sm: pm.sm},
sa: pm.sa,
api: pm.pchapi,
store: pm.store,
msgListeners: newMsgListeners(),
@ -70,7 +68,7 @@ func (ca *channelAccessor) checkVoucherValidUnlocked(ctx context.Context, ch add
}
var actState account.State
_, err = ca.sm.LoadActorState(ctx, pchState.From, &actState, nil)
_, err = ca.api.LoadActorState(ctx, pchState.From, &actState, nil)
if err != nil {
return nil, err
}
@ -181,7 +179,7 @@ func (ca *channelAccessor) checkVoucherSpendable(ctx context.Context, ch address
return false, err
}
ret, err := ca.sm.Call(ctx, &types.Message{
ret, err := ca.api.Call(ctx, &types.Message{
From: recipient,
To: ch,
Method: builtin.MethodsPaych.UpdateChannelState,
@ -200,7 +198,7 @@ func (ca *channelAccessor) checkVoucherSpendable(ctx context.Context, ch address
func (ca *channelAccessor) getPaychRecipient(ctx context.Context, ch address.Address) (address.Address, error) {
var state paych.State
if _, err := ca.sm.LoadActorState(ctx, ch, &state, nil); err != nil {
if _, err := ca.api.LoadActorState(ctx, ch, &state, nil); err != nil {
return address.Address{}, err
}

View File

@ -2,8 +2,6 @@ package paychmgr
import (
"context"
"fmt"
"sync"
"testing"
"github.com/filecoin-project/specs-actors/actors/builtin"
@ -24,64 +22,12 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin/account"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
)
type testPchState struct {
actor *types.Actor
state paych.State
}
type mockStateManager struct {
lk sync.Mutex
accountState map[address.Address]account.State
paychState map[address.Address]testPchState
response *api.InvocResult
}
func newMockStateManager() *mockStateManager {
return &mockStateManager{
accountState: make(map[address.Address]account.State),
paychState: make(map[address.Address]testPchState),
}
}
func (sm *mockStateManager) setAccountState(a address.Address, state account.State) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.accountState[a] = state
}
func (sm *mockStateManager) setPaychState(a address.Address, actor *types.Actor, state paych.State) {
sm.lk.Lock()
defer sm.lk.Unlock()
sm.paychState[a] = testPchState{actor, state}
}
func (sm *mockStateManager) LoadActorState(ctx context.Context, a address.Address, out interface{}, ts *types.TipSet) (*types.Actor, error) {
sm.lk.Lock()
defer sm.lk.Unlock()
if outState, ok := out.(*account.State); ok {
*outState = sm.accountState[a]
return nil, nil
}
if outState, ok := out.(*paych.State); ok {
info := sm.paychState[a]
*outState = info.state
return info.actor, nil
}
panic(fmt.Sprintf("unexpected state type %v", out))
}
func (sm *mockStateManager) Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) {
return sm.response, nil
}
func TestPaychOutbound(t *testing.T) {
ctx := context.Background()
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
@ -92,10 +38,10 @@ func TestPaychOutbound(t *testing.T) {
fromAcct := tutils.NewIDAddr(t, 201)
toAcct := tutils.NewIDAddr(t, 202)
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
sm.setPaychState(ch, nil, paych.State{
mock := newMockManagerAPI()
mock.setAccountState(fromAcct, account.State{Address: from})
mock.setAccountState(toAcct, account.State{Address: to})
mock.setPaychState(ch, nil, paych.State{
From: fromAcct,
To: toAcct,
ToSend: big.NewInt(0),
@ -104,7 +50,7 @@ func TestPaychOutbound(t *testing.T) {
LaneStates: []*paych.LaneState{},
})
mgr, err := newManager(sm, store, nil)
mgr, err := newManager(store, mock)
require.NoError(t, err)
err = mgr.TrackOutboundChannel(ctx, ch)
@ -130,10 +76,10 @@ func TestPaychInbound(t *testing.T) {
fromAcct := tutils.NewIDAddr(t, 201)
toAcct := tutils.NewIDAddr(t, 202)
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
sm.setPaychState(ch, nil, paych.State{
mock := newMockManagerAPI()
mock.setAccountState(fromAcct, account.State{Address: from})
mock.setAccountState(toAcct, account.State{Address: to})
mock.setPaychState(ch, nil, paych.State{
From: fromAcct,
To: toAcct,
ToSend: big.NewInt(0),
@ -142,7 +88,7 @@ func TestPaychInbound(t *testing.T) {
LaneStates: []*paych.LaneState{},
})
mgr, err := newManager(sm, store, nil)
mgr, err := newManager(store, mock)
require.NoError(t, err)
err = mgr.TrackInboundChannel(ctx, ch)
@ -170,9 +116,9 @@ func TestCheckVoucherValid(t *testing.T) {
fromAcct := tutils.NewActorAddr(t, "fromAct")
toAcct := tutils.NewActorAddr(t, "toAct")
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
mock := newMockManagerAPI()
mock.setAccountState(fromAcct, account.State{Address: from})
mock.setAccountState(toAcct, account.State{Address: to})
tcases := []struct {
name string
@ -316,7 +262,7 @@ func TestCheckVoucherValid(t *testing.T) {
Nonce: 0,
Balance: tcase.actorBalance,
}
sm.setPaychState(ch, act, paych.State{
mock.setPaychState(ch, act, paych.State{
From: fromAcct,
To: toAcct,
ToSend: tcase.toSend,
@ -325,7 +271,7 @@ func TestCheckVoucherValid(t *testing.T) {
LaneStates: tcase.laneStates,
})
mgr, err := newManager(sm, store, nil)
mgr, err := newManager(store, mock)
require.NoError(t, err)
err = mgr.TrackInboundChannel(ctx, ch)
@ -355,9 +301,9 @@ func TestCheckVoucherValidCountingAllLanes(t *testing.T) {
toAcct := tutils.NewActorAddr(t, "toAct")
minDelta := big.NewInt(0)
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
mock := newMockManagerAPI()
mock.setAccountState(fromAcct, account.State{Address: from})
mock.setAccountState(toAcct, account.State{Address: to})
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
@ -379,7 +325,7 @@ func TestCheckVoucherValidCountingAllLanes(t *testing.T) {
Nonce: 0,
Balance: actorBalance,
}
sm.setPaychState(ch, act, paych.State{
mock.setPaychState(ch, act, paych.State{
From: fromAcct,
To: toAcct,
ToSend: toSend,
@ -388,7 +334,7 @@ func TestCheckVoucherValidCountingAllLanes(t *testing.T) {
LaneStates: laneStates,
})
mgr, err := newManager(sm, store, nil)
mgr, err := newManager(store, mock)
require.NoError(t, err)
err = mgr.TrackInboundChannel(ctx, ch)
@ -678,9 +624,9 @@ func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, addre
fromAcct := tutils.NewActorAddr(t, "fromAct")
toAcct := tutils.NewActorAddr(t, "toAct")
sm := newMockStateManager()
sm.setAccountState(fromAcct, account.State{Address: from})
sm.setAccountState(toAcct, account.State{Address: to})
mock := newMockManagerAPI()
mock.setAccountState(fromAcct, account.State{Address: from})
mock.setAccountState(toAcct, account.State{Address: to})
act := &types.Actor{
Code: builtin.AccountActorCodeID,
@ -688,7 +634,7 @@ func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, addre
Nonce: 0,
Balance: big.NewInt(20),
}
sm.setPaychState(ch, act, paych.State{
mock.setPaychState(ch, act, paych.State{
From: fromAcct,
To: toAcct,
ToSend: big.NewInt(0),
@ -698,7 +644,7 @@ func testSetupMgrWithChannel(ctx context.Context, t *testing.T) (*Manager, addre
})
store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
mgr, err := newManager(sm, store, nil)
mgr, err := newManager(store, mock)
require.NoError(t, err)
err = mgr.TrackInboundChannel(ctx, ch)

View File

@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-address"
@ -26,109 +25,6 @@ import (
"github.com/stretchr/testify/require"
)
type waitingCall struct {
response chan types.MessageReceipt
}
type waitingResponse struct {
receipt types.MessageReceipt
done chan struct{}
}
type mockPaychAPI struct {
lk sync.Mutex
messages map[cid.Cid]*types.SignedMessage
waitingCalls map[cid.Cid]*waitingCall
waitingResponses map[cid.Cid]*waitingResponse
}
func newMockPaychAPI() *mockPaychAPI {
return &mockPaychAPI{
messages: make(map[cid.Cid]*types.SignedMessage),
waitingCalls: make(map[cid.Cid]*waitingCall),
waitingResponses: make(map[cid.Cid]*waitingResponse),
}
}
func (pchapi *mockPaychAPI) StateWaitMsg(ctx context.Context, mcid cid.Cid, confidence uint64) (*api.MsgLookup, error) {
pchapi.lk.Lock()
response := make(chan types.MessageReceipt)
if response, ok := pchapi.waitingResponses[mcid]; ok {
defer pchapi.lk.Unlock()
defer func() {
go close(response.done)
}()
delete(pchapi.waitingResponses, mcid)
return &api.MsgLookup{Receipt: response.receipt}, nil
}
pchapi.waitingCalls[mcid] = &waitingCall{response: response}
pchapi.lk.Unlock()
receipt := <-response
return &api.MsgLookup{Receipt: receipt}, nil
}
func (pchapi *mockPaychAPI) receiveMsgResponse(mcid cid.Cid, receipt types.MessageReceipt) {
pchapi.lk.Lock()
if call, ok := pchapi.waitingCalls[mcid]; ok {
defer pchapi.lk.Unlock()
delete(pchapi.waitingCalls, mcid)
call.response <- receipt
return
}
done := make(chan struct{})
pchapi.waitingResponses[mcid] = &waitingResponse{receipt: receipt, done: done}
pchapi.lk.Unlock()
<-done
}
// Send success response for any waiting calls
func (pchapi *mockPaychAPI) close() {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
success := types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
}
for mcid, call := range pchapi.waitingCalls {
delete(pchapi.waitingCalls, mcid)
call.response <- success
}
}
func (pchapi *mockPaychAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
smsg := &types.SignedMessage{Message: *msg}
pchapi.messages[smsg.Cid()] = smsg
return smsg, nil
}
func (pchapi *mockPaychAPI) pushedMessages(c cid.Cid) *types.SignedMessage {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
return pchapi.messages[c]
}
func (pchapi *mockPaychAPI) pushedMessageCount() int {
pchapi.lk.Lock()
defer pchapi.lk.Unlock()
return len(pchapi.messages)
}
func testChannelResponse(t *testing.T, ch address.Address) types.MessageReceipt {
createChannelRet := init_.ExecReturn{
IDAddress: ch,
@ -152,11 +48,10 @@ func TestPaychGetCreateChannelMsg(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
amt := big.NewInt(10)
@ -164,7 +59,7 @@ func TestPaychGetCreateChannelMsg(t *testing.T) {
require.NoError(t, err)
require.Equal(t, address.Undef, ch)
pushedMsg := pchapi.pushedMessages(mcid)
pushedMsg := mock.pushedMessages(mcid)
require.Equal(t, from, pushedMsg.Message.From)
require.Equal(t, builtin.InitActorAddr, pushedMsg.Message.To)
require.Equal(t, amt, pushedMsg.Message.Value)
@ -180,11 +75,10 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
@ -234,7 +128,7 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
require.Nil(t, ci.CreateMsg)
// Trigger add funds confirmation
pchapi.receiveMsgResponse(addFundsMsgCid, types.MessageReceipt{ExitCode: 0})
mock.receiveMsgResponse(addFundsMsgCid, types.MessageReceipt{ExitCode: 0})
// Wait for add funds confirmation to be processed by manager
_, err = mgr.GetPaychWaitReady(ctx, addFundsMsgCid)
@ -255,7 +149,7 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) {
}()
// 3. Send create channel response
pchapi.receiveMsgResponse(createMsgCid, response)
mock.receiveMsgResponse(createMsgCid, response)
<-done
}
@ -270,11 +164,10 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel
@ -304,7 +197,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
// 4. Send a success response
ch := tutils.NewIDAddr(t, 100)
successResponse := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(mcid2, successResponse)
mock.receiveMsgResponse(mcid2, successResponse)
_, err = mgr.GetPaychWaitReady(ctx, mcid2)
require.NoError(t, err)
@ -321,7 +214,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) {
}()
// 3. Send error response to first channel create
pchapi.receiveMsgResponse(mcid1, errResponse)
mock.receiveMsgResponse(mcid1, errResponse)
<-done
}
@ -336,11 +229,10 @@ func TestPaychGetRecoverAfterError(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel
@ -349,7 +241,7 @@ func TestPaychGetRecoverAfterError(t *testing.T) {
require.NoError(t, err)
// Send error create channel response
pchapi.receiveMsgResponse(mcid, types.MessageReceipt{
mock.receiveMsgResponse(mcid, types.MessageReceipt{
ExitCode: 1, // error
Return: []byte{},
})
@ -361,7 +253,7 @@ func TestPaychGetRecoverAfterError(t *testing.T) {
// Send success create channel response
response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(mcid2, response)
mock.receiveMsgResponse(mcid2, response)
_, err = mgr.GetPaychWaitReady(ctx, mcid2)
require.NoError(t, err)
@ -389,11 +281,10 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel
@ -403,7 +294,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
// Send success create channel response
response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(mcid1, response)
mock.receiveMsgResponse(mcid1, response)
// Send add funds message for channel
amt2 := big.NewInt(5)
@ -411,7 +302,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
require.NoError(t, err)
// Send error add funds response
pchapi.receiveMsgResponse(mcid2, types.MessageReceipt{
mock.receiveMsgResponse(mcid2, types.MessageReceipt{
ExitCode: 1, // error
Return: []byte{},
})
@ -438,7 +329,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) {
require.NoError(t, err)
// Send success add funds response
pchapi.receiveMsgResponse(mcid3, types.MessageReceipt{
mock.receiveMsgResponse(mcid3, types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
})
@ -472,10 +363,9 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
mock := newMockManagerAPI()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
@ -484,14 +374,13 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
require.NoError(t, err)
// Simulate shutting down system
pchapi.close()
mock.close()
// Create a new manager with the same datastore
sm2 := newMockStateManager()
pchapi2 := newMockPaychAPI()
defer pchapi2.close()
mock2 := newMockManagerAPI()
defer mock2.close()
mgr2, err := newManager(sm2, store, pchapi2)
mgr2, err := newManager(store, mock2)
require.NoError(t, err)
// Should have no channels yet (message sent but channel not created)
@ -537,7 +426,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) {
}()
// 3. Send create channel response
pchapi2.receiveMsgResponse(createMsgCid, response)
mock2.receiveMsgResponse(createMsgCid, response)
<-done
}
@ -553,10 +442,9 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
mock := newMockManagerAPI()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel
@ -566,7 +454,7 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
// Send success create channel response
response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(mcid1, response)
mock.receiveMsgResponse(mcid1, response)
// Send add funds message for channel
amt2 := big.NewInt(5)
@ -574,18 +462,17 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) {
require.NoError(t, err)
// Simulate shutting down system
pchapi.close()
mock.close()
// Create a new manager with the same datastore
sm2 := newMockStateManager()
pchapi2 := newMockPaychAPI()
defer pchapi2.close()
mock2 := newMockManagerAPI()
defer mock2.close()
mgr2, err := newManager(sm2, store, pchapi2)
mgr2, err := newManager(store, mock2)
require.NoError(t, err)
// Send success add funds response
pchapi2.receiveMsgResponse(mcid2, types.MessageReceipt{
mock2.receiveMsgResponse(mcid2, types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
})
@ -617,11 +504,10 @@ func TestPaychGetWait(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// 1. Get
@ -633,7 +519,7 @@ func TestPaychGetWait(t *testing.T) {
go func() {
// 3. Send response
response := testChannelResponse(t, expch)
pchapi.receiveMsgResponse(createMsgCid, response)
mock.receiveMsgResponse(createMsgCid, response)
}()
// 2. Wait till ready
@ -658,7 +544,7 @@ func TestPaychGetWait(t *testing.T) {
ExitCode: 0,
Return: []byte{},
}
pchapi.receiveMsgResponse(addFundsMsgCid, addFundsResponse)
mock.receiveMsgResponse(addFundsMsgCid, addFundsResponse)
}()
// 5. Wait for add funds
@ -675,11 +561,10 @@ func TestPaychGetWaitErr(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// 1. Create channel
@ -709,7 +594,7 @@ func TestPaychGetWaitErr(t *testing.T) {
ExitCode: 1, // error
Return: []byte{},
}
pchapi.receiveMsgResponse(mcid, response)
mock.receiveMsgResponse(mcid, response)
<-done
}
@ -723,11 +608,10 @@ func TestPaychGetWaitCtx(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
amt := big.NewInt(10)
@ -754,11 +638,10 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
@ -800,7 +683,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
// Send create channel response
response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(createMsgCid, response)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
@ -817,7 +700,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
require.Equal(t, addFundsMcid1, addFundsMcid2)
// Send success add funds response
pchapi.receiveMsgResponse(addFundsMcid1, types.MessageReceipt{
mock.receiveMsgResponse(addFundsMcid1, types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
})
@ -829,17 +712,17 @@ func TestPaychGetMergeAddFunds(t *testing.T) {
// Make sure that one create channel message and one add funds message was
// sent
require.Equal(t, 2, pchapi.pushedMessageCount())
require.Equal(t, 2, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := pchapi.pushedMessages(createMsgCid)
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, builtin.InitActorAddr, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
// Check merged add funds amount is the sum of the individual
// amounts
addFundsMsg := pchapi.pushedMessages(addFundsMcid1)
addFundsMsg := mock.pushedMessages(addFundsMcid1)
require.Equal(t, from, addFundsMsg.Message.From)
require.Equal(t, ch, addFundsMsg.Message.To)
require.Equal(t, types.BigAdd(addFundsAmt1, addFundsAmt2), addFundsMsg.Message.Value)
@ -855,11 +738,10 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
@ -899,7 +781,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
// Send create channel response
response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(createMsgCid, response)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
@ -914,7 +796,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
require.Equal(t, ch, addFundsCh2)
// Send success add funds response
pchapi.receiveMsgResponse(addFundsMcid2, types.MessageReceipt{
mock.receiveMsgResponse(addFundsMcid2, types.MessageReceipt{
ExitCode: 0,
Return: []byte{},
})
@ -926,17 +808,17 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) {
// Make sure that one create channel message and one add funds message was
// sent
require.Equal(t, 2, pchapi.pushedMessageCount())
require.Equal(t, 2, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := pchapi.pushedMessages(createMsgCid)
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, builtin.InitActorAddr, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)
// Check merged add funds amount only includes the second add funds amount
// (because first was cancelled)
addFundsMsg := pchapi.pushedMessages(addFundsMcid2)
addFundsMsg := mock.pushedMessages(addFundsMcid2)
require.Equal(t, from, addFundsMsg.Message.From)
require.Equal(t, ch, addFundsMsg.Message.To)
require.Equal(t, addFundsAmt2, addFundsMsg.Message.Value)
@ -952,11 +834,10 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
// Send create message for a channel with value 10
@ -995,7 +876,7 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) {
// Send create channel response
response := testChannelResponse(t, ch)
pchapi.receiveMsgResponse(createMsgCid, response)
mock.receiveMsgResponse(createMsgCid, response)
// Wait for create channel response
chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid)
@ -1009,10 +890,10 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) {
require.NotNil(t, addFundsErr2)
// Make sure that just the create channel message was sent
require.Equal(t, 1, pchapi.pushedMessageCount())
require.Equal(t, 1, mock.pushedMessageCount())
// Check create message amount is correct
createMsg := pchapi.pushedMessages(createMsgCid)
createMsg := mock.pushedMessages(createMsgCid)
require.Equal(t, from, createMsg.Message.From)
require.Equal(t, builtin.InitActorAddr, createMsg.Message.To)
require.Equal(t, createAmt, createMsg.Message.Value)

View File

@ -22,11 +22,10 @@ func TestPaychSettle(t *testing.T) {
from := tutils.NewIDAddr(t, 101)
to := tutils.NewIDAddr(t, 102)
sm := newMockStateManager()
pchapi := newMockPaychAPI()
defer pchapi.close()
mock := newMockManagerAPI()
defer mock.close()
mgr, err := newManager(sm, store, pchapi)
mgr, err := newManager(store, mock)
require.NoError(t, err)
amt := big.NewInt(10)
@ -35,7 +34,7 @@ func TestPaychSettle(t *testing.T) {
// Send channel create response
response := testChannelResponse(t, expch)
pchapi.receiveMsgResponse(mcid, response)
mock.receiveMsgResponse(mcid, response)
// Get the channel address
ch, err := mgr.GetPaychWaitReady(ctx, mcid)
@ -56,7 +55,7 @@ func TestPaychSettle(t *testing.T) {
// Send new channel create response
response2 := testChannelResponse(t, expch2)
pchapi.receiveMsgResponse(mcid2, response2)
mock.receiveMsgResponse(mcid2, response2)
// Make sure the new channel is different from the old channel
ch2, err := mgr.GetPaychWaitReady(ctx, mcid2)

View File

@ -8,8 +8,6 @@ import (
"golang.org/x/sync/errgroup"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
@ -25,11 +23,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
type paychApi interface {
StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error)
MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error)
}
// paychFundsRes is the response to a create channel or add funds request
type paychFundsRes struct {
channel address.Address

View File

@ -12,7 +12,7 @@ import (
)
type stateAccessor struct {
sm StateManagerApi
sm stateManagerAPI
}
func (ca *stateAccessor) loadPaychState(ctx context.Context, ch address.Address) (*types.Actor, *paych.State, error) {