refactor: integrate new FundManager

This commit is contained in:
Dirk McCormick 2020-11-10 16:45:48 +01:00 committed by hannahhoward
parent e53b0ef068
commit 0d243bb824
12 changed files with 70 additions and 425 deletions

View File

@ -514,8 +514,10 @@ type FullNode interface {
// along with the address removal. // along with the address removal.
MsigRemoveSigner(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error) MsigRemoveSigner(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error)
MarketEnsureAvailable(context.Context, address.Address, address.Address, types.BigInt) (cid.Cid, error) // MarketReserveFunds reserves funds for a deal
// MarketFreeBalance MarketReserveFunds(ctx context.Context, wallet address.Address, addr address.Address, amt types.BigInt) (cid.Cid, error)
// MarketReleaseFunds releases funds reserved by MarketReserveFunds
MarketReleaseFunds(ctx context.Context, addr address.Address, amt types.BigInt) error
// MethodGroup: Paych // MethodGroup: Paych
// The Paych methods are for interacting with and managing payment channels // The Paych methods are for interacting with and managing payment channels

View File

@ -241,7 +241,8 @@ type FullNodeStruct struct {
MsigSwapCancel func(context.Context, address.Address, address.Address, uint64, address.Address, address.Address) (cid.Cid, error) `perm:"sign"` MsigSwapCancel func(context.Context, address.Address, address.Address, uint64, address.Address, address.Address) (cid.Cid, error) `perm:"sign"`
MsigRemoveSigner func(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error) `perm:"sign"` MsigRemoveSigner func(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error) `perm:"sign"`
MarketEnsureAvailable func(context.Context, address.Address, address.Address, types.BigInt) (cid.Cid, error) `perm:"sign"` MarketReserveFunds func(ctx context.Context, wallet address.Address, addr address.Address, amt types.BigInt) (cid.Cid, error) `perm:"sign"`
MarketReleaseFunds func(ctx context.Context, addr address.Address, amt types.BigInt) error `perm:"sign"`
PaychGet func(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) `perm:"sign"` PaychGet func(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
PaychGetWaitReady func(context.Context, cid.Cid) (address.Address, error) `perm:"sign"` PaychGetWaitReady func(context.Context, cid.Cid) (address.Address, error) `perm:"sign"`
@ -1117,8 +1118,12 @@ func (c *FullNodeStruct) MsigRemoveSigner(ctx context.Context, msig address.Addr
return c.Internal.MsigRemoveSigner(ctx, msig, proposer, toRemove, decrease) return c.Internal.MsigRemoveSigner(ctx, msig, proposer, toRemove, decrease)
} }
func (c *FullNodeStruct) MarketEnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) { func (c *FullNodeStruct) MarketReserveFunds(ctx context.Context, wallet address.Address, addr address.Address, amt types.BigInt) (cid.Cid, error) {
return c.Internal.MarketEnsureAvailable(ctx, addr, wallet, amt) return c.Internal.MarketReserveFunds(ctx, wallet, addr, amt)
}
func (c *FullNodeStruct) MarketReleaseFunds(ctx context.Context, addr address.Address, amt types.BigInt) error {
return c.Internal.MarketReleaseFunds(ctx, addr, amt)
} }
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {

View File

@ -4,24 +4,19 @@ import (
"context" "context"
"sync" "sync"
"golang.org/x/xerrors" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/node/impl/full"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors"
) )
var log = logging.Logger("market_adapter") var log = logging.Logger("market_adapter")
@ -35,6 +30,7 @@ type FundManagerAPI struct {
} }
// fundManagerAPI is the specific methods called by the FundManager // fundManagerAPI is the specific methods called by the FundManager
// (used by the tests)
type fundManagerAPI interface { type fundManagerAPI interface {
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
@ -52,7 +48,22 @@ type FundManager struct {
fundedAddrs map[address.Address]*fundedAddress fundedAddrs map[address.Address]*fundedAddress
} }
func NewFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager { func NewFundManager(lc fx.Lifecycle, api FundManagerAPI, ds datastore.Batching) *FundManager {
fm := newFundManager(&api, ds)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return fm.Start()
},
OnStop: func(ctx context.Context) error {
fm.Stop()
return nil
},
})
return fm
}
// newFundManager is used by the tests
func newFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &FundManager{ return &FundManager{
ctx: ctx, ctx: ctx,

View File

@ -7,23 +7,17 @@ import (
"testing" "testing"
"time" "time"
"github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing" "github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/chain/wallet"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync" ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
) )
// TestFundManagerBasic verifies that the basic fund manager operations work // TestFundManagerBasic verifies that the basic fund manager operations work
@ -528,7 +522,7 @@ func TestFundManagerRestart(t *testing.T) {
// Restart // Restart
mockApiAfter := s.mockApi mockApiAfter := s.mockApi
fmAfter := NewFundManager(mockApiAfter, s.ds) fmAfter := newFundManager(mockApiAfter, s.ds)
err = fmAfter.Start() err = fmAfter.Start()
require.NoError(t, err) require.NoError(t, err)
@ -585,7 +579,7 @@ func setup(t *testing.T) *scaffold {
mockApi := newMockFundManagerAPI(walletAddr) mockApi := newMockFundManagerAPI(walletAddr)
dstore := ds_sync.MutexWrap(ds.NewMapDatastore()) dstore := ds_sync.MutexWrap(ds.NewMapDatastore())
fm := NewFundManager(mockApi, dstore) fm := newFundManager(mockApi, dstore)
return &scaffold{ return &scaffold{
ctx: ctx, ctx: ctx,
ds: dstore, ds: dstore,

View File

@ -1,163 +0,0 @@
package market
import (
"context"
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
)
// API is the dependencies need to run a fund manager
type API struct {
fx.In
full.ChainAPI
full.StateAPI
full.MpoolAPI
}
// FundMgr monitors available balances and adds funds when EnsureAvailable is called
type FundMgr struct {
api fundMgrAPI
lk sync.RWMutex
available map[address.Address]types.BigInt
}
// StartFundManager creates a new fund manager and sets up event hooks to manage state changes
func StartFundManager(lc fx.Lifecycle, api API) *FundMgr {
fm := newFundMgr(&api)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
ev := events.NewEvents(ctx, &api)
preds := state.NewStatePredicates(&api)
dealDiffFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(fm.getAddresses)))
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
return dealDiffFn(ctx, oldTs.Key(), newTs.Key())
}
return ev.StateChanged(fm.checkFunc, fm.stateChanged, fm.revert, 0, events.NoTimeout, match)
},
})
return fm
}
type fundMgrAPI interface {
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
}
func newFundMgr(api fundMgrAPI) *FundMgr {
return &FundMgr{
api: api,
available: map[address.Address]types.BigInt{},
}
}
// checkFunc tells the events api to simply proceed (we always want to watch)
func (fm *FundMgr) checkFunc(ts *types.TipSet) (done bool, more bool, err error) {
return false, true, nil
}
// revert handles reverts to balances
func (fm *FundMgr) revert(ctx context.Context, ts *types.TipSet) error {
// TODO: Is it ok to just ignore this?
log.Warn("balance change reverted; TODO: actually handle this!")
return nil
}
// stateChanged handles balance changes monitored on the chain from one tipset to the next
func (fm *FundMgr) stateChanged(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
changedBalances, ok := states.(state.ChangedBalances)
if !ok {
panic("Expected state.ChangedBalances")
}
// overwrite our in memory cache with new values from chain (chain is canonical)
fm.lk.Lock()
for addr, balanceChange := range changedBalances {
if fm.available[addr].Int != nil {
log.Infof("State balance change recorded, prev: %s, new: %s", fm.available[addr].String(), balanceChange.To.String())
}
fm.available[addr] = balanceChange.To
}
fm.lk.Unlock()
return true, nil
}
func (fm *FundMgr) getAddresses() []address.Address {
fm.lk.RLock()
defer fm.lk.RUnlock()
addrs := make([]address.Address, 0, len(fm.available))
for addr := range fm.available {
addrs = append(addrs, addr)
}
return addrs
}
// EnsureAvailable looks at the available balance in escrow for a given
// address, and if less than the passed in amount, adds the difference
func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) {
idAddr, err := fm.api.StateLookupID(ctx, addr, types.EmptyTSK)
if err != nil {
return cid.Undef, err
}
fm.lk.Lock()
defer fm.lk.Unlock()
bal, err := fm.api.StateMarketBalance(ctx, addr, types.EmptyTSK)
if err != nil {
return cid.Undef, err
}
stateAvail := types.BigSub(bal.Escrow, bal.Locked)
avail, ok := fm.available[idAddr]
if !ok {
avail = stateAvail
}
toAdd := types.BigSub(amt, avail)
if toAdd.LessThan(types.NewInt(0)) {
toAdd = types.NewInt(0)
}
fm.available[idAddr] = big.Add(avail, toAdd)
log.Infof("Funds operation w/ Expected Balance: %s, In State: %s, Requested: %s, Adding: %s", avail.String(), stateAvail.String(), amt.String(), toAdd.String())
if toAdd.LessThanEqual(big.Zero()) {
return cid.Undef, nil
}
params, err := actors.SerializeParams(&addr)
if err != nil {
fm.available[idAddr] = avail
return cid.Undef, err
}
smsg, err := fm.api.MpoolPushMessage(ctx, &types.Message{
To: market.Address,
From: wallet,
Value: toAdd,
Method: market.Methods.AddBalance,
Params: params,
}, nil)
if err != nil {
fm.available[idAddr] = avail
return cid.Undef, err
}
return smsg.Cid(), nil
}

View File

@ -1,199 +0,0 @@
package market
import (
"context"
"errors"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
)
type fakeAPI struct {
returnedBalance api.MarketBalance
returnedBalanceErr error
signature crypto.Signature
receivedMessage *types.Message
pushMessageErr error
lookupIDErr error
}
func (fapi *fakeAPI) StateLookupID(_ context.Context, addr address.Address, _ types.TipSetKey) (address.Address, error) {
return addr, fapi.lookupIDErr
}
func (fapi *fakeAPI) StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) {
return fapi.returnedBalance, fapi.returnedBalanceErr
}
func (fapi *fakeAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
fapi.receivedMessage = msg
return &types.SignedMessage{
Message: *msg,
Signature: fapi.signature,
}, fapi.pushMessageErr
}
func addFundsMsg(toAdd abi.TokenAmount, addr address.Address, wallet address.Address) *types.Message {
params, _ := actors.SerializeParams(&addr)
return &types.Message{
To: market.Address,
From: wallet,
Value: toAdd,
Method: market.Methods.AddBalance,
Params: params,
}
}
type expectedResult struct {
addAmt abi.TokenAmount
shouldAdd bool
err error
cachedAvailable abi.TokenAmount
}
func TestAddFunds(t *testing.T) {
ctx := context.Background()
testCases := map[string]struct {
returnedBalanceErr error
returnedBalance api.MarketBalance
addAmounts []abi.TokenAmount
pushMessageErr error
expectedResults []expectedResult
lookupIDErr error
}{
"succeeds, trivial case": {
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)},
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
expectedResults: []expectedResult{
{
addAmt: abi.NewTokenAmount(100),
shouldAdd: true,
err: nil,
},
},
},
"succeeds, money already present": {
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(150), Locked: abi.NewTokenAmount(50)},
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
expectedResults: []expectedResult{
{
shouldAdd: false,
err: nil,
cachedAvailable: abi.NewTokenAmount(100),
},
},
},
"succeeds, multiple adds": {
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(150), Locked: abi.NewTokenAmount(50)},
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100), abi.NewTokenAmount(200), abi.NewTokenAmount(250), abi.NewTokenAmount(250)},
expectedResults: []expectedResult{
{
shouldAdd: false,
err: nil,
},
{
addAmt: abi.NewTokenAmount(100),
shouldAdd: true,
err: nil,
cachedAvailable: abi.NewTokenAmount(200),
},
{
addAmt: abi.NewTokenAmount(50),
shouldAdd: true,
err: nil,
cachedAvailable: abi.NewTokenAmount(250),
},
{
shouldAdd: false,
err: nil,
cachedAvailable: abi.NewTokenAmount(250),
},
},
},
"error on market balance": {
returnedBalanceErr: errors.New("something went wrong"),
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
expectedResults: []expectedResult{
{
err: errors.New("something went wrong"),
},
},
},
"error on push message": {
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)},
pushMessageErr: errors.New("something went wrong"),
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
expectedResults: []expectedResult{
{
err: errors.New("something went wrong"),
cachedAvailable: abi.NewTokenAmount(0),
},
},
},
"error looking up address": {
lookupIDErr: errors.New("something went wrong"),
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
expectedResults: []expectedResult{
{
err: errors.New("something went wrong"),
},
},
},
}
for testCase, data := range testCases {
//nolint:scopelint
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
sig := make([]byte, 100)
_, err := rand.Read(sig)
require.NoError(t, err)
fapi := &fakeAPI{
returnedBalance: data.returnedBalance,
returnedBalanceErr: data.returnedBalanceErr,
signature: crypto.Signature{
Type: crypto.SigTypeUnknown,
Data: sig,
},
pushMessageErr: data.pushMessageErr,
lookupIDErr: data.lookupIDErr,
}
fundMgr := newFundMgr(fapi)
addr := tutils.NewIDAddr(t, uint64(rand.Uint32()))
wallet := tutils.NewIDAddr(t, uint64(rand.Uint32()))
for i, amount := range data.addAmounts {
fapi.receivedMessage = nil
_, err := fundMgr.EnsureAvailable(ctx, addr, wallet, amount)
expected := data.expectedResults[i]
if expected.err == nil {
require.NoError(t, err)
if expected.shouldAdd {
expectedMessage := addFundsMsg(expected.addAmt, addr, wallet)
require.Equal(t, expectedMessage, fapi.receivedMessage)
} else {
require.Nil(t, fapi.receivedMessage)
}
} else {
require.EqualError(t, err, expected.err.Error())
}
if !expected.cachedAvailable.Nil() {
require.Equal(t, expected.cachedAvailable, fundMgr.available[addr])
}
}
})
}
}

View File

@ -38,7 +38,7 @@ type ClientNodeAdapter struct {
full.ChainAPI full.ChainAPI
full.MpoolAPI full.MpoolAPI
fm *market.FundMgr fundmgr *market.FundManager
ev *events.Events ev *events.Events
dsMatcher *dealStateMatcher dsMatcher *dealStateMatcher
} }
@ -48,14 +48,14 @@ type clientApi struct {
full.StateAPI full.StateAPI
} }
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode { func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
capi := &clientApi{chain, stateapi} capi := &clientApi{chain, stateapi}
return &ClientNodeAdapter{ return &ClientNodeAdapter{
StateAPI: stateapi, StateAPI: stateapi,
ChainAPI: chain, ChainAPI: chain,
MpoolAPI: mpool, MpoolAPI: mpool,
fm: fm, fundmgr: fundmgr,
ev: events.NewEvents(context.TODO(), capi), ev: events.NewEvents(context.TODO(), capi),
dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)), dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)),
} }
@ -112,8 +112,12 @@ func (c *ClientNodeAdapter) AddFunds(ctx context.Context, addr address.Address,
return smsg.Cid(), nil return smsg.Cid(), nil
} }
func (c *ClientNodeAdapter) EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, ts shared.TipSetToken) (cid.Cid, error) { func (c *ClientNodeAdapter) ReserveFunds(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return c.fm.EnsureAvailable(ctx, addr, wallet, amount) return c.fundmgr.Reserve(ctx, addr, wallet, amt)
}
func (c *ClientNodeAdapter) ReleaseFunds(ctx context.Context, addr address.Address, amt abi.TokenAmount) error {
return c.fundmgr.Release(addr, amt)
} }
func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (storagemarket.Balance, error) { func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (storagemarket.Balance, error) {

View File

@ -180,8 +180,12 @@ func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Addr
return localSignature, nil return localSignature, nil
} }
func (n *ProviderNodeAdapter) EnsureFunds(ctx context.Context, addr, wallet address.Address, amt abi.TokenAmount, encodedTs shared.TipSetToken) (cid.Cid, error) { func (n *ProviderNodeAdapter) ReserveFunds(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
return n.MarketEnsureAvailable(ctx, addr, wallet, amt) return n.MarketReserveFunds(ctx, wallet, addr, amt)
}
func (n *ProviderNodeAdapter) ReleaseFunds(ctx context.Context, addr address.Address, amt abi.TokenAmount) error {
return n.MarketReleaseFunds(ctx, addr, amt)
} }
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients. // Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.

View File

@ -294,14 +294,13 @@ func Online() Option {
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient), Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
Override(new(dtypes.ClientDatastore), modules.NewClientDatastore), Override(new(dtypes.ClientDatastore), modules.NewClientDatastore),
Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer), Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
Override(new(modules.ClientDealFunds), modules.NewClientDealFunds),
Override(new(storagemarket.StorageClient), modules.StorageClient), Override(new(storagemarket.StorageClient), modules.StorageClient),
Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter), Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter),
Override(new(beacon.Schedule), modules.RandomSchedule), Override(new(beacon.Schedule), modules.RandomSchedule),
Override(new(*paychmgr.Store), paychmgr.NewStore), Override(new(*paychmgr.Store), paychmgr.NewStore),
Override(new(*paychmgr.Manager), paychmgr.NewManager), Override(new(*paychmgr.Manager), paychmgr.NewManager),
Override(new(*market.FundMgr), market.StartFundManager), Override(new(*market.FundManager), market.NewFundManager),
Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager), Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager),
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels), Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
), ),
@ -365,7 +364,6 @@ func Online() Option {
Override(new(*storedask.StoredAsk), modules.NewStorageAsk), Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)), Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds),
Override(new(storagemarket.StorageProvider), modules.StorageProvider), Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
Override(HandleRetrievalKey, modules.HandleRetrieval), Override(HandleRetrievalKey, modules.HandleRetrieval),

View File

@ -14,9 +14,13 @@ import (
type MarketAPI struct { type MarketAPI struct {
fx.In fx.In
FMgr *market.FundMgr FMgr *market.FundManager
} }
func (a *MarketAPI) MarketEnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) { func (a *MarketAPI) MarketReserveFunds(ctx context.Context, wallet address.Address, addr address.Address, amt types.BigInt) (cid.Cid, error) {
return a.FMgr.EnsureAvailable(ctx, addr, wallet, amt) return a.FMgr.Reserve(ctx, wallet, addr, amt)
}
func (a *MarketAPI) MarketReleaseFunds(ctx context.Context, addr address.Address, amt types.BigInt) error {
return a.FMgr.Release(addr, amt)
} }

View File

@ -19,7 +19,6 @@ import (
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-storedcounter" "github.com/filecoin-project/go-storedcounter"
@ -108,15 +107,9 @@ func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
return namespace.Wrap(ds, datastore.NewKey("/deals/client")) return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
} }
type ClientDealFunds funds.DealFunds func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, j journal.Journal) (storagemarket.StorageClient, error) {
func NewClientDealFunds(ds dtypes.MetadataDS) (ClientDealFunds, error) {
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/client"))
}
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, dealFunds ClientDealFunds, j journal.Journal) (storagemarket.StorageClient, error) {
net := smnet.NewFromLibp2pHost(h) net := smnet.NewFromLibp2pHost(h)
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, dealFunds, storageimpl.DealPollingInterval(time.Second)) c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -36,7 +36,6 @@ import (
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
@ -395,12 +394,6 @@ func NewStorageAsk(ctx helpers.MetricsCtx, fapi lapi.FullNode, ds dtypes.Metadat
return storedAsk, nil return storedAsk, nil
} }
type ProviderDealFunds funds.DealFunds
func NewProviderDealFunds(ds dtypes.MetadataDS) (ProviderDealFunds, error) {
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider"))
}
func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
@ -487,7 +480,6 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
dataTransfer dtypes.ProviderDataTransfer, dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode, spn storagemarket.StorageProviderNode,
df dtypes.StorageDealFilter, df dtypes.StorageDealFilter,
funds ProviderDealFunds,
) (storagemarket.StorageProvider, error) { ) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h) net := smnet.NewFromLibp2pHost(h)
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path())) store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
@ -497,7 +489,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df)) opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt) return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt)
} }
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,