国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

異步編程進階:asyncio、threading 和多進程在實戰中的選擇

開發
在Python并發編程中,沒有絕對的最優方案,只有最適合當前場景的方案。理解GIL的影響、掌握三種方案的特點和適用場景,選擇合適的方案進行組合使用,這才是實戰中的最佳實踐。

Python中的并發編程一直是開發者的難題。asyncio提供了異步編程的原生支持,threading提供了多線程能力,multiprocessing提供了多進程支持。這三種方案各有所長,在不同的場景中發揮不同的作用。

本文將基于真實的應用場景,深入分析這三種并發方案的原理、性能和最佳實踐,幫助你在實戰中做出正確的選擇。

三種并發模型的本質區別

Python的全局解釋器鎖(GIL)是理解三種并發模型的關鍵:

┌─────────────────────────────────────┐
│        Python解釋器                  │
│  ┌──────────────────────────────┐  │
│  │      GIL(全局鎖)              │  │
│  │  一次只允許一個線程執行      │  │
│  └──────────────────────────────┘  │
└─────────────────────────────────────┘

asyncio:單線程,在I/O等待時切換任務 ? 不受GIL影響
threading:多線程,但受GIL限制 ? CPU密集型無法并行
multiprocessing:多進程,每個進程獨立GIL ? CPU密集型可并行

方案一:asyncio - I/O密集型的最優選擇

asyncio的工作原理:

import asyncio

asyncdeffetch_data(url):
    """模擬獲取數據"""
    print(f"開始獲取 {url}")
    await asyncio.sleep(2)  # 模擬I/O操作
    print(f"完成獲取 {url}")
    returnf"Data from {url}"

asyncdefmain():
    # 并發執行多個異步任務
    tasks = [
        fetch_data("http://example.com/1"),
        fetch_data("http://example.com/2"),
        fetch_data("http://example.com/3"),
    ]
    results = await asyncio.gather(*tasks)
    return results

# 運行
results = asyncio.run(main())
# 耗時:約2秒(并發),而不是6秒(順序)

asyncio的高級用法:

import asyncio
from typing import AsyncGenerator

# 異步生成器
asyncdefasync_generator():
    for i in range(5):
        await asyncio.sleep(1)
        yield i

# 異步上下文管理器
classAsyncResource:
    asyncdef__aenter__(self):
        print("獲取資源")
        await asyncio.sleep(1)
        return self
    
    asyncdef__aexit__(self, exc_type, exc_val, exc_tb):
        print("釋放資源")
        await asyncio.sleep(1)

# 使用
asyncdefuse_resource():
    asyncwith AsyncResource() as resource:
        print("使用資源")

asyncio.run(use_resource())

asyncio的性能特性:

import asyncio
import time
import aiohttp

asyncdefbenchmark_asyncio():
    """測試asyncio處理1000個并發請求的性能"""
    start = time.time()
    
    asyncdeffetch(session, url):
        try:
            asyncwith session.get(url, timeout=5) as response:
                returnawait response.text()
        except:
            returnNone
    
    asyncwith aiohttp.ClientSession() as session:
        tasks = [fetch(session, f"http://example.com/{i}") for i in range(1000)]
        results = await asyncio.gather(*tasks)
    
    end = time.time()
    print(f"asyncio耗時:{end - start:.2f}秒,成功請求:{len([r for r in results if r])}")

asyncio.run(benchmark_asyncio())

方案二:threading - 輕量級并發

threading的適用場景:

import threading
import time
from queue import Queue

defworker(queue, worker_id):
    """工作線程"""
    whileTrue:
        task = queue.get()
        if task isNone:
            break
        print(f"Worker {worker_id} 處理任務 {task}")
        time.sleep(1)
        queue.task_done()

defmain_threading():
    queue = Queue()
    num_workers = 4
    
    # 創建并啟動工作線程
    threads = []
    for i in range(num_workers):
        t = threading.Thread(target=worker, args=(queue, i))
        t.start()
        threads.append(t)
    
    # 添加任務
    for i in range(10):
        queue.put(i)
    
    # 等待所有任務完成
    queue.join()
    
    # 停止工作線程
    for _ in range(num_workers):
        queue.put(None)
    for t in threads:
        t.join()

main_threading()

threading的局限性:

import threading
import time

defcpu_intensive():
    """CPU密集型計算"""
    total = 0
    for i in range(100000000):
        total += i
    return total

# 單線程執行
start = time.time()
cpu_intensive()
cpu_intensive()
print(f"單線程耗時:{time.time() - start:.2f}秒")

# 多線程執行(受GIL影響,實際更慢)
start = time.time()
t1 = threading.Thread(target=cpu_intensive)
t2 = threading.Thread(target=cpu_intensive)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"多線程耗時:{time.time() - start:.2f}秒")
# 結果:多線程因GIL競爭反而更慢!

threading的正確用途:

import threading
import time
import requests
from concurrent.futures import ThreadPoolExecutor

deffetch_url(url):
    """獲取URL內容"""
    try:
        response = requests.get(url, timeout=5)
        return len(response.content)
    except:
        return0

defbenchmark_threading():
    urls = ["http://example.com"] * 100
    
    # 使用線程池
    with ThreadPoolExecutor(max_workers=10) as executor:
        sizes = list(executor.map(fetch_url, urls))
    
    print(f"成功獲取 {len([s for s in sizes if s > 0])} 個URL")

benchmark_threading()

方案三:multiprocessing - CPU密集型的利器

multiprocessing基礎:

import multiprocessing
import time

defcpu_intensive(n):
    """CPU密集型計算"""
    total = 0
    for i in range(n):
        total += i ** 2
    return total

defmain_multiprocessing():
    # 創建進程池
    with multiprocessing.Pool(processes=4) as pool:
        tasks = [100000000] * 4
        start = time.time()
        results = pool.map(cpu_intensive, tasks)
        end = time.time()
    
    print(f"多進程耗時:{end - start:.2f}秒")
    print(f"結果:{results}")

if __name__ == '__main__':
    main_multiprocessing()

multiprocessing的進程間通信:

import multiprocessing
from multiprocessing import Queue, Pipe

defworker_queue(queue):
    """通過隊列通信"""
    queue.put("Message from worker")

defmain():
    # 方法1:使用隊列
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker_queue, args=(queue,))
    p.start()
    message = queue.get()
    print(f"收到消息:{message}")
    p.join()
    
    # 方法2:使用管道
    parent_conn, child_conn = multiprocessing.Pipe()
    
    defworker_pipe(conn):
        conn.send("Hello from pipe")
        conn.close()
    
    p = multiprocessing.Process(target=worker_pipe, args=(child_conn,))
    p.start()
    message = parent_conn.recv()
    print(f"收到消息:{message}")
    p.join()

if __name__ == '__main__':
    main()

性能對比與最佳實踐

綜合性能測試:

import asyncio
import threading
import multiprocessing
import time
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

defbenchmark_all():
    """綜合性能對比"""
    
    # 測試1:I/O密集型(網絡請求)
    defio_task():
        try:
            requests.get("http://httpbin.org/delay/1", timeout=5)
            return1
        except:
            return0
    
    # asyncio版本
    asyncdefio_asyncio():
        import aiohttp
        asyncwith aiohttp.ClientSession() as session:
            tasks = []
            for _ in range(10):
                tasks.append(io_asyncio_task(session))
            returnawait asyncio.gather(*tasks)
    
    asyncdefio_asyncio_task(session):
        try:
            asyncwith session.get("http://httpbin.org/delay/1", timeout=5) as r:
                return1
        except:
            return0
    
    # threading版本
    defio_threading():
        with ThreadPoolExecutor(max_workers=10) as executor:
            return list(executor.map(lambda _: io_task(), range(10)))
    
    # 測試CPU密集型
    defcpu_task():
        return sum(i ** 2for i in range(10000000))
    
    # threading版本(CPU密集型)
    defcpu_threading():
        with ThreadPoolExecutor(max_workers=4) as executor:
            return list(executor.map(lambda _: cpu_task(), range(4)))
    
    # multiprocessing版本
    defcpu_multiprocessing():
        with ProcessPoolExecutor(max_workers=4) as executor:
            return list(executor.map(lambda _: cpu_task(), range(4)))
    
    print("=== 性能對比 ===")
    
    # I/O測試
    print("\n1. I/O密集型(10個網絡請求)")
    start = time.time()
    io_threading()
    print(f"threading耗時:{time.time() - start:.2f}秒")
    
    # CPU測試
    print("\n2. CPU密集型(4個大計算)")
    start = time.time()
    cpu_threading()
    print(f"threading耗時:{time.time() - start:.2f}秒")
    
    start = time.time()
    cpu_multiprocessing()
    print(f"multiprocessing耗時:{time.time() - start:.2f}秒")

benchmark_all()

選擇決策樹

┌─ 是I/O密集型嗎?
│  ├─ Yes ──> 是否需要實時響應?
│  │          ├─ Yes ──> 使用 asyncio(推薦)
│  │          └─ No ──> 可用threading或asyncio
│  │
│  └─ No ──> 是CPU密集型嗎?
│             ├─ Yes ──> 使用 multiprocessing
│             └─ No ──> 數據處理量???
│                       ├─ Yes ──> 單線程即可
│                       └─ No ──> 使用threading

實戰案例:混合方案

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

asyncdefhybrid_approach():
    """混合使用異步和多進程"""
    
    defcpu_intensive(n):
        # CPU密集計算
        return sum(i ** 2for i in range(n))
    
    # 先用asyncio并發發起任務
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = []
        for i in range(10):
            # 在線程池中運行CPU密集操作
            task = loop.run_in_executor(executor, cpu_intensive, 10000000)
            tasks.append(task)
        
        results = await asyncio.gather(*tasks)
    
    return results

asyncio.run(hybrid_approach())

結尾

在Python并發編程中,沒有絕對的最優方案,只有最適合當前場景的方案。asyncio適合I/O密集型的高并發場景,threading提供了輕量級的并發支持,multiprocessing則是CPU密集型計算的終極武器。理解GIL的影響、掌握三種方案的特點和適用場景,選擇合適的方案進行組合使用,這才是實戰中的最佳實踐。隨著FastAPI等現代框架的普及,asyncio已經成為主流選擇,但在處理混合型應用時,仍然需要靈活地運用三種方案。

責任編輯:趙寧寧 來源: Python數智工坊
相關推薦

2017-05-05 08:44:24

PythonAsyncio異步編程

2017-08-02 15:00:12

PythonAsyncio異步編程

2024-03-29 06:44:55

Python多進程模塊工具

2021-06-11 06:54:35

PythonThreadingMultiproces

2023-12-11 18:18:24

Python編程線程

2023-08-30 08:43:42

asyncioaiohttp

2010-10-15 08:57:15

PHP多進程

2025-06-03 08:27:58

Python異步IO編程

2019-02-26 11:15:25

進程多線程多進程

2024-12-27 08:11:44

Python編程模式IO

2023-05-10 07:47:08

Python并發編程

2021-04-20 12:39:52

Node.js多線程多進程

2021-08-04 23:30:28

Node.js開發線程

2023-11-28 13:52:00

Python多進程多線程

2011-02-22 08:49:16

.NET同步異步

2021-03-23 07:56:54

JS基礎同步異步編程EventLoop底層

2011-02-22 09:09:21

.NETAsync CTP異步

2021-02-25 11:19:37

谷歌Android開發者

2024-08-26 08:39:26

PHP孤兒進程僵尸進程

2023-11-01 11:20:57

點贊
收藏

51CTO技術棧公眾號

色三级在线观看| 国产有码在线| 69久久久久| 亚洲电影激情视频网站| 婷婷激情综合| 亚洲欧美国产毛片在线| 伊人久久视频| 日韩av一二三四区| 91精品免费久久久久久久久| 无码人妻aⅴ一区二区三区日本| 男人的天堂在线视频免费观看 | 欧美大片免费| 国产麻豆成人精品| 国产小视频国产精品| 久久99久久久久久| 亚洲一二三区视频| 一区二区三区四区高清精品免费观看| 国产精品电影网| 深夜福利在线观看直播| 亚洲影院免费| 国产亚洲精品激情久久| 不卡福利视频| 国产午夜精品久久久 | 日韩精品中文字幕久久臀| 在线电影福利片| 国产黄人亚洲片| 久久亚洲私人国产精品va| 国产精品久久久久久精| 99精品国产在热久久婷婷| 日韩欧美一级精品久久| 蜜臀在线免费观看| 免费日韩一区二区三区| 欧美日韩三级一区二区| 欧美美女黄色网| 国产精品18久久久久久vr| 亚洲五码在线观看视频| 福利一区福利二区| 中文字幕日韩专区| 久久不见久久见中文字幕免费 | 亚洲高清福利| 久久黄色网页| 视频在线观看99| 1024在线视频| 麻豆专区一区二区三区四区五区| 伦理中文字幕亚洲| 成人av地址| 亚洲精品99久久久久中文字幕| 黄色手机在线视频| 免费永久网站黄欧美| 一区二区三区av在线| 欧洲乱码伦视频免费| 欧美va亚洲va香蕉在线| 啊啊啊啊啊好爽| 六月丁香婷婷久久| 福利在线一区二区| 66视频精品| 久热国产精品视频| 蜜乳av综合| 欧美日韩亚洲一区二区三区在线观看 | 性刺激的欧美三级视频| 天天操天天色综合| 精品嫩模一区二区三区| 国产亚洲精品7777| 精品999在线观看| 秋霞在线一区| 国产三区精品| 美国成人xxx| 在线精品高清中文字幕| 国产色在线观看| 亚洲福利国产精品| 欧美日韩xx| 久国内精品在线| 国产精品av一区二区三区| 久久免费国产精品1| 国产极品久久久久久久久波多结野| 狠狠色香婷婷久久亚洲精品| 深夜黄色小视频| 欧美日韩一区二区三区在线| 久久综合色播| 国产日韩视频一区二区三区| 六十路在线观看| 一区二区三区免费观看| 国产免费999| 国产91精品精华液一区二区三区| 羞羞视频立即看| 欧美激情一区在线观看| 国产成人三级视频| 亚洲va中文字幕| 国产精品午夜久久久久久| 欧美成人精品高清在线播放| 少妇精品久久久| 国产一线二线三线女| 欧美中文一区二区三区| 男人的天堂在线视频| 中文字幕无线精品亚洲乱码一区 | 欧美日韩中文字幕在线播放| 亚洲女同在线| 波多野吉衣av| 国产亚洲aⅴaaaaaa毛片| 激情国产一区| 99理论电影网| 91精品综合| 欧美黄色免费影院| 99国产欧美久久久精品| 日韩免费一级视频| 日韩免费电影一区| 一级欧洲+日本+国产| 91一区二区三区| 欧美日韩18| 国严精品久久久久久亚洲影视 | 在线看片国产福利你懂的| 精品国产麻豆免费人成网站| 伊人精品影院| 国产综合色香蕉精品| 日本欧美肥老太交大片| 国产日韩专区在线| 国产欧美日韩三级| 欧美亚洲韩国| 中文字幕中文字幕在线中心一区| 国产一区二区三区高清播放| www.在线观看av| 日韩一区二区三区av| 九色porny自拍视频在线播放| 日韩电影免费在线观看中文字幕| 欧美一区国产| 亚洲黄色成人久久久| 成+人+亚洲+综合天堂| 免费观看成人在线视频| 亚洲成人在线免费| 欧美绝顶高潮抽搐喷水合集| 中文久久久久久| 欧美激情视频播放| 91精品国偷自产在线电影 | 欧美激情一区二区三区在线视频观看 | 美女啪啪无遮挡免费久久网站| 亚洲一区电影| 霍思燕三级露全乳照| 日本一区二区在线不卡| 成人高清一区| 2019中文字幕在线免费观看| 成人6969www免费视频| 国产不卡一区二区三区在线观看| 先锋亚洲精品| 成人av免费| 日韩一区在线视频| 精品久久中文| 手机看片1024久久| 日韩亚洲欧美高清| 久久精品导航| 一二三四视频在线中文| 日韩极品视频在线观看 | 乱人伦视频在线| 亚洲一区 在线播放| 久久亚洲精品网站| 中文字幕一区二区不卡| 日本中文字幕电影在线免费观看| 国产精品日韩二区| 国产精品一区二区男女羞羞无遮挡 | 国产污视频在线| 亚洲欧美第一页| 成人精品影院| 国产特黄在线| 水蜜桃亚洲精品| 亚洲最色的网站| 欧美日韩亚洲一区二区三区在线| 国产三区在线观看| 缅甸午夜性猛交xxxx| 欧美日韩精品电影| 久久精品国产网站| 日本一区精品视频| 日韩高清dvd| 亚洲视频一区二区免费在线观看| 免费高清完整在线观看| 一区二区三区四区国产| 久热精品视频在线免费观看| 亚洲色图20p| 国产日韩精品视频一区二区三区| 777视频在线观看| 日韩成人在线网站| 99国产精品国产精品久久| 国产成人久久| 欧美日韩中文字幕在线播放| 欧美大片免费观看| 岛国av在线不卡| 麻豆国产精品官网| 欧美一区二区三区久久| 好操啊在线观看免费视频| 免费观看日韩毛片| 99在线视频免费观看| 中文字幕综合在线| 一本大道综合伊人精品热热| 99久久香蕉| 在线观看二区| 人人妻人人添人人爽欧美一区| 91精品视频观看| 一区二区欧美国产| 奇米影视7777精品一区二区| 日本a在线播放| 任你操这里只有精品| 一区二区三区视频在线|