课程名称:当代数据库管理系统 | 年级 :2020级 | 上机实践成绩: |
---|---|---|
指导教师 :周烜 | 姓名 :杨舜、姚嘉和 | 学号 :10205501415、10205501436 |
上机实践名称 :BookStore | 上机实践日期:2022.11.28 —— 2022.12.10 | |
上机实践编号 : | 组号 :21 | 上机实践时间:2022.11.28 —— 2022.12.10 |
分析demo中/be/model/store.py中创建数据库表的sql语句可知原有数据库的结构的ER图大致如下
有上述ER图可以得到原有数据库表如下
user表:
user_id | password | balance | token | terminal |
---|---|---|---|---|
主键为user_id |
store表:
store_id | stock_level |
---|---|
主键为store_id |
store_book表:
store_id | book_id | book_info | stock_level |
---|---|---|---|
主键为联合主键(store_id,book_id) |
user_store表:
user_id | store_id |
---|---|
外键为(user_id,store_id) |
new_order表:
order_id | user_id | store_id |
---|---|---|
主键为(order_id) |
new_order_detail表:
oeder_id | book_id | count | price |
---|---|---|---|
主键为联合主键(order_id,book_id) |
class con:
def connect():
'''Returns a connection and a metadata object'''
# We connect with the help of the PostgreSQL URL
url = 'postgresql://stu10205501415:Stu10205501415@dase-cdms-2022-pub.pg.rds.aliyuncs.com:5432/stu10205501415'
# The return value of create_engine() is our connection object
con = create_engine(url, client_encoding='utf8')
# We then bind the connection to MetaData()
meta = MetaData(bind=con)
return con, meta
class User(Base):
__tablename__ = 'user'
user_id = Column(TEXT, primary_key=True, comment="主键")
password = Column(TEXT, nullable=False, comment="密码")
balance = Column(Integer, nullable=False, comment="")
token = Column(TEXT, comment="缓存的令牌")
terminal = Column(TEXT, comment="终端代码")
class Store(Base):
__tablename__ = 'store'
store_id = Column(TEXT, primary_key=True,comment="主键")
stock_level = Column(Integer, comment = "货存")
class Store_Book(Base):
__tablename__ = 'store_book'
store_id = Column(TEXT, comment="主键")
book_id = Column(TEXT, comment="主键")
book_info = Column(TEXT, comment="书籍信息")
stock_level = Column(Integer, comment = "货存")
__table_args__ = (
PrimaryKeyConstraint('store_id', 'book_id'),
)
class User_Store(Base):
__tablename__ = 'user_store'
id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
fk_user_id = Column(
TEXT,
ForeignKey(
"user.user_id",
ondelete="CASCADE",
onupdate="CASCADE",
),
nullable=False,
comment="user外键"
)
fk_store_id = Column(
TEXT,
ForeignKey(
"store.store_id",
ondelete="CASCADE",
onupdate="CASCADE",
),
nullable=False,
comment="store外键"
)
# 多对多关系的中间表必须使用联合唯一约束,防止出现重复数据
__table_args__ = (
UniqueConstraint("fk_user_id", "fk_store_id"),
)
class New_Order(Base):
__tablename__ = 'new_order'
order_id = Column(TEXT, primary_key = True, comment = '订单id')
fk_user_id = Column(
TEXT,
ForeignKey(
"user.user_id",
ondelete="CASCADE",
onupdate="CASCADE",
),
nullable=False,
comment="user外键"
)
fk_store_id = Column(
TEXT,
ForeignKey(
"store.store_id",
ondelete="CASCADE",
onupdate="CASCADE",
),
nullable=False,
comment="store外键"
)
class New_Order_Detail(Base):
__tablename__ = 'new_order_detail'
order_id = Column(TEXT, comment='订单id')
book_id = Column(TEXT, comment='订单书籍')
count = Column(Integer, comment='购买书籍数')
price = Column(Integer, comment='单价')
__table_args__ = (
PrimaryKeyConstraint('order_id','book_id'),
)
engine, meta = con.connect()
Base.metadata.bind = engine
DBSession = sessionmaker(bind=engine)
session = DBSession()
app.register_blueprint(auth.bp_auth)
app.register_blueprint(seller.bp_seller)
app.register_blueprint(buyer.bp_buyer)
class DBConn:
def __init__(self):
self.session = postgreSQLORM.session
return
# self.conn = store.get_db_conn()
def user_id_exist(self, user_id):
row = self.session.query(User).filter(User.user_id==user_id).first()
# cursor = self.conn.execute("SELECT user_id FROM user WHERE user_id = ?;", (user_id,))
# row = cursor.fetchone()
if row is None:
return False
else:
return True
def book_id_exist(self, store_id, book_id):
row = self.session.query(Store_Book).filter(Store_Book.book_id==book_id and Store_Book.store_id==store_id).first()
# cursor = self.conn.execute("SELECT book_id FROM store WHERE store_id = ? AND book_id = ?;", (store_id, book_id))
# row = cursor.fetchone()
if row is None:
return False
else:
return True
def store_id_exist(self, store_id):
row = self.session.query(User_Store).filter(User_Store.fk_store_id==store_id).first()
# cursor = self.conn.execute("SELECT store_id FROM user_store WHERE store_id = ?;", (store_id,))
# row = cursor.fetchone()
if row is None:
return False
else:
return True
class User(db_conn.DBConn):
token_lifetime: int = 3600 # 3600 second
def __init__(self):
db_conn.DBConn.__init__(self)
def __check_token(self, user_id, db_token, token) -> bool:
try:
if db_token != token:
return False
jwt_text = jwt_decode(encoded_token=token, user_id=user_id)
ts = jwt_text["timestamp"]
if ts is not None:
now = time.time()
if self.token_lifetime > now - ts >= 0:
return True
except jwt.exceptions.InvalidSignatureError as e:
logging.error(str(e))
return False
def register(self, user_id: str, password: str):
## 判断用户是否注册过了
if self.user_id_exist(user_id=user_id):
return error.error_exist_user_id(user_id)
else:
# try:
terminal = "terminal_{}".format(str(time.time()))
token = jwt_encode(user_id, terminal)
## 为新注册的用户创建对象
new_user = postgreSQLORM.User(user_id=user_id,password=password,balance=0,token=token,terminal=terminal)
self.session.add(new_user)
self.session.commit()
# self.conn.execute(
# "INSERT into user(user_id, password, balance, token, terminal) "
# "VALUES (?, ?, ?, ?, ?);",
# (user_id, password, 0, token, terminal), )
# self.conn.commit()
# except sqlite.Error:
# return error.error_exist_user_id(user_id)
return 200, "ok"
利用上述类似的实现auth路由接口的方式完成seller路由接口,并利用postman测试实现(2022.12.01 19:20 杨舜)
利用上述类似的实现auth路由接口的方式完成buyer路由接口,并利用postman测试实现(2022.12.02 12:10 杨舜)
为了实现发货,收获,订单状态的查询,可以在new_order的订单的table中添加status,并利用不同的状态码来表示当前次订单的状态
status code | status |
---|---|
-1 | 取消 |
0 | 初始值(未付款) |
1 | 已付款 |
2 | 已发货 |
3 | 已收货 |
因此修改postgreSQLORM.py文件中New_Order的类
row = session.query(New_Order).filter(New_Order.order_id==order_id).update({'status':1})
## /view/seller.py
@bp_seller.routr("/send_out",methods=["POST"])
def send_out():
order_id: str = request.json.get("order_id")
user_id: str = request.json.get("user_id")
s = seller.Seller()
code, message = s.send_out(order_id)
return jsonify({"message": message}), code
## /model/seller.py
def send_out(self, order_id:str):
session = self.session
try:
if not self.user_id_exist(user_id):
return error.error_non_exist_user_id(user_id)
row = session.query(New_Order).filter(New_Order.order_id==order_id).first()
if row is None:
return error.error_invalid_order_id(order_id)
if row.status != 1:
return error.error_invalid_order_id(order_id)
row = session.query(New_Order).filter(New_Order.order_id==order_id).update({'status':2})
if row == 0:
return error.error_invalid_order_id(order_id)
session.commit()
except SQLAlchemyError as e:
return 528, "{}".format(str(e))
except BaseException as e:
# print('touch3')
return 530, "{}".format(str(e))
return 200, "ok"
(1)创建商铺接口实现: 首先检查user_id和store_id是否已存在,若未存在则创建新对象,并插入表user_store中 (2)添加书本信息接口实现: 首先检查user_id和store_id是否存在,再检查book_id是否已有对应的书本,最后将书本信息创建新对象插入表store_book中 (3)添加库存接口实现: 首先检查user_id和store_id和book_id是否存在,之后根据store_id和book_id找到对应店铺的对应书本库存并进行更新 (4)商家发货接口实现: 首先检查user_id是否存在,之后检查order_id是否有效,对new_order表进行更新
(5)买家下单接口实现: 首先检查user_id和store_id是否存在,之后检查对应店铺的所要买的书本库存是否足够,若足够则减去对应库存,生成order_id,对表new_order_detail更新 (6)买家充值接口实现: 首先根据user_id获取对应密码,与用户输入密码对比,相符合则在表user中更新余额 (7)买家付款接口实现: 首先检查order_id是否存在,若存在则根据user_id获取密码,与用户输入密码对比,之后检查store_id与卖家的user_id是否存在,若存在则比较买家余额和订单价格,若余额足够则扣除买家相应价格,在卖家余额上等额增加,更新user表 (8)买家收货接口实现: 首先检查user_id是否存在,检查order_id是否有效,之后更新new_order表 (9)买家取消订单接口实现: 首先检查user_id是否存在,之后根据订单信息查找对应的订单,将查找到的订单从表new_order中删去,更新 (10)买家查看历史订单接口实现: 首先检查user_id是否存在,之后根据user_id查找表new_order,之后根据order_id列出订单 (11)自动取消订单接口实现: 首先根据order_id查找表new_order,之后根据当前时间计算订单处于未付款状态的持续时长,若超过30s处于未付款状态则自动删去该条目,更新表new_order
商家发货
URL
POST http://[address]/seller/send_out
Request Headers:
key | 类型 | 描述 | 是否可为空 |
---|---|---|---|
token | string | 登录产生的会话标识 | N |
Body:
{
"user_id": "$seller id$",
"order_id": "$store id$",
}
key | 类型 | 描述 | 是否可为空 |
---|---|---|---|
user_id | String | 卖家用户ID | N |
order_id | String | 订单号 | N |
Response
Status Code:
码 | 描述 |
---|---|
200 | 发货成功 |
5XX | 买家用户ID不存在 |
5XX | 无效参数 |
## /view/buyer.py
@bp_buyer.route("/take_over", methods=["POST"])
def take_over():
user_id = request.json.get("user_id")
order_id = request.json.get("order_id")
b = Buyer()
code, message = b.take_over(user_id, order_id)
return jsonify({"message": message}), code
## /model/buyer.py
def take_over(self, user_id, order_id):
session = self.session
try:
if not self.user_id_exist(user_id):
return error.error_non_exist_user_id(user_id)
row = session.query(New_Order).filter(and_(New_Order.order_id==order_id,New_Order.fk_user_id==user_id)).first()
if row is None:
return error.error_invalid_order_id(order_id)
if row.status != 2:
return error.error_invalid_order_id(order_id)
row = session.query(New_Order).filter(and_(New_Order.order_id==order_id,New_Order.fk_user_id==user_id)).update({'status':3})
if row == 0:
return error.error_invalid_order_id(order_id)
session.commit()
except SQLAlchemyError as e:
return 528, "{}".format(str(e))
except BaseException as e:
# print('touch3')
return 530, "{}".format(str(e))
return 200, "ok"
卖家收货
URL
POST http://[address]/buyer/take_over
Request Headers:
key | 类型 | 描述 | 是否可为空 |
---|---|---|---|
token | string | 登录产生的会话标识 | N |
Body:
{
"user_id": "$seller id$",
"order_id": "$store id$",
}
key | 类型 | 描述 | 是否可为空 |
---|---|---|---|
user_id | String | 买家用户ID | N |
order_id | String | 订单号 | N |
Response
Status Code:
码 | 描述 |
---|---|
200 | 收货成功 |
5XX | 买家用户ID不存在 |
5XX | 无效参数 |
## /view/buyer.py
@bp_buyer.route("/order_cancel", methods=["POST"])
def take_over():
user_id = request.json.get("user_id")
order_id = request.json.get("order_id")
b = Buyer()
code, message = b.order_cancel(user_id, order_id)
return jsonify({"message": message}), code
## /model/buyer.py
def order_cancel(self, user_id, order_id):
session = self.session
try:
if not self.user_id_exist(user_id):
return error.error_non_exist_user_id(user_id)
row = session.query(New_Order).filter(and_(New_Order.order_id==order_id,New_Order.fk_user_id==user_id)).first()
if row is None:
return error.error_invalid_order_id(order_id)
if row.status != 0:
return error.error_invalid_order_id(order_id)
row = session.query(New_Order).filter(and_(New_Order.order_id==order_id,New_Order.fk_user_id==user_id)).update({'status':-1})
if row == 0:
return error.error_invalid_order_id(order_id)
session.commit()
except SQLAlchemyError as e:
return 528, "{}".format(str(e))
except BaseException as e:
# print('touch3')
return 530, "{}".format(str(e))
return 200, "ok"
买家取消订单
URL
POST http://[address]/buyer/order_cancel
Request Headers:
key | 类型 | 描述 | 是否可为空 |
---|---|---|---|
token | string | 登录产生的会话标识 | N |
Body:
{
"user_id": "$seller id$",
"order_id": "$store id$",
}
key | 类型 | 描述 | 是否可为空 |
---|---|---|---|
user_id | String | 买家用户ID | N |
order_id | String | 订单号 | N |
Response
Status Code:
码 | 描述 |
---|---|
200 | 收货成功 |
5XX | 买家用户ID不存在 |
5XX | 无效参数 |
## /view/buyer.py
@bp_buyer.route("/history_order", methods=["POST"])
def take_over():
user_id = request.json.get("user_id")
b = Buyer()
code, message = b.history_order(user_id)
return jsonify({"message": message}), code
## /model/buyer.py
历史订单查询
URL
POST http://[address]/buyer/history_order
Request Headers:
key | 类型 | 描述 | 是否可为空 |
---|---|---|---|
token | string | 登录产生的会话标识 | N |
Body:
{
"user_id": "$buyer id$"
}
key | 类型 | 描述 | 是否可为空 |
---|---|---|---|
user_id | String | 买家用户ID | N |
Response
Status Code:
码 | 描述 |
---|---|
200 | 查询成功 |
5XX | 买家用户ID不存在 |
5XX | 无效参数 |
Body:
{
"order_id": ["uuid"]
}
变量名 | 类型 | 描述 | 是否可为空 |
---|---|---|---|
order_id | string | 订单号,只有返回200时才有效 | N |
Book
book_id | title | author | publisher | original_title | translator | pub_year | pages | original_price | currency_unit | binding | isbn | author_intro | book_intro | content | tags | picture |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
主键(book_id) |
Search_title
search_id | title | book_id |
---|---|---|
联合主键(search_id,book_id) | ||
外键(book_id) |
Search_tags
search_id | tags | book_id |
---|---|---|
联合主键(search_id,book_id) | ||
外键(book_id) |
Search_author
search_id | author | book_id |
---|---|---|
联合主键(search_id,author) | ||
外键(book_id) |
Search_book_intro
search_id | book_intro | book_id |
---|---|---|
联合主键(search_id,book_id) | ||
外键(book_id) |
@bp_auth.route("/search_author", methods=["POST"])
def search_author():
author = request.json.get("author", "")
page = request.json.get("page", "")
u = user.User()
code, message = u.search_author(author=author, page=page)
return jsonify({"message": message}), code
@bp_auth.route("/search_book_intro", methods=["POST"])
def search_book_intro():
book_intro = request.json.get("book_intro", "")
page = request.json.get("page", "")
u = user.User()
code, message = u.search_book_intro(book_intro=book_intro, page=page)
return jsonify({"message": message}), code
@bp_auth.route("/search_tags", methods=["POST"])
def search_tags():
tags = request.json.get("tags", "")
page = request.json.get("page", "")
u = user.User()
code, message = u.search_tags(tags=tags, page=page)
return jsonify({"message": message}), code
@bp_auth.route("/search_title", methods=["POST"])
def search_title():
title = request.json.get("title", "")
page = request.json.get("page", "")
u = user.User()
code, message = u.search_title(title=title, page=page)
return jsonify({"message": message}), code
bash script/test.sh
首先针对要实现的功能编写相应test,再进行功能实现,符合测试驱动开发的方法,最后输入上述命令,从上面的图中可以看出绝大部分的测试都能够通过,代码覆盖率为54%,作为核心组件的buyer.py、seller.py和user.py的代码覆盖率均在50%-65%之间
本次书店project结合了这学期目前为止所学的许多知识,er图、ORM的使用与查询等,是一个比较大的挑战,在本次大项目中在构思方面帮助我们更好的理解和掌握了er图的绘制,在代码方面帮助我们对orm的增删改查操作有了较多的熟悉,同时,由于本次为合作作业,也让我们对git的使用变得更加流畅与顺手,相信在下次的大项目中能有比较大的提升
在本次项目的实现过程中采用了水杉码园的代码托管工具用于版本管理 下图为项目仓库的提交图:
其中项目的参与人如下: 10205501415 —— 杨舜 Max —————— 姚嘉和
项目的链接为 https://gitea.shuishan.net.cn/10205501415/BookStore-PJ2
具体分工: