本文导读

    一、分布式锁在FastAPI中的作用与原理 1.1 为什么需要分布式锁? 想象一个场景:你做了个FastAPI秒杀接口,商品库存只有1件。如果同时有100个请求打进来,单实例FastAPI能用asyn

一、分布式锁在FastAPI中的作用与原理

1.1 为什么需要分布式锁?

想象一个场景:你做了个FastAPI秒杀接口,商品库存只有1件。如果同时有100个请求打进来,单实例FastAPI能用asyncio.Lock(本地锁)保证同一时间只有一个请求处理库存。但如果部署了3个FastAPI实例(多进程/多机器),本地锁就失效了——每个实例都有自己的锁,100个请求会同时冲进3个实例,导致库存变成-99,彻底乱套。

分布式锁的本质:给跨进程、跨机器的资源竞争“上全局锁”,不管多少个FastAPI实例,同一时间只有一个请求能拿到锁,确保数据一致。

1.2 分布式锁的核心原理

分布式锁要满足4个核心要求:

  • 互斥性:同一时间只有一个请求能拿到锁;
  • 安全性:不能让A的锁被B释放;
  • 可用性:Redis挂了一个节点,还能正常用;
  • 容错性:持有锁的进程崩溃,锁要能自动释放。

FastAPI里最常用的是Redis分布式锁(轻量、性能高),底层用Redlock算法(解决Redis单点故障问题)。测试环境可以简化成单Redis节点,生产环境建议用3-5个节点。

1.3 Redlock算法简化理解

Redlock是“多节点投票制”:

  1. 向5个Redis节点发“锁请求”;
  2. 超过3个节点同意(半数以上),就算拿到锁;
  3. 计算总耗时,如果比锁超时时间短,锁有效;
  4. 否则,把所有节点的锁都删了,重新来。

测试环境不用这么复杂——先拿单Redis节点练手,生产再扩展。

二、FastAPI中分布式锁的实现

2.1 依赖准备与配置

首先装依赖:

pip install fastapi==0.109 aioredis==2.0.1 pydantic==2.5.3 pytest-asyncio==0.23.2

pydantic写个配置类(统一管理Redis连接参数):

# lock_config.py
from pydantic import BaseModel, Field

class RedisLockConfig(BaseModel):
    redis_url: str = Field(default="redis://localhost:6379", description="Redis连接地址")
    lock_prefix: str = Field(default="dist_lock:", description="锁键前缀,避免key冲突")
    timeout: int = Field(default=10, description="锁超时时间(秒),防止死锁")
    renew_interval: int = Field(default=3, description="锁续约间隔(秒),防止业务超时")

2.2 异步分布式锁实现(aioredis)

因为FastAPI是异步的,必须用aioredis(异步Redis客户端)。写个RedisDistributedLock类,封装锁的获取、释放、续约:

# distributed_lock.py
from aioredis import Redis, RedisError
from pydantic import BaseModel
import uuid
import asyncio

class RedisDistributedLock:
    def __init__(self, config: RedisLockConfig):
        self.config = config
        self.redis: Redis | None = None  # Redis客户端实例
        self.lock_key: str | None = None  # 当前锁的key
        self.lock_value: str | None = None  # 唯一标识(防误删别人的锁)
        self.renew_task: asyncio.Task | None = None  # 锁续约任务

    # 异步上下文管理器:自动连接/断开Redis
    async def __aenter__(self) -> "RedisDistributedLock":
        await self._connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.release()
        await self._disconnect()

    # 连接Redis
    async def _connect(self):
        if not self.redis:
            self.redis = await Redis.from_url(self.config.redis_url)

    # 断开Redis连接
    async def _disconnect(self):
        if self.redis:
            await self.redis.close()
            await self.redis.wait_closed()
            self.redis = None

    # 获取锁:原子操作(SETNX + EX)
    async def acquire(self, lock_name: str) -> bool:
        self.lock_key = f"{self.config.lock_prefix}{lock_name}"
        self.lock_value = str(uuid.uuid4())  # 生成唯一值,防误删

        try:
            # SET key value NX(不存在才设置) EX(过期时间)
            success = await self.redis.set(
                self.lock_key, self.lock_value,
                nx=True,
                ex=self.config.timeout
            )
        except RedisError as e:
            print(f"获取锁失败: {e}")
            return False

        if success:
            # 启动锁续约任务(防止业务超时)
            self.renew_task = asyncio.create_task(self._renew_lock())
            return True
        return False

    # 锁续约:用Lua脚本原子验证+续期
    async def _renew_lock(self):
        while self.lock_key and self.lock_value:
            try:
                # Lua脚本:如果锁是自己的,就续期
                script = """
                    if redis.call('GET', KEYS[1]) == ARGV[1] then
                        return redis.call('EXPIRE', KEYS[1], ARGV[2])
                    else
                        return 0
                    end
                """
                # 执行脚本:KEYS是锁key,ARGV是锁值+超时时间
                result = await self.redis.eval(
                    script,
                    keys=[self.lock_key],
                    args=[self.lock_value, self.config.timeout]
                )
                if result == 0:  # 续约失败(锁不是自己的)
                    break
            except Exception as e:
                print(f"续约失败: {e}")
                break
            await asyncio.sleep(self.config.renew_interval)  # 每隔3秒续一次

    # 释放锁:用Lua脚本原子验证+删除
    async def release(self):
        # 先取消续约任务
        if self.renew_task:
            self.renew_task.cancel()
            try:
                await self.renew_task
            except asyncio.CancelledError:
                pass

        if self.lock_key and self.lock_value and self.redis:
            try:
                # Lua脚本:只有锁是自己的,才删除
                script = """
                    if redis.call('GET', KEYS[1]) == ARGV[1] then
                        return redis.call('DEL', KEYS[1])
                    else
                        return 0
                    end
                """
                await self.redis.eval(
                    script,
                    keys=[self.lock_key],
                    args=[self.lock_value]
                )
            except RedisError as e:
                print(f"释放锁失败: {e}")

        # 重置状态
        self.lock_key = None
        self.lock_value = None

2.3 FastAPI路由中使用锁(依赖注入)

把锁封装成依赖,方便路由调用:

# main.py
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from lock_config import RedisLockConfig
from distributed_lock import RedisDistributedLock
import asyncio

app = FastAPI()

# 1. 配置依赖(读取Redis连接参数)
async def get_lock_config() -> RedisLockConfig:
    return RedisLockConfig()  # 实际项目可以从环境变量读,比如os.getenv("REDIS_URL")

# 2. 锁依赖:用异步生成器管理生命周期
async def get_distributed_lock(
    config: RedisLockConfig = Depends(get_lock_config)
) -> RedisDistributedLock:
    async with RedisDistributedLock(config) as lock:
        yield lock

# 模拟库存(实际用数据库)
fake_inventory = {"iphone15": 1}

# 3. 秒杀接口(用锁保护库存扣减)
@app.post("/seckill/{product_id}")
async def seckill(
    product_id: str,
    lock: RedisDistributedLock = Depends(get_distributed_lock)
):
    # 先拿锁,拿不到返回429(请求过多)
    if not await lock.acquire(lock_name=product_id):
        raise HTTPException(status_code=429, detail="抢的人太多啦,再试一次~")

    try:
        # 业务逻辑:扣减库存
        if fake_inventory.get(product_id, 0) <= 0:
            raise HTTPException(status_code=400, detail="手慢了,商品已售罄!")
        fake_inventory[product_id] -= 1
        return {"msg": "秒杀成功!", "剩余库存": fake_inventory[product_id]}
    finally:
        # 不管成功失败,都释放锁(重要!)
        await lock.release()

三、分布式锁的测试策略与用例设计

3.1 要测什么?

分布式锁的测试要覆盖正常场景异常场景

  1. 单实例并发:同一FastAPI实例下,多个请求抢锁;
  2. 多实例并发:启动多个FastAPI实例(比如用uvicorn main:app --port 8000--port 8001),用Postman批量发请求;
  3. 锁超时:持有锁的进程超时,锁自动释放;
  4. 异常崩溃:持有锁的进程突然死掉,锁是否自动释放;
  5. 锁续约:业务逻辑超时,续约是否成功。

3.2 异步测试用例(pytest-asyncio)

pytest-asyncio写异步测试,示例:

# test_seckill.py
import pytest
from httpx import AsyncClient
from main import app, fake_inventory
import asyncio

# 1. 测试客户端 fixture
@pytest.fixture(scope="module")
async def client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

# 2. 重置库存 fixture(每个测试前重置)
@pytest.fixture(autouse=True)
def reset_inv():
    fake_inventory["iphone15"] = 1
    yield

# 3. 测试1:单请求秒杀成功
@pytest.mark.asyncio
async def test_seckill_success(client: AsyncClient):
    resp = await client.post("/seckill/iphone15")
    assert resp.status_code == 200
    assert resp.json() == {"msg": "秒杀成功!", "剩余库存": 0}

# 4. 测试2:并发请求,只有1个成功
@pytest.mark.asyncio
async def test_seckill_concurrent(client: AsyncClient):
    # 定义并发请求函数
    async def send_req():
        resp = await client.post("/seckill/iphone15")
        return resp.status_code, resp.json()

    # 发5个并发请求
    tasks = [send_req() for _ in range(5)]
    results = await asyncio.gather(*tasks)

    # 统计结果:1个200(成功),4个429/400(失败)
    success = sum(1 for status, _ in results if status == 200)
    assert success == 1

# 5. 测试3:锁超时后释放
@pytest.mark.asyncio
async def test_lock_timeout(client: AsyncClient):
    fake_inventory["iphone15"] = 2  # 库存改成2,方便测试

    # 模拟一个持有锁超时的进程
    async def hold_lock():
        async with RedisDistributedLock(RedisLockConfig(timeout=2)) as lock:
            await lock.acquire("iphone15")
            await asyncio.sleep(3)  # 超过锁超时时间(2秒)

    # 先启动hold_lock,1秒后发秒杀请求
    task = asyncio.create_task(hold_lock())
    await asyncio.sleep(1)
    resp = await client.post("/seckill/iphone15")
    await task

    # 验证:锁超时释放,请求成功
    assert resp.status_code == 200
    assert fake_inventory["iphone15"] == 1

运行测试:

pytest test_seckill.py -v

四、课后Quiz:巩固知识

问题1:为什么FastAPI异步应用要用aioredis而不是redis-py?

答案解析
redis-py是同步库,会阻塞FastAPI的事件循环(相当于“堵住了水管”),导致所有请求变慢。而aioredis是异步库,能和FastAPI的异步机制完美配合,不会阻塞,性能更高。

问题2:锁的“超时时间”设太短或太长有什么问题?

答案解析

  • 设太短:如果业务逻辑没处理完,锁就自动释放了,其他请求会拿到锁,导致数据冲突(比如库存变成负数);
  • 设太长:如果持有锁的进程崩溃,锁要等很久才释放,其他请求一直拿不到锁,导致假死锁(系统像“卡住了”)。

五、常见报错与解决

报错1:aioredis.exceptions.ConnectionClosedError

原因:Redis没启动,或者连接URL错了(比如端口不是6379)。
解决

  1. 检查Redis是否在运行:redis-cli ping(返回PONG就对了);
  2. 验证RedisLockConfig里的redis_url是否正确(比如redis://localhost:6379);
  3. 增加连接超时时间:Redis.from_url(redis_url, timeout=10)

报错2:HTTP 429 Too Many Requests

原因:并发请求太多,锁被占了。
解决

  1. 优化业务逻辑,缩短锁的持有时间(比如把非核心逻辑移到锁外面);
  2. 用队列限流(比如Redis队列,把请求排成队,一个一个处理);
  3. 返回友好提示(比如“再试一次”)。

报错3:锁释放失败(Lua脚本返回0)

原因:锁已经被其他进程释放了(比如超时),或者锁值不对。
解决

  1. 确保release方法在finally块里(不管成功失败都释放);
  2. 检查锁的timeout设置,不要太短;
  3. 用唯一锁值(uuid),避免释放别人的锁。

六、实战运行步骤

  1. 启动Redisredis-server(Windows用redis-server.exe);
  2. 启动FastAPIuvicorn main:app --reload
  3. 测试接口:用Postman发POST http://localhost:8000/seckill/iphone15,看返回结果;
  4. 运行测试pytest test_seckill.py -v
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:[email protected]