Simple market fund manager
This commit is contained in:
parent
eae0b84b80
commit
be2e58a2fb
@ -111,6 +111,9 @@ type FullNode interface {
|
||||
StateMarketDeals(context.Context, *types.TipSet) (map[string]actors.OnChainDeal, error)
|
||||
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
|
||||
|
||||
MarketEnsureAvailable(context.Context, address.Address, types.BigInt) error
|
||||
// MarketFreeBalance
|
||||
|
||||
PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
|
||||
PaychList(context.Context) ([]address.Address, error)
|
||||
PaychStatus(context.Context, address.Address) (*PaychStatus, error)
|
||||
|
@ -107,6 +107,8 @@ type FullNodeStruct struct {
|
||||
StateMarketDeals func(context.Context, *types.TipSet) (map[string]actors.OnChainDeal, error) `perm:"read"`
|
||||
StateMarketStorageDeal func(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) `perm:"read"`
|
||||
|
||||
MarketEnsureAvailable func(context.Context, address.Address, types.BigInt) error `perm:"sign"`
|
||||
|
||||
PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"`
|
||||
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
|
||||
PaychStatus func(context.Context, address.Address) (*PaychStatus, error) `perm:"read"`
|
||||
@ -418,6 +420,10 @@ func (c *FullNodeStruct) StateMarketStorageDeal(ctx context.Context, dealid uint
|
||||
return c.Internal.StateMarketStorageDeal(ctx, dealid, ts)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) MarketEnsureAvailable(ctx context.Context, addr address.Address, amt types.BigInt) error {
|
||||
return c.Internal.MarketEnsureAvailable(ctx, addr, amt)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) {
|
||||
return c.Internal.PaychGet(ctx, from, to, ensureFunds)
|
||||
}
|
||||
|
@ -3,9 +3,6 @@ package deals
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
@ -19,11 +16,14 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/chain/events"
|
||||
"github.com/filecoin-project/lotus/chain/market"
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/lotus/lib/cborutil"
|
||||
"github.com/filecoin-project/lotus/lib/statestore"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||
)
|
||||
@ -50,8 +50,8 @@ type Client struct {
|
||||
w *wallet.Wallet
|
||||
dag dtypes.ClientDAG
|
||||
discovery *discovery.Local
|
||||
mpool full.MpoolAPI
|
||||
events *events.Events
|
||||
fm *market.FundMgr
|
||||
|
||||
deals *statestore.StateStore
|
||||
conns map[cid.Cid]inet.Stream
|
||||
@ -70,7 +70,7 @@ type clientDealUpdate struct {
|
||||
mut func(*ClientDeal)
|
||||
}
|
||||
|
||||
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local, mpool full.MpoolAPI, chainapi full.ChainAPI) *Client {
|
||||
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local, fm *market.FundMgr, chainapi full.ChainAPI) *Client {
|
||||
c := &Client{
|
||||
sm: sm,
|
||||
chain: chain,
|
||||
@ -78,7 +78,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *
|
||||
w: w,
|
||||
dag: dag,
|
||||
discovery: discovery,
|
||||
mpool: mpool,
|
||||
fm: fm,
|
||||
events: events.NewEvents(context.TODO(), &chainapi),
|
||||
|
||||
deals: statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/client"))),
|
||||
@ -137,7 +137,7 @@ func (c *Client) onIncoming(deal *ClientDeal) {
|
||||
}
|
||||
|
||||
func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
|
||||
log.Infof("Deal %s updated state to %d", update.id, update.newState)
|
||||
log.Infof("Client deal %s updated state to %s", update.id, api.DealStates[update.newState])
|
||||
var deal ClientDeal
|
||||
err := c.deals.Mutate(update.id, func(d *ClientDeal) error {
|
||||
d.State = update.newState
|
||||
@ -184,35 +184,8 @@ type ClientDealProposal struct {
|
||||
}
|
||||
|
||||
func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, error) {
|
||||
// check market funds
|
||||
clientMarketBalance, err := c.sm.MarketBalance(ctx, p.Client, nil)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("getting client market balance failed: %w", err)
|
||||
}
|
||||
|
||||
if clientMarketBalance.Available.LessThan(types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration))) {
|
||||
// TODO: move to a smarter market funds manager
|
||||
|
||||
smsg, err := c.mpool.MpoolPushMessage(ctx, &types.Message{
|
||||
To: actors.StorageMarketAddress,
|
||||
From: p.Client,
|
||||
Value: types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration)),
|
||||
GasPrice: types.NewInt(0),
|
||||
GasLimit: types.NewInt(1000000),
|
||||
Method: actors.SMAMethods.AddBalance,
|
||||
})
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
_, r, err := c.sm.WaitForMessage(ctx, smsg.Cid())
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
if r.ExitCode != 0 {
|
||||
return cid.Undef, xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.ExitCode)
|
||||
}
|
||||
if err := c.fm.EnsureAvailable(ctx, p.Client, types.BigMul(p.PricePerEpoch, types.NewInt(p.Duration))); err != nil {
|
||||
return cid.Undef, xerrors.Errorf("adding market funds failed: %w", err)
|
||||
}
|
||||
|
||||
commP, pieceSize, err := c.commP(ctx, p.Data)
|
||||
|
@ -160,7 +160,7 @@ func (p *Provider) onIncoming(deal MinerDeal) {
|
||||
}
|
||||
|
||||
func (p *Provider) onUpdated(ctx context.Context, update minerDealUpdate) {
|
||||
log.Infof("Deal %s updated state to %d", update.id, update.newState)
|
||||
log.Infof("Deal %s updated state to %s", update.id, api.DealStates[update.newState])
|
||||
if update.err != nil {
|
||||
log.Errorf("deal %s (newSt: %d) failed: %+v", update.id, update.newState, update.err)
|
||||
p.failDeal(update.id, update.err)
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/padreader"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
@ -39,33 +38,6 @@ func (p *Provider) handle(ctx context.Context, deal MinerDeal, cb providerHandle
|
||||
}
|
||||
|
||||
// ACCEPTED
|
||||
|
||||
func (p *Provider) addMarketFunds(ctx context.Context, worker address.Address, deal MinerDeal) error {
|
||||
log.Info("Adding market funds for storage collateral")
|
||||
smsg, err := p.full.MpoolPushMessage(ctx, &types.Message{
|
||||
To: actors.StorageMarketAddress,
|
||||
From: worker,
|
||||
Value: deal.Proposal.StorageCollateral,
|
||||
GasPrice: types.NewInt(0),
|
||||
GasLimit: types.NewInt(1000000),
|
||||
Method: actors.SMAMethods.AddBalance,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r, err := p.full.StateWaitMsg(ctx, smsg.Cid())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if r.Receipt.ExitCode != 0 {
|
||||
return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.Receipt.ExitCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
||||
switch deal.Proposal.PieceSerialization {
|
||||
//case SerializationRaw:
|
||||
@ -111,16 +83,9 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
providerMarketBalance, err := p.full.StateMarketBalance(ctx, waddr, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting provider market balance failed: %w", err)
|
||||
}
|
||||
|
||||
// TODO: this needs to be atomic
|
||||
if providerMarketBalance.Available.LessThan(deal.Proposal.StorageCollateral) {
|
||||
if err := p.addMarketFunds(ctx, waddr, deal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: check StorageCollateral (may be too large (or too small))
|
||||
if err := p.full.MarketEnsureAvailable(ctx, waddr, deal.Proposal.StorageCollateral); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Info("publishing deal")
|
||||
|
78
chain/market/fundmgr.go
Normal file
78
chain/market/fundmgr.go
Normal file
@ -0,0 +1,78 @@
|
||||
package market
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
)
|
||||
|
||||
type FundMgr struct {
|
||||
sm *stmgr.StateManager
|
||||
mpool full.MpoolAPI
|
||||
|
||||
lk sync.Mutex
|
||||
available map[address.Address]types.BigInt
|
||||
}
|
||||
|
||||
func NewFundMgr(sm *stmgr.StateManager, mpool full.MpoolAPI) *FundMgr {
|
||||
return &FundMgr{
|
||||
sm: sm,
|
||||
mpool: mpool,
|
||||
|
||||
available: map[address.Address]types.BigInt{},
|
||||
}
|
||||
}
|
||||
|
||||
func (fm *FundMgr) EnsureAvailable(ctx context.Context, addr address.Address, amt types.BigInt) error {
|
||||
fm.lk.Lock()
|
||||
avail, ok := fm.available[addr]
|
||||
if !ok {
|
||||
bal, err := fm.sm.MarketBalance(ctx, addr, nil)
|
||||
if err != nil {
|
||||
fm.lk.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
avail = bal.Available
|
||||
}
|
||||
|
||||
toAdd := types.NewInt(0)
|
||||
avail = types.BigSub(avail, amt)
|
||||
if avail.LessThan(types.NewInt(0)) {
|
||||
// TODO: some rules around adding more to avoid doing stuff on-chain
|
||||
// all the time
|
||||
toAdd = types.BigSub(toAdd, avail)
|
||||
avail = types.NewInt(0)
|
||||
}
|
||||
fm.available[addr] = avail
|
||||
fm.lk.Unlock()
|
||||
|
||||
smsg, err := fm.mpool.MpoolPushMessage(ctx, &types.Message{
|
||||
To: actors.StorageMarketAddress,
|
||||
From: addr,
|
||||
Value: toAdd,
|
||||
GasPrice: types.NewInt(0),
|
||||
GasLimit: types.NewInt(1000000),
|
||||
Method: actors.SMAMethods.AddBalance,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, r, err := fm.sm.WaitForMessage(ctx, smsg.Cid())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if r.ExitCode != 0 {
|
||||
return xerrors.Errorf("adding funds to storage miner market actor failed: exit %d", r.ExitCode)
|
||||
}
|
||||
return nil
|
||||
}
|
@ -3,6 +3,7 @@ package node
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/filecoin-project/lotus/chain/market"
|
||||
"time"
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
@ -221,6 +222,7 @@ func Online() Option {
|
||||
|
||||
Override(new(*paych.Store), paych.NewStore),
|
||||
Override(new(*paych.Manager), paych.NewManager),
|
||||
Override(new(*market.FundMgr), market.NewFundMgr),
|
||||
|
||||
Override(new(*miner.Miner), miner.NewMiner),
|
||||
),
|
||||
|
@ -2,6 +2,7 @@ package impl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/lotus/node/impl/market"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/impl/client"
|
||||
"github.com/filecoin-project/lotus/node/impl/paych"
|
||||
@ -21,6 +22,7 @@ type FullNodeAPI struct {
|
||||
full.ChainAPI
|
||||
client.API
|
||||
full.MpoolAPI
|
||||
market.MarketAPI
|
||||
paych.PaychAPI
|
||||
full.StateAPI
|
||||
full.WalletAPI
|
||||
|
21
node/impl/market/market.go
Normal file
21
node/impl/market/market.go
Normal file
@ -0,0 +1,21 @@
|
||||
package market
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/chain/market"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
type MarketAPI struct {
|
||||
fx.In
|
||||
|
||||
FMgr *market.FundMgr
|
||||
}
|
||||
|
||||
func (a *MarketAPI) MarketEnsureAvailable(ctx context.Context, addr address.Address, amt types.BigInt) error {
|
||||
return a.FMgr.EnsureAvailable(ctx, addr, amt)
|
||||
}
|
Loading…
Reference in New Issue
Block a user