#!/usr/bin/env python3
"""
|
||
Migrate two teams (万物苏网络 id=12, 洁雯团队 id=4 增量 user 107) from
|
||
source SQL dump → 测试库 MySQL (mysql-8351f937d637-public.rds.volces.com).
|
||
|
||
策略:
|
||
- team 4 已存在,仅新增 user 107 及其关联数据
|
||
- team 12 + 9 个用户完全不存在 → 全量插入
|
||
- team/user ID 保留源 ID(无冲突)
|
||
- loginrecord / loginanomaly / assetgroup / asset / generationrecord / activesession / adminauditlog
|
||
让 AUTO_INCREMENT 重新分配 ID,维护 old→new 映射
|
||
"""
|
||
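# How to drive the script (a sketch; the filename below is illustrative, the flags are
# the ones main() actually checks in sys.argv):
#
#   python migrate_teams.py --dry-run   # parse + filter the dump, exit before connecting
#   python migrate_teams.py             # run all inserts, then roll back (verification pass)
#   python migrate_teams.py --commit    # run all inserts and commit
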
import re
import sys
import pymysql

SOURCE_SQL = '/Users/maidong/Desktop/zyc/研究openclaw/视频生成平台/jimeng-clone/video_auto迁移2个团队的数据.sql'

DB = dict(
    host='mysql-8351f937d637-public.rds.volces.com',
    port=3306, user='zyc', password='Zyc188208',
    database='video_auto', charset='utf8mb4',
    autocommit=False,
)

TEAM4_EXISTING_USERS = {19, 20, 21, 22, 23, 24, 25}  # already in the target DB, left untouched
INCREMENTAL_USERS = {107, 99, 102, 103, 104, 105, 108, 109, 110, 111}  # user 107 (team 4) plus team 12's 9 users
NEW_TEAMS = {12}


def split_values(s):
    """Split a raw VALUES(...) payload on top-level commas, keeping quoted strings intact."""
    vals, cur, in_str, i = [], '', False, 0
    while i < len(s):
        c = s[i]
        if c == "'" and (i == 0 or s[i-1] != '\\'):
            in_str = not in_str
            cur += c
        elif c == ',' and not in_str:
            vals.append(cur.strip()); cur = ''
        else:
            cur += c
        i += 1
    vals.append(cur.strip())
    return vals

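# Quick illustration of split_values on a made-up payload (comment only, not executed):
#   split_values("1, 'hello, world', NULL") -> ["1", "'hello, world'", "NULL"]
# Commas inside quoted strings do not split; the quoting is preserved for unquote().
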
def parse_table(path, tbl):
    """Collect the raw VALUES tuples of every INSERT for `tbl` found in the dump."""
    rows = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.startswith(f'INSERT INTO `{tbl}`'):
                continue
            # Assumes one single-row INSERT per line; multi-row
            # "VALUES (...), (...)" statements would not be parsed correctly here.
            m = re.search(r'VALUES \((.*)\);', line)
            if not m: continue
            rows.append(split_values(m.group(1)))
    return rows

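# Shape of a dump line that parse_table picks up (illustrative, shortened values):
#   INSERT INTO `accounts_team` VALUES (12,'万物苏网络',1,'2024-01-01 00:00:00');
# Each such line yields one list of raw value strings via split_values().
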
def q(v):
    """Pass-through helper for values that are already MySQL-escaped literals from
    the dump (currently unused: the inserts below bind values via %s parameters)."""
    return v

def bulk_insert(cur, tbl, cols, rows_values, label):
    """Insert preserving source id (rows_values includes id as first column).
    Returns rowcount."""
    if not rows_values:
        print(f' [{label}] 0 rows')
        return 0
    placeholders = ','.join(['%s'] * len(cols))
    sql = f"INSERT INTO `{tbl}` ({','.join('`'+c+'`' for c in cols)}) VALUES ({placeholders})"
    cur.executemany(sql, rows_values)
    print(f' [{label}] inserted {cur.rowcount} rows')
    return cur.rowcount

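# Example of the statement bulk_insert builds, shown for a hypothetical 3-column table:
#   INSERT INTO `accounts_team` (`id`,`name`,`is_active`) VALUES (%s,%s,%s)
# executemany() then binds one row tuple per set of placeholders.
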
def auto_insert_collect_id(cur, tbl, cols_no_id, rows_vals_no_id, src_ids, label):
    """INSERT rows letting AUTO_INCREMENT assign id.
    Uses row-by-row insert to map old→new deterministically.
    cols_no_id: column list without `id`.
    rows_vals_no_id: list of tuples matching cols_no_id.
    src_ids: list of source ids (same order as rows_vals_no_id).
    Returns dict {old_id: new_id}.
    """
    mapping = {}
    if not rows_vals_no_id:
        print(f' [{label}] 0 rows')
        return mapping
    placeholders = ','.join(['%s'] * len(cols_no_id))
    sql = f"INSERT INTO `{tbl}` ({','.join('`'+c+'`' for c in cols_no_id)}) VALUES ({placeholders})"
    for old_id, vals in zip(src_ids, rows_vals_no_id):
        cur.execute(sql, vals)
        mapping[int(old_id)] = cur.lastrowid
    print(f' [{label}] inserted {len(mapping)} rows, id range {min(mapping.values())}-{max(mapping.values())}')
    return mapping

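# Typical use of the returned mapping (hypothetical ids): after inserting asset groups,
#   ag_map = {3: 101, 7: 102}
# child rows that referenced group 3 in the dump get their FK rewritten to 101 before
# they are inserted, which is what main() does for assets and login anomalies below.
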
def unquote(s):
    """Turn a raw SQL literal like 'foo\\'bar' or NULL into a Python value."""
    s = s.strip()
    if s == 'NULL':
        return None
    if s.startswith("'") and s.endswith("'"):
        inner = s[1:-1]
        # MySQL dump uses \' for escaped single quote and \\ for backslash
        inner = inner.replace("\\'", "'").replace('\\"', '"').replace('\\\\', '\\').replace('\\n', '\n').replace('\\r', '\r').replace('\\t', '\t').replace('\\0', '\0')
        return inner
    # numeric / boolean
    return s

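# A few unquote() conversions (comment only, inputs are raw dump literals):
#   unquote("NULL")           -> None
#   unquote(r"'it\'s ok'")    -> "it's ok"   (dump-escaped quote unescaped)
#   unquote("123")            -> "123"       (numerics stay strings; MySQL coerces on insert)
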
def raw_vals_to_py(vals):
    return [unquote(v) for v in vals]

def main():
    print(f'Loading source SQL: {SOURCE_SQL}')

    # --- parse all needed tables ---
    teams_all = parse_table(SOURCE_SQL, 'accounts_team')
    users_all = parse_table(SOURCE_SQL, 'accounts_user')
    agroups_all = parse_table(SOURCE_SQL, 'generation_assetgroup')
    assets_all = parse_table(SOURCE_SQL, 'generation_asset')
    lrs_all = parse_table(SOURCE_SQL, 'accounts_loginrecord')
    las_all = parse_table(SOURCE_SQL, 'accounts_loginanomaly')
    ases_all = parse_table(SOURCE_SQL, 'accounts_activesession')
    als_all = parse_table(SOURCE_SQL, 'accounts_adminauditlog')
    gens_all = parse_table(SOURCE_SQL, 'generation_generationrecord')

    # --- filter ---
    teams = [r for r in teams_all if int(r[0]) in NEW_TEAMS]
    users = [r for r in users_all if int(r[0]) in INCREMENTAL_USERS]
    # assetgroup team_id is at index 7
    relevant_groups = [r for r in agroups_all if r[7] in ('4', '12')]
    group_ids = {r[0] for r in relevant_groups}
    # asset group_id at index 7
    assets = [r for r in assets_all if r[7] in group_ids]
    # loginrecord user_id at index 4
    lrs = [r for r in lrs_all if int(r[4]) in INCREMENTAL_USERS]
    lr_ids = {r[0] for r in lrs}
    # loginanomaly user_id at index 10; login_record_id at index 8
    las = [r for r in las_all if int(r[10]) in INCREMENTAL_USERS and r[8] in lr_ids]
    # activesession user_id at index 5
    ases = [r for r in ases_all if int(r[5]) in INCREMENTAL_USERS]
    # adminauditlog operator_id is the last column (index 10)
    als = [r for r in als_all if r[-1] != 'NULL' and r[-1].isdigit() and int(r[-1]) in INCREMENTAL_USERS]
    # generationrecord user_id at index 9
    gens = [r for r in gens_all if int(r[9]) in INCREMENTAL_USERS]

    print('Prepared:')
    print(f' teams (new)        : {len(teams)}')
    print(f' users (incremental): {len(users)}')
    print(f' assetgroups (T4+T12): {len(relevant_groups)}')
    print(f' assets             : {len(assets)}')
    print(f' loginrecords       : {len(lrs)}')
    print(f' loginanomalies     : {len(las)}')
    print(f' activesessions     : {len(ases)}')
    print(f' adminauditlogs     : {len(als)}')
    print(f' generationrecords  : {len(gens)}')

    if '--dry-run' in sys.argv:
        print('\n--dry-run: exiting before DB connect')
        return

    # --- connect ---
    print('\nConnecting to target DB...')
    conn = pymysql.connect(**DB)
    try:
        cur = conn.cursor()
        cur.execute('SET FOREIGN_KEY_CHECKS = 0')

        # 1) accounts_team (id preserved)
        print('\n[1/9] accounts_team')
        team_cols = ['id','name','total_seconds_pool','total_seconds_used','monthly_seconds_limit',
                     'daily_member_limit_default','is_active','created_at','updated_at','disabled_by',
                     'expected_regions','balance','daily_member_spending_default','frozen_amount',
                     'markup_percentage','monthly_spending_limit','total_spent','max_concurrent_tasks']
        bulk_insert(cur, 'accounts_team', team_cols,
                    [raw_vals_to_py(r) for r in teams], 'team')

        # 2) accounts_user (id preserved)
        print('\n[2/9] accounts_user')
        user_cols = ['id','password','last_login','is_superuser','username','first_name','last_name',
                     'is_staff','is_active','date_joined','email','created_at','updated_at',
                     'daily_seconds_limit','monthly_seconds_limit','is_team_admin','team_id',
                     'must_change_password','disabled_by','daily_generation_limit',
                     'monthly_generation_limit','spending_limit','last_read_announcement','is_team_owner']
        bulk_insert(cur, 'accounts_user', user_cols,
                    [raw_vals_to_py(r) for r in users], 'user')

        # 3) generation_assetgroup (AUTO id)
        print('\n[3/9] generation_assetgroup')
        ag_cols = ['remote_group_id','name','description','thumbnail_url','created_at','created_by_id','team_id']
        # source schema: id=0, remote_group_id=1, name=2, description=3, thumbnail_url=4, created_at=5, created_by_id=6, team_id=7
        ag_src_ids = [r[0] for r in relevant_groups]
        ag_vals = [raw_vals_to_py(r[1:]) for r in relevant_groups]  # strip id
        ag_map = auto_insert_collect_id(cur, 'generation_assetgroup', ag_cols, ag_vals, ag_src_ids, 'assetgroup')

        # 4) generation_asset (AUTO id, remap group_id)
        # The target (test) DB schema has extra columns: asset_type (NOT NULL),
        # duration (NULL) and thumbnail_url (NOT NULL).
        # asset_type is inferred from the URL suffix; thumbnail_url is set to an
        # empty string; duration is left NULL.
        print('\n[4/9] generation_asset')
        a_cols = ['remote_asset_id','name','url','status','error_message','created_at','group_id',
                  'asset_type','duration','thumbnail_url']
        a_src_ids = [r[0] for r in assets]
        a_vals = []
        VIDEO_EXT = ('.mp4','.mov','.avi','.webm','.mkv','.m4v')
        for r in assets:
            v = raw_vals_to_py(r[1:])  # index 0..6 = remote_asset_id..group_id
            # remap group_id (now at index 6)
            v[6] = ag_map[int(r[7])]
            url_lower = (v[2] or '').lower()
            asset_type = 'video' if any(e in url_lower for e in VIDEO_EXT) else 'image'
            v.extend([asset_type, None, ''])  # asset_type, duration, thumbnail_url
            a_vals.append(v)
        auto_insert_collect_id(cur, 'generation_asset', a_cols, a_vals, a_src_ids, 'asset')

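        # Illustration of the asset_type inference above (made-up URLs):
        #   'https://cdn.example.com/clips/a1.mp4?sig=x' -> 'video'  ('.mp4' appears in the URL)
        #   'https://cdn.example.com/covers/a1.png'      -> 'image'  (no video extension found)
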
        # 5) accounts_loginrecord (AUTO id)
        print('\n[5/9] accounts_loginrecord')
        # source schema: id=0, ip_address=1, user_agent=2, created_at=3, user_id=4, geo_city=5, geo_country=6, geo_province=7, geo_source=8, team_id=9
        lr_cols = ['ip_address','user_agent','created_at','user_id','geo_city','geo_country','geo_province','geo_source','team_id']
        lr_src_ids = [r[0] for r in lrs]
        lr_vals = [raw_vals_to_py(r[1:]) for r in lrs]
        lr_map = auto_insert_collect_id(cur, 'accounts_loginrecord', lr_cols, lr_vals, lr_src_ids, 'loginrecord')

        # 6) accounts_loginanomaly (AUTO id, remap login_record_id)
        print('\n[6/9] accounts_loginanomaly')
        # source schema: id=0, level=1, rule=2, detail=3, alerted=4, auto_disabled=5, disabled_target=6, created_at=7, login_record_id=8, team_id=9, user_id=10
        la_cols = ['level','rule','detail','alerted','auto_disabled','disabled_target','created_at','login_record_id','team_id','user_id']
        la_src_ids = [r[0] for r in las]
        la_vals = []
        for r in las:
            v = raw_vals_to_py(r[1:])  # index 0..9 in slice = level..user_id
            # login_record_id is at new-index 7
            v[7] = lr_map[int(r[8])]
            la_vals.append(v)
        auto_insert_collect_id(cur, 'accounts_loginanomaly', la_cols, la_vals, la_src_ids, 'loginanomaly')

        # 7) accounts_activesession (AUTO id)
        print('\n[7/9] accounts_activesession')
        # source schema: id=0, session_id=1, device_type=2, user_agent=3, created_at=4, user_id=5
        as_cols = ['session_id','device_type','user_agent','created_at','user_id']
        as_src_ids = [r[0] for r in ases]
        as_vals = [raw_vals_to_py(r[1:]) for r in ases]
        auto_insert_collect_id(cur, 'accounts_activesession', as_cols, as_vals, as_src_ids, 'activesession')

        # 8) accounts_adminauditlog (AUTO id)
        print('\n[8/9] accounts_adminauditlog')
        # source schema: id=0, operator_name=1, action=2, target_type=3, target_id=4, target_name=5,
        #                before=6, after=7, ip_address=8, created_at=9, operator_id=10
        al_cols = ['operator_name','action','target_type','target_id','target_name','before','after','ip_address','created_at','operator_id']
        al_src_ids = [r[0] for r in als]
        al_vals = [raw_vals_to_py(r[1:]) for r in als]
        auto_insert_collect_id(cur, 'accounts_adminauditlog', al_cols, al_vals, al_src_ids, 'adminauditlog')

        # 9) generation_generationrecord (AUTO id)
        print('\n[9/9] generation_generationrecord')
        # source schema: id, task_id, prompt, mode, model, aspect_ratio, duration, status, created_at,
        #                user_id, seconds_consumed, ark_task_id, error_message, reference_urls, result_url,
        #                base_cost_amount, cost_amount, frozen_amount, resolution, tokens_consumed,
        #                is_favorited, seed, completed_at, raw_error, updated_at, is_deleted
        # The target (test) DB schema has an extra thumbnail_url column (NOT NULL); fill it with an empty string
        g_cols = ['task_id','prompt','mode','model','aspect_ratio','duration','status','created_at',
                  'user_id','seconds_consumed','ark_task_id','error_message','reference_urls','result_url',
                  'base_cost_amount','cost_amount','frozen_amount','resolution','tokens_consumed',
                  'is_favorited','seed','completed_at','raw_error','updated_at','is_deleted',
                  'thumbnail_url']
        g_src_ids = [r[0] for r in gens]
        g_vals = []
        for r in gens:
            v = raw_vals_to_py(r[1:])
            v.append('')  # thumbnail_url
            g_vals.append(v)
        auto_insert_collect_id(cur, 'generation_generationrecord', g_cols, g_vals, g_src_ids, 'generationrecord')

        cur.execute('SET FOREIGN_KEY_CHECKS = 1')

        if '--commit' in sys.argv:
            conn.commit()
            print('\n✅ COMMITTED')
        else:
            conn.rollback()
            print('\n🔎 Rolled back (rerun with --commit to persist)')

    except Exception as e:
        conn.rollback()
        print(f'\n❌ Error: {e}')
        raise
    finally:
        conn.close()


if __name__ == '__main__':
    main()