2019-08-01 14:19:53 +00:00
package modules
import (
2020-11-11 05:06:50 +00:00
"bytes"
2019-08-01 14:19:53 +00:00
"context"
2020-06-11 19:59:50 +00:00
"errors"
2020-06-18 20:32:20 +00:00
"fmt"
2020-07-08 08:35:50 +00:00
"net/http"
2020-11-16 18:56:53 +00:00
"os"
"path/filepath"
2021-06-01 09:42:28 +00:00
"strings"
2020-07-08 08:35:50 +00:00
"time"
2020-07-23 02:05:11 +00:00
"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/xerrors"
2020-01-15 20:49:11 +00:00
"github.com/filecoin-project/go-address"
2020-07-08 08:35:50 +00:00
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
2020-01-24 20:19:52 +00:00
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
2020-09-29 11:53:30 +00:00
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
2020-01-15 20:49:11 +00:00
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
2020-01-24 20:19:52 +00:00
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
2020-09-29 11:53:30 +00:00
"github.com/filecoin-project/go-fil-markets/shared"
2020-01-15 20:49:11 +00:00
"github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
2020-05-20 22:46:44 +00:00
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
2020-02-06 02:43:37 +00:00
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
2020-05-20 18:23:51 +00:00
"github.com/filecoin-project/go-jsonrpc/auth"
2021-07-01 12:53:20 +00:00
"github.com/filecoin-project/go-paramfetch"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-09-21 22:52:33 +00:00
"github.com/filecoin-project/go-statestore"
2020-04-28 17:13:46 +00:00
"github.com/filecoin-project/go-storedcounter"
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
graphsync "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/libp2p/go-libp2p-core/host"
2020-05-19 23:24:59 +00:00
2020-08-17 13:39:33 +00:00
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
2020-08-17 13:26:18 +00:00
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
2020-08-17 13:39:33 +00:00
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
2020-08-18 16:27:18 +00:00
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
2020-08-17 13:26:18 +00:00
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
"github.com/filecoin-project/lotus/api"
2021-04-05 11:23:46 +00:00
"github.com/filecoin-project/lotus/api/v0api"
2021-04-05 17:56:53 +00:00
"github.com/filecoin-project/lotus/api/v1api"
2021-01-29 20:01:00 +00:00
"github.com/filecoin-project/lotus/blockstore"
2019-10-27 08:56:53 +00:00
"github.com/filecoin-project/lotus/build"
2021-01-20 02:06:00 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
2019-11-25 04:45:13 +00:00
"github.com/filecoin-project/lotus/chain/gen"
2020-08-06 01:16:54 +00:00
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
2020-03-08 08:07:58 +00:00
"github.com/filecoin-project/lotus/chain/types"
2020-09-21 22:52:33 +00:00
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/markets"
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
"github.com/filecoin-project/lotus/markets/dagstore"
2020-07-31 01:27:42 +00:00
marketevents "github.com/filecoin-project/lotus/markets/loggers"
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
"github.com/filecoin-project/lotus/markets/pricing"
2021-01-20 02:06:00 +00:00
lotusminer "github.com/filecoin-project/lotus/miner"
2020-07-23 02:05:11 +00:00
"github.com/filecoin-project/lotus/node/config"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage"
2019-08-01 14:19:53 +00:00
)
2020-05-19 23:24:59 +00:00
var StorageCounterDSPrefix = "/storage/nextid"
2019-08-08 01:16:58 +00:00
func minerAddrFromDS ( ds dtypes . MetadataDS ) ( address . Address , error ) {
maddrb , err := ds . Get ( datastore . NewKey ( "miner-address" ) )
if err != nil {
return address . Undef , err
}
return address . NewFromBytes ( maddrb )
}
2020-11-05 06:44:46 +00:00
func GetParams ( spt abi . RegisteredSealProof ) error {
ssize , err := spt . SectorSize ( )
2020-02-27 21:45:31 +00:00
if err != nil {
return err
}
2020-06-22 18:41:52 +00:00
// If built-in assets are disabled, we expect the user to have placed the right
// parameters in the right location on the filesystem (/var/tmp/filecoin-proof-parameters).
if build . DisableBuiltinAssets {
return nil
}
2020-11-05 06:44:46 +00:00
// TODO: We should fetch the params for the actual proof type, not just based on the size.
2021-03-10 15:16:44 +00:00
if err := paramfetch . GetParams ( context . TODO ( ) , build . ParametersJSON ( ) , build . SrsJSON ( ) , uint64 ( ssize ) ) ; err != nil {
2019-12-04 19:44:15 +00:00
return xerrors . Errorf ( "fetching proof parameters: %w" , err )
}
return nil
}
2020-03-18 01:08:11 +00:00
func MinerAddress ( ds dtypes . MetadataDS ) ( dtypes . MinerAddress , error ) {
ma , err := minerAddrFromDS ( ds )
return dtypes . MinerAddress ( ma ) , err
}
func MinerID ( ma dtypes . MinerAddress ) ( dtypes . MinerID , error ) {
id , err := address . IDFromAddress ( address . Address ( ma ) )
return dtypes . MinerID ( id ) , err
}
2019-12-06 00:27:32 +00:00
2021-04-05 17:56:53 +00:00
func StorageNetworkName ( ctx helpers . MetricsCtx , a v1api . FullNode ) ( dtypes . NetworkName , error ) {
2020-09-24 21:30:11 +00:00
if ! build . Devnet {
return "testnetnet" , nil
}
2020-03-31 23:13:37 +00:00
return a . StateNetworkName ( ctx )
}
2021-04-05 17:56:53 +00:00
func SealProofType ( maddr dtypes . MinerAddress , fnapi v1api . FullNode ) ( abi . RegisteredSealProof , error ) {
2020-04-16 17:36:36 +00:00
mi , err := fnapi . StateMinerInfo ( context . TODO ( ) , address . Address ( maddr ) , types . EmptyTSK )
2020-03-03 22:19:22 +00:00
if err != nil {
2020-11-05 06:44:46 +00:00
return 0 , err
2019-08-01 14:19:53 +00:00
}
2021-01-20 02:06:00 +00:00
networkVersion , err := fnapi . StateNetworkVersion ( context . TODO ( ) , types . EmptyTSK )
if err != nil {
return 0 , err
}
2020-03-03 22:19:22 +00:00
2021-01-20 02:06:00 +00:00
return miner . PreferredSealProofTypeFromWindowPoStType ( networkVersion , mi . WindowPoStProofType )
2019-08-01 14:19:53 +00:00
}
2020-03-05 00:38:07 +00:00
type sidsc struct {
sc * storedcounter . StoredCounter
}
func ( s * sidsc ) Next ( ) ( abi . SectorNumber , error ) {
i , err := s . sc . Next ( )
return abi . SectorNumber ( i ) , err
}
2020-03-17 20:19:52 +00:00
func SectorIDCounter ( ds dtypes . MetadataDS ) sealing . SectorIDCounter {
2020-05-19 23:24:59 +00:00
sc := storedcounter . New ( ds , datastore . NewKey ( StorageCounterDSPrefix ) )
2020-03-05 00:38:07 +00:00
return & sidsc { sc }
}
2020-12-02 18:58:00 +00:00
func AddressSelector ( addrConf * config . MinerAddressConfig ) func ( ) ( * storage . AddressSelector , error ) {
return func ( ) ( * storage . AddressSelector , error ) {
as := & storage . AddressSelector { }
if addrConf == nil {
return as , nil
}
2021-02-17 15:56:32 +00:00
as . DisableOwnerFallback = addrConf . DisableOwnerFallback
as . DisableWorkerFallback = addrConf . DisableWorkerFallback
2020-12-02 18:58:00 +00:00
for _ , s := range addrConf . PreCommitControl {
addr , err := address . NewFromString ( s )
if err != nil {
return nil , xerrors . Errorf ( "parsing precommit control address: %w" , err )
}
as . PreCommitControl = append ( as . PreCommitControl , addr )
}
for _ , s := range addrConf . CommitControl {
addr , err := address . NewFromString ( s )
if err != nil {
return nil , xerrors . Errorf ( "parsing commit control address: %w" , err )
}
as . CommitControl = append ( as . CommitControl , addr )
}
2021-02-17 15:56:32 +00:00
for _ , s := range addrConf . TerminateControl {
addr , err := address . NewFromString ( s )
if err != nil {
return nil , xerrors . Errorf ( "parsing terminate control address: %w" , err )
}
as . TerminateControl = append ( as . TerminateControl , addr )
}
2021-07-07 16:00:54 +00:00
for _ , s := range addrConf . DealPublishControl {
addr , err := address . NewFromString ( s )
if err != nil {
return nil , xerrors . Errorf ( "parsing deal publishing control address: %w" , err )
}
as . DealPublishControl = append ( as . DealPublishControl , addr )
}
2020-12-02 18:58:00 +00:00
return as , nil
}
}
2020-07-20 13:45:17 +00:00
type StorageMinerParams struct {
fx . In
2020-08-26 15:38:23 +00:00
Lifecycle fx . Lifecycle
MetricsCtx helpers . MetricsCtx
2021-04-05 17:56:53 +00:00
API v1api . FullNode
2020-08-26 15:38:23 +00:00
MetadataDS dtypes . MetadataDS
Sealer sectorstorage . SectorManager
SectorIDCounter sealing . SectorIDCounter
Verifier ffiwrapper . Verifier
2021-05-19 13:20:23 +00:00
Prover ffiwrapper . Prover
2020-08-26 15:38:23 +00:00
GetSealingConfigFn dtypes . GetSealingConfigFunc
2020-10-09 19:52:04 +00:00
Journal journal . Journal
2020-12-02 18:58:00 +00:00
AddrSel * storage . AddressSelector
2020-08-26 15:38:23 +00:00
}
func StorageMiner ( fc config . MinerFeeConfig ) func ( params StorageMinerParams ) ( * storage . Miner , error ) {
return func ( params StorageMinerParams ) ( * storage . Miner , error ) {
var (
ds = params . MetadataDS
mctx = params . MetricsCtx
lc = params . Lifecycle
api = params . API
sealer = params . Sealer
sc = params . SectorIDCounter
verif = params . Verifier
2021-05-19 13:20:23 +00:00
prover = params . Prover
2020-08-26 15:38:23 +00:00
gsd = params . GetSealingConfigFn
2020-10-09 19:52:04 +00:00
j = params . Journal
2020-12-02 18:58:00 +00:00
as = params . AddrSel
2020-08-26 15:38:23 +00:00
)
2020-08-12 17:47:00 +00:00
maddr , err := minerAddrFromDS ( ds )
if err != nil {
return nil , err
}
ctx := helpers . LifecycleCtx ( mctx , lc )
2021-01-22 21:02:54 +00:00
fps , err := storage . NewWindowedPoStScheduler ( api , fc , as , sealer , verif , sealer , j , maddr )
2020-08-12 17:47:00 +00:00
if err != nil {
return nil , err
}
2021-07-01 12:53:20 +00:00
sm , err := storage . NewMiner ( api , maddr , ds , sealer , sc , verif , prover , gsd , fc , j , as )
2020-08-12 17:47:00 +00:00
if err != nil {
return nil , err
}
lc . Append ( fx . Hook {
OnStart : func ( context . Context ) error {
go fps . Run ( ctx )
return sm . Run ( ctx )
} ,
OnStop : sm . Stop ,
} )
return sm , nil
2020-04-10 21:29:05 +00:00
}
2019-08-01 14:19:53 +00:00
}
2019-08-02 16:25:10 +00:00
2020-10-09 19:52:04 +00:00
func HandleRetrieval ( host host . Host , lc fx . Lifecycle , m retrievalmarket . RetrievalProvider , j journal . Journal ) {
2020-09-29 11:53:30 +00:00
m . OnReady ( marketevents . ReadyLogger ( "retrieval provider" ) )
2019-08-26 13:45:36 +00:00
lc . Append ( fx . Hook {
2020-09-29 11:53:30 +00:00
OnStart : func ( ctx context . Context ) error {
2020-07-31 01:27:42 +00:00
m . SubscribeToEvents ( marketevents . RetrievalProviderLogger )
2020-09-14 15:20:01 +00:00
2020-10-09 19:52:04 +00:00
evtType := j . RegisterEventType ( "markets/retrieval/provider" , "state_change" )
m . SubscribeToEvents ( markets . RetrievalProviderJournaler ( j , evtType ) )
2020-09-14 15:20:01 +00:00
2020-09-29 11:53:30 +00:00
return m . Start ( ctx )
2020-01-24 20:19:52 +00:00
} ,
OnStop : func ( context . Context ) error {
2020-05-27 20:53:20 +00:00
return m . Stop ( )
2019-08-26 13:45:36 +00:00
} ,
} )
}
2020-10-09 19:52:04 +00:00
func HandleDeals ( mctx helpers . MetricsCtx , lc fx . Lifecycle , host host . Host , h storagemarket . StorageProvider , j journal . Journal ) {
2019-08-06 23:08:34 +00:00
ctx := helpers . LifecycleCtx ( mctx , lc )
2020-09-29 11:53:30 +00:00
h . OnReady ( marketevents . ReadyLogger ( "storage provider" ) )
2019-08-06 23:08:34 +00:00
lc . Append ( fx . Hook {
OnStart : func ( context . Context ) error {
2020-07-31 01:27:42 +00:00
h . SubscribeToEvents ( marketevents . StorageProviderLogger )
2020-09-14 15:20:01 +00:00
2020-10-09 19:52:04 +00:00
evtType := j . RegisterEventType ( "markets/storage/provider" , "state_change" )
h . SubscribeToEvents ( markets . StorageProviderJournaler ( j , evtType ) )
2020-09-14 15:20:01 +00:00
2020-05-27 20:53:20 +00:00
return h . Start ( ctx )
2019-08-06 23:08:34 +00:00
} ,
OnStop : func ( context . Context ) error {
2020-05-27 20:53:20 +00:00
return h . Stop ( )
2019-08-06 23:08:34 +00:00
} ,
} )
2019-08-02 16:25:10 +00:00
}
2019-08-06 22:04:21 +00:00
2020-11-11 05:06:50 +00:00
func HandleMigrateProviderFunds ( lc fx . Lifecycle , ds dtypes . MetadataDS , node api . FullNode , minerAddress dtypes . MinerAddress ) {
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
b , err := ds . Get ( datastore . NewKey ( "/marketfunds/provider" ) )
if err != nil {
if xerrors . Is ( err , datastore . ErrNotFound ) {
return nil
}
return err
}
var value abi . TokenAmount
if err = value . UnmarshalCBOR ( bytes . NewReader ( b ) ) ; err != nil {
return err
}
ts , err := node . ChainHead ( ctx )
if err != nil {
2021-02-11 11:00:26 +00:00
log . Errorf ( "provider funds migration - getting chain head: %v" , err )
2020-11-12 16:15:05 +00:00
return nil
2020-11-11 05:06:50 +00:00
}
mi , err := node . StateMinerInfo ( ctx , address . Address ( minerAddress ) , ts . Key ( ) )
if err != nil {
2021-02-11 11:00:26 +00:00
log . Errorf ( "provider funds migration - getting miner info %s: %v" , minerAddress , err )
2020-11-12 16:15:05 +00:00
return nil
2020-11-11 05:06:50 +00:00
}
_ , err = node . MarketReserveFunds ( ctx , mi . Worker , address . Address ( minerAddress ) , value )
if err != nil {
2021-02-11 11:00:26 +00:00
log . Errorf ( "provider funds migration - reserving funds (wallet %s, addr %s, funds %d): %v" ,
2020-11-12 16:15:05 +00:00
mi . Worker , minerAddress , value , err )
return nil
2020-11-11 05:06:50 +00:00
}
return ds . Delete ( datastore . NewKey ( "/marketfunds/provider" ) )
} ,
} )
}
2019-11-11 20:51:28 +00:00
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
// uses the provider's Staging DAG service for transfers
2020-11-16 18:56:53 +00:00
func NewProviderDAGServiceDataTransfer ( lc fx . Lifecycle , h host . Host , gs dtypes . StagingGraphsync , ds dtypes . MetadataDS , r repo . LockedRepo ) ( dtypes . ProviderDataTransfer , error ) {
2020-07-08 08:35:50 +00:00
net := dtnet . NewFromLibp2pHost ( h )
dtDs := namespace . Wrap ( ds , datastore . NewKey ( "/datatransfer/provider/transfers" ) )
transport := dtgstransport . NewTransport ( h . ID ( ) , gs )
2020-11-16 18:56:53 +00:00
err := os . MkdirAll ( filepath . Join ( r . Path ( ) , "data-transfer" ) , 0755 ) //nolint: gosec
if err != nil && ! os . IsExist ( err ) {
return nil , err
}
2021-03-23 12:28:30 +00:00
dt , err := dtimpl . NewDataTransfer ( dtDs , filepath . Join ( r . Path ( ) , "data-transfer" ) , net , transport )
2020-07-08 08:35:50 +00:00
if err != nil {
return nil , err
}
2020-10-13 10:37:00 +00:00
dt . OnReady ( marketevents . ReadyLogger ( "provider data transfer" ) )
2020-07-08 08:35:50 +00:00
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
2020-11-23 20:43:30 +00:00
dt . SubscribeToEvents ( marketevents . DataTransferLogger )
2020-07-08 08:35:50 +00:00
return dt . Start ( ctx )
} ,
2020-08-29 01:03:27 +00:00
OnStop : func ( ctx context . Context ) error {
return dt . Stop ( ctx )
2020-07-08 08:35:50 +00:00
} ,
} )
return dt , nil
2019-11-11 20:51:28 +00:00
}
2020-01-24 20:19:52 +00:00
// NewProviderPieceStore creates a statestore for storing metadata about pieces
// shared by the storage and retrieval providers
2020-09-29 11:53:30 +00:00
func NewProviderPieceStore ( lc fx . Lifecycle , ds dtypes . MetadataDS ) ( dtypes . ProviderPieceStore , error ) {
ps , err := piecestoreimpl . NewPieceStore ( namespace . Wrap ( ds , datastore . NewKey ( "/storagemarket" ) ) )
if err != nil {
return nil , err
}
ps . OnReady ( marketevents . ReadyLogger ( "piecestore" ) )
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
return ps . Start ( ctx )
} ,
} )
return ps , nil
2020-01-24 20:19:52 +00:00
}
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implement, establishes message interfaces and network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
// StagingBlockstore creates a blockstore for staging blocks for a miner
// in a storage deal, prior to sealing
2021-01-26 10:25:34 +00:00
func StagingBlockstore ( lc fx . Lifecycle , mctx helpers . MetricsCtx , r repo . LockedRepo ) ( dtypes . StagingBlockstore , error ) {
ctx := helpers . LifecycleCtx ( mctx , lc )
stagingds , err := r . Datastore ( ctx , "/staging" )
2019-08-06 22:04:21 +00:00
if err != nil {
return nil , err
}
2021-01-29 20:01:00 +00:00
return blockstore . FromDatastore ( stagingds ) , nil
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implement, establishes message interfaces and network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
}
// StagingGraphsync creates a graphsync instance which reads and writes blocks
// to the StagingBlockstore
2021-06-28 09:39:01 +00:00
func StagingGraphsync ( parallelTransfers uint64 ) func ( mctx helpers . MetricsCtx , lc fx . Lifecycle , ibs dtypes . StagingBlockstore , h host . Host ) dtypes . StagingGraphsync {
return func ( mctx helpers . MetricsCtx , lc fx . Lifecycle , ibs dtypes . StagingBlockstore , h host . Host ) dtypes . StagingGraphsync {
graphsyncNetwork := gsnet . NewFromLibp2pHost ( h )
loader := storeutil . LoaderForBlockstore ( ibs )
storer := storeutil . StorerForBlockstore ( ibs )
gs := graphsync . New ( helpers . LifecycleCtx ( mctx , lc ) , graphsyncNetwork , loader , storer , graphsync . RejectAllRequestsByDefault ( ) , graphsync . MaxInProgressRequests ( parallelTransfers ) )
return gs
}
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implement, establishes message interfaces and network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
}
2021-04-05 17:56:53 +00:00
func SetupBlockProducer ( lc fx . Lifecycle , ds dtypes . MetadataDS , api v1api . FullNode , epp gen . WinningPoStProver , sf * slashfilter . SlashFilter , j journal . Journal ) ( * lotusminer . Miner , error ) {
2019-08-20 17:19:24 +00:00
minerAddr , err := minerAddrFromDS ( ds )
if err != nil {
2019-11-25 04:45:13 +00:00
return nil , err
2019-08-20 17:19:24 +00:00
}
2021-01-20 02:06:00 +00:00
m := lotusminer . NewMiner ( api , epp , minerAddr , sf , j )
2019-11-25 04:45:13 +00:00
2019-08-20 17:19:24 +00:00
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
2020-05-05 19:01:44 +00:00
if err := m . Start ( ctx ) ; err != nil {
2019-11-25 04:45:13 +00:00
return err
2019-10-31 22:04:13 +00:00
}
return nil
2019-08-20 17:19:24 +00:00
} ,
2019-09-17 14:23:08 +00:00
OnStop : func ( ctx context . Context ) error {
2020-05-05 19:01:44 +00:00
return m . Stop ( ctx )
2019-09-17 14:23:08 +00:00
} ,
2019-08-20 17:19:24 +00:00
} )
2019-11-25 04:45:13 +00:00
return m , nil
2019-08-20 17:19:24 +00:00
}
2019-10-27 08:56:53 +00:00
2021-04-05 17:56:53 +00:00
func NewStorageAsk ( ctx helpers . MetricsCtx , fapi v1api . FullNode , ds dtypes . MetadataDS , minerAddress dtypes . MinerAddress , spn storagemarket . StorageProviderNode ) ( * storedask . StoredAsk , error ) {
2020-02-27 21:45:31 +00:00
2020-05-20 22:46:44 +00:00
mi , err := fapi . StateMinerInfo ( ctx , address . Address ( minerAddress ) , types . EmptyTSK )
2020-02-06 02:43:37 +00:00
if err != nil {
return nil , err
}
2020-02-27 21:45:31 +00:00
2020-09-29 11:53:30 +00:00
providerDs := namespace . Wrap ( ds , datastore . NewKey ( "/deals/provider" ) )
// legacy this was mistake where this key was place -- so we move the legacy key if need be
err = shared . MoveKey ( providerDs , "/latest-ask" , "/storage-ask/latest" )
if err != nil {
return nil , err
}
2020-11-18 05:53:48 +00:00
return storedask . NewStoredAsk ( namespace . Wrap ( providerDs , datastore . NewKey ( "/storage-ask" ) ) , datastore . NewKey ( "latest" ) , spn , address . Address ( minerAddress ) ,
2020-10-14 05:47:44 +00:00
storagemarket . MaxPieceSize ( abi . PaddedPieceSize ( mi . SectorSize ) ) )
2020-05-20 22:46:44 +00:00
}
2020-02-27 21:45:31 +00:00
2020-10-15 16:24:48 +00:00
func BasicDealFilter ( user dtypes . StorageDealFilter ) func ( onlineOk dtypes . ConsiderOnlineStorageDealsConfigFunc ,
2020-07-30 17:36:31 +00:00
offlineOk dtypes . ConsiderOfflineStorageDealsConfigFunc ,
2020-12-02 06:21:29 +00:00
verifiedOk dtypes . ConsiderVerifiedStorageDealsConfigFunc ,
unverifiedOk dtypes . ConsiderUnverifiedStorageDealsConfigFunc ,
2020-07-30 17:36:31 +00:00
blocklistFunc dtypes . StorageDealPieceCidBlocklistConfigFunc ,
expectedSealTimeFunc dtypes . GetExpectedSealDurationFunc ,
2021-06-23 17:45:08 +00:00
startDelay dtypes . GetMaxDealStartDelayFunc ,
2020-10-15 16:24:48 +00:00
spn storagemarket . StorageProviderNode ) dtypes . StorageDealFilter {
2020-07-30 17:36:31 +00:00
return func ( onlineOk dtypes . ConsiderOnlineStorageDealsConfigFunc ,
offlineOk dtypes . ConsiderOfflineStorageDealsConfigFunc ,
2020-12-02 06:21:29 +00:00
verifiedOk dtypes . ConsiderVerifiedStorageDealsConfigFunc ,
unverifiedOk dtypes . ConsiderUnverifiedStorageDealsConfigFunc ,
2020-07-30 17:36:31 +00:00
blocklistFunc dtypes . StorageDealPieceCidBlocklistConfigFunc ,
expectedSealTimeFunc dtypes . GetExpectedSealDurationFunc ,
2021-06-23 17:45:08 +00:00
startDelay dtypes . GetMaxDealStartDelayFunc ,
2020-10-15 16:24:48 +00:00
spn storagemarket . StorageProviderNode ) dtypes . StorageDealFilter {
2020-07-30 17:36:31 +00:00
return func ( ctx context . Context , deal storagemarket . MinerDeal ) ( bool , string , error ) {
b , err := onlineOk ( )
if err != nil {
return false , "miner error" , err
}
if deal . Ref != nil && deal . Ref . TransferType != storagemarket . TTManual && ! b {
log . Warnf ( "online storage deal consideration disabled; rejecting storage deal proposal from client: %s" , deal . Client . String ( ) )
return false , "miner is not considering online storage deals" , nil
}
b , err = offlineOk ( )
if err != nil {
return false , "miner error" , err
}
if deal . Ref != nil && deal . Ref . TransferType == storagemarket . TTManual && ! b {
log . Warnf ( "offline storage deal consideration disabled; rejecting storage deal proposal from client: %s" , deal . Client . String ( ) )
return false , "miner is not accepting offline storage deals" , nil
}
2020-12-02 06:21:29 +00:00
b , err = verifiedOk ( )
if err != nil {
return false , "miner error" , err
}
if deal . Proposal . VerifiedDeal && ! b {
log . Warnf ( "verified storage deal consideration disabled; rejecting storage deal proposal from client: %s" , deal . Client . String ( ) )
return false , "miner is not accepting verified storage deals" , nil
}
b , err = unverifiedOk ( )
if err != nil {
return false , "miner error" , err
}
if ! deal . Proposal . VerifiedDeal && ! b {
log . Warnf ( "unverified storage deal consideration disabled; rejecting storage deal proposal from client: %s" , deal . Client . String ( ) )
return false , "miner is not accepting unverified storage deals" , nil
}
2020-07-30 17:36:31 +00:00
blocklist , err := blocklistFunc ( )
if err != nil {
return false , "miner error" , err
}
for idx := range blocklist {
if deal . Proposal . PieceCID . Equals ( blocklist [ idx ] ) {
log . Warnf ( "piece CID in proposal %s is blocklisted; rejecting storage deal proposal from client: %s" , deal . Proposal . PieceCID , deal . Client . String ( ) )
return false , fmt . Sprintf ( "miner has blocklisted piece CID %s" , deal . Proposal . PieceCID ) , nil
}
}
sealDuration , err := expectedSealTimeFunc ( )
if err != nil {
return false , "miner error" , err
}
sealEpochs := sealDuration / ( time . Duration ( build . BlockDelaySecs ) * time . Second )
_ , ht , err := spn . GetChainHead ( ctx )
if err != nil {
return false , "failed to get chain head" , err
}
earliest := abi . ChainEpoch ( sealEpochs ) + ht
if deal . Proposal . StartEpoch < earliest {
log . Warnw ( "proposed deal would start before sealing can be completed; rejecting storage deal proposal from client" , "piece_cid" , deal . Proposal . PieceCID , "client" , deal . Client . String ( ) , "seal_duration" , sealDuration , "earliest" , earliest , "curepoch" , ht )
return false , fmt . Sprintf ( "cannot seal a sector before %s" , deal . Proposal . StartEpoch ) , nil
}
2021-06-23 17:45:08 +00:00
sd , err := startDelay ( )
if err != nil {
return false , "miner error" , err
}
2020-10-05 19:35:58 +00:00
// Reject if it's more than 7 days in the future
2020-10-05 17:32:49 +00:00
// TODO: read from cfg
2021-06-23 17:45:08 +00:00
maxStartEpoch := earliest + abi . ChainEpoch ( uint64 ( sd . Seconds ( ) ) / build . BlockDelaySecs )
2020-10-05 17:32:49 +00:00
if deal . Proposal . StartEpoch > maxStartEpoch {
return false , fmt . Sprintf ( "deal start epoch is too far in the future: %s > %s" , deal . Proposal . StartEpoch , maxStartEpoch ) , nil
}
2020-07-30 17:36:31 +00:00
if user != nil {
return user ( ctx , deal )
}
return true , "" , nil
}
}
}
2020-07-12 17:54:53 +00:00
func StorageProvider ( minerAddress dtypes . MinerAddress ,
storedAsk * storedask . StoredAsk ,
h host . Host , ds dtypes . MetadataDS ,
r repo . LockedRepo ,
pieceStore dtypes . ProviderPieceStore ,
dataTransfer dtypes . ProviderDataTransfer ,
spn storagemarket . StorageProviderNode ,
2020-10-15 16:24:48 +00:00
df dtypes . StorageDealFilter ,
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
dsw * dagstore . Wrapper ,
2020-07-30 17:36:31 +00:00
) ( storagemarket . StorageProvider , error ) {
2020-05-20 22:46:44 +00:00
net := smnet . NewFromLibp2pHost ( h )
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
2021-08-16 23:16:06 +00:00
dir := filepath . Join ( r . Path ( ) , "deal-staging" )
// migrate temporary files that were created directly under the repo, by
// moving them to the new directory and symlinking them.
oldDir := r . Path ( )
if err := migrateDealStaging ( oldDir , dir ) ; err != nil {
return nil , xerrors . Errorf ( "failed to make deal staging directory %w" , err )
}
store , err := piecefilestore . NewLocalFileStore ( piecefilestore . OsPath ( dir ) )
2020-02-27 21:45:31 +00:00
if err != nil {
return nil , err
}
2020-07-30 17:36:31 +00:00
opt := storageimpl . CustomDealDecisionLogic ( storageimpl . DealDeciderFunc ( df ) )
2020-06-11 18:29:59 +00:00
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
return storageimpl . NewProvider (
net ,
namespace . Wrap ( ds , datastore . NewKey ( "/deals/provider" ) ) ,
store ,
dsw ,
pieceStore ,
dataTransfer ,
spn ,
address . Address ( minerAddress ) ,
storedAsk ,
opt ,
)
2019-12-17 10:46:39 +00:00
}
2020-10-15 16:24:48 +00:00
func RetrievalDealFilter ( userFilter dtypes . RetrievalDealFilter ) func ( onlineOk dtypes . ConsiderOnlineRetrievalDealsConfigFunc ,
offlineOk dtypes . ConsiderOfflineRetrievalDealsConfigFunc ) dtypes . RetrievalDealFilter {
return func ( onlineOk dtypes . ConsiderOnlineRetrievalDealsConfigFunc ,
offlineOk dtypes . ConsiderOfflineRetrievalDealsConfigFunc ) dtypes . RetrievalDealFilter {
return func ( ctx context . Context , state retrievalmarket . ProviderDealState ) ( bool , string , error ) {
b , err := onlineOk ( )
if err != nil {
return false , "miner error" , err
}
if ! b {
log . Warn ( "online retrieval deal consideration disabled; rejecting retrieval deal proposal from client" )
return false , "miner is not accepting online retrieval deals" , nil
}
b , err = offlineOk ( )
if err != nil {
return false , "miner error" , err
}
if ! b {
log . Info ( "offline retrieval has not been implemented yet" )
}
if userFilter != nil {
return userFilter ( ctx , state )
}
return true , "" , nil
}
}
}
2021-05-20 10:49:44 +00:00
func RetrievalNetwork ( h host . Host ) rmnet . RetrievalMarketNetwork {
return rmnet . NewFromLibp2pHost ( h )
}
2021-05-22 17:10:21 +00:00
// RetrievalPricingFunc configures the pricing function to use for retrieval deals.
func RetrievalPricingFunc ( cfg config . DealmakingConfig ) func ( _ dtypes . ConsiderOnlineRetrievalDealsConfigFunc ,
_ dtypes . ConsiderOfflineRetrievalDealsConfigFunc ) dtypes . RetrievalPricingFunc {
return func ( _ dtypes . ConsiderOnlineRetrievalDealsConfigFunc ,
_ dtypes . ConsiderOfflineRetrievalDealsConfigFunc ) dtypes . RetrievalPricingFunc {
2021-06-14 04:40:29 +00:00
if cfg . RetrievalPricing . Strategy == config . RetrievalPricingExternalMode {
2021-05-22 17:10:21 +00:00
return pricing . ExternalRetrievalPricingFunc ( cfg . RetrievalPricing . External . Path )
}
return retrievalimpl . DefaultPricingFunc ( cfg . RetrievalPricing . Default . VerifiedDealsFreeTransfer )
}
}
2019-12-10 04:19:59 +00:00
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
2021-05-20 10:49:44 +00:00
func RetrievalProvider (
maddr dtypes . MinerAddress ,
adapter retrievalmarket . RetrievalProviderNode ,
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
sa retrievalmarket . SectorAccessor ,
2021-05-20 10:49:44 +00:00
netwk rmnet . RetrievalMarketNetwork ,
2020-10-15 16:24:48 +00:00
ds dtypes . MetadataDS ,
pieceStore dtypes . ProviderPieceStore ,
dt dtypes . ProviderDataTransfer ,
2021-05-22 17:10:21 +00:00
pricingFnc dtypes . RetrievalPricingFunc ,
2020-10-15 16:24:48 +00:00
userFilter dtypes . RetrievalDealFilter ,
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
dagStore * dagstore . Wrapper ,
2020-10-15 16:24:48 +00:00
) ( retrievalmarket . RetrievalProvider , error ) {
opt := retrievalimpl . DealDeciderOpt ( retrievalimpl . DealDecider ( userFilter ) )
integrate DAG store and CARv2 in deal-making (#6671)
This commit removes badger from the deal-making processes, and
moves to a new architecture with the dagstore as the cental
component on the miner-side, and CARv2s on the client-side.
Every deal that has been handed off to the sealing subsystem becomes
a shard in the dagstore. Shards are mounted via the LotusMount, which
teaches the dagstore how to load the related piece when serving
retrievals.
When the miner starts the Lotus for the first time with this patch,
we will perform a one-time migration of all active deals into the
dagstore. This is a lightweight process, and it consists simply
of registering the shards in the dagstore.
Shards are backed by the unsealed copy of the piece. This is currently
a CARv1. However, the dagstore keeps CARv2 indices for all pieces, so
when it's time to acquire a shard to serve a retrieval, the unsealed
CARv1 is joined with its index (safeguarded by the dagstore), to form
a read-only blockstore, thus taking the place of the monolithic
badger.
Data transfers have been adjusted to interface directly with CARv2 files.
On inbound transfers (client retrievals, miner storage deals), we stream
the received data into a CARv2 ReadWrite blockstore. On outbound transfers
(client storage deals, miner retrievals), we serve the data off a CARv2
ReadOnly blockstore.
Client-side imports are managed by the refactored *imports.Manager
component (when not using IPFS integration). Just like it before, we use
the go-filestore library to avoid duplicating the data from the original
file in the resulting UnixFS DAG (concretely the leaves). However, the
target of those imports are what we call "ref-CARv2s": CARv2 files placed
under the `$LOTUS_PATH/imports` directory, containing the intermediate
nodes in full, and the leaves as positional references to the original file
on disk.
Client-side retrievals are placed into CARv2 files in the location:
`$LOTUS_PATH/retrievals`.
A new set of `Dagstore*` JSON-RPC operations and `lotus-miner dagstore`
subcommands have been introduced on the miner-side to inspect and manage
the dagstore.
Despite moving to a CARv2-backed system, the IPFS integration has been
respected, and it continues to be possible to make storage deals with data
held in an IPFS node, and to perform retrievals directly into an IPFS node.
NOTE: because the "staging" and "client" Badger blockstores are no longer
used, existing imports on the client will be rendered useless. On startup,
Lotus will enumerate all imports and print WARN statements on the log for
each import that needs to be reimported. These log lines contain these
messages:
- import lacks carv2 path; import will not work; please reimport
- import has missing/broken carv2; please reimport
At the end, we will print a "sanity check completed" message indicating
the count of imports found, and how many were deemed broken.
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Raúl Kripalani <raul@protocol.ai>
Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
2021-08-16 22:34:32 +00:00
return retrievalimpl . NewProvider (
address . Address ( maddr ) ,
adapter ,
sa ,
netwk ,
pieceStore ,
dagStore ,
dt ,
namespace . Wrap ( ds , datastore . NewKey ( "/retrievals/provider" ) ) ,
retrievalimpl . RetrievalPricingFunc ( pricingFnc ) ,
opt ,
)
2019-12-10 04:19:59 +00:00
}
2020-03-24 18:00:08 +00:00
2020-09-14 07:44:55 +00:00
var WorkerCallsPrefix = datastore . NewKey ( "/worker/calls" )
2020-09-16 20:33:49 +00:00
var ManagerWorkPrefix = datastore . NewKey ( "/stmgr/calls" )
2020-09-14 07:44:55 +00:00
2021-05-20 10:32:29 +00:00
func LocalStorage ( mctx helpers . MetricsCtx , lc fx . Lifecycle , ls stores . LocalStorage , si stores . SectorIndex , urls stores . URLs ) ( * stores . Local , error ) {
ctx := helpers . LifecycleCtx ( mctx , lc )
return stores . NewLocal ( ctx , ls , si , urls )
}
func RemoteStorage ( lstor * stores . Local , si stores . SectorIndex , sa sectorstorage . StorageAuth , sc sectorstorage . SealerConfig ) * stores . Remote {
2021-05-20 11:01:25 +00:00
return stores . NewRemote ( lstor , si , http . Header ( sa ) , sc . ParallelFetchLimit , & stores . DefaultPartialFileHandler { } )
2021-05-20 10:32:29 +00:00
}
2021-07-06 15:46:21 +00:00
func SectorStorage ( mctx helpers . MetricsCtx , lc fx . Lifecycle , lstor * stores . Local , stor * stores . Remote , ls stores . LocalStorage , si stores . SectorIndex , sc sectorstorage . SealerConfig , ds dtypes . MetadataDS ) ( * sectorstorage . Manager , error ) {
2020-03-24 18:00:08 +00:00
ctx := helpers . LifecycleCtx ( mctx , lc )
2020-09-14 07:44:55 +00:00
wsts := statestore . New ( namespace . Wrap ( ds , WorkerCallsPrefix ) )
2020-09-16 20:33:49 +00:00
smsts := statestore . New ( namespace . Wrap ( ds , ManagerWorkPrefix ) )
2020-09-14 07:44:55 +00:00
2021-07-06 15:46:21 +00:00
sst , err := sectorstorage . New ( ctx , lstor , stor , ls , si , sc , wsts , smsts )
2020-03-24 23:49:45 +00:00
if err != nil {
return nil , err
}
lc . Append ( fx . Hook {
2020-07-17 22:31:14 +00:00
OnStop : sst . Close ,
2020-03-24 23:49:45 +00:00
} )
return sst , nil
2020-03-24 18:00:08 +00:00
}
2020-03-27 20:08:06 +00:00
2021-04-05 11:23:46 +00:00
func StorageAuth ( ctx helpers . MetricsCtx , ca v0api . Common ) ( sectorstorage . StorageAuth , error ) {
2020-05-20 18:23:51 +00:00
token , err := ca . AuthNew ( ctx , [ ] auth . Permission { "admin" } )
2020-03-27 20:08:06 +00:00
if err != nil {
return nil , xerrors . Errorf ( "creating storage auth header: %w" , err )
}
headers := http . Header { }
headers . Add ( "Authorization" , "Bearer " + string ( token ) )
2020-08-17 13:26:18 +00:00
return sectorstorage . StorageAuth ( headers ) , nil
2020-03-27 20:08:06 +00:00
}
2020-06-11 18:29:59 +00:00
2021-07-12 09:36:22 +00:00
func StorageAuthWithURL ( apiInfo string ) func ( ctx helpers . MetricsCtx , ca v0api . Common ) ( sectorstorage . StorageAuth , error ) {
2021-06-01 09:42:28 +00:00
return func ( ctx helpers . MetricsCtx , ca v0api . Common ) ( sectorstorage . StorageAuth , error ) {
2021-07-12 09:36:22 +00:00
s := strings . Split ( apiInfo , ":" )
2021-06-01 09:42:28 +00:00
if len ( s ) != 2 {
2021-07-12 09:36:22 +00:00
return nil , errors . New ( "unexpected format of `apiInfo`" )
2021-06-01 09:42:28 +00:00
}
headers := http . Header { }
headers . Add ( "Authorization" , "Bearer " + s [ 0 ] )
return sectorstorage . StorageAuth ( headers ) , nil
}
}
2020-06-26 19:27:41 +00:00
func NewConsiderOnlineStorageDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . ConsiderOnlineStorageDealsConfigFunc , error ) {
2020-06-23 16:04:46 +00:00
return func ( ) ( out bool , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-26 19:27:41 +00:00
out = cfg . Dealmaking . ConsiderOnlineStorageDeals
2020-06-23 16:04:46 +00:00
} )
return
} , nil
}
2020-06-26 19:27:41 +00:00
func NewSetConsideringOnlineStorageDealsFunc ( r repo . LockedRepo ) ( dtypes . SetConsiderOnlineStorageDealsConfigFunc , error ) {
2020-06-23 16:04:46 +00:00
return func ( b bool ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-26 19:27:41 +00:00
cfg . Dealmaking . ConsiderOnlineStorageDeals = b
2020-06-23 16:04:46 +00:00
} )
return
} , nil
}
2020-06-26 19:27:41 +00:00
func NewConsiderOnlineRetrievalDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . ConsiderOnlineRetrievalDealsConfigFunc , error ) {
2020-06-18 20:15:18 +00:00
return func ( ) ( out bool , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-26 19:27:41 +00:00
out = cfg . Dealmaking . ConsiderOnlineRetrievalDeals
2020-06-18 20:15:18 +00:00
} )
return
2020-06-11 18:29:59 +00:00
} , nil
}
2020-06-11 19:59:50 +00:00
2020-06-26 19:27:41 +00:00
func NewSetConsiderOnlineRetrievalDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . SetConsiderOnlineRetrievalDealsConfigFunc , error ) {
2020-06-18 20:15:18 +00:00
return func ( b bool ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-26 19:27:41 +00:00
cfg . Dealmaking . ConsiderOnlineRetrievalDeals = b
2020-06-11 19:59:50 +00:00
} )
2020-06-18 20:15:18 +00:00
return
} , nil
}
2020-06-11 19:59:50 +00:00
2020-06-18 22:42:24 +00:00
func NewStorageDealPieceCidBlocklistConfigFunc ( r repo . LockedRepo ) ( dtypes . StorageDealPieceCidBlocklistConfigFunc , error ) {
2020-06-18 20:15:18 +00:00
return func ( ) ( out [ ] cid . Cid , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-18 22:42:24 +00:00
out = cfg . Dealmaking . PieceCidBlocklist
2020-06-18 20:15:18 +00:00
} )
return
2020-06-11 19:59:50 +00:00
} , nil
}
2020-06-18 20:15:18 +00:00
2020-06-18 22:42:24 +00:00
func NewSetStorageDealPieceCidBlocklistConfigFunc ( r repo . LockedRepo ) ( dtypes . SetStorageDealPieceCidBlocklistConfigFunc , error ) {
return func ( blocklist [ ] cid . Cid ) ( err error ) {
2020-06-18 20:15:18 +00:00
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-18 22:42:24 +00:00
cfg . Dealmaking . PieceCidBlocklist = blocklist
2020-06-18 20:15:18 +00:00
} )
return
} , nil
}
2020-06-26 19:27:41 +00:00
func NewConsiderOfflineStorageDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . ConsiderOfflineStorageDealsConfigFunc , error ) {
return func ( ) ( out bool , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
out = cfg . Dealmaking . ConsiderOfflineStorageDeals
} )
return
} , nil
}
func NewSetConsideringOfflineStorageDealsFunc ( r repo . LockedRepo ) ( dtypes . SetConsiderOfflineStorageDealsConfigFunc , error ) {
return func ( b bool ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
cfg . Dealmaking . ConsiderOfflineStorageDeals = b
} )
return
} , nil
}
func NewConsiderOfflineRetrievalDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . ConsiderOfflineRetrievalDealsConfigFunc , error ) {
return func ( ) ( out bool , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
out = cfg . Dealmaking . ConsiderOfflineRetrievalDeals
} )
return
} , nil
}
func NewSetConsiderOfflineRetrievalDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . SetConsiderOfflineRetrievalDealsConfigFunc , error ) {
return func ( b bool ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
cfg . Dealmaking . ConsiderOfflineRetrievalDeals = b
} )
return
} , nil
}
2020-12-02 06:21:29 +00:00
func NewConsiderVerifiedStorageDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . ConsiderVerifiedStorageDealsConfigFunc , error ) {
return func ( ) ( out bool , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
out = cfg . Dealmaking . ConsiderVerifiedStorageDeals
} )
return
} , nil
}
func NewSetConsideringVerifiedStorageDealsFunc ( r repo . LockedRepo ) ( dtypes . SetConsiderVerifiedStorageDealsConfigFunc , error ) {
return func ( b bool ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
cfg . Dealmaking . ConsiderVerifiedStorageDeals = b
} )
return
} , nil
}
func NewConsiderUnverifiedStorageDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . ConsiderUnverifiedStorageDealsConfigFunc , error ) {
return func ( ) ( out bool , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
out = cfg . Dealmaking . ConsiderUnverifiedStorageDeals
} )
return
} , nil
}
func NewSetConsideringUnverifiedStorageDealsFunc ( r repo . LockedRepo ) ( dtypes . SetConsiderUnverifiedStorageDealsConfigFunc , error ) {
return func ( b bool ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
cfg . Dealmaking . ConsiderUnverifiedStorageDeals = b
} )
return
} , nil
}
2020-08-18 14:20:31 +00:00
func NewSetSealConfigFunc ( r repo . LockedRepo ) ( dtypes . SetSealingConfigFunc , error ) {
2020-08-18 16:27:18 +00:00
return func ( cfg sealiface . Config ) ( err error ) {
2020-08-18 14:20:31 +00:00
err = mutateCfg ( r , func ( c * config . StorageMiner ) {
c . Sealing = config . SealingConfig {
2020-11-16 23:28:49 +00:00
MaxWaitDealsSectors : cfg . MaxWaitDealsSectors ,
MaxSealingSectors : cfg . MaxSealingSectors ,
MaxSealingSectorsForDeals : cfg . MaxSealingSectorsForDeals ,
WaitDealsDelay : config . Duration ( cfg . WaitDealsDelay ) ,
2021-01-26 16:50:31 +00:00
AlwaysKeepUnsealedCopy : cfg . AlwaysKeepUnsealedCopy ,
2021-06-11 09:41:28 +00:00
FinalizeEarly : cfg . FinalizeEarly ,
2021-05-18 14:53:49 +00:00
2021-06-29 16:17:08 +00:00
CollateralFromMinerBalance : cfg . CollateralFromMinerBalance ,
2021-07-12 16:46:05 +00:00
AvailableBalanceBuffer : types . FIL ( cfg . AvailableBalanceBuffer ) ,
DisableCollateralFallback : cfg . DisableCollateralFallback ,
2021-05-18 14:53:49 +00:00
BatchPreCommits : cfg . BatchPreCommits ,
MaxPreCommitBatch : cfg . MaxPreCommitBatch ,
PreCommitBatchWait : config . Duration ( cfg . PreCommitBatchWait ) ,
PreCommitBatchSlack : config . Duration ( cfg . PreCommitBatchSlack ) ,
2021-07-01 11:33:54 +00:00
AggregateCommits : cfg . AggregateCommits ,
MinCommitBatch : cfg . MinCommitBatch ,
MaxCommitBatch : cfg . MaxCommitBatch ,
CommitBatchWait : config . Duration ( cfg . CommitBatchWait ) ,
CommitBatchSlack : config . Duration ( cfg . CommitBatchSlack ) ,
AggregateAboveBaseFee : types . FIL ( cfg . AggregateAboveBaseFee ) ,
2021-05-18 14:53:49 +00:00
TerminateBatchMax : cfg . TerminateBatchMax ,
TerminateBatchMin : cfg . TerminateBatchMin ,
TerminateBatchWait : config . Duration ( cfg . TerminateBatchWait ) ,
2020-08-18 14:20:31 +00:00
}
2020-07-06 18:39:26 +00:00
} )
return
} , nil
}
2021-07-01 19:07:53 +00:00
func ToSealingConfig ( cfg * config . StorageMiner ) sealiface . Config {
return sealiface . Config {
MaxWaitDealsSectors : cfg . Sealing . MaxWaitDealsSectors ,
MaxSealingSectors : cfg . Sealing . MaxSealingSectors ,
MaxSealingSectorsForDeals : cfg . Sealing . MaxSealingSectorsForDeals ,
WaitDealsDelay : time . Duration ( cfg . Sealing . WaitDealsDelay ) ,
AlwaysKeepUnsealedCopy : cfg . Sealing . AlwaysKeepUnsealedCopy ,
FinalizeEarly : cfg . Sealing . FinalizeEarly ,
2021-06-29 16:17:08 +00:00
CollateralFromMinerBalance : cfg . Sealing . CollateralFromMinerBalance ,
2021-07-12 16:46:05 +00:00
AvailableBalanceBuffer : types . BigInt ( cfg . Sealing . AvailableBalanceBuffer ) ,
DisableCollateralFallback : cfg . Sealing . DisableCollateralFallback ,
2021-07-01 19:07:53 +00:00
BatchPreCommits : cfg . Sealing . BatchPreCommits ,
MaxPreCommitBatch : cfg . Sealing . MaxPreCommitBatch ,
PreCommitBatchWait : time . Duration ( cfg . Sealing . PreCommitBatchWait ) ,
PreCommitBatchSlack : time . Duration ( cfg . Sealing . PreCommitBatchSlack ) ,
AggregateCommits : cfg . Sealing . AggregateCommits ,
MinCommitBatch : cfg . Sealing . MinCommitBatch ,
MaxCommitBatch : cfg . Sealing . MaxCommitBatch ,
CommitBatchWait : time . Duration ( cfg . Sealing . CommitBatchWait ) ,
CommitBatchSlack : time . Duration ( cfg . Sealing . CommitBatchSlack ) ,
AggregateAboveBaseFee : types . BigInt ( cfg . Sealing . AggregateAboveBaseFee ) ,
TerminateBatchMax : cfg . Sealing . TerminateBatchMax ,
TerminateBatchMin : cfg . Sealing . TerminateBatchMin ,
TerminateBatchWait : time . Duration ( cfg . Sealing . TerminateBatchWait ) ,
}
}
2020-08-18 14:20:31 +00:00
func NewGetSealConfigFunc ( r repo . LockedRepo ) ( dtypes . GetSealingConfigFunc , error ) {
2020-08-18 16:27:18 +00:00
return func ( ) ( out sealiface . Config , err error ) {
2020-07-06 18:39:26 +00:00
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
2021-07-01 19:07:53 +00:00
out = ToSealingConfig ( cfg )
2020-07-06 18:39:26 +00:00
} )
return
} , nil
}
2020-07-12 17:54:53 +00:00
func NewSetExpectedSealDurationFunc ( r repo . LockedRepo ) ( dtypes . SetExpectedSealDurationFunc , error ) {
return func ( delay time . Duration ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
cfg . Dealmaking . ExpectedSealDuration = config . Duration ( delay )
} )
return
} , nil
}
func NewGetExpectedSealDurationFunc ( r repo . LockedRepo ) ( dtypes . GetExpectedSealDurationFunc , error ) {
return func ( ) ( out time . Duration , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
out = time . Duration ( cfg . Dealmaking . ExpectedSealDuration )
} )
return
} , nil
}
2021-06-23 17:45:08 +00:00
func NewSetMaxDealStartDelayFunc ( r repo . LockedRepo ) ( dtypes . SetMaxDealStartDelayFunc , error ) {
return func ( delay time . Duration ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
cfg . Dealmaking . MaxDealStartDelay = config . Duration ( delay )
} )
return
} , nil
}
func NewGetMaxDealStartDelayFunc ( r repo . LockedRepo ) ( dtypes . GetMaxDealStartDelayFunc , error ) {
return func ( ) ( out time . Duration , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
out = time . Duration ( cfg . Dealmaking . MaxDealStartDelay )
} )
return
} , nil
}
2020-06-18 20:15:18 +00:00
func readCfg ( r repo . LockedRepo , accessor func ( * config . StorageMiner ) ) error {
raw , err := r . Config ( )
if err != nil {
return err
}
cfg , ok := raw . ( * config . StorageMiner )
if ! ok {
return xerrors . New ( "expected address of config.StorageMiner" )
}
accessor ( cfg )
return nil
}
func mutateCfg ( r repo . LockedRepo , mutator func ( * config . StorageMiner ) ) error {
var typeErr error
setConfigErr := r . SetConfig ( func ( raw interface { } ) {
cfg , ok := raw . ( * config . StorageMiner )
if ! ok {
2020-07-11 08:55:13 +00:00
typeErr = errors . New ( "expected miner config" )
2020-06-18 20:15:18 +00:00
return
}
mutator ( cfg )
} )
return multierr . Combine ( typeErr , setConfigErr )
}
2021-07-28 09:10:17 +00:00
2021-08-16 23:16:06 +00:00
func migrateDealStaging ( oldPath , newPath string ) error {
dirInfo , err := os . Stat ( newPath )
if err == nil {
if ! dirInfo . IsDir ( ) {
return xerrors . Errorf ( "%s is not a directory" , newPath )
}
// The newPath exists already, below migration has already occurred.
return nil
}
// if the directory doesn't exist, create it
if os . IsNotExist ( err ) {
if err := os . MkdirAll ( newPath , 0755 ) ; err != nil {
return xerrors . Errorf ( "failed to mk directory %s for deal staging: %w" , newPath , err )
}
} else { // if we failed for other reasons, abort.
return err
}
// if this is the first time we created the directory, symlink all staged deals into it. "Migration"
// get a list of files in the miner repo
dirEntries , err := os . ReadDir ( oldPath )
if err != nil {
return xerrors . Errorf ( "failed to list directory %s for deal staging: %w" , oldPath , err )
}
for _ , entry := range dirEntries {
// ignore directories, they are not the deals.
if entry . IsDir ( ) {
continue
}
// the FileStore from fil-storage-market creates temporary staged deal files with the pattern "fstmp"
// https://github.com/filecoin-project/go-fil-markets/blob/00ff81e477d846ac0cb58a0c7d1c2e9afb5ee1db/filestore/filestore.go#L69
name := entry . Name ( )
if strings . Contains ( name , "fstmp" ) {
// from the miner repo
oldPath := filepath . Join ( oldPath , name )
// to its subdir "deal-staging"
newPath := filepath . Join ( newPath , name )
// create a symbolic link in the new deal staging directory to preserve existing staged deals.
// all future staged deals will be created here.
if err := os . Rename ( oldPath , newPath ) ; err != nil {
return xerrors . Errorf ( "failed to move %s to %s: %w" , oldPath , newPath , err )
}
if err := os . Symlink ( newPath , oldPath ) ; err != nil {
return xerrors . Errorf ( "failed to symlink %s to %s: %w" , oldPath , newPath , err )
}
log . Infow ( "symlinked staged deal" , "from" , oldPath , "to" , newPath )
}
}
return nil
}
2021-08-17 14:35:17 +00:00
2021-07-28 22:46:21 +00:00
func ExtractEnabledMinerSubsystems ( cfg config . MinerSubsystemConfig ) ( res api . MinerSubsystems ) {
2021-07-28 09:10:17 +00:00
if cfg . EnableMining {
2021-07-28 18:51:45 +00:00
res = append ( res , api . SubsystemMining )
2021-07-28 09:10:17 +00:00
}
if cfg . EnableSealing {
2021-07-28 18:51:45 +00:00
res = append ( res , api . SubsystemSealing )
2021-07-28 09:10:17 +00:00
}
if cfg . EnableSectorStorage {
2021-07-28 18:51:45 +00:00
res = append ( res , api . SubsystemSectorStorage )
2021-07-28 09:10:17 +00:00
}
if cfg . EnableMarkets {
2021-07-28 18:51:45 +00:00
res = append ( res , api . SubsystemMarkets )
2021-07-28 09:10:17 +00:00
}
2021-07-28 22:46:21 +00:00
return res
2021-07-28 09:10:17 +00:00
}