2019-08-01 14:19:53 +00:00
package modules
import (
"context"
2019-10-31 22:04:13 +00:00
"fmt"
2019-11-04 16:47:08 +00:00
"math"
2019-08-08 04:24:49 +00:00
"path/filepath"
2019-11-11 20:25:19 +00:00
"reflect"
2019-08-08 04:24:49 +00:00
2019-08-06 22:04:21 +00:00
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
2019-08-01 14:19:53 +00:00
"github.com/ipfs/go-datastore"
2019-11-11 20:51:28 +00:00
"github.com/ipfs/go-datastore/namespace"
2019-08-06 22:04:21 +00:00
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-merkledag"
2019-08-01 14:19:53 +00:00
"github.com/libp2p/go-libp2p-core/host"
2019-08-29 15:09:34 +00:00
"github.com/libp2p/go-libp2p-core/routing"
2019-08-01 14:19:53 +00:00
"github.com/mitchellh/go-homedir"
"go.uber.org/fx"
2019-10-27 08:56:53 +00:00
"golang.org/x/xerrors"
2019-08-01 14:19:53 +00:00
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/api"
2019-10-27 08:56:53 +00:00
"github.com/filecoin-project/lotus/build"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/deals"
2019-11-11 20:25:19 +00:00
"github.com/filecoin-project/lotus/datatransfer"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/lib/sectorbuilder"
2019-11-11 20:51:28 +00:00
"github.com/filecoin-project/lotus/lib/statestore"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/storage"
2019-08-01 14:19:53 +00:00
)
2019-08-08 01:16:58 +00:00
// minerAddrFromDS reads the value stored under the "miner-address" key of the
// metadata datastore and decodes it into an address. When the lookup fails it
// returns address.Undef together with the datastore error.
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
	key := datastore.NewKey("miner-address")

	raw, err := ds.Get(key)
	if err != nil {
		return address.Undef, err
	}

	return address.NewFromBytes(raw)
}
2019-11-04 16:47:08 +00:00
func SectorBuilderConfig ( storagePath string , threads uint ) func ( dtypes . MetadataDS , api . FullNode ) ( * sectorbuilder . Config , error ) {
return func ( ds dtypes . MetadataDS , api api . FullNode ) ( * sectorbuilder . Config , error ) {
2019-08-08 01:16:58 +00:00
minerAddr , err := minerAddrFromDS ( ds )
if err != nil {
return nil , err
}
2019-10-16 07:07:16 +00:00
ssize , err := api . StateMinerSectorSize ( context . TODO ( ) , minerAddr , nil )
if err != nil {
return nil , err
}
2019-08-01 14:19:53 +00:00
sp , err := homedir . Expand ( storagePath )
if err != nil {
return nil , err
}
2019-11-04 16:47:08 +00:00
if threads > math . MaxUint8 {
return nil , xerrors . Errorf ( "too many sectorbuilder threads specified: %d, max allowed: %d" , threads , math . MaxUint8 )
}
2019-10-30 18:10:29 +00:00
cache := filepath . Join ( sp , "cache" )
2019-08-01 14:19:53 +00:00
metadata := filepath . Join ( sp , "meta" )
sealed := filepath . Join ( sp , "sealed" )
staging := filepath . Join ( sp , "staging" )
2019-11-04 16:47:08 +00:00
sb := & sectorbuilder . Config {
Miner : minerAddr ,
SectorSize : ssize ,
WorkerThreads : uint8 ( threads ) ,
2019-10-31 01:22:50 +00:00
CacheDir : cache ,
2019-08-01 14:19:53 +00:00
MetadataDir : metadata ,
SealedDir : sealed ,
StagedDir : staging ,
}
return sb , nil
}
}
2019-11-07 18:22:59 +00:00
func StorageMiner ( mctx helpers . MetricsCtx , lc fx . Lifecycle , api api . FullNode , h host . Host , ds dtypes . MetadataDS , sb * sectorbuilder . SectorBuilder , tktFn storage . TicketFn ) ( * storage . Miner , error ) {
2019-08-08 01:16:58 +00:00
maddr , err := minerAddrFromDS ( ds )
2019-08-01 14:19:53 +00:00
if err != nil {
return nil , err
}
2019-11-07 18:22:59 +00:00
sm , err := storage . NewMiner ( api , maddr , h , ds , sb , tktFn )
2019-08-01 14:19:53 +00:00
if err != nil {
return nil , err
}
ctx := helpers . LifecycleCtx ( mctx , lc )
lc . Append ( fx . Hook {
OnStart : func ( context . Context ) error {
return sm . Run ( ctx )
} ,
2019-11-01 13:58:48 +00:00
OnStop : sm . Stop ,
2019-08-01 14:19:53 +00:00
} )
return sm , nil
}
2019-08-02 16:25:10 +00:00
2019-08-26 13:45:36 +00:00
func HandleRetrieval ( host host . Host , lc fx . Lifecycle , m * retrieval . Miner ) {
lc . Append ( fx . Hook {
OnStart : func ( context . Context ) error {
2019-08-27 18:45:21 +00:00
host . SetStreamHandler ( retrieval . QueryProtocolID , m . HandleQueryStream )
host . SetStreamHandler ( retrieval . ProtocolID , m . HandleDealStream )
2019-08-26 13:45:36 +00:00
return nil
} ,
} )
}
2019-10-21 18:12:11 +00:00
func HandleDeals ( mctx helpers . MetricsCtx , lc fx . Lifecycle , host host . Host , h * deals . Provider ) {
2019-08-06 23:08:34 +00:00
ctx := helpers . LifecycleCtx ( mctx , lc )
lc . Append ( fx . Hook {
OnStart : func ( context . Context ) error {
h . Run ( ctx )
2019-10-21 18:12:11 +00:00
host . SetStreamHandler ( deals . DealProtocolID , h . HandleStream )
2019-09-13 21:00:36 +00:00
host . SetStreamHandler ( deals . AskProtocolID , h . HandleAskStream )
2019-08-06 23:08:34 +00:00
return nil
} ,
OnStop : func ( context . Context ) error {
h . Stop ( )
return nil
} ,
} )
2019-08-02 16:25:10 +00:00
}
2019-08-06 22:04:21 +00:00
2019-11-11 20:25:19 +00:00
// RegisterProviderValidator is an initialization hook that registers the provider
// request validator with the data transfer module as the validator for
// StorageDataTransferVoucher types
2019-11-11 20:51:28 +00:00
func RegisterProviderValidator ( mrv * deals . ProviderRequestValidator , dtm dtypes . ProviderDataTransfer ) {
2019-11-11 20:25:19 +00:00
dtm . RegisterVoucherType ( reflect . TypeOf ( deals . StorageDataTransferVoucher { } ) , mrv )
}
2019-11-11 20:51:28 +00:00
// NewProviderDAGServiceDataTransfer builds the provider's data transfer
// manager directly on top of the provider's staging DAG service.
func NewProviderDAGServiceDataTransfer(dag dtypes.StagingDAG) dtypes.ProviderDataTransfer {
	var transfer dtypes.ProviderDataTransfer = datatransfer.NewDAGServiceDataTransfer(dag)
	return transfer
}
// NewProviderDealStore creates the statestore in which the provider keeps its
// deals, backed by a namespaced view of the metadata datastore.
//
// NOTE(review): the namespace key is "/deals/client" even though this is the
// provider's store — presumably a copy-paste leftover. It is kept as-is
// because changing it would change where existing records are looked up.
func NewProviderDealStore(ds dtypes.MetadataDS) dtypes.ProviderDealStore {
	prefix := datastore.NewKey("/deals/client")
	scoped := namespace.Wrap(ds, prefix)
	return statestore.New(scoped)
}
2019-08-06 22:04:21 +00:00
// StagingDAG builds the DAG service used for staged data.
//
// It opens the repo's "/staging" datastore, layers a blockstore on it, and
// combines that with a bitswap exchange running over the given host and
// routing. Bitswap is handed the plain blockstore, while the block service
// reads through the identity-store wrapper around it. A stop hook closes the
// block service when the lifecycle shuts down.
func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, rt routing.Routing, h host.Host) (dtypes.StagingDAG, error) {
	stagingDS, err := r.Datastore("/staging")
	if err != nil {
		return nil, err
	}

	rawStore := blockstore.NewBlockstore(stagingDS)
	idStore := blockstore.NewIdStore(rawStore)

	bsNet := network.NewFromIpfsHost(h, rt)
	exchange := bitswap.New(helpers.LifecycleCtx(mctx, lc), bsNet, rawStore)

	blocks := blockservice.New(idStore, exchange)
	dagService := merkledag.NewDAGService(blocks)

	closeBlocks := func(context.Context) error {
		return blocks.Close()
	}
	lc.Append(fx.Hook{OnStop: closeBlocks})

	return dagService, nil
}
2019-08-20 17:19:24 +00:00
func RegisterMiner ( lc fx . Lifecycle , ds dtypes . MetadataDS , api api . FullNode ) error {
minerAddr , err := minerAddrFromDS ( ds )
if err != nil {
return err
}
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
2019-09-17 14:23:08 +00:00
log . Infof ( "Registering miner '%s' with full node" , minerAddr )
2019-10-31 22:04:13 +00:00
if err := api . MinerRegister ( ctx , minerAddr ) ; err != nil {
return fmt . Errorf ( "Failed to register miner: %s\nIf you are certain no other storage miner instance is running, try running 'lotus unregister-miner %s' and restarting the storage miner" , err , minerAddr )
}
return nil
2019-08-20 17:19:24 +00:00
} ,
2019-09-17 14:23:08 +00:00
OnStop : func ( ctx context . Context ) error {
log . Infof ( "Unregistering miner '%s' from full node" , minerAddr )
return api . MinerUnregister ( ctx , minerAddr )
} ,
2019-08-20 17:19:24 +00:00
} )
return nil
}
2019-10-27 08:56:53 +00:00
2019-11-08 20:30:50 +00:00
// SectorBuilder creates a sectorbuilder.SectorBuilder from cfg and the
// metadata datastore, and registers a stop hook that calls Destroy on it when
// the lifecycle shuts down. The stop hook itself always reports success.
func SectorBuilder(lc fx.Lifecycle, cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuilder.SectorBuilder, error) {
	builder, err := sectorbuilder.New(cfg, ds)
	if err != nil {
		return nil, err
	}

	destroy := func(context.Context) error {
		builder.Destroy()
		return nil
	}
	lc.Append(fx.Hook{OnStop: destroy})

	return builder, nil
}
2019-11-07 18:22:59 +00:00
func SealTicketGen ( api api . FullNode ) storage . TicketFn {
2019-10-27 08:56:53 +00:00
return func ( ctx context . Context ) ( * sectorbuilder . SealTicket , error ) {
ts , err := api . ChainHead ( ctx )
if err != nil {
return nil , xerrors . Errorf ( "getting head ts for SealTicket failed: %w" , err )
}
2019-11-08 05:36:50 +00:00
r , err := api . ChainGetRandomness ( ctx , ts . Key ( ) , nil , build . SealRandomnessLookback )
2019-10-27 08:56:53 +00:00
if err != nil {
return nil , xerrors . Errorf ( "getting randomness for SealTicket failed: %w" , err )
}
var tkt [ sectorbuilder . CommLen ] byte
if n := copy ( tkt [ : ] , r ) ; n != sectorbuilder . CommLen {
return nil , xerrors . Errorf ( "unexpected randomness len: %d (expected %d)" , n , sectorbuilder . CommLen )
}
return & sectorbuilder . SealTicket {
2019-11-01 23:43:54 +00:00
BlockHeight : ts . Height ( ) ,
2019-10-27 08:56:53 +00:00
TicketBytes : tkt ,
} , nil
}
}