基于 Python + redis + flask 的在线聊天室

从零搭建一个在线聊天室 整体技术栈 flask 框架 flask_login 的使用 WebSocket 简单应用 Redis 应用 flask_socketio 的使用 搭建权限框架 还是使用 Flask 来搭建后台应用

本文包含相关资料包-----> 点击直达获取<-------

从零搭建一个在线聊天室

整体技术栈

  1. flask 框架
  2. flask_login 的使用
  3. WebSocket 简单应用
  4. Redis 应用
  5. flask_socketio 的使用

搭建权限框架

还是使用 Flask 来搭建后台应用,使用 flask-login 扩展来处理用户登陆鉴权逻辑。 首先定义登陆表单

class LoginForm(FlaskForm): username = StringField('Username', validators=[DataRequired(), ]) password = PasswordField('Password', validators=[DataRequired()]) remember_me = BooleanField('Keep me logged in') submit = SubmitField('Log in')

接下来定义数据库结构

class User(UserMixin, db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) password = db.Column(db.String(64))

当前,我们只需要一个 user 用户表,只包含三个字段的简单表。用户密码也只是简单的保存了明文,后面再处理用户密码的 hash 问题。

下面就可以定义用户登陆表单

``` from flask_login import LoginManager

login_manager = LoginManager() login_manager.session_protection = 'strong' login_manager.login_view = 'login' app = Flask( name ) login_manager.init_app(app) app.config['SECRET_KEY'] = 'hardtohard'

@login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id))

@app.route('/login', methods=['GET', 'POST']) def login(): form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data).first() if user: login_user(user) return redirect(url_for('chat')) return render_template('login.html', form=form)

```

这里定义了,只检查用户名是否存在,如果存在,就执行 login_user() 函数,登陆。用户密码的使用,也留到后面再做处理。

其中 load_user,是回调函数,将获取到的 user 对象存储到浏览器的 session 中,然后在调用 login_user 函数时,就会调用 load_user 来把真正需要登陆的用户设置到 session 中。当登陆成功后,就会跳转到 chat 函数所对应的页面。

chat 函数比较简单,只是展示一个网页

@app.route('/chat', methods=['GET', 'POST']) @login_required def chat(): return render_template('chat.html')

使用 login_required 装饰器,保证该函数只允许登陆的用户访问。

增加些初始化函数

``` @app.route('/adddb', methods=['GET', 'POST']) def addbd(): db.create_all() return "OK"

@app.route('/deldb', methods=['GET', 'POST']) def delbd(): db.drop_all() return "OK"

@app.route('/adduser/ ', methods=['GET', 'POST']) def adduser(user): user = User(username=user, password='admin') db.session.add(user) db.session.commit() return "OK" ```

增加了初始化数据库和新增用户的函数。

构建前端页面

首先处理登陆页面,在 login.html 中添加

``` {% extends "bootstrap/base.html" %} {% import "bootstrap/wtf.html" as wtf %}

{% block title %}Flasky{% endblock %}

{% block navbar %}

{% endblock %}

{% block content %}

{{ wtf.quick_form(form) }}

{% endblock %} ```

使用扩展库 flask_bootstrap 来快速构建页面。

下面重点来看看 chat 页面。 首先来看看主体页面,在 chat.html 中填入代码

``` {% extends 'bootstrap/base.html' %} {% import "bootstrap/wtf.html" as wtf %} {% block title %}Kung Fu Realm{%endblock %} {% block head %} Hi Hi 聊天室

{% endblock %} {% block content %}

ROOM: 聊天室123哈哈哈

{% if current_user.is_authenticated %}
{% else %}
{% endif %}
小HI
小黄鸭
{% if current_user.is_authenticated %}
Welcome to Hihi Chat Room. 欢迎来到 Hihi 聊天室。
小黄鸭
{% else %}
您还没有登陆,先和小黄鸭聊聊吧。
小黄鸭
{% endif %}
{% if current_user.is_authenticated %} {% else %} {% endif %}
群聊
{% if current_user.is_authenticated %} {% else %} {% endif %}
``` 整体效果如下,是不是挺少女系的。 ![](https://www.writebug.com/myres/static/uploads/2022/1/8/bb2b570e65a687a7c4f07c0047d1afa8.writebug) 当用户在点击“提交”按钮后,调用 JS 函数 ``` /*用户登陆的用户点击提交按钮发送消息按钮*/ $('#sub_but_login').click(function(event){ sendMessageLogin(event, fromname, to_uid, to_uname); }); ``` 为了后面便于扩展,将未登录的用户特别区分开来,后面也许同样允许未登陆用户访问该页面,但是只能同机器人小黄鸭聊天 ``` /*用户未登陆的用户点击提交按钮发送消息按钮*/ $('#sub_but').click(function(event){ sendMessage(event, fromname, to_uid, to_uname); }); ``` 在来看函数 sendMessageLogin ``` function sendMessageLogin(event, from_name, to_uid, to_uname){ var msg = $("#message").val(); var myDate = new Date(); var myTime = myDate.toLocaleTimeString(); var itTime = myDate.toLocaleString(); //var iTime = myDate.toDateString(); var htmlData = '
' + '
{% if current_user.is_authenticated %} {% endif %}
' + '
' + '
' + msg + '
' + '
' + from_name + ' · ' + itTime +'
' + '
' + '
'; $("#message_box").append(htmlData); $('#message_box').scrollTop($("#message_box")[0].scrollHeight + 20); $("#message").val(''); setTimeout(function(){sendToServer(from_name, msg)}, 1000); //延时调用 } ``` > 接收几个参数,然后将当前会话消息追加到 HTML 页面中,并且调用真正的后台 API 函数 sendToServer ``` function sendToServer(name, msg){ var xmlhttp = new XMLHttpRequest(); var myDate = new Date(); //var myTime = myDate.toLocaleTimeString(); var myTime = myDate.toLocaleString(); xmlhttp.onreadystatechange=function() { if (xmlhttp.readyState == 4 && xmlhttp.status == 200) { myObj = xmlhttp.responseText; var htmlData2 = '
' + '
' + '
' + '
' + myObj + '
' + '
' + '小黄鸭' + ' · ' + myTime +'
' + '
' + '
'; $("#message_box").append(htmlData2); $('#message_box').scrollTop($("#message_box")[0].scrollHeight + 20); } } xmlhttp.open("GET", "/api/sendchat/" + msg, true); xmlhttp.send(); }; ``` > sendToServer 函数调用后台 API,并把返回接收到的消息回写到 HTML 页面中。 而目前的后台 API 也比较简单,直接返回用户输入的消息 ``` @app.route('/api/sendchat/ ', methods=['GET', 'POST']) @login_required def send_chat(info): return info ``` 这样,一个整体的聊天室架子就搭建好了,接下来我们再接入 Redis,来实现聊天功能。 ### 应用 Redis 我这里使用 Redis 来作为后端数据存储工具。大家如果有自己的 Redis 服务器当然是最好了,如果没有的话,推荐下在线的 Redis 免费应用 redislabs,大家可以自行体验下,[redislabs.com/](https://link.juejin.cn?target=https%3A%2F%2Fredislabs.com%2F) 下面连接到 Redis 服务器并打开连接池 ``` pool = redis.ConnectionPool(host='redis-12143.c8.us-east-1-3.ec2.cloud.redislabs.com', port=12143, decode_responses=True, password='pkAWNdYWfbLLfNOfxTJinm9SO1') r = redis.Redis(connection_pool=pool) ``` Redis 中数据结构及用法如下: chat-{ChatRoomName},聊天室及加入的用户,zset 类型 msg-{ChatRoomName},每个聊天室对应的消息,zset 类型 当前结构比较简单,暂时只定义了两个域,分别用来存储聊天室和消息。 ### 完善 chat 视图功能 在前面的代码中,chat 视图函数仅仅是返回了一个 HTML 页面,并没有任何功能逻辑,现在要完善下。最新的代码如下: ``` @app.route('/chat', methods=['GET', 'POST']) @login_required def chat(): rname = request.args.get('rname', "") ulist = r.zrange("chat-" + rname, 0, -1) messages = r.zrange("msg-" + rname, 0, -1, withscores=True) msg_list = [] for i in messages: msg_list.append([json.loads(i[0]), time.strftime("%Y/%m/%d %p%H:%M:%S", time.localtime(i[1]))]) return render_template('chat.html', rname=rname, user_list=ulist, msg_list=msg_list) ``` > 其中 rname 是其他函数传值过来的,我们后面再说。 r.zrange() 函数就是从 Redis 中取出对应聊天室的用户列表和历史聊天记录,最后就是把相关的信息返回到模板中。 ### 创建及加入聊天室 在 chat 视图中,我们传入了一个 rname 字段,这个字段就是当创建或者加入聊天室时,需要传递过来的。 #### 创建聊天室 ``` @app.route('/createroom', methods=["GET", 'POST']) @login_required def create_room(): rname = request.form.get('chatroomname', '') if r.exists("chat-" + rname) is False: r.zadd("chat-" + rname, current_user.username, 1) return redirect(url_for('chat', rname=rname)) else: return redirect(url_for('chat_room_list')) ``` > 判断聊天室名称是否存在,如果不存在,则将当前用户在 Redis 中创建并跳转至 chat 函数;否则跳转至聊天室列表页面。 #### 加入聊天室 ``` @app.route('/joinroom', methods=["GET", 'POST']) @login_required def join_chat_room(): rname = request.args.get('rname', '') if rname is None: return redirect(url_for('chat_room_list')) r.zadd("chat-" + rname, current_user.username, time.time()) return redirect(url_for('chat', rname=rname)) ``` > 这里是从前端获取到聊天室名称(rname),并将当前用户名加入到对应的聊天室中。 到这里,Redis 中的聊天室就处理完成了,下面再来看看其他的一些辅助功能。 ### 一些辅助功能 #### 一、聊天室列表 既然有加入聊天室的功能,那么就要提供一个列表供用户选择聊天室。 后台逻辑代码: ``` @app.route('/roomlist', methods=["GET", 'POST']) @login_required def chat_room_list(): roomlist_tmp = r.keys(pattern='chat-*') roomlist = [] for i in roomlist_tmp: i_str = str(i, encoding='utf-8') istr_list = i_str.split('-', 1) roomlist.append(istr_list[1]) return render_template('chatroomlist.html', roomlist=roomlist) ``` > 比较简单,到 Redis 中拿到所有以“chat-”开头的 key 值,然后处理成列表返回到前端即可。 前台页面代码: ``` {% extends "bootstrap/base.html" %} {% import "bootstrap/wtf.html" as wtf %} {% block title %}Flasky{% endblock %} {% block navbar %} {% endblock %} {% block content %}
{% endblock %} ``` > 就是循环渲染列表数据,和一个创建聊天室的表单。 #### 二、退出操作 当用户退出登陆时,我们当前也希望该用户同时退出聊天室,所以修改 logout 函数如下: ``` @app.route('/logout') @login_required def logout(): rname = request.args.get("rname", "") r.zrem("chat-" + rname, current_user.username) logout_user() return redirect(url_for('login')) ``` > 从前端拿到聊天室的名字,并在 Redis 的对应 zset 中删除当前用户。 #### 三、用户头像 为了聊天室的美观,不同用户需要拥有不同的头像,这里还是使用 gravatar 这个免费的头像服务。 在 User 模型中添加代码: ``` class User(UserMixin, db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) password = db.Column(db.String(64)) avatar_hash = db.Column(db.String(32)) def gravatar(self, name=None, size=100, default='identicon', rating='g'): if request.is_secure: url = 'https://secure.gravatar.com/avatar' else: url = 'http://www.gravatar.com/avatar' if name is not None: email = name + "@hihichat.com" else: email = self.username + "@hihichat.com" myhash = self.avatar_hash or hashlib.md5(email.encode('utf-8')).hexdigest() return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(url=url, hash=myhash, size=size, default=default, rating=rating) ``` > gravatar 函数,对于登陆的用户,使用其注册的邮箱来生成头像,对于未登录用户,这里就使用一个固定的邮箱来处理。 ### 消息推送逻辑 下面就开始编写最主要的消息推送逻辑。 我采用的技术是 WebSocket,这样节省了使用 AJAX 轮询带来的额外开销。而且 flask 框架也有很好的 WebSocket 相关的扩展库供我们使用,即 flask-sokcetio。 首先安装好 flask_socketio 模块,然后引入并初始化 ``` from flask_socketio import SocketIO, emit socketio = SocketIO() app = Flask(__name__) socketio.init_app(app) ``` 编写一个 socket 发送消息的函数 ``` def socket_send(data, user): emit("response", {"code": '200', "msg": data, "username": user}, broadcast=True, namespace='/testnamespace') socketio.on_event('request_for_response', socket_send, namespace='/testnamespace') ``` > 其中 request_for_response,response 和 testnamespace 都需要和前端代码相对应。request_for_response 是用来接收前端传递到后端的消息,response 是后端传递消息到前端时的标识,而 namespace 则类似于作用域的概念,相互传递的消息都仅仅作用在 testnamespace 这个 namespace 中。 前端 JavaScript 代码: ``` //websocket var websocket_url = 'ws://' + document.domain + ':' + location.port + '/testnamespace'; var socket = io.connect(websocket_url); //发送消息到后端 socket.emit('request_for_response',{'param':'{{rname}}'}); //监听回复的消息 socket.on('response',function(data){ var myDate = new Date(); var myTime = myDate.toLocaleString(); var msg = data.msg; var username = data.username; var currentuser = '{{ current_user.username }}'; console.log(currentuser); if ( currentuser == username ) { username = '你'; }; var hash = md5(username + "@hihichat.com"); var htmlData2 = '
' + '
' + '
' + '
' + msg + '
' + '
' + username + ' · ' + myTime +'
' + '
' + '
'; $("#message_box").append(htmlData2); $('#message_box').scrollTop($("#message_box")[0].scrollHeight + 20); }); ``` > 关于更多的 WebSocket 用法,大家可以自行查找相关资料,这里就不做过多介绍了。 最后,编写接收聊天内容的 API ``` @app.route('/api/sendchat/ ', methods=['GET', 'POST']) @login_required def send_chat(info): rname = request.form.get("rname", "") body = {"username": current_user.username, "msg": info} r.zadd("msg-" + rname, json.dumps(body), time.time()) socket_send(info, current_user.username) return info ``` > 将接收到的聊天内容插入到对应的 Redis 中(msg-*),然后调用 WebSocket 函数,广播刚刚收到的消息到所有已经连接的 socket 客户端。 ### 效果图展示 登陆页面: ![](https://www.writebug.com/myres/static/uploads/2022/1/8/1e1967d7bb5dd97b99a7ae543e936538.writebug) index 页面: ![](https://www.writebug.com/myres/static/uploads/2022/1/8/1e1967d7bb5dd97b99a7ae543e936538.writebug) 聊天室列表页面: ![](https://www.writebug.com/myres/static/uploads/2022/1/8/bb3971e9ef02bd675d8bf68e8044e853.writebug) 聊天室页面: ![](https://www.writebug.com/myres/static/uploads/2022/1/8/100385db6f463066eea764cc509ff666.writebug) 到此为止,其实我们已经完成了一个简单的聊天室功能。但是呢,该程序还有很多功能需要优化,比如程序代码结构(当前所有后台逻辑代码都在一个文件中),聊天室控制(禁言,踢人等),以及非登陆用户的处理,还要比较有意思的聊天机器人等待。 --- 这里开始,就是一些进阶的功能,完善我们的聊天室。 ### 调整项目结构 随着我们项目功能越来越多,把所有的逻辑代码都写在一个文件里已经不太合适了,下面就通过 flask 的工厂模式,把项目代码拆分开。 首先来看下拆分后的项目结构: ![](https://www.writebug.com/myres/static/uploads/2022/1/8/45a0351212247f57006e364fefc7d586.writebug) main 中主要存放后台逻辑代码。 static 中存放 js,CSS 以及用到的图片等。 templates 中存放 HTML 模板。 models.py 中是数据库模型。 config.py 中是一些公共的配置信息。 manage.py 中是项目的启动信息。 下面我们分别来看看各个模块对应的代码 ### 具体代码拆分 #### 1. 配置信息 在 config.py 中,填入代码: ``` import os import redis basedir = os.path.abspath(os.path.dirname(__file__)) class Config: SECRET_KEY = 'hardtohard' SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(basedir, 'chat.sqlite3') @staticmethod def init_app(app): pass class DevelopmentConfig(Config): pass class TestingConfig(Config): pass class ProductionConfig(Config): pass config = { 'development': DevelopmentConfig, 'testing': TestingConfig, 'production': ProductionConfig, 'default': DevelopmentConfig } ``` #### 2. 使用工厂函数 在 app/_*init*_.py 中填入代码: ``` from flask import Flask from flask_login import LoginManager from flask_sqlalchemy import SQLAlchemy from flask_bootstrap import Bootstrap from flask_socketio import SocketIO from config import config login_manager = LoginManager() login_manager.session_protection = 'strong' login_manager.login_view = 'main.login' db = SQLAlchemy() bootstrap = Bootstrap() socketio = SocketIO() def create_app(config_name): app = Flask(__name__) app.config.from_object(config[config_name]) config[config_name].init_app(app) socketio.init_app(app) login_manager.init_app(app) db.init_app(app) bootstrap.init_app(app) # 注册蓝本 from .main import main as main_blueprint app.register_blueprint(main_blueprint) return app ``` > create_app 函数就是程序的工厂函数,它接受一个配置名的参数。 #### 3. 使用蓝本 蓝本和程序类似,也可以定义路由。不同的是,在蓝本中定义的路由处于休眠状态,直到蓝本注册到程序上后,路由才真正成为程序的一部分。 在 main/_*init*_.py 中创建蓝本 ``` from flask import Blueprint main = Blueprint('main', __name__) from . import views, forms ``` > 通过实例化一个 Blueprint 类对象可以创建蓝本。这个构造函数有两个必须指定的参数: 蓝本的名字和蓝本所在的包或模块。和程序一样,大多数情况下第二个参数使用 Python 的 _*name*_ 变量即可。 #### 4. 修改 view 视图 对于视图函数,需要导入相关的包,同时由于使用了蓝本,原来用来装饰路由的 app.route 都要修改为 main.route,url_for 函数也需要增加 main 作用域,修改后的部分代码如下: ``` from flask import render_template, redirect, url_for, request from flask_login import login_required, login_user, logout_user, current_user from . import main from .. import db from .forms import LoginForm from ..models import User from config import config import time import json from ..socket_conn import socket_send pool = redis.ConnectionPool(host='redis-12143.c8.us-east-1-3.ec2.cloud.redislabs.com', port=12143, decode_responses=True, password='pkAWNdYWfbLLfNOfxTJinm9SO16eSJFx') r = redis.Redis(connection_pool=pool) @main.route('/login', methods=['GET', 'POST']) def login(): form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data).first() if user: login_user(user) return redirect(url_for('main.index')) return render_template('login.html', form=form) @main.route('/createroom', methods=["GET", 'POST']) @login_required def create_room(): rname = request.form.get('chatroomname', '') if r.exists("chat-" + rname) is False: r.zadd("chat-" + rname, current_user.username, 1) return redirect(url_for('main.chat', rname=rname)) else: return redirect(url_for('main.chat_room_list')) ``` #### 5. 编写 socket 连接函数 在 models.py 的同级目录下创建 socket_conn.py 文件,添加代码如下: ``` from . import socketio from flask_socketio import emit @socketio.on('request_for_response', namespace='/testnamespace') def socket_send(data, user): emit("response", {"code": '200', "msg": data, "username": user}, broadcast=True, namespace='/testnamespace') ``` > 该函数供视图函数调用,广播 socket 消息。 #### 6. 完成 forms 和 models 将原来的表单代码和数据库模型代码分别拷贝到这两个文件中 forms.py ``` from wtforms import StringField, PasswordField, BooleanField, SubmitField from wtforms.validators import DataRequired from flask_wtf import FlaskForm class LoginForm(FlaskForm): username = StringField('Username', validators=[DataRequired(), ]) password = PasswordField('Password', validators=[DataRequired()]) remember_me = BooleanField('Keep me logged in') submit = SubmitField('Log in') ``` models.py ``` from . import db from flask_login import UserMixin from flask import request import hashlib class User(UserMixin, db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) password = db.Column(db.String(64)) avatar_hash = db.Column(db.String(32)) def gravatar(self, name=None, size=100, default='identicon', rating='g'): if request.is_secure: url = 'https://secure.gravatar.com/avatar' else: url = 'http://www.gravatar.com/avatar' if name is not None: email = name + "@hihichat.com" else: email = self.username + "@hihichat.com" myhash = self.avatar_hash or hashlib.md5(email.encode('utf-8')).hexdigest() return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(url=url, hash=myhash, size=size, default=default, rating=rating) ``` #### 7. 修改模板 把 HTML 模板里的 url_for() 函数都增加 main.,再放置到 templates 下面即可。 #### 8. 启动脚本 顶级文件夹中的 manage.py 文件用于启动程序。 ``` import os from app import create_app, socketio app = create_app(os.getenv('FLASK_CONFIG') or 'default') if __name__ == '__main__': socketio.run(app, debug=True) ``` > 还是使用 socketio.run 的方式启动应用。 至此,代码拆分完毕。 ### 功能增强 #### 1. 新增用户 以前我们都是使用浏览器 URL 直接新增用户的,即函数 adduser,现在我们做一个简单的页面,来规范这个操作。 定义表单 ``` class CreateUserForm(FlaskForm): username = StringField('Username', validators=[DataRequired()]) password = PasswordField('Password', validators=[DataRequired(), EqualTo('password2', message='Password must match.')]) password2 = PasswordField('Confirm password', validators=[DataRequired()]) submit = SubmitField('Create User') def validate_username(self, field): if User.query.filter_by(username=field.data).first(): raise ValidationError('Username already in use.') ``` > 定义了一个函数,来校验用户名是否重复。 修改原来的视图函数 adduser ``` @main.route('/adduser', methods=['GET', 'POST']) @login_required def adduser(): form = CreateUserForm() if form.validate_on_submit(): user = User(username=form.username.data, password=form.password.data) db.session.add(user) db.session.commit() return redirect(url_for('main.index')) return render_template('adduser.html', form=form) ``` 还要再修改下 User 模型,因为当前保存的是明文密码,修改成使用 hash 存储。 ``` @property def password(self): raise AttributeError('password is not a readable attribute') @password.setter def password(self, password): self.password_hash = generate_password_hash(password) def verify_password(self, password): return check_password_hash(self.password_hash, password) ``` > 分别设置密码的只读权限,以及 hash 计算和验证功能。 接下来编写 HTML 模板 ``` {% extends "bootstrap/base.html" %} {% import "bootstrap/wtf.html" as wtf %} {% block title %}Flasky{% endblock %} {% block navbar %} {% endblock %} {% block content %}
{{ wtf.quick_form(form) }}
{% endblock %} ``` 至此,一个简单的新增用户功能就好了。当然,我们还可以增加删除用户,重置密码等功能,这些的具体实现,都可以在 GitHub 的代码中看到,就不再赘述了。 #### 2. 权限控制 我们其实并不希望所有人都能够创建聊天室,那么就要做一个简单的控制功能。 首先定义一个 permission 表,用来存储创建聊天室等权限,再定义一个用户和权限的关联关系表 ``` class Permission(db.Model): id = db.Column(db.Integer, primary_key=True) permission_name = db.Column(db.String(64), unique=True, index=True) class RelUserPermission(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer) permission_id = db.Column(db.Integer) ``` 然后我们还需要一个增加权限的表,以及一个用户列表页面 在 forms.py 中添加 ``` class EditUserForm(FlaskForm): permission = SelectMultipleField('Permission', coerce=int) submit = SubmitField('Submit') def __init__(self, user, *args, **kwargs): super(EditUserForm, self).__init__(*args, **kwargs) self.permission.choices = [(per.id, per.permission_name) for per in Permission.query.order_by(Permission.permission_name).all()] self.user = user ``` > 定义了一个初始化函数,会获取到 Permission 表中的 name,id 等信息 接下来编写视图函数 ``` @main.route('/listuser/', methods=['GET', 'POST']) @login_required def listuser(): user_list = User.query.all() return render_template('listuser.html', user_list=user_list) @main.route('/addper/', methods=['GET', 'POST']) @login_required def addper(): form = CreatePerForm() if form.validate_on_submit(): per = Permission(permission_name=form.permissionname.data) db.session.add(per) db.session.commit() return redirect(url_for('main.index')) return render_template('addper.html', form=form) @main.route('/edituser/ /', methods=['GET', 'POST']) @login_required def edituser(id): user = User.query.filter_by(id=id).first() form = EditUserForm(user=user) if form.validate_on_submit(): for p in form.permission.data: rup = RelUserPermission(user_id=id, permission_id=p) db.session.add(rup) db.session.commit() return redirect(url_for('main.index')) return render_template('edituser.html', form=form) ``` > 三个函数,分别是展示用户列表,增加权限,以及为用户添加权限。 然后再修改下 chat_room_list 函数,使得没有权限的用户不能展示创建聊天室的表单。 ``` @main.route('/roomlist/', methods=["GET", 'POST']) @login_required def chat_room_list(): roomlist_tmp = r.keys(pattern='chat-*') roomlist = [] can_create = False create_room_id = Permission.query.filter_by(permission_name='createroom').first().id rel_user_id = RelUserPermission.query.filter_by(user_id=current_user.id).first() rel_permission = RelUserPermission.query.filter_by(user_id=current_user.id).first() if rel_permission and rel_user_id and create_room_id: rel_permission_id = rel_permission.permission_id if rel_permission_id == create_room_id: can_create = True for i in roomlist_tmp: i_str = str(i) istr_list = i_str.split('-', 1) roomlist.append(istr_list[1]) return render_template('chatroomlist.html', roomlist=roomlist, can_create=can_create) ``` > 这里主要是判断用户是否拥有 createroom 权限,其实还有一种更加简便,但是稍微有些绕的鉴权方式,可以在文末的链接中找到,大家也可以尝试下。 最后处理 HTML 表单 对于聊天室列表页面: ``` {% if can_create %}
{% endif %} ``` 对于用户列表页面: ``` {% extends "bootstrap/base.html" %} {% import "bootstrap/wtf.html" as wtf %} {% block title %}Flasky{% endblock %} {% block navbar %} {% endblock %} {% block content %}
{% for user in user_list %} {{ user.username }} {% endfor %}
{% endblock %} ``` > 这里为了方便起见,当点击用户时,就会跳转至编辑用户权限的页面。 现在,没有权限的用户,就不能看到创建聊天室的表单喽! #### 3. 登陆优化 当前的登陆,只要用户名是正确的,不会验证密码,直接登陆成功,现在来处理下密码校验功能。其实也简单,我们在 User 模型中新增了一个函数 verify_password,只要登陆的时候,调用该函数来验证密码即可。 ``` @main.route('/login/', methods=['GET', 'POST']) def login(): form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data).first() if user is not None and user.verify_password(form.password.data): login_user(user) return redirect(url_for('main.index')) return render_template('login.html', form=form) ``` ok,密码错误的你,已经没法再登陆了。 #### 4. 放开非登陆也可进入聊天室 1.去掉 chat_room_list,join_chat_room,send_chat 和 chat 视图函数的登陆装饰器 @login_required 2.修改 chat_room_list,判断当前用户是否已经登陆 ``` @main.route('/roomlist/', methods=["GET", 'POST']) def chat_room_list(): roomlist_tmp = r.keys(pattern='chat-*') roomlist = [] can_create = False create_room = Permission.query.filter_by(permission_name='createroom').first() if current_user.is_authenticated: # 判断用户是否登陆 rel_user_id = RelUserPermission.query.filter_by(user_id=current_user.id).first() rel_permission = RelUserPermission.query.filter_by(user_id=current_user.id).first() if rel_permission and rel_user_id and create_room: rel_permission_id = rel_permission.permission_id create_room_id = create_room.id if rel_permission_id == create_room_id: can_create = True for i in roomlist_tmp: i_str = str(i) istr_list = i_str.split('-', 1) roomlist.append(istr_list[1]) return render_template('chatroomlist.html', roomlist=roomlist, can_create=can_create) ``` 3.导航栏增加 room list 入口 ``` ``` 4.chat 视图函数增加判断逻辑 ``` @main.route('/chat/', methods=['GET', 'POST']) def chat(): rname = request.args.get('rname', "") ulist = r.zrange("chat-" + rname, 0, -1) messages = r.zrange("msg-" + rname, 0, -1, withscores=True) msg_list = [] for i in messages: msg_list.append([json.loads(i[0]), time.strftime("%Y/%m/%d %p%H:%M:%S", time.localtime(i[1]))]) if current_user.is_authenticated: return render_template('chat.html', rname=rname, user_list=ulist, msg_list=msg_list) else: email = "youke" + "@hihichat.com" hash = hashlib.md5(email.encode('utf-8')).hexdigest() gravatar_url = 'http://www.gravatar.com/avatar/' + hash + '?s=40&d=identicon&r=g' return render_template('chat.html', rname=rname, user_list=ulist, msg_list=msg_list, g=gravatar_url) ``` 5.修改 send_chat 视图 ``` @main.route('/api/sendchat/ ', methods=['GET', 'POST']) def send_chat(info): if current_user.is_authenticated: rname = request.form.get("rname", "") body = {"username": current_user.username, "msg": info} r.zadd("msg-" + rname, json.dumps(body), time.time()) socket_send(info, current_user.username) return info else: return info ``` > 当前对于未登陆的用户(游客),直接回复游客发送的消息。 ### 清理过期消息 由于我们需要定时清理 Redis 中保存的聊天记录,那么就需要一个定时任务。flask 有一个完善的插件 flask-apscheduler,但是简单实验了下,限制还是挺多的,所以,我这里选择自己实现一个简单的定时器功能。 创建一个 tasks.py 文件 首先定义定时器类 ``` from threading import Timer class Scheduler(object): def __init__(self, sleep_time, func, mytime=None): self.sleep_time = sleep_time self.func = func self._t = None self.mytime = mytime def start(self): if self._t is None: self._t = Timer(self.sleep_time, self._run) self._t.start() else: raise Exception("this timer is already running") def _run(self): if self.mytime is not None: self.func(self.mytime) else: self.func() self._t = Timer(self.sleep_time, self._run) self._t.start() def stop(self): if self._t is not None: self._t.cancel() self._t = None @staticmethod def init_app(app): pass ``` > 使用线程中的 Timer 来调用真正的函数,通过 sleep time 的方式达到定时调用的效果。 然后编写需要定时调用的函数,即清理数据的函数。 ``` def keep_msg(mytime=None): if mytime is not None: expare_time = mytime else: expare_time = 604800 msg_list = r.keys("msg-*") for msg in msg_list: _ = r.zrange(msg, 0, 0) for i in _: score = r.zscore(msg, i) if time.time() - score > expare_time: r.zrem(msg, i) ``` > 比较简单,判断 Redis 中的 score 是否处于过期时间,是,则删除。 接下来注册函数到我们的 flask 应用当中。 在 _*init*_.py 中填入如下代码: ``` from .tasks import Scheduler, keep_msg sch = Scheduler(86400, keep_msg) # 每间隔一天执行 def create_app(config_name): ... sch.init_app(app) ... return app ``` 最后还要注意的是,由于我们前面是使用 socketio 来启动的应用,因为 socketio 是异步 io,而我们的 scheduler 是阻塞运行的,所以需要在 socketio 中创建子线程来启动。 修改 manage.py 如下: ``` import os from app import create_app, socketio, sch app = create_app(os.getenv('FLASK_CONFIG') or 'default') if __name__ == '__main__': my = sch.start socketio.start_background_task(target=my) # 启动一个子线程 socketio.run(app, debug=True) ``` 这样,一个简单的定时任务就做好了。 ### 禁言功能 正所谓“林子大了,什么鸟都有”,当聊天室人数很多的时候,经常会出现一些不和谐的话语和人,那么禁言功能就很有必要了。 首先在 views 中创建一个新的函数 ``` @main.route('/chat/block/roomuser/', methods=['GET', 'POST']) @login_required def block_roomuser(): rname = request.args.get('rname', "") new_b_user = request.args.get('b_user', "") b_time = request.args.get('b_time', "") if b_time is "": r.set('b_user-' + new_b_user, new_b_user, ex=None) else: r.set('b_user-' + new_b_user, new_b_user, ex=b_time) return redirect(url_for('main.room_user_list', rname=rname)) ``` > 从前端获取到对应的聊天室名字、需要禁言的用户和禁言时间,然后根据禁言时间,把用户添加到 Redis 中。 再来看看禁言功能的入口函数 ``` @main.route('/chat/roomuser/list', methods=['GET', 'POST']) @login_required def room_user_list(): rname = request.args.get('rname', "") ulist = r.zrange("chat-" + rname, 0, -1) b_user = r.keys('b_user-*') b_user_list = [] for b in b_user: b_user_list.append(r.get(b)) return render_template('roomuser_list.html', ulist=ulist, rname=rname, b_user=b_user_list) ``` > 从 Redis 对应的有序集合中取出正处于禁言状态的用户,把这些用户传递到模板供渲染使用。 对应的 roomuser_list.html 代码为: ```
{% for user in ulist %}

{% if user in b_user %} 禁言中。。。 {% endif %}

解禁 踢出

{% endfor %}
``` > 方便起见,直接使用 bootstrap 框架渲染页面。同时这里取了个巧,在“解禁”的时候,只是传入 b_time 为 1,这样 1 秒之后,用户就自动从 Redis 中过期了,也就成功解禁了。 最后,再来处理聊天室的消息,禁言的用户,当然不能再发消息啦。 在 chat 函数中,添加代码: ``` @main.route('/chat/', methods=['GET', 'POST']) def chat(): ... b_user = r.keys('b_user-*') b_user_list = [] for b in b_user: b_user_list.append(r.get(b)) ... if current_user.is_authenticated: return render_template('chat.html', rname=rname, user_list=ulist, msg_list=msg_list, b_user_list=b_user_list) ``` > 把处于禁言的用户取出,传递给模板。 在 send_chat 函数中添加代码: ``` @main.route('/api/sendchat/ ', methods=['GET', 'POST']) def send_chat(info): ... b_user = r.exists('b_user-%s' % current_user.username) if b_user: data = json.dumps({'code': 201, 'msg': 'Your are under block now!'}) return data ... ``` > 如果用户处于禁言状态,直接返回 JSON 消息。 修改 chat.html 中的 JavaScript 函数 sendToServer,增加代码如下: ``` var jsondata = JSON.parse(myObj); if ( jsondata.code == 201 || jsondata.code == 403) { var htmlData3 = '
' + '
' + '
' + '
' + "自动回复: " + jsondata.msg + '
' + '
' + '小黄鸭' + ' · ' + myTime +'
' + '
' + '
'; $("#message_box").append(htmlData3); $('#message_box').scrollTop($("#message_box")[0].scrollHeight + 20); } ``` > 判断返回的 JSON 中 code 值如果是 201 或 403,则由小黄鸭自动回复消息。 最后的效果如下: ![](https://www.writebug.com/myres/static/uploads/2022/1/8/a7811015d7ce186df986c61e79461565.writebug) ### 踢人功能 如果在聊天室中,这个人真的让人忍无可忍,那么踢人就是最好的办法了。 其实实现思想和逻辑都和禁言相类似,这里直接给出部分代码 新增函数 kick_roomuser ``` @main.route('/chat/kick/roomuser/', methods=['GET', 'POST']) @login_required def kick_roomuser(): rname = request.args.get("rname", "") del_user = request.args.get("del_user", "") r.zrem("chat-" + rname, del_user) return redirect(url_for('main.room_user_list', rname=rname)) ``` 修改 send_chat 函数 ``` @main.route('/api/sendchat/ ', methods=['GET', 'POST']) def send_chat(info): ... if current_user.is_authenticated: rname = request.form.get("rname", "") ulist = r.zrange("chat-" + rname, 0, -1) if current_user.username in ulist: body = {"username": current_user.username, "msg": info} r.zadd("msg-" + rname, json.dumps(body), time.time()) socket_send(info, current_user.username) data = json.dumps({'code': 200, 'msg': info}) return data else: data = json.dumps({'code': 403, 'msg': 'You are not in this room'}) return data else: data = json.dumps({'code': 202, 'msg': info}) return data ``` 最后效果如下 ![](https://www.writebug.com/myres/static/uploads/2022/1/8/1cd0463cbf7b853d826f758e804e399c.writebug) ### 对接聊天机器人 当前,如果用户没有登陆,是无法和其他人聊天的。那么一个友好的聊天机器人就非常有必要了。我们可以使用免费的图灵聊天机器人,当然也可以自己训练一个。以前我也写过一篇关于如何训练聊天机器人,感兴趣的小伙伴儿可以到我的公众号里查看(萝卜大杂烩)。 在这里也直接复用以前部署的 API 了,只需要增加几行代码即可 修改 send_chat 函数 ``` @main.route('/api/sendchat/ ', methods=['GET', 'POST']) def send_chat(info): ... else: base_url = 'http://luobodazahui.top:8889/api/chat/' chat_text = requests.get(base_url + info).text return chat_text ``` > 在函数中调用聊天机器人的 API 地址,将返回的内容传递给前端即可。 最终的效果如下: ![](https://www.writebug.com/myres/static/uploads/2022/1/8/a1180717ab1c766be2c170e5143e2d1c.writebug) --- 最后的最后,我们再来看看如何部署上线呢,毕竟没有部署到公网的 Web 服务,都是啥啥啥 首先,你得有一个公网服务器,云主机之类的。 然后就是一顿折腾,各种安装了。我的云主机是 CentOS 7.5,下面的一切操作都默认是这个操作系统了。 在 CentOS 7.5 上安装 docker ``` curl -fsSL https://get.docker.com ``` 安装 Redis ``` docker pull redis ``` 启动 Redis ``` docker run --name=myredis -p 6379:6379 -v /home/redis-data:/data -d redis redis-server --appendonly yes ``` 在本机连接 Redis,测试一下 ``` docker exec -it 491f1051715a redis-cli ``` 安装 python3 源码安装 安装编译软件 ``` yum -y groupinstall "Development tools" yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel yum install libffi-devel -y ``` 下载源码包并解压 ``` wget https://www.python.org/ftp/python/3.7.1/Python-3.7.1.tar.xz tar Jxvf Python-3.7.1.tar.xz ``` 编译安装 ``` cd Python-3.7.1 ./configure --prefix=/usr/local/python3 make && make install ``` 创建软连接 ``` ln -s /usr/local/python3/bin/python3 /usr/local/bin/python3 ln -s /usr/local/python3/bin/pip3 /usr/local/bin/pip3 ``` 验证 ``` python3 -V pip3 -V ``` 安装 chatterbot > 由于使用 pip 直接安装 chatterbot,一直报错,所以我这里采用 conda 来安装。 首先到 conda 官网下载好安装脚本,然后一键安装即可。 ``` sh Miniconda3-latest-Linux-x86_64.sh ``` 之后就可以使用 conda 的 pip 命令来安装 chatterbot 了。 ``` pip install --upgrade chatterbot pip install chatterbot_corpus ``` 安装 gevent 和 gunicorn ``` pip install gevent pip install gunicorn ``` 以上,所有的安装准备工作基本完成了! 下面就很简单了呀 首先编写 gunicorn 启动脚本 ``` debug = True loglevel = 'debug' bind = '0.0.0.0:5000' logfile = '/home/mychat/online_chat/log/debug.log' workers = 1 worker_class = 'eventlet' reload = True ``` > 使用 gunicorn 启动 flask_socketio,貌似还不能很好的启动多进程,这部分,留待以后再研究。 然后再写一个程序启动脚本 ``` /root/miniconda3/bin/gunicorn -D -c /home/mychat/online_chat/gunicorn manage:app ``` 最后,运行 run.sh 脚本,然后使用 ps -ef|grep python 来查看是否有进程存在 ![](https://www.writebug.com/myres/static/uploads/2022/1/8/3c62f1b87ae2d5dc1e8f9437c99a8e05.writebug) 如上图所示,说明我们的程序已经启动成功了,现在让我们来访问[ http://www.{hostip}:5000],不出意外的话,已经可以正常访问了。 [原文链接](https://juejin.cn/post/6844903880573059085)

参考文献

  • 网上论坛系统设计与实现(西安电子科技大学·胡秉玺)
  • 手机软酷网即时通讯软件的设计与实现(电子科技大学·齐迎旭)
  • 企业内部即时通讯系统的设计与实现(内蒙古大学·王慧平)
  • 基于移动平台的SNS系统的设计与实现(电子科技大学·高原)
  • 一个通用论坛系统的设计与实现(山东大学·张正)
  • 基于Web的在线交流平台的开发技术研究与应用(燕山大学·卢仕伟)
  • 基于Hadoop的分布式数据存储设计与实现(吉林大学·毛剑)
  • 基于Web的信息发布与信息交流平台的设计与实现(吉林大学·许昭霞)
  • 企业级即时通讯系统设计与实现(华南理工大学·余春贵)
  • 一个通用论坛系统的设计与实现(山东大学·张正)
  • 基于B/S架构的酷跑社区系统的设计与实现(内蒙古大学·张晓乐)
  • 基于Web的企业即时通讯系统的设计与实现(河北科技大学·张艳芳)
  • 基于Web的在线交流平台的开发技术研究与应用(燕山大学·卢仕伟)
  • 基于SSH的大学生联谊交友管理系统设计与实现(华中科技大学·王海波)
  • 基于redis的分布式自动化爬虫的设计与实现(华中科技大学·曾胜)

本文内容包括但不限于文字、数据、图表及超链接等)均来源于该信息及资料的相关主题。发布者:源码港湾 ,原文地址:https://m.bishedaima.com/yuanma/35929.html

相关推荐

发表回复

登录后才能评论