137 lines
4.8 KiB
Python
137 lines
4.8 KiB
Python
|
|
import pymysql
|
||
|
|
from framework.server_config import ServerConfig
|
||
|
|
import copy
|
||
|
|
from framework.open_response import MinCustomException
|
||
|
|
from framework.response_code import ResponseCode
|
||
|
|
|
||
|
|
|
||
|
|
class MysqlCli(ServerConfig):
    """Small single-connection MySQL helper built on ``pymysql``.

    Workflow: queue one statement with :meth:`create_sql`, then run it with
    :meth:`commit_all` (writes) or :meth:`fetchone` (reads). Both return a
    response dict with the keys ``Status``, ``Result``, ``Execute_sql`` and
    ``Exception_error``. The queued statement is cleared after every run.

    Connection settings are read from ``self.mysql_host``, ``self.mysql_user``,
    ``self.mysql_password``, ``self.mysql_database``, ``self.mysql_port`` and
    (optionally) ``self.mysql_character`` -- presumably supplied by
    ``ServerConfig``; confirm against that class.
    """

    # Message reported when a run is requested before a statement is queued.
    _NO_SQL_MESSAGE = ("Execute_SQL Is Null! "
                       "\nPlease Run 'self.create_sql(sql:str)'")

    def __init__(self):
        """Open the connection and cursor and build the response template."""
        super().__init__()
        # Statement queued by create_sql(); None means nothing is pending.
        self._sql = None
        # Connection
        self._connect = self._connect_mysql()
        # Cursor
        self._cursor = self._connect.cursor()
        # Response template; every call works on its own deep copy.
        self._sql_response: dict = {
            "Status": False,
            "Result": None,
            "Execute_sql": None,
            "Exception_error": None,
        }

    def _connect_mysql(self, need_character: bool = False):
        """Create a new ``pymysql`` connection from the configured settings.

        Args:
            need_character: When true, also pass ``self.mysql_character`` as
                the connection ``charset``. The attribute is only read in
                that case.

        Returns:
            The connection object returned by ``pymysql.connect``.
        """
        options = {
            "host": self.mysql_host,
            "user": self.mysql_user,
            "password": self.mysql_password,
            "database": self.mysql_database,
            "port": self.mysql_port,
        }
        if need_character:
            options["charset"] = self.mysql_character
        return pymysql.connect(**options)

    def commit_all(self, debug: bool = False, please_commit: bool = True) -> dict:
        """Execute the queued statement and, optionally, commit it.

        Args:
            debug: Currently unused; kept so existing callers keep working.
            please_commit: Commit after a successful execute when true.

        Returns:
            A response dict. ``Status`` is true when the statement ran
            without raising. On failure the connection is rolled back,
            ``Exception_error`` holds the exception and the response is
            printed. If nothing was queued, ``Result`` carries a hint and
            ``Status`` stays false.
        """
        _result = copy.deepcopy(self._sql_response)
        if not self._sql:
            # Use the schema key "Result" so call_result() can see the hint.
            _result.update({"Result": self._NO_SQL_MESSAGE,
                            "Execute_sql": self._sql})
            self.call_print_result(_result)
            return _result

        # Record the statement up front so error dumps show what was run.
        _result["Execute_sql"] = self._sql
        try:
            self._cursor.execute(self._sql)
            if please_commit:
                self._connect.commit()
            # NOTE(review): success is reported for any statement that ran
            # without raising, including please_commit=False -- confirm this
            # matches the intended contract.
            _result.update({"Status": True,
                            "Result": "Commit Success!"})
        except Exception as error:
            self._connect.rollback()
            _result.update({"Result": "Commit Exception! RollBack",
                            "Exception_error": error})
            self.call_print_result(_result)
        finally:
            # The queued statement is single-use, whatever the outcome.
            self.flush_sql()
        return _result

    def fetchone(self, debug: bool = False, fetchall: bool = False) -> dict:
        """Execute the queued statement and fetch its rows.

        Args:
            debug: Currently unused; kept so existing callers keep working.
            fetchall: Fetch every row with ``cursor.fetchall()`` instead of
                a single row with ``cursor.fetchone()``.

        Returns:
            A response dict. On success ``Status`` is true and ``Result``
            holds the fetched row(s). On failure ``Exception_error`` holds
            the exception and the response is printed. If nothing was
            queued, ``Result`` carries a hint and ``Status`` stays false.
        """
        _result = copy.deepcopy(self._sql_response)
        if not self._sql:
            _result.update({"Result": self._NO_SQL_MESSAGE})
            self.call_print_result(_result)
            return _result

        _result.update({"Execute_sql": self._sql})
        try:
            self._cursor.execute(self._sql)
            if fetchall:
                rows = self._cursor.fetchall()
            else:
                rows = self._cursor.fetchone()
            _result.update({"Result": rows,
                            "Status": True})
        except Exception as error:
            _result.update({"Exception_error": error})
            self.call_print_result(_result)
        finally:
            # The queued statement is single-use, whatever the outcome.
            self.flush_sql()
        return _result

    def flush_sql(self):
        """Discard the queued statement."""
        self._sql = None

    def create_sql(self, sql: str):
        """Queue ``sql`` for the next ``commit_all`` / ``fetchone`` call."""
        self._sql = sql

    def ping_connect(self):
        """Ping the server, passing ``True`` (reconnect) to ``pymysql``."""
        self._connect.ping(True)

    def close_connect(self):
        """Close the cursor and then the connection, when they are set."""
        if self._cursor:
            self._cursor.close()
        if self._connect:
            self._connect.close()

    @staticmethod
    def call_result(result):
        """Print the whole response and return its ``Result`` entry.

        NOTE(review): the print looks like leftover debugging output; it is
        kept because callers may rely on the current stdout behaviour.
        """
        print(result)
        return result["Result"]

    @staticmethod
    def call_status(result):
        """Return the ``Status`` entry of a response dict."""
        return result["Status"]

    @staticmethod
    def call_execute_sql(result):
        """Return the ``Execute_sql`` entry of a response dict."""
        return result["Execute_sql"]

    @staticmethod
    def call_exception_error(result):
        """Return the ``Exception_error`` entry of a response dict."""
        return result["Exception_error"]

    @staticmethod
    def call_print_result(result):
        """Print a separator, a header and every key/value of ``result``."""
        print("-" * 50)
        print("Database Error:")
        for key, value in result.items():
            print(f"{key}: {value}")

    def check_mysql_exception(self, mysql_result: dict, print_result: bool = False):
        """Raise when ``mysql_result`` reports a failed operation.

        Args:
            mysql_result: Response dict from ``commit_all`` / ``fetchone``.
            print_result: Print the response before raising.

        Raises:
            MinCustomException: With
                ``ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION`` when
                ``Status`` is falsy.
        """
        if not self.call_status(mysql_result):
            if print_result:
                self.call_print_result(mysql_result)
            raise MinCustomException(ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_EXCEPTION)

    def check_mysql_response_null(self, mysql_result: dict, print_result: bool = False):
        """Raise when ``mysql_result`` carries an empty ``Result``.

        Args:
            mysql_result: Response dict from ``commit_all`` / ``fetchone``.
            print_result: Print the response before raising.

        Raises:
            MinCustomException: With
                ``ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_NULL`` when
                ``Result`` is falsy.
        """
        if not self.call_result(mysql_result):
            if print_result:
                self.call_print_result(mysql_result)
            raise MinCustomException(ResponseCode.RESPONSE_501_NOT_IMPLEMENTED_NULL)
|