tasks.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787
  1. import json
  2. import os
  3. import re
  4. import shutil
  5. import subprocess
  6. import tempfile
  7. from datetime import datetime, timedelta
  8. from celery import Task
  9. from celery.decorators import task
  10. from celery.exceptions import SoftTimeLimitExceeded
  11. from celery.signals import task_revoked
  12. from celery.task.control import revoke
  13. from celery.utils.log import get_task_logger
  14. from django.conf import settings
  15. from django.core.cache import cache
  16. from django.core.files import File
  17. from django.db.models import Q
  18. from actions.models import USER_MEDIA_ACTIONS, MediaAction
  19. from users.models import User
  20. from .backends import FFmpegBackend
  21. from .exceptions import VideoEncodingError
  22. from .helpers import (
  23. calculate_seconds,
  24. create_temp_file,
  25. get_file_name,
  26. get_file_type,
  27. media_file_info,
  28. produce_ffmpeg_commands,
  29. produce_friendly_token,
  30. rm_file,
  31. run_command,
  32. )
  33. from .methods import list_tasks, notify_users, pre_save_action
  34. from .models import Category, EncodeProfile, Encoding, Media, Rating, Tag
  35. logger = get_task_logger(__name__)
  36. VALID_USER_ACTIONS = [action for action, name in USER_MEDIA_ACTIONS]
  37. ERRORS_LIST = [
  38. "Output file is empty, nothing was encoded",
  39. "Invalid data found when processing input",
  40. "Unable to find a suitable output format for",
  41. ]
  42. @task(name="chunkize_media", bind=True, queue="short_tasks", soft_time_limit=60 * 30)
  43. def chunkize_media(self, friendly_token, profiles, force=True):
  44. """Break media in chunks and start encoding tasks"""
  45. profiles = [EncodeProfile.objects.get(id=profile) for profile in profiles]
  46. media = Media.objects.get(friendly_token=friendly_token)
  47. cwd = os.path.dirname(os.path.realpath(media.media_file.path))
  48. file_name = media.media_file.path.split("/")[-1]
  49. random_prefix = produce_friendly_token()
  50. file_format = "{0}_{1}".format(random_prefix, file_name)
  51. chunks_file_name = "%02d_{0}".format(file_format)
  52. chunks_file_name += ".mkv"
  53. cmd = [
  54. settings.FFMPEG_COMMAND,
  55. "-y",
  56. "-i",
  57. media.media_file.path,
  58. "-c",
  59. "copy",
  60. "-f",
  61. "segment",
  62. "-segment_time",
  63. str(settings.VIDEO_CHUNKS_DURATION),
  64. chunks_file_name,
  65. ]
  66. chunks = []
  67. ret = run_command(cmd, cwd=cwd)
  68. if "out" in ret.keys():
  69. for line in ret.get("error").split("\n"):
  70. ch = re.findall(r"Opening \'([\W\w]+)\' for writing", line)
  71. if ch:
  72. chunks.append(ch[0])
  73. if not chunks:
  74. # command completely failed to segment file.putting to normal encode
  75. logger.info("Failed to break file {0} in chunks." " Putting to normal encode queue".format(friendly_token))
  76. for profile in profiles:
  77. if media.video_height and media.video_height < profile.resolution:
  78. if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  79. continue
  80. encoding = Encoding(media=media, profile=profile)
  81. encoding.save()
  82. enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
  83. encode_media.delay(friendly_token, profile.id, encoding.id, enc_url, force=force)
  84. return False
  85. chunks = [os.path.join(cwd, ch) for ch in chunks]
  86. to_profiles = []
  87. chunks_dict = {}
  88. # calculate once md5sums
  89. for chunk in chunks:
  90. cmd = ["md5sum", chunk]
  91. stdout = run_command(cmd).get("out")
  92. md5sum = stdout.strip().split()[0]
  93. chunks_dict[chunk] = md5sum
  94. for profile in profiles:
  95. if media.video_height and media.video_height < profile.resolution:
  96. if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  97. continue
  98. to_profiles.append(profile)
  99. for chunk in chunks:
  100. encoding = Encoding(
  101. media=media,
  102. profile=profile,
  103. chunk_file_path=chunk,
  104. chunk=True,
  105. chunks_info=json.dumps(chunks_dict),
  106. md5sum=chunks_dict[chunk],
  107. )
  108. encoding.save()
  109. enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
  110. if profile.resolution in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  111. priority = 0
  112. else:
  113. priority = 9
  114. encode_media.apply_async(
  115. args=[friendly_token, profile.id, encoding.id, enc_url],
  116. kwargs={"force": force, "chunk": True, "chunk_file_path": chunk},
  117. priority=priority,
  118. )
  119. logger.info("got {0} chunks and will encode to {1} profiles".format(len(chunks), to_profiles))
  120. return True
  121. class EncodingTask(Task):
  122. def on_failure(self, exc, task_id, args, kwargs, einfo):
  123. # mainly used to run some post failure steps
  124. # we get here if a task is revoked
  125. try:
  126. if hasattr(self, "encoding"):
  127. self.encoding.status = "fail"
  128. self.encoding.save(update_fields=["status"])
  129. kill_ffmpeg_process(self.encoding.temp_file)
  130. if hasattr(self.encoding, "media"):
  131. self.encoding.media.post_encode_actions()
  132. except BaseException:
  133. pass
  134. return False
  135. @task(
  136. name="encode_media",
  137. base=EncodingTask,
  138. bind=True,
  139. queue="long_tasks",
  140. soft_time_limit=settings.CELERY_SOFT_TIME_LIMIT,
  141. )
  142. def encode_media(
  143. self,
  144. friendly_token,
  145. profile_id,
  146. encoding_id,
  147. encoding_url,
  148. force=True,
  149. chunk=False,
  150. chunk_file_path="",
  151. ):
  152. """Encode a media to given profile, using ffmpeg, storing progress"""
  153. logger.info("Encode Media started, friendly token {0}, profile id {1}, force {2}".format(friendly_token, profile_id, force))
  154. if self.request.id:
  155. task_id = self.request.id
  156. else:
  157. task_id = None
  158. try:
  159. media = Media.objects.get(friendly_token=friendly_token)
  160. profile = EncodeProfile.objects.get(id=profile_id)
  161. except BaseException:
  162. Encoding.objects.filter(id=encoding_id).delete()
  163. return False
  164. # break logic with chunk True/False
  165. if chunk:
  166. # TODO: in case a video is chunkized and this enters here many times
  167. # it will always run since chunk_file_path is always different
  168. # thus find a better way for this check
  169. if Encoding.objects.filter(media=media, profile=profile, chunk_file_path=chunk_file_path).count() > 1 and force is False:
  170. Encoding.objects.filter(id=encoding_id).delete()
  171. return False
  172. else:
  173. try:
  174. encoding = Encoding.objects.get(id=encoding_id)
  175. encoding.status = "running"
  176. Encoding.objects.filter(
  177. media=media,
  178. profile=profile,
  179. chunk=True,
  180. chunk_file_path=chunk_file_path,
  181. ).exclude(id=encoding_id).delete()
  182. except BaseException:
  183. encoding = Encoding(
  184. media=media,
  185. profile=profile,
  186. status="running",
  187. chunk=True,
  188. chunk_file_path=chunk_file_path,
  189. )
  190. else:
  191. if Encoding.objects.filter(media=media, profile=profile).count() > 1 and force is False:
  192. Encoding.objects.filter(id=encoding_id).delete()
  193. return False
  194. else:
  195. try:
  196. encoding = Encoding.objects.get(id=encoding_id)
  197. encoding.status = "running"
  198. Encoding.objects.filter(media=media, profile=profile).exclude(id=encoding_id).delete()
  199. except BaseException:
  200. encoding = Encoding(media=media, profile=profile, status="running")
  201. if task_id:
  202. encoding.task_id = task_id
  203. encoding.worker = "localhost"
  204. encoding.retries = self.request.retries
  205. encoding.save()
  206. if profile.extension == "gif":
  207. tf = create_temp_file(suffix=".gif")
  208. # -ss 5 start from 5 second. -t 25 until 25 sec
  209. command = [
  210. settings.FFMPEG_COMMAND,
  211. "-y",
  212. "-ss",
  213. "3",
  214. "-i",
  215. media.media_file.path,
  216. "-hide_banner",
  217. "-vf",
  218. "scale=344:-1:flags=lanczos,fps=1",
  219. "-t",
  220. "25",
  221. "-f",
  222. "gif",
  223. tf,
  224. ]
  225. ret = run_command(command)
  226. if os.path.exists(tf) and get_file_type(tf) == "image":
  227. with open(tf, "rb") as f:
  228. myfile = File(f)
  229. encoding.status = "success"
  230. encoding.media_file.save(content=myfile, name=tf)
  231. rm_file(tf)
  232. return True
  233. else:
  234. return False
  235. if chunk:
  236. original_media_path = chunk_file_path
  237. else:
  238. original_media_path = media.media_file.path
  239. # if not media.duration:
  240. # encoding.status = "fail"
  241. # encoding.save(update_fields=["status"])
  242. # return False
  243. with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as temp_dir:
  244. tf = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
  245. tfpass = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
  246. ffmpeg_commands = produce_ffmpeg_commands(
  247. original_media_path,
  248. media.media_info,
  249. resolution=profile.resolution,
  250. codec=profile.codec,
  251. output_filename=tf,
  252. pass_file=tfpass,
  253. chunk=chunk,
  254. )
  255. if not ffmpeg_commands:
  256. encoding.status = "fail"
  257. encoding.save(update_fields=["status"])
  258. return False
  259. encoding.temp_file = tf
  260. encoding.commands = str(ffmpeg_commands)
  261. encoding.save(update_fields=["temp_file", "commands", "task_id"])
  262. # binding these, so they are available on on_failure
  263. self.encoding = encoding
  264. self.media = media
  265. # can be one-pass or two-pass
  266. for ffmpeg_command in ffmpeg_commands:
  267. ffmpeg_command = [str(s) for s in ffmpeg_command]
  268. encoding_backend = FFmpegBackend()
  269. try:
  270. encoding_command = encoding_backend.encode(ffmpeg_command)
  271. duration, n_times = 0, 0
  272. output = ""
  273. while encoding_command:
  274. try:
  275. # TODO: understand an eternal loop
  276. # eg h265 with mv4 file issue, and stop with error
  277. output = next(encoding_command)
  278. duration = calculate_seconds(output)
  279. if duration:
  280. percent = duration * 100 / media.duration
  281. if n_times % 60 == 0:
  282. encoding.progress = percent
  283. try:
  284. encoding.save(update_fields=["progress", "update_date"])
  285. logger.info("Saved {0}".format(round(percent, 2)))
  286. except BaseException:
  287. pass
  288. n_times += 1
  289. except StopIteration:
  290. break
  291. except VideoEncodingError:
  292. # ffmpeg error, or ffmpeg was killed
  293. raise
  294. except Exception as e:
  295. try:
  296. # output is empty, fail message is on the exception
  297. output = e.message
  298. except AttributeError:
  299. output = ""
  300. if isinstance(e, SoftTimeLimitExceeded):
  301. kill_ffmpeg_process(encoding.temp_file)
  302. encoding.logs = output
  303. encoding.status = "fail"
  304. encoding.save(update_fields=["status", "logs"])
  305. raise_exception = True
  306. # if this is an ffmpeg's valid error
  307. # no need for the task to be re-run
  308. # otherwise rerun task...
  309. for error_msg in ERRORS_LIST:
  310. if error_msg.lower() in output.lower():
  311. raise_exception = False
  312. if raise_exception:
  313. raise self.retry(exc=e, countdown=5, max_retries=1)
  314. encoding.logs = output
  315. encoding.progress = 100
  316. success = False
  317. encoding.status = "fail"
  318. if os.path.exists(tf) and os.path.getsize(tf) != 0:
  319. ret = media_file_info(tf)
  320. if ret.get("is_video") or ret.get("is_audio"):
  321. encoding.status = "success"
  322. success = True
  323. with open(tf, "rb") as f:
  324. myfile = File(f)
  325. output_name = "{0}.{1}".format(get_file_name(original_media_path), profile.extension)
  326. encoding.media_file.save(content=myfile, name=output_name)
  327. encoding.total_run_time = (encoding.update_date - encoding.add_date).seconds
  328. try:
  329. encoding.save(update_fields=["status", "logs", "progress", "total_run_time"])
  330. # this will raise a django.db.utils.DatabaseError error when task is revoked,
  331. # since we delete the encoding at that stage
  332. except BaseException:
  333. pass
  334. return success
  335. @task(name="produce_sprite_from_video", queue="long_tasks")
  336. def produce_sprite_from_video(friendly_token):
  337. """Produces a sprites file for a video, uses ffmpeg"""
  338. try:
  339. media = Media.objects.get(friendly_token=friendly_token)
  340. except BaseException:
  341. logger.info("failed to get media with friendly_token %s" % friendly_token)
  342. return False
  343. with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as tmpdirname:
  344. try:
  345. tmpdir_image_files = tmpdirname + "/img%03d.jpg"
  346. output_name = tmpdirname + "/sprites.jpg"
  347. cmd = "{0} -i {1} -f image2 -vf 'fps=1/10, scale=160:90' {2}&&files=$(ls {3}/img*.jpg | sort -t '-' -n -k 2 | tr '\n' ' ')&&convert $files -append {4}".format(
  348. settings.FFMPEG_COMMAND,
  349. media.media_file.path,
  350. tmpdir_image_files,
  351. tmpdirname,
  352. output_name,
  353. )
  354. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  355. if os.path.exists(output_name) and get_file_type(output_name) == "image":
  356. with open(output_name, "rb") as f:
  357. myfile = File(f)
  358. media.sprites.save(
  359. content=myfile,
  360. name=get_file_name(media.media_file.path) + "sprites.jpg",
  361. )
  362. except BaseException:
  363. pass
  364. return True
  365. @task(name="create_hls", queue="long_tasks")
  366. def create_hls(friendly_token):
  367. """Creates HLS file for media, uses Bento4 mp4hls command"""
  368. if not hasattr(settings, "MP4HLS_COMMAND"):
  369. logger.info("Bento4 mp4hls command is missing from configuration")
  370. return False
  371. if not os.path.exists(settings.MP4HLS_COMMAND):
  372. logger.info("Bento4 mp4hls command is missing")
  373. return False
  374. try:
  375. media = Media.objects.get(friendly_token=friendly_token)
  376. except BaseException:
  377. logger.info("failed to get media with friendly_token %s" % friendly_token)
  378. return False
  379. p = media.uid.hex
  380. output_dir = os.path.join(settings.HLS_DIR, p)
  381. encodings = media.encodings.filter(profile__extension="mp4", status="success", chunk=False, profile__codec="h264")
  382. if encodings:
  383. existing_output_dir = None
  384. if os.path.exists(output_dir):
  385. existing_output_dir = output_dir
  386. output_dir = os.path.join(settings.HLS_DIR, p + produce_friendly_token())
  387. files = " ".join([f.media_file.path for f in encodings if f.media_file])
  388. cmd = "{0} --segment-duration=4 --output-dir={1} {2}".format(settings.MP4HLS_COMMAND, output_dir, files)
  389. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  390. if existing_output_dir:
  391. # override content with -T !
  392. cmd = "cp -rT {0} {1}".format(output_dir, existing_output_dir)
  393. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  394. shutil.rmtree(output_dir)
  395. output_dir = existing_output_dir
  396. pp = os.path.join(output_dir, "master.m3u8")
  397. if os.path.exists(pp):
  398. if media.hls_file != pp:
  399. media.hls_file = pp
  400. media.save(update_fields=["hls_file"])
  401. return True
  402. @task(name="check_running_states", queue="short_tasks")
  403. def check_running_states():
  404. # Experimental - unused
  405. """Check stale running encodings and delete/reencode media"""
  406. encodings = Encoding.objects.filter(status="running")
  407. logger.info("got {0} encodings that are in state running".format(encodings.count()))
  408. changed = 0
  409. for encoding in encodings:
  410. now = datetime.now(encoding.update_date.tzinfo)
  411. if (now - encoding.update_date).seconds > settings.RUNNING_STATE_STALE:
  412. media = encoding.media
  413. profile = encoding.profile
  414. task_id = encoding.task_id
  415. # terminate task
  416. if task_id:
  417. revoke(task_id, terminate=True)
  418. encoding.delete()
  419. media.encode(profiles=[profile])
  420. # TODO: allign with new code + chunksize...
  421. changed += 1
  422. if changed:
  423. logger.info("changed from running to pending on {0} items".format(changed))
  424. return True
  425. @task(name="check_media_states", queue="short_tasks")
  426. def check_media_states():
  427. # Experimental - unused
  428. # check encoding status of not success media
  429. media = Media.objects.filter(Q(encoding_status="running") | Q(encoding_status="fail") | Q(encoding_status="pending"))
  430. logger.info("got {0} media that are not in state success".format(media.count()))
  431. changed = 0
  432. for m in media:
  433. m.set_encoding_status()
  434. m.save(update_fields=["encoding_status"])
  435. changed += 1
  436. if changed:
  437. logger.info("changed encoding status to {0} media items".format(changed))
  438. return True
  439. @task(name="check_pending_states", queue="short_tasks")
  440. def check_pending_states():
  441. # Experimental - unused
  442. # check encoding profiles that are on state pending and not on a queue
  443. encodings = Encoding.objects.filter(status="pending")
  444. if not encodings:
  445. return True
  446. changed = 0
  447. tasks = list_tasks()
  448. task_ids = tasks["task_ids"]
  449. media_profile_pairs = tasks["media_profile_pairs"]
  450. for encoding in encodings:
  451. if encoding.task_id and encoding.task_id in task_ids:
  452. # encoding is in one of the active/reserved/scheduled tasks list
  453. continue
  454. elif (
  455. encoding.media.friendly_token,
  456. encoding.profile.id,
  457. ) in media_profile_pairs:
  458. continue
  459. # encoding is in one of the reserved/scheduled tasks list.
  460. # has no task_id but will be run, so need to re-enter the queue
  461. else:
  462. media = encoding.media
  463. profile = encoding.profile
  464. encoding.delete()
  465. media.encode(profiles=[profile], force=False)
  466. changed += 1
  467. if changed:
  468. logger.info("set to the encode queue {0} encodings that were on pending state".format(changed))
  469. return True
  470. @task(name="check_missing_profiles", queue="short_tasks")
  471. def check_missing_profiles():
  472. # Experimental - unused
  473. # check if video files have missing profiles. If so, add them
  474. media = Media.objects.filter(media_type="video")
  475. profiles = list(EncodeProfile.objects.all())
  476. changed = 0
  477. for m in media:
  478. existing_profiles = [p.profile for p in m.encodings.all()]
  479. missing_profiles = [p for p in profiles if p not in existing_profiles]
  480. if missing_profiles:
  481. m.encode(profiles=missing_profiles, force=False)
  482. # since we call with force=False
  483. # encode_media won't delete existing profiles
  484. # if they appear on the meanwhile (eg on a big queue)
  485. changed += 1
  486. if changed:
  487. logger.info("set to the encode queue {0} profiles".format(changed))
  488. return True
  489. @task(name="clear_sessions", queue="short_tasks")
  490. def clear_sessions():
  491. """Clear expired sessions"""
  492. try:
  493. from importlib import import_module
  494. from django.conf import settings
  495. engine = import_module(settings.SESSION_ENGINE)
  496. engine.SessionStore.clear_expired()
  497. except BaseException:
  498. return False
  499. return True
  500. @task(name="save_user_action", queue="short_tasks")
  501. def save_user_action(user_or_session, friendly_token=None, action="watch", extra_info=None):
  502. """Short task that saves a user action"""
  503. if action not in VALID_USER_ACTIONS:
  504. return False
  505. try:
  506. media = Media.objects.get(friendly_token=friendly_token)
  507. except BaseException:
  508. return False
  509. user = user_or_session.get("user_id")
  510. session_key = user_or_session.get("user_session")
  511. remote_ip = user_or_session.get("remote_ip_addr")
  512. if user:
  513. try:
  514. user = User.objects.get(id=user)
  515. except BaseException:
  516. return False
  517. if not (user or session_key):
  518. return False
  519. if action in ["like", "dislike", "report"]:
  520. if not pre_save_action(
  521. media=media,
  522. user=user,
  523. session_key=session_key,
  524. action=action,
  525. remote_ip=remote_ip,
  526. ):
  527. return False
  528. if action == "watch":
  529. if user:
  530. MediaAction.objects.filter(user=user, media=media, action="watch").delete()
  531. else:
  532. MediaAction.objects.filter(session_key=session_key, media=media, action="watch").delete()
  533. if action == "rate":
  534. try:
  535. score = extra_info.get("score")
  536. rating_category = extra_info.get("category_id")
  537. except BaseException:
  538. # TODO: better error handling?
  539. return False
  540. try:
  541. rating = Rating.objects.filter(user=user, media=media, rating_category_id=rating_category).first()
  542. if rating:
  543. rating.score = score
  544. rating.save(update_fields=["score"])
  545. else:
  546. rating = Rating.objects.create(
  547. user=user,
  548. media=media,
  549. rating_category_id=rating_category,
  550. score=score,
  551. )
  552. except Exception:
  553. # TODO: more specific handling, for errors in score, or
  554. # rating_category?
  555. return False
  556. ma = MediaAction(
  557. user=user,
  558. session_key=session_key,
  559. media=media,
  560. action=action,
  561. extra_info=extra_info,
  562. remote_ip=remote_ip,
  563. )
  564. ma.save()
  565. if action == "watch":
  566. media.views += 1
  567. media.save(update_fields=["views"])
  568. elif action == "report":
  569. media.reported_times += 1
  570. if media.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
  571. media.state = "private"
  572. media.save(update_fields=["reported_times", "state"])
  573. notify_users(
  574. friendly_token=media.friendly_token,
  575. action="media_reported",
  576. extra=extra_info,
  577. )
  578. elif action == "like":
  579. media.likes += 1
  580. media.save(update_fields=["likes"])
  581. elif action == "dislike":
  582. media.dislikes += 1
  583. media.save(update_fields=["dislikes"])
  584. return True
  585. @task(name="get_list_of_popular_media", queue="long_tasks")
  586. def get_list_of_popular_media():
  587. """Experimental task for preparing media listing
  588. for index page / recommended section
  589. calculate and return the top 50 popular media, based on two rules
  590. X = the top 25 videos that have the most views during the last week
  591. Y = the most recent 25 videos that have been liked over the last 6 months
  592. """
  593. valid_media_x = {}
  594. valid_media_y = {}
  595. basic_query = Q(listable=True)
  596. media_x = Media.objects.filter(basic_query).values("friendly_token")
  597. period_x = datetime.now() - timedelta(days=7)
  598. period_y = datetime.now() - timedelta(days=30 * 6)
  599. for media in media_x:
  600. ft = media["friendly_token"]
  601. num = MediaAction.objects.filter(action_date__gte=period_x, action="watch", media__friendly_token=ft).count()
  602. if num:
  603. valid_media_x[ft] = num
  604. num = MediaAction.objects.filter(action_date__gte=period_y, action="like", media__friendly_token=ft).count()
  605. if num:
  606. valid_media_y[ft] = num
  607. x = sorted(valid_media_x.items(), key=lambda kv: kv[1], reverse=True)[:25]
  608. y = sorted(valid_media_y.items(), key=lambda kv: kv[1], reverse=True)[:25]
  609. media_ids = [a[0] for a in x]
  610. media_ids.extend([a[0] for a in y])
  611. media_ids = list(set(media_ids))
  612. cache.set("popular_media_ids", media_ids, 60 * 60 * 12)
  613. logger.info("saved popular media ids")
  614. return True
  615. @task(name="update_listings_thumbnails", queue="long_tasks")
  616. def update_listings_thumbnails():
  617. """Populate listings_thumbnail field for models"""
  618. # Categories
  619. used_media = []
  620. saved = 0
  621. qs = Category.objects.filter().order_by("-media_count")
  622. for object in qs:
  623. media = Media.objects.exclude(friendly_token__in=used_media).filter(category=object, state="public", is_reviewed=True).order_by("-views").first()
  624. if media:
  625. object.listings_thumbnail = media.thumbnail_url
  626. object.save(update_fields=["listings_thumbnail"])
  627. used_media.append(media.friendly_token)
  628. saved += 1
  629. logger.info("updated {} categories".format(saved))
  630. # Tags
  631. used_media = []
  632. saved = 0
  633. qs = Tag.objects.filter().order_by("-media_count")
  634. for object in qs:
  635. media = Media.objects.exclude(friendly_token__in=used_media).filter(tags=object, state="public", is_reviewed=True).order_by("-views").first()
  636. if media:
  637. object.listings_thumbnail = media.thumbnail_url
  638. object.save(update_fields=["listings_thumbnail"])
  639. used_media.append(media.friendly_token)
  640. saved += 1
  641. logger.info("updated {} tags".format(saved))
  642. return True
  643. @task_revoked.connect
  644. def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
  645. # For encode_media tasks that are revoked,
  646. # ffmpeg command won't be stopped, since
  647. # it got started by a subprocess.
  648. # Need to stop that process
  649. # Also, removing the Encoding object,
  650. # since the task that would prepare it was killed
  651. # Maybe add a killed state for Encoding objects
  652. try:
  653. uid = kwargs["request"].task_id
  654. if uid:
  655. encoding = Encoding.objects.get(task_id=uid)
  656. encoding.delete()
  657. logger.info("deleted the Encoding object")
  658. if encoding.temp_file:
  659. kill_ffmpeg_process(encoding.temp_file)
  660. except BaseException:
  661. pass
  662. return True
  663. def kill_ffmpeg_process(filepath):
  664. # this is not ideal, ffmpeg pid could be linked to the Encoding object
  665. cmd = "ps aux|grep 'ffmpeg'|grep %s|grep -v grep |awk '{print $2}'" % filepath
  666. result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  667. pid = result.stdout.decode("utf-8").strip()
  668. if pid:
  669. cmd = "kill -9 %s" % pid
  670. result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  671. return result
  672. @task(name="remove_media_file", base=Task, queue="long_tasks")
  673. def remove_media_file(media_file=None):
  674. rm_file(media_file)
  675. return True
# TODO LIST
# 1 chunks are deleted from the original server when a file is fully encoded;
# this cleanup also needs to run when encoding fails
# 2 script to delete chunks in fail status
# (and check for their encodings, and delete them as well, along with
# all chunks)
# 3 beat task, remove chunks