DagstoreInitializeAll: richer output.

This commit is contained in:
Raúl Kripalani 2021-08-05 17:32:52 +01:00
parent 4974dc65d5
commit dd0eec3994
6 changed files with 109 additions and 39 deletions

View File

@ -203,8 +203,8 @@ type StorageMiner interface {
// It is recommended to set a maximum concurrency to avoid extreme
// IO pressure if the storage subsystem has a large amount of deals.
//
// It returns the result for each shard it attempted to initialize.
DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) //perm:write
// It returns a stream of events to report progress.
DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) //perm:write
// DagstoreGC runs garbage collection on the DAG store.
DagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin
@ -399,3 +399,13 @@ type DagstoreShardResult struct {
type DagstoreInitializeAllParams struct {
// MaxConcurrency is the maximum concurrency requested for the bulk
// initialization; the API documentation recommends setting it to avoid
// extreme IO pressure when there are many deals.
// NOTE(review): the behaviour for a zero value is not visible here — confirm.
MaxConcurrency int
}
// DagstoreInitializeAllEvent represents an initialization event.
//
// DagstoreInitializeAll streams these to report progress: a "start" event
// and an "end" event are published for each shard being initialized.
type DagstoreInitializeAllEvent struct {
// Key is the string form of the shard key this event refers to.
Key string
// Event is the kind of event: "start" before the shard is initialized,
// "end" once initialization has returned.
Event string // either "start" or "end"
// Success is set to true on an "end" event when initializing the shard
// returned no error.
Success bool
// Error accompanies a failed "end" event.
// NOTE(review): presumably the error message; the assignment is not
// visible in this view — confirm.
Error string
// Total is the number of uninitialized shards found when the operation
// started.
Total int
// Current is the index of this shard within the list of shards being
// initialized (taken from the range index, so it starts at zero).
Current int
}

View File

@ -605,7 +605,7 @@ type StorageMinerStruct struct {
DagstoreGC func(p0 context.Context) ([]DagstoreShardResult, error) `perm:"admin"`
DagstoreInitializeAll func(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) `perm:"write"`
DagstoreInitializeAll func(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) `perm:"write"`
DagstoreInitializeShard func(p0 context.Context, p1 string) error `perm:"write"`
@ -3590,14 +3590,14 @@ func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreShardResult
return *new([]DagstoreShardResult), ErrNotSupported
}
func (s *StorageMinerStruct) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) {
func (s *StorageMinerStruct) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) {
if s.Internal.DagstoreInitializeAll == nil {
return nil, ErrNotSupported
}
return s.Internal.DagstoreInitializeAll(p0, p1)
}
func (s *StorageMinerStub) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) {
func (s *StorageMinerStub) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) {
return nil, ErrNotSupported
}

View File

@ -18,6 +18,7 @@ var dagstoreCmd = &cli.Command{
Subcommands: []*cli.Command{
dagstoreListShardsCmd,
dagstoreInitializeShardCmd,
dagstoreRecoverShardCmd,
dagstoreInitializeAllCmd,
dagstoreGcCmd,
},
@ -96,6 +97,27 @@ var dagstoreInitializeShardCmd = &cli.Command{
},
}
// dagstoreRecoverShardCmd implements the `recover-shard` subcommand: it
// expects exactly one positional argument (the shard key) and asks the
// markets API to recover that shard.
var dagstoreRecoverShardCmd = &cli.Command{
	Name:      "recover-shard",
	ArgsUsage: "[key]",
	Usage:     "Attempt to recover a shard in errored state",
	Action: func(cctx *cli.Context) error {
		// Reject anything other than a single shard key.
		if argc := cctx.NArg(); argc != 1 {
			return fmt.Errorf("must provide a single shard key")
		}

		mapi, closeAPI, err := lcli.GetMarketsAPI(cctx)
		if err != nil {
			return err
		}
		// Release the API connection when the action returns.
		defer closeAPI()

		ctx := lcli.ReqContext(cctx)
		shardKey := cctx.Args().First()

		return mapi.DagstoreRecoverShard(ctx, shardKey)
	},
}
var dagstoreInitializeAllCmd = &cli.Command{
Name: "initialize-all",
Usage: "Initialize all uninitialized shards, streaming results as they're produced",
@ -128,14 +150,20 @@ var dagstoreInitializeAllCmd = &cli.Command{
for {
select {
case res, ok := <-ch:
case evt, ok := <-ch:
if !ok {
return nil
}
if res.Success {
_, _ = fmt.Fprintln(os.Stdout, res.Key, color.New(color.FgGreen).Sprint("SUCCESS"))
_, _ = fmt.Fprintf(os.Stdout, color.New(color.BgHiBlack).Sprintf("(%d/%d)", evt.Current, evt.Total))
_, _ = fmt.Fprintf(os.Stdout, " ")
if evt.Event == "start" {
_, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.Reset).Sprint("STARTING"))
} else {
_, _ = fmt.Fprintln(os.Stdout, res.Key, color.New(color.FgRed).Sprint("ERROR"), res.Error)
if evt.Success {
_, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.FgGreen).Sprint("SUCCESS"))
} else {
_, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.FgRed).Sprint("ERROR"), evt.Error)
}
}
case <-ctx.Done():

View File

@ -371,7 +371,7 @@ according to the policy passed in the parameters.
It is recommended to set a maximum concurrency to avoid extreme
IO pressure if the storage subsystem has a large amount of deals.
It returns the result for each shard it attempted to initialize.
It returns a stream of events to report progress.
Perms: write
@ -388,9 +388,12 @@ Inputs:
Response:
```json
{
"Key": "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq",
"Success": false,
"Error": "\u003cerror\u003e"
"Key": "string value",
"Event": "string value",
"Success": true,
"Error": "string value",
"Total": 123,
"Current": 123
}
```

View File

@ -1012,6 +1012,7 @@ USAGE:
COMMANDS:
list-shards List all shards known to the dagstore, with their current status
initialize-shard Initialize the specified shard
recover-shard Attempt to recover a shard in errored state
initialize-all Initialize all uninitialized shards, streaming results as they're produced
gc Garbage collect the dagstore
help, h Shows a list of commands or help for one command
@ -1048,6 +1049,19 @@ OPTIONS:
```
### lotus-miner dagstore recover-shard
```
NAME:
lotus-miner dagstore recover-shard - Attempt to recover a shard in errored state
USAGE:
lotus-miner dagstore recover-shard [command options] [key]
OPTIONS:
--help, -h show help (default: false)
```
### lotus-miner dagstore initialize-all
```
NAME:

View File

@ -6,8 +6,8 @@ import (
"fmt"
"net/http"
"os"
"sort"
"strconv"
"sync"
"time"
"github.com/filecoin-project/dagstore"
@ -559,6 +559,7 @@ func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.Dagsto
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
info := sm.DAGStore.AllShardsInfo()
ret := make([]api.DagstoreShardInfo, 0, len(info))
for k, i := range info {
@ -573,6 +574,11 @@ func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.Dagsto
}(),
})
}
sort.SliceStable(ret, func(i, j int) bool {
return ret[i].Key < ret[j].Key
})
return ret, nil
}
@ -617,7 +623,7 @@ func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key stri
return nil
}
func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreShardResult, error) {
func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreInitializeAllEvent, error) {
if sm.DAGStore == nil {
return nil, fmt.Errorf("dagstore not available on this node")
}
@ -629,11 +635,6 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api
for i := 0; i < c; i++ {
throttle <- struct{}{}
}
} else {
// zero concurrency means no limit; a closed channel will always
// be receivable.
throttle = make(chan struct{})
close(throttle)
}
info := sm.DAGStore.AllShardsInfo()
@ -645,44 +646,59 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api
uninit = append(uninit, k.String())
}
if len(uninit) == 0 {
out := make(chan api.DagstoreShardResult)
total := len(uninit)
if total == 0 {
out := make(chan api.DagstoreInitializeAllEvent)
close(out)
return out, nil
}
var wg sync.WaitGroup
wg.Add(len(uninit))
// response channel must be closed when we're done, or the context is cancelled.
// this buffering is necessary to prevent inflight children goroutines from
// publishing to a closed channel (res) when the context is cancelled.
out := make(chan api.DagstoreShardResult, 32) // internal buffer.
res := make(chan api.DagstoreShardResult, 32) // returned to caller.
out := make(chan api.DagstoreInitializeAllEvent, 32) // internal buffer.
res := make(chan api.DagstoreInitializeAllEvent, 32) // returned to caller.
// pump events back to caller.
// two events per shard.
go func() {
defer close(res) // close the caller channel.
defer close(res)
pending := len(uninit)
for pending > 0 {
for i := 0; i < total*2; i++ {
select {
case res <- <-out:
pending--
continue
case <-ctx.Done():
return
}
}
}()
go func() {
for i, k := range uninit {
select {
case <-throttle:
// acquired a throttle token, proceed.
case <-ctx.Done():
return
}
next := uninit[0]
uninit = uninit[1:]
go func(k string, i int) {
r := api.DagstoreInitializeAllEvent{
Key: k,
Event: "start",
Total: total,
Current: i,
}
select {
case out <- r:
case <-ctx.Done():
return
}
go func() {
err := sm.DagstoreInitializeShard(ctx, next)
err := sm.DagstoreInitializeShard(ctx, k)
throttle <- struct{}{}
r := api.DagstoreShardResult{Key: next}
r.Event = "end"
if err == nil {
r.Success = true
} else {
@ -694,9 +710,8 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api
case out <- r:
case <-ctx.Done():
}
}()
}(k, i)
}
}()
return res, nil