"""Admin broadcast commands: copy or forward a replied-to message to all users."""

from __future__ import annotations

import asyncio
import re

from pyrogram import Client, filters
from pyrogram.errors import FloodWait, InputUserDeactivated, PeerIdInvalid, UserIsBlocked
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message

from utils import filters as myfilters
from utils.client import MyClient
from utils.db import Depends, User, get_user, get_user_query, inject
from utils.i18n import tr as _
from utils.messages import message_handler

NO_BROADCAST_PATTERN = re.compile(r"^broadcast\.no$")
# The optional leading "f" marks the "forward" variant of the confirmation.
YES_BROADCAST_PATTERN = re.compile(r"^f?broadcast\.yes$")

# Status values returned by ``send_message`` in place of ``True``.
_STATUS_UNREACHABLE = 400  # recipient deactivated, blocked the bot, or peer id invalid
_STATUS_FAILED = 500  # any other exception; its repr is returned alongside


async def _ask_broadcast_confirmation(msg: Message, user: User, yes_callback_data: str) -> None:
    """Reply to the replied-to message with a yes/no prompt, then delete the command.

    Args:
        msg: The command message; it is expected to be a reply (the handlers
            using this helper are registered with ``filters.reply``).
        user: The admin issuing the command; ``user.language`` selects the locale.
        yes_callback_data: Callback data attached to the "yes" button.
    """
    locale = user.language
    keyboard = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(_("common.yes", locale=locale), yes_callback_data),
                InlineKeyboardButton(_("common.no", locale=locale), "broadcast.no"),
            ]
        ]
    )
    await message_handler(
        msg.reply_to_message.reply,
        _("cp.broadcast_confirm", locale=locale),
        quote=True,
        reply_markup=keyboard,
    )
    await message_handler(msg.delete)


@Client.on_message(filters.reply & filters.command(["bc"]) & myfilters.is_admin)
@inject
async def broadcast_main(client: MyClient, msg: Message, user: User = Depends(get_user)) -> None:
    """Handle ``/bc``: ask the admin to confirm a copy-style broadcast."""
    await _ask_broadcast_confirmation(msg, user, "broadcast.yes")


@Client.on_message(filters.reply & filters.command(["fbc"]) & myfilters.is_admin)
@inject
async def forward_broadcast_main(
    client: MyClient, msg: Message, user: User = Depends(get_user)
) -> None:
    """Handle ``/fbc``: ask the admin to confirm a forward-style broadcast."""
    await _ask_broadcast_confirmation(msg, user, "fbroadcast.yes")


@Client.on_callback_query(filters.regex(NO_BROADCAST_PATTERN))
async def broadcast_no(client: MyClient, query: CallbackQuery) -> None:
    """Handle the "no" button: delete the confirmation prompt."""
    await message_handler(query.message.delete)


def _flood_wait_seconds(exc: FloodWait) -> float:
    """Return the wait time carried by a ``FloodWait`` error.

    Reads ``value`` when present and falls back to ``x``, which is the
    attribute this module originally relied on.
    NOTE(review): ``value`` is believed to be the Pyrogram 2.x name for the
    same field - confirm against the installed Pyrogram version.
    """
    delay = getattr(exc, "value", None)
    return exc.x if delay is None else delay


async def send_message(msg: Message, user_id: int, copy: bool) -> tuple[bool | int, str | None]:
    """Deliver ``msg`` to ``user_id``, retrying after flood waits.

    Args:
        msg: The message to deliver.
        user_id: Destination chat id.
        copy: ``True`` to use ``msg.copy``, ``False`` to use ``msg.forward``.

    Returns:
        ``(True, None)`` on success, ``(400, None)`` when the recipient is
        deactivated, has blocked the bot or the peer id is invalid, and
        ``(500, repr(exc))`` for any other exception.
    """
    while True:
        try:
            if copy:
                await msg.copy(user_id)
            else:
                await msg.forward(user_id)
        except FloodWait as e:
            # Sleep 20% longer than requested, then retry the same recipient.
            await asyncio.sleep(_flood_wait_seconds(e) * 1.2)
        except (InputUserDeactivated, UserIsBlocked, PeerIdInvalid):
            return _STATUS_UNREACHABLE, None
        except Exception as e:
            # Deliberate catch-all: one failing recipient must not abort the broadcast.
            return _STATUS_FAILED, repr(e)
        else:
            return True, None


@Client.on_callback_query(filters.regex(YES_BROADCAST_PATTERN))
@inject
async def broadcast_yes(
    client: MyClient, query: CallbackQuery, user: User = Depends(get_user_query)
) -> None:
    """Handle the "yes" button: send the replied-to message to every stored user.

    The confirmation prompt is edited into a progress report every 100
    recipients and once more at the end; afterwards the source message is
    deleted.
    """
    msg_to_copy = query.message.reply_to_message
    if msg_to_copy is None:
        # The source message is gone, so there is nothing to broadcast;
        # just remove the stale prompt.
        await message_handler(query.message.delete)
        return

    pipeline = [
        {
            "$project": {
                "_id": None,
                "user_id": 1,
            }
        },
    ]
    users = await User.find().aggregate(pipeline).to_list()

    errors = 0
    processed = 0  # stays 0 when there are no users, so the final report still works
    copy = not query.data.startswith("f")

    for index, tele_user in enumerate(users):
        status, err = await send_message(msg_to_copy, tele_user["user_id"], copy=copy)
        if status == _STATUS_UNREACHABLE:
            errors += 1
        elif status == _STATUS_FAILED:
            errors += 1
            await message_handler(query.message.reply, f"ERROR: {err}")

        processed = index + 1
        if index % 100 == 0:
            await message_handler(
                query.edit_message_text,
                _(
                    "cp.broadcast_status",
                    locale=user.language,
                    count=processed,
                    errors=errors,
                    finished="❌",
                ),
            )

    await message_handler(
        query.edit_message_text,
        _(
            "cp.broadcast_status",
            locale=user.language,
            count=processed,
            errors=errors,
            finished="✅",
        ),
    )
    await message_handler(msg_to_copy.delete)