import asyncio from asyncio import QueueEmpty, QueueFull # noqa: F401 from typing import Any, Self class AsyncQueue(asyncio.Queue): def insert_nowait(self: Self, index: int, item: Any) -> None: # noqa: ANN401 if self.full(): raise QueueFull self._insert(index, item) self._unfinished_tasks += 1 self._finished.clear() self._wakeup_next(self._getters) async def insert(self: Self, index: int, item: Any) -> None: # noqa: ANN401 """Insert an item into the queue. Insert an item into the queue. If the queue is full, wait until a free slot is available before adding item. """ while self.full(): putter = self._get_loop().create_future() self._putters.append(putter) try: await putter except: putter.cancel() # Just in case putter is not done yet. try: # noqa: SIM105 # Clean self._putters from canceled putters. self._putters.remove(putter) except ValueError: # The putter could be removed from self._putters by a # previous get_nowait call. pass if not self.full() and not putter.cancelled(): # We were woken up by get_nowait(), but can't take # the call. Wake up the next in line. self._wakeup_next(self._putters) raise return self.insert_nowait(index, item) def _insert(self: Self, index: int, item: Any) -> None: # noqa: ANN401 self._queue.insert(index, item) tg_download_queue = AsyncQueue() tg_download_queue_lock = asyncio.Lock() okru_queue = AsyncQueue() okru_queue_lock = asyncio.Lock()