Flask使用Flask-SQLAlchemy操作MySQL数据库(二)——单表查询

一、单表查询示例

一个用户表的增删改查(CRUD)及用户列表查询

1、数据表

新建一个用户表user,user表结构如下:

DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `user_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `user_name` varchar(60) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  `user_password` varchar(30) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  `user_nickname` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT '',
  `user_email` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  PRIMARY KEY (`user_id`),
  KEY `user_name` (`user_name`),
  KEY `user_email` (`user_email`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

2、API接口

获取所有用户数据[GET]:

http://127.0.0.1:5000/users

 

添加一条用户数据[POST]:

http://127.0.0.1:5000/user

 

删除一条用户数据[DELETE]:

http://127.0.0.1:5000/user/1

 

修改一条用户数据[PATCH]:

http://127.0.0.1:5000/user/1

 

查询一条用户数据[GET]:

http://127.0.0.1:5000/user/1

 

二、接口实现

1、定义数据表模型

根据用户表结构和程序需求来定义数据表模型

class User(mydb.Model):
    user_id = mydb.Column(mydb.Integer, primary_key=True)
    user_name = mydb.Column(mydb.String(60), nullable=False)
    user_password = mydb.Column(mydb.String(30), nullable=False)
    user_nickname = mydb.Column(mydb.String(50))
    user_email = mydb.Column(mydb.String(30), nullable=False)
    def __repr__(self):
        return '<User %r>' % self.user_name

 

2、查询所有用户数据

@app.route('/users', methods=['GET'])
def getUsers():
    data = User.query.all()
    datas = []
    for user in data:
        datas.append({'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email})
    return jsonify(data=datas)

使用User模型查询出所有的用户数据,利用for…in循环遍历查询出来的数据,并重新组织输出格式和字段,最后使用jsonify格式化输出JSON数据。

 

3、查询单条用户数据

@app.route('/user/<int:userId>', methods=['GET'])
def getUser(userId):
    user = User.query.filter_by(user_id=userId).first()
    if (user is None):
        result = {'msg': '找不到数据'}
    else:
        result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
    return jsonify(data=result)

使用filter_by作为查询筛选条件以获得指定用户id的数据

 

4、添加用户数据

@app.route('/user', methods=['POST'])
def addUser():
    user_name = request.form.get('user_name')
    user_password = request.form.get('user_password')
    user_nickname = request.form.get('user_nickname')
    user_email = request.form.get('user_email')
    user = User(user_name=user_name, user_password=user_password, user_nickname=user_nickname, user_email=user_email)
    try:
        mydb.session.add(user)
        mydb.session.commit()
    except:
        mydb.session.rollback()
        mydb.session.flush()
    userId = user.user_id
    if (user.user_id is None):
        result = {'msg': '添加失败'}
        return jsonify(data=result)

    data = User.query.filter_by(user_id=userId).first()
    result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
    return jsonify(data=result)

说明:

(1)添加数据需要从请求方获取数据,一般情况是通过表单的方式来提交,使用request.form.get(‘user_name’)可以获取来自请求方的相关字段数据。

(2)在获取到添加数据后,要使用User模型对数据进行整理,整理出的数据本身就是准备要添加到数据表中的数据

user = User(user_name=user_name, user_password=user_password, user_nickname=user_nickname, user_email=user_email)

(3)数据添加分为两步,一是通过数据库操作会话来添加数据

mydb.session.add(user)

二是提交添加数据这个操作

mydb.session.commit()

在添加和提交操作这个过程中,如果出现异常,需要做异常处理。如进行回滚,重置等

mydb.session.rollback()

mydb.session.flush()

(4)后面是添加成功后返回新添加的数据

 

5、修改指定用户数据

@app.route('/user/<int:userId>', methods=['PATCH'])
def updateUser(userId):
    user_name = request.form.get('user_name')
    user_password = request.form.get('user_password')
    user_nickname = request.form.get('user_nickname')
    user_email = request.form.get('user_email')
    try:
        user = User.query.filter_by(user_id=userId).first()
        if (user is None):
            result = {'msg': '找不到要修改的记录'}
            return jsonify(data=result)
        else:
            user.user_name = user_name
            user.user_password = user_password
            user.user_nickname = user_nickname
            user.user_email = user_email
            mydb.session.commit()
    except:
        mydb.session.rollback()
        mydb.session.flush()
    userId = user.user_id
    data = User.query.filter_by(user_id=userId).first()
    result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_password': user.user_password, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
    return jsonify(data=result)

说明:

修改数据需要获取请求方提交的数据,同时还要查询请求方传过来的要修改的数据的id,在用户id存在的情况下才执行修改,因此这里要先根据id从数据库中查询记录是否存在,如果存在就把修改数据,并提交操作,如果不存在,则不处理。当然如果过程出现异常,也做异常处理。

 

6、删除指定用户数据

@app.route('/user/<int:userId>', methods=['DELETE'])
def deleteUser(userId):
    User.query.filter_by(user_id=userId).delete()
    mydb.session.commit()
    return getUsers()

 

至此,关于用户表单表查询的五个接口(用户列表及用户的增删改查)都已实现,程序代码如下:

from flask_sqlalchemy import SQLAlchemy
from flask import Flask, jsonify, request
import configparser

app = Flask(__name__)

my_config = configparser.ConfigParser()
my_config.read('db.conf')

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://' + my_config.get('DB', 'DB_USER') + ':' + my_config.get('DB', 'DB_PASSWORD') + '@' + my_config.get('DB', 'DB_HOST') + '/' + my_config.get('DB', 'DB_DB')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

mydb = SQLAlchemy()
mydb.init_app(app)

# 用户模型
class User(mydb.Model):
    user_id = mydb.Column(mydb.Integer, primary_key=True)
    user_name = mydb.Column(mydb.String(60), nullable=False)
    user_password = mydb.Column(mydb.String(30), nullable=False)
    user_nickname = mydb.Column(mydb.String(50))
    user_email = mydb.Column(mydb.String(30), nullable=False)
    def __repr__(self):
        return '<User %r>' % self.user_name

# 获取用户列表
@app.route('/users', methods=['GET'])
def getUsers():
    data = User.query.all()
    datas = []
    for user in data:
        datas.append({'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email})
    return jsonify(data=datas)

# 添加用户数据
@app.route('/user', methods=['POST'])
def addUser():
    user_name = request.form.get('user_name')
    user_password = request.form.get('user_password')
    user_nickname = request.form.get('user_nickname')
    user_email = request.form.get('user_email')
    user = User(user_name=user_name, user_password=user_password, user_nickname=user_nickname, user_email=user_email)
    try:
        mydb.session.add(user)
        mydb.session.commit()
    except:
        mydb.session.rollback()
        mydb.session.flush()
    userId = user.user_id
    if (user.user_id is None):
        result = {'msg': '添加失败'}
        return jsonify(data=result)

    data = User.query.filter_by(user_id=userId).first()
    result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
    return jsonify(data=result)

# 获取单条数据
@app.route('/user/<int:userId>', methods=['GET'])
def getUser(userId):
    user = User.query.filter_by(user_id=userId).first()
    if (user is None):
        result = {'msg': '找不到数据'}
    else:
        result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
    return jsonify(data=result)

# 修改用户数据
@app.route('/user/<int:userId>', methods=['PATCH'])
def updateUser(userId):
    user_name = request.form.get('user_name')
    user_password = request.form.get('user_password')
    user_nickname = request.form.get('user_nickname')
    user_email = request.form.get('user_email')
    try:
        user = User.query.filter_by(user_id=userId).first()
        if (user is None):
            result = {'msg': '找不到要修改的记录'}
            return jsonify(data=result)
        else:
            user.user_name = user_name
            user.user_password = user_password
            user.user_nickname = user_nickname
            user.user_email = user_email
            mydb.session.commit()
    except:
        mydb.session.rollback()
        mydb.session.flush()
    userId = user.user_id
    data = User.query.filter_by(user_id=userId).first()
    result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_password': user.user_password, 'user_nickname': user.user_nickname, 'user_email': user.user_email}
    return jsonify(data=result)

# 删除用户数据
@app.route('/user/<int:userId>', methods=['DELETE'])
def deleteUser(userId):
    User.query.filter_by(user_id=userId).delete()
    mydb.session.commit()
    return getUsers()


if __name__ == '__main__':
    app.run(debug=True)

 

那时那我

随遇,随缘,随安,随喜!

One thought to “Flask使用Flask-SQLAlchemy操作MySQL数据库(二)——单表查询”

发表评论

电子邮件地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.