refactor: optionally set max migration workers

- configure via LOTUS_MIGRATION_MAX_WORKER_COUNT
This commit is contained in:
frrist 2022-12-02 11:40:43 -08:00
parent d886b46139
commit 0236952955

View File

@ -4,7 +4,9 @@ import (
"context"
_ "embed"
"fmt"
"os"
"runtime"
"strconv"
"time"
"github.com/docker/go-units"
@ -53,6 +55,29 @@ import (
//go:embed FVMLiftoff.txt
var fvmLiftoffBanner string
var (
MigrationMaxWorkerCount int
EnvMigrationMaxWorkerCount = "LOTUS_MIGRATION_MAX_WORKER_COUNT"
)
func init() {
// the default calculation used for migration worker count
MigrationMaxWorkerCount = runtime.NumCPU()
// check if an alternative value was request by environment
if mwcs := os.Getenv(EnvMigrationMaxWorkerCount); mwcs != "" {
mwc, err := strconv.ParseInt(mwcs, 10, 64)
if err != nil {
log.Warnf("invalid value for %s (%s) defaulting to %d: %s", EnvMigrationMaxWorkerCount, mwcs, MigrationMaxWorkerCount, err)
return
}
// use value from environment
log.Infof("migration worker cound set from %s (%d)", EnvMigrationMaxWorkerCount, mwc)
MigrationMaxWorkerCount = int(mwc)
return
}
log.Infof("migration worker count: %d", MigrationMaxWorkerCount)
}
func DefaultUpgradeSchedule() stmgr.UpgradeSchedule {
var us stmgr.UpgradeSchedule
@ -891,7 +916,7 @@ func UpgradeCalico(ctx context.Context, sm *stmgr.StateManager, _ stmgr.Migratio
func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@ -929,7 +954,7 @@ func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@ -993,7 +1018,7 @@ func upgradeActorsV3Common(
func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@ -1015,7 +1040,7 @@ func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@ -1079,7 +1104,7 @@ func upgradeActorsV4Common(
func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@ -1101,7 +1126,7 @@ func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@ -1165,7 +1190,7 @@ func upgradeActorsV5Common(
func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@ -1187,7 +1212,7 @@ func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@ -1251,7 +1276,7 @@ func upgradeActorsV6Common(
func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@ -1273,7 +1298,7 @@ func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@ -1344,7 +1369,7 @@ func upgradeActorsV7Common(
func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@ -1368,7 +1393,7 @@ func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
@ -1451,7 +1476,7 @@ func upgradeActorsV8Common(
func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor,
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
@ -1474,7 +1499,7 @@ func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi
func PreUpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid,
epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {