add JSON-RPC operations and cli for dagstore.

This commit is contained in:
Raúl Kripalani 2021-08-04 19:19:49 +01:00
parent 25f61c8a5d
commit 5f49101566
11 changed files with 408 additions and 9 deletions

View File

@ -13,13 +13,14 @@ import (
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -166,6 +167,17 @@ type StorageMiner interface {
MarketPendingDeals(ctx context.Context) (PendingDealInfo, error) //perm:write
MarketPublishPendingDeals(ctx context.Context) error //perm:admin
// DagstoreListShards returns information about all shards known to the
// DAG store. Only available on nodes running the markets subsystem.
DagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read
// DagstoreInitializeShard initializes an uninitialized shard by acquiring
// it and releasing it as soon as it's ready.
DagstoreInitializeShard(ctx context.Context, key string) error //perm:write
// DagstoreGC runs garbage collection on the DAG store.
DagstoreGC(ctx context.Context) ([]DagstoreGCResult, error) //perm:admin
// RuntimeSubsystems returns the subsystems that are enabled
// in this instance.
RuntimeSubsystems(ctx context.Context) (MinerSubsystems, error) //perm:read
@ -336,3 +348,19 @@ type DealSchedule struct {
StartEpoch abi.ChainEpoch
EndEpoch abi.ChainEpoch
}
// DagstoreShardInfo is the serialized form of dagstore.DagstoreShardInfo that
// we expose through JSON-RPC to avoid clients having to depend on the
// dagstore lib.
type DagstoreShardInfo struct {
Key string
State string
Error error
}
// DagstoreGCResult is the serialized form of dagstore.GCResult that we expose
// through JSON-RPC to avoid clients having to depend on the dagstore lib.
type DagstoreGCResult struct {
Key string
Error error
}

View File

@ -14,7 +14,6 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
@ -25,6 +24,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/node/repo/importmgr"
datatransfer "github.com/filecoin-project/go-data-transfer"
filestore2 "github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -275,6 +276,15 @@ func init() {
api.SubsystemSectorStorage,
api.SubsystemMarkets,
})
addExample(api.DagstoreGCResult{
Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
Error: nil,
})
addExample(api.DagstoreShardInfo{
Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
State: "ShardStateAvailable",
Error: nil,
})
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

View File

@ -603,6 +603,12 @@ type StorageMinerStruct struct {
CreateBackup func(p0 context.Context, p1 string) error `perm:"admin"`
DagstoreGC func(p0 context.Context) ([]DagstoreGCResult, error) `perm:"admin"`
DagstoreInitializeShard func(p0 context.Context, p1 string) error `perm:"write"`
DagstoreListShards func(p0 context.Context) ([]DagstoreShardInfo, error) `perm:"read"`
DealsConsiderOfflineRetrievalDeals func(p0 context.Context) (bool, error) `perm:"admin"`
DealsConsiderOfflineStorageDeals func(p0 context.Context) (bool, error) `perm:"admin"`
@ -3569,6 +3575,39 @@ func (s *StorageMinerStub) CreateBackup(p0 context.Context, p1 string) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreGC(p0 context.Context) ([]DagstoreGCResult, error) {
if s.Internal.DagstoreGC == nil {
return *new([]DagstoreGCResult), ErrNotSupported
}
return s.Internal.DagstoreGC(p0)
}
func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreGCResult, error) {
return *new([]DagstoreGCResult), ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreInitializeShard(p0 context.Context, p1 string) error {
if s.Internal.DagstoreInitializeShard == nil {
return ErrNotSupported
}
return s.Internal.DagstoreInitializeShard(p0, p1)
}
func (s *StorageMinerStub) DagstoreInitializeShard(p0 context.Context, p1 string) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreListShards(p0 context.Context) ([]DagstoreShardInfo, error) {
if s.Internal.DagstoreListShards == nil {
return *new([]DagstoreShardInfo), ErrNotSupported
}
return s.Internal.DagstoreListShards(p0)
}
func (s *StorageMinerStub) DagstoreListShards(p0 context.Context) ([]DagstoreShardInfo, error) {
return *new([]DagstoreShardInfo), ErrNotSupported
}
func (s *StorageMinerStruct) DealsConsiderOfflineRetrievalDeals(p0 context.Context) (bool, error) {
if s.Internal.DealsConsiderOfflineRetrievalDeals == nil {
return false, ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

130
cmd/lotus-miner/dagstore.go Normal file
View File

@ -0,0 +1,130 @@
package main
import (
"fmt"
"os"
"github.com/fatih/color"
"github.com/urfave/cli/v2"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/tablewriter"
)
var dagstoreCmd = &cli.Command{
Name: "dagstore",
Usage: "Manage the dagstore on the markets subsystem",
Subcommands: []*cli.Command{
dagstoreListShardsCmd,
dagstoreInitializeShardCmd,
dagstoreGcCmd,
},
}
var dagstoreListShardsCmd = &cli.Command{
Name: "list-shards",
Usage: "List all shards known to the dagstore, with their current status",
Action: func(cctx *cli.Context) error {
marketsApi, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
shards, err := marketsApi.DagstoreListShards(ctx)
if err != nil {
return err
}
if len(shards) == 0 {
return nil
}
tw := tablewriter.New(
tablewriter.Col("Key"),
tablewriter.Col("State"),
tablewriter.Col("Error"),
)
colors := map[string]color.Attribute{
"ShardStateAvailable": color.FgGreen,
"ShardStateServing": color.FgBlue,
"ShardStateErrored": color.FgRed,
"ShardStateNew": color.FgYellow,
}
for _, s := range shards {
m := map[string]interface{}{
"Key": s.Key,
"State": func() string {
if c, ok := colors[s.State]; ok {
return color.New(c).Sprint(s)
}
return s.State
}(),
"Error": s.Error,
}
tw.Write(m)
}
return tw.Flush(os.Stdout)
},
}
var dagstoreInitializeShardCmd = &cli.Command{
Name: "initialize-shard",
ArgsUsage: "[key]",
Usage: "Initialize the specified shard",
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
return fmt.Errorf("must provide a single shard key")
}
marketsApi, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return marketsApi.DagstoreInitializeShard(ctx, cctx.Args().First())
},
}
var dagstoreGcCmd = &cli.Command{
Name: "gc",
Usage: "Garbage collect the dagstore",
Action: func(cctx *cli.Context) error {
marketsApi, closer, err := lcli.GetMarketsAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
collected, err := marketsApi.DagstoreGC(ctx)
if err != nil {
return err
}
if len(collected) == 0 {
_, _ = fmt.Fprintln(os.Stdout, "no shards collected")
return nil
}
for _, e := range collected {
if e.Error == nil {
_, _ = fmt.Fprintln(os.Stdout, e.Key, "success")
} else {
_, _ = fmt.Fprintln(os.Stdout, e.Key, "failed:", e.Error)
}
}
return nil
},
}

View File

@ -5,13 +5,15 @@ import (
"fmt"
"github.com/fatih/color"
cliutil "github.com/filecoin-project/lotus/cli/util"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
@ -46,6 +48,7 @@ func main() {
lcli.WithCategory("market", storageDealsCmd),
lcli.WithCategory("market", retrievalDealsCmd),
lcli.WithCategory("market", dataTransfersCmd),
lcli.WithCategory("market", dagstoreCmd),
lcli.WithCategory("storage", sectorsCmd),
lcli.WithCategory("storage", provingCmd),
lcli.WithCategory("storage", storageCmd),

View File

@ -18,6 +18,10 @@
* [ComputeProof](#ComputeProof)
* [Create](#Create)
* [CreateBackup](#CreateBackup)
* [Dagstore](#Dagstore)
* [DagstoreGC](#DagstoreGC)
* [DagstoreInitializeShard](#DagstoreInitializeShard)
* [DagstoreListShards](#DagstoreListShards)
* [Deals](#Deals)
* [DealsConsiderOfflineRetrievalDeals](#DealsConsiderOfflineRetrievalDeals)
* [DealsConsiderOfflineStorageDeals](#DealsConsiderOfflineStorageDeals)
@ -345,6 +349,46 @@ Inputs:
Response: `{}`
## Dagstore
### DagstoreGC
DagstoreGC runs garbage collection on the DAG store.
Perms: admin
Inputs: `null`
Response: `null`
### DagstoreInitializeShard
DagstoreInitializeShard initializes an uninitialized shard by acquiring
it and releasing it as soon as it's ready.
Perms: write
Inputs:
```json
[
"string value"
]
```
Response: `{}`
### DagstoreListShards
DagstoreListShards returns information about all shards known to the
DAG store. Only available on nodes running the markets subsystem.
Perms: read
Inputs: `null`
Response: `null`
## Deals

View File

@ -29,6 +29,7 @@ COMMANDS:
storage-deals Manage storage deals and related configuration
retrieval-deals Manage retrieval deals and related configuration
data-transfers Manage data transfers
dagstore Manage the dagstore on the markets subsystem
NETWORK:
net Manage P2P Network
RETRIEVAL:
@ -1000,6 +1001,65 @@ OPTIONS:
```
## lotus-miner dagstore
```
NAME:
lotus-miner dagstore - Manage the dagstore on the markets subsystem
USAGE:
lotus-miner dagstore command [command options] [arguments...]
COMMANDS:
list-shards List all shards known to the dagstore, with their current status
initialize-shard Initialize the specified shard
gc Garbage collect the dagstore
help, h Shows a list of commands or help for one command
OPTIONS:
--help, -h show help (default: false)
--version, -v print the version (default: false)
```
### lotus-miner dagstore list-shards
```
NAME:
lotus-miner dagstore list-shards - List all shards known to the dagstore, with their current status
USAGE:
lotus-miner dagstore list-shards [command options] [arguments...]
OPTIONS:
--help, -h show help (default: false)
```
### lotus-miner dagstore initialize-shard
```
NAME:
lotus-miner dagstore initialize-shard - Initialize the specified shard
USAGE:
lotus-miner dagstore initialize-shard [command options] [key]
OPTIONS:
--help, -h show help (default: false)
```
### lotus-miner dagstore gc
```
NAME:
lotus-miner dagstore gc - Garbage collect the dagstore
USAGE:
lotus-miner dagstore gc [command options] [arguments...]
OPTIONS:
--help, -h show help (default: false)
```
## lotus-miner net
```
NAME:

View File

@ -3,16 +3,19 @@ package impl
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"time"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/build"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
@ -20,6 +23,8 @@ import (
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
@ -34,6 +39,8 @@ import (
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
sto "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
apitypes "github.com/filecoin-project/lotus/api/types"
"github.com/filecoin-project/lotus/chain/types"
@ -42,7 +49,6 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
sto "github.com/filecoin-project/specs-storage/storage"
)
type StorageMinerAPI struct {
@ -65,6 +71,7 @@ type StorageMinerAPI struct {
DealPublisher *storageadapter.DealPublisher `optional:"true"`
SectorBlocks *sectorblocks.SectorBlocks `optional:"true"`
Host host.Host `optional:"true"`
DAGStore *dagstore.DAGStore `optional:"true"`
// Miner / storage
Miner *storage.Miner `optional:"true"`
@ -98,6 +105,8 @@ type StorageMinerAPI struct {
SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc `optional:"true"`
}
var _ api.StorageMiner = &StorageMinerAPI{}
func (sm *StorageMinerAPI) ServeRemote(perm bool) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if perm == true {
@ -545,6 +554,84 @@ func (sm *StorageMinerAPI) MarketPublishPendingDeals(ctx context.Context) error
return nil
}
func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.DagstoreShardInfo, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
info := sm.DAGStore.AllShardsInfo()
ret := make([]api.DagstoreShardInfo, 0, len(info))
for k, i := range info {
ret = append(ret, api.DagstoreShardInfo{
Key: k.String(),
State: i.ShardState.String(),
Error: i.Error,
})
}
return ret, nil
}
func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key string) error {
if sm.DAGStore == nil {
return fmt.Errorf("dagstore not available on this node")
}
k := shard.KeyFromString(key)
info, err := sm.DAGStore.GetShardInfo(k)
if err != nil {
return fmt.Errorf("failed to get shard info: %w", err)
}
if st := info.ShardState; st != dagstore.ShardStateNew {
return fmt.Errorf("cannot initialize shard; expected state ShardStateNew, was: %s", st.String())
}
ch := make(chan dagstore.ShardResult, 1)
if err = sm.DAGStore.AcquireShard(ctx, k, ch, dagstore.AcquireOpts{}); err != nil {
return fmt.Errorf("failed to acquire shard: %w", err)
}
var res dagstore.ShardResult
select {
case res = <-ch:
case <-ctx.Done():
return ctx.Err()
}
if err := res.Error; err != nil {
return fmt.Errorf("failed to acquire shard: %w", err)
}
if res.Accessor != nil {
err = res.Accessor.Close()
if err != nil {
log.Warnw("failed to close shard accessor; continuing", "shard_key", k, "error", err)
}
}
return nil
}
func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreGCResult, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
res, err := sm.DAGStore.GC(ctx)
if err != nil {
return nil, fmt.Errorf("failed to gc: %w", err)
}
ret := make([]api.DagstoreGCResult, 0, len(res.Shards))
for k, err := range res.Shards {
ret = append(ret, api.DagstoreGCResult{
Key: k.String(),
Error: err,
})
}
return ret, nil
}
func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) {
return sm.listDeals(ctx)
}
@ -708,5 +795,3 @@ func (sm *StorageMinerAPI) ComputeProof(ctx context.Context, ssi []builtin.Secto
func (sm *StorageMinerAPI) RuntimeSubsystems(context.Context) (res api.MinerSubsystems, err error) {
return sm.EnabledSubsystems, nil
}
var _ api.StorageMiner = &StorageMinerAPI{}