Skip to content

Commit

Permalink
Merge pull request #752 from screamerbg/f/thread_safety
Browse files Browse the repository at this point in the history
Fix: Caching thread safety
  • Loading branch information
theotherjimmy authored Sep 12, 2018
2 parents 63e3fae + d0c1dd6 commit fa61b91
Showing 1 changed file with 118 additions and 3 deletions.
121 changes: 118 additions & 3 deletions mbed/mbed.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@
import errno
import ctypes
from itertools import chain, repeat
import time
import zipfile
import argparse
from random import randint
from contextlib import contextmanager


# Application version
Expand Down Expand Up @@ -155,7 +158,10 @@ def log(msg, is_error=False):
sys.stderr.write(msg) if is_error else sys.stdout.write(msg)

def message(msg):
return "[mbed] %s\n" % msg
if very_verbose:
return "[mbed-%s] %s\n" % (os.getpid(), msg)
else:
return "[mbed] %s\n" % msg

def info(msg, level=1):
if level <= 0 or verbose:
Expand Down Expand Up @@ -1223,7 +1229,8 @@ def clone(self, url, path, rev=None, depth=None, protocol=None, offline=False, *
os.makedirs(os.path.split(path)[0])

info("Carbon copy from \"%s\" to \"%s\"" % (cache, path))
shutil.copytree(cache, path)
with self.cache_lock_held(url):
shutil.copytree(cache, path)

with cd(path):
scm.seturl(formaturl(url, protocol))
Expand Down Expand Up @@ -1253,7 +1260,8 @@ def clone(self, url, path, rev=None, depth=None, protocol=None, offline=False, *
self.url = url
self.path = os.path.abspath(path)
self.ignores()
self.set_cache(url)
with self.cache_lock_held(url):
self.set_cache(url)
return True

if offline:
Expand Down Expand Up @@ -1336,6 +1344,113 @@ def set_cache(self, url):
warning("Unable to cache \"%s\" to \"%s\"" % (self.path, cpath))
return False

def cache_lock(self, url):
cpath = self.url2cachedir(url)
if not cpath:
return False

if not os.path.isdir(cpath):
os.makedirs(cpath)

lock_dir = os.path.join(cpath, '.lock')
lock_file = os.path.join(lock_dir, 'pid')
timeout = 300

for i in range(timeout):
if i:
time.sleep(1)

if os.path.exists(lock_dir):
try:
if os.path.isfile(lock_file):
with open(lock_file, 'r') as f:
pid = f.read(8)
if not pid:
if int(os.path.getmtime(lock_file)) + timeout < int(time.time()):
info("Cache lock file exists, but is empty. Cleaning up")
os.remove(lock_file)
os.rmdir(lock_dir)
elif int(pid) != os.getpid() and self.pid_exists(pid):
info("Cache lock file exists and process %s is alive." % pid)
else:
info("Cache lock file exists, but %s is dead. Cleaning up" % pid)
os.remove(lock_file)
os.rmdir(lock_dir)
else:
os.rmdir(lock_dir)
continue
except (OSError) as e:
continue
else:
try:
os.mkdir(lock_dir)
with open(lock_file, 'w') as f:
info("Writing cache lock file %s for pid %s" % (lock_file, os.getpid()))
f.write(str(os.getpid()))
f.flush()
os.fsync(f)
break
except (OSError) as e:
## Windows:
## <type 'exceptions.WindowsError'> 17 [Error 183] Cannot create a file when that file already exists: 'testing'
## or when concurrent: 13 WindowsError(5, 'Access is denied')
## Linux: <type 'exceptions.OSError'> 17 [Errno 17] File exists: 'testing'
## or when concurrent & virtualbox 71, OSError(71, 'Protocol error')
## or when full: 28, OSError(28, 'No space left on device')
if e.errno in (17,13,71,28):
continue
else:
raise e
else:
error("Exceeded 5 minutes limit while waiting for other process to finish caching")
return True

def cache_unlock(self, url):
cpath = self.url2cachedir(url)
if not cpath:
return False

lock_dir = os.path.join(cpath, '.lock')
lock_file = os.path.join(lock_dir, 'pid')
try:
if os.path.exists(lock_dir):
if os.path.isfile(lock_file):
try:
with open(lock_file, 'r') as f:
pid = f.read(8)
if int(pid) != os.getpid():
error("Cache lock file exists with a different pid (\"%s\" vs \"%s\")" % (pid, os.getpid()))
else:
info("Cache lock file exists with my pid (\"%s\"). Cleaning up." % (pid))
except OSError:
error("Unable to unlock cache dir \"%s\"" % (cpath))
os.remove(lock_file)
os.rmdir(lock_dir)
except (OSError) as e:
pass
return True

@contextmanager
def cache_lock_held(self, url):
self.cache_lock(url)
try:
yield
finally:
self.cache_unlock(url)

def pid_exists(self, pid):
try:
os.kill(int(pid), 0)
except OSError as err:
if err.errno == errno.ESRCH:
return False
elif err.errno == errno.EPERM:
return True
else:
raise err
else:
return True

def can_update(self, clean, clean_deps):
err = None
if (self.is_local or self.url is None) and not clean_deps:
Expand Down

0 comments on commit fa61b91

Please sign in to comment.