video-shuoshan/backend/utils/geo_client.py
seaislee1209 be656900c0
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m13s
feat: v0.9.7 登录风控第二期 — IP归属地解析 + 异常检测(R1-R5) + 飞书告警 + 自动封禁
- IP138 在线 API + ip2region 离线库双通道归属地解析,60 秒熔断降级
- 5 条异常检测规则:地区不对/不可能旅行/频繁登录/团队遍地开花/海外IP太杂
- 飞书 interactive 卡片告警(红色严重/橙色警告),含辅助指标
- R2 自动封禁用户、R4 自动封禁团队,封禁即踢下线
- 系统设置页全局配置 + 团队详情页独立阈值覆盖
- 安全日志页面 + 管理员修改密码入口

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 00:02:56 +08:00

136 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""IP 归属地解析 — 阿里云市场在线 API + ip2region 离线库 + 熔断降级。"""
import ipaddress
import logging
import re
import time
import requests
from django.conf import settings
logger = logging.getLogger(__name__)
# ── 熔断状态 ──
_circuit_open_until = 0 # 在线 API 失败后,此时间戳之前直接走离线
_CIRCUIT_COOLDOWN = 60 # 熔断冷却 60 秒
# ── ip2region 搜索器缓存 ──
_ip2region_searcher = None
def _is_private_ip(ip: str) -> bool:
"""判断是否为私有/本地 IP。"""
try:
addr = ipaddress.ip_address(ip)
return addr.is_private or addr.is_loopback or addr.is_reserved
except (ValueError, TypeError):
return True
def _normalize_city(name: str) -> str:
"""标准化城市名:去掉「市」后缀。"""
if name and name.endswith(''):
return name[:-1]
return name
def _normalize_province(name: str) -> str:
"""标准化省份名:去掉「省」「自治区」「壮族自治区」等后缀。"""
if not name:
return name
for suffix in ['壮族自治区', '回族自治区', '维吾尔自治区', '自治区', '']:
if name.endswith(suffix):
return name[:-len(suffix)]
return name
def _resolve_online(ip: str) -> tuple:
"""阿里云市场 IP138 归属地 API超时 2s
Returns: (country, province, city) or raises Exception.
"""
appcode = settings.ALIYUN_IP_GEO_APPCODE
if not appcode:
raise RuntimeError('ALIYUN_IP_GEO_APPCODE not configured')
url = f'https://ali.ip138.com/ip/?ip={ip}&datatype=json'
headers = {'Authorization': f'APPCODE {appcode}'}
resp = requests.get(url, headers=headers, timeout=2)
resp.raise_for_status()
data = resp.json()
if data.get('ret') != 'ok':
raise RuntimeError(f'IP138 API error: {data.get("msg", data)}')
# data.data = ["国家", "省份", "城市", "运营商", "邮编", "区号"]
parts = data.get('data', [])
country = parts[0] if len(parts) > 0 else ''
province = _normalize_province(parts[1] if len(parts) > 1 else '')
city = _normalize_city(parts[2] if len(parts) > 2 else '')
return country, province, city
def _resolve_offline(ip: str) -> tuple:
"""ip2region 离线库解析。
Returns: (country, province, city) or ('', '', '').
"""
global _ip2region_searcher
if _ip2region_searcher is None:
try:
import ipregion, os
from ipregion import XdbSearcher
pkg_dir = os.path.dirname(ipregion.__file__)
db_path = os.path.join(pkg_dir, 'ip2region.xdb')
content = XdbSearcher.loadContentFromFile(dbfile=db_path)
_ip2region_searcher = XdbSearcher(contentBuff=content)
except Exception as e:
logger.warning('ip2region init failed: %s', e)
return '', '', ''
try:
region_str = _ip2region_searcher.searchByIPStr(ip)
# 格式: "中国|0|广东省|广州市|电信"
parts = region_str.split('|') if region_str else []
country = parts[0] if len(parts) > 0 and parts[0] != '0' else ''
province = _normalize_province(parts[2] if len(parts) > 2 and parts[2] != '0' else '')
city = _normalize_city(parts[3] if len(parts) > 3 and parts[3] != '0' else '')
return country, province, city
except Exception as e:
logger.warning('ip2region lookup failed for %s: %s', ip, e)
return '', '', ''
def resolve_ip_location(ip: str) -> tuple:
"""解析 IP 归属地。
Returns: (country, province, city, source)
source: 'online' / 'offline' / 'skip' / 'failed'
"""
if not ip or _is_private_ip(ip):
return '', '', '', 'skip'
global _circuit_open_until
# 尝试在线 API熔断期间跳过
now = time.time()
if now >= _circuit_open_until and settings.ALIYUN_IP_GEO_APPCODE:
try:
country, province, city = _resolve_online(ip)
return country, province, city, 'online'
except Exception as e:
logger.warning('Online IP geo failed for %s: %s — circuit open for %ds', ip, e, _CIRCUIT_COOLDOWN)
_circuit_open_until = now + _CIRCUIT_COOLDOWN
# 降级到离线库
try:
country, province, city = _resolve_offline(ip)
if country or province or city:
return country, province, city, 'offline'
except Exception as e:
logger.warning('Offline IP geo failed for %s: %s', ip, e)
return '', '', '', 'failed'