12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034 |
- import json
- import os
- import re
- import shutil
- import subprocess
- import tempfile
- from datetime import datetime, timedelta
- from celery import Task
- from celery.decorators import task
- from celery.exceptions import SoftTimeLimitExceeded
- from celery.signals import task_revoked
- from celery.task.control import revoke
- from celery.utils.log import get_task_logger
- from django.conf import settings
- from django.core.cache import cache
- from django.core.files import File
- from django.db.models import Q
- from actions.models import USER_MEDIA_ACTIONS, MediaAction
- from actions.models import USER_VOICE_ACTIONS, VoiceAction
- from users.models import User
- from .backends import FFmpegBackend
- from .exceptions import VideoEncodingError
- from .helpers import (
- calculate_seconds,
- create_temp_file,
- get_file_name,
- get_file_type,
- media_file_info,
- produce_ffmpeg_commands,
- produce_friendly_token,
- rm_file,
- run_command,
- url_from_path, # Needed to combine video with some voices.
- )
- from .methods import list_tasks, notify_users, pre_save_action
- from .methods import pre_save_action__voice
- from .methods import pre_save_action__videowithvoices
- from .models import Category, EncodeProfile, Encoding, Media, Rating, Tag
- from .models import Voice
- logger = get_task_logger(__name__)
- VALID_USER_ACTIONS = [action for action, name in USER_MEDIA_ACTIONS]
- VALID_VOICE_ACTIONS = [action for action, name in USER_VOICE_ACTIONS]
- ERRORS_LIST = [
- "Output file is empty, nothing was encoded",
- "Invalid data found when processing input",
- "Unable to find a suitable output format for",
- ]
- @task(name="chunkize_media", bind=True, queue="short_tasks", soft_time_limit=60 * 30)
- def chunkize_media(self, friendly_token, profiles, force=True):
- """Break media in chunks and start encoding tasks"""
- profiles = [EncodeProfile.objects.get(id=profile) for profile in profiles]
- media = Media.objects.get(friendly_token=friendly_token)
- cwd = os.path.dirname(os.path.realpath(media.media_file.path))
- file_name = media.media_file.path.split("/")[-1]
- random_prefix = produce_friendly_token()
- file_format = "{0}_{1}".format(random_prefix, file_name)
- chunks_file_name = "%02d_{0}".format(file_format)
- chunks_file_name += ".mkv"
- cmd = [
- settings.FFMPEG_COMMAND,
- "-y",
- "-i",
- media.media_file.path,
- "-c",
- "copy",
- "-f",
- "segment",
- "-segment_time",
- str(settings.VIDEO_CHUNKS_DURATION),
- chunks_file_name,
- ]
- chunks = []
- ret = run_command(cmd, cwd=cwd)
- if "out" in ret.keys():
- for line in ret.get("error").split("\n"):
- ch = re.findall(r"Opening \'([\W\w]+)\' for writing", line)
- if ch:
- chunks.append(ch[0])
- if not chunks:
- # command completely failed to segment file.putting to normal encode
- logger.info("Failed to break file {0} in chunks." " Putting to normal encode queue".format(friendly_token))
- for profile in profiles:
- if media.video_height and media.video_height < profile.resolution:
- if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
- continue
- encoding = Encoding(media=media, profile=profile)
- encoding.save()
- enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
- encode_media.delay(friendly_token, profile.id, encoding.id, enc_url, force=force)
- return False
- chunks = [os.path.join(cwd, ch) for ch in chunks]
- to_profiles = []
- chunks_dict = {}
- # calculate once md5sums
- for chunk in chunks:
- cmd = ["md5sum", chunk]
- stdout = run_command(cmd).get("out")
- md5sum = stdout.strip().split()[0]
- chunks_dict[chunk] = md5sum
- for profile in profiles:
- if media.video_height and media.video_height < profile.resolution:
- if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
- continue
- to_profiles.append(profile)
- for chunk in chunks:
- encoding = Encoding(
- media=media,
- profile=profile,
- chunk_file_path=chunk,
- chunk=True,
- chunks_info=json.dumps(chunks_dict),
- md5sum=chunks_dict[chunk],
- )
- encoding.save()
- enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
- if profile.resolution in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
- priority = 0
- else:
- priority = 9
- encode_media.apply_async(
- args=[friendly_token, profile.id, encoding.id, enc_url],
- kwargs={"force": force, "chunk": True, "chunk_file_path": chunk},
- priority=priority,
- )
- logger.info("got {0} chunks and will encode to {1} profiles".format(len(chunks), to_profiles))
- return True
- class EncodingTask(Task):
- def on_failure(self, exc, task_id, args, kwargs, einfo):
- # mainly used to run some post failure steps
- # we get here if a task is revoked
- try:
- if hasattr(self, "encoding"):
- self.encoding.status = "fail"
- self.encoding.save(update_fields=["status"])
- kill_ffmpeg_process(self.encoding.temp_file)
- if hasattr(self.encoding, "media"):
- self.encoding.media.post_encode_actions()
- except BaseException:
- pass
- return False
- @task(
- name="encode_media",
- base=EncodingTask,
- bind=True,
- queue="long_tasks",
- soft_time_limit=settings.CELERY_SOFT_TIME_LIMIT,
- )
- def encode_media(
- self,
- friendly_token,
- profile_id,
- encoding_id,
- encoding_url,
- force=True,
- chunk=False,
- chunk_file_path="",
- ):
- """Encode a media to given profile, using ffmpeg, storing progress"""
- logger.info("Encode Media started, friendly token {0}, profile id {1}, force {2}".format(friendly_token, profile_id, force))
- if self.request.id:
- task_id = self.request.id
- else:
- task_id = None
- try:
- media = Media.objects.get(friendly_token=friendly_token)
- profile = EncodeProfile.objects.get(id=profile_id)
- except BaseException:
- Encoding.objects.filter(id=encoding_id).delete()
- return False
- # break logic with chunk True/False
- if chunk:
- # TODO: in case a video is chunkized and this enters here many times
- # it will always run since chunk_file_path is always different
- # thus find a better way for this check
- if Encoding.objects.filter(media=media, profile=profile, chunk_file_path=chunk_file_path).count() > 1 and force is False:
- Encoding.objects.filter(id=encoding_id).delete()
- return False
- else:
- try:
- encoding = Encoding.objects.get(id=encoding_id)
- encoding.status = "running"
- Encoding.objects.filter(
- media=media,
- profile=profile,
- chunk=True,
- chunk_file_path=chunk_file_path,
- ).exclude(id=encoding_id).delete()
- except BaseException:
- encoding = Encoding(
- media=media,
- profile=profile,
- status="running",
- chunk=True,
- chunk_file_path=chunk_file_path,
- )
- else:
- if Encoding.objects.filter(media=media, profile=profile).count() > 1 and force is False:
- Encoding.objects.filter(id=encoding_id).delete()
- return False
- else:
- try:
- encoding = Encoding.objects.get(id=encoding_id)
- encoding.status = "running"
- Encoding.objects.filter(media=media, profile=profile).exclude(id=encoding_id).delete()
- except BaseException:
- encoding = Encoding(media=media, profile=profile, status="running")
- if task_id:
- encoding.task_id = task_id
- encoding.worker = "localhost"
- encoding.retries = self.request.retries
- encoding.save()
- if profile.extension == "gif":
- tf = create_temp_file(suffix=".gif")
- # -ss 5 start from 5 second. -t 25 until 25 sec
- command = [
- settings.FFMPEG_COMMAND,
- "-y",
- "-ss",
- "3",
- "-i",
- media.media_file.path,
- "-hide_banner",
- "-vf",
- "scale=344:-1:flags=lanczos,fps=1",
- "-t",
- "25",
- "-f",
- "gif",
- tf,
- ]
- ret = run_command(command)
- if os.path.exists(tf) and get_file_type(tf) == "image":
- with open(tf, "rb") as f:
- myfile = File(f)
- encoding.status = "success"
- encoding.media_file.save(content=myfile, name=tf)
- rm_file(tf)
- return True
- else:
- return False
- if chunk:
- original_media_path = chunk_file_path
- else:
- original_media_path = media.media_file.path
- # if not media.duration:
- # encoding.status = "fail"
- # encoding.save(update_fields=["status"])
- # return False
- with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as temp_dir:
- tf = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
- tfpass = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
- ffmpeg_commands = produce_ffmpeg_commands(
- original_media_path,
- media.media_info,
- resolution=profile.resolution,
- codec=profile.codec,
- output_filename=tf,
- pass_file=tfpass,
- chunk=chunk,
- )
- if not ffmpeg_commands:
- encoding.status = "fail"
- encoding.save(update_fields=["status"])
- return False
- encoding.temp_file = tf
- encoding.commands = str(ffmpeg_commands)
- encoding.save(update_fields=["temp_file", "commands", "task_id"])
- # binding these, so they are available on on_failure
- self.encoding = encoding
- self.media = media
- # can be one-pass or two-pass
- for ffmpeg_command in ffmpeg_commands:
- ffmpeg_command = [str(s) for s in ffmpeg_command]
- encoding_backend = FFmpegBackend()
- try:
- encoding_command = encoding_backend.encode(ffmpeg_command)
- duration, n_times = 0, 0
- output = ""
- while encoding_command:
- try:
- # TODO: understand an eternal loop
- # eg h265 with mv4 file issue, and stop with error
- output = next(encoding_command)
- duration = calculate_seconds(output)
- if duration:
- percent = duration * 100 / media.duration
- if n_times % 60 == 0:
- encoding.progress = percent
- try:
- encoding.save(update_fields=["progress", "update_date"])
- logger.info("Saved {0}".format(round(percent, 2)))
- except BaseException:
- pass
- n_times += 1
- except StopIteration:
- break
- except VideoEncodingError:
- # ffmpeg error, or ffmpeg was killed
- raise
- except Exception as e:
- try:
- # output is empty, fail message is on the exception
- output = e.message
- except AttributeError:
- output = ""
- if isinstance(e, SoftTimeLimitExceeded):
- kill_ffmpeg_process(encoding.temp_file)
- encoding.logs = output
- encoding.status = "fail"
- encoding.save(update_fields=["status", "logs"])
- raise_exception = True
- # if this is an ffmpeg's valid error
- # no need for the task to be re-run
- # otherwise rerun task...
- for error_msg in ERRORS_LIST:
- if error_msg.lower() in output.lower():
- raise_exception = False
- if raise_exception:
- raise self.retry(exc=e, countdown=5, max_retries=1)
- encoding.logs = output
- encoding.progress = 100
- success = False
- encoding.status = "fail"
- if os.path.exists(tf) and os.path.getsize(tf) != 0:
- ret = media_file_info(tf)
- if ret.get("is_video") or ret.get("is_audio"):
- encoding.status = "success"
- success = True
- with open(tf, "rb") as f:
- myfile = File(f)
- output_name = "{0}.{1}".format(get_file_name(original_media_path), profile.extension)
- encoding.media_file.save(content=myfile, name=output_name)
- encoding.total_run_time = (encoding.update_date - encoding.add_date).seconds
- try:
- encoding.save(update_fields=["status", "logs", "progress", "total_run_time"])
- # this will raise a django.db.utils.DatabaseError error when task is revoked,
- # since we delete the encoding at that stage
- except BaseException:
- pass
- return success
- @task(name="produce_sprite_from_video", queue="long_tasks")
- def produce_sprite_from_video(friendly_token):
- """Produces a sprites file for a video, uses ffmpeg"""
- try:
- media = Media.objects.get(friendly_token=friendly_token)
- except BaseException:
- logger.info("failed to get media with friendly_token %s" % friendly_token)
- return False
- with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as tmpdirname:
- try:
- tmpdir_image_files = tmpdirname + "/img%03d.jpg"
- output_name = tmpdirname + "/sprites.jpg"
- cmd = "{0} -i {1} -f image2 -vf 'fps=1/10, scale=160:90' {2}&&files=$(ls {3}/img*.jpg | sort -t '-' -n -k 2 | tr '\n' ' ')&&convert $files -append {4}".format(
- settings.FFMPEG_COMMAND,
- media.media_file.path,
- tmpdir_image_files,
- tmpdirname,
- output_name,
- )
- subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
- if os.path.exists(output_name) and get_file_type(output_name) == "image":
- with open(output_name, "rb") as f:
- myfile = File(f)
- media.sprites.save(
- content=myfile,
- name=get_file_name(media.media_file.path) + "sprites.jpg",
- )
- except BaseException:
- pass
- return True
- @task(name="create_hls", queue="long_tasks")
- def create_hls(friendly_token):
- """Creates HLS file for media, uses Bento4 mp4hls command"""
- if not hasattr(settings, "MP4HLS_COMMAND"):
- logger.info("Bento4 mp4hls command is missing from configuration")
- return False
- if not os.path.exists(settings.MP4HLS_COMMAND):
- logger.info("Bento4 mp4hls command is missing")
- return False
- try:
- media = Media.objects.get(friendly_token=friendly_token)
- except BaseException:
- logger.info("failed to get media with friendly_token %s" % friendly_token)
- return False
- p = media.uid.hex
- output_dir = os.path.join(settings.HLS_DIR, p)
- encodings = media.encodings.filter(profile__extension="mp4", status="success", chunk=False, profile__codec="h264")
- if encodings:
- existing_output_dir = None
- if os.path.exists(output_dir):
- existing_output_dir = output_dir
- output_dir = os.path.join(settings.HLS_DIR, p + produce_friendly_token())
- files = " ".join([f.media_file.path for f in encodings if f.media_file])
- cmd = "{0} --segment-duration=4 --output-dir={1} {2}".format(settings.MP4HLS_COMMAND, output_dir, files)
- subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
- if existing_output_dir:
- # override content with -T !
- cmd = "cp -rT {0} {1}".format(output_dir, existing_output_dir)
- subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
- shutil.rmtree(output_dir)
- output_dir = existing_output_dir
- pp = os.path.join(output_dir, "master.m3u8")
- if os.path.exists(pp):
- if media.hls_file != pp:
- media.hls_file = pp
- media.save(update_fields=["hls_file"])
- return True
- @task(name="check_running_states", queue="short_tasks")
- def check_running_states():
- # Experimental - unused
- """Check stale running encodings and delete/reencode media"""
- encodings = Encoding.objects.filter(status="running")
- logger.info("got {0} encodings that are in state running".format(encodings.count()))
- changed = 0
- for encoding in encodings:
- now = datetime.now(encoding.update_date.tzinfo)
- if (now - encoding.update_date).seconds > settings.RUNNING_STATE_STALE:
- media = encoding.media
- profile = encoding.profile
- task_id = encoding.task_id
- # terminate task
- if task_id:
- revoke(task_id, terminate=True)
- encoding.delete()
- media.encode(profiles=[profile])
- # TODO: allign with new code + chunksize...
- changed += 1
- if changed:
- logger.info("changed from running to pending on {0} items".format(changed))
- return True
- @task(name="check_media_states", queue="short_tasks")
- def check_media_states():
- # Experimental - unused
- # check encoding status of not success media
- media = Media.objects.filter(Q(encoding_status="running") | Q(encoding_status="fail") | Q(encoding_status="pending"))
- logger.info("got {0} media that are not in state success".format(media.count()))
- changed = 0
- for m in media:
- m.set_encoding_status()
- m.save(update_fields=["encoding_status"])
- changed += 1
- if changed:
- logger.info("changed encoding status to {0} media items".format(changed))
- return True
- @task(name="check_pending_states", queue="short_tasks")
- def check_pending_states():
- # Experimental - unused
- # check encoding profiles that are on state pending and not on a queue
- encodings = Encoding.objects.filter(status="pending")
- if not encodings:
- return True
- changed = 0
- tasks = list_tasks()
- task_ids = tasks["task_ids"]
- media_profile_pairs = tasks["media_profile_pairs"]
- for encoding in encodings:
- if encoding.task_id and encoding.task_id in task_ids:
- # encoding is in one of the active/reserved/scheduled tasks list
- continue
- elif (
- encoding.media.friendly_token,
- encoding.profile.id,
- ) in media_profile_pairs:
- continue
- # encoding is in one of the reserved/scheduled tasks list.
- # has no task_id but will be run, so need to re-enter the queue
- else:
- media = encoding.media
- profile = encoding.profile
- encoding.delete()
- media.encode(profiles=[profile], force=False)
- changed += 1
- if changed:
- logger.info("set to the encode queue {0} encodings that were on pending state".format(changed))
- return True
- @task(name="check_missing_profiles", queue="short_tasks")
- def check_missing_profiles():
- # Experimental - unused
- # check if video files have missing profiles. If so, add them
- media = Media.objects.filter(media_type="video")
- profiles = list(EncodeProfile.objects.all())
- changed = 0
- for m in media:
- existing_profiles = [p.profile for p in m.encodings.all()]
- missing_profiles = [p for p in profiles if p not in existing_profiles]
- if missing_profiles:
- m.encode(profiles=missing_profiles, force=False)
- # since we call with force=False
- # encode_media won't delete existing profiles
- # if they appear on the meanwhile (eg on a big queue)
- changed += 1
- if changed:
- logger.info("set to the encode queue {0} profiles".format(changed))
- return True
- @task(name="clear_sessions", queue="short_tasks")
- def clear_sessions():
- """Clear expired sessions"""
- try:
- from importlib import import_module
- from django.conf import settings
- engine = import_module(settings.SESSION_ENGINE)
- engine.SessionStore.clear_expired()
- except BaseException:
- return False
- return True
- @task(name="save_user_action", queue="short_tasks")
- def save_user_action(user_or_session, friendly_token=None, action="watch", extra_info=None):
- """Short task that saves a user action"""
- if action not in VALID_USER_ACTIONS:
- return False
- try:
- media = Media.objects.get(friendly_token=friendly_token)
- except BaseException:
- return False
- user = user_or_session.get("user_id")
- session_key = user_or_session.get("user_session")
- remote_ip = user_or_session.get("remote_ip_addr")
- if user:
- try:
- user = User.objects.get(id=user)
- except BaseException:
- return False
- if not (user or session_key):
- return False
- if action in ["like", "dislike", "watch", "report"]:
- if not pre_save_action(
- media=media,
- user=user,
- session_key=session_key,
- action=action,
- remote_ip=remote_ip,
- ):
- return False
- if action == "watch":
- if user:
- MediaAction.objects.filter(user=user, media=media, action="watch").delete()
- else:
- MediaAction.objects.filter(session_key=session_key, media=media, action="watch").delete()
- if action == "rate":
- try:
- score = extra_info.get("score")
- rating_category = extra_info.get("category_id")
- except BaseException:
- # TODO: better error handling?
- return False
- try:
- rating = Rating.objects.filter(user=user, media=media, rating_category_id=rating_category).first()
- if rating:
- rating.score = score
- rating.save(update_fields=["score"])
- else:
- rating = Rating.objects.create(
- user=user,
- media=media,
- rating_category_id=rating_category,
- score=score,
- )
- except Exception:
- # TODO: more specific handling, for errors in score, or
- # rating_category?
- return False
- ma = MediaAction(
- user=user,
- session_key=session_key,
- media=media,
- action=action,
- extra_info=extra_info,
- remote_ip=remote_ip,
- )
- ma.save()
- if action == "watch":
- media.views += 1
- media.save(update_fields=["views"])
- elif action == "report":
- media.reported_times += 1
- if media.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
- media.state = "private"
- media.save(update_fields=["reported_times", "state"])
- notify_users(
- friendly_token=media.friendly_token,
- action="media_reported",
- extra=extra_info,
- )
- elif action == "like":
- media.likes += 1
- media.save(update_fields=["likes"])
- elif action == "dislike":
- media.dislikes += 1
- media.save(update_fields=["dislikes"])
- return True
- @task(name="save_voice_action", queue="short_tasks")
- def save_voice_action(user_or_session, friendly_token=None, action="watch", extra_info=None, uid=None):
- """Short task that saves a voice action"""
- if action not in VALID_VOICE_ACTIONS:
- return False
- try:
- media = Media.objects.get(friendly_token=friendly_token)
- except BaseException:
- return False
- try:
- voice = Voice.objects.get(uid=uid)
- except BaseException:
- return False
- user = user_or_session.get("user_id")
- session_key = user_or_session.get("user_session")
- remote_ip = user_or_session.get("remote_ip_addr")
- if user:
- try:
- user = User.objects.get(id=user)
- except BaseException:
- return False
- if not (user or session_key):
- return False
- # Check if user has alread done like/likeundo once. Avoid spam. And more.
- if action in ["like", "likeundo", "watch", "report"]:
- if not pre_save_action__voice(
- media=media,
- user=user,
- session_key=session_key,
- action=action,
- remote_ip=remote_ip,
- voice=voice
- ):
- return False
- # TODO: Exactly why the previous `watch` actions are deleted?
- if action == "watch":
- if user:
- VoiceAction.objects.filter(user=user, voice=voice, action="watch").delete()
- else:
- VoiceAction.objects.filter(session_key=session_key, voice=voice, action="watch").delete()
- # Check whether `filter` result is empty or not, by `.first()`
- # https://stackoverflow.com/a/50453580/3405291
- # In the case of `like` action, delete previous `likeundo` actions.
- if action == "like":
- if user:
- va = VoiceAction.objects.filter(user=user, voice=voice, action="likeundo").first()
- if va:
- VoiceAction.objects.filter(user=user, voice=voice, action="likeundo").delete()
- else:
- va = VoiceAction.objects.filter(session_key=session_key, voice=voice, action="likeundo").first()
- if va:
- VoiceAction.objects.filter(session_key=session_key, voice=voice, action="likeundo").delete()
- # In the case of `likeundo` action, delete previous `like` actions.
- if action == "likeundo":
- if user:
- va = VoiceAction.objects.filter(user=user, voice=voice, action="like").first()
- if va:
- VoiceAction.objects.filter(user=user, voice=voice, action="like").delete()
- else:
- va = VoiceAction.objects.filter(session_key=session_key, voice=voice, action="like").first()
- if va:
- VoiceAction.objects.filter(session_key=session_key, voice=voice, action="like").delete()
- # There is no `rate` action for voice. So, it's skipped.
- va = VoiceAction(
- user=user,
- session_key=session_key,
- media=media,
- action=action,
- extra_info=extra_info,
- remote_ip=remote_ip,
- voice=voice,
- )
- va.save()
- if action == "watch":
- voice.views += 1
- voice.save(update_fields=["views"])
- elif action == "report":
- voice.reported_times += 1
- if voice.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
- # Delete voice?
- voice.delete()
- voice.save(update_fields=["reported_times"])
- # TODO: notify_users
- # There is a code for notification. That might be helpful.
- elif action == "like":
- voice.likes += 1
- voice.save(update_fields=["likes"])
- elif action == "likeundo":
- voice.likes -= 1
- voice.save(update_fields=["likes"])
- return True
- ### Let's make this method a regular method, not a celery_short task.
- ### Since we have to return the path of the resulted video as HTTP response.
- ### Combining video with voices by FFMPEG should be quite fast. Just copy audio channels.
- ###
- #@task(name="video_with_voices", queue="short_tasks")
- def video_with_voices(user_or_session, friendly_token=None, voicesUid=None):
- #"""Short task that combines a video with some voices"""
- try:
- media = Media.objects.get(friendly_token=friendly_token)
- except BaseException:
- return False
- # Only video is acceptable.
- if media.media_type != "video":
- return False
- voices = []
- for uid in voicesUid:
- # Double-check voice existence.
- try:
- voice = Voice.objects.get(uid=uid)
- except BaseException:
- return False
- voices.append(voice)
- # To download a video combined with some voices,
- # we require a valid user or session.
- # We have to check remote_ip to avoid spam.
- user = user_or_session.get("user_id")
- session_key = user_or_session.get("user_session")
- remote_ip = user_or_session.get("remote_ip_addr")
- if user:
- try:
- user = User.objects.get(id=user)
- except BaseException:
- return False
- if not (user or session_key):
- return False
- # Avoid spam and more.
- if not pre_save_action__videowithvoices(
- media=media,
- user=user,
- session_key=session_key,
- action="getvideowithvoices",
- remote_ip=remote_ip,
- ):
- return False
- va = VoiceAction(
- user=user,
- session_key=session_key,
- media=media,
- action="getvideowithvoices",
- remote_ip=remote_ip,
- # Voice is not really needed for this action, but pass voice just to avoid this database error:
- # ERROR: null value in column "voice_id" of relation "actions_voiceaction" violates not-null constraint
- voice=voice,
- )
- va.save()
- # Combine video with the voices.
- cwd = os.path.dirname(os.path.realpath(media.media_file.path))
- video_name = media.media_file.path.split("/")[-1]
- random_prefix = produce_friendly_token()
- result_file_name = "{0}_{1}".format(random_prefix, video_name)
- result_file_name = "combineVideoWithSomeVoices_{0}".format(result_file_name)
- result_file_name += ".mkv"
- # To add a new audio track into an existing video with audio, use:
- # https://stackoverflow.com/a/70001304/3405291
- cmd = [
- settings.FFMPEG_COMMAND,
- "-i",
- media.media_file.path,
- ]
- for voice in voices:
- cmd.append("-i")
- cmd.append(voice.voice_file.path)
- # To combine voices.
- cmd.append("-filter_complex")
- voiceCounter = 1
- arg = ""
- for voice in voices:
- arg += "[{0}:a] ".format(voiceCounter)
- voiceCounter += 1
- arg += "amix=inputs={0}:duration=longest:dropout_transition=0".format(len(voices))
- cmd.append(arg)
- cmd.append("-c:v")
- cmd.append("copy")
- # The -an option disables audio recording for the output file,
- # which means that the original audio track from the input video
- # will not be included in the output file.
- # The video stream is still copied over using -c:v copy,
- # and the combined audio track is created using the amix filter as before.
- # Note that if the input video file has multiple audio tracks,
- # the -an option will remove all audio tracks from the output file.
- cmd.append("-an")
- cmd.append(result_file_name)
- ret = run_command(cmd, cwd=cwd)
- # You can double-check result file by this command:
- # /usr/local/bin/docker-compose exec web ls -lhrtc /tmp/
- # You can copy result file from docker container to host by a command like:
- # docker cp <containerId>:/tmp/<result_file_name> /home/m3/Downloads/
- watermarked_file_name = "watermarked_{0}".format(result_file_name)
- cmd = [
- settings.FFMPEG_COMMAND,
- "-i",
- result_file_name,
- "-i",
- settings.STATIC_ROOT + "/images/logo_light.png",
- "-filter_complex",
- "[1][0]scale2ref=oh*mdar:ih*0.2[logo][video];[video][logo]overlay=(main_w-overlay_w):(main_h-overlay_h)",
- watermarked_file_name,
- ]
- ret = run_command(cmd, cwd=cwd)
- result_file_path = os.path.join(cwd, watermarked_file_name)
- result_file_url = url_from_path(result_file_path)
- return {"result_file_url": result_file_url, "ffmpeg_return": ret}
- @task(name="get_list_of_popular_media", queue="long_tasks")
- def get_list_of_popular_media():
- """Experimental task for preparing media listing
- for index page / recommended section
- calculate and return the top 50 popular media, based on two rules
- X = the top 25 videos that have the most views during the last week
- Y = the most recent 25 videos that have been liked over the last 6 months
- """
- valid_media_x = {}
- valid_media_y = {}
- basic_query = Q(listable=True)
- media_x = Media.objects.filter(basic_query).values("friendly_token")
- period_x = datetime.now() - timedelta(days=7)
- period_y = datetime.now() - timedelta(days=30 * 6)
- for media in media_x:
- ft = media["friendly_token"]
- num = MediaAction.objects.filter(action_date__gte=period_x, action="watch", media__friendly_token=ft).count()
- if num:
- valid_media_x[ft] = num
- num = MediaAction.objects.filter(action_date__gte=period_y, action="like", media__friendly_token=ft).count()
- if num:
- valid_media_y[ft] = num
- x = sorted(valid_media_x.items(), key=lambda kv: kv[1], reverse=True)[:25]
- y = sorted(valid_media_y.items(), key=lambda kv: kv[1], reverse=True)[:25]
- media_ids = [a[0] for a in x]
- media_ids.extend([a[0] for a in y])
- media_ids = list(set(media_ids))
- cache.set("popular_media_ids", media_ids, 60 * 60 * 12)
- logger.info("saved popular media ids")
- return True
- @task(name="update_listings_thumbnails", queue="long_tasks")
- def update_listings_thumbnails():
- """Populate listings_thumbnail field for models"""
- # Categories
- used_media = []
- saved = 0
- qs = Category.objects.filter().order_by("-media_count")
- for object in qs:
- media = Media.objects.exclude(friendly_token__in=used_media).filter(category=object, state="public", is_reviewed=True).order_by("-views").first()
- if media:
- object.listings_thumbnail = media.thumbnail_url
- object.save(update_fields=["listings_thumbnail"])
- used_media.append(media.friendly_token)
- saved += 1
- logger.info("updated {} categories".format(saved))
- # Tags
- used_media = []
- saved = 0
- qs = Tag.objects.filter().order_by("-media_count")
- for object in qs:
- media = Media.objects.exclude(friendly_token__in=used_media).filter(tags=object, state="public", is_reviewed=True).order_by("-views").first()
- if media:
- object.listings_thumbnail = media.thumbnail_url
- object.save(update_fields=["listings_thumbnail"])
- used_media.append(media.friendly_token)
- saved += 1
- logger.info("updated {} tags".format(saved))
- return True
- @task_revoked.connect
- def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
- # For encode_media tasks that are revoked,
- # ffmpeg command won't be stopped, since
- # it got started by a subprocess.
- # Need to stop that process
- # Also, removing the Encoding object,
- # since the task that would prepare it was killed
- # Maybe add a killed state for Encoding objects
- try:
- uid = kwargs["request"].task_id
- if uid:
- encoding = Encoding.objects.get(task_id=uid)
- encoding.delete()
- logger.info("deleted the Encoding object")
- if encoding.temp_file:
- kill_ffmpeg_process(encoding.temp_file)
- except BaseException:
- pass
- return True
- def kill_ffmpeg_process(filepath):
- # this is not ideal, ffmpeg pid could be linked to the Encoding object
- cmd = "ps aux|grep 'ffmpeg'|grep %s|grep -v grep |awk '{print $2}'" % filepath
- result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
- pid = result.stdout.decode("utf-8").strip()
- if pid:
- cmd = "kill -9 %s" % pid
- result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
- return result
- @task(name="remove_media_file", base=Task, queue="long_tasks")
- def remove_media_file(media_file=None):
- rm_file(media_file)
- return True
- # TODO LIST
- # 1 chunks are deleted from original server when file is fully encoded.
- # however need to enter this logic in cases of fail as well
- # 2 script to delete chunks in fail status
- # (and check for their encdings, and delete them as well, along with
- # all chunks)
- # 3 beat task, remove chunks
|