2023-10-24 20:44:48 +00:00
|
|
|
# Copyright © 2022, 2023 Vulcanize
|
|
|
|
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU Affero General Public License for more details.
|
|
|
|
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
import typing
|
2024-02-08 06:43:41 +00:00
|
|
|
import humanfriendly
|
|
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
2023-11-07 07:06:55 +00:00
|
|
|
from stack_orchestrator.util import get_yaml
|
2023-11-21 23:04:36 +00:00
|
|
|
from stack_orchestrator import constants
|
2023-10-24 20:44:48 +00:00
|
|
|
|
|
|
|
|
2024-02-08 06:43:41 +00:00
|
|
|
class ResourceLimits:
    """CPU, memory and storage amounts read from a mapping.

    Only keys that are present in the input mapping become instance
    attributes; anything absent falls back to the class-level ``None``
    default.  ``len()``, iteration and ``repr()`` look at the instance
    ``__dict__`` only, so they reflect just the values that were supplied
    (and an instance with nothing set is falsy).
    """

    cpus: typing.Optional[float] = None
    memory: typing.Optional[int] = None
    storage: typing.Optional[int] = None

    def __init__(self, obj=None):
        """Populate the limits from ``obj``.

        Args:
            obj: Mapping that may contain ``"cpus"``, ``"memory"`` and
                ``"storage"``.  ``None`` (the default) is treated as an
                empty mapping.

        ``cpus`` is converted with ``float()``; ``memory`` and ``storage``
        are converted with ``humanfriendly.parse_size``.
        """
        # ``None`` instead of a shared ``{}`` default: avoids the mutable
        # default-argument pitfall and lets callers pass a missing section.
        if obj is None:
            obj = {}

        if "cpus" in obj:
            self.cpus = float(obj["cpus"])
        if "memory" in obj:
            self.memory = humanfriendly.parse_size(obj["memory"])
        if "storage" in obj:
            self.storage = humanfriendly.parse_size(obj["storage"])

    def __len__(self):
        """Return how many values were explicitly set on this instance."""
        return len(self.__dict__)

    def __iter__(self):
        """Yield ``(name, value)`` pairs for the explicitly set values."""
        yield from self.__dict__.items()

    def __repr__(self):
        """Return the string form of the explicitly set values."""
        return str(self.__dict__)
|
|
|
|
|
|
|
|
|
|
|
|
class Resources:
    """Pair of optional ``limits`` and ``reservations`` resource amounts.

    Each section that is present in the input mapping is wrapped in a
    ``ResourceLimits``; absent sections fall back to the class-level
    ``None`` default.  ``len()``, iteration and ``repr()`` look at the
    instance ``__dict__`` only, so they reflect just the sections that were
    supplied (and an instance with no sections is falsy).
    """

    limits: "typing.Optional[ResourceLimits]" = None
    reservations: "typing.Optional[ResourceLimits]" = None

    def __init__(self, obj=None):
        """Populate the sections from ``obj``.

        Args:
            obj: Mapping that may contain ``"reservations"`` and
                ``"limits"``, each of which is passed to
                ``ResourceLimits``.  ``None`` (the default) is treated as
                an empty mapping.
        """
        # ``None`` instead of a shared ``{}`` default: avoids the mutable
        # default-argument pitfall and lets callers pass a missing section.
        if obj is None:
            obj = {}

        if "reservations" in obj:
            self.reservations = ResourceLimits(obj["reservations"])
        if "limits" in obj:
            self.limits = ResourceLimits(obj["limits"])

    def __len__(self):
        """Return how many sections were explicitly set on this instance."""
        return len(self.__dict__)

    def __iter__(self):
        """Yield ``(name, value)`` pairs for the explicitly set sections."""
        yield from self.__dict__.items()

    def __repr__(self):
        """Return the string form of the explicitly set sections."""
        return str(self.__dict__)
|
|
|
|
|
|
|
|
|
2023-10-24 20:44:48 +00:00
|
|
|
class Spec:
    """Wrapper around loaded spec data with convenience accessors.

    ``obj`` holds the spec content (used as a mapping by the accessors) and
    ``file_path`` the path it was loaded from, if any.
    """

    obj: typing.Any
    file_path: Path

    def __init__(self, file_path: typing.Optional[Path] = None, obj=None) -> None:
        """Create a spec from already-loaded data.

        Args:
            file_path: Path associated with the spec, or ``None``.
            obj: Spec content.  ``None`` (the default) yields a new empty
                dict for this instance.
        """
        self.file_path = file_path
        # A fresh dict per instance: storing a shared ``{}`` default would
        # make every default-constructed Spec alias the same dict.
        self.obj = {} if obj is None else obj

    def _obj_or_empty(self):
        """Return ``self.obj``, or an empty dict when it is empty or ``None``."""
        return self.obj or {}

    def __getitem__(self, item):
        """Return ``self.obj[item]``."""
        return self.obj[item]

    def __contains__(self, item):
        """Return whether ``item`` is in ``self.obj``."""
        return item in self.obj

    def get(self, item, default=None):
        """Return ``self.obj.get(item, default)``."""
        return self.obj.get(item, default)

    def init_from_file(self, file_path: Path):
        """Load the spec content from ``file_path`` and remember the path.

        The file is parsed with the loader returned by ``get_yaml()``.
        """
        # Open the file itself as the context manager so the handle is
        # always closed; using the Path object as a context manager is no
        # longer supported on Python 3.13+.
        with open(file_path, "r") as spec_file:
            self.obj = get_yaml().load(spec_file)
        self.file_path = file_path

    def get_image_registry(self):
        """Return the image registry entry, or ``None`` when absent."""
        return self._obj_or_empty().get(constants.image_registry_key)

    def get_volumes(self):
        """Return the ``"volumes"`` entry, or ``{}`` when absent."""
        return self._obj_or_empty().get("volumes", {})

    def get_configmaps(self):
        """Return the ``"configmaps"`` entry, or ``{}`` when absent."""
        return self._obj_or_empty().get("configmaps", {})

    def get_container_resources(self):
        """Return a ``Resources`` built from ``resources.containers``."""
        resources = self._obj_or_empty().get("resources", {})
        return Resources(resources.get("containers", {}))

    def get_volume_resources(self):
        """Return a ``Resources`` built from ``resources.volumes``."""
        resources = self._obj_or_empty().get("resources", {})
        return Resources(resources.get("volumes", {}))

    def get_http_proxy(self):
        """Return the HTTP proxy entry of the network section, or ``None``."""
        spec = self.obj
        if spec and constants.network_key in spec:
            network = spec[constants.network_key]
            if constants.http_proxy_key in network:
                return network[constants.http_proxy_key]
        return None

    def get_annotations(self):
        """Return the ``"annotations"`` entry, or ``{}`` when absent."""
        return self._obj_or_empty().get("annotations", {})

    def get_labels(self):
        """Return the ``"labels"`` entry, or ``{}`` when absent."""
        return self._obj_or_empty().get("labels", {})

    def get_privileged(self):
        """Return ``True`` when ``security.privileged`` is "true".

        The value is stringified and lower-cased before comparison, so both
        a boolean ``True`` and any casing of the string ``"true"`` match.
        """
        security = self._obj_or_empty().get("security", {})
        return "true" == str(security.get("privileged", "false")).lower()

    def get_capabilities(self):
        """Return ``security.capabilities``, or ``[]`` when absent."""
        return self._obj_or_empty().get("security", {}).get("capabilities", [])

    def get_deployment_type(self):
        """Return the deploy-to entry; raises ``KeyError`` if it is missing."""
        return self.obj[constants.deploy_to_key]

    def is_kubernetes_deployment(self):
        """Return whether the deployment type is either k8s variant."""
        return self.get_deployment_type() in [constants.k8s_kind_deploy_type, constants.k8s_deploy_type]

    def is_kind_deployment(self):
        """Return whether the deployment type is the k8s-kind variant."""
        return self.get_deployment_type() in [constants.k8s_kind_deploy_type]

    def is_docker_deployment(self):
        """Return whether the deployment type is the compose variant."""
        return self.get_deployment_type() in [constants.compose_deploy_type]
|