将JWT身份验证集成到Python的Flask Web API中
首先
我尝试在使用Python创建的Flask Web API中加入JWT(JSON Web Token)认证。
参考资料:使用Python + Flask + MongoDB创建Web API并将其部署到Azure VM + Nginx(收集Hololive预定视频的计划3)。
2023/1/30 更新,由于无法在 Python 3.11 系统中使用 Flask-JWT,因此重新发布了此帖。
将 JWT 认证(Flask-JWT-Extended)嵌入到 Python 的 Flask Web API 中。
JWT(JSON Web Token)是什么
JWT是JSON Web Token的缩写,它是一种规范,用于以JSON对象的形式交换请求信息(Claim)。
该规范在RFC7519中定义,并在双方通信时的授权(Authorization)中使用。
本次,根据请求中指定的用户名和密码,生成JWT令牌作为响应返回,并利用该JWT令牌使得可以使用Web API的功能。
环境
ローカル環境
Windows 10 Pro 1909
Python 3.8.5
PowerShell 7.1 / WSL
Git for Windows 2.27.0
MongoDB 4.4.2
クラウド環境
Azure VM の Ubuntu 20.04(LTS)
Python 3.8.6
Git 2.25.1
MongoDB 4.4.2
Nginx 1.18.0
准备用户信息
为了判断是否可以使用Web API功能,我们将根据用户名和密码将用户信息添加到现有的MongoDB的users集合中并进行注册。
> mongo localhost:27017/admin -u admin -p
> use holoduledb
switched to db holoduledb
> show collections
holodules
> db.createCollection("users");
{ "ok" : 1 }
> db.users.save( {"id":"1", "username":"user01", "password":"password01"} );
WriteResult({ "nInserted" : 1 })
> db.users.find();
{ "_id" : ObjectId("5fe1aca6d53eaa62c5f8c75b"), "id" : "1", "username" : "user01", "password" : "password01" }
创建用户类 (Yuè kè de zuò
创建一个用于存储从MongoDB的用户集合中获取的用户信息的类。
由于没有使用Object-Document-Mapper,因此我还创建了一个方法来相互转换JSON和对象。
class User:
def __init__(self, id, username, password):
self.__id = id
self.__username = username
self.__password = password
# id
@property
def id(self):
return self.__id
@id.setter
def id(self, id):
self.__id = id
# username
@property
def username(self):
return self.__username
@username.setter
def username(self, username):
self.__username = username
# password
@property
def password(self):
return self.__password
@password.setter
def password(self, password):
self.__password = password
# ドキュメントから変換
@classmethod
def from_doc(cls, doc):
if doc is None:
return None
user = User(doc['id'],
doc['username'],
doc['password'])
return user
# ドキュメントへ変換
def to_doc(self):
doc = { 'id': str(self.id),
'username': str(self.username),
'password' : str(self.password) }
return doc
在使用Flask时,添加一个用于JWT认证的包。
因为现有的Web API使用了Flask框架,所以我们需要添加Flask-JWT包来在Flask中使用JWT。
> poetry add Flask-JWT
将JWT认证过程集成进来。
对于现有的Web API的app.py,将JWT认证过程集成进去。
添加已创建的User类和Flask-JWT包的导入。
from flask_jwt import jwt_required, current_identity, JWT
from models.user import User
添加一个函数,使用用户名和密码进行身份验证,并返回用户对象(identity)。
def authoricate(username, password):
user = User.from_doc(db.users.find_one({"username": username}))
authenticated = True if user is not None and user.password == password else False
return user if authenticated else None
添加一个根据用户ID返回用户对象(identity)的函数。
def identity(payload):
# @jwt.jwt_payload_handler でJWTペイロードをカスタマイズし、identity をユーザー名にしている
username = payload['identity']
user = User.from_doc(db.users.find_one({"username": username}))
return user
添加JWT认证的初始化处理
密钥 JWT_SECRET_KEY 是从配置文件中设置的。
# Flask
app = Flask(__name__)
# JSONのソートを抑止
app.config['JSON_SORT_KEYS'] = False
# Flask JWT
app.config['JWT_SECRET_KEY'] = settings.jwt_secret_key # JWTに署名する際の秘密鍵
app.config['JWT_ALGORITHM'] = 'HS256' # 暗号化署名のアルゴリズム
app.config['JWT_LEEWAY'] = 0 # 有効期限に対する余裕時間
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=300) # トークンの有効期間
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=0) # トークンの使用を開始する相対時間
app.config['JWT_AUTH_URL_RULE'] = '/auth' # 認証エンドポイントURL
jwt = JWT(app, authoricate, identity) # ここで上記2つの関数を指定
添加一个函数来自定义JWT负载。
@jwt.jwt_payload_handler
def make_payload(identity):
iat = datetime.utcnow()
exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA')
nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
identity = getattr(identity, 'username')
return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}
為需要認證的網絡 API 方法指定裝飾器。
@log(logger)
@app.route('/holodules/<string:date>', methods=['GET'])
@jwt_required() # デコレータを指定
def get_Holodules(date):
logger.info(f"holodules/{date}")
if len(date) != 8:
abort(500)
...
驗證JWT的運作確認
启动Web API
> poetry run flask run
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
[INFO ]_log - * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
使用curl命令获取JWT令牌
我們會向認證端點URL的 /auth 發送請求,並提供用戶名和密碼,確認是否可以獲取JWT令牌。
# Powershell の場合は JSON のエスケープが必要
> curl "http://127.0.0.1:5000/auth" -X POST -H "Content-Type: application/json" -d '"{ \"username\": \"user01\", \"password\": \"password01\" }"'
{"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."}
# WSL であればいつも通り
$ curl "http://127.0.0.1:5000/auth" -X POST -H "Content-Type: application/json" -d '{ "username": "user01", "password": "password01" }'
{"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."}
使用curl命令调用Web API的方法。
用获取的 JWT 令牌指定并调用 Web API 方法,以确保能够获取到响应。
# Powershell
> curl "http://localhost:5000/holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."
{"result":17,"holodules":[{"key":"HL0503_20201209_230000","video_id":"yEttl2nfhsQ","datetime":"20201209 230000","name":"\u5c3e\u4e38\u30dd\u30eb\u30ab","title":"\u30a2\u30ab\u30da\u30e9\u3067...
# WSL
$ curl "http://localhost:5000/Holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."
{"result":17,"holodules":[{"key":"HL0503_20201209_230000","video_id":"yEttl2nfhsQ","datetime":"20201209 230000","name":"\u5c3e\u4e38\u30dd\u30eb\u30ab","title":"\u30a2\u30ab\u30da\u30e9\u3067...
若未指定 JWT 令牌,将导致认证错误。
$ curl "http://localhost:5000/Holodules/20201209"
{"status_code":401,"error":"Authorization Required","description":"Request does not contain an access token"}
如果JWT令牌过期的话,会是这样的情况。
$ curl "http://localhost:5000/Holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."
{"status_code": 401,"error": "Invalid token","description": "Signature has expired"}
搭载JWT认证的app.py源代码
我們在上一次的基礎上,進行了一些修改,不僅添加了JWT身份驗證,還增加了日誌紀錄等功能。
import json
from flask import Flask, jsonify, request, abort, make_response, current_app
from flask_jwt import jwt_required, current_identity, JWT
from pymongo import MongoClient
from os.path import join, dirname
from urllib.parse import quote_plus
from datetime import timedelta, datetime
from models.holodule import Holodule
from models.user import User
from settings import Settings
from logger import log, get_logger
# ロギングの設定
json_path = join(dirname(__file__), "config/logger.json")
log_dir = join(dirname(__file__), "log")
logger = get_logger(log_dir, json_path, False)
# Settings インスタンス
settings = Settings(join(dirname(__file__), '.env'))
# MongoDB 接続情報
mongodb_user = quote_plus(settings.mongodb_user)
mongodb_password = quote_plus(settings.mongodb_password)
mongodb_host = "mongodb://%s/" % (settings.mongodb_host)
# MongoDB 接続認証
client = MongoClient(mongodb_host)
db = client.holoduledb
db.authenticate(name=mongodb_user,password=mongodb_password)
# ユーザー名とパスワードを用いて認証情報を検証するコールバック関数
def authoricate(username, password):
user = User.from_doc(db.users.find_one({"username": username}))
authenticated = True if user is not None and user.password == password else False
return user if authenticated else None
# JWTペイロードをもとにユーザー情報を取得するコールバック関数
def identity(payload):
# @jwt.jwt_payload_handler でJWTペイロードをカスタマイズし、identity をユーザー名にしている
username = payload['identity']
user = User.from_doc(db.users.find_one({"username": username}))
return user
# Flask
app = Flask(__name__)
# JSONのソートを抑止
app.config['JSON_SORT_KEYS'] = False
# Flask JWT
app.config['JWT_SECRET_KEY'] = settings.jwt_secret_key # JWTに署名する際の秘密鍵
app.config['JWT_ALGORITHM'] = 'HS256' # 暗号化署名のアルゴリズム
app.config['JWT_LEEWAY'] = 0 # 有効期限に対する余裕時間
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=300) # トークンの有効期間
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=0) # トークンの使用を開始する相対時間
app.config['JWT_AUTH_URL_RULE'] = '/auth' # 認証エンドポイントURL
jwt = JWT(app, authoricate, identity) # ここで上記2つの関数を指定
# JWTペイロードのカスタマイズ
@jwt.jwt_payload_handler
def make_payload(identity):
iat = datetime.utcnow()
exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA')
nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
identity = getattr(identity, 'username')
return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}
# ホロジュール配信予定の取得
@log(logger)
@app.route('/holodules/<string:date>', methods=['GET'])
@jwt_required()
def get_Holodules(date):
logger.info(f"holodules/{date}")
if len(date) != 8:
abort(500)
# MongoDB から年月日を条件にホロジュール配信予定を取得してリストに格納
holodule_list = []
for doc in db.holodules.find({"datetime": {'$regex':'^'+date}}).sort("datetime", -1):
holodule = Holodule.from_doc(doc)
holodule_list.append(holodule)
if len(holodule_list) == 0:
abort(404)
# オブジェクトをもとに辞書を構築してJSONとして返却
holodules = []
for holodule in holodule_list:
doc = holodule.to_doc()
holodules.append(doc)
result = {
"result":len(holodule_list),
"holodules":holodules
}
# UTF-8コード、Content-Type は application/json
return make_response(jsonify(result))
# UTF-8文字、Content-Type は text/html; charset=utf-8
# return make_response(json.dumps(result, ensure_ascii=False))
# エラーハンドラ:404
@log(logger)
@app.errorhandler(404)
def not_found(error):
logger.error(f"{error}")
return make_response(jsonify({'error': 'Not found'}), 404)
# エラーハンドラ:500
@log(logger)
@app.errorhandler(500)
def internal_server_error(error):
logger.error(f"{error}")
return make_response(jsonify({'error': 'Internal Server Error'}), 500)
if __name__ == "__main__":
app.run()
最后
在使用JWT(JSON Web Token)认证的基础上,我们成功地集成了最基本的机制。
虽然我们实现了所需的SSL通信和JWT认证,但由于强行结合了各种机制,我希望重新审视整体。
在这里,服务器端的功能告一段落,同时我们会开始开发客户端(Android 应用)并且进行修改,利用Web API作为依托。