package market

import (
	"bytes"
	"context"
	"sync"
	"testing"
	"time"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/actors/builtin/market"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/chain/wallet"
	tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
	"github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"
	ds_sync "github.com/ipfs/go-datastore/sync"
	"github.com/stretchr/testify/require"
)

// TestFundManagerBasic verifies that the basic fund manager operations work
func TestFundManagerBasic(t *testing.T) {
	s := setup(t)
	defer s.fm.Stop()

	// Reserve 10
	// balance:  0 -> 10
	// reserved: 0 -> 10
	amt := abi.NewTokenAmount(10)
	sentinel, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)

	msg := s.mockApi.getSentMessage(sentinel)
	checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)

	s.mockApi.completeMsg(sentinel)

	// Reserve 7
	// balance:  10 -> 17
	// reserved: 10 -> 17
	amt = abi.NewTokenAmount(7)
	sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)

	msg = s.mockApi.getSentMessage(sentinel)
	checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)

	s.mockApi.completeMsg(sentinel)

	// Release 5
	// balance:  17
	// reserved: 17 -> 12
	amt = abi.NewTokenAmount(5)
	err = s.fm.Release(s.acctAddr, amt)
	require.NoError(t, err)

	// Withdraw 2
	// balance:  17 -> 15
	// reserved: 12
	amt = abi.NewTokenAmount(2)
	sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)

	msg = s.mockApi.getSentMessage(sentinel)
	checkWithdrawMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)

	s.mockApi.completeMsg(sentinel)

	// Reserve 3
	// balance:  15
	// reserved: 12 -> 15
	// Note: reserved (15) is <= balance (15) so should not send on-chain
	// message
	msgCount := s.mockApi.messageCount()
	amt = abi.NewTokenAmount(3)
	sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)
	require.Equal(t, msgCount, s.mockApi.messageCount())
	require.Equal(t, sentinel, cid.Undef)

	// Reserve 1
	// balance:  15 -> 16
	// reserved: 15 -> 16
	// Note: reserved (16) is above balance (15) so *should* send on-chain
	// message to top up balance
	amt = abi.NewTokenAmount(1)
	topUp := abi.NewTokenAmount(1)
	sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)

	s.mockApi.completeMsg(sentinel)
	msg = s.mockApi.getSentMessage(sentinel)
	checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp)

	// Withdraw 1
	// balance:  16
	// reserved: 16
	// Note: Expect failure because there is no available balance to withdraw:
	// balance - reserved = 16 - 16 = 0
	amt = abi.NewTokenAmount(1)
	sentinel, err = s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.Error(t, err)
}

// TestFundManagerParallel verifies that operations can be run in parallel
func TestFundManagerParallel(t *testing.T) {
	s := setup(t)
	defer s.fm.Stop()

	// Reserve 10
	amt := abi.NewTokenAmount(10)
	sentinelReserve10, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)

	// Wait until all the subsequent requests are queued up
	queueReady := make(chan struct{})
	fa := s.fm.getFundedAddress(s.acctAddr)
	fa.onProcessStart(func() bool {
		if len(fa.withdrawals) == 1 && len(fa.reservations) == 2 && len(fa.releases) == 1 {
			close(queueReady)
			return true
		}
		return false
	})

	// Withdraw 5 (should not run until after reserves / releases)
	withdrawReady := make(chan error)
	go func() {
		amt = abi.NewTokenAmount(5)
		_, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
		withdrawReady <- err
	}()

	reserveSentinels := make(chan cid.Cid)

	// Reserve 3
	go func() {
		amt := abi.NewTokenAmount(3)
		sentinelReserve3, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
		require.NoError(t, err)
		reserveSentinels <- sentinelReserve3
	}()

	// Reserve 5
	go func() {
		amt := abi.NewTokenAmount(5)
		sentinelReserve5, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
		require.NoError(t, err)
		reserveSentinels <- sentinelReserve5
	}()

	// Release 2
	go func() {
		amt := abi.NewTokenAmount(2)
		err = s.fm.Release(s.acctAddr, amt)
		require.NoError(t, err)
	}()

	// Everything is queued up
	<-queueReady

	// Complete the "Reserve 10" message
	s.mockApi.completeMsg(sentinelReserve10)
	msg := s.mockApi.getSentMessage(sentinelReserve10)
	checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10))

	// The other requests should now be combined and be submitted on-chain as
	// a single message
	rs1 := <-reserveSentinels
	rs2 := <-reserveSentinels
	require.Equal(t, rs1, rs2)

	// Withdraw should not have been called yet, because reserve / release
	// requests run first
	select {
	case <-withdrawReady:
		require.Fail(t, "Withdraw should run after reserve / release")
	default:
	}

	// Complete the message
	s.mockApi.completeMsg(rs1)
	msg = s.mockApi.getSentMessage(rs1)

	// "Reserve 3" +3
	// "Reserve 5" +5
	// "Release 2" -2
	// Result:      6
	checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(6))

	// Expect withdraw to fail because not enough available funds
	err = <-withdrawReady
	require.Error(t, err)
}

// TestFundManagerReserveByWallet verifies that reserve requests are grouped by wallet
func TestFundManagerReserveByWallet(t *testing.T) {
	s := setup(t)
	defer s.fm.Stop()

	walletAddrA, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
	require.NoError(t, err)
	walletAddrB, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
	require.NoError(t, err)

	// Wait until all the reservation requests are queued up
	walletAQueuedUp := make(chan struct{})
	queueReady := make(chan struct{})
	fa := s.fm.getFundedAddress(s.acctAddr)
	fa.onProcessStart(func() bool {
		if len(fa.reservations) == 1 {
			close(walletAQueuedUp)
		}
		if len(fa.reservations) == 3 {
			close(queueReady)
			return true
		}
		return false
	})

	type reserveResult struct {
		ws  cid.Cid
		err error
	}
	results := make(chan *reserveResult)

	amtA1 := abi.NewTokenAmount(1)
	go func() {
		// Wallet A: Reserve 1
		sentinelA1, err := s.fm.Reserve(s.ctx, walletAddrA, s.acctAddr, amtA1)
		results <- &reserveResult{
			ws:  sentinelA1,
			err: err,
		}
	}()

	amtB1 := abi.NewTokenAmount(2)
	amtB2 := abi.NewTokenAmount(3)
	go func() {
		// Wait for reservation for wallet A to be queued up
		<-walletAQueuedUp

		// Wallet B: Reserve 2
		go func() {
			sentinelB1, err := s.fm.Reserve(s.ctx, walletAddrB, s.acctAddr, amtB1)
			results <- &reserveResult{
				ws:  sentinelB1,
				err: err,
			}
		}()

		// Wallet B: Reserve 3
		sentinelB2, err := s.fm.Reserve(s.ctx, walletAddrB, s.acctAddr, amtB2)
		results <- &reserveResult{
			ws:  sentinelB2,
			err: err,
		}
	}()

	// All reservation requests are queued up
	<-queueReady

	resA := <-results
	sentinelA1 := resA.ws

	// Should send to wallet A
	msg := s.mockApi.getSentMessage(sentinelA1)
	checkAddMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1)

	// Complete wallet A message
	s.mockApi.completeMsg(sentinelA1)

	resB1 := <-results
	resB2 := <-results
	require.NoError(t, resB1.err)
	require.NoError(t, resB2.err)
	sentinelB1 := resB1.ws
	sentinelB2 := resB2.ws

	// Should send different message to wallet B
	require.NotEqual(t, sentinelA1, sentinelB1)
	// Should be single message combining amount 1 and 2
	require.Equal(t, sentinelB1, sentinelB2)
	msg = s.mockApi.getSentMessage(sentinelB1)
	checkAddMessageFields(t, msg, walletAddrB, s.acctAddr, types.BigAdd(amtB1, amtB2))
}

// TestFundManagerWithdrawal verifies that as many withdraw operations as
// possible are processed
func TestFundManagerWithdrawalLimit(t *testing.T) {
	s := setup(t)
	defer s.fm.Stop()

	// Reserve 10
	amt := abi.NewTokenAmount(10)
	sentinelReserve10, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)

	// Complete the "Reserve 10" message
	s.mockApi.completeMsg(sentinelReserve10)

	// Release 10
	err = s.fm.Release(s.acctAddr, amt)
	require.NoError(t, err)

	// Queue up withdraw requests
	queueReady := make(chan struct{})
	fa := s.fm.getFundedAddress(s.acctAddr)
	withdrawalReqTotal := 3
	withdrawalReqEnqueued := 0
	withdrawalReqQueue := make(chan func(), withdrawalReqTotal)
	fa.onProcessStart(func() bool {
		// If a new withdrawal request was enqueued
		if len(fa.withdrawals) > withdrawalReqEnqueued {
			withdrawalReqEnqueued++

			// Pop the next request and run it
			select {
			case fn := <-withdrawalReqQueue:
				go fn()
			default:
			}
		}
		// Once all the requests have arrived, we're ready to process the queue
		if withdrawalReqEnqueued == withdrawalReqTotal {
			close(queueReady)
			return true
		}
		return false
	})

	type withdrawResult struct {
		reqIndex int
		ws       cid.Cid
		err      error
	}
	withdrawRes := make(chan *withdrawResult)

	// Queue up three "Withdraw 5" requests
	enqueuedCount := 0
	for i := 0; i < withdrawalReqTotal; i++ {
		withdrawalReqQueue <- func() {
			idx := enqueuedCount
			enqueuedCount++

			amt := abi.NewTokenAmount(5)
			ws, err := s.fm.Withdraw(s.ctx, s.walletAddr, s.acctAddr, amt)
			withdrawRes <- &withdrawResult{reqIndex: idx, ws: ws, err: err}
		}
	}
	// Start the first request
	fn := <-withdrawalReqQueue
	go fn()

	// All withdrawal requests are queued up and ready to be processed
	<-queueReady

	// Organize results in request order
	results := make([]*withdrawResult, withdrawalReqTotal)
	for i := 0; i < 3; i++ {
		res := <-withdrawRes
		results[res.reqIndex] = res
	}

	// Available 10
	// Withdraw 5
	// Expect Success
	require.NoError(t, results[0].err)
	// Available 5
	// Withdraw 5
	// Expect Success
	require.NoError(t, results[1].err)
	// Available 0
	// Withdraw 5
	// Expect FAIL
	require.Error(t, results[2].err)

	// Expect withdrawal requests that fit under reserved amount to be combined
	// into a single message on-chain
	require.Equal(t, results[0].ws, results[1].ws)
}

// TestFundManagerWithdrawByWallet verifies that withdraw requests are grouped by wallet
func TestFundManagerWithdrawByWallet(t *testing.T) {
	s := setup(t)
	defer s.fm.Stop()

	walletAddrA, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
	require.NoError(t, err)
	walletAddrB, err := s.wllt.WalletNew(context.Background(), types.KTSecp256k1)
	require.NoError(t, err)

	// Reserve 10
	reserveAmt := abi.NewTokenAmount(10)
	sentinelReserve, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, reserveAmt)
	require.NoError(t, err)
	s.mockApi.completeMsg(sentinelReserve)

	time.Sleep(10 * time.Millisecond)

	// Release 10
	err = s.fm.Release(s.acctAddr, reserveAmt)
	require.NoError(t, err)

	type withdrawResult struct {
		ws  cid.Cid
		err error
	}
	results := make(chan *withdrawResult)

	// Wait until withdrawals are queued up
	walletAQueuedUp := make(chan struct{})
	queueReady := make(chan struct{})
	withdrawalCount := 0
	fa := s.fm.getFundedAddress(s.acctAddr)
	fa.onProcessStart(func() bool {
		if len(fa.withdrawals) == withdrawalCount {
			return false
		}
		withdrawalCount = len(fa.withdrawals)

		if withdrawalCount == 1 {
			close(walletAQueuedUp)
		} else if withdrawalCount == 3 {
			close(queueReady)
			return true
		}
		return false
	})

	amtA1 := abi.NewTokenAmount(1)
	go func() {
		// Wallet A: Withdraw 1
		sentinelA1, err := s.fm.Withdraw(s.ctx, walletAddrA, s.acctAddr, amtA1)
		results <- &withdrawResult{
			ws:  sentinelA1,
			err: err,
		}
	}()

	amtB1 := abi.NewTokenAmount(2)
	amtB2 := abi.NewTokenAmount(3)
	go func() {
		// Wait until withdraw for wallet A is queued up
		<-walletAQueuedUp

		// Wallet B: Withdraw 2
		go func() {
			sentinelB1, err := s.fm.Withdraw(s.ctx, walletAddrB, s.acctAddr, amtB1)
			results <- &withdrawResult{
				ws:  sentinelB1,
				err: err,
			}
		}()

		// Wallet B: Withdraw 3
		sentinelB2, err := s.fm.Withdraw(s.ctx, walletAddrB, s.acctAddr, amtB2)
		results <- &withdrawResult{
			ws:  sentinelB2,
			err: err,
		}
	}()

	// Withdrawals are queued up
	<-queueReady

	// Should withdraw from wallet A first
	resA1 := <-results
	sentinelA1 := resA1.ws
	msg := s.mockApi.getSentMessage(sentinelA1)
	checkWithdrawMessageFields(t, msg, walletAddrA, s.acctAddr, amtA1)

	// Complete wallet A message
	s.mockApi.completeMsg(sentinelA1)

	resB1 := <-results
	resB2 := <-results
	require.NoError(t, resB1.err)
	require.NoError(t, resB2.err)
	sentinelB1 := resB1.ws
	sentinelB2 := resB2.ws

	// Should send different message for wallet B from wallet A
	require.NotEqual(t, sentinelA1, sentinelB1)
	// Should be single message combining amount 1 and 2
	require.Equal(t, sentinelB1, sentinelB2)
	msg = s.mockApi.getSentMessage(sentinelB1)
	checkWithdrawMessageFields(t, msg, walletAddrB, s.acctAddr, types.BigAdd(amtB1, amtB2))
}

// TestFundManagerRestart verifies that waiting for incomplete requests resumes
// on restart
func TestFundManagerRestart(t *testing.T) {
	s := setup(t)
	defer s.fm.Stop()

	acctAddr2 := tutils.NewActorAddr(t, "addr2")

	// Address 1: Reserve 10
	amt := abi.NewTokenAmount(10)
	sentinelAddr1, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)

	msg := s.mockApi.getSentMessage(sentinelAddr1)
	checkAddMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)

	// Address 2: Reserve 7
	amt2 := abi.NewTokenAmount(7)
	sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, s.walletAddr, acctAddr2, amt2)
	require.NoError(t, err)

	msg2 := s.mockApi.getSentMessage(sentinelAddr2Res7)
	checkAddMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2)

	// Complete "Address 1: Reserve 10"
	s.mockApi.completeMsg(sentinelAddr1)

	// Give the completed state a moment to be stored before restart
	time.Sleep(time.Millisecond * 10)

	// Restart
	mockApiAfter := s.mockApi
	fmAfter := newFundManager(mockApiAfter, s.ds)
	err = fmAfter.Start()
	require.NoError(t, err)

	amt3 := abi.NewTokenAmount(9)
	reserveSentinel := make(chan cid.Cid)
	go func() {
		// Address 2: Reserve 9
		sentinel3, err := fmAfter.Reserve(s.ctx, s.walletAddr, acctAddr2, amt3)
		require.NoError(t, err)
		reserveSentinel <- sentinel3
	}()

	// Expect no message to be sent, because still waiting for previous
	// message "Address 2: Reserve 7" to complete on-chain
	select {
	case <-reserveSentinel:
		require.Fail(t, "Expected no message to be sent")
	case <-time.After(10 * time.Millisecond):
	}

	// Complete "Address 2: Reserve 7"
	mockApiAfter.completeMsg(sentinelAddr2Res7)

	// Expect waiting message to now be sent
	sentinel3 := <-reserveSentinel
	msg3 := mockApiAfter.getSentMessage(sentinel3)
	checkAddMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3)
}

// TestFundManagerReleaseAfterPublish verifies that release is successful in
// the following scenario:
// 1. Deal A adds 5 to addr1:                     reserved  0 ->  5    available  0 ->  5
// 2. Deal B adds 7 to addr1:                     reserved  5 -> 12    available  5 -> 12
// 3. Deal B completes, reducing addr1 by 7:      reserved       12    available 12 ->  5
// 4. Deal A releases 5 from addr1:               reserved 12 ->  7    available        5
func TestFundManagerReleaseAfterPublish(t *testing.T) {
	s := setup(t)
	defer s.fm.Stop()

	// Deal A: Reserve 5
	// balance:  0 -> 5
	// reserved: 0 -> 5
	amt := abi.NewTokenAmount(5)
	sentinel, err := s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)
	s.mockApi.completeMsg(sentinel)

	// Deal B: Reserve 7
	// balance:  5 -> 12
	// reserved: 5 -> 12
	amt = abi.NewTokenAmount(7)
	sentinel, err = s.fm.Reserve(s.ctx, s.walletAddr, s.acctAddr, amt)
	require.NoError(t, err)
	s.mockApi.completeMsg(sentinel)

	// Deal B: Publish (removes Deal B amount from balance)
	// balance:  12 -> 5
	// reserved: 12
	amt = abi.NewTokenAmount(7)
	s.mockApi.publish(s.acctAddr, amt)

	// Deal A: Release 5
	// balance:  5
	// reserved: 12 -> 7
	amt = abi.NewTokenAmount(5)
	err = s.fm.Release(s.acctAddr, amt)
	require.NoError(t, err)

	// Deal B: Release 7
	// balance:  5
	// reserved: 12 -> 7
	amt = abi.NewTokenAmount(5)
	err = s.fm.Release(s.acctAddr, amt)
	require.NoError(t, err)
}

type scaffold struct {
	ctx        context.Context
	ds         *ds_sync.MutexDatastore
	wllt       *wallet.LocalWallet
	walletAddr address.Address
	acctAddr   address.Address
	mockApi    *mockFundManagerAPI
	fm         *FundManager
}

func setup(t *testing.T) *scaffold {
	ctx := context.Background()

	wllt, err := wallet.NewWallet(wallet.NewMemKeyStore())
	if err != nil {
		t.Fatal(err)
	}

	walletAddr, err := wllt.WalletNew(context.Background(), types.KTSecp256k1)
	if err != nil {
		t.Fatal(err)
	}

	acctAddr := tutils.NewActorAddr(t, "addr")

	mockApi := newMockFundManagerAPI(walletAddr)
	dstore := ds_sync.MutexWrap(ds.NewMapDatastore())
	fm := newFundManager(mockApi, dstore)
	return &scaffold{
		ctx:        ctx,
		ds:         dstore,
		wllt:       wllt,
		walletAddr: walletAddr,
		acctAddr:   acctAddr,
		mockApi:    mockApi,
		fm:         fm,
	}
}

func checkAddMessageFields(t *testing.T, msg *types.Message, from address.Address, to address.Address, amt abi.TokenAmount) {
	require.Equal(t, from, msg.From)
	require.Equal(t, market.Address, msg.To)
	require.Equal(t, amt, msg.Value)

	var paramsTo address.Address
	err := paramsTo.UnmarshalCBOR(bytes.NewReader(msg.Params))
	require.NoError(t, err)
	require.Equal(t, to, paramsTo)
}

func checkWithdrawMessageFields(t *testing.T, msg *types.Message, from address.Address, addr address.Address, amt abi.TokenAmount) {
	require.Equal(t, from, msg.From)
	require.Equal(t, market.Address, msg.To)
	require.Equal(t, abi.NewTokenAmount(0), msg.Value)

	var params market.WithdrawBalanceParams
	err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
	require.NoError(t, err)
	require.Equal(t, addr, params.ProviderOrClientAddress)
	require.Equal(t, amt, params.Amount)
}

type sentMsg struct {
	msg   *types.SignedMessage
	ready chan struct{}
}

type mockFundManagerAPI struct {
	wallet address.Address

	lk            sync.Mutex
	escrow        map[address.Address]abi.TokenAmount
	sentMsgs      map[cid.Cid]*sentMsg
	completedMsgs map[cid.Cid]struct{}
	waitingFor    map[cid.Cid]chan struct{}
}

func newMockFundManagerAPI(wallet address.Address) *mockFundManagerAPI {
	return &mockFundManagerAPI{
		wallet:        wallet,
		escrow:        make(map[address.Address]abi.TokenAmount),
		sentMsgs:      make(map[cid.Cid]*sentMsg),
		completedMsgs: make(map[cid.Cid]struct{}),
		waitingFor:    make(map[cid.Cid]chan struct{}),
	}
}

func (mapi *mockFundManagerAPI) MpoolPushMessage(ctx context.Context, message *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
	mapi.lk.Lock()
	defer mapi.lk.Unlock()

	smsg := &types.SignedMessage{Message: *message}
	mapi.sentMsgs[smsg.Cid()] = &sentMsg{msg: smsg, ready: make(chan struct{})}

	return smsg, nil
}

func (mapi *mockFundManagerAPI) getSentMessage(c cid.Cid) *types.Message {
	mapi.lk.Lock()
	defer mapi.lk.Unlock()

	for i := 0; i < 1000; i++ {
		if pending, ok := mapi.sentMsgs[c]; ok {
			return &pending.msg.Message
		}
		time.Sleep(time.Millisecond)
	}
	panic("expected message to be sent")
}

func (mapi *mockFundManagerAPI) messageCount() int {
	mapi.lk.Lock()
	defer mapi.lk.Unlock()

	return len(mapi.sentMsgs)
}

func (mapi *mockFundManagerAPI) completeMsg(msgCid cid.Cid) {
	mapi.lk.Lock()

	pmsg, ok := mapi.sentMsgs[msgCid]
	if ok {
		if pmsg.msg.Message.Method == market.Methods.AddBalance {
			var escrowAcct address.Address
			err := escrowAcct.UnmarshalCBOR(bytes.NewReader(pmsg.msg.Message.Params))
			if err != nil {
				panic(err)
			}

			escrow := mapi.getEscrow(escrowAcct)
			before := escrow
			escrow = types.BigAdd(escrow, pmsg.msg.Message.Value)
			mapi.escrow[escrowAcct] = escrow
			log.Debugf("%s:   escrow %d -> %d", escrowAcct, before, escrow)
		} else {
			var params market.WithdrawBalanceParams
			err := params.UnmarshalCBOR(bytes.NewReader(pmsg.msg.Message.Params))
			if err != nil {
				panic(err)
			}
			escrowAcct := params.ProviderOrClientAddress

			escrow := mapi.getEscrow(escrowAcct)
			before := escrow
			escrow = types.BigSub(escrow, params.Amount)
			mapi.escrow[escrowAcct] = escrow
			log.Debugf("%s:   escrow %d -> %d", escrowAcct, before, escrow)
		}
	}

	mapi.completedMsgs[msgCid] = struct{}{}

	ready, ok := mapi.waitingFor[msgCid]

	mapi.lk.Unlock()

	if ok {
		close(ready)
	}
}

func (mapi *mockFundManagerAPI) StateMarketBalance(ctx context.Context, a address.Address, key types.TipSetKey) (api.MarketBalance, error) {
	mapi.lk.Lock()
	defer mapi.lk.Unlock()

	return api.MarketBalance{
		Locked: abi.NewTokenAmount(0),
		Escrow: mapi.getEscrow(a),
	}, nil
}

func (mapi *mockFundManagerAPI) getEscrow(a address.Address) abi.TokenAmount {
	escrow := mapi.escrow[a]
	if escrow.Nil() {
		return abi.NewTokenAmount(0)
	}
	return escrow
}

func (mapi *mockFundManagerAPI) publish(addr address.Address, amt abi.TokenAmount) {
	mapi.lk.Lock()
	defer mapi.lk.Unlock()

	escrow := mapi.escrow[addr]
	if escrow.Nil() {
		return
	}
	escrow = types.BigSub(escrow, amt)
	if escrow.LessThan(abi.NewTokenAmount(0)) {
		escrow = abi.NewTokenAmount(0)
	}
	mapi.escrow[addr] = escrow
}

func (mapi *mockFundManagerAPI) StateWaitMsg(ctx context.Context, c cid.Cid, confidence uint64) (*api.MsgLookup, error) {
	res := &api.MsgLookup{
		Message: c,
		Receipt: types.MessageReceipt{
			ExitCode: 0,
			Return:   nil,
		},
	}
	ready := make(chan struct{})

	mapi.lk.Lock()
	_, ok := mapi.completedMsgs[c]
	if !ok {
		mapi.waitingFor[c] = ready
	}
	mapi.lk.Unlock()

	if !ok {
		select {
		case <-ctx.Done():
		case <-ready:
		}
	}
	return res, nil
}