video-shuoshan/migration_backup/idempotent_sync.py
2026-04-17 20:24:05 +08:00

429 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
幂等同步:源 SQL dump → 测试库 MySQL只插入真正缺失的数据。
多次运行安全:所有表按业务唯一键去重。
- user : username 去重(冲突则 old_uid → 现有 new_uid 合并)
- assetgroup : remote_group_id 去重
- asset : remote_asset_id 去重(空则按 (group_id, name)
- generationrecord : task_id 去重
- loginrecord : (user_id, created_at, ip_address) 去重
- loginanomaly : (user_id, login_record_id, rule, created_at) 去重
- activesession : session_id 去重
- adminauditlog : (operator_id, action, target_id, created_at) 去重
执行完后,按本次新插入的生成记录增量更新 team 的 total_seconds_used / total_spent / balance。
用法:
python3 idempotent_sync.py # dry-run
python3 idempotent_sync.py --commit # 写入
"""
import re, sys
from decimal import Decimal
import pymysql
import pymysql.cursors
# Path of the source SQL dump (mysqldump output) this script reads from.
SOURCE = '/Users/maidong/Desktop/zyc/研究openclaw/视频生成平台/jimeng-clone/数据库备份/video_auto_原19-55.sql'
# Only rows belonging to these team ids are migrated.
TARGET_TEAMS = (3, 4, 12)
# SECURITY NOTE(review): database credentials are hard-coded in plain text.
# Move them to environment variables or a secrets store before this file is
# shared or committed anywhere public.
DB_TEST = dict(host='mysql-8351f937d637-public.rds.volces.com', port=3306,
               user='zyc', password='Zyc188208', database='video_auto', charset='utf8mb4',
               autocommit=False, cursorclass=pymysql.cursors.DictCursor)
DB_PROD = dict(host='mysql-d9bb4e81696d-public.rds.volces.com', port=3306,
               user='zyc', password='Zyc188208', database='video_auto', charset='utf8mb4',
               autocommit=False, cursorclass=pymysql.cursors.DictCursor)
# ---------- SQL dump parsing ----------
def split_values(s):
    """Split one SQL VALUES tuple body on its top-level commas.

    Commas inside single-quoted string literals are ignored, and a
    backslash inside a string consumes the following character (MySQL
    escape style).  Each piece is returned stripped of surrounding
    whitespace, quotes still attached.
    """
    parts, buf = [], []
    quoted = False
    idx, length = 0, len(s)
    while idx < length:
        ch = s[idx]
        if quoted:
            if ch == '\\' and idx + 1 < length:
                # Keep the escape pair verbatim; unq() decodes it later.
                buf.append(ch)
                buf.append(s[idx + 1])
                idx += 2
                continue
            if ch == "'":
                quoted = False
            buf.append(ch)
        elif ch == "'":
            quoted = True
            buf.append(ch)
        elif ch == ',':
            parts.append(''.join(buf).strip())
            buf = []
        else:
            buf.append(ch)
        idx += 1
    parts.append(''.join(buf).strip())
    return parts
def parse_table(tbl):
    """Return the raw VALUES fields of every INSERT for *tbl* in the dump.

    Scans SOURCE line by line and collects one list of raw value tokens
    per matching INSERT statement.
    NOTE(review): this assumes one-row-per-INSERT dumps (``VALUES (...);``
    on a single line); multi-row ``VALUES (...),(...)`` statements would
    not be split correctly — confirm the dump format.
    """
    prefix = f'INSERT INTO `{tbl}`'
    values_re = re.compile(r'VALUES \((.*)\);\s*$')
    rows = []
    with open(SOURCE, 'r', encoding='utf-8') as fh:
        for raw in fh:
            if not raw.startswith(prefix):
                continue
            hit = values_re.search(raw)
            if hit:
                rows.append(split_values(hit.group(1)))
    return rows
# Backslash escape sequences emitted by mysqldump and their decoded values.
_SQL_ESCAPES = {
    '\\': '\\', "'": "'", '"': '"',
    'n': '\n', 'r': '\r', 't': '\t', '0': '\0',
}

def unq(v):
    """Decode one raw token produced by split_values().

    Returns None for the SQL NULL keyword; for quoted string literals,
    strips the surrounding quotes and resolves backslash escapes in a
    single left-to-right pass.  Other tokens (numbers, etc.) are
    returned unchanged.

    Bug fix: the previous chained str.replace() implementation decoded
    '\\\\' before '\\n', so the dump sequence backslash-backslash-n
    (a literal backslash followed by 'n') was wrongly collapsed into a
    real newline.  A single regex pass consumes each escape exactly once.
    Unknown escapes decode to the escaped character itself, matching
    MySQL string-literal semantics.
    """
    if v == 'NULL':
        return None
    if len(v) >= 2 and v.startswith("'") and v.endswith("'"):
        return re.sub(
            r'\\(.)',
            lambda m: _SQL_ESCAPES.get(m.group(1), m.group(1)),
            v[1:-1],
        )
    return v
# ---------- Source rows -> dict form ----------
def row_to_dict(row, cols):
    """Pair column names with decoded cell values for one dump row."""
    return dict(zip(cols, (unq(cell) for cell in row)))
# Column order of each table's INSERT statements in the source dump.
# These must match the dump exactly: row_to_dict() zips them positionally.
USER_COLS = ['id','password','last_login','is_superuser','username','first_name','last_name',
             'is_staff','is_active','date_joined','email','created_at','updated_at',
             'daily_seconds_limit','monthly_seconds_limit','is_team_admin','team_id',
             'must_change_password','disabled_by','daily_generation_limit',
             'monthly_generation_limit','spending_limit','last_read_announcement','is_team_owner']
AG_COLS = ['id','remote_group_id','name','description','thumbnail_url','created_at','created_by_id','team_id']
ASSET_COLS_SRC = ['id','remote_asset_id','name','url','status','error_message','created_at','group_id']
# The target DB's asset table additionally has asset_type, duration,
# thumbnail_url — those are filled in at insert time in main().
GEN_COLS = ['id','task_id','prompt','mode','model','aspect_ratio','duration','status','created_at',
            'user_id','seconds_consumed','ark_task_id','error_message','reference_urls','result_url',
            'base_cost_amount','cost_amount','frozen_amount','resolution','tokens_consumed',
            'is_favorited','seed','completed_at','raw_error','updated_at','is_deleted']
LR_COLS = ['id','ip_address','user_agent','created_at','user_id','geo_city','geo_country',
           'geo_province','geo_source','team_id']
LA_COLS = ['id','level','rule','detail','alerted','auto_disabled','disabled_target','created_at',
           'login_record_id','team_id','user_id']
AS_COLS = ['id','session_id','device_type','user_agent','created_at','user_id']
AL_COLS = ['id','operator_name','action','target_type','target_id','target_name','before','after',
           'ip_address','created_at','operator_id']
def main():
    """Idempotently sync TARGET_TEAMS rows from the SQL dump into MySQL.

    Each table is de-duplicated on a business key (username, remote ids,
    task_id, composite tuples), so repeated runs only insert genuinely
    missing rows.  Everything runs in one transaction that is rolled
    back unless ``--commit`` is passed; ``--prod`` selects the
    production DB instead of the test DB.
    """
    commit = '--commit' in sys.argv
    use_prod = '--prod' in sys.argv
    DB = DB_PROD if use_prod else DB_TEST
    target_name = '【正式服】' if use_prod else '【测试服】'
    print(f'\n🎯 目标: {target_name} {DB["host"]}')
    if use_prod and commit:
        print('⚠️ 正在写入正式服!')
    # ===== Parse the source dump =====
    print('解析源 SQL...')
    src_users_all = [row_to_dict(r, USER_COLS) for r in parse_table('accounts_user')]
    src_ags_all = [row_to_dict(r, AG_COLS) for r in parse_table('generation_assetgroup')]
    src_assets_all = [row_to_dict(r, ASSET_COLS_SRC) for r in parse_table('generation_asset')]
    src_gens_all = [row_to_dict(r, GEN_COLS) for r in parse_table('generation_generationrecord')]
    src_lrs_all = [row_to_dict(r, LR_COLS) for r in parse_table('accounts_loginrecord')]
    src_las_all = [row_to_dict(r, LA_COLS) for r in parse_table('accounts_loginanomaly')]
    src_ases_all = [row_to_dict(r, AS_COLS) for r in parse_table('accounts_activesession')]
    src_als_all = [row_to_dict(r, AL_COLS) for r in parse_table('accounts_adminauditlog')]
    # Only users belonging to one of the target teams are processed.
    src_team_users = [u for u in src_users_all if str(u['team_id']) in tuple(str(t) for t in TARGET_TEAMS)]
    src_uid_set = {int(u['id']) for u in src_team_users}
    src_uname_set = {u['username'] for u in src_team_users}
    # Rows in the dump that relate to the target teams.
    src_ags = [g for g in src_ags_all if str(g['team_id']) in tuple(str(t) for t in TARGET_TEAMS)]
    src_ag_ids = {int(g['id']) for g in src_ags}
    src_assets = [a for a in src_assets_all if int(a['group_id']) in src_ag_ids]
    src_gens = [g for g in src_gens_all if int(g['user_id']) in src_uid_set]
    src_lrs = [r for r in src_lrs_all if int(r['user_id']) in src_uid_set]
    src_las = [a for a in src_las_all if int(a['user_id']) in src_uid_set]
    src_ases = [s for s in src_ases_all if int(s['user_id']) in src_uid_set]
    src_als = [a for a in src_als_all if a['operator_id'] is not None and int(a['operator_id']) in src_uid_set]
    # ===== Connect to the target DB =====
    print('连接测试库...')
    conn = pymysql.connect(**DB)
    cur = conn.cursor()
    try:
        # FK checks off so insert order across tables doesn't matter;
        # re-enabled just before commit/rollback below.
        cur.execute('SET FOREIGN_KEY_CHECKS = 0')
        # ---------- 1. user: de-dup by username ----------
        print('\n[1/8] accounts_user')
        ph = ','.join(['%s']*len(src_uname_set))
        cur.execute(f"SELECT id, username FROM accounts_user WHERE username IN ({ph})", list(src_uname_set))
        existing_by_uname = {r['username']: r['id'] for r in cur.fetchall()}
        uid_map = {}  # old_uid -> new_uid
        user_inserts = 0
        for u in src_team_users:
            old_uid = int(u['id'])
            if u['username'] in existing_by_uname:
                # Username collision: merge onto the existing account.
                uid_map[old_uid] = existing_by_uname[u['username']]
                continue
            # Insert as a new user; id comes from AUTO_INCREMENT.
            insert_cols = [c for c in USER_COLS if c != 'id']
            insert_vals = [u[c] for c in insert_cols]
            cur.execute(
                f"INSERT INTO `accounts_user` ({','.join('`'+c+'`' for c in insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(insert_cols))})",
                insert_vals
            )
            new_uid = cur.lastrowid
            uid_map[old_uid] = new_uid
            user_inserts += 1
            print(f' 新用户: {u["username"]} (old {old_uid} → new {new_uid})')
        print(f' 新增 {user_inserts} 用户,映射 {len(uid_map)}')
        # ---------- 2. assetgroup: de-dup by remote_group_id ----------
        print('\n[2/8] generation_assetgroup')
        src_rgids = [g['remote_group_id'] for g in src_ags if g['remote_group_id']]
        if src_rgids:
            ph = ','.join(['%s']*len(src_rgids))
            cur.execute(f"SELECT id, remote_group_id FROM generation_assetgroup WHERE remote_group_id IN ({ph})", src_rgids)
            existing_by_rgid = {r['remote_group_id']: r['id'] for r in cur.fetchall()}
        else:
            existing_by_rgid = {}
        ag_map = {}  # old_ag_id -> new_ag_id
        ag_inserts = 0
        for g in src_ags:
            old_id = int(g['id'])
            rgid = g['remote_group_id']
            if rgid and rgid in existing_by_rgid:
                ag_map[old_id] = existing_by_rgid[rgid]
                continue
            insert_cols = [c for c in AG_COLS if c != 'id']
            vals = []
            for c in insert_cols:
                v = g[c]
                if c == 'created_by_id' and v is not None:
                    ov = int(v)
                    # created_by may be a user outside the target teams;
                    # in that case keep the original id as-is.
                    v = uid_map.get(ov, ov)
                vals.append(v)
            cur.execute(
                f"INSERT INTO `generation_assetgroup` ({','.join('`'+c+'`' for c in insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(insert_cols))})",
                vals
            )
            ag_map[old_id] = cur.lastrowid
            ag_inserts += 1
        print(f' 新增 {ag_inserts} assetgroup映射 {len(ag_map)}')
        # ---------- 3. asset: de-dup by remote_asset_id, else (group_id, name) ----------
        print('\n[3/8] generation_asset')
        # Set of remote_asset_id values already in the target DB.
        cur.execute("SELECT remote_asset_id FROM generation_asset WHERE remote_asset_id != ''")
        existing_raids = {r['remote_asset_id'] for r in cur.fetchall()}
        # Existing (group_id, name) pairs for assets with no remote id.
        cur.execute("SELECT group_id, name FROM generation_asset WHERE remote_asset_id = ''")
        existing_namekeys = {(r['group_id'], r['name']) for r in cur.fetchall()}
        asset_inserts = 0
        VIDEO_EXT = ('.mp4', '.mov', '.avi', '.webm', '.mkv', '.m4v')
        AUDIO_EXT = ('.mp3', '.wav', '.m4a', '.aac', '.flac', '.ogg')
        for a in src_assets:
            new_gid = ag_map[int(a['group_id'])]
            raid = a['remote_asset_id']
            key = (new_gid, a['name'])
            if raid and raid in existing_raids:
                continue
            if not raid and key in existing_namekeys:
                continue
            # Infer asset_type from the URL extension (column is NOT NULL
            # in the target schema; default to 'Image').
            url_l = (a['url'] or '').lower()
            if any(e in url_l for e in VIDEO_EXT):
                atype = 'Video'
            elif any(e in url_l for e in AUDIO_EXT):
                atype = 'Audio'
            else:
                atype = 'Image'
            cur.execute(
                """INSERT INTO generation_asset (remote_asset_id,name,url,status,error_message,created_at,
                group_id,asset_type,duration,thumbnail_url) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
                (raid or '', a['name'], a['url'], a['status'], a['error_message'], a['created_at'],
                 new_gid, atype, None, '')
            )
            asset_inserts += 1
        print(f' 新增 {asset_inserts} asset')
        # ---------- 4. generationrecord: de-dup by task_id ----------
        print('\n[4/8] generation_generationrecord')
        src_tids = [g['task_id'] for g in src_gens]
        if src_tids:
            ph = ','.join(['%s']*len(src_tids))
            cur.execute(f"SELECT task_id FROM generation_generationrecord WHERE task_id IN ({ph})", src_tids)
            existing_tids = {r['task_id'] for r in cur.fetchall()}
        else:
            existing_tids = set()
        gen_inserts_by_team = {t: [] for t in TARGET_TEAMS}  # feeds the team stats recalculation below
        gen_insert_cols = [c for c in GEN_COLS if c != 'id'] + ['thumbnail_url']
        for g in src_gens:
            if g['task_id'] in existing_tids: continue
            new_uid = uid_map[int(g['user_id'])]
            vals = [g[c] if c != 'user_id' else new_uid for c in GEN_COLS if c != 'id'] + ['']
            cur.execute(
                f"INSERT INTO generation_generationrecord "
                f"({','.join('`'+c+'`' for c in gen_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(gen_insert_cols))})",
                vals
            )
            # Bucket by the source user's team_id for the stats update.
            src_uid = int(g['user_id'])
            src_user = next(u for u in src_team_users if int(u['id']) == src_uid)
            tid = int(src_user['team_id'])
            gen_inserts_by_team[tid].append(g)
        parts = ', '.join(f'team{t}={len(gen_inserts_by_team[t])}' for t in TARGET_TEAMS)
        total = sum(len(v) for v in gen_inserts_by_team.values())
        print(f' 新增 {total} generationrecord ({parts})')
        # ---------- 5. loginrecord: de-dup by (user_id, created_at, ip) ----------
        print('\n[5/8] accounts_loginrecord')
        # One query fetches all existing login records of the mapped users.
        mapped_uids = set(uid_map.values())
        if mapped_uids:
            ph = ','.join(['%s']*len(mapped_uids))
            cur.execute(f"""SELECT id, user_id, created_at, ip_address FROM accounts_loginrecord
                WHERE user_id IN ({ph})""", list(mapped_uids))
            existing_lr = {(r['user_id'], r['created_at'], r['ip_address']): r['id'] for r in cur.fetchall()}
        else:
            existing_lr = {}
        lr_map = {}  # old_lr_id -> new_lr_id (resolved or inserted this run)
        lr_inserts = 0
        lr_insert_cols = [c for c in LR_COLS if c != 'id']
        for r in src_lrs:
            new_uid = uid_map[int(r['user_id'])]
            # Parse created_at into a datetime so the dedup key compares
            # equal to the datetime values returned by the driver.
            # (Repeated import inside the loop is harmless — Python caches
            # modules — but it belongs at module top.)
            from datetime import datetime
            ca = r['created_at']
            if isinstance(ca, str):
                try: ca_dt = datetime.strptime(ca, '%Y-%m-%d %H:%M:%S.%f')
                except ValueError: ca_dt = datetime.strptime(ca, '%Y-%m-%d %H:%M:%S')
            else:
                ca_dt = ca
            key = (new_uid, ca_dt, r['ip_address'])
            if key in existing_lr:
                lr_map[int(r['id'])] = existing_lr[key]
                continue
            vals = [r[c] if c != 'user_id' else new_uid for c in lr_insert_cols]
            cur.execute(
                f"INSERT INTO accounts_loginrecord ({','.join('`'+c+'`' for c in lr_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(lr_insert_cols))})",
                vals
            )
            lr_map[int(r['id'])] = cur.lastrowid
            lr_inserts += 1
        print(f' 新增 {lr_inserts} loginrecord')
        # ---------- 6. loginanomaly: de-dup by (user_id, login_record_id, rule, created_at) ----------
        print('\n[6/8] accounts_loginanomaly')
        la_inserts = 0
        la_insert_cols = [c for c in LA_COLS if c != 'id']
        for a in src_las:
            new_uid = uid_map[int(a['user_id'])]
            old_lr_id = int(a['login_record_id'])
            if old_lr_id not in lr_map:
                # The referenced login record may be outside the extracted
                # scope (other team) — skip the anomaly in that case.
                continue
            new_lr_id = lr_map[old_lr_id]
            cur.execute("""SELECT 1 FROM accounts_loginanomaly
                WHERE user_id=%s AND login_record_id=%s AND rule=%s AND created_at=%s""",
                (new_uid, new_lr_id, a['rule'], a['created_at']))
            if cur.fetchone(): continue
            vals = []
            for c in la_insert_cols:
                if c == 'user_id': vals.append(new_uid)
                elif c == 'login_record_id': vals.append(new_lr_id)
                else: vals.append(a[c])
            cur.execute(
                f"INSERT INTO accounts_loginanomaly ({','.join('`'+c+'`' for c in la_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(la_insert_cols))})",
                vals
            )
            la_inserts += 1
        print(f' 新增 {la_inserts} loginanomaly')
        # ---------- 7. activesession: de-dup by session_id ----------
        print('\n[7/8] accounts_activesession')
        src_sids = [s['session_id'] for s in src_ases]
        if src_sids:
            ph = ','.join(['%s']*len(src_sids))
            cur.execute(f"SELECT session_id FROM accounts_activesession WHERE session_id IN ({ph})", src_sids)
            existing_sids = {r['session_id'] for r in cur.fetchall()}
        else:
            existing_sids = set()
        as_inserts = 0
        as_insert_cols = [c for c in AS_COLS if c != 'id']
        for s in src_ases:
            if s['session_id'] in existing_sids: continue
            new_uid = uid_map[int(s['user_id'])]
            vals = [s[c] if c != 'user_id' else new_uid for c in as_insert_cols]
            cur.execute(
                f"INSERT INTO accounts_activesession ({','.join('`'+c+'`' for c in as_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(as_insert_cols))})",
                vals
            )
            as_inserts += 1
        print(f' 新增 {as_inserts} activesession')
        # ---------- 8. adminauditlog: de-dup by (operator_id, action, target_id, created_at) ----------
        print('\n[8/8] accounts_adminauditlog')
        al_inserts = 0
        al_insert_cols = [c for c in AL_COLS if c != 'id']
        for a in src_als:
            op_id = int(a['operator_id'])
            new_op_id = uid_map.get(op_id, op_id)
            tgt = int(a['target_id']) if a['target_id'] else None
            # NOTE(review): target_id is remapped through uid_map, which
            # assumes the target is a user; depending on target_type it may
            # reference other entities — confirm against the schema.
            new_tgt = uid_map.get(tgt, tgt) if tgt else None
            cur.execute("""SELECT 1 FROM accounts_adminauditlog
                WHERE operator_id=%s AND action=%s AND
                (target_id=%s OR (target_id IS NULL AND %s IS NULL))
                AND created_at=%s""",
                (new_op_id, a['action'], new_tgt, new_tgt, a['created_at']))
            if cur.fetchone(): continue
            vals = []
            for c in al_insert_cols:
                if c == 'operator_id': vals.append(new_op_id)
                elif c == 'target_id': vals.append(new_tgt)
                else: vals.append(a[c])
            cur.execute(
                f"INSERT INTO accounts_adminauditlog ({','.join('`'+c+'`' for c in al_insert_cols)}) "
                f"VALUES ({','.join(['%s']*len(al_insert_cols))})",
                vals
            )
            al_inserts += 1
        print(f' 新增 {al_inserts} adminauditlog')
        # ---------- Recompute team usage statistics ----------
        # Apply only the deltas from records inserted in this run.
        print('\n[重算 team 统计]')
        for tid in TARGET_TEAMS:
            gens_added = gen_inserts_by_team[tid]
            if not gens_added:
                print(f' Team {tid}: 无新增生成记录,跳过')
                continue
            sec_delta = sum(Decimal(g['seconds_consumed']) for g in gens_added)
            cost_delta = sum(Decimal(g['cost_amount']) for g in gens_added)
            cur.execute("""UPDATE accounts_team SET
                total_seconds_used = total_seconds_used + %s,
                total_spent = total_spent + %s,
                balance = balance - %s
                WHERE id=%s""",
                (sec_delta, cost_delta, cost_delta, tid))
            print(f' Team {tid}: +seconds={sec_delta} +spent={cost_delta} -balance={cost_delta}')
        cur.execute('SET FOREIGN_KEY_CHECKS = 1')
        if commit:
            conn.commit()
            print('\n✅ COMMITTED')
        else:
            # Dry run (the default): discard everything.
            conn.rollback()
            print('\n🔎 Rolled back (use --commit to persist)')
    except Exception as e:
        conn.rollback()
        print(f'\n❌ Error: {e}')
        raise
    finally:
        conn.close()


if __name__ == '__main__':
    main()