Merge pull request #6983 from filecoin-project/raulk/dagstore-api-cli

dagstore: add JSON-RPC operations and cli.
This commit is contained in:
raulk 2021-08-05 14:26:12 +01:00 committed by GitHub
commit a670359d78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 723 additions and 10 deletions

View File

@ -13,13 +13,14 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -166,6 +167,48 @@ type StorageMiner interface {
MarketPendingDeals(ctx context.Context) (PendingDealInfo, error) //perm:write MarketPendingDeals(ctx context.Context) (PendingDealInfo, error) //perm:write
MarketPublishPendingDeals(ctx context.Context) error //perm:admin MarketPublishPendingDeals(ctx context.Context) error //perm:admin
// DagstoreListShards returns information about all shards known to the
// DAG store. Only available on nodes running the markets subsystem.
DagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read
// DagstoreInitializeShard initializes an uninitialized shard.
//
// Initialization consists of fetching the shard's data (deal payload) from
// the storage subsystem, generating an index, and persisting the index
// to facilitate later retrievals, and/or to publish to external sources.
//
// This operation is intended to complement the initial migration. The
// migration registers a shard for every unique piece CID, with lazy
// initialization. Thus, shards are not initialized immediately to avoid
// IO activity competing with proving. Instead, shard are initialized
// when first accessed. This method forces the initialization of a shard by
// accessing it and immediately releasing it. This is useful to warm up the
// cache to facilitate subsequent retrievals, and to generate the indexes
// to publish them externally.
//
// This operation fails if the shard is not in ShardStateNew state.
// It blocks until initialization finishes.
DagstoreInitializeShard(ctx context.Context, key string) error //perm:write
// DagstoreRecoverShard attempts to recover a failed shard.
//
// This operation fails if the shard is not in ShardStateErrored state.
// It blocks until recovery finishes. If recovery failed, it returns the
// error.
DagstoreRecoverShard(ctx context.Context, key string) error //perm:write
// DagstoreInitializeAll initializes all uninitialized shards in bulk,
// according to the policy passed in the parameters.
//
// It is recommended to set a maximum concurrency to avoid extreme
// IO pressure if the storage subsystem has a large amount of deals.
//
// It returns the result for each shard it attempted to initialize.
DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) //perm:write
// DagstoreGC runs garbage collection on the DAG store.
DagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin
// RuntimeSubsystems returns the subsystems that are enabled // RuntimeSubsystems returns the subsystems that are enabled
// in this instance. // in this instance.
RuntimeSubsystems(ctx context.Context) (MinerSubsystems, error) //perm:read RuntimeSubsystems(ctx context.Context) (MinerSubsystems, error) //perm:read
@ -336,3 +379,23 @@ type DealSchedule struct {
StartEpoch abi.ChainEpoch StartEpoch abi.ChainEpoch
EndEpoch abi.ChainEpoch EndEpoch abi.ChainEpoch
} }
// DagstoreShardInfo is the serialized form of dagstore.DagstoreShardInfo that
// we expose through JSON-RPC to avoid clients having to depend on the
// dagstore lib.
type DagstoreShardInfo struct {
Key string
State string
Error string
}
// DagstoreShardResult enumerates results per shard.
type DagstoreShardResult struct {
Key string
Success bool
Error string
}
type DagstoreInitializeAllParams struct {
MaxConcurrency int
}

View File

@ -76,7 +76,7 @@ func TestReturnTypes(t *testing.T) {
seen[typ] = struct{}{} seen[typ] = struct{}{}
if typ.Kind() == reflect.Interface && typ != bareIface && !typ.Implements(jmarsh) { if typ.Kind() == reflect.Interface && typ != bareIface && !typ.Implements(jmarsh) {
t.Error("methods can't return interfaces", m.Name) t.Error("methods can't return interfaces or struct types not implementing json.Marshaller", m.Name)
} }
switch typ.Kind() { switch typ.Kind() {

View File

@ -14,7 +14,6 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore" "github.com/ipfs/go-filestore"
@ -25,6 +24,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/node/repo/importmgr"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
filestore2 "github.com/filecoin-project/go-fil-markets/filestore" filestore2 "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -275,6 +276,15 @@ func init() {
api.SubsystemSectorStorage, api.SubsystemSectorStorage,
api.SubsystemMarkets, api.SubsystemMarkets,
}) })
addExample(api.DagstoreShardResult{
Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
Error: "<error>",
})
addExample(api.DagstoreShardInfo{
Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
State: "ShardStateAvailable",
Error: "<error>",
})
} }
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) { func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

View File

@ -603,6 +603,16 @@ type StorageMinerStruct struct {
CreateBackup func(p0 context.Context, p1 string) error `perm:"admin"` CreateBackup func(p0 context.Context, p1 string) error `perm:"admin"`
DagstoreGC func(p0 context.Context) ([]DagstoreShardResult, error) `perm:"admin"`
DagstoreInitializeAll func(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) `perm:"write"`
DagstoreInitializeShard func(p0 context.Context, p1 string) error `perm:"write"`
DagstoreListShards func(p0 context.Context) ([]DagstoreShardInfo, error) `perm:"read"`
DagstoreRecoverShard func(p0 context.Context, p1 string) error `perm:"write"`
DealsConsiderOfflineRetrievalDeals func(p0 context.Context) (bool, error) `perm:"admin"` DealsConsiderOfflineRetrievalDeals func(p0 context.Context) (bool, error) `perm:"admin"`
DealsConsiderOfflineStorageDeals func(p0 context.Context) (bool, error) `perm:"admin"` DealsConsiderOfflineStorageDeals func(p0 context.Context) (bool, error) `perm:"admin"`
@ -3569,6 +3579,61 @@ func (s *StorageMinerStub) CreateBackup(p0 context.Context, p1 string) error {
return ErrNotSupported return ErrNotSupported
} }
func (s *StorageMinerStruct) DagstoreGC(p0 context.Context) ([]DagstoreShardResult, error) {
if s.Internal.DagstoreGC == nil {
return *new([]DagstoreShardResult), ErrNotSupported
}
return s.Internal.DagstoreGC(p0)
}
func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreShardResult, error) {
return *new([]DagstoreShardResult), ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) {
if s.Internal.DagstoreInitializeAll == nil {
return nil, ErrNotSupported
}
return s.Internal.DagstoreInitializeAll(p0, p1)
}
func (s *StorageMinerStub) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) {
return nil, ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreInitializeShard(p0 context.Context, p1 string) error {
if s.Internal.DagstoreInitializeShard == nil {
return ErrNotSupported
}
return s.Internal.DagstoreInitializeShard(p0, p1)
}
func (s *StorageMinerStub) DagstoreInitializeShard(p0 context.Context, p1 string) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreListShards(p0 context.Context) ([]DagstoreShardInfo, error) {
if s.Internal.DagstoreListShards == nil {
return *new([]DagstoreShardInfo), ErrNotSupported
}
return s.Internal.DagstoreListShards(p0)
}
func (s *StorageMinerStub) DagstoreListShards(p0 context.Context) ([]DagstoreShardInfo, error) {
return *new([]DagstoreShardInfo), ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreRecoverShard(p0 context.Context, p1 string) error {
if s.Internal.DagstoreRecoverShard == nil {
return ErrNotSupported
}
return s.Internal.DagstoreRecoverShard(p0, p1)
}
func (s *StorageMinerStub) DagstoreRecoverShard(p0 context.Context, p1 string) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) DealsConsiderOfflineRetrievalDeals(p0 context.Context) (bool, error) { func (s *StorageMinerStruct) DealsConsiderOfflineRetrievalDeals(p0 context.Context) (bool, error) {
if s.Internal.DealsConsiderOfflineRetrievalDeals == nil { if s.Internal.DealsConsiderOfflineRetrievalDeals == nil {
return false, ErrNotSupported return false, ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

181
cmd/lotus-miner/dagstore.go Normal file
View File

@ -0,0 +1,181 @@
package main
import (
"fmt"
"os"
"github.com/fatih/color"
"github.com/urfave/cli/v2"
"github.com/filecoin-project/lotus/api"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/tablewriter"
)
var dagstoreCmd = &cli.Command{
Name: "dagstore",
Usage: "Manage the dagstore on the markets subsystem",
Subcommands: []*cli.Command{
dagstoreListShardsCmd,
dagstoreInitializeShardCmd,
dagstoreInitializeAllCmd,
dagstoreGcCmd,
},
}
var dagstoreListShardsCmd = &cli.Command{
Name: "list-shards",
Usage: "List all shards known to the dagstore, with their current status",
Action: func(cctx *cli.Context) error {
marketsApi, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
shards, err := marketsApi.DagstoreListShards(ctx)
if err != nil {
return err
}
if len(shards) == 0 {
return nil
}
tw := tablewriter.New(
tablewriter.Col("Key"),
tablewriter.Col("State"),
tablewriter.Col("Error"),
)
colors := map[string]color.Attribute{
"ShardStateAvailable": color.FgGreen,
"ShardStateServing": color.FgBlue,
"ShardStateErrored": color.FgRed,
"ShardStateNew": color.FgYellow,
}
for _, s := range shards {
m := map[string]interface{}{
"Key": s.Key,
"State": func() string {
if c, ok := colors[s.State]; ok {
return color.New(c).Sprint(s)
}
return s.State
}(),
"Error": s.Error,
}
tw.Write(m)
}
return tw.Flush(os.Stdout)
},
}
var dagstoreInitializeShardCmd = &cli.Command{
Name: "initialize-shard",
ArgsUsage: "[key]",
Usage: "Initialize the specified shard",
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
return fmt.Errorf("must provide a single shard key")
}
marketsApi, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return marketsApi.DagstoreInitializeShard(ctx, cctx.Args().First())
},
}
var dagstoreInitializeAllCmd = &cli.Command{
Name: "initialize-all",
Usage: "Initialize all uninitialized shards, streaming results as they're produced",
Flags: []cli.Flag{
&cli.UintFlag{
Name: "concurrency",
Usage: "maximum shards to initialize concurrently at a time; use 0 for unlimited",
Required: true,
},
},
Action: func(cctx *cli.Context) error {
concurrency := cctx.Uint("concurrency")
marketsApi, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
params := api.DagstoreInitializeAllParams{
MaxConcurrency: int(concurrency),
}
ch, err := marketsApi.DagstoreInitializeAll(ctx, params)
if err != nil {
return err
}
for {
select {
case res, ok := <-ch:
if !ok {
return nil
}
if res.Success {
_, _ = fmt.Fprintln(os.Stdout, res.Key, color.New(color.FgGreen).Sprint("SUCCESS"))
} else {
_, _ = fmt.Fprintln(os.Stdout, res.Key, color.New(color.FgRed).Sprint("ERROR"), res.Error)
}
case <-ctx.Done():
return fmt.Errorf("aborted")
}
}
},
}
var dagstoreGcCmd = &cli.Command{
Name: "gc",
Usage: "Garbage collect the dagstore",
Action: func(cctx *cli.Context) error {
marketsApi, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
collected, err := marketsApi.DagstoreGC(ctx)
if err != nil {
return err
}
if len(collected) == 0 {
_, _ = fmt.Fprintln(os.Stdout, "no shards collected")
return nil
}
for _, e := range collected {
if e.Error == "" {
_, _ = fmt.Fprintln(os.Stdout, e.Key, "success")
} else {
_, _ = fmt.Fprintln(os.Stdout, e.Key, "failed:", e.Error)
}
}
return nil
},
}

View File

@ -5,13 +5,15 @@ import (
"fmt" "fmt"
"github.com/fatih/color" "github.com/fatih/color"
cliutil "github.com/filecoin-project/lotus/cli/util"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"golang.org/x/xerrors" "golang.org/x/xerrors"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
@ -46,6 +48,7 @@ func main() {
lcli.WithCategory("market", storageDealsCmd), lcli.WithCategory("market", storageDealsCmd),
lcli.WithCategory("market", retrievalDealsCmd), lcli.WithCategory("market", retrievalDealsCmd),
lcli.WithCategory("market", dataTransfersCmd), lcli.WithCategory("market", dataTransfersCmd),
lcli.WithCategory("market", dagstoreCmd),
lcli.WithCategory("storage", sectorsCmd), lcli.WithCategory("storage", sectorsCmd),
lcli.WithCategory("storage", provingCmd), lcli.WithCategory("storage", provingCmd),
lcli.WithCategory("storage", storageCmd), lcli.WithCategory("storage", storageCmd),

View File

@ -18,6 +18,12 @@
* [ComputeProof](#ComputeProof) * [ComputeProof](#ComputeProof)
* [Create](#Create) * [Create](#Create)
* [CreateBackup](#CreateBackup) * [CreateBackup](#CreateBackup)
* [Dagstore](#Dagstore)
* [DagstoreGC](#DagstoreGC)
* [DagstoreInitializeAll](#DagstoreInitializeAll)
* [DagstoreInitializeShard](#DagstoreInitializeShard)
* [DagstoreListShards](#DagstoreListShards)
* [DagstoreRecoverShard](#DagstoreRecoverShard)
* [Deals](#Deals) * [Deals](#Deals)
* [DealsConsiderOfflineRetrievalDeals](#DealsConsiderOfflineRetrievalDeals) * [DealsConsiderOfflineRetrievalDeals](#DealsConsiderOfflineRetrievalDeals)
* [DealsConsiderOfflineStorageDeals](#DealsConsiderOfflineStorageDeals) * [DealsConsiderOfflineStorageDeals](#DealsConsiderOfflineStorageDeals)
@ -345,6 +351,110 @@ Inputs:
Response: `{}` Response: `{}`
## Dagstore
### DagstoreGC
DagstoreGC runs garbage collection on the DAG store.
Perms: admin
Inputs: `null`
Response: `null`
### DagstoreInitializeAll
DagstoreInitializeAll initializes all uninitialized shards in bulk,
according to the policy passed in the parameters.
It is recommended to set a maximum concurrency to avoid extreme
IO pressure if the storage subsystem has a large amount of deals.
It returns the result for each shard it attempted to initialize.
Perms: write
Inputs:
```json
[
{
"MaxConcurrency": 123
}
]
```
Response:
```json
{
"Key": "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
"Success": false,
"Error": "\u003cerror\u003e"
}
```
### DagstoreInitializeShard
DagstoreInitializeShard initializes an uninitialized shard.
Initialization consists of fetching the shard's data (deal payload) from
the storage subsystem, generating an index, and persisting the index
to facilitate later retrievals, and/or to publish to external sources.
This operation is intended to complement the initial migration. The
migration registers a shard for every unique piece CID, with lazy
initialization. Thus, shards are not initialized immediately to avoid
IO activity competing with proving. Instead, shard are initialized
when first accessed. This method forces the initialization of a shard by
accessing it and immediately releasing it. This is useful to warm up the
cache to facilitate subsequent retrievals, and to generate the indexes
to publish them externally.
This operation fails if the shard is not in ShardStateNew state.
It blocks until initialization finishes.
Perms: write
Inputs:
```json
[
"string value"
]
```
Response: `{}`
### DagstoreListShards
DagstoreListShards returns information about all shards known to the
DAG store. Only available on nodes running the markets subsystem.
Perms: read
Inputs: `null`
Response: `null`
### DagstoreRecoverShard
DagstoreRecoverShard attempts to recover a failed shard.
This operation fails if the shard is not in ShardStateErrored state.
It blocks until recovery finishes. If recovery failed, it returns the
error.
Perms: write
Inputs:
```json
[
"string value"
]
```
Response: `{}`
## Deals ## Deals

View File

@ -29,6 +29,7 @@ COMMANDS:
storage-deals Manage storage deals and related configuration storage-deals Manage storage deals and related configuration
retrieval-deals Manage retrieval deals and related configuration retrieval-deals Manage retrieval deals and related configuration
data-transfers Manage data transfers data-transfers Manage data transfers
dagstore Manage the dagstore on the markets subsystem
NETWORK: NETWORK:
net Manage P2P Network net Manage P2P Network
RETRIEVAL: RETRIEVAL:
@ -1000,6 +1001,80 @@ OPTIONS:
``` ```
## lotus-miner dagstore
```
NAME:
lotus-miner dagstore - Manage the dagstore on the markets subsystem
USAGE:
lotus-miner dagstore command [command options] [arguments...]
COMMANDS:
list-shards List all shards known to the dagstore, with their current status
initialize-shard Initialize the specified shard
initialize-all Initialize all uninitialized shards, streaming results as they're produced
gc Garbage collect the dagstore
help, h Shows a list of commands or help for one command
OPTIONS:
--help, -h show help (default: false)
--version, -v print the version (default: false)
```
### lotus-miner dagstore list-shards
```
NAME:
lotus-miner dagstore list-shards - List all shards known to the dagstore, with their current status
USAGE:
lotus-miner dagstore list-shards [command options] [arguments...]
OPTIONS:
--help, -h show help (default: false)
```
### lotus-miner dagstore initialize-shard
```
NAME:
lotus-miner dagstore initialize-shard - Initialize the specified shard
USAGE:
lotus-miner dagstore initialize-shard [command options] [key]
OPTIONS:
--help, -h show help (default: false)
```
### lotus-miner dagstore initialize-all
```
NAME:
lotus-miner dagstore initialize-all - Initialize all uninitialized shards, streaming results as they're produced
USAGE:
lotus-miner dagstore initialize-all [command options] [arguments...]
OPTIONS:
--concurrency value maximum shards to initialize concurrently at a time; use 0 for unlimited (default: 0)
--help, -h show help (default: false)
```
### lotus-miner dagstore gc
```
NAME:
lotus-miner dagstore gc - Garbage collect the dagstore
USAGE:
lotus-miner dagstore gc [command options] [arguments...]
OPTIONS:
--help, -h show help (default: false)
```
## lotus-miner net ## lotus-miner net
``` ```
NAME: NAME:

View File

@ -3,16 +3,20 @@ package impl
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/build"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -20,6 +24,8 @@ import (
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer" datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/piecestore"
@ -34,6 +40,8 @@ import (
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
sto "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
apitypes "github.com/filecoin-project/lotus/api/types" apitypes "github.com/filecoin-project/lotus/api/types"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -42,7 +50,6 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
sto "github.com/filecoin-project/specs-storage/storage"
) )
type StorageMinerAPI struct { type StorageMinerAPI struct {
@ -65,6 +72,7 @@ type StorageMinerAPI struct {
DealPublisher *storageadapter.DealPublisher `optional:"true"` DealPublisher *storageadapter.DealPublisher `optional:"true"`
SectorBlocks *sectorblocks.SectorBlocks `optional:"true"` SectorBlocks *sectorblocks.SectorBlocks `optional:"true"`
Host host.Host `optional:"true"` Host host.Host `optional:"true"`
DAGStore *dagstore.DAGStore `optional:"true"`
// Miner / storage // Miner / storage
Miner *storage.Miner `optional:"true"` Miner *storage.Miner `optional:"true"`
@ -98,6 +106,8 @@ type StorageMinerAPI struct {
SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc `optional:"true"` SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc `optional:"true"`
} }
var _ api.StorageMiner = &StorageMinerAPI{}
func (sm *StorageMinerAPI) ServeRemote(perm bool) func(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) ServeRemote(perm bool) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
if perm == true { if perm == true {
@ -545,6 +555,204 @@ func (sm *StorageMinerAPI) MarketPublishPendingDeals(ctx context.Context) error
return nil return nil
} }
func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.DagstoreShardInfo, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
info := sm.DAGStore.AllShardsInfo()
ret := make([]api.DagstoreShardInfo, 0, len(info))
for k, i := range info {
ret = append(ret, api.DagstoreShardInfo{
Key: k.String(),
State: i.ShardState.String(),
Error: func() string {
if i.Error == nil {
return ""
}
return i.Error.Error()
}(),
})
}
return ret, nil
}
func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key string) error {
if sm.DAGStore == nil {
return fmt.Errorf("dagstore not available on this node")
}
k := shard.KeyFromString(key)
info, err := sm.DAGStore.GetShardInfo(k)
if err != nil {
return fmt.Errorf("failed to get shard info: %w", err)
}
if st := info.ShardState; st != dagstore.ShardStateNew {
return fmt.Errorf("cannot initialize shard; expected state ShardStateNew, was: %s", st.String())
}
ch := make(chan dagstore.ShardResult, 1)
if err = sm.DAGStore.AcquireShard(ctx, k, ch, dagstore.AcquireOpts{}); err != nil {
return fmt.Errorf("failed to acquire shard: %w", err)
}
var res dagstore.ShardResult
select {
case res = <-ch:
case <-ctx.Done():
return ctx.Err()
}
if err := res.Error; err != nil {
return fmt.Errorf("failed to acquire shard: %w", err)
}
if res.Accessor != nil {
err = res.Accessor.Close()
if err != nil {
log.Warnw("failed to close shard accessor; continuing", "shard_key", k, "error", err)
}
}
return nil
}
func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreShardResult, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
// prepare the thottler tokens.
var throttle chan struct{}
if c := params.MaxConcurrency; c > 0 {
throttle = make(chan struct{}, c)
for i := 0; i < c; i++ {
throttle <- struct{}{}
}
}
info := sm.DAGStore.AllShardsInfo()
var uninit []string
for k, i := range info {
if i.ShardState != dagstore.ShardStateNew {
continue
}
uninit = append(uninit, k.String())
}
if len(uninit) == 0 {
out := make(chan api.DagstoreShardResult)
close(out)
return out, nil
}
var wg sync.WaitGroup
wg.Add(len(uninit))
// response channel must be closed when we're done, or the context is cancelled.
// this buffering is necessary to prevent inflight children goroutines from
// publishing to a closed channel (res) when the context is cancelled.
out := make(chan api.DagstoreShardResult, 32) // internal buffer.
res := make(chan api.DagstoreShardResult, 32) // returned to caller.
go func() {
close(res) // close the caller channel.
pending := len(uninit)
for pending > 0 {
select {
case res <- <-out:
pending--
continue
case <-throttle:
// acquired a throttle token, proceed.
case <-ctx.Done():
return
}
next := uninit[0]
uninit = uninit[1:]
go func() {
err := sm.DagstoreInitializeShard(ctx, next)
throttle <- struct{}{}
r := api.DagstoreShardResult{Key: next}
if err == nil {
r.Success = true
} else {
r.Success = false
r.Error = err.Error()
}
select {
case out <- r:
case <-ctx.Done():
}
}()
}
}()
return res, nil
}
func (sm *StorageMinerAPI) DagstoreRecoverShard(ctx context.Context, key string) error {
if sm.DAGStore == nil {
return fmt.Errorf("dagstore not available on this node")
}
k := shard.KeyFromString(key)
info, err := sm.DAGStore.GetShardInfo(k)
if err != nil {
return fmt.Errorf("failed to get shard info: %w", err)
}
if st := info.ShardState; st != dagstore.ShardStateErrored {
return fmt.Errorf("cannot recover shard; expected state ShardStateErrored, was: %s", st.String())
}
ch := make(chan dagstore.ShardResult, 1)
if err = sm.DAGStore.RecoverShard(ctx, k, ch, dagstore.RecoverOpts{}); err != nil {
return fmt.Errorf("failed to recover shard: %w", err)
}
var res dagstore.ShardResult
select {
case res = <-ch:
case <-ctx.Done():
return ctx.Err()
}
return res.Error
}
func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreShardResult, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
res, err := sm.DAGStore.GC(ctx)
if err != nil {
return nil, fmt.Errorf("failed to gc: %w", err)
}
ret := make([]api.DagstoreShardResult, 0, len(res.Shards))
for k, err := range res.Shards {
r := api.DagstoreShardResult{Key: k.String()}
if err == nil {
r.Success = true
} else {
r.Success = false
r.Error = err.Error()
}
ret = append(ret, r)
}
return ret, nil
}
func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) { func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) {
return sm.listDeals(ctx) return sm.listDeals(ctx)
} }
@ -708,5 +916,3 @@ func (sm *StorageMinerAPI) ComputeProof(ctx context.Context, ssi []builtin.Secto
func (sm *StorageMinerAPI) RuntimeSubsystems(context.Context) (res api.MinerSubsystems, err error) { func (sm *StorageMinerAPI) RuntimeSubsystems(context.Context) (res api.MinerSubsystems, err error) {
return sm.EnabledSubsystems, nil return sm.EnabledSubsystems, nil
} }
var _ api.StorageMiner = &StorageMinerAPI{}