All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m17s
49 lines
1.3 KiB
Python
49 lines
1.3 KiB
Python
"""Volcano Engine TOS file upload utility using official TOS SDK."""
|
|
|
|
import uuid
|
|
import tos
|
|
from django.conf import settings
|
|
|
|
|
|
# Maps a lower-case file extension (without the dot) to the Content-Type
# sent with the uploaded object. Extensions not listed here are uploaded
# as 'application/octet-stream' by upload_file().
CONTENT_TYPE_MAP = {
    # Images
    'jpg': 'image/jpeg',
    'jpeg': 'image/jpeg',
    'png': 'image/png',
    'webp': 'image/webp',
    'gif': 'image/gif',
    'bmp': 'image/bmp',
    'tiff': 'image/tiff',
    # Video
    'mp4': 'video/mp4',
    'mov': 'video/quicktime',
    # Audio
    'mp3': 'audio/mpeg',
    'wav': 'audio/wav',
}


# Module-level cache for the TOS client; filled in on the first call to
# get_tos_client() and reused afterwards.
_client = None
|
|
|
|
|
|
def get_tos_client():
    """Return the shared TOS client, building it from Django settings on first use.

    The client is stored in the module-level ``_client`` variable, so later
    calls return the same instance without reading the settings again.
    """
    global _client
    if _client is not None:
        return _client

    # The endpoint setting may carry a URL scheme; the client is given the
    # value with any 'https://' / 'http://' text removed.
    host = settings.TOS_ENDPOINT
    for scheme in ('https://', 'http://'):
        host = host.replace(scheme, '')

    _client = tos.TosClientV2(
        ak=settings.TOS_ACCESS_KEY,
        sk=settings.TOS_SECRET_KEY,
        endpoint=host,
        region=settings.TOS_REGION,
    )
    return _client
|
|
|
|
|
|
def upload_file(file_obj, folder='uploads'):
    """Upload a file to the TOS bucket and return its public URL.

    Args:
        file_obj: Object exposing ``name`` (the original filename) and
            ``read()`` returning the content to upload. Presumably a Django
            uploaded file -- confirm against callers.
        folder: Key prefix under which the object is stored.

    Returns:
        ``settings.TOS_CDN_DOMAIN`` joined with the generated object key.
    """
    # Only treat the text after the last dot as an extension when the name
    # actually contains a dot; otherwise the whole filename (e.g. "README")
    # would be mistaken for the extension.
    _, dot, suffix = file_obj.name.rpartition('.')
    ext = suffix.lower() if dot else ''

    # The stored name is a random UUID, so the key does not depend on the
    # client-supplied filename stem. No trailing dot when there is no extension.
    object_id = uuid.uuid4().hex
    key = f'{folder}/{object_id}.{ext}' if ext else f'{folder}/{object_id}'
    content_type = CONTENT_TYPE_MAP.get(ext, 'application/octet-stream')

    client = get_tos_client()
    content = file_obj.read()

    client.put_object(
        bucket=settings.TOS_BUCKET,
        key=key,
        content=content,
        content_type=content_type,
    )

    # Tolerate a CDN domain configured with a trailing slash so the URL
    # never contains a doubled '/'.
    cdn_domain = settings.TOS_CDN_DOMAIN.rstrip('/')
    return f'{cdn_domain}/{key}'
|