异步(Asynchronous)指的是两个或多个对象或事件不同时存在或发生,也就是说,它们不是同步的。当多个相关的,但是并不依赖于前面发生的事情完成的事情发生时,它们就是异步的。 ——developer.mozilla.org
并发与并行的区别
学习异步前,了解什么是并行(parallel)与并发(concurrent):
- 并发(concurrent),多个任务交替执行——同一时间只有一个任务在执行
- 并行(parallel),多个任务同时执行——利用CPU多核调用
Python并发实现
在Python里面实现并发有两种方式——多线程与异步。
多线程
线程是系统资源分配的最小单位,也就是CPU调度的最小单位,一个进程可以包含多个线程,并且至少包含一个线程。
Python解释器有GIL互斥锁,限制了任何时候只有一个线程可以执行Python代码。GIL作用是为了保证内存安全,防止多个线程同时修改一个对象,注意,GIL是CPython解释器的实现,不是Python语言的特性,比如PyPy、Jython没有实现GIL。
通过简单的例子来理解GIL影响:
import threading
import time
def cpu_bound_task(n):
while n > 0:
n -= 1
# 单线程执行
def single_thread():
start = time.time()
cpu_bound_task(100000000)
cpu_bound_task(100000000)
end = time.time()
print(f"单线程耗时:{end - start:.2f}秒")
# 多线程执行
def multi_thread():
start = time.time()
t1 = threading.Thread(target=cpu_bound_task, args=(100000000,))
t2 = threading.Thread(target=cpu_bound_task, args=(100000000,))
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
print(f"多线程耗时:{end - start:.2f}秒")
single_thread()
multi_thread()
"""
单线程耗时:2.02秒
多线程耗时:2.02秒
"""
在Python里面多进程才能充分利用多核计算,在需要CPU计算方面,需要使用多进程执行并行计算。
在I/O操作方面,多线程依然有效,因为Python会在线程(等待I/O、time.sleep、字节码执行一定数量时)自动释放GIL锁,其他线程可以执行CPU计算任务。线程会自动竞争GIL锁来执行。
import threading
import time
import logging
# 配置日志
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(threadName)s - %(message)s"
)
def io_task():
logging.debug("开始 I/O 任务")
# 模拟 I/O 操作(比如读文件)
logging.debug("开始 I/O 操作")
time.sleep(2) # 模拟 I/O 等待
logging.debug("I/O 操作完成")
logging.debug("任务结束")
def cpu_task():
logging.debug("开始 CPU 任务")
# 模拟 CPU 计算
for i in range(3):
logging.debug(f"CPU 计算中... {i}")
# 短暂睡眠以便观察线程切换
time.sleep(0.5)
logging.debug("任务结束")
if __name__ == "__main__":
# 创建线程
t1 = threading.Thread(target=io_task, name="IOThread")
t2 = threading.Thread(target=cpu_task, name="CPUThread")
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
"""
2024-12-10 11:11:28,761 - IOThread - 开始 I/O 任务
2024-12-10 11:11:28,761 - IOThread - 开始 I/O 操作
2024-12-10 11:11:28,761 - CPUThread - 开始 CPU 任务
2024-12-10 11:11:28,762 - CPUThread - CPU 计算中... 0
2024-12-10 11:11:28,762 - CPUThread - CPU 计算中... 1
2024-12-10 11:11:28,762 - CPUThread - CPU 计算中... 2
2024-12-10 11:11:28,762 - CPUThread - 任务结束
2024-12-10 11:11:30,761 - IOThread - I/O 操作完成
2024-12-10 11:11:30,762 - IOThread - 任务结束
"""
异步
因为GIL锁的存在,多线程是CPython的一种并发实现,异步也是Python的一种并发实现。
异步(asyncio)编程用于处理并发(concurrent)。核心思想在于当程序需要等待某些操作完成时(I/O操作),不会阻塞整个程序的执行,等待时会切换到别的任务去执行。
异步以协程(Coroutine)为单位,相比于线程区别在于,线程由操作系统控制切换,协程由程序自身控制切换(await),占用资源更少。
import asyncio
import time
async def task_1():
print(f"Task 1: 开始 - {time.strftime('%H:%M:%S')}")
print("Task 1: 即将等待2秒")
await asyncio.sleep(2) # 切换点:Task 1 暂停,控制权交给其他任务
print(f"Task 1: 结束 - {time.strftime('%H:%M:%S')}")
async def task_2():
print(f"Task 2: 开始 - {time.strftime('%H:%M:%S')}")
print("Task 2: 即将等待1秒")
await asyncio.sleep(1) # 切换点:Task 2 暂停,控制权交给其他任务
print(f"Task 2: 结束 - {time.strftime('%H:%M:%S')}")
async def main():
print("开始执行主任务")
# 同时运行两个任务
await asyncio.gather(task_1(), task_2())
print("主任务执行完成")
asyncio.run(main())
"""
开始执行主任务
Task 1: 开始 - 11:35:09
Task 1: 即将等待2秒
Task 2: 开始 - 11:35:09
Task 2: 即将等待1秒
Task 2: 结束 - 11:35:10
Task 1: 结束 - 11:35:11
主任务执行完成
"""
使用await等待调用,等待的操作没有立即完成时,控制器转移到事件循环,寻找其他可以执行的协程。 协程切换是自动的,只在await时切换。
异步编程基于协程的单线程模型,如果在协程中执行CPU密集型等各种同步任务时,阻塞整个事件循环,其他协程无法执行,失去了异步的意义,异步编程时需要注意避免的阻塞操作:
1.文件操作阻塞:
#协程里面使用同步读取,不推荐
async def read_file():
with open('large_file.txt', 'r') as f: # 这是阻塞操作
content = f.read()
return content
#异步读取,推荐
import aiofiles
async def read_file():
async with aiofiles.open('large_file.txt', 'r') as f:
content = await f.read()
return content
2.网络请求阻塞
#造成阻塞,不推荐
import requests
async def fetch_data():
response = requests.get('https://api.example.com') # 同步请求会阻塞
return response.json()
#异步请求,推荐
import aiohttp
async def fetch_data():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
return await response.json()
3.数据库操作阻塞
#同步连接,不推荐
import psycopg2
async def get_user():
conn = psycopg2.connect(...) # 同步连接会阻塞
cur = conn.cursor()
cur.execute("SELECT * FROM users")
return cur.fetchall()
#异步连接,推荐
import asyncpg
async def get_user():
conn = await asyncpg.connect(...)
result = await conn.fetch("SELECT * FROM users")
await conn.close()
return result
4.时间等待阻塞
#同步睡眠,不推荐
import time
async def wait_operation():
time.sleep(5) # 同步睡眠会阻塞整个事件循环
#异步睡眠,推荐
import asyncio
async def wait_operation():
await asyncio.sleep(5) # 异步睡眠不会阻塞事件循环
5.CPU密集型操作
#CPU密集操作会阻塞事件循环,不推荐
async def heavy_calculation():
result = 0
for i in range(10**8): # CPU密集型操作会阻塞事件循环
result += i
return result
#CPU密集操作分离出来,推荐
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_bound_task():
result = 0
for i in range(10**8):
result += i
return result
#异步函数使用进程池执行CPU密集任务
async def heavy_calculation():
# 获取当前运行的事件循环
loop = asyncio.get_running_loop()
# 创建进程池并执行任务
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_task)
return result
最佳实践
这种方式既保持了异步编程的非阻塞特性,又充分利用了多核CPU的计算能力,是处理CPU密集型任务的最佳实践。
import asyncio
from concurrent.futures import ProcessPoolExecutor
import time
def cpu_bound_task(n):
result = 0
for i in range(n):
result += i
return result
async def process_data():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
# 并行处理多个计算任务
tasks = [
loop.run_in_executor(pool, cpu_bound_task, 10**8),
loop.run_in_executor(pool, cpu_bound_task, 10**7),
loop.run_in_executor(pool, cpu_bound_task, 10**6),
]
results = await asyncio.gather(*tasks)
return results
# 运行示例
async def main():
start = time.time()
results = await process_data()
end = time.time()
print(f"Total time: {end - start:.2f} seconds")
print(f"Results: {results}")
asyncio.run(main())