diff --git a/api/api_storage.go b/api/api_storage.go index c39114929..c783af106 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -13,13 +13,14 @@ import ( "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-fil-markets/piecestore" - "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" @@ -166,6 +167,48 @@ type StorageMiner interface { MarketPendingDeals(ctx context.Context) (PendingDealInfo, error) //perm:write MarketPublishPendingDeals(ctx context.Context) error //perm:admin + // DagstoreListShards returns information about all shards known to the + // DAG store. Only available on nodes running the markets subsystem. + DagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read + + // DagstoreInitializeShard initializes an uninitialized shard. + // + // Initialization consists of fetching the shard's data (deal payload) from + // the storage subsystem, generating an index, and persisting the index + // to facilitate later retrievals, and/or to publish to external sources. + // + // This operation is intended to complement the initial migration. The + // migration registers a shard for every unique piece CID, with lazy + // initialization. Thus, shards are not initialized immediately to avoid + // IO activity competing with proving. Instead, shard are initialized + // when first accessed. This method forces the initialization of a shard by + // accessing it and immediately releasing it. This is useful to warm up the + // cache to facilitate subsequent retrievals, and to generate the indexes + // to publish them externally. + // + // This operation fails if the shard is not in ShardStateNew state. + // It blocks until initialization finishes. + DagstoreInitializeShard(ctx context.Context, key string) error //perm:write + + // DagstoreRecoverShard attempts to recover a failed shard. + // + // This operation fails if the shard is not in ShardStateErrored state. + // It blocks until recovery finishes. If recovery failed, it returns the + // error. + DagstoreRecoverShard(ctx context.Context, key string) error //perm:write + + // DagstoreInitializeAll initializes all uninitialized shards in bulk, + // according to the policy passed in the parameters. + // + // It is recommended to set a maximum concurrency to avoid extreme + // IO pressure if the storage subsystem has a large amount of deals. + // + // It returns the result for each shard it attempted to initialize. + DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) //perm:write + + // DagstoreGC runs garbage collection on the DAG store. + DagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin + // RuntimeSubsystems returns the subsystems that are enabled // in this instance. RuntimeSubsystems(ctx context.Context) (MinerSubsystems, error) //perm:read @@ -336,3 +379,23 @@ type DealSchedule struct { StartEpoch abi.ChainEpoch EndEpoch abi.ChainEpoch } + +// DagstoreShardInfo is the serialized form of dagstore.DagstoreShardInfo that +// we expose through JSON-RPC to avoid clients having to depend on the +// dagstore lib. +type DagstoreShardInfo struct { + Key string + State string + Error string +} + +// DagstoreShardResult enumerates results per shard. +type DagstoreShardResult struct { + Key string + Success bool + Error string +} + +type DagstoreInitializeAllParams struct { + MaxConcurrency int +} diff --git a/api/api_test.go b/api/api_test.go index 738e1b067..e65d50ca3 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -76,7 +76,7 @@ func TestReturnTypes(t *testing.T) { seen[typ] = struct{}{} if typ.Kind() == reflect.Interface && typ != bareIface && !typ.Implements(jmarsh) { - t.Error("methods can't return interfaces", m.Name) + t.Error("methods can't return interfaces or struct types not implementing json.Marshaller", m.Name) } switch typ.Kind() { diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index e0e07f758..725ea2bed 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -14,7 +14,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-multistore" - "github.com/filecoin-project/lotus/node/repo/importmgr" "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/ipfs/go-filestore" @@ -25,6 +24,8 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/multiformats/go-multiaddr" + "github.com/filecoin-project/lotus/node/repo/importmgr" + datatransfer "github.com/filecoin-project/go-data-transfer" filestore2 "github.com/filecoin-project/go-fil-markets/filestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -275,6 +276,15 @@ func init() { api.SubsystemSectorStorage, api.SubsystemMarkets, }) + addExample(api.DagstoreShardResult{ + Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq", + Error: "", + }) + addExample(api.DagstoreShardInfo{ + Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq", + State: "ShardStateAvailable", + Error: "", + }) } func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) { diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 69003c3fe..f5df79d0e 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -603,6 +603,16 @@ type StorageMinerStruct struct { CreateBackup func(p0 context.Context, p1 string) error `perm:"admin"` + DagstoreGC func(p0 context.Context) ([]DagstoreShardResult, error) `perm:"admin"` + + DagstoreInitializeAll func(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) `perm:"write"` + + DagstoreInitializeShard func(p0 context.Context, p1 string) error `perm:"write"` + + DagstoreListShards func(p0 context.Context) ([]DagstoreShardInfo, error) `perm:"read"` + + DagstoreRecoverShard func(p0 context.Context, p1 string) error `perm:"write"` + DealsConsiderOfflineRetrievalDeals func(p0 context.Context) (bool, error) `perm:"admin"` DealsConsiderOfflineStorageDeals func(p0 context.Context) (bool, error) `perm:"admin"` @@ -3569,6 +3579,61 @@ func (s *StorageMinerStub) CreateBackup(p0 context.Context, p1 string) error { return ErrNotSupported } +func (s *StorageMinerStruct) DagstoreGC(p0 context.Context) ([]DagstoreShardResult, error) { + if s.Internal.DagstoreGC == nil { + return *new([]DagstoreShardResult), ErrNotSupported + } + return s.Internal.DagstoreGC(p0) +} + +func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreShardResult, error) { + return *new([]DagstoreShardResult), ErrNotSupported +} + +func (s *StorageMinerStruct) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) { + if s.Internal.DagstoreInitializeAll == nil { + return nil, ErrNotSupported + } + return s.Internal.DagstoreInitializeAll(p0, p1) +} + +func (s *StorageMinerStub) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) { + return nil, ErrNotSupported +} + +func (s *StorageMinerStruct) DagstoreInitializeShard(p0 context.Context, p1 string) error { + if s.Internal.DagstoreInitializeShard == nil { + return ErrNotSupported + } + return s.Internal.DagstoreInitializeShard(p0, p1) +} + +func (s *StorageMinerStub) DagstoreInitializeShard(p0 context.Context, p1 string) error { + return ErrNotSupported +} + +func (s *StorageMinerStruct) DagstoreListShards(p0 context.Context) ([]DagstoreShardInfo, error) { + if s.Internal.DagstoreListShards == nil { + return *new([]DagstoreShardInfo), ErrNotSupported + } + return s.Internal.DagstoreListShards(p0) +} + +func (s *StorageMinerStub) DagstoreListShards(p0 context.Context) ([]DagstoreShardInfo, error) { + return *new([]DagstoreShardInfo), ErrNotSupported +} + +func (s *StorageMinerStruct) DagstoreRecoverShard(p0 context.Context, p1 string) error { + if s.Internal.DagstoreRecoverShard == nil { + return ErrNotSupported + } + return s.Internal.DagstoreRecoverShard(p0, p1) +} + +func (s *StorageMinerStub) DagstoreRecoverShard(p0 context.Context, p1 string) error { + return ErrNotSupported +} + func (s *StorageMinerStruct) DealsConsiderOfflineRetrievalDeals(p0 context.Context) (bool, error) { if s.Internal.DealsConsiderOfflineRetrievalDeals == nil { return false, ErrNotSupported diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index f8b4b33f0..38b71652a 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 2aa832034..89341da7d 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 8a39acec0..cb039b1ea 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cmd/lotus-miner/dagstore.go b/cmd/lotus-miner/dagstore.go new file mode 100644 index 000000000..ab0f0cee7 --- /dev/null +++ b/cmd/lotus-miner/dagstore.go @@ -0,0 +1,181 @@ +package main + +import ( + "fmt" + "os" + + "github.com/fatih/color" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lotus/api" + lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/lib/tablewriter" +) + +var dagstoreCmd = &cli.Command{ + Name: "dagstore", + Usage: "Manage the dagstore on the markets subsystem", + Subcommands: []*cli.Command{ + dagstoreListShardsCmd, + dagstoreInitializeShardCmd, + dagstoreInitializeAllCmd, + dagstoreGcCmd, + }, +} + +var dagstoreListShardsCmd = &cli.Command{ + Name: "list-shards", + Usage: "List all shards known to the dagstore, with their current status", + Action: func(cctx *cli.Context) error { + marketsApi, closer, err := lcli.GetMarketsAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + shards, err := marketsApi.DagstoreListShards(ctx) + if err != nil { + return err + } + + if len(shards) == 0 { + return nil + } + + tw := tablewriter.New( + tablewriter.Col("Key"), + tablewriter.Col("State"), + tablewriter.Col("Error"), + ) + + colors := map[string]color.Attribute{ + "ShardStateAvailable": color.FgGreen, + "ShardStateServing": color.FgBlue, + "ShardStateErrored": color.FgRed, + "ShardStateNew": color.FgYellow, + } + + for _, s := range shards { + m := map[string]interface{}{ + "Key": s.Key, + "State": func() string { + if c, ok := colors[s.State]; ok { + return color.New(c).Sprint(s) + } + return s.State + }(), + "Error": s.Error, + } + tw.Write(m) + } + + return tw.Flush(os.Stdout) + }, +} + +var dagstoreInitializeShardCmd = &cli.Command{ + Name: "initialize-shard", + ArgsUsage: "[key]", + Usage: "Initialize the specified shard", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return fmt.Errorf("must provide a single shard key") + } + + marketsApi, closer, err := lcli.GetMarketsAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + return marketsApi.DagstoreInitializeShard(ctx, cctx.Args().First()) + }, +} + +var dagstoreInitializeAllCmd = &cli.Command{ + Name: "initialize-all", + Usage: "Initialize all uninitialized shards, streaming results as they're produced", + Flags: []cli.Flag{ + &cli.UintFlag{ + Name: "concurrency", + Usage: "maximum shards to initialize concurrently at a time; use 0 for unlimited", + Required: true, + }, + }, + Action: func(cctx *cli.Context) error { + concurrency := cctx.Uint("concurrency") + + marketsApi, closer, err := lcli.GetMarketsAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + params := api.DagstoreInitializeAllParams{ + MaxConcurrency: int(concurrency), + } + + ch, err := marketsApi.DagstoreInitializeAll(ctx, params) + if err != nil { + return err + } + + for { + select { + case res, ok := <-ch: + if !ok { + return nil + } + if res.Success { + _, _ = fmt.Fprintln(os.Stdout, res.Key, color.New(color.FgGreen).Sprint("SUCCESS")) + } else { + _, _ = fmt.Fprintln(os.Stdout, res.Key, color.New(color.FgRed).Sprint("ERROR"), res.Error) + } + + case <-ctx.Done(): + return fmt.Errorf("aborted") + } + } + }, +} + +var dagstoreGcCmd = &cli.Command{ + Name: "gc", + Usage: "Garbage collect the dagstore", + + Action: func(cctx *cli.Context) error { + marketsApi, closer, err := lcli.GetMarketsAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + collected, err := marketsApi.DagstoreGC(ctx) + if err != nil { + return err + } + + if len(collected) == 0 { + _, _ = fmt.Fprintln(os.Stdout, "no shards collected") + return nil + } + + for _, e := range collected { + if e.Error == "" { + _, _ = fmt.Fprintln(os.Stdout, e.Key, "success") + } else { + _, _ = fmt.Fprintln(os.Stdout, e.Key, "failed:", e.Error) + } + } + + return nil + }, +} diff --git a/cmd/lotus-miner/main.go b/cmd/lotus-miner/main.go index 2916fce1f..afda9d1d3 100644 --- a/cmd/lotus-miner/main.go +++ b/cmd/lotus-miner/main.go @@ -5,13 +5,15 @@ import ( "fmt" "github.com/fatih/color" - cliutil "github.com/filecoin-project/lotus/cli/util" logging "github.com/ipfs/go-log/v2" "github.com/urfave/cli/v2" "go.opencensus.io/trace" "golang.org/x/xerrors" + cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" @@ -46,6 +48,7 @@ func main() { lcli.WithCategory("market", storageDealsCmd), lcli.WithCategory("market", retrievalDealsCmd), lcli.WithCategory("market", dataTransfersCmd), + lcli.WithCategory("market", dagstoreCmd), lcli.WithCategory("storage", sectorsCmd), lcli.WithCategory("storage", provingCmd), lcli.WithCategory("storage", storageCmd), diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index 5ebfe9a8e..6b42ec53a 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -18,6 +18,12 @@ * [ComputeProof](#ComputeProof) * [Create](#Create) * [CreateBackup](#CreateBackup) +* [Dagstore](#Dagstore) + * [DagstoreGC](#DagstoreGC) + * [DagstoreInitializeAll](#DagstoreInitializeAll) + * [DagstoreInitializeShard](#DagstoreInitializeShard) + * [DagstoreListShards](#DagstoreListShards) + * [DagstoreRecoverShard](#DagstoreRecoverShard) * [Deals](#Deals) * [DealsConsiderOfflineRetrievalDeals](#DealsConsiderOfflineRetrievalDeals) * [DealsConsiderOfflineStorageDeals](#DealsConsiderOfflineStorageDeals) @@ -345,6 +351,110 @@ Inputs: Response: `{}` +## Dagstore + + +### DagstoreGC +DagstoreGC runs garbage collection on the DAG store. + + +Perms: admin + +Inputs: `null` + +Response: `null` + +### DagstoreInitializeAll +DagstoreInitializeAll initializes all uninitialized shards in bulk, +according to the policy passed in the parameters. + +It is recommended to set a maximum concurrency to avoid extreme +IO pressure if the storage subsystem has a large amount of deals. + +It returns the result for each shard it attempted to initialize. + + +Perms: write + +Inputs: +```json +[ + { + "MaxConcurrency": 123 + } +] +``` + +Response: +```json +{ + "Key": "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq", + "Success": false, + "Error": "\u003cerror\u003e" +} +``` + +### DagstoreInitializeShard +DagstoreInitializeShard initializes an uninitialized shard. + +Initialization consists of fetching the shard's data (deal payload) from +the storage subsystem, generating an index, and persisting the index +to facilitate later retrievals, and/or to publish to external sources. + +This operation is intended to complement the initial migration. The +migration registers a shard for every unique piece CID, with lazy +initialization. Thus, shards are not initialized immediately to avoid +IO activity competing with proving. Instead, shard are initialized +when first accessed. This method forces the initialization of a shard by +accessing it and immediately releasing it. This is useful to warm up the +cache to facilitate subsequent retrievals, and to generate the indexes +to publish them externally. + +This operation fails if the shard is not in ShardStateNew state. +It blocks until initialization finishes. + + +Perms: write + +Inputs: +```json +[ + "string value" +] +``` + +Response: `{}` + +### DagstoreListShards +DagstoreListShards returns information about all shards known to the +DAG store. Only available on nodes running the markets subsystem. + + +Perms: read + +Inputs: `null` + +Response: `null` + +### DagstoreRecoverShard +DagstoreRecoverShard attempts to recover a failed shard. + +This operation fails if the shard is not in ShardStateErrored state. +It blocks until recovery finishes. If recovery failed, it returns the +error. + + +Perms: write + +Inputs: +```json +[ + "string value" +] +``` + +Response: `{}` + ## Deals diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index bd87774bc..15bff5f6d 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -29,6 +29,7 @@ COMMANDS: storage-deals Manage storage deals and related configuration retrieval-deals Manage retrieval deals and related configuration data-transfers Manage data transfers + dagstore Manage the dagstore on the markets subsystem NETWORK: net Manage P2P Network RETRIEVAL: @@ -1000,6 +1001,80 @@ OPTIONS: ``` +## lotus-miner dagstore +``` +NAME: + lotus-miner dagstore - Manage the dagstore on the markets subsystem + +USAGE: + lotus-miner dagstore command [command options] [arguments...] + +COMMANDS: + list-shards List all shards known to the dagstore, with their current status + initialize-shard Initialize the specified shard + initialize-all Initialize all uninitialized shards, streaming results as they're produced + gc Garbage collect the dagstore + help, h Shows a list of commands or help for one command + +OPTIONS: + --help, -h show help (default: false) + --version, -v print the version (default: false) + +``` + +### lotus-miner dagstore list-shards +``` +NAME: + lotus-miner dagstore list-shards - List all shards known to the dagstore, with their current status + +USAGE: + lotus-miner dagstore list-shards [command options] [arguments...] + +OPTIONS: + --help, -h show help (default: false) + +``` + +### lotus-miner dagstore initialize-shard +``` +NAME: + lotus-miner dagstore initialize-shard - Initialize the specified shard + +USAGE: + lotus-miner dagstore initialize-shard [command options] [key] + +OPTIONS: + --help, -h show help (default: false) + +``` + +### lotus-miner dagstore initialize-all +``` +NAME: + lotus-miner dagstore initialize-all - Initialize all uninitialized shards, streaming results as they're produced + +USAGE: + lotus-miner dagstore initialize-all [command options] [arguments...] + +OPTIONS: + --concurrency value maximum shards to initialize concurrently at a time; use 0 for unlimited (default: 0) + --help, -h show help (default: false) + +``` + +### lotus-miner dagstore gc +``` +NAME: + lotus-miner dagstore gc - Garbage collect the dagstore + +USAGE: + lotus-miner dagstore gc [command options] [arguments...] + +OPTIONS: + --help, -h show help (default: false) + +``` + ## lotus-miner net ``` NAME: diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 0fbd12111..fbfb0cff7 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,16 +3,20 @@ package impl import ( "context" "encoding/json" + "fmt" "net/http" "os" "strconv" + "sync" "time" + "github.com/filecoin-project/dagstore" + "github.com/filecoin-project/dagstore/shard" "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/gen" - "github.com/filecoin-project/lotus/build" "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/host" @@ -20,6 +24,8 @@ import ( "go.uber.org/fx" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/piecestore" @@ -34,6 +40,8 @@ import ( sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + sto "github.com/filecoin-project/specs-storage/storage" + "github.com/filecoin-project/lotus/api" apitypes "github.com/filecoin-project/lotus/api/types" "github.com/filecoin-project/lotus/chain/types" @@ -42,7 +50,6 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" - sto "github.com/filecoin-project/specs-storage/storage" ) type StorageMinerAPI struct { @@ -65,6 +72,7 @@ type StorageMinerAPI struct { DealPublisher *storageadapter.DealPublisher `optional:"true"` SectorBlocks *sectorblocks.SectorBlocks `optional:"true"` Host host.Host `optional:"true"` + DAGStore *dagstore.DAGStore `optional:"true"` // Miner / storage Miner *storage.Miner `optional:"true"` @@ -98,6 +106,8 @@ type StorageMinerAPI struct { SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc `optional:"true"` } +var _ api.StorageMiner = &StorageMinerAPI{} + func (sm *StorageMinerAPI) ServeRemote(perm bool) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { if perm == true { @@ -545,6 +555,204 @@ func (sm *StorageMinerAPI) MarketPublishPendingDeals(ctx context.Context) error return nil } +func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.DagstoreShardInfo, error) { + if sm.DAGStore == nil { + return nil, fmt.Errorf("dagstore not available on this node") + } + info := sm.DAGStore.AllShardsInfo() + ret := make([]api.DagstoreShardInfo, 0, len(info)) + for k, i := range info { + ret = append(ret, api.DagstoreShardInfo{ + Key: k.String(), + State: i.ShardState.String(), + Error: func() string { + if i.Error == nil { + return "" + } + return i.Error.Error() + }(), + }) + } + return ret, nil +} + +func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key string) error { + if sm.DAGStore == nil { + return fmt.Errorf("dagstore not available on this node") + } + + k := shard.KeyFromString(key) + + info, err := sm.DAGStore.GetShardInfo(k) + if err != nil { + return fmt.Errorf("failed to get shard info: %w", err) + } + if st := info.ShardState; st != dagstore.ShardStateNew { + return fmt.Errorf("cannot initialize shard; expected state ShardStateNew, was: %s", st.String()) + } + + ch := make(chan dagstore.ShardResult, 1) + if err = sm.DAGStore.AcquireShard(ctx, k, ch, dagstore.AcquireOpts{}); err != nil { + return fmt.Errorf("failed to acquire shard: %w", err) + } + + var res dagstore.ShardResult + select { + case res = <-ch: + case <-ctx.Done(): + return ctx.Err() + } + + if err := res.Error; err != nil { + return fmt.Errorf("failed to acquire shard: %w", err) + } + + if res.Accessor != nil { + err = res.Accessor.Close() + if err != nil { + log.Warnw("failed to close shard accessor; continuing", "shard_key", k, "error", err) + } + } + + return nil +} + +func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreShardResult, error) { + if sm.DAGStore == nil { + return nil, fmt.Errorf("dagstore not available on this node") + } + + // prepare the thottler tokens. + var throttle chan struct{} + if c := params.MaxConcurrency; c > 0 { + throttle = make(chan struct{}, c) + for i := 0; i < c; i++ { + throttle <- struct{}{} + } + } + + info := sm.DAGStore.AllShardsInfo() + var uninit []string + for k, i := range info { + if i.ShardState != dagstore.ShardStateNew { + continue + } + uninit = append(uninit, k.String()) + } + + if len(uninit) == 0 { + out := make(chan api.DagstoreShardResult) + close(out) + return out, nil + } + + var wg sync.WaitGroup + wg.Add(len(uninit)) + + // response channel must be closed when we're done, or the context is cancelled. + // this buffering is necessary to prevent inflight children goroutines from + // publishing to a closed channel (res) when the context is cancelled. + out := make(chan api.DagstoreShardResult, 32) // internal buffer. + res := make(chan api.DagstoreShardResult, 32) // returned to caller. + + go func() { + close(res) // close the caller channel. + + pending := len(uninit) + for pending > 0 { + select { + case res <- <-out: + pending-- + continue + case <-throttle: + // acquired a throttle token, proceed. + case <-ctx.Done(): + return + } + + next := uninit[0] + uninit = uninit[1:] + + go func() { + err := sm.DagstoreInitializeShard(ctx, next) + throttle <- struct{}{} + + r := api.DagstoreShardResult{Key: next} + if err == nil { + r.Success = true + } else { + r.Success = false + r.Error = err.Error() + } + + select { + case out <- r: + case <-ctx.Done(): + } + }() + } + + }() + + return res, nil + +} + +func (sm *StorageMinerAPI) DagstoreRecoverShard(ctx context.Context, key string) error { + if sm.DAGStore == nil { + return fmt.Errorf("dagstore not available on this node") + } + + k := shard.KeyFromString(key) + + info, err := sm.DAGStore.GetShardInfo(k) + if err != nil { + return fmt.Errorf("failed to get shard info: %w", err) + } + if st := info.ShardState; st != dagstore.ShardStateErrored { + return fmt.Errorf("cannot recover shard; expected state ShardStateErrored, was: %s", st.String()) + } + + ch := make(chan dagstore.ShardResult, 1) + if err = sm.DAGStore.RecoverShard(ctx, k, ch, dagstore.RecoverOpts{}); err != nil { + return fmt.Errorf("failed to recover shard: %w", err) + } + + var res dagstore.ShardResult + select { + case res = <-ch: + case <-ctx.Done(): + return ctx.Err() + } + + return res.Error +} + +func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreShardResult, error) { + if sm.DAGStore == nil { + return nil, fmt.Errorf("dagstore not available on this node") + } + + res, err := sm.DAGStore.GC(ctx) + if err != nil { + return nil, fmt.Errorf("failed to gc: %w", err) + } + + ret := make([]api.DagstoreShardResult, 0, len(res.Shards)) + for k, err := range res.Shards { + r := api.DagstoreShardResult{Key: k.String()} + if err == nil { + r.Success = true + } else { + r.Success = false + r.Error = err.Error() + } + ret = append(ret, r) + } + + return ret, nil +} + func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) { return sm.listDeals(ctx) } @@ -708,5 +916,3 @@ func (sm *StorageMinerAPI) ComputeProof(ctx context.Context, ssi []builtin.Secto func (sm *StorageMinerAPI) RuntimeSubsystems(context.Context) (res api.MinerSubsystems, err error) { return sm.EnabledSubsystems, nil } - -var _ api.StorageMiner = &StorageMinerAPI{}