异步(Asynchronous)指的是两个或多个对象或事件不同时存在或发生,也就是说,它们不是同步的。当多个相关的,但是并不依赖于前面发生的事情完成的事情发生时,它们就是异步的。 ——developer.mozilla.org

并发与并行的区别

学习异步前,了解什么是并行(parallel)与并发(concurrent):

  1. 并发(concurrent),多个任务交替执行——同一时间只有一个任务在执行
  2. 并行(parallel),多个任务同时执行——利用CPU多核调用

Python并发实现

在Python里面实现并发有两种方式——多线程与异步。

多线程

线程是系统资源分配的最小单位,也就是CPU调度的最小单位,一个进程可以包含多个线程,并且至少包含一个线程。

Python解释器有GIL互斥锁,限制了任何时候只有一个线程可以执行Python代码。GIL作用是为了保证内存安全,防止多个线程同时修改一个对象,注意,GIL是CPython解释器的实现,不是Python语言的特性,比如PyPy、Jython没有实现GIL。

通过简单的例子来理解GIL影响:

import threading
import time

def cpu_bound_task(n):
    while n > 0:
        n -= 1

# 单线程执行
def single_thread():
    start = time.time()
    cpu_bound_task(100000000)
    cpu_bound_task(100000000)
    end = time.time()
    print(f"单线程耗时:{end - start:.2f}秒")

# 多线程执行
def multi_thread():
    start = time.time()
    t1 = threading.Thread(target=cpu_bound_task, args=(100000000,))
    t2 = threading.Thread(target=cpu_bound_task, args=(100000000,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    end = time.time()
    print(f"多线程耗时:{end - start:.2f}秒")

single_thread()
multi_thread()

"""
单线程耗时:2.02秒
多线程耗时:2.02秒
"""

在Python里面多进程才能充分利用多核计算,在需要CPU计算方面,需要使用多进程执行并行计算。

在I/O操作方面,多线程依然有效,因为Python会在线程(等待I/O、time.sleep、字节码执行一定数量时)自动释放GIL锁,其他线程可以执行CPU计算任务。线程会自动竞争GIL锁来执行。

import threading
import time
import logging
  
# 配置日志
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(threadName)s - %(message)s"
)

def io_task():
    logging.debug("开始 I/O 任务")
    
    # 模拟 I/O 操作(比如读文件)
    logging.debug("开始 I/O 操作")
    time.sleep(2)  # 模拟 I/O 等待
    logging.debug("I/O 操作完成")
    logging.debug("任务结束")

def cpu_task():
    logging.debug("开始 CPU 任务")
    
    # 模拟 CPU 计算
    for i in range(3):
        logging.debug(f"CPU 计算中... {i}")
        # 短暂睡眠以便观察线程切换
        time.sleep(0.5)
    logging.debug("任务结束")

if __name__ == "__main__":
    # 创建线程
    t1 = threading.Thread(target=io_task, name="IOThread")
    t2 = threading.Thread(target=cpu_task, name="CPUThread")
    # 启动线程
    t1.start()
    t2.start()
    # 等待线程结束
    t1.join()
    t2.join()
    
"""
2024-12-10 11:11:28,761 - IOThread - 开始 I/O 任务
2024-12-10 11:11:28,761 - IOThread - 开始 I/O 操作
2024-12-10 11:11:28,761 - CPUThread - 开始 CPU 任务
2024-12-10 11:11:28,762 - CPUThread - CPU 计算中... 0
2024-12-10 11:11:28,762 - CPUThread - CPU 计算中... 1
2024-12-10 11:11:28,762 - CPUThread - CPU 计算中... 2
2024-12-10 11:11:28,762 - CPUThread - 任务结束
2024-12-10 11:11:30,761 - IOThread - I/O 操作完成
2024-12-10 11:11:30,762 - IOThread - 任务结束
"""

异步

因为GIL锁的存在,多线程是CPython的一种并发实现,异步也是Python的一种并发实现。

异步(asyncio)编程用于处理并发(concurrent)。核心思想在于当程序需要等待某些操作完成时(I/O操作),不会阻塞整个程序的执行,等待时会切换到别的任务去执行。

异步以协程(Coroutine)为单位,相比于线程区别在于,线程由操作系统控制切换,协程由程序自身控制切换(await),占用资源更少。

import asyncio
import time

async def task_1():
    print(f"Task 1: 开始 - {time.strftime('%H:%M:%S')}")
    print("Task 1: 即将等待2秒")
    await asyncio.sleep(2)  # 切换点:Task 1 暂停,控制权交给其他任务
    print(f"Task 1: 结束 - {time.strftime('%H:%M:%S')}")

async def task_2():
    print(f"Task 2: 开始 - {time.strftime('%H:%M:%S')}")
    print("Task 2: 即将等待1秒")
    await asyncio.sleep(1)  # 切换点:Task 2 暂停,控制权交给其他任务
    print(f"Task 2: 结束 - {time.strftime('%H:%M:%S')}")

async def main():
    print("开始执行主任务")
    # 同时运行两个任务
    await asyncio.gather(task_1(), task_2())
    print("主任务执行完成")

asyncio.run(main())

"""
开始执行主任务
Task 1: 开始 - 11:35:09
Task 1: 即将等待2秒
Task 2: 开始 - 11:35:09
Task 2: 即将等待1秒
Task 2: 结束 - 11:35:10
Task 1: 结束 - 11:35:11
主任务执行完成
"""

使用await等待调用,等待的操作没有立即完成时,控制器转移到事件循环,寻找其他可以执行的协程。 协程切换是自动的,只在await时切换。

异步编程基于协程的单线程模型,如果在协程中执行CPU密集型等各种同步任务时,阻塞整个事件循环,其他协程无法执行,失去了异步的意义,异步编程时需要注意避免的阻塞操作:

1.文件操作阻塞:

#协程里面使用同步读取,不推荐
async def read_file():
    with open('large_file.txt', 'r') as f:  # 这是阻塞操作
        content = f.read()
    return content

#异步读取,推荐
import aiofiles
async def read_file():
    async with aiofiles.open('large_file.txt', 'r') as f:
        content = await f.read()
    return content

2.网络请求阻塞

#造成阻塞,不推荐
import requests

async def fetch_data():
    response = requests.get('https://api.example.com')  # 同步请求会阻塞
    return response.json()

#异步请求,推荐
import aiohttp

async def fetch_data():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.json()

3.数据库操作阻塞

#同步连接,不推荐
import psycopg2

async def get_user():
    conn = psycopg2.connect(...)  # 同步连接会阻塞
    cur = conn.cursor()
    cur.execute("SELECT * FROM users")
    return cur.fetchall()

#异步连接,推荐
import asyncpg

async def get_user():
    conn = await asyncpg.connect(...)
    result = await conn.fetch("SELECT * FROM users")
    await conn.close()
    return result

4.时间等待阻塞

#同步睡眠,不推荐
import time

async def wait_operation():
    time.sleep(5)  # 同步睡眠会阻塞整个事件循环

#异步睡眠,推荐
import asyncio

async def wait_operation():
    await asyncio.sleep(5)  # 异步睡眠不会阻塞事件循环

5.CPU密集型操作

#CPU密集操作会阻塞事件循环,不推荐
async def heavy_calculation():
    result = 0
    for i in range(10**8):  # CPU密集型操作会阻塞事件循环
        result += i
    return result

#CPU密集操作分离出来,推荐
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task():
    result = 0
    for i in range(10**8):
        result += i
    return result
    
#异步函数使用进程池执行CPU密集任务
async def heavy_calculation():
    # 获取当前运行的事件循环
    loop = asyncio.get_running_loop()
    # 创建进程池并执行任务
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_task)
    return result

最佳实践

这种方式既保持了异步编程的非阻塞特性,又充分利用了多核CPU的计算能力,是处理CPU密集型任务的最佳实践。

import asyncio
from concurrent.futures import ProcessPoolExecutor
import time

def cpu_bound_task(n):
    result = 0
    for i in range(n):
        result += i
    return result

async def process_data():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # 并行处理多个计算任务
        tasks = [
            loop.run_in_executor(pool, cpu_bound_task, 10**8),
            loop.run_in_executor(pool, cpu_bound_task, 10**7),
            loop.run_in_executor(pool, cpu_bound_task, 10**6),
        ]
        results = await asyncio.gather(*tasks)
    return results

# 运行示例
async def main():
    start = time.time()
    results = await process_data()
    end = time.time()
    print(f"Total time: {end - start:.2f} seconds")
    print(f"Results: {results}")
    
asyncio.run(main())