Use file existence for registry mutex #959

Merged
ashwin merged 3 commits from deep-stack/stack-orchestrator:pm-nonblocking-mutex into main 2024-10-29 04:05:36 +00:00
Showing only changes of commit b3d20a7478 - Show all commits

View File

@ -1,13 +1,52 @@
import fcntl
from functools import wraps
import os
import time
# Define default file path for the lock
DEFAULT_LOCK_FILE_PATH = "/tmp/registry_mutex_lock_file"
# Age in seconds beyond which an existing lock file is treated as stale
# (compared against the time.time() timestamp stored in the lock file)
LOCK_TIMEOUT = 30
# Seconds to sleep between attempts while the lock file is present
LOCK_RETRY_INTERVAL = 3
def acquire_lock(lock_file_path, timeout):
    """Block until this caller has created the lock file at ``lock_file_path``.

    The lock is represented by the existence of the file; its content is the
    ``time.time()`` value at which it was created. An existing lock older
    than ``timeout`` seconds is considered stale and is removed.

    Args:
        lock_file_path: Path of the file used as the mutex.
        timeout: Age in seconds after which an existing lock is stale.

    Returns:
        None once the lock file has been created by this call.

    Note:
        Stale-lock removal is not atomic: two waiters that both see the same
        stale lock may race, so keep ``timeout`` comfortably above the longest
        expected critical section.
    """
    while True:
        # EAFP instead of os.path.exists(): the holder may release the lock
        # between a check and the read, which must not crash the waiter.
        try:
            with open(lock_file_path, 'r') as lock_file:
                content = lock_file.read().strip()
        except FileNotFoundError:
            content = None

        if content is not None:
            try:
                timestamp = float(content)
            except ValueError:
                # Empty or partial content: the holder is between creating the
                # file and writing the timestamp, or died there. Use the
                # file's modification time to judge staleness instead.
                try:
                    timestamp = os.path.getmtime(lock_file_path)
                except FileNotFoundError:
                    # Lock vanished in the meantime; start over immediately.
                    continue

            if time.time() - timestamp > timeout:
                print(f"Stale lock detected, removing lock file {lock_file_path}")
                try:
                    os.remove(lock_file_path)
                except FileNotFoundError:
                    # Someone else already removed it.
                    pass
            else:
                print(f"Lock file {lock_file_path} exists and is recent, waiting...")
                time.sleep(LOCK_RETRY_INTERVAL)
                continue

        # O_CREAT | O_EXCL makes creation atomic: exactly one caller succeeds.
        try:
            fd = os.open(lock_file_path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except FileExistsError:
            print(f"Lock file {lock_file_path} exists, waiting...")
            time.sleep(LOCK_RETRY_INTERVAL)
            continue

        with os.fdopen(fd, 'w') as lock_file:
            lock_file.write(str(time.time()))

        print(f"Registry lock acquired, {lock_file_path}")
        return
def release_lock(lock_file_path):
    """Delete the lock file, silently doing nothing if it is already gone."""
    try:
        os.remove(lock_file_path)
    except FileNotFoundError:
        # Nothing to release: the file was removed by someone else.
        return
    print(f"Registry lock released, {lock_file_path}")
def registry_mutex():
def decorator(func):
@wraps(func)
@ -16,27 +55,13 @@ def registry_mutex():
if self.mutex_lock_file:
lock_file_path = self.mutex_lock_file
result = None
with open(lock_file_path, 'w') as lock_file:
while True:
try:
# Try to acquire the lock
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
print(f"Registry lock acquired, {lock_file_path}")
# Call the actual function
result = func(self, *args, **kwargs)
break
except BlockingIOError:
# Retry on error
print(f"Not able to acquire lock on {lock_file_path}, retrying in {LOCK_RETRY_INTERVAL}s...")
time.sleep(LOCK_RETRY_INTERVAL)
# Always release the lock
fcntl.flock(lock_file, fcntl.LOCK_UN)
print(f"Registry lock released, {lock_file_path}")
return result
# Acquire the lock before running the function
acquire_lock(lock_file_path, LOCK_TIMEOUT)
try:
return func(self, *args, **kwargs)
finally:
# Release the lock after the function completes
release_lock(lock_file_path)
return wrapper