Async IO is a concurrent programming design that has received dedicated support in Python, evolving rapidly from Python 3.4 through 3.7 and presumably beyond.
You may be thinking with dread, “Concurrency, parallelism, threading, multiprocessing. That’s a lot to wrap my head around already. Where does async IO fit in?”
This tutorial is built to help you answer that question, giving you a firmer grasp of Python’s approach to asynchronous IO (async IO). Here’s what you’ll cover:
- Asynchronous IO (async IO): a language-agnostic paradigm (model) that has implementations across a host of programming languages
- async/await: two Python keywords that are used to define coroutines
- asyncio: the Python package that provides a foundation and API for running and managing coroutines
Coroutines (specialized generator functions) are the heart of async IO in Python, and we’ll dive into them in depth later on. Note: In this article, the term “async IO” denotes the language-agnostic design of asynchronous IO, while “asyncio” refers to the Python package.
Before you get started, you’ll need to make sure your environment is set up to use asyncio and the other libraries found in this tutorial.
Setting Up Your Environment
To follow along in full, you’ll need Python 3.7 or above, as well as the aiohttp and aiofiles packages:
$ python3.7 -m venv ./py37async
$ source ./py37async/bin/activate # Windows: .\py37async\Scripts\activate.bat
$ pip install --upgrade pip aiohttp aiofiles # Optional: aiodns
If you need help installing Python 3.7 or setting up a virtual environment, check out the Python 3 Installation & Setup Guide or the Virtual Environments Primer, both available on this site. With that, let’s dive in.
The 10,000-Foot View of Async IO
Async IO is less widely known than its tried-and-true cousins, multiprocessing and threading, but it’s gaining traction. This section gives you a fuller picture of what async IO is and how it fits into the surrounding landscape.
Where Does Async IO Fit In?
Concurrency and parallelism are expansive subjects that are not easy to wade into. While this article focuses on async IO and its implementation in Python, it’s worth taking a minute to compare async IO to its counterparts in order to know how it fits into the larger, sometimes dizzying puzzle.
Parallelism consists of performing multiple operations at the same time. Multiprocessing is a means of achieving parallelism that entails spreading tasks over a computer’s central processing units (CPUs, or cores). Multiprocessing is well-suited for CPU-bound tasks: tightly bound for loops and mathematical computations usually fall into this category.
Concurrency is a slightly broader term than parallelism. It suggests that multiple tasks have the ability to run in an overlapping manner. (There’s a saying that concurrency does not imply parallelism.)
Threading is a concurrent execution model whereby multiple threads take turns executing tasks. One process can contain multiple threads. Python has a complicated relationship with threading thanks to its GIL, but that’s beyond the scope of this article.
What’s important to know about threading is that it’s better for IO-bound tasks. While a CPU-bound task is characterized by the computer’s cores continually working hard from start to finish, an IO-bound job is dominated by a lot of waiting on input/output to complete.
To recap: concurrency encompasses both multiprocessing (ideal for CPU-bound tasks) and threading (suited for IO-bound tasks). Multiprocessing is a form of parallelism, with parallelism being a specific type (subset) of concurrency. The Python standard library has offered longstanding support for both of these through its multiprocessing, threading, and concurrent.futures packages.
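As a quick, hypothetical illustration of that standard-library support, here is a sketch using concurrent.futures, which exposes thread and process pools behind one executor interface (the task function below is made up for the example):

```python
import concurrent.futures
import math

def cpu_task(n):
    # A toy CPU-bound job: sum of square roots from 0 to n-1.
    return sum(math.sqrt(i) for i in range(n))

# The submit()/result() API is identical for ThreadPoolExecutor and
# ProcessPoolExecutor; swapping the executor class moves you from
# threading to multiprocessing without touching the task code.
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(cpu_task, 10_000) for _ in range(3)]
    results = [f.result() for f in futures]

print(len(results))  # one result per submitted task
```

For genuinely CPU-bound work like this, ProcessPoolExecutor would be the better fit, since threads in CPython contend for the GIL.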
Now it’s time to add a new member to the mix. Over the last few years, a separate design has been more comprehensively built into CPython: asynchronous IO, enabled through the standard library’s asyncio package and the new async and await language keywords. To be clear, async IO is not a newly invented concept: it exists in, or is being built into, other languages and runtime environments, such as Go, C#, and Scala.
The asyncio package is billed by the Python documentation as a library to write concurrent code. However, async IO is not threading, nor is it multiprocessing. It is not built on top of either of these.
In fact, async IO is a single-threaded, single-process design: it uses cooperative multitasking, a term that you’ll flesh out by the end of this tutorial. In other words, async IO gives a feeling of concurrency despite using a single thread in a single process. Coroutines (a central feature of async IO) can be scheduled concurrently, but they are not inherently concurrent.
To reiterate, async IO is a style of concurrent programming, but it is not parallelism. It’s more closely aligned with threading than with multiprocessing, but it is very much distinct from both of these and is a standalone member of concurrency’s bag of tricks.
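You can see the single-thread claim for yourself with a small sketch (not part of the original example set) that records which thread each concurrently scheduled coroutine runs on:

```python
import asyncio
import threading

thread_names = []

async def worker():
    # Record the running thread before and after yielding to the event loop.
    thread_names.append(threading.current_thread().name)
    await asyncio.sleep(0.01)
    thread_names.append(threading.current_thread().name)

async def main():
    # Three coroutines scheduled concurrently...
    await asyncio.gather(worker(), worker(), worker())

asyncio.run(main())
# ...yet every recorded name is the same: the event loop's single thread.
print(set(thread_names))
```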
That leaves one more term. What does it mean for something to be asynchronous? This isn’t a rigorous definition, but for our purposes here, I can think of two properties:
- Asynchronous routines are able to “pause” while waiting on their ultimate result and let other routines run in the meantime.
- Asynchronous code, through the mechanism above, facilitates concurrent execution. To put it differently, asynchronous code gives the look and feel of concurrency.
Here’s a diagram to put it all together: the terms in white represent concepts, while the terms in green represent ways in which those concepts are implemented or effected:
I’ll stop the comparisons between concurrent programming models there. This tutorial is focused on the subcomponent that is async IO, its many APIs, and how to use them. For a thorough exploration of threading versus multiprocessing versus async IO, pause here and check out Jim Anderson’s overview of Python concurrency. Jim is way funnier than me and has sat in more meetings than me, to boot.
Async IO Explained
Async IO may at first seem counterintuitive and paradoxical. How does something that facilitates concurrent code use a single thread and a single CPU core? I’ve never been very good at conjuring up examples, so I’d like to paraphrase one from Miguel Grinberg’s 2017 PyCon talk, which explains everything quite beautifully:
Chess master Judit Polgár hosts a chess exhibition in which she plays multiple amateur players. She has two ways of conducting the exhibition: synchronously and asynchronously.
Assumptions:
- 24 opponents
- Judit makes each chess move in 5 seconds
- Opponents each take 55 seconds to make a move
- Games average 30 pair-moves (60 moves total)
Synchronous version: Judit plays one game at a time, never two at the same time, until the game is complete. Each game takes (55 + 5) * 30 == 1800 seconds, or 30 minutes. The entire exhibition takes 24 * 30 == 720 minutes, or 12 hours.
Asynchronous version: Judit moves from table to table, making one move at each table. She leaves the table and lets the opponent make their next move during the wait time. One move on all 24 games takes Judit 24 * 5 == 120 seconds, or 2 minutes. The entire exhibition is now cut down to 120 * 30 == 3600 seconds, or just 1 hour.
(Source)
There is only one Judit Polgár, who has only two hands and makes only one move at a time by herself. But playing asynchronously cuts the exhibition time down from 12 hours to one. Cooperative multitasking is a fancy way of saying that a program’s event loop (more on that later) communicates with multiple tasks to let each take turns running at the optimal time.
Async IO takes long waiting periods in which functions would otherwise be blocking and allows other functions to run during that downtime. (A function that blocks effectively forbids others from running from the time that it starts until the time that it returns.)
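To make the analogy concrete, here is a rough sketch of the asynchronous exhibition with made-up, scaled-down timings (milliseconds instead of seconds, and fewer games and moves), where the wait for each opponent’s move overlaps across games:

```python
import asyncio
import time

JUDIT_MOVE = 0.005  # 5 s in the story, scaled to 5 ms here
OPP_MOVE = 0.055    # 55 s in the story, scaled to 55 ms
PAIR_MOVES = 3      # 30 in the story
N_GAMES = 4         # 24 in the story

async def play_game(game_id: int) -> int:
    for _ in range(PAIR_MOVES):
        await asyncio.sleep(JUDIT_MOVE)  # Judit makes her move
        await asyncio.sleep(OPP_MOVE)    # opponent thinks; other games proceed
    return game_id

async def exhibition():
    # All games run concurrently; each await yields to the event loop.
    return await asyncio.gather(*(play_game(g) for g in range(N_GAMES)))

start = time.perf_counter()
finished = asyncio.run(exhibition())
elapsed = time.perf_counter() - start
# Total time is roughly one game's length, not N_GAMES times it.
print(f"{len(finished)} games finished in {elapsed:0.3f} s")
```

(This sketch doesn’t model Judit as an exclusive resource; it only shows the waits overlapping, which is where the speedup comes from.)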
Async IO Is Not Easy
I’ve heard it said, “Use async IO when you can; use threading when you must.” The truth is that building durable multithreaded programs can be hard and error-prone. Async IO avoids some of the potential speedbumps that you might otherwise encounter with a threaded design.
But that’s not to say that async IO in Python is easy. Be warned: when you venture a bit below the surface level, async programming can be difficult too! Python’s async model is built around concepts such as callbacks, events, transports, protocols, and futures; just the terminology can be intimidating. The fact that its API has been changing continually makes it no easier.
Luckily, asyncio has matured to a point where most of its features are no longer provisional, its documentation has received a huge overhaul, and quality resources on the subject are starting to emerge as well.
The asyncio Package and async/await
Now that you have some background on async IO as a design, let’s explore Python’s implementation. Python’s asyncio package (introduced in Python 3.4) and its two keywords, async and await, serve different purposes but come together to help you declare, build, execute, and manage asynchronous code.
The async/await Syntax and Native Coroutines
A word of caution: be careful what you read out there on the Internet. Python’s async IO API has evolved rapidly from Python 3.4 to Python 3.7. Some old patterns are no longer used, and some things that were at first disallowed are now allowed through new introductions.
At the heart of async IO are coroutines. A coroutine is a specialized version of a Python generator function. Let’s start with a baseline definition and then build off of it as you progress here: a coroutine is a function that can suspend its execution before reaching return, and it can indirectly pass control to another coroutine for some time.
Later, you’ll dive a lot deeper into how exactly the traditional generator is repurposed into a coroutine. For now, the easiest way to pick up how coroutines work is to start making some.
Let’s take the immersive approach and write some async IO code. This short program is the “Hello World” of async IO, but it goes a long way toward illustrating its core functionality:
#!/usr/bin/env python3
# countasync.py

import asyncio

async def count():
    print("One")
    await asyncio.sleep(1)
    print("Two")

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    import time
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")
When you run this file, take note of what looks different than if you were to define the functions with just def and time.sleep():
$ python3 countasync.py
One
One
One
Two
Two
Two
countasync.py executed in 1.01 seconds.
The order of this output is the heart of async IO. Talking to each of the calls to count() is a single event loop, or coordinator. When each task reaches await asyncio.sleep(1), the function yells up to the event loop and gives control back to it, saying, “I’m going to be sleeping for 1 second. Go ahead and let something else meaningful be done in the meantime.”
Contrast this to the synchronous version:
#!/usr/bin/env python3
# countsync.py

import time

def count():
    print("One")
    time.sleep(1)
    print("Two")

def main():
    for _ in range(3):
        count()

if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")
When executed, there is a slight but critical change in order and execution time:
$ python3 countsync.py
One
Two
One
Two
One
Two
countsync.py executed in 3.01 seconds.
While using time.sleep() and asyncio.sleep() may seem banal, they are used as stand-ins for any time-intensive processes that involve wait time. (The most mundane thing you can wait on is a sleep() call that does basically nothing.) That is, time.sleep() can represent any time-consuming blocking function call, while asyncio.sleep() is used to stand in for a non-blocking call (but one that also takes some time to complete).
As you’ll see in the next section, the benefit of awaiting something, including asyncio.sleep(), is that the surrounding function can temporarily cede control to another function that’s more readily able to do something immediately. In contrast, time.sleep() or any other blocking call is incompatible with asynchronous Python code, because it will stop everything in its tracks for the duration of the sleep time.
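If you do find yourself stuck with a blocking call inside async code, one common escape hatch (sketched here under the Python 3.7 API; the function names in the example are made up) is to hand the call to a thread pool with loop.run_in_executor() so the event loop keeps running:

```python
import asyncio
import time

def blocking_io() -> str:
    # Stand-in for a blocking call you can't rewrite (legacy client, C library).
    time.sleep(0.2)
    return "blocking done"

async def main():
    loop = asyncio.get_running_loop()
    # None -> default ThreadPoolExecutor; the event loop stays free meanwhile.
    fut = loop.run_in_executor(None, blocking_io)
    other = asyncio.sleep(0.2, result="coroutine done")
    return await asyncio.gather(fut, other)

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(results, f"in {elapsed:0.2f} s")  # both finish in ~0.2 s, not 0.4 s
```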
The Rules of Async IO
At this point, a more formal definition of async, await, and the coroutine functions that they create is in order. This section is a little dense, but getting a handle on async/await is instrumental, so come back to it if you need to:
- The syntax async def introduces either a native coroutine or an asynchronous generator. The expressions async with and async for are also valid, and you’ll see them later on.
- The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g(), this is how await tells the event loop, “Suspend execution of g() until whatever I’m waiting on (the result of f()) is returned. In the meantime, go let something else run.”
In code, that second point looks roughly like this:
async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r
There’s also a strict set of rules around when and how you can and cannot use async/await. These can be handy whether you’re still picking up the syntax or already have exposure to it:
- A function that you introduce with async def is a coroutine. It may use await, return, or yield, but all of these are optional. Declaring async def noop(): pass is valid:
  - Using await and/or return creates a coroutine function. To call a coroutine function, you must await it to get its results.
  - It is less common (and only recently legal in Python) to use yield in an async def block. This creates an asynchronous generator, which you iterate over with async for. For now, forget about async generators and focus on getting down the syntax for coroutine functions, which use await and/or return.
  - Anything defined with async def may not use yield from, which will raise a SyntaxError.
- Just like it’s a SyntaxError to use yield outside of a def function, it is a SyntaxError to use await outside of an async def coroutine. You can only use await in the body of coroutines.
Here are a few terse examples meant to summarize the above few rules:
async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x  # OK - this is an async generator

async def m(x):
    yield from gen(x)  # No - SyntaxError

def m(x):
    y = await z(x)  # Still no - SyntaxError (no `async def` here)
    return y
Finally, when you use await f(), it’s required that f() be an object that is awaitable. Well, that’s not very helpful, is it? For now, just know that an awaitable object is either (1) another coroutine or (2) an object defining an .__await__() dunder method that returns an iterator. If you’re writing a program, for the large majority of purposes, you should only need to worry about case #1.
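For the curious, here is a minimal sketch of case #2: a hypothetical class (the name Ready is made up for this example) whose .__await__() returns an iterator, here borrowed from a real coroutine:

```python
import asyncio

class Ready:
    """A minimal custom awaitable (illustrative, not a real asyncio class)."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # __await__() must return an iterator; delegating to a coroutine's
        # own __await__() is an easy way to satisfy the protocol.
        return asyncio.sleep(0, result=self.value).__await__()

async def main():
    return await Ready(42)  # legal because Ready defines .__await__()

result = asyncio.run(main())
print(result)  # 42
```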
That brings us to one more technical distinction that you may see pop up: an older way of marking a function as a coroutine is to decorate a normal def function with @asyncio.coroutine. The result is a generator-based coroutine. This construction has been outdated since the async/await syntax was put in place in Python 3.5.
These two coroutines are essentially equivalent (both are awaitable), but the first is generator-based, while the second is a native coroutine:
import asyncio

@asyncio.coroutine
def py34_coro():
    """Generator-based coroutine, older syntax"""
    yield from stuff()

async def py35_coro():
    """Native coroutine, modern syntax"""
    await stuff()
If you’re writing any code yourself, prefer native coroutines for the sake of being explicit rather than implicit. Generator-based coroutines will be removed in Python 3.10.
Later in this tutorial, we’ll touch on generator-based coroutines briefly, purely for explanation’s sake. The reason that async and await were introduced is to make coroutines a standalone feature of Python that can be easily distinguished from a normal generator function, thus reducing ambiguity.
Don’t get bogged down in generator-based coroutines, which have been deliberately made obsolete by async/await. They have their own small set of rules (for instance, await cannot be used in a generator-based coroutine) that are largely irrelevant if you stick to the async/await syntax.
Without further ado, let’s take on a few more involved examples. Here’s one example of how async IO cuts down on wait time: given a coroutine makerandom() that keeps producing random integers in the range [0, 10] until one of them exceeds a threshold, you want to let multiple calls of this coroutine not need to wait for each other to complete in succession. You can largely follow the patterns from the two scripts above, with slight changes:
#!/usr/bin/env python3
# rand.py

import asyncio
import random

# ANSI colors
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = random.randint(0, 10)
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

async def main():
    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
    return res

if __name__ == "__main__":
    random.seed(444)
    r1, r2, r3 = asyncio.run(main())
    print()
    print(f"r1: {r1}, r2: {r2}, r3: {r3}")
The colorized output says a lot more than I can and gives you a sense for how this script is carried out:
[Screen capture: the execution of rand.py]
This program uses one main coroutine, makerandom(), and runs it concurrently across three different inputs. Most programs will contain small, modular coroutines and one wrapper function that serves to chain each of the smaller coroutines together. main() is then used to gather tasks (futures) by mapping the central coroutine across some iterable or pool.
In this miniature example, the pool is range(3). In a fuller example presented later, it is a set of URLs that need to be requested, parsed, and processed concurrently, with main() encapsulating that entire routine for each URL.
While “making random integers” (which is CPU-bound more than anything) is maybe not the greatest choice as a candidate for asyncio, it’s the presence of asyncio.sleep() in the example that is designed to mimic an IO-bound process where there is uncertain wait time involved. For example, the asyncio.sleep() call might represent sending and receiving not-so-random integers between two clients in a message application.
Async IO Design Patterns
Async IO comes with its own set of possible script designs, which you’ll get introduced to in this section.
Chaining Coroutines
A key feature of coroutines is that they can be chained together. (Remember, a coroutine object is awaitable, so another coroutine can await it.) This allows you to break programs into smaller, manageable, recyclable coroutines:
#!/usr/bin/env python3
# chained.py

import asyncio
import random
import time

async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
    import sys
    random.seed(444)
    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
    start = time.perf_counter()
    asyncio.run(main(*args))
    end = time.perf_counter() - start
    print(f"Program finished in {end:0.2f} seconds.")
Pay careful attention to the output, where part1() sleeps for a variable amount of time, and part2() begins working with the results as they become available:
$ python3 chained.py 9 6 3
part1(9) sleeping for 4 seconds.
part1(6) sleeping for 4 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(9) == result9-1.
part2(9, 'result9-1') sleeping for 7 seconds.
Returning part1(6) == result6-1.
part2(6, 'result6-1') sleeping for 4 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(6, 'result6-1') == result6-2 derived from result6-1.
-->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).
Returning part2(9, 'result9-1') == result9-2 derived from result9-1.
-->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).
Program finished in 11.01 seconds.
In this setup, the runtime of main() will be equal to the maximum runtime of the tasks that it gathers together and schedules.
Using a Queue
The asyncio package provides queue classes that are designed to be similar to classes of the queue module. In our examples so far, we haven’t really needed a queue structure: in chained.py, each task (future) is composed of a set of coroutines that explicitly await each other and pass through a single input per chain.
There is an alternative structure that can also work with async IO: a number of producers, which are not associated with each other, add items to a queue. Each producer may add multiple items to the queue at staggered, random, unannounced times. A group of consumers pulls items from the queue as they show up, greedily and without waiting for any other signal.
In this design, there is no chaining of any individual consumer to a producer. The consumers don’t know the number of producers, or even the cumulative number of items that will be added to the queue, in advance.
It takes an individual producer or consumer a variable amount of time to put and extract items from the queue, respectively. The queue serves as a throughput that can communicate with the producers and consumers without them talking to each other directly. Note: While queues are often used in threaded programs because of the thread-safety of queue.Queue(), you shouldn’t need to concern yourself with thread safety when it comes to async IO. (The exception is when you’re combining the two, but that isn’t done in this tutorial.)
One use case for queues (as is the case here) is for the queue to act as a transmitter for producers and consumers that aren’t otherwise directly chained or associated with each other.
The synchronous version of this program would look pretty dismal: a group of blocking producers serially adds items to the queue, one producer at a time. Only after all producers are done can the queue be processed, by one consumer at a time, item by item. There is a ton of latency in this design: items may sit idly in the queue rather than be picked up and processed immediately.
An asynchronous version, asyncq.py, is below. The challenging part of this workflow is that there needs to be a signal to the consumers that production is done. Otherwise, await q.get() will hang indefinitely, because the queue will have been fully processed, but the consumers won’t have any idea that production is complete.
(A big thanks goes to a StackOverflow user for helping to straighten out main(): the key is to await q.join(), which blocks until all items in the queue have been received and processed, and then to cancel the consumer tasks, which would otherwise hang up and wait endlessly for additional queue items to appear.)
Here is the full script:
#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()

if __name__ == "__main__":
    import argparse
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")
The first few coroutines are helper functions that return a random string, a fractional-second performance counter, and a random integer. A producer puts anywhere from 0 to 10 items into the queue. Each item is a tuple of (i, t) where i is a random string and t is the time at which the producer attempts to put the tuple into the queue.
When a consumer pulls an item out, it simply computes the elapsed time that the item sat in the queue, using the timestamp that the item was put in with.
Keep in mind that asyncio.sleep() is used here to mimic some other, more complex coroutine that would eat up time and block all other execution if it were a regular blocking function.
Here is a test run with two producers and five consumers:
$ python3 asyncq.py -p 2 -c 5
Producer 0 sleeping for 3 seconds.
Producer 1 sleeping for 3 seconds.
Consumer 0 sleeping for 4 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 sleeping for 3 seconds.
Consumer 3 sleeping for 5 seconds.
Consumer 4 sleeping for 4 seconds.
Producer 0 added <377b1e8f82> to queue.
Producer 0 sleeping for 5 seconds.
Producer 1 added <413b8802f8> to queue.
Consumer 1 got element <377b1e8f82> in 0.00013 seconds.
Consumer 1 sleeping for 3 seconds.
Consumer 2 got element <413b8802f8> in 0.00009 seconds.
Consumer 2 sleeping for 4 seconds.
Producer 0 added <06c055b3ab> to queue.
Producer 0 sleeping for 1 seconds.
Consumer 0 got element <06c055b3ab> in 0.00021 seconds.
Consumer 0 sleeping for 4 seconds.
Producer 0 added <17a8613276> to queue.
Consumer 4 got element <17a8613276> in 0.00022 seconds.
Consumer 4 sleeping for 5 seconds.
Program completed in 9.00954 seconds.
In this case, the items process in fractions of a second. A delay can be due to two reasons:
- Standard, largely unavoidable overhead
- Situations where all consumers are sleeping when an item appears in the queue
Regarding the second reason, the good news is that it is perfectly normal to scale to hundreds or thousands of consumers. You should have no problem with python3 asyncq.py -p 5 -c 100. The point here is that, theoretically, you could have different users on different systems controlling the management of producers and consumers, with the queue serving as the central throughput.
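The queue mechanics that make this work can be seen in isolation: put() enqueues an item, get() dequeues it, task_done() marks it as processed, and join() blocks until every enqueued item has been marked done. A small illustrative sketch:

```python
import asyncio

async def demo() -> list:
    q = asyncio.Queue()
    await q.put("a")
    await q.put("b")
    items = []
    while not q.empty():
        items.append(await q.get())
        q.task_done()  # Every get() must be balanced by a task_done()
    await q.join()     # Returns immediately: nothing left unaccounted for
    return items

print(asyncio.run(demo()))  # ['a', 'b']
```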
So far, you've been thrown head-first into three related examples of asyncio calling coroutines defined with async and await. If you're not completely following, or just want to dig deeper into the mechanics of how modern coroutines came to be in Python, you'll start from square one with the next section.
Async IO’s Roots in Generators
Earlier, you were shown an example of the old-style generator-based coroutines, which have since been made obsolete by more explicit native coroutines. The example is worth showing again with a small tweak:
import asyncio

@asyncio.coroutine
def py34_coro():
    """Generator-based coroutine"""
    # No need to build these yourself, but be aware of what they are
    s = yield from stuff()
    return s

async def py35_coro():
    """Native coroutine, modern syntax"""
    s = await stuff()
    return s

async def stuff():
    return 0x10, 0x20, 0x30
As an experiment, what happens if you call py34_coro() or py35_coro() on its own, without await or any calls to asyncio.run() or other asyncio "porcelain" functions? Calling a coroutine in isolation returns a coroutine object:
>>>
>>> py35_coro()
<coroutine object py35_coro at 0x10126dcc8>
This isn't very interesting on its surface. The result of calling a coroutine on its own is an awaitable coroutine object.
Time for a quiz: what other feature of Python looks like this? (What feature of Python doesn't actually "do much" when it's called on its own?)
Hopefully you're thinking of generators as an answer to this question, because coroutines are enhanced generators under the hood. The behavior is similar in this regard:
>>>
>>> def gen():
... yield 0x10, 0x20, 0x30
...
>>> g = gen()
>>> g # Nothing much happens - need to iterate with `.__next__()`
<generator object gen at 0x1012705e8>
>>> next(g)
(16, 32, 48)
Generator functions are the foundation of async IO, whether you declare coroutines with the modern async def syntax or the older @asyncio.coroutine wrapper. Technically, await is more closely analogous to yield from than it is to yield. (But remember that yield from x() is just syntactic sugar for for i in x(): yield i.)
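That substitution is easy to verify. The following sketch shows that delegating with yield from produces the same sequence as looping and yielding manually:

```python
def x():
    yield 1
    yield 2

def via_yield_from():
    yield from x()    # Delegation...

def via_loop():
    for i in x():     # ...is equivalent to this explicit loop
        yield i

print(list(via_yield_from()), list(via_loop()))  # [1, 2] [1, 2]
```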
One critical feature of generators as they pertain to async IO is that they can effectively be stopped and restarted at will. For example, you can break out of iterating over a generator object and later resume iteration on the remaining values. When a generator function reaches a yield, it yields that value, but then it sits idle until it is told to yield its next value.
This can be fleshed out through an example:
>>>
>>> from itertools import cycle
>>> def endless():
... """Yields 9, 8, 7, 6, 9, 8, 7, 6, ... forever"""
... yield from cycle((9, 8, 7, 6))
>>> e = endless()
>>> total = 0
>>> for i in e:
... if total < 30:
... print(i, end=" ")
... total += i
... else:
... print()
... # Pause execution. We can resume later.
... break
9 8 7 6 9 8 7 6 9 8 7 6 9 8
>>> # Resume
>>> next(e), next(e), next(e)
(6, 9, 8)
The await keyword behaves similarly, marking a break point at which the coroutine suspends itself and lets other coroutines do their work. "Suspended," in this case, means a coroutine that has temporarily ceded control but has not totally exited or finished. Keep in mind that yield, and by extension yield from and await, mark a break point in a generator's execution.
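You can watch this hand-off happen with two coroutines whose suspension points interleave. In this illustrative sketch, both workers start, then whichever has the shorter sleep finishes first:

```python
import asyncio

async def worker(name: str, delay: float, log: list) -> None:
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # Suspension point: control returns to the loop
    log.append(f"{name} end")

async def main() -> list:
    log = []
    await asyncio.gather(worker("a", 0.02, log), worker("b", 0.01, log))
    return log

print(asyncio.run(main()))  # ['a start', 'b start', 'b end', 'a end']
```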
This is the fundamental difference between functions and generators. A function is all-or-nothing: once it starts, it runs until it hits return, then returns that value to its caller. A generator, by contrast, pauses each time it hits a yield and goes no further. Not only can it push this value to the calling stack when resumed with next(), but it also keeps hold of its local variables.
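A quick sketch makes the local-variable point concrete: the generator below keeps its counter alive between next() calls, where an ordinary function would start from scratch each time:

```python
def counter():
    n = 0            # Local state survives across yields
    while True:
        n += 1
        yield n

c = counter()
print(next(c), next(c), next(c))  # 1 2 3
```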
There's a second, less widely recognized feature of generators that also matters here. You can send a value into a generator through its .send() method. This is what allows generators (and coroutines) to call (await) each other without blocking. I won't dig further into the nuts and bolts of this feature, because it matters chiefly for the implementation of coroutines behind the scenes, and you shouldn't ever really need to use it directly yourself.
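For the curious, here is a brief sketch of what .send() looks like on a plain generator. The value passed to .send() becomes the result of the paused yield expression:

```python
def accumulator():
    total = 0
    while True:
        value = yield total  # .send(value) resumes here, binding `value`
        total += value

acc = accumulator()
next(acc)            # Prime the generator: advance to the first yield
print(acc.send(10))  # 10
print(acc.send(5))   # 15
```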
If you are interested in exploring further, you can start with PEP 342, where coroutines were formally introduced. Brett Cannon's "How the Heck Does Async-Await Work in Python" is also a worthwhile read, as is the PYMOTW write-up on asyncio. Lastly, there's David Beazley's Curious Course on Coroutines and Concurrency, which dives deep into the mechanism by which coroutines run.
Let's try to condense all of the above into a few sentences: there is a particularly unconventional mechanism by which these coroutines actually get run. Their result is an attribute of the exception object that is thrown when their .send() method is called. There are some further wonky details to all of this, but they probably won't help you use this part of the language in practice, so let's move on for now.
To tie things together, here are some key points on the topic of coroutines as generators:
- Coroutines are repurposed generators that take advantage of the peculiarities of generator methods.
- Old generator-based coroutines use yield from to wait for a coroutine result. Modern Python syntax in native coroutines simply replaces yield from with await as the means of waiting on a coroutine result. The await is analogous to yield from, and it often helps to think of it as such.
- The use of await is a signal that marks a break point. It lets a coroutine temporarily suspend execution and permits the program to come back to it later.
Other Features: async for and Async Generators + Comprehensions
Along with plain async/await, Python also enables async for to iterate over an asynchronous iterator. The purpose of an asynchronous iterator is to be able to call asynchronous code at each stage when it is iterated over.
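Concretely, an asynchronous iterator is an object that implements __aiter__() and an awaitable __anext__(), raising StopAsyncIteration when exhausted. A minimal sketch (Countdown is a made-up example class):

```python
import asyncio

class Countdown:
    """Asynchronously yields n, n-1, ..., 1."""
    def __init__(self, n: int):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.n <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # A point where other async code may run
        self.n -= 1
        return self.n + 1

async def main() -> list:
    return [i async for i in Countdown(3)]

print(asyncio.run(main()))  # [3, 2, 1]
```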
A natural extension of this concept is an asynchronous generator. Recall that you can use await, return, or yield in a native coroutine. Using yield within a coroutine became possible in Python 3.6 (via PEP 525), which introduced asynchronous generators with the purpose of allowing await and yield to be used in the same coroutine function body:
>>>
>>> async def mygen(u: int = 10):
... """Yield powers of 2."""
... i = 0
... while i < u:
... yield 2 ** i
... i += 1
... await asyncio.sleep(0.1)
Last but not least, Python enables asynchronous comprehension with async for. Like its synchronous cousin, this is largely syntactic sugar:
>>>
>>> async def main():
... # This does *not* introduce concurrent execution
... # It is meant to show syntax only
... g = [i async for i in mygen()]
... f = [j async for j in mygen() if not (j // 3 % 5)]
... return g, f
...
>>> g, f = asyncio.run(main())
>>> g
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
>>> f
[1, 2, 16, 32, 256, 512]
Here's a crucial distinction: neither asynchronous generators nor comprehensions make the iteration concurrent. All that they do is provide the look-and-feel of their synchronous counterparts, with the added ability for the loop in question to give up control to the event loop so that some other coroutine may run.
In other words, asynchronous iterators and asynchronous generators are not designed to concurrently map some function over a sequence or iterator. They're merely designed to let the enclosing coroutine allow other tasks to take their turn. The async for and async with statements are only needed to the extent that using plain for or with would "break" the nature of await in the coroutine. This distinction between asynchronicity and concurrency is a key one to grasp.
The Event Loop and asyncio.run()
You can think of an event loop as something like a while True loop that monitors coroutines, taking feedback on what's idle, and looking around for things that can be executed in the meantime. It is able to wake up an idle coroutine when whatever that coroutine is waiting on becomes available.
Thus far, the entire management of the event loop has been implicitly handled by one function call:
asyncio.run(main()) # Python 3.7+
asyncio.run(), introduced in Python 3.7, is responsible for getting the event loop, running tasks until they are marked as complete, and then closing the event loop.
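In miniature, that life cycle looks like this: asyncio.run() creates a fresh event loop, runs the coroutine to completion, and closes the loop (a small sketch):

```python
import asyncio

async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # Stand-in for real asynchronous work
    return a + b

result = asyncio.run(add(1, 2))  # Loop created, run, and closed here
print(result)  # 3
```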
There's a more long-winded way of managing the asyncio event loop with get_event_loop(). The typical pattern looks like this:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
    loop.close()
If main() here is a simple "Hello World" coroutine, calling it by itself doesn't run anything; it just produces a coroutine object:
>>>
>>> import asyncio
>>> async def main():
...     print("Hello ...")
...     await asyncio.sleep(1)
...     print("World!")
...
>>> routine = main()
>>> routine
<coroutine object main at 0x1027a6150>
Remember to use asyncio.run() to actually force execution by scheduling the main() coroutine (future object) for execution on the event loop:
>>>
>>> asyncio.run(routine)
Hello ...
World!
(With await, other coroutines can be executed. It is typical to wrap just main() in asyncio.run(), and chained coroutines with await will be called from there.)

#2: By default, an async IO event loop runs in a single thread and on a single CPU core. Usually, running one single-threaded event loop in one CPU core is more than sufficient. It is also possible to run event loops across multiple cores. Check out this talk by John Reese for more, and be warned that your laptop may spontaneously combust.

#3: Event loops are pluggable. That is, you could, if you really wanted to, write your own event loop implementation and have it run tasks just the same. This is wonderfully demonstrated in the uvloop package, which is an implementation of the event loop in Cython.
That's what is meant by the term "pluggable event loop": you can use any working implementation of an event loop, unrelated to the structure of the coroutines themselves. The asyncio package itself ships with two event loop implementations, with the default based on the selectors module. (The second implementation is built for Windows only.)
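To see what "an implementation of an event loop" means concretely, you can instantiate one of asyncio's own loop classes directly and drive it yourself. SelectorEventLoop is the default on Unix; this is just a sketch, and asyncio.run() remains the recommended entry point:

```python
import asyncio

async def answer() -> int:
    return 42

loop = asyncio.SelectorEventLoop()  # An explicit event loop implementation
try:
    result = loop.run_until_complete(answer())
finally:
    loop.close()
print(result)  # 42
```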
A Full Program: Asynchronous Requests
You've made it this far, and now it's time for the fun and painless part. In this section, you'll build a web-scraping URL collector, areq.py, using aiohttp, a blazingly fast async HTTP client/server framework. (We just need the client part.) Such a tool could be used to map connections between a cluster of sites, with the links forming a directed graph. Note: You may be wondering why Python's requests package isn't compatible with async IO. requests is built on top of urllib3, which in turn uses Python's http and socket modules.
By default, socket operations are blocking. This means that Python won't like await requests.get(url), because .get() is not awaitable. In contrast, almost everything in aiohttp is an awaitable coroutine, such as session.request() and response.text(). It's a great package otherwise, but you're doing yourself a disservice by using requests in asynchronous code.
The high-level program structure will look like this:
- Read a sequence of URLs from a local file, urls.txt.
- Send GET requests for the URLs and decode the resulting content. If this fails, stop there for a URL.
- Search for the URLs within href tags in the HTML of the responses.
- Write the results to foundurls.txt.
- Do all of the above as asynchronously and concurrently as possible. (Use aiohttp for the requests, and aiofiles for the file-appends. These are two primary examples of IO that are well-suited for the async IO model.)
Here are the contents of urls.txt. It's not huge, and contains mostly highly trafficked sites:
$ cat urls.txt
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt
The second URL in the list should return a 404 response, which you'll need to handle gracefully. If you're running an expanded version of this program, you'll probably need to deal with much thornier problems than this, such as server disconnections and endless redirects.
The requests should all be made with a single session, to take advantage of reuse of the session's internal connection pool.
Let's take a look at the full program. We'll walk through it step by step afterward:
#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HTML."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """
    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occurred: %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                write_one(file=file, url=url, session=session, **kwargs)
            )
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))
This script is longer than our initial toy programs, so let's break it down.
The constant HREF_RE is a regular expression to extract what we're ultimately searching for, href tags within HTML:
>>>
>>> HREF_RE.search('Go to <a href="https://realpython.com/">Real Python</a>')
<re.Match object; span=(15, 45), match='href="https://realpython.com/"'>
The coroutine fetch_html() is a wrapper around a GET request to make the request and decode the resulting page HTML. It makes the request, awaits the response, and raises right away in the case of a non-OK status:
resp = await session.request(method="GET", url=url, **kwargs)
resp.raise_for_status()
If the status is OK, fetch_html() returns the page HTML (a str). Notably, there is no exception handling done in this function. The logic is to propagate that exception to the caller and let it be handled there:
html = await resp.text()
We await session.request() and resp.text() because they're awaitable coroutines. The request/response cycle would otherwise be the long-tailed, time-hogging portion of the application, but with async IO, fetch_html() lets the event loop work on other readily available jobs, such as parsing and writing URLs that have already been fetched.
Next in the chain of coroutines comes parse(), which waits on fetch_html() for a given URL, and then extracts all of the href tags from that page's HTML, making sure that each is valid and formatting it as an absolute path.
Admittedly, the second portion of parse() is blocking, but it consists of a quick regex match and ensuring that the links discovered are made into absolute paths.
In this specific case, this synchronous code should be quick and inconspicuous. But just remember that any line within a given coroutine will block other coroutines unless that line uses yield, await, or return. If the parsing were a more intensive process, you might want to consider running this portion in its own process with loop.run_in_executor().
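Here's a brief sketch of what offloading a heavier parse to an executor could look like. cpu_heavy_parse() is a hypothetical stand-in for expensive blocking work; passing None uses the default thread pool, while a concurrent.futures.ProcessPoolExecutor would move the work into its own process:

```python
import asyncio

def cpu_heavy_parse(html: str) -> int:
    # Hypothetical expensive, blocking parse step
    return len(html.split())

async def parse_offloaded(html: str) -> int:
    loop = asyncio.get_running_loop()
    # None selects the default ThreadPoolExecutor
    return await loop.run_in_executor(None, cpu_heavy_parse, html)

print(asyncio.run(parse_offloaded("one two three")))  # 3
```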
Next, the coroutine write_one() takes a file object and a single URL, and waits on parse() to return a set of the parsed URLs, writing each to the file asynchronously along with its source URL through use of aiofiles, a package for async file IO.
Lastly, bulk_crawl_and_write() serves as the main entry point into the script's chain of coroutines. It uses a single session, and a task is created for each URL that is ultimately read from urls.txt.
Here are a few additional points that deserve mention:
- The default ClientSession has an adapter with a maximum of 100 open connections. To change that, pass an instance of aiohttp.TCPConnector to ClientSession. You can also specify limits on a per-host basis.
- You can specify max timeouts for both the session as a whole and for individual requests.
- This script also uses async with, which works with an asynchronous context manager. I haven't devoted a whole section to this concept because the transition from synchronous to asynchronous context managers is fairly straightforward. The latter has to define .__aenter__() and .__aexit__() rather than .__enter__() and .__exit__(). As you might expect, async with can only be used inside a coroutine function declared with async def.
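A minimal asynchronous context manager can be sketched in a few lines. AsyncResource here is a made-up class that pretends to open and close a connection:

```python
import asyncio

class AsyncResource:
    async def __aenter__(self):
        await asyncio.sleep(0)  # e.g. open a network connection
        self.open = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)  # e.g. close the connection
        self.open = False

async def main() -> bool:
    async with AsyncResource() as r:
        assert r.open  # Resource is live inside the block
    return r.open      # ...and cleaned up on exit

print(asyncio.run(main()))  # False
```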
If you'd like to explore a bit more, the companion files for this tutorial up at GitHub have comments and docstrings attached as well.
Here's the execution in all of its glory, as areq.py gets, parses, and saves results for these URLs in under a second:
$ python3 areq.py
21:33:22 DEBUG:asyncio: Using selector: KqueueSelector
21:33:22 INFO:areq: Got response [200] for URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 115 links for https://www.mediamatters.org/
21:33:22 INFO:areq: Got response [200] for URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Got response [200] for URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.ietf.org/rfc/rfc2616.txt
21:33:22 ERROR:areq: aiohttp exception for https://docs.python.org/3/this-url-will-404.html [404]: Not Found
21:33:22 INFO:areq: Found 120 links for https://www.nytimes.com/guides/
21:33:22 INFO:areq: Found 143 links for https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Wrote results for source URL: https://www.mediamatters.org/
21:33:22 INFO:areq: Found 0 links for https://www.ietf.org/rfc/rfc2616.txt
21:33:22 INFO:areq: Got response [200] for URL: https://1.1.1.1/
21:33:22 INFO:areq: Wrote results for source URL: https://www.nytimes.com/guides/
21:33:22 INFO:areq: Wrote results for source URL: https://www.politico.com/tipsheets/morning-money
21:33:22 INFO:areq: Got response [200] for URL: https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Found 3 links for https://www.bloomberg.com/markets/economics
21:33:22 INFO:areq: Wrote results for source URL: https://www.bloomberg.com/markets/economics
21:33:23 INFO:areq: Found 36 links for https://1.1.1.1/
21:33:23 INFO:areq: Got response [200] for URL: https://regex101.com/
21:33:23 INFO:areq: Found 23 links for https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://regex101.com/
21:33:23 INFO:areq: Wrote results for source URL: https://1.1.1.1/
That's not too shabby! As a sanity check, you can count the number of lines of output. In my case, it's 626, though keep in mind this may fluctuate:
$ wc -l foundurls.txt
626 foundurls.txt
$ head -n 3 foundurls.txt
source_url parsed_url
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/feedback
https://www.bloomberg.com/markets/economics https://www.bloomberg.com/notices/tos