Merge pull request #9784 from filecoin-project/frrist/config-migration-worker-limit

chain/migrations refactor: optionally set max migration workers
This commit is contained in:
Łukasz Magiera 2022-12-05 10:05:44 +01:00 committed by GitHub
commit 3155b345c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,7 +4,9 @@ import (
"context" "context"
_ "embed" _ "embed"
"fmt" "fmt"
"os"
"runtime" "runtime"
"strconv"
"time" "time"
"github.com/docker/go-units" "github.com/docker/go-units"
@ -53,6 +55,29 @@ import (
//go:embed FVMLiftoff.txt //go:embed FVMLiftoff.txt
var fvmLiftoffBanner string var fvmLiftoffBanner string
var (
MigrationMaxWorkerCount int
EnvMigrationMaxWorkerCount = "LOTUS_MIGRATION_MAX_WORKER_COUNT"
)
func init() {
// the default calculation used for migration worker count
MigrationMaxWorkerCount = runtime.NumCPU()
// check if an alternative value was request by environment
if mwcs := os.Getenv(EnvMigrationMaxWorkerCount); mwcs != "" {
mwc, err := strconv.ParseInt(mwcs, 10, 32)
if err != nil {
log.Warnf("invalid value for %s (%s) defaulting to %d: %s", EnvMigrationMaxWorkerCount, mwcs, MigrationMaxWorkerCount, err)
return
}
// use value from environment
log.Infof("migration worker cound set from %s (%d)", EnvMigrationMaxWorkerCount, mwc)
MigrationMaxWorkerCount = int(mwc)
return
}
log.Infof("migration worker count: %d", MigrationMaxWorkerCount)
}
func DefaultUpgradeSchedule() stmgr.UpgradeSchedule { func DefaultUpgradeSchedule() stmgr.UpgradeSchedule {
var us stmgr.UpgradeSchedule var us stmgr.UpgradeSchedule
@ -891,7 +916,7 @@ func UpgradeCalico(ctx context.Context, sm *stmgr.StateManager, _ stmgr.Migratio
func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3. // Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3 workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 { if workerCount <= 0 {
workerCount = 1 workerCount = 1
} }
@ -929,7 +954,7 @@ func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { func PreUpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3. // Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU() workerCount := MigrationMaxWorkerCount
if workerCount <= 4 { if workerCount <= 4 {
workerCount = 1 workerCount = 1
} else { } else {
@ -993,7 +1018,7 @@ func upgradeActorsV3Common(
func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3. // Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3 workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 { if workerCount <= 0 {
workerCount = 1 workerCount = 1
} }
@ -1015,7 +1040,7 @@ func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { func PreUpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3. // Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU() workerCount := MigrationMaxWorkerCount
if workerCount <= 4 { if workerCount <= 4 {
workerCount = 1 workerCount = 1
} else { } else {
@ -1079,7 +1104,7 @@ func upgradeActorsV4Common(
func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3. // Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3 workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 { if workerCount <= 0 {
workerCount = 1 workerCount = 1
} }
@ -1101,7 +1126,7 @@ func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { func PreUpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3. // Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU() workerCount := MigrationMaxWorkerCount
if workerCount <= 4 { if workerCount <= 4 {
workerCount = 1 workerCount = 1
} else { } else {
@ -1165,7 +1190,7 @@ func upgradeActorsV5Common(
func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3. // Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3 workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 { if workerCount <= 0 {
workerCount = 1 workerCount = 1
} }
@ -1187,7 +1212,7 @@ func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { func PreUpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3. // Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU() workerCount := MigrationMaxWorkerCount
if workerCount <= 4 { if workerCount <= 4 {
workerCount = 1 workerCount = 1
} else { } else {
@ -1251,7 +1276,7 @@ func upgradeActorsV6Common(
func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3. // Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3 workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 { if workerCount <= 0 {
workerCount = 1 workerCount = 1
} }
@ -1273,7 +1298,7 @@ func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { func PreUpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3. // Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU() workerCount := MigrationMaxWorkerCount
if workerCount <= 4 { if workerCount <= 4 {
workerCount = 1 workerCount = 1
} else { } else {
@ -1344,7 +1369,7 @@ func upgradeActorsV7Common(
func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3. // Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3 workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 { if workerCount <= 0 {
workerCount = 1 workerCount = 1
} }
@ -1368,7 +1393,7 @@ func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { func PreUpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3. // Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU() workerCount := MigrationMaxWorkerCount
if workerCount <= 4 { if workerCount <= 4 {
workerCount = 1 workerCount = 1
} else { } else {
@ -1451,7 +1476,7 @@ func upgradeActorsV8Common(
func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor,
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3. // Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3 workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 { if workerCount <= 0 {
workerCount = 1 workerCount = 1
} }
@ -1474,7 +1499,7 @@ func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, func PreUpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid,
epoch abi.ChainEpoch, ts *types.TipSet) error { epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3. // Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU() workerCount := MigrationMaxWorkerCount
if workerCount <= 4 { if workerCount <= 4 {
workerCount = 1 workerCount = 1
} else { } else {