#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy, 2019 (ita)

"""
Filesystem-based cache system to share and re-use build artifacts

Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.

The following environment variables may be set:
* WAFCACHE: several possibilities:
  - File cache:
    absolute path of the waf cache (~/.cache/wafcache_user,
    where `user` represents the currently logged-in user)
  - URL to a cache server, for example:
    export WAFCACHE=http://localhost:8080/files/
    in that case, GET/POST requests are made to urls of the form
    http://localhost:8080/files/000000000/0 (cache management is delegated to the server)
  - GCS, S3 or MINIO bucket
    gs://my-bucket/    (uses gsutil command line tool or WAFCACHE_CMD)
    s3://my-bucket/    (uses aws command line tool or WAFCACHE_CMD)
    minio://my-bucket/ (uses mc command line tool or WAFCACHE_CMD)
* WAFCACHE_CMD: bucket upload/download command, for example:
  WAFCACHE_CMD="gsutil cp %{SRC} %{TGT}"
  Note that the WAFCACHE bucket value is used for the source or destination
  depending on the operation (upload or download). For example, with:
  WAFCACHE="gs://mybucket/"
  the following commands may be run:
  gsutil cp build/myprogram gs://mybucket/aa/aaaaa/1
  gsutil cp gs://mybucket/bb/bbbbb/2 build/somefile
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
* WAFCACHE_STATS: if set, displays cache usage statistics on exit

File cache specific options:
  Files are copied using hard links by default; if the cache is located
  on another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum number of task folders to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval between attempts
  to trim the cache (3 minutes)

Upload specific options:
* WAFCACHE_ASYNC_WORKERS: define a number of workers to upload results asynchronously
  this may improve build performance with many/long file uploads
  the default is unset (synchronous uploads)
* WAFCACHE_ASYNC_NOWAIT: do not wait for uploads to complete (default: False)
  this requires asynchronous uploads to have an effect

Usage::

	def build(bld):
		bld.load('wafcache')
		...

To troubleshoot::

	waf clean build --zone=wafcache
"""
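# Example environment setup (illustrative values, not defaults):
#   export WAFCACHE=$HOME/.cache/wafcache_$USER    # plain file cache
#   export WAFCACHE_EVICT_MAX_BYTES=$((5 * 10**9)) # trim the cache above ~5GB
#   export WAFCACHE_STATS=1                        # print a hit/miss summary at exit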
import atexit, base64, errno, getpass, os, re, shutil, sys, time, threading, traceback, shlex
try:
	import subprocess32 as subprocess
except ImportError:
	import subprocess

base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
	base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())

CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
WAFCACHE_CMD = os.environ.get('WAFCACHE_CMD')
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
WAFCACHE_ASYNC_WORKERS = os.environ.get('WAFCACHE_ASYNC_WORKERS')
WAFCACHE_ASYNC_NOWAIT = os.environ.get('WAFCACHE_ASYNC_NOWAIT')
OK = "ok"

re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')

try:
	import cPickle
except ImportError:
	import pickle as cPickle
if __name__ != '__main__':
	from waflib import Task, Logs, Utils, Build

def can_retrieve_cache(self):
	"""
	New method for waf Task classes
	"""
	if not self.outputs:
		return False

	self.cached = False

	sig = self.signature()
	ssig = Utils.to_hex(self.uid() + sig)

	if WAFCACHE_STATS:
		self.generator.bld.cache_reqs += 1

	files_to = [node.abspath() for node in self.outputs]
	proc = get_process()
	err = cache_command(proc, ssig, [], files_to)
	process_pool.append(proc)
	if err.startswith(OK):
		if WAFCACHE_VERBOSITY:
			Logs.pprint('CYAN', '  Fetched %r from cache' % files_to)
		else:
			Logs.debug('wafcache: fetched %r from cache', files_to)
		if WAFCACHE_STATS:
			self.generator.bld.cache_hits += 1
	else:
		if WAFCACHE_VERBOSITY:
			Logs.pprint('YELLOW', '  No cache entry %s' % files_to)
		else:
			Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
		return False

	self.cached = True
	return True
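# Note on put_files_cache() below: the task signature is computed twice, once
# before and once after clearing the cached input signatures; if the two
# values differ, an input was modified while the task ran, so the outputs
# are considered stale and are not pushed to the cache.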
def put_files_cache(self):
	"""
	New method for waf Task classes
	"""
	if WAFCACHE_NO_PUSH or getattr(self, 'cached', None) or not self.outputs:
		return

	files_from = []
	for node in self.outputs:
		path = node.abspath()
		if not os.path.isfile(path):
			return
		files_from.append(path)

	bld = self.generator.bld
	old_sig = self.signature()

	for node in self.inputs:
		try:
			del node.ctx.cache_sig[node]
		except KeyError:
			pass

	delattr(self, 'cache_sig')
	sig = self.signature()

	def _async_put_files_cache(bld, ssig, files_from):
		proc = get_process()
		if WAFCACHE_ASYNC_WORKERS:
			with bld.wafcache_lock:
				if bld.wafcache_stop:
					process_pool.append(proc)
					return
				bld.wafcache_procs.add(proc)

		err = cache_command(proc, ssig, files_from, [])
		process_pool.append(proc)
		if err.startswith(OK):
			if WAFCACHE_VERBOSITY:
				Logs.pprint('CYAN', '  Successfully uploaded %s to cache' % files_from)
			else:
				Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
			if WAFCACHE_STATS:
				bld.cache_puts += 1
		else:
			if WAFCACHE_VERBOSITY:
				Logs.pprint('RED', '  Error caching step results %s: %s' % (files_from, err))
			else:
				Logs.debug('wafcache: Error caching results %s: %s', files_from, err)

	if old_sig == sig:
		ssig = Utils.to_hex(self.uid() + sig)
		if WAFCACHE_ASYNC_WORKERS:
			fut = bld.wafcache_executor.submit(_async_put_files_cache, bld, ssig, files_from)
			bld.wafcache_uploads.append(fut)
		else:
			_async_put_files_cache(bld, ssig, files_from)
	else:
		Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs)

	bld.task_sigs[self.uid()] = self.cache_sig
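# hash_env_vars() and uid() below replace their waf counterparts so that
# cache keys stay stable across machines and checkouts: absolute source
# paths are stripped from hashed environment data, and task nodes are
# hashed relative to the source root rather than by absolute path.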
def hash_env_vars(self, env, vars_lst):
	"""
	Reimplement BuildContext.hash_env_vars so that the resulting hash does not depend on local paths
	"""
	if not env.table:
		env = env.parent
		if not env:
			return Utils.SIG_NIL

	idx = str(id(env)) + str(vars_lst)
	try:
		cache = self.cache_env
	except AttributeError:
		cache = self.cache_env = {}
	else:
		try:
			return self.cache_env[idx]
		except KeyError:
			pass

	v = str([env[a] for a in vars_lst])
	v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
	m = Utils.md5()
	m.update(v.encode())
	ret = m.digest()

	Logs.debug('envhash: %r %r', ret, v)

	cache[idx] = ret
	return ret
def uid(self):
	"""
	Reimplement Task.uid() so that the signature does not depend on local paths
	"""
	try:
		return self.uid_
	except AttributeError:
		m = Utils.md5()
		src = self.generator.bld.srcnode
		up = m.update
		up(self.__class__.__name__.encode())
		for x in self.inputs + self.outputs:
			up(x.path_from(src).encode())
		self.uid_ = m.digest()
		return self.uid_
def make_cached(cls):
	"""
	Enable the waf cache for a given task class
	"""
	if getattr(cls, 'nocache', None) or getattr(cls, 'has_cache', False):
		return

	full_name = "%s.%s" % (cls.__module__, cls.__name__)
	if full_name in ('waflib.Tools.ccroot.vnum', 'waflib.Build.inst'):
		return

	m1 = getattr(cls, 'run', None)
	def run(self):
		if getattr(self, 'nocache', False):
			return m1(self)
		if self.can_retrieve_cache():
			return 0
		return m1(self)
	cls.run = run

	m2 = getattr(cls, 'post_run', None)
	def post_run(self):
		if getattr(self, 'nocache', False):
			return m2(self)
		ret = m2(self)
		self.put_files_cache()
		return ret
	cls.post_run = post_run
	cls.has_cache = True
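# A task class can opt out of caching by setting a `nocache` attribute;
# for example, with a hypothetical task class name:
#
#   from waflib import Task
#   Task.classes['my_codegen'].nocache = True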
process_pool = []
def get_process():
	"""
	Returns a worker process that can process waf cache commands
	The worker process is assumed to be returned to the process pool when unused
	"""
	try:
		return process_pool.pop()
	except IndexError:
		filepath = os.path.dirname(os.path.abspath(__file__)) + os.sep + 'wafcache.py'
		cmd = [sys.executable, '-c', Utils.readf(filepath)]
		return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)

def atexit_pool():
	for proc in process_pool:
		proc.kill()
atexit.register(atexit_pool)
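# Rationale: the pool is pre-filled in build() with one worker per build job
# (see below), so cache lookups on the critical path never pay the cost of
# spawning a new interpreter.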
def build(bld):
	"""
	Called during the build process to enable file caching
	"""

	if WAFCACHE_ASYNC_WORKERS:
		try:
			num_workers = int(WAFCACHE_ASYNC_WORKERS)
		except ValueError:
			Logs.warn('Invalid WAFCACHE_ASYNC_WORKERS specified: %r' % WAFCACHE_ASYNC_WORKERS)
		else:
			from concurrent.futures import ThreadPoolExecutor
			bld.wafcache_executor = ThreadPoolExecutor(max_workers=num_workers)
			bld.wafcache_uploads = []
			bld.wafcache_procs = set([])
			bld.wafcache_stop = False
			bld.wafcache_lock = threading.Lock()

			def finalize_upload_async(bld):
				if WAFCACHE_ASYNC_NOWAIT:
					with bld.wafcache_lock:
						bld.wafcache_stop = True

					for fut in reversed(bld.wafcache_uploads):
						fut.cancel()

					for proc in bld.wafcache_procs:
						proc.kill()

					bld.wafcache_procs.clear()
				else:
					Logs.pprint('CYAN', '... waiting for wafcache uploads to complete (%s uploads)' % len(bld.wafcache_uploads))
					bld.wafcache_executor.shutdown(wait=True)
			bld.add_post_fun(finalize_upload_async)

	if WAFCACHE_STATS:
		# Init counters for statistics and hook to print results at the end
		bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0

		def printstats(bld):
			hit_ratio = 0
			if bld.cache_reqs > 0:
				hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100
			Logs.pprint('CYAN', '  wafcache stats: %s requests, %s hits (ratio: %.2f%%), %s writes' %
				(bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) )
		bld.add_post_fun(printstats)

	if process_pool:
		# already called once
		return

	# pre-allocation
	processes = [get_process() for x in range(bld.jobs)]
	process_pool.extend(processes)

	Task.Task.can_retrieve_cache = can_retrieve_cache
	Task.Task.put_files_cache = put_files_cache
	Task.Task.uid = uid
	Build.BuildContext.hash_env_vars = hash_env_vars
	for x in reversed(list(Task.classes.values())):
		make_cached(x)
def cache_command(proc, sig, files_from, files_to):
	"""
	Create a command for cache worker processes, returns a pickled
	base64-encoded tuple containing the task signature, a list of files to
	cache and a list of files to get from cache (one of the lists
	is assumed to be empty)
	"""
	obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
	proc.stdin.write(obj)
	proc.stdin.write('\n'.encode())
	proc.stdin.flush()
	obj = proc.stdout.readline()
	if not obj:
		raise OSError('Preforked sub-process %r died' % proc.pid)
	return cPickle.loads(base64.b64decode(obj))
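# Wire format illustration (paths below are hypothetical): both directions
# carry one base64-encoded pickle per line. A push request for two outputs
# decodes to
#   [ssig, ['/path/build/a.o', '/path/build/b.o'], []]
# while the corresponding fetch request reverses the two lists:
#   [ssig, [], ['/path/build/a.o', '/path/build/b.o']]
# The worker answers with a pickled "ok" string or a traceback.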
try:
	# os.link is unavailable on some platforms; fall back to plain copies
	copyfun = os.link
except AttributeError:
	copyfun = shutil.copy2

def atomic_copy(orig, dest):
	"""
	Copy files to the cache, the operation is atomic for a given file
	"""
	global copyfun
	tmp = dest + '.tmp'
	up = os.path.dirname(dest)
	try:
		os.makedirs(up)
	except OSError:
		pass

	try:
		copyfun(orig, tmp)
	except OSError as e:
		if e.errno == errno.EXDEV:
			# hard links cannot cross partitions; switch to copies for good
			copyfun = shutil.copy2
			copyfun(orig, tmp)
		else:
			raise
	os.rename(tmp, dest)
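# Writing to `dest + '.tmp'` and renaming afterwards is what makes the copy
# atomic: os.rename() replaces the destination in a single step on POSIX
# filesystems, so concurrent readers never observe a half-written entry.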
def lru_trim():
	"""
	the cache folders take the form:
	`CACHE_DIR/0b/0b180f82246d726ece37c8ccd0fb1cde2650d7bfcf122ec1f169079a3bfc0ab9`
	they are listed in order of last access, and then removed
	until the amount of folders is within TRIM_MAX_FOLDERS and the total space
	taken by files is less than EVICT_MAX_BYTES
	"""
	lst = []
	for up in os.listdir(CACHE_DIR):
		if len(up) == 2:
			sub = os.path.join(CACHE_DIR, up)
			for hval in os.listdir(sub):
				path = os.path.join(sub, hval)

				size = 0
				for fname in os.listdir(path):
					try:
						size += os.lstat(os.path.join(path, fname)).st_size
					except OSError:
						pass
				lst.append((os.stat(path).st_mtime, size, path))

	lst.sort(key=lambda x: x[0])
	lst.reverse()

	tot = sum(x[1] for x in lst)
	while tot > EVICT_MAX_BYTES or len(lst) > TRIM_MAX_FOLDERS:
		_, tmp_size, path = lst.pop()
		tot -= tmp_size

		tmp = path + '.remove'
		try:
			shutil.rmtree(tmp)
		except OSError:
			pass

		try:
			os.rename(path, tmp)
		except OSError:
			sys.stderr.write('Could not rename %r to %r\n' % (path, tmp))
		else:
			try:
				shutil.rmtree(tmp)
			except OSError:
				sys.stderr.write('Could not remove %r\n' % tmp)
	sys.stderr.write("Cache trimmed: %r bytes in %r folders left\n" % (tot, len(lst)))
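# lru_evict() below throttles lru_trim() through a lock file: the file mtime
# records the last eviction attempt, and an exclusive non-blocking lock
# prevents concurrent builds from trimming the same cache at the same time.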
def lru_evict():
	"""
	Reduce the cache size
	"""
	lockfile = os.path.join(CACHE_DIR, 'all.lock')
	try:
		st = os.stat(lockfile)
	except EnvironmentError as e:
		if e.errno == errno.ENOENT:
			# first run: create the lock file, eviction will occur on a later attempt
			with open(lockfile, 'w') as f:
				f.write('')
			return
		else:
			# any other errors such as permissions
			raise

	if st.st_mtime < time.time() - EVICT_INTERVAL_MINUTES * 60:
		# check every EVICT_INTERVAL_MINUTES minutes if the cache is too big
		# OCLOEXEC is unnecessary because no cleaning processes are spawned
		fd = os.open(lockfile, os.O_RDWR | os.O_CREAT, 0o755)
		try:
			try:
				import fcntl
			except ImportError:
				import msvcrt, ctypes, ctypes.wintypes
				handle = msvcrt.get_osfhandle(fd)
				kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
				DWORD = ctypes.wintypes.DWORD
				HANDLE = ctypes.wintypes.HANDLE
				BOOL = ctypes.wintypes.BOOL

				class DUMMYSTRUCTNAME(ctypes.Structure):
					_fields_ = [('Offset', DWORD), ('OffsetHigh', DWORD)]
				class DUMMYUNIONNAME(ctypes.Union):
					_fields_ = [('_dummystructname', DUMMYSTRUCTNAME), ('Pointer', ctypes.c_void_p)]
				class OVERLAPPED(ctypes.Structure):
					_fields_ = [('Internal', ctypes.c_void_p), ('InternalHigh', ctypes.c_void_p), ('_dummyunionname', DUMMYUNIONNAME), ('hEvent', HANDLE)]

				LockFileEx = kernel32.LockFileEx
				LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, ctypes.POINTER(OVERLAPPED)]
				LockFileEx.restype = BOOL
				UnlockFileEx = kernel32.UnlockFileEx
				UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, ctypes.POINTER(OVERLAPPED)]
				UnlockFileEx.restype = BOOL

				# 3 = LOCKFILE_FAIL_IMMEDIATELY | LOCKFILE_EXCLUSIVE_LOCK
				if LockFileEx(handle, 3, 0, 1, 0, ctypes.pointer(OVERLAPPED())):
					try:
						lru_trim()
						os.utime(lockfile, None)
					finally:
						UnlockFileEx(handle, 0, 1, 0, ctypes.pointer(OVERLAPPED()))
				else:
					last_error = ctypes.get_last_error()
					if last_error == 33: # ERROR_LOCK_VIOLATION
						if WAFCACHE_VERBOSITY:
							sys.stderr.write('wafcache: another cleaning process is running\n')
					else:
						raise OSError(last_error)
			else:
				try:
					fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
				except EnvironmentError:
					if WAFCACHE_VERBOSITY:
						sys.stderr.write('wafcache: another cleaning process is running\n')
				else:
					# now do the actual cleanup
					lru_trim()
					os.utime(lockfile, None)
		finally:
			os.close(fd)
class netcache(object):
	def __init__(self):
		import urllib3
		self.http = urllib3.PoolManager()

	def url_of(self, sig, i):
		return "%s/%s/%s" % (CACHE_DIR, sig, i)

	def upload(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with open(file_path, 'rb') as f:
			file_data = f.read()
		r = self.http.request('POST', url, timeout=60,
			fields={ 'file': ('%s/%s' % (sig, i), file_data), })
		if r.status >= 400:
			raise OSError("Invalid status %r %r" % (url, r.status))

	def download(self, file_path, sig, i):
		url = self.url_of(sig, i)
		with self.http.request('GET', url, preload_content=False, timeout=60) as inf:
			if inf.status >= 400:
				raise OSError("Invalid status %r %r" % (url, inf.status))
			with open(file_path, 'wb') as out:
				shutil.copyfileobj(inf, out)

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				if not os.path.islink(x):
					self.upload(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				self.download(x, sig, i)
		except Exception:
			return traceback.format_exc()
		return OK
class fcache(object):
	def __init__(self):
		if not os.path.exists(CACHE_DIR):
			try:
				os.makedirs(CACHE_DIR)
			except OSError:
				pass
		if not os.path.exists(CACHE_DIR):
			raise ValueError('Could not initialize the cache directory')

	def copy_to_cache(self, sig, files_from, files_to):
		"""
		Copy files to the cache, existing files are overwritten,
		and the copy is atomic only for a given file, not for all files
		that belong to a given task object
		"""
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		else:
			# attempt trimming if caching was successful:
			# we may have things to trim!
			try:
				lru_evict()
			except Exception:
				return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		"""
		Copy files from the cache
		"""
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				atomic_copy(orig, x)

			# success! update the cache time
			os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
		except Exception:
			return traceback.format_exc()
		return OK
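# The bucket backend below shells out once per file. A custom tool can be
# plugged in through WAFCACHE_CMD, for example (illustrative command, any
# program accepting a source and a target argument works):
#   export WAFCACHE_CMD='rclone copyto %{SRC} %{TGT}'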
class bucket_cache(object):
	def bucket_copy(self, source, target):
		if WAFCACHE_CMD:
			def replacer(match):
				if match.group('src'):
					return source
				elif match.group('tgt'):
					return target
			cmd = [re_waf_cmd.sub(replacer, x) for x in shlex.split(WAFCACHE_CMD)]
		elif CACHE_DIR.startswith('s3://'):
			cmd = ['aws', 's3', 'cp', source, target]
		elif CACHE_DIR.startswith('gs://'):
			cmd = ['gsutil', 'cp', source, target]
		else:
			cmd = ['mc', 'cp', source, target]

		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
		out, err = proc.communicate()
		if proc.returncode:
			raise OSError('Error copying %r to %r using: %r (exit %r):\n  out:%s\n  err:%s' % (
				source, target, cmd, proc.returncode, out.decode(errors='replace'), err.decode(errors='replace')))

	def copy_to_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_from):
				dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(x, dest)
		except Exception:
			return traceback.format_exc()
		return OK

	def copy_from_cache(self, sig, files_from, files_to):
		try:
			for i, x in enumerate(files_to):
				orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
				self.bucket_copy(orig, x)
		except EnvironmentError:
			return traceback.format_exc()
		return OK
def loop(service):
	"""
	This function is run when this file is run as a standalone python script,
	it assumes a parent process that will communicate the commands to it
	as pickled-encoded tuples (one line per command)

	The commands are to copy files to the cache or copy files from the
	cache to a target destination
	"""
	# one operation is performed at a single time by a single process
	# therefore stdin never has more than one line
	txt = sys.stdin.readline().strip()
	if not txt:
		# parent process probably ended
		sys.exit(1)
	ret = OK

	[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
	if files_from:
		# TODO return early when pushing files upstream
		ret = service.copy_to_cache(sig, files_from, files_to)
	elif files_to:
		# the build process waits for workers to (possibly) obtain files from the cache
		ret = service.copy_from_cache(sig, files_from, files_to)
	else:
		ret = "Invalid command"

	obj = base64.b64encode(cPickle.dumps(ret))
	sys.stdout.write(obj.decode())
	sys.stdout.write('\n')
	sys.stdout.flush()
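# When executed directly, this file acts as a cache worker: it reads one
# pickled command per line on stdin and writes one pickled status per line
# to stdout, until the parent process closes the pipe.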
if __name__ == '__main__':
	if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://') or CACHE_DIR.startswith('minio://'):
		if CACHE_DIR.startswith('minio://'):
			CACHE_DIR = CACHE_DIR[8:]   # minio doesn't need the protocol part, it uses config aliases
		service = bucket_cache()
	elif CACHE_DIR.startswith('http'):
		service = netcache()
	else:
		service = fcache()

	while 1:
		try:
			loop(service)
		except KeyboardInterrupt:
			break