"""Bulk tree cleanup — preview/apply pairs for common import messes.

Per the project's #1 rule (the assistant proposes, humans approve), each fix has
a *preview* that returns the proposed changes and an *apply* that commits only
the ids/edits the user confirmed. Nothing here mutates without an explicit apply
call carrying the user's selections.
"""

import re
import uuid

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.event import Event
from app.models.person import Name, Person
from app.models.tree import Tree
from app.models.user import User
from app.services import gedcom, privacy
from app.services.audit import record_audit
from app.services.exceptions import Forbidden, NotFound


async def _require_editor(session: AsyncSession, *, actor: User, tree: Tree) -> None:
    """Raise ``Forbidden`` unless ``privacy.can_edit_tree`` approves *actor* for *tree*."""
    allowed = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not allowed:
        raise Forbidden("not an editor of this tree")


async def _persons(session: AsyncSession, tree_id: uuid.UUID) -> list[Person]:
    """Return the non-deleted ``Person`` rows belonging to *tree_id*."""
    stmt = select(Person).where(Person.tree_id == tree_id, Person.deleted_at.is_(None))
    result = await session.execute(stmt)
    return list(result.scalars().all())


async def _primary_name_by_person(
    session: AsyncSession, tree_id: uuid.UUID
) -> dict[uuid.UUID, Name]:
    """Map each person id to one non-deleted ``Name`` row of the tree.

    Rows are ordered by ``is_primary`` descending and then ``sort_order``; the
    first row seen for a person is the one kept.
    """
    stmt = (
        select(Name)
        .where(Name.tree_id == tree_id, Name.deleted_at.is_(None))
        .order_by(Name.is_primary.desc(), Name.sort_order)
    )
    result = await session.execute(stmt)
    chosen: dict[uuid.UUID, Name] = {}
    for name in result.scalars().all():
        if name.person_id not in chosen:
            chosen[name.person_id] = name
    return chosen


async def _birth_year_by_person(session: AsyncSession, tree_id: uuid.UUID) -> dict[uuid.UUID, int]:
    """Map person id to a birth year taken from the tree's non-deleted birth events.

    The year comes from ``date_start`` when it is set, otherwise from whatever
    ``gedcom._year`` extracts out of ``date_value``. Events without a person, and
    events for a person who already has a year, are skipped; persons for whom no
    year can be determined are absent from the result.
    """
    stmt = select(Event).where(
        Event.tree_id == tree_id,
        Event.deleted_at.is_(None),
        Event.event_type == "birth",
    )
    result = await session.execute(stmt)
    years: dict[uuid.UUID, int] = {}
    for event in result.scalars().all():
        if not event.person_id or event.person_id in years:
            continue
        year = event.date_start.year if event.date_start else None
        if year is None:
            # No structured date: fall back to the free-text date value.
            raw_year = gedcom._year(event.date_value)
            year = int(raw_year) if raw_year else None
        if year is not None:
            years[event.person_id] = year
    return years


def _display(n: Name | None) -> str:
    """Return a human-readable label for *n*.

    Prefers "given surname" (skipping empty parts), then ``display_name``, and
    finally the literal ``"Unnamed"``.
    """
    if n is None:
        return "Unnamed"
    joined = " ".join(part for part in (n.given, n.surname) if part)
    if joined:
        return joined
    return n.display_name or "Unnamed"


# ---- 1. Mark deceased by birth year -------------------------------------------------


async def preview_deceased(
    session: AsyncSession, *, actor: User, tree: Tree, year: int
) -> list[dict]:
    """List persons not yet marked deceased whose birth year is at or before *year*.

    Returns dicts with ``person_id``, ``name`` and ``birth_year``, sorted by
    birth year ascending. Nothing is modified.

    Raises:
        Forbidden: if *actor* is not an editor of *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)
    names = await _primary_name_by_person(session, tree.id)
    birth_years = await _birth_year_by_person(session, tree.id)
    candidates: list[dict] = []
    for person in await _persons(session, tree.id):
        if person.is_living is False:  # already deceased
            continue
        born = birth_years.get(person.id)
        if born is None or born > year:
            continue
        candidates.append(
            {
                "person_id": str(person.id),
                "name": _display(names.get(person.id)),
                "birth_year": born,
            }
        )
    candidates.sort(key=lambda row: row["birth_year"])
    return candidates


async def apply_deceased(
    session: AsyncSession, *, actor: User, tree: Tree, person_ids: list[uuid.UUID]
) -> int:
    """Set ``is_living = False`` on the confirmed persons and commit.

    Only non-deleted persons of *tree* whose id is in *person_ids* are touched.
    One audit entry carrying the affected count is recorded before the commit.

    Returns:
        The number of persons updated.

    Raises:
        Forbidden: if *actor* is not an editor of *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)
    stmt = select(Person).where(
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
        Person.id.in_(person_ids),
    )
    confirmed = (await session.execute(stmt)).scalars().all()
    for person in confirmed:
        person.is_living = False
    changed = len(confirmed)
    record_audit(
        session,
        action="cleanup_deceased",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": changed},
    )
    await session.commit()
    return changed


# ---- 2.
# Re-derive gender from a source GEDCOM (matches by name) ----------------------


async def preview_gender(
    session: AsyncSession, *, actor: User, tree: Tree, gedcom_text: str
) -> list[dict]:
    """Propose genders for persons lacking one, looked up by name in *gedcom_text*.

    Every ``INDI`` record of the GEDCOM contributes a normalized-name -> sex
    entry (the first record seen for a given name wins). Persons whose
    ``gender`` is already set, or who have no name row, are skipped.

    Returns dicts with ``person_id``, ``name`` and ``proposed_gender``, sorted
    by name. Nothing is modified.

    Raises:
        Forbidden: if *actor* is not an editor of *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)

    sex_by_name: dict[str, str] = {}
    for record in gedcom.parse_records(gedcom_text):
        if record.tag != "INDI":
            continue
        summary = gedcom._person_summary(record)
        sex = gedcom._sex(record.text("SEX"))
        if not sex or not summary["norm"]:
            continue
        sex_by_name.setdefault(summary["norm"], sex)

    names = await _primary_name_by_person(session, tree.id)
    proposals: list[dict] = []
    for person in await _persons(session, tree.id):
        if person.gender:  # only fill in what's missing
            continue
        primary = names.get(person.id)
        if primary is None:
            continue
        suggestion = sex_by_name.get(gedcom._norm(primary.given, primary.surname))
        if not suggestion:
            continue
        proposals.append(
            {
                "person_id": str(person.id),
                "name": _display(primary),
                "proposed_gender": suggestion,
            }
        )
    proposals.sort(key=lambda row: row["name"])
    return proposals


async def apply_gender(
    session: AsyncSession, *, actor: User, tree: Tree, updates: list[dict]
) -> int:
    """Write the confirmed genders and commit.

    updates: [{person_id, gender}]. Entries with a missing or empty ``gender``
    are ignored. Only non-deleted persons of *tree* are touched, and one audit
    entry carrying the affected count is recorded before the commit.

    Returns:
        The number of persons updated.

    Raises:
        Forbidden: if *actor* is not an editor of *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)

    wanted: dict[uuid.UUID, str] = {}
    for update in updates:
        if update.get("gender"):
            person_id = uuid.UUID(str(update["person_id"]))
            wanted[person_id] = update["gender"]

    stmt = select(Person).where(
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
        Person.id.in_(list(wanted)),
    )
    matched = (await session.execute(stmt)).scalars().all()
    for person in matched:
        person.gender = wanted[person.id]

    changed = len(matched)
    record_audit(
        session,
        action="cleanup_gender",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": changed},
    )
    await session.commit()
    return changed


# ---- 3.
# Flag malformed names for review --------------------------------------------

# No longer consulted by _name_issue (any 3-4 digit run is already caught by the
# plain digit test); the name is kept in case other code references it.
_YEAR_RE = re.compile(r"\b\d{3,4}\b")
_DIGIT_RE = re.compile(r"\d")


def _name_issue(n: Name) -> str | None:
    """Classify what looks wrong with a name row, or return ``None`` if nothing does.

    Checks run in this order and the first hit wins:

    * ``"date_in_surname"`` — the surname contains a digit.
    * ``"date_in_given"`` — the given name contains a digit.
    * ``"no_surname"`` — the surname is empty and the given name has 2+ tokens.
    * ``"packed_given"`` — the given name has 3+ tokens.
    """
    given = (n.given or "").strip()
    surname = (n.surname or "").strip()
    if _DIGIT_RE.search(surname):
        return "date_in_surname"
    if _DIGIT_RE.search(given):
        return "date_in_given"
    given_tokens = given.split()
    # Several given tokens and no surname: the surname was probably left in the
    # given field.
    if surname == "" and len(given_tokens) >= 2:
        return "no_surname"
    # A given name with many tokens often means a maiden+married name was packed
    # in (e.g. "Mary Smith Jones") — surface it for a human to split.
    if len(given_tokens) >= 3:
        return "packed_given"
    return None


async def preview_names(session: AsyncSession, *, actor: User, tree: Tree) -> list[dict]:
    """List the tree's non-deleted name rows that ``_name_issue`` flags.

    Returns dicts with ``name_id``, ``person_id``, ``given``, ``surname`` and
    ``issue``. Nothing is modified.

    Raises:
        Forbidden: if *actor* is not an editor of *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)
    stmt = select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
    names = (await session.execute(stmt)).scalars().all()
    flagged: list[dict] = []
    for name in names:
        issue = _name_issue(name)
        if issue:
            flagged.append(
                {
                    "name_id": str(name.id),
                    "person_id": str(name.person_id),
                    "given": name.given,
                    "surname": name.surname,
                    "issue": issue,
                }
            )
    return flagged


async def apply_names(
    session: AsyncSession, *, actor: User, tree: Tree, edits: list[dict]
) -> int:
    """Write the user's corrected name parts and commit.

    edits: [{name_id, given, surname}] — the user's corrected values. Values are
    stripped, and an empty or ``None`` value clears that part. A part whose key
    is absent from an edit is left unchanged, so a partial edit cannot silently
    blank the other half of the name. ``display_name`` is reset on every row
    that had at least one part written, so it is rebuilt from the parts.

    Returns:
        The number of name rows matched by the edits.

    Raises:
        Forbidden: if *actor* is not an editor of *tree*.
        NotFound: if any ``name_id`` does not match a non-deleted name of *tree*;
            nothing is written in that case.
    """
    await _require_editor(session, actor=actor, tree=tree)
    by_id = {uuid.UUID(str(edit["name_id"])): edit for edit in edits}
    stmt = select(Name).where(
        Name.tree_id == tree.id,
        Name.deleted_at.is_(None),
        Name.id.in_(list(by_id)),
    )
    rows = (await session.execute(stmt)).scalars().all()
    if len(rows) != len(by_id):
        raise NotFound("one or more names not found in this tree")

    for row in rows:
        edit = by_id[row.id]
        touched = False
        if "given" in edit:
            row.given = (edit["given"] or "").strip() or None
            touched = True
        if "surname" in edit:
            row.surname = (edit["surname"] or "").strip() or None
            touched = True
        if touched:
            row.display_name = None  # rebuild from parts

    record_audit(
        session,
        action="cleanup_names",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": len(rows)},
    )
    await session.commit()
    return len(rows)