import functools import uuid from collections.abc import Callable from datetime import UTC, datetime from pathlib import Path from typing import Annotated from db import Task as DBTask, User from pydantic import BaseModel, ConfigDict, Field from pymediainfo import MediaInfo from pyrogram.enums import MessageMediaType from pyrogram.types import ( Animation as TAnimation, Document as TDocument, Message, Video as TVideo, VideoNote as TVideoNote, ) from utils.videos import ( TEMP_VIDEOS_DIRECTORY, get_video_duration, is_real_video, take_screenshot, ) utc_now = functools.partial(datetime.now, UTC) class Task(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) user: User task: DBTask video_message: Message progress_message: Message uuid: Annotated[uuid.UUID, Field(default_factory=uuid.uuid4)] okru_tries: int = 0 next_check_time: datetime = Field(default_factory=utc_now) video_duration: int | None = None def get_file(self: "Task") -> TVideo | TAnimation | TDocument | TVideoNote: match self.video_message.media: case MessageMediaType.VIDEO: return self.video_message.video case MessageMediaType.ANIMATION: return self.video_message.animation case MessageMediaType.DOCUMENT: return self.video_message.document case MessageMediaType.VIDEO_NOTE: return self.video_message.video_note case _: raise NotValidVideoError @property def temp_file_name(self: "Task") -> str: return f"{self.get_file().file_unique_id}"[:200] @property def temp_folder(self: "Task") -> Path: path = TEMP_VIDEOS_DIRECTORY / str(self.uuid) if not path.exists(): path.mkdir() return path @property def video_path(self: "Task") -> Path: return self.temp_folder / (self.temp_file_name + ".mp4") @property def thumbnail_path(self: "Task") -> Path: return self.temp_folder / (self.temp_file_name + ".jpeg") async def download_and_prepare(self: "Task", progress: Callable, progress_args: tuple) -> Path: await self.video_message.download( file_name=self.video_path, progress=progress, progress_args=progress_args, ) if not (await is_real_video(self.video_path)): raise NotValidVideoError v = MediaInfo.parse(self.video_path) if not v.can_parse(): raise NotValidVideoError if len(v.video_tracks) == 0: raise NotValidVideoError vt = v.video_tracks[0] duration: float | None = getattr(vt, "duration", None) if not duration: duration = await get_video_duration(self.video_path) if not duration: raise NotValidVideoError else: duration = float(duration) / 1000 self.video_duration = float(duration) await take_screenshot(self.video_path, self.thumbnail_path, self.video_duration) return self.video_path class NotValidVideoError(Exception): pass