Coverage for aiostream/stream/create.py: 96%
81 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-02 23:18 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-02 23:18 +0000
1"""Non-pipable creation operators."""
2from __future__ import annotations
4import sys
5import asyncio
6import inspect
7import builtins
8import itertools
10from typing import (
11 AsyncIterable,
12 Awaitable,
13 Iterable,
14 Protocol,
15 TypeVar,
16 AsyncIterator,
17 cast,
18)
19from typing_extensions import ParamSpec
21from ..stream import time
22from ..core import operator, streamcontext
24__all__ = [
25 "iterate",
26 "preserve",
27 "just",
28 "call",
29 "throw",
30 "empty",
31 "never",
32 "repeat",
33 "range",
34 "count",
35]
37T = TypeVar("T")
38P = ParamSpec("P")
40# Hack for python 3.8 compatibility
41if sys.version_info < (3, 9):
42 P = TypeVar("P")
44# Convert regular iterables
47@operator
48async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
49 """Generate values from a regular iterable."""
50 for item in it:
51 await asyncio.sleep(0)
52 yield item
55@operator
56def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]:
57 """Generate values from an asynchronous iterable.
59 Note: the corresponding iterator will be explicitely closed
60 when leaving the context manager."""
61 return streamcontext(ait)
64@operator
65def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]:
66 """Generate values from a sychronous or asynchronous iterable."""
67 if isinstance(it, AsyncIterable):
68 return from_async_iterable.raw(it)
69 if isinstance(it, Iterable):
70 return from_iterable.raw(it)
71 raise TypeError(f"{type(it).__name__!r} object is not (async) iterable")
74@operator
75async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]:
76 """Generate values from an asynchronous iterable without
77 explicitly closing the corresponding iterator."""
78 async for item in ait:
79 yield item
82# Simple operators
85@operator
86async def just(value: T) -> AsyncIterator[T]:
87 """Await if possible, and generate a single value."""
88 if inspect.isawaitable(value):
89 yield await value
90 else:
91 yield value
94Y = TypeVar("Y", covariant=True)
97class SyncCallable(Protocol[P, Y]):
98 def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y:
99 ...
102class AsyncCallable(Protocol[P, Y]):
103 def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]:
104 ...
107@operator
108async def call(
109 func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
110) -> AsyncIterator[T]:
111 """Call the given function and generate a single value.
113 Await if the provided function is asynchronous.
114 """
115 if asyncio.iscoroutinefunction(func):
116 async_func = cast("AsyncCallable[P, T]", func)
117 yield await async_func(*args, **kwargs)
118 else:
119 sync_func = cast("SyncCallable[P, T]", func)
120 yield sync_func(*args, **kwargs)
123@operator
124async def throw(exc: Exception) -> AsyncIterator[None]:
125 """Throw an exception without generating any value."""
126 if False:
127 yield
128 raise exc
131@operator
132async def empty() -> AsyncIterator[None]:
133 """Terminate without generating any value."""
134 if False:
135 yield
138@operator
139async def never() -> AsyncIterator[None]:
140 """Hang forever without generating any value."""
141 if False:
142 yield
143 future: asyncio.Future[None] = asyncio.Future()
144 try:
145 await future
146 finally:
147 future.cancel()
150@operator
151def repeat(
152 value: T, times: int | None = None, *, interval: float = 0.0
153) -> AsyncIterator[T]:
154 """Generate the same value a given number of times.
156 If ``times`` is ``None``, the value is repeated indefinitely.
157 An optional interval can be given to space the values out.
158 """
159 args = () if times is None else (times,)
160 it = itertools.repeat(value, *args)
161 agen = from_iterable.raw(it)
162 return time.spaceout.raw(agen, interval) if interval else agen
165# Counting operators
168@operator
169def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]:
170 """Generate a given range of numbers.
172 It supports the same arguments as the builtin function.
173 An optional interval can be given to space the values out.
174 """
175 agen = from_iterable.raw(builtins.range(*args))
176 return time.spaceout.raw(agen, interval) if interval else agen
179@operator
180def count(
181 start: int = 0, step: int = 1, *, interval: float = 0.0
182) -> AsyncIterator[int]:
183 """Generate consecutive numbers indefinitely.
185 Optional starting point and increment can be defined,
186 respectively defaulting to ``0`` and ``1``.
188 An optional interval can be given to space the values out.
189 """
190 agen = from_iterable.raw(itertools.count(start, step))
191 return time.spaceout.raw(agen, interval) if interval else agen