Flask程序目录结构——构建可扩展的Flask应用程序

在开始之前有两点要说明一下:

一是因为本人是做前端的,所以针对Flask的项目开发都是基本接口实现的,没有包括模板。

另外,使用Flask构建项目,如果项目足够简单,其实一个文件就可以够用了,但这不在本次讨论范围内,我将以实际项目开发为出发点来构建Flask程序。

以下将分两种情况(普通项目及复杂项目两种)来构建Flask程序。

 

一、普通Flask项目的构建

对于简单的项目,不需要做到模块化,程序文件的组织都放在项目文件夹下,只根据文件功能分出不同的文件,如程序入口、公共文件、配置文件、模型文件、数据迁移配置文件、接口实现文件等,并在程序中需要的地方互相引用即可。

下面给出一个普通Flask项目的构建示例:

1、程序文件结构及代码组织

2、程序代码

from flask import Flask, jsonify
from model import db
from api import init_api
from common import Common

app = Flask(__name__)
app.config.from_object('config')

db.init_app(app)
init_api(app)


if __name__ == '__main__':
    app.run(host=app.config['HOST'],
            port=app.config['PORT'],
            debug=app.config['DEBUG'])
DB_USER = 'root'
DB_PASSWORD = ''
DB_HOST = 'localhost'
DB_DB = 'test'

DEBUG = True
PORT = 3333
HOST = "192.168.1.141"

SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI = 'mysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB
class Common:
    Def trueReturn(self, data, msg="请求成功"):
        return {
            "status": 1,
            "data": data,
            "msg": msg
        }

    def falseReturn(self, data, msg="请求失败"):
        return {
            "status": 0,
            "data": data,
            "msg": msg
        }
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError

db = SQLAlchemy()

class Users(db.Model):
    user_id = db.Column(db.Integer, primary_key=True)
    user_name = db.Column(db.String(60), nullable=False)
    user_password = db.Column(db.String(30), nullable=False)
    user_nickname = db.Column(db.String(50))
    user_email = db.Column(db.String(30), nullable=False)

    def __init__(self, user_name, user_password, user_nickname, user_email):
        self.user_name = user_name
        self.user_password = user_password
        self.user_nickname = user_nickname
        self.user_email = user_email

    def __str__(self):
        return "Users(user_id='%s')" % self.user_id

    def getAll(self):
        return self.query.all()

    def get(self, id):
        return self.query.filter_by(user_id=id).first()

    def add(self, user):
        db.session.add(user)
        return session_commit()

    def update(self):
        return session_commit()

    def delete(self, id):
        deleteRow = self.query.filter_by(user_id=id).delete()
        return session_commit()

    def output(self, user):
        return {
            'user_id': user.user_id,
            'user_name': user.user_name,
            'user_nickname': user.user_nickname,
            'user_email': user.user_email
        }

def session_commit():
    try:
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        reason = str(e)
        return reason
from flask import jsonify, request
from model import Users
from common import Common

def init_api(app):
    @app.route('/')
    def index():
        return jsonify(Common.trueReturn(Common, 'Hello Flask!'))

    @app.route('/user', methods=['GET'])
    def getUsers():
        users = Users.getAll(Users)
        output = []
        for user in users:
            output.append(Users.output(Users, user))
        return jsonify(Common.trueReturn(Common, output))


    @app.route('/user/<int:userId>', methods=['GET'])
    def getUser(userId):
        user = Users.get(Users, userId)
        if user is None:
            return jsonify(Common.falseReturn(Common, None, '找不到数据'))
        else:
            return jsonify(Common.trueReturn(Common, Users.output(Users, user)))


    @app.route('/user', methods=['POST'])
    def addUser():
        user_name = request.form.get('user_name')
        user_password = request.form.get('user_password')
        user_nickname = request.form.get('user_nickname')
        user_email = request.form.get('user_email')
        user = Users(user_name=user_name, user_password=user_password, user_nickname=user_nickname, user_email=user_email)
        result = Users.add(Users, user)
        if user.user_id:
            return getUser(user.user_id)
        else:
            return jsonify(Common.falseReturn(Common, None, result))


    @app.route('/user/<int:userId>', methods=['PUT'])
    def updateUser(userId):
        user = Users.get(Users, userId)
        if user is None:
            return jsonify(Common.falseReturn(Common, None, '找不到要修改的数据'))
        else:
            user_name = request.form.get('user_name')
            user_password = request.form.get('user_password')
            user_nickname = request.form.get('user_nickname')
            user_email = request.form.get('user_email')

            user.user_name = user_name
            user.user_password = user_password
            user.user_nickname = user_nickname
            user.user_email = user_email

            result = Users.update(Users)
            return getUser(user.user_id)


    @app.route('/user/<int:userId>', methods=['DELETE'])
    def deleteUser(userId):
        user = Users.get(Users, userId)
        if user is None:
            return jsonify(Common.falseReturn(Common, None, '找不到要删除的数据'))
        else:
            deleteRow = Users.delete(Users, userId)
            user = Users.get(Users, userId)
            if user is None:
                return getUsers()
            else:
                return jsonify(Common.falseReturn(Common, None, '删除失败'))
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand

from app import app
from model import db
db.init_app(app)

migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)

if __name__ == '__main__':
    manager.run()

 

二、复杂多模块Flask项目构建

对于较复杂的项目,上面的构架很明显不符合实际开发,对于较复杂的项目来说,可扩展、弹性、维护方便、结构清晰等等要求都要达到,最好的方法就是将项目模块化,各个模块之间相对独立又彼此可交互。例如,我们都知道wordpress,假如我们现在要用Flask做一个像wordpress的管理系统,那么,我们可以将项目分成“文章管理模块”,“用户管理模块”,“分类标签管理模块”等等模块来构建。假如后续增加需求,比如增加一个“相册管理”功能,那我们就可以做一个“相册管理模块”来拼合到原来的项目中,可扩展和弹性的便利就凸显了。

下面给出一个示例来看看多模块项目的构建,示例将包含“用户模块”和“新闻模块”两个模块(功能方面只实际“用户模块”的功能)。

1、程序文件结构及代码组织

2、程序代码

from app import create_app

app = create_app('app.config')

if __name__ == '__main__':
    app.run(host=app.config['HOST'],
            port=app.config['PORT'],
            debug=app.config['DEBUG'])
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from run import app

app.config.from_object('app.config')

from app import db
db.init_app(app)

migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)

if __name__ == '__main__':
    manager.run()
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app(config_filename):
    app = Flask(__name__)
    app.config.from_object(config_filename)

    db.init_app(app)

    # 用户模块
    from app.users.api import init_api
    init_api(app)

    # 新闻模块
    from app.news.api import init_api
    init_api(app)

    return app
DB_USER = 'root'
DB_PASSWORD = ''
DB_HOST = 'localhost'
DB_DB = 'test'

DEBUG = True
PORT = 3333
HOST = "192.168.1.141"

SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI = 'mysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB
class Common:
    def trueReturn(self, data, msg="请求成功"):
        return {
            "status": 1,
            "data": data,
            "msg": msg
        }

    def falseReturn(self, data, msg="请求失败"):
        return {
            "status": 0,
            "data": data,
            "msg": msg
        }
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError

from app import db

class Users(db.Model):
    user_id = db.Column(db.Integer, primary_key=True)
    user_name = db.Column(db.String(60), nullable=False)
    user_password = db.Column(db.String(30), nullable=False)
    user_nickname = db.Column(db.String(50))
    user_email = db.Column(db.String(30), nullable=False)

    def __init__(self, user_name, user_password, user_nickname, user_email):
        self.user_name = user_name
        self.user_password = user_password
        self.user_nickname = user_nickname
        self.user_email = user_email

    def __str__(self):
        return "Users(user_id='%s')" % self.user_id

    def getAll(self):
        return self.query.all()

    def get(self, id):
        return self.query.filter_by(user_id=id).first()

    def add(self, user):
        db.session.add(user)
        return session_commit()

    def update(self):
        return session_commit()

    def delete(self, id):
        deleteRow = self.query.filter_by(user_id=id).delete()
        return session_commit()

    def output(self, user):
        return {
            'user_id': user.user_id,
            'user_name': user.user_name,
            'user_nickname': user.user_nickname,
            'user_email': user.user_email
        }

def session_commit():
    try:
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        reason = str(e)
        return reason
from flask import jsonify, request
from app.users.model import Users
from app.common import Common

def init_api(app):
    @app.route('/')
    def index():
        return jsonify(Common.trueReturn(Common, '用户模块'))

    @app.route('/user', methods=['GET'])
    def getUsers():
        users = Users.getAll(Users)
        output = []
        for user in users:
            output.append(Users.output(Users, user))
        return jsonify(Common.trueReturn(Common, output))


    @app.route('/user/<int:userId>', methods=['GET'])
    def getUser(userId):
        user = Users.get(Users, userId)
        if user is None:
            return jsonify(Common.falseReturn(Common, None, '找不到数据'))
        else:
            return jsonify(Common.trueReturn(Common, Users.output(Users, user)))


    @app.route('/user', methods=['POST'])
    def addUser():
        user_name = request.form.get('user_name')
        user_password = request.form.get('user_password')
        user_nickname = request.form.get('user_nickname')
        user_email = request.form.get('user_email')
        user = Users(user_name=user_name, user_password=user_password, user_nickname=user_nickname, user_email=user_email)
        result = Users.add(Users, user)
        if user.user_id:
            return getUser(user.user_id)
        else:
            return jsonify(Common.falseReturn(Common, None, result))


    @app.route('/user/<int:userId>', methods=['PUT'])
    def updateUser(userId):
        user = Users.get(Users, userId)
        if user is None:
            return jsonify(Common.falseReturn(Common, None, '找不到要修改的数据'))
        else:
            user_name = request.form.get('user_name')
            user_password = request.form.get('user_password')
            user_nickname = request.form.get('user_nickname')
            user_email = request.form.get('user_email')

            user.user_name = user_name
            user.user_password = user_password
            user.user_nickname = user_nickname
            user.user_email = user_email

            result = Users.update(Users)
            return getUser(user.user_id)


    @app.route('/user/<int:userId>', methods=['DELETE'])
    def deleteUser(userId):
        user = Users.get(Users, userId)
        if user is None:
            return jsonify(Common.falseReturn(Common, None, '找不到要删除的数据'))
        else:
            deleteRow = Users.delete(Users, userId)
            user = Users.get(Users, userId)
            if user is None:
                return getUsers()
            else:
                return jsonify(Common.falseReturn(Common, None, '删除失败'))
from flask_sqlalchemy import SQLAlchemy

from app import db

class News(db.Model):
    news_id = db.Column(db.Integer, primary_key=True)
    news_title = db.Column(db.String(60), nullable=False)
    news_desc = db.Column(db.String(30), nullable=False)
    news_date = db.Column(db.DateTime)
    news_content = db.Column(db.Text, nullable=False)
from flask import jsonify, request
from app.news.model import News
from app.common import Common

def init_api(app):
    @app.route('/news')
    def news():
        return jsonify(Common.trueReturn(Common, '这是新闻模块', "提示信息"))

 

上面给出了两种常见情形的程序构建示例,对于具体项目,当然也还要具体分析,本文就先到此为止了。

 

那时那我

随遇,随缘,随安,随喜!

One thought to “Flask程序目录结构——构建可扩展的Flask应用程序”

  1. 你好,谢谢你的文章,让我改善了我现在写的代码结构,不过,我在写的时候遇到了一个,希望你能给我一点建议,就是关于,app文件中不同的模块的数据库映射model,存在一对多和多对多的关系,由于声明外键的时候需要导入相应的模块,例如:(省略部分)

    在app/user/model.py
    class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True)
    age = db.Column(db.Integer)

    def __init__(self, name, age):
    self.name = name
    self.age = age

    在app/score/model.py

    class Score(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    course = db.Column(db.String(50))
    assess_date = db.Column(db.DateTime)
    score = db.Column(db.Float)
    is_pass = db.Column(db.Boolean)

    user_id = db.Column(db.Integer, db.ForeignKey(‘user.id’))
    其中app/score/model.py中的user_id = db.Column(db.Integer, db.ForeignKey(‘user.id’))的user类,采用什么方式导入好一些,来保证代码的工整性,我尝试在app/score/model.py直接导入app/user/model.py的user,但是感觉当模块边多厚会导致导入变得混乱,麻烦大佬给予一点意见。谢谢

发表评论

电子邮件地址不会被公开。 必填项已用*标注