219 lines
9.1 KiB
Go
219 lines
9.1 KiB
Go
package modules
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-datastore"
|
|
"github.com/ipfs/go-datastore/namespace"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-data-transfer/v2/channelmonitor"
|
|
dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl"
|
|
dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
|
|
dtgstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
|
|
"github.com/filecoin-project/go-fil-markets/discovery"
|
|
discoveryimpl "github.com/filecoin-project/go-fil-markets/discovery/impl"
|
|
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
|
|
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
|
|
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
|
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
|
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
|
|
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
|
|
"github.com/filecoin-project/lotus/blockstore"
|
|
"github.com/filecoin-project/lotus/chain/market"
|
|
"github.com/filecoin-project/lotus/journal"
|
|
"github.com/filecoin-project/lotus/markets"
|
|
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
|
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
|
"github.com/filecoin-project/lotus/markets/storageadapter"
|
|
"github.com/filecoin-project/lotus/node/config"
|
|
"github.com/filecoin-project/lotus/node/impl/full"
|
|
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
"github.com/filecoin-project/lotus/node/repo/imports"
|
|
)
|
|
|
|
func HandleMigrateClientFunds(lc fx.Lifecycle, mctx helpers.MetricsCtx, ds dtypes.MetadataDS, wallet full.WalletAPI, fundMgr *market.FundManager) {
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
addr, err := wallet.WalletDefaultAddress(ctx)
|
|
// nothing to be done if there is no default address
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
b, err := ds.Get(helpers.LifecycleCtx(mctx, lc), datastore.NewKey("/marketfunds/client"))
|
|
if err != nil {
|
|
if xerrors.Is(err, datastore.ErrNotFound) {
|
|
return nil
|
|
}
|
|
log.Errorf("client funds migration - getting datastore value: %v", err)
|
|
return nil
|
|
}
|
|
|
|
var value abi.TokenAmount
|
|
if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil {
|
|
log.Errorf("client funds migration - unmarshalling datastore value: %v", err)
|
|
return nil
|
|
}
|
|
_, err = fundMgr.Reserve(ctx, addr, addr, value)
|
|
if err != nil {
|
|
log.Errorf("client funds migration - reserving funds (wallet %s, addr %s, funds %d): %v",
|
|
addr, addr, value, err)
|
|
return nil
|
|
}
|
|
|
|
return ds.Delete(helpers.LifecycleCtx(mctx, lc), datastore.NewKey("/marketfunds/client"))
|
|
},
|
|
})
|
|
}
|
|
|
|
func ClientImportMgr(ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ClientImportMgr, error) {
|
|
// store the imports under the repo's `imports` subdirectory.
|
|
dir := filepath.Join(r.Path(), "imports")
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return nil, xerrors.Errorf("failed to create directory %s: %w", dir, err)
|
|
}
|
|
|
|
ns := namespace.Wrap(ds, datastore.NewKey("/client"))
|
|
return imports.NewManager(ns, dir), nil
|
|
}
|
|
|
|
// TODO this should be removed.
|
|
func ClientBlockstore() dtypes.ClientBlockstore {
|
|
// in most cases this is now unused in normal operations -- however, it's important to preserve for the IPFS use case
|
|
return blockstore.WrapIDStore(blockstore.FromDatastore(datastore.NewMapDatastore()))
|
|
}
|
|
|
|
// NewClientGraphsyncDataTransfer returns a data transfer manager that just
|
|
// uses the clients's Client DAG service for transfers
|
|
func NewClientGraphsyncDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.Graphsync, ds dtypes.MetadataDS, r repo.LockedRepo) (dtypes.ClientDataTransfer, error) {
|
|
// go-data-transfer protocol retries:
|
|
// 1s, 5s, 25s, 2m5s, 5m x 11 ~= 1 hour
|
|
dtRetryParams := dtnet.RetryParameters(time.Second, 5*time.Minute, 15, 5)
|
|
net := dtnet.NewFromLibp2pHost(h, dtRetryParams)
|
|
|
|
dtDs := namespace.Wrap(ds, datastore.NewKey("/datatransfer/client/transfers"))
|
|
transport := dtgstransport.NewTransport(h.ID(), gs)
|
|
|
|
// data-transfer push / pull channel restart configuration:
|
|
dtRestartConfig := dtimpl.ChannelRestartConfig(channelmonitor.Config{
|
|
// Disable Accept and Complete timeouts until this issue is resolved:
|
|
// https://github.com/filecoin-project/lotus/issues/6343#
|
|
// Wait for the other side to respond to an Open channel message
|
|
AcceptTimeout: 0,
|
|
// Wait for the other side to send a Complete message once all
|
|
// data has been sent / received
|
|
CompleteTimeout: 0,
|
|
|
|
// When an error occurs, wait a little while until all related errors
|
|
// have fired before sending a restart message
|
|
RestartDebounce: 10 * time.Second,
|
|
// After sending a restart, wait for at least 1 minute before sending another
|
|
RestartBackoff: time.Minute,
|
|
// After trying to restart 3 times, give up and fail the transfer
|
|
MaxConsecutiveRestarts: 3,
|
|
})
|
|
dt, err := dtimpl.NewDataTransfer(dtDs, net, transport, dtRestartConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dt.OnReady(marketevents.ReadyLogger("client data transfer"))
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
dt.SubscribeToEvents(marketevents.DataTransferLogger)
|
|
return dt.Start(ctx)
|
|
},
|
|
OnStop: func(ctx context.Context) error {
|
|
return dt.Stop(ctx)
|
|
},
|
|
})
|
|
return dt, nil
|
|
}
|
|
|
|
// NewClientDatastore creates a datastore for the client to store its deals
|
|
func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
|
|
return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
|
|
}
|
|
|
|
// StorageBlockstoreAccessor returns the default storage blockstore accessor
|
|
// from the import manager.
|
|
func StorageBlockstoreAccessor(importmgr dtypes.ClientImportMgr) storagemarket.BlockstoreAccessor {
|
|
return storageadapter.NewImportsBlockstoreAccessor(importmgr)
|
|
}
|
|
|
|
// RetrievalBlockstoreAccessor returns the default retrieval blockstore accessor
|
|
// using the subdirectory `retrievals` under the repo.
|
|
func RetrievalBlockstoreAccessor(r repo.LockedRepo) (retrievalmarket.BlockstoreAccessor, error) {
|
|
dir := filepath.Join(r.Path(), "retrievals")
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return nil, xerrors.Errorf("failed to create directory %s: %w", dir, err)
|
|
}
|
|
return retrievaladapter.NewCARBlockstoreAccessor(dir), nil
|
|
}
|
|
|
|
func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local,
|
|
deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, accessor storagemarket.BlockstoreAccessor, j journal.Journal) (storagemarket.StorageClient, error) {
|
|
// go-fil-markets protocol retries:
|
|
// 1s, 5s, 25s, 2m5s, 5m x 11 ~= 1 hour
|
|
marketsRetryParams := smnet.RetryParameters(time.Second, 5*time.Minute, 15, 5)
|
|
net := smnet.NewFromLibp2pHost(h, marketsRetryParams)
|
|
|
|
c, err := storageimpl.NewClient(net, dataTransfer, discovery, deals, scn, accessor, storageimpl.DealPollingInterval(time.Second), storageimpl.MaxTraversalLinks(config.MaxTraversalLinks))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.OnReady(marketevents.ReadyLogger("storage client"))
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
c.SubscribeToEvents(marketevents.StorageClientLogger)
|
|
|
|
evtType := j.RegisterEventType("markets/storage/client", "state_change")
|
|
c.SubscribeToEvents(markets.StorageClientJournaler(j, evtType))
|
|
|
|
return c.Start(ctx)
|
|
},
|
|
OnStop: func(context.Context) error {
|
|
return c.Stop()
|
|
},
|
|
})
|
|
return c, nil
|
|
}
|
|
|
|
// RetrievalClient creates a new retrieval client attached to the client blockstore
|
|
func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
|
|
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor *retrievaladapter.APIBlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
|
|
return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
|
|
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor *retrievaladapter.APIBlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
|
|
adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI)
|
|
network := rmnet.NewFromLibp2pHost(h)
|
|
ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))
|
|
client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client.OnReady(marketevents.ReadyLogger("retrieval client"))
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(ctx context.Context) error {
|
|
client.SubscribeToEvents(marketevents.RetrievalClientLogger)
|
|
|
|
evtType := j.RegisterEventType("markets/retrieval/client", "state_change")
|
|
client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType))
|
|
|
|
return client.Start(ctx)
|
|
},
|
|
})
|
|
return client, nil
|
|
}
|
|
}
|