3795cc2bd2
This paves the way for better object lifetime management. Concretely, it makes it possible to: - have different stores backing chain and state data. - having the same datastore library, but using different parameters. - attach different caching layers/policies to each class of data, e.g. sizing caches differently. - specifying different retention policies for chain and state data. This separation is important because: - access patterns/frequency of chain and state data are different. - state is derivable from chain, so one could never expunge the chain store, and only retain state objects reachable from the last finality in the state store.
222 lines
8.7 KiB
Go
222 lines
8.7 KiB
Go
package modules
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/filecoin-project/go-multistore"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"golang.org/x/xerrors"
|
|
|
|
"go.uber.org/fx"
|
|
|
|
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
|
|
dtnet "github.com/filecoin-project/go-data-transfer/network"
|
|
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
|
|
"github.com/filecoin-project/go-fil-markets/discovery"
|
|
discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"
|
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
|
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
|
|
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
|
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
|
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
|
|
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
|
|
"github.com/filecoin-project/go-storedcounter"
|
|
"github.com/ipfs/go-datastore"
|
|
"github.com/ipfs/go-datastore/namespace"
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
|
|
"github.com/filecoin-project/lotus/blockstore"
|
|
"github.com/filecoin-project/lotus/chain/market"
|
|
"github.com/filecoin-project/lotus/journal"
|
|
"github.com/filecoin-project/lotus/markets"
|
|
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
|
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
|
"github.com/filecoin-project/lotus/node/impl/full"
|
|
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
"github.com/filecoin-project/lotus/node/repo/importmgr"
|
|
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
|
)
|
|
|
|
func HandleMigrateClientFunds(lc fx.Lifecycle, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
addr, err := wallet.WalletDefaultAddress(ctx)
|
|
// nothing to be done if there is no default address
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
b, err := ds.Get(datastore.NewKey("/marketfunds/client"))
|
|
if err != nil {
|
|
if xerrors.Is(err, datastore.ErrNotFound) {
|
|
return nil
|
|
}
|
|
log.Errorf("client funds migration - getting datastore value: %v", err)
|
|
return nil
|
|
}
|
|
|
|
var value abi.TokenAmount
|
|
if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
|
|
log.Errorf("client funds migration - unmarshalling datastore value: %v", err)
|
|
return nil
|
|
}
|
|
_, err = fundMgr.Reserve(ctx, addr, addr, value)
|
|
if err != nil {
|
|
log.Errorf("client funds migration - reserving funds (wallet %s, addr %s, funds %d): %v",
|
|
addr, addr, value, err)
|
|
return nil
|
|
}
|
|
|
|
return ds.Delete(datastore.NewKey("/marketfunds/client"))
|
|
},
|
|
})
|
|
}
|
|
|
|
func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ClientMultiDstore, error) {
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
ds, err := r.Datastore(ctx, "/client")
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("getting datastore out of repo: %w", err)
|
|
}
|
|
|
|
mds, err := multistore.NewMultiDstore(ds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return mds.Close()
|
|
},
|
|
})
|
|
|
|
return mds, nil
|
|
}
|
|
|
|
func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes.ClientImportMgr {
|
|
return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client")))
|
|
}
|
|
|
|
func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore {
|
|
// in most cases this is now unused in normal operations -- however, it's important to preserve for the IPFS use case
|
|
return blockstore.WrapIDStore(imgr.Blockstore)
|
|
}
|
|
|
|
// RegisterClientValidator is an initialization hook that registers the client
|
|
// request validator with the data transfer module as the validator for
|
|
// StorageDataTransferVoucher types
|
|
func RegisterClientValidator(crv dtypes.ClientRequestValidator, dtm dtypes.ClientDataTransfer) {
|
|
if err := dtm.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, (*requestvalidation.UnifiedRequestValidator)(crv)); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// NewClientGraphsyncDataTransfer returns a data transfer manager that just
|
|
// uses the clients's Client DAG service for transfers
|
|
func NewClientGraphsyncDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.Graphsync, ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ClientDataTransfer, error) {
|
|
sc := storedcounter.New(ds, datastore.NewKey("/datatransfer/client/counter"))
|
|
|
|
// go-data-transfer protocol retries:
|
|
// 1s, 5s, 25s, 2m5s, 5m x 11 ~= 1 hour
|
|
dtRetryParams := dtnet.RetryParameters(time.Second, 5*time.Minute, 15, 5)
|
|
net := dtnet.NewFromLibp2pHost(h, dtRetryParams)
|
|
|
|
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/client/transfers"))
|
|
transport := dtgstransport.NewTransport(h.ID(), gs)
|
|
err := os.MkdirAll(filepath.Join(r.Path(), "data-transfer"), 0755) //nolint: gosec
|
|
if err != nil && !os.IsExist(err) {
|
|
return nil, err
|
|
}
|
|
|
|
// data-transfer push channel restart configuration
|
|
dtRestartConfig := dtimpl.PushChannelRestartConfig(time.Minute, 10, 1024, 10*time.Minute, 3)
|
|
dt, err := dtimpl.NewDataTransfer(dtDs, filepath.Join(r.Path(), "data-transfer"), net, transport, sc, dtRestartConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dt.OnReady(marketevents.ReadyLogger("client data transfer"))
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
dt.SubscribeToEvents(marketevents.DataTransferLogger)
|
|
return dt.Start(ctx)
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return dt.Stop(ctx)
|
|
},
|
|
})
|
|
return dt, nil
|
|
}
|
|
|
|
// NewClientDatastore creates a datastore for the client to store its deals
|
|
func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
|
|
return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
|
|
}
|
|
|
|
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, j journal.Journal) (storagemarket.StorageClient, error) {
|
|
// go-fil-markets protocol retries:
|
|
// 1s, 5s, 25s, 2m5s, 5m x 11 ~= 1 hour
|
|
marketsRetryParams := smnet.RetryParameters(time.Second, 5*time.Minute, 15, 5)
|
|
net := smnet.NewFromLibp2pHost(h, marketsRetryParams)
|
|
|
|
c, err := storageimpl.NewClient(net, ibs, mds, dataTransfer, discovery, deals, scn, storageimpl.DealPollingInterval(time.Second))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.OnReady(marketevents.ReadyLogger("storage client"))
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
c.SubscribeToEvents(marketevents.StorageClientLogger)
|
|
|
|
evtType := j.RegisterEventType("markets/storage/client", "state_change")
|
|
c.SubscribeToEvents(markets.StorageClientJournaler(j, evtType))
|
|
|
|
return c.Start(ctx)
|
|
},
|
|
OnStop: func(context.Context) error {
|
|
return c.Stop()
|
|
},
|
|
})
|
|
return c, nil
|
|
}
|
|
|
|
// RetrievalClient creates a new retrieval client attached to the client blockstore
|
|
func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
|
|
adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI)
|
|
network := rmnet.NewFromLibp2pHost(h)
|
|
sc := storedcounter.New(ds, datastore.NewKey("/retr"))
|
|
client, err := retrievalimpl.NewClient(network, mds, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client")), sc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client.OnReady(marketevents.ReadyLogger("retrieval client"))
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
|
|
|
|
evtType := j.RegisterEventType("markets/retrieval/client", "state_change")
|
|
client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType))
|
|
|
|
return client.Start(ctx)
|
|
},
|
|
})
|
|
return client, nil
|
|
}
|
|
|
|
// ClientRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
|
|
func ClientRetrievalStoreManager(imgr dtypes.ClientImportMgr) dtypes.ClientRetrievalStoreManager {
|
|
return retrievalstoremgr.NewMultiStoreRetrievalStoreManager(imgr)
|
|
}
|
|
|
|
// ClientBlockstoreRetrievalStoreManager is the default version of the RetrievalStoreManager that runs on multistore
|
|
func ClientBlockstoreRetrievalStoreManager(bs dtypes.ClientBlockstore) dtypes.ClientRetrievalStoreManager {
|
|
return retrievalstoremgr.NewBlockstoreRetrievalStoreManager(bs)
|
|
}
|