All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m27s
AssetsAPIError 加 user_message,按 code/关键词映射中文提示,用户不再看到英文错误 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
221 lines
8.1 KiB
Python
221 lines
8.1 KiB
Python
"""Volcano Engine Assets API client — uses volcengine SDK for AK/SK auth.
|
|
|
|
All functions are synchronous and raise ``AssetsAPIError`` on API errors.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
|
|
from django.conf import settings
|
|
from volcengine.ApiInfo import ApiInfo
|
|
from volcengine.base.Service import Service
|
|
from volcengine.Credentials import Credentials
|
|
from volcengine.ServiceInfo import ServiceInfo
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SERVICE = 'ark'
|
|
REGION = 'cn-beijing'
|
|
API_VERSION = '2024-01-01'
|
|
HOST = 'open.volcengineapi.com'
|
|
PROJECT_NAME = 'int_dev_Airlabs'
|
|
|
|
|
|
_ASSETS_ERROR_MESSAGES = {
|
|
'ConfigError': '素材服务未配置,请联系管理员',
|
|
'RequestError': '素材服务暂时不可用,请稍后重试',
|
|
'InvalidParameter': '素材参数无效,请检查输入',
|
|
'NotFound': '素材不存在或已被删除',
|
|
'NotExist': '素材不存在或已被删除',
|
|
'InternalError': '素材服务异常,请稍后重试',
|
|
'Forbidden': '没有权限操作该素材',
|
|
'RateLimitExceeded': '操作过于频繁,请稍后重试',
|
|
}
|
|
|
|
_ASSETS_MESSAGE_KEYWORDS = {
|
|
'dimension': '图片尺寸不符合要求(宽高需在 300~6000 像素之间)',
|
|
'size': '文件大小超出限制',
|
|
'format': '不支持的文件格式',
|
|
'not found': '素材不存在或已被删除',
|
|
'permission': '没有权限操作该素材',
|
|
}
|
|
|
|
|
|
class AssetsAPIError(Exception):
|
|
"""Raised when the Assets API returns an error."""
|
|
def __init__(self, code, message, status_code=400):
|
|
self.code = code
|
|
self.api_message = message
|
|
self.status_code = status_code
|
|
# 中文友好提示
|
|
friendly = _ASSETS_ERROR_MESSAGES.get(code)
|
|
if not friendly:
|
|
msg_lower = (message or '').lower()
|
|
for keyword, hint in _ASSETS_MESSAGE_KEYWORDS.items():
|
|
if keyword in msg_lower:
|
|
friendly = hint
|
|
break
|
|
self.user_message = friendly or '素材操作失败,请稍后重试'
|
|
super().__init__(f'[{code}] {message}')
|
|
|
|
|
|
def _get_service():
|
|
"""Build a volcengine Service instance with AK/SK credentials."""
|
|
ak = settings.TOS_ACCESS_KEY
|
|
sk = settings.TOS_SECRET_KEY
|
|
if not ak or not sk:
|
|
raise AssetsAPIError('ConfigError', 'TOS_ACCESS_KEY / TOS_SECRET_KEY not configured')
|
|
|
|
service_info = ServiceInfo(
|
|
HOST,
|
|
{'Accept': 'application/json', 'Content-Type': 'application/json'},
|
|
Credentials(ak, sk, SERVICE, REGION),
|
|
10, 30,
|
|
)
|
|
|
|
api_info = {
|
|
'CreateAssetGroup': ApiInfo('POST', '/', {'Action': 'CreateAssetGroup', 'Version': API_VERSION}, {}, {}),
|
|
'CreateAsset': ApiInfo('POST', '/', {'Action': 'CreateAsset', 'Version': API_VERSION}, {}, {}),
|
|
'ListAssetGroups': ApiInfo('POST', '/', {'Action': 'ListAssetGroups', 'Version': API_VERSION}, {}, {}),
|
|
'ListAssets': ApiInfo('POST', '/', {'Action': 'ListAssets', 'Version': API_VERSION}, {}, {}),
|
|
'GetAsset': ApiInfo('POST', '/', {'Action': 'GetAsset', 'Version': API_VERSION}, {}, {}),
|
|
'GetAssetGroup': ApiInfo('POST', '/', {'Action': 'GetAssetGroup', 'Version': API_VERSION}, {}, {}),
|
|
'UpdateAssetGroup': ApiInfo('POST', '/', {'Action': 'UpdateAssetGroup', 'Version': API_VERSION}, {}, {}),
|
|
'UpdateAsset': ApiInfo('POST', '/', {'Action': 'UpdateAsset', 'Version': API_VERSION}, {}, {}),
|
|
}
|
|
|
|
return Service(service_info, api_info)
|
|
|
|
|
|
def _do_request(action: str, body_dict: dict) -> dict:
|
|
"""Send a signed POST to the Assets API and return the Result dict."""
|
|
service = _get_service()
|
|
body = json.dumps(body_dict, ensure_ascii=False)
|
|
|
|
try:
|
|
resp = service.json(action, {}, body)
|
|
except Exception as e:
|
|
# SDK raises Exception(resp.text.encode("utf-8")) on non-200;
|
|
# str(e) becomes b'...' which isn't valid JSON. Decode it first.
|
|
raw = e.args[0] if e.args else ''
|
|
error_str = raw.decode('utf-8') if isinstance(raw, bytes) else str(raw)
|
|
logger.warning('Assets API %s raw error: %s', action, error_str)
|
|
try:
|
|
error_data = json.loads(error_str)
|
|
err_meta = error_data.get('ResponseMetadata', {}).get('Error', {})
|
|
if err_meta:
|
|
raise AssetsAPIError(err_meta.get('Code', 'Unknown'), err_meta.get('Message', error_str))
|
|
err = error_data.get('error', {})
|
|
raise AssetsAPIError(err.get('code', 'Unknown'), err.get('message', error_str))
|
|
except (json.JSONDecodeError, AssetsAPIError):
|
|
raise
|
|
except Exception:
|
|
pass
|
|
raise AssetsAPIError('RequestError', error_str or 'Empty response from API')
|
|
|
|
data = json.loads(resp) if isinstance(resp, str) else resp
|
|
|
|
meta = data.get('ResponseMetadata', {})
|
|
error = meta.get('Error', {})
|
|
if error:
|
|
raise AssetsAPIError(
|
|
error.get('Code', 'Unknown'),
|
|
error.get('Message', str(data)),
|
|
)
|
|
|
|
return data.get('Result', {})
|
|
|
|
|
|
# ──────────────────────────────────────────────
|
|
# Public helpers
|
|
# ──────────────────────────────────────────────
|
|
|
|
def create_asset_group(name: str, description: str = '', group_type: str = 'AIGC') -> str:
|
|
"""Create an asset group. Returns the remote group id."""
|
|
body = {
|
|
'Name': name,
|
|
'Description': description,
|
|
'GroupType': group_type,
|
|
'ProjectName': PROJECT_NAME,
|
|
}
|
|
result = _do_request('CreateAssetGroup', body)
|
|
return result.get('Id', '')
|
|
|
|
|
|
def create_asset(group_id: str, image_url: str, name: str = '', asset_type: str = 'Image') -> str:
|
|
"""Create an asset inside an existing group. Returns the remote asset id."""
|
|
body = {
|
|
'GroupId': group_id,
|
|
'URL': image_url,
|
|
'Name': name,
|
|
'AssetType': asset_type,
|
|
'ProjectName': PROJECT_NAME,
|
|
}
|
|
result = _do_request('CreateAsset', body)
|
|
return result.get('Id', '')
|
|
|
|
|
|
def list_asset_groups(page: int = 1, page_size: int = 20, name: str = None) -> tuple:
|
|
"""List asset groups. Returns (items_list, total_count)."""
|
|
filter_dict = {'GroupType': 'AIGC'}
|
|
if name:
|
|
filter_dict['Name'] = name
|
|
body = {
|
|
'Filter': filter_dict,
|
|
'PageNumber': page,
|
|
'PageSize': page_size,
|
|
'ProjectName': PROJECT_NAME,
|
|
}
|
|
result = _do_request('ListAssetGroups', body)
|
|
return result.get('Items', []), result.get('TotalCount', 0)
|
|
|
|
|
|
def list_assets(group_ids: list = None, status: str = None,
|
|
name: str = None, page: int = 1, page_size: int = 20) -> tuple:
|
|
"""List assets with optional filters. Returns (items_list, total_count)."""
|
|
filter_dict = {'GroupType': 'AIGC'}
|
|
if group_ids:
|
|
filter_dict['GroupIds'] = group_ids
|
|
if status:
|
|
filter_dict['Statuses'] = [status]
|
|
if name:
|
|
filter_dict['Name'] = name
|
|
body = {
|
|
'Filter': filter_dict,
|
|
'PageNumber': page,
|
|
'PageSize': page_size,
|
|
'ProjectName': PROJECT_NAME,
|
|
}
|
|
result = _do_request('ListAssets', body)
|
|
return result.get('Items', []), result.get('TotalCount', 0)
|
|
|
|
|
|
def get_asset(asset_id: str) -> dict:
|
|
"""Get single asset details including processing status."""
|
|
body = {'Id': asset_id, 'ProjectName': PROJECT_NAME}
|
|
return _do_request('GetAsset', body)
|
|
|
|
|
|
def get_asset_group(group_id: str) -> dict:
|
|
"""Get single asset group details."""
|
|
body = {'Id': group_id, 'ProjectName': PROJECT_NAME}
|
|
return _do_request('GetAssetGroup', body)
|
|
|
|
|
|
def update_asset_group(group_id: str, name: str = None, description: str = None):
|
|
"""Update an asset group's name and/or description."""
|
|
body = {'Id': group_id, 'ProjectName': PROJECT_NAME}
|
|
if name is not None:
|
|
body['Name'] = name
|
|
if description is not None:
|
|
body['Description'] = description
|
|
_do_request('UpdateAssetGroup', body)
|
|
|
|
|
|
def update_asset(asset_id: str, name: str = None):
|
|
"""Update an asset's name."""
|
|
body = {'Id': asset_id, 'ProjectName': PROJECT_NAME}
|
|
if name is not None:
|
|
body['Name'] = name
|
|
_do_request('UpdateAsset', body)
|