import copy
from abc import ABC

import pymysql

from framework.server_config import ServerConfig
from framework.response_code import MinCustomException, ResponseCode


class MysqlCliV3(ServerConfig):
    """Thin PyMySQL wrapper that reports each statement as a result dict.

    The connection settings come from ``self.return_mysql_config()``, which
    this class does not define itself (expected to be provided by
    ``ServerConfig``). Every call to :meth:`execute_sql` returns a fresh dict
    with the keys ``Status``, ``Result``, ``Execute_sql``,
    ``Exception_error`` and ``Exception_code``.
    """

    def __init__(self):
        super().__init__()
        # Template for per-call results; execute_sql() works on a copy so
        # this dict always keeps its default values.
        self._sql_response = {
            "Status": False,
            "Result": None,
            "Execute_sql": None,
            "Exception_error": None,
            "Exception_code": 0,
        }
        self._initialize_connection()

    def _initialize_connection(self):
        """Open the connection and cursor unless a live pair already exists."""
        # close_connect() resets the handles to None, so the same instance
        # can reconnect on its next use.
        if getattr(self, "_connect", None) is None:
            mysql_config = self.return_mysql_config()
            self._connect = pymysql.connect(
                host=mysql_config["host"],
                user=mysql_config["user"],
                password=mysql_config["password"],
                database=mysql_config["database"],
                port=mysql_config["port"],
            )
            self._cursor = self._connect.cursor()

    def close_connect(self):
        """Close the cursor and the connection; safe to call repeatedly.

        PyMySQL errors raised while closing are printed, not propagated.
        """
        connection = getattr(self, "_connect", None)
        if connection is None:
            # Already closed (or never opened): nothing to release.
            return
        cursor = getattr(self, "_cursor", None)
        try:
            if cursor is not None:
                cursor.close()
            connection.close()
        except pymysql.err.Error as e:
            print(f"pymysql close: {e}")
        finally:
            # Forget the handles so _initialize_connection() can reconnect.
            self._cursor = None
            self._connect = None

    def execute_sql(self, sql: str, fetch_all: bool = False,
                    commit_all: bool = False, close_connect: bool = True,
                    args=None) -> dict:
        """Execute one statement and return its result dict.

        Args:
            sql: Statement to run. Prefer ``%s`` / ``%(name)s`` placeholders
                together with ``args`` over building SQL from values.
            fetch_all: Fetch every row (``fetchall``) instead of only the
                first one (``fetchone``).
            commit_all: Commit the transaction after a successful execute.
            close_connect: Close the connection after a successful call.
                The connection is always closed after a failure.
            args: Optional tuple, list or dict of parameters handed to
                ``cursor.execute`` for driver-side escaping.

        Returns:
            A dict with ``Status`` True and the fetched ``Result`` on
            success. On ``pymysql.err.IntegrityError`` the dict has
            ``Status`` False, the exception in ``Exception_error`` and
            ``Exception_code`` 1062.

        Raises:
            Exception: Any error other than ``IntegrityError`` is re-raised
                after rollback, printing the result and closing.
        """
        # Reconnect lazily: an earlier call may have closed the connection.
        self._initialize_connection()

        sql_response = copy.deepcopy(self._sql_response)
        sql_response["Execute_sql"] = sql
        should_close = close_connect

        try:
            self._cursor.execute(sql, args)
            if fetch_all:
                result = self._cursor.fetchall()
            else:
                result = self._cursor.fetchone()
            if commit_all:
                self._connect.commit()
        except pymysql.err.IntegrityError as e:
            # Constraint violations are reported through the result dict
            # instead of being re-raised; check_mysql_exception() can turn
            # them into an exception for callers that want one.
            self._record_failure(sql_response, e, 1062)
            should_close = True
        except Exception as e:
            self._record_failure(sql_response, e, 502)
            self.close_connect()
            raise
        else:
            sql_response.update({"Status": True, "Result": result})

        if should_close:
            self.close_connect()
        return sql_response

    def _record_failure(self, sql_response: dict, error: Exception,
                        code: int):
        """Store the error in ``sql_response``, roll back and print it."""
        sql_response.update({
            "Exception_error": error,
            "Exception_code": code,
        })
        try:
            self._connect.rollback()
        except pymysql.err.Error as e:
            # Best effort: a failed rollback must not hide the original
            # error or prevent the connection from being closed.
            print(f"pymysql rollback: {e}")
        self._print_result(sql_response)

    def _print_result(self, mysql_result=None):
        """Print ``mysql_result``, or the default template when omitted."""
        if mysql_result is None:
            mysql_result = self._sql_response
        self.print_result(mysql_result)

    @staticmethod
    def print_result(mysql_result: dict):
        """Print every key/value pair of a result dict to stdout."""
        print("-" * 50)
        print("Database Operation Result:")
        for key, value in mysql_result.items():
            print(f"{key}: {value}")

    def check_mysql_exception(self, mysql_result: dict,
                              print_result: bool = False):
        """Raise ``MinCustomException`` when ``mysql_result`` is a failure.

        Args:
            mysql_result: Result dict as returned by :meth:`execute_sql`.
            print_result: Print the dict before raising.

        Raises:
            MinCustomException: If ``mysql_result["Status"]`` is missing or
                falsy.
        """
        if not mysql_result.get("Status"):
            if print_result:
                self.print_result(mysql_result)
            raise MinCustomException(
                ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION
            )