from __init__ import User import logging import session import error import random import string import jwt import time #Users类,包括注册登录 class Users(session.ORMsession): token_lifetime: int = 60 def __init__(self): session.ORMsession.__init__(self) def register(self,user_id:str,password:str) -> (int,str): try: if(len(password)==0): return 202,"error" user=User(user_id=user_id,password=password) self.db_session.add(user) self.db_session.commit() except: return error.error_exist_user_id(user_id) return 200,"ok" def unregister(self, user_id: str, password: str) -> (int, str): try: code, message = self.check_password(user_id, password) if code != 200: return code, message user = self.db_session.query(User).filter(User.user_id==user_id).first() self.db_session.delete(user) self.db_session.commit() except: return error.error_authorization_fail() return 200, "ok" def check_password(self, user_id: str, password: str) -> (int, str): user = self.db_session.query(User).filter(User.user_id==user_id).first() if user is None: return error.error_authorization_fail() if password != user.password: return error.error_authorization_fail() return 200, "ok" def login(self, user_id: str, password: str) -> (int, str, str): try: code, message = self.check_password(user_id, password) if code != 200: return code, message self.db_session.commit() except: return error.error_authorization_fail() return 200, "ok" def change_password(self, user_id: str, old_password: str, new_password: str) -> (int,str): try: code, message = self.check_password(user_id, old_password) if code != 200: return code, message self.db_session.query(User).filter(User.user_id== user_id)\ .update({'password': new_password}) self.db_session.commit() except: return error.error_authorization_fail() return 200, "ok"