迁移Git项目

This commit is contained in:
mnjnhuang 2025-03-19 18:02:29 +08:00
commit 63b72e3b14
15 changed files with 628 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/.idea
/.venv
/test.py

Binary file not shown.

View File

@ -0,0 +1,61 @@
from abc import abstractmethod
from time import sleep
import json
from framework.open_response import MinCustomException
from framework.mysql_cli import MysqlCliV3
from framework.open_response import OpenResponse
class Response(OpenResponse):
"""必须实现Core和Before_Response方法"""
def __init__(self):
super(Response, self).__init__()
self._is_connect_mysql: bool = False
self.mysql_cli: MysqlCliV3 | None = None
def set_connect_mysql(self, mysql_cli: MysqlCliV3 | None = None):
if mysql_cli is not None:
self._is_connect_mysql = True
self.mysql_cli = mysql_cli
# def set_handle_success(self, handle_success: bool):
# self._handle_success = handle_success
@abstractmethod
def core(self) -> None:
# TODO: 用于处理核心内容, 数据库逻辑等
pass
@abstractmethod
def before_response(self) -> dict:
# TODO: 用于创建返回的内容, 以及关闭数据库连接等
pass
def run_before_response(self) -> dict:
response_message: dict = self.before_response()
if self._is_connect_mysql:
self.mysql_cli.close_connect()
return response_message
def run_core(self):
self.core()
if self._handle_success:
self.set_response_code(self.response_code_enum.RESPONSE_200_SUCCESS)
def main_function(response: Response) -> dict:
"""
:param response: 重写的Response类
"""
try:
response.run_core()
except json.decoder.JSONDecodeError:
response.set_response_code(response.response_code_enum.RESPONSE_400_BAD_REQUESTS_JSON)
sleep(1)
response.echo = True
except MinCustomException as e:
response.set_response_code(e.args[0])
sleep(1)
response.echo = True
return response.run_before_response()

92
framework/mysql_cli.py Normal file
View File

@ -0,0 +1,92 @@
import pymysql
import copy
from framework.server_config import ServerConfig
from framework.response_code import MinCustomException, ResponseCode
from abc import ABC
class MysqlCliV3(ServerConfig):
def __init__(self):
super().__init__()
self._sql_response = {
"Status": False,
"Result": None,
"Execute_sql": None,
"Exception_error": None,
"Exception_code": 0
}
self._initialize_connection()
def _initialize_connection(self):
if not hasattr(self, "_connect"):
mysql_config = self.return_mysql_config()
self._connect = pymysql.connect(
host=mysql_config["host"],
user=mysql_config["user"],
password=mysql_config["password"],
database=mysql_config["database"],
port=mysql_config["port"]
)
self._cursor = self._connect.cursor()
def close_connect(self):
try:
self._cursor.close()
self._connect.close()
except pymysql.err.Error as e:
print(f"pymysql close: {e}")
def execute_sql(self, sql: str, fetch_all: bool = False, commit_all: bool = False,
close_connect: bool = True) -> dict:
sql_response = copy.deepcopy(self._sql_response)
try:
self._cursor.execute(sql)
result = self._cursor.fetchall() if fetch_all else self._cursor.fetchone()
if commit_all:
self._connect.commit()
sql_response.update({
"Status": True,
"Result": result,
"Execute_sql": sql
})
except pymysql.err.IntegrityError as e:
sql_response.update({
"Result": None,
"Exception_error": e,
"Exception_code": 1062,
"Execute_sql": sql
})
self._connect.rollback()
self._print_result()
self.close_connect()
# raise
except Exception as e:
sql_response.update({
"Result": None,
"Exception_error": e,
"Exception_code": 502,
"Execute_sql": sql
})
self._connect.rollback()
self._print_result()
self.close_connect()
raise
if close_connect:
self.close_connect()
return sql_response
def _print_result(self):
self.print_result(self._sql_response)
@staticmethod
def print_result(mysql_result: dict):
print("-" * 50)
print("Database Operation Result:")
for key, value in mysql_result.items():
print(f"{key}: {value}")
def check_mysql_exception(self, mysql_result: dict, print_result: bool = False):
if not mysql_result.get("Status"):
if print_result:
self.print_result(mysql_result)
raise MinCustomException(ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION)

View File

@ -0,0 +1,72 @@
from abc import ABC
from framework.response_code import ResponseCode, MinCustomException
from datetime import datetime
from flask import request
import hashlib
class OpenABC(ABC):
"""
提供公共函数和功能函数
"""
def __init__(self):
self.response_code_enum: ResponseCode = ResponseCode
@staticmethod
def call_datetime(need_str: bool = True):
return str(datetime.now()) if need_str else datetime.now()
@staticmethod
def md5sum(string: str):
md5 = hashlib.md5()
password = string.encode("utf-8")
md5.update(password)
return md5.hexdigest()
@staticmethod
def make_exception(error_code: ResponseCode):
raise MinCustomException(error_code)
class OpenResponse(OpenABC):
def __init__(self):
super(OpenResponse, self).__init__()
self._response_code = self.response_code_enum.RESPONSE_100_CONTINUE # 错误代码, 默认Continue
self._response_data = None # 返回的数据
self.response: dict = {}
self.echo = False # 是否打印消息
self._handle_success: bool = False
def make_response(self, use_custom_context: bool = False) -> dict:
if use_custom_context:
return self._response_data
response_code: ResponseCode = self._response_code
err_code, err_message = response_code.return_tuple()
self.response = {"code": err_code, "message": err_message}
if self._response_data is not None:
self.response.update({"data": self._response_data})
if self.echo is True:
print("-" * 50)
print("Interface ERROR:")
print(f"Echo url: {request.url}")
print(f"Echo methods: {request.method}")
if request.method == "GET":
print(f"Echo params: {request.args}")
elif request.method == "POST":
print(f"Echo data: {request.data}")
print(f"Echo response: {self.response}")
return self.response
def set_response_data(self, data: object):
self._response_data = data
self._handle_success = True
def set_response_code(self, code_enum: ResponseCode):
self._response_code = code_enum
def make_debug_response(self):
raise MinCustomException(self.response_code_enum.RESPONSE_201_DEBUG)

View File

@ -0,0 +1,43 @@
from dataclasses import dataclass
import json as JSON
from flask import request
from framework.open_response import OpenABC
from framework.response_code import RequestMethods
@dataclass
class SampleRequests(OpenABC):
"""
基本数据请求体
login_username: str
login_password: str
self.login_username = self.check_string(self.json_data.get("login_username"))
self.login_password = self.check_string(self.json_data.get("login_token"))
"""
def __init__(self, methods: RequestMethods, json_data: bytes | str | dict):
"""
:param methods: 请求方式, 需要传入枚举类型的数据
:param json_data: 请求的数据, 如果是POST请求, 则为bytes|str类型
如果是GET请求, 则为dict类型
"""
super().__init__()
if methods != RequestMethods.POST and methods != RequestMethods.GET:
self.make_exception(self.response_code_enum.RESPONSE_405_METHODS_NOT_ALLOW)
if methods == RequestMethods.POST:
self.json_data: dict = JSON.loads(json_data)
elif methods == RequestMethods.GET:
self.json_data = json_data
def repr(self):
print(self.__repr__())
@staticmethod
def return_headers():
return request.headers
# def return_token(self) -> str:
# authorization = request.headers.get("Authorization")
# if authorization and authorization.startswith("Bearer"):
# return authorization.replace("Bearer ", "")
# self.make_exception(self.response_code_enum.RESPONSE_401_TOKEN_INVALID)

View File

@ -0,0 +1,45 @@
from enum import unique
from enum import Enum
class MinCustomException(Exception):
def __init__(self, error):
self.error = error
super().__init__(self.error)
@unique
class RequestMethods(Enum):
GET = 1
POST = 2
PUT = 3
DELETE = 4
@unique
class ResponseCode(Enum):
def __init__(self, code: int, message: str) -> None:
self.code: int = code
self.message: str = message
# Main
RESPONSE_100_CONTINUE = 100, "Continue"
RESPONSE_200_SUCCESS = 200, "Success"
RESPONSE_201_DEBUG = 201, "Debug"
RESPONSE_502_BAD_GATEWAY = 502, "BadGateway"
RESPONSE_500_SERVER_ERROR = 500, "ServerError"
RESPONSE_405_METHODS_NOT_ALLOW = 405, "METHODS NOT ALLOW"
RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION = 501.1, "NOT IMPLEMENTED" # 数据库语句异常
RESPONSE_501_NOT_IMPLEMENTED_NULL = 501.2, "NOT IMPLEMENTED" # 数据库语句异常
RESPONSE_400_BAD_REQUESTS_JSON = 400.1, "JsonDecodeError"
RESPONSE_401_Unauthorized = 401, "Unauthorized"
RESPONSE_401_TOKEN_INVALID = 401.1, "TokenInvalid"
RESPONSE_401_TOKEN_EXPIRE = 401.2, "TokenExpire"
# Request Params Warning
RESPONSE_PARAMS_IS_NULL = 10400, "未获取所有参数"
RESPONSE_PARAMS_IS_VALID = 10401, "请求参数错误"
def return_tuple(self) -> tuple:
return self.code, self.message

View File

@ -0,0 +1,111 @@
from framework.main_framework import Response, main_function
from framework.requests_dataclass import SampleRequests
from dataclasses import dataclass
from dataclasses import asdict
from flask import Blueprint
from flask import request
from framework.mysql_cli import MysqlCliV3
from framework.response_code import RequestMethods
@dataclass
class OverrideSampleRequestParams(SampleRequests):
"""
创建POST请求参数
"""
def __init__(self):
if request.method == "GET":
super(OverrideSampleRequestParams, self).__init__(json_data=request.args, methods=RequestMethods.GET)
if request.method == "POST":
super(OverrideSampleRequestParams, self).__init__(json_data=request.get_data(), methods=RequestMethods.POST)
@dataclass
class ResponseInterface:
"""
返回的内容
"""
def convert_to_dict(self) -> dict:
"""
调用这个函数将ResponseInterface数据类转成字典类型的数据
:return: dict
"""
return asdict(self)
class OverrideMysqlCli(MysqlCliV3):
"""
创建Mysql操作类
"""
def __init__(self):
super().__init__()
self.token_table = "accounts_info"
def handle_search_token_valid(self, username: str, user_token: str) -> None:
# 验证登录token
sql = f"""
SELECT
username
FROM {self.token_table}
WHERE username = '{username}' AND user_token = '{user_token} AND 1=1'
"""
self.create_sql(sql)
mysql_result: dict = self.fetchone()
self.check_mysql_exception(mysql_result=mysql_result)
self.check_mysql_response_null(mysql_result=mysql_result)
class OverriderMainResponse(Response):
"""
主要处理请求的类
"""
def __init__(self):
super(OverriderMainResponse, self).__init__()
# self.is_connect_mysql = connect_mysql
self.mysql_cli: MysqlCliV3 = MysqlCliV3()
self.set_connect_mysql(mysql_cli=self.mysql_cli)
def core(self) -> None:
"""
重写core函数处理核心内容 例如数据查询/数据处理
"""
params = OverrideSampleRequestParams()
"""
处理完核心内容后将 handle_success 设置为True
self.handle_success = True
"""
self.set_handle_success(handle_success=True)
def before_response(self) -> dict:
"""
重写before_response函数
例如关闭mysql/redis连接等
"""
return self.make_response()
"""
请不要忘记生成Flask蓝图
"""
sample_interface = Blueprint("sample_interface", __name__)
"""
请不要忘记指定URL路径
请不要忘记执行main_function函数
"""
@sample_interface.route("/sample/index", methods=["GET"])
def main():
"""
实例化OverrideMainResponse类 并传入main_function中
您只重写core()和before_response()
即可完成一个接口 它将返回一个dict直接发送给请求端
"""
override_main_response = OverriderMainResponse()
return main_function(override_main_response)

View File

@ -0,0 +1,35 @@
from enum import unique
from enum import Enum
@unique
class ServerEnv(Enum):
DEBUG = 1,
BUILD = 2,
class ServerConfig(object):
def __init__(self, env: ServerEnv = ServerEnv.DEBUG):
super(ServerConfig, self).__init__()
self._listen: str = "0.0.0.0"
self._listen_port: int = 5000
self._env: ServerEnv = env
self._mysql_host = "192.168.160.223"
self._mysql_user = "root"
self._mysql_password = "123456"
self._mysql_database = "djh_gm"
self._mysql_port = 3306
self._mysql_character = "UTF8"
def return_mysql_config(self) -> dict:
_mysql_config: dict = {"host": self._mysql_host, "user": self._mysql_user,
"password": self._mysql_password, "database": self._mysql_database,
"charset": self._mysql_character, "port": self._mysql_port}
return _mysql_config
def tell_listen(self) -> tuple:
return self._listen, self._listen_port
def tell_env(self) -> ServerEnv:
return self._env

View File

@ -0,0 +1,26 @@
from abc import ABC
from datetime import datetime
import hashlib
class SingletonFunction(ABC):
"""
提供公共函数和功能函数
"""
def __init__(self):
pass
@staticmethod
def call_datetime(need_str: bool = True):
return str(datetime.now()) if need_str else datetime.now()
@staticmethod
def md5sum(string: str):
md5 = hashlib.md5()
password = string.encode("utf-8")
md5.update(password)
return md5.hexdigest()
singleton_function = SingletonFunction()

27
main.py Normal file
View File

@ -0,0 +1,27 @@
from flask import Flask
from flask import abort
from flask_cors import CORS
from framework.server_config import ServerEnv
from framework.server_config import ServerConfig
from routers import register_blueprints
app = Flask(__name__)
CORS(app, supports_credentials=True)
register_blueprints(app)
@app.route("/")
def hello_world():
return abort(403)
server: ServerConfig = ServerConfig(env=ServerEnv.DEBUG)
# if server.tell_env() != ServerEnv.BUILD:
# # 开启计划任务
# pass
if __name__ == '__main__':
server_listen: tuple = server.tell_listen()
app.run(host=server_listen[0], port=server_listen[1],
debug=True if server.tell_env() == ServerEnv.DEBUG else False)

3
requirements Normal file
View File

@ -0,0 +1,3 @@
pymysql
flask
flask-cors

9
routers/__init__.py Normal file
View File

@ -0,0 +1,9 @@
from flask import Blueprint
from flask.app import Flask
from .search_gm import search_gm_data_bp
def register_blueprints(app: Flask):
# app.register_blueprint(login_bp, url_prefix="/main")
app.register_blueprint(search_gm_data_bp)

Binary file not shown.

101
routers/search_gm.py Normal file
View File

@ -0,0 +1,101 @@
from framework.main_framework import Response, main_function
from framework.requests_dataclass import SampleRequests
from dataclasses import dataclass
from flask import Blueprint
from flask import request
from framework.mysql_cli import MysqlCliV3
from framework.response_code import RequestMethods
import json as JSON
@dataclass
class OverrideSampleRequestParams(SampleRequests):
gameId: int
uid: int
def __init__(self):
if request.method == "GET":
super(OverrideSampleRequestParams, self).__init__(json_data=request.args, methods=RequestMethods.GET)
if request.method == "POST":
super(OverrideSampleRequestParams, self).__init__(json_data=request.get_data(), methods=RequestMethods.POST)
self.gameId = self.json_data.get("gameId")
self.uid = self.json_data.get("uid")
if not self.gameId or not self.uid:
self.make_exception(self.response_code_enum.RESPONSE_PARAMS_IS_NULL)
class OverrideMysqlCli(MysqlCliV3):
def __init__(self):
super().__init__()
self.table_name = "game_kf_configure_table" # default table
def search_game_configuration_table(self, game_id: int, uid: int, table: str | None = None) -> dict:
if table is not None:
self.table_name = table
sql = f"""
SELECT `base` , `uid` , `gameId`, `free`, `id`, `normalIcons`, `multiplier`
FROM {self.table_name} WHERE 1 = 1 AND `gameId` = {game_id} AND `uid` = {uid}
"""
mysql_result: dict = self.execute_sql(sql=sql, fetch_all=False, close_connect=False)
self.check_mysql_exception(mysql_result)
return mysql_result
def delete_game_configuration_table_with_read_once(self, m_id: int, table: str | None = None) -> None:
if table is not None:
self.table_name = table
sql = f"""
DELETE FROM {self.table_name} WHERE 1 = 1 AND `id` = {m_id}
"""
mysql_result: dict = self.execute_sql(sql=sql, fetch_all=False, close_connect=True, commit_all=True)
self.check_mysql_exception(mysql_result)
class OverriderMainResponse(Response):
def __init__(self):
super(OverriderMainResponse, self).__init__()
self.mysql_cli: OverrideMysqlCli = OverrideMysqlCli()
self.set_connect_mysql(mysql_cli=self.mysql_cli)
def core(self) -> None:
params: OverrideSampleRequestParams = OverrideSampleRequestParams()
mysql_result: dict = self.mysql_cli.search_game_configuration_table(game_id=params.gameId, uid=params.uid)
result: tuple = mysql_result.get("Result")
if result is None:
response_message: str = "code"
else:
base_message_from_mysql: str = result[0]
code_content: int = 1
content_id: int = result[4]
not_use_base_field_game_id: list[str] = ['3000503', '3000524']
if str(params.gameId) in not_use_base_field_game_id:
normal_icons_from_mysql: str = result[5]
normal_icons_content: list[str] = normal_icons_from_mysql.split(",")
normal_icons_content_to_int: list[int] = [int(i) for i in normal_icons_content]
multiplier: int = result[6]
response_message: str = JSON.dumps({
"code": code_content,
"data": {"normalIcons": normal_icons_content_to_int, "multiplier": multiplier}
})
else:
base_content: list[str] = base_message_from_mysql.split(",")
base_content_to_int: list[int] = [int(i) for i in base_content]
free_content: str = result[3]
response_message: str = JSON.dumps({
"code": code_content,
"data": {"base": base_content_to_int, "free": free_content}
})
# self.mysql_cli.delete_game_configuration_table_with_read_once(m_id=content_id)
self.set_response_data(data=response_message)
def before_response(self) -> dict:
return self.make_response(use_custom_context=True)
search_gm_data_bp = Blueprint("search_gm_data_bp", __name__)
@search_gm_data_bp.route("/test/result", methods=["GET"])
def main():
override_main_response = OverriderMainResponse()
return main_function(override_main_response)