From 59e12f0ab170ce832a17bedd611badd4b48013de Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Mon, 28 Oct 2024 14:32:19 +0530 Subject: [PATCH 1/3] Use non-blocking mutex on registry CLI with retry on error --- .../deploy/webapp/registry_mutex.py | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/stack_orchestrator/deploy/webapp/registry_mutex.py b/stack_orchestrator/deploy/webapp/registry_mutex.py index 7c835f19..5e8e0219 100644 --- a/stack_orchestrator/deploy/webapp/registry_mutex.py +++ b/stack_orchestrator/deploy/webapp/registry_mutex.py @@ -1,9 +1,12 @@ import fcntl from functools import wraps +import time # Define default file path for the lock DEFAULT_LOCK_FILE_PATH = "/tmp/registry_mutex_lock_file" +LOCK_RETRY_INTERVAL = 3 + def registry_mutex(): def decorator(func): @@ -13,16 +16,25 @@ def registry_mutex(): if self.mutex_lock_file: lock_file_path = self.mutex_lock_file + result = None with open(lock_file_path, 'w') as lock_file: - try: - # Try to acquire the lock - fcntl.flock(lock_file, fcntl.LOCK_EX) + while True: + try: + # Try to acquire the lock + fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) + print(f"Registry lock acquired, {lock_file_path}") - # Call the actual function - result = func(self, *args, **kwargs) - finally: - # Always release the lock - fcntl.flock(lock_file, fcntl.LOCK_UN) + # Call the actual function + result = func(self, *args, **kwargs) + break + except BlockingIOError: + # Retry on error + print(f"Not able to acquire lock on {lock_file_path}, retrying in {LOCK_RETRY_INTERVAL}s...") + time.sleep(LOCK_RETRY_INTERVAL) + + # Always release the lock + fcntl.flock(lock_file, fcntl.LOCK_UN) + print(f"Registry lock released, {lock_file_path}") return result -- 2.45.2 From b3d20a74782d72555c0afbfe5e784b4981ac2984 Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Mon, 28 Oct 2024 16:45:18 +0530 Subject: [PATCH 2/3] Use file existence for registry mutex --- 
.../deploy/webapp/registry_mutex.py | 69 +++++++++++++------ 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/stack_orchestrator/deploy/webapp/registry_mutex.py b/stack_orchestrator/deploy/webapp/registry_mutex.py index 5e8e0219..c1fb3a22 100644 --- a/stack_orchestrator/deploy/webapp/registry_mutex.py +++ b/stack_orchestrator/deploy/webapp/registry_mutex.py @@ -1,13 +1,52 @@ import fcntl from functools import wraps +import os import time # Define default file path for the lock DEFAULT_LOCK_FILE_PATH = "/tmp/registry_mutex_lock_file" - +LOCK_TIMEOUT = 30 LOCK_RETRY_INTERVAL = 3 +def acquire_lock(lock_file_path, timeout): + while True: + try: + # Check if lock file exists and is potentially stale + if os.path.exists(lock_file_path): + with open(lock_file_path, 'r') as lock_file: + timestamp = float(lock_file.read().strip()) + + # If lock is stale, remove the lock file + if time.time() - timestamp > timeout: + print(f"Stale lock detected, removing lock file {lock_file_path}") + os.remove(lock_file_path) + else: + print(f"Lock file {lock_file_path} exists and is recent, waiting...") + time.sleep(LOCK_RETRY_INTERVAL) + continue + + # Try to create a new lock file with the current timestamp + fd = os.open(lock_file_path, os.O_CREAT | os.O_EXCL | os.O_RDWR) + with os.fdopen(fd, 'w') as lock_file: + lock_file.write(str(time.time())) + print(f"Registry lock acquired, {lock_file_path}") + + # Lock successfully acquired + return + + except FileExistsError: + print(f"Lock file {lock_file_path} exists, waiting...") + time.sleep(LOCK_RETRY_INTERVAL) + +def release_lock(lock_file_path): + try: + os.remove(lock_file_path) + print(f"Registry lock released, {lock_file_path}") + except FileNotFoundError: + # Lock file already removed + pass + def registry_mutex(): def decorator(func): @wraps(func) @@ -16,27 +55,13 @@ def registry_mutex(): if self.mutex_lock_file: lock_file_path = self.mutex_lock_file - result = None - with open(lock_file_path, 'w') as lock_file: - while 
True: - try: - # Try to acquire the lock - fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) - print(f"Registry lock acquired, {lock_file_path}") - - # Call the actual function - result = func(self, *args, **kwargs) - break - except BlockingIOError: - # Retry on error - print(f"Not able to acquire lock on {lock_file_path}, retrying in {LOCK_RETRY_INTERVAL}s...") - time.sleep(LOCK_RETRY_INTERVAL) - - # Always release the lock - fcntl.flock(lock_file, fcntl.LOCK_UN) - print(f"Registry lock released, {lock_file_path}") - - return result + # Acquire the lock before running the function + acquire_lock(lock_file_path, LOCK_TIMEOUT) + try: + return func(self, *args, **kwargs) + finally: + # Release the lock after the function completes + release_lock(lock_file_path) return wrapper -- 2.45.2 From 6caf131236a9b720c5d173e078d3058d2d635ae6 Mon Sep 17 00:00:00 2001 From: Prathamesh Musale Date: Mon, 28 Oct 2024 18:57:10 +0530 Subject: [PATCH 3/3] Check if lock already acquired by current client --- .../deploy/webapp/registry_mutex.py | 19 ++++++++++++++----- stack_orchestrator/deploy/webapp/util.py | 4 +++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/stack_orchestrator/deploy/webapp/registry_mutex.py b/stack_orchestrator/deploy/webapp/registry_mutex.py index c1fb3a22..e464f58d 100644 --- a/stack_orchestrator/deploy/webapp/registry_mutex.py +++ b/stack_orchestrator/deploy/webapp/registry_mutex.py @@ -1,4 +1,3 @@ -import fcntl from functools import wraps import os import time @@ -9,7 +8,11 @@ LOCK_TIMEOUT = 30 LOCK_RETRY_INTERVAL = 3 -def acquire_lock(lock_file_path, timeout): +def acquire_lock(client, lock_file_path, timeout): + # Lock already acquired by the current client + if client.mutex_lock_acquired: + return + while True: try: # Check if lock file exists and is potentially stale @@ -30,6 +33,8 @@ def acquire_lock(lock_file_path, timeout): fd = os.open(lock_file_path, os.O_CREAT | os.O_EXCL | os.O_RDWR) with os.fdopen(fd, 'w') as lock_file: 
lock_file.write(str(time.time())) + + client.mutex_lock_acquired = True print(f"Registry lock acquired, {lock_file_path}") # Lock successfully acquired @@ -39,14 +44,18 @@ def acquire_lock(lock_file_path, timeout): print(f"Lock file {lock_file_path} exists, waiting...") time.sleep(LOCK_RETRY_INTERVAL) -def release_lock(lock_file_path): + +def release_lock(client, lock_file_path): try: os.remove(lock_file_path) + + client.mutex_lock_acquired = False print(f"Registry lock released, {lock_file_path}") except FileNotFoundError: # Lock file already removed pass + def registry_mutex(): def decorator(func): @wraps(func) @@ -56,12 +65,12 @@ def registry_mutex(): lock_file_path = self.mutex_lock_file # Acquire the lock before running the function - acquire_lock(lock_file_path, LOCK_TIMEOUT) + acquire_lock(self, lock_file_path, LOCK_TIMEOUT) try: return func(self, *args, **kwargs) finally: # Release the lock after the function completes - release_lock(lock_file_path) + release_lock(self, lock_file_path) return wrapper diff --git a/stack_orchestrator/deploy/webapp/util.py b/stack_orchestrator/deploy/webapp/util.py index e587787e..dd3bfe96 100644 --- a/stack_orchestrator/deploy/webapp/util.py +++ b/stack_orchestrator/deploy/webapp/util.py @@ -117,7 +117,6 @@ class LaconicRegistryClient: def __init__(self, config_file, log_file=None, mutex_lock_file=None): self.config_file = config_file self.log_file = log_file - self.mutex_lock_file = mutex_lock_file self.cache = AttrDict( { "name_or_id": {}, @@ -126,6 +125,9 @@ class LaconicRegistryClient: } ) + self.mutex_lock_file = mutex_lock_file + self.mutex_lock_acquired = False + def whoami(self, refresh=False): if not refresh and "whoami" in self.cache: return self.cache["whoami"] -- 2.45.2