Api代码库

基于django的代码示例

完整版: bilibili-api/django/examples/application

视图

"""基于函数的视图

缺点:
即便它们可以很好的处理简单案例,但除了一些配置选项之外,没办法扩展或自定义它们,这样就限制了它们在实际应用中用途。
"""
from django.http import HttpResponse


def my_view(request):
    if request.method == 'GET':
        # 视图逻辑
        ...
    return HttpResponse("result", status=201)

校验数据

"""https://docs.djangoproject.com/zh-hans/4.2/ref/forms/validation/#cleaning-a-specific-field-attribute"""

from django import forms
from django.core.exceptions import ValidationError


class ContactForm(forms.Form):
    # Everything as before.
    ...

    def clean_recipients(self):
        """校验recipients"""
        data = self.cleaned_data["recipients"]
        if "fred@example.com" not in data:
            raise ValidationError("You have forgotten about Fred!")

        # Always return a value to use as the new cleaned data, even if
        # this method didn't change it.
        return data

缓存

from datetime import datetime
from django.views.decorators.cache import cache_page
from django.http import HttpResponse
from django.views.decorators.http import require_GET
from django.core.cache import cache

# 视图缓存: https://docs.djangoproject.com/zh-hans/4.2/topics/cache/#the-per-view-cache
# 只支持GET、HEAD请求, 适合经常被查询使用而且不怎么变化的数据
# 不支持双写一致性, 缓存期间,总是显示修改前的数据内容
@cache_page(60 * 15)
def my_view(request):
    now = datetime.now().isoformat()
    return HttpResponse("现在的时间是: " + now)


# 模拟django模型
class User:
    ...

    def get(self, pk):
        pass
        return self

    def to_json(self):
        ...


# 双写一致性demo
# 对于小厂中厂(QPS <= 1000)可以使用,但是大厂不行
# 高并发场景下的缺点:
# 1. 多线程同时访问, 查询相差1毫秒, 把mysql访问量瞬间打高
# 2. 通通回写redis, 容易出现数据覆盖(可以用双检加锁策略解决)
# 双检加锁策略:
# 多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个互斥锁来锁住它。其他的线程走到这一步拿不到锁就等着,等第一个线程查到了数据,然后 做缓存。后面的线程进来发现已经有缓存了,就直接走缓存。
@require_GET
def find_user_by_id(request, pk):
    CACHE_KEY_USER = "user:"
    key = CACHE_KEY_USER + pk
    # 1. 先从缓存里面查询,如果有直接返回结果,如果没有再去查询db
    user = User(**cache.get(key))  # 伪代码

    if not user:
        # 2. redis里面无, 继续查询db
        user = User.get(pk)
        if not user:
            # 3.1 cache + db都无数据
            # 你具体细化, 防止多次穿透,我们业务规定,记录下这个null值的key, 列入黑名单或者记录异常。。。。。
            return user
        else:
            # 3.2 db有,需要将数据写回redis, 保证下一次的缓存命中率
            cache.set(key, user.to_json())
    return HttpResponse(user.to_json())

测试

import pytest
from django.test import RequestFactory
from testcontainers.redis import RedisContainer

from inventory.views import sale, sale_v2


@pytest.fixture(scope="session")
def redis():
    with RedisContainer('redis:7.2.3') as redis_container:
        yield redis_container


# 覆盖settings设置
# ============================================
# https://pytest-django.readthedocs.io/en/latest/configuring_django.html#overriding-individual-settings
@pytest.fixture(autouse=True)
def setup_django_redis(settings, redis):
    settings.CACHES['default']["LOCATION"] = f"redis://{redis.get_container_host_ip()}:{redis.get_exposed_port(redis.port)}"


# 用请求工厂跳过路由配置直接测试视图
# ==============================================
# https://docs.djangoproject.com/zh-hans/4.2/topics/testing/advanced/#the-request-factory

# 测试基于函数的视图
# -----------------------------------------------
# https://docs.djangoproject.com/zh-hans/4.2/topics/testing/advanced/#example
@pytest.mark.parametrize('view', [sale, sale_v2])
def test_sale(view):
    factory = RequestFactory()
    request = factory.get('/sale/')
    response = view(request)
    assert response.status_code == 200

用到pytest-django的开源项目:

  • djangorestframework

文件上传

"""上传文件: https://flask.palletsprojects.com/en/stable/patterns/fileuploads/"""
import os

from flask import Flask, request
from werkzeug.utils import secure_filename

UPLOAD_FOLDER = '/path/to/the/uploads'
ALLOW_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER


def allow_file(filename: str):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOW_EXTENSIONS


@app.route('/', methods=['POST'], endpoint='upload-file')
def upload_file():
    # 检查post请求是否有file部分
    if 'file' not in request.files:
        return {"code": 1, "msg": "没有文件上传"}
    file = request.files['file']
    if file.filename == '':
        return {"code": 1, "msg": "没有选择文件"}
    if file and allow_file(file.filename):
        filename = secure_filename(file.filename)
        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        return {"code": 0, "msg": "上传成功"}
    else:
        return {"code": 0, "msg": "文件不符合规范"}


if __name__ == '__main__':
    print(app)

分布式锁

https://redis.io/docs/latest/develop/use/patterns/distributed-locks/

import random
import time
import uuid
import threading

from django.http import HttpResponse
from django.core.cache import cache
from django_redis import get_redis_connection
from django.views import View


def logic():
    """业务逻辑处理"""
    # 1. 查询库存信息
    result = cache.get("inventory001")
    # 2. 判断库存是否足够
    inventory_number = 0 if result is None else int(result)
    # 3. 扣减库存, 每次减少一个
    if inventory_number > 0:
        cache.set("inventory001", inventory_number - 1)
        ret_message = "成功卖出一个商品, 库存剩余: " + str(inventory_number)
    else:
        ret_message = "商品卖完了, /(ㄒoㄒ)/~~"
    return ret_message


# 分布式锁
# ======================================================
# https://www.bilibili.com/video/BV13R4y1v7sP/?p=131

# 将判断+删除自己的合并为lua脚本保证原子性
# ---------------------------------------------------
# 代码: https://gitee.com/luzhenxiong/paradox-playground/blob/master/t/ut/test_cache.py


# 可重入性的分布式锁
# 允许同一个线程多次获得同一把锁,避免了线程因试图再次获取它已经拥有的锁而导致死锁的问题。
# --------------------------------------------------
class SaleViewV3(View):
    uuid_value = None
    key = "zzyyRedisLock"
    expire_time = 50

    def post(self, request):
        con = get_redis_connection("default")

        self.lock(con)
        logic()
        self.unlock(con)
        return HttpResponse("ok")

    def lock(self, con):
        script = """
if redis.call("get",KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == then
    redis.call('hincrby', KEYS[1], ARGV[1], 1)
    redis.call('expire', KEYS[1], ARGV[2])
    return 1
else
    return 0
end    
        """

        while not con.eval(script, self.key, self.uuid_value, self.expire_time):
            # 暂停20毫秒~1秒后重试
            time.sleep(random.uniform(0.02, 1))
        return True

    def unlock(self, con):
        script = """
if redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
    return nil
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then
    return redis.call("del", KEYS[1])
else
    return 0
end        
        """
        flag = con.eval(script, self.key, self.uuid_value)
        assert flag is None, "this lock doesn't exists, /(ㄒoㄒ)/~~"


# 自动续期
# --------------------------------------------------
# 使用一种叫做"看门狗"的技术来实现自动续期
# 通常情况下, 看门狗会是一个定时器或者后台线程
class SaleViewV4(SaleViewV3):
    renew_thread = None
    locked = False

    def lock(self, con):
        res = super().lock(con)
        if res:
            self.renew_thread = threading.Thread(target=self.renew, args=(con,))
            self.renew_thread.start()
            self.locked = True
        return res

    def renew(self, con):
        script = """
        IF redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1 THEN
            redis.call('EXPIRE', KEYS[1], ARGV[2])
        ELSE 
            RETURN 0
        END            
        """
        while con.eval(script, self.key, self.uuid_value, self.expire_time):
            time.sleep(self.expire_time * 1000 / 3)

            if not self.locked:
                break

    def unlock(self, con):
        super().unlock(con)
        if self.renew_thread:
            self.renew_thread.join()

数据

多对多关系数据

前端用穿梭狂

后端处理流程:

  1. 取现有到数据到集合

  2. 求差集获取新增数据,然后set.add

  3. 差集获取删除数据, 然后set.delete

部署

方案一: ansible + supervisor + venv

在宿主机部署django应用, 通过supervisor管理进程

场景1是celery程序, 部署方便

场景2是用到docker的地方

前期阶段可以编写单文件playbook.yml

playbooks/
        playbook.yml
        supervisor.conf.j2

后期再考虑分目录规范playbook剧本

方案二: docker容器

cicd构建镜像, 推送到仓库, 在服务器上运行容器