refactor: FundManager

This commit is contained in:
Dirk McCormick 2020-11-05 17:50:40 +01:00 committed by hannahhoward
parent 1323dbddfe
commit 4d3cd7dcb8
6 changed files with 1315 additions and 3 deletions

126
chain/market/cbor_gen.go Normal file
View File

@ -0,0 +1,126 @@
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
package market
import (
"fmt"
"io"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
var _ = xerrors.Errorf
var lengthBufFundedAddressState = []byte{132}
func (t *FundedAddressState) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write(lengthBufFundedAddressState); err != nil {
return err
}
scratch := make([]byte, 9)
// t.Wallet (address.Address) (struct)
if err := t.Wallet.MarshalCBOR(w); err != nil {
return err
}
// t.Addr (address.Address) (struct)
if err := t.Addr.MarshalCBOR(w); err != nil {
return err
}
// t.AmtReserved (big.Int) (struct)
if err := t.AmtReserved.MarshalCBOR(w); err != nil {
return err
}
// t.MsgCid (cid.Cid) (struct)
if t.MsgCid == nil {
if _, err := w.Write(cbg.CborNull); err != nil {
return err
}
} else {
if err := cbg.WriteCidBuf(scratch, w, *t.MsgCid); err != nil {
return xerrors.Errorf("failed to write cid field t.MsgCid: %w", err)
}
}
return nil
}
func (t *FundedAddressState) UnmarshalCBOR(r io.Reader) error {
*t = FundedAddressState{}
br := cbg.GetPeeker(r)
scratch := make([]byte, 8)
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 4 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.Wallet (address.Address) (struct)
{
if err := t.Wallet.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.Wallet: %w", err)
}
}
// t.Addr (address.Address) (struct)
{
if err := t.Addr.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.Addr: %w", err)
}
}
// t.AmtReserved (big.Int) (struct)
{
if err := t.AmtReserved.UnmarshalCBOR(br); err != nil {
return xerrors.Errorf("unmarshaling t.AmtReserved: %w", err)
}
}
// t.MsgCid (cid.Cid) (struct)
{
b, err := br.ReadByte()
if err != nil {
return err
}
if b != cbg.CborNull[0] {
if err := br.UnreadByte(); err != nil {
return err
}
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.MsgCid: %w", err)
}
t.MsgCid = &c
}
}
return nil
}

600
chain/market/fundmanager.go Normal file
View File

@ -0,0 +1,600 @@
package market
import (
"context"
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/build"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"go.uber.org/fx"
)
var log = logging.Logger("market_adapter")
// API is the fx dependencies need to run a fund manager
type FundManagerAPI struct {
fx.In
full.StateAPI
full.MpoolAPI
}
// fundManagerAPI is the specific methods called by the FundManager
type fundManagerAPI interface {
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error)
}
// FundManager keeps track of funds in a set of addresses
type FundManager struct {
ctx context.Context
shutdown context.CancelFunc
api fundManagerAPI
wallet address.Address
str *Store
lk sync.Mutex
fundedAddrs map[address.Address]*fundedAddress
}
type waitSentinel cid.Cid
var waitSentinelUndef = waitSentinel(cid.Undef)
func NewFundManager(api fundManagerAPI, ds datastore.Batching, wallet address.Address) *FundManager {
ctx, cancel := context.WithCancel(context.Background())
return &FundManager{
ctx: ctx,
shutdown: cancel,
api: api,
wallet: wallet,
str: newStore(ds),
fundedAddrs: make(map[address.Address]*fundedAddress),
}
}
func (fm *FundManager) Stop() {
fm.shutdown()
}
func (fm *FundManager) Start() error {
fm.lk.Lock()
defer fm.lk.Unlock()
// TODO:
// To save memory:
// - in State() only load addresses with in-progress messages
// - load the others just-in-time from getFundedAddress
// - delete(fm.fundedAddrs, addr) when the queue has been processed
return fm.str.forEach(func(state *FundedAddressState) {
fa := newFundedAddress(fm, state.Addr)
fa.state = state
fm.fundedAddrs[fa.state.Addr] = fa
fa.start()
})
}
// Creates a fundedAddress if it doesn't already exist, and returns it
func (fm *FundManager) getFundedAddress(addr address.Address) *fundedAddress {
fm.lk.Lock()
defer fm.lk.Unlock()
fa, ok := fm.fundedAddrs[addr]
if !ok {
fa = newFundedAddress(fm, addr)
fm.fundedAddrs[addr] = fa
}
return fa
}
// Reserve adds amt to `reserved`. If there is not enough available funds for
// the address, submits a message on chain to top up available funds.
func (fm *FundManager) Reserve(ctx context.Context, addr address.Address, amt abi.TokenAmount) (waitSentinel, error) {
return fm.getFundedAddress(addr).reserve(ctx, amt)
}
// Subtract from `reserved`.
func (fm *FundManager) Release(ctx context.Context, addr address.Address, amt abi.TokenAmount) error {
return fm.getFundedAddress(addr).release(ctx, amt)
}
// Withdraw unreserved funds. Only succeeds if there are enough unreserved
// funds for the address.
func (fm *FundManager) Withdraw(ctx context.Context, addr address.Address, amt abi.TokenAmount) (waitSentinel, error) {
return fm.getFundedAddress(addr).withdraw(ctx, amt)
}
// Waits for a reserve or withdraw to complete.
func (fm *FundManager) Wait(ctx context.Context, sentinel waitSentinel) error {
_, err := fm.api.StateWaitMsg(ctx, cid.Cid(sentinel), build.MessageConfidence)
return err
}
// FundedAddressState keeps track of the state of an address with funds in the
// datastore
type FundedAddressState struct {
// Wallet is the wallet from which funds are added to the address
Wallet address.Address
Addr address.Address
// AmtReserved is the amount that must be kept in the address (cannot be
// withdrawn)
AmtReserved abi.TokenAmount
// MsgCid is the cid of an in-progress on-chain message
MsgCid *cid.Cid
}
// fundedAddress keeps track of the state and request queues for a
// particular address
type fundedAddress struct {
ctx context.Context
env *fundManagerEnvironment
str *Store
lk sync.Mutex
state *FundedAddressState
// Note: These request queues are ephemeral, they are not saved to store
reservations []*fundRequest
releases []*fundRequest
withdrawals []*fundRequest
// Used by the tests
onProcessStartListener func() bool
}
func newFundedAddress(fm *FundManager, addr address.Address) *fundedAddress {
return &fundedAddress{
ctx: fm.ctx,
env: &fundManagerEnvironment{api: fm.api},
str: fm.str,
state: &FundedAddressState{
Wallet: fm.wallet,
Addr: addr,
AmtReserved: abi.NewTokenAmount(0),
},
}
}
// If there is a in-progress on-chain message, don't submit any more messages
// on chain until it completes
func (a *fundedAddress) start() {
a.lk.Lock()
defer a.lk.Unlock()
if a.state.MsgCid != nil {
a.debugf("restart: wait for %s", a.state.MsgCid)
a.startWaitForResults(*a.state.MsgCid)
}
}
func (a *fundedAddress) reserve(ctx context.Context, amt abi.TokenAmount) (waitSentinel, error) {
return a.requestAndWait(ctx, amt, &a.reservations)
}
func (a *fundedAddress) release(ctx context.Context, amt abi.TokenAmount) error {
_, err := a.requestAndWait(ctx, amt, &a.releases)
return err
}
func (a *fundedAddress) withdraw(ctx context.Context, amt abi.TokenAmount) (waitSentinel, error) {
return a.requestAndWait(ctx, amt, &a.withdrawals)
}
func (a *fundedAddress) requestAndWait(ctx context.Context, amt abi.TokenAmount, reqs *[]*fundRequest) (waitSentinel, error) {
// Create a request and add it to the request queue
req := newFundRequest(ctx, amt)
a.lk.Lock()
*reqs = append(*reqs, req)
a.lk.Unlock()
// Process the queue
go a.process()
// Wait for the results
select {
case <-ctx.Done():
return waitSentinelUndef, ctx.Err()
case r := <-req.Result:
return waitSentinel(r.msgCid), r.err
}
}
// Used by the tests
func (a *fundedAddress) onProcessStart(fn func() bool) {
a.lk.Lock()
defer a.lk.Unlock()
a.onProcessStartListener = fn
}
// Process queued requests
func (a *fundedAddress) process() {
a.lk.Lock()
defer a.lk.Unlock()
// Used by the tests
if a.onProcessStartListener != nil {
done := a.onProcessStartListener()
if done {
a.onProcessStartListener = nil
}
}
// Check if we're still waiting for the response to a message
if a.state.MsgCid != nil {
return
}
// Check if there's anything to do
if len(a.reservations) == 0 && len(a.releases) == 0 && len(a.withdrawals) == 0 {
return
}
res, _ := a.processRequests()
a.reservations = filterOutProcessedReqs(a.reservations)
a.releases = filterOutProcessedReqs(a.releases)
a.withdrawals = filterOutProcessedReqs(a.withdrawals)
a.applyStateChange(res)
}
// Filter out completed requests
func filterOutProcessedReqs(reqs []*fundRequest) []*fundRequest {
filtered := make([]*fundRequest, 0, len(reqs))
for _, req := range reqs {
if !req.Completed() {
filtered = append(filtered, req)
}
}
return filtered
}
// Apply the results of processing queues and save to the datastore
func (a *fundedAddress) applyStateChange(res *processResult) {
a.state.MsgCid = res.msgCid
a.state.AmtReserved = res.amtReserved
a.saveState()
}
// Clear the pending message cid so that a new message can be sent
func (a *fundedAddress) clearWaitState() {
a.state.MsgCid = nil
a.saveState()
}
// Save state to datastore
func (a *fundedAddress) saveState() {
// Not much we can do if saving to the datastore fails, just log
err := a.str.save(a.state)
if err != nil {
log.Errorf("saving state to store for addr %s: %w", a.state.Addr, err)
}
}
// The result of processing the request queues
type processResult struct {
// The new reserved amount
amtReserved abi.TokenAmount
// The message cid, if a message was pushed
msgCid *cid.Cid
}
// process request queues and return the resulting changes to state
func (a *fundedAddress) processRequests() (pr *processResult, prerr error) {
// If there's an error, mark reserve requests as errored
defer func() {
if prerr != nil {
for _, req := range a.reservations {
req.Complete(cid.Undef, prerr)
}
}
}()
// Start with the reserved amount in state
reserved := a.state.AmtReserved
// Add the amount of each reserve request
for _, req := range a.reservations {
amt := req.Amount()
a.debugf("reserve %d", amt)
reserved = types.BigAdd(reserved, amt)
}
// Subtract the amount of each release request
for _, req := range a.releases {
amt := req.Amount()
a.debugf("release %d", amt)
reserved = types.BigSub(reserved, amt)
// Mark release as complete
req.Complete(cid.Undef, nil)
}
// If reserved amount is negative, set it to zero
if reserved.LessThan(abi.NewTokenAmount(0)) {
reserved = abi.NewTokenAmount(0)
}
res := &processResult{amtReserved: reserved}
// Work out the amount to add to the balance
toAdd := abi.NewTokenAmount(0)
// If the new reserved amount is greater than the existing amount
if reserved.GreaterThan(a.state.AmtReserved) {
a.debugf("reserved %d > state.AmtReserved %d", reserved, a.state.AmtReserved)
// Get available funds for address
avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr)
if err != nil {
return res, err
}
// amount to add = new reserved amount - available
toAdd = types.BigSub(reserved, avail)
a.debugf("reserved %d - avail %d = %d", reserved, avail, toAdd)
}
// If there's nothing to add to the balance
if toAdd.LessThanEqual(abi.NewTokenAmount(0)) {
// Mark reserve requests as complete
for _, req := range a.reservations {
req.Complete(cid.Undef, nil)
}
// Process withdrawals
return a.processWithdrawals(reserved)
}
// Add funds to address
a.debugf("add funds %d", toAdd)
addFundsCid, err := a.env.AddFunds(a.ctx, a.state.Wallet, a.state.Addr, toAdd)
if err != nil {
return res, err
}
// Mark reserve requests as complete
for _, req := range a.reservations {
req.Complete(addFundsCid, nil)
}
// Start waiting for results (async)
defer a.startWaitForResults(addFundsCid)
// Save the message CID to state
res.msgCid = &addFundsCid
return res, nil
}
// process withdrawal queue
func (a *fundedAddress) processWithdrawals(reserved abi.TokenAmount) (pr *processResult, prerr error) {
// If there's an error, mark withdrawal requests as errored
defer func() {
if prerr != nil {
for _, req := range a.withdrawals {
req.Complete(cid.Undef, prerr)
}
}
}()
res := &processResult{
amtReserved: reserved,
}
// Get the net available balance
avail, err := a.env.AvailableFunds(a.ctx, a.state.Addr)
if err != nil {
return res, err
}
netAvail := types.BigSub(avail, reserved)
// Fit as many withdrawals as possible into the available balance, and fail
// the rest
withdrawalAmt := abi.NewTokenAmount(0)
allowedAmt := abi.NewTokenAmount(0)
allowed := make([]*fundRequest, 0, len(a.withdrawals))
for _, req := range a.withdrawals {
amt := req.Amount()
withdrawalAmt = types.BigAdd(withdrawalAmt, amt)
if withdrawalAmt.LessThanEqual(netAvail) {
a.debugf("withdraw %d", amt)
allowed = append(allowed, req)
allowedAmt = types.BigAdd(allowedAmt, amt)
} else {
err := xerrors.Errorf("insufficient funds for withdrawal %d", amt)
a.debugf("%s", err)
req.Complete(cid.Undef, err)
}
}
// Check if there is anything to withdraw
if allowedAmt.Equals(abi.NewTokenAmount(0)) {
// Mark allowed requests as complete
for _, req := range allowed {
req.Complete(cid.Undef, nil)
}
return res, nil
}
// Withdraw funds
a.debugf("withdraw funds %d", allowedAmt)
withdrawFundsCid, err := a.env.WithdrawFunds(a.ctx, a.state.Wallet, a.state.Addr, allowedAmt)
if err != nil {
return res, err
}
// Mark allowed requests as complete
for _, req := range allowed {
req.Complete(withdrawFundsCid, nil)
}
// Start waiting for results of message (async)
defer a.startWaitForResults(withdrawFundsCid)
// Save the message CID to state
res.msgCid = &withdrawFundsCid
return res, nil
}
// asynchonously wait for results of message
func (a *fundedAddress) startWaitForResults(msgCid cid.Cid) {
go func() {
err := a.env.WaitMsg(a.ctx, msgCid)
if err != nil {
// We don't really care about the results here, we're just waiting
// so as to only process one on-chain message at a time
log.Errorf("waiting for results of message %s for addr %s: %w", msgCid, a.state.Addr, err)
}
a.lk.Lock()
a.debugf("complete wait")
a.clearWaitState()
a.lk.Unlock()
a.process()
}()
}
func (a *fundedAddress) debugf(args ...interface{}) {
fmtStr := args[0].(string)
args = args[1:]
log.Debugf(a.state.Addr.String()+": "+fmtStr, args...)
}
// The result of a fund request
type reqResult struct {
msgCid cid.Cid
err error
}
// A request to change funds
type fundRequest struct {
ctx context.Context
amt abi.TokenAmount
completed chan struct{}
Result chan reqResult
}
func newFundRequest(ctx context.Context, amt abi.TokenAmount) *fundRequest {
return &fundRequest{
ctx: ctx,
amt: amt,
Result: make(chan reqResult),
completed: make(chan struct{}),
}
}
// Amount returns zero if the context has expired
func (frp *fundRequest) Amount() abi.TokenAmount {
if frp.ctx.Err() != nil {
return abi.NewTokenAmount(0)
}
return frp.amt
}
// Complete is called with the message CID when the funds request has been
// started or with the error if there was an error
func (frp *fundRequest) Complete(msgCid cid.Cid, err error) {
select {
case <-frp.completed:
case <-frp.ctx.Done():
case frp.Result <- reqResult{msgCid: msgCid, err: err}:
}
close(frp.completed)
}
// Completed indicates if Complete has already been called
func (frp *fundRequest) Completed() bool {
select {
case <-frp.completed:
return true
default:
return false
}
}
func (frp *fundRequest) Equals(other *fundRequest) bool {
return frp == other
}
// fundManagerEnvironment simplifies some API calls
type fundManagerEnvironment struct {
api fundManagerAPI
}
func (env *fundManagerEnvironment) AvailableFunds(ctx context.Context, addr address.Address) (abi.TokenAmount, error) {
bal, err := env.api.StateMarketBalance(ctx, addr, types.EmptyTSK)
if err != nil {
return abi.NewTokenAmount(0), err
}
return types.BigSub(bal.Escrow, bal.Locked), nil
}
func (env *fundManagerEnvironment) AddFunds(
ctx context.Context,
wallet address.Address,
addr address.Address,
amt abi.TokenAmount,
) (cid.Cid, error) {
return env.sendFunds(ctx, wallet, addr, amt)
}
func (env *fundManagerEnvironment) WithdrawFunds(
ctx context.Context,
wallet address.Address,
addr address.Address,
amt abi.TokenAmount,
) (cid.Cid, error) {
return env.sendFunds(ctx, addr, wallet, amt)
}
func (env *fundManagerEnvironment) sendFunds(
ctx context.Context,
from address.Address,
to address.Address,
amt abi.TokenAmount,
) (cid.Cid, error) {
params, err := actors.SerializeParams(&to)
if err != nil {
return cid.Undef, err
}
smsg, aerr := env.api.MpoolPushMessage(ctx, &types.Message{
To: market.Address,
From: from,
Value: amt,
Method: market.Methods.AddBalance,
Params: params,
}, nil)
if aerr != nil {
return cid.Undef, aerr
}
return smsg.Cid(), nil
}
func (env *fundManagerEnvironment) WaitMsg(ctx context.Context, c cid.Cid) error {
_, err := env.api.StateWaitMsg(ctx, c, build.MessageConfidence)
return err
}

View File

@ -0,0 +1,506 @@
package market
import (
"bytes"
"context"
"sync"
"testing"
"time"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/go-state-types/abi"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
"github.com/filecoin-project/lotus/chain/wallet"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
// TestFundManagerBasic verifies that the basic fund manager operations work
func TestFundManagerBasic(t *testing.T) {
s := setup(t)
defer s.fm.Stop()
// Reserve 10
// balance: 0 -> 10
// reserved: 0 -> 10
amt := abi.NewTokenAmount(10)
sentinel, err := s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
msg := s.mockApi.getSentMessage(cid.Cid(sentinel))
checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel))
err = s.fm.Wait(s.ctx, sentinel)
require.NoError(t, err)
// Reserve 7
// balance: 10 -> 17
// reserved: 10 -> 17
amt = abi.NewTokenAmount(7)
sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
msg = s.mockApi.getSentMessage(cid.Cid(sentinel))
checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel))
err = s.fm.Wait(s.ctx, sentinel)
require.NoError(t, err)
// Release 5
// balance: 17
// reserved: 17 -> 12
amt = abi.NewTokenAmount(5)
err = s.fm.Release(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
// Withdraw 2
// balance: 17 -> 15
// reserved: 12
amt = abi.NewTokenAmount(2)
sentinel, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
msg = s.mockApi.getSentMessage(cid.Cid(sentinel))
checkMessageFields(t, msg, s.acctAddr, s.walletAddr, amt)
s.mockApi.completeMsg(cid.Cid(sentinel))
err = s.fm.Wait(s.ctx, sentinel)
require.NoError(t, err)
// Reserve 3
// balance: 15
// reserved: 12 -> 15
// Note: reserved (15) is <= balance (15) so should not send on-chain
// message
msgCount := s.mockApi.messageCount()
amt = abi.NewTokenAmount(3)
sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
require.Equal(t, msgCount, s.mockApi.messageCount())
require.Equal(t, sentinel, waitSentinelUndef)
// Reserve 1
// balance: 15 -> 16
// reserved: 15 -> 16
// Note: reserved (16) is above balance (15) so *should* send on-chain
// message to top up balance
amt = abi.NewTokenAmount(1)
topUp := abi.NewTokenAmount(1)
sentinel, err = s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
s.mockApi.completeMsg(cid.Cid(sentinel))
msg = s.mockApi.getSentMessage(cid.Cid(sentinel))
checkMessageFields(t, msg, s.walletAddr, s.acctAddr, topUp)
// Withdraw 1
// balance: 16
// reserved: 16
// Note: Expect failure because there is no available balance to withdraw:
// balance - reserved = 16 - 16 = 0
amt = abi.NewTokenAmount(1)
sentinel, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt)
require.Error(t, err)
}
// TestFundManagerParallel verifies that operations can be run in parallel
func TestFundManagerParallel(t *testing.T) {
s := setup(t)
defer s.fm.Stop()
// Reserve 10
amt := abi.NewTokenAmount(10)
sentinelReserve10, err := s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
// Wait until all the subsequent requests are queued up
queueReady := make(chan struct{})
fa := s.fm.getFundedAddress(s.acctAddr)
fa.onProcessStart(func() bool {
if len(fa.withdrawals) == 1 && len(fa.reservations) == 2 && len(fa.releases) == 1 {
close(queueReady)
return true
}
return false
})
// Withdraw 5 (should not run until after reserves / releases)
withdrawReady := make(chan error)
go func() {
amt = abi.NewTokenAmount(5)
_, err := s.fm.Withdraw(s.ctx, s.acctAddr, amt)
withdrawReady <- err
}()
reserveSentinels := make(chan waitSentinel)
// Reserve 3
go func() {
amt := abi.NewTokenAmount(3)
sentinelReserve3, err := s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
reserveSentinels <- sentinelReserve3
}()
// Reserve 5
go func() {
amt := abi.NewTokenAmount(5)
sentinelReserve5, err := s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
reserveSentinels <- sentinelReserve5
}()
// Release 2
go func() {
amt := abi.NewTokenAmount(2)
err = s.fm.Release(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
}()
// Everything is queued up
<-queueReady
// Complete the "Reserve 10" message
s.mockApi.completeMsg(cid.Cid(sentinelReserve10))
msg := s.mockApi.getSentMessage(cid.Cid(sentinelReserve10))
checkMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(10))
// The other requests should now be combined and be submitted on-chain as
// a single message
rs1 := <-reserveSentinels
rs2 := <-reserveSentinels
require.Equal(t, rs1, rs2)
// Withdraw should not have been called yet, because reserve / release
// requests run first
select {
case <-withdrawReady:
require.Fail(t, "Withdraw should run after reserve / release")
default:
}
// Complete the message
s.mockApi.completeMsg(cid.Cid(rs1))
msg = s.mockApi.getSentMessage(cid.Cid(rs1))
// "Reserve 3" +3
// "Reserve 5" +5
// "Release 2" -2
// Result: 6
checkMessageFields(t, msg, s.walletAddr, s.acctAddr, abi.NewTokenAmount(6))
// Expect withdraw to fail because not enough available funds
err = <-withdrawReady
require.Error(t, err)
}
// TestFundManagerWithdrawal verifies that as many withdraw operations as
// possible are processed
func TestFundManagerWithdrawal(t *testing.T) {
s := setup(t)
defer s.fm.Stop()
// Reserve 10
amt := abi.NewTokenAmount(10)
sentinelReserve10, err := s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
// Complete the "Reserve 10" message
s.mockApi.completeMsg(cid.Cid(sentinelReserve10))
// Release 10
err = s.fm.Release(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
// Available 10
// Withdraw 6
// Expect success
amt = abi.NewTokenAmount(6)
sentinelWithdraw, err := s.fm.Withdraw(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
s.mockApi.completeMsg(cid.Cid(sentinelWithdraw))
err = s.fm.Wait(s.ctx, sentinelWithdraw)
require.NoError(t, err)
// Available 4
// Withdraw 4
// Expect success
amt = abi.NewTokenAmount(4)
sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
s.mockApi.completeMsg(cid.Cid(sentinelWithdraw))
err = s.fm.Wait(s.ctx, sentinelWithdraw)
require.NoError(t, err)
// Available 0
// Withdraw 1
// Expect FAIL
amt = abi.NewTokenAmount(1)
sentinelWithdraw, err = s.fm.Withdraw(s.ctx, s.acctAddr, amt)
require.Error(t, err)
}
// TestFundManagerRestart verifies that waiting for incomplete requests resumes
// on restart
func TestFundManagerRestart(t *testing.T) {
s := setup(t)
defer s.fm.Stop()
acctAddr2 := tutils.NewActorAddr(t, "addr2")
// Address 1: Reserve 10
amt := abi.NewTokenAmount(10)
sentinelAddr1, err := s.fm.Reserve(s.ctx, s.acctAddr, amt)
require.NoError(t, err)
msg := s.mockApi.getSentMessage(cid.Cid(sentinelAddr1))
checkMessageFields(t, msg, s.walletAddr, s.acctAddr, amt)
// Address 2: Reserve 7
amt2 := abi.NewTokenAmount(7)
sentinelAddr2Res7, err := s.fm.Reserve(s.ctx, acctAddr2, amt2)
require.NoError(t, err)
msg2 := s.mockApi.getSentMessage(cid.Cid(sentinelAddr2Res7))
checkMessageFields(t, msg2, s.walletAddr, acctAddr2, amt2)
// Complete "Address 1: Reserve 10"
s.mockApi.completeMsg(cid.Cid(sentinelAddr1))
err = s.fm.Wait(s.ctx, sentinelAddr1)
require.NoError(t, err)
// Give the completed state a moment to be stored before restart
time.Sleep(time.Millisecond * 10)
// Restart
mockApiAfter := s.mockApi
fmAfter := NewFundManager(mockApiAfter, s.ds, s.walletAddr)
fmAfter.Start()
amt3 := abi.NewTokenAmount(9)
reserveSentinel := make(chan waitSentinel)
go func() {
// Address 2: Reserve 9
sentinel3, err := fmAfter.Reserve(s.ctx, acctAddr2, amt3)
require.NoError(t, err)
reserveSentinel <- sentinel3
}()
// Expect no message to be sent, because still waiting for previous
// message "Address 2: Reserve 7" to complete on-chain
select {
case <-reserveSentinel:
require.Fail(t, "Expected no message to be sent")
case <-time.After(10 * time.Millisecond):
}
// Complete "Address 2: Reserve 7"
mockApiAfter.completeMsg(cid.Cid(sentinelAddr2Res7))
err = fmAfter.Wait(s.ctx, sentinelAddr2Res7)
require.NoError(t, err)
// Expect waiting message to now be sent
sentinel3 := <-reserveSentinel
msg3 := mockApiAfter.getSentMessage(cid.Cid(sentinel3))
checkMessageFields(t, msg3, s.walletAddr, acctAddr2, amt3)
}
type scaffold struct {
ctx context.Context
ds *ds_sync.MutexDatastore
walletAddr address.Address
acctAddr address.Address
mockApi *mockFundManagerAPI
fm *FundManager
}
func setup(t *testing.T) *scaffold {
ctx := context.Background()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
walletAddr, err := w.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil {
t.Fatal(err)
}
acctAddr := tutils.NewActorAddr(t, "addr")
mockApi := newMockFundManagerAPI(walletAddr)
ds := ds_sync.MutexWrap(ds.NewMapDatastore())
fm := NewFundManager(mockApi, ds, walletAddr)
return &scaffold{
ctx: ctx,
ds: ds,
walletAddr: walletAddr,
acctAddr: acctAddr,
mockApi: mockApi,
fm: fm,
}
}
func checkMessageFields(t *testing.T, msg *types.Message, from address.Address, to address.Address, amt abi.TokenAmount) {
require.Equal(t, from, msg.From)
require.Equal(t, market.Address, msg.To)
require.Equal(t, amt, msg.Value)
var paramsTo address.Address
err := paramsTo.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Equal(t, to, paramsTo)
}
type sentMsg struct {
msg *types.SignedMessage
ready chan struct{}
}
type mockFundManagerAPI struct {
wallet address.Address
lk sync.Mutex
escrow map[address.Address]abi.TokenAmount
sentMsgs map[cid.Cid]*sentMsg
completedMsgs map[cid.Cid]struct{}
waitingFor map[cid.Cid]chan struct{}
}
func newMockFundManagerAPI(wallet address.Address) *mockFundManagerAPI {
return &mockFundManagerAPI{
wallet: wallet,
escrow: make(map[address.Address]abi.TokenAmount),
sentMsgs: make(map[cid.Cid]*sentMsg),
completedMsgs: make(map[cid.Cid]struct{}),
waitingFor: make(map[cid.Cid]chan struct{}),
}
}
func (mapi *mockFundManagerAPI) MpoolPushMessage(ctx context.Context, message *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
mapi.lk.Lock()
defer mapi.lk.Unlock()
smsg := &types.SignedMessage{Message: *message}
mapi.sentMsgs[smsg.Cid()] = &sentMsg{msg: smsg, ready: make(chan struct{})}
return smsg, nil
}
func (mapi *mockFundManagerAPI) getSentMessage(c cid.Cid) *types.Message {
mapi.lk.Lock()
defer mapi.lk.Unlock()
for i := 0; i < 1000; i++ {
if pending, ok := mapi.sentMsgs[c]; ok {
return &pending.msg.Message
}
time.Sleep(time.Millisecond)
}
panic("expected message to be sent")
}
func (mapi *mockFundManagerAPI) messageCount() int {
mapi.lk.Lock()
defer mapi.lk.Unlock()
return len(mapi.sentMsgs)
}
func (mapi *mockFundManagerAPI) completeMsg(msgCid cid.Cid) {
mapi.lk.Lock()
pmsg, ok := mapi.sentMsgs[msgCid]
if ok {
if pmsg.msg.Message.From == mapi.wallet {
var escrowAcct address.Address
err := escrowAcct.UnmarshalCBOR(bytes.NewReader(pmsg.msg.Message.Params))
if err != nil {
panic(err)
}
escrow := mapi.getEscrow(escrowAcct)
before := escrow
escrow = types.BigAdd(escrow, pmsg.msg.Message.Value)
mapi.escrow[escrowAcct] = escrow
log.Debugf("%s: escrow %d -> %d", escrowAcct, before, escrow)
} else {
escrowAcct := pmsg.msg.Message.From
escrow := mapi.getEscrow(escrowAcct)
before := escrow
escrow = types.BigSub(escrow, pmsg.msg.Message.Value)
mapi.escrow[escrowAcct] = escrow
log.Debugf("%s: escrow %d -> %d", escrowAcct, before, escrow)
}
}
mapi.completedMsgs[msgCid] = struct{}{}
ready, ok := mapi.waitingFor[msgCid]
mapi.lk.Unlock()
if ok {
close(ready)
}
}
func (mapi *mockFundManagerAPI) StateMarketBalance(ctx context.Context, a address.Address, key types.TipSetKey) (api.MarketBalance, error) {
mapi.lk.Lock()
defer mapi.lk.Unlock()
return api.MarketBalance{
Locked: abi.NewTokenAmount(0),
Escrow: mapi.getEscrow(a),
}, nil
}
func (mapi *mockFundManagerAPI) getEscrow(a address.Address) abi.TokenAmount {
escrow := mapi.escrow[a]
if escrow.Nil() {
return abi.NewTokenAmount(0)
}
return escrow
}
func (mapi *mockFundManagerAPI) StateWaitMsg(ctx context.Context, c cid.Cid, confidence uint64) (*api.MsgLookup, error) {
res := &api.MsgLookup{
Message: c,
Receipt: types.MessageReceipt{
ExitCode: 0,
Return: nil,
},
}
ready := make(chan struct{})
mapi.lk.Lock()
_, ok := mapi.completedMsgs[c]
if !ok {
mapi.waitingFor[c] = ready
}
mapi.lk.Unlock()
if !ok {
select {
case <-ctx.Done():
case <-ready:
}
}
return res, nil
}

View File

@ -8,7 +8,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/api"
@ -20,8 +19,6 @@ import (
"github.com/filecoin-project/lotus/node/impl/full"
)
var log = logging.Logger("market_adapter")
// API is the dependencies need to run a fund manager
type API struct {
fx.In

73
chain/market/store.go Normal file
View File

@ -0,0 +1,73 @@
package market
import (
"bytes"
cborrpc "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dsq "github.com/ipfs/go-datastore/query"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
const dsKeyAddr = "Addr"
type Store struct {
ds datastore.Batching
}
func newStore(ds dtypes.MetadataDS) *Store {
ds = namespace.Wrap(ds, datastore.NewKey("/fundmgr/"))
return &Store{
ds: ds,
}
}
// save the state to the datastore
func (ps *Store) save(state *FundedAddressState) error {
k := dskeyForAddr(state.Addr)
b, err := cborrpc.Dump(state)
if err != nil {
return err
}
return ps.ds.Put(k, b)
}
// forEach calls iter with each address in the datastore
func (ps *Store) forEach(iter func(*FundedAddressState)) error {
res, err := ps.ds.Query(dsq.Query{Prefix: dsKeyAddr})
if err != nil {
return err
}
defer res.Close() //nolint:errcheck
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return err
}
var stored FundedAddressState
if err := stored.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil {
return err
}
iter(&stored)
}
return nil
}
// The datastore key used to identify the address state
func dskeyForAddr(addr address.Address) datastore.Key {
return datastore.KeyWithNamespaces([]string{dsKeyAddr, addr.String()})
}

View File

@ -4,6 +4,8 @@ import (
"fmt"
"os"
"github.com/filecoin-project/lotus/chain/market"
gen "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/lotus/api"
@ -67,6 +69,14 @@ func main() {
os.Exit(1)
}
err = gen.WriteTupleEncodersToFile("./chain/market/cbor_gen.go", "market",
market.FundedAddressState{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
err = gen.WriteTupleEncodersToFile("./chain/exchange/cbor_gen.go", "exchange",
exchange.Request{},
exchange.Response{},