Source code for bambooflow.datapipes.aiter
"""
Asynchronous Iterable DataPipes base class and wrapper.
"""
import functools
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable, Iterable
[docs]class AsyncIterDataPipe(AsyncIterable):
"""
Asynchronous iterable-style DataPipes.
All DataPipes that represent an asynchronous iterable of data samples
should subclass this. This style of DataPipes is particularly useful for
performing I/O-bound tasks such as streaming data from a network disk drive
or reading multiple files concurrently. ``AsyncIterDataPipe`` is
initialized in a lazy fashion, and its elements are computed only when
:py:meth:`__anext__ <object.__anext__>` is called on the async iterator of
an ``AsyncIterDataPipe``.
"""
_functions: dict[str, Callable] = {}
def __getattr__(self, attribute_name: str) -> Callable:
"""
Allow calling functions stored in the private ``_functions`` variable,
e.g. those added by the functional_datapipe decorator.
"""
if f := AsyncIterDataPipe._functions.get(attribute_name):
function = functools.partial(f, self)
functools.update_wrapper(wrapper=function, wrapped=f, assigned=("__doc__",))
else:
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{attribute_name}'"
)
return function
def __repr__(self) -> str:
# Instead of showing <bambooflow. ... .AsyncIterableWrapper at 0x.....>,
# return the qualified name of the class like <AsyncIterableWrapper>
return str(self.__class__.__qualname__)
[docs]class AsyncIterableWrapperAsyncIterDataPipe(AsyncIterDataPipe):
"""
Wraps an iterable object to create an AsyncIterDataPipe.
Adapted from https://peps.python.org/pep-0492/#example-2
Parameters
----------
iterable : collections.abc.Iterable
An :py-term:`iterable` object to be wrapped into an AsyncIterDataPipe.
Yields
------
awaitable : collections.abc.Awaitable
An :py-term:`awaitable` object from the
:py-term:`asynchronous iterator <asynchronous-iterator>`.
Example
-------
>>> import asyncio
>>> from bambooflow.datapipes import AsyncIterableWrapper
...
>>> # Wrap a list into an asynchronous iterator
>>> dp = AsyncIterableWrapper(iterable=[3, 6, 9])
...
>>> # Loop or iterate over the DataPipe stream
>>> it = aiter(dp)
>>> number = anext(it)
>>> asyncio.run(number)
3
>>> number = anext(it)
>>> asyncio.run(number)
6
>>> # Or if running in an interactive REPL with top-level `await` support
>>> number = anext(it) # doctest: +SKIP
>>> await number # doctest: +SKIP
9
"""
def __init__(self, iterable: Iterable):
self._iterable = iter(iterable)
def __aiter__(self) -> AsyncIterator:
return self
async def __anext__(self) -> Awaitable:
try:
value = next(self._iterable)
except StopIteration as err:
raise StopAsyncIteration from err
return value