import jwt
|
|
import time
|
|
import logging
|
|
import sqlite3 as sqlite
|
|
from be.model import error
|
|
from be.model import db_conn
|
|
|
|
# encode a json string like:
#   {
#       "user_id": [user name],
#       "terminal": [terminal code],
#       "timestamp": [ts]
#   }
# to a JWT
|
|
|
|
|
|
def jwt_encode(user_id: str, terminal: str) -> str:
    """Build an HS256-signed JWT for a user/terminal pair.

    The payload carries ``user_id``, ``terminal`` and a ``timestamp`` claim
    set to the current ``time.time()``. The token is signed with ``user_id``
    itself as the HMAC key.

    Args:
        user_id: User identifier; stored in the payload and used as the key.
        terminal: Terminal code stored in the payload.

    Returns:
        The encoded token as text.
    """
    # NOTE(review): the signing key is the user id, which is presumably not
    # secret -- confirm whether a server-side secret should be used instead.
    encoded = jwt.encode(
        {"user_id": user_id, "terminal": terminal, "timestamp": time.time()},
        key=user_id,
        algorithm="HS256",
    )
    # PyJWT 1.x returns bytes while PyJWT 2.x already returns str; calling
    # .decode() unconditionally raises AttributeError on the latter.
    if isinstance(encoded, bytes):
        return encoded.decode("utf-8")
    return encoded
|
|
|
|
|
|
# decode a JWT to a json string like:
#   {
#       "user_id": [user name],
#       "terminal": [terminal code],
#       "timestamp": [ts]
#   }
|
|
def jwt_decode(encoded_token, user_id: str) -> dict:
    """Verify and decode a JWT that was signed with ``user_id`` as the key.

    Args:
        encoded_token: The encoded JWT.
        user_id: User identifier used as the HMAC verification key.

    Returns:
        The decoded payload as returned by ``jwt.decode``.

    Raises:
        Whatever ``jwt.decode`` raises for a token it rejects; nothing is
        caught here.
    """
    # ``algorithms`` is an allow-list, so pass a real list rather than a bare
    # string (a string is only matched by substring/containment accident).
    return jwt.decode(encoded_token, key=user_id, algorithms=["HS256"])
|
|
|
|
|
|
class User(db_conn.DBConn):
    """Account operations on the ``user`` table.

    Public methods report their outcome as a tuple whose first element is an
    integer status code (200 on success) and whose second element is a
    message. ``login`` appends the issued token as a third element.
    Database failures are reported as code 528 and any other unexpected
    exception as code 530.
    """

    # Maximum token age in seconds, measured from its "timestamp" claim.
    token_lifetime: int = 3600

    def __init__(self):
        super().__init__()

    def __check_token(self, user_id, db_token, token) -> bool:
        """Return True when ``token`` matches the stored token and is fresh.

        Args:
            user_id: User identifier; also the key the token was signed with.
            db_token: Token currently stored for the user.
            token: Token presented by the caller.

        Returns:
            True only if both tokens are equal, the token decodes, and its
            age lies in ``[0, token_lifetime)`` seconds.
        """
        if db_token != token:
            return False
        try:
            payload = jwt_decode(encoded_token=token, user_id=user_id)
        except jwt.exceptions.InvalidTokenError as e:
            # InvalidTokenError is the base class of InvalidSignatureError,
            # so malformed tokens are rejected here too instead of escaping
            # to the caller as an exception.
            logging.error(str(e))
            return False
        # .get() so that a payload without the claim fails validation rather
        # than raising KeyError.
        ts = payload.get("timestamp")
        if ts is None:
            return False
        # A negative age (timestamp in the future) is rejected as well.
        return self.token_lifetime > time.time() - ts >= 0

    def register(self, user_id: str, password: str):
        """Insert a new user with zero balance and a freshly generated token.

        Args:
            user_id: Identifier of the user to create.
            password: Password stored for the user.

        Returns:
            ``(200, "ok")`` on success, the result of
            ``error.error_exist_user_id`` when the insert violates a
            constraint, or ``(528, message)`` on any other database error.
        """
        # NOTE(review): the password is stored as given -- confirm whether it
        # is hashed before reaching this layer.
        try:
            terminal = "terminal_{}".format(str(time.time()))
            token = jwt_encode(user_id, terminal)
            self.conn.execute(
                "INSERT into user(user_id, password, balance, token, terminal) "
                "VALUES (?, ?, ?, ?, ?);",
                (user_id, password, 0, token, terminal),
            )
            self.conn.commit()
        except sqlite.IntegrityError:
            # Presumably user_id is a primary/unique key, so a constraint
            # violation means the id is already taken -- verify against the
            # table schema.
            return error.error_exist_user_id(user_id)
        except sqlite.Error as e:
            # Other database failures are not "user exists"; report them the
            # same way the sibling methods do.
            return 528, "{}".format(str(e))
        return 200, "ok"

    def check_token(self, user_id: str, token: str) -> (int, str):
        """Validate ``token`` against the token stored for ``user_id``.

        Returns:
            ``(200, "ok")`` when the token is valid, otherwise the result of
            ``error.error_authorization_fail()`` (also used when the user
            does not exist).
        """
        cursor = self.conn.execute(
            "SELECT token from user where user_id=?", (user_id,)
        )
        row = cursor.fetchone()
        if row is None:
            return error.error_authorization_fail()
        db_token = row[0]
        if not self.__check_token(user_id, db_token, token):
            return error.error_authorization_fail()
        return 200, "ok"

    def check_password(self, user_id: str, password: str) -> (int, str):
        """Compare ``password`` with the password stored for ``user_id``.

        Returns:
            ``(200, "ok")`` on a match, otherwise the result of
            ``error.error_authorization_fail()`` (also used when the user
            does not exist).
        """
        cursor = self.conn.execute(
            "SELECT password from user where user_id=?", (user_id,)
        )
        row = cursor.fetchone()
        if row is None:
            return error.error_authorization_fail()
        if password != row[0]:
            return error.error_authorization_fail()
        return 200, "ok"

    def login(self, user_id: str, password: str, terminal: str) -> (int, str, str):
        """Check the password and issue a new token bound to ``terminal``.

        Returns:
            ``(200, "ok", token)`` on success; on failure a code, a message
            and an empty string in place of the token.
        """
        try:
            code, message = self.check_password(user_id, password)
            if code != 200:
                return code, message, ""

            token = jwt_encode(user_id, terminal)
            cursor = self.conn.execute(
                "UPDATE user set token= ? , terminal = ? where user_id = ?",
                (token, terminal, user_id),
            )
            if cursor.rowcount == 0:
                return error.error_authorization_fail() + ("",)
            self.conn.commit()
        except sqlite.Error as e:
            return 528, "{}".format(str(e)), ""
        except Exception as e:
            # Exception rather than BaseException so KeyboardInterrupt and
            # SystemExit are not swallowed.
            return 530, "{}".format(str(e)), ""
        return 200, "ok", token

    def logout(self, user_id: str, token: str) -> (int, str):
        """Invalidate the caller's token by replacing it with a new one.

        The presented token must be valid. The stored token and terminal are
        then overwritten with freshly generated values that are not returned
        to the caller.

        Returns:
            ``(200, "ok")`` on success, otherwise an error code and message.
        """
        try:
            code, message = self.check_token(user_id, token)
            if code != 200:
                return code, message

            terminal = "terminal_{}".format(str(time.time()))
            dummy_token = jwt_encode(user_id, terminal)

            cursor = self.conn.execute(
                "UPDATE user SET token = ?, terminal = ? WHERE user_id=?",
                (dummy_token, terminal, user_id),
            )
            if cursor.rowcount == 0:
                return error.error_authorization_fail()

            self.conn.commit()
        except sqlite.Error as e:
            return 528, "{}".format(str(e))
        except Exception as e:
            return 530, "{}".format(str(e))
        return 200, "ok"

    def unregister(self, user_id: str, password: str) -> (int, str):
        """Delete the user after verifying the password.

        Returns:
            ``(200, "ok")`` when exactly one row was deleted, otherwise an
            error code and message.
        """
        try:
            code, message = self.check_password(user_id, password)
            if code != 200:
                return code, message

            cursor = self.conn.execute(
                "DELETE from user where user_id=?", (user_id,)
            )
            if cursor.rowcount != 1:
                return error.error_authorization_fail()
            self.conn.commit()
        except sqlite.Error as e:
            return 528, "{}".format(str(e))
        except Exception as e:
            return 530, "{}".format(str(e))
        return 200, "ok"

    def change_password(self, user_id: str, old_password: str, new_password: str) -> (int, str):
        """Replace the password after verifying the old one.

        The stored token and terminal are regenerated in the same update, so
        a token issued before the change no longer matches the stored one.

        Returns:
            ``(200, "ok")`` on success, otherwise an error code and message.
        """
        try:
            code, message = self.check_password(user_id, old_password)
            if code != 200:
                return code, message

            terminal = "terminal_{}".format(str(time.time()))
            token = jwt_encode(user_id, terminal)
            cursor = self.conn.execute(
                "UPDATE user set password = ?, token= ? , terminal = ? where user_id = ?",
                (new_password, token, terminal, user_id),
            )
            if cursor.rowcount == 0:
                return error.error_authorization_fail()

            self.conn.commit()
        except sqlite.Error as e:
            return 528, "{}".format(str(e))
        except Exception as e:
            return 530, "{}".format(str(e))
        return 200, "ok"
|
|
|