|
- import json
- import os
- import re
- import shutil
- import subprocess
- import tempfile
- from datetime import datetime, timedelta
- from celery import Task
- from celery.decorators import task
- from celery.exceptions import SoftTimeLimitExceeded
- from celery.signals import task_revoked
- from celery.task.control import revoke
- from celery.utils.log import get_task_logger
- from django.conf import settings
- from django.core.cache import cache
- from django.core.files import File
- from django.db.models import Q
- from actions.models import USER_MEDIA_ACTIONS, MediaAction
- from users.models import User
- from .backends import FFmpegBackend
- from .exceptions import VideoEncodingError
- from .helpers import (
- calculate_seconds,
- create_temp_file,
- get_file_name,
- get_file_type,
- media_file_info,
- produce_ffmpeg_commands,
- produce_friendly_token,
- rm_file,
- run_command,
- )
- from .methods import list_tasks, notify_users, pre_save_action
- from .models import Category, EncodeProfile, Encoding, Media, Rating, Tag
logger = get_task_logger(__name__)

# Action codes accepted by save_user_action: the first element of every
# (action, name) pair in USER_MEDIA_ACTIONS.
VALID_USER_ACTIONS = [code for code, _label in USER_MEDIA_ACTIONS]

# Fragments of ffmpeg output that identify an error in the input itself.
# encode_media matches them case-insensitively and does not retry the task
# when one is found.
ERRORS_LIST = [
    "Output file is empty, nothing was encoded",
    "Invalid data found when processing input",
    "Unable to find a suitable output format for",
]
def _profile_applies_to_media(media, profile):
    """Return whether ``profile`` should be encoded for ``media``.

    A profile whose resolution exceeds the source video height is skipped
    unless that resolution is listed in settings.MINIMUM_RESOLUTIONS_TO_ENCODE.
    When the video height is unknown (falsy) every profile applies.
    """
    if media.video_height and media.video_height < profile.resolution:
        return profile.resolution in settings.MINIMUM_RESOLUTIONS_TO_ENCODE
    return True


def _md5_of_file(path, block_size=1024 * 1024):
    """Return the hex MD5 digest of the file at ``path`` (the value ``md5sum`` prints)."""
    import hashlib

    digest = hashlib.md5()
    with open(path, "rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


@task(name="chunkize_media", bind=True, queue="short_tasks", soft_time_limit=60 * 30)
def chunkize_media(self, friendly_token, profiles, force=True):
    """Break media in chunks and start encoding tasks

    The media file is split with ffmpeg's segment muxer (stream copy, segments
    of settings.VIDEO_CHUNKS_DURATION seconds) next to the original file. One
    chunk Encoding plus one encode_media task is created per chunk and per
    applicable profile.

    If no chunk could be produced, one regular (non-chunk) encode_media task is
    queued per applicable profile instead.

    Args:
        friendly_token: friendly_token of the Media to split.
        profiles: ids of the EncodeProfile objects to encode to.
        force: forwarded to encode_media.

    Returns:
        True when chunk tasks were queued, False when it fell back to normal
        encoding.
    """
    profiles = [EncodeProfile.objects.get(id=profile) for profile in profiles]
    media = Media.objects.get(friendly_token=friendly_token)
    cwd = os.path.dirname(os.path.realpath(media.media_file.path))
    file_name = os.path.basename(media.media_file.path)
    random_prefix = produce_friendly_token()
    # segment output names: 00_<prefix>_<file name>.mkv, 01_..., relative to cwd
    chunks_file_name = "%02d_{0}_{1}.mkv".format(random_prefix, file_name)
    cmd = [
        settings.FFMPEG_COMMAND,
        "-y",
        "-i",
        media.media_file.path,
        "-c",
        "copy",
        "-f",
        "segment",
        "-segment_time",
        str(settings.VIDEO_CHUNKS_DURATION),
        chunks_file_name,
    ]
    chunks = []
    ret = run_command(cmd, cwd=cwd)
    if "out" in ret.keys():
        # the segment muxer logs "Opening '<name>' for writing" for every chunk;
        # that log is read from ret["error"] (presumably ffmpeg's stderr). A
        # missing/None value is treated as "no chunks" instead of crashing.
        for line in (ret.get("error") or "").split("\n"):
            ch = re.findall(r"Opening \'([\W\w]+)\' for writing", line)
            if ch:
                chunks.append(ch[0])

    if not chunks:
        # command completely failed to segment file. putting to normal encode
        logger.info("Failed to break file {0} in chunks. Putting to normal encode queue".format(friendly_token))
        for profile in profiles:
            if not _profile_applies_to_media(media, profile):
                continue
            encoding = Encoding(media=media, profile=profile)
            encoding.save()
            enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
            encode_media.delay(friendly_token, profile.id, encoding.id, enc_url, force=force)
        return False

    chunks = [os.path.join(cwd, ch) for ch in chunks]
    # md5sums are calculated once; every chunk Encoding stores the full map
    chunks_dict = {chunk: _md5_of_file(chunk) for chunk in chunks}
    chunks_info = json.dumps(chunks_dict)

    to_profiles = []
    for profile in profiles:
        if not _profile_applies_to_media(media, profile):
            continue
        to_profiles.append(profile)
        # tasks for the minimum resolutions get priority 0, all others 9
        priority = 0 if profile.resolution in settings.MINIMUM_RESOLUTIONS_TO_ENCODE else 9
        for chunk in chunks:
            encoding = Encoding(
                media=media,
                profile=profile,
                chunk_file_path=chunk,
                chunk=True,
                chunks_info=chunks_info,
                md5sum=chunks_dict[chunk],
            )
            encoding.save()
            enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
            encode_media.apply_async(
                args=[friendly_token, profile.id, encoding.id, enc_url],
                kwargs={"force": force, "chunk": True, "chunk_file_path": chunk},
                priority=priority,
            )
    logger.info("got {0} chunks and will encode to {1} profiles".format(len(chunks), to_profiles))
    return True
class EncodingTask(Task):
    """Base task class for encode_media that runs post-failure steps."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Mark the failed run's Encoding as "fail" and stop its ffmpeg process.

        Mainly used to run some post failure steps; per the original note we
        also get here when a task is revoked. encode_media binds
        ``self.encoding`` once its ffmpeg commands are prepared. Celery reuses
        one task instance per worker process, so the attribute may still hold
        the Encoding of an earlier run. It is therefore only touched when its
        task_id matches the failing task (or when either id is unknown).

        Always returns False.
        """
        encoding = getattr(self, "encoding", None)
        if encoding is None:
            return False
        encoding_task_id = getattr(encoding, "task_id", None)
        if task_id and encoding_task_id and encoding_task_id != task_id:
            # stale binding from a previous run of this task instance
            return False
        try:
            encoding.status = "fail"
            # may raise if the Encoding was already deleted (e.g. on revoke)
            encoding.save(update_fields=["status"])
            if encoding.temp_file:
                kill_ffmpeg_process(encoding.temp_file)
            if hasattr(encoding, "media"):
                encoding.media.post_encode_actions()
        except Exception as e:
            logger.info("post-failure steps for task {0} did not complete: {1}".format(task_id, e))
        return False
def _encode_gif_preview(media, encoding):
    """Render an animated GIF preview of ``media`` and store it on ``encoding``.

    ffmpeg starts 3 seconds into the source (-ss 3), keeps at most 25 seconds
    (-t 25), samples one frame per second and scales to 344px wide.

    Returns True when the output is an image file stored on the encoding
    (status "success"). Otherwise the encoding is saved with status "fail"
    and False is returned. The temporary GIF is removed in both cases.
    """
    tf = create_temp_file(suffix=".gif")
    command = [
        settings.FFMPEG_COMMAND,
        "-y",
        "-ss",
        "3",
        "-i",
        media.media_file.path,
        "-hide_banner",
        "-vf",
        "scale=344:-1:flags=lanczos,fps=1",
        "-t",
        "25",
        "-f",
        "gif",
        tf,
    ]
    try:
        run_command(command)
        if os.path.exists(tf) and get_file_type(tf) == "image":
            with open(tf, "rb") as f:
                encoding.status = "success"
                # relied on to persist the status as well (Django's
                # FieldFile.save saves the instance by default)
                encoding.media_file.save(content=File(f), name=tf)
            return True
        # previously the encoding was left in "running" forever here
        encoding.status = "fail"
        encoding.save(update_fields=["status"])
        return False
    finally:
        if os.path.exists(tf):
            rm_file(tf)


@task(
    name="encode_media",
    base=EncodingTask,
    bind=True,
    queue="long_tasks",
    soft_time_limit=settings.CELERY_SOFT_TIME_LIMIT,
)
def encode_media(
    self,
    friendly_token,
    profile_id,
    encoding_id,
    encoding_url,
    force=True,
    chunk=False,
    chunk_file_path="",
):
    """Encode a media to given profile, using ffmpeg, storing progress

    Args:
        friendly_token: friendly_token of the Media to encode.
        profile_id: id of the target EncodeProfile.
        encoding_id: id of the Encoding prepared by the caller. A new Encoding
            is created when it cannot be loaded.
        encoding_url: not used by this function.
        force: when False, give up (deleting encoding_id) if more than one
            Encoding already exists for the same target.
        chunk: encode only the chunk file ``chunk_file_path`` instead of the
            media's original file.
        chunk_file_path: path of the chunk to encode when ``chunk`` is True.

    Returns:
        True when an output file was produced and stored on the Encoding,
        False otherwise.

    Encoding errors make the task retry once (after 5 seconds), unless
    ffmpeg's output contains one of ERRORS_LIST.
    """
    logger.info("Encode Media started, friendly token {0}, profile id {1}, force {2}".format(friendly_token, profile_id, force))
    task_id = self.request.id or None

    try:
        media = Media.objects.get(friendly_token=friendly_token)
        profile = EncodeProfile.objects.get(id=profile_id)
    except Exception:
        Encoding.objects.filter(id=encoding_id).delete()
        return False

    # chunk and whole-file encodings differ only in how "the same target" is
    # looked up; the duplicate check and clean-up below are shared
    if chunk:
        # TODO: in case a video is chunkized and this enters here many times
        # it will always run since chunk_file_path is always different
        # thus find a better way for this check
        duplicates_lookup = {"media": media, "profile": profile, "chunk_file_path": chunk_file_path}
        target_lookup = dict(duplicates_lookup, chunk=True)
    else:
        duplicates_lookup = {"media": media, "profile": profile}
        target_lookup = duplicates_lookup

    if Encoding.objects.filter(**duplicates_lookup).count() > 1 and force is False:
        Encoding.objects.filter(id=encoding_id).delete()
        return False

    try:
        encoding = Encoding.objects.get(id=encoding_id)
        encoding.status = "running"
        # drop every other Encoding for the same target
        Encoding.objects.filter(**target_lookup).exclude(id=encoding_id).delete()
    except Exception:
        encoding = Encoding(status="running", **target_lookup)

    if task_id:
        encoding.task_id = task_id
    encoding.worker = "localhost"
    encoding.retries = self.request.retries
    encoding.save()

    if profile.extension == "gif":
        return _encode_gif_preview(media, encoding)

    original_media_path = chunk_file_path if chunk else media.media_file.path

    with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as temp_dir:
        tf = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
        tfpass = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
        ffmpeg_commands = produce_ffmpeg_commands(
            original_media_path,
            media.media_info,
            resolution=profile.resolution,
            codec=profile.codec,
            output_filename=tf,
            pass_file=tfpass,
            chunk=chunk,
        )
        if not ffmpeg_commands:
            encoding.status = "fail"
            encoding.save(update_fields=["status"])
            return False

        encoding.temp_file = tf
        encoding.commands = str(ffmpeg_commands)
        encoding.save(update_fields=["temp_file", "commands", "task_id"])

        # binding these, so they are available on on_failure
        self.encoding = encoding
        self.media = media

        # can be one-pass or two-pass
        for ffmpeg_command in ffmpeg_commands:
            ffmpeg_command = [str(s) for s in ffmpeg_command]
            encoding_backend = FFmpegBackend()
            try:
                encoding_command = encoding_backend.encode(ffmpeg_command)
                n_times = 0
                output = ""
                while encoding_command:
                    try:
                        # TODO: understand an eternal loop
                        # eg h265 with mv4 file issue, and stop with error
                        output = next(encoding_command)
                        duration = calculate_seconds(output)
                        # without a known media duration there is no progress
                        # to report (dividing would raise and fail the encode)
                        if duration and media.duration:
                            percent = duration * 100 / media.duration
                            if n_times % 60 == 0:
                                encoding.progress = percent
                                try:
                                    encoding.save(update_fields=["progress", "update_date"])
                                    logger.info("Saved {0}".format(round(percent, 2)))
                                except Exception:
                                    pass
                            n_times += 1
                    except StopIteration:
                        break
                    except VideoEncodingError:
                        # ffmpeg error, or ffmpeg was killed
                        raise
            except Exception as e:
                # output is empty, the fail message is expected on e.message
                # (presumably set by VideoEncodingError); other exceptions
                # yield an empty log
                message = getattr(e, "message", None)
                output = str(message) if message else ""
                if isinstance(e, SoftTimeLimitExceeded):
                    kill_ffmpeg_process(encoding.temp_file)
                encoding.logs = output
                encoding.status = "fail"
                encoding.save(update_fields=["status", "logs"])
                # a known ffmpeg error means the input itself is bad, so
                # re-running cannot help; anything else is retried once
                lowered_output = output.lower()
                if not any(error_msg.lower() in lowered_output for error_msg in ERRORS_LIST):
                    raise self.retry(exc=e, countdown=5, max_retries=1)

        encoding.logs = output
        encoding.progress = 100
        success = False
        encoding.status = "fail"
        if os.path.exists(tf) and os.path.getsize(tf) != 0:
            ret = media_file_info(tf)
            if ret.get("is_video") or ret.get("is_audio"):
                encoding.status = "success"
                success = True
                with open(tf, "rb") as f:
                    output_name = "{0}.{1}".format(get_file_name(original_media_path), profile.extension)
                    encoding.media_file.save(content=File(f), name=output_name)
        # total_seconds(): timedelta.seconds silently drops whole days
        encoding.total_run_time = int((encoding.update_date - encoding.add_date).total_seconds())
        try:
            encoding.save(update_fields=["status", "logs", "progress", "total_run_time"])
        except Exception:
            # this raises a django.db.utils.DatabaseError when the task is
            # revoked, since the Encoding is deleted at that stage
            pass
    return success
@task(name="produce_sprite_from_video", queue="long_tasks")
def produce_sprite_from_video(friendly_token):
    """Produces a sprites file for a video, uses ffmpeg

    ffmpeg extracts one frame every 10 seconds, scaled to 160x90. ImageMagick's
    ``convert -append`` then stacks the frames vertically, and the result is
    stored on ``media.sprites``. Failures while building the sprite are logged
    and otherwise ignored.

    Returns:
        False if the media does not exist, True otherwise.
    """
    try:
        media = Media.objects.get(friendly_token=friendly_token)
    except Exception:
        logger.info("failed to get media with friendly_token %s" % friendly_token)
        return False

    with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as tmpdirname:
        try:
            frames_pattern = os.path.join(tmpdirname, "img%03d.jpg")
            output_name = os.path.join(tmpdirname, "sprites.jpg")
            # argument lists instead of a shell string: the media path reaches
            # ffmpeg verbatim and is never parsed by a shell
            extraction = subprocess.run(
                [
                    settings.FFMPEG_COMMAND,
                    "-i",
                    media.media_file.path,
                    "-f",
                    "image2",
                    "-vf",
                    "fps=1/10, scale=160:90",
                    frames_pattern,
                ],
                stdout=subprocess.PIPE,
            )
            frames = [name for name in os.listdir(tmpdirname) if name.startswith("img") and name.endswith(".jpg")]
            # numeric frame order: img999.jpg must come before img1000.jpg
            frames.sort(key=lambda name: (len(name), name))
            # like the former "ffmpeg ... && convert ...": only append frames
            # when ffmpeg succeeded
            if extraction.returncode == 0 and frames:
                subprocess.run(
                    ["convert"] + [os.path.join(tmpdirname, name) for name in frames] + ["-append", output_name],
                    stdout=subprocess.PIPE,
                )
            if os.path.exists(output_name) and get_file_type(output_name) == "image":
                with open(output_name, "rb") as f:
                    media.sprites.save(
                        content=File(f),
                        name=get_file_name(media.media_file.path) + "sprites.jpg",
                    )
        except Exception as e:
            logger.info("failed to produce sprites for media {0}: {1}".format(friendly_token, e))
    return True
@task(name="create_hls", queue="long_tasks")
def create_hls(friendly_token):
    """Creates HLS file for media, uses Bento4 mp4hls command

    Inputs are the media's successful, non-chunk, h264 mp4 encodings that have
    a file. Output goes to settings.HLS_DIR/<media uid hex>. When that
    directory already exists, mp4hls writes to a new sibling directory whose
    content is copied over the existing one (``cp -rT``) before it is removed.
    media.hls_file is then pointed at master.m3u8 if that file exists.

    Returns:
        False when mp4hls is not configured or missing, or the media does not
        exist; True otherwise.
    """
    if not hasattr(settings, "MP4HLS_COMMAND"):
        logger.info("Bento4 mp4hls command is missing from configuration")
        return False
    if not os.path.exists(settings.MP4HLS_COMMAND):
        logger.info("Bento4 mp4hls command is missing")
        return False
    try:
        media = Media.objects.get(friendly_token=friendly_token)
    except Exception:
        logger.info("failed to get media with friendly_token %s" % friendly_token)
        return False

    p = media.uid.hex
    output_dir = os.path.join(settings.HLS_DIR, p)
    encodings = media.encodings.filter(profile__extension="mp4", status="success", chunk=False, profile__codec="h264")
    files = [f.media_file.path for f in encodings if f.media_file]
    # without input files mp4hls produces nothing (and the old code then
    # crashed in rmtree), so skip straight to the master.m3u8 check
    if files:
        existing_output_dir = None
        if os.path.exists(output_dir):
            existing_output_dir = output_dir
            output_dir = os.path.join(settings.HLS_DIR, p + produce_friendly_token())
        # argument list: paths are passed verbatim, no shell parsing
        subprocess.run(
            [settings.MP4HLS_COMMAND, "--segment-duration=4", "--output-dir={0}".format(output_dir)] + files,
            stdout=subprocess.PIPE,
        )
        if existing_output_dir:
            # override content with -T !
            if os.path.isdir(output_dir):
                subprocess.run(["cp", "-rT", output_dir, existing_output_dir], stdout=subprocess.PIPE)
            shutil.rmtree(output_dir, ignore_errors=True)
            output_dir = existing_output_dir

    pp = os.path.join(output_dir, "master.m3u8")
    if os.path.exists(pp):
        if media.hls_file != pp:
            media.hls_file = pp
            media.save(update_fields=["hls_file"])
    return True
@task(name="check_running_states", queue="short_tasks")
def check_running_states():
    # Experimental - unused
    """Check stale running encodings and delete/reencode media

    A "running" Encoding is stale when its update_date is more than
    settings.RUNNING_STATE_STALE seconds old. Its task is then revoked with
    terminate=True, the Encoding is deleted, and the media is re-encoded to
    that profile.

    Returns:
        True.
    """
    encodings = Encoding.objects.filter(status="running")
    logger.info("got {0} encodings that are in state running".format(encodings.count()))
    changed = 0
    for encoding in encodings:
        now = datetime.now(encoding.update_date.tzinfo)
        # total_seconds(): timedelta.seconds drops whole days and wraps for
        # negative deltas, which misjudged long-stale (or future) encodings
        if (now - encoding.update_date).total_seconds() > settings.RUNNING_STATE_STALE:
            media = encoding.media
            profile = encoding.profile
            task_id = encoding.task_id
            # terminate task
            if task_id:
                revoke(task_id, terminate=True)
            encoding.delete()
            media.encode(profiles=[profile])
            # TODO: align with new code + chunksize...
            changed += 1
    if changed:
        logger.info("changed from running to pending on {0} items".format(changed))
    return True
@task(name="check_media_states", queue="short_tasks")
def check_media_states():
    """Recompute encoding_status for every media not in state "success".

    Experimental - unused. Each media whose encoding_status is "running",
    "fail" or "pending" gets set_encoding_status() called and that field saved.

    Returns:
        True.
    """
    unfinished = Media.objects.filter(encoding_status__in=["running", "fail", "pending"])
    logger.info("got {0} media that are not in state success".format(unfinished.count()))
    updated = 0
    for item in unfinished:
        item.set_encoding_status()
        item.save(update_fields=["encoding_status"])
        updated += 1
    if updated:
        logger.info("changed encoding status to {0} media items".format(updated))
    return True
@task(name="check_pending_states", queue="short_tasks")
def check_pending_states():
    """Re-queue "pending" encodings that no queued task will pick up.

    Experimental - unused. An encoding is left alone when its task_id, or its
    (media friendly_token, profile id) pair, appears in list_tasks(). Any other
    pending encoding is deleted and its media re-encoded to that profile with
    force=False.

    Returns:
        True.
    """
    pending = Encoding.objects.filter(status="pending")
    if not pending:
        return True

    queued = list_tasks()
    queued_task_ids = queued["task_ids"]
    queued_pairs = queued["media_profile_pairs"]

    requeued = 0
    for encoding in pending:
        # encoding is in one of the active/reserved/scheduled task lists
        if encoding.task_id and encoding.task_id in queued_task_ids:
            continue
        # no matching task_id, but a queued task already targets this
        # media/profile pair, so it will run anyway
        if (encoding.media.friendly_token, encoding.profile.id) in queued_pairs:
            continue
        # nothing will run it: re-enter the queue
        target_media, target_profile = encoding.media, encoding.profile
        encoding.delete()
        target_media.encode(profiles=[target_profile], force=False)
        requeued += 1

    if requeued:
        logger.info("set to the encode queue {0} encodings that were on pending state".format(requeued))
    return True
@task(name="check_missing_profiles", queue="short_tasks")
def check_missing_profiles():
    """Queue encodes for video media that lack an Encoding for some profile.

    Experimental - unused. For every media with media_type "video", profiles
    without any Encoding of that media are passed to media.encode with
    force=False.

    Returns:
        True.
    """
    media = Media.objects.filter(media_type="video")
    profiles = list(EncodeProfile.objects.all())
    changed = 0
    for m in media:
        # fetch only the profile ids (one query per media) instead of loading
        # every encoding's profile object; set membership is O(1)
        existing_profile_ids = set(m.encodings.values_list("profile_id", flat=True))
        missing_profiles = [p for p in profiles if p.id not in existing_profile_ids]
        if missing_profiles:
            m.encode(profiles=missing_profiles, force=False)
            # since we call with force=False
            # encode_media won't delete existing profiles
            # if they appear on the meanwhile (eg on a big queue)
            changed += 1
    if changed:
        logger.info("set to the encode queue {0} profiles".format(changed))
    return True
@task(name="clear_sessions", queue="short_tasks")
def clear_sessions():
    """Clear expired sessions

    Loads settings.SESSION_ENGINE and calls SessionStore.clear_expired().

    Returns:
        True on success, False (after logging the error) if the engine could
        not be imported or its cleanup raised.
    """
    from importlib import import_module

    try:
        engine = import_module(settings.SESSION_ENGINE)
        engine.SessionStore.clear_expired()
    except Exception as e:
        logger.info("failed to clear expired sessions: {0}".format(e))
        return False
    return True
@task(name="save_user_action", queue="short_tasks")
def save_user_action(user_or_session, friendly_token=None, action="watch", extra_info=None):
    """Short task that saves a user action

    Stores a MediaAction for ``action`` on the media with ``friendly_token``
    and updates the matching counter on the media.

    Args:
        user_or_session: mapping read for "user_id", "user_session" and
            "remote_ip_addr".
        friendly_token: friendly_token of the target Media.
        action: must be one of VALID_USER_ACTIONS.
        extra_info: for "rate" it must provide "score" and "category_id". It is
            stored on the MediaAction and, for "report", forwarded to
            notify_users.

    Returns:
        True once the action is stored, False when it is rejected.
    """
    if action not in VALID_USER_ACTIONS:
        return False
    try:
        media = Media.objects.get(friendly_token=friendly_token)
    except BaseException:
        return False

    user = user_or_session.get("user_id")
    session_key = user_or_session.get("user_session")
    remote_ip = user_or_session.get("remote_ip_addr")

    if user:
        # replace the id with the User instance; unknown ids are rejected
        try:
            user = User.objects.get(id=user)
        except BaseException:
            return False
    if not user and not session_key:
        return False

    if action in ("like", "dislike", "report") and not pre_save_action(
        media=media,
        user=user,
        session_key=session_key,
        action=action,
        remote_ip=remote_ip,
    ):
        return False

    if action == "watch":
        # keep a single (latest) watch action per user, or per session when
        # there is no user
        viewer = {"user": user} if user else {"session_key": session_key}
        MediaAction.objects.filter(media=media, action="watch", **viewer).delete()

    if action == "rate":
        try:
            score = extra_info.get("score")
            rating_category = extra_info.get("category_id")
        except BaseException:
            # TODO: better error handling?
            return False
        try:
            existing = Rating.objects.filter(user=user, media=media, rating_category_id=rating_category).first()
            if existing is None:
                Rating.objects.create(
                    user=user,
                    media=media,
                    rating_category_id=rating_category,
                    score=score,
                )
            else:
                existing.score = score
                existing.save(update_fields=["score"])
        except Exception:
            # TODO: more specific handling, for errors in score, or
            # rating_category?
            return False

    MediaAction(
        user=user,
        session_key=session_key,
        media=media,
        action=action,
        extra_info=extra_info,
        remote_ip=remote_ip,
    ).save()

    if action == "report":
        media.reported_times += 1
        if media.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
            media.state = "private"
        media.save(update_fields=["reported_times", "state"])
        notify_users(
            friendly_token=media.friendly_token,
            action="media_reported",
            extra=extra_info,
        )
        return True

    counter_field = {"watch": "views", "like": "likes", "dislike": "dislikes"}.get(action)
    if counter_field:
        setattr(media, counter_field, getattr(media, counter_field) + 1)
        media.save(update_fields=[counter_field])
    return True
@task(name="get_list_of_popular_media", queue="long_tasks")
def get_list_of_popular_media():
    """Experimental task for preparing media listing
    for index page / recommended section

    Caches (key "popular_media_ids", 12 hours) the de-duplicated union of the
    friendly_tokens from two rules, applied to listable media only:
    X = the 25 media with the most "watch" actions during the last week
    Y = the 25 media with the most "like" actions during the last 6 months

    Returns:
        True.
    """
    from django.db.models import Count

    period_x = datetime.now() - timedelta(days=7)
    period_y = datetime.now() - timedelta(days=30 * 6)

    def top_media(action, since, limit=25):
        # one grouped COUNT query instead of one query per media; order_by on
        # the annotation also keeps any default ordering out of the GROUP BY
        rows = (
            MediaAction.objects.filter(action_date__gte=since, action=action, media__listable=True)
            .values("media__friendly_token")
            .annotate(num=Count("pk"))
            .order_by("-num")[:limit]
        )
        return [row["media__friendly_token"] for row in rows]

    media_ids = top_media("watch", period_x)
    media_ids.extend(top_media("like", period_y))
    media_ids = list(set(media_ids))
    cache.set("popular_media_ids", media_ids, 60 * 60 * 12)
    logger.info("saved popular media ids")
    return True
def _set_listings_thumbnails(queryset, media_lookup):
    """Give each object in ``queryset`` the thumbnail of its most viewed media.

    Candidates are public, reviewed media matched through ``media_lookup``
    (e.g. "category" or "tags"). A media already used by an earlier object of
    the same queryset is skipped.

    Returns:
        The number of objects updated.
    """
    used_media = []
    saved = 0
    for item in queryset:
        media = (
            Media.objects.exclude(friendly_token__in=used_media)
            .filter(state="public", is_reviewed=True, **{media_lookup: item})
            .order_by("-views")
            .first()
        )
        if media:
            item.listings_thumbnail = media.thumbnail_url
            item.save(update_fields=["listings_thumbnail"])
            used_media.append(media.friendly_token)
            saved += 1
    return saved


@task(name="update_listings_thumbnails", queue="long_tasks")
def update_listings_thumbnails():
    """Populate listings_thumbnail field for models

    Categories and tags are processed in descending media_count order.

    Returns:
        True.
    """
    saved = _set_listings_thumbnails(Category.objects.filter().order_by("-media_count"), "category")
    logger.info("updated {} categories".format(saved))
    saved = _set_listings_thumbnails(Tag.objects.filter().order_by("-media_count"), "tags")
    logger.info("updated {} tags".format(saved))
    return True
@task_revoked.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    """Clean up after a revoked task: delete its Encoding and kill its ffmpeg.

    For encode_media tasks that are revoked, the ffmpeg command won't be
    stopped, since it was started by a subprocess, so that process has to be
    stopped here. The Encoding object is also removed, since the task that
    would prepare it was killed. (Maybe add a killed state for Encoding
    objects.)

    Returns:
        True.
    """
    try:
        uid = kwargs["request"].task_id
        if uid:
            encoding = Encoding.objects.get(task_id=uid)
            encoding.delete()
            logger.info("deleted the Encoding object")
            if encoding.temp_file:
                kill_ffmpeg_process(encoding.temp_file)
    except Encoding.DoesNotExist:
        # the revoked task has no Encoding (e.g. it is not an encode_media run)
        pass
    except Exception as e:
        logger.info("cleanup for revoked task failed: {0}".format(e))
    return True
def find_ffmpeg_pids(ps_output, filepath):
    """Return the PIDs (strings) of ``ps aux`` lines mentioning ffmpeg and ``filepath``.

    Lines containing "grep" are ignored, mirroring the former
    ``grep 'ffmpeg' | grep <path> | grep -v grep | awk '{print $2}'`` pipeline.
    ``filepath`` is matched as a literal substring, not as a regex.
    """
    pids = []
    for line in ps_output.splitlines():
        if "ffmpeg" in line and filepath in line and "grep" not in line:
            fields = line.split()
            if len(fields) > 1:
                pids.append(fields[1])
    return pids


def kill_ffmpeg_process(filepath):
    """SIGKILL every ffmpeg process whose command line contains ``filepath``.

    This is not ideal; the ffmpeg pid could be linked to the Encoding object.

    Returns:
        None when ``filepath`` is empty (nothing is killed, since an empty
        pattern would match every ffmpeg). Otherwise the CompletedProcess of
        ``kill`` when matches were found, or of ``ps`` when none were.
    """
    if not filepath:
        return None
    # argument lists: the path is never interpolated into a shell command
    result = subprocess.run(["ps", "aux"], stdout=subprocess.PIPE)
    pids = find_ffmpeg_pids(result.stdout.decode("utf-8", errors="replace"), str(filepath))
    if pids:
        # all PIDs in one kill call (the old shell string broke on several)
        result = subprocess.run(["kill", "-9"] + pids, stdout=subprocess.PIPE)
    return result
@task(name="remove_media_file", base=Task, queue="long_tasks")
def remove_media_file(media_file=None):
    """Delete ``media_file`` with rm_file and report completion.

    Returns:
        True, regardless of what rm_file returns.
    """
    rm_file(media_file)
    return True
# TODO LIST
# 1. Chunks are deleted from the original server once the file is fully
#    encoded; the same cleanup is also needed when encoding fails.
# 2. Add a script that deletes chunks in "fail" status (and checks their
#    encodings, deleting those as well, along with all their chunks).
# 3. Add a beat task that removes chunks.
|