"""IP 归属地解析 — 阿里云市场在线 API + ip2region 离线库 + 熔断降级。""" import ipaddress import logging import re import time import requests from django.conf import settings logger = logging.getLogger(__name__) # ── 熔断状态 ── _circuit_open_until = 0 # 在线 API 失败后,此时间戳之前直接走离线 _CIRCUIT_COOLDOWN = 60 # 熔断冷却 60 秒 # ── ip2region 搜索器缓存 ── _ip2region_searcher = None def _is_private_ip(ip: str) -> bool: """判断是否为私有/本地 IP。""" try: addr = ipaddress.ip_address(ip) return addr.is_private or addr.is_loopback or addr.is_reserved except (ValueError, TypeError): return True def _normalize_city(name: str) -> str: """标准化城市名:去掉「市」后缀。""" if name and name.endswith('市'): return name[:-1] return name def _normalize_province(name: str) -> str: """标准化省份名:去掉「省」「自治区」「壮族自治区」等后缀。""" if not name: return name for suffix in ['壮族自治区', '回族自治区', '维吾尔自治区', '自治区', '省']: if name.endswith(suffix): return name[:-len(suffix)] return name def _resolve_online(ip: str) -> tuple: """阿里云市场 IP138 归属地 API(超时 2s)。 Returns: (country, province, city) or raises Exception. """ appcode = settings.ALIYUN_IP_GEO_APPCODE if not appcode: raise RuntimeError('ALIYUN_IP_GEO_APPCODE not configured') url = f'https://ali.ip138.com/ip/?ip={ip}&datatype=json' headers = {'Authorization': f'APPCODE {appcode}'} resp = requests.get(url, headers=headers, timeout=2) resp.raise_for_status() data = resp.json() if data.get('ret') != 'ok': raise RuntimeError(f'IP138 API error: {data.get("msg", data)}') # data.data = ["国家", "省份", "城市", "运营商", "邮编", "区号"] parts = data.get('data', []) country = parts[0] if len(parts) > 0 else '' province = _normalize_province(parts[1] if len(parts) > 1 else '') city = _normalize_city(parts[2] if len(parts) > 2 else '') return country, province, city def _resolve_offline(ip: str) -> tuple: """ip2region 离线库解析。 Returns: (country, province, city) or ('', '', ''). """ global _ip2region_searcher if _ip2region_searcher is None: try: import ipregion, os from ipregion import XdbSearcher pkg_dir = os.path.dirname(ipregion.__file__) db_path = os.path.join(pkg_dir, 'ip2region.xdb') content = XdbSearcher.loadContentFromFile(dbfile=db_path) _ip2region_searcher = XdbSearcher(contentBuff=content) except Exception as e: logger.warning('ip2region init failed: %s', e) return '', '', '' try: region_str = _ip2region_searcher.searchByIPStr(ip) # 格式: "中国|0|广东省|广州市|电信" parts = region_str.split('|') if region_str else [] country = parts[0] if len(parts) > 0 and parts[0] != '0' else '' province = _normalize_province(parts[2] if len(parts) > 2 and parts[2] != '0' else '') city = _normalize_city(parts[3] if len(parts) > 3 and parts[3] != '0' else '') return country, province, city except Exception as e: logger.warning('ip2region lookup failed for %s: %s', ip, e) return '', '', '' def resolve_ip_location(ip: str) -> tuple: """解析 IP 归属地。 Returns: (country, province, city, source) source: 'online' / 'offline' / 'skip' / 'failed' """ if not ip or _is_private_ip(ip): return '', '', '', 'skip' global _circuit_open_until # 尝试在线 API(熔断期间跳过) now = time.time() if now >= _circuit_open_until and settings.ALIYUN_IP_GEO_APPCODE: try: country, province, city = _resolve_online(ip) return country, province, city, 'online' except Exception as e: logger.warning('Online IP geo failed for %s: %s — circuit open for %ds', ip, e, _CIRCUIT_COOLDOWN) _circuit_open_until = now + _CIRCUIT_COOLDOWN # 降级到离线库 try: country, province, city = _resolve_offline(ip) if country or province or city: return country, province, city, 'offline' except Exception as e: logger.warning('Offline IP geo failed for %s: %s', ip, e) return '', '', '', 'failed'