Browse Source

auth中的路由全部实现

master
杨舜 1 year ago
parent
commit
d06898c562
9 changed files with 132 additions and 101 deletions
  1. BIN
      figure_require/login_test.png
  2. BIN
      figure_require/logout_test.png
  3. BIN
      figure_require/password_test.png
  4. BIN
      figure_require/unregister_test.png
  5. BIN
      modified/be/model/__pycache__/user.cpython-38.pyc
  6. +123
    -100
      modified/be/model/user.py
  7. BIN
      modified/be/view/__pycache__/auth.cpython-38.pyc
  8. +1
    -0
      modified/be/view/auth.py
  9. +8
    -1
      report.md

BIN
figure_require/login_test.png View File

Before After
Width: 953  |  Height: 692  |  Size: 61 KiB

BIN
figure_require/logout_test.png View File

Before After
Width: 901  |  Height: 627  |  Size: 49 KiB

BIN
figure_require/password_test.png View File

Before After
Width: 898  |  Height: 581  |  Size: 48 KiB

BIN
figure_require/unregister_test.png View File

Before After
Width: 865  |  Height: 536  |  Size: 44 KiB

BIN
modified/be/model/__pycache__/user.cpython-38.pyc View File


+ 123
- 100
modified/be/model/user.py View File

@ -2,6 +2,7 @@ import jwt
import time
import logging
import sqlite3 as sqlite
from sqlalchemy.exc import SQLAlchemyError
from model import error
from model import postgreSQLORM
from model.postgreSQLORM import User
@ -44,6 +45,7 @@ class User(db_conn.DBConn):
def __check_token(self, user_id, db_token, token) -> bool:
try:
if db_token != token:
# print('touch')
return False
jwt_text = jwt_decode(encoded_token=token, user_id=user_id)
ts = jwt_text["timestamp"]
@ -66,6 +68,7 @@ class User(db_conn.DBConn):
## 为新注册的用户创建对象
new_user = postgreSQLORM.User(user_id=user_id,password=password,balance=0,token=token,terminal=terminal)
self.session.add(new_user)
self.session.commit()
# self.conn.execute(
# "INSERT into user(user_id, password, balance, token, terminal) "
@ -76,104 +79,124 @@ class User(db_conn.DBConn):
# return error.error_exist_user_id(user_id)
return 200, "ok"
# def check_token(self, user_id: str, token: str) -> (int, str):
# cursor = self.conn.execute("SELECT token from user where user_id=?", (user_id,))
# row = cursor.fetchone()
# if row is None:
# return error.error_authorization_fail()
# db_token = row[0]
# if not self.__check_token(user_id, db_token, token):
# return error.error_authorization_fail()
# return 200, "ok"
# def check_password(self, user_id: str, password: str) -> (int, str):
# cursor = self.conn.execute("SELECT password from user where user_id=?", (user_id,))
# row = cursor.fetchone()
# if row is None:
# return error.error_authorization_fail()
# if password != row[0]:
# return error.error_authorization_fail()
# return 200, "ok"
# def login(self, user_id: str, password: str, terminal: str) -> (int, str, str):
# token = ""
# try:
# code, message = self.check_password(user_id, password)
# if code != 200:
# return code, message, ""
# token = jwt_encode(user_id, terminal)
# cursor = self.conn.execute(
# "UPDATE user set token= ? , terminal = ? where user_id = ?",
# (token, terminal, user_id), )
# if cursor.rowcount == 0:
# return error.error_authorization_fail() + ("", )
# self.conn.commit()
# except sqlite.Error as e:
# return 528, "{}".format(str(e)), ""
# except BaseException as e:
# return 530, "{}".format(str(e)), ""
# return 200, "ok", token
# def logout(self, user_id: str, token: str) -> bool:
# try:
# code, message = self.check_token(user_id, token)
# if code != 200:
# return code, message
# terminal = "terminal_{}".format(str(time.time()))
# dummy_token = jwt_encode(user_id, terminal)
# cursor = self.conn.execute(
# "UPDATE user SET token = ?, terminal = ? WHERE user_id=?",
# (dummy_token, terminal, user_id), )
# if cursor.rowcount == 0:
# return error.error_authorization_fail()
# self.conn.commit()
# except sqlite.Error as e:
# return 528, "{}".format(str(e))
# except BaseException as e:
# return 530, "{}".format(str(e))
# return 200, "ok"
# def unregister(self, user_id: str, password: str) -> (int, str):
# try:
# code, message = self.check_password(user_id, password)
# if code != 200:
# return code, message
# cursor = self.conn.execute("DELETE from user where user_id=?", (user_id,))
# if cursor.rowcount == 1:
# self.conn.commit()
# else:
# return error.error_authorization_fail()
# except sqlite.Error as e:
# return 528, "{}".format(str(e))
# except BaseException as e:
# return 530, "{}".format(str(e))
# return 200, "ok"
# def change_password(self, user_id: str, old_password: str, new_password: str) -> bool:
# try:
# code, message = self.check_password(user_id, old_password)
# if code != 200:
# return code, message
# terminal = "terminal_{}".format(str(time.time()))
# token = jwt_encode(user_id, terminal)
# cursor = self.conn.execute(
# "UPDATE user set password = ?, token= ? , terminal = ? where user_id = ?",
# (new_password, token, terminal, user_id), )
# if cursor.rowcount == 0:
# return error.error_authorization_fail()
# self.conn.commit()
# except sqlite.Error as e:
# return 528, "{}".format(str(e))
# except BaseException as e:
# return 530, "{}".format(str(e))
# return 200, "ok"
def check_token(self, user_id: str, token: str) -> (int, str):
row = self.session.query(postgreSQLORM.User.token).filter(postgreSQLORM.User.user_id==user_id).first()
# cursor = self.conn.execute("SELECT token from user where user_id=?", (user_id,))
# row = cursor.fetchone()
# print(row)
if row is None:
# print('touch')
return error.error_authorization_fail()
db_token = row[0]
# print(db_token)
# print(token)
if not self.__check_token(user_id, db_token, token):
# print('touch')
return error.error_authorization_fail()
return 200, "ok"
def check_password(self, user_id: str, password: str) -> (int, str):
row = self.session.query(postgreSQLORM.User.password).filter(postgreSQLORM.User.user_id==user_id).first()
# cursor = self.conn.execute("SELECT password from user where user_id=?", (user_id,))
# row = cursor.fetchone()
if row is None:
return error.error_authorization_fail()
if password != row[0]:
return error.error_authorization_fail()
return 200, "ok"
def login(self, user_id: str, password: str, terminal: str) -> (int, str, str):
token = ""
try:
code, message = self.check_password(user_id, password)
if code != 200:
return code, message, ""
token = jwt_encode(user_id, terminal)
row = self.session.query(postgreSQLORM.User).filter_by(user_id=user_id).update({'token':token,'terminal':terminal})
# cursor = self.conn.execute(
# "UPDATE user set token= ? , terminal = ? where user_id = ?",
# (token, terminal, user_id), )
# if cursor.rowcount == 0:
if row == 0:
return error.error_authorization_fail() + ("", )
# self.conn.commit()
self.session.commit()
except SQLAlchemyError as e:
return 528, "{}".format(str(e)), ""
except BaseException as e:
return 530, "{}".format(str(e)), ""
return 200, "ok", token
def logout(self, user_id: str, token: str) -> bool:
try:
code, message = self.check_token(user_id, token)
if code != 200:
return code, message
terminal = "terminal_{}".format(str(time.time()))
dummy_token = jwt_encode(user_id, terminal)
row = self.session.query(postgreSQLORM.User).filter_by(user_id=user_id).update({'token':dummy_token,'terminal':terminal})
# cursor = self.conn.execute(
# "UPDATE user SET token = ?, terminal = ? WHERE user_id=?",
# (dummy_token, terminal, user_id), )
# if cursor.rowcount == 0:
# print(row)
if row == 0:
return error.error_authorization_fail()
# self.conn.commit()
self.session.commit()
except SQLAlchemyError as e:
return 528, "{}".format(str(e))
except BaseException as e:
return 530, "{}".format(str(e))
return 200, "ok"
def unregister(self, user_id: str, password: str) -> (int, str):
try:
code, message = self.check_password(user_id, password)
if code != 200:
return code, message
row = self.session.query(postgreSQLORM.User).filter(postgreSQLORM.User.user_id==user_id).delete()
# cursor = self.conn.execute("DELETE from user where user_id=?", (user_id,))
# if cursor.rowcount == 1:
if row == 1:
self.session.commit()
# self.conn.commit()
else:
return error.error_authorization_fail()
except SQLAlchemyError as e:
return 528, "{}".format(str(e))
except BaseException as e:
return 530, "{}".format(str(e))
return 200, "ok"
def change_password(self, user_id: str, old_password: str, new_password: str) -> bool:
try:
code, message = self.check_password(user_id, old_password)
if code != 200:
return code, message
terminal = "terminal_{}".format(str(time.time()))
token = jwt_encode(user_id, terminal)
row = self.session.query(postgreSQLORM.User).filter_by(user_id=user_id).update({'password':new_password,'token':token,'terminal':terminal})
# cursor = self.conn.execute(
# "UPDATE user set password = ?, token= ? , terminal = ? where user_id = ?",
# (new_password, token, terminal, user_id), )
# if cursor.rowcount == 0:
if row == 0:
return error.error_authorization_fail()
self.session.commit()
# self.conn.commit()
except SQLAlchemyError as e:
return 528, "{}".format(str(e))
except BaseException as e:
return 530, "{}".format(str(e))
return 200, "ok"

BIN
modified/be/view/__pycache__/auth.cpython-38.pyc View File


+ 1
- 0
modified/be/view/auth.py View File

@ -19,6 +19,7 @@ def login():
def logout():
user_id: str = request.json.get("user_id")
token: str = request.headers.get("token")
print(token)
u = user.User()
code, message = u.logout(user_id=user_id, token=token)
return jsonify({"message": message}), code

+ 8
- 1
report.md View File

@ -242,6 +242,7 @@
## 为新注册的用户创建对象
new_user = postgreSQLORM.User(user_id=user_id,password=password,balance=0,token=token,terminal=terminal)
self.session.add(new_user)
self.session.commit()
# self.conn.execute(
# "INSERT into user(user_id, password, balance, token, terminal) "
@ -254,4 +255,10 @@
```
![avatar](./figure_require/register_test.png)
7.
7. 另外对于auth路由中的其他功能接口(注销、登录、登出、更改密码)进行类似上述注册接口的修改,此处不在单独贴出代码,只是给出postman的测试截图,至此auth中的路由全部实现(2022.10.30 17:50 杨舜)
![avatar](./figure_require/unregister_test.png)
![avatar](./figure_require/login_test.png)
![avatar](./figure_require/logout_test.png)
![avatar](./figure_require/password_test.png)
8.

Loading…
Cancel
Save