diff --git a/chain/consensus/filcns/upgrades.go b/chain/consensus/filcns/upgrades.go index 2f8438b10..f3345ea7d 100644 --- a/chain/consensus/filcns/upgrades.go +++ b/chain/consensus/filcns/upgrades.go @@ -4,7 +4,9 @@ import ( "context" _ "embed" "fmt" + "os" "runtime" + "strconv" "time" "github.com/docker/go-units" @@ -53,6 +55,29 @@ import ( //go:embed FVMLiftoff.txt var fvmLiftoffBanner string +var ( + MigrationMaxWorkerCount int + EnvMigrationMaxWorkerCount = "LOTUS_MIGRATION_MAX_WORKER_COUNT" +) + +func init() { + // the default calculation used for migration worker count + MigrationMaxWorkerCount = runtime.NumCPU() + // check if an alternative value was request by environment + if mwcs := os.Getenv(EnvMigrationMaxWorkerCount); mwcs != "" { + mwc, err := strconv.ParseInt(mwcs, 10, 32) + if err != nil { + log.Warnf("invalid value for %s (%s) defaulting to %d: %s", EnvMigrationMaxWorkerCount, mwcs, MigrationMaxWorkerCount, err) + return + } + // use value from environment + log.Infof("migration worker cound set from %s (%d)", EnvMigrationMaxWorkerCount, mwc) + MigrationMaxWorkerCount = int(mwc) + return + } + log.Infof("migration worker count: %d", MigrationMaxWorkerCount) +} + func DefaultUpgradeSchedule() stmgr.UpgradeSchedule { var us stmgr.UpgradeSchedule @@ -891,7 +916,7 @@ func UpgradeCalico(ctx context.Context, sm *stmgr.StateManager, _ stmgr.Migratio func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { // Use all the CPUs except 3. - workerCount := runtime.NumCPU() - 3 + workerCount := MigrationMaxWorkerCount - 3 if workerCount <= 0 { workerCount = 1 } @@ -929,7 +954,7 @@ func UpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi func PreUpgradeActorsV3(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { // Use half the CPUs for pre-migration, but leave at least 3. - workerCount := runtime.NumCPU() + workerCount := MigrationMaxWorkerCount if workerCount <= 4 { workerCount = 1 } else { @@ -993,7 +1018,7 @@ func upgradeActorsV3Common( func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { // Use all the CPUs except 3. - workerCount := runtime.NumCPU() - 3 + workerCount := MigrationMaxWorkerCount - 3 if workerCount <= 0 { workerCount = 1 } @@ -1015,7 +1040,7 @@ func UpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi func PreUpgradeActorsV4(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { // Use half the CPUs for pre-migration, but leave at least 3. - workerCount := runtime.NumCPU() + workerCount := MigrationMaxWorkerCount if workerCount <= 4 { workerCount = 1 } else { @@ -1079,7 +1104,7 @@ func upgradeActorsV4Common( func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { // Use all the CPUs except 3. - workerCount := runtime.NumCPU() - 3 + workerCount := MigrationMaxWorkerCount - 3 if workerCount <= 0 { workerCount = 1 } @@ -1101,7 +1126,7 @@ func UpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi func PreUpgradeActorsV5(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { // Use half the CPUs for pre-migration, but leave at least 3. - workerCount := runtime.NumCPU() + workerCount := MigrationMaxWorkerCount if workerCount <= 4 { workerCount = 1 } else { @@ -1165,7 +1190,7 @@ func upgradeActorsV5Common( func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { // Use all the CPUs except 3. - workerCount := runtime.NumCPU() - 3 + workerCount := MigrationMaxWorkerCount - 3 if workerCount <= 0 { workerCount = 1 } @@ -1187,7 +1212,7 @@ func UpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi func PreUpgradeActorsV6(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { // Use half the CPUs for pre-migration, but leave at least 3. - workerCount := runtime.NumCPU() + workerCount := MigrationMaxWorkerCount if workerCount <= 4 { workerCount = 1 } else { @@ -1251,7 +1276,7 @@ func upgradeActorsV6Common( func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { // Use all the CPUs except 3. - workerCount := runtime.NumCPU() - 3 + workerCount := MigrationMaxWorkerCount - 3 if workerCount <= 0 { workerCount = 1 } @@ -1273,7 +1298,7 @@ func UpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi func PreUpgradeActorsV7(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { // Use half the CPUs for pre-migration, but leave at least 3. - workerCount := runtime.NumCPU() + workerCount := MigrationMaxWorkerCount if workerCount <= 4 { workerCount = 1 } else { @@ -1344,7 +1369,7 @@ func upgradeActorsV7Common( func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { // Use all the CPUs except 3. - workerCount := runtime.NumCPU() - 3 + workerCount := MigrationMaxWorkerCount - 3 if workerCount <= 0 { workerCount = 1 } @@ -1368,7 +1393,7 @@ func UpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi func PreUpgradeActorsV8(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { // Use half the CPUs for pre-migration, but leave at least 3. - workerCount := runtime.NumCPU() + workerCount := MigrationMaxWorkerCount if workerCount <= 4 { workerCount = 1 } else { @@ -1451,7 +1476,7 @@ func upgradeActorsV8Common( func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, cb stmgr.ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { // Use all the CPUs except 3. - workerCount := runtime.NumCPU() - 3 + workerCount := MigrationMaxWorkerCount - 3 if workerCount <= 0 { workerCount = 1 } @@ -1474,7 +1499,7 @@ func UpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.Mi func PreUpgradeActorsV9(ctx context.Context, sm *stmgr.StateManager, cache stmgr.MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { // Use half the CPUs for pre-migration, but leave at least 3. - workerCount := runtime.NumCPU() + workerCount := MigrationMaxWorkerCount if workerCount <= 4 { workerCount = 1 } else {