import copy

import pymysql

from framework.open_response import MinCustomException
from framework.response_code import ResponseCode
from framework.server_config import ServerConfig


class MysqlCli(ServerConfig):
    """Thin wrapper around one PyMySQL connection and one cursor.

    Usage is two-step: stage a statement with ``create_sql`` and then run it
    with ``commit_all`` (write path) or ``fetchone`` (read path).  Both
    runners return a *result dict* with the keys ``Status``, ``Result``,
    ``Execute_sql`` and ``Exception_error`` and always clear the staged
    statement afterwards.

    Connection settings (``mysql_host``, ``mysql_user``, ``mysql_password``,
    ``mysql_database``, ``mysql_port``, ``mysql_character``) are read from
    ``self``; they are presumably populated by ``ServerConfig.__init__``.
    """

    # Message reported when a runner is called without a staged statement.
    _NO_SQL_MESSAGE = "Execute_SQL Is Null! \nPlease Run 'self.create_sql(sql:str)'"

    def __init__(self):
        super().__init__()
        # Statement staged by create_sql(); None means nothing to run.
        self._sql = None
        # Connection
        self._connect = self._connect_mysql()
        # Cursor
        self._cursor = self._connect.cursor()
        # Template for result dicts; runners work on a deep copy of it.
        self._sql_response: dict = {
            "Status": False,
            "Result": None,
            "Execute_sql": None,
            "Exception_error": None,
        }

    def _connect_mysql(self, need_character: bool = False):
        """Open and return a new PyMySQL connection.

        :param need_character: when true, also pass ``self.mysql_character``
            as the connection ``charset``.
        """
        connect_kwargs = {
            "host": self.mysql_host,
            "user": self.mysql_user,
            "password": self.mysql_password,
            "database": self.mysql_database,
            "port": self.mysql_port,
        }
        if need_character:
            connect_kwargs["charset"] = self.mysql_character
        return pymysql.connect(**connect_kwargs)

    def _new_result(self) -> dict:
        """Return a fresh, independent copy of the result-dict template."""
        return copy.deepcopy(self._sql_response)

    def commit_all(self, debug: bool = False, please_commit: bool = True) -> dict:
        """Execute the staged statement and optionally commit it.

        :param debug: currently unused; kept for interface compatibility.
        :param please_commit: commit after a successful execute when true.
        :return: result dict.  ``Status`` is True on success; on failure the
            transaction is rolled back and ``Exception_error`` holds the
            exception that was raised.
        """
        _result = self._new_result()
        if not self._sql:
            # Report under "Result" (the key every reader looks at), not a
            # stray lowercase "result" key.
            _result.update({"Result": self._NO_SQL_MESSAGE,
                            "Execute_sql": self._sql})
            self.call_print_result(_result)
            return _result

        # Record the statement up front so it is available for diagnostics
        # on both the success and the failure path (same as fetchone).
        _result.update({"Execute_sql": self._sql})
        try:
            self._cursor.execute(self._sql)
            if please_commit:
                self._connect.commit()
            _result.update({"Status": True, "Result": "Commit Success!"})
        except Exception as error:
            self._connect.rollback()
            _result.update({"Result": "Commit Exception! RollBack",
                            "Exception_error": error})
            self.call_print_result(_result)
        finally:
            # The staged statement is single-use, whatever the outcome.
            self.flush_sql()
        return _result

    def fetchone(self, debug: bool = False, fetchall: bool = False) -> dict:
        """Execute the staged statement and fetch its rows.

        :param debug: currently unused; kept for interface compatibility.
        :param fetchall: return every row (``cursor.fetchall()``) instead of
            only the first one (``cursor.fetchone()``).
        :return: result dict.  On success ``Status`` is True and ``Result``
            holds the fetched data; on failure ``Exception_error`` holds the
            exception that was raised.
        """
        _result = self._new_result()
        if not self._sql:
            _result.update({"Result": self._NO_SQL_MESSAGE})
            self.call_print_result(_result)
            return _result

        _result.update({"Execute_sql": self._sql})
        try:
            self._cursor.execute(self._sql)
            rows = self._cursor.fetchall() if fetchall else self._cursor.fetchone()
            _result.update({"Result": rows, "Status": True})
        except Exception as error:
            _result.update({"Exception_error": error})
            self.call_print_result(_result)
        finally:
            # The staged statement is single-use, whatever the outcome.
            self.flush_sql()
        return _result

    def flush_sql(self):
        """Discard the staged statement."""
        self._sql = None

    def create_sql(self, sql: str):
        """Stage ``sql`` for the next ``commit_all`` / ``fetchone`` call."""
        self._sql = sql

    def ping_connect(self):
        """Ping the server, passing ``True`` to the connection's ``ping``."""
        self._connect.ping(True)

    def close_connect(self):
        """Close the cursor and then the connection, when they are set."""
        if self._cursor:
            self._cursor.close()
        if self._connect:
            self._connect.close()

    @staticmethod
    def call_result(result):
        """Print the whole result dict and return its ``Result`` entry."""
        # NOTE(review): this print looks like leftover debugging output; it
        # is kept because removing it would change observable behaviour.
        print(result)
        return result["Result"]

    @staticmethod
    def call_status(result):
        """Return the ``Status`` entry of a result dict."""
        return result["Status"]

    @staticmethod
    def call_execute_sql(result):
        """Return the ``Execute_sql`` entry of a result dict."""
        return result["Execute_sql"]

    @staticmethod
    def call_exception_error(result):
        """Return the ``Exception_error`` entry of a result dict."""
        return result["Exception_error"]

    @staticmethod
    def call_print_result(result):
        """Print a result dict as a ``key: value`` error report."""
        print("-" * 50)
        print("Database Error:")
        for key, value in result.items():
            print(f"{key}: {value}")

    def check_mysql_exception(self, mysql_result: dict, print_result: bool = False):
        """Raise when ``mysql_result`` reports a failed execution.

        :param mysql_result: result dict from ``commit_all`` / ``fetchone``.
        :param print_result: print the result dict before raising.
        :raises MinCustomException: with
            ``ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION`` when
            ``Status`` is falsy.
        """
        if not self.call_status(mysql_result):
            if print_result:
                self.call_print_result(mysql_result)
            raise MinCustomException(ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION)

    def check_mysql_response_null(self, mysql_result: dict, print_result: bool = False):
        """Raise when ``mysql_result`` carries an empty ``Result``.

        :param mysql_result: result dict from ``commit_all`` / ``fetchone``.
        :param print_result: print the result dict before raising.
        :raises MinCustomException: with
            ``ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_NULL`` when ``Result``
            is falsy (``None``, empty tuple, ...).
        """
        if not self.call_result(mysql_result):
            if print_result:
                self.call_print_result(mysql_result)
            raise MinCustomException(ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_NULL)