feat: Add ETA to lotus sync wait (#11211)

This commit is contained in:
Friðrik Ásmundsson 2023-09-15 17:29:27 +00:00 committed by GitHub
parent e06604d342
commit 26b35b7f8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 101 additions and 1 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
) )
var SyncCmd = &cli.Command{ var SyncCmd = &cli.Command{
@ -262,6 +263,9 @@ func SyncWait(ctx context.Context, napi v0api.FullNode, watch bool) error {
} }
firstApp = state.VMApplied firstApp = state.VMApplied
// eta computes the ETA for the sync to complete (with a lookback of 10 processed items)
eta := cliutil.NewETA(10)
for { for {
state, err := napi.SyncState(ctx) state, err := napi.SyncState(ctx)
if err != nil { if err != nil {
@ -312,8 +316,10 @@ func SyncWait(ctx context.Context, napi v0api.FullNode, watch bool) error {
fmt.Print("\r\x1b[2K\x1b[A") fmt.Print("\r\x1b[2K\x1b[A")
} }
todo := theight - ss.Height
fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", workerID, baseHeight, theight, heightDiff) fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", workerID, baseHeight, theight, heightDiff)
fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height) fmt.Printf("State: %s; Current Epoch: %d; Todo: %d, ETA: %s\n", ss.Stage, ss.Height, todo, eta.Update(int64(todo)))
lastLines = 2 lastLines = 2
if i%samples == 0 { if i%samples == 0 {

94
cli/util/eta.go Normal file
View File

@ -0,0 +1,94 @@
package cliutil
import (
"fmt"
"math"
"time"
)
// ETA implements a very simple eta calculator based on the number of remaining items. It does not
// require knowing the work size in advance and is therefore suitable for streaming workloads and
// also does not require that consecutive updates have a monotonically decreasing remaining value.
type ETA struct {
// max number of items to keep in memory
maxItems int
// a queue of most recently updated items
items []item
// we store the last calculated ETA which we reuse if there was not change in remaining items
lastETA string
}
type item struct {
timestamp time.Time
remaining int64
}
// NewETA creates a new ETA calculator of the given size
func NewETA(maxItems int) *ETA {
return &ETA{
maxItems: maxItems,
items: make([]item, 0),
}
}
// Update updates the ETA calculator with the remaining number of items and returns the ETA
func (e *ETA) Update(remaining int64) string {
item := item{
timestamp: time.Now(),
remaining: remaining,
}
if len(e.items) == 0 {
e.items = append(e.items, item)
return ""
}
if e.items[len(e.items)-1].remaining == remaining {
// we ignore updates with the same remaining value and just return the previous ETA
return e.lastETA
} else if e.items[len(e.items)-1].remaining < remaining {
// remaining went up from previous update, lets estimate how many items were processed using the
// average number processed items in the queue.
var avgProcessedPerItem int64 = 1
if len(e.items) > 1 {
diffRemaining := e.items[0].remaining - e.items[len(e.items)-1].remaining
avgProcessedPerItem = int64(math.Round(float64(diffRemaining) / float64(len(e.items))))
}
// diff is the difference in increase in remaining since last update plus the average number of processed
// items we estimate that were processed this round
diff := remaining - e.items[len(e.items)-1].remaining + avgProcessedPerItem
// we update all items in the queue by shifting their remaining value accordingly. This means that we
// always have strictly decreasing remaining values in the queue
for i := range e.items {
e.items[i].remaining += diff
}
}
// append the item to the queue and remove the oldest item if needed
if len(e.items) >= e.maxItems {
e.items = e.items[1:]
}
e.items = append(e.items, item)
// calculate the average processing time per item in the queue
diffMs := e.items[len(e.items)-1].timestamp.Sub(e.items[0].timestamp).Milliseconds()
nrItemsProcessed := e.items[0].remaining - e.items[len(e.items)-1].remaining
avg := diffMs / nrItemsProcessed
// use that average processing time to estimate how long the remaining items will take
// and cache that ETA so we don't have to recalculate it on every update unless the
// remaining value changes
e.lastETA = msToETA(avg * remaining)
return e.lastETA
}
func msToETA(ms int64) string {
seconds := ms / 1000
minutes := seconds / 60
hours := minutes / 60
return fmt.Sprintf("%02dh:%02dm:%02ds", hours, minutes%60, seconds%60)
}