API Reference

Asynchronous-style DataPipes

An asynchronous-style DataPipe is one that implements the __aiter__ protocol and represents an asynchronous iterable over data samples. This style is well suited to cases where I/O latency is high, e.g. waiting on network connections or reading from multiple files at once.

class bambooflow.datapipes.AsyncIterDataPipe

Bases: AsyncIterable

Asynchronous iterable-style DataPipes.

All DataPipes that represent an asynchronous iterable of data samples should subclass this. This style of DataPipe is particularly useful for I/O-bound tasks such as streaming data from a network disk drive or reading multiple files concurrently. AsyncIterDataPipe is evaluated lazily: its elements are computed only when __anext__ is called on the async iterator of an AsyncIterDataPipe.

bambooflow.datapipes.AsyncIterableWrapper

alias of AsyncIterableWrapperAsyncIterDataPipe

class bambooflow.datapipes.aiter.AsyncIterableWrapperAsyncIterDataPipe(iterable: Iterable)

Bases: AsyncIterDataPipe

Wraps an iterable object to create an AsyncIterDataPipe.

Adapted from https://peps.python.org/pep-0492/#example-2

Parameters:

iterable (collections.abc.Iterable) – An iterable object to be wrapped into an AsyncIterDataPipe.

Yields:

awaitable (collections.abc.Awaitable) – An awaitable object from the asynchronous iterator.

Example

>>> import asyncio
>>> from bambooflow.datapipes import AsyncIterableWrapper
...
>>> # Wrap a list into an asynchronous iterator
>>> dp = AsyncIterableWrapper(iterable=[3, 6, 9])
...
>>> # Loop or iterate over the DataPipe stream
>>> it = aiter(dp)
>>> number = anext(it)
>>> asyncio.run(number)
3
>>> number = anext(it)
>>> asyncio.run(number)
6
>>> # Or if running in an interactive REPL with top-level `await` support
>>> number = anext(it)  
>>> await number  
9
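For comparison, here is a sketch that follows the PEP 492 example this wrapper is adapted from. IterableWrapperSketch is a hypothetical stand-in, not bambooflow's actual class, and the real implementation may differ in detail.

```python
import asyncio
from collections.abc import Iterable
from typing import Any


class IterableWrapperSketch:
    """Hypothetical stand-in for AsyncIterableWrapper, after PEP 492 example 2."""

    def __init__(self, iterable: Iterable[Any]) -> None:
        self._it = iter(iterable)

    def __aiter__(self) -> "IterableWrapperSketch":
        return self  # the object is its own async iterator

    async def __anext__(self) -> Any:
        try:
            value = next(self._it)
        except StopIteration:
            # Async iteration signals exhaustion with StopAsyncIteration instead.
            raise StopAsyncIteration from None
        return value


async def main() -> list[int]:
    # One event loop drives the whole stream via `async for`.
    return [number async for number in IterableWrapperSketch([3, 6, 9])]


print(asyncio.run(main()))  # [3, 6, 9]
```

Driving the stream from a single coroutine with `async for`, as main does, uses one event loop for every element. By contrast, each asyncio.run call in the doctest above creates and then closes a fresh loop.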

Mapping DataPipes

DataPipes that apply a custom asynchronous function to each element of a DataPipe.

bambooflow.datapipes.Mapper

alias of MapperAsyncIterDataPipe

class bambooflow.datapipes.callable.MapperAsyncIterDataPipe(datapipe: AsyncIterDataPipe, fn: Callable[..., Coroutine[Any, Any, Any]])

Bases: AsyncIterDataPipe

Applies an asynchronous function over each item from the source DataPipe (functional name: map).

Parameters:
  • datapipe (AsyncIterDataPipe) – The source asynchronous iterable-style DataPipe.

  • fn (Callable) – Asynchronous function to be applied over each item.

Yields:

awaitable (collections.abc.Awaitable) – An awaitable object from the asynchronous iterator.

Raises:

ExceptionGroup – If any of the concurrent tasks raises an exception. See PEP 654 for general advice on handling exception groups.

Example

>>> import asyncio
>>> from bambooflow.datapipes import AsyncIterableWrapper, Mapper
...
>>> # Apply an asynchronous multiply by two function
>>> async def times_two(x) -> float:
...     await asyncio.sleep(delay=x)
...     return x * 2
>>> dp = AsyncIterableWrapper(iterable=[0.1, 0.2, 0.3])
>>> dp_map = Mapper(datapipe=dp, fn=times_two)
...
>>> # Loop or iterate over the DataPipe stream
>>> it = aiter(dp_map)
>>> number = anext(it)
>>> asyncio.run(number)
0.2
>>> number = anext(it)
>>> asyncio.run(number)
0.4
>>> # Or if running in an interactive REPL with top-level `await` support
>>> number = anext(it)
>>> await number  
0.6

Helper Functions

Functions, classes, decorators, and context managers to help build DataPipes.

class bambooflow.helpers.functional_datapipe(name: str)

Class decorator that exposes an AsyncIterDataPipe subclass in functional form, i.e. as a method that can be called on any AsyncIterDataPipe.

Parameters:

name (str) – The name for the functional form of the bambooflow.datapipes.AsyncIterDataPipe.

Note

The functional method is injected into the private _functions attribute of the base AsyncIterDataPipe class. It is then resolved through the overridden AsyncIterDataPipe.__getattr__ method, which calls the wrapped function.

Example

>>> from bambooflow.datapipes import AsyncIterDataPipe, AsyncIterableWrapper
>>> from bambooflow.helpers import functional_datapipe
>>>
>>> @functional_datapipe(name="pipe")
... class PiperAsyncIterDataPipe(AsyncIterDataPipe):
...     def __init__(self, datapipe):
...         self.datapipe = datapipe
...     async def __aiter__(self):
...         # Pass each element of the source DataPipe through unchanged
...         async for item in self.datapipe:
...             yield item
>>>
>>> dp = AsyncIterableWrapper(iterable=["a", "b", "c"])
>>> dp_pipe = dp.pipe()
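The registration mechanism described in the Note can be sketched without bambooflow. Everything below is a hypothetical, synchronous stand-in whose internals may differ from the real decorator: BasePipe, register_functional, SourcePipe and PiperPipe. A class-level dict maps functional names to pipe classes. A `__getattr__` fallback looks names up there and binds the current pipe as the source argument.

```python
import functools
from typing import Any


class BasePipe:
    """Hypothetical synchronous stand-in for the AsyncIterDataPipe base class."""

    _functions: dict[str, type] = {}  # functional name -> registered pipe class

    def __getattr__(self, name: str) -> Any:
        # Python calls __getattr__ only after normal attribute lookup fails.
        registered = BasePipe._functions.get(name)
        if registered is None:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        # Bind this pipe as the first constructor argument (the source datapipe).
        return functools.partial(registered, self)


def register_functional(name: str):
    """Hypothetical decorator mirroring what functional_datapipe is described to do."""

    def decorator(cls: type) -> type:
        BasePipe._functions[name] = cls
        return cls

    return decorator


class SourcePipe(BasePipe):
    def __init__(self, items: list[str]) -> None:
        self.items = items


@register_functional(name="pipe")
class PiperPipe(BasePipe):
    def __init__(self, datapipe: BasePipe) -> None:
        self.datapipe = datapipe


dp = SourcePipe(["a", "b", "c"])
dp_pipe = dp.pipe()  # resolved via BasePipe.__getattr__
print(type(dp_pipe).__name__, dp_pipe.datapipe is dp)  # PiperPipe True
```

Putting the lookup in `__getattr__` rather than `__getattribute__` means ordinary attributes such as `items` or `datapipe` never pass through the registry.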