From dd0eec39949f82b072c4bdd6bd97f04057401435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 5 Aug 2021 17:32:52 +0100 Subject: [PATCH] DagstoreInitializeAll: richer output. --- api/api_storage.go | 14 ++++- api/proxy_gen.go | 6 +-- cmd/lotus-miner/dagstore.go | 36 +++++++++++-- documentation/en/api-v0-methods-miner.md | 11 ++-- documentation/en/cli-lotus-miner.md | 14 +++++ node/impl/storminer.go | 67 +++++++++++++++--------- 6 files changed, 109 insertions(+), 39 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index c783af106..9a5b6ccd1 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -203,8 +203,8 @@ type StorageMiner interface { // It is recommended to set a maximum concurrency to avoid extreme // IO pressure if the storage subsystem has a large amount of deals. // - // It returns the result for each shard it attempted to initialize. - DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) //perm:write + // It returns a stream of events to report progress. + DagstoreInitializeAll(ctx context.Context, params DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) //perm:write // DagstoreGC runs garbage collection on the DAG store. DagstoreGC(ctx context.Context) ([]DagstoreShardResult, error) //perm:admin @@ -399,3 +399,13 @@ type DagstoreShardResult struct { type DagstoreInitializeAllParams struct { MaxConcurrency int } + +// DagstoreInitializeAllEvent represents an initialization event. +type DagstoreInitializeAllEvent struct { + Key string + Event string // "start", "end" + Success bool + Error string + Total int + Current int +} diff --git a/api/proxy_gen.go b/api/proxy_gen.go index f5df79d0e..86fbf16ac 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -605,7 +605,7 @@ type StorageMinerStruct struct { DagstoreGC func(p0 context.Context) ([]DagstoreShardResult, error) `perm:"admin"` - DagstoreInitializeAll func(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) `perm:"write"` + DagstoreInitializeAll func(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) `perm:"write"` DagstoreInitializeShard func(p0 context.Context, p1 string) error `perm:"write"` @@ -3590,14 +3590,14 @@ func (s *StorageMinerStub) DagstoreGC(p0 context.Context) ([]DagstoreShardResult return *new([]DagstoreShardResult), ErrNotSupported } -func (s *StorageMinerStruct) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) { +func (s *StorageMinerStruct) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) { if s.Internal.DagstoreInitializeAll == nil { return nil, ErrNotSupported } return s.Internal.DagstoreInitializeAll(p0, p1) } -func (s *StorageMinerStub) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreShardResult, error) { +func (s *StorageMinerStub) DagstoreInitializeAll(p0 context.Context, p1 DagstoreInitializeAllParams) (<-chan DagstoreInitializeAllEvent, error) { return nil, ErrNotSupported } diff --git a/cmd/lotus-miner/dagstore.go b/cmd/lotus-miner/dagstore.go index 0fd95a411..10c0e813b 100644 --- a/cmd/lotus-miner/dagstore.go +++ b/cmd/lotus-miner/dagstore.go @@ -18,6 +18,7 @@ var dagstoreCmd = &cli.Command{ Subcommands: []*cli.Command{ dagstoreListShardsCmd, dagstoreInitializeShardCmd, + dagstoreRecoverShardCmd, dagstoreInitializeAllCmd, dagstoreGcCmd, }, @@ -96,6 +97,27 @@ var dagstoreInitializeShardCmd = &cli.Command{ }, } +var dagstoreRecoverShardCmd = &cli.Command{ + Name: "recover-shard", + ArgsUsage: "[key]", + Usage: "Attempt to recover a shard in errored state", + Action: func(cctx *cli.Context) error { + if cctx.NArg() != 1 { + return fmt.Errorf("must provide a single shard key") + } + + marketsApi, closer, err := lcli.GetMarketsAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + return marketsApi.DagstoreRecoverShard(ctx, cctx.Args().First()) + }, +} + var dagstoreInitializeAllCmd = &cli.Command{ Name: "initialize-all", Usage: "Initialize all uninitialized shards, streaming results as they're produced", @@ -128,14 +150,20 @@ var dagstoreInitializeAllCmd = &cli.Command{ for { select { - case res, ok := <-ch: + case evt, ok := <-ch: if !ok { return nil } - if res.Success { - _, _ = fmt.Fprintln(os.Stdout, res.Key, color.New(color.FgGreen).Sprint("SUCCESS")) + _, _ = fmt.Fprintf(os.Stdout, color.New(color.BgHiBlack).Sprintf("(%d/%d)", evt.Current, evt.Total)) + _, _ = fmt.Fprintf(os.Stdout, " ") + if evt.Event == "start" { + _, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.Reset).Sprint("STARTING")) } else { - _, _ = fmt.Fprintln(os.Stdout, res.Key, color.New(color.FgRed).Sprint("ERROR"), res.Error) + if evt.Success { + _, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.FgGreen).Sprint("SUCCESS")) + } else { + _, _ = fmt.Fprintln(os.Stdout, evt.Key, color.New(color.FgRed).Sprint("ERROR"), evt.Error) + } } case <-ctx.Done(): diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index 6b42ec53a..ab3eaf8db 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -371,7 +371,7 @@ according to the policy passed in the parameters. It is recommended to set a maximum concurrency to avoid extreme IO pressure if the storage subsystem has a large amount of deals. -It returns the result for each shard it attempted to initialize. +It returns a stream of events to report progress. Perms: write @@ -388,9 +388,12 @@ Inputs: Response: ```json { - "Key": "baga6ea4seaqecmtz7iak33dsfshi627abz4i4665dfuzr3qfs4bmad6dx3iigdq", - "Success": false, - "Error": "\u003cerror\u003e" + "Key": "string value", + "Event": "string value", + "Success": true, + "Error": "string value", + "Total": 123, + "Current": 123 } ``` diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 15bff5f6d..c1284b140 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -1012,6 +1012,7 @@ USAGE: COMMANDS: list-shards List all shards known to the dagstore, with their current status initialize-shard Initialize the specified shard + recover-shard Attempt to recover a shard in errored state initialize-all Initialize all uninitialized shards, streaming results as they're produced gc Garbage collect the dagstore help, h Shows a list of commands or help for one command @@ -1048,6 +1049,19 @@ OPTIONS: ``` +### lotus-miner dagstore recover-shard +``` +NAME: + lotus-miner dagstore recover-shard - Attempt to recover a shard in errored state + +USAGE: + lotus-miner dagstore recover-shard [command options] [key] + +OPTIONS: + --help, -h show help (default: false) + +``` + ### lotus-miner dagstore initialize-all ``` NAME: diff --git a/node/impl/storminer.go b/node/impl/storminer.go index f5b0bd0c3..ea8906248 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -6,8 +6,8 @@ import ( "fmt" "net/http" "os" + "sort" "strconv" - "sync" "time" "github.com/filecoin-project/dagstore" @@ -559,6 +559,7 @@ func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.Dagsto if sm.DAGStore == nil { return nil, fmt.Errorf("dagstore not available on this node") } + info := sm.DAGStore.AllShardsInfo() ret := make([]api.DagstoreShardInfo, 0, len(info)) for k, i := range info { @@ -573,6 +574,11 @@ func (sm *StorageMinerAPI) DagstoreListShards(ctx context.Context) ([]api.Dagsto }(), }) } + + sort.SliceStable(ret, func(i, j int) bool { + return ret[i].Key < ret[j].Key + }) + return ret, nil } @@ -617,7 +623,7 @@ func (sm *StorageMinerAPI) DagstoreInitializeShard(ctx context.Context, key stri return nil } -func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreShardResult, error) { +func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api.DagstoreInitializeAllParams) (<-chan api.DagstoreInitializeAllEvent, error) { if sm.DAGStore == nil { return nil, fmt.Errorf("dagstore not available on this node") } @@ -629,11 +635,6 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api for i := 0; i < c; i++ { throttle <- struct{}{} } - } else { - // zero concurrency means no limit; a closed channel will always - // be receivable. - throttle = make(chan struct{}) - close(throttle) } info := sm.DAGStore.AllShardsInfo() @@ -645,44 +646,59 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api uninit = append(uninit, k.String()) } - if len(uninit) == 0 { - out := make(chan api.DagstoreShardResult) + total := len(uninit) + if total == 0 { + out := make(chan api.DagstoreInitializeAllEvent) close(out) return out, nil } - var wg sync.WaitGroup - wg.Add(len(uninit)) - // response channel must be closed when we're done, or the context is cancelled. // this buffering is necessary to prevent inflight children goroutines from // publishing to a closed channel (res) when the context is cancelled. - out := make(chan api.DagstoreShardResult, 32) // internal buffer. - res := make(chan api.DagstoreShardResult, 32) // returned to caller. + out := make(chan api.DagstoreInitializeAllEvent, 32) // internal buffer. + res := make(chan api.DagstoreInitializeAllEvent, 32) // returned to caller. + // pump events back to caller. + // two events per shard. go func() { - defer close(res) // close the caller channel. + defer close(res) - pending := len(uninit) - for pending > 0 { + for i := 0; i < total*2; i++ { select { case res <- <-out: - pending-- - continue + case <-ctx.Done(): + return + } + } + }() + + go func() { + for i, k := range uninit { + select { case <-throttle: // acquired a throttle token, proceed. case <-ctx.Done(): return } - next := uninit[0] - uninit = uninit[1:] + go func(k string, i int) { + r := api.DagstoreInitializeAllEvent{ + Key: k, + Event: "start", + Total: total, + Current: i, + } + select { + case out <- r: + case <-ctx.Done(): + return + } - go func() { - err := sm.DagstoreInitializeShard(ctx, next) + err := sm.DagstoreInitializeShard(ctx, k) throttle <- struct{}{} - r := api.DagstoreShardResult{Key: next} + r.Event = "end" if err == nil { r.Success = true } else { @@ -694,9 +710,8 @@ func (sm *StorageMinerAPI) DagstoreInitializeAll(ctx context.Context, params api case out <- r: case <-ctx.Done(): } - }() + }(k, i) } - }() return res, nil