diff --git a/api/api_storage.go b/api/api_storage.go
index 55ed033aa..c783af106 100644
--- a/api/api_storage.go
+++ b/api/api_storage.go
@@ -197,8 +197,17 @@ type StorageMiner interface {
 	// error.
 	DagstoreRecoverShard(ctx context.Context, key string) error //perm:write
 
+	// DagstoreInitializeAll initializes all uninitialized shards in bulk,
+	// according to the policy passed in the parameters.
+	//
+	// It is recommended to set a maximum concurrency to avoid extreme
+	// IO pressure if the storage subsystem has a large number of deals.
+	//
+	// It returns the result for each shard it attempted to initialize.
+	DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) //perm:write
+
 	// DagstoreGC runs garbage collection on the DAG store.
-	DagstoreGC(ctx context.Context) ([]DagstoreGCResult, error) //perm:admin
+	DagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin
 
 	// RuntimeSubsystems returns the subsystems that are enabled
 	// in this instance.
@@ -380,9 +389,13 @@ type DagstoreShardInfo struct {
 	Error string
 }
 
-// DagstoreGCResult is the serialized form of dagstore.GCResult that we expose
-// through JSON-RPC to avoid clients having to depend on the dagstore lib.
-type DagstoreGCResult struct {
-	Key   string
-	Error string
+// DagstoreShardResult reports the outcome of an operation on a single shard.
+type DagstoreShardResult struct {
+	Key     string
+	Success bool
+	Error   string
+}
+
+type DagstoreInitializeAllParams struct {
+	MaxConcurrency int
 }
diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go
index 8e16e2d42..725ea2bed 100644
--- a/api/docgen/docgen.go
+++ b/api/docgen/docgen.go
@@ -276,7 +276,7 @@ func init() {
 		api.SubsystemSectorStorage,
 		api.SubsystemMarkets,
 	})
-	addExample(api.DagstoreGCResult{
+	addExample(api.DagstoreShardResult{
 		Key:   "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
 		Error: "",
 	})
diff --git a/api/proxy_gen.go b/api/proxy_gen.go
index d32874890..f5df79d0e 100644
--- a/api/proxy_gen.go
+++ b/api/proxy_gen.go
@@ -603,7 +603,9 @@ type StorageMinerStruct struct {
 
 		CreateBackup func(p0 context.Context, p1 string) error `perm:"admin"`
 
-		DagstoreGC func(p0 context.Context) ([]DagstoreGCResult, error) `perm:"admin"`
+		DagstoreGC func(p0 context.Context) ([]DagstoreShardResult, error) `perm:"admin"`
+
+		DagstoreInitializeAll func(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) `perm:"write"`
 
 		DagstoreInitializeShard func(p0 context.Context, p1 string) error `perm:"write"`
 
@@ -3577,15 +3579,26 @@ func (s *StorageMinerStub) CreateBackup(p0 context.Context, p1 string) error {
 	return ErrNotSupported
 }
 
-func (s *StorageMinerStruct) DagstoreGC(p0 context.Context) ([]DagstoreGCResult, error) {
+func (s *StorageMinerStruct) DagstoreGC(p0 context.Context) ([]DagstoreShardResult, error) {
 	if s.Internal.DagstoreGC == nil {
-		return *new([]DagstoreGCResult), ErrNotSupported
+		return *new([]DagstoreShardResult), ErrNotSupported
 	}
 	return s.Internal.DagstoreGC(p0)
 }
 
-func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreGCResult, error) {
-	return *new([]DagstoreGCResult), ErrNotSupported
+func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreShardResult, error) {
+	return *new([]DagstoreShardResult), ErrNotSupported
+}
+
+func (s *StorageMinerStruct) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) {
+	if s.Internal.DagstoreInitializeAll == nil {
+		return nil, ErrNotSupported
+	}
+	return s.Internal.DagstoreInitializeAll(p0, p1)
+}
+
+func (s *StorageMinerStub) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) {
+	return nil, ErrNotSupported
 }
 
 func (s *StorageMinerStruct) DagstoreInitializeShard(p0 context.Context, p1 string) error {
diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz
index 725fc80f9..38b71652a 100644
Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ
diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz
index 93de3a98e..89341da7d 100644
Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ
diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz
index c1f825ab3..cb039b1ea 100644
Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ
diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md
index 0bd212a3d..6b42ec53a 100644
--- a/documentation/en/api-v0-methods-miner.md
+++ b/documentation/en/api-v0-methods-miner.md
@@ -20,6 +20,7 @@
 * [CreateBackup](#CreateBackup)
 * [Dagstore](#Dagstore)
   * [DagstoreGC](#DagstoreGC)
+  * [DagstoreInitializeAll](#DagstoreInitializeAll)
   * [DagstoreInitializeShard](#DagstoreInitializeShard)
   * [DagstoreListShards](#DagstoreListShards)
   * [DagstoreRecoverShard](#DagstoreRecoverShard)
@@ -363,6 +364,36 @@ Inputs: `null`
 
 Response: `null`
 
+### DagstoreInitializeAll
+DagstoreInitializeAll initializes all uninitialized shards in bulk,
+according to the policy passed in the parameters.
+
+It is recommended to set a maximum concurrency to avoid extreme
+IO pressure if the storage subsystem has a large number of deals.
+
+It returns the result for each shard it attempted to initialize.
+
+
+Perms: write
+
+Inputs:
+```json
+[
+  {
+    "MaxConcurrency": 123
+  }
+]
+```
+
+Response:
+```json
+{
+  "Key": "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
+  "Success": false,
+  "Error": "\u003cerror\u003e"
+}
+```
+
 ### DagstoreInitializeShard
 DagstoreInitializeShard initializes an uninitialized shard.
diff --git a/node/impl/storminer.go b/node/impl/storminer.go
index 4b766b6f6..fbfb0cff7 100644
--- a/node/impl/storminer.go
+++ b/node/impl/storminer.go
@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"os"
 	"strconv"
+	"sync"
 	"time"
 
 	"github.com/filecoin-project/dagstore"
@@ -616,6 +617,87 @@ func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key stri
 	return nil
 }
 
+func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreShardResult, error) {
+	if sm.DAGStore == nil {
+		return nil, fmt.Errorf("dagstore not available on this node")
+	}
+
+	// prepare the throttler tokens.
+	var throttle chan struct{}
+	if c := params.MaxConcurrency; c > 0 {
+		throttle = make(chan struct{}, c)
+		for i := 0; i < c; i++ {
+			throttle <- struct{}{}
+		}
+	}
+
+	info := sm.DAGStore.AllShardsInfo()
+	var uninit []string
+	for k, i := range info {
+		if i.ShardState != dagstore.ShardStateNew {
+			continue
+		}
+		uninit = append(uninit, k.String())
+	}
+
+	if len(uninit) == 0 {
+		out := make(chan api.DagstoreShardResult)
+		close(out)
+		return out, nil
+	}
+
+	// wg tracks outstanding workers so we know when every shard has been attempted.
+	var wg sync.WaitGroup
+	wg.Add(len(uninit))
+
+	// workers publish their result to out; a relay goroutine forwards each
+	// result to res, which is returned to the caller and closed once every
+	// shard has been attempted or the context is cancelled. both channels
+	// are buffered so that a slow consumer does not stall the workers.
+	out := make(chan api.DagstoreShardResult, 32) // internal buffer; workers publish here.
+	res := make(chan api.DagstoreShardResult, 32) // returned to the caller.
+
+	// dispatch one worker per uninitialized shard, respecting the throttle
+	// if one was configured.
+	go func() {
+		for _, k := range uninit {
+			if throttle != nil {
+				select {
+				case <-throttle:
+					// acquired a throttle token, proceed.
+				case <-ctx.Done():
+					wg.Done() // this shard will not be attempted.
+					continue
+				}
+			}
+
+			go func(key string) {
+				defer wg.Done()
+
+				err := sm.DagstoreInitializeShard(ctx, key)
+
+				if throttle != nil {
+					throttle <- struct{}{} // release the token.
+				}
+
+				r := api.DagstoreShardResult{Key: key}
+				if err == nil {
+					r.Success = true
+				} else {
+					r.Success = false
+					r.Error = err.Error()
+				}
+
+				select {
+				case out <- r:
+				case <-ctx.Done():
+				}
+			}(k)
+		}
+	}()
+
+	// close out once every worker has finished, so the relay can terminate.
+	go func() {
+		wg.Wait()
+		close(out)
+	}()
+
+	// relay results to the caller.
+	go func() {
+		defer close(res)
+
+		for r := range out {
+			select {
+			case res <- r:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return res, nil
+}
+
 func (sm *StorageMinerAPI) DagstoreRecoverShard(ctx context.Context, key string) error {
 	if sm.DAGStore == nil {
 		return fmt.Errorf("dagstore not available on this node")
@@ -646,7 +728,7 @@ func (sm *StorageMinerAPI) DagstoreRecoverShard(ctx context.Context, key string)
 	return res.Error
 }
 
-func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreGCResult, error) {
+func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreShardResult, error) {
 	if sm.DAGStore == nil {
 		return nil, fmt.Errorf("dagstore not available on this node")
 	}
@@ -656,17 +738,16 @@ func (sm *StorageMinerAPI) DagstoreGC(ctx context.Context) ([]api.DagstoreGCResu
 		return nil, fmt.Errorf("failed to gc: %w", err)
 	}
 
-	ret := make([]api.DagstoreGCResult, 0, len(res.Shards))
+	ret := make([]api.DagstoreShardResult, 0, len(res.Shards))
 	for k, err := range res.Shards {
-		ret = append(ret, api.DagstoreGCResult{
-			Key: k.String(),
-			Error: func() string {
-				if err == nil {
-					return ""
-				}
-				return err.Error()
-			}(),
-		})
+		r := api.DagstoreShardResult{Key: k.String()}
+		if err == nil {
+			r.Success = true
+		} else {
+			r.Success = false
+			r.Error = err.Error()
+		}
+		ret = append(ret, r)
 	}
 
 	return ret, nil
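
Usage sketch: the new endpoint streams one `DagstoreShardResult` per shard and the server closes the channel once every uninitialized shard has been attempted or the request context is cancelled. The snippet below shows how a caller might drive it and consume the results. It assumes an already-connected `api.StorageMiner` handle (obtaining the handle and its auth token is outside the scope of this patch); the package name, the `initializeAllShards` helper, and the concurrency of 8 are illustrative only, while the method, parameter, and result types come from the patch above.

```go
package example

import (
	"context"
	"fmt"

	"github.com/filecoin-project/lotus/api"
)

// initializeAllShards asks the miner to initialize every uninitialized shard,
// allowing at most eight initializations to run concurrently, and prints one
// line per shard as results stream in.
func initializeAllShards(ctx context.Context, miner api.StorageMiner) error {
	ch, err := miner.DagstoreInitializeAll(ctx, api.DagstoreInitializeAllParams{
		MaxConcurrency: 8, // 0 means unthrottled
	})
	if err != nil {
		return err
	}

	// The server closes the channel once every uninitialized shard has been
	// attempted, or when ctx is cancelled.
	for r := range ch {
		if r.Success {
			fmt.Printf("shard %s: initialized\n", r.Key)
		} else {
			fmt.Printf("shard %s: failed: %s\n", r.Key, r.Error)
		}
	}
	return ctx.Err()
}
```

MaxConcurrency maps directly onto the throttle in the server-side implementation, so it bounds how many DagstoreInitializeShard calls run at once against the storage subsystem.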