Browse Source

搜索接口完成,等待所有接口的测试和报告撰写

master
杨舜 1 year ago
parent
commit
8786575c53
25 changed files with 831 additions and 6 deletions
  1. BIN
      .coverage
  2. BIN
      be/__pycache__/serve.cpython-38.pyc
  3. BIN
      be/model/__pycache__/buyer.cpython-38.pyc
  4. BIN
      be/model/__pycache__/postgreSQLORM.cpython-38.pyc
  5. +111
    -1
      be/model/buyer.py
  6. +180
    -2
      be/model/postgreSQLORM.py
  7. +29
    -0
      be/model/seller.py
  8. +104
    -1
      be/model/user.py
  9. +2
    -0
      be/serve.py
  10. BIN
      fe/access/__pycache__/buyer.cpython-38.pyc
  11. BIN
      fe/access/__pycache__/seller.cpython-38.pyc
  12. +22
    -0
      fe/access/auth.py
  13. +45
    -0
      fe/access/buyer.py
  14. +10
    -0
      fe/access/seller.py
  15. BIN
      fe/test/__pycache__/test_history_order.cpython-38-pytest-6.2.3.pyc
  16. BIN
      fe/test/__pycache__/test_order_cancel.cpython-38-pytest-6.2.3.pyc
  17. BIN
      fe/test/__pycache__/test_send_out.cpython-38-pytest-6.2.3.pyc
  18. BIN
      fe/test/__pycache__/test_take_over.cpython-38-pytest-6.2.3.pyc
  19. +58
    -0
      fe/test/test_history_order.py
  20. +57
    -0
      fe/test/test_order_cancel.py
  21. +28
    -0
      fe/test/test_search.py
  22. +54
    -0
      fe/test/test_send_out.py
  23. +57
    -0
      fe/test/test_take_over.py
  24. BIN
      figure_require/route_test01.png
  25. +74
    -2
      实验报告.md

BIN
.coverage View File


BIN
be/__pycache__/serve.cpython-38.pyc View File


BIN
be/model/__pycache__/buyer.cpython-38.pyc View File


BIN
be/model/__pycache__/postgreSQLORM.cpython-38.pyc View File


+ 111
- 1
be/model/buyer.py View File

@ -15,6 +15,75 @@ from be.model import db_conn
from be.model import error
from datetime import datetime
import threading
from datetime import timedelta
to_be_overtime={}
def overtime_append(key,value):#对to_be_overtime进行操作
global to_be_overtime
if key in to_be_overtime:
to_be_overtime[key].append(value)
else:
to_be_overtime[key]=[value]
class TimerClass(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.event = threading.Event()
def thread(self):
# threading.Thread(target=Buyer().auto_cancel(to_be_overtime[(datetime.utcnow() + timedelta(seconds=1)).second])).start()
# 上面这个,有利有弊利是超过一秒也能处理 弊是每有任何一次路由访问就要延时开一次线程次数秒
Buyer().auto_cancel(to_be_overtime[(datetime.utcnow() + timedelta(seconds=1)).second])
def run(self): # 每秒运行一次 将超时订单删去
global to_be_overtime
# schedule.every().second.do(thread)#每秒开一个线程去auto_cancel,做完的线程自动退出
while not self.event.is_set():
self.event.wait(1)
if (datetime.utcnow() + timedelta(seconds=1)).second in to_be_overtime:
self.thread()
# schedule.run_pending()
def cancel_timer(self):
self.event.set()
# tmr = TimerClass()#####################在无需测试自动取消订单test时删去 单独测取消订单请在test.sh中用
# #coverage run --timid --branch --source fe,be --concurrency=thread -m pytest -v -k fe/test/test_new_order.py --ignore=fe/data
# tmr.start()###################在无需测试自动取消订单test时删去
def tostop():
global tmr
tmr.cancel_timer()
def jwt_encode(user_id: str, terminal: str) -> str:
encoded = jwt.encode(
{"user_id": user_id, "terminal": terminal, "timestamp": time.time()},
key=user_id,
algorithm="HS256",
)
return encoded.decode("utf-8")
# decode a JWT to a json string like:
# {
# "user_id": [user name],
# "terminal": [terminal code],
# "timestamp": [ts]} to a JWT
# }
def jwt_decode(encoded_token, user_id: str) -> str:
decoded = jwt.decode(encoded_token, key=user_id, algorithms="HS256")
return decoded
class DateEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.strftime("%Y-%m-%d %H:%M:%S")
else:
return json.JSONEncoder.default(self, obj)
class Buyer(db_conn.DBConn):
def __init__(self):
db_conn.DBConn.__init__(self)
@ -71,6 +140,7 @@ class Buyer(db_conn.DBConn):
# "VALUES(?, ?, ?, ?);",
# (uid, book_id, count, price))
# print("touch1")
timenow = datetime.utcnow()
creat_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) ## 订单创建时间
new_order_entity = New_Order(order_id=uid,store_id=store_id,user_id=user_id,creat_time=creat_time,status=0)
self.session.add(new_order_entity)
@ -79,8 +149,10 @@ class Buyer(db_conn.DBConn):
# "VALUES(?, ?, ?);",
# (uid, store_id, user_id))
# self.conn.commit()
overtime_append(timenow.second,order_id) ##将刚下单的订单加入到overtime中管理
self.session.commit()
order_id = uid
except SQLAlchemyError as e:
logging.info("528, {}".format(str(e)))
return 528, "{}".format(str(e)), ""
@ -256,7 +328,11 @@ class Buyer(db_conn.DBConn):
return error.error_invalid_order_id(order_id)
if row.status != 0:
return error.error_invalid_order_id(order_id)
store_id = row.store_id
row2 = session.query(New_Order_Detail).filter(New_Order_Detail.order_id==order_id).first()
book_id = row2.book_id
count = row2.count
row3 = session.query(Store_Book).filter(and_(Store_Book.store_id==store_id,Store_Book.book_id==book_id)).update({'stock_level':Store_Book.stock_level+count})
row = session.query(New_Order).filter(and_(New_Order.order_id==order_id,New_Order.user_id==user_id)).update({'status':-1})
if row == 0:
@ -290,3 +366,37 @@ class Buyer(db_conn.DBConn):
# print('touch3')
return 530, "{}".format(str(e))
return 200, "ok", order_ids
def auto_cancel(self,order_id_list):#自动取消订单
exist_order_need_cancel=0
try:
for order_id in order_id_list:
row = self.session.query(New_Order).filter(New_Order.order_id==order_id).first()
if row is None:
return error.error_invalid_order_id(order_id)
#是否属于未付款订单
if row.status == 0:
timenow = datetime.utcnow()
check_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
creat_time = row.creat_time
time_1_struct = datetime.strptime(creat_time, "%Y-%m-%d %H:%M:%S")
time_2_struct = datetime.strptime(check_time, "%Y-%m-%d %H:%M:%S")
seconds = (time_2_struct - time_1_struct).seconds
if seconds>= 30: ## 订单超过30秒为付款自动取消
store_id = row.store_id
row3 = self.session.query(New_Order).filter(New_Order_Detail.order_id==order_id).first()
book_id = row3.book_id
count = row3.count
self.session.query(Store_Book).filter(and_(Store_Book.store_id==store_id,Store_Book.book_id==book_id)).update({'stock_level':Store_Book.stock_level+count})
self.session.query(New_Order).filter(New_Order.order_id==order_id).update({'status':-1})
exist_order_need_cancel = 1
self.session.commit()
except SQLAlchemyError as e:
return 528, "{}".format(str(e))
except BaseException as e:
# print('touch3')
return 530, "{}".format(str(e))
return 'no_such_order' if exist_order_need_cancel==0 else "auto_cancel_done"

+ 180
- 2
be/model/postgreSQLORM.py View File

@ -1,12 +1,14 @@
from sqlalchemy import create_engine,MetaData
from sqlalchemy import Integer,String,ForeignKey,Column,TEXT
from sqlalchemy import Integer,String,ForeignKey,Column,TEXT,LargeBinary
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import PrimaryKeyConstraint,UniqueConstraint
import sqlalchemy
import re
import time
import jieba.analyse
from jieba import cut_for_search
Base=declarative_base()
class con:
@ -117,6 +119,177 @@ class New_Order_Detail(Base):
PrimaryKeyConstraint('order_id','book_id'),
)
class Book(Base):
__tablename__ = 'book'
book_id = Column(TEXT, primary_key=True)
title = Column(TEXT, nullable=False)
author = Column(TEXT)
publisher = Column(TEXT)
original_title = Column(TEXT)
translator = Column(TEXT)
pub_year = Column(TEXT)
pages = Column(Integer)
original_price = Column(Integer) # 原价
currency_unit = Column(TEXT)
binding = Column(TEXT)
isbn = Column(TEXT)
author_intro = Column(TEXT)
book_intro = Column(TEXT)
content = Column(TEXT)
tags = Column(TEXT)
picture = Column(LargeBinary)
# 搜索标题表
class Search_title(Base):
__tablename__ = 'search_title'
search_id = Column(Integer, nullable=False)
title = Column(TEXT, nullable=False)
book_id = Column(TEXT, ForeignKey('book.book_id'), nullable=False)
__table_args__ = (
PrimaryKeyConstraint('search_id', 'title'),
{},
)
# 搜索标签表
class Search_tags(Base):
__tablename__ = 'search_tags'
search_id = Column(Integer, nullable=False)
tags = Column(TEXT, nullable=False)
book_id = Column(TEXT, ForeignKey('book.book_id'), nullable=False)
__table_args__ = (
PrimaryKeyConstraint('search_id', 'tags'),
{},
)
# 搜索作者表
class Search_author(Base):
__tablename__ = 'search_author'
search_id = Column(Integer, nullable=False)
author = Column(TEXT, nullable=False)
book_id = Column(TEXT, ForeignKey('book.book_id'), nullable=False)
__table_args__ = (
PrimaryKeyConstraint('search_id', 'author'),
{},
)
# 搜索书本内容表
class Search_book_intro(Base):
__tablename__ = 'search_book_intro'
search_id = Column(Integer, nullable=False)
book_intro = Column(TEXT, nullable=False)
book_id = Column(TEXT, ForeignKey('book.book_id'), nullable=False)
__table_args__ = (
PrimaryKeyConstraint('search_id', 'book_intro'),
{},
)
def insert_tags(session):
row = session.query(Book).all()
for i in row:
tags = i.tags
for j in tags:
row_tmp = session.query(Search_tags).filter(Search_tags.tags==j).order_by(Search_tags.search_id.desc()).first()
if row_tmp is None:
max_num = 0
else:
max_num = row_tmp.search_id + 1
new_search_tags = Search_tags(search_id=max_num,tags=j,book_id=i.book_id)
session.add(new_search_tags)
session.commit()
def insert_author(session):
row = session.query(Book).all()
for i in row:
tmp = i.author
if tmp == None:
j = '作者不详'
else:
j = tmp
row_tmp = session.query(Search_author).filter(Search_author.author==j).order_by(Search_author.search_id.desc()).first()
if row_tmp is None:
max_num = 0
else:
max_num = row_tmp.search_id + 1
# print(max_num, j, i.book_id)
new_search_author = Search_author(search_id=max_num,author=j,book_id=i.book_id)
session.add(new_search_author)
session.commit()
def insert_title(session):
row = session.query(Book).all()
for i in row:
tmp = i.title
# print(tmp)
tmp = re.sub(r'[\(\[\{(【][^))】]*[\)\]\{\\)]\s?', '', tmp)
tmp = re.sub(r'[^\w\s]', '', tmp)
# 处理空标题
if len(tmp) == 0:
continue
# 搜索引擎模式,在精确模式的基础上,对长词再次切分,提高召回率,适合用于搜索引擎分词。
seg_list = cut_for_search(tmp)
sig_list = []
tag = 0
for k in seg_list:
sig_list.append(k)
if k == tmp:
tag = 1
if tag == 0:
sig_list.append(tmp)
for j in sig_list:
if j == "" or j == " ":
continue
row_tmp = session.query(Search_title).filter(Search_title.title==j).order_by(Search_title.search_id.desc()).first()
if row_tmp is None:
max_num = 0
else:
max_num = row_tmp.search_id + 1
# print(max_num, j, i.book_id)
new_search_title = Search_title(search_id=max_num,title=j,book_id=i.book_id)
session.add(new_search_title)
session.commit()
def insert_book_intro(session):
row = session.query(Book).all()
for i in row:
tmp = i.book_intro
if tmp != None:
# print(tmp)
# 采用textrank进行分词
keywords_textrank = jieba.analyse.textrank(tmp)
# print(keywords_textrank)
# keywords_tfidf = jieba.analyse.extract_tags(tmp)
# print(keywords_tfidf)
for j in keywords_textrank:
row_tmp = session.query(Search_book_intro).filter(Search_book_intro.book_intro==j).order_by(Search_title.search_id.desc()).first()
if row_tmp is None:
max_num = 0
else:
max_num = row_tmp.search_id + 1
# print(max_num, j, i.book_id)
new_search_book_intro = Search_book_intro(search_id=max_num,book_intro=j,book_id=i.book_id)
session.add(new_search_book_intro)
session.commit()
engine, meta = con.connect()
Base.metadata.bind = engine
@ -124,3 +297,8 @@ Base.metadata.bind = engine
DBSession = sessionmaker(bind=engine)
session = DBSession()
insert_tags(session=session)
insert_author(session=session)
insert_title(session=session)
insert_book_intro(session=session)

+ 29
- 0
be/model/seller.py View File

@ -4,6 +4,7 @@ from be.model import postgreSQLORM
from be.model.postgreSQLORM import New_Order
from be.model import db_conn
from sqlalchemy import and_
import json
class Seller(db_conn.DBConn):
@ -21,6 +22,34 @@ class Seller(db_conn.DBConn):
# self.session.query(postgreSQLORM.Store_Book).
new_book = postgreSQLORM.Store_Book(store_id=store_id,book_id=book_id,book_info=book_json_str,stock_level=stock_level)
self.session.add(new_book)
book = json.loads(book_json_str)
thelist = [] # 由于没有列表类型,故使用将列表转为text的办法
for tag in book.get('tags'):
if tag.strip() != "":
# book.tags.append(tag)
thelist.append(tag)
book['tags'] = str(thelist) # 解析成list请使用eval(
if book.get('picture') is not None:
new_Book = postgreSQLORM.Book(book_id=book['id'], title=book['title'],
author=book['author'],publisher=book['publisher'],
original_title=book['original_title'],translator=book['translator'],
pub_year=book['pub_year'],pages=book['pages'],
original_price=book['price'],currency_unit=book['currency_unit'],
binding=book['binding'],isbn=book['isbn'],
author_intro=book['author_intro'],book_intro=book['book_intro'],
content=book['content'],tags=book['tags'],
picture=book['picture'])
else:
new_Book = postgreSQLORM.Book(book_id=book['id'], title=book['title'],
author=book['author'],publisher=book['publisher'],
original_title=book['original_title'],translator=book['translator'],
pub_year=book['pub_year'],pages=book['pages'],
original_price=book['price'],currency_unit=book['currency_unit'],
binding=book['binding'],isbn=book['isbn'],
author_intro=book['author_intro'],book_intro=book['book_intro'],
content=book['content'],tags=book['tags'],
picture=book['picture'])
self.session.add(new_Book)
# self.conn.execute("INSERT into store(store_id, book_id, book_info, stock_level)"
# "VALUES (?, ?, ?, ?)", (store_id, book_id, book_json_str, stock_level))
# self.conn.commit()

+ 104
- 1
be/model/user.py View File

@ -3,11 +3,16 @@ import time
import logging
import sqlite3 as sqlite
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import and_,or_
from be.model import error
from be.model import postgreSQLORM
from be.model.postgreSQLORM import User
from be.model import db_conn
from be.model.postgreSQLORM import Book
from be.model.postgreSQLORM import Search_author
from be.model.postgreSQLORM import Search_tags
from be.model.postgreSQLORM import Search_book_intro
from be.model.postgreSQLORM import Search_title
# encode a json string like:
# {
# "user_id": [user name],
@ -200,3 +205,101 @@ class User(db_conn.DBConn):
return 530, "{}".format(str(e))
return 200, "ok"
def search_author(self, author:str,page:int)-> (int,[dict]):#200,'ok',list[{str,str,str,str,list,bytes}]
ret=[]
# select
# title, author, publisher, book_intro, tags, picture
# from book where
# book_id in
# (select book_id from search_tags where tags='小说' and search_id BETWEEN 20 and 30)
#已付款
row = self.session.query(Search_author.book_id).filter(and_(Search_author.author==author,Search_author.search_id<=10*page-1,Search_author.search_id<=10*page-10)).all()
if len(row)!=0:
for i in row:
row_tmp = self.session.query(Book).filter(Book.book_id==i).first()
if row_tmp is None:
continue
else:
title=row_tmp.title
author_ = row_tmp.author
publisher = row_tmp.publisher
book_intro = row_tmp.book_intro
tags = row_tmp.tags
ret.append(
{'title': title, 'author': author_, 'publisher': publisher,
'book_intro': book_intro,
'tags': tags})
return 200, ret
else:
return 200, []
def search_book_intro(self, book_intro:str,page:int)-> (int,[dict]):
ret = []
row = self.session.query(Search_book_intro.book_id).filter(and_(Search_book_intro.book_intro==book_intro,Search_book_intro.search_id<=10*page-1,Search_author.search_id<=10*page-10)).all()
if len(row)!=0:
for i in row:
row_tmp = self.session.query(Book).filter(Book.book_id==i).first()
if row_tmp is None:
continue
else:
title=row_tmp.title
author_ = row_tmp.author
publisher = row_tmp.publisher
book_intro = row_tmp.book_intro
tags = row_tmp.tags
ret.append(
{'title': title, 'author': author_, 'publisher': publisher,
'book_intro': book_intro,
'tags': tags})
return 200, ret
else:
return 200, []
def search_tags(self, tags:str,page:int)-> (int,[dict]):
ret = []
row = self.session.query(Search_tags.book_id).filter(and_(Search_tags.tags==tags,Search_tags.search_id<=10*page-1,Search_tags.search_id<=10*page-10)).all()
if len(row)!=0:
for i in row:
row_tmp = self.session.query(Book).filter(Book.book_id==i).first()
if row_tmp is None:
continue
else:
title=row_tmp.title
author_ = row_tmp.author
publisher = row_tmp.publisher
book_intro = row_tmp.book_intro
tags = row_tmp.tags
ret.append(
{'title': title, 'author': author_, 'publisher': publisher,
'book_intro': book_intro,
'tags': tags})
return 200, ret
else:
return 200, []
def search_title(self, title:str,page:int)-> (int,[dict]):
ret = []
row = self.session.query(Search_title.book_id).filter(and_(Search_title.title==title,Search_title.search_id<=10*page-1,Search_author.search_id<=10*page-10)).all()
if len(row)!=0:
for i in row:
row_tmp = self.session.query(Book).filter(Book.book_id==i).first()
if row_tmp is None:
continue
else:
title=row_tmp.title
author_ = row_tmp.author
publisher = row_tmp.publisher
book_intro = row_tmp.book_intro
tags = row_tmp.tags
ret.append(
{'title': title, 'author': author_, 'publisher': publisher,
'book_intro': book_intro,
'tags': tags})
return 200, ret
else:
return 200, []

+ 2
- 0
be/serve.py View File

@ -10,6 +10,7 @@ from be.view import auth
from be.view import seller
from be.view import buyer
from be.model.creatTB import createTable
from be.model.buyer import tostop
bp_shutdown = Blueprint("shutdown", __name__)
@ -22,6 +23,7 @@ def shutdown_server():
@bp_shutdown.route("/shutdown", methods=["POST","GET"])
def be_shutdown():
# tostop() ## 不需要测试自动取消时删去此部分
shutdown_server()
return "Server shutting down..."

BIN
fe/access/__pycache__/buyer.cpython-38.pyc View File


BIN
fe/access/__pycache__/seller.cpython-38.pyc View File


+ 22
- 0
fe/access/auth.py View File

@ -47,3 +47,25 @@ class Auth:
url = urljoin(self.url_prefix, "unregister")
r = requests.post(url, json=json)
return r.status_code
def search_author(self, author:str,page:int) -> int:
json = {"author": author,"page":page}
url = urljoin(self.url_prefix, "search_author")
r = requests.post(url, json=json)
return r.status_code
def search_book_intro(self, book_intro:str,page:int) -> int:
json = {"book_intro": book_intro,"page":page}
url = urljoin(self.url_prefix, "search_book_intro")
r = requests.post(url, json=json)
return r.status_code
def search_tags(self, tags:str,page:int) -> int:
json = {"tags": tags,"page":page}
url = urljoin(self.url_prefix, "search_tags")
r = requests.post(url, json=json)
return r.status_code
def search_title(self, title:str,page:int) -> int:
json = {"title": title,"page":page}
url = urljoin(self.url_prefix, "search_title")
r = requests.post(url, json=json)
return r.status_code

+ 45
- 0
fe/access/buyer.py View File

@ -2,6 +2,7 @@ import requests
import simplejson
from urllib.parse import urljoin
from fe.access.auth import Auth
from be.model.buyer import Buyer as Buyer_
class Buyer:
@ -40,3 +41,47 @@ class Buyer:
headers = {"token": self.token}
r = requests.post(url, headers=headers, json=json)
return r.status_code
def take_over(self,user_id: str, order_id: str):
json = {
"user_id": user_id,
"order_id": order_id
}
url = urljoin(self.url_prefix, "take_over")
headers = {"token": self.token}
r = requests.post(url, headers=headers, json=json)
return r.status_code
def history_order(self,user_id: str):
json = {
"user_id": user_id
}
url = urljoin(self.url_prefix, "search_order")
headers = {"token": self.token}
r = requests.post(url, headers=headers, json=json)
return r.status_code
def order_cancel(self,user_id: str, order_id: str):
json = {
"user_id": user_id,
"order_id": order_id
}
url = urljoin(self.url_prefix, "order_cancel")
headers = {"token": self.token}
r = requests.post(url, headers=headers, json=json)
return r.status_code
def auto_cancel(self, store_id: str, book_id_and_count: [(str, int)]) -> (int, str):####测试auto_cancel
books = []
for id_count_pair in book_id_and_count:
books.append({"id": id_count_pair[0], "count": id_count_pair[1]})
json = {"user_id": self.user_id, "store_id": store_id, "books": books}
#print(simplejson.dumps(json))
url = urljoin(self.url_prefix, "new_order")
headers = {"token": self.token}
r = requests.post(url, headers=headers, json=json)
response_json = r.json()
import time
time.sleep(161)#若没有说明在1s内处理完 #设大一点好跑travis
return Buyer_().auto_cancel([response_json.get("order_id")])

+ 10
- 0
fe/access/seller.py View File

@ -50,3 +50,13 @@ class Seller:
headers = {"token": self.token}
r = requests.post(url, headers=headers, json=json)
return r.status_code
def send_out(self,user_id: str, order_id: str):
json = {
"user_id": user_id,
"order_id": order_id
}
url = urljoin(self.url_prefix, "send_out")
headers = {"token": self.token}
r = requests.post(url, headers=headers, json=json)
return r.status_code

BIN
fe/test/__pycache__/test_history_order.cpython-38-pytest-6.2.3.pyc View File


BIN
fe/test/__pycache__/test_order_cancel.cpython-38-pytest-6.2.3.pyc View File


BIN
fe/test/__pycache__/test_send_out.cpython-38-pytest-6.2.3.pyc View File


BIN
fe/test/__pycache__/test_take_over.cpython-38-pytest-6.2.3.pyc View File


+ 58
- 0
fe/test/test_history_order.py View File

@ -0,0 +1,58 @@
import time
import uuid
import pytest
import random
from fe.test.gen_book_data import GenBook
from fe.access.new_buyer import register_new_buyer
from fe.access.book import Book
class Test_history_order:
@pytest.fixture(autouse=True)
def pre_run_initialization(self):
self.buyer_id = "test_search_order__buyer_{}".format(str(uuid.uuid1()))
self.password = self.buyer_id
b = register_new_buyer(self.buyer_id, self.password)
self.buyer = b
yield
def test_ok(self):
buy_time = random.randint(5, 10)#买buy_time次书,产生buytime个记录
for i in range(buy_time):
self.seller_id = "test_search_order_seller_{}".format(str(uuid.uuid1()))
self.store_id = "test_search_order_store_id_{}".format(str(uuid.uuid1()))
self.gen_book = GenBook(self.seller_id, self.store_id)
self.seller = self.gen_book.seller
ok, buy_book_id_list = self.gen_book.gen(non_exist_book_id=False, low_stock_level=False, max_book_count=5)
self.buy_book_info_list = self.gen_book.buy_book_info_list
assert ok
self.total_price = 0
for item in self.buy_book_info_list:
book: Book = item[0]
num = item[1]
self.total_price = self.total_price + book.price * num
code = self.buyer.add_funds(self.total_price + 100000)
assert code == 200
code, self.order_id = self.buyer.new_order(self.store_id, buy_book_id_list)
assert code == 200
flag = random.randint(0, 2)
if flag != 0:
code = self.buyer.payment(self.order_id)
assert code == 200
if flag == 1:
code = self.seller.send_out(self.seller_id, self.order_id)
assert code == 200
code = self.buyer.take_over(self.buyer_id, self.order_id)
assert code == 200
code = self.buyer.history_order(self.buyer_id)
assert code == 200
def test_false_buyer(self):
code = self.buyer.history_order(self.buyer_id+'s')
assert code != 200
def test_no_record_buyer(self):
code = self.buyer.history_order(self.buyer_id)
assert code == 200

+ 57
- 0
fe/test/test_order_cancel.py View File

@ -0,0 +1,57 @@
import time
import uuid
import pytest
from fe.test.gen_book_data import GenBook
from fe.access.new_buyer import register_new_buyer
from fe.access.book import Book
class Test_order_cancel:
@pytest.fixture(autouse=True)
def pre_run_initialization(self):
self.store_id = "test_cancel_store_{}".format(str(uuid.uuid1()))
self.seller_id = "test_cancel_seller_{}".format(str(uuid.uuid1()))
self.store_id = "test_cancel_store_id_{}".format(str(uuid.uuid1()))
self.buyer_id = "test_cancel__buyer_{}".format(str(uuid.uuid1()))
gen_book = GenBook(self.seller_id, self.store_id)
self.seller=gen_book.seller
ok, buy_book_id_list = gen_book.gen(non_exist_book_id=False, low_stock_level=False, max_book_count=5)
self.buy_book_info_list = gen_book.buy_book_info_list
assert ok
self.password = self.buyer_id
b = register_new_buyer(self.buyer_id, self.password)
self.buyer = b
self.total_price = 0
for item in self.buy_book_info_list:
book: Book = item[0]
num = item[1]
self.total_price = self.total_price + book.price * num
code = self.buyer.add_funds(self.total_price+100000)
assert code == 200
code, self.order_id = b.new_order(self.store_id, buy_book_id_list)
assert code == 200
yield
def test_ok_paid(self):
code = self.buyer.payment(self.order_id)
assert code == 200
code = self.buyer.order_cancel(self.buyer_id,self.order_id)
assert code == 200
def test_false_buyer(self):
code = self.buyer.order_cancel(self.buyer_id+'s',self.order_id)
assert code != 200
def test_ok_unpay(self):
code = self.buyer.order_cancel(self.buyer_id, self.order_id)
assert code == 200
def test_cannot_cancel_order(self):
code = self.buyer.payment(self.order_id)
assert code == 200
code = self.seller.send_out(self.seller_id, self.order_id)
assert code == 200
code = self.buyer.order_cancel(self.buyer_id, self.order_id)
assert code != 200

+ 28
- 0
fe/test/test_search.py View File

@ -0,0 +1,28 @@
import time
import pytest
from fe.access.new_seller import register_new_seller
from fe.access.book import Book
from fe.access import book
from fe.access import auth
from fe import conf
import uuid
import random
class TestSearch:
@pytest.fixture(autouse=True)
def pre_run_initialization(self):
self.auth = auth.Auth(conf.URL)
self.author = "test_author_{}".format(str(uuid.uuid1()))
self.book_intro = "test_book_intro_{}".format(str(uuid.uuid1()))
self.tags = "test_tags_{}".format(str(uuid.uuid1()))#random.choice(["小说","励志"])
self.title = "test_title_{}".format(str(uuid.uuid1()))#random.choice(["很","在"])
self.store_id = "test_store_id_{}".format(str(uuid.uuid1()))
self.page = random.randint(1,2)
yield
def test_search(self):
assert self.auth.search_author(self.author, self.page) == 200
assert self.auth.search_book_intro(self.book_intro, self.page) == 200
assert self.auth.search_tags(self.tags, self.page) == 200
assert self.auth.search_title(self.title, self.page) == 200

+ 54
- 0
fe/test/test_send_out.py View File

@ -0,0 +1,54 @@
import time
import uuid
import pytest
from fe.test.gen_book_data import GenBook
from fe.access.new_buyer import register_new_buyer
from fe.access.book import Book
class Test_send_out:
@pytest.fixture(autouse=True)
def pre_run_initialization(self):
self.store_id = "test_send_book_store_{}".format(str(uuid.uuid1()))
self.seller_id = "test_send_seller_{}".format(str(uuid.uuid1()))
self.store_id = "test_send_book_store_id_{}".format(str(uuid.uuid1()))
self.buyer_id = "test_send__buyer_{}".format(str(uuid.uuid1()))
gen_book = GenBook(self.seller_id, self.store_id)
self.seller=gen_book.seller
ok, buy_book_id_list = gen_book.gen(non_exist_book_id=False, low_stock_level=False, max_book_count=5)
self.buy_book_info_list = gen_book.buy_book_info_list
assert ok
self.password = self.buyer_id
b = register_new_buyer(self.buyer_id, self.password)
self.buyer = b
self.total_price = 0
for item in self.buy_book_info_list:
book: Book = item[0]
num = item[1]
self.total_price = self.total_price + book.price * num
code = self.buyer.add_funds(self.total_price+100000)
assert code == 200
code, self.order_id = b.new_order(self.store_id, buy_book_id_list)
assert code == 200
code=b.payment(self.order_id )
assert code == 200
yield
def test_ok(self):
code = self.seller.send_out(self.seller_id,self.order_id)
assert code == 200
def test_false_seller(self):
code = self.seller.send_out(self.seller_id+'s',self.order_id)
assert code != 200
def test_non_exist_order(self):
code = self.seller.send_out(self.seller_id,self.order_id + 's')
assert code != 200
def test_repeat_send_books(self):
code = self.seller.send_out(self.seller_id,self.order_id)
assert code == 200
code = self.seller.send_out(self.seller_id,self.order_id )
assert code != 200

+ 57
- 0
fe/test/test_take_over.py View File

@ -0,0 +1,57 @@
import time
import uuid
import pytest
from fe.test.gen_book_data import GenBook
from fe.access.new_buyer import register_new_buyer
from fe.access.book import Book
class Test_take_over:
@pytest.fixture(autouse=True)
def pre_run_initialization(self):
self.store_id = "test_send_book_store_{}".format(str(uuid.uuid1()))
self.seller_id = "test_send_seller_{}".format(str(uuid.uuid1()))
self.store_id = "test_send_book_store_id_{}".format(str(uuid.uuid1()))
self.buyer_id = "test_send__buyer_{}".format(str(uuid.uuid1()))
gen_book = GenBook(self.seller_id, self.store_id)
self.seller=gen_book.seller
ok, buy_book_id_list = gen_book.gen(non_exist_book_id=False, low_stock_level=False, max_book_count=5)
self.buy_book_info_list = gen_book.buy_book_info_list
assert ok
self.password = self.buyer_id
b = register_new_buyer(self.buyer_id, self.password)
self.buyer = b
self.total_price = 0
for item in self.buy_book_info_list:
book: Book = item[0]
num = item[1]
self.total_price = self.total_price + book.price * num
code = self.buyer.add_funds(self.total_price+100000)
assert code == 200
code, self.order_id = b.new_order(self.store_id, buy_book_id_list)
assert code == 200
code=b.payment(self.order_id )
assert code == 200
code = self.seller.send_out(self.seller_id, self.order_id)
assert code == 200
yield
def test_ok(self):
code = self.buyer.take_over(self.buyer_id,self.order_id)
assert code == 200
def test_false_buyer(self):
code = self.buyer.take_over(self.buyer_id+'s',self.order_id)
assert code != 200
def test_non_exist_order(self):
code = self.buyer.take_over(self.buyer_id,self.order_id + 's')
assert code != 200
def test_repeat_receive_books(self):
code = self.buyer.take_over(self.buyer_id,self.order_id)
assert code == 200
code = self.buyer.take_over(self.buyer_id,self.order_id )
assert code != 200

BIN
figure_require/route_test01.png View File

Before After
Width: 718  |  Height: 437  |  Size: 47 KiB

+ 74
- 2
实验报告.md View File

@ -6,6 +6,8 @@
| **上机实践名称** :BookStore ||**上机实践日期**:2022.11.28 —— 2022.12.10 |
| **上机实践编号** : | **组号 :21** |**上机实践时间**:2022.11.28 —— 2022.12.10 |
$\color{red}{报告你就写一下下面我标红字的那些地方,顺便注意下排版之类的,补充实现思路的那个就参考后面我单独发的那个readme里面的实现思路,就简单看着代码说说在那些表上查什么就行}$
### 实验过程
#### 一. 分析原有的数据库结构
@ -256,6 +258,9 @@
![avatar](./figure_require/register_test.png)
7. 另外对于auth路由中的其他功能接口(注销、登录、登出、更改密码)进行类似上述注册接口的修改,此处不在单独贴出代码,只是给出postman的测试截图,至此auth中的路由全部实现(2022.11.30 17:50 杨舜)
$\color{red}{补充各个接口的实现思路}$
![avatar](./figure_require/unregister_test.png)
![avatar](./figure_require/login_test.png)
![avatar](./figure_require/logout_test.png)
@ -266,7 +271,7 @@
![avatar](./figure_require/addbook_test.png)
![avatar](./figure_require/addstocklevel_test.png)
9. 利用上述类似的实现auth路由接口的方式完成buyer路由接口,并利用postman测试实现(2022.12.02 12:10 杨舜)
9. 利用上述类似的实现auth路由接口的方式完成buyer路由接口,并利用postman测试实现(2022.12.02 12:10 杨舜)
![avatar](./figure_require/neworder_test.png)
![avatar](./figure_require/addfunds_test.png)
![avatar](./figure_require/payment_test.png)
@ -331,6 +336,8 @@
return 530, "{}".format(str(e))
return 200, "ok"
```
$\color{red}{补充实现思路}$
$\color{red}{顺便将这部分的下面这种接口说明移动到doc目录下的文件夹里面}$
商家发货
@ -583,4 +590,69 @@ Body:
变量名 | 类型 | 描述 | 是否可为空
---|---|---|---
order_id | string | 订单号,只有返回200时才有效 | N
1.
6. 为上面添加的路由编写测试接口并进行测试
![avatar](./figure_require/route_test01.png)
7. 为实现书店的搜索图书的功能,稍微修改数据库的结构,为书籍添加数据表,搜索标题表,搜索标签表,搜索作者表,搜索书本内容表
$$\color{red}{看着postgreSQLORM里面Class Book以及后面的几个class写一下几个table的表头,参照我写的一个}$$
Search_title
|search_id|title|book_id|
|---|---|---|
联合主键(search_id,book_id)
外键(book_id)
8. 修改seller中的add_book的路由
$\color{red}{补充实现思路}$
9. 在auth中添加搜索的路由(只包含全局搜索,没有店铺内搜索)
```python
@bp_auth.route("/search_author", methods=["POST"])
def search_author():
author = request.json.get("author", "")
page = request.json.get("page", "")
u = user.User()
code, message = u.search_author(author=author, page=page)
return jsonify({"message": message}), code
@bp_auth.route("/search_book_intro", methods=["POST"])
def search_book_intro():
book_intro = request.json.get("book_intro", "")
page = request.json.get("page", "")
u = user.User()
code, message = u.search_book_intro(book_intro=book_intro, page=page)
return jsonify({"message": message}), code
@bp_auth.route("/search_tags", methods=["POST"])
def search_tags():
tags = request.json.get("tags", "")
page = request.json.get("page", "")
u = user.User()
code, message = u.search_tags(tags=tags, page=page)
return jsonify({"message": message}), code
@bp_auth.route("/search_title", methods=["POST"])
def search_title():
title = request.json.get("title", "")
page = request.json.get("page", "")
u = user.User()
code, message = u.search_title(title=title, page=page)
return jsonify({"message": message}), code
```
10. 为搜索编写测试接口
$$ \color{red}{测试截图,需要终端里面的那些截图和导出的html文件(htmlcov目录中index.html)截图}$$
### 测试结果
$\color{red}{交给你了( ᗜ ‸ ᗜ )}$
### 总结
$\color{red}{交给你了( ᗜ ‸ ᗜ )}$
### 分工
$\color{red}{交给你了( ᗜ ‸ ᗜ )}$

Loading…
Cancel
Save