#!/usr/bin/env python3
"""
Idempotent sync: source SQL dump → test MySQL database, inserting only the
data that is genuinely missing.

Safe to run multiple times: every table is de-duplicated by a business key.

- user             : by username (on conflict, old_uid is merged onto the existing new_uid)
- assetgroup       : by remote_group_id
- asset            : by remote_asset_id (falls back to (group_id, name) when empty)
- generationrecord : by task_id
- loginrecord      : by (user_id, created_at, ip_address)
- loginanomaly     : by (user_id, login_record_id, rule, created_at)
- activesession    : by session_id
- adminauditlog    : by (operator_id, action, target_id, created_at)

After the inserts, each team's total_seconds_used / total_spent / balance is
updated incrementally from the generation records inserted in this run.

Usage:
    python3 idempotent_sync.py            # dry-run
    python3 idempotent_sync.py --commit   # write
"""
import re, sys
from decimal import Decimal
import pymysql
import pymysql.cursors

# Path to the source SQL dump that is parsed for missing rows.
SOURCE = '/Users/maidong/Desktop/zyc/研究openclaw/视频生成平台/jimeng-clone/数据库备份/video_auto_原19-55.sql'
# Only users/data belonging to these team ids are synced.
TARGET_TEAMS = (3, 4, 12)

# NOTE(review): database credentials are hard-coded in source control — move
# them to environment variables or a secrets manager before sharing this file.
DB_TEST = dict(host='mysql-8351f937d637-public.rds.volces.com', port=3306,
               user='zyc', password='Zyc188208', database='video_auto', charset='utf8mb4',
               autocommit=False, cursorclass=pymysql.cursors.DictCursor)
DB_PROD = dict(host='mysql-d9bb4e81696d-public.rds.volces.com', port=3306,
               user='zyc', password='Zyc188208', database='video_auto', charset='utf8mb4',
               autocommit=False, cursorclass=pymysql.cursors.DictCursor)


# ---------- SQL dump parsing ----------
def split_values(s):
    """Split the interior of a SQL ``VALUES (...)`` clause on top-level commas.

    Commas inside single-quoted string literals do not split, and a backslash
    escape pair inside a string is carried through verbatim (decoding happens
    later).  Each returned piece is stripped of surrounding whitespace.
    """
    fields = []
    buf = []
    quoted = False
    pos = 0
    n = len(s)
    while pos < n:
        ch = s[pos]
        if quoted:
            if ch == '\\' and pos + 1 < n:
                # keep the two-character escape pair intact
                buf.append(ch)
                buf.append(s[pos + 1])
                pos += 2
                continue
            if ch == "'":
                quoted = False
            buf.append(ch)
        elif ch == "'":
            quoted = True
            buf.append(ch)
        elif ch == ',':
            fields.append(''.join(buf).strip())
            buf = []
        else:
            buf.append(ch)
        pos += 1
    fields.append(''.join(buf).strip())
    return fields
def parse_table(tbl):
    """Extract the raw value tuples for one table from the dump at SOURCE.

    Scans the dump line by line for ``INSERT INTO `tbl` ... VALUES (...);``
    statements and returns a list of value lists (still SQL-quoted; decode
    each value with unq()).  NOTE(review): this assumes one row per INSERT
    statement — a dump written with multi-row extended INSERTs would not be
    parsed correctly; confirm the dump format.
    """
    prefix = f'INSERT INTO `{tbl}`'
    values_re = re.compile(r'VALUES \((.*)\);\s*$')
    parsed = []
    with open(SOURCE, 'r', encoding='utf-8') as dump:
        for raw in dump:
            if raw.startswith(prefix):
                match = values_re.search(raw)
                if match:
                    parsed.append(split_values(match.group(1)))
    return parsed
# Single-pass decoder for the backslash escapes mysqldump emits inside
# quoted values.  The regex consumes one escape pair per match, left to right.
_UNESCAPE_RE = re.compile(r"""\\(['"\\nrt0])""")
_UNESCAPE_MAP = {"'": "'", '"': '"', '\\': '\\',
                 'n': '\n', 'r': '\r', 't': '\t', '0': '\0'}


def unq(v):
    """Decode one raw SQL literal from the dump into a Python value.

    - the bare token NULL → None
    - single-quoted strings → unquoted, backslash escapes decoded
    - anything else (numbers, etc.) → returned unchanged as a string

    Fix: the previous implementation chained str.replace() calls, which
    decoded escapes in multiple passes — a literal backslash followed by 'n'
    (stored in the dump as backslash, backslash, 'n') was first collapsed to
    backslash + 'n' and then wrongly turned into a newline.  Decoding each
    escape pair exactly once with a single regex pass avoids that.
    """
    if v == 'NULL':
        return None
    if v.startswith("'") and v.endswith("'"):
        return _UNESCAPE_RE.sub(lambda m: _UNESCAPE_MAP[m.group(1)], v[1:-1])
    return v
# ---------- Source rows → dict form ----------
def row_to_dict(row, cols):
    """Pair column names with decoded values; unmatched extras on either side are dropped."""
    return dict(zip(cols, map(unq, row)))
# Column order of each table as it appears in the source dump's INSERT
# statements (positional — must match the dump exactly).
USER_COLS = ['id','password','last_login','is_superuser','username','first_name','last_name',
             'is_staff','is_active','date_joined','email','created_at','updated_at',
             'daily_seconds_limit','monthly_seconds_limit','is_team_admin','team_id',
             'must_change_password','disabled_by','daily_generation_limit',
             'monthly_generation_limit','spending_limit','last_read_announcement','is_team_owner']
AG_COLS = ['id','remote_group_id','name','description','thumbnail_url','created_at','created_by_id','team_id']
ASSET_COLS_SRC = ['id','remote_asset_id','name','url','status','error_message','created_at','group_id']
# The target DB's asset table additionally has asset_type, duration,
# thumbnail_url — those are filled in at insert time.
GEN_COLS = ['id','task_id','prompt','mode','model','aspect_ratio','duration','status','created_at',
            'user_id','seconds_consumed','ark_task_id','error_message','reference_urls','result_url',
            'base_cost_amount','cost_amount','frozen_amount','resolution','tokens_consumed',
            'is_favorited','seed','completed_at','raw_error','updated_at','is_deleted']
LR_COLS = ['id','ip_address','user_agent','created_at','user_id','geo_city','geo_country',
           'geo_province','geo_source','team_id']
LA_COLS = ['id','level','rule','detail','alerted','auto_disabled','disabled_target','created_at',
           'login_record_id','team_id','user_id']
AS_COLS = ['id','session_id','device_type','user_agent','created_at','user_id']
AL_COLS = ['id','operator_name','action','target_type','target_id','target_name','before','after',
           'ip_address','created_at','operator_id']
def main():
    """Run the idempotent sync: parse the dump, insert missing rows, update team totals.

    Dry-run by default (everything is rolled back); pass --commit to persist
    and --prod to target the production DB instead of the test DB.  The whole
    run happens in one transaction with FOREIGN_KEY_CHECKS disabled.
    """
    commit = '--commit' in sys.argv
    use_prod = '--prod' in sys.argv
    DB = DB_PROD if use_prod else DB_TEST
    target_name = '【正式服】' if use_prod else '【测试服】'
    print(f'\n🎯 目标: {target_name} {DB["host"]}')
    if use_prod and commit:
        print('⚠️ 正在写入正式服!')

    # ===== Parse the source dump =====
    print('解析源 SQL...')
    src_users_all = [row_to_dict(r, USER_COLS) for r in parse_table('accounts_user')]
    src_ags_all = [row_to_dict(r, AG_COLS) for r in parse_table('generation_assetgroup')]
    src_assets_all = [row_to_dict(r, ASSET_COLS_SRC) for r in parse_table('generation_asset')]
    src_gens_all = [row_to_dict(r, GEN_COLS) for r in parse_table('generation_generationrecord')]
    src_lrs_all = [row_to_dict(r, LR_COLS) for r in parse_table('accounts_loginrecord')]
    src_las_all = [row_to_dict(r, LA_COLS) for r in parse_table('accounts_loginanomaly')]
    src_ases_all = [row_to_dict(r, AS_COLS) for r in parse_table('accounts_activesession')]
    src_als_all = [row_to_dict(r, AL_COLS) for r in parse_table('accounts_adminauditlog')]

    # Only users belonging to the target teams are processed.
    src_team_users = [u for u in src_users_all if str(u['team_id']) in tuple(str(t) for t in TARGET_TEAMS)]
    src_uid_set = {int(u['id']) for u in src_team_users}
    src_uname_set = {u['username'] for u in src_team_users}

    # Source rows related to the target teams.
    src_ags = [g for g in src_ags_all if str(g['team_id']) in tuple(str(t) for t in TARGET_TEAMS)]
    src_ag_ids = {int(g['id']) for g in src_ags}
    src_assets = [a for a in src_assets_all if int(a['group_id']) in src_ag_ids]
    src_gens = [g for g in src_gens_all if int(g['user_id']) in src_uid_set]
    src_lrs = [r for r in src_lrs_all if int(r['user_id']) in src_uid_set]
    src_las = [a for a in src_las_all if int(a['user_id']) in src_uid_set]
    src_ases = [s for s in src_ases_all if int(s['user_id']) in src_uid_set]
    src_als = [a for a in src_als_all if a['operator_id'] is not None and int(a['operator_id']) in src_uid_set]

    # ===== Connect to the target DB =====
    print('连接测试库...')
    conn = pymysql.connect(**DB)
    cur = conn.cursor()

    try:
        cur.execute('SET FOREIGN_KEY_CHECKS = 0')

        # ---------- 1. user: dedupe by username ----------
        print('\n[1/8] accounts_user')
        ph = ','.join(['%s']*len(src_uname_set))
        cur.execute(f"SELECT id, username FROM accounts_user WHERE username IN ({ph})", list(src_uname_set))
        existing_by_uname = {r['username']: r['id'] for r in cur.fetchall()}

        uid_map = {}  # old_uid → new_uid
        user_inserts = 0
        for u in src_team_users:
            old_uid = int(u['id'])
            if u['username'] in existing_by_uname:
                uid_map[old_uid] = existing_by_uname[u['username']]
                continue
            # Insert as a new user (id is AUTO_INCREMENT).
            insert_cols = [c for c in USER_COLS if c != 'id']
            insert_vals = [u[c] for c in insert_cols]
            cur.execute(
                f"INSERT INTO `accounts_user` ({','.join('`'+c+'`' for c in insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(insert_cols))})",
                insert_vals
            )
            new_uid = cur.lastrowid
            uid_map[old_uid] = new_uid
            user_inserts += 1
            print(f' ➕ 新用户: {u["username"]} (old {old_uid} → new {new_uid})')
        print(f' 新增 {user_inserts} 用户,映射 {len(uid_map)}')

        # ---------- 2. assetgroup: dedupe by remote_group_id ----------
        print('\n[2/8] generation_assetgroup')
        src_rgids = [g['remote_group_id'] for g in src_ags if g['remote_group_id']]
        if src_rgids:
            ph = ','.join(['%s']*len(src_rgids))
            cur.execute(f"SELECT id, remote_group_id FROM generation_assetgroup WHERE remote_group_id IN ({ph})", src_rgids)
            existing_by_rgid = {r['remote_group_id']: r['id'] for r in cur.fetchall()}
        else:
            existing_by_rgid = {}

        ag_map = {}  # old_ag_id → new_ag_id
        ag_inserts = 0
        for g in src_ags:
            old_id = int(g['id'])
            rgid = g['remote_group_id']
            if rgid and rgid in existing_by_rgid:
                ag_map[old_id] = existing_by_rgid[rgid]
                continue
            insert_cols = [c for c in AG_COLS if c != 'id']
            vals = []
            for c in insert_cols:
                v = g[c]
                if c == 'created_by_id' and v is not None:
                    ov = int(v)
                    v = uid_map.get(ov, ov)  # created_by may be a user outside the target teams — keep the original id then
                vals.append(v)
            cur.execute(
                f"INSERT INTO `generation_assetgroup` ({','.join('`'+c+'`' for c in insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(insert_cols))})",
                vals
            )
            ag_map[old_id] = cur.lastrowid
            ag_inserts += 1
        print(f' 新增 {ag_inserts} assetgroup,映射 {len(ag_map)}')

        # ---------- 3. asset: dedupe by remote_asset_id, or by (group_id, name) when it is empty ----------
        print('\n[3/8] generation_asset')
        # Existing non-empty remote_asset_id values.
        cur.execute("SELECT remote_asset_id FROM generation_asset WHERE remote_asset_id != ''")
        existing_raids = {r['remote_asset_id'] for r in cur.fetchall()}
        # Existing (group_id, name) pairs for rows whose remote_asset_id is empty.
        cur.execute("SELECT group_id, name FROM generation_asset WHERE remote_asset_id = ''")
        existing_namekeys = {(r['group_id'], r['name']) for r in cur.fetchall()}

        asset_inserts = 0
        VIDEO_EXT = ('.mp4', '.mov', '.avi', '.webm', '.mkv', '.m4v')
        AUDIO_EXT = ('.mp3', '.wav', '.m4a', '.aac', '.flac', '.ogg')
        for a in src_assets:
            new_gid = ag_map[int(a['group_id'])]
            raid = a['remote_asset_id']
            key = (new_gid, a['name'])
            if raid and raid in existing_raids:
                continue
            if not raid and key in existing_namekeys:
                continue
            # Infer asset_type from the URL extension (the column is NOT NULL
            # in the target DB); defaults to 'Image' when no extension matches.
            url_l = (a['url'] or '').lower()
            if any(e in url_l for e in VIDEO_EXT):
                atype = 'Video'
            elif any(e in url_l for e in AUDIO_EXT):
                atype = 'Audio'
            else:
                atype = 'Image'
            cur.execute(
                """INSERT INTO generation_asset (remote_asset_id,name,url,status,error_message,created_at,
                group_id,asset_type,duration,thumbnail_url) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                (raid or '', a['name'], a['url'], a['status'], a['error_message'], a['created_at'],
                 new_gid, atype, None, '')
            )
            asset_inserts += 1
        print(f' 新增 {asset_inserts} asset')

        # ---------- 4. generationrecord: dedupe by task_id ----------
        print('\n[4/8] generation_generationrecord')
        src_tids = [g['task_id'] for g in src_gens]
        if src_tids:
            ph = ','.join(['%s']*len(src_tids))
            cur.execute(f"SELECT task_id FROM generation_generationrecord WHERE task_id IN ({ph})", src_tids)
            existing_tids = {r['task_id'] for r in cur.fetchall()}
        else:
            existing_tids = set()

        gen_inserts_by_team = {t: [] for t in TARGET_TEAMS}  # feeds the final team-stat recalculation
        gen_insert_cols = [c for c in GEN_COLS if c != 'id'] + ['thumbnail_url']
        for g in src_gens:
            if g['task_id'] in existing_tids: continue
            new_uid = uid_map[int(g['user_id'])]
            vals = [g[c] if c != 'user_id' else new_uid for c in GEN_COLS if c != 'id'] + ['']
            cur.execute(
                f"INSERT INTO generation_generationrecord "
                f"({','.join('`'+c+'`' for c in gen_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(gen_insert_cols))})",
                vals
            )
            # Attribute the inserted row to its team (via the source user's team_id).
            src_uid = int(g['user_id'])
            src_user = next(u for u in src_team_users if int(u['id']) == src_uid)
            tid = int(src_user['team_id'])
            gen_inserts_by_team[tid].append(g)
        parts = ', '.join(f'team{t}={len(gen_inserts_by_team[t])}' for t in TARGET_TEAMS)
        total = sum(len(v) for v in gen_inserts_by_team.values())
        print(f' 新增 {total} generationrecord ({parts})')

        # ---------- 5. loginrecord: dedupe by (user_id, created_at, ip) ----------
        print('\n[5/8] accounts_loginrecord')
        # Fetch every existing loginrecord for the mapped user ids in one query.
        mapped_uids = set(uid_map.values())
        if mapped_uids:
            ph = ','.join(['%s']*len(mapped_uids))
            cur.execute(f"""SELECT id, user_id, created_at, ip_address FROM accounts_loginrecord
                WHERE user_id IN ({ph})""", list(mapped_uids))
            existing_lr = {(r['user_id'], r['created_at'], r['ip_address']): r['id'] for r in cur.fetchall()}
        else:
            existing_lr = {}

        lr_map = {}  # old_lr_id → new_lr_id (for rows matched or inserted in this run)
        lr_inserts = 0
        lr_insert_cols = [c for c in LR_COLS if c != 'id']
        for r in src_lrs:
            new_uid = uid_map[int(r['user_id'])]
            # Parse created_at into a datetime so it compares equal to the DB value.
            from datetime import datetime
            ca = r['created_at']
            if isinstance(ca, str):
                try: ca_dt = datetime.strptime(ca, '%Y-%m-%d %H:%M:%S.%f')
                except ValueError: ca_dt = datetime.strptime(ca, '%Y-%m-%d %H:%M:%S')
            else:
                ca_dt = ca
            key = (new_uid, ca_dt, r['ip_address'])
            if key in existing_lr:
                lr_map[int(r['id'])] = existing_lr[key]
                continue
            vals = [r[c] if c != 'user_id' else new_uid for c in lr_insert_cols]
            cur.execute(
                f"INSERT INTO accounts_loginrecord ({','.join('`'+c+'`' for c in lr_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(lr_insert_cols))})",
                vals
            )
            lr_map[int(r['id'])] = cur.lastrowid
            lr_inserts += 1
        print(f' 新增 {lr_inserts} loginrecord')

        # ---------- 6. loginanomaly: dedupe by (user_id, login_record_id, rule, created_at) ----------
        print('\n[6/8] accounts_loginanomaly')
        la_inserts = 0
        la_insert_cols = [c for c in LA_COLS if c != 'id']
        for a in src_las:
            new_uid = uid_map[int(a['user_id'])]
            old_lr_id = int(a['login_record_id'])
            if old_lr_id not in lr_map:
                # The login_record may be outside the extracted scope (cross-team); skip.
                continue
            new_lr_id = lr_map[old_lr_id]
            cur.execute("""SELECT 1 FROM accounts_loginanomaly
                WHERE user_id=%s AND login_record_id=%s AND rule=%s AND created_at=%s""",
                (new_uid, new_lr_id, a['rule'], a['created_at']))
            if cur.fetchone(): continue
            vals = []
            for c in la_insert_cols:
                if c == 'user_id': vals.append(new_uid)
                elif c == 'login_record_id': vals.append(new_lr_id)
                else: vals.append(a[c])
            cur.execute(
                f"INSERT INTO accounts_loginanomaly ({','.join('`'+c+'`' for c in la_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(la_insert_cols))})",
                vals
            )
            la_inserts += 1
        print(f' 新增 {la_inserts} loginanomaly')

        # ---------- 7. activesession: dedupe by session_id ----------
        print('\n[7/8] accounts_activesession')
        src_sids = [s['session_id'] for s in src_ases]
        if src_sids:
            ph = ','.join(['%s']*len(src_sids))
            cur.execute(f"SELECT session_id FROM accounts_activesession WHERE session_id IN ({ph})", src_sids)
            existing_sids = {r['session_id'] for r in cur.fetchall()}
        else:
            existing_sids = set()

        as_inserts = 0
        as_insert_cols = [c for c in AS_COLS if c != 'id']
        for s in src_ases:
            if s['session_id'] in existing_sids: continue
            new_uid = uid_map[int(s['user_id'])]
            vals = [s[c] if c != 'user_id' else new_uid for c in as_insert_cols]
            cur.execute(
                f"INSERT INTO accounts_activesession ({','.join('`'+c+'`' for c in as_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(as_insert_cols))})",
                vals
            )
            as_inserts += 1
        print(f' 新增 {as_inserts} activesession')

        # ---------- 8. adminauditlog: dedupe by (operator_id, action, target_id, created_at) ----------
        print('\n[8/8] accounts_adminauditlog')
        al_inserts = 0
        al_insert_cols = [c for c in AL_COLS if c != 'id']
        for a in src_als:
            op_id = int(a['operator_id'])
            new_op_id = uid_map.get(op_id, op_id)
            tgt = int(a['target_id']) if a['target_id'] else None
            # NOTE(review): remapping target_id through uid_map assumes the audit
            # target is a user id — confirm target_type is always a user in the
            # source data.
            new_tgt = uid_map.get(tgt, tgt) if tgt else None
            cur.execute("""SELECT 1 FROM accounts_adminauditlog
                WHERE operator_id=%s AND action=%s AND
                (target_id=%s OR (target_id IS NULL AND %s IS NULL))
                AND created_at=%s""",
                (new_op_id, a['action'], new_tgt, new_tgt, a['created_at']))
            if cur.fetchone(): continue
            vals = []
            for c in al_insert_cols:
                if c == 'operator_id': vals.append(new_op_id)
                elif c == 'target_id': vals.append(new_tgt)
                else: vals.append(a[c])
            cur.execute(
                f"INSERT INTO accounts_adminauditlog ({','.join('`'+c+'`' for c in al_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(al_insert_cols))})",
                vals
            )
            al_inserts += 1
        print(f' 新增 {al_inserts} adminauditlog')

        # ---------- Recalculate team statistics ----------
        # Deltas come only from the generation records inserted in THIS run,
        # so re-running the script does not double-count.
        print('\n[重算 team 统计]')
        for tid in TARGET_TEAMS:
            gens_added = gen_inserts_by_team[tid]
            if not gens_added:
                print(f' Team {tid}: 无新增生成记录,跳过')
                continue
            sec_delta = sum(Decimal(g['seconds_consumed']) for g in gens_added)
            cost_delta = sum(Decimal(g['cost_amount']) for g in gens_added)
            cur.execute("""UPDATE accounts_team SET
                total_seconds_used = total_seconds_used + %s,
                total_spent = total_spent + %s,
                balance = balance - %s
                WHERE id=%s""",
                (sec_delta, cost_delta, cost_delta, tid))
            print(f' Team {tid}: +seconds={sec_delta} +spent={cost_delta} -balance={cost_delta}')

        cur.execute('SET FOREIGN_KEY_CHECKS = 1')

        if commit:
            conn.commit()
            print('\n✅ COMMITTED')
        else:
            conn.rollback()
            print('\n🔎 Rolled back (use --commit to persist)')

    except Exception as e:
        conn.rollback()
        print(f'\n❌ Error: {e}')
        raise
    finally:
        conn.close()
# Script entry point.
if __name__ == '__main__':
    main()