tasks.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852
  1. import json
  2. import os
  3. import re
  4. import shutil
  5. import subprocess
  6. import tempfile
  7. from datetime import datetime, timedelta
  8. from celery import Task
  9. from celery.decorators import task
  10. from celery.exceptions import SoftTimeLimitExceeded
  11. from celery.signals import task_revoked
  12. from celery.task.control import revoke
  13. from celery.utils.log import get_task_logger
  14. from django.conf import settings
  15. from django.core.cache import cache
  16. from django.core.files import File
  17. from django.db.models import Q
  18. from actions.models import USER_MEDIA_ACTIONS, MediaAction
  19. from actions.models import USER_VOICE_ACTIONS, VoiceAction
  20. from users.models import User
  21. from .backends import FFmpegBackend
  22. from .exceptions import VideoEncodingError
  23. from .helpers import (
  24. calculate_seconds,
  25. create_temp_file,
  26. get_file_name,
  27. get_file_type,
  28. media_file_info,
  29. produce_ffmpeg_commands,
  30. produce_friendly_token,
  31. rm_file,
  32. run_command,
  33. )
  34. from .methods import list_tasks, notify_users, pre_save_action
  35. from .methods import pre_save_action__voice
  36. from .models import Category, EncodeProfile, Encoding, Media, Rating, Tag
  37. from .models import Voice
  38. logger = get_task_logger(__name__)
  39. VALID_USER_ACTIONS = [action for action, name in USER_MEDIA_ACTIONS]
  40. VALID_VOICE_ACTIONS = [action for action, name in USER_VOICE_ACTIONS]
  41. ERRORS_LIST = [
  42. "Output file is empty, nothing was encoded",
  43. "Invalid data found when processing input",
  44. "Unable to find a suitable output format for",
  45. ]
  46. @task(name="chunkize_media", bind=True, queue="short_tasks", soft_time_limit=60 * 30)
  47. def chunkize_media(self, friendly_token, profiles, force=True):
  48. """Break media in chunks and start encoding tasks"""
  49. profiles = [EncodeProfile.objects.get(id=profile) for profile in profiles]
  50. media = Media.objects.get(friendly_token=friendly_token)
  51. cwd = os.path.dirname(os.path.realpath(media.media_file.path))
  52. file_name = media.media_file.path.split("/")[-1]
  53. random_prefix = produce_friendly_token()
  54. file_format = "{0}_{1}".format(random_prefix, file_name)
  55. chunks_file_name = "%02d_{0}".format(file_format)
  56. chunks_file_name += ".mkv"
  57. cmd = [
  58. settings.FFMPEG_COMMAND,
  59. "-y",
  60. "-i",
  61. media.media_file.path,
  62. "-c",
  63. "copy",
  64. "-f",
  65. "segment",
  66. "-segment_time",
  67. str(settings.VIDEO_CHUNKS_DURATION),
  68. chunks_file_name,
  69. ]
  70. chunks = []
  71. ret = run_command(cmd, cwd=cwd)
  72. if "out" in ret.keys():
  73. for line in ret.get("error").split("\n"):
  74. ch = re.findall(r"Opening \'([\W\w]+)\' for writing", line)
  75. if ch:
  76. chunks.append(ch[0])
  77. if not chunks:
  78. # command completely failed to segment file.putting to normal encode
  79. logger.info("Failed to break file {0} in chunks." " Putting to normal encode queue".format(friendly_token))
  80. for profile in profiles:
  81. if media.video_height and media.video_height < profile.resolution:
  82. if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  83. continue
  84. encoding = Encoding(media=media, profile=profile)
  85. encoding.save()
  86. enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
  87. encode_media.delay(friendly_token, profile.id, encoding.id, enc_url, force=force)
  88. return False
  89. chunks = [os.path.join(cwd, ch) for ch in chunks]
  90. to_profiles = []
  91. chunks_dict = {}
  92. # calculate once md5sums
  93. for chunk in chunks:
  94. cmd = ["md5sum", chunk]
  95. stdout = run_command(cmd).get("out")
  96. md5sum = stdout.strip().split()[0]
  97. chunks_dict[chunk] = md5sum
  98. for profile in profiles:
  99. if media.video_height and media.video_height < profile.resolution:
  100. if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  101. continue
  102. to_profiles.append(profile)
  103. for chunk in chunks:
  104. encoding = Encoding(
  105. media=media,
  106. profile=profile,
  107. chunk_file_path=chunk,
  108. chunk=True,
  109. chunks_info=json.dumps(chunks_dict),
  110. md5sum=chunks_dict[chunk],
  111. )
  112. encoding.save()
  113. enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
  114. if profile.resolution in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  115. priority = 0
  116. else:
  117. priority = 9
  118. encode_media.apply_async(
  119. args=[friendly_token, profile.id, encoding.id, enc_url],
  120. kwargs={"force": force, "chunk": True, "chunk_file_path": chunk},
  121. priority=priority,
  122. )
  123. logger.info("got {0} chunks and will encode to {1} profiles".format(len(chunks), to_profiles))
  124. return True
  125. class EncodingTask(Task):
  126. def on_failure(self, exc, task_id, args, kwargs, einfo):
  127. # mainly used to run some post failure steps
  128. # we get here if a task is revoked
  129. try:
  130. if hasattr(self, "encoding"):
  131. self.encoding.status = "fail"
  132. self.encoding.save(update_fields=["status"])
  133. kill_ffmpeg_process(self.encoding.temp_file)
  134. if hasattr(self.encoding, "media"):
  135. self.encoding.media.post_encode_actions()
  136. except BaseException:
  137. pass
  138. return False
@task(
    name="encode_media",
    base=EncodingTask,
    bind=True,
    queue="long_tasks",
    soft_time_limit=settings.CELERY_SOFT_TIME_LIMIT,
)
def encode_media(
    self,
    friendly_token,
    profile_id,
    encoding_id,
    encoding_url,
    force=True,
    chunk=False,
    chunk_file_path="",
):
    """Encode a media (or one chunk of it) to a profile with ffmpeg.

    Progress is stored on the Encoding row while ffmpeg runs.

    Args:
        friendly_token: ``friendly_token`` of the Media to encode.
        profile_id: id of the EncodeProfile to encode to.
        encoding_id: id of the Encoding row prepared by the caller.
        encoding_url: accepted for the callers' benefit; not read in this
            function body.
        force: when False and more than one matching Encoding exists, the
            row ``encoding_id`` is deleted and the task stops.
        chunk: True when ``chunk_file_path`` (a chunk) is the input instead
            of the full media file.
        chunk_file_path: path of the chunk to encode when ``chunk`` is True.

    Returns:
        True when a usable output file was produced and stored, otherwise
        False.

    Raises:
        Whatever ``self.retry`` raises, when ffmpeg fails with a message
        that is not in ``ERRORS_LIST``.
    """
    logger.info("Encode Media started, friendly token {0}, profile id {1}, force {2}".format(friendly_token, profile_id, force))
    if self.request.id:
        task_id = self.request.id
    else:
        task_id = None
    try:
        media = Media.objects.get(friendly_token=friendly_token)
        profile = EncodeProfile.objects.get(id=profile_id)
    except BaseException:
        # media or profile is gone: drop the prepared Encoding row as well
        Encoding.objects.filter(id=encoding_id).delete()
        return False
    # break logic with chunk True/False
    if chunk:
        # TODO: in case a video is chunkized and this enters here many times
        # it will always run since chunk_file_path is always different
        # thus find a better way for this check
        if Encoding.objects.filter(media=media, profile=profile, chunk_file_path=chunk_file_path).count() > 1 and force is False:
            Encoding.objects.filter(id=encoding_id).delete()
            return False
        else:
            try:
                encoding = Encoding.objects.get(id=encoding_id)
                encoding.status = "running"
                # remove every other row for the same media/profile/chunk
                Encoding.objects.filter(
                    media=media,
                    profile=profile,
                    chunk=True,
                    chunk_file_path=chunk_file_path,
                ).exclude(id=encoding_id).delete()
            except BaseException:
                # the prepared row could not be used: start from a new one
                encoding = Encoding(
                    media=media,
                    profile=profile,
                    status="running",
                    chunk=True,
                    chunk_file_path=chunk_file_path,
                )
    else:
        if Encoding.objects.filter(media=media, profile=profile).count() > 1 and force is False:
            Encoding.objects.filter(id=encoding_id).delete()
            return False
        else:
            try:
                encoding = Encoding.objects.get(id=encoding_id)
                encoding.status = "running"
                # remove every other row for the same media/profile
                Encoding.objects.filter(media=media, profile=profile).exclude(id=encoding_id).delete()
            except BaseException:
                encoding = Encoding(media=media, profile=profile, status="running")
    if task_id:
        encoding.task_id = task_id
    encoding.worker = "localhost"
    encoding.retries = self.request.retries
    encoding.save()
    if profile.extension == "gif":
        # gif profiles take a single ffmpeg call and return from this branch
        tf = create_temp_file(suffix=".gif")
        # -ss 5 start from 5 second. -t 25 until 25 sec
        # NOTE(review): the command below passes "-ss 3", not 5 - confirm
        # which value is intended.
        command = [
            settings.FFMPEG_COMMAND,
            "-y",
            "-ss",
            "3",
            "-i",
            media.media_file.path,
            "-hide_banner",
            "-vf",
            "scale=344:-1:flags=lanczos,fps=1",
            "-t",
            "25",
            "-f",
            "gif",
            tf,
        ]
        ret = run_command(command)
        if os.path.exists(tf) and get_file_type(tf) == "image":
            with open(tf, "rb") as f:
                myfile = File(f)
                encoding.status = "success"
                # presumably the file field's save() also persists the
                # status set above - TODO confirm
                encoding.media_file.save(content=myfile, name=tf)
                rm_file(tf)
                return True
        else:
            return False
    if chunk:
        original_media_path = chunk_file_path
    else:
        original_media_path = media.media_file.path
    # if not media.duration:
    # encoding.status = "fail"
    # encoding.save(update_fields=["status"])
    # return False
    with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as temp_dir:
        # tf receives the encoded output, tfpass is handed over as pass_file
        tf = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
        tfpass = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
        ffmpeg_commands = produce_ffmpeg_commands(
            original_media_path,
            media.media_info,
            resolution=profile.resolution,
            codec=profile.codec,
            output_filename=tf,
            pass_file=tfpass,
            chunk=chunk,
        )
        if not ffmpeg_commands:
            encoding.status = "fail"
            encoding.save(update_fields=["status"])
            return False
        encoding.temp_file = tf
        encoding.commands = str(ffmpeg_commands)
        encoding.save(update_fields=["temp_file", "commands", "task_id"])
        # binding these, so they are available on on_failure
        self.encoding = encoding
        self.media = media
        # can be one-pass or two-pass
        for ffmpeg_command in ffmpeg_commands:
            ffmpeg_command = [str(s) for s in ffmpeg_command]
            encoding_backend = FFmpegBackend()
            try:
                encoding_command = encoding_backend.encode(ffmpeg_command)
                duration, n_times = 0, 0
                output = ""
                # encoding_command is consumed with next(); each value is
                # passed to calculate_seconds to derive the progress
                while encoding_command:
                    try:
                        # TODO: understand an eternal loop
                        # eg h265 with mv4 file issue, and stop with error
                        output = next(encoding_command)
                        duration = calculate_seconds(output)
                        if duration:
                            # NOTE(review): an error raised here (e.g. a
                            # zero media.duration) lands in the
                            # "except Exception" handler below
                            percent = duration * 100 / media.duration
                            # progress is written on every 60th reading only
                            if n_times % 60 == 0:
                                encoding.progress = percent
                                try:
                                    encoding.save(update_fields=["progress", "update_date"])
                                    logger.info("Saved {0}".format(round(percent, 2)))
                                except BaseException:
                                    pass
                            n_times += 1
                    except StopIteration:
                        break
                    except VideoEncodingError:
                        # ffmpeg error, or ffmpeg was killed
                        raise
            except Exception as e:
                try:
                    # output is empty, fail message is on the exception
                    output = e.message
                except AttributeError:
                    output = ""
                if isinstance(e, SoftTimeLimitExceeded):
                    kill_ffmpeg_process(encoding.temp_file)
                encoding.logs = output
                encoding.status = "fail"
                encoding.save(update_fields=["status", "logs"])
                raise_exception = True
                # if this is an ffmpeg's valid error
                # no need for the task to be re-run
                # otherwise rerun task...
                for error_msg in ERRORS_LIST:
                    if error_msg.lower() in output.lower():
                        raise_exception = False
                if raise_exception:
                    raise self.retry(exc=e, countdown=5, max_retries=1)
        encoding.logs = output
        encoding.progress = 100
        # start from "fail"; flipped to "success" only for a valid output
        success = False
        encoding.status = "fail"
        if os.path.exists(tf) and os.path.getsize(tf) != 0:
            ret = media_file_info(tf)
            if ret.get("is_video") or ret.get("is_audio"):
                encoding.status = "success"
                success = True
                with open(tf, "rb") as f:
                    myfile = File(f)
                    output_name = "{0}.{1}".format(get_file_name(original_media_path), profile.extension)
                    encoding.media_file.save(content=myfile, name=output_name)
                # NOTE(review): timedelta.seconds leaves out whole days, so a
                # run longer than 24h is under-reported - confirm intent
                encoding.total_run_time = (encoding.update_date - encoding.add_date).seconds
        try:
            encoding.save(update_fields=["status", "logs", "progress", "total_run_time"])
            # this will raise a django.db.utils.DatabaseError error when task is revoked,
            # since we delete the encoding at that stage
        except BaseException:
            pass
        return success
  339. @task(name="produce_sprite_from_video", queue="long_tasks")
  340. def produce_sprite_from_video(friendly_token):
  341. """Produces a sprites file for a video, uses ffmpeg"""
  342. try:
  343. media = Media.objects.get(friendly_token=friendly_token)
  344. except BaseException:
  345. logger.info("failed to get media with friendly_token %s" % friendly_token)
  346. return False
  347. with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as tmpdirname:
  348. try:
  349. tmpdir_image_files = tmpdirname + "/img%03d.jpg"
  350. output_name = tmpdirname + "/sprites.jpg"
  351. cmd = "{0} -i {1} -f image2 -vf 'fps=1/10, scale=160:90' {2}&&files=$(ls {3}/img*.jpg | sort -t '-' -n -k 2 | tr '\n' ' ')&&convert $files -append {4}".format(
  352. settings.FFMPEG_COMMAND,
  353. media.media_file.path,
  354. tmpdir_image_files,
  355. tmpdirname,
  356. output_name,
  357. )
  358. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  359. if os.path.exists(output_name) and get_file_type(output_name) == "image":
  360. with open(output_name, "rb") as f:
  361. myfile = File(f)
  362. media.sprites.save(
  363. content=myfile,
  364. name=get_file_name(media.media_file.path) + "sprites.jpg",
  365. )
  366. except BaseException:
  367. pass
  368. return True
  369. @task(name="create_hls", queue="long_tasks")
  370. def create_hls(friendly_token):
  371. """Creates HLS file for media, uses Bento4 mp4hls command"""
  372. if not hasattr(settings, "MP4HLS_COMMAND"):
  373. logger.info("Bento4 mp4hls command is missing from configuration")
  374. return False
  375. if not os.path.exists(settings.MP4HLS_COMMAND):
  376. logger.info("Bento4 mp4hls command is missing")
  377. return False
  378. try:
  379. media = Media.objects.get(friendly_token=friendly_token)
  380. except BaseException:
  381. logger.info("failed to get media with friendly_token %s" % friendly_token)
  382. return False
  383. p = media.uid.hex
  384. output_dir = os.path.join(settings.HLS_DIR, p)
  385. encodings = media.encodings.filter(profile__extension="mp4", status="success", chunk=False, profile__codec="h264")
  386. if encodings:
  387. existing_output_dir = None
  388. if os.path.exists(output_dir):
  389. existing_output_dir = output_dir
  390. output_dir = os.path.join(settings.HLS_DIR, p + produce_friendly_token())
  391. files = " ".join([f.media_file.path for f in encodings if f.media_file])
  392. cmd = "{0} --segment-duration=4 --output-dir={1} {2}".format(settings.MP4HLS_COMMAND, output_dir, files)
  393. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  394. if existing_output_dir:
  395. # override content with -T !
  396. cmd = "cp -rT {0} {1}".format(output_dir, existing_output_dir)
  397. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  398. shutil.rmtree(output_dir)
  399. output_dir = existing_output_dir
  400. pp = os.path.join(output_dir, "master.m3u8")
  401. if os.path.exists(pp):
  402. if media.hls_file != pp:
  403. media.hls_file = pp
  404. media.save(update_fields=["hls_file"])
  405. return True
  406. @task(name="check_running_states", queue="short_tasks")
  407. def check_running_states():
  408. # Experimental - unused
  409. """Check stale running encodings and delete/reencode media"""
  410. encodings = Encoding.objects.filter(status="running")
  411. logger.info("got {0} encodings that are in state running".format(encodings.count()))
  412. changed = 0
  413. for encoding in encodings:
  414. now = datetime.now(encoding.update_date.tzinfo)
  415. if (now - encoding.update_date).seconds > settings.RUNNING_STATE_STALE:
  416. media = encoding.media
  417. profile = encoding.profile
  418. task_id = encoding.task_id
  419. # terminate task
  420. if task_id:
  421. revoke(task_id, terminate=True)
  422. encoding.delete()
  423. media.encode(profiles=[profile])
  424. # TODO: allign with new code + chunksize...
  425. changed += 1
  426. if changed:
  427. logger.info("changed from running to pending on {0} items".format(changed))
  428. return True
  429. @task(name="check_media_states", queue="short_tasks")
  430. def check_media_states():
  431. # Experimental - unused
  432. # check encoding status of not success media
  433. media = Media.objects.filter(Q(encoding_status="running") | Q(encoding_status="fail") | Q(encoding_status="pending"))
  434. logger.info("got {0} media that are not in state success".format(media.count()))
  435. changed = 0
  436. for m in media:
  437. m.set_encoding_status()
  438. m.save(update_fields=["encoding_status"])
  439. changed += 1
  440. if changed:
  441. logger.info("changed encoding status to {0} media items".format(changed))
  442. return True
  443. @task(name="check_pending_states", queue="short_tasks")
  444. def check_pending_states():
  445. # Experimental - unused
  446. # check encoding profiles that are on state pending and not on a queue
  447. encodings = Encoding.objects.filter(status="pending")
  448. if not encodings:
  449. return True
  450. changed = 0
  451. tasks = list_tasks()
  452. task_ids = tasks["task_ids"]
  453. media_profile_pairs = tasks["media_profile_pairs"]
  454. for encoding in encodings:
  455. if encoding.task_id and encoding.task_id in task_ids:
  456. # encoding is in one of the active/reserved/scheduled tasks list
  457. continue
  458. elif (
  459. encoding.media.friendly_token,
  460. encoding.profile.id,
  461. ) in media_profile_pairs:
  462. continue
  463. # encoding is in one of the reserved/scheduled tasks list.
  464. # has no task_id but will be run, so need to re-enter the queue
  465. else:
  466. media = encoding.media
  467. profile = encoding.profile
  468. encoding.delete()
  469. media.encode(profiles=[profile], force=False)
  470. changed += 1
  471. if changed:
  472. logger.info("set to the encode queue {0} encodings that were on pending state".format(changed))
  473. return True
  474. @task(name="check_missing_profiles", queue="short_tasks")
  475. def check_missing_profiles():
  476. # Experimental - unused
  477. # check if video files have missing profiles. If so, add them
  478. media = Media.objects.filter(media_type="video")
  479. profiles = list(EncodeProfile.objects.all())
  480. changed = 0
  481. for m in media:
  482. existing_profiles = [p.profile for p in m.encodings.all()]
  483. missing_profiles = [p for p in profiles if p not in existing_profiles]
  484. if missing_profiles:
  485. m.encode(profiles=missing_profiles, force=False)
  486. # since we call with force=False
  487. # encode_media won't delete existing profiles
  488. # if they appear on the meanwhile (eg on a big queue)
  489. changed += 1
  490. if changed:
  491. logger.info("set to the encode queue {0} profiles".format(changed))
  492. return True
  493. @task(name="clear_sessions", queue="short_tasks")
  494. def clear_sessions():
  495. """Clear expired sessions"""
  496. try:
  497. from importlib import import_module
  498. from django.conf import settings
  499. engine = import_module(settings.SESSION_ENGINE)
  500. engine.SessionStore.clear_expired()
  501. except BaseException:
  502. return False
  503. return True
  504. @task(name="save_user_action", queue="short_tasks")
  505. def save_user_action(user_or_session, friendly_token=None, action="watch", extra_info=None):
  506. """Short task that saves a user action"""
  507. if action not in VALID_USER_ACTIONS:
  508. return False
  509. try:
  510. media = Media.objects.get(friendly_token=friendly_token)
  511. except BaseException:
  512. return False
  513. user = user_or_session.get("user_id")
  514. session_key = user_or_session.get("user_session")
  515. remote_ip = user_or_session.get("remote_ip_addr")
  516. if user:
  517. try:
  518. user = User.objects.get(id=user)
  519. except BaseException:
  520. return False
  521. if not (user or session_key):
  522. return False
  523. if action in ["like", "dislike", "watch", "report"]:
  524. if not pre_save_action(
  525. media=media,
  526. user=user,
  527. session_key=session_key,
  528. action=action,
  529. remote_ip=remote_ip,
  530. ):
  531. return False
  532. if action == "watch":
  533. if user:
  534. MediaAction.objects.filter(user=user, media=media, action="watch").delete()
  535. else:
  536. MediaAction.objects.filter(session_key=session_key, media=media, action="watch").delete()
  537. if action == "rate":
  538. try:
  539. score = extra_info.get("score")
  540. rating_category = extra_info.get("category_id")
  541. except BaseException:
  542. # TODO: better error handling?
  543. return False
  544. try:
  545. rating = Rating.objects.filter(user=user, media=media, rating_category_id=rating_category).first()
  546. if rating:
  547. rating.score = score
  548. rating.save(update_fields=["score"])
  549. else:
  550. rating = Rating.objects.create(
  551. user=user,
  552. media=media,
  553. rating_category_id=rating_category,
  554. score=score,
  555. )
  556. except Exception:
  557. # TODO: more specific handling, for errors in score, or
  558. # rating_category?
  559. return False
  560. ma = MediaAction(
  561. user=user,
  562. session_key=session_key,
  563. media=media,
  564. action=action,
  565. extra_info=extra_info,
  566. remote_ip=remote_ip,
  567. )
  568. ma.save()
  569. if action == "watch":
  570. media.views += 1
  571. media.save(update_fields=["views"])
  572. elif action == "report":
  573. media.reported_times += 1
  574. if media.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
  575. media.state = "private"
  576. media.save(update_fields=["reported_times", "state"])
  577. notify_users(
  578. friendly_token=media.friendly_token,
  579. action="media_reported",
  580. extra=extra_info,
  581. )
  582. elif action == "like":
  583. media.likes += 1
  584. media.save(update_fields=["likes"])
  585. elif action == "dislike":
  586. media.dislikes += 1
  587. media.save(update_fields=["dislikes"])
  588. return True
  589. @task(name="save_voice_action", queue="short_tasks")
  590. def save_voice_action(user_or_session, friendly_token=None, action="watch", extra_info=None, uid=None):
  591. """Short task that saves a voice action"""
  592. if action not in VALID_VOICE_ACTIONS:
  593. return False
  594. try:
  595. media = Media.objects.get(friendly_token=friendly_token)
  596. except BaseException:
  597. return False
  598. try:
  599. voice = Voice.objects.get(uid=uid)
  600. except BaseException:
  601. return False
  602. user = user_or_session.get("user_id")
  603. session_key = user_or_session.get("user_session")
  604. remote_ip = user_or_session.get("remote_ip_addr")
  605. if user:
  606. try:
  607. user = User.objects.get(id=user)
  608. except BaseException:
  609. return False
  610. if not (user or session_key):
  611. return False
  612. if action in ["like", "likeundo", "watch", "report"]:
  613. if not pre_save_action__voice(
  614. media=media,
  615. user=user,
  616. session_key=session_key,
  617. action=action,
  618. remote_ip=remote_ip,
  619. voice=voice
  620. ):
  621. return False
  622. if action == "watch":
  623. if user:
  624. VoiceAction.objects.filter(user=user, voice=voice, action="watch").delete()
  625. else:
  626. VoiceAction.objects.filter(session_key=session_key, voice=voice, action="watch").delete()
  627. # There is no `rate` action for voice. So, it's skipped.
  628. ma = VoiceAction(
  629. user=user,
  630. session_key=session_key,
  631. media=media,
  632. action=action,
  633. extra_info=extra_info,
  634. remote_ip=remote_ip,
  635. voice=voice,
  636. )
  637. ma.save()
  638. # TODO.
  639. @task(name="get_list_of_popular_media", queue="long_tasks")
  640. def get_list_of_popular_media():
  641. """Experimental task for preparing media listing
  642. for index page / recommended section
  643. calculate and return the top 50 popular media, based on two rules
  644. X = the top 25 videos that have the most views during the last week
  645. Y = the most recent 25 videos that have been liked over the last 6 months
  646. """
  647. valid_media_x = {}
  648. valid_media_y = {}
  649. basic_query = Q(listable=True)
  650. media_x = Media.objects.filter(basic_query).values("friendly_token")
  651. period_x = datetime.now() - timedelta(days=7)
  652. period_y = datetime.now() - timedelta(days=30 * 6)
  653. for media in media_x:
  654. ft = media["friendly_token"]
  655. num = MediaAction.objects.filter(action_date__gte=period_x, action="watch", media__friendly_token=ft).count()
  656. if num:
  657. valid_media_x[ft] = num
  658. num = MediaAction.objects.filter(action_date__gte=period_y, action="like", media__friendly_token=ft).count()
  659. if num:
  660. valid_media_y[ft] = num
  661. x = sorted(valid_media_x.items(), key=lambda kv: kv[1], reverse=True)[:25]
  662. y = sorted(valid_media_y.items(), key=lambda kv: kv[1], reverse=True)[:25]
  663. media_ids = [a[0] for a in x]
  664. media_ids.extend([a[0] for a in y])
  665. media_ids = list(set(media_ids))
  666. cache.set("popular_media_ids", media_ids, 60 * 60 * 12)
  667. logger.info("saved popular media ids")
  668. return True
  669. @task(name="update_listings_thumbnails", queue="long_tasks")
  670. def update_listings_thumbnails():
  671. """Populate listings_thumbnail field for models"""
  672. # Categories
  673. used_media = []
  674. saved = 0
  675. qs = Category.objects.filter().order_by("-media_count")
  676. for object in qs:
  677. media = Media.objects.exclude(friendly_token__in=used_media).filter(category=object, state="public", is_reviewed=True).order_by("-views").first()
  678. if media:
  679. object.listings_thumbnail = media.thumbnail_url
  680. object.save(update_fields=["listings_thumbnail"])
  681. used_media.append(media.friendly_token)
  682. saved += 1
  683. logger.info("updated {} categories".format(saved))
  684. # Tags
  685. used_media = []
  686. saved = 0
  687. qs = Tag.objects.filter().order_by("-media_count")
  688. for object in qs:
  689. media = Media.objects.exclude(friendly_token__in=used_media).filter(tags=object, state="public", is_reviewed=True).order_by("-views").first()
  690. if media:
  691. object.listings_thumbnail = media.thumbnail_url
  692. object.save(update_fields=["listings_thumbnail"])
  693. used_media.append(media.friendly_token)
  694. saved += 1
  695. logger.info("updated {} tags".format(saved))
  696. return True
  697. @task_revoked.connect
  698. def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
  699. # For encode_media tasks that are revoked,
  700. # ffmpeg command won't be stopped, since
  701. # it got started by a subprocess.
  702. # Need to stop that process
  703. # Also, removing the Encoding object,
  704. # since the task that would prepare it was killed
  705. # Maybe add a killed state for Encoding objects
  706. try:
  707. uid = kwargs["request"].task_id
  708. if uid:
  709. encoding = Encoding.objects.get(task_id=uid)
  710. encoding.delete()
  711. logger.info("deleted the Encoding object")
  712. if encoding.temp_file:
  713. kill_ffmpeg_process(encoding.temp_file)
  714. except BaseException:
  715. pass
  716. return True
  717. def kill_ffmpeg_process(filepath):
  718. # this is not ideal, ffmpeg pid could be linked to the Encoding object
  719. cmd = "ps aux|grep 'ffmpeg'|grep %s|grep -v grep |awk '{print $2}'" % filepath
  720. result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  721. pid = result.stdout.decode("utf-8").strip()
  722. if pid:
  723. cmd = "kill -9 %s" % pid
  724. result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  725. return result
  726. @task(name="remove_media_file", base=Task, queue="long_tasks")
  727. def remove_media_file(media_file=None):
  728. rm_file(media_file)
  729. return True
# TODO LIST
# 1 chunks are deleted from original server when file is fully encoded.
#   however need to enter this logic in cases of fail as well
# 2 script to delete chunks in fail status
#   (and check for their encodings, and delete them as well, along with
#   all chunks)
# 3 beat task, remove chunks