Api代码库
基于django的代码示例
完整版: bilibili-api/django/examples/application
视图
"""基于函数的视图
缺点:
即便它们可以很好地处理简单案例,但除了一些配置选项之外,没办法扩展或自定义它们,这样就限制了它们在实际应用中的用途。
"""
from django.http import HttpResponse
def my_view(request):
    """Function-based view that only serves GET requests.

    Args:
        request: the incoming Django ``HttpRequest``.

    Returns:
        ``HttpResponse("result", status=201)`` for GET; a 405 response that
        advertises ``GET`` for every other method.
    """
    if request.method != 'GET':
        # A Django view must always return a response object; falling through
        # and returning None would surface as a server error.
        from django.http import HttpResponseNotAllowed

        return HttpResponseNotAllowed(['GET'])
    # view logic
    ...
    return HttpResponse("result", status=201)
from django.http import HttpResponse, HttpRequest
from django.views import View
class MyView(View):
    """Class-based view: ``View`` dispatches each HTTP verb to the same-named method."""

    def get(self, request: HttpRequest):
        """Handle GET and answer with the plain body ``"result"``."""
        # view logic
        ...
        response = HttpResponse("result")
        return response
"""
跟django View的区别:
* 传入到视图方法中的是REST framework的Request对象
* 视图方法可以返回REST framework的Response对象
* 任何APIException异常都会被捕获到,并且处理成合适的响应信息
* 拓展了身份认证、权限检查、流量控制这三个功能
"""
from rest_framework.views import APIView
from rest_framework.authentication import BasicAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
class MyView(APIView):
    """DRF view protected by HTTP basic authentication.

    Only authenticated users pass the permission check.
    """

    authentication_classes = [BasicAuthentication]
    permission_classes = [IsAuthenticated]

    def get(self, request: Request, format=None):
        """Handle GET and return the payload ``"result"`` in a DRF ``Response``."""
        # view logic
        payload = "result"
        return Response(payload)
package logic
import (
"context"
"demo/internal/svc"
"demo/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// DemoLogic bundles what the Demo method works with: an embedded logger,
// the request context and the service context handed to NewDemoLogic.
type DemoLogic struct {
// Embedded so logging methods can be called directly on DemoLogic.
logx.Logger
// ctx is the context passed to NewDemoLogic.
ctx context.Context
// svcCtx is the service context passed to NewDemoLogic.
svcCtx *svc.ServiceContext
}
// NewDemoLogic builds a DemoLogic whose logger is derived from ctx and which
// keeps ctx and svcCtx for later use.
func NewDemoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DemoLogic {
	logic := new(DemoLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// Demo echoes the request's Name back as the response Message.
// The returned error is always nil.
func (l *DemoLogic) Demo(req *types.Request) (resp *types.Response, err error) {
	return &types.Response{Message: req.Name}, nil
}
校验数据
"""https://docs.djangoproject.com/zh-hans/4.2/ref/forms/validation/#cleaning-a-specific-field-attribute"""
from django import forms
from django.core.exceptions import ValidationError
class ContactForm(forms.Form):
    """Form demonstrating a field-specific ``clean_<fieldname>()`` hook."""

    # Everything as before (field declarations are omitted in this example).
    ...

    def clean_recipients(self):
        """Validate the ``recipients`` field.

        Returns:
            The cleaned ``recipients`` value, unchanged.

        Raises:
            ValidationError: if "fred@example.com" is not among the recipients.
        """
        recipients = self.cleaned_data["recipients"]
        if "fred@example.com" in recipients:
            # The hook must always return the value to use as the new cleaned
            # data, even though it is not modified here.
            return recipients
        raise ValidationError("You have forgotten about Fred!")
"""https://www.django-rest-framework.org/api-guide/serializers/#validation"""
from rest_framework import serializers
class BlogPostSerializer(serializers.Serializer):
    """Serializer demonstrating a field-level ``validate_<field_name>()`` hook."""

    title = serializers.CharField(max_length=100)
    content = serializers.CharField()

    def validate_title(self, value):
        """Check that the blog post is about Django.

        Returns:
            The title, unchanged.

        Raises:
            serializers.ValidationError: if "django" (case-insensitive) does
                not occur in the title.
        """
        mentions_django = 'django' in value.lower()
        if mentions_django:
            return value
        raise serializers.ValidationError("Blog post is not about Django")
https://gitee.com/luzhenxiong/bilibili-orm/tree/master/marshmallow
1"""
2摘自v3.21.1官方的examples: https://github.com/marshmallow-code/marshmallow/blob/3.21.1/examples/flask_example.py
3
4注意,该example仅便于理解marshmallow的功能,对于生产环境,可以使用[marshmallow-sqlalchemy](https://github.com/marshmallow-code/marshmallow-sqlalchemy)
5
6增强的特性是自动映射模型和Schema字段
7"""
8
9import datetime
10
11from flask import Flask, request
12from flask_sqlalchemy import SQLAlchemy
13from sqlalchemy.exc import NoResultFound
14
15from marshmallow import Schema, ValidationError, fields, pre_load
16
# Flask application backed by a SQLite file, with SQLAlchemy bound to it.
app = Flask(__name__)
app.config.update(SQLALCHEMY_DATABASE_URI="sqlite:////tmp/quotes.db")
db = SQLAlchemy(app)
20
21##### MODELS #####
22
23
class Author(db.Model):  # type: ignore
    """Author row: integer primary key plus first and last name (80 chars each)."""

    id = db.Column(db.Integer, primary_key=True)
    first = db.Column(db.String(80))
    last = db.Column(db.String(80))
28
29
class Quote(db.Model):  # type: ignore
    """Quote row: mandatory content, a link to its author and a timestamp."""

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.String, nullable=False)
    author_id = db.Column(db.Integer, db.ForeignKey("author.id"))
    # The backref exposes ``author.quotes`` as a dynamic (query-like) collection.
    author = db.relationship(
        "Author",
        backref=db.backref("quotes", lazy="dynamic"),
    )
    posted_at = db.Column(db.DateTime)
36
37
38##### SCHEMAS #####
39# 【pk】定义schema: https://marshmallow.readthedocs.io/en/stable/quickstart.html#declaring-schemas
40
41
class AuthorSchema(Schema):
    """Serialization schema for an author; ``id`` and ``formatted_name`` are dump-only."""

    id = fields.Int(dump_only=True)
    first = fields.Str()
    last = fields.Str()
    formatted_name = fields.Method("format_name", dump_only=True)

    def format_name(self, author):
        """Return the author's name rendered as ``"<last>, <first>"``."""
        return "{}, {}".format(author.last, author.first)
50
51
52# Custom validator
def must_not_be_blank(data):
    """Custom validator that rejects falsy input.

    https://marshmallow.readthedocs.io/en/stable/quickstart.html#validation

    Raises:
        ValidationError: if ``data`` is falsy (empty string, empty dict, None, ...).
    """
    if data:
        return
    raise ValidationError("Data not provided.")
60
61
class QuoteSchema(Schema):
    """Schema for quotes; accepts the author as a single full-name string on load."""

    # Read-only and write-only fields:
    # https://marshmallow.readthedocs.io/en/stable/quickstart.html#read-only-and-write-only-fields
    # ``id`` is dump-only, so load() never processes it.
    id = fields.Int(dump_only=True)
    author = fields.Nested(AuthorSchema(), validate=must_not_be_blank)
    # Required fields:
    # https://marshmallow.readthedocs.io/en/stable/quickstart.html#required-fields
    content = fields.Str(required=True, validate=must_not_be_blank)
    posted_at = fields.DateTime(dump_only=True)

    # Allow client to pass author's full name in request body
    # e.g. {"author": "Tim Peters"} rather than {"first": "Tim", "last": "Peters"}
    @pre_load
    def process_author(self, data, **kwargs):
        """Expand a full-name ``author`` string into ``{"first", "last"}``.

        The name is split at the first space, so "Guido van Rossum" becomes
        first="Guido", last="van Rossum" and a single-word name gets an empty
        ``last``. A missing, empty or whitespace-only author becomes ``{}``
        so that the field validator reports it. Any other non-string value is
        left untouched for the nested field to accept or reject.

        Args:
            data: the raw input mapping; it is updated in place.

        Returns:
            The same ``data`` mapping.
        """
        author = data.get("author")
        if isinstance(author, str):
            first, _, last = author.strip().partition(" ")
            data["author"] = {"first": first, "last": last.strip()} if first else {}
        elif not author:
            data["author"] = {}
        return data
83
84
# Single-object and collection schemas shared by the views below.
author_schema = AuthorSchema()
# Handling collections of objects:
# https://marshmallow.readthedocs.io/en/stable/quickstart.html#handling-collections-of-objects
authors_schema = AuthorSchema(many=True)
quote_schema = QuoteSchema()
# Filtering output (only ``id`` and ``content`` are dumped):
# https://marshmallow.readthedocs.io/en/stable/quickstart.html#filtering-output
quotes_schema = QuoteSchema(only=("id", "content"), many=True)
91
92##### API #####
93
94
@app.route("/authors")
def get_authors():
    """Return every author, serialized under the ``"authors"`` key."""
    # Serialize the whole result set in one go.
    return {"authors": authors_schema.dump(Author.query.all())}
101
102
@app.route("/authors/<int:pk>")
def get_author(pk):
    """Return one author together with their quotes.

    Responds with a message and status 400 when no author has id ``pk``.
    """
    lookup = Author.query.filter(Author.id == pk)
    try:
        author = lookup.one()
    except NoResultFound:
        return {"message": "Author could not be found."}, 400
    return {
        "author": author_schema.dump(author),
        "quotes": quotes_schema.dump(author.quotes.all()),
    }
112
113
@app.route("/quotes/", methods=["GET"])
def get_quotes():
    """Return every quote (id and content only) under the ``"quotes"`` key."""
    # quotes_schema was built with many=True, so dump() already expects a list.
    every_quote = Quote.query.all()
    return {"quotes": quotes_schema.dump(every_quote)}
119
120
@app.route("/quotes/<int:pk>")
def get_quote(pk):
    """Return a single quote; responds with status 400 when ``pk`` matches no quote."""
    lookup = Quote.query.filter(Quote.id == pk)
    try:
        found = lookup.one()
    except NoResultFound:
        return {"message": "Quote could not be found."}, 400
    return {"quote": quote_schema.dump(found)}
129
130
@app.route("/quotes/", methods=["POST"])
def new_quote():
    """Create a quote (and its author, if unknown) from the JSON request body.

    Returns:
        400 with a message when the body is empty, 422 with the validation
        errors when the payload does not satisfy ``QuoteSchema``, otherwise a
        confirmation message plus the serialized quote.
    """
    json_data = request.get_json()
    if not json_data:
        return {"message": "No input data provided"}, 400
    try:
        # Deserializing: https://marshmallow.readthedocs.io/en/stable/quickstart.html#deserializing-objects-loading
        # Validates the input data.
        data = quote_schema.load(json_data)
    except ValidationError as err:
        # Validation failed.
        return err.messages, 422
    first, last = data["author"]["first"], data["author"]["last"]
    author = Author.query.filter_by(first=first, last=last).first()
    if author is None:
        # Create a new author
        author = Author(first=first, last=last)
        db.session.add(author)
    # Naive UTC timestamp, as utcnow() produced, without the deprecated call.
    posted_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    # Create new quote
    quote = Quote(content=data["content"], author=author, posted_at=posted_at)
    db.session.add(quote)
    db.session.commit()
    # Serializing: https://marshmallow.readthedocs.io/en/stable/quickstart.html#serializing-objects-dumping
    # Session.get replaces the legacy Query.get primary-key lookup.
    result = quote_schema.dump(db.session.get(Quote, quote.id))
    return {"message": "Created new quote.", "quote": result}
158
159
if __name__ == "__main__":
    # Table creation needs an active application context; without one
    # Flask-SQLAlchemy 3.x raises "Working outside of application context".
    with app.app_context():
        db.create_all()
    app.run(debug=True, port=5000)
内置了一些基础的参数校验规则,想要更复杂的校验或者自定义校验,考虑使用validators库
demo.api
syntax = "v1"
// Request carries the single route parameter of the demo endpoint.
type Request {
// path: route parameter
// https://go-zero.dev/docs/tutorials/api/route/rule
// options: the only enum values this parameter may take
// https://go-zero.dev/docs/tutorials/api/parameter
Name string `path:"name,options=you|me"`
}
// Response is the demo endpoint's body; Message is serialized as "message".
type Response {
Message string `json:"message"`
}
// demo-api exposes GET /from/:name, handled by DemoHandler, taking Request
// and returning Response.
service demo-api {
@handler DemoHandler
get /from/:name (Request) returns (Response)
}
// Request/response pairs for the user routes declared below.
// Only CreateReq declares fields; every other type is an empty placeholder.
type (
ListReq struct{}
ListResp struct{}
// CreateReq is the JSON body for creating a user.
CreateReq {
Name string `json:"name"`
Age uint8 `json:"age"`
}
CreateResp struct{}
RetrieveReq struct{}
RetrieveResp struct{}
PartialUpdateReq struct{}
PartialUpdateResp struct{}
UpdateReq struct{}
UpdateResp struct{}
DeleteReq struct{}
DeleteResp struct{}
)
// User routes for list/create/retrieve/partial-update/update/delete.
// The @server block sets the group "drf_style" and the prefix "api".
@server (
group: drf_style
prefix: api
)
service demo-api {
// List all users.
@doc "List样例(列出所有用户)"
@handler ListHandler
get /users (ListReq) returns(ListResp)
// Create a user.
@doc "Create样例(创建用户)"
@handler CreateHandler
post /users (CreateReq) returns(CreateResp)
// Retrieve a single user by id.
@doc "Retrieve样例(检索单个用户)"
@handler RetrieveHandler
get /users/:id (RetrieveReq) returns(RetrieveResp)
// Update a user via PATCH.
@doc "PartialUpdate样例(更新用户)"
@handler PartialUpdateHandler
patch /users/:id (PartialUpdateReq) returns(PartialUpdateResp)
// Update a user via PUT.
@doc "Update样例(更新用户)"
@handler UpdateHandler
put /users/:id (UpdateReq) returns (UpdateResp)
// Delete a user.
@doc "Delete样例(删除用户)"
@handler DeleteHandler
delete /users/:id (DeleteReq) returns (DeleteResp)
}
缓存
from datetime import datetime
from django.views.decorators.cache import cache_page
from django.http import HttpResponse
from django.views.decorators.http import require_GET
from django.core.cache import cache
# 视图缓存: https://docs.djangoproject.com/zh-hans/4.2/topics/cache/#the-per-view-cache
# 只支持GET、HEAD请求, 适合经常被查询使用而且不怎么变化的数据
# 不支持双写一致性, 缓存期间,总是显示修改前的数据内容
@cache_page(60 * 15)
def my_view(request):
    """Return the current time; ``cache_page`` caches the response for 15 minutes."""
    timestamp = datetime.now().isoformat()
    return HttpResponse(f"现在的时间是: {timestamp}")
# 模拟django模型
class User:
    """Minimal stand-in for a Django model, used by the cache demo."""

    def get(self, pk):
        """Pretend lookup: ignores ``pk`` and returns this very instance."""
        return self

    def to_json(self):
        """Placeholder serializer; not implemented, so it returns ``None``."""
        return None
# 双写一致性demo
# 对于小厂中厂(QPS <= 1000)可以使用,但是大厂不行
# 高并发场景下的缺点:
# 1. 多线程同时访问, 查询相差1毫秒, 把mysql访问量瞬间打高
# 2. 通通回写redis, 容易出现数据覆盖(可以用双检加锁策略解决)
# 双检加锁策略:
# 多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个互斥锁来锁住它。其他的线程走到这一步拿不到锁就等着,等第一个线程查到了数据,然后 做缓存。后面的线程进来发现已经有缓存了,就直接走缓存。
@require_GET
def find_user_by_id(request, pk):
    """Cache-aside lookup of a user: try the cache first, then the database.

    Args:
        request: the incoming GET request.
        pk: primary key of the user; may be an int or a str.

    Returns:
        An ``HttpResponse`` with the user's JSON, or an empty 404 response
        when neither the cache nor the database knows the user.
    """
    # Formatting the key keeps it valid for both int and str primary keys.
    key = f"user:{pk}"
    # 1. Look in the cache first; a hit is returned as-is without touching the db.
    cached = cache.get(key)
    if cached is not None:
        return HttpResponse(cached)
    # 2. Cache miss: fall back to the database.
    #    User is the mock model of this demo; a real Django model would use
    #    User.objects.get(pk=pk).
    user = User().get(pk)
    if not user:
        # 3.1 Neither cache nor db has the record. To stop repeated
        #     penetration, business rules may record this key (blacklist it
        #     or log the anomaly) before answering.
        return HttpResponse(status=404)
    # 3.2 Found in db: write it back to the cache so the next read is a hit.
    payload = user.to_json()
    cache.set(key, payload)
    return HttpResponse(payload)
测试
import pytest
from django.test import RequestFactory
from testcontainers.redis import RedisContainer
from inventory.views import sale, sale_v2
@pytest.fixture(scope="session")
def redis():
    """Session-wide Redis 7.2.3 test container, torn down when the session ends."""
    container = RedisContainer('redis:7.2.3')
    with container as running:
        yield running
# 覆盖settings设置
# ============================================
# https://pytest-django.readthedocs.io/en/latest/configuring_django.html#overriding-individual-settings
@pytest.fixture(autouse=True)
def setup_django_redis(settings, redis):
    """Point Django's default cache at the Redis test container for every test."""
    host = redis.get_container_host_ip()
    port = redis.get_exposed_port(redis.port)
    settings.CACHES['default']["LOCATION"] = f"redis://{host}:{port}"
# 用请求工厂跳过路由配置直接测试视图
# ==============================================
# https://docs.djangoproject.com/zh-hans/4.2/topics/testing/advanced/#the-request-factory
# 测试基于函数的视图
# -----------------------------------------------
# https://docs.djangoproject.com/zh-hans/4.2/topics/testing/advanced/#example
@pytest.mark.parametrize('view', [sale, sale_v2])
def test_sale(view):
    """Each sale view answers a GET built by ``RequestFactory`` with status 200."""
    request = RequestFactory().get('/sale/')
    assert view(request).status_code == 200
用到pytest-django的开源项目:
djangorestframework
测试form表单
import io
import pytest
from flask import url_for
from main import app
@pytest.fixture(autouse=True)
def flask_app(tmp_path):
    """Configure the app for testing and yield it inside an application context.

    Uploads are redirected to pytest's per-test temporary directory.
    """
    app.config.from_mapping(
        TESTING=True,
        SERVER_NAME='flask.app.unittest.com',
        UPLOAD_FOLDER=tmp_path,
    )
    with app.app_context():
        yield app
@pytest.fixture
def client(flask_app):
    """Test client used to issue HTTP requests against the app."""
    # flask_app yields the module-level ``app``, so this is the same client.
    http_client = flask_app.test_client()
    return http_client
class TestUploadFile:
    """Tests for the ``upload-file`` endpoint."""

    def test_empty_file(self, client):
        """A multipart post without a file part is rejected with code 1."""
        response = client.post(
            url_for('upload-file'), data={}, content_type='multipart/form-data'
        )
        body = response.json
        assert body['code'] == 1
        assert body['msg'] == '没有文件上传'

    def test_upload_file(self, client):
        """A small in-memory text file is accepted with code 0."""
        upload = (io.BytesIO(b'file content'), 'filename.txt')
        response = client.post(
            url_for('upload-file'),
            data={'file': upload},
            content_type='multipart/form-data',
        )
        body = response.json
        assert body['code'] == 0
        assert body['msg'] == '上传成功'
文件上传
"""上传文件: https://flask.palletsprojects.com/en/stable/patterns/fileuploads/"""
import os
from flask import Flask, request
from werkzeug.utils import secure_filename
# Directory uploaded files are saved to (placeholder path).
UPLOAD_FOLDER = '/path/to/the/uploads'
# Lower-case file extensions that allow_file() accepts.
ALLOW_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}
app = Flask(__name__)
# upload_file() reads the destination directory from the app config.
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
def allow_file(filename: str):
    """Tell whether ``filename`` ends in an allowed extension.

    The extension is whatever follows the last dot, compared
    case-insensitively against ``ALLOW_EXTENSIONS``. A name without a dot is
    never allowed.
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in ALLOW_EXTENSIONS
@app.route('/', methods=['POST'], endpoint='upload-file')
def upload_file():
    """Save one uploaded file into the configured upload folder.

    Returns:
        A JSON-serializable dict: ``code`` 0 with a success message when the
        file was stored, ``code`` 1 with the reason in ``msg`` otherwise.
    """
    # The POST request must carry a "file" part.
    if 'file' not in request.files:
        return {"code": 1, "msg": "没有文件上传"}
    file = request.files['file']
    # An empty filename means no file was selected.
    if file.filename == '':
        return {"code": 1, "msg": "没有选择文件"}
    if not (file and allow_file(file.filename)):
        # A rejected file is a failure, so it reports code 1 like the other
        # error paths (it used to report the success code 0).
        return {"code": 1, "msg": "文件不符合规范"}
    # secure_filename sanitizes the client-supplied name before it is used in a path.
    filename = secure_filename(file.filename)
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return {"code": 0, "msg": "上传成功"}
# Running the module directly only prints the Flask app object; no server is started.
if __name__ == '__main__':
print(app)
分布式锁
https://redis.io/docs/latest/develop/use/patterns/distributed-locks/
import random
import time
import uuid
import threading
from django.http import HttpResponse
from django.core.cache import cache
from django_redis import get_redis_connection
from django.views import View
def logic():
    """Sell one item from the ``inventory001`` stock counter kept in the cache.

    The read-modify-write below is not atomic on its own; callers are
    expected to hold the distributed lock around it.

    Returns:
        A message with the stock left after the sale, or a sold-out message
        when the counter is missing or not positive.
    """
    # 1. Read the current stock.
    result = cache.get("inventory001")
    # 2. A missing counter counts as zero stock.
    inventory_number = 0 if result is None else int(result)
    if inventory_number <= 0:
        return "商品卖完了, /(ㄒoㄒ)/~~"
    # 3. Sell exactly one item and report what is left after the sale.
    remaining = inventory_number - 1
    cache.set("inventory001", remaining)
    return "成功卖出一个商品, 库存剩余: " + str(remaining)
# 分布式锁
# ======================================================
# https://www.bilibili.com/video/BV13R4y1v7sP/?p=131
# 将判断+删除自己的合并为lua脚本保证原子性
# ---------------------------------------------------
# 代码: https://gitee.com/luzhenxiong/paradox-playground/blob/master/t/ut/test_cache.py
# 可重入性的分布式锁
# 允许同一个线程多次获得同一把锁,避免了线程因试图再次获取它已经拥有的锁而导致死锁的问题。
# --------------------------------------------------
class SaleViewV3(View):
    """Sale endpoint guarded by a reentrant Redis lock.

    The lock is a Redis hash stored at ``key``. Its single field is the
    owner id (``uuid_value``) and the field's value is the re-entry count.
    Lua scripts keep "check owner + update count" atomic.
    """

    # Owner id; generated on first lock() when left as None.
    uuid_value = None
    key = "zzyyRedisLock"
    # Lock time-to-live in seconds.
    expire_time = 50

    def post(self, request):
        """Run the business logic while holding the lock."""
        con = get_redis_connection("default")
        self.lock(con)
        try:
            logic()
        finally:
            # Release even when logic() raises, so the lock is not held until
            # it expires.
            self.unlock(con)
        return HttpResponse("ok")

    def lock(self, con):
        """Block until the lock is acquired or re-entered by this owner.

        Args:
            con: a redis-py client.

        Returns:
            True once the lock is held.
        """
        if self.uuid_value is None:
            # Random id plus thread id identifies the owner of this hold.
            self.uuid_value = f"{uuid.uuid4()}:{threading.get_ident()}"
        # Acquire when the key is free, or bump the count when we already own it.
        script = """
        if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
            redis.call('hincrby', KEYS[1], ARGV[1], 1)
            redis.call('expire', KEYS[1], ARGV[2])
            return 1
        else
            return 0
        end
        """
        # eval(script, numkeys, *keys_and_args): one key, then the two arguments.
        while not con.eval(script, 1, self.key, self.uuid_value, self.expire_time):
            # Back off for 20 ms to 1 s before retrying.
            time.sleep(random.uniform(0.02, 1))
        return True

    def unlock(self, con):
        """Release one level of the lock; delete the key at the outermost level.

        Raises:
            RuntimeError: if this owner does not hold the lock.
        """
        if self.uuid_value is None:
            raise RuntimeError("this lock doesn't exists, /(ㄒoㄒ)/~~")
        # nil -> not our lock; otherwise decrement and delete when it hits zero.
        script = """
        if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then
            return nil
        elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        flag = con.eval(script, 1, self.key, self.uuid_value)
        if flag is None:
            raise RuntimeError("this lock doesn't exists, /(ㄒoㄒ)/~~")
# 自动续期
# --------------------------------------------------
# 使用一种叫做"看门狗"的技术来实现自动续期
# 通常情况下, 看门狗会是一个定时器或者后台线程
class SaleViewV4(SaleViewV3):
    """SaleViewV3 plus a watchdog thread that keeps extending the lock's TTL."""

    renew_thread = None
    locked = False
    # Event used to wake the watchdog early when the lock is released.
    _stop_renewing = None

    def lock(self, con):
        """Acquire the lock and start the watchdog on the outermost acquisition."""
        res = super().lock(con)
        if res and self.renew_thread is None:
            # Set the flags before the thread starts so it never sees stale state.
            self.locked = True
            self._stop_renewing = threading.Event()
            self.renew_thread = threading.Thread(
                target=self.renew, args=(con,), daemon=True
            )
            self.renew_thread.start()
        return res

    def renew(self, con):
        """Re-arm the TTL every ``expire_time / 3`` seconds while we own the lock.

        The loop ends when the lock no longer belongs to this owner (the
        script returns 0) or when unlock() signals the stop event.
        """
        # Lua keywords are case-sensitive and must be lower-case.
        # EXPIRE's result is returned so that a successful renewal is truthy.
        script = """
        if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then
            return redis.call('expire', KEYS[1], ARGV[2])
        else
            return 0
        end
        """
        stop = self._stop_renewing or threading.Event()
        # expire_time is in seconds, and so is the wait below.
        interval = self.expire_time / 3
        while con.eval(script, 1, self.key, self.uuid_value, self.expire_time):
            # wait() returns True as soon as unlock() sets the event.
            if stop.wait(interval) or not self.locked:
                break

    def unlock(self, con):
        """Release one level of the lock; stop the watchdog once fully released."""
        super().unlock(con)
        if con.hexists(self.key, self.uuid_value):
            # Still held by an outer (reentrant) acquisition: keep renewing.
            return
        self.locked = False
        if self._stop_renewing is not None:
            self._stop_renewing.set()
        if self.renew_thread:
            self.renew_thread.join()
            self.renew_thread = None
数据
多对多关系数据
前端用穿梭框
后端处理流程:
取现有的数据到集合
求差集获取新增数据,然后set.add
差集获取删除数据, 然后set.delete
部署
方案一: ansible + supervisor + venv
在宿主机部署django应用, 通过supervisor管理进程
场景1是celery程序, 部署方便
场景2是用到docker的地方
前期阶段可以编写单文件playbook.yml
playbooks/
playbook.yml
supervisor.conf.j2
后期再考虑分目录规范playbook剧本
方案二: docker容器
cicd构建镜像, 推送到仓库, 在服务器上运行容器