video-shuoshan/migration_backup/migrate_two_teams.py
2026-04-17 20:24:05 +08:00

296 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Migrate two teams (万物苏网络 id=12; 洁雯团队 id=4, incremental user 107) from a
source SQL dump into the test MySQL database
(mysql-8351f937d637-public.rds.volces.com).

Strategy:
- team 4 already exists: only add user 107 and its related data
- team 12 plus its 9 users do not exist at all -> full insert
- team/user IDs keep the source IDs (no conflicts)
- loginrecord / loginanomaly / assetgroup / asset / generationrecord /
  activesession / adminauditlog: let AUTO_INCREMENT assign fresh IDs and
  maintain an old->new ID mapping for cross-table references
"""
import re
import sys
import pymysql
# Path to the source SQL dump holding the rows to migrate.
SOURCE_SQL = '/Users/maidong/Desktop/zyc/研究openclaw/视频生成平台/jimeng-clone/video_auto迁移2个团队的数据.sql'
# Target (test) database connection settings.
# NOTE(review): credentials are hard-coded in plain text — consider loading
# them from environment variables before this script is shared further.
DB = dict(
    host='mysql-8351f937d637-public.rds.volces.com',
    port=3306, user='zyc', password='Zyc188208',
    database='video_auto', charset='utf8mb4',
    autocommit=False,  # commit/rollback is decided explicitly in main()
)
TEAM4_EXISTING_USERS = {19, 20, 21, 22, 23, 24, 25}  # already present in target; left untouched
# Source user ids inserted by this run (team-4 addition plus team-12 members).
INCREMENTAL_USERS = {107, 99, 102, 103, 104, 105, 108, 109, 110, 111}
# Source team ids that are absent from the target and inserted wholesale.
NEW_TEAMS = {12}
def split_values(s):
    """Split the comma-separated value list of a one-row INSERT statement.

    Commas inside single-quoted string literals are not split points.  A
    quote toggles string state only when it is preceded by an EVEN number
    of backslashes: ``\\'`` is an escaped quote, while ``\\\\'`` is an
    escaped backslash followed by a real closing quote.  (The original
    check only looked at the single preceding character, so a value ending
    in a backslash broke the parse.)

    Returns the list of raw, still SQL-escaped value strings.
    """
    vals, cur, in_str = [], '', False
    for i, c in enumerate(s):
        if c == "'":
            # Count the run of backslashes immediately before this quote;
            # an odd count means the quote itself is escaped.
            backslashes = 0
            j = i - 1
            while j >= 0 and s[j] == '\\':
                backslashes += 1
                j -= 1
            if backslashes % 2 == 0:
                in_str = not in_str
            cur += c
        elif c == ',' and not in_str:
            vals.append(cur.strip())
            cur = ''
        else:
            cur += c
    vals.append(cur.strip())
    return vals
def parse_table(path, tbl):
    """Collect the raw value lists of every one-row INSERT for *tbl*.

    Scans *path* line by line, keeps only lines that start with
    ``INSERT INTO `tbl```, and returns one list of raw (still SQL-escaped)
    value strings per row.
    """
    prefix = f'INSERT INTO `{tbl}`'
    values_re = re.compile(r'VALUES \((.*)\);')
    parsed = []
    with open(path, 'r', encoding='utf-8') as fh:
        for raw_line in fh:
            if not raw_line.startswith(prefix):
                continue
            match = values_re.search(raw_line)
            if match:
                parsed.append(split_values(match.group(1)))
    return parsed
def q(v):
    """Value placeholder helper — we already have MySQL-escaped literal strings
    from the dump, so we inject them as-is into the SQL."""
    # NOTE(review): identity pass-through; not referenced by main(), which
    # uses parameterized inserts instead.  Kept for backward compatibility.
    return v
def bulk_insert(cur, tbl, cols, rows_values, label):
    """Insert rows preserving the source id (id is the first column).

    Builds one parameterized INSERT and runs it via executemany.
    Returns the cursor rowcount (0 when there is nothing to insert).
    """
    if not rows_values:
        print(f' [{label}] 0 rows')
        return 0
    col_list = ','.join(f'`{c}`' for c in cols)
    slots = ','.join('%s' for _ in cols)
    stmt = f"INSERT INTO `{tbl}` ({col_list}) VALUES ({slots})"
    cur.executemany(stmt, rows_values)
    print(f' [{label}] inserted {cur.rowcount} rows')
    return cur.rowcount
def auto_insert_collect_id(cur, tbl, cols_no_id, rows_vals_no_id, src_ids, label):
    """INSERT rows letting AUTO_INCREMENT assign the id column.

    Rows are inserted one at a time so each source id can be paired with
    the new ``lastrowid`` deterministically.

    cols_no_id       -- column list without `id`.
    rows_vals_no_id  -- list of value tuples matching cols_no_id.
    src_ids          -- source ids, same order as rows_vals_no_id.
    Returns a dict {old_id: new_id} (empty when there is nothing to insert).
    """
    mapping = {}
    if not rows_vals_no_id:
        print(f' [{label}] 0 rows')
        return mapping
    col_list = ','.join(f'`{c}`' for c in cols_no_id)
    slots = ','.join('%s' for _ in cols_no_id)
    stmt = f"INSERT INTO `{tbl}` ({col_list}) VALUES ({slots})"
    for src_id, row in zip(src_ids, rows_vals_no_id):
        cur.execute(stmt, row)
        mapping[int(src_id)] = cur.lastrowid
    new_ids = mapping.values()
    print(f' [{label}] inserted {len(mapping)} rows, id range {min(new_ids)}-{max(new_ids)}')
    return mapping
def unquote(s):
    """Turn a raw SQL literal like ``'foo\\'bar'`` or ``NULL`` into a Python value.

    Returns None for NULL, the decoded string for single-quoted literals,
    and the raw token unchanged for numeric/boolean literals.

    Escapes are decoded in a single left-to-right pass.  The previous
    chained ``str.replace`` approach was wrong: once ``\\\\`` collapsed to a
    single backslash, a later replace reinterpreted it — e.g. the dump
    text ``\\\\n`` (escaped backslash followed by a literal ``n``) became a
    newline instead of backslash+n.
    """
    s = s.strip()
    if s == 'NULL':
        return None
    if s.startswith("'") and s.endswith("'"):
        inner = s[1:-1]
        # mysqldump single-character escapes handled by the original code.
        escapes = {"'": "'", '"': '"', '\\': '\\',
                   'n': '\n', 'r': '\r', 't': '\t', '0': '\0'}
        out = []
        i = 0
        n = len(inner)
        while i < n:
            c = inner[i]
            if c == '\\' and i + 1 < n and inner[i + 1] in escapes:
                out.append(escapes[inner[i + 1]])
                i += 2
            else:
                # Unknown escape (or trailing backslash): keep verbatim.
                out.append(c)
                i += 1
        return ''.join(out)
    # numeric / boolean literal: return the raw token
    return s
def raw_vals_to_py(vals):
    """Decode every raw SQL literal in *vals* into its Python value."""
    return list(map(unquote, vals))
def main():
    """Parse the source dump, filter the rows belonging to the two teams,
    and insert them into the target DB.

    Safety: everything runs in one transaction with FK checks disabled;
    changes are rolled back unless ``--commit`` is on the command line, and
    ``--dry-run`` exits before even connecting.
    """
    print(f'Loading source SQL: {SOURCE_SQL}')
    # --- parse all needed tables ---
    teams_all = parse_table(SOURCE_SQL, 'accounts_team')
    users_all = parse_table(SOURCE_SQL, 'accounts_user')
    agroups_all = parse_table(SOURCE_SQL, 'generation_assetgroup')
    assets_all = parse_table(SOURCE_SQL, 'generation_asset')
    lrs_all = parse_table(SOURCE_SQL, 'accounts_loginrecord')
    las_all = parse_table(SOURCE_SQL, 'accounts_loginanomaly')
    ases_all = parse_table(SOURCE_SQL, 'accounts_activesession')
    als_all = parse_table(SOURCE_SQL, 'accounts_adminauditlog')
    gens_all = parse_table(SOURCE_SQL, 'generation_generationrecord')
    # --- filter ---
    teams = [r for r in teams_all if int(r[0]) in NEW_TEAMS]
    users = [r for r in users_all if int(r[0]) in INCREMENTAL_USERS]
    # assetgroup team_id is at index 7 (compared as raw string tokens)
    relevant_groups = [r for r in agroups_all if r[7] in ('4', '12')]
    group_ids = {r[0] for r in relevant_groups}
    # asset group_id at index 7
    assets = [r for r in assets_all if r[7] in group_ids]
    # loginrecord user_id at index 4
    lrs = [r for r in lrs_all if int(r[4]) in INCREMENTAL_USERS]
    lr_ids = {r[0] for r in lrs}
    # loginanomaly user_id at index 10; login_record_id at index 8
    # NOTE(review): int(r[10]) assumes user_id is never NULL in the dump —
    # a 'NULL' token here would raise ValueError; confirm against the data.
    las = [r for r in las_all if int(r[10]) in INCREMENTAL_USERS and r[8] in lr_ids]
    # activesession user_id at index 5
    ases = [r for r in ases_all if int(r[5]) in INCREMENTAL_USERS]
    # adminauditlog operator_id at last (index 10); guarded against NULL
    als = [r for r in als_all if r[-1] != 'NULL' and r[-1].isdigit() and int(r[-1]) in INCREMENTAL_USERS]
    # generationrecord user_id at index 9
    gens = [r for r in gens_all if int(r[9]) in INCREMENTAL_USERS]
    print(f'Prepared:')
    print(f' teams (new) : {len(teams)}')
    print(f' users (incremental): {len(users)}')
    print(f' assetgroups (T4+T12): {len(relevant_groups)}')
    print(f' assets : {len(assets)}')
    print(f' loginrecords : {len(lrs)}')
    print(f' loginanomalies : {len(las)}')
    print(f' activesessions : {len(ases)}')
    print(f' adminauditlogs : {len(als)}')
    print(f' generationrecords : {len(gens)}')
    if '--dry-run' in sys.argv:
        print('\n--dry-run: exiting before DB connect')
        return
    # --- connect ---
    print('\nConnecting to target DB...')
    conn = pymysql.connect(**DB)
    try:
        cur = conn.cursor()
        # FK checks off: tables reference each other and rows are inserted
        # in dependency order within one transaction.
        cur.execute('SET FOREIGN_KEY_CHECKS = 0')
        # 1) accounts_team (id preserved)
        print('\n[1/9] accounts_team')
        team_cols = ['id','name','total_seconds_pool','total_seconds_used','monthly_seconds_limit',
                     'daily_member_limit_default','is_active','created_at','updated_at','disabled_by',
                     'expected_regions','balance','daily_member_spending_default','frozen_amount',
                     'markup_percentage','monthly_spending_limit','total_spent','max_concurrent_tasks']
        bulk_insert(cur, 'accounts_team', team_cols,
                    [raw_vals_to_py(r) for r in teams], 'team')
        # 2) accounts_user (id preserved)
        print('\n[2/9] accounts_user')
        user_cols = ['id','password','last_login','is_superuser','username','first_name','last_name',
                     'is_staff','is_active','date_joined','email','created_at','updated_at',
                     'daily_seconds_limit','monthly_seconds_limit','is_team_admin','team_id',
                     'must_change_password','disabled_by','daily_generation_limit',
                     'monthly_generation_limit','spending_limit','last_read_announcement','is_team_owner']
        bulk_insert(cur, 'accounts_user', user_cols,
                    [raw_vals_to_py(r) for r in users], 'user')
        # 3) generation_assetgroup (AUTO id)
        print('\n[3/9] generation_assetgroup')
        ag_cols = ['remote_group_id','name','description','thumbnail_url','created_at','created_by_id','team_id']
        # source schema: id=0, remote_group_id=1, name=2, description=3, thumbnail_url=4, created_at=5, created_by_id=6, team_id=7
        ag_src_ids = [r[0] for r in relevant_groups]
        ag_vals = [raw_vals_to_py(r[1:]) for r in relevant_groups]  # strip id
        ag_map = auto_insert_collect_id(cur, 'generation_assetgroup', ag_cols, ag_vals, ag_src_ids, 'assetgroup')
        # 4) generation_asset (AUTO id, remap group_id)
        # Target schema has extra columns asset_type (NOT NULL), duration (NULL)
        # and thumbnail_url (NOT NULL): asset_type is inferred from the URL
        # suffix, thumbnail_url is set to '', duration stays NULL.
        print('\n[4/9] generation_asset')
        a_cols = ['remote_asset_id','name','url','status','error_message','created_at','group_id',
                  'asset_type','duration','thumbnail_url']
        a_src_ids = [r[0] for r in assets]
        a_vals = []
        VIDEO_EXT = ('.mp4','.mov','.avi','.webm','.mkv','.m4v')
        for r in assets:
            v = raw_vals_to_py(r[1:])  # index 0..6 = remote_asset_id..group_id
            # remap group_id (now at index 6) through the old->new mapping
            v[6] = ag_map[int(r[7])]
            url_lower = (v[2] or '').lower()
            asset_type = 'video' if any(e in url_lower for e in VIDEO_EXT) else 'image'
            v.extend([asset_type, None, ''])  # asset_type, duration, thumbnail_url
            a_vals.append(v)
        auto_insert_collect_id(cur, 'generation_asset', a_cols, a_vals, a_src_ids, 'asset')
        # 5) accounts_loginrecord (AUTO id)
        print('\n[5/9] accounts_loginrecord')
        # source schema: id=0, ip_address=1, user_agent=2, created_at=3, user_id=4, geo_city=5, geo_country=6, geo_province=7, geo_source=8, team_id=9
        lr_cols = ['ip_address','user_agent','created_at','user_id','geo_city','geo_country','geo_province','geo_source','team_id']
        lr_src_ids = [r[0] for r in lrs]
        lr_vals = [raw_vals_to_py(r[1:]) for r in lrs]
        lr_map = auto_insert_collect_id(cur, 'accounts_loginrecord', lr_cols, lr_vals, lr_src_ids, 'loginrecord')
        # 6) accounts_loginanomaly (AUTO id, remap login_record_id)
        print('\n[6/9] accounts_loginanomaly')
        # source schema: id=0, level=1, rule=2, detail=3, alerted=4, auto_disabled=5, disabled_target=6, created_at=7, login_record_id=8, team_id=9, user_id=10
        la_cols = ['level','rule','detail','alerted','auto_disabled','disabled_target','created_at','login_record_id','team_id','user_id']
        la_src_ids = [r[0] for r in las]
        la_vals = []
        for r in las:
            v = raw_vals_to_py(r[1:])  # index 0..9 in slice = level..user_id
            # login_record_id is at new-index 7; remap through lr_map
            v[7] = lr_map[int(r[8])]
            la_vals.append(v)
        auto_insert_collect_id(cur, 'accounts_loginanomaly', la_cols, la_vals, la_src_ids, 'loginanomaly')
        # 7) accounts_activesession (AUTO id)
        print('\n[7/9] accounts_activesession')
        # source schema: id=0, session_id=1, device_type=2, user_agent=3, created_at=4, user_id=5
        as_cols = ['session_id','device_type','user_agent','created_at','user_id']
        as_src_ids = [r[0] for r in ases]
        as_vals = [raw_vals_to_py(r[1:]) for r in ases]
        auto_insert_collect_id(cur, 'accounts_activesession', as_cols, as_vals, as_src_ids, 'activesession')
        # 8) accounts_adminauditlog (AUTO id)
        print('\n[8/9] accounts_adminauditlog')
        # source schema: id=0, operator_name=1, action=2, target_type=3, target_id=4, target_name=5,
        #                before=6, after=7, ip_address=8, created_at=9, operator_id=10
        al_cols = ['operator_name','action','target_type','target_id','target_name','before','after','ip_address','created_at','operator_id']
        al_src_ids = [r[0] for r in als]
        al_vals = [raw_vals_to_py(r[1:]) for r in als]
        auto_insert_collect_id(cur, 'accounts_adminauditlog', al_cols, al_vals, al_src_ids, 'adminauditlog')
        # 9) generation_generationrecord (AUTO id)
        print('\n[9/9] generation_generationrecord')
        # source schema: id, task_id, prompt, mode, model, aspect_ratio, duration, status, created_at,
        #                user_id, seconds_consumed, ark_task_id, error_message, reference_urls, result_url,
        #                base_cost_amount, cost_amount, frozen_amount, resolution, tokens_consumed,
        #                is_favorited, seed, completed_at, raw_error, updated_at, is_deleted
        # Target schema has an extra thumbnail_url (NOT NULL) — filled with ''.
        g_cols = ['task_id','prompt','mode','model','aspect_ratio','duration','status','created_at',
                  'user_id','seconds_consumed','ark_task_id','error_message','reference_urls','result_url',
                  'base_cost_amount','cost_amount','frozen_amount','resolution','tokens_consumed',
                  'is_favorited','seed','completed_at','raw_error','updated_at','is_deleted',
                  'thumbnail_url']
        g_src_ids = [r[0] for r in gens]
        g_vals = []
        for r in gens:
            v = raw_vals_to_py(r[1:])
            v.append('')  # thumbnail_url
            g_vals.append(v)
        auto_insert_collect_id(cur, 'generation_generationrecord', g_cols, g_vals, g_src_ids, 'generationrecord')
        cur.execute('SET FOREIGN_KEY_CHECKS = 1')
        if '--commit' in sys.argv:
            conn.commit()
            print('\n✅ COMMITTED')
        else:
            conn.rollback()
            print('\n🔎 Rolled back (rerun with --commit to persist)')
    except Exception as e:
        conn.rollback()
        print(f'\n❌ Error: {e}')
        raise
    finally:
        conn.close()
# Script entry point: parse, filter, and (optionally) load the data.
if __name__ == '__main__':
    main()