Source code for bambooflow.datapipes.callable
"""
Asynchronous Iterable DataPipes for asynchronous functions.
"""
import asyncio
from collections.abc import AsyncIterator, Callable, Coroutine
from typing import Any
from bambooflow.datapipes.aiter import AsyncIterDataPipe
from bambooflow.helpers import functional_datapipe
[docs]@functional_datapipe(name="map")
class MapperAsyncIterDataPipe(AsyncIterDataPipe):
"""
Applies an asynchronous function over each item from the source DataPipe
(functional name: ``map``).
Parameters
----------
datapipe : AsyncIterDataPipe
The source asynchronous iterable-style DataPipe.
fn : Callable
Asynchronous function to be applied over each item.
Yields
------
awaitable : collections.abc.Awaitable
An :py-term:`awaitable` object from the
:py-term:`asynchronous iterator <asynchronous-iterator>`.
Raises
------
ExceptionGroup
If any one of the concurrent tasks raises an :py:class:`Exception`. See
`PEP654 <https://peps.python.org/pep-0654/#handling-exception-groups>`_
for general advice on how to handle exception groups.
Example
-------
>>> import asyncio
>>> from bambooflow.datapipes import AsyncIterableWrapper, Mapper
...
>>> # Apply an asynchronous multiply by two function
>>> async def times_two(x) -> float:
... await asyncio.sleep(delay=x)
... return x * 2
>>> dp = AsyncIterableWrapper(iterable=[0.1, 0.2, 0.3])
>>> dp_map = Mapper(datapipe=dp, fn=times_two)
...
>>> # Loop or iterate over the DataPipe stream
>>> it = aiter(dp_map)
>>> number = anext(it)
>>> asyncio.run(number)
0.2
>>> number = anext(it)
>>> asyncio.run(number)
0.4
>>> # Or if running in an interactive REPL with top-level `await` support
>>> number = anext(it)
>>> await number # doctest: +SKIP
0.6
"""
def __init__(
self, datapipe: AsyncIterDataPipe, fn: Callable[..., Coroutine[Any, Any, Any]]
):
super().__init__()
self._datapipe = datapipe
self._fn = fn
async def __aiter__(self) -> AsyncIterator:
try:
async with asyncio.TaskGroup() as task_group:
tasks: list[asyncio.Task] = [
task_group.create_task(coro=self._fn(data))
async for data in self._datapipe
]
except* BaseException as err:
raise ValueError(f"{err=}") from err
for task in tasks:
result = await task
yield result