Merge pull request #4787 from filecoin-project/refactor/fund-mgr-integ
refactor: integrate new FundManager
This commit is contained in:
commit
90dd39d581
@ -514,8 +514,10 @@ type FullNode interface {
|
|||||||
// along with the address removal.
|
// along with the address removal.
|
||||||
MsigRemoveSigner(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error)
|
MsigRemoveSigner(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error)
|
||||||
|
|
||||||
MarketEnsureAvailable(context.Context, address.Address, address.Address, types.BigInt) (cid.Cid, error)
|
// MarketReserveFunds reserves funds for a deal
|
||||||
// MarketFreeBalance
|
MarketReserveFunds(ctx context.Context, wallet address.Address, addr address.Address, amt types.BigInt) (cid.Cid, error)
|
||||||
|
// MarketReleaseFunds releases funds reserved by MarketReserveFunds
|
||||||
|
MarketReleaseFunds(ctx context.Context, addr address.Address, amt types.BigInt) error
|
||||||
|
|
||||||
// MethodGroup: Paych
|
// MethodGroup: Paych
|
||||||
// The Paych methods are for interacting with and managing payment channels
|
// The Paych methods are for interacting with and managing payment channels
|
||||||
|
@ -241,7 +241,8 @@ type FullNodeStruct struct {
|
|||||||
MsigSwapCancel func(context.Context, address.Address, address.Address, uint64, address.Address, address.Address) (cid.Cid, error) `perm:"sign"`
|
MsigSwapCancel func(context.Context, address.Address, address.Address, uint64, address.Address, address.Address) (cid.Cid, error) `perm:"sign"`
|
||||||
MsigRemoveSigner func(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error) `perm:"sign"`
|
MsigRemoveSigner func(ctx context.Context, msig address.Address, proposer address.Address, toRemove address.Address, decrease bool) (cid.Cid, error) `perm:"sign"`
|
||||||
|
|
||||||
MarketEnsureAvailable func(context.Context, address.Address, address.Address, types.BigInt) (cid.Cid, error) `perm:"sign"`
|
MarketReserveFunds func(ctx context.Context, wallet address.Address, addr address.Address, amt types.BigInt) (cid.Cid, error) `perm:"sign"`
|
||||||
|
MarketReleaseFunds func(ctx context.Context, addr address.Address, amt types.BigInt) error `perm:"sign"`
|
||||||
|
|
||||||
PaychGet func(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
|
PaychGet func(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) `perm:"sign"`
|
||||||
PaychGetWaitReady func(context.Context, cid.Cid) (address.Address, error) `perm:"sign"`
|
PaychGetWaitReady func(context.Context, cid.Cid) (address.Address, error) `perm:"sign"`
|
||||||
@ -1117,8 +1118,12 @@ func (c *FullNodeStruct) MsigRemoveSigner(ctx context.Context, msig address.Addr
|
|||||||
return c.Internal.MsigRemoveSigner(ctx, msig, proposer, toRemove, decrease)
|
return c.Internal.MsigRemoveSigner(ctx, msig, proposer, toRemove, decrease)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) MarketEnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) {
|
func (c *FullNodeStruct) MarketReserveFunds(ctx context.Context, wallet address.Address, addr address.Address, amt types.BigInt) (cid.Cid, error) {
|
||||||
return c.Internal.MarketEnsureAvailable(ctx, addr, wallet, amt)
|
return c.Internal.MarketReserveFunds(ctx, wallet, addr, amt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *FullNodeStruct) MarketReleaseFunds(ctx context.Context, addr address.Address, amt types.BigInt) error {
|
||||||
|
return c.Internal.MarketReleaseFunds(ctx, addr, amt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
|
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) {
|
||||||
|
@ -4,24 +4,20 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/node/impl/full"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
"github.com/filecoin-project/lotus/node/impl/full"
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("market_adapter")
|
var log = logging.Logger("market_adapter")
|
||||||
@ -35,6 +31,7 @@ type FundManagerAPI struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// fundManagerAPI is the specific methods called by the FundManager
|
// fundManagerAPI is the specific methods called by the FundManager
|
||||||
|
// (used by the tests)
|
||||||
type fundManagerAPI interface {
|
type fundManagerAPI interface {
|
||||||
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
|
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
|
||||||
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
|
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
|
||||||
@ -52,7 +49,22 @@ type FundManager struct {
|
|||||||
fundedAddrs map[address.Address]*fundedAddress
|
fundedAddrs map[address.Address]*fundedAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager {
|
func NewFundManager(lc fx.Lifecycle, api FundManagerAPI, ds dtypes.MetadataDS) *FundManager {
|
||||||
|
fm := newFundManager(&api, ds)
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStart: func(ctx context.Context) error {
|
||||||
|
return fm.Start()
|
||||||
|
},
|
||||||
|
OnStop: func(ctx context.Context) error {
|
||||||
|
fm.Stop()
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return fm
|
||||||
|
}
|
||||||
|
|
||||||
|
// newFundManager is used by the tests
|
||||||
|
func newFundManager(api fundManagerAPI, ds datastore.Batching) *FundManager {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
return &FundManager{
|
return &FundManager{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
@ -7,23 +7,17 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/go-address"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/wallet"
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
|
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
ds "github.com/ipfs/go-datastore"
|
ds "github.com/ipfs/go-datastore"
|
||||||
ds_sync "github.com/ipfs/go-datastore/sync"
|
ds_sync "github.com/ipfs/go-datastore/sync"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestFundManagerBasic verifies that the basic fund manager operations work
|
// TestFundManagerBasic verifies that the basic fund manager operations work
|
||||||
@ -528,7 +522,7 @@ func TestFundManagerRestart(t *testing.T) {
|
|||||||
|
|
||||||
// Restart
|
// Restart
|
||||||
mockApiAfter := s.mockApi
|
mockApiAfter := s.mockApi
|
||||||
fmAfter := NewFundManager(mockApiAfter, s.ds)
|
fmAfter := newFundManager(mockApiAfter, s.ds)
|
||||||
err = fmAfter.Start()
|
err = fmAfter.Start()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -585,7 +579,7 @@ func setup(t *testing.T) *scaffold {
|
|||||||
|
|
||||||
mockApi := newMockFundManagerAPI(walletAddr)
|
mockApi := newMockFundManagerAPI(walletAddr)
|
||||||
dstore := ds_sync.MutexWrap(ds.NewMapDatastore())
|
dstore := ds_sync.MutexWrap(ds.NewMapDatastore())
|
||||||
fm := NewFundManager(mockApi, dstore)
|
fm := newFundManager(mockApi, dstore)
|
||||||
return &scaffold{
|
return &scaffold{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
ds: dstore,
|
ds: dstore,
|
||||||
|
@ -1,163 +0,0 @@
|
|||||||
package market
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
"github.com/filecoin-project/go-state-types/big"
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
"go.uber.org/fx"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
|
||||||
"github.com/filecoin-project/lotus/chain/events"
|
|
||||||
"github.com/filecoin-project/lotus/chain/events/state"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
"github.com/filecoin-project/lotus/node/impl/full"
|
|
||||||
)
|
|
||||||
|
|
||||||
// API is the dependencies need to run a fund manager
|
|
||||||
type API struct {
|
|
||||||
fx.In
|
|
||||||
|
|
||||||
full.ChainAPI
|
|
||||||
full.StateAPI
|
|
||||||
full.MpoolAPI
|
|
||||||
}
|
|
||||||
|
|
||||||
// FundMgr monitors available balances and adds funds when EnsureAvailable is called
|
|
||||||
type FundMgr struct {
|
|
||||||
api fundMgrAPI
|
|
||||||
|
|
||||||
lk sync.RWMutex
|
|
||||||
available map[address.Address]types.BigInt
|
|
||||||
}
|
|
||||||
|
|
||||||
// StartFundManager creates a new fund manager and sets up event hooks to manage state changes
|
|
||||||
func StartFundManager(lc fx.Lifecycle, api API) *FundMgr {
|
|
||||||
fm := newFundMgr(&api)
|
|
||||||
lc.Append(fx.Hook{
|
|
||||||
OnStart: func(ctx context.Context) error {
|
|
||||||
ev := events.NewEvents(ctx, &api)
|
|
||||||
preds := state.NewStatePredicates(&api)
|
|
||||||
dealDiffFn := preds.OnStorageMarketActorChanged(preds.OnBalanceChanged(preds.AvailableBalanceChangedForAddresses(fm.getAddresses)))
|
|
||||||
match := func(oldTs, newTs *types.TipSet) (bool, events.StateChange, error) {
|
|
||||||
return dealDiffFn(ctx, oldTs.Key(), newTs.Key())
|
|
||||||
}
|
|
||||||
return ev.StateChanged(fm.checkFunc, fm.stateChanged, fm.revert, 0, events.NoTimeout, match)
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return fm
|
|
||||||
}
|
|
||||||
|
|
||||||
type fundMgrAPI interface {
|
|
||||||
StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error)
|
|
||||||
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
|
|
||||||
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newFundMgr(api fundMgrAPI) *FundMgr {
|
|
||||||
return &FundMgr{
|
|
||||||
api: api,
|
|
||||||
available: map[address.Address]types.BigInt{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// checkFunc tells the events api to simply proceed (we always want to watch)
|
|
||||||
func (fm *FundMgr) checkFunc(ts *types.TipSet) (done bool, more bool, err error) {
|
|
||||||
return false, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// revert handles reverts to balances
|
|
||||||
func (fm *FundMgr) revert(ctx context.Context, ts *types.TipSet) error {
|
|
||||||
// TODO: Is it ok to just ignore this?
|
|
||||||
log.Warn("balance change reverted; TODO: actually handle this!")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// stateChanged handles balance changes monitored on the chain from one tipset to the next
|
|
||||||
func (fm *FundMgr) stateChanged(ts *types.TipSet, ts2 *types.TipSet, states events.StateChange, h abi.ChainEpoch) (more bool, err error) {
|
|
||||||
changedBalances, ok := states.(state.ChangedBalances)
|
|
||||||
if !ok {
|
|
||||||
panic("Expected state.ChangedBalances")
|
|
||||||
}
|
|
||||||
// overwrite our in memory cache with new values from chain (chain is canonical)
|
|
||||||
fm.lk.Lock()
|
|
||||||
for addr, balanceChange := range changedBalances {
|
|
||||||
if fm.available[addr].Int != nil {
|
|
||||||
log.Infof("State balance change recorded, prev: %s, new: %s", fm.available[addr].String(), balanceChange.To.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
fm.available[addr] = balanceChange.To
|
|
||||||
}
|
|
||||||
fm.lk.Unlock()
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fm *FundMgr) getAddresses() []address.Address {
|
|
||||||
fm.lk.RLock()
|
|
||||||
defer fm.lk.RUnlock()
|
|
||||||
addrs := make([]address.Address, 0, len(fm.available))
|
|
||||||
for addr := range fm.available {
|
|
||||||
addrs = append(addrs, addr)
|
|
||||||
}
|
|
||||||
return addrs
|
|
||||||
}
|
|
||||||
|
|
||||||
// EnsureAvailable looks at the available balance in escrow for a given
|
|
||||||
// address, and if less than the passed in amount, adds the difference
|
|
||||||
func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) {
|
|
||||||
idAddr, err := fm.api.StateLookupID(ctx, addr, types.EmptyTSK)
|
|
||||||
if err != nil {
|
|
||||||
return cid.Undef, err
|
|
||||||
}
|
|
||||||
fm.lk.Lock()
|
|
||||||
defer fm.lk.Unlock()
|
|
||||||
|
|
||||||
bal, err := fm.api.StateMarketBalance(ctx, addr, types.EmptyTSK)
|
|
||||||
if err != nil {
|
|
||||||
return cid.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
stateAvail := types.BigSub(bal.Escrow, bal.Locked)
|
|
||||||
|
|
||||||
avail, ok := fm.available[idAddr]
|
|
||||||
if !ok {
|
|
||||||
avail = stateAvail
|
|
||||||
}
|
|
||||||
|
|
||||||
toAdd := types.BigSub(amt, avail)
|
|
||||||
if toAdd.LessThan(types.NewInt(0)) {
|
|
||||||
toAdd = types.NewInt(0)
|
|
||||||
}
|
|
||||||
fm.available[idAddr] = big.Add(avail, toAdd)
|
|
||||||
|
|
||||||
log.Infof("Funds operation w/ Expected Balance: %s, In State: %s, Requested: %s, Adding: %s", avail.String(), stateAvail.String(), amt.String(), toAdd.String())
|
|
||||||
|
|
||||||
if toAdd.LessThanEqual(big.Zero()) {
|
|
||||||
return cid.Undef, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
params, err := actors.SerializeParams(&addr)
|
|
||||||
if err != nil {
|
|
||||||
fm.available[idAddr] = avail
|
|
||||||
return cid.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
smsg, err := fm.api.MpoolPushMessage(ctx, &types.Message{
|
|
||||||
To: market.Address,
|
|
||||||
From: wallet,
|
|
||||||
Value: toAdd,
|
|
||||||
Method: market.Methods.AddBalance,
|
|
||||||
Params: params,
|
|
||||||
}, nil)
|
|
||||||
if err != nil {
|
|
||||||
fm.available[idAddr] = avail
|
|
||||||
return cid.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return smsg.Cid(), nil
|
|
||||||
}
|
|
@ -1,199 +0,0 @@
|
|||||||
package market
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"math/rand"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
"github.com/filecoin-project/go-state-types/crypto"
|
|
||||||
|
|
||||||
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type fakeAPI struct {
|
|
||||||
returnedBalance api.MarketBalance
|
|
||||||
returnedBalanceErr error
|
|
||||||
signature crypto.Signature
|
|
||||||
receivedMessage *types.Message
|
|
||||||
pushMessageErr error
|
|
||||||
lookupIDErr error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fapi *fakeAPI) StateLookupID(_ context.Context, addr address.Address, _ types.TipSetKey) (address.Address, error) {
|
|
||||||
return addr, fapi.lookupIDErr
|
|
||||||
}
|
|
||||||
func (fapi *fakeAPI) StateMarketBalance(context.Context, address.Address, types.TipSetKey) (api.MarketBalance, error) {
|
|
||||||
return fapi.returnedBalance, fapi.returnedBalanceErr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (fapi *fakeAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
|
||||||
fapi.receivedMessage = msg
|
|
||||||
return &types.SignedMessage{
|
|
||||||
Message: *msg,
|
|
||||||
Signature: fapi.signature,
|
|
||||||
}, fapi.pushMessageErr
|
|
||||||
}
|
|
||||||
|
|
||||||
func addFundsMsg(toAdd abi.TokenAmount, addr address.Address, wallet address.Address) *types.Message {
|
|
||||||
params, _ := actors.SerializeParams(&addr)
|
|
||||||
return &types.Message{
|
|
||||||
To: market.Address,
|
|
||||||
From: wallet,
|
|
||||||
Value: toAdd,
|
|
||||||
Method: market.Methods.AddBalance,
|
|
||||||
Params: params,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type expectedResult struct {
|
|
||||||
addAmt abi.TokenAmount
|
|
||||||
shouldAdd bool
|
|
||||||
err error
|
|
||||||
cachedAvailable abi.TokenAmount
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddFunds(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
testCases := map[string]struct {
|
|
||||||
returnedBalanceErr error
|
|
||||||
returnedBalance api.MarketBalance
|
|
||||||
addAmounts []abi.TokenAmount
|
|
||||||
pushMessageErr error
|
|
||||||
expectedResults []expectedResult
|
|
||||||
lookupIDErr error
|
|
||||||
}{
|
|
||||||
"succeeds, trivial case": {
|
|
||||||
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)},
|
|
||||||
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
|
||||||
expectedResults: []expectedResult{
|
|
||||||
{
|
|
||||||
addAmt: abi.NewTokenAmount(100),
|
|
||||||
shouldAdd: true,
|
|
||||||
err: nil,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"succeeds, money already present": {
|
|
||||||
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(150), Locked: abi.NewTokenAmount(50)},
|
|
||||||
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
|
||||||
expectedResults: []expectedResult{
|
|
||||||
{
|
|
||||||
shouldAdd: false,
|
|
||||||
err: nil,
|
|
||||||
cachedAvailable: abi.NewTokenAmount(100),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"succeeds, multiple adds": {
|
|
||||||
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(150), Locked: abi.NewTokenAmount(50)},
|
|
||||||
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100), abi.NewTokenAmount(200), abi.NewTokenAmount(250), abi.NewTokenAmount(250)},
|
|
||||||
expectedResults: []expectedResult{
|
|
||||||
{
|
|
||||||
shouldAdd: false,
|
|
||||||
err: nil,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
addAmt: abi.NewTokenAmount(100),
|
|
||||||
shouldAdd: true,
|
|
||||||
err: nil,
|
|
||||||
cachedAvailable: abi.NewTokenAmount(200),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
addAmt: abi.NewTokenAmount(50),
|
|
||||||
shouldAdd: true,
|
|
||||||
err: nil,
|
|
||||||
cachedAvailable: abi.NewTokenAmount(250),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
shouldAdd: false,
|
|
||||||
err: nil,
|
|
||||||
cachedAvailable: abi.NewTokenAmount(250),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"error on market balance": {
|
|
||||||
returnedBalanceErr: errors.New("something went wrong"),
|
|
||||||
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
|
||||||
expectedResults: []expectedResult{
|
|
||||||
{
|
|
||||||
err: errors.New("something went wrong"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"error on push message": {
|
|
||||||
returnedBalance: api.MarketBalance{Escrow: abi.NewTokenAmount(0), Locked: abi.NewTokenAmount(0)},
|
|
||||||
pushMessageErr: errors.New("something went wrong"),
|
|
||||||
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
|
||||||
expectedResults: []expectedResult{
|
|
||||||
{
|
|
||||||
err: errors.New("something went wrong"),
|
|
||||||
cachedAvailable: abi.NewTokenAmount(0),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
"error looking up address": {
|
|
||||||
lookupIDErr: errors.New("something went wrong"),
|
|
||||||
addAmounts: []abi.TokenAmount{abi.NewTokenAmount(100)},
|
|
||||||
expectedResults: []expectedResult{
|
|
||||||
{
|
|
||||||
err: errors.New("something went wrong"),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for testCase, data := range testCases {
|
|
||||||
//nolint:scopelint
|
|
||||||
t.Run(testCase, func(t *testing.T) {
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
sig := make([]byte, 100)
|
|
||||||
_, err := rand.Read(sig)
|
|
||||||
require.NoError(t, err)
|
|
||||||
fapi := &fakeAPI{
|
|
||||||
returnedBalance: data.returnedBalance,
|
|
||||||
returnedBalanceErr: data.returnedBalanceErr,
|
|
||||||
signature: crypto.Signature{
|
|
||||||
Type: crypto.SigTypeUnknown,
|
|
||||||
Data: sig,
|
|
||||||
},
|
|
||||||
pushMessageErr: data.pushMessageErr,
|
|
||||||
lookupIDErr: data.lookupIDErr,
|
|
||||||
}
|
|
||||||
fundMgr := newFundMgr(fapi)
|
|
||||||
addr := tutils.NewIDAddr(t, uint64(rand.Uint32()))
|
|
||||||
wallet := tutils.NewIDAddr(t, uint64(rand.Uint32()))
|
|
||||||
for i, amount := range data.addAmounts {
|
|
||||||
fapi.receivedMessage = nil
|
|
||||||
_, err := fundMgr.EnsureAvailable(ctx, addr, wallet, amount)
|
|
||||||
expected := data.expectedResults[i]
|
|
||||||
if expected.err == nil {
|
|
||||||
require.NoError(t, err)
|
|
||||||
if expected.shouldAdd {
|
|
||||||
expectedMessage := addFundsMsg(expected.addAmt, addr, wallet)
|
|
||||||
require.Equal(t, expectedMessage, fapi.receivedMessage)
|
|
||||||
} else {
|
|
||||||
require.Nil(t, fapi.receivedMessage)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
require.EqualError(t, err, expected.err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
if !expected.cachedAvailable.Nil() {
|
|
||||||
require.Equal(t, expected.cachedAvailable, fundMgr.available[addr])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -38,6 +38,23 @@ func (ps *Store) save(state *FundedAddressState) error {
|
|||||||
return ps.ds.Put(k, b)
|
return ps.ds.Put(k, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// get the state for the given address
|
||||||
|
func (ps *Store) get(addr address.Address) (*FundedAddressState, error) {
|
||||||
|
k := dskeyForAddr(addr)
|
||||||
|
|
||||||
|
data, err := ps.ds.Get(k)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var state FundedAddressState
|
||||||
|
err = cborrpc.ReadCborRPC(bytes.NewReader(data), &state)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &state, nil
|
||||||
|
}
|
||||||
|
|
||||||
// forEach calls iter with each address in the datastore
|
// forEach calls iter with each address in the datastore
|
||||||
func (ps *Store) forEach(iter func(*FundedAddressState)) error {
|
func (ps *Store) forEach(iter func(*FundedAddressState)) error {
|
||||||
res, err := ps.ds.Query(dsq.Query{Prefix: dsKeyAddr})
|
res, err := ps.ds.Query(dsq.Query{Prefix: dsKeyAddr})
|
||||||
|
@ -68,7 +68,8 @@
|
|||||||
* [LogList](#LogList)
|
* [LogList](#LogList)
|
||||||
* [LogSetLevel](#LogSetLevel)
|
* [LogSetLevel](#LogSetLevel)
|
||||||
* [Market](#Market)
|
* [Market](#Market)
|
||||||
* [MarketEnsureAvailable](#MarketEnsureAvailable)
|
* [MarketReleaseFunds](#MarketReleaseFunds)
|
||||||
|
* [MarketReserveFunds](#MarketReserveFunds)
|
||||||
* [Miner](#Miner)
|
* [Miner](#Miner)
|
||||||
* [MinerCreateBlock](#MinerCreateBlock)
|
* [MinerCreateBlock](#MinerCreateBlock)
|
||||||
* [MinerGetBaseInfo](#MinerGetBaseInfo)
|
* [MinerGetBaseInfo](#MinerGetBaseInfo)
|
||||||
@ -1612,8 +1613,24 @@ Response: `{}`
|
|||||||
## Market
|
## Market
|
||||||
|
|
||||||
|
|
||||||
### MarketEnsureAvailable
|
### MarketReleaseFunds
|
||||||
MarketFreeBalance
|
MarketReleaseFunds releases funds reserved by MarketReserveFunds
|
||||||
|
|
||||||
|
|
||||||
|
Perms: sign
|
||||||
|
|
||||||
|
Inputs:
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
"f01234",
|
||||||
|
"0"
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
Response: `{}`
|
||||||
|
|
||||||
|
### MarketReserveFunds
|
||||||
|
MarketReserveFunds reserves funds for a deal
|
||||||
|
|
||||||
|
|
||||||
Perms: sign
|
Perms: sign
|
||||||
|
4
go.mod
4
go.mod
@ -30,7 +30,7 @@ require (
|
|||||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
||||||
github.com/filecoin-project/go-data-transfer v1.0.1
|
github.com/filecoin-project/go-data-transfer v1.0.1
|
||||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
|
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
|
||||||
github.com/filecoin-project/go-fil-markets v1.0.1
|
github.com/filecoin-project/go-fil-markets v1.0.4
|
||||||
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20201008195726-68c6a2704e49
|
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20201008195726-68c6a2704e49
|
||||||
github.com/filecoin-project/go-multistore v0.0.3
|
github.com/filecoin-project/go-multistore v0.0.3
|
||||||
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
|
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
|
||||||
@ -67,7 +67,7 @@ require (
|
|||||||
github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459
|
github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459
|
||||||
github.com/ipfs/go-filestore v1.0.0
|
github.com/ipfs/go-filestore v1.0.0
|
||||||
github.com/ipfs/go-fs-lock v0.0.6
|
github.com/ipfs/go-fs-lock v0.0.6
|
||||||
github.com/ipfs/go-graphsync v0.4.2
|
github.com/ipfs/go-graphsync v0.4.3
|
||||||
github.com/ipfs/go-ipfs-blockstore v1.0.2
|
github.com/ipfs/go-ipfs-blockstore v1.0.2
|
||||||
github.com/ipfs/go-ipfs-chunker v0.0.5
|
github.com/ipfs/go-ipfs-chunker v0.0.5
|
||||||
github.com/ipfs/go-ipfs-ds-help v1.0.0
|
github.com/ipfs/go-ipfs-ds-help v1.0.0
|
||||||
|
6
go.sum
6
go.sum
@ -253,8 +253,8 @@ github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl
|
|||||||
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
|
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
|
||||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
|
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
|
||||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
||||||
github.com/filecoin-project/go-fil-markets v1.0.1 h1:xyhsHPnaECkOP8amFlp/2nK1L5xaQ9mCscXXxX2lwcs=
|
github.com/filecoin-project/go-fil-markets v1.0.4 h1:OGEoNppGcAjzIznDHFb/yy7ypVgHMO2CQZg6E9nViWI=
|
||||||
github.com/filecoin-project/go-fil-markets v1.0.1/go.mod h1:qdAqt05NWpmkGycb4duXaMtidLDmyaz1aG5goWIMm/E=
|
github.com/filecoin-project/go-fil-markets v1.0.4/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
|
||||||
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
||||||
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
||||||
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
||||||
@ -549,6 +549,8 @@ github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28
|
|||||||
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
|
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
|
||||||
github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3zMo=
|
github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3zMo=
|
||||||
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
|
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
|
||||||
|
github.com/ipfs/go-graphsync v0.4.3 h1:2t+oCpufufs1oqChoWiIK7V5uC1XCtf06PK9nqMV6pM=
|
||||||
|
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
|
||||||
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
|
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
|
||||||
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
|
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
|
||||||
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
|
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
|
||||||
|
@ -38,7 +38,7 @@ type ClientNodeAdapter struct {
|
|||||||
full.ChainAPI
|
full.ChainAPI
|
||||||
full.MpoolAPI
|
full.MpoolAPI
|
||||||
|
|
||||||
fm *market.FundMgr
|
fundmgr *market.FundManager
|
||||||
ev *events.Events
|
ev *events.Events
|
||||||
dsMatcher *dealStateMatcher
|
dsMatcher *dealStateMatcher
|
||||||
}
|
}
|
||||||
@ -48,14 +48,14 @@ type clientApi struct {
|
|||||||
full.StateAPI
|
full.StateAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fm *market.FundMgr) storagemarket.StorageClientNode {
|
func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode {
|
||||||
capi := &clientApi{chain, stateapi}
|
capi := &clientApi{chain, stateapi}
|
||||||
return &ClientNodeAdapter{
|
return &ClientNodeAdapter{
|
||||||
StateAPI: stateapi,
|
StateAPI: stateapi,
|
||||||
ChainAPI: chain,
|
ChainAPI: chain,
|
||||||
MpoolAPI: mpool,
|
MpoolAPI: mpool,
|
||||||
|
|
||||||
fm: fm,
|
fundmgr: fundmgr,
|
||||||
ev: events.NewEvents(context.TODO(), capi),
|
ev: events.NewEvents(context.TODO(), capi),
|
||||||
dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)),
|
dsMatcher: newDealStateMatcher(state.NewStatePredicates(capi)),
|
||||||
}
|
}
|
||||||
@ -112,8 +112,12 @@ func (c *ClientNodeAdapter) AddFunds(ctx context.Context, addr address.Address,
|
|||||||
return smsg.Cid(), nil
|
return smsg.Cid(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientNodeAdapter) EnsureFunds(ctx context.Context, addr, wallet address.Address, amount abi.TokenAmount, ts shared.TipSetToken) (cid.Cid, error) {
|
func (c *ClientNodeAdapter) ReserveFunds(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
||||||
return c.fm.EnsureAvailable(ctx, addr, wallet, amount)
|
return c.fundmgr.Reserve(ctx, wallet, addr, amt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientNodeAdapter) ReleaseFunds(ctx context.Context, addr address.Address, amt abi.TokenAmount) error {
|
||||||
|
return c.fundmgr.Release(addr, amt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (storagemarket.Balance, error) {
|
func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (storagemarket.Balance, error) {
|
||||||
|
@ -180,8 +180,12 @@ func (n *ProviderNodeAdapter) SignBytes(ctx context.Context, signer address.Addr
|
|||||||
return localSignature, nil
|
return localSignature, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *ProviderNodeAdapter) EnsureFunds(ctx context.Context, addr, wallet address.Address, amt abi.TokenAmount, encodedTs shared.TipSetToken) (cid.Cid, error) {
|
func (n *ProviderNodeAdapter) ReserveFunds(ctx context.Context, wallet, addr address.Address, amt abi.TokenAmount) (cid.Cid, error) {
|
||||||
return n.MarketEnsureAvailable(ctx, addr, wallet, amt)
|
return n.MarketReserveFunds(ctx, wallet, addr, amt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *ProviderNodeAdapter) ReleaseFunds(ctx context.Context, addr address.Address, amt abi.TokenAmount) error {
|
||||||
|
return n.MarketReleaseFunds(ctx, addr, amt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
|
// Adds funds with the StorageMinerActor for a storage participant. Used by both providers and clients.
|
||||||
|
@ -125,11 +125,12 @@ const (
|
|||||||
|
|
||||||
HandleIncomingBlocksKey
|
HandleIncomingBlocksKey
|
||||||
HandleIncomingMessagesKey
|
HandleIncomingMessagesKey
|
||||||
|
HandleMigrateClientFundsKey
|
||||||
HandlePaymentChannelManagerKey
|
HandlePaymentChannelManagerKey
|
||||||
|
|
||||||
// miner
|
// miner
|
||||||
GetParamsKey
|
GetParamsKey
|
||||||
|
HandleMigrateProviderFundsKey
|
||||||
HandleDealsKey
|
HandleDealsKey
|
||||||
HandleRetrievalKey
|
HandleRetrievalKey
|
||||||
RunSectorServiceKey
|
RunSectorServiceKey
|
||||||
@ -294,14 +295,14 @@ func Online() Option {
|
|||||||
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
|
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
|
||||||
Override(new(dtypes.ClientDatastore), modules.NewClientDatastore),
|
Override(new(dtypes.ClientDatastore), modules.NewClientDatastore),
|
||||||
Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
|
Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
|
||||||
Override(new(modules.ClientDealFunds), modules.NewClientDealFunds),
|
|
||||||
Override(new(storagemarket.StorageClient), modules.StorageClient),
|
Override(new(storagemarket.StorageClient), modules.StorageClient),
|
||||||
Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter),
|
Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter),
|
||||||
Override(new(beacon.Schedule), modules.RandomSchedule),
|
Override(new(beacon.Schedule), modules.RandomSchedule),
|
||||||
|
|
||||||
Override(new(*paychmgr.Store), paychmgr.NewStore),
|
Override(new(*paychmgr.Store), paychmgr.NewStore),
|
||||||
Override(new(*paychmgr.Manager), paychmgr.NewManager),
|
Override(new(*paychmgr.Manager), paychmgr.NewManager),
|
||||||
Override(new(*market.FundMgr), market.StartFundManager),
|
Override(new(*market.FundManager), market.NewFundManager),
|
||||||
|
Override(HandleMigrateClientFundsKey, modules.HandleMigrateClientFunds),
|
||||||
Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager),
|
Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager),
|
||||||
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
|
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
|
||||||
),
|
),
|
||||||
@ -365,9 +366,9 @@ func Online() Option {
|
|||||||
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
|
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
|
||||||
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
|
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
|
||||||
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
|
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
|
||||||
Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds),
|
|
||||||
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
|
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
|
||||||
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
|
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
|
||||||
|
Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds),
|
||||||
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
Override(HandleRetrievalKey, modules.HandleRetrieval),
|
||||||
Override(GetParamsKey, modules.GetParams),
|
Override(GetParamsKey, modules.GetParams),
|
||||||
Override(HandleDealsKey, modules.HandleDeals),
|
Override(HandleDealsKey, modules.HandleDeals),
|
||||||
|
@ -14,9 +14,13 @@ import (
|
|||||||
type MarketAPI struct {
|
type MarketAPI struct {
|
||||||
fx.In
|
fx.In
|
||||||
|
|
||||||
FMgr *market.FundMgr
|
FMgr *market.FundManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MarketAPI) MarketEnsureAvailable(ctx context.Context, addr, wallet address.Address, amt types.BigInt) (cid.Cid, error) {
|
func (a *MarketAPI) MarketReserveFunds(ctx context.Context, wallet address.Address, addr address.Address, amt types.BigInt) (cid.Cid, error) {
|
||||||
return a.FMgr.EnsureAvailable(ctx, addr, wallet, amt)
|
return a.FMgr.Reserve(ctx, wallet, addr, amt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *MarketAPI) MarketReleaseFunds(ctx context.Context, addr address.Address, amt types.BigInt) error {
|
||||||
|
return a.FMgr.Release(addr, amt)
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,12 @@
|
|||||||
package modules
|
package modules
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-multistore"
|
"github.com/filecoin-project/go-multistore"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
@ -19,7 +21,6 @@ import (
|
|||||||
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
|
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
|
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
|
||||||
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
|
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
|
||||||
"github.com/filecoin-project/go-storedcounter"
|
"github.com/filecoin-project/go-storedcounter"
|
||||||
@ -27,6 +28,7 @@ import (
|
|||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/market"
|
||||||
"github.com/filecoin-project/lotus/journal"
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||||
"github.com/filecoin-project/lotus/markets"
|
"github.com/filecoin-project/lotus/markets"
|
||||||
@ -40,6 +42,36 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStart: func(ctx context.Context) error {
|
||||||
|
addr, err := wallet.WalletDefaultAddress(ctx)
|
||||||
|
// nothing to be done if there is no default address
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b, err := ds.Get(datastore.NewKey("/marketfunds/client"))
|
||||||
|
if err != nil {
|
||||||
|
if xerrors.Is(err, datastore.ErrNotFound) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var value abi.TokenAmount
|
||||||
|
if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = fundMgr.Reserve(ctx, addr, addr, value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ds.Delete(datastore.NewKey("/marketfunds/client"))
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
|
func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
|
||||||
ds, err := r.Datastore("/client")
|
ds, err := r.Datastore("/client")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -108,15 +140,9 @@ func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
|
|||||||
return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
|
return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientDealFunds funds.DealFunds
|
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, j journal.Journal) (storagemarket.StorageClient, error) {
|
||||||
|
|
||||||
func NewClientDealFunds(ds dtypes.MetadataDS) (ClientDealFunds, error) {
|
|
||||||
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/client"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, dealFunds ClientDealFunds, j journal.Journal) (storagemarket.StorageClient, error) {
|
|
||||||
net := smnet.NewFromLibp2pHost(h)
|
net := smnet.NewFromLibp2pHost(h)
|
||||||
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, dealFunds, storageimpl.DealPollingInterval(time.Second))
|
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package modules
|
package modules
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -36,7 +37,6 @@ import (
|
|||||||
"github.com/filecoin-project/go-fil-markets/shared"
|
"github.com/filecoin-project/go-fil-markets/shared"
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
|
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
|
||||||
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
|
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
|
||||||
"github.com/filecoin-project/go-jsonrpc/auth"
|
"github.com/filecoin-project/go-jsonrpc/auth"
|
||||||
@ -46,6 +46,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-statestore"
|
"github.com/filecoin-project/go-statestore"
|
||||||
"github.com/filecoin-project/go-storedcounter"
|
"github.com/filecoin-project/go-storedcounter"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
@ -245,6 +246,41 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.FullNode, minerAddress dtypes.MinerAddress) {
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStart: func(ctx context.Context) error {
|
||||||
|
b, err := ds.Get(datastore.NewKey("/marketfunds/provider"))
|
||||||
|
if err != nil {
|
||||||
|
if xerrors.Is(err, datastore.ErrNotFound) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var value abi.TokenAmount
|
||||||
|
if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ts, err := node.ChainHead(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
mi, err := node.StateMinerInfo(ctx, address.Address(minerAddress), ts.Key())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = node.MarketReserveFunds(ctx, mi.Worker, address.Address(minerAddress), value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ds.Delete(datastore.NewKey("/marketfunds/provider"))
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
|
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
|
||||||
// uses the provider's Staging DAG service for transfers
|
// uses the provider's Staging DAG service for transfers
|
||||||
func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) (dtypes.ProviderDataTransfer, error) {
|
func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) (dtypes.ProviderDataTransfer, error) {
|
||||||
@ -395,12 +431,6 @@ func NewStorageAsk(ctx helpers.MetricsCtx, fapi lapi.FullNode, ds dtypes.Metadat
|
|||||||
return storedAsk, nil
|
return storedAsk, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type ProviderDealFunds funds.DealFunds
|
|
||||||
|
|
||||||
func NewProviderDealFunds(ds dtypes.MetadataDS) (ProviderDealFunds, error) {
|
|
||||||
return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
|
func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
|
||||||
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
|
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
|
||||||
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
|
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
|
||||||
@ -487,7 +517,6 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
|
|||||||
dataTransfer dtypes.ProviderDataTransfer,
|
dataTransfer dtypes.ProviderDataTransfer,
|
||||||
spn storagemarket.StorageProviderNode,
|
spn storagemarket.StorageProviderNode,
|
||||||
df dtypes.StorageDealFilter,
|
df dtypes.StorageDealFilter,
|
||||||
funds ProviderDealFunds,
|
|
||||||
) (storagemarket.StorageProvider, error) {
|
) (storagemarket.StorageProvider, error) {
|
||||||
net := smnet.NewFromLibp2pHost(h)
|
net := smnet.NewFromLibp2pHost(h)
|
||||||
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
|
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
|
||||||
@ -497,7 +526,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
|
|||||||
|
|
||||||
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
|
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
|
||||||
|
|
||||||
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt)
|
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, opt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
|
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
|
||||||
|
Loading…
Reference in New Issue
Block a user