# test_encryption.py
"""Test encryption"""

import os
import pipes
import pytest

# NOTE(review): the `pipes` module was removed in Python 3.13 (PEP 594);
# `shlex.quote` is the documented equivalent of `pipes.quote`.

# Key material that add_asymmetric_key() passes to `gpg --import`.
KEY_FILE = 'test/test_key'
# Fingerprint that remove_asymmetric_key() passes to gpg's delete commands.
KEY_FINGERPRINT = 'F8BBFC746C58945442349BCEBA54FFD04C599B1A'
# User ID used as the recipient for asymmetric encryption in these tests.
KEY_NAME = 'yadm-test1'
# Owner-trust data that add_asymmetric_key() feeds to
# `gpg --import-ownertrust`.
KEY_TRUST = 'test/ownertrust.txt'
# Passphrase answered at the prompts of the symmetric tests and used to build
# and verify archives.
PASSPHRASE = 'ExamplePassword'

# Request the 'config_git' fixture (defined outside this file) for every test
# in this module.
pytestmark = pytest.mark.usefixtures('config_git')
  11. def add_asymmetric_key():
  12. """Add asymmetric key"""
  13. os.system(f'gpg --import {pipes.quote(KEY_FILE)}')
  14. os.system(f'gpg --import-ownertrust < {pipes.quote(KEY_TRUST)}')
  15. def remove_asymmetric_key():
  16. """Remove asymmetric key"""
  17. os.system(
  18. f'gpg --batch --yes '
  19. f'--delete-secret-keys {pipes.quote(KEY_FINGERPRINT)}')
  20. os.system(f'gpg --batch --yes --delete-key {pipes.quote(KEY_FINGERPRINT)}')
  21. @pytest.fixture
  22. def asymmetric_key():
  23. """Fixture for asymmetric key, removed in teardown"""
  24. add_asymmetric_key()
  25. yield KEY_NAME
  26. remove_asymmetric_key()
  27. @pytest.fixture
  28. def encrypt_targets(yadm_y, paths):
  29. """Fixture for setting up data to encrypt
  30. This fixture:
  31. * inits an empty repo
  32. * creates test files in the work tree
  33. * creates a ".yadm/encrypt" file for testing:
  34. * standard files
  35. * standard globs
  36. * directories
  37. * comments
  38. * empty lines and lines with just space
  39. * exclusions
  40. * returns a list of expected encrypted files
  41. """
  42. # init empty yadm repo
  43. os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f')))
  44. expected = []
  45. # standard files w/ dirs & spaces
  46. paths.work.join('inc file1').write('inc file1')
  47. expected.append('inc file1')
  48. paths.encrypt.write('inc file1\n')
  49. paths.work.join('inc dir').mkdir()
  50. paths.work.join('inc dir/inc file2').write('inc file2')
  51. expected.append('inc dir/inc file2')
  52. paths.encrypt.write('inc dir/inc file2\n', mode='a')
  53. # standard globs w/ dirs & spaces
  54. paths.work.join('globs file1').write('globs file1')
  55. expected.append('globs file1')
  56. paths.work.join('globs dir').mkdir()
  57. paths.work.join('globs dir/globs file2').write('globs file2')
  58. expected.append('globs dir/globs file2')
  59. paths.encrypt.write('globs*\n', mode='a')
  60. # blank lines
  61. paths.encrypt.write('\n \n\t\n', mode='a')
  62. # comments
  63. paths.work.join('commentfile1').write('commentfile1')
  64. paths.encrypt.write('#commentfile1\n', mode='a')
  65. paths.encrypt.write(' #commentfile1\n', mode='a')
  66. # exclusions
  67. paths.work.join('extest').mkdir()
  68. paths.encrypt.write('extest/*\n', mode='a') # include within extest
  69. paths.work.join('extest/inglob1').write('inglob1')
  70. paths.work.join('extest/exglob1').write('exglob1')
  71. paths.work.join('extest/exglob2').write('exglob2')
  72. paths.encrypt.write('!extest/ex*\n', mode='a') # exclude the ex*
  73. expected.append('extest/inglob1') # should be left with only in*
  74. return expected
  75. @pytest.fixture(scope='session')
  76. def decrypt_targets(tmpdir_factory, runner):
  77. """Fixture for setting data to decrypt
  78. This fixture:
  79. * creates symmetric/asymmetric encrypted archives
  80. * creates a list of expected decrypted files
  81. """
  82. tmpdir = tmpdir_factory.mktemp('decrypt_targets')
  83. symmetric = tmpdir.join('symmetric.tar.gz.gpg')
  84. asymmetric = tmpdir.join('asymmetric.tar.gz.gpg')
  85. expected = []
  86. tmpdir.join('decrypt1').write('decrypt1')
  87. expected.append('decrypt1')
  88. tmpdir.join('decrypt2').write('decrypt2')
  89. expected.append('decrypt2')
  90. tmpdir.join('subdir').mkdir()
  91. tmpdir.join('subdir/decrypt3').write('subdir/decrypt3')
  92. expected.append('subdir/decrypt3')
  93. run = runner(
  94. ['tar', 'cvf', '-'] +
  95. expected +
  96. ['|', 'gpg', '--batch', '--yes', '-c'] +
  97. ['--passphrase', pipes.quote(PASSPHRASE)] +
  98. ['--output', pipes.quote(str(symmetric))],
  99. cwd=tmpdir,
  100. shell=True)
  101. assert run.success
  102. add_asymmetric_key()
  103. run = runner(
  104. ['tar', 'cvf', '-'] +
  105. expected +
  106. ['|', 'gpg', '--batch', '--yes', '-e'] +
  107. ['-r', pipes.quote(KEY_NAME)] +
  108. ['--output', pipes.quote(str(asymmetric))],
  109. cwd=tmpdir,
  110. shell=True)
  111. assert run.success
  112. remove_asymmetric_key()
  113. return {
  114. 'asymmetric': asymmetric,
  115. 'expected': expected,
  116. 'symmetric': symmetric,
  117. }
  118. @pytest.mark.parametrize(
  119. 'mismatched_phrase', [False, True],
  120. ids=['matching_phrase', 'mismatched_phrase'])
  121. @pytest.mark.parametrize(
  122. 'missing_encrypt', [False, True],
  123. ids=['encrypt_exists', 'encrypt_missing'])
  124. @pytest.mark.parametrize(
  125. 'overwrite', [False, True],
  126. ids=['clean', 'overwrite'])
  127. def test_symmetric_encrypt(
  128. runner, yadm_y, paths, encrypt_targets,
  129. overwrite, missing_encrypt, mismatched_phrase):
  130. """Test symmetric encryption"""
  131. if missing_encrypt:
  132. paths.encrypt.remove()
  133. matched_phrase = PASSPHRASE
  134. if mismatched_phrase:
  135. matched_phrase = 'mismatched'
  136. if overwrite:
  137. paths.archive.write('existing archive')
  138. run = runner(yadm_y('encrypt'), expect=[
  139. ('passphrase:', PASSPHRASE),
  140. ('passphrase:', matched_phrase),
  141. ])
  142. if missing_encrypt or mismatched_phrase:
  143. assert run.failure
  144. else:
  145. assert run.success
  146. assert run.err == ''
  147. if missing_encrypt:
  148. assert 'does not exist' in run.out
  149. elif mismatched_phrase:
  150. assert 'invalid passphrase' in run.out
  151. else:
  152. assert encrypted_data_valid(runner, paths.archive, encrypt_targets)
  153. @pytest.mark.parametrize(
  154. 'wrong_phrase', [False, True],
  155. ids=['correct_phrase', 'wrong_phrase'])
  156. @pytest.mark.parametrize(
  157. 'archive_exists', [True, False],
  158. ids=['archive_exists', 'archive_missing'])
  159. @pytest.mark.parametrize(
  160. 'dolist', [False, True],
  161. ids=['decrypt', 'list'])
  162. def test_symmetric_decrypt(
  163. runner, yadm_y, paths, decrypt_targets,
  164. dolist, archive_exists, wrong_phrase):
  165. """Test decryption"""
  166. # init empty yadm repo
  167. os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f')))
  168. phrase = PASSPHRASE
  169. if wrong_phrase:
  170. phrase = 'wrong-phrase'
  171. if archive_exists:
  172. decrypt_targets['symmetric'].copy(paths.archive)
  173. # to test overwriting
  174. paths.work.join('decrypt1').write('pre-existing file')
  175. args = []
  176. if dolist:
  177. args.append('-l')
  178. run = runner(yadm_y('decrypt') + args, expect=[('passphrase:', phrase)])
  179. if archive_exists and not wrong_phrase:
  180. assert run.success
  181. assert run.err == ''
  182. if dolist:
  183. for filename in decrypt_targets['expected']:
  184. if filename != 'decrypt1': # this one should exist
  185. assert not paths.work.join(filename).exists()
  186. assert filename in run.out
  187. else:
  188. for filename in decrypt_targets['expected']:
  189. assert paths.work.join(filename).read() == filename
  190. else:
  191. assert run.failure
  192. @pytest.mark.usefixtures('asymmetric_key')
  193. @pytest.mark.parametrize(
  194. 'ask', [False, True],
  195. ids=['no_ask', 'ask'])
  196. @pytest.mark.parametrize(
  197. 'key_exists', [True, False],
  198. ids=['key_exists', 'key_missing'])
  199. @pytest.mark.parametrize(
  200. 'overwrite', [False, True],
  201. ids=['clean', 'overwrite'])
  202. def test_asymmetric_encrypt(
  203. runner, yadm_y, paths, encrypt_targets,
  204. overwrite, key_exists, ask):
  205. """Test asymmetric encryption"""
  206. # specify encryption recipient
  207. if ask:
  208. os.system(' '.join(yadm_y('config', 'yadm.gpg-recipient', 'ASK')))
  209. expect = [('Enter the user ID', KEY_NAME), ('Enter the user ID', '')]
  210. else:
  211. os.system(' '.join(yadm_y('config', 'yadm.gpg-recipient', KEY_NAME)))
  212. expect = []
  213. if overwrite:
  214. paths.archive.write('existing archive')
  215. if not key_exists:
  216. remove_asymmetric_key()
  217. run = runner(yadm_y('encrypt'), expect=expect)
  218. if key_exists:
  219. assert run.success
  220. assert encrypted_data_valid(runner, paths.archive, encrypt_targets)
  221. else:
  222. assert run.failure
  223. assert 'Unable to write' in run.out
  224. if ask:
  225. assert 'Enter the user ID' in run.out
  226. @pytest.mark.usefixtures('asymmetric_key')
  227. @pytest.mark.parametrize(
  228. 'key_exists', [True, False],
  229. ids=['key_exists', 'key_missing'])
  230. @pytest.mark.parametrize(
  231. 'dolist', [False, True],
  232. ids=['decrypt', 'list'])
  233. def test_asymmetric_decrypt(
  234. runner, yadm_y, paths, decrypt_targets,
  235. dolist, key_exists):
  236. """Test decryption"""
  237. # init empty yadm repo
  238. os.system(' '.join(yadm_y('init', '-w', str(paths.work), '-f')))
  239. decrypt_targets['asymmetric'].copy(paths.archive)
  240. # to test overwriting
  241. paths.work.join('decrypt1').write('pre-existing file')
  242. if not key_exists:
  243. remove_asymmetric_key()
  244. args = []
  245. if dolist:
  246. args.append('-l')
  247. run = runner(yadm_y('decrypt') + args)
  248. if key_exists:
  249. assert run.success
  250. if dolist:
  251. for filename in decrypt_targets['expected']:
  252. if filename != 'decrypt1': # this one should exist
  253. assert not paths.work.join(filename).exists()
  254. assert filename in run.out
  255. else:
  256. for filename in decrypt_targets['expected']:
  257. assert paths.work.join(filename).read() == filename
  258. else:
  259. assert run.failure
  260. assert 'Unable to extract encrypted files' in run.out
  261. @pytest.mark.parametrize(
  262. 'untracked',
  263. [False, 'y', 'n'],
  264. ids=['tracked', 'untracked_answer_y', 'untracked_answer_n'])
  265. def test_offer_to_add(runner, yadm_y, paths, encrypt_targets, untracked):
  266. """Test offer to add encrypted archive
  267. All the other encryption tests use an archive outside of the work tree.
  268. However, the archive is often inside the work tree, and if it is, there
  269. should be an offer to add it to the repo if it is not tracked.
  270. """
  271. worktree_archive = paths.work.join('worktree-archive.tar.gpg')
  272. expect = [
  273. ('passphrase:', PASSPHRASE),
  274. ('passphrase:', PASSPHRASE),
  275. ]
  276. if untracked:
  277. expect.append(('add it now', untracked))
  278. else:
  279. worktree_archive.write('exists')
  280. os.system(' '.join(yadm_y('add', str(worktree_archive))))
  281. run = runner(
  282. yadm_y('encrypt', '--yadm-archive', str(worktree_archive)),
  283. expect=expect
  284. )
  285. assert run.success
  286. assert run.err == ''
  287. assert encrypted_data_valid(runner, worktree_archive, encrypt_targets)
  288. run = runner(
  289. yadm_y('status', '--porcelain', '-uall', str(worktree_archive)))
  290. assert run.success
  291. assert run.err == ''
  292. if untracked == 'y':
  293. # should be added to the index
  294. assert f'A {worktree_archive.basename}' in run.out
  295. elif untracked == 'n':
  296. # should NOT be added to the index
  297. assert f'?? {worktree_archive.basename}' in run.out
  298. else:
  299. # should appear modified in the index
  300. assert f'AM {worktree_archive.basename}' in run.out
  301. def encrypted_data_valid(runner, encrypted, expected):
  302. """Verify encrypted data matches expectations"""
  303. run = runner([
  304. 'gpg',
  305. '--passphrase', pipes.quote(PASSPHRASE),
  306. '-d', pipes.quote(str(encrypted)),
  307. '2>/dev/null',
  308. '|', 'tar', 't'], shell=True, report=False)
  309. file_count = 0
  310. for filename in run.out.splitlines():
  311. if filename.endswith('/'):
  312. continue
  313. file_count += 1
  314. assert filename in expected, (
  315. f'Unexpected file in archive: {filename}')
  316. assert file_count == len(expected), (
  317. 'Number of files in archive does not match expected')
  318. return True