Python 的 Asynchronous Programming
處理 IO 密集型任務時,一般採用多執行緒 (multithreading) 和非同步 (asynchronous) 技術。這種方法允許在等待 IO 操作的同時執行其他任務,從而充分利用 CPU 資源,提高整體效能。
本文將重點介紹非同步 (asynchronous) 技術的原理和應用時機,並以 Python 的asyncio套件為例進行實作,以更清晰地闡釋相關概念。
IO 密集型 (I/O-bound)
IO 密集型 (I/O-bound) 是指一個程式或系統的執行受限於輸入/輸出操作 (I/O operations) 的速度,而不是計算速度。典型的IO操作包括讀寫檔案、網路通信、資料庫查詢等。在這種情境底下,CPU 花費大部分時間等待IO操作的完成,而不是進行實際的計算工作。因此,提高 CPU 的運算能力對 IO 密集型任務相對有限,更為關鍵的是如何優化 IO 操作的效率。
非同步 (asynchronous)
非同步 (asynchronous) 是一種基於單執行緒的技術,其特點是在等待某些事件 (例如 I/O 操作或網路請求) 的同時,能夠持續執行其他任務,從而實現非阻塞 (non-blocking) 的效果。
為了實現這種非阻塞操作,特別引入了協程 (coroutine) 這種特殊的函式類型。協程允許在函式的執行中暫停,讓其他任務有機會執行,這樣就能夠更靈活地處理異步操作,並確保程式的高效執行。
阻塞 (blocking) vs 非阻塞 (non-blocking)
以大量請求網路上的資源為例,一般流程包括發出請求、等待回覆,最後收到回覆。
阻塞 (blocking) 情境
當一個任務向網路資源發出請求後,如果在等待資料的過程中 CPU 是閒置的且無法執行其他任務,我們會形容這段程式碼是阻塞的 (blocking)。在這種情況下,整個程式會等待直到資料返回,無法進行其他工作,這可能導致效能的浪費。
非阻塞 (Non-blocking) 情境
當一個任務向網路資源發出請求後,如果 CPU 能夠在等待資料的過程中同時執行其他任務,這就被稱為非阻塞 (Non-blocking)。透過這項技術,程式能夠在等待特定操作完成的同時,繼續執行其他任務,充分利用 CPU 資源,提高整體效能
Python 如何實現非同步 (asynchronous)?
在本章節中,我們將親自架設一個簡單的Web server,然後測試在進行 I/O-bound 任務時,未使用與使用 asynchronous 技術的效能差異。
首先,我們將使用uvicorn跟FastAPI在電腦上架設一個簡單的Web server,該server僅有一個簡單的API,名為get_data
,其功能是會在休眠三秒後返回一個隨機數。
import time
import uvicorn
from fastapi import FastAPI
import random
app = FastAPI()
@app.get("/data")
def get_data():
time.sleep(3)
return {"data": random.random()}
if __name__ == "__main__":
uvicorn.run(app)
不採用 asynchronous 技術版本
我們編寫了一個Client端程式,它呼叫main
函式,其功能是按照任務 (Job) 順序以HTTP請求方式來呼叫get_data
這個API。由於程式在等待資料返回的過程中耗費大量時間,我們使用這個例子來說明阻塞 (blocking) 程式碼的情況,特別是在處理 (I/O-bound) 任務可能造成不必要的時間浪費。
import requests
import time
URL = "http://localhost:8000/data" # Define URL globally
def request():
resp = requests.get(URL)
result = resp.json()
return result["data"]
def main(job_num=3):
for no in range(job_num):
job_id = f"JOB{no}"
print(f"[{job_id}] Start getting data...")
data = request()
print(f"[{job_id}] Got the data!")
if __name__ == "__main__":
start_time = time.time()
main()
spent_time = round(time.time() - start_time,2)
print(f"Total Spent Time: {spent_time}s")
在不使用 asynchronous 技術時,輸出順序符合預期,而整體執行時間約15.23秒。
採用 asynchronous 版本 - 1
我們使用 asyncio 和 aiohttp 這兩個套件來實作非同步 (asynchronous) 版本的Client端。
asyncio 是 Python 的一個內建模組,用於支援非同步 I/O 操作的編程框架而aiohttp 則是一個建構於 asyncio 之上的網路函式庫,主要用於處理 HTTP 請求和響應。
接下來,我們先介紹 Python 的兩個關鍵字:async
和await
。
async
通常與def
一起使用,用於定義協程 (coroutine) 函式。協程是一種支援非同步操作的特殊函式,在協程內使用await
時,它會暫停協程的執行,讓事件循環 (event loop) 有機會執行其他任務,而不阻塞整個程式,直到等待的操作完成。
我們先使用以下的例子來說明async
和await
,這兩個關鍵字的使用方法。
import asyncio
async def foo():
await asyncio.sleep(1)
return "Foo result"
async def bar():
await asyncio.sleep(2)
return "Bar result"
async def main():
results = await asyncio.gather(foo(), bar())
print(results)
asyncio.run(main())
foo 跟 bar 的function 開頭都以async
關鍵字標示,這表示它們是協程函示,當函式執行至 await
時,主程式將會暫停協程的執行,直到等待的操作完成。
而 asyncio.gather
的作用是將這些非同步任務進行組合,並在它們全部完成時返回一個包含所有結果的列表。這使得可以同時執行多個任務,而不需要等待每個任務單獨完成。
最後的 print(results)
中,將會輸出 ["Foo result", "Bar result"]
採用 asynchronous 版本 - 2
上述範例說明了協程函示的基本用法,接下來讓我們來編寫完整的非同步 (asynchronous) 版本的Client端程式。
import time
import asyncio
import aiohttp
#%%
URL = "http://localhost:8000/data" # Define URL globally
#%%
async def request(session,job_id):
print(f"[{job_id}] Start getting data...")
async with session.get(URL) as resp:
result = await resp.json()
print(f"[{job_id}] Got the data!")
return (job_id,result["data"])
async def main(job_num=3):
print("== Start the all jobs ==")
async with aiohttp.ClientSession() as session:
operations = (request(session,f"JOB{no}") for no in range(job_num))
result = await asyncio.gather(*operations)
print(f"data: {result}")
print("== End the all jobs ==")
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main())
spent_time = round(time.time() - start_time,2)
print(f"Total Spent Time: {spent_time}s")
session.get(URL)
這個操作將返回一個非同步的物件 (通常是一個 ClientResponse
物件),這個物件代表了正在進行的 HTTP 請求。它會暫停協程 (coroutine) 的執行,讓事件循環有機會執行其他任務,而不是一直等待 HTTP 請求的完成。
輸出結果如下,可以觀察到主程式不再按照任務順序執行,而是在請求 API 的過程中暫停,讓主程式先去執行其他任務,從而有效地避免了時間的浪費。