"""Volcano Engine Assets API client — uses volcengine SDK for AK/SK auth. All functions are synchronous and raise ``AssetsAPIError`` on API errors. """ import json import logging from django.conf import settings from volcengine.ApiInfo import ApiInfo from volcengine.base.Service import Service from volcengine.Credentials import Credentials from volcengine.ServiceInfo import ServiceInfo logger = logging.getLogger(__name__) SERVICE = 'ark' REGION = 'cn-beijing' API_VERSION = '2024-01-01' HOST = 'open.volcengineapi.com' PROJECT_NAME = 'int_dev_Airlabs' class AssetsAPIError(Exception): """Raised when the Assets API returns an error.""" def __init__(self, code, message, status_code=400): self.code = code self.api_message = message self.status_code = status_code super().__init__(f'[{code}] {message}') def _get_service(): """Build a volcengine Service instance with AK/SK credentials.""" ak = settings.TOS_ACCESS_KEY sk = settings.TOS_SECRET_KEY if not ak or not sk: raise AssetsAPIError('ConfigError', 'TOS_ACCESS_KEY / TOS_SECRET_KEY not configured') service_info = ServiceInfo( HOST, {'Accept': 'application/json', 'Content-Type': 'application/json'}, Credentials(ak, sk, SERVICE, REGION), 10, 30, ) api_info = { 'CreateAssetGroup': ApiInfo('POST', '/', {'Action': 'CreateAssetGroup', 'Version': API_VERSION}, {}, {}), 'CreateAsset': ApiInfo('POST', '/', {'Action': 'CreateAsset', 'Version': API_VERSION}, {}, {}), 'ListAssetGroups': ApiInfo('POST', '/', {'Action': 'ListAssetGroups', 'Version': API_VERSION}, {}, {}), 'ListAssets': ApiInfo('POST', '/', {'Action': 'ListAssets', 'Version': API_VERSION}, {}, {}), 'GetAsset': ApiInfo('POST', '/', {'Action': 'GetAsset', 'Version': API_VERSION}, {}, {}), 'GetAssetGroup': ApiInfo('POST', '/', {'Action': 'GetAssetGroup', 'Version': API_VERSION}, {}, {}), 'UpdateAssetGroup': ApiInfo('POST', '/', {'Action': 'UpdateAssetGroup', 'Version': API_VERSION}, {}, {}), 'UpdateAsset': ApiInfo('POST', '/', {'Action': 'UpdateAsset', 'Version': API_VERSION}, {}, {}), } return Service(service_info, api_info) def _do_request(action: str, body_dict: dict) -> dict: """Send a signed POST to the Assets API and return the Result dict.""" service = _get_service() body = json.dumps(body_dict, ensure_ascii=False) try: resp = service.json(action, {}, body) except Exception as e: # SDK raises Exception(resp.text.encode("utf-8")) on non-200; # str(e) becomes b'...' which isn't valid JSON. Decode it first. raw = e.args[0] if e.args else '' error_str = raw.decode('utf-8') if isinstance(raw, bytes) else str(raw) logger.warning('Assets API %s raw error: %s', action, error_str) try: error_data = json.loads(error_str) err_meta = error_data.get('ResponseMetadata', {}).get('Error', {}) if err_meta: raise AssetsAPIError(err_meta.get('Code', 'Unknown'), err_meta.get('Message', error_str)) err = error_data.get('error', {}) raise AssetsAPIError(err.get('code', 'Unknown'), err.get('message', error_str)) except (json.JSONDecodeError, AssetsAPIError): raise except Exception: pass raise AssetsAPIError('RequestError', error_str or 'Empty response from API') data = json.loads(resp) if isinstance(resp, str) else resp meta = data.get('ResponseMetadata', {}) error = meta.get('Error', {}) if error: raise AssetsAPIError( error.get('Code', 'Unknown'), error.get('Message', str(data)), ) return data.get('Result', {}) # ────────────────────────────────────────────── # Public helpers # ────────────────────────────────────────────── def create_asset_group(name: str, description: str = '', group_type: str = 'AIGC') -> str: """Create an asset group. Returns the remote group id.""" body = { 'Name': name, 'Description': description, 'GroupType': group_type, 'ProjectName': PROJECT_NAME, } result = _do_request('CreateAssetGroup', body) return result.get('Id', '') def create_asset(group_id: str, image_url: str, name: str = '', asset_type: str = 'Image') -> str: """Create an asset inside an existing group. Returns the remote asset id.""" body = { 'GroupId': group_id, 'URL': image_url, 'Name': name, 'AssetType': asset_type, 'ProjectName': PROJECT_NAME, } result = _do_request('CreateAsset', body) return result.get('Id', '') def list_asset_groups(page: int = 1, page_size: int = 20, name: str = None) -> tuple: """List asset groups. Returns (items_list, total_count).""" filter_dict = {'GroupType': 'AIGC'} if name: filter_dict['Name'] = name body = { 'Filter': filter_dict, 'PageNumber': page, 'PageSize': page_size, 'ProjectName': PROJECT_NAME, } result = _do_request('ListAssetGroups', body) return result.get('Items', []), result.get('TotalCount', 0) def list_assets(group_ids: list = None, status: str = None, name: str = None, page: int = 1, page_size: int = 20) -> tuple: """List assets with optional filters. Returns (items_list, total_count).""" filter_dict = {'GroupType': 'AIGC'} if group_ids: filter_dict['GroupIds'] = group_ids if status: filter_dict['Statuses'] = [status] if name: filter_dict['Name'] = name body = { 'Filter': filter_dict, 'PageNumber': page, 'PageSize': page_size, 'ProjectName': PROJECT_NAME, } result = _do_request('ListAssets', body) return result.get('Items', []), result.get('TotalCount', 0) def get_asset(asset_id: str) -> dict: """Get single asset details including processing status.""" body = {'Id': asset_id, 'ProjectName': PROJECT_NAME} return _do_request('GetAsset', body) def update_asset_group(group_id: str, name: str = None, description: str = None): """Update an asset group's name and/or description.""" body = {'Id': group_id, 'ProjectName': PROJECT_NAME} if name is not None: body['Name'] = name if description is not None: body['Description'] = description _do_request('UpdateAssetGroup', body) def update_asset(asset_id: str, name: str = None): """Update an asset's name.""" body = {'Id': asset_id, 'ProjectName': PROJECT_NAME} if name is not None: body['Name'] = name _do_request('UpdateAsset', body)