Flask + PyJWT 实现基于Json Web Token的用户认证授权

在程序开发中,用户认证授权是一个绕不过的重难点。以前的开发模式下,cookie和session认证是主流,随着前后端分离的趋势,基于Token的认证方式成为主流,而JWT是基于Token认证方式的一种机制,是实现单点登录认证的一种有效方法。

PyJWT是一个用来编码和解码JWT(JSON Web Tokens)的Python库,也可以用在Flask上。本文就通过一个实例来演示Flask项目整合PyJWT来实现基于Token的用户认证授权。

 

一、需求

1、程序将实现一个用户注册、登录和获取用户信息的功能

2、用户注册时输入用户名(username)、邮箱(email)和密码(password),用户名和邮箱是唯一的,如果数据库中已有则会注册失败;用户注册成功后返回用户的信息。

3、用户使用用户名(username)和密码(password)登录,登录成功时返回token,每次登录都会更新一次token。

4、用户要获取用户信息,需要在请求Header中传入验证参数和token,程序会验证这个token的有效性并给出响应。

5、程序构建方面,将用户和认证分列两个模块。

 

二、程序目录结构

根据示例需求构建程序目录结构:

 

三、程序实现

1、程序构建及相关文件

数据迁移配置文件:

from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from run import app
from app import db

app.config.from_object('app.config')

db.init_app(app)

migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)

if __name__ == '__main__':
    manager.run()

运行入口文件:

from app import create_app

app = create_app('app.config')

if __name__ == '__main__':
    app.run(host=app.config['HOST'],
            port=app.config['PORT'],
            debug=app.config['DEBUG'])

程序初始化文件:

from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app(config_filename):
    app = Flask(__name__)
    app.config.from_object(config_filename)

    @app.after_request
    def after_request(response):
        response.headers.add('Access-Control-Allow-Origin', '*')
        if request.method == 'OPTIONS':
            response.headers['Access-Control-Allow-Methods'] = 'DELETE, GET, POST, PUT'
            headers = request.headers.get('Access-Control-Request-Headers')
            if headers:
                response.headers['Access-Control-Allow-Headers'] = headers
        return response

    from app.users.model import db
    db.init_app(app)

    from app.users.api import init_api
    init_api(app)

    return app

上面代码加入了全局HTTP请求头配置,设置允许所有跨域请求。

配置文件:

DB_USER = 'root'
DB_PASSWORD = ''
DB_HOST = 'localhost'
DB_DB = 'flask-pyjwt-auth'

DEBUG = True
PORT = 3333
HOST = "192.168.1.141"
SECRET_KEY = "my blog"

SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI = 'mysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB

公共文件:

def trueReturn(data, msg):
    return {
        "status": True,
        "data": data,
        "msg": msg
    }


def falseReturn(data, msg):
    return {
        "status": False,
        "data": data,
        "msg": msg
    }

2、用户模块

模块入口(空)


用户模型:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.security import generate_password_hash, check_password_hash

from app import db

class Users(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(250),  unique=True, nullable=False)
    username = db.Column(db.String(250),  unique=True, nullable=False)
    password = db.Column(db.String(250))
    login_time = db.Column(db.Integer)

    def __init__(self, username, password, email):
        self.username = username
        self.password = password
        self.email = email

    def __str__(self):
        return "Users(id='%s')" % self.id

    def set_password(self, password):
        return generate_password_hash(password)

    def check_password(self, hash, password):
        return check_password_hash(hash, password)

    def get(self, id):
        return self.query.filter_by(id=id).first()

    def add(self, user):
        db.session.add(user)
        return session_commit()

    def update(self):
        return session_commit()

    def delete(self, id):
        self.query.filter_by(id=id).delete()
        return session_commit()


def session_commit():
    try:
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        reason = str(e)
        return reason

在上面用户模型定义中,定义了set_password和check_password方法,分别用来加密用户注册时填写的密码(将加密后的密码写入数据库)和在用户登录时检查用户密码是否正确。

用户相关接口实现:

from flask import jsonify, request
from app.users.model import Users
from app.auth.auths import Auth
from .. import common

def init_api(app):
    @app.route('/register', methods=['POST'])
    def register():
        """
        用户注册
        :return: json
        """
        email = request.form.get('email')
        username = request.form.get('username')
        password = request.form.get('password')
        user = Users(email=email, username=username, password=Users.set_password(Users, password))
        result = Users.add(Users, user)
        if user.id:
            returnUser = {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'login_time': user.login_time
            }
            return jsonify(common.trueReturn(returnUser, "用户注册成功"))
        else:
            return jsonify(common.falseReturn('', '用户注册失败'))


    @app.route('/login', methods=['POST'])
    def login():
        """
        用户登录
        :return: json
        """
        username = request.form.get('username')
        password = request.form.get('password')
        if (not username or not password):
            return jsonify(common.falseReturn('', '用户名和密码不能为空'))
        else:
            return Auth.authenticate(Auth, username, password)


    @app.route('/user', methods=['GET'])
    def get():
        """
        获取用户信息
        :return: json
        """
        result = Auth.identify(Auth, request)
        if (result['status'] and result['data']):
            user = Users.get(Users, result['data'])
            returnUser = {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'login_time': user.login_time
            }
            result = common.trueReturn(returnUser, "请求成功")
        return jsonify(result)

上面用户模块的API实现代码中,先从auth模块中导入Auth类,在用户登录接口中,调用Auth类的authenticate方法来执行用户认证,认证通过则返回token,认证不通过则返回错误信息。在获取用户信息的接口,首先要进行“用户鉴权”,只有拥有权限的用户才有权限拿到用户信息。

3、认证模块

模块入口(空)


授权认证处理:

import jwt, datetime, time
from flask import jsonify
from app.users.model import Users
from .. import config
from .. import common

class Auth():
    @staticmethod
    def encode_auth_token(user_id, login_time):
        """
        生成认证Token
        :param user_id: int
        :param login_time: int(timestamp)
        :return: string
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=10),
                'iat': datetime.datetime.utcnow(),
                'iss': 'ken',
                'data': {
                    'id': user_id,
                    'login_time': login_time
                }
            }
            return jwt.encode(
                payload,
                config.SECRET_KEY,
                algorithm='HS256'
            )
        except Exception as e:
            return e

    @staticmethod
    def decode_auth_token(auth_token):
        """
        验证Token
        :param auth_token:
        :return: integer|string
        """
        try:
            # payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'), leeway=datetime.timedelta(seconds=10))
            # 取消过期时间验证
            payload = jwt.decode(auth_token, config.SECRET_KEY, options={'verify_exp': False})
            if ('data' in payload and 'id' in payload['data']):
                return payload
            else:
                raise jwt.InvalidTokenError
        except jwt.ExpiredSignatureError:
            return 'Token过期'
        except jwt.InvalidTokenError:
            return '无效Token'


    def authenticate(self, username, password):
        """
        用户登录,登录成功返回token,写将登录时间写入数据库;登录失败返回失败原因
        :param password:
        :return: json
        """
        userInfo = Users.query.filter_by(username=username).first()
        if (userInfo is None):
            return jsonify(common.falseReturn('', '找不到用户'))
        else:
            if (Users.check_password(Users, userInfo.password, password)):
                login_time = int(time.time())
                userInfo.login_time = login_time
                Users.update(Users)
                token = self.encode_auth_token(userInfo.id, login_time)
                return jsonify(common.trueReturn(token.decode(), '登录成功'))
            else:
                return jsonify(common.falseReturn('', '密码不正确'))

    def identify(self, request):
        """
        用户鉴权
        :return: list
        """
        auth_header = request.headers.get('Authorization')
        if (auth_header):
            auth_tokenArr = auth_header.split(" ")
            if (not auth_tokenArr or auth_tokenArr[0] != 'JWT' or len(auth_tokenArr) != 2):
                result = common.falseReturn('', '请传递正确的验证头信息')
            else:
                auth_token = auth_tokenArr[1]
                payload = self.decode_auth_token(auth_token)
                if not isinstance(payload, str):
                    user = Users.get(Users, payload['data']['id'])
                    if (user is None):
                        result = common.falseReturn('', '找不到该用户信息')
                    else:
                        if (user.login_time == payload['data']['login_time']):
                            result = common.trueReturn(user.id, '请求成功')
                        else:
                            result = common.falseReturn('', 'Token已更改,请重新登录获取')
                else:
                    result = common.falseReturn('', payload)
        else:
            result = common.falseReturn('', '没有提供认证token')
        return result

认证模块实现token的生成、解析,以及用户的认证和鉴权。

首先要安装PyJWT

Pip install pyjwt

 

认证模块的实现主要包括下面4个部分(方法):

(1)encode_auth_token方法用来生成认证Token

要生成Token需要用到pyjwt的encode方法,这个方法可以传入三个参数,如示例:

jwt.encode(payload, config.SECRET_KEY, algorithm=’HS256′)

上面代码的jwt.encode方法中传入了三个参数:第一个是payload,这是认证依据的主要信息,第二个是密钥,这里是读取配置文件中的SECRET_KEY配置变量,第三个是生成Token的算法。

这里稍微讲一下payload,这是认证的依据,也是后续解析token后定位用户的依据,需要包含特定用户的特定信息,如本例注册了data声明,data声明中包括了用户ID和用户登录时间两个参数,在“用户鉴权”方法中,解析token完成后要利用这个用户ID来查找并返回用户信息给用户。这里的data声明是我们自己加的,pyjwt内置注册了以下几个声明:

  • “exp”: 过期时间
  • “nbf”: 表示当前时间在nbf里的时间之前,则Token不被接受
  • “iss”: token签发者
  • “aud”: 接收者
  • “iat”: 发行时间

要注意的是”exp”过期时间是按当地时间确定,所以设置时要使用utc时间。

(2)decode_auth_token方法用于Token验证

这里的Token验证主要包括过期时间验证和声明验证。使用pyjwt的decode方法解析Token,得到payload。如:

jwt.decode(auth_token, config.SECRET_KEY, options={‘verify_exp’: False})

上面的options设置不验证过期时间,如果不设置这个选项,token将在原payload中设置的过期时间后过期。

经过上面解析后,得到的payload可以跟原来生成payload进行比较来验证token的有效性。

(3)authenticate方法用于用户登录验证

这个方法进行用户登录验证,如果通过验证,先把登录时间写入用户记录,再调用上面第一个方法生成token,返回给用户(用户登录成功后,据此token来获取用户信息或其他操作)。

(4)identify方法用于用户鉴权

当用户有了token后,用户可以拿token去执行一些需要token才能执行的操作。这个用户鉴权方法就是进一步检查用户的token,如果完全符合条件则返回用户需要的信息或执行用户的操作。

用户鉴权的操作首先判断一个用户是否正确传递token,这里使用header的方式来传递,并要求header传值字段名为“Authorization”,字段值以“JWT”开头,并与token用“ ”(空格)隔开。

用户按正确的方式传递token后,再调用decode_auth_token方法来解析token,如果解析正确,获取解析出来的用户信息(user_id)并到数据库中查找详细信息返回给用户。

 

四、运行结果

1、注册成功

2、注册失败

3、登录成功

4、登录失败

5、成功获取用户信息

6、用户重新登录,token变更,原token无法获取用户信息

7、不带token请求,无法获取用户信息

 

PyJWT的使用比较简单,也比较安全,本文基本涵盖了Flask和PyJWT的整合和使用过程,希望对大家有用。本文完。

 

那时那我

随遇,随缘,随安,随喜!

13 thoughts to “Flask + PyJWT 实现基于Json Web Token的用户认证授权”

  1. 非常喜欢你的博客文章,及风格。可不可以分享下博客源码,主要还是喜欢其中代码的输入框。请问用的是哪个富文本编辑器呀??

    1. 不是很清楚你的意思?
      PyJWT是Token-base的验证授权方式,无状态的,适合于API接口请求,如文章中的获取用户信息接口,是需要授权通过才能获取用户信息的,在类似的需要授权的接口中加入token验证,只能验证通过才能获取到指定的内容,这就起到了保护的作用。

  2. 想问个问题,我现在正在做个前后端分离的项目,
    如果token 有时间限定的话,会不会用户正在做某个操作的时候,突然token到期就退出了,需要重新认证。
    这个博主有类似的解决方案么? 请教下,谢谢。

    1. 首先,token是否设置时间限制除了根据项目对安全性的要求外,还与项目实际情况有关系(一个比较典型就是项目分别有PC端和移动端的情况,用户在每个端登录的同时,token都会改变)。
      第二点,如果token有时间限制,用户在执行操作的时候,token到期退出的情况其实的正常的;而在使用token的授权方式下,token的有效期一般都设置较长,用户的操作基本上不会在同一次使用项目的情况下出现token到期退出的情况。
      第三点,如果要解决token到期自动续期(不会出现token到期自动退出的情况)的情况,其实微信公众平台开发就已经给出了很好的方式,使用OAuth2,使用一个附加的Refresh Token用于刷新生成新的token。

  3. module ‘jwt’ has no attribute ‘encode’
    这里什么原因呢?
    auths.py:
    33行
    except Exception as e:
    print(‘test:’,e)
    return e
    ——————-
    结果:
    test: module ‘jwt’ has no attribute ‘encode’

发表评论

电子邮件地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.