lty/qy_lty/card/storage.py
2026-03-17 13:17:02 +08:00

151 lines
5.1 KiB
Python

from django.conf import settings
from django.core.files.storage import Storage
from django.utils.deconstruct import deconstructible
import oss2
import os
from urllib.parse import urljoin
import logging
logger = logging.getLogger(__name__)
@deconstructible
class OSSStorage(Storage):
"""
Alibaba Cloud OSS storage backend for Django
"""
def __init__(self, bucket_name=None, endpoint=None, access_key_id=None, access_key_secret=None, base_dir=''):
self.bucket_name = bucket_name or settings.AUDIO_SERVICE_CONFIG['aliyun']['oss_bucket']
self.endpoint = endpoint or settings.AUDIO_SERVICE_CONFIG['aliyun']['oss_endpoint']
self.access_key_id = access_key_id or settings.AUDIO_SERVICE_CONFIG['aliyun']['oss_key_id']
self.access_key_secret = access_key_secret or settings.AUDIO_SERVICE_CONFIG['aliyun']['oss_key_secret']
self.base_dir = base_dir.rstrip('/')
self.auth = oss2.Auth(self.access_key_id, self.access_key_secret)
self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)
self.host = settings.AUDIO_SERVICE_CONFIG['aliyun']['oss_host']
def _get_key(self, name):
"""
Get the full path (key) in OSS
"""
if self.base_dir:
return f"{self.base_dir}/{name}"
return name
def _open(self, name, mode='rb'):
"""
Open a file from OSS
"""
logger.debug(f"Opening file {name} from OSS")
key = self._get_key(name)
try:
content = self.bucket.get_object(key)
content.name = name
content.mode = mode
return content
except oss2.exceptions.NoSuchKey:
logger.error(f"File {key} not found in OSS")
raise FileNotFoundError(f"File {name} does not exist")
def _save(self, name, content):
"""
Save a file to OSS
"""
logger.debug(f"Saving file {name} to OSS")
key = self._get_key(name)
# If the file has a file attribute, use that
if hasattr(content, 'file'):
content = content.file
# Convert to bytes if necessary
if hasattr(content, 'read'):
content_str = content.read()
if isinstance(content_str, str):
content_bytes = content_str.encode('utf-8')
else:
content_bytes = content_str
self.bucket.put_object(key, content_bytes)
else:
self.bucket.put_object(key, content)
return name
def exists(self, name):
"""
Check if a file exists in OSS
"""
key = self._get_key(name)
try:
self.bucket.get_object_meta(key)
return True
except oss2.exceptions.NoSuchKey:
return False
def delete(self, name):
"""
Delete a file from OSS
"""
key = self._get_key(name)
try:
self.bucket.delete_object(key)
except oss2.exceptions.NoSuchKey:
pass
def url(self, name):
"""
Get the URL of a file
"""
key = self._get_key(name)
return urljoin(self.host, key)
def size(self, name):
"""
Get the size of a file
"""
key = self._get_key(name)
try:
obj = self.bucket.get_object_meta(key)
return obj.content_length
except oss2.exceptions.NoSuchKey:
raise FileNotFoundError(f"File {name} does not exist")
def get_modified_time(self, name):
"""
Get the last modified time of a file
"""
from datetime import datetime
key = self._get_key(name)
try:
obj = self.bucket.get_object_meta(key)
return datetime.fromtimestamp(obj.last_modified)
except oss2.exceptions.NoSuchKey:
raise FileNotFoundError(f"File {name} does not exist")
def get_available_name(self, name, max_length=None):
"""
Return a filename that's free on the target storage system
"""
# If the filename already exists, add an underscore and a number until the filename doesn't exist
name = self._get_valid_name(name)
if self.exists(name):
dir_name, file_name = os.path.split(name)
file_root, file_ext = os.path.splitext(file_name)
count = 1
while self.exists(name):
# file.jpg -> file_1.jpg, file_2.jpg, etc.
name = os.path.join(dir_name, f"{file_root}_{count}{file_ext}")
count += 1
return name
def _get_valid_name(self, name):
"""
Return a filename that's suitable for use with the storage system
"""
# Replace spaces with underscores and remove any special characters
import re
s = name.replace(' ', '_')
s = re.sub(r'[^\w\-\.]', '', s)
return s