diff options
| author | Craig Jennings <c@cjennings.net> | 2024-04-07 13:41:34 -0500 |
|---|---|---|
| committer | Craig Jennings <c@cjennings.net> | 2024-04-07 13:41:34 -0500 |
| commit | 754bbf7a25a8dda49b5d08ef0d0443bbf5af0e36 (patch) | |
| tree | f1190704f78f04a2b0b4c977d20fe96a828377f1 /devdocs/python~3.12/library%2Fasyncio-queue.html | |
new repository
Diffstat (limited to 'devdocs/python~3.12/library%2Fasyncio-queue.html')
| -rw-r--r-- | devdocs/python~3.12/library%2Fasyncio-queue.html | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/devdocs/python~3.12/library%2Fasyncio-queue.html b/devdocs/python~3.12/library%2Fasyncio-queue.html new file mode 100644 index 00000000..757550e6 --- /dev/null +++ b/devdocs/python~3.12/library%2Fasyncio-queue.html @@ -0,0 +1,104 @@ + <span id="asyncio-queues"></span><h1>Queues</h1> <p><strong>Source code:</strong> <a class="reference external" href="https://github.com/python/cpython/tree/3.12/Lib/asyncio/queues.py">Lib/asyncio/queues.py</a></p> <p>asyncio queues are designed to be similar to classes of the <a class="reference internal" href="queue#module-queue" title="queue: A synchronized queue class."><code>queue</code></a> module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.</p> <p>Note that methods of asyncio queues don’t have a <em>timeout</em> parameter; use <a class="reference internal" href="asyncio-task#asyncio.wait_for" title="asyncio.wait_for"><code>asyncio.wait_for()</code></a> function to do queue operations with a timeout.</p> <p>See also the <a class="reference internal" href="#examples">Examples</a> section below.</p> <section id="queue"> <h2>Queue</h2> <dl class="py class"> <dt class="sig sig-object py" id="asyncio.Queue"> +<code>class asyncio.Queue(maxsize=0)</code> </dt> <dd> +<p>A first in, first out (FIFO) queue.</p> <p>If <em>maxsize</em> is less than or equal to zero, the queue size is infinite. If it is an integer greater than <code>0</code>, then <code>await put()</code> blocks when the queue reaches <em>maxsize</em> until an item is removed by <a class="reference internal" href="#asyncio.Queue.get" title="asyncio.Queue.get"><code>get()</code></a>.</p> <p>Unlike the standard library threading <a class="reference internal" href="queue#module-queue" title="queue: A synchronized queue class."><code>queue</code></a>, the size of the queue is always known and can be returned by calling the <a class="reference internal" href="#asyncio.Queue.qsize" title="asyncio.Queue.qsize"><code>qsize()</code></a> method.</p> <div class="versionchanged"> <p><span class="versionmodified changed">Changed in version 3.10: </span>Removed the <em>loop</em> parameter.</p> </div> <p>This class is <a class="reference internal" href="asyncio-dev#asyncio-multithreading"><span class="std std-ref">not thread safe</span></a>.</p> <dl class="py attribute"> <dt class="sig sig-object py" id="asyncio.Queue.maxsize"> +<code>maxsize</code> </dt> <dd> +<p>Number of items allowed in the queue.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.empty"> +<code>empty()</code> </dt> <dd> +<p>Return <code>True</code> if the queue is empty, <code>False</code> otherwise.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.full"> +<code>full()</code> </dt> <dd> +<p>Return <code>True</code> if there are <a class="reference internal" href="#asyncio.Queue.maxsize" title="asyncio.Queue.maxsize"><code>maxsize</code></a> items in the queue.</p> <p>If the queue was initialized with <code>maxsize=0</code> (the default), then <a class="reference internal" href="#asyncio.Queue.full" title="asyncio.Queue.full"><code>full()</code></a> never returns <code>True</code>.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.get"> +<code>coroutine get()</code> </dt> <dd> +<p>Remove and return an item from the queue. If queue is empty, wait until an item is available.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.get_nowait"> +<code>get_nowait()</code> </dt> <dd> +<p>Return an item if one is immediately available, else raise <a class="reference internal" href="#asyncio.QueueEmpty" title="asyncio.QueueEmpty"><code>QueueEmpty</code></a>.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.join"> +<code>coroutine join()</code> </dt> <dd> +<p>Block until all items in the queue have been received and processed.</p> <p>The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer coroutine calls <a class="reference internal" href="#asyncio.Queue.task_done" title="asyncio.Queue.task_done"><code>task_done()</code></a> to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, <a class="reference internal" href="#asyncio.Queue.join" title="asyncio.Queue.join"><code>join()</code></a> unblocks.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.put"> +<code>coroutine put(item)</code> </dt> <dd> +<p>Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.put_nowait"> +<code>put_nowait(item)</code> </dt> <dd> +<p>Put an item into the queue without blocking.</p> <p>If no free slot is immediately available, raise <a class="reference internal" href="#asyncio.QueueFull" title="asyncio.QueueFull"><code>QueueFull</code></a>.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.qsize"> +<code>qsize()</code> </dt> <dd> +<p>Return the number of items in the queue.</p> </dd> +</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.task_done"> +<code>task_done()</code> </dt> <dd> +<p>Indicate that a formerly enqueued task is complete.</p> <p>Used by queue consumers. For each <a class="reference internal" href="#asyncio.Queue.get" title="asyncio.Queue.get"><code>get()</code></a> used to fetch a task, a subsequent call to <a class="reference internal" href="#asyncio.Queue.task_done" title="asyncio.Queue.task_done"><code>task_done()</code></a> tells the queue that the processing on the task is complete.</p> <p>If a <a class="reference internal" href="#asyncio.Queue.join" title="asyncio.Queue.join"><code>join()</code></a> is currently blocking, it will resume when all items have been processed (meaning that a <a class="reference internal" href="#asyncio.Queue.task_done" title="asyncio.Queue.task_done"><code>task_done()</code></a> call was received for every item that had been <a class="reference internal" href="#asyncio.Queue.put" title="asyncio.Queue.put"><code>put()</code></a> into the queue).</p> <p>Raises <a class="reference internal" href="exceptions#ValueError" title="ValueError"><code>ValueError</code></a> if called more times than there were items placed in the queue.</p> </dd> +</dl> </dd> +</dl> </section> <section id="priority-queue"> <h2>Priority Queue</h2> <dl class="py class"> <dt class="sig sig-object py" id="asyncio.PriorityQueue"> +<code>class asyncio.PriorityQueue</code> </dt> <dd> +<p>A variant of <a class="reference internal" href="#asyncio.Queue" title="asyncio.Queue"><code>Queue</code></a>; retrieves entries in priority order (lowest first).</p> <p>Entries are typically tuples of the form <code>(priority_number, data)</code>.</p> </dd> +</dl> </section> <section id="lifo-queue"> <h2>LIFO Queue</h2> <dl class="py class"> <dt class="sig sig-object py" id="asyncio.LifoQueue"> +<code>class asyncio.LifoQueue</code> </dt> <dd> +<p>A variant of <a class="reference internal" href="#asyncio.Queue" title="asyncio.Queue"><code>Queue</code></a> that retrieves most recently added entries first (last in, first out).</p> </dd> +</dl> </section> <section id="exceptions"> <h2>Exceptions</h2> <dl class="py exception"> <dt class="sig sig-object py" id="asyncio.QueueEmpty"> +<code>exception asyncio.QueueEmpty</code> </dt> <dd> +<p>This exception is raised when the <a class="reference internal" href="#asyncio.Queue.get_nowait" title="asyncio.Queue.get_nowait"><code>get_nowait()</code></a> method is called on an empty queue.</p> </dd> +</dl> <dl class="py exception"> <dt class="sig sig-object py" id="asyncio.QueueFull"> +<code>exception asyncio.QueueFull</code> </dt> <dd> +<p>Exception raised when the <a class="reference internal" href="#asyncio.Queue.put_nowait" title="asyncio.Queue.put_nowait"><code>put_nowait()</code></a> method is called on a queue that has reached its <em>maxsize</em>.</p> </dd> +</dl> </section> <section id="examples"> <h2>Examples</h2> <p id="asyncio-example-queue-dist">Queues can be used to distribute workload between several concurrent tasks:</p> <pre data-language="python">import asyncio +import random +import time + + +async def worker(name, queue): + while True: + # Get a "work item" out of the queue. + sleep_for = await queue.get() + + # Sleep for the "sleep_for" seconds. + await asyncio.sleep(sleep_for) + + # Notify the queue that the "work item" has been processed. + queue.task_done() + + print(f'{name} has slept for {sleep_for:.2f} seconds') + + +async def main(): + # Create a queue that we will use to store our "workload". + queue = asyncio.Queue() + + # Generate random timings and put them into the queue. + total_sleep_time = 0 + for _ in range(20): + sleep_for = random.uniform(0.05, 1.0) + total_sleep_time += sleep_for + queue.put_nowait(sleep_for) + + # Create three worker tasks to process the queue concurrently. + tasks = [] + for i in range(3): + task = asyncio.create_task(worker(f'worker-{i}', queue)) + tasks.append(task) + + # Wait until the queue is fully processed. + started_at = time.monotonic() + await queue.join() + total_slept_for = time.monotonic() - started_at + + # Cancel our worker tasks. + for task in tasks: + task.cancel() + # Wait until all worker tasks are cancelled. + await asyncio.gather(*tasks, return_exceptions=True) + + print('====') + print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') + print(f'total expected sleep time: {total_sleep_time:.2f} seconds') + + +asyncio.run(main()) +</pre> </section> <div class="_attribution"> + <p class="_attribution-p"> + © 2001–2023 Python Software Foundation<br>Licensed under the PSF License.<br> + <a href="https://docs.python.org/3.12/library/asyncio-queue.html" class="_attribution-link">https://docs.python.org/3.12/library/asyncio-queue.html</a> + </p> +</div> |
