import volcenginesdkcore import volcenginesdktranslate20250301 from volcenginesdkcore.rest import ApiException from TranslateParent import TranslateParent from concurrent.futures import ThreadPoolExecutor, as_completed class VolcEngine(TranslateParent): def __init__(self, ): from main import server self.access_id, self.access_key = server.tell_volc_api() def _set_configure(self): configuration = volcenginesdkcore.Configuration() configuration.ak = self.access_id configuration.sk = self.access_key configuration.region = "cn-beijing" volcenginesdkcore.Configuration.set_default(configuration) @staticmethod def _start_translate_threads(tag_list: list, target_language: str, api_instance) -> dict: index = -1 if isinstance(tag_list[0], dict): index = tag_list.pop(0)["index"] translate_text_request = volcenginesdktranslate20250301.TranslateTextRequest( source_language="en", target_language=target_language, text_list=tag_list, ) try: response = api_instance.translate_text(translate_text_request) if index == -1: return {"thread_status": True, "thread_data": response.translation_list} return {"thread_status": True, "thread_data": response.translation_list, "index": index} except ApiException as e: # 复制代码运行示例,请自行打印API错误信息。 print("Exception when calling api: %s\n" % e) return {"thread_status": False} def translate_list(self, tag_list: list, target_language: str = "zh") -> dict | None: self._set_configure() api_instance = volcenginesdktranslate20250301.TRANSLATE20250301Api() list_max = 15 if len(tag_list) <= list_max: thread_result_dict = self._start_translate_threads(tag_list, target_language, api_instance) if not thread_result_dict["thread_status"]: return {"status": False} translate_list = [] thread_result = thread_result_dict["thread_data"] for item in thread_result: translate_list.append(item.translation) return {"status": True, "translate_list": translate_list} else: threads_result = [] chunks = [tag_list[i:i + list_max] for i in range(0, len(tag_list), list_max)] open_threads_count = len(chunks) index = 0 for item in chunks: item.insert(0, {"index": index}) index += 1 with ThreadPoolExecutor(max_workers=min(open_threads_count, 5)) as executor: futures = [executor.submit(self._start_translate_threads, thread_tag_list, target_language, api_instance) for thread_tag_list in chunks] for future in as_completed(futures): try: if not future.result().get("thread_status"): return {"status": False} threads_result.append(future.result()) except Exception as e: print(e) return None translate_list = [] for index in range(len(threads_result)): for item in threads_result: if item["index"] == index: thread_result = item["thread_data"] for translate_data in thread_result: translate_list.append(translate_data.translation) return {"status": True, "translate_list": translate_list}