SD_WebTagManager/python/VolcEngine.py

78 lines
3.5 KiB
Python
Raw Normal View History

2026-01-13 16:51:39 +08:00
import volcenginesdkcore
import volcenginesdktranslate20250301
from volcenginesdkcore.rest import ApiException
from TranslateParent import TranslateParent
from concurrent.futures import ThreadPoolExecutor, as_completed
class VolcEngine(TranslateParent):
    """Translation backend that sends tag lists to the Volcengine translate API.

    Credentials are read from ``main.server.tell_volc_api()`` when the object
    is created. Text is translated from English (``source_language="en"``)
    into the requested target language.
    """

    def __init__(self, ):
        """Fetch the Volcengine access id / access key pair from the server."""
        # NOTE(review): function-scope import, presumably to avoid a circular
        # import with `main` -- confirm before moving it to module level.
        from main import server
        self.access_id, self.access_key = server.tell_volc_api()

    def _set_configure(self):
        """Install this instance's credentials as the SDK default configuration.

        The region is fixed to ``"cn-beijing"``. The configuration is set as
        the SDK-wide default, so it affects every API client created after
        this call.
        """
        configuration = volcenginesdkcore.Configuration()
        configuration.ak = self.access_id
        configuration.sk = self.access_key
        configuration.region = "cn-beijing"
        volcenginesdkcore.Configuration.set_default(configuration)

    @staticmethod
    def _start_translate_threads(tag_list: list, target_language: str, api_instance) -> dict:
        """Translate one batch of tags with a single API request.

        Args:
            tag_list: Texts to translate. The first element may be a marker
                dict ``{"index": n}`` identifying the batch's position; it is
                stripped before the request is sent.
            target_language: Language code passed to the API as the target.
            api_instance: Translate API client exposing ``translate_text``.

        Returns:
            On success ``{"thread_status": True, "thread_data": <translation
            list from the response>, "index": n}`` where ``n`` is the marker
            index or ``-1`` when no marker was present. On ``ApiException``
            ``{"thread_status": False}``.
        """
        index = -1
        # Only strip the leading element when it really is an index marker.
        # (The previous check `type(tag_list[0] == dict)` was always truthy and
        # therefore consumed -- and crashed on -- the first real tag.)
        if tag_list and isinstance(tag_list[0], dict):
            index = tag_list[0]["index"]
            # Slice instead of pop() so the caller's list is not mutated.
            tag_list = tag_list[1:]
        translate_text_request = volcenginesdktranslate20250301.TranslateTextRequest(
            source_language="en",
            target_language=target_language,
            text_list=tag_list,
        )
        try:
            response = api_instance.translate_text(translate_text_request)
            return {"thread_status": True, "thread_data": response.translation_list, "index": index}
        except ApiException as e:
            # (From the SDK sample) When copying and running this example,
            # print the API error information yourself.
            print("Exception when calling api: %s\n" % e)
            return {"thread_status": False}

    def translate_list(self, tag_list: list, target_language: str = "zh") -> dict | None:
        """Translate ``tag_list`` and return the translations in input order.

        Lists of up to 15 items are sent as one request. Longer lists are cut
        into batches of 15 that are translated concurrently on at most five
        worker threads and then re-assembled in the original order.

        Args:
            tag_list: Texts to translate.
            target_language: Target language code; defaults to ``"zh"``.

        Returns:
            ``{"status": True, "translate_list": [...]}`` on success (an empty
            input yields an empty ``translate_list`` without calling the API),
            ``{"status": False}`` when any request fails with an API error, or
            ``None`` when a worker raises an unexpected exception.
        """
        if not tag_list:
            # Nothing to translate; avoid a pointless (and failing) request.
            return {"status": True, "translate_list": []}

        self._set_configure()
        api_instance = volcenginesdktranslate20250301.TRANSLATE20250301Api()
        # NOTE(review): presumably the per-request item limit of the API -- confirm.
        list_max = 15

        if len(tag_list) <= list_max:
            thread_result_dict = self._start_translate_threads(tag_list, target_language, api_instance)
            if not thread_result_dict["thread_status"]:
                return {"status": False}
            translate_list = [item.translation for item in thread_result_dict["thread_data"]]
            return {"status": True, "translate_list": translate_list}

        # Prefix every batch with its position so that results, which complete
        # in arbitrary order, can be put back in input order afterwards.
        chunks = [
            [{"index": chunk_no}, *tag_list[start:start + list_max]]
            for chunk_no, start in enumerate(range(0, len(tag_list), list_max))
        ]
        threads_result = []
        with ThreadPoolExecutor(max_workers=min(len(chunks), 5)) as executor:
            futures = [
                executor.submit(self._start_translate_threads, chunk, target_language, api_instance)
                for chunk in chunks
            ]
            for future in as_completed(futures):
                try:
                    thread_result = future.result()
                except Exception as e:
                    print(e)
                    return None
                if not thread_result.get("thread_status"):
                    return {"status": False}
                threads_result.append(thread_result)

        # Restore input order with a single sort instead of a nested scan.
        threads_result.sort(key=lambda result: result["index"])
        translate_list = []
        for thread_result in threads_result:
            translate_list.extend(item.translation for item in thread_result["thread_data"])
        return {"status": True, "translate_list": translate_list}