将JWT身份验证集成到Python的Flask Web API中

首先

我尝试在使用Python创建的Flask Web API中加入JWT(JSON Web Token)认证。
参考资料:使用Python + Flask + MongoDB创建Web API并将其部署到Azure VM + Nginx(收集Hololive预定视频的计划3)。

2023/1/30 更新,由于无法在 Python 3.11 系统中使用 Flask-JWT,因此重新发布了此帖。
将 JWT 认证(Flask-JWT-Extended)嵌入到 Python 的 Flask Web API 中。

JWT(JSON Web Token)是什么

JWT是JSON Web Token的缩写,它是一种规范,用于以JSON对象的形式交换请求信息(Claim)。
该规范在RFC7519中定义,并在双方通信时的授权(Authorization)中使用。

本次,根据请求中指定的用户名和密码,生成JWT令牌作为响应返回,并利用该JWT令牌使得可以使用Web API的功能。

环境

ローカル環境

Windows 10 Pro 1909
Python 3.8.5
PowerShell 7.1 / WSL
Git for Windows 2.27.0
MongoDB 4.4.2

クラウド環境

Azure VM の Ubuntu 20.04(LTS)
Python 3.8.6
Git 2.25.1
MongoDB 4.4.2
Nginx 1.18.0

准备用户信息

为了判断是否可以使用Web API功能,我们将根据用户名和密码将用户信息添加到现有的MongoDB的users集合中并进行注册。

> mongo localhost:27017/admin -u admin -p
> use holoduledb
switched to db holoduledb
> show collections
holodules
> db.createCollection("users");
{ "ok" : 1 }
> db.users.save( {"id":"1", "username":"user01", "password":"password01"} );
WriteResult({ "nInserted" : 1 })
> db.users.find();
{ "_id" : ObjectId("5fe1aca6d53eaa62c5f8c75b"), "id" : "1", "username" : "user01", "password" : "password01" }

创建用户类 (Yuè kè de zuò

创建一个用于存储从MongoDB的用户集合中获取的用户信息的类。
由于没有使用Object-Document-Mapper,因此我还创建了一个方法来相互转换JSON和对象。

class User:
    def __init__(self, id, username, password):
        self.__id = id
        self.__username = username
        self.__password = password

    # id
    @property
    def id(self):
        return self.__id

    @id.setter
    def id(self, id):
        self.__id = id

    # username
    @property
    def username(self):
        return self.__username

    @username.setter
    def username(self, username):
        self.__username = username

    # password
    @property
    def password(self):
        return self.__password

    @password.setter
    def password(self, password):
        self.__password = password

    # ドキュメントから変換
    @classmethod
    def from_doc(cls, doc):
        if doc is None:
            return None
        user = User(doc['id'], 
                    doc['username'], 
                    doc['password'])
        return user

    # ドキュメントへ変換
    def to_doc(self):
        doc = { 'id': str(self.id),
                'username': str(self.username),
                'password' : str(self.password) }
        return doc

在使用Flask时,添加一个用于JWT认证的包。

因为现有的Web API使用了Flask框架,所以我们需要添加Flask-JWT包来在Flask中使用JWT。

> poetry add Flask-JWT

将JWT认证过程集成进来。

对于现有的Web API的app.py,将JWT认证过程集成进去。

添加已创建的User类和Flask-JWT包的导入。

from flask_jwt import jwt_required, current_identity, JWT
from models.user import User

添加一个函数,使用用户名和密码进行身份验证,并返回用户对象(identity)。

def authoricate(username, password):
    user = User.from_doc(db.users.find_one({"username": username}))
    authenticated = True if user is not None and user.password == password else False
    return user if authenticated else None

添加一个根据用户ID返回用户对象(identity)的函数。

def identity(payload):
    # @jwt.jwt_payload_handler でJWTペイロードをカスタマイズし、identity をユーザー名にしている
    username = payload['identity']
    user = User.from_doc(db.users.find_one({"username": username}))
    return user

添加JWT认证的初始化处理

密钥 JWT_SECRET_KEY 是从配置文件中设置的。

# Flask
app = Flask(__name__)
# JSONのソートを抑止
app.config['JSON_SORT_KEYS'] = False
# Flask JWT
app.config['JWT_SECRET_KEY'] = settings.jwt_secret_key      # JWTに署名する際の秘密鍵
app.config['JWT_ALGORITHM'] = 'HS256'                       # 暗号化署名のアルゴリズム
app.config['JWT_LEEWAY'] = 0                                # 有効期限に対する余裕時間
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=300) # トークンの有効期間
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=0)   # トークンの使用を開始する相対時間
app.config['JWT_AUTH_URL_RULE'] = '/auth'                   # 認証エンドポイントURL
jwt = JWT(app, authoricate, identity)                       # ここで上記2つの関数を指定

添加一个函数来自定义JWT负载。

@jwt.jwt_payload_handler
def make_payload(identity):
    iat = datetime.utcnow()
    exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA') 
    nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
    identity = getattr(identity, 'username')
    return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}

為需要認證的網絡 API 方法指定裝飾器。

@log(logger)
@app.route('/holodules/<string:date>', methods=['GET'])
@jwt_required()                                   # デコレータを指定
def get_Holodules(date):
    logger.info(f"holodules/{date}")
    if len(date) != 8:
        abort(500)
    ...

驗證JWT的運作確認

启动Web API

> poetry run flask run

 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
[INFO    ]_log -  * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

使用curl命令获取JWT令牌

我們會向認證端點URL的 /auth 發送請求,並提供用戶名和密碼,確認是否可以獲取JWT令牌。

# Powershell の場合は JSON のエスケープが必要
> curl "http://127.0.0.1:5000/auth" -X POST -H "Content-Type: application/json" -d '"{ \"username\": \"user01\", \"password\": \"password01\" }"'

{"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."}
# WSL であればいつも通り
$ curl "http://127.0.0.1:5000/auth" -X POST -H "Content-Type: application/json" -d '{ "username": "user01", "password": "password01" }'

{"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."}
img01.jpg

使用curl命令调用Web API的方法。

用获取的 JWT 令牌指定并调用 Web API 方法,以确保能够获取到响应。

# Powershell
> curl "http://localhost:5000/holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."

{"result":17,"holodules":[{"key":"HL0503_20201209_230000","video_id":"yEttl2nfhsQ","datetime":"20201209 230000","name":"\u5c3e\u4e38\u30dd\u30eb\u30ab","title":"\u30a2\u30ab\u30da\u30e9\u3067...
# WSL
$ curl "http://localhost:5000/Holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."

{"result":17,"holodules":[{"key":"HL0503_20201209_230000","video_id":"yEttl2nfhsQ","datetime":"20201209 230000","name":"\u5c3e\u4e38\u30dd\u30eb\u30ab","title":"\u30a2\u30ab\u30da\u30e9\u3067...
img02.jpg

若未指定 JWT 令牌,将导致认证错误。

$ curl "http://localhost:5000/Holodules/20201209"

{"status_code":401,"error":"Authorization Required","description":"Request does not contain an access token"}

如果JWT令牌过期的话,会是这样的情况。

$ curl "http://localhost:5000/Holodules/20201209" -H "Authorization: JWT eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI..."

{"status_code": 401,"error": "Invalid token","description": "Signature has expired"}

搭载JWT认证的app.py源代码

我們在上一次的基礎上,進行了一些修改,不僅添加了JWT身份驗證,還增加了日誌紀錄等功能。

import json
from flask import Flask, jsonify, request, abort, make_response, current_app
from flask_jwt import jwt_required, current_identity, JWT
from pymongo import MongoClient
from os.path import join, dirname
from urllib.parse import quote_plus
from datetime import timedelta, datetime
from models.holodule import Holodule
from models.user import User
from settings import Settings
from logger import log, get_logger

# ロギングの設定
json_path = join(dirname(__file__), "config/logger.json")
log_dir = join(dirname(__file__), "log")
logger = get_logger(log_dir, json_path, False)

# Settings インスタンス
settings = Settings(join(dirname(__file__), '.env'))

# MongoDB 接続情報
mongodb_user = quote_plus(settings.mongodb_user)
mongodb_password = quote_plus(settings.mongodb_password)
mongodb_host = "mongodb://%s/" % (settings.mongodb_host)

# MongoDB 接続認証
client = MongoClient(mongodb_host)
db = client.holoduledb
db.authenticate(name=mongodb_user,password=mongodb_password)

# ユーザー名とパスワードを用いて認証情報を検証するコールバック関数
def authoricate(username, password):
    user = User.from_doc(db.users.find_one({"username": username}))
    authenticated = True if user is not None and user.password == password else False
    return user if authenticated else None

# JWTペイロードをもとにユーザー情報を取得するコールバック関数
def identity(payload):
    # @jwt.jwt_payload_handler でJWTペイロードをカスタマイズし、identity をユーザー名にしている
    username = payload['identity']
    user = User.from_doc(db.users.find_one({"username": username}))
    return user

# Flask
app = Flask(__name__)
# JSONのソートを抑止
app.config['JSON_SORT_KEYS'] = False
# Flask JWT
app.config['JWT_SECRET_KEY'] = settings.jwt_secret_key      # JWTに署名する際の秘密鍵
app.config['JWT_ALGORITHM'] = 'HS256'                       # 暗号化署名のアルゴリズム
app.config['JWT_LEEWAY'] = 0                                # 有効期限に対する余裕時間
app.config['JWT_EXPIRATION_DELTA'] = timedelta(seconds=300) # トークンの有効期間
app.config['JWT_NOT_BEFORE_DELTA'] = timedelta(seconds=0)   # トークンの使用を開始する相対時間
app.config['JWT_AUTH_URL_RULE'] = '/auth'                   # 認証エンドポイントURL
jwt = JWT(app, authoricate, identity)                       # ここで上記2つの関数を指定

# JWTペイロードのカスタマイズ
@jwt.jwt_payload_handler
def make_payload(identity):
    iat = datetime.utcnow()
    exp = iat + current_app.config.get('JWT_EXPIRATION_DELTA') 
    nbf = iat + current_app.config.get('JWT_NOT_BEFORE_DELTA')
    identity = getattr(identity, 'username')
    return {'exp': exp, 'iat': iat, 'nbf': nbf, 'identity': identity}

# ホロジュール配信予定の取得
@log(logger)
@app.route('/holodules/<string:date>', methods=['GET'])
@jwt_required()
def get_Holodules(date):
    logger.info(f"holodules/{date}")
    if len(date) != 8:
        abort(500)

    # MongoDB から年月日を条件にホロジュール配信予定を取得してリストに格納
    holodule_list = []
    for doc in db.holodules.find({"datetime": {'$regex':'^'+date}}).sort("datetime", -1):
        holodule = Holodule.from_doc(doc)
        holodule_list.append(holodule)

    if len(holodule_list) == 0:
        abort(404)

    # オブジェクトをもとに辞書を構築してJSONとして返却
    holodules = []
    for holodule in holodule_list:
        doc = holodule.to_doc()
        holodules.append(doc)
    result = {
        "result":len(holodule_list),
        "holodules":holodules
    }
    # UTF-8コード、Content-Type は application/json
    return make_response(jsonify(result))
    # UTF-8文字、Content-Type は text/html; charset=utf-8
    # return make_response(json.dumps(result, ensure_ascii=False))

# エラーハンドラ:404
@log(logger)
@app.errorhandler(404)
def not_found(error):
    logger.error(f"{error}")
    return make_response(jsonify({'error': 'Not found'}), 404)

# エラーハンドラ:500
@log(logger)
@app.errorhandler(500)
def internal_server_error(error):
    logger.error(f"{error}")
    return make_response(jsonify({'error': 'Internal Server Error'}), 500)

if __name__ == "__main__":
    app.run()

最后

在使用JWT(JSON Web Token)认证的基础上,我们成功地集成了最基本的机制。
虽然我们实现了所需的SSL通信和JWT认证,但由于强行结合了各种机制,我希望重新审视整体。

在这里,服务器端的功能告一段落,同时我们会开始开发客户端(Android 应用)并且进行修改,利用Web API作为依托。

广告
将在 10 秒后关闭
bannerAds