tasks.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017
  1. import json
  2. import os
  3. import re
  4. import shutil
  5. import subprocess
  6. import tempfile
  7. from datetime import datetime, timedelta
  8. from celery import Task
  9. from celery.decorators import task
  10. from celery.exceptions import SoftTimeLimitExceeded
  11. from celery.signals import task_revoked
  12. from celery.task.control import revoke
  13. from celery.utils.log import get_task_logger
  14. from django.conf import settings
  15. from django.core.cache import cache
  16. from django.core.files import File
  17. from django.db.models import Q
  18. from actions.models import USER_MEDIA_ACTIONS, MediaAction
  19. from actions.models import USER_VOICE_ACTIONS, VoiceAction
  20. from users.models import User
  21. from .backends import FFmpegBackend
  22. from .exceptions import VideoEncodingError
  23. from .helpers import (
  24. calculate_seconds,
  25. create_temp_file,
  26. get_file_name,
  27. get_file_type,
  28. media_file_info,
  29. produce_ffmpeg_commands,
  30. produce_friendly_token,
  31. rm_file,
  32. run_command,
  33. url_from_path, # Needed to combine video with some voices.
  34. )
  35. from .methods import list_tasks, notify_users, pre_save_action
  36. from .methods import pre_save_action__voice
  37. from .methods import pre_save_action__videowithvoices
  38. from .models import Category, EncodeProfile, Encoding, Media, Rating, Tag
  39. from .models import Voice
  40. logger = get_task_logger(__name__)
  41. VALID_USER_ACTIONS = [action for action, name in USER_MEDIA_ACTIONS]
  42. VALID_VOICE_ACTIONS = [action for action, name in USER_VOICE_ACTIONS]
  43. ERRORS_LIST = [
  44. "Output file is empty, nothing was encoded",
  45. "Invalid data found when processing input",
  46. "Unable to find a suitable output format for",
  47. ]
  48. @task(name="chunkize_media", bind=True, queue="short_tasks", soft_time_limit=60 * 30)
  49. def chunkize_media(self, friendly_token, profiles, force=True):
  50. """Break media in chunks and start encoding tasks"""
  51. profiles = [EncodeProfile.objects.get(id=profile) for profile in profiles]
  52. media = Media.objects.get(friendly_token=friendly_token)
  53. cwd = os.path.dirname(os.path.realpath(media.media_file.path))
  54. file_name = media.media_file.path.split("/")[-1]
  55. random_prefix = produce_friendly_token()
  56. file_format = "{0}_{1}".format(random_prefix, file_name)
  57. chunks_file_name = "%02d_{0}".format(file_format)
  58. chunks_file_name += ".mkv"
  59. cmd = [
  60. settings.FFMPEG_COMMAND,
  61. "-y",
  62. "-i",
  63. media.media_file.path,
  64. "-c",
  65. "copy",
  66. "-f",
  67. "segment",
  68. "-segment_time",
  69. str(settings.VIDEO_CHUNKS_DURATION),
  70. chunks_file_name,
  71. ]
  72. chunks = []
  73. ret = run_command(cmd, cwd=cwd)
  74. if "out" in ret.keys():
  75. for line in ret.get("error").split("\n"):
  76. ch = re.findall(r"Opening \'([\W\w]+)\' for writing", line)
  77. if ch:
  78. chunks.append(ch[0])
  79. if not chunks:
  80. # command completely failed to segment file.putting to normal encode
  81. logger.info("Failed to break file {0} in chunks." " Putting to normal encode queue".format(friendly_token))
  82. for profile in profiles:
  83. if media.video_height and media.video_height < profile.resolution:
  84. if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  85. continue
  86. encoding = Encoding(media=media, profile=profile)
  87. encoding.save()
  88. enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
  89. encode_media.delay(friendly_token, profile.id, encoding.id, enc_url, force=force)
  90. return False
  91. chunks = [os.path.join(cwd, ch) for ch in chunks]
  92. to_profiles = []
  93. chunks_dict = {}
  94. # calculate once md5sums
  95. for chunk in chunks:
  96. cmd = ["md5sum", chunk]
  97. stdout = run_command(cmd).get("out")
  98. md5sum = stdout.strip().split()[0]
  99. chunks_dict[chunk] = md5sum
  100. for profile in profiles:
  101. if media.video_height and media.video_height < profile.resolution:
  102. if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  103. continue
  104. to_profiles.append(profile)
  105. for chunk in chunks:
  106. encoding = Encoding(
  107. media=media,
  108. profile=profile,
  109. chunk_file_path=chunk,
  110. chunk=True,
  111. chunks_info=json.dumps(chunks_dict),
  112. md5sum=chunks_dict[chunk],
  113. )
  114. encoding.save()
  115. enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
  116. if profile.resolution in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
  117. priority = 0
  118. else:
  119. priority = 9
  120. encode_media.apply_async(
  121. args=[friendly_token, profile.id, encoding.id, enc_url],
  122. kwargs={"force": force, "chunk": True, "chunk_file_path": chunk},
  123. priority=priority,
  124. )
  125. logger.info("got {0} chunks and will encode to {1} profiles".format(len(chunks), to_profiles))
  126. return True
  127. class EncodingTask(Task):
  128. def on_failure(self, exc, task_id, args, kwargs, einfo):
  129. # mainly used to run some post failure steps
  130. # we get here if a task is revoked
  131. try:
  132. if hasattr(self, "encoding"):
  133. self.encoding.status = "fail"
  134. self.encoding.save(update_fields=["status"])
  135. kill_ffmpeg_process(self.encoding.temp_file)
  136. if hasattr(self.encoding, "media"):
  137. self.encoding.media.post_encode_actions()
  138. except BaseException:
  139. pass
  140. return False
  141. @task(
  142. name="encode_media",
  143. base=EncodingTask,
  144. bind=True,
  145. queue="long_tasks",
  146. soft_time_limit=settings.CELERY_SOFT_TIME_LIMIT,
  147. )
  148. def encode_media(
  149. self,
  150. friendly_token,
  151. profile_id,
  152. encoding_id,
  153. encoding_url,
  154. force=True,
  155. chunk=False,
  156. chunk_file_path="",
  157. ):
  158. """Encode a media to given profile, using ffmpeg, storing progress"""
  159. logger.info("Encode Media started, friendly token {0}, profile id {1}, force {2}".format(friendly_token, profile_id, force))
  160. if self.request.id:
  161. task_id = self.request.id
  162. else:
  163. task_id = None
  164. try:
  165. media = Media.objects.get(friendly_token=friendly_token)
  166. profile = EncodeProfile.objects.get(id=profile_id)
  167. except BaseException:
  168. Encoding.objects.filter(id=encoding_id).delete()
  169. return False
  170. # break logic with chunk True/False
  171. if chunk:
  172. # TODO: in case a video is chunkized and this enters here many times
  173. # it will always run since chunk_file_path is always different
  174. # thus find a better way for this check
  175. if Encoding.objects.filter(media=media, profile=profile, chunk_file_path=chunk_file_path).count() > 1 and force is False:
  176. Encoding.objects.filter(id=encoding_id).delete()
  177. return False
  178. else:
  179. try:
  180. encoding = Encoding.objects.get(id=encoding_id)
  181. encoding.status = "running"
  182. Encoding.objects.filter(
  183. media=media,
  184. profile=profile,
  185. chunk=True,
  186. chunk_file_path=chunk_file_path,
  187. ).exclude(id=encoding_id).delete()
  188. except BaseException:
  189. encoding = Encoding(
  190. media=media,
  191. profile=profile,
  192. status="running",
  193. chunk=True,
  194. chunk_file_path=chunk_file_path,
  195. )
  196. else:
  197. if Encoding.objects.filter(media=media, profile=profile).count() > 1 and force is False:
  198. Encoding.objects.filter(id=encoding_id).delete()
  199. return False
  200. else:
  201. try:
  202. encoding = Encoding.objects.get(id=encoding_id)
  203. encoding.status = "running"
  204. Encoding.objects.filter(media=media, profile=profile).exclude(id=encoding_id).delete()
  205. except BaseException:
  206. encoding = Encoding(media=media, profile=profile, status="running")
  207. if task_id:
  208. encoding.task_id = task_id
  209. encoding.worker = "localhost"
  210. encoding.retries = self.request.retries
  211. encoding.save()
  212. if profile.extension == "gif":
  213. tf = create_temp_file(suffix=".gif")
  214. # -ss 5 start from 5 second. -t 25 until 25 sec
  215. command = [
  216. settings.FFMPEG_COMMAND,
  217. "-y",
  218. "-ss",
  219. "3",
  220. "-i",
  221. media.media_file.path,
  222. "-hide_banner",
  223. "-vf",
  224. "scale=344:-1:flags=lanczos,fps=1",
  225. "-t",
  226. "25",
  227. "-f",
  228. "gif",
  229. tf,
  230. ]
  231. ret = run_command(command)
  232. if os.path.exists(tf) and get_file_type(tf) == "image":
  233. with open(tf, "rb") as f:
  234. myfile = File(f)
  235. encoding.status = "success"
  236. encoding.media_file.save(content=myfile, name=tf)
  237. rm_file(tf)
  238. return True
  239. else:
  240. return False
  241. if chunk:
  242. original_media_path = chunk_file_path
  243. else:
  244. original_media_path = media.media_file.path
  245. # if not media.duration:
  246. # encoding.status = "fail"
  247. # encoding.save(update_fields=["status"])
  248. # return False
  249. with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as temp_dir:
  250. tf = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
  251. tfpass = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
  252. ffmpeg_commands = produce_ffmpeg_commands(
  253. original_media_path,
  254. media.media_info,
  255. resolution=profile.resolution,
  256. codec=profile.codec,
  257. output_filename=tf,
  258. pass_file=tfpass,
  259. chunk=chunk,
  260. )
  261. if not ffmpeg_commands:
  262. encoding.status = "fail"
  263. encoding.save(update_fields=["status"])
  264. return False
  265. encoding.temp_file = tf
  266. encoding.commands = str(ffmpeg_commands)
  267. encoding.save(update_fields=["temp_file", "commands", "task_id"])
  268. # binding these, so they are available on on_failure
  269. self.encoding = encoding
  270. self.media = media
  271. # can be one-pass or two-pass
  272. for ffmpeg_command in ffmpeg_commands:
  273. ffmpeg_command = [str(s) for s in ffmpeg_command]
  274. encoding_backend = FFmpegBackend()
  275. try:
  276. encoding_command = encoding_backend.encode(ffmpeg_command)
  277. duration, n_times = 0, 0
  278. output = ""
  279. while encoding_command:
  280. try:
  281. # TODO: understand an eternal loop
  282. # eg h265 with mv4 file issue, and stop with error
  283. output = next(encoding_command)
  284. duration = calculate_seconds(output)
  285. if duration:
  286. percent = duration * 100 / media.duration
  287. if n_times % 60 == 0:
  288. encoding.progress = percent
  289. try:
  290. encoding.save(update_fields=["progress", "update_date"])
  291. logger.info("Saved {0}".format(round(percent, 2)))
  292. except BaseException:
  293. pass
  294. n_times += 1
  295. except StopIteration:
  296. break
  297. except VideoEncodingError:
  298. # ffmpeg error, or ffmpeg was killed
  299. raise
  300. except Exception as e:
  301. try:
  302. # output is empty, fail message is on the exception
  303. output = e.message
  304. except AttributeError:
  305. output = ""
  306. if isinstance(e, SoftTimeLimitExceeded):
  307. kill_ffmpeg_process(encoding.temp_file)
  308. encoding.logs = output
  309. encoding.status = "fail"
  310. encoding.save(update_fields=["status", "logs"])
  311. raise_exception = True
  312. # if this is an ffmpeg's valid error
  313. # no need for the task to be re-run
  314. # otherwise rerun task...
  315. for error_msg in ERRORS_LIST:
  316. if error_msg.lower() in output.lower():
  317. raise_exception = False
  318. if raise_exception:
  319. raise self.retry(exc=e, countdown=5, max_retries=1)
  320. encoding.logs = output
  321. encoding.progress = 100
  322. success = False
  323. encoding.status = "fail"
  324. if os.path.exists(tf) and os.path.getsize(tf) != 0:
  325. ret = media_file_info(tf)
  326. if ret.get("is_video") or ret.get("is_audio"):
  327. encoding.status = "success"
  328. success = True
  329. with open(tf, "rb") as f:
  330. myfile = File(f)
  331. output_name = "{0}.{1}".format(get_file_name(original_media_path), profile.extension)
  332. encoding.media_file.save(content=myfile, name=output_name)
  333. encoding.total_run_time = (encoding.update_date - encoding.add_date).seconds
  334. try:
  335. encoding.save(update_fields=["status", "logs", "progress", "total_run_time"])
  336. # this will raise a django.db.utils.DatabaseError error when task is revoked,
  337. # since we delete the encoding at that stage
  338. except BaseException:
  339. pass
  340. return success
  341. @task(name="produce_sprite_from_video", queue="long_tasks")
  342. def produce_sprite_from_video(friendly_token):
  343. """Produces a sprites file for a video, uses ffmpeg"""
  344. try:
  345. media = Media.objects.get(friendly_token=friendly_token)
  346. except BaseException:
  347. logger.info("failed to get media with friendly_token %s" % friendly_token)
  348. return False
  349. with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as tmpdirname:
  350. try:
  351. tmpdir_image_files = tmpdirname + "/img%03d.jpg"
  352. output_name = tmpdirname + "/sprites.jpg"
  353. cmd = "{0} -i {1} -f image2 -vf 'fps=1/10, scale=160:90' {2}&&files=$(ls {3}/img*.jpg | sort -t '-' -n -k 2 | tr '\n' ' ')&&convert $files -append {4}".format(
  354. settings.FFMPEG_COMMAND,
  355. media.media_file.path,
  356. tmpdir_image_files,
  357. tmpdirname,
  358. output_name,
  359. )
  360. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  361. if os.path.exists(output_name) and get_file_type(output_name) == "image":
  362. with open(output_name, "rb") as f:
  363. myfile = File(f)
  364. media.sprites.save(
  365. content=myfile,
  366. name=get_file_name(media.media_file.path) + "sprites.jpg",
  367. )
  368. except BaseException:
  369. pass
  370. return True
  371. @task(name="create_hls", queue="long_tasks")
  372. def create_hls(friendly_token):
  373. """Creates HLS file for media, uses Bento4 mp4hls command"""
  374. if not hasattr(settings, "MP4HLS_COMMAND"):
  375. logger.info("Bento4 mp4hls command is missing from configuration")
  376. return False
  377. if not os.path.exists(settings.MP4HLS_COMMAND):
  378. logger.info("Bento4 mp4hls command is missing")
  379. return False
  380. try:
  381. media = Media.objects.get(friendly_token=friendly_token)
  382. except BaseException:
  383. logger.info("failed to get media with friendly_token %s" % friendly_token)
  384. return False
  385. p = media.uid.hex
  386. output_dir = os.path.join(settings.HLS_DIR, p)
  387. encodings = media.encodings.filter(profile__extension="mp4", status="success", chunk=False, profile__codec="h264")
  388. if encodings:
  389. existing_output_dir = None
  390. if os.path.exists(output_dir):
  391. existing_output_dir = output_dir
  392. output_dir = os.path.join(settings.HLS_DIR, p + produce_friendly_token())
  393. files = " ".join([f.media_file.path for f in encodings if f.media_file])
  394. cmd = "{0} --segment-duration=4 --output-dir={1} {2}".format(settings.MP4HLS_COMMAND, output_dir, files)
  395. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  396. if existing_output_dir:
  397. # override content with -T !
  398. cmd = "cp -rT {0} {1}".format(output_dir, existing_output_dir)
  399. subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  400. shutil.rmtree(output_dir)
  401. output_dir = existing_output_dir
  402. pp = os.path.join(output_dir, "master.m3u8")
  403. if os.path.exists(pp):
  404. if media.hls_file != pp:
  405. media.hls_file = pp
  406. media.save(update_fields=["hls_file"])
  407. return True
  408. @task(name="check_running_states", queue="short_tasks")
  409. def check_running_states():
  410. # Experimental - unused
  411. """Check stale running encodings and delete/reencode media"""
  412. encodings = Encoding.objects.filter(status="running")
  413. logger.info("got {0} encodings that are in state running".format(encodings.count()))
  414. changed = 0
  415. for encoding in encodings:
  416. now = datetime.now(encoding.update_date.tzinfo)
  417. if (now - encoding.update_date).seconds > settings.RUNNING_STATE_STALE:
  418. media = encoding.media
  419. profile = encoding.profile
  420. task_id = encoding.task_id
  421. # terminate task
  422. if task_id:
  423. revoke(task_id, terminate=True)
  424. encoding.delete()
  425. media.encode(profiles=[profile])
  426. # TODO: allign with new code + chunksize...
  427. changed += 1
  428. if changed:
  429. logger.info("changed from running to pending on {0} items".format(changed))
  430. return True
  431. @task(name="check_media_states", queue="short_tasks")
  432. def check_media_states():
  433. # Experimental - unused
  434. # check encoding status of not success media
  435. media = Media.objects.filter(Q(encoding_status="running") | Q(encoding_status="fail") | Q(encoding_status="pending"))
  436. logger.info("got {0} media that are not in state success".format(media.count()))
  437. changed = 0
  438. for m in media:
  439. m.set_encoding_status()
  440. m.save(update_fields=["encoding_status"])
  441. changed += 1
  442. if changed:
  443. logger.info("changed encoding status to {0} media items".format(changed))
  444. return True
  445. @task(name="check_pending_states", queue="short_tasks")
  446. def check_pending_states():
  447. # Experimental - unused
  448. # check encoding profiles that are on state pending and not on a queue
  449. encodings = Encoding.objects.filter(status="pending")
  450. if not encodings:
  451. return True
  452. changed = 0
  453. tasks = list_tasks()
  454. task_ids = tasks["task_ids"]
  455. media_profile_pairs = tasks["media_profile_pairs"]
  456. for encoding in encodings:
  457. if encoding.task_id and encoding.task_id in task_ids:
  458. # encoding is in one of the active/reserved/scheduled tasks list
  459. continue
  460. elif (
  461. encoding.media.friendly_token,
  462. encoding.profile.id,
  463. ) in media_profile_pairs:
  464. continue
  465. # encoding is in one of the reserved/scheduled tasks list.
  466. # has no task_id but will be run, so need to re-enter the queue
  467. else:
  468. media = encoding.media
  469. profile = encoding.profile
  470. encoding.delete()
  471. media.encode(profiles=[profile], force=False)
  472. changed += 1
  473. if changed:
  474. logger.info("set to the encode queue {0} encodings that were on pending state".format(changed))
  475. return True
  476. @task(name="check_missing_profiles", queue="short_tasks")
  477. def check_missing_profiles():
  478. # Experimental - unused
  479. # check if video files have missing profiles. If so, add them
  480. media = Media.objects.filter(media_type="video")
  481. profiles = list(EncodeProfile.objects.all())
  482. changed = 0
  483. for m in media:
  484. existing_profiles = [p.profile for p in m.encodings.all()]
  485. missing_profiles = [p for p in profiles if p not in existing_profiles]
  486. if missing_profiles:
  487. m.encode(profiles=missing_profiles, force=False)
  488. # since we call with force=False
  489. # encode_media won't delete existing profiles
  490. # if they appear on the meanwhile (eg on a big queue)
  491. changed += 1
  492. if changed:
  493. logger.info("set to the encode queue {0} profiles".format(changed))
  494. return True
  495. @task(name="clear_sessions", queue="short_tasks")
  496. def clear_sessions():
  497. """Clear expired sessions"""
  498. try:
  499. from importlib import import_module
  500. from django.conf import settings
  501. engine = import_module(settings.SESSION_ENGINE)
  502. engine.SessionStore.clear_expired()
  503. except BaseException:
  504. return False
  505. return True
  506. @task(name="save_user_action", queue="short_tasks")
  507. def save_user_action(user_or_session, friendly_token=None, action="watch", extra_info=None):
  508. """Short task that saves a user action"""
  509. if action not in VALID_USER_ACTIONS:
  510. return False
  511. try:
  512. media = Media.objects.get(friendly_token=friendly_token)
  513. except BaseException:
  514. return False
  515. user = user_or_session.get("user_id")
  516. session_key = user_or_session.get("user_session")
  517. remote_ip = user_or_session.get("remote_ip_addr")
  518. if user:
  519. try:
  520. user = User.objects.get(id=user)
  521. except BaseException:
  522. return False
  523. if not (user or session_key):
  524. return False
  525. if action in ["like", "dislike", "watch", "report"]:
  526. if not pre_save_action(
  527. media=media,
  528. user=user,
  529. session_key=session_key,
  530. action=action,
  531. remote_ip=remote_ip,
  532. ):
  533. return False
  534. if action == "watch":
  535. if user:
  536. MediaAction.objects.filter(user=user, media=media, action="watch").delete()
  537. else:
  538. MediaAction.objects.filter(session_key=session_key, media=media, action="watch").delete()
  539. if action == "rate":
  540. try:
  541. score = extra_info.get("score")
  542. rating_category = extra_info.get("category_id")
  543. except BaseException:
  544. # TODO: better error handling?
  545. return False
  546. try:
  547. rating = Rating.objects.filter(user=user, media=media, rating_category_id=rating_category).first()
  548. if rating:
  549. rating.score = score
  550. rating.save(update_fields=["score"])
  551. else:
  552. rating = Rating.objects.create(
  553. user=user,
  554. media=media,
  555. rating_category_id=rating_category,
  556. score=score,
  557. )
  558. except Exception:
  559. # TODO: more specific handling, for errors in score, or
  560. # rating_category?
  561. return False
  562. ma = MediaAction(
  563. user=user,
  564. session_key=session_key,
  565. media=media,
  566. action=action,
  567. extra_info=extra_info,
  568. remote_ip=remote_ip,
  569. )
  570. ma.save()
  571. if action == "watch":
  572. media.views += 1
  573. media.save(update_fields=["views"])
  574. elif action == "report":
  575. media.reported_times += 1
  576. if media.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
  577. media.state = "private"
  578. media.save(update_fields=["reported_times", "state"])
  579. notify_users(
  580. friendly_token=media.friendly_token,
  581. action="media_reported",
  582. extra=extra_info,
  583. )
  584. elif action == "like":
  585. media.likes += 1
  586. media.save(update_fields=["likes"])
  587. elif action == "dislike":
  588. media.dislikes += 1
  589. media.save(update_fields=["dislikes"])
  590. return True
  591. @task(name="save_voice_action", queue="short_tasks")
  592. def save_voice_action(user_or_session, friendly_token=None, action="watch", extra_info=None, uid=None):
  593. """Short task that saves a voice action"""
  594. if action not in VALID_VOICE_ACTIONS:
  595. return False
  596. try:
  597. media = Media.objects.get(friendly_token=friendly_token)
  598. except BaseException:
  599. return False
  600. try:
  601. voice = Voice.objects.get(uid=uid)
  602. except BaseException:
  603. return False
  604. user = user_or_session.get("user_id")
  605. session_key = user_or_session.get("user_session")
  606. remote_ip = user_or_session.get("remote_ip_addr")
  607. if user:
  608. try:
  609. user = User.objects.get(id=user)
  610. except BaseException:
  611. return False
  612. if not (user or session_key):
  613. return False
  614. # Check if user has alread done like/likeundo once. Avoid spam. And more.
  615. if action in ["like", "likeundo", "watch", "report"]:
  616. if not pre_save_action__voice(
  617. media=media,
  618. user=user,
  619. session_key=session_key,
  620. action=action,
  621. remote_ip=remote_ip,
  622. voice=voice
  623. ):
  624. return False
  625. # TODO: Exactly why the previous `watch` actions are deleted?
  626. if action == "watch":
  627. if user:
  628. VoiceAction.objects.filter(user=user, voice=voice, action="watch").delete()
  629. else:
  630. VoiceAction.objects.filter(session_key=session_key, voice=voice, action="watch").delete()
  631. # Check whether `filter` result is empty or not, by `.first()`
  632. # https://stackoverflow.com/a/50453580/3405291
  633. # In the case of `like` action, delete previous `likeundo` actions.
  634. if action == "like":
  635. if user:
  636. va = VoiceAction.objects.filter(user=user, voice=voice, action="likeundo").first()
  637. if va:
  638. VoiceAction.objects.filter(user=user, voice=voice, action="likeundo").delete()
  639. else:
  640. va = VoiceAction.objects.filter(session_key=session_key, voice=voice, action="likeundo").first()
  641. if va:
  642. VoiceAction.objects.filter(session_key=session_key, voice=voice, action="likeundo").delete()
  643. # In the case of `likeundo` action, delete previous `like` actions.
  644. if action == "likeundo":
  645. if user:
  646. va = VoiceAction.objects.filter(user=user, voice=voice, action="like").first()
  647. if va:
  648. VoiceAction.objects.filter(user=user, voice=voice, action="like").delete()
  649. else:
  650. va = VoiceAction.objects.filter(session_key=session_key, voice=voice, action="like").first()
  651. if va:
  652. VoiceAction.objects.filter(session_key=session_key, voice=voice, action="like").delete()
  653. # There is no `rate` action for voice. So, it's skipped.
  654. va = VoiceAction(
  655. user=user,
  656. session_key=session_key,
  657. media=media,
  658. action=action,
  659. extra_info=extra_info,
  660. remote_ip=remote_ip,
  661. voice=voice,
  662. )
  663. va.save()
  664. if action == "watch":
  665. voice.views += 1
  666. voice.save(update_fields=["views"])
  667. elif action == "report":
  668. voice.reported_times += 1
  669. if voice.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
  670. # Delete voice?
  671. voice.delete()
  672. voice.save(update_fields=["reported_times"])
  673. # TODO: notify_users
  674. # There is a code for notification. That might be helpful.
  675. elif action == "like":
  676. voice.likes += 1
  677. voice.save(update_fields=["likes"])
  678. elif action == "likeundo":
  679. voice.likes -= 1
  680. voice.save(update_fields=["likes"])
  681. return True
  682. ### Let's make this method a regular method, not a celery_short task.
  683. ### Since we have to return the path of the resulted video as HTTP response.
  684. ### Combining video with voices by FFMPEG should be quite fast. Just copy audio channels.
  685. ###
  686. #@task(name="video_with_voices", queue="short_tasks")
  687. def video_with_voices(user_or_session, friendly_token=None, voicesUid=None):
  688. #"""Short task that combines a video with some voices"""
  689. try:
  690. media = Media.objects.get(friendly_token=friendly_token)
  691. except BaseException:
  692. return False
  693. # Only video is acceptable.
  694. if media.media_type != "video":
  695. return False
  696. voices = []
  697. for uid in voicesUid:
  698. # Double-check voice existence.
  699. try:
  700. voice = Voice.objects.get(uid=uid)
  701. except BaseException:
  702. return False
  703. voices.append(voice)
  704. # To download a video combined with some voices,
  705. # we require a valid user or session.
  706. # We have to check remote_ip to avoid spam.
  707. user = user_or_session.get("user_id")
  708. session_key = user_or_session.get("user_session")
  709. remote_ip = user_or_session.get("remote_ip_addr")
  710. if user:
  711. try:
  712. user = User.objects.get(id=user)
  713. except BaseException:
  714. return False
  715. if not (user or session_key):
  716. return False
  717. # Avoid spam and more.
  718. if not pre_save_action__videowithvoices(
  719. media=media,
  720. user=user,
  721. session_key=session_key,
  722. action="getvideowithvoices",
  723. remote_ip=remote_ip,
  724. ):
  725. return False
  726. va = VoiceAction(
  727. user=user,
  728. session_key=session_key,
  729. media=media,
  730. action="getvideowithvoices",
  731. remote_ip=remote_ip,
  732. # Voice is not really needed for this action, but pass voice just to avoid this database error:
  733. # ERROR: null value in column "voice_id" of relation "actions_voiceaction" violates not-null constraint
  734. voice=voice,
  735. )
  736. va.save()
  737. # Combine video with the voices.
  738. cwd = os.path.dirname(os.path.realpath(media.media_file.path))
  739. video_name = media.media_file.path.split("/")[-1]
  740. random_prefix = produce_friendly_token()
  741. result_file_name = "{0}_{1}".format(random_prefix, video_name)
  742. result_file_name = "combineVideoWithSomeVoices_{0}".format(result_file_name)
  743. result_file_name += ".mkv"
  744. # To add a new audio track into an existing video with audio, use:
  745. # https://stackoverflow.com/a/70001304/3405291
  746. cmd = [
  747. settings.FFMPEG_COMMAND,
  748. "-i",
  749. media.media_file.path,
  750. ]
  751. for voice in voices:
  752. cmd.append("-i")
  753. cmd.append(voice.voice_file.path)
  754. # To combine voices.
  755. cmd.append("-filter_complex")
  756. voiceCounter = 1
  757. arg = ""
  758. for voice in voices:
  759. arg += "[{0}:a] ".format(voiceCounter)
  760. voiceCounter += 1
  761. arg += "amix=inputs={0}:duration=longest:dropout_transition=0".format(len(voices))
  762. cmd.append(arg)
  763. cmd.append("-c:v")
  764. cmd.append("copy")
  765. # The -an option disables audio recording for the output file,
  766. # which means that the original audio track from the input video
  767. # will not be included in the output file.
  768. # The video stream is still copied over using -c:v copy,
  769. # and the combined audio track is created using the amix filter as before.
  770. cmd.append("-an")
  771. cmd.append(result_file_name)
  772. ret = run_command(cmd, cwd=cwd)
  773. # You can double-check result file by this command:
  774. # /usr/local/bin/docker-compose exec web ls -lhrtc /tmp/
  775. # You can copy result file from docker container to host by a command like:
  776. # docker cp <containerId>:/tmp/<result_file_name> /home/m3/Downloads/
  777. result_file_path = os.path.join(cwd, result_file_name)
  778. result_file_url = url_from_path(result_file_path)
  779. return {"result_file_url": result_file_url, "ffmpeg_return": ret}
  780. @task(name="get_list_of_popular_media", queue="long_tasks")
  781. def get_list_of_popular_media():
  782. """Experimental task for preparing media listing
  783. for index page / recommended section
  784. calculate and return the top 50 popular media, based on two rules
  785. X = the top 25 videos that have the most views during the last week
  786. Y = the most recent 25 videos that have been liked over the last 6 months
  787. """
  788. valid_media_x = {}
  789. valid_media_y = {}
  790. basic_query = Q(listable=True)
  791. media_x = Media.objects.filter(basic_query).values("friendly_token")
  792. period_x = datetime.now() - timedelta(days=7)
  793. period_y = datetime.now() - timedelta(days=30 * 6)
  794. for media in media_x:
  795. ft = media["friendly_token"]
  796. num = MediaAction.objects.filter(action_date__gte=period_x, action="watch", media__friendly_token=ft).count()
  797. if num:
  798. valid_media_x[ft] = num
  799. num = MediaAction.objects.filter(action_date__gte=period_y, action="like", media__friendly_token=ft).count()
  800. if num:
  801. valid_media_y[ft] = num
  802. x = sorted(valid_media_x.items(), key=lambda kv: kv[1], reverse=True)[:25]
  803. y = sorted(valid_media_y.items(), key=lambda kv: kv[1], reverse=True)[:25]
  804. media_ids = [a[0] for a in x]
  805. media_ids.extend([a[0] for a in y])
  806. media_ids = list(set(media_ids))
  807. cache.set("popular_media_ids", media_ids, 60 * 60 * 12)
  808. logger.info("saved popular media ids")
  809. return True
  810. @task(name="update_listings_thumbnails", queue="long_tasks")
  811. def update_listings_thumbnails():
  812. """Populate listings_thumbnail field for models"""
  813. # Categories
  814. used_media = []
  815. saved = 0
  816. qs = Category.objects.filter().order_by("-media_count")
  817. for object in qs:
  818. media = Media.objects.exclude(friendly_token__in=used_media).filter(category=object, state="public", is_reviewed=True).order_by("-views").first()
  819. if media:
  820. object.listings_thumbnail = media.thumbnail_url
  821. object.save(update_fields=["listings_thumbnail"])
  822. used_media.append(media.friendly_token)
  823. saved += 1
  824. logger.info("updated {} categories".format(saved))
  825. # Tags
  826. used_media = []
  827. saved = 0
  828. qs = Tag.objects.filter().order_by("-media_count")
  829. for object in qs:
  830. media = Media.objects.exclude(friendly_token__in=used_media).filter(tags=object, state="public", is_reviewed=True).order_by("-views").first()
  831. if media:
  832. object.listings_thumbnail = media.thumbnail_url
  833. object.save(update_fields=["listings_thumbnail"])
  834. used_media.append(media.friendly_token)
  835. saved += 1
  836. logger.info("updated {} tags".format(saved))
  837. return True
  838. @task_revoked.connect
  839. def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
  840. # For encode_media tasks that are revoked,
  841. # ffmpeg command won't be stopped, since
  842. # it got started by a subprocess.
  843. # Need to stop that process
  844. # Also, removing the Encoding object,
  845. # since the task that would prepare it was killed
  846. # Maybe add a killed state for Encoding objects
  847. try:
  848. uid = kwargs["request"].task_id
  849. if uid:
  850. encoding = Encoding.objects.get(task_id=uid)
  851. encoding.delete()
  852. logger.info("deleted the Encoding object")
  853. if encoding.temp_file:
  854. kill_ffmpeg_process(encoding.temp_file)
  855. except BaseException:
  856. pass
  857. return True
  858. def kill_ffmpeg_process(filepath):
  859. # this is not ideal, ffmpeg pid could be linked to the Encoding object
  860. cmd = "ps aux|grep 'ffmpeg'|grep %s|grep -v grep |awk '{print $2}'" % filepath
  861. result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  862. pid = result.stdout.decode("utf-8").strip()
  863. if pid:
  864. cmd = "kill -9 %s" % pid
  865. result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
  866. return result
  867. @task(name="remove_media_file", base=Task, queue="long_tasks")
  868. def remove_media_file(media_file=None):
  869. rm_file(media_file)
  870. return True
# TODO LIST
# 1 chunks are deleted from the original server when the file is fully encoded.
# However, this logic also needs to run in cases of failure.
# 2 script to delete chunks in fail status
# (and check for their encodings, and delete them as well, along with
# all chunks)
# 3 beat task, remove chunks