summaryrefslogtreecommitdiff
path: root/devdocs/python~3.12/library%2Fasyncio-queue.html
blob: 757550e67e22d73182b08cddc4e7074837d7f5bc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
 <span id="asyncio-queues"></span><h1>Queues</h1> <p><strong>Source code:</strong> <a class="reference external" href="https://github.com/python/cpython/tree/3.12/Lib/asyncio/queues.py">Lib/asyncio/queues.py</a></p>  <p>asyncio queues are designed to be similar to classes of the <a class="reference internal" href="queue#module-queue" title="queue: A synchronized queue class."><code>queue</code></a> module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.</p> <p>Note that methods of asyncio queues don’t have a <em>timeout</em> parameter; use <a class="reference internal" href="asyncio-task#asyncio.wait_for" title="asyncio.wait_for"><code>asyncio.wait_for()</code></a> function to do queue operations with a timeout.</p> <p>See also the <a class="reference internal" href="#examples">Examples</a> section below.</p> <section id="queue"> <h2>Queue</h2> <dl class="py class"> <dt class="sig sig-object py" id="asyncio.Queue">
<code>class asyncio.Queue(maxsize=0)</code> </dt> <dd>
<p>A first in, first out (FIFO) queue.</p> <p>If <em>maxsize</em> is less than or equal to zero, the queue size is infinite. If it is an integer greater than <code>0</code>, then <code>await put()</code> blocks when the queue reaches <em>maxsize</em> until an item is removed by <a class="reference internal" href="#asyncio.Queue.get" title="asyncio.Queue.get"><code>get()</code></a>.</p> <p>Unlike the standard library threading <a class="reference internal" href="queue#module-queue" title="queue: A synchronized queue class."><code>queue</code></a>, the size of the queue is always known and can be returned by calling the <a class="reference internal" href="#asyncio.Queue.qsize" title="asyncio.Queue.qsize"><code>qsize()</code></a> method.</p> <div class="versionchanged"> <p><span class="versionmodified changed">Changed in version 3.10: </span>Removed the <em>loop</em> parameter.</p> </div> <p>This class is <a class="reference internal" href="asyncio-dev#asyncio-multithreading"><span class="std std-ref">not thread safe</span></a>.</p> <dl class="py attribute"> <dt class="sig sig-object py" id="asyncio.Queue.maxsize">
<code>maxsize</code> </dt> <dd>
<p>Number of items allowed in the queue.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.empty">
<code>empty()</code> </dt> <dd>
<p>Return <code>True</code> if the queue is empty, <code>False</code> otherwise.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.full">
<code>full()</code> </dt> <dd>
<p>Return <code>True</code> if there are <a class="reference internal" href="#asyncio.Queue.maxsize" title="asyncio.Queue.maxsize"><code>maxsize</code></a> items in the queue.</p> <p>If the queue was initialized with <code>maxsize=0</code> (the default), then <a class="reference internal" href="#asyncio.Queue.full" title="asyncio.Queue.full"><code>full()</code></a> never returns <code>True</code>.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.get">
<code>coroutine get()</code> </dt> <dd>
<p>Remove and return an item from the queue. If queue is empty, wait until an item is available.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.get_nowait">
<code>get_nowait()</code> </dt> <dd>
<p>Return an item if one is immediately available, else raise <a class="reference internal" href="#asyncio.QueueEmpty" title="asyncio.QueueEmpty"><code>QueueEmpty</code></a>.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.join">
<code>coroutine join()</code> </dt> <dd>
<p>Block until all items in the queue have been received and processed.</p> <p>The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer coroutine calls <a class="reference internal" href="#asyncio.Queue.task_done" title="asyncio.Queue.task_done"><code>task_done()</code></a> to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, <a class="reference internal" href="#asyncio.Queue.join" title="asyncio.Queue.join"><code>join()</code></a> unblocks.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.put">
<code>coroutine put(item)</code> </dt> <dd>
<p>Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.put_nowait">
<code>put_nowait(item)</code> </dt> <dd>
<p>Put an item into the queue without blocking.</p> <p>If no free slot is immediately available, raise <a class="reference internal" href="#asyncio.QueueFull" title="asyncio.QueueFull"><code>QueueFull</code></a>.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.qsize">
<code>qsize()</code> </dt> <dd>
<p>Return the number of items in the queue.</p> </dd>
</dl> <dl class="py method"> <dt class="sig sig-object py" id="asyncio.Queue.task_done">
<code>task_done()</code> </dt> <dd>
<p>Indicate that a formerly enqueued task is complete.</p> <p>Used by queue consumers. For each <a class="reference internal" href="#asyncio.Queue.get" title="asyncio.Queue.get"><code>get()</code></a> used to fetch a task, a subsequent call to <a class="reference internal" href="#asyncio.Queue.task_done" title="asyncio.Queue.task_done"><code>task_done()</code></a> tells the queue that the processing on the task is complete.</p> <p>If a <a class="reference internal" href="#asyncio.Queue.join" title="asyncio.Queue.join"><code>join()</code></a> is currently blocking, it will resume when all items have been processed (meaning that a <a class="reference internal" href="#asyncio.Queue.task_done" title="asyncio.Queue.task_done"><code>task_done()</code></a> call was received for every item that had been <a class="reference internal" href="#asyncio.Queue.put" title="asyncio.Queue.put"><code>put()</code></a> into the queue).</p> <p>Raises <a class="reference internal" href="exceptions#ValueError" title="ValueError"><code>ValueError</code></a> if called more times than there were items placed in the queue.</p> </dd>
</dl> </dd>
</dl> </section> <section id="priority-queue"> <h2>Priority Queue</h2> <dl class="py class"> <dt class="sig sig-object py" id="asyncio.PriorityQueue">
<code>class asyncio.PriorityQueue</code> </dt> <dd>
<p>A variant of <a class="reference internal" href="#asyncio.Queue" title="asyncio.Queue"><code>Queue</code></a>; retrieves entries in priority order (lowest first).</p> <p>Entries are typically tuples of the form <code>(priority_number, data)</code>.</p> </dd>
</dl> </section> <section id="lifo-queue"> <h2>LIFO Queue</h2> <dl class="py class"> <dt class="sig sig-object py" id="asyncio.LifoQueue">
<code>class asyncio.LifoQueue</code> </dt> <dd>
<p>A variant of <a class="reference internal" href="#asyncio.Queue" title="asyncio.Queue"><code>Queue</code></a> that retrieves most recently added entries first (last in, first out).</p> </dd>
</dl> </section> <section id="exceptions"> <h2>Exceptions</h2> <dl class="py exception"> <dt class="sig sig-object py" id="asyncio.QueueEmpty">
<code>exception asyncio.QueueEmpty</code> </dt> <dd>
<p>This exception is raised when the <a class="reference internal" href="#asyncio.Queue.get_nowait" title="asyncio.Queue.get_nowait"><code>get_nowait()</code></a> method is called on an empty queue.</p> </dd>
</dl> <dl class="py exception"> <dt class="sig sig-object py" id="asyncio.QueueFull">
<code>exception asyncio.QueueFull</code> </dt> <dd>
<p>Exception raised when the <a class="reference internal" href="#asyncio.Queue.put_nowait" title="asyncio.Queue.put_nowait"><code>put_nowait()</code></a> method is called on a queue that has reached its <em>maxsize</em>.</p> </dd>
</dl> </section> <section id="examples"> <h2>Examples</h2> <p id="asyncio-example-queue-dist">Queues can be used to distribute workload between several concurrent tasks:</p> <pre data-language="python">import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())
</pre> </section> <div class="_attribution">
  <p class="_attribution-p">
    &copy; 2001&ndash;2023 Python Software Foundation<br>Licensed under the PSF License.<br>
    <a href="https://docs.python.org/3.12/library/asyncio-queue.html" class="_attribution-link">https://docs.python.org/3.12/library/asyncio-queue.html</a>
  </p>
</div>