m-gm/framework/mysql_cli.py

93 lines
3.0 KiB
Python
Raw Permalink Normal View History

2025-03-19 18:02:29 +08:00
import pymysql
import copy
from framework.server_config import ServerConfig
from framework.response_code import MinCustomException, ResponseCode
from abc import ABC
class MysqlCliV3(ServerConfig):
def __init__(self):
super().__init__()
self._sql_response = {
"Status": False,
"Result": None,
"Execute_sql": None,
"Exception_error": None,
"Exception_code": 0
}
self._initialize_connection()
def _initialize_connection(self):
if not hasattr(self, "_connect"):
mysql_config = self.return_mysql_config()
self._connect = pymysql.connect(
host=mysql_config["host"],
user=mysql_config["user"],
password=mysql_config["password"],
database=mysql_config["database"],
port=mysql_config["port"]
)
self._cursor = self._connect.cursor()
def close_connect(self):
try:
self._cursor.close()
self._connect.close()
except pymysql.err.Error as e:
print(f"pymysql close: {e}")
def execute_sql(self, sql: str, fetch_all: bool = False, commit_all: bool = False,
close_connect: bool = True) -> dict:
sql_response = copy.deepcopy(self._sql_response)
try:
self._cursor.execute(sql)
result = self._cursor.fetchall() if fetch_all else self._cursor.fetchone()
if commit_all:
self._connect.commit()
sql_response.update({
"Status": True,
"Result": result,
"Execute_sql": sql
})
except pymysql.err.IntegrityError as e:
sql_response.update({
"Result": None,
"Exception_error": e,
"Exception_code": 1062,
"Execute_sql": sql
})
self._connect.rollback()
self._print_result()
self.close_connect()
# raise
except Exception as e:
sql_response.update({
"Result": None,
"Exception_error": e,
"Exception_code": 502,
"Execute_sql": sql
})
self._connect.rollback()
self._print_result()
self.close_connect()
raise
if close_connect:
self.close_connect()
return sql_response
def _print_result(self):
self.print_result(self._sql_response)
@staticmethod
def print_result(mysql_result: dict):
print("-" * 50)
print("Database Operation Result:")
for key, value in mysql_result.items():
print(f"{key}: {value}")
def check_mysql_exception(self, mysql_result: dict, print_result: bool = False):
if not mysql_result.get("Status"):
if print_result:
self.print_result(mysql_result)
raise MinCustomException(ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION)