|
|
- import jwt
- import time
- import logging
- import sqlite3 as sqlite
- from be.model import error
- from be.model import db_conn
-
- # encode a json string like:
- # {
- # "user_id": [user name],
- # "terminal": [terminal code],
- # "timestamp": [ts]} to a JWT
- # }
-
-
- def jwt_encode(user_id: str, terminal: str) -> str:
- encoded = jwt.encode(
- {"user_id": user_id, "terminal": terminal, "timestamp": time.time()},
- key=user_id,
- algorithm="HS256",
- )
- return encoded.decode("utf-8")
-
-
- # decode a JWT to a json string like:
- # {
- # "user_id": [user name],
- # "terminal": [terminal code],
- # "timestamp": [ts]} to a JWT
- # }
- def jwt_decode(encoded_token, user_id: str) -> str:
- decoded = jwt.decode(encoded_token, key=user_id, algorithms="HS256")
- return decoded
-
-
- class User(db_conn.DBConn):
- token_lifetime: int = 3600 # 3600 second
-
- def __init__(self):
- db_conn.DBConn.__init__(self)
-
- def __check_token(self, user_id, db_token, token) -> bool:
- try:
- if db_token != token:
- return False
- jwt_text = jwt_decode(encoded_token=token, user_id=user_id)
- ts = jwt_text["timestamp"]
- if ts is not None:
- now = time.time()
- if self.token_lifetime > now - ts >= 0:
- return True
- except jwt.exceptions.InvalidSignatureError as e:
- logging.error(str(e))
- return False
-
- def register(self, user_id: str, password: str):
- try:
- terminal = "terminal_{}".format(str(time.time()))
- token = jwt_encode(user_id, terminal)
- self.conn.execute(
- "INSERT into user(user_id, password, balance, token, terminal) "
- "VALUES (?, ?, ?, ?, ?);",
- (user_id, password, 0, token, terminal), )
- self.conn.commit()
- except sqlite.Error:
- return error.error_exist_user_id(user_id)
- return 200, "ok"
-
- def check_token(self, user_id: str, token: str) -> (int, str):
- cursor = self.conn.execute("SELECT token from user where user_id=?", (user_id,))
- row = cursor.fetchone()
- if row is None:
- return error.error_authorization_fail()
- db_token = row[0]
- if not self.__check_token(user_id, db_token, token):
- return error.error_authorization_fail()
- return 200, "ok"
-
- def check_password(self, user_id: str, password: str) -> (int, str):
- cursor = self.conn.execute("SELECT password from user where user_id=?", (user_id,))
- row = cursor.fetchone()
- if row is None:
- return error.error_authorization_fail()
-
- if password != row[0]:
- return error.error_authorization_fail()
-
- return 200, "ok"
-
- def login(self, user_id: str, password: str, terminal: str) -> (int, str, str):
- token = ""
- try:
- code, message = self.check_password(user_id, password)
- if code != 200:
- return code, message, ""
-
- token = jwt_encode(user_id, terminal)
- cursor = self.conn.execute(
- "UPDATE user set token= ? , terminal = ? where user_id = ?",
- (token, terminal, user_id), )
- if cursor.rowcount == 0:
- return error.error_authorization_fail() + ("", )
- self.conn.commit()
- except sqlite.Error as e:
- return 528, "{}".format(str(e)), ""
- except BaseException as e:
- return 530, "{}".format(str(e)), ""
- return 200, "ok", token
-
- def logout(self, user_id: str, token: str) -> bool:
- try:
- code, message = self.check_token(user_id, token)
- if code != 200:
- return code, message
-
- terminal = "terminal_{}".format(str(time.time()))
- dummy_token = jwt_encode(user_id, terminal)
-
- cursor = self.conn.execute(
- "UPDATE user SET token = ?, terminal = ? WHERE user_id=?",
- (dummy_token, terminal, user_id), )
- if cursor.rowcount == 0:
- return error.error_authorization_fail()
-
- self.conn.commit()
- except sqlite.Error as e:
- return 528, "{}".format(str(e))
- except BaseException as e:
- return 530, "{}".format(str(e))
- return 200, "ok"
-
- def unregister(self, user_id: str, password: str) -> (int, str):
- try:
- code, message = self.check_password(user_id, password)
- if code != 200:
- return code, message
-
- cursor = self.conn.execute("DELETE from user where user_id=?", (user_id,))
- if cursor.rowcount == 1:
- self.conn.commit()
- else:
- return error.error_authorization_fail()
- except sqlite.Error as e:
- return 528, "{}".format(str(e))
- except BaseException as e:
- return 530, "{}".format(str(e))
- return 200, "ok"
-
- def change_password(self, user_id: str, old_password: str, new_password: str) -> bool:
- try:
- code, message = self.check_password(user_id, old_password)
- if code != 200:
- return code, message
-
- terminal = "terminal_{}".format(str(time.time()))
- token = jwt_encode(user_id, terminal)
- cursor = self.conn.execute(
- "UPDATE user set password = ?, token= ? , terminal = ? where user_id = ?",
- (new_password, token, terminal, user_id), )
- if cursor.rowcount == 0:
- return error.error_authorization_fail()
-
- self.conn.commit()
- except sqlite.Error as e:
- return 528, "{}".format(str(e))
- except BaseException as e:
- return 530, "{}".format(str(e))
- return 200, "ok"
-
|