Files
provenance/backend/app/services/cleanup_service.py
T
justin 05d2773e25 Cleanup: infer a missing sex from a known-sex spouse (preview → approve)
Unset sex renders blue (male-colored), which is misleading next to a confirmed
male partner. Add a Cleanup action that proposes the opposite sex for an unset
partner of someone whose sex is set (couples are opposite-sex in practice — a
confirmed-male husband ⇒ a female wife). People whose known partners disagree
are skipped as ambiguous.

It's a preview the user reviews and approves in the Cleanup tool (reusing the
existing gender apply path + audit) — not an autonomous write. Backend:
guess_gender_by_spouse + GET /cleanup/gender/from-spouse. Frontend: an "Infer
from spouse" button feeding the existing proposal list. Test covers
propose-opposite, skip-no-partner, skip-already-set, apply, and re-preview.

Full suite 73 passed; frontend build clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-09 10:59:08 -04:00

338 lines
11 KiB
Python

"""Bulk tree cleanup — preview/apply pairs for common import messes.
Per the project's #1 rule (the assistant proposes, humans approve), each fix has
a *preview* that returns the proposed changes and an *apply* that commits only
the ids/edits the user confirmed. Nothing here mutates without an explicit apply
call carrying the user's selections.
"""
import re
import uuid
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.enums import RelationshipType
from app.models.event import Event
from app.models.person import Name, Person
from app.models.relationship import Relationship
from app.models.tree import Tree
from app.models.user import User
from app.services import gedcom, privacy
from app.services.audit import record_audit
from app.services.exceptions import Forbidden, NotFound
from app.services.name_gender_data import guess_sex
async def _require_editor(session: AsyncSession, *, actor: User, tree: Tree) -> None:
if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
raise Forbidden("not an editor of this tree")
async def _persons(session: AsyncSession, tree_id: uuid.UUID) -> list[Person]:
return list(
(
await session.execute(
select(Person).where(Person.tree_id == tree_id, Person.deleted_at.is_(None))
)
).scalars().all()
)
async def _primary_name_by_person(
session: AsyncSession, tree_id: uuid.UUID
) -> dict[uuid.UUID, Name]:
names = (
await session.execute(
select(Name)
.where(Name.tree_id == tree_id, Name.deleted_at.is_(None))
.order_by(Name.is_primary.desc(), Name.sort_order)
)
).scalars().all()
out: dict[uuid.UUID, Name] = {}
for n in names:
out.setdefault(n.person_id, n)
return out
async def _birth_year_by_person(session: AsyncSession, tree_id: uuid.UUID) -> dict[uuid.UUID, int]:
evs = (
await session.execute(
select(Event).where(
Event.tree_id == tree_id,
Event.deleted_at.is_(None),
Event.event_type == "birth",
)
)
).scalars().all()
out: dict[uuid.UUID, int] = {}
for e in evs:
if not e.person_id or e.person_id in out:
continue
y = e.date_start.year if e.date_start else None
if y is None:
ys = gedcom._year(e.date_value)
y = int(ys) if ys else None
if y is not None:
out[e.person_id] = y
return out
def _display(n: Name | None) -> str:
if n is None:
return "Unnamed"
return " ".join(x for x in (n.given, n.surname) if x) or (n.display_name or "Unnamed")
# ---- 1. Mark deceased by birth year -------------------------------------------------
async def preview_deceased(
session: AsyncSession, *, actor: User, tree: Tree, year: int
) -> list[dict]:
await _require_editor(session, actor=actor, tree=tree)
names = await _primary_name_by_person(session, tree.id)
years = await _birth_year_by_person(session, tree.id)
out: list[dict] = []
for p in await _persons(session, tree.id):
if p.is_living is False: # already deceased
continue
by = years.get(p.id)
if by is not None and by <= year:
out.append(
{"person_id": str(p.id), "name": _display(names.get(p.id)), "birth_year": by}
)
out.sort(key=lambda r: r["birth_year"])
return out
async def apply_deceased(
session: AsyncSession, *, actor: User, tree: Tree, person_ids: list[uuid.UUID]
) -> int:
await _require_editor(session, actor=actor, tree=tree)
persons = (
await session.execute(
select(Person).where(
Person.tree_id == tree.id,
Person.deleted_at.is_(None),
Person.id.in_(person_ids),
)
)
).scalars().all()
for p in persons:
p.is_living = False
record_audit(
session,
action="cleanup_deceased",
entity_type="Tree",
entity_id=tree.id,
tree_id=tree.id,
actor_user_id=actor.id,
after={"count": len(persons)},
)
await session.commit()
return len(persons)
# ---- 2. Re-derive gender from a source GEDCOM (matches by name) ----------------------
async def preview_gender(
session: AsyncSession, *, actor: User, tree: Tree, gedcom_text: str
) -> list[dict]:
await _require_editor(session, actor=actor, tree=tree)
name2sex: dict[str, str] = {}
for rec in gedcom.parse_records(gedcom_text):
if rec.tag != "INDI":
continue
summ = gedcom._person_summary(rec)
sex = gedcom._sex(rec.text("SEX"))
if sex and summ["norm"]:
name2sex.setdefault(summ["norm"], sex)
names = await _primary_name_by_person(session, tree.id)
out: list[dict] = []
for p in await _persons(session, tree.id):
if p.gender: # only fill in what's missing
continue
nm = names.get(p.id)
if nm is None:
continue
proposed = name2sex.get(gedcom._norm(nm.given, nm.surname))
if proposed:
out.append({"person_id": str(p.id), "name": _display(nm), "proposed_gender": proposed})
out.sort(key=lambda r: r["name"])
return out
async def guess_gender_by_name(
session: AsyncSession, *, actor: User, tree: Tree
) -> list[dict]:
"""Best-guess sex from the first given name for people who don't have it set,
using the bundled name dictionary. Ambiguous/unknown names are skipped."""
await _require_editor(session, actor=actor, tree=tree)
names = await _primary_name_by_person(session, tree.id)
out: list[dict] = []
for p in await _persons(session, tree.id):
if p.gender:
continue
nm = names.get(p.id)
if nm is None:
continue
proposed = guess_sex(nm.given)
if proposed:
out.append({"person_id": str(p.id), "name": _display(nm), "proposed_gender": proposed})
out.sort(key=lambda r: r["name"])
return out
async def guess_gender_by_spouse(
session: AsyncSession, *, actor: User, tree: Tree
) -> list[dict]:
"""Infer the sex of a person who has none set from a partner whose sex IS set
(couples in a tree are opposite-sex in practice — e.g. a confirmed-male
husband implies a female wife). People whose known partners disagree are
ambiguous and skipped; the result is a preview to review, not an auto-write."""
await _require_editor(session, actor=actor, tree=tree)
persons = await _persons(session, tree.id)
gender = {p.id: p.gender for p in persons}
names = await _primary_name_by_person(session, tree.id)
rels = (
await session.execute(
select(Relationship).where(
Relationship.tree_id == tree.id,
Relationship.deleted_at.is_(None),
Relationship.type == RelationshipType.partnership,
)
)
).scalars().all()
opp = {"male": "female", "female": "male"}
proposals: dict[uuid.UUID, set[str]] = {}
for r in rels:
for me_id, other_id in (
(r.person_from_id, r.person_to_id),
(r.person_to_id, r.person_from_id),
):
if gender.get(me_id):
continue # this person already has a sex
other_sex = str(gender.get(other_id) or "")
if other_sex in opp:
proposals.setdefault(me_id, set()).add(opp[other_sex])
out: list[dict] = []
for pid, sexes in proposals.items():
if len(sexes) != 1:
continue # partners of differing known sex → ambiguous
nm = names.get(pid)
if nm is None:
continue
out.append(
{"person_id": str(pid), "name": _display(nm), "proposed_gender": next(iter(sexes))}
)
out.sort(key=lambda r: r["name"])
return out
async def apply_gender(
session: AsyncSession, *, actor: User, tree: Tree, updates: list[dict]
) -> int:
"""updates: [{person_id, gender}]."""
await _require_editor(session, actor=actor, tree=tree)
wanted = {uuid.UUID(str(u["person_id"])): u["gender"] for u in updates if u.get("gender")}
persons = (
await session.execute(
select(Person).where(
Person.tree_id == tree.id,
Person.deleted_at.is_(None),
Person.id.in_(wanted.keys()),
)
)
).scalars().all()
for p in persons:
p.gender = wanted[p.id]
record_audit(
session,
action="cleanup_gender",
entity_type="Tree",
entity_id=tree.id,
tree_id=tree.id,
actor_user_id=actor.id,
after={"count": len(persons)},
)
await session.commit()
return len(persons)
# ---- 3. Flag malformed names for review --------------------------------------------
_YEAR_RE = re.compile(r"\b\d{3,4}\b")
def _name_issue(n: Name) -> str | None:
given = (n.given or "").strip()
surname = (n.surname or "").strip()
if _YEAR_RE.search(surname) or re.search(r"\d", surname):
return "date_in_surname"
if re.search(r"\d", given):
return "date_in_given"
# A given name with many tokens often means a maiden+married name was packed
# in (e.g. "Mary Smith Jones") — surface it for a human to split.
if surname == "" and len(given.split()) >= 2:
return "no_surname"
if len(given.split()) >= 3:
return "packed_given"
return None
async def preview_names(session: AsyncSession, *, actor: User, tree: Tree) -> list[dict]:
await _require_editor(session, actor=actor, tree=tree)
names = (
await session.execute(
select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
)
).scalars().all()
out: list[dict] = []
for n in names:
issue = _name_issue(n)
if issue:
out.append({
"name_id": str(n.id),
"person_id": str(n.person_id),
"given": n.given,
"surname": n.surname,
"issue": issue,
})
return out
async def apply_names(
session: AsyncSession, *, actor: User, tree: Tree, edits: list[dict]
) -> int:
"""edits: [{name_id, given, surname}] — the user's corrected values."""
await _require_editor(session, actor=actor, tree=tree)
by_id = {uuid.UUID(str(e["name_id"])): e for e in edits}
rows = (
await session.execute(
select(Name).where(
Name.tree_id == tree.id,
Name.deleted_at.is_(None),
Name.id.in_(by_id.keys()),
)
)
).scalars().all()
if len(rows) != len(by_id):
raise NotFound("one or more names not found in this tree")
for n in rows:
e = by_id[n.id]
n.given = (e.get("given") or "").strip() or None
n.surname = (e.get("surname") or "").strip() or None
n.display_name = None # rebuild from parts
record_audit(
session,
action="cleanup_names",
entity_type="Tree",
entity_id=tree.id,
tree_id=tree.id,
actor_user_id=actor.id,
after={"count": len(rows)},
)
await session.commit()
return len(rows)