from __future__ import annotations import asyncio from collections.abc import Awaitable, Callable, Iterable from typing import Any ASYNC_FUNC = Callable[[Any, Any], Awaitable[Any]] async def gather_with_concurrency( number_of_parller_tasks: int, tasks: Iterable[ASYNC_FUNC[Any]] ) -> list[Any]: """ run many functions in parallel Args: number_of_parller_tasks (int): Number of parallel tasks. Returns: List[Any]: The return of all given functions. """ semaphore = asyncio.Semaphore(number_of_parller_tasks) async def sem_task(task: ASYNC_FUNC) -> Any: # noqa: ANN401 async with semaphore: return await task return await asyncio.gather(*(sem_task(task) for task in tasks))