import fcntl  # NOTE(review): appears unused now that flock is gone; kept so no import is dropped
from functools import wraps
import os
import time

# Define default file path for the lock
DEFAULT_LOCK_FILE_PATH = "/tmp/registry_mutex_lock_file"
# Age in seconds after which an existing lock file is treated as abandoned
LOCK_TIMEOUT = 30
# Seconds to sleep between attempts while another holder owns the lock
LOCK_RETRY_INTERVAL = 3


def _read_lock_timestamp(lock_file_path):
    """Return the time the lock at *lock_file_path* was taken, or None.

    The lock file normally contains ``str(time.time())`` written by the
    holder. If the content cannot be parsed as a float (an empty file left
    behind by an older flock-based implementation, or a file that a
    competing process has created but not written to yet), the file's
    modification time is used instead so the caller can still decide whether
    the lock is stale.

    Returns None when the file does not exist (or vanished while being read).
    """
    try:
        with open(lock_file_path, 'r') as lock_file:
            content = lock_file.read().strip()
        try:
            return float(content)
        except ValueError:
            # Unparsable content: fall back to when the file was last touched
            return os.path.getmtime(lock_file_path)
    except FileNotFoundError:
        # Released or removed by another process between our checks
        return None


def acquire_lock(lock_file_path, timeout):
    """Block until the lock file at *lock_file_path* has been created by us.

    The lock is a file created atomically with ``O_CREAT | O_EXCL`` whose
    content is the creation timestamp. While a lock file younger than
    *timeout* seconds exists, this function sleeps ``LOCK_RETRY_INTERVAL``
    seconds and retries. A lock file older than *timeout* seconds is treated
    as stale, removed, and the lock is taken over.

    Args:
        lock_file_path: Path of the lock file.
        timeout: Age in seconds after which an existing lock is stale.

    Known limitations (inherent to this timestamp scheme):
        * A holder that runs longer than *timeout* loses the lock to the
          next waiter while it is still running.
        * Two waiters that observe the same stale lock can both remove it
          in turn, so mutual exclusion is best-effort around stale takeover.
    """
    while True:
        timestamp = _read_lock_timestamp(lock_file_path)

        if timestamp is not None:
            if time.time() - timestamp > timeout:
                print(f"Stale lock detected, removing lock file {lock_file_path}")
                try:
                    os.remove(lock_file_path)
                except FileNotFoundError:
                    # Someone else already removed it; just try to create ours
                    pass
            else:
                print(f"Lock file {lock_file_path} exists and is recent, waiting...")
                time.sleep(LOCK_RETRY_INTERVAL)
                continue

        # Try to create a new lock file; O_EXCL makes the creation atomic so
        # only one contender can succeed.
        try:
            fd = os.open(lock_file_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except FileExistsError:
            print(f"Lock file {lock_file_path} exists, waiting...")
            time.sleep(LOCK_RETRY_INTERVAL)
            continue

        # Record when the lock was taken so waiters can detect staleness
        with os.fdopen(fd, 'w') as lock_file:
            lock_file.write(str(time.time()))
        print(f"Registry lock acquired, {lock_file_path}")

        # Lock successfully acquired
        return


def release_lock(lock_file_path):
    """Release the lock by deleting the lock file at *lock_file_path*.

    A missing file is not an error: the lock may already have been removed
    (for example by a waiter that considered it stale).
    """
    try:
        os.remove(lock_file_path)
        print(f"Registry lock released, {lock_file_path}")
    except FileNotFoundError:
        # Lock file already removed
        pass


def registry_mutex():
    """Return a decorator that runs a method while holding the registry lock.

    The decorated method's instance must expose a ``mutex_lock_file``
    attribute; when it is truthy it is used as the lock file path, otherwise
    ``DEFAULT_LOCK_FILE_PATH`` is used. The lock is released even when the
    wrapped method raises, and the method's return value is passed through.
    """
    # NOTE(review): the wrapper signature, the default-path fallback and the
    # final `return decorator` fall in gaps between the visible diff hunks;
    # they are reconstructed from the surrounding usage - confirm against the
    # full file.
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            lock_file_path = DEFAULT_LOCK_FILE_PATH
            if self.mutex_lock_file:
                lock_file_path = self.mutex_lock_file

            # Acquire the lock before running the function
            acquire_lock(lock_file_path, LOCK_TIMEOUT)
            try:
                return func(self, *args, **kwargs)
            finally:
                # Release the lock after the function completes
                release_lock(lock_file_path)

        return wrapper

    return decorator