API Reference
Asynchronous-style DataPipes
An asynchronous-style DataPipe is one that implements the __aiter__ protocol and represents an asynchronous iterable over data samples.
This style is well suited to cases where I/O latency is high, e.g. when waiting on network connections or reading multiple files at once.
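As a minimal, library-independent sketch of the protocol (plain Python, not bambooflow code), any class whose `__aiter__` returns an asynchronous iterator can be consumed with `async for`. The class name `CountdownSource` and its simulated delay are hypothetical.

```python
import asyncio


class CountdownSource:
    """Hypothetical async iterable; not part of bambooflow."""

    def __init__(self, start: int, delay: float = 0.01) -> None:
        self.start = start
        self.delay = delay

    async def __aiter__(self):
        # Defining __aiter__ as an async generator makes it return an
        # asynchronous iterator, which is what `async for` expects.
        for n in range(self.start, 0, -1):
            await asyncio.sleep(self.delay)  # stand-in for slow I/O
            yield n


async def main() -> list[int]:
    return [n async for n in CountdownSource(3)]


print(asyncio.run(main()))  # [3, 2, 1]
```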
- class bambooflow.datapipes.AsyncIterDataPipe
Bases: AsyncIterable
Asynchronous iterable-style DataPipes.
All DataPipes that represent an asynchronous iterable of data samples should subclass this. This style of DataPipes is particularly useful for performing I/O-bound tasks such as streaming data from a network disk drive or reading multiple files concurrently.
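To illustrate why this style suits I/O-bound work, the sketch below simulates three slow reads with `asyncio.sleep` as a stand-in for network or disk latency (`fake_read` is hypothetical, and no bambooflow API is used). Awaiting the reads one after another takes roughly the sum of the delays, while scheduling them concurrently with `asyncio.gather` takes roughly the longest single delay.

```python
import asyncio
import time


async def fake_read(name: str, delay: float = 0.1) -> str:
    # Hypothetical stand-in for a slow network or file read.
    await asyncio.sleep(delay)
    return f"contents of {name}"


async def sequential(names: list[str]) -> list[str]:
    # Each read starts only after the previous one has finished.
    return [await fake_read(n) for n in names]


async def concurrent(names: list[str]) -> list[str]:
    # gather() runs the reads concurrently and preserves input order.
    return list(await asyncio.gather(*(fake_read(n) for n in names)))


def timed(coro_fn, names):
    start = time.perf_counter()
    result = asyncio.run(coro_fn(names))
    return result, time.perf_counter() - start


files = ["a.txt", "b.txt", "c.txt"]
seq_result, seq_time = timed(sequential, files)
con_result, con_time = timed(concurrent, files)
assert seq_result == con_result
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```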
AsyncIterDataPipe is initialized lazily: its elements are computed only when __anext__ is called on the async iterator of an AsyncIterDataPipe.
- bambooflow.datapipes.AsyncIterableWrapper
- class bambooflow.datapipes.aiter.AsyncIterableWrapperAsyncIterDataPipe(iterable: Iterable)
Bases: AsyncIterDataPipe
Wraps an iterable object to create an AsyncIterDataPipe.
Adapted from https://peps.python.org/pep-0492/#example-2
- Parameters:
iterable (collections.abc.Iterable) – An iterable object to be wrapped into an AsyncIterDataPipe.
- Yields:
awaitable (collections.abc.Awaitable) – An awaitable object from the asynchronous iterator.
Example
>>> import asyncio
>>> from bambooflow.datapipes import AsyncIterableWrapper
...
>>> # Wrap a list into an asynchronous iterator
>>> dp = AsyncIterableWrapper(iterable=[3, 6, 9])
...
>>> # Loop or iterate over the DataPipe stream
>>> it = aiter(dp)
>>> number = anext(it)
>>> asyncio.run(number)
3
>>> number = anext(it)
>>> asyncio.run(number)
6
>>> # Or if running in an interactive REPL with top-level `await` support
>>> number = anext(it)
>>> await number
9
Mapping DataPipes
DataPipes that apply a custom asynchronous function to each element of a DataPipe.
- bambooflow.datapipes.Mapper
alias of MapperAsyncIterDataPipe
- class bambooflow.datapipes.callable.MapperAsyncIterDataPipe(datapipe: AsyncIterDataPipe, fn: Callable[..., Coroutine[Any, Any, Any]])
Bases: AsyncIterDataPipe
Applies an asynchronous function over each item from the source DataPipe (functional name: map).
- Parameters:
datapipe (AsyncIterDataPipe) – The source asynchronous iterable-style DataPipe.
fn (Callable) – Asynchronous function to be applied over each item.
- Yields:
awaitable (collections.abc.Awaitable) – An awaitable object from the asynchronous iterator.
- Raises:
ExceptionGroup – If any of the concurrent tasks raises an Exception. See PEP 654 for general advice on how to handle exception groups.
Example
>>> import asyncio
>>> from bambooflow.datapipes import AsyncIterableWrapper, Mapper
...
>>> # Apply an asynchronous multiply-by-two function
>>> async def times_two(x) -> float:
...     await asyncio.sleep(delay=x)
...     return x * 2
...
>>> dp = AsyncIterableWrapper(iterable=[0.1, 0.2, 0.3])
>>> dp_map = Mapper(datapipe=dp, fn=times_two)
...
>>> # Loop or iterate over the DataPipe stream
>>> it = aiter(dp_map)
>>> number = anext(it)
>>> asyncio.run(number)
0.2
>>> number = anext(it)
>>> asyncio.run(number)
0.4
>>> # Or if running in an interactive REPL with top-level `await` support
>>> number = anext(it)
>>> await number
0.6
Helper Functions
Functions, classes, decorators, and context managers to help build DataPipes.
- class bambooflow.helpers.functional_datapipe(name: str)
Decorator that gives an AsyncIterDataPipe a functional form, so that it can be invoked as a method on other DataPipes (e.g. dp.pipe()).
- Parameters:
name (str) – The name for the functional form of the bambooflow.datapipes.AsyncIterDataPipe.
Note
The functional method is injected into the private _functions variable of the base AsyncIterDataPipe class, and relies on the modified AsyncIterDataPipe.__getattr__ method to call the wrapped function.
Example
>>> from bambooflow.datapipes import AsyncIterableWrapper, AsyncIterDataPipe
>>> from bambooflow.helpers import functional_datapipe
>>>
>>> @functional_datapipe(name="pipe")
... class PiperAsyncIterDataPipe(AsyncIterDataPipe):
...     def __init__(self, datapipe):
...         ...
...
...     async def __aiter__(self):
...         ...
...
>>> dp = AsyncIterableWrapper(iterable=["a", "b", "c"])
>>> dp_pipe = dp.pipe()
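The note above can be made concrete with a simplified, self-contained sketch of the same registration pattern: a class-level registry plus a `__getattr__` fallback that passes the current DataPipe as the first argument. Everything here (`BaseDataPipe`, `register_functional`, `ListSource`, `UpperPipe`) is hypothetical and synchronous for brevity; it is not bambooflow's actual code.

```python
from typing import Any, Callable


class BaseDataPipe:
    """Hypothetical base class mirroring the mechanism described in the note."""

    # Shared registry: functional name -> DataPipe class.
    _functions: dict[str, Callable[..., Any]] = {}

    def __getattr__(self, name: str) -> Any:
        # Only called when normal attribute lookup fails.
        if name in BaseDataPipe._functions:
            cls = BaseDataPipe._functions[name]

            def method(*args: Any, **kwargs: Any) -> Any:
                # The current DataPipe becomes the wrapped class's first argument.
                return cls(self, *args, **kwargs)

            return method
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        )


def register_functional(name: str):
    """Hypothetical stand-in for a functional_datapipe-style decorator."""

    def decorator(cls):
        BaseDataPipe._functions[name] = cls
        return cls

    return decorator


class ListSource(BaseDataPipe):
    def __init__(self, items):
        self.items = list(items)


@register_functional(name="upper")
class UpperPipe(BaseDataPipe):
    def __init__(self, datapipe):
        self.datapipe = datapipe
        self.items = [s.upper() for s in datapipe.items]


dp = ListSource(["a", "b", "c"])
print(dp.upper().items)  # ['A', 'B', 'C']
```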