2019-08-01 14:19:53 +00:00
package modules
import (
2020-11-11 05:06:50 +00:00
"bytes"
2019-08-01 14:19:53 +00:00
"context"
2020-06-11 19:59:50 +00:00
"errors"
2020-06-18 20:32:20 +00:00
"fmt"
2020-07-08 08:35:50 +00:00
"net/http"
2020-11-16 18:56:53 +00:00
"os"
"path/filepath"
2020-07-08 08:35:50 +00:00
"time"
2020-07-23 02:05:11 +00:00
"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/xerrors"
2020-03-18 19:43:06 +00:00
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
2020-06-18 20:15:18 +00:00
"github.com/ipfs/go-cid"
2020-03-18 19:43:06 +00:00
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
graphsync "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
2020-01-15 20:49:11 +00:00
"github.com/filecoin-project/go-address"
2020-07-08 08:35:50 +00:00
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
2020-01-24 20:19:52 +00:00
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
2020-09-29 11:53:30 +00:00
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
2020-01-15 20:49:11 +00:00
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
2020-01-24 20:19:52 +00:00
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
2020-09-29 11:53:30 +00:00
"github.com/filecoin-project/go-fil-markets/shared"
2020-01-15 20:49:11 +00:00
"github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
2020-05-20 22:46:44 +00:00
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
2020-02-06 02:43:37 +00:00
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
2020-05-20 18:23:51 +00:00
"github.com/filecoin-project/go-jsonrpc/auth"
2020-07-28 06:13:10 +00:00
"github.com/filecoin-project/go-multistore"
2020-01-02 19:08:49 +00:00
paramfetch "github.com/filecoin-project/go-paramfetch"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-09-21 22:52:33 +00:00
"github.com/filecoin-project/go-statestore"
2020-04-28 17:13:46 +00:00
"github.com/filecoin-project/go-storedcounter"
2020-05-19 23:24:59 +00:00
2020-11-11 05:06:50 +00:00
"github.com/filecoin-project/lotus/api"
2020-08-17 13:39:33 +00:00
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
2020-08-17 13:26:18 +00:00
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
2020-08-17 13:39:33 +00:00
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
2020-08-18 16:27:18 +00:00
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
2020-08-17 13:26:18 +00:00
2020-03-08 08:07:58 +00:00
lapi "github.com/filecoin-project/lotus/api"
2019-10-27 08:56:53 +00:00
"github.com/filecoin-project/lotus/build"
2020-10-08 01:09:33 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin"
2019-11-25 04:45:13 +00:00
"github.com/filecoin-project/lotus/chain/gen"
2020-08-06 01:16:54 +00:00
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
2020-03-08 08:07:58 +00:00
"github.com/filecoin-project/lotus/chain/types"
2020-09-21 22:52:33 +00:00
"github.com/filecoin-project/lotus/journal"
2020-07-23 02:05:11 +00:00
"github.com/filecoin-project/lotus/lib/blockstore"
2020-09-21 22:52:33 +00:00
"github.com/filecoin-project/lotus/markets"
2020-07-31 01:27:42 +00:00
marketevents "github.com/filecoin-project/lotus/markets/loggers"
2020-01-15 20:49:11 +00:00
"github.com/filecoin-project/lotus/markets/retrievaladapter"
2019-11-25 04:45:13 +00:00
"github.com/filecoin-project/lotus/miner"
2020-07-23 02:05:11 +00:00
"github.com/filecoin-project/lotus/node/config"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage"
2019-08-01 14:19:53 +00:00
)
2020-05-19 23:24:59 +00:00
// StorageCounterDSPrefix is the metadata datastore key under which the stored
// counter created by SectorIDCounter keeps its value.
var StorageCounterDSPrefix = "/storage/nextid"
2019-08-08 01:16:58 +00:00
// minerAddrFromDS reads the bytes stored under the "miner-address" key of the
// metadata datastore and decodes them into an address.
// If the datastore lookup fails, address.Undef is returned with that error.
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
	key := datastore.NewKey("miner-address")

	raw, err := ds.Get(key)
	if err != nil {
		return address.Undef, err
	}

	return address.NewFromBytes(raw)
}
2020-11-05 06:44:46 +00:00
func GetParams ( spt abi . RegisteredSealProof ) error {
ssize , err := spt . SectorSize ( )
2020-02-27 21:45:31 +00:00
if err != nil {
return err
}
2020-06-22 18:41:52 +00:00
// If built-in assets are disabled, we expect the user to have placed the right
// parameters in the right location on the filesystem (/var/tmp/filecoin-proof-parameters).
if build . DisableBuiltinAssets {
return nil
}
2020-11-05 06:44:46 +00:00
// TODO: We should fetch the params for the actual proof type, not just based on the size.
2020-06-05 22:58:24 +00:00
if err := paramfetch . GetParams ( context . TODO ( ) , build . ParametersJSON ( ) , uint64 ( ssize ) ) ; err != nil {
2019-12-04 19:44:15 +00:00
return xerrors . Errorf ( "fetching proof parameters: %w" , err )
}
return nil
}
2020-03-18 01:08:11 +00:00
// MinerAddress loads the miner address from the metadata datastore (see
// minerAddrFromDS) and returns it converted to dtypes.MinerAddress, together
// with whatever error the lookup produced.
func MinerAddress(ds dtypes.MetadataDS) (dtypes.MinerAddress, error) {
	addr, err := minerAddrFromDS(ds)
	if err != nil {
		// The (possibly undefined) address is still converted and returned
		// alongside the error.
		return dtypes.MinerAddress(addr), err
	}
	return dtypes.MinerAddress(addr), nil
}
// MinerID converts the miner address into a dtypes.MinerID using
// address.IDFromAddress. The conversion error, if any, is returned as-is
// alongside the converted value.
func MinerID(ma dtypes.MinerAddress) (dtypes.MinerID, error) {
	rawID, err := address.IDFromAddress(address.Address(ma))
	if err != nil {
		return dtypes.MinerID(rawID), err
	}
	return dtypes.MinerID(rawID), nil
}
2019-12-06 00:27:32 +00:00
2020-03-31 23:13:37 +00:00
func StorageNetworkName ( ctx helpers . MetricsCtx , a lapi . FullNode ) ( dtypes . NetworkName , error ) {
2020-09-24 21:30:11 +00:00
if ! build . Devnet {
return "testnetnet" , nil
}
2020-03-31 23:13:37 +00:00
return a . StateNetworkName ( ctx )
}
2020-11-05 06:44:46 +00:00
func SealProofType ( maddr dtypes . MinerAddress , fnapi lapi . FullNode ) ( abi . RegisteredSealProof , error ) {
2020-04-16 17:36:36 +00:00
mi , err := fnapi . StateMinerInfo ( context . TODO ( ) , address . Address ( maddr ) , types . EmptyTSK )
2020-03-03 22:19:22 +00:00
if err != nil {
2020-11-05 06:44:46 +00:00
return 0 , err
2019-08-01 14:19:53 +00:00
}
2020-03-03 22:19:22 +00:00
2020-11-05 06:44:46 +00:00
return mi . SealProofType , nil
2019-08-01 14:19:53 +00:00
}
2020-03-05 00:38:07 +00:00
// sidsc wraps a stored counter so that its values can be handed out as
// sector numbers.
type sidsc struct {
	sc *storedcounter.StoredCounter
}

// Next requests the next value from the wrapped stored counter and returns it
// converted to abi.SectorNumber, along with the counter's error.
func (s *sidsc) Next() (abi.SectorNumber, error) {
	next, err := s.sc.Next()
	sectorNum := abi.SectorNumber(next)
	return sectorNum, err
}
2020-03-17 20:19:52 +00:00
func SectorIDCounter ( ds dtypes . MetadataDS ) sealing . SectorIDCounter {
2020-05-19 23:24:59 +00:00
sc := storedcounter . New ( ds , datastore . NewKey ( StorageCounterDSPrefix ) )
2020-03-05 00:38:07 +00:00
return & sidsc { sc }
}
2020-07-20 13:45:17 +00:00
type StorageMinerParams struct {
fx . In
2020-08-26 15:38:23 +00:00
Lifecycle fx . Lifecycle
MetricsCtx helpers . MetricsCtx
API lapi . FullNode
Host host . Host
MetadataDS dtypes . MetadataDS
Sealer sectorstorage . SectorManager
SectorIDCounter sealing . SectorIDCounter
Verifier ffiwrapper . Verifier
GetSealingConfigFn dtypes . GetSealingConfigFunc
2020-10-09 19:52:04 +00:00
Journal journal . Journal
2020-08-26 15:38:23 +00:00
}
func StorageMiner ( fc config . MinerFeeConfig ) func ( params StorageMinerParams ) ( * storage . Miner , error ) {
return func ( params StorageMinerParams ) ( * storage . Miner , error ) {
var (
ds = params . MetadataDS
mctx = params . MetricsCtx
lc = params . Lifecycle
api = params . API
sealer = params . Sealer
h = params . Host
sc = params . SectorIDCounter
verif = params . Verifier
gsd = params . GetSealingConfigFn
2020-10-09 19:52:04 +00:00
j = params . Journal
2020-08-26 15:38:23 +00:00
)
2020-08-12 17:47:00 +00:00
maddr , err := minerAddrFromDS ( ds )
if err != nil {
return nil , err
}
ctx := helpers . LifecycleCtx ( mctx , lc )
2020-10-21 03:35:18 +00:00
fps , err := storage . NewWindowedPoStScheduler ( api , fc , sealer , sealer , j , maddr )
2020-08-12 17:47:00 +00:00
if err != nil {
return nil , err
}
2020-10-21 03:35:18 +00:00
sm , err := storage . NewMiner ( api , maddr , h , ds , sealer , sc , verif , gsd , fc , j )
2020-08-12 17:47:00 +00:00
if err != nil {
return nil , err
}
lc . Append ( fx . Hook {
OnStart : func ( context . Context ) error {
go fps . Run ( ctx )
return sm . Run ( ctx )
} ,
OnStop : sm . Stop ,
} )
return sm , nil
2020-04-10 21:29:05 +00:00
}
2019-08-01 14:19:53 +00:00
}
2019-08-02 16:25:10 +00:00
2020-10-09 19:52:04 +00:00
func HandleRetrieval ( host host . Host , lc fx . Lifecycle , m retrievalmarket . RetrievalProvider , j journal . Journal ) {
2020-09-29 11:53:30 +00:00
m . OnReady ( marketevents . ReadyLogger ( "retrieval provider" ) )
2019-08-26 13:45:36 +00:00
lc . Append ( fx . Hook {
2020-09-29 11:53:30 +00:00
OnStart : func ( ctx context . Context ) error {
2020-07-31 01:27:42 +00:00
m . SubscribeToEvents ( marketevents . RetrievalProviderLogger )
2020-09-14 15:20:01 +00:00
2020-10-09 19:52:04 +00:00
evtType := j . RegisterEventType ( "markets/retrieval/provider" , "state_change" )
m . SubscribeToEvents ( markets . RetrievalProviderJournaler ( j , evtType ) )
2020-09-14 15:20:01 +00:00
2020-09-29 11:53:30 +00:00
return m . Start ( ctx )
2020-01-24 20:19:52 +00:00
} ,
OnStop : func ( context . Context ) error {
2020-05-27 20:53:20 +00:00
return m . Stop ( )
2019-08-26 13:45:36 +00:00
} ,
} )
}
2020-10-09 19:52:04 +00:00
func HandleDeals ( mctx helpers . MetricsCtx , lc fx . Lifecycle , host host . Host , h storagemarket . StorageProvider , j journal . Journal ) {
2019-08-06 23:08:34 +00:00
ctx := helpers . LifecycleCtx ( mctx , lc )
2020-09-29 11:53:30 +00:00
h . OnReady ( marketevents . ReadyLogger ( "storage provider" ) )
2019-08-06 23:08:34 +00:00
lc . Append ( fx . Hook {
OnStart : func ( context . Context ) error {
2020-07-31 01:27:42 +00:00
h . SubscribeToEvents ( marketevents . StorageProviderLogger )
2020-09-14 15:20:01 +00:00
2020-10-09 19:52:04 +00:00
evtType := j . RegisterEventType ( "markets/storage/provider" , "state_change" )
h . SubscribeToEvents ( markets . StorageProviderJournaler ( j , evtType ) )
2020-09-14 15:20:01 +00:00
2020-05-27 20:53:20 +00:00
return h . Start ( ctx )
2019-08-06 23:08:34 +00:00
} ,
OnStop : func ( context . Context ) error {
2020-05-27 20:53:20 +00:00
return h . Stop ( )
2019-08-06 23:08:34 +00:00
} ,
} )
2019-08-02 16:25:10 +00:00
}
2019-08-06 22:04:21 +00:00
2020-11-11 05:06:50 +00:00
func HandleMigrateProviderFunds ( lc fx . Lifecycle , ds dtypes . MetadataDS , node api . FullNode , minerAddress dtypes . MinerAddress ) {
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
b , err := ds . Get ( datastore . NewKey ( "/marketfunds/provider" ) )
if err != nil {
if xerrors . Is ( err , datastore . ErrNotFound ) {
return nil
}
return err
}
var value abi . TokenAmount
if err = value . UnmarshalCBOR ( bytes . NewReader ( b ) ) ; err != nil {
return err
}
ts , err := node . ChainHead ( ctx )
if err != nil {
2020-11-12 16:15:05 +00:00
log . Errorf ( "provider funds migration - getting chain head: %w" , err )
return nil
2020-11-11 05:06:50 +00:00
}
mi , err := node . StateMinerInfo ( ctx , address . Address ( minerAddress ) , ts . Key ( ) )
if err != nil {
2020-11-12 16:15:05 +00:00
log . Errorf ( "provider funds migration - getting miner info %s: %w" , minerAddress , err )
return nil
2020-11-11 05:06:50 +00:00
}
_ , err = node . MarketReserveFunds ( ctx , mi . Worker , address . Address ( minerAddress ) , value )
if err != nil {
2020-11-12 16:15:05 +00:00
log . Errorf ( "provider funds migration - reserving funds (wallet %s, addr %s, funds %d): %w" ,
mi . Worker , minerAddress , value , err )
return nil
2020-11-11 05:06:50 +00:00
}
return ds . Delete ( datastore . NewKey ( "/marketfunds/provider" ) )
} ,
} )
}
2019-11-11 20:51:28 +00:00
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
// uses the provider's Staging DAG service for transfers
2020-11-16 18:56:53 +00:00
func NewProviderDAGServiceDataTransfer ( lc fx . Lifecycle , h host . Host , gs dtypes . StagingGraphsync , ds dtypes . MetadataDS , r repo . LockedRepo ) ( dtypes . ProviderDataTransfer , error ) {
2020-05-05 01:31:56 +00:00
sc := storedcounter . New ( ds , datastore . NewKey ( "/datatransfer/provider/counter" ) )
2020-07-08 08:35:50 +00:00
net := dtnet . NewFromLibp2pHost ( h )
dtDs := namespace . Wrap ( ds , datastore . NewKey ( "/datatransfer/provider/transfers" ) )
transport := dtgstransport . NewTransport ( h . ID ( ) , gs )
2020-11-16 18:56:53 +00:00
err := os . MkdirAll ( filepath . Join ( r . Path ( ) , "data-transfer" ) , 0755 ) //nolint: gosec
if err != nil && ! os . IsExist ( err ) {
return nil , err
}
dt , err := dtimpl . NewDataTransfer ( dtDs , filepath . Join ( r . Path ( ) , "data-transfer" ) , net , transport , sc )
2020-07-08 08:35:50 +00:00
if err != nil {
return nil , err
}
2020-10-13 10:37:00 +00:00
dt . OnReady ( marketevents . ReadyLogger ( "provider data transfer" ) )
2020-07-08 08:35:50 +00:00
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
return dt . Start ( ctx )
} ,
2020-08-29 01:03:27 +00:00
OnStop : func ( ctx context . Context ) error {
return dt . Stop ( ctx )
2020-07-08 08:35:50 +00:00
} ,
} )
return dt , nil
2019-11-11 20:51:28 +00:00
}
2020-01-24 20:19:52 +00:00
// NewProviderPieceStore creates a statestore for storing metadata about pieces
// shared by the storage and retrieval providers
2020-09-29 11:53:30 +00:00
func NewProviderPieceStore ( lc fx . Lifecycle , ds dtypes . MetadataDS ) ( dtypes . ProviderPieceStore , error ) {
ps , err := piecestoreimpl . NewPieceStore ( namespace . Wrap ( ds , datastore . NewKey ( "/storagemarket" ) ) )
if err != nil {
return nil , err
}
ps . OnReady ( marketevents . ReadyLogger ( "piecestore" ) )
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
return ps . Start ( ctx )
} ,
} )
return ps , nil
2020-01-24 20:19:52 +00:00
}
2020-07-28 06:13:10 +00:00
// StagingMultiDatastore opens the repo's "/staging" datastore and wraps it in
// a multistore. The multistore is closed from an fx OnStop hook.
//
// Errors from opening the datastore or from creating the multistore are
// returned wrapped with context.
func StagingMultiDatastore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.StagingMultiDstore, error) {
	ds, err := r.Datastore("/staging")
	if err != nil {
		return nil, xerrors.Errorf("getting datastore out of repo: %w", err)
	}

	mds, err := multistore.NewMultiDstore(ds)
	if err != nil {
		return nil, xerrors.Errorf("creating staging multistore: %w", err)
	}

	lc.Append(fx.Hook{
		OnStop: func(ctx context.Context) error {
			return mds.Close()
		},
	})

	return mds, nil
}
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implementation, establishes message interfaces and a network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
// StagingBlockstore creates a blockstore for staging blocks for a miner
// in a storage deal, prior to sealing
func StagingBlockstore ( r repo . LockedRepo ) ( dtypes . StagingBlockstore , error ) {
2019-08-06 22:04:21 +00:00
stagingds , err := r . Datastore ( "/staging" )
if err != nil {
return nil , err
}
2020-07-23 02:05:11 +00:00
return blockstore . NewBlockstore ( stagingds ) , nil
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implement, establishes message interfaces and network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
}
// StagingDAG is a DAGService for the StagingBlockstore
func StagingDAG ( mctx helpers . MetricsCtx , lc fx . Lifecycle , ibs dtypes . StagingBlockstore , rt routing . Routing , h host . Host ) ( dtypes . StagingDAG , error ) {
2019-08-06 22:04:21 +00:00
bitswapNetwork := network . NewFromIpfsHost ( h , rt )
2020-04-27 22:54:16 +00:00
bitswapOptions := [ ] bitswap . Option { bitswap . ProvideEnabled ( false ) }
2020-08-31 17:24:04 +00:00
exch := bitswap . New ( mctx , bitswapNetwork , ibs , bitswapOptions ... )
2019-08-06 22:04:21 +00:00
bsvc := blockservice . New ( ibs , exch )
dag := merkledag . NewDAGService ( bsvc )
lc . Append ( fx . Hook {
OnStop : func ( _ context . Context ) error {
2020-08-31 17:24:04 +00:00
// blockservice closes the exchange
2019-08-06 22:04:21 +00:00
return bsvc . Close ( )
} ,
} )
return dag , nil
}
2019-08-20 17:19:24 +00:00
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implementation, establishes message interfaces and a network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
// StagingGraphsync creates a graphsync instance which reads and writes blocks
// to the StagingBlockstore
func StagingGraphsync ( mctx helpers . MetricsCtx , lc fx . Lifecycle , ibs dtypes . StagingBlockstore , h host . Host ) dtypes . StagingGraphsync {
graphsyncNetwork := gsnet . NewFromLibp2pHost ( h )
loader := storeutil . LoaderForBlockstore ( ibs )
storer := storeutil . StorerForBlockstore ( ibs )
2020-04-07 02:17:02 +00:00
gs := graphsync . New ( helpers . LifecycleCtx ( mctx , lc ) , graphsyncNetwork , loader , storer , graphsync . RejectAllRequestsByDefault ( ) )
feat(datatransfer): implement and extract
feat(datatransfer): setup implementation path
Sets up a path to implementation, offering both the dagservice implementation and a future graphsync
implementation, establishes message interfaces and a network layer, and isolates the datatransfer module
from the app
WIP using CBOR encoding for dataxfermsg
* Bring cbor-gen stuff into datatransfer package
* make transferRequest private struct
* add transferResponse + funcs
* Rename VoucherID to VoucherType
* more tests passing
WIP trying out some stuff
* Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway
* get rid of pb stuff
all message tests passing, some others in datatransfer
Some cleanup for PR
Cleanup for PR, clarifying and additional comments
mod tidy
Respond to PR comments:
* Make DataTransferRequest/Response be returned in from Net
* Regenerate cbor_gen and fix the generator caller so it works better
* Please the linters
Fix tests
Initiate push and pull requests (#536)
* add issue link for data TransferID generation
* comment out failing but not relevant tests
* finish voucher rename from Identifier --> Type
tests passing
cleanup for PR
remove unused fmt import in graphsync_test
a better reflection
send data transfer response
other tests passing
feat(datatransfer): milestone 2 infrastructure
Setup test path for all tickets for milestone 2
responses alert subscribers when request is not accepted (#607)
Graphsync response is scheduled when a valid push request is received (#625)
fix(datatransfer): fix tests
fix an error with read buffers in tests
fix(deps): fix go.sum
Feat/dt graphsync pullreqs (#627)
* graphsync responses to pull requests
Feat/dt initiator cleanup (#645)
* ChannelID.To --> ChannelID.Initiator
* We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs.
* InProgressChannels returns all of impl.channels, currently just for testing
* Implements go-data-transfer issue
* Some assertions were changed based on the above.
* Renamed some variables and added some assertions based on the new understanding
* Updated SHA for graphsync module
* Updated fakeGraphSync test structs to use new interfaces from new SHA above
Techdebt/dt split graphsync impl receiver (#651)
* Split up graphsyncImpl and graphsyncReceiver
* rename graphsync to utils
DTM sends data over graphsync for validated push requests (#665)
* create channels when a request is received. register push request hook with graphsync. fix tests.
* better NewReaders
* use mutex lock around impl.channels access
* fix(datatransfer): fix test uncertainty
* fix a data race and also don't use random bytes in basic block which can fail
* privatize 3 funcs
with @hannahhoward
Feat/dt gs pullrequests (#693)
* Implements DTM Sends Data Over Graphsync For Validated Pull Requests
* rename a field in a test struct
* refactor a couple of private functions (one was refactored out of existence)
Feat/dt subscribe, file Xfer round trip (#720)
Implements the rest of Subscriber Is Notified When Request Completed #24:
* send a graphsync message within a go func and consume responses until error or transfer is complete.
* notify subscribers of results.
* Rename datatransfer.Event to EventCode.
* datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses
* Add extension data to graphsync request hook, gsReq
* rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync
* use a mutex lock for last transfer ID
* obey the linter
Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754)
* update to correct graphsync version, update tests & code to call the new graphsync hooks
* getExtensionData returns an empty struct + nil if we can't find our extension
* Don't respond with error when we can't find the extension.
* Test for same
* mod tidy
minor fix to go.sum
feat(datatransfer): switch to graphsync implementation
Move over to real graphsync implementation of data transfer, add constructors for graphsync
instances on client and miner side
fix(datatransfer): Fix validators
Validators were checking payload cid against commP -- which are not the same any more. Added a
payloadCid to client deal to maintain the record, fixed validator logic
Feat/dt extraction use go-fil-components/datatransfer (#770)
* Initial commit after changing to go-fil-components/datatransfer
* blow away the datatransfer dir
* use go-fil-components master after its PR #1 was merged
* go mod tidy
use a package
updates after rebase with master
2019-10-30 02:42:16 +00:00
return gs
}
2020-10-09 19:52:04 +00:00
func SetupBlockProducer ( lc fx . Lifecycle , ds dtypes . MetadataDS , api lapi . FullNode , epp gen . WinningPoStProver , sf * slashfilter . SlashFilter , j journal . Journal ) ( * miner . Miner , error ) {
2019-08-20 17:19:24 +00:00
minerAddr , err := minerAddrFromDS ( ds )
if err != nil {
2019-11-25 04:45:13 +00:00
return nil , err
2019-08-20 17:19:24 +00:00
}
2020-10-09 19:52:04 +00:00
m := miner . NewMiner ( api , epp , minerAddr , sf , j )
2019-11-25 04:45:13 +00:00
2019-08-20 17:19:24 +00:00
lc . Append ( fx . Hook {
OnStart : func ( ctx context . Context ) error {
2020-05-05 19:01:44 +00:00
if err := m . Start ( ctx ) ; err != nil {
2019-11-25 04:45:13 +00:00
return err
2019-10-31 22:04:13 +00:00
}
return nil
2019-08-20 17:19:24 +00:00
} ,
2019-09-17 14:23:08 +00:00
OnStop : func ( ctx context . Context ) error {
2020-05-05 19:01:44 +00:00
return m . Stop ( ctx )
2019-09-17 14:23:08 +00:00
} ,
2019-08-20 17:19:24 +00:00
} )
2019-11-25 04:45:13 +00:00
return m , nil
2019-08-20 17:19:24 +00:00
}
2019-10-27 08:56:53 +00:00
2020-05-20 22:46:44 +00:00
func NewStorageAsk ( ctx helpers . MetricsCtx , fapi lapi . FullNode , ds dtypes . MetadataDS , minerAddress dtypes . MinerAddress , spn storagemarket . StorageProviderNode ) ( * storedask . StoredAsk , error ) {
2020-02-27 21:45:31 +00:00
2020-05-20 22:46:44 +00:00
mi , err := fapi . StateMinerInfo ( ctx , address . Address ( minerAddress ) , types . EmptyTSK )
2020-02-06 02:43:37 +00:00
if err != nil {
return nil , err
}
2020-02-27 21:45:31 +00:00
2020-09-29 11:53:30 +00:00
providerDs := namespace . Wrap ( ds , datastore . NewKey ( "/deals/provider" ) )
// legacy this was mistake where this key was place -- so we move the legacy key if need be
err = shared . MoveKey ( providerDs , "/latest-ask" , "/storage-ask/latest" )
if err != nil {
return nil , err
}
2020-10-14 05:47:44 +00:00
storedAsk , err := storedask . NewStoredAsk ( namespace . Wrap ( providerDs , datastore . NewKey ( "/storage-ask" ) ) , datastore . NewKey ( "latest" ) , spn , address . Address ( minerAddress ) ,
storagemarket . MaxPieceSize ( abi . PaddedPieceSize ( mi . SectorSize ) ) )
2020-05-29 00:05:17 +00:00
if err != nil {
return nil , err
}
2020-02-27 21:45:31 +00:00
if err != nil {
2020-05-20 22:46:44 +00:00
return storedAsk , err
2020-02-27 21:45:31 +00:00
}
2020-05-20 22:46:44 +00:00
return storedAsk , nil
}
2020-02-27 21:45:31 +00:00
2020-10-15 16:24:48 +00:00
func BasicDealFilter ( user dtypes . StorageDealFilter ) func ( onlineOk dtypes . ConsiderOnlineStorageDealsConfigFunc ,
2020-07-30 17:36:31 +00:00
offlineOk dtypes . ConsiderOfflineStorageDealsConfigFunc ,
blocklistFunc dtypes . StorageDealPieceCidBlocklistConfigFunc ,
expectedSealTimeFunc dtypes . GetExpectedSealDurationFunc ,
2020-10-15 16:24:48 +00:00
spn storagemarket . StorageProviderNode ) dtypes . StorageDealFilter {
2020-07-30 17:36:31 +00:00
return func ( onlineOk dtypes . ConsiderOnlineStorageDealsConfigFunc ,
offlineOk dtypes . ConsiderOfflineStorageDealsConfigFunc ,
blocklistFunc dtypes . StorageDealPieceCidBlocklistConfigFunc ,
expectedSealTimeFunc dtypes . GetExpectedSealDurationFunc ,
2020-10-15 16:24:48 +00:00
spn storagemarket . StorageProviderNode ) dtypes . StorageDealFilter {
2020-07-30 17:36:31 +00:00
return func ( ctx context . Context , deal storagemarket . MinerDeal ) ( bool , string , error ) {
b , err := onlineOk ( )
if err != nil {
return false , "miner error" , err
}
if deal . Ref != nil && deal . Ref . TransferType != storagemarket . TTManual && ! b {
log . Warnf ( "online storage deal consideration disabled; rejecting storage deal proposal from client: %s" , deal . Client . String ( ) )
return false , "miner is not considering online storage deals" , nil
}
b , err = offlineOk ( )
if err != nil {
return false , "miner error" , err
}
if deal . Ref != nil && deal . Ref . TransferType == storagemarket . TTManual && ! b {
log . Warnf ( "offline storage deal consideration disabled; rejecting storage deal proposal from client: %s" , deal . Client . String ( ) )
return false , "miner is not accepting offline storage deals" , nil
}
blocklist , err := blocklistFunc ( )
if err != nil {
return false , "miner error" , err
}
for idx := range blocklist {
if deal . Proposal . PieceCID . Equals ( blocklist [ idx ] ) {
log . Warnf ( "piece CID in proposal %s is blocklisted; rejecting storage deal proposal from client: %s" , deal . Proposal . PieceCID , deal . Client . String ( ) )
return false , fmt . Sprintf ( "miner has blocklisted piece CID %s" , deal . Proposal . PieceCID ) , nil
}
}
sealDuration , err := expectedSealTimeFunc ( )
if err != nil {
return false , "miner error" , err
}
sealEpochs := sealDuration / ( time . Duration ( build . BlockDelaySecs ) * time . Second )
_ , ht , err := spn . GetChainHead ( ctx )
if err != nil {
return false , "failed to get chain head" , err
}
earliest := abi . ChainEpoch ( sealEpochs ) + ht
if deal . Proposal . StartEpoch < earliest {
log . Warnw ( "proposed deal would start before sealing can be completed; rejecting storage deal proposal from client" , "piece_cid" , deal . Proposal . PieceCID , "client" , deal . Client . String ( ) , "seal_duration" , sealDuration , "earliest" , earliest , "curepoch" , ht )
return false , fmt . Sprintf ( "cannot seal a sector before %s" , deal . Proposal . StartEpoch ) , nil
}
2020-10-05 19:35:58 +00:00
// Reject if it's more than 7 days in the future
2020-10-05 17:32:49 +00:00
// TODO: read from cfg
2020-10-15 04:28:36 +00:00
maxStartEpoch := earliest + abi . ChainEpoch ( 7 * builtin . SecondsInDay / build . BlockDelaySecs )
2020-10-05 17:32:49 +00:00
if deal . Proposal . StartEpoch > maxStartEpoch {
return false , fmt . Sprintf ( "deal start epoch is too far in the future: %s > %s" , deal . Proposal . StartEpoch , maxStartEpoch ) , nil
}
2020-07-30 17:36:31 +00:00
if user != nil {
return user ( ctx , deal )
}
return true , "" , nil
}
}
}
2020-07-12 17:54:53 +00:00
func StorageProvider ( minerAddress dtypes . MinerAddress ,
storedAsk * storedask . StoredAsk ,
h host . Host , ds dtypes . MetadataDS ,
2020-07-28 06:13:10 +00:00
mds dtypes . StagingMultiDstore ,
2020-07-12 17:54:53 +00:00
r repo . LockedRepo ,
pieceStore dtypes . ProviderPieceStore ,
dataTransfer dtypes . ProviderDataTransfer ,
spn storagemarket . StorageProviderNode ,
2020-10-15 16:24:48 +00:00
df dtypes . StorageDealFilter ,
2020-07-30 17:36:31 +00:00
) ( storagemarket . StorageProvider , error ) {
2020-05-20 22:46:44 +00:00
net := smnet . NewFromLibp2pHost ( h )
store , err := piecefilestore . NewLocalFileStore ( piecefilestore . OsPath ( r . Path ( ) ) )
2020-02-27 21:45:31 +00:00
if err != nil {
return nil , err
}
2020-07-30 17:36:31 +00:00
opt := storageimpl . CustomDealDecisionLogic ( storageimpl . DealDeciderFunc ( df ) )
2020-06-11 18:29:59 +00:00
2020-11-13 17:15:23 +00:00
return storageimpl . NewProvider ( net , namespace . Wrap ( ds , datastore . NewKey ( "/deals/provider" ) ) , store , mds , pieceStore , dataTransfer , spn , address . Address ( minerAddress ) , storedAsk , opt )
2019-12-17 10:46:39 +00:00
}
2020-10-15 16:24:48 +00:00
// RetrievalDealFilter returns a constructor for the built-in retrieval deal
// filter.
//
// The resulting filter rejects every deal while online retrieval deals are
// disabled. When offline retrieval deals are disabled it only logs a notice.
// Deals that get past these checks are delegated to userFilter when it is
// non-nil and accepted otherwise. Errors from the config getters are returned
// together with a rejection.
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
	offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter {
	return func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
		offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter {
		return func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) {
			considerOnline, err := onlineOk()
			if err != nil {
				return false, "miner error", err
			}
			if !considerOnline {
				log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client")
				return false, "miner is not accepting online retrieval deals", nil
			}

			considerOffline, err := offlineOk()
			if err != nil {
				return false, "miner error", err
			}
			if !considerOffline {
				// Informational only: this setting does not reject the deal.
				log.Info("offline retrieval has not been implemented yet")
			}

			if userFilter == nil {
				return true, "", nil
			}
			return userFilter(ctx, state)
		}
	}
}
2019-12-10 04:19:59 +00:00
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
2020-10-15 16:24:48 +00:00
func RetrievalProvider ( h host . Host ,
miner * storage . Miner ,
sealer sectorstorage . SectorManager ,
full lapi . FullNode ,
ds dtypes . MetadataDS ,
pieceStore dtypes . ProviderPieceStore ,
mds dtypes . StagingMultiDstore ,
dt dtypes . ProviderDataTransfer ,
onlineOk dtypes . ConsiderOnlineRetrievalDealsConfigFunc ,
offlineOk dtypes . ConsiderOfflineRetrievalDealsConfigFunc ,
userFilter dtypes . RetrievalDealFilter ,
) ( retrievalmarket . RetrievalProvider , error ) {
2020-03-03 22:19:22 +00:00
adapter := retrievaladapter . NewRetrievalProviderNode ( miner , sealer , full )
2020-06-23 16:04:46 +00:00
maddr , err := minerAddrFromDS ( ds )
2020-01-24 20:19:52 +00:00
if err != nil {
return nil , err
}
2020-06-23 16:04:46 +00:00
netwk := rmnet . NewFromLibp2pHost ( h )
2020-10-15 16:24:48 +00:00
opt := retrievalimpl . DealDeciderOpt ( retrievalimpl . DealDecider ( userFilter ) )
2020-06-23 16:04:46 +00:00
2020-07-28 06:13:10 +00:00
return retrievalimpl . NewProvider ( maddr , adapter , netwk , pieceStore , mds , dt , namespace . Wrap ( ds , datastore . NewKey ( "/retrievals/provider" ) ) , opt )
2019-12-10 04:19:59 +00:00
}
2020-03-24 18:00:08 +00:00
2020-09-14 07:44:55 +00:00
var WorkerCallsPrefix = datastore . NewKey ( "/worker/calls" )
2020-09-16 20:33:49 +00:00
var ManagerWorkPrefix = datastore . NewKey ( "/stmgr/calls" )
2020-09-14 07:44:55 +00:00
2020-11-05 06:44:46 +00:00
func SectorStorage ( mctx helpers . MetricsCtx , lc fx . Lifecycle , ls stores . LocalStorage , si stores . SectorIndex , sc sectorstorage . SealerConfig , urls sectorstorage . URLs , sa sectorstorage . StorageAuth , ds dtypes . MetadataDS ) ( * sectorstorage . Manager , error ) {
2020-03-24 18:00:08 +00:00
ctx := helpers . LifecycleCtx ( mctx , lc )
2020-09-14 07:44:55 +00:00
wsts := statestore . New ( namespace . Wrap ( ds , WorkerCallsPrefix ) )
2020-09-16 20:33:49 +00:00
smsts := statestore . New ( namespace . Wrap ( ds , ManagerWorkPrefix ) )
2020-09-14 07:44:55 +00:00
2020-11-05 06:44:46 +00:00
sst , err := sectorstorage . New ( ctx , ls , si , sc , urls , sa , wsts , smsts )
2020-03-24 23:49:45 +00:00
if err != nil {
return nil , err
}
lc . Append ( fx . Hook {
2020-07-17 22:31:14 +00:00
OnStop : sst . Close ,
2020-03-24 23:49:45 +00:00
} )
return sst , nil
2020-03-24 18:00:08 +00:00
}
2020-03-27 20:08:06 +00:00
2020-08-17 13:26:18 +00:00
func StorageAuth ( ctx helpers . MetricsCtx , ca lapi . Common ) ( sectorstorage . StorageAuth , error ) {
2020-05-20 18:23:51 +00:00
token , err := ca . AuthNew ( ctx , [ ] auth . Permission { "admin" } )
2020-03-27 20:08:06 +00:00
if err != nil {
return nil , xerrors . Errorf ( "creating storage auth header: %w" , err )
}
headers := http . Header { }
headers . Add ( "Authorization" , "Bearer " + string ( token ) )
2020-08-17 13:26:18 +00:00
return sectorstorage . StorageAuth ( headers ) , nil
2020-03-27 20:08:06 +00:00
}
2020-06-11 18:29:59 +00:00
2020-06-26 19:27:41 +00:00
func NewConsiderOnlineStorageDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . ConsiderOnlineStorageDealsConfigFunc , error ) {
2020-06-23 16:04:46 +00:00
return func ( ) ( out bool , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-26 19:27:41 +00:00
out = cfg . Dealmaking . ConsiderOnlineStorageDeals
2020-06-23 16:04:46 +00:00
} )
return
} , nil
}
2020-06-26 19:27:41 +00:00
func NewSetConsideringOnlineStorageDealsFunc ( r repo . LockedRepo ) ( dtypes . SetConsiderOnlineStorageDealsConfigFunc , error ) {
2020-06-23 16:04:46 +00:00
return func ( b bool ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-26 19:27:41 +00:00
cfg . Dealmaking . ConsiderOnlineStorageDeals = b
2020-06-23 16:04:46 +00:00
} )
return
} , nil
}
2020-06-26 19:27:41 +00:00
func NewConsiderOnlineRetrievalDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . ConsiderOnlineRetrievalDealsConfigFunc , error ) {
2020-06-18 20:15:18 +00:00
return func ( ) ( out bool , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-26 19:27:41 +00:00
out = cfg . Dealmaking . ConsiderOnlineRetrievalDeals
2020-06-18 20:15:18 +00:00
} )
return
2020-06-11 18:29:59 +00:00
} , nil
}
2020-06-11 19:59:50 +00:00
2020-06-26 19:27:41 +00:00
func NewSetConsiderOnlineRetrievalDealsConfigFunc ( r repo . LockedRepo ) ( dtypes . SetConsiderOnlineRetrievalDealsConfigFunc , error ) {
2020-06-18 20:15:18 +00:00
return func ( b bool ) ( err error ) {
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-26 19:27:41 +00:00
cfg . Dealmaking . ConsiderOnlineRetrievalDeals = b
2020-06-11 19:59:50 +00:00
} )
2020-06-18 20:15:18 +00:00
return
} , nil
}
2020-06-11 19:59:50 +00:00
2020-06-18 22:42:24 +00:00
func NewStorageDealPieceCidBlocklistConfigFunc ( r repo . LockedRepo ) ( dtypes . StorageDealPieceCidBlocklistConfigFunc , error ) {
2020-06-18 20:15:18 +00:00
return func ( ) ( out [ ] cid . Cid , err error ) {
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-18 22:42:24 +00:00
out = cfg . Dealmaking . PieceCidBlocklist
2020-06-18 20:15:18 +00:00
} )
return
2020-06-11 19:59:50 +00:00
} , nil
}
2020-06-18 20:15:18 +00:00
2020-06-18 22:42:24 +00:00
func NewSetStorageDealPieceCidBlocklistConfigFunc ( r repo . LockedRepo ) ( dtypes . SetStorageDealPieceCidBlocklistConfigFunc , error ) {
return func ( blocklist [ ] cid . Cid ) ( err error ) {
2020-06-18 20:15:18 +00:00
err = mutateCfg ( r , func ( cfg * config . StorageMiner ) {
2020-06-18 22:42:24 +00:00
cfg . Dealmaking . PieceCidBlocklist = blocklist
2020-06-18 20:15:18 +00:00
} )
return
} , nil
}
2020-06-26 19:27:41 +00:00
// NewConsiderOfflineStorageDealsConfigFunc returns a getter that reads
// Dealmaking.ConsiderOfflineStorageDeals from the repo's storage miner config
// on every call.
func NewConsiderOfflineStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.ConsiderOfflineStorageDealsConfigFunc, error) {
	getter := func() (bool, error) {
		var enabled bool
		err := readCfg(r, func(cfg *config.StorageMiner) {
			enabled = cfg.Dealmaking.ConsiderOfflineStorageDeals
		})
		return enabled, err
	}
	return getter, nil
}
// NewSetConsideringOfflineStorageDealsFunc returns a setter that writes its
// argument to Dealmaking.ConsiderOfflineStorageDeals in the repo's storage
// miner config.
func NewSetConsideringOfflineStorageDealsFunc(r repo.LockedRepo) (dtypes.SetConsiderOfflineStorageDealsConfigFunc, error) {
	setter := func(b bool) error {
		return mutateCfg(r, func(cfg *config.StorageMiner) {
			cfg.Dealmaking.ConsiderOfflineStorageDeals = b
		})
	}
	return setter, nil
}
// NewConsiderOfflineRetrievalDealsConfigFunc returns a getter that reads
// Dealmaking.ConsiderOfflineRetrievalDeals from the repo's storage miner
// config on every call.
func NewConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.ConsiderOfflineRetrievalDealsConfigFunc, error) {
	getter := func() (bool, error) {
		var enabled bool
		err := readCfg(r, func(cfg *config.StorageMiner) {
			enabled = cfg.Dealmaking.ConsiderOfflineRetrievalDeals
		})
		return enabled, err
	}
	return getter, nil
}
// NewSetConsiderOfflineRetrievalDealsConfigFunc returns a setter that writes
// its argument to Dealmaking.ConsiderOfflineRetrievalDeals in the repo's
// storage miner config.
func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.SetConsiderOfflineRetrievalDealsConfigFunc, error) {
	setter := func(b bool) error {
		return mutateCfg(r, func(cfg *config.StorageMiner) {
			cfg.Dealmaking.ConsiderOfflineRetrievalDeals = b
		})
	}
	return setter, nil
}
2020-08-18 14:20:31 +00:00
func NewSetSealConfigFunc ( r repo . LockedRepo ) ( dtypes . SetSealingConfigFunc , error ) {
2020-08-18 16:27:18 +00:00
return func ( cfg sealiface . Config ) ( err error ) {
2020-08-18 14:20:31 +00:00
err = mutateCfg ( r , func ( c * config . StorageMiner ) {
c . Sealing = config . SealingConfig {
2020-11-16 23:28:49 +00:00
MaxWaitDealsSectors : cfg . MaxWaitDealsSectors ,
MaxSealingSectors : cfg . MaxSealingSectors ,
MaxSealingSectorsForDeals : cfg . MaxSealingSectorsForDeals ,
WaitDealsDelay : config . Duration ( cfg . WaitDealsDelay ) ,
2020-08-18 14:20:31 +00:00
}
2020-07-06 18:39:26 +00:00
} )
return
} , nil
}
2020-08-18 14:20:31 +00:00
func NewGetSealConfigFunc ( r repo . LockedRepo ) ( dtypes . GetSealingConfigFunc , error ) {
2020-08-18 16:27:18 +00:00
return func ( ) ( out sealiface . Config , err error ) {
2020-07-06 18:39:26 +00:00
err = readCfg ( r , func ( cfg * config . StorageMiner ) {
2020-08-18 16:27:18 +00:00
out = sealiface . Config {
2020-08-18 17:52:20 +00:00
MaxWaitDealsSectors : cfg . Sealing . MaxWaitDealsSectors ,
MaxSealingSectors : cfg . Sealing . MaxSealingSectors ,
MaxSealingSectorsForDeals : cfg . Sealing . MaxSealingSectorsForDeals ,
WaitDealsDelay : time . Duration ( cfg . Sealing . WaitDealsDelay ) ,
2020-08-18 14:20:31 +00:00
}
2020-07-06 18:39:26 +00:00
} )
return
} , nil
}
2020-07-12 17:54:53 +00:00
// NewSetExpectedSealDurationFunc returns a setter that stores the given
// duration, converted to config.Duration, in Dealmaking.ExpectedSealDuration
// of the repo's storage miner config.
func NewSetExpectedSealDurationFunc(r repo.LockedRepo) (dtypes.SetExpectedSealDurationFunc, error) {
	setter := func(delay time.Duration) error {
		return mutateCfg(r, func(cfg *config.StorageMiner) {
			cfg.Dealmaking.ExpectedSealDuration = config.Duration(delay)
		})
	}
	return setter, nil
}
// NewGetExpectedSealDurationFunc returns a getter that reads
// Dealmaking.ExpectedSealDuration from the repo's storage miner config and
// converts it to time.Duration.
func NewGetExpectedSealDurationFunc(r repo.LockedRepo) (dtypes.GetExpectedSealDurationFunc, error) {
	getter := func() (time.Duration, error) {
		var expected time.Duration
		err := readCfg(r, func(cfg *config.StorageMiner) {
			expected = time.Duration(cfg.Dealmaking.ExpectedSealDuration)
		})
		return expected, err
	}
	return getter, nil
}
2020-06-18 20:15:18 +00:00
// readCfg fetches the repo's config and passes it to accessor.
//
// It returns the error from r.Config, or an error if the config is not a
// *config.StorageMiner; in both cases accessor is not called.
func readCfg(r repo.LockedRepo, accessor func(*config.StorageMiner)) error {
	raw, err := r.Config()
	if err != nil {
		return err
	}

	if cfg, ok := raw.(*config.StorageMiner); ok {
		accessor(cfg)
		return nil
	}
	return xerrors.New("expected address of config.StorageMiner")
}
func mutateCfg ( r repo . LockedRepo , mutator func ( * config . StorageMiner ) ) error {
var typeErr error
setConfigErr := r . SetConfig ( func ( raw interface { } ) {
cfg , ok := raw . ( * config . StorageMiner )
if ! ok {
2020-07-11 08:55:13 +00:00
typeErr = errors . New ( "expected miner config" )
2020-06-18 20:15:18 +00:00
return
}
mutator ( cfg )
} )
return multierr . Combine ( typeErr , setConfigErr )
}