m-api-server/framework/m_mysql_cli.py

137 lines
4.8 KiB
Python
Raw Permalink Normal View History

2025-03-19 18:00:58 +08:00
import pymysql
from framework.server_config import ServerConfig
import copy
from framework.open_response import MinCustomException
from framework.response_code import ResponseCode
class MysqlCli(ServerConfig):
    """Small helper around one PyMySQL connection and one cursor.

    Usage is two-step: stage a statement with ``create_sql`` and then run it
    with ``commit_all`` (writes) or ``fetchone`` (reads).  Both runners return
    a result dict with the keys ``Status``, ``Result``, ``Execute_sql`` and
    ``Exception_error`` and always clear the staged statement afterwards.

    The connection settings (``mysql_host``, ``mysql_user``, ...) are read
    from ``self``; presumably they are provided by ``ServerConfig`` -- verify
    against that class.
    """

    def __init__(self):
        """Open the connection, create a cursor and set up the result template."""
        super().__init__()
        # Statement staged by create_sql() and its optional bound parameters.
        self._sql = None
        self._sql_args = None
        # Connection
        self._connect = self._connect_mysql()
        # Cursor
        self._cursor = self._connect.cursor()
        # Template for the result dict; every call works on a deep copy of it.
        self._sql_response: dict = {"Status": False,
                                    "Result": None,
                                    "Execute_sql": None,
                                    "Exception_error": None}

    def _connect_mysql(self, need_character: bool = False):
        """Return a new ``pymysql`` connection built from the instance settings.

        :param need_character: when true, additionally pass
            ``charset=self.mysql_character`` to ``pymysql.connect``.
        """
        connect_kwargs = {"host": self.mysql_host,
                          "user": self.mysql_user,
                          "password": self.mysql_password,
                          "database": self.mysql_database,
                          "port": self.mysql_port}
        if need_character:
            connect_kwargs["charset"] = self.mysql_character
        return pymysql.connect(**connect_kwargs)

    def _new_response(self) -> dict:
        """Return a fresh, independent copy of the result template."""
        return copy.deepcopy(self._sql_response)

    def _execute_staged(self):
        """Run the staged statement on the cursor, binding parameters if any."""
        if self._sql_args is None:
            # Legacy path: the statement is sent exactly as it was staged.
            self._cursor.execute(self._sql)
        else:
            self._cursor.execute(self._sql, self._sql_args)

    def commit_all(self, debug: bool = False, please_commit: bool = True) -> dict:
        """Execute the staged statement and (optionally) commit it.

        :param debug: currently unused; kept so existing callers keep working.
        :param please_commit: when false the statement is executed but
            ``commit()`` is not called.  The result still reports
            ``"Commit Success!"`` in that case, as it always has.
        :return: result dict.  ``Status`` is ``True`` only if execution (and
            the commit, if requested) raised nothing.  On failure the
            transaction is rolled back, the exception object is stored under
            ``Exception_error`` and the result is printed.
        """
        _result = self._new_response()
        if not self._sql:
            # The key must be "Result" (capitalised) so call_result() and
            # check_mysql_response_null() can see the message.
            _result.update({"Result": "Execute_SQL Is Null! "
                                      "\nPlease Run 'self.create_sql(sql:str)'",
                            "Execute_sql": self._sql})
            self.call_print_result(_result)
            return _result
        # Record the statement up front so failures can be traced to it,
        # matching what fetchone() does.
        _result.update({"Execute_sql": self._sql})
        try:
            self._execute_staged()
            if please_commit:
                self._connect.commit()
            _result.update({"Status": True,
                            "Result": "Commit Success!"})
        except Exception as error:
            self._connect.rollback()
            _result.update({"Result": "Commit Exception! RollBack",
                            "Exception_error": error})
            self.call_print_result(_result)
        finally:
            # The staged statement is single-use, whatever the outcome.
            self.flush_sql()
        return _result

    def fetchone(self, debug: bool = False, fetchall: bool = False) -> dict:
        """Execute the staged statement and fetch its rows.

        :param debug: currently unused; kept so existing callers keep working.
        :param fetchall: fetch every row with ``cursor.fetchall()`` instead of
            a single row with ``cursor.fetchone()``.
        :return: result dict.  On success ``Status`` is ``True`` and
            ``Result`` holds whatever the cursor returned.  On failure the
            exception object is stored under ``Exception_error`` and the
            result is printed.
        """
        _result = self._new_response()
        if not self._sql:
            _result.update({"Result": "Execute_SQL Is Null! "
                                      "\nPlease Run 'self.create_sql(sql:str)'"})
            self.call_print_result(_result)
            return _result
        _result.update({"Execute_sql": self._sql})
        try:
            self._execute_staged()
            fetch = self._cursor.fetchall if fetchall else self._cursor.fetchone
            _result.update({"Result": fetch(),
                            "Status": True})
        except Exception as error:
            _result.update({"Exception_error": error})
            self.call_print_result(_result)
        finally:
            # The staged statement is single-use, whatever the outcome.
            self.flush_sql()
        return _result

    def flush_sql(self):
        """Discard the staged statement and its parameters."""
        self._sql = None
        self._sql_args = None

    def create_sql(self, sql: str, args=None):
        """Stage a statement for the next ``commit_all`` / ``fetchone`` call.

        :param sql: the SQL text.
        :param args: optional parameters (tuple, list or dict) handed to
            ``cursor.execute`` so the driver escapes the values.  Prefer this
            over formatting values into ``sql`` by hand.  When ``args`` is
            given, a literal ``%`` in ``sql`` must be written as ``%%``.
        """
        self._sql = sql
        self._sql_args = args

    def ping_connect(self):
        """Ping the server, asking the driver to reconnect if needed."""
        self._connect.ping(True)

    def close_connect(self):
        """Close the cursor and then the connection.

        The connection is closed even if closing the cursor raises.
        """
        try:
            if self._cursor:
                self._cursor.close()
        finally:
            if self._connect:
                self._connect.close()

    @staticmethod
    def call_result(result):
        """Return the ``Result`` entry of a result dict."""
        return result["Result"]

    @staticmethod
    def call_status(result):
        """Return the ``Status`` entry of a result dict."""
        return result["Status"]

    @staticmethod
    def call_execute_sql(result):
        """Return the ``Execute_sql`` entry of a result dict."""
        return result["Execute_sql"]

    @staticmethod
    def call_exception_error(result):
        """Return the ``Exception_error`` entry of a result dict."""
        return result["Exception_error"]

    @staticmethod
    def call_print_result(result):
        """Print every key/value pair of a result dict under an error banner."""
        print("-" * 50)
        print("Database Error:")
        for key, value in result.items():
            print(f"{key}: {value}")

    def check_mysql_exception(self, mysql_result: dict, print_result: bool = False):
        """Raise ``MinCustomException`` if the result's ``Status`` is falsy.

        :param mysql_result: result dict from ``commit_all`` / ``fetchone``.
        :param print_result: print the result dict before raising.
        :raises MinCustomException: with
            ``ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION``.
        """
        if not self.call_status(mysql_result):
            if print_result:
                self.call_print_result(mysql_result)
            raise MinCustomException(ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION)

    def check_mysql_response_null(self, mysql_result: dict, print_result: bool = False):
        """Raise ``MinCustomException`` if the result's ``Result`` is falsy.

        :param mysql_result: result dict from ``commit_all`` / ``fetchone``.
        :param print_result: print the result dict before raising.
        :raises MinCustomException: with
            ``ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_NULL``.
        """
        if not self.call_result(mysql_result):
            if print_result:
                self.call_print_result(mysql_result)
            raise MinCustomException(ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_NULL)