""" 阿里云OSS工具类 """ import os import uuid from datetime import datetime from django.conf import settings try: import oss2 OSS_AVAILABLE = True except ImportError: OSS_AVAILABLE = False class OSSClient: """阿里云OSS客户端""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return if not OSS_AVAILABLE: self.bucket = None self._initialized = True return oss_config = settings.ALIYUN_OSS if not oss_config.get('ACCESS_KEY_ID'): self.bucket = None self._initialized = True return auth = oss2.Auth( oss_config['ACCESS_KEY_ID'], oss_config['ACCESS_KEY_SECRET'] ) self.bucket = oss2.Bucket( auth, oss_config['ENDPOINT'], oss_config['BUCKET_NAME'] ) self.custom_domain = oss_config.get('CUSTOM_DOMAIN', '') self._initialized = True def upload_file(self, file_obj, folder='uploads'): """ 上传文件 :param file_obj: 文件对象 :param folder: 存储文件夹 :return: 文件URL """ if not self.bucket: raise Exception('OSS未配置') # 生成唯一文件名 ext = os.path.splitext(file_obj.name)[1] if hasattr(file_obj, 'name') else '' filename = f"{datetime.now().strftime('%Y%m%d')}/{uuid.uuid4().hex}{ext}" key = f"{folder}/{filename}" # 上传文件 self.bucket.put_object(key, file_obj) # 返回URL if self.custom_domain: return f"https://{self.custom_domain}/{key}" return f"https://{settings.ALIYUN_OSS['BUCKET_NAME']}.{settings.ALIYUN_OSS['ENDPOINT']}/{key}" def delete_file(self, file_url): """ 删除文件 :param file_url: 文件URL """ if not self.bucket: return # 从URL提取key key = file_url.split('/')[-2] + '/' + file_url.split('/')[-1] self.bucket.delete_object(key) def get_oss_client(): """获取OSS客户端单例""" return OSSClient()