implement DagstoreInitializeAll.

This commit is contained in:
Raúl Kripalani 2021-08-05 13:27:43 +01:00
parent 1cc59ade98
commit 7c858ece76
8 changed files with 161 additions and 23 deletions

View File

@ -197,8 +197,17 @@ type StorageMiner interface {
// error.
DagstoreRecoverShard(ctx context.Context, key string) error //perm:write
// DagstoreInitializeAll initializes all uninitialized shards in bulk,
// according to the policy passed in the parameters.
//
// It is recommended to set a maximum concurrency to avoid extreme
// IO pressure if the storage subsystem has a large amount of deals.
//
// It returns the result for each shard it attempted to initialize.
DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) //perm:write
// DagstoreGC runs garbage collection on the DAG store.
DagstoreGC(ctx context.Context) ([]DagstoreGCResult, error) //perm:admin
DagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin
// RuntimeSubsystems returns the subsystems that are enabled
// in this instance.
@ -380,9 +389,13 @@ type DagstoreShardInfo struct {
Error string
}
// DagstoreGCResult is the serialized form of dagstore.GCResult that we expose
// through JSON-RPC to avoid clients having to depend on the dagstore lib.
type DagstoreGCResult struct {
Key string
Error string
// DagstoreShardResult enumerates results per shard.
type DagstoreShardResult struct {
Key string
Success bool
Error string
}
type DagstoreInitializeAllParams struct {
MaxConcurrency int
}

View File

@ -276,7 +276,7 @@ func init() {
api.SubsystemSectorStorage,
api.SubsystemMarkets,
})
addExample(api.DagstoreGCResult{
addExample(api.DagstoreShardResult{
Key: "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
Error: "<error>",
})

View File

@ -603,7 +603,9 @@ type StorageMinerStruct struct {
CreateBackup func(p0 context.Context, p1 string) error `perm:"admin"`
DagstoreGC func(p0 context.Context) ([]DagstoreGCResult, error) `perm:"admin"`
DagstoreGC func(p0 context.Context) ([]DagstoreShardResult, error) `perm:"admin"`
DagstoreInitializeAll func(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) `perm:"write"`
DagstoreInitializeShard func(p0 context.Context, p1 string) error `perm:"write"`
@ -3577,15 +3579,26 @@ func (s *StorageMinerStub) CreateBackup(p0 context.Context, p1 string) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreGC(p0 context.Context) ([]DagstoreGCResult, error) {
func (s *StorageMinerStruct) DagstoreGC(p0 context.Context) ([]DagstoreShardResult, error) {
if s.Internal.DagstoreGC == nil {
return *new([]DagstoreGCResult), ErrNotSupported
return *new([]DagstoreShardResult), ErrNotSupported
}
return s.Internal.DagstoreGC(p0)
}
func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreGCResult, error) {
return *new([]DagstoreGCResult), ErrNotSupported
func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreShardResult, error) {
return *new([]DagstoreShardResult), ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) {
if s.Internal.DagstoreInitializeAll == nil {
return nil, ErrNotSupported
}
return s.Internal.DagstoreInitializeAll(p0, p1)
}
func (s *StorageMinerStub) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) {
return nil, ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreInitializeShard(p0 context.Context, p1 string) error {

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -20,6 +20,7 @@
* [CreateBackup](#CreateBackup)
* [Dagstore](#Dagstore)
* [DagstoreGC](#DagstoreGC)
* [DagstoreInitializeAll](#DagstoreInitializeAll)
* [DagstoreInitializeShard](#DagstoreInitializeShard)
* [DagstoreListShards](#DagstoreListShards)
* [DagstoreRecoverShard](#DagstoreRecoverShard)
@ -363,6 +364,36 @@ Inputs: `null`
Response: `null`
### DagstoreInitializeAll
DagstoreInitializeAll initializes all uninitialized shards in bulk,
according to the policy passed in the parameters.
It is recommended to set a maximum concurrency to avoid extreme
IO pressure if the storage subsystem has a large amount of deals.
It returns the result for each shard it attempted to initialize.
Perms: write
Inputs:
```json
[
{
"MaxConcurrency": 123
}
]
```
Response:
```json
{
"Key": "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
"Success": false,
"Error": "\u003cerror\u003e"
}
```
### DagstoreInitializeShard
DagstoreInitializeShard initializes an uninitialized shard.

View File

@ -7,6 +7,7 @@ import (
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/filecoin-project/dagstore"
@ -616,6 +617,87 @@ func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key stri
return nil
}
func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreShardResult, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
// prepare the thottler tokens.
var throttle chan struct{}
if c := params.MaxConcurrency; c > 0 {
throttle = make(chan struct{}, c)
for i := 0; i < c; i++ {
throttle <- struct{}{}
}
}
info := sm.DAGStore.AllShardsInfo()
var uninit []string
for k, i := range info {
if i.ShardState != dagstore.ShardStateNew {
continue
}
uninit = append(uninit, k.String())
}
if len(uninit) == 0 {
out := make(chan api.DagstoreShardResult)
close(out)
return out, nil
}
var wg sync.WaitGroup
wg.Add(len(uninit))
// response channel must be closed when we're done, or the context is cancelled.
// this buffering is necessary to prevent inflight children goroutines from
// publishing to a closed channel (res) when the context is cancelled.
out := make(chan api.DagstoreShardResult, 32) // internal buffer.
res := make(chan api.DagstoreShardResult, 32) // returned to caller.
go func() {
close(res) // close the caller channel.
pending := len(uninit)
for pending > 0 {
select {
case res <- <-out:
pending--
continue
case <-throttle:
// acquired a throttle token, proceed.
case <-ctx.Done():
return
}
next := uninit[0]
uninit = uninit[1:]
go func() {
err := sm.DagstoreInitializeShard(ctx, next)
throttle <- struct{}{}
r := api.DagstoreShardResult{Key: next}
if err == nil {
r.Success = true
} else {
r.Success = false
r.Error = err.Error()
}
select {
case out <- r:
case <-ctx.Done():
}
}()
}
}()
return res, nil
}
func (sm *StorageMinerAPI) DagstoreRecoverShard(ctx context.Context, key string) error {
if sm.DAGStore == nil {
return fmt.Errorf("dagstore not available on this node")
@ -646,7 +728,7 @@ func (sm *StorageMinerAPI) DagstoreRecoverShard(ctx context.Context, key string)
return res.Error
}
func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreGCResult, error) {
func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreShardResult, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
@ -656,17 +738,16 @@ func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreGCResu
return nil, fmt.Errorf("failed to gc: %w", err)
}
ret := make([]api.DagstoreGCResult, 0, len(res.Shards))
ret := make([]api.DagstoreShardResult, 0, len(res.Shards))
for k, err := range res.Shards {
ret = append(ret, api.DagstoreGCResult{
Key: k.String(),
Error: func() string {
if err == nil {
return ""
}
return err.Error()
}(),
})
r := api.DagstoreShardResult{Key: k.String()}
if err == nil {
r.Success = true
} else {
r.Success = false
r.Error = err.Error()
}
ret = append(ret, r)
}
return ret, nil