tasks.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788
  1. import json
  2. import os
  3. import re
  4. import shutil
  5. import subprocess
  6. import tempfile
  7. from datetime import datetime, timedelta
  8. from celery import Task
  9. from celery import shared_task as task
  10. from celery.exceptions import SoftTimeLimitExceeded
  11. from celery.signals import task_revoked
  12. # from celery.task.control import revoke
  13. from celery.utils.log import get_task_logger
  14. from django.conf import settings
  15. from django.core.cache import cache
  16. from django.core.files import File
  17. from django.db.models import Q
  18. from actions.models import USER_MEDIA_ACTIONS, MediaAction
  19. from users.models import User
  20. from .backends import FFmpegBackend
  21. from .exceptions import VideoEncodingError
  22. from .helpers import (
  23. calculate_seconds,
  24. create_temp_file,
  25. get_file_name,
  26. get_file_type,
  27. media_file_info,
  28. produce_ffmpeg_commands,
  29. produce_friendly_token,
  30. rm_file,
  31. run_command,
  32. )
  33. from .methods import list_tasks, notify_users, pre_save_action
  34. from .models import Category, EncodeProfile, Encoding, Media, Rating, Tag
  35. logger = get_task_logger(__name__)
  36. VALID_USER_ACTIONS = [action for action, name in USER_MEDIA_ACTIONS]
  37. ERRORS_LIST = [
  38. "Output file is empty, nothing was encoded",
  39. "Invalid data found when processing input",
  40. "Unable to find a suitable output format for",
  41. ]
  42. @task(name="chunkize_media", bind=True, queue="short_tasks", soft_time_limit=60 * 30)
  43. def chunkize_media(self, friendly_token, profiles, force=True):
  44. """Break media in chunks and start encoding tasks"""
  45. profiles = [EncodeProfile.objects.get(id=profile) for profile in profiles]
  46. media = Media.objects.get(friendly_token=friendly_token)
  47. cwd = os.path.dirname(os.path.realpath(media.media_file.path))
  48. file_name = media.media_file.path.split("/")[-1]
  49. random_prefix = produce_friendly_token()
  50. file_format = "{0}_{1}".format(random_prefix, file_name)
  51. chunks_file_name = "%02d_{0}".format(file_format)
  52. chunks_file_name += ".mkv"
  53. cmd = [
  54. settings.FFMPEG_COMMAND,
  55. "-y",
  56. "-i",
  57. media.media_file.path,
  58. "-c",
  59. "copy",
  60. "-f",
  61. "segment",
  62. "-segment_time",
  63. str(settings.VIDEO_CHUNKS_DURATION),
  64. chunks_file_name,
  65. ]
  66. chunks = []
  67. ret = run_command(cmd, cwd=cwd)
  68. if "out" in ret.keys():
  69. for line in ret.get("error").split("\n"):
  70. ch = re.findall(r"Opening \'([\W\w]+)\' for writing", line)
  71. if ch:
  72. chunks.append(ch[0])
  73. if not chunks:
  74. # command completely failed to segment file.putting to normal encode
  75. logger.info("Failed to break file {0} in chunks." " Putting to normal encode queue".format(friendly_token))
  76. for profile in profiles:
  77. if media.video_height and media.video_height < profile.resolution:
  78. if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  79. continue
  80. encoding = Encoding(media=media, profile=profile)
  81. encoding.save()
  82. enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
  83. encode_media.delay(friendly_token, profile.id, encoding.id, enc_url, force=force)
  84. return False
  85. chunks = [os.path.join(cwd, ch) for ch in chunks]
  86. to_profiles = []
  87. chunks_dict = {}
  88. # calculate once md5sums
  89. for chunk in chunks:
  90. cmd = ["md5sum", chunk]
  91. stdout = run_command(cmd).get("out")
  92. md5sum = stdout.strip().split()[0]
  93. chunks_dict[chunk] = md5sum
  94. for profile in profiles:
  95. if media.video_height and media.video_height < profile.resolution:
  96. if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  97. continue
  98. to_profiles.append(profile)
  99. for chunk in chunks:
  100. encoding = Encoding(
  101. media=media,
  102. profile=profile,
  103. chunk_file_path=chunk,
  104. chunk=True,
  105. chunks_info=json.dumps(chunks_dict),
  106. md5sum=chunks_dict[chunk],
  107. )
  108. encoding.save()
  109. enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
  110. if profile.resolution in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  111. priority = 0
  112. else:
  113. priority = 9
  114. encode_media.apply_async(
  115. args=[friendly_token, profile.id, encoding.id, enc_url],
  116. kwargs={"force": force, "chunk": True, "chunk_file_path": chunk},
  117. priority=priority,
  118. )
  119. logger.info("got {0} chunks and will encode to {1} profiles".format(len(chunks), to_profiles))
  120. return True
  121. class EncodingTask(Task):
  122. def on_failure(self, exc, task_id, args, kwargs, einfo):
  123. # mainly used to run some post failure steps
  124. # we get here if a task is revoked
  125. try:
  126. if hasattr(self, "encoding"):
  127. self.encoding.status = "fail"
  128. self.encoding.save(update_fields=["status"])
  129. kill_ffmpeg_process(self.encoding.temp_file)
  130. if hasattr(self.encoding, "media"):
  131. self.encoding.media.post_encode_actions()
  132. except BaseException:
  133. pass
  134. return False
  135. @task(
  136. name="encode_media",
  137. base=EncodingTask,
  138. bind=True,
  139. queue="long_tasks",
  140. soft_time_limit=settings.CELERY_SOFT_TIME_LIMIT,
  141. )
  142. def encode_media(
  143. self,
  144. friendly_token,
  145. profile_id,
  146. encoding_id,
  147. encoding_url,
  148. force=True,
  149. chunk=False,
  150. chunk_file_path="",
  151. ):
  152. """Encode a media to given profile, using ffmpeg, storing progress"""
  153. logger.info("Encode Media started, friendly token {0}, profile id {1}, force {2}".format(friendly_token, profile_id, force))
  154. if self.request.id:
  155. task_id = self.request.id
  156. else:
  157. task_id = None
  158. try:
  159. media = Media.objects.get(friendly_token=friendly_token)
  160. profile = EncodeProfile.objects.get(id=profile_id)
  161. except BaseException:
  162. Encoding.objects.filter(id=encoding_id).delete()
  163. return False
  164. # break logic with chunk True/False
  165. if chunk:
  166. # TODO: in case a video is chunkized and this enters here many times
  167. # it will always run since chunk_file_path is always different
  168. # thus find a better way for this check
  169. if Encoding.objects.filter(media=media, profile=profile, chunk_file_path=chunk_file_path).count() > 1 and force is False:
  170. Encoding.objects.filter(id=encoding_id).delete()
  171. return False
  172. else:
  173. try:
  174. encoding = Encoding.objects.get(id=encoding_id)
  175. encoding.status = "running"
  176. Encoding.objects.filter(
  177. media=media,
  178. profile=profile,
  179. chunk=True,
  180. chunk_file_path=chunk_file_path,
  181. ).exclude(id=encoding_id).delete()
  182. except BaseException:
  183. encoding = Encoding(
  184. media=media,
  185. profile=profile,
  186. status="running",
  187. chunk=True,
  188. chunk_file_path=chunk_file_path,
  189. )
  190. else:
  191. if Encoding.objects.filter(media=media, profile=profile).count() > 1 and force is False:
  192. Encoding.objects.filter(id=encoding_id).delete()
  193. return False
  194. else:
  195. try:
  196. encoding = Encoding.objects.get(id=encoding_id)
  197. encoding.status = "running"
  198. Encoding.objects.filter(media=media, profile=profile).exclude(id=encoding_id).delete()
  199. except BaseException:
  200. encoding = Encoding(media=media, profile=profile, status="running")
  201. if task_id:
  202. encoding.task_id = task_id
  203. encoding.worker = "localhost"
  204. encoding.retries = self.request.retries
  205. encoding.save()
  206. if profile.extension == "gif":
  207. tf = create_temp_file(suffix=".gif")
  208. # -ss 5 start from 5 second. -t 25 until 25 sec
  209. command = [
  210. settings.FFMPEG_COMMAND,
  211. "-y",
  212. "-ss",
  213. "3",
  214. "-i",
  215. media.media_file.path,
  216. "-hide_banner",
  217. "-vf",
  218. "scale=344:-1:flags=lanczos,fps=1",
  219. "-t",
  220. "25",
  221. "-f",
  222. "gif",
  223. tf,
  224. ]
  225. ret = run_command(command)
  226. if os.path.exists(tf) and get_file_type(tf) == "image":
  227. with open(tf, "rb") as f:
  228. myfile = File(f)
  229. encoding.status = "success"
  230. encoding.media_file.save(content=myfile, name=tf)
  231. rm_file(tf)
  232. return True
  233. else:
  234. return False
  235. if chunk:
  236. original_media_path = chunk_file_path
  237. else:
  238. original_media_path = media.media_file.path
  239. # if not media.duration:
  240. # encoding.status = "fail"
  241. # encoding.save(update_fields=["status"])
  242. # return False
  243. with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as temp_dir:
  244. tf = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
  245. tfpass = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
  246. ffmpeg_commands = produce_ffmpeg_commands(
  247. original_media_path,
  248. media.media_info,
  249. resolution=profile.resolution,
  250. codec=profile.codec,
  251. output_filename=tf,
  252. pass_file=tfpass,
  253. chunk=chunk,
  254. )
  255. if not ffmpeg_commands:
  256. encoding.status = "fail"
  257. encoding.save(update_fields=["status"])
  258. return False
  259. encoding.temp_file = tf
  260. encoding.commands = str(ffmpeg_commands)
  261. encoding.save(update_fields=["temp_file", "commands", "task_id"])
  262. # binding these, so they are available on on_failure
  263. self.encoding = encoding
  264. self.media = media
  265. # can be one-pass or two-pass
  266. for ffmpeg_command in ffmpeg_commands:
  267. ffmpeg_command = [str(s) for s in ffmpeg_command]
  268. encoding_backend = FFmpegBackend()
  269. try:
  270. encoding_command = encoding_backend.encode(ffmpeg_command)
  271. duration, n_times = 0, 0
  272. output = ""
  273. while encoding_command:
  274. try:
  275. # TODO: understand an eternal loop
  276. # eg h265 with mv4 file issue, and stop with error
  277. output = next(encoding_command)
  278. duration = calculate_seconds(output)
  279. if duration:
  280. percent = duration * 100 / media.duration
  281. if n_times % 60 == 0:
  282. encoding.progress = percent
  283. try:
  284. encoding.save(update_fields=["progress", "update_date"])
  285. logger.info("Saved {0}".format(round(percent, 2)))
  286. except BaseException:
  287. pass
  288. n_times += 1
  289. except StopIteration:
  290. break
  291. except VideoEncodingError:
  292. # ffmpeg error, or ffmpeg was killed
  293. raise
  294. except Exception as e:
  295. try:
  296. # output is empty, fail message is on the exception
  297. output = e.message
  298. except AttributeError:
  299. output = ""
  300. if isinstance(e, SoftTimeLimitExceeded):
  301. kill_ffmpeg_process(encoding.temp_file)
  302. encoding.logs = output
  303. encoding.status = "fail"
  304. encoding.save(update_fields=["status", "logs"])
  305. raise_exception = True
  306. # if this is an ffmpeg's valid error
  307. # no need for the task to be re-run
  308. # otherwise rerun task...
  309. for error_msg in ERRORS_LIST:
  310. if error_msg.lower() in output.lower():
  311. raise_exception = False
  312. if raise_exception:
  313. raise self.retry(exc=e, countdown=5, max_retries=1)
  314. encoding.logs = output
  315. encoding.progress = 100
  316. success = False
  317. encoding.status = "fail"
  318. if os.path.exists(tf) and os.path.getsize(tf) != 0:
  319. ret = media_file_info(tf)
  320. if ret.get("is_video") or ret.get("is_audio"):
  321. encoding.status = "success"
  322. success = True
  323. with open(tf, "rb") as f:
  324. myfile = File(f)
  325. output_name = "{0}.{1}".format(get_file_name(original_media_path), profile.extension)
  326. encoding.media_file.save(content=myfile, name=output_name)
  327. encoding.total_run_time = (encoding.update_date - encoding.add_date).seconds
  328. try:
  329. encoding.save(update_fields=["status", "logs", "progress", "total_run_time"])
  330. # this will raise a django.db.utils.DatabaseError error when task is revoked,
  331. # since we delete the encoding at that stage
  332. except BaseException:
  333. pass
  334. return success
  335. @task(name="produce_sprite_from_video", queue="long_tasks")
  336. def produce_sprite_from_video(friendly_token):
  337. """Produces a sprites file for a video, uses ffmpeg"""
  338. try:
  339. media = Media.objects.get(friendly_token=friendly_token)
  340. except BaseException:
  341. logger.info("failed to get media with friendly_token %s" % friendly_token)
  342. return False
  343. with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as tmpdirname:
  344. try:
  345. tmpdir_image_files = tmpdirname + "/img%03d.jpg"
  346. output_name = tmpdirname + "/sprites.jpg"
  347. cmd = "{0} -i {1} -f image2 -vf 'fps=1/10, scale=160:90' {2}&&files=$(ls {3}/img*.jpg | sort -t '-' -n -k 2 | tr '\n' ' ')&&convert $files -append {4}".format(
  348. settings.FFMPEG_COMMAND,
  349. media.media_file.path,
  350. tmpdir_image_files,
  351. tmpdirname,
  352. output_name,
  353. )
  354. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  355. if os.path.exists(output_name) and get_file_type(output_name) == "image":
  356. with open(output_name, "rb") as f:
  357. myfile = File(f)
  358. media.sprites.save(
  359. content=myfile,
  360. name=get_file_name(media.media_file.path) + "sprites.jpg",
  361. )
  362. except BaseException:
  363. pass
  364. return True
  365. @task(name="create_hls", queue="long_tasks")
  366. def create_hls(friendly_token):
  367. """Creates HLS file for media, uses Bento4 mp4hls command"""
  368. if not hasattr(settings, "MP4HLS_COMMAND"):
  369. logger.info("Bento4 mp4hls command is missing from configuration")
  370. return False
  371. if not os.path.exists(settings.MP4HLS_COMMAND):
  372. logger.info("Bento4 mp4hls command is missing")
  373. return False
  374. try:
  375. media = Media.objects.get(friendly_token=friendly_token)
  376. except BaseException:
  377. logger.info("failed to get media with friendly_token %s" % friendly_token)
  378. return False
  379. p = media.uid.hex
  380. output_dir = os.path.join(settings.HLS_DIR, p)
  381. encodings = media.encodings.filter(profile__extension="mp4", status="success", chunk=False, profile__codec="h264")
  382. if encodings:
  383. existing_output_dir = None
  384. if os.path.exists(output_dir):
  385. existing_output_dir = output_dir
  386. output_dir = os.path.join(settings.HLS_DIR, p + produce_friendly_token())
  387. files = " ".join([f.media_file.path for f in encodings if f.media_file])
  388. cmd = "{0} --segment-duration=4 --output-dir={1} {2}".format(settings.MP4HLS_COMMAND, output_dir, files)
  389. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  390. if existing_output_dir:
  391. # override content with -T !
  392. cmd = "cp -rT {0} {1}".format(output_dir, existing_output_dir)
  393. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  394. shutil.rmtree(output_dir)
  395. output_dir = existing_output_dir
  396. pp = os.path.join(output_dir, "master.m3u8")
  397. if os.path.exists(pp):
  398. if media.hls_file != pp:
  399. media.hls_file = pp
  400. media.save(update_fields=["hls_file"])
  401. return True
  402. @task(name="check_running_states", queue="short_tasks")
  403. def check_running_states():
  404. # Experimental - unused
  405. """Check stale running encodings and delete/reencode media"""
  406. encodings = Encoding.objects.filter(status="running")
  407. logger.info("got {0} encodings that are in state running".format(encodings.count()))
  408. changed = 0
  409. for encoding in encodings:
  410. now = datetime.now(encoding.update_date.tzinfo)
  411. if (now - encoding.update_date).seconds > settings.RUNNING_STATE_STALE:
  412. media = encoding.media
  413. profile = encoding.profile
  414. # task_id = encoding.task_id
  415. # terminate task
  416. # TODO: not imported
  417. # if task_id:
  418. # revoke(task_id, terminate=True)
  419. encoding.delete()
  420. media.encode(profiles=[profile])
  421. # TODO: allign with new code + chunksize...
  422. changed += 1
  423. if changed:
  424. logger.info("changed from running to pending on {0} items".format(changed))
  425. return True
  426. @task(name="check_media_states", queue="short_tasks")
  427. def check_media_states():
  428. # Experimental - unused
  429. # check encoding status of not success media
  430. media = Media.objects.filter(Q(encoding_status="running") | Q(encoding_status="fail") | Q(encoding_status="pending"))
  431. logger.info("got {0} media that are not in state success".format(media.count()))
  432. changed = 0
  433. for m in media:
  434. m.set_encoding_status()
  435. m.save(update_fields=["encoding_status"])
  436. changed += 1
  437. if changed:
  438. logger.info("changed encoding status to {0} media items".format(changed))
  439. return True
  440. @task(name="check_pending_states", queue="short_tasks")
  441. def check_pending_states():
  442. # Experimental - unused
  443. # check encoding profiles that are on state pending and not on a queue
  444. encodings = Encoding.objects.filter(status="pending")
  445. if not encodings:
  446. return True
  447. changed = 0
  448. tasks = list_tasks()
  449. task_ids = tasks["task_ids"]
  450. media_profile_pairs = tasks["media_profile_pairs"]
  451. for encoding in encodings:
  452. if encoding.task_id and encoding.task_id in task_ids:
  453. # encoding is in one of the active/reserved/scheduled tasks list
  454. continue
  455. elif (
  456. encoding.media.friendly_token,
  457. encoding.profile.id,
  458. ) in media_profile_pairs:
  459. continue
  460. # encoding is in one of the reserved/scheduled tasks list.
  461. # has no task_id but will be run, so need to re-enter the queue
  462. else:
  463. media = encoding.media
  464. profile = encoding.profile
  465. encoding.delete()
  466. media.encode(profiles=[profile], force=False)
  467. changed += 1
  468. if changed:
  469. logger.info("set to the encode queue {0} encodings that were on pending state".format(changed))
  470. return True
  471. @task(name="check_missing_profiles", queue="short_tasks")
  472. def check_missing_profiles():
  473. # Experimental - unused
  474. # check if video files have missing profiles. If so, add them
  475. media = Media.objects.filter(media_type="video")
  476. profiles = list(EncodeProfile.objects.all())
  477. changed = 0
  478. for m in media:
  479. existing_profiles = [p.profile for p in m.encodings.all()]
  480. missing_profiles = [p for p in profiles if p not in existing_profiles]
  481. if missing_profiles:
  482. m.encode(profiles=missing_profiles, force=False)
  483. # since we call with force=False
  484. # encode_media won't delete existing profiles
  485. # if they appear on the meanwhile (eg on a big queue)
  486. changed += 1
  487. if changed:
  488. logger.info("set to the encode queue {0} profiles".format(changed))
  489. return True
  490. @task(name="clear_sessions", queue="short_tasks")
  491. def clear_sessions():
  492. """Clear expired sessions"""
  493. try:
  494. from importlib import import_module
  495. from django.conf import settings
  496. engine = import_module(settings.SESSION_ENGINE)
  497. engine.SessionStore.clear_expired()
  498. except BaseException:
  499. return False
  500. return True
  501. @task(name="save_user_action", queue="short_tasks")
  502. def save_user_action(user_or_session, friendly_token=None, action="watch", extra_info=None):
  503. """Short task that saves a user action"""
  504. if action not in VALID_USER_ACTIONS:
  505. return False
  506. try:
  507. media = Media.objects.get(friendly_token=friendly_token)
  508. except BaseException:
  509. return False
  510. user = user_or_session.get("user_id")
  511. session_key = user_or_session.get("user_session")
  512. remote_ip = user_or_session.get("remote_ip_addr")
  513. if user:
  514. try:
  515. user = User.objects.get(id=user)
  516. except BaseException:
  517. return False
  518. if not (user or session_key):
  519. return False
  520. if action in ["like", "dislike", "watch", "report"]:
  521. if not pre_save_action(
  522. media=media,
  523. user=user,
  524. session_key=session_key,
  525. action=action,
  526. remote_ip=remote_ip,
  527. ):
  528. return False
  529. if action == "watch":
  530. if user:
  531. MediaAction.objects.filter(user=user, media=media, action="watch").delete()
  532. else:
  533. MediaAction.objects.filter(session_key=session_key, media=media, action="watch").delete()
  534. if action == "rate":
  535. try:
  536. score = extra_info.get("score")
  537. rating_category = extra_info.get("category_id")
  538. except BaseException:
  539. # TODO: better error handling?
  540. return False
  541. try:
  542. rating = Rating.objects.filter(user=user, media=media, rating_category_id=rating_category).first()
  543. if rating:
  544. rating.score = score
  545. rating.save(update_fields=["score"])
  546. else:
  547. rating = Rating.objects.create(
  548. user=user,
  549. media=media,
  550. rating_category_id=rating_category,
  551. score=score,
  552. )
  553. except Exception:
  554. # TODO: more specific handling, for errors in score, or
  555. # rating_category?
  556. return False
  557. ma = MediaAction(
  558. user=user,
  559. session_key=session_key,
  560. media=media,
  561. action=action,
  562. extra_info=extra_info,
  563. remote_ip=remote_ip,
  564. )
  565. ma.save()
  566. if action == "watch":
  567. media.views += 1
  568. media.save(update_fields=["views"])
  569. elif action == "report":
  570. media.reported_times += 1
  571. if media.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
  572. media.state = "private"
  573. media.save(update_fields=["reported_times", "state"])
  574. notify_users(
  575. friendly_token=media.friendly_token,
  576. action="media_reported",
  577. extra=extra_info,
  578. )
  579. elif action == "like":
  580. media.likes += 1
  581. media.save(update_fields=["likes"])
  582. elif action == "dislike":
  583. media.dislikes += 1
  584. media.save(update_fields=["dislikes"])
  585. return True
  586. @task(name="get_list_of_popular_media", queue="long_tasks")
  587. def get_list_of_popular_media():
  588. """Experimental task for preparing media listing
  589. for index page / recommended section
  590. calculate and return the top 50 popular media, based on two rules
  591. X = the top 25 videos that have the most views during the last week
  592. Y = the most recent 25 videos that have been liked over the last 6 months
  593. """
  594. valid_media_x = {}
  595. valid_media_y = {}
  596. basic_query = Q(listable=True)
  597. media_x = Media.objects.filter(basic_query).values("friendly_token")
  598. period_x = datetime.now() - timedelta(days=7)
  599. period_y = datetime.now() - timedelta(days=30 * 6)
  600. for media in media_x:
  601. ft = media["friendly_token"]
  602. num = MediaAction.objects.filter(action_date__gte=period_x, action="watch", media__friendly_token=ft).count()
  603. if num:
  604. valid_media_x[ft] = num
  605. num = MediaAction.objects.filter(action_date__gte=period_y, action="like", media__friendly_token=ft).count()
  606. if num:
  607. valid_media_y[ft] = num
  608. x = sorted(valid_media_x.items(), key=lambda kv: kv[1], reverse=True)[:25]
  609. y = sorted(valid_media_y.items(), key=lambda kv: kv[1], reverse=True)[:25]
  610. media_ids = [a[0] for a in x]
  611. media_ids.extend([a[0] for a in y])
  612. media_ids = list(set(media_ids))
  613. cache.set("popular_media_ids", media_ids, 60 * 60 * 12)
  614. logger.info("saved popular media ids")
  615. return True
  616. @task(name="update_listings_thumbnails", queue="long_tasks")
  617. def update_listings_thumbnails():
  618. """Populate listings_thumbnail field for models"""
  619. # Categories
  620. used_media = []
  621. saved = 0
  622. qs = Category.objects.filter().order_by("-media_count")
  623. for object in qs:
  624. media = Media.objects.exclude(friendly_token__in=used_media).filter(category=object, state="public", is_reviewed=True).order_by("-views").first()
  625. if media:
  626. object.listings_thumbnail = media.thumbnail_url
  627. object.save(update_fields=["listings_thumbnail"])
  628. used_media.append(media.friendly_token)
  629. saved += 1
  630. logger.info("updated {} categories".format(saved))
  631. # Tags
  632. used_media = []
  633. saved = 0
  634. qs = Tag.objects.filter().order_by("-media_count")
  635. for object in qs:
  636. media = Media.objects.exclude(friendly_token__in=used_media).filter(tags=object, state="public", is_reviewed=True).order_by("-views").first()
  637. if media:
  638. object.listings_thumbnail = media.thumbnail_url
  639. object.save(update_fields=["listings_thumbnail"])
  640. used_media.append(media.friendly_token)
  641. saved += 1
  642. logger.info("updated {} tags".format(saved))
  643. return True
  644. @task_revoked.connect
  645. def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
  646. # For encode_media tasks that are revoked,
  647. # ffmpeg command won't be stopped, since
  648. # it got started by a subprocess.
  649. # Need to stop that process
  650. # Also, removing the Encoding object,
  651. # since the task that would prepare it was killed
  652. # Maybe add a killed state for Encoding objects
  653. try:
  654. uid = kwargs["request"].task_id
  655. if uid:
  656. encoding = Encoding.objects.get(task_id=uid)
  657. encoding.delete()
  658. logger.info("deleted the Encoding object")
  659. if encoding.temp_file:
  660. kill_ffmpeg_process(encoding.temp_file)
  661. except BaseException:
  662. pass
  663. return True
  664. def kill_ffmpeg_process(filepath):
  665. # this is not ideal, ffmpeg pid could be linked to the Encoding object
  666. cmd = "ps aux|grep 'ffmpeg'|grep %s|grep -v grep |awk '{print $2}'" % filepath
  667. result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  668. pid = result.stdout.decode("utf-8").strip()
  669. if pid:
  670. cmd = "kill -9 %s" % pid
  671. result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  672. return result
  673. @task(name="remove_media_file", base=Task, queue="long_tasks")
  674. def remove_media_file(media_file=None):
  675. rm_file(media_file)
  676. return True
# TODO LIST
# 1. Chunks are deleted from the original server when a file is fully
#    encoded. The same clean-up is also needed when the encoding fails.
# 2. Add a script that deletes chunks in fail status (and checks for their
#    encodings and deletes them as well, along with all chunks).
# 3. Add a beat task that removes chunks.