6ec852a23a
A "Guess from first name" option in the Cleanup gender section: a bundled, curated given-name -> sex dictionary (weighted English + German for the first real tree) proposes sex for people who don't have it set. Deterministic, offline, no model. Genuinely ambiguous names (Marion, Frances, Jordan, …) are excluded from both sets so they're left for a human. Reuses the existing preview/apply gender flow, so every guess is reviewed before saving. No migration. 56 backend tests pass; frontend builds. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
290 lines
9.2 KiB
Python
290 lines
9.2 KiB
Python
"""Bulk tree cleanup — preview/apply pairs for common import messes.
|
|
|
|
Per the project's #1 rule (the assistant proposes, humans approve), each fix has
|
|
a *preview* that returns the proposed changes and an *apply* that commits only
|
|
the ids/edits the user confirmed. Nothing here mutates without an explicit apply
|
|
call carrying the user's selections.
|
|
"""
|
|
|
|
import re
|
|
import uuid
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.event import Event
|
|
from app.models.person import Name, Person
|
|
from app.models.tree import Tree
|
|
from app.models.user import User
|
|
from app.services import gedcom, privacy
|
|
from app.services.audit import record_audit
|
|
from app.services.exceptions import Forbidden, NotFound
|
|
from app.services.name_gender_data import guess_sex
|
|
|
|
|
|
async def _require_editor(session: AsyncSession, *, actor: User, tree: Tree) -> None:
    """Raise ``Forbidden`` unless ``privacy.can_edit_tree`` accepts *actor* for *tree*."""
    allowed = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if allowed:
        return
    raise Forbidden("not an editor of this tree")
|
|
|
|
|
|
async def _persons(session: AsyncSession, tree_id: uuid.UUID) -> list[Person]:
    """Return the ``Person`` rows of *tree_id* whose ``deleted_at`` is NULL."""
    stmt = select(Person).where(Person.tree_id == tree_id, Person.deleted_at.is_(None))
    result = await session.execute(stmt)
    return list(result.scalars().all())
|
|
|
|
|
|
async def _primary_name_by_person(
    session: AsyncSession, tree_id: uuid.UUID
) -> dict[uuid.UUID, Name]:
    """Map each person id in the tree to a single ``Name`` row.

    Rows with ``deleted_at`` set are excluded. The query orders by
    ``is_primary`` descending and then ``sort_order``; the first row
    encountered for a person is the one kept.
    """
    stmt = (
        select(Name)
        .where(Name.tree_id == tree_id, Name.deleted_at.is_(None))
        .order_by(Name.is_primary.desc(), Name.sort_order)
    )
    result = await session.execute(stmt)
    chosen: dict[uuid.UUID, Name] = {}
    for row in result.scalars().all():
        # First row per person wins; later (lower-ranked) rows are ignored.
        if row.person_id not in chosen:
            chosen[row.person_id] = row
    return chosen
|
|
|
|
|
|
async def _birth_year_by_person(session: AsyncSession, tree_id: uuid.UUID) -> dict[uuid.UUID, int]:
    """Map person id -> birth year, taken from the tree's ``"birth"`` events.

    The year comes from ``date_start`` when present, otherwise from
    ``gedcom._year(date_value)``. Events without a person, or for a person
    that already has a year recorded, are skipped; events that yield no
    year contribute nothing.
    """
    stmt = select(Event).where(
        Event.tree_id == tree_id,
        Event.deleted_at.is_(None),
        Event.event_type == "birth",
    )
    birth_events = (await session.execute(stmt)).scalars().all()

    years: dict[uuid.UUID, int] = {}
    for ev in birth_events:
        if not ev.person_id or ev.person_id in years:
            continue
        year = None
        if ev.date_start:
            year = ev.date_start.year
        if year is None:
            # No structured date: fall back to the free-text date value.
            raw = gedcom._year(ev.date_value)
            if raw:
                year = int(raw)
        if year is None:
            continue
        years[ev.person_id] = year
    return years
|
|
|
|
|
|
def _display(n: Name | None) -> str:
    """Build a human-readable label for a name row.

    Prefers "given surname" (skipping empty parts), then ``display_name``,
    and finally the placeholder ``"Unnamed"`` (also used when *n* is None).
    """
    if n is None:
        return "Unnamed"
    parts = [part for part in (n.given, n.surname) if part]
    if parts:
        return " ".join(parts)
    return n.display_name or "Unnamed"
|
|
|
|
|
|
# ---- 1. Mark deceased by birth year -------------------------------------------------
|
|
|
|
async def preview_deceased(
    session: AsyncSession, *, actor: User, tree: Tree, year: int
) -> list[dict]:
    """List people who could be marked deceased based on their birth year.

    A person is proposed when ``is_living`` is not already ``False`` and a
    known birth year is less than or equal to *year*. Nothing is modified.

    Returns dicts with ``person_id``, ``name`` and ``birth_year``, sorted by
    ascending birth year. Raises ``Forbidden`` (via ``_require_editor``) when
    *actor* cannot edit *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)
    names = await _primary_name_by_person(session, tree.id)
    years = await _birth_year_by_person(session, tree.id)
    people = await _persons(session, tree.id)

    candidates = [
        {"person_id": str(person.id), "name": _display(names.get(person.id)), "birth_year": born}
        for person in people
        if person.is_living is not False  # skip those already deceased
        and (born := years.get(person.id)) is not None
        and born <= year
    ]
    return sorted(candidates, key=lambda row: row["birth_year"])
|
|
|
|
|
|
async def apply_deceased(
    session: AsyncSession, *, actor: User, tree: Tree, person_ids: list[uuid.UUID]
) -> int:
    """Set ``is_living = False`` on the confirmed people and commit.

    Only non-deleted persons of *tree* whose id is in *person_ids* are
    touched. One ``cleanup_deceased`` audit entry carrying the affected
    count is recorded before the commit.

    Returns the number of persons updated. Raises ``Forbidden`` (via
    ``_require_editor``) when *actor* cannot edit *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)

    stmt = select(Person).where(
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
        Person.id.in_(person_ids),
    )
    matched = (await session.execute(stmt)).scalars().all()
    for person in matched:
        person.is_living = False

    changed = len(matched)
    record_audit(
        session,
        action="cleanup_deceased",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": changed},
    )
    await session.commit()
    return changed
|
|
|
|
|
|
# ---- 2. Re-derive gender from a source GEDCOM (matches by name) ----------------------
|
|
|
|
async def preview_gender(
    session: AsyncSession, *, actor: User, tree: Tree, gedcom_text: str
) -> list[dict]:
    """Propose genders for people without one, matched by name against a GEDCOM.

    Every ``INDI`` record in *gedcom_text* that has both a sex and a
    normalised name contributes a name -> sex entry (the first record seen
    for a name wins). People in the tree that have no gender and whose
    primary name normalises to a known key get a proposal. Nothing is
    modified.

    Returns dicts with ``person_id``, ``name`` and ``proposed_gender``,
    sorted by ``name``. Raises ``Forbidden`` (via ``_require_editor``) when
    *actor* cannot edit *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)

    sex_by_name: dict[str, str] = {}
    for rec in gedcom.parse_records(gedcom_text):
        if rec.tag != "INDI":
            continue
        summary = gedcom._person_summary(rec)
        sex = gedcom._sex(rec.text("SEX"))
        if not sex:
            continue
        key = summary["norm"]
        if key and key not in sex_by_name:
            sex_by_name[key] = sex

    names = await _primary_name_by_person(session, tree.id)
    proposals: list[dict] = []
    for person in await _persons(session, tree.id):
        if person.gender:
            # Only fill in what is missing; never override an existing value.
            continue
        primary = names.get(person.id)
        if primary is None:
            continue
        proposed = sex_by_name.get(gedcom._norm(primary.given, primary.surname))
        if not proposed:
            continue
        proposals.append(
            {"person_id": str(person.id), "name": _display(primary), "proposed_gender": proposed}
        )
    return sorted(proposals, key=lambda row: row["name"])
|
|
|
|
|
|
async def guess_gender_by_name(
    session: AsyncSession, *, actor: User, tree: Tree
) -> list[dict]:
    """Propose a sex from the given name for people who have no gender set.

    The guess comes from ``guess_sex`` applied to the primary name's
    ``given`` value; people for whom it returns nothing (ambiguous or
    unknown names) are left out. Nothing is modified.

    Returns dicts with ``person_id``, ``name`` and ``proposed_gender``,
    sorted by ``name``. Raises ``Forbidden`` (via ``_require_editor``) when
    *actor* cannot edit *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)
    names = await _primary_name_by_person(session, tree.id)

    proposals: list[dict] = []
    for person in await _persons(session, tree.id):
        if person.gender:
            continue
        primary = names.get(person.id)
        if primary is None:
            continue
        proposed = guess_sex(primary.given)
        if not proposed:
            continue
        proposals.append(
            {"person_id": str(person.id), "name": _display(primary), "proposed_gender": proposed}
        )
    return sorted(proposals, key=lambda row: row["name"])
|
|
|
|
|
|
async def apply_gender(
    session: AsyncSession, *, actor: User, tree: Tree, updates: list[dict]
) -> int:
    """Write the confirmed genders to the tree's people and commit.

    *updates* is ``[{person_id, gender}]``. Entries with an empty/missing
    ``gender`` are ignored; if a person id repeats, the last entry wins.
    Only non-deleted persons of *tree* are updated; ids that match nothing
    are skipped. One ``cleanup_gender`` audit entry carrying the affected
    count is recorded before the commit.

    Returns the number of persons updated. Raises ``Forbidden`` (via
    ``_require_editor``) when *actor* cannot edit *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)

    wanted: dict = {}
    for update in updates:
        if update.get("gender"):
            wanted[uuid.UUID(str(update["person_id"]))] = update["gender"]

    stmt = select(Person).where(
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
        Person.id.in_(wanted.keys()),
    )
    matched = (await session.execute(stmt)).scalars().all()
    for person in matched:
        person.gender = wanted[person.id]

    changed = len(matched)
    record_audit(
        session,
        action="cleanup_gender",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": changed},
    )
    await session.commit()
    return changed
|
|
|
|
|
|
# ---- 3. Flag malformed names for review --------------------------------------------
|
|
|
|
_YEAR_RE = re.compile(r"\b\d{3,4}\b")


def _name_issue(n: Name) -> str | None:
    """Classify a malformed name, or return None when it looks fine.

    Checks run in this order and the first hit is returned:

    * ``"date_in_surname"`` - the surname contains a digit.
    * ``"date_in_given"``   - the given name contains a digit.
    * ``"no_surname"``      - empty surname and at least two given tokens.
    * ``"packed_given"``    - at least three given tokens.

    Missing parts are treated as empty strings; both parts are stripped.
    """
    given = (n.given or "").strip()
    surname = (n.surname or "").strip()

    # The year pattern is kept for parity, although any surname it matches
    # is also caught by the plain digit search.
    if _YEAR_RE.search(surname) or re.search(r"\d", surname):
        return "date_in_surname"
    if re.search(r"\d", given):
        return "date_in_given"

    token_count = len(given.split())
    if not surname and token_count >= 2:
        return "no_surname"
    # A given name with many tokens often means a maiden+married name was
    # packed in (e.g. "Mary Smith Jones") — surface it for a human to split.
    return "packed_given" if token_count >= 3 else None
|
|
|
|
|
|
async def preview_names(session: AsyncSession, *, actor: User, tree: Tree) -> list[dict]:
    """List the tree's name rows that ``_name_issue`` flags for review.

    Every non-deleted ``Name`` of *tree* is inspected (not only primary
    ones). Nothing is modified.

    Returns dicts with ``name_id``, ``person_id``, ``given``, ``surname``
    and ``issue``. Raises ``Forbidden`` (via ``_require_editor``) when
    *actor* cannot edit *tree*.
    """
    await _require_editor(session, actor=actor, tree=tree)

    stmt = select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
    rows = (await session.execute(stmt)).scalars().all()
    return [
        {
            "name_id": str(row.id),
            "person_id": str(row.person_id),
            "given": row.given,
            "surname": row.surname,
            "issue": issue,
        }
        for row in rows
        if (issue := _name_issue(row))
    ]
|
|
|
|
|
|
async def apply_names(
    session: AsyncSession, *, actor: User, tree: Tree, edits: list[dict]
) -> int:
    """Store the user's corrected name parts and commit.

    *edits* is ``[{name_id, given, surname}]``. Each value is stripped and
    stored as ``None`` when empty; ``display_name`` is cleared on every
    edited row. One ``cleanup_names`` audit entry carrying the affected
    count is recorded before the commit.

    Returns the number of name rows updated. Raises ``NotFound`` when any
    requested id is not a non-deleted name of *tree* (nothing is changed in
    that case), and ``Forbidden`` (via ``_require_editor``) when *actor*
    cannot edit *tree*.
    """

    def _clean(value):
        # Blank or missing input is stored as NULL rather than "".
        return (value or "").strip() or None

    await _require_editor(session, actor=actor, tree=tree)

    by_id: dict = {}
    for edit in edits:
        by_id[uuid.UUID(str(edit["name_id"]))] = edit

    stmt = select(Name).where(
        Name.tree_id == tree.id,
        Name.deleted_at.is_(None),
        Name.id.in_(by_id.keys()),
    )
    rows = (await session.execute(stmt)).scalars().all()
    if len(rows) != len(by_id):
        raise NotFound("one or more names not found in this tree")

    for row in rows:
        edit = by_id[row.id]
        row.given = _clean(edit.get("given"))
        row.surname = _clean(edit.get("surname"))
        row.display_name = None  # rebuild from parts

    changed = len(rows)
    record_audit(
        session,
        action="cleanup_names",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": changed},
    )
    await session.commit()
    return changed
|