网站首页 > 技术文章 正文
Python是一门非常强大和灵活的编程语言,它支持多种编程范式,如面向对象、函数式、过程式等。Python也支持异步并发编程,这是一种同时处理多个任务的编程方式,可以提高程序的性能和效率,尤其是在面对IO密集型或网络密集型的任务时。
本文将介绍Python中的异步并发编程的基本概念、原理、模块、工具、设计模式、最佳实践、性能测试和评估等方面,帮助你快速掌握Python中的异步并发编程。本文还将给出一些使用Python中的异步并发编程的代码例子,让你更好地理解和应用Python中的异步并发编程。
什么是异步并发编程
异步并发编程是一种编程范式,用于同时处理多个任务,而不需要等待每个任务完成。异步并发编程可以提高程序的响应性和吞吐量,尤其是在面对IO密集型或网络密集型的任务时。
异步并发编程与同步编程、多线程、多进程的区别和联系:
- 同步编程是一种顺序执行任务的方式,每个任务必须等待前一个任务完成才能开始。
- 多线程是一种在同一个进程中创建多个执行流的方式,每个线程可以执行不同的任务,但共享内存空间。
- 多进程是一种在不同的进程中创建多个执行流的方式,每个进程可以执行不同的任务,但拥有独立的内存空间。
- 异步并发编程是一种在单个或多个执行流中创建多个协作单元(coroutine)的方式,每个协作单元可以执行不同的任务,并在合适的时机切换。
为什么要使用异步并发编程
异步并发编程有以下优势:
- 可以提高程序的响应性,避免阻塞在等待IO或网络操作上,提升用户体验。
- 可以提高程序的吞吐量,利用CPU和IO设备之间的空闲时间,提升资源利用率。
- 可以简化程序的逻辑,使用async/await语法来表示异步操作,提升代码可读性。
- 可以减少程序的开销,使用协作单元代替线程或进程,减少切换和通信的成本。
异步并发编程有以下局限性:
- 不适用于CPU密集型或计算密集型的任务,因为这类任务无法释放CPU资源给其他协作单元。
- 不适用于依赖顺序或同步执行的任务,因为这类任务无法利用异步操作带来的优势。
- 不适用于不支持异步操作或不兼容asyncio模块的库或框架,因为这类库或框架无法与协作单元协作。
Python中如何实现异步并发编程
Python中实现异步并发编程主要有以下模块和工具:
- asyncio模块:这是Python标准库中提供的一个基于事件循环(event loop)和协作单元(coroutine)的异步IO框架。它提供了一系列的API和工具,用于创建、管理、执行和取消协作单元,以及处理网络、文件、进程、信号等异步操作。
- async/await语法:这是Python 3.5中引入的两个新的关键字,用于定义和调用协作单元。async关键字用于声明一个函数或方法是一个协作单元,await关键字用于暂停当前协作单元,并等待另一个协作单元或异步操作的结果。
- aiohttp模块:这是一个基于asyncio模块的异步HTTP客户端和服务器库。它提供了一系列的API和工具,用于创建、发送、接收和处理HTTP请求和响应,以及支持WebSocket、ClientSession、Web Application等高级功能。
- aiofiles模块:这是一个基于asyncio模块的异步文件操作库。它提供了一系列的API和工具,用于在协作单元中打开、读写、关闭文件,以及支持临时文件、os模块等高级功能。
- aiomysql模块:这是一个基于asyncio模块的异步MySQL数据库访问库。它提供了一系列的API和工具,用于在协作单元中连接、查询、插入、更新、删除数据库,以及支持连接池、游标、事务等高级功能。
Python中的异步并发编程有哪些常见的设计模式和最佳实践
Python中的异步并发编程有以下常见的设计模式和最佳实践:
- 使用asyncio.run()函数来运行主协程函数,它会创建一个新的事件循环,并在其上执行主协程函数,直到完成或出错。
- 使用asyncio.create_task()函数或asyncio.gather()函数来创建和执行多个协作单元,并返回一个Task对象或Future对象,用于获取结果或异常。
- 使用async with语句来管理异步上下文管理器(如文件对象、客户端会话对象等),它会自动调用其__aenter__()和__aexit__()方法,进行初始化和清理操作。
- 使用async for语句来处理异步迭代器(如文件对象、响应对象等),它会自动调用其__aiter__()和__anext__()方法,进行迭代操作。
- 使用try/except/finally语句来处理异常和清理资源,它可以捕获协作单元中抛出的异常,并执行必要的清理操作。
- 使用asyncio.sleep()函数来模拟耗时操作或延迟执行,它可以让出CPU资源给其他协作单元,并在指定时间后恢复执行。
- 使用asyncio.wait_for()函数或asyncio.wait()函数来设置超时或等待多个协作单元或异步操作完成,并返回结果或异常。
- 使用asyncio.Queue()类来实现生产者-消费者模式,它可以在多个协作单元之间传递数据,并提供同步机制。
Python中的异步并发编程有哪些性能测试和评估的方法和工具
Python中的异步并发编程有以下性能测试和评估的方法和工具:
- 使用time模块或timeit模块来测量程序或代码片段的运行时间,比较不同方案的效率差异。
- 使用cProfile模块或profile模块来分析程序或代码片段的运行情况,查看各个函数或方法的调用次数、运行时间、消耗资源等信息。
- 使用concurrent.futures模块或multiprocessing模块来实现多线程或多进程版本的程序,比较与异步并发版本的程序的性能优劣。
- 使用requests模块或urllib.request模块来实现同步版本的程序,比较与异步并发版本的程序的性能优劣。
- 使用aiohttp.ClientSession()类来管理HTTP客户端会话,提高网络请求的效率和稳定性。
- 使用aiofiles.tempfile模块来创建临时文件,避免占用磁盘空间和影响文件IO的性能。
- 使用aiomysql.create_pool()函数来创建数据库连接池,避免频繁创建和关闭数据库连接,提高数据库访问的效率和稳定性。
Python中的异步并发编程的代码例子
为了让你更好地理解和应用Python中的异步并发编程,本文将给出一些使用Python中的异步并发编程的代码例子,分别涉及以下几个方面:
- 使用asyncio模块来实现一个简单的协作单元,并使用asyncio.run()函数来运行它。
- 使用asyncio模块和async/await语法来实现一个简单的异步网络请求,并使用asyncio.create_task()函数和asyncio.gather()函数来创建和执行多个协作单元。
- 使用aiohttp模块和async with语句来实现一个简单的异步HTTP客户端,并使用aiohttp.ClientSession()类来管理HTTP客户端会话。
- 使用aiofiles模块和async with语句来实现一个简单的异步文件操作,并使用aiofiles.tempfile模块来创建临时文件。
- 使用aiomysql模块和async with语句来实现一个简单的异步数据库访问,并使用aiomysql.create_pool()函数来创建数据库连接池。
使用asyncio模块来实现一个简单的协作单元
以下是一个使用asyncio模块来实现一个简单的协作单元的代码例子:
# 导入asyncio模块
import asyncio
# 定义一个协作单元函数,用async关键字修饰
async def hello():
# 打印开始信息
print("Hello, world!")
# 模拟耗时操作,使用await关键字暂停当前协作单元,并等待另一个协作单元或异步操作的结果
await asyncio.sleep(1)
# 打印结束信息
print("Hello, again!")
# 定义一个主函数
def main():
# 使用asyncio.run()函数来运行主协程函数,它会创建一个新的事件循环,并在其上执行主协程函数,直到完成或出错
asyncio.run(hello())
# 运行主函数
if __name__ == "__main__":
main()
运行结果如下:
Hello, world!
Hello, again!
可以看到,这个代码例子中,我们定义了一个协作单元函数hello(),它打印两条信息,并在中间暂停一秒。我们使用async关键字修饰这个函数,表示它是一个协作单元。我们在这个函数中使用await关键字暂停当前协作单元,并等待另一个协作单元或异步操作的结果。在这个例子中,我们等待了asyncio.sleep(1)这个异步操作的结果,它会让出CPU资源给其他协作单元,并在一秒后恢复执行。我们在主函数中使用asyncio.run()函数来运行这个协作单元函数,它会创建一个新的事件循环,并在其上执行这个协作单元函数,直到完成或出错。
使用asyncio模块和async/await语法来实现一个简单的异步网络请求
以下是一个使用asyncio模块和async/await语法来实现一个简单的异步网络请求的代码例子:
# 导入asyncio模块
import asyncio
# 导入socket模块
import socket
# 定义一个协作单元函数,用于获取指定域名的IP地址,并打印
# 用async关键字修饰
async def get_ip(domain):
# 打印开始信息
print(f"Getting IP for {domain}...")
# 使用socket模块的gethostbyname()函数来获取IP地址,这是一个阻塞的操作,所以需要使用run_in_executor()函数来在一个线程或进程池中执行它,并返回一个Future对象
loop = asyncio.get_running_loop()
ip = await loop.run_in_executor(None, socket.gethostbyname, domain)
# 打印结束信息
print(f"{domain} IP is {ip}")
# 定义一个主协程函数,用于创建和执行多个协作单元,并等待结果
async def main():
# 定义要查询的域名列表
domains = ["python.org", "realpython.com", "google.com"]
# 定义一个空列表,用于存储任务对象
tasks = []
# 遍历每个域名
for domain in domains:
# 创建一个任务对象,并添加到列表中
task = asyncio.create_task(get_ip(domain))
tasks.append(task)
# 等待所有任务完成,并获取结果
await asyncio.gather(*tasks)
# 运行主协程函数
if __name__ == "__main__":
asyncio.run(main())
运行结果如下:
Getting IP for python.org...
Getting IP for realpython.com...
Getting IP for google.com...
python.org IP is 45.55.99.72
realpython.com IP is 104.21.11.9
google.com IP is 142.250.67.46
可以看到,这个代码例子中,我们定义了一个协作单元函数get_ip(),它用于获取指定域名的IP地址,并打印。我们使用async关键字修饰这个函数,表示它是一个协作单元。我们在这个函数中使用await关键字暂停当前协作单元,并等待另一个协作单元或异步操作的结果。在这个例子中,我们等待了loop.run_in_executor()这个异步操作的结果,它会在一个线程或进程池中执行socket.gethostbyname()这个阻塞的操作,并返回一个Future对象。我们在主协程函数中使用asyncio.create_task()函数或asyncio.gather()函数来创建和执行多个协作单元,并返回一个Task对象或Future对象,用于获取结果或异常。我们在主函数中使用asyncio.run()函数来运行这个主协程函数,它会创建一个新的事件循环,并在其上执行这个主协程函数,直到完成或出错。
使用aiohttp模块来实现一个简单的异步HTTP客户端
以下是一个使用aiohttp模块来实现一个简单的异步HTTP客户端的代码例子:
# 爬取豆瓣电影Top250
import aiohttp
import asyncio
import time
# 定义一个异步函数,用于发送HTTP请求,并解析响应数据
async def fetch(session, url):
async with session.get(url) as response:
# 判断响应状态码是否为200
if response.status == 200:
# 获取响应内容
html = await response.text()
# 解析响应内容,提取电影信息
movies = parse(html)
# 打印电影信息
print(movies)
else:
# 打印错误信息
print(f"Error: {response.status}")
# 定义一个普通函数,用于解析HTML内容,并提取电影信息
def parse(html):
# 使用BeautifulSoup库进行解析
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "lxml")
# 获取电影列表
movie_list = soup.find("ol", class_="grid_view")
# 定义一个空列表,用于存储电影信息
movies = []
# 遍历每个电影项
for movie in movie_list.find_all("li"):
# 获取电影名称、评分、排名、链接等信息
name = movie.find("span", class_="title").text
score = movie.find("span", class_="rating_num").text
rank = movie.find("em").text
link = movie.find("a")["href"]
# 将电影信息组合成一个字典,并添加到列表中
movies.append({
"name": name,
"score": score,
"rank": rank,
"link": link
})
# 返回电影列表
return movies
# 定义一个主协程函数,用于创建客户端会话,并发发送多个请求
async def main():
# 创建一个客户端会话对象
async with aiohttp.ClientSession() as session:
# 定义一个空列表,用于存储任务对象
tasks = []
# 定义起始URL和页数
base_url = "https://movie.douban.com/top250?start="
page_num = 10
# 遍历每一页
for i in range(page_num):
# 拼接完整的URL
url = base_url + str(i * 25)
# 创建一个任务对象,并添加到列表中
task = asyncio.create_task(fetch(session, url))
tasks.append(task)
# 等待所有任务完成,并获取结果
results = await asyncio.gather(*tasks)
# 返回结果
return results
# 运行主协程函数,并记录运行时间
if __name__ == "__main__":
# 获取开始时间
start = time.time()
# 运行主协程函数
asyncio.run(main())
# 获取结束时间
end = time.time()
# 打印运行时间
print(f"Time: {end - start} seconds")
Web服务
# 提供一个简单的Web服务,返回当前时间和IP地址
import aiohttp
import asyncio
import datetime
# 定义一个异步函数,用于处理请求,并返回响应
async def handle(request):
# 获取当前时间和IP地址
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ip = request.remote
# 构造响应内容
text = f"Hello, the current time is {now}, and your IP address is {ip}."
# 返回响应对象
return aiohttp.web.Response(text=text)
# 定义一个主函数,用于创建Web应用和服务器
def main():
# 创建一个Web应用对象
app = aiohttp.web.Application()
# 添加一个路由到应用中,绑定请求处理函数
app.add_routes([aiohttp.web.get("/", handle)])
# 运行Web应用,监听本地8000端口
aiohttp.web.run_app(app, host="127.0.0.1", port=8000)
# 运行主函数
if __name__ == "__main__":
main()
以下是两个使用aiofiles的例子,一个是异步读取文件,另一个是异步写入文件:
异步读取文件
# 异步读取文件
import aiofiles
import asyncio
# 定义一个异步函数,用于读取文件内容,并打印
async def read_file(filename):
# 使用aiofiles打开文件
async with aiofiles.open(filename, mode="r") as f:
# 读取所有内容
content = await f.read()
# 打印内容
print(content)
# 定义一个主协程函数,用于创建多个任务,并等待完成
async def main():
# 定义要读取的文件列表
filenames = ["file1.txt", "file2.txt", "file3.txt"]
# 定义一个空列表,用于存储任务对象
tasks = []
# 遍历每个文件名
for filename in filenames:
# 创建一个任务对象,并添加到列表中
task = asyncio.create_task(read_file(filename))
tasks.append(task)
# 等待所有任务完成,并获取结果
results = await asyncio.gather(*tasks)
# 返回结果
return results
# 运行主协程函数
if __name__ == "__main__":
asyncio.run(main())
异步写入文件
# 异步写入文件
import aiofiles
import asyncio
# 定义一个异步函数,用于写入一些内容到文件中,并返回写入的字节数
async def write_file(filename, content):
# 使用aiofiles打开文件
async with aiofiles.open(filename, mode="w") as f:
# 写入内容
n = await f.write(content)
# 返回写入的字节数
return n
# 定义一个主协程函数,用于创建多个任务,并等待完成
async def main():
# 定义要写入的文件和内容列表
files_and_contents = [
("file1.txt", "Hello, world!"),
("file2.txt", "Python is awesome!"),
("file3.txt", "Async IO is cool!")
]
# 定义一个空列表,用于存储任务对象
tasks = []
# 遍历每个文件和内容
for filename, content in files_and_contents:
# 创建一个任务对象,并添加到列表中
task = asyncio.create_task(write_file(filename, content))
tasks.append(task)
# 等待所有任务完成,并获取结果
results = await asyncio.gather(*tasks)
# 打印结果
print(results)
运行主协程函数
If name== “main”: asyncio.run(main())
以下是两个使用aiomysql的例子,一个是异步查询数据,另一个是异步插入数据:
异步查询数据
# 异步查询数据
import aiomysql
定义一个异步函数,用于查询数据,并打印
async def query_data():
# 使用aiomysql连接数据库
conn = await aiomysql.connect(host=“localhost”, user=“root”, password=“123456”, db=“test”)
# 创建一个游标对象
cur = await conn.cursor()
# 执行一条SQL语句,查询数据
await cur.execute(“SELECT * FROM users”)
# 获取结果集
result = await cur.fetchall()
# 打印结果集
print(result)
# 关闭游标
await cur.close()
#关闭连接
await conn.close()
定义一个主协程函数,用于创建多个任务,并等待完成
async def main():
# 定义要查询的次数
query_num = 3
# 定义一个空列表,用于存储任务对象
tasks = []
#遍历每次查询
for i in range(query_num):
# 创建一个任务对象,并添加到列表中
task = asyncio.create_task(query_data())
tasks.append(task)
# 等待所有任务完成,并获取结果
results = await asyncio.gather(*tasks)
# 返回结果
return results
运行主协程函数
If name == “main”: asyncio.run(main())
异步插入数据
# 异步插入数据
import aiomysql
import asyncio
# 定义一个异步函数,用于插入数据,并返回影响的行数
async def insert_data(name, age):
# 使用aiomysql连接数据库
conn = await aiomysql.connect(host="localhost", user="root", password="123456", db="test")
# 创建一个游标对象
cur = await conn.cursor()
# 执行一条SQL语句,插入数据
n = await cur.execute("INSERT INTO users (name, age) VALUES (%s, %s)", (name, age))
# 提交事务
await conn.commit()
# 关闭游标
await cur.close()
# 关闭连接
await conn.close()
# 返回影响的行数
return n
定义一个主协程函数,用于创建多个任务,并等待完成
async def main():
# 定义要插入的数据列表
data_list = [ (“Alice”, 20), (“Bob”, 21), (“Charlie”, 22) ]
# 定义一个空列表,用于存储任务对象
tasks = []
# 遍历每个数据
for name, age in data_list:
# 创建一个任务对象,并添加到列表中
task = asyncio.create_task(insert_data(name, age)) tasks.append(task)
# 等待所有任务完成,并获取结果
results = await asyncio.gather(*tasks)
# 打印结果
print(results)
运行主协程函数
If name == “main”: asyncio.run(main())
- 上一篇: 新闻个性化推荐系统源代码系列之推荐流程设计
- 下一篇: 函数的基本语法
猜你喜欢
- 2024-11-26 比pgload更快更方便写入大数据量至Greenplum的Spark Connector
- 2024-11-26 使用flask+echarts+html+Ajax实现数据分析可视化看板
- 2024-11-26 Excel之基础 - 常用函数整理来了,现用现查
- 2024-11-26 简单通俗说PageRank
- 2024-11-26 SQLSERVER全文检索(FULL-TEXT)语法
- 2024-11-26 「NLP」文本关键词提取的两种方法-TFIDF和TextRank
- 2024-11-26 职场办公中每天都要使用的6个Excel函数公式
- 2024-11-26 威廉王子在“神秘机构”待了三周...出来后变成了这样?
- 2024-11-26 Day60:用Python解析XML文件(xml.etree.ElementTree)
- 2024-11-26 Python 与 Excel 不得不说的事:这几个常用库你知道几个?
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)