feat(builder): finish new FundManager setup

Finish setup of new FundManager and provide a migration for previously reserved funds
This commit is contained in:
hannahhoward 2020-11-10 21:06:50 -08:00
parent 0d243bb824
commit 9f7204ee26
7 changed files with 99 additions and 6 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
@ -48,7 +49,7 @@ type FundManager struct {
fundedAddrs map[address.Address]*fundedAddress
}
func NewFundManager(lc fx.Lifecycle, api FundManagerAPI, ds datastore.Batching) *FundManager {
func NewFundManager(lc fx.Lifecycle, api FundManagerAPI, ds dtypes.MetadataDS) *FundManager {
fm := newFundManager(&api, ds)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {

View File

@ -38,6 +38,23 @@ func (ps *Store) save(state *FundedAddressState) error {
return ps.ds.Put(k, b)
}
// get the state for the given address
func (ps *Store) get(addr address.Address) (*FundedAddressState, error) {
k := dskeyForAddr(addr)
data, err := ps.ds.Get(k)
if err != nil {
return nil, err
}
var state FundedAddressState
err = cborrpc.ReadCborRPC(bytes.NewReader(data), &state)
if err != nil {
return nil, err
}
return &state, nil
}
// forEach calls iter with each address in the datastore
func (ps *Store) forEach(iter func(*FundedAddressState)) error {
res, err := ps.ds.Query(dsq.Query{Prefix: dsKeyAddr})

4
go.mod
View File

@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.0.1
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-fil-markets v1.0.1
github.com/filecoin-project/go-fil-markets v1.0.2-0.20201111032051-24fdce33ac9b
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20201008195726-68c6a2704e49
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
@ -67,7 +67,7 @@ require (
github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.4.2
github.com/ipfs/go-graphsync v0.4.3-0.20201111025458-1b22ee580973
github.com/ipfs/go-ipfs-blockstore v1.0.2
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0

6
go.sum
View File

@ -253,8 +253,8 @@ github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.1 h1:xyhsHPnaECkOP8amFlp/2nK1L5xaQ9mCscXXxX2lwcs=
github.com/filecoin-project/go-fil-markets v1.0.1/go.mod h1:qdAqt05NWpmkGycb4duXaMtidLDmyaz1aG5goWIMm/E=
github.com/filecoin-project/go-fil-markets v1.0.2-0.20201111032051-24fdce33ac9b h1:bp9kz3PkEH0ZLS8U5+1MHnvAr9AXtSGE32MtproV7Xk=
github.com/filecoin-project/go-fil-markets v1.0.2-0.20201111032051-24fdce33ac9b/go.mod h1:jGc5NUeifN2qlYgO9yR87+9GaCkXncOhSZBlnEZC0jg=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
@ -549,6 +549,8 @@ github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3zMo=
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
github.com/ipfs/go-graphsync v0.4.3-0.20201111025458-1b22ee580973 h1:O1TZosrLfqgTGaU9HhKirQI4p0O7H82upZLa96w9IqA=
github.com/ipfs/go-graphsync v0.4.3-0.20201111025458-1b22ee580973/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=

View File

@ -125,11 +125,12 @@ const (
HandleIncomingBlocksKey
HandleIncomingMessagesKey
HandleMigrateClientFundsKey
HandlePaymentChannelManagerKey
// miner
GetParamsKey
HandleMigrateProviderFundsKey
HandleDealsKey
HandleRetrievalKey
RunSectorServiceKey
@ -301,6 +302,7 @@ func Online() Option {
Override(new(*paychmgr.Store), paychmgr.NewStore),
Override(new(*paychmgr.Manager), paychmgr.NewManager),
Override(new(*market.FundManager), market.NewFundManager),
Override(HandleMigrateClientFundsKey, modules.HandleMigrateClientFunds),
Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager),
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
),
@ -366,6 +368,7 @@ func Online() Option {
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds),
Override(HandleRetrievalKey, modules.HandleRetrieval),
Override(GetParamsKey, modules.GetParams),
Override(HandleDealsKey, modules.HandleDeals),

View File

@ -1,10 +1,12 @@
package modules
import (
"bytes"
"context"
"time"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi"
"golang.org/x/xerrors"
"go.uber.org/fx"
@ -26,6 +28,7 @@ import (
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p-core/host"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/markets"
@ -39,6 +42,36 @@ import (
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
)
func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
addr, err := wallet.WalletDefaultAddress(ctx)
// nothing to be done if there is no default address
if err != nil {
return nil
}
b, err := ds.Get(datastore.NewKey("/marketfunds/client"))
if err != nil {
if xerrors.Is(err, datastore.ErrNotFound) {
return nil
}
return err
}
var value abi.TokenAmount
if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
return err
}
_, err = fundMgr.Reserve(ctx, addr, addr, value)
if err != nil {
return err
}
return ds.Delete(datastore.NewKey("/marketfunds/client"))
},
})
}
func ClientMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
ds, err := r.Datastore("/client")
if err != nil {

View File

@ -1,6 +1,7 @@
package modules
import (
"bytes"
"context"
"errors"
"fmt"
@ -45,6 +46,7 @@ import (
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
"github.com/filecoin-project/lotus/api"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -244,6 +246,41 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto
})
}
func HandleMigrateProviderFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, node api.FullNode, minerAddress dtypes.MinerAddress) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
b, err := ds.Get(datastore.NewKey("/marketfunds/provider"))
if err != nil {
if xerrors.Is(err, datastore.ErrNotFound) {
return nil
}
return err
}
var value abi.TokenAmount
if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
return err
}
ts, err := node.ChainHead(ctx)
if err != nil {
return err
}
mi, err := node.StateMinerInfo(ctx, address.Address(minerAddress), ts.Key())
if err != nil {
return err
}
_, err = node.MarketReserveFunds(ctx, mi.Worker, address.Address(minerAddress), value)
if err != nil {
return err
}
return ds.Delete(datastore.NewKey("/marketfunds/provider"))
},
})
}
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
// uses the provider's Staging DAG service for transfers
func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) (dtypes.ProviderDataTransfer, error) {