1340d1957f
Adds a preview/apply rule to the Cleanup tool for parents who have NO birth date
of their own (so the existing born-on-or-before rule can't reach them) but who
have a child born long ago — they're necessarily deceased. This is the gap that
left ~56 parents in the Paul tree as "unknown".
- cleanup_service.preview_deceased_by_child(year): parents of any child born
on/before the cutoff, excluding already-deceased; returns child_birth_year.
- GET /trees/{id}/cleanup/deceased-by-child?born_on_or_before=1900. Apply reuses
the existing POST .../cleanup/deceased (same audited mark-deceased path).
- Frontend: a new card in the Cleanup tool (year input → preview → select →
apply), preview-first like the rest of the tool.
Test covers preview (finds the no-birthdate parent of a pre-cutoff child,
excludes modern-child parents), child_birth_year, apply, and re-preview drop.
Suite 106 passing.
Signed-off-by: Justin Paul <justin@jpaul.me>
383 lines
13 KiB
Python
383 lines
13 KiB
Python
"""Bulk tree cleanup — preview/apply pairs for common import messes.

Per the project's #1 rule (the assistant proposes, humans approve), each fix has
a *preview* that returns the proposed changes and an *apply* that commits only
the ids/edits the user confirmed. Nothing here mutates without an explicit apply
call carrying the user's selections.
"""
|
|
|
|
import re
|
|
import uuid
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.enums import RelationshipType
|
|
from app.models.event import Event
|
|
from app.models.person import Name, Person
|
|
from app.models.relationship import Relationship
|
|
from app.models.tree import Tree
|
|
from app.models.user import User
|
|
from app.services import gedcom, privacy
|
|
from app.services.audit import record_audit
|
|
from app.services.exceptions import Forbidden, NotFound
|
|
from app.services.name_gender_data import guess_sex
|
|
|
|
|
|
async def _require_editor(session: AsyncSession, *, actor: User, tree: Tree) -> None:
    """Raise ``Forbidden`` unless ``actor`` may edit ``tree``.

    The decision is delegated to ``privacy.can_edit_tree``; a truthy answer
    lets the caller proceed, anything else aborts with ``Forbidden``.
    """
    allowed = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if allowed:
        return
    raise Forbidden("not an editor of this tree")
|
|
|
|
|
|
async def _persons(session: AsyncSession, tree_id: uuid.UUID) -> list[Person]:
    """Return every ``Person`` row of ``tree_id`` whose ``deleted_at`` is NULL."""
    stmt = select(Person).where(Person.tree_id == tree_id, Person.deleted_at.is_(None))
    result = await session.execute(stmt)
    return list(result.scalars().all())
|
|
|
|
|
|
async def _primary_name_by_person(
    session: AsyncSession, tree_id: uuid.UUID
) -> dict[uuid.UUID, Name]:
    """Map each person id to a single representative ``Name`` row.

    Non-deleted names of the tree are fetched ordered by ``is_primary``
    descending and then ``sort_order``; the first row encountered for a
    person is the one kept.
    """
    stmt = (
        select(Name)
        .where(Name.tree_id == tree_id, Name.deleted_at.is_(None))
        .order_by(Name.is_primary.desc(), Name.sort_order)
    )
    result = await session.execute(stmt)
    chosen: dict[uuid.UUID, Name] = {}
    for row in result.scalars().all():
        # Rows arrive best-first, so only the first one per person is recorded.
        if row.person_id not in chosen:
            chosen[row.person_id] = row
    return chosen
|
|
|
|
|
|
async def _birth_year_by_person(session: AsyncSession, tree_id: uuid.UUID) -> dict[uuid.UUID, int]:
    """Map person id -> birth year for everyone with a usable birth event.

    For each non-deleted ``"birth"`` event the year comes from ``date_start``
    when that is set, otherwise from ``gedcom._year(date_value)``. Events
    without a person or without a resolvable year are skipped, and the first
    event that yields a year for a person wins.
    """
    stmt = select(Event).where(
        Event.tree_id == tree_id,
        Event.deleted_at.is_(None),
        Event.event_type == "birth",
    )
    birth_events = (await session.execute(stmt)).scalars().all()

    year_of: dict[uuid.UUID, int] = {}
    for ev in birth_events:
        if not ev.person_id or ev.person_id in year_of:
            continue
        if ev.date_start:
            year_of[ev.person_id] = ev.date_start.year
            continue
        # No structured date: fall back to the year parsed from the raw value.
        raw_year = gedcom._year(ev.date_value)
        if raw_year:
            year_of[ev.person_id] = int(raw_year)
    return year_of
|
|
|
|
|
|
def _display(n: Name | None) -> str:
    """Build a human-readable label for a name row.

    Joins the non-empty ``given`` and ``surname`` with a space; when both are
    empty falls back to ``display_name``; when that is empty too, or when no
    name row was supplied at all, returns ``"Unnamed"``.
    """
    if n is None:
        return "Unnamed"
    parts = [part for part in (n.given, n.surname) if part]
    if parts:
        return " ".join(parts)
    return n.display_name or "Unnamed"
|
|
|
|
|
|
# ---- 1. Mark deceased by birth year -------------------------------------------------
|
|
|
|
async def preview_deceased(
    session: AsyncSession, *, actor: User, tree: Tree, year: int
) -> list[dict]:
    """Preview people who could be marked deceased based on their own birth year.

    Lists every person of the tree whose ``is_living`` is not already
    ``False`` and whose known birth year is on or before ``year``. Each row
    has ``person_id``, ``name`` and ``birth_year``; rows are sorted by
    ``birth_year``. Nothing is modified. The editor check runs first.
    """
    await _require_editor(session, actor=actor, tree=tree)
    names = await _primary_name_by_person(session, tree.id)
    birth_years = await _birth_year_by_person(session, tree.id)

    rows: list[dict] = []
    for person in await _persons(session, tree.id):
        born = birth_years.get(person.id)
        # Skip anyone already deceased, without a birth year, or born after the cutoff.
        eligible = person.is_living is not False and born is not None and born <= year
        if eligible:
            rows.append(
                {
                    "person_id": str(person.id),
                    "name": _display(names.get(person.id)),
                    "birth_year": born,
                }
            )
    return sorted(rows, key=lambda row: row["birth_year"])
|
|
|
|
|
|
async def apply_deceased(
    session: AsyncSession, *, actor: User, tree: Tree, person_ids: list[uuid.UUID]
) -> int:
    """Set ``is_living = False`` on the selected people and commit.

    Only non-deleted persons of ``tree`` whose id is in ``person_ids`` are
    touched; ids that match nothing are ignored. One ``cleanup_deceased``
    audit call carrying the matched count is made before the commit.

    Returns the number of matched persons. The editor check runs first.
    """
    await _require_editor(session, actor=actor, tree=tree)
    stmt = select(Person).where(
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
        Person.id.in_(person_ids),
    )
    matched = (await session.execute(stmt)).scalars().all()
    for person in matched:
        person.is_living = False

    updated = len(matched)
    record_audit(
        session,
        action="cleanup_deceased",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": updated},
    )
    await session.commit()
    return updated
|
|
|
|
|
|
# ---- 1b. Mark deceased by a CHILD's birth year -------------------------------------
|
|
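# Aimed at parents whose own birth date is missing (so the birth-year rule can't
# reach them) but who have a child born long ago — they're necessarily deceased.
# The preview does not check the parent's own birth date: every not-yet-deceased
# parent of such a child is listed. Applies through the same apply_deceased() path.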
|
|
|
|
async def preview_deceased_by_child(
    session: AsyncSession, *, actor: User, tree: Tree, year: int
) -> list[dict]:
    """Preview parents who could be marked deceased because of an old child.

    Walks the tree's non-deleted ``parent_child`` relationships, reading
    ``person_from_id`` as the parent and ``person_to_id`` as the child. A
    parent qualifies when at least one child has a known birth year on or
    before ``year``. Parents that are no longer among the tree's live persons,
    or whose ``is_living`` is already ``False``, are left out.

    Each row has ``person_id``, ``name`` and ``child_birth_year`` (the
    earliest qualifying child year); rows are sorted by ``child_birth_year``.
    Nothing is modified. The editor check runs first.
    """
    await _require_editor(session, actor=actor, tree=tree)
    names = await _primary_name_by_person(session, tree.id)
    birth_years = await _birth_year_by_person(session, tree.id)
    stmt = select(Relationship).where(
        Relationship.tree_id == tree.id,
        Relationship.deleted_at.is_(None),
        Relationship.type == RelationshipType.parent_child,
    )
    links = (await session.execute(stmt)).scalars().all()

    # parent id -> earliest birth year among that parent's children born on/before `year`
    oldest_child_year: dict[uuid.UUID, int] = {}
    for link in links:
        child_year = birth_years.get(link.person_to_id)
        if child_year is not None and child_year <= year:
            parent_id = link.person_from_id
            oldest_child_year[parent_id] = min(
                child_year, oldest_child_year.get(parent_id, child_year)
            )

    people_by_id = {person.id: person for person in await _persons(session, tree.id)}
    rows: list[dict] = []
    for parent_id, child_year in oldest_child_year.items():
        parent = people_by_id.get(parent_id)
        # Keep only parents that still exist and are not already deceased.
        if parent is not None and parent.is_living is not False:
            rows.append(
                {
                    "person_id": str(parent_id),
                    "name": _display(names.get(parent_id)),
                    "child_birth_year": child_year,
                }
            )
    return sorted(rows, key=lambda row: row["child_birth_year"])
|
|
|
|
|
|
# ---- 2. Re-derive gender from a source GEDCOM (matches by name) ----------------------
|
|
|
|
async def preview_gender(
    session: AsyncSession, *, actor: User, tree: Tree, gedcom_text: str
) -> list[dict]:
    """Preview gender fill-ins taken from a source GEDCOM, matched by name.

    Builds a normalised-name -> sex table from the ``INDI`` records of
    ``gedcom_text`` (the first record seen for a given name wins). It then
    proposes a gender for every person in the tree that has none set and
    whose primary name normalises to a key in that table.

    Each row has ``person_id``, ``name`` and ``proposed_gender``; rows are
    sorted by ``name``. Nothing is modified. The editor check runs first.
    """
    await _require_editor(session, actor=actor, tree=tree)

    sex_by_name: dict[str, str] = {}
    for rec in gedcom.parse_records(gedcom_text):
        if rec.tag == "INDI":
            summary = gedcom._person_summary(rec)
            sex = gedcom._sex(rec.text("SEX"))
            key = summary["norm"]
            if sex and key and key not in sex_by_name:
                sex_by_name[key] = sex

    names = await _primary_name_by_person(session, tree.id)
    rows: list[dict] = []
    for person in await _persons(session, tree.id):
        primary = names.get(person.id)
        # Only fill in what's missing, and only for people that have a name to match on.
        if person.gender or primary is None:
            continue
        proposed = sex_by_name.get(gedcom._norm(primary.given, primary.surname))
        if proposed:
            rows.append(
                {"person_id": str(person.id), "name": _display(primary), "proposed_gender": proposed}
            )
    return sorted(rows, key=lambda row: row["name"])
|
|
|
|
|
|
async def guess_gender_by_name(
    session: AsyncSession, *, actor: User, tree: Tree
) -> list[dict]:
    """Preview gender guesses derived from each person's given name.

    For every person without a gender and with a primary name, asks
    ``guess_sex(given)`` for a proposal; people for whom it returns nothing
    are skipped. Each row has ``person_id``, ``name`` and ``proposed_gender``;
    rows are sorted by ``name``. Nothing is modified. The editor check runs
    first.
    """
    await _require_editor(session, actor=actor, tree=tree)
    names = await _primary_name_by_person(session, tree.id)

    suggestions: list[dict] = []
    for person in await _persons(session, tree.id):
        primary = names.get(person.id)
        if person.gender or primary is None:
            continue
        guess = guess_sex(primary.given)
        if guess:
            suggestions.append(
                {"person_id": str(person.id), "name": _display(primary), "proposed_gender": guess}
            )
    return sorted(suggestions, key=lambda row: row["name"])
|
|
|
|
|
|
async def guess_gender_by_spouse(
    session: AsyncSession, *, actor: User, tree: Tree
) -> list[dict]:
    """Preview gender proposals inferred from a partner whose sex is set.

    Couples in a tree are opposite-sex in practice (e.g. a confirmed-male
    husband implies a female wife), so a person with no sex set is proposed
    the opposite of each partner's known ``"male"``/``"female"`` value.
    People whose known partners disagree are ambiguous and skipped, as are
    people without a primary name. The result is a preview to review, not an
    auto-write.

    Only live persons of this tree (those returned by ``_persons``) are ever
    proposed, so every row can actually be applied by ``apply_gender``.

    Returns rows of ``person_id``, ``name`` and ``proposed_gender`` sorted by
    ``name``. The editor check runs first.
    """
    await _require_editor(session, actor=actor, tree=tree)
    persons = await _persons(session, tree.id)
    gender = {p.id: p.gender for p in persons}
    names = await _primary_name_by_person(session, tree.id)
    rels = (
        await session.execute(
            select(Relationship).where(
                Relationship.tree_id == tree.id,
                Relationship.deleted_at.is_(None),
                Relationship.type == RelationshipType.partnership,
            )
        )
    ).scalars().all()

    opp = {"male": "female", "female": "male"}
    proposals: dict[uuid.UUID, set[str]] = {}
    for r in rels:
        # Consider the relationship from both ends.
        for me_id, other_id in (
            (r.person_from_id, r.person_to_id),
            (r.person_to_id, r.person_from_id),
        ):
            # A partnership row may still point at a person that is no longer
            # live in this tree; such an id is absent from `gender` and must
            # not be mistaken for "a person with no sex set".
            if me_id not in gender or gender[me_id]:
                continue
            other_sex = str(gender.get(other_id) or "")
            if other_sex in opp:
                proposals.setdefault(me_id, set()).add(opp[other_sex])

    out: list[dict] = []
    for pid, sexes in proposals.items():
        if len(sexes) != 1:
            continue  # partners of differing known sex → ambiguous
        nm = names.get(pid)
        if nm is None:
            continue
        out.append(
            {"person_id": str(pid), "name": _display(nm), "proposed_gender": next(iter(sexes))}
        )
    out.sort(key=lambda row: row["name"])
    return out
|
|
|
|
|
|
async def apply_gender(
    session: AsyncSession, *, actor: User, tree: Tree, updates: list[dict]
) -> int:
    """Write the user-confirmed genders and commit.

    ``updates`` is a list of ``{person_id, gender}`` dicts. Entries without a
    truthy ``gender`` are ignored, and a later entry for the same person
    overrides an earlier one. Only non-deleted persons of ``tree`` are
    updated; ids that match nothing are ignored. One ``cleanup_gender`` audit
    call carrying the matched count is made before the commit.

    Returns the number of persons updated. The editor check runs first.
    """
    await _require_editor(session, actor=actor, tree=tree)

    wanted = {}
    for update in updates:
        if update.get("gender"):
            wanted[uuid.UUID(str(update["person_id"]))] = update["gender"]

    stmt = select(Person).where(
        Person.tree_id == tree.id,
        Person.deleted_at.is_(None),
        Person.id.in_(wanted.keys()),
    )
    matched = (await session.execute(stmt)).scalars().all()
    for person in matched:
        person.gender = wanted[person.id]

    updated = len(matched)
    record_audit(
        session,
        action="cleanup_gender",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": updated},
    )
    await session.commit()
    return updated
|
|
|
|
|
|
# ---- 3. Flag malformed names for review --------------------------------------------
|
|
|
|
# Matches a standalone run of 3 or 4 digits (delimited by word boundaries), e.g. a year.
_YEAR_RE = re.compile(r"\b\d{3,4}\b")
|
|
|
|
|
|
def _name_issue(n: Name) -> str | None:
    """Classify what looks wrong with a name row, or return ``None`` if nothing does.

    ``given`` and ``surname`` are treated as empty when missing and are
    stripped before checking. The first matching rule wins:

    * ``"date_in_surname"`` — the surname contains a digit.
    * ``"date_in_given"`` — the given name contains a digit.
    * ``"no_surname"`` — the surname is empty and the given name has two or
      more whitespace-separated tokens.
    * ``"packed_given"`` — the given name has three or more tokens.
    """
    given = (n.given or "").strip()
    surname = (n.surname or "").strip()
    # A single digit test suffices: any standalone 3-4 digit year necessarily
    # contains a digit, so a separate year-pattern check can never add a match.
    if re.search(r"\d", surname):
        return "date_in_surname"
    if re.search(r"\d", given):
        return "date_in_given"
    token_count = len(given.split())
    if surname == "" and token_count >= 2:
        return "no_surname"
    # A given name with many tokens often means a maiden+married name was packed
    # in (e.g. "Mary Smith Jones") — surface it for a human to split.
    if token_count >= 3:
        return "packed_given"
    return None
|
|
|
|
|
|
async def preview_names(session: AsyncSession, *, actor: User, tree: Tree) -> list[dict]:
    """Preview name rows that look malformed and need a human to fix them.

    Runs ``_name_issue`` over every non-deleted name of the tree and reports
    the ones it flags. Each row has ``name_id``, ``person_id``, the current
    ``given`` and ``surname``, and the ``issue`` code. Nothing is modified.
    The editor check runs first.
    """
    await _require_editor(session, actor=actor, tree=tree)
    stmt = select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
    name_rows = (await session.execute(stmt)).scalars().all()

    flagged: list[dict] = []
    for row in name_rows:
        problem = _name_issue(row)
        if not problem:
            continue
        flagged.append(
            {
                "name_id": str(row.id),
                "person_id": str(row.person_id),
                "given": row.given,
                "surname": row.surname,
                "issue": problem,
            }
        )
    return flagged
|
|
|
|
|
|
async def apply_names(
    session: AsyncSession, *, actor: User, tree: Tree, edits: list[dict]
) -> int:
    """Write the user's corrected name parts and commit.

    ``edits`` is a list of ``{name_id, given, surname}`` dicts holding the
    corrected values. Both parts are stripped, and a blank part is stored as
    ``None``. ``display_name`` is cleared on every edited row so it is rebuilt
    from the parts. One ``cleanup_names`` audit call carrying the row count
    is made before the commit.

    Returns the number of name rows updated. Raises ``NotFound`` if any
    ``name_id`` does not match a non-deleted name of this tree, in which case
    nothing is written. The editor check runs first.
    """
    await _require_editor(session, actor=actor, tree=tree)

    edits_by_id = {}
    for edit in edits:
        edits_by_id[uuid.UUID(str(edit["name_id"]))] = edit

    stmt = select(Name).where(
        Name.tree_id == tree.id,
        Name.deleted_at.is_(None),
        Name.id.in_(edits_by_id.keys()),
    )
    name_rows = (await session.execute(stmt)).scalars().all()
    # All-or-nothing: refuse the whole batch if any requested name is missing.
    if len(name_rows) != len(edits_by_id):
        raise NotFound("one or more names not found in this tree")

    for row in name_rows:
        edit = edits_by_id[row.id]
        for part in ("given", "surname"):
            setattr(row, part, (edit.get(part) or "").strip() or None)
        row.display_name = None  # rebuild from parts

    updated = len(name_rows)
    record_audit(
        session,
        action="cleanup_names",
        entity_type="Tree",
        entity_id=tree.id,
        tree_id=tree.id,
        actor_user_id=actor.id,
        after={"count": updated},
    )
    await session.commit()
    return updated
|