Files
justin b4434cb5dd Fix #169: keep citation links on GEDCOM export
Export emitted SOUR records but never the per-fact SOUR links, so a
Provenance→Provenance round-trip destroyed the sources graph (citations were
dropped). Emit citation links on the facts they sit on:
- person-level → 1 SOUR @Sx@ (2 PAGE)
- name-level   → 2 SOUR under 1 NAME
- event-level  → 2 SOUR under the event (incl. partnership events in FAM)
- relationship → 1 SOUR under FAM
Citations whose source didn't export are skipped.

Test: a person + event citation round-trips through export→import into a fresh
tree with their pages intact. GEDCOM suite 6 passed.

Closes #169

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-09 12:37:03 -04:00

842 lines
31 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""GEDCOM import/export.
A pragmatic parser + mapper for the common subset of GEDCOM (5.5.1 / 7 share
the line grammar): INDI, FAM, SOUR. Import maps records into a tree and returns
a mapping report (counts + unmapped tags); export serializes the tree back to
GEDCOM. Runs inline for now — large files should move to the worker later.
Import is duplicate-aware: ``preview_gedcom`` reports incoming people that look
like existing ones, and ``import_gedcom`` applies a per-record resolution
(new / skip / merge / overwrite). Names carry their GEDCOM type (a married name
imports as a typed alternate, not a second primary).
"""
import re
import uuid
from collections import defaultdict
from datetime import UTC, date, datetime
from difflib import SequenceMatcher
from sqlalchemy import or_, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.enums import ParentChildQualifier, RelationshipType
from app.models.event import Event
from app.models.person import Name, Person
from app.models.place import Place
from app.models.relationship import Relationship
from app.models.source import Citation, Source
from app.models.tree import Tree
from app.models.user import User
from app.services import privacy
from app.services.audit import record_audit
from app.services.exceptions import Forbidden
# GEDCOM event tag -> our event_type (INDI-level).
# Note that two tags (BAPM and BAPL) both map to "baptism".
INDI_EVENTS = {
    "BIRT": "birth", "DEAT": "death", "BAPM": "baptism", "CHR": "christening",
    "BURI": "burial", "CREM": "cremation", "RESI": "residence", "CENS": "census",
    "IMMI": "immigration", "EMIG": "emigration", "OCCU": "occupation",
    "EDUC": "education", "GRAD": "graduation", "RETI": "retirement",
    "NATU": "naturalization", "BAPL": "baptism", "RELI": "religion",
}
# INDI attribute tags whose line VALUE is the fact (no date), stored in detail.
VALUE_EVENTS = {"RELI", "OCCU", "EDUC"}
# INDI sub-tags consumed elsewhere or intentionally ignored (not "unmapped").
INDI_SKIP_TAGS = {
    "NAME", "SEX", "SOUR", "FAMC", "FAMS", "CHAN", "OBJE", "_UID", "_MARNM", "NOTE",
}
# FAM-level events.
FAM_EVENTS = {"MARR": "marriage", "DIV": "divorce", "ENGA": "engagement"}
# Our event_type -> GEDCOM tag on export. Several tags can share one
# event_type; iterating in reverse makes the FIRST tag declared above win, so
# "baptism" exports as the generic BAPM rather than the later BAPL entry.
EVENT_TO_GED = {
    event_type: tag
    for tag, event_type in reversed({**INDI_EVENTS, **FAM_EVENTS}.items())
}
# GEDCOM NAME TYPE (or _MARNM-derived) -> our Name.name_type vocabulary.
NAME_TYPE_MAP = {
    "birth": "birth", "maiden": "birth", "married": "married",
    "aka": "alias", "also known as": "alias", "nickname": "nickname",
    "religious": "religious", "immigrant": "immigration",
    "immigration": "immigration", "professional": "alias", "other": "alias",
}
# Our type -> GEDCOM TYPE on export (birth is the default; emit nothing).
EXPORT_TYPE_MAP = {
    "married": "married", "alias": "aka", "nickname": "nickname",
    "religious": "religious", "immigration": "immigrant",
}
class GedcomNode:
__slots__ = ("level", "tag", "value", "xref", "children")
def __init__(self, level: int, tag: str, value: str = "", xref: str | None = None):
self.level = level
self.tag = tag
self.value = value
self.xref = xref
self.children: list[GedcomNode] = []
def first(self, tag: str) -> "GedcomNode | None":
return next((c for c in self.children if c.tag == tag), None)
def all(self, tag: str) -> list["GedcomNode"]:
return [c for c in self.children if c.tag == tag]
def text(self, tag: str, default: str | None = None) -> str | None:
n = self.first(tag)
return n.value if n is not None else default
def parse_records(text: str) -> list[GedcomNode]:
    """Parse GEDCOM text into a list of top-level record nodes.

    Each line is ``LEVEL [@XREF@] TAG [VALUE]``. Nesting is rebuilt from the
    level numbers: a line becomes a child of the nearest preceding line with a
    smaller level, or a root when there is none. CONC / CONT lines are folded
    into their parent's value (CONT inserts a newline first) instead of becoming
    nodes. Blank lines and lines whose level is not an integer are skipped.
    """
    roots: list[GedcomNode] = []
    stack: list[GedcomNode] = []
    for raw in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        # Drop a UTF-8 byte-order mark (written as an escape so it is visible
        # in the source) and any leading spaces/tabs: readers are expected to
        # ignore whitespace in front of a GEDCOM line, and without this such
        # lines failed the int() below and were silently discarded.
        line = raw.lstrip("\ufeff \t").rstrip()
        if not line:
            continue
        parts = line.split(" ", 1)
        try:
            level = int(parts[0])
        except ValueError:
            continue
        rest = parts[1] if len(parts) > 1 else ""
        xref: str | None = None
        # A leading @...@ token is the record's own cross-reference id.
        if rest.startswith("@"):
            end = rest.find("@", 1)
            if end != -1:
                xref = rest[: end + 1]
                rest = rest[end + 1:].strip()
        tparts = rest.split(" ", 1)
        tag = tparts[0]
        value = tparts[1] if len(tparts) > 1 else ""
        # Unwind to the nearest open line with a strictly smaller level.
        while stack and stack[-1].level >= level:
            stack.pop()
        parent = stack[-1] if stack else None
        if tag in ("CONC", "CONT") and parent is not None:
            parent.value += ("" if tag == "CONC" else "\n") + value
            continue
        node = GedcomNode(level, tag, value, xref)
        if parent is None:
            roots.append(node)
        else:
            parent.children.append(node)
        stack.append(node)
    return roots
def _parse_name(value: str) -> tuple[str | None, str | None]:
if "/" in value:
given, _, rest = value.partition("/")
surname = rest.split("/", 1)[0]
return given.strip() or None, surname.strip() or None
return value.strip() or None, None
def _parse_marnm(value: str, base_given: str | None) -> tuple[str | None, str | None]:
"""A _MARNM value is sometimes a full name ("Jane /Smith/") and sometimes
just the married surname ("Smith"). Keep the given name from the base name
in the latter case."""
v = (value or "").strip()
if "/" in v:
g, s = _parse_name(v)
return (g or base_given), s
return base_given, (v or None)
def _extract_names(rec: GedcomNode) -> list[dict]:
    """Collect every name of an INDI record as typed dicts.

    Each NAME line (with its optional TYPE) yields one entry, followed by one
    "married" entry per _MARNM nested under it; _MARNM lines directly under the
    INDI are appended last. The first entry typed "birth" is flagged primary
    (the very first entry when none is), and ``sort`` is the list position.
    """

    def married(node: GedcomNode, fallback_given: str | None) -> dict:
        given, surname = _parse_marnm(node.value, fallback_given)
        return {
            "type": "married",
            "given": given,
            "surname": surname,
            "display": node.value or None,
            "nickname": None,
        }

    collected: list[dict] = []
    for name_node in rec.all("NAME"):
        given, surname = _parse_name(name_node.value)
        raw_type = (name_node.text("TYPE") or "").strip().lower()
        collected.append({
            # Unknown TYPE strings pass through as-is; a missing TYPE is "birth".
            "type": NAME_TYPE_MAP.get(raw_type, raw_type or "birth"),
            "given": given,
            "surname": surname,
            "display": name_node.value or None,
            "nickname": name_node.text("NICK"),
        })
        collected.extend(married(m, given) for m in name_node.all("_MARNM"))

    for marnm in rec.all("_MARNM"):
        # Record-level married names borrow the given name of the first entry.
        fallback = collected[0]["given"] if collected else None
        collected.append(married(marnm, fallback))

    if collected:
        birth_positions = [i for i, e in enumerate(collected) if e["type"] == "birth"]
        primary_at = birth_positions[0] if birth_positions else 0
        for position, entry in enumerate(collected):
            entry["is_primary"] = position == primary_at
            entry["sort"] = position
    return collected
def _norm(given: str | None, surname: str | None) -> str:
return re.sub(r"\s+", " ", f"{given or ''} {surname or ''}".strip().lower())
def _year(date_value: str | None) -> str | None:
if not date_value:
return None
m = re.search(r"\b(\d{3,4})\b", date_value)
return m.group(1) if m else None
def _date_start(date_value: str | None) -> date | None:
    """Return 1 January of the year found in ``date_value``.

    Gives None when no year can be extracted or the year is not a valid
    ``date`` year.
    """
    year_text = _year(date_value)
    if year_text:
        try:
            return date(int(year_text), 1, 1)
        except ValueError:
            return None
    return None
def _sex(value: str | None) -> str | None:
if not value:
return None
v = value.strip().upper()
return {"M": "male", "F": "female"}.get(v, value.strip().lower() or None)
def _notes_text(rec: GedcomNode) -> str | None:
    """Join the record's non-blank NOTE values, one per line.

    An INDI's NOTE lines pack confidence / findagrave / fs_pid / free text;
    they all land in the person's notes field. Returns None when there is
    nothing to keep.
    """
    pieces: list[str] = []
    for note in rec.all("NOTE"):
        stripped = note.value.strip() if note.value else ""
        if stripped:
            pieces.append(stripped)
    return "\n".join(pieces) or None
def _person_summary(rec: GedcomNode) -> dict:
    """Summarise an incoming INDI for duplicate matching.

    Returns the extracted names, the normalised primary name, a display name
    and the birth year (from the first BIRT's DATE, when present).
    """
    names = _extract_names(rec)

    # Prefer the entry flagged primary; otherwise fall back to the first name.
    primary = None
    for candidate in names:
        if candidate.get("is_primary"):
            primary = candidate
            break
    if primary is None and names:
        primary = names[0]

    given = primary["given"] if primary else None
    surname = primary["surname"] if primary else None

    label = " ".join(part for part in (given, surname) if part)
    if not label and primary:
        label = primary.get("display") or ""

    birth = rec.first("BIRT")
    birth_year = _year(birth.text("DATE")) if birth is not None else None

    return {
        "names": names,
        "norm": _norm(given, surname),
        "name": label or "(no name)",
        "year": birth_year,
    }
async def _build_existing_index(session: AsyncSession, tree: Tree) -> list[dict]:
    """Index the tree's non-deleted people for duplicate matching.

    Each entry holds the person id, the normalised and display form of their
    preferred name (primary first, then lowest sort order) and the year of
    their first birth event that yields one.
    """
    person_result = await session.execute(
        select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
    )
    persons = list(person_result.scalars().all())

    name_result = await session.execute(
        select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
    )
    all_names = list(name_result.scalars().all())
    # Primary names sort first, so the first name seen per person is preferred.
    all_names.sort(key=lambda nm: (not nm.is_primary, nm.sort_order))
    preferred: dict[uuid.UUID, Name] = {}
    for nm in all_names:
        if nm.person_id not in preferred:
            preferred[nm.person_id] = nm

    birth_result = await session.execute(
        select(Event).where(
            Event.tree_id == tree.id,
            Event.deleted_at.is_(None),
            Event.event_type == "birth",
        )
    )
    birth_year: dict[uuid.UUID, str] = {}
    for ev in birth_result.scalars().all():
        if not ev.person_id or ev.person_id in birth_year:
            continue
        # Use the parsed date when present, else scan the raw date text.
        found = str(ev.date_start.year) if ev.date_start else _year(ev.date_value)
        if found:
            birth_year[ev.person_id] = found

    def entry_for(person: Person) -> dict:
        nm = preferred.get(person.id)
        given = nm.given if nm else None
        surname = nm.surname if nm else None
        label = " ".join(part for part in (given, surname) if part)
        if not label:
            label = nm.display_name if nm else None
        return {
            "id": person.id,
            "norm": _norm(given, surname),
            "name": label or "(no name)",
            "year": birth_year.get(person.id),
        }

    return [entry_for(person) for person in persons]
def _best_match(norm: str, year: str | None, index: list[dict]) -> tuple[dict | None, str | None]:
"""Closest existing person by name similarity, rejecting clear birth-year
conflicts. Returns (entry, "high"|"medium") or (None, None)."""
if not norm:
return None, None
best: dict | None = None
best_r = 0.0
for e in index:
if not e["norm"]:
continue
r = SequenceMatcher(None, norm, e["norm"]).ratio()
if r < 0.88:
continue
if year and e["year"] and abs(int(year) - int(e["year"])) > 1:
continue # same-ish name but different birth year — not a duplicate
if r > best_r:
best_r = r
best = e
if best is None:
return None, None
year_match = bool(year and best["year"] and abs(int(year) - int(best["year"])) <= 1)
both_unknown = not year and not best["year"]
score = "high" if best_r >= 0.93 and (year_match or both_unknown) else "medium"
return best, score
def _relkey(rtype: RelationshipType, a: uuid.UUID, b: uuid.UUID) -> tuple:
    """Build a dedup key for an edge.

    Parent-child edges are directional, so the order of ``a`` and ``b`` is
    kept; every other type is keyed on the sorted pair.
    """
    first, second = str(a), str(b)
    if rtype != RelationshipType.parent_child:
        low, high = sorted((first, second))
        return (rtype.value, low, high)
    return ("pc", first, second)
def _count_incoming(roots: list[GedcomNode]) -> tuple[dict, list[str]]:
    """Tally what a parsed GEDCOM file contains.

    Returns per-kind counts (persons, names, events, families, sources) and
    the sorted INDI sub-tags that are neither mapped events nor deliberately
    skipped. INDI and SOUR records without an xref are not counted.
    """
    tally: dict[str, int] = defaultdict(int)
    unknown: set[str] = set()
    for record in roots:
        kind = record.tag
        if kind == "INDI" and record.xref:
            tally["persons"] += 1
            tally["names"] += len(_extract_names(record))
            for sub in record.children:
                if sub.tag in INDI_EVENTS:
                    tally["events"] += 1
                elif sub.tag not in INDI_SKIP_TAGS:
                    unknown.add(sub.tag)
        elif kind == "FAM":
            tally["families"] += 1
            for sub in record.children:
                if sub.tag in FAM_EVENTS:
                    tally["events"] += 1
        elif kind == "SOUR" and record.xref:
            tally["sources"] += 1
    return dict(tally), sorted(unknown)
async def preview_gedcom(session: AsyncSession, *, actor: User, tree: Tree, text: str) -> dict:
    """Dry-run an import without writing anything.

    Reports the counts of incoming records, the unmapped INDI tags, and every
    incoming person that looks like an existing one in ``tree``.

    Raises:
        Forbidden: if ``actor`` may not edit ``tree``.
    """
    allowed = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not allowed:
        raise Forbidden("not an editor of this tree")

    records = parse_records(text)
    counts, unmapped = _count_incoming(records)
    existing = await _build_existing_index(session, tree)

    potential: list[dict] = []
    individuals = [r for r in records if r.tag == "INDI" and r.xref]
    for indi in individuals:
        summary = _person_summary(indi)
        hit, confidence = _best_match(summary["norm"], summary["year"], existing)
        if hit is not None:
            potential.append({
                "xref": indi.xref,
                "incoming_name": summary["name"],
                "incoming_birth_year": summary["year"],
                "existing_person_id": hit["id"],
                "existing_name": hit["name"],
                "existing_birth_year": hit["year"],
                "score": confidence,
            })

    return {"counts": counts, "potential_duplicates": potential, "unmapped_tags": unmapped}
async def import_gedcom(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    text: str,
    default_action: str = "new",
    resolutions: dict | None = None,
) -> dict:
    """Import GEDCOM records into ``tree`` and return a mapping report.

    ``default_action`` (new|skip|merge|overwrite) applies to incoming people
    that match an existing one; ``resolutions`` overrides it per GEDCOM xref
    ({xref: {action, target_id}}). 'skip' links families to the existing person
    but copies nothing; 'merge' also copies the incoming names (as alternates),
    events and citations onto them; 'overwrite' soft-deletes the existing
    person and imports the incoming one fresh.

    A resolution target must be a non-deleted person of this tree. A target
    that is missing, or that is not such a person, makes the record import as
    a new person instead.

    Citations are read from SOUR links on the person, on each INDI and FAM
    event, and on the FAM record itself (attached to the partnership). SOUR
    links under NAME are not imported.

    Returns ``{"counts": {...}, "unmapped_tags": [...]}``.

    Raises:
        Forbidden: if ``actor`` may not edit ``tree``.
        ValueError: if a ``target_id`` string is not a well-formed UUID.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")
    resolutions = resolutions or {}
    roots = parse_records(text)
    counts: dict[str, int] = defaultdict(int)
    unmapped: set[str] = set()
    place_cache: dict[str, uuid.UUID] = {}
    source_map: dict[str, uuid.UUID] = {}
    person_map: dict[str, uuid.UUID] = {}
    now = datetime.now(UTC)
    index = await _build_existing_index(session, tree)
    # The only ids a resolution may point at. Without this check a caller
    # could merge into, or soft-delete, a person belonging to another tree.
    existing_ids = {entry["id"] for entry in index}

    # Pre-load existing edges so a merge doesn't create dup edges. Edges
    # created during this import are added too, so a later FAM for the same
    # couple can still find the partnership and attach its events.
    existing_rels = list(
        (
            await session.execute(
                select(Relationship).where(
                    Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
                )
            )
        ).scalars().all()
    )
    rel_by_key: dict[tuple, Relationship] = {}
    for r in existing_rels:
        rel_by_key.setdefault(_relkey(r.type, r.person_from_id, r.person_to_id), r)

    def add_relationship(
        rtype: RelationshipType, a: uuid.UUID, b: uuid.UUID, **kw
    ) -> Relationship | None:
        """Add an edge unless an equivalent one exists; return the new edge or None."""
        key = _relkey(rtype, a, b)
        if key in rel_by_key:
            return None
        rel = Relationship(tree_id=tree.id, type=rtype, person_from_id=a, person_to_id=b, **kw)
        session.add(rel)
        rel_by_key[key] = rel
        counts["relationships"] += 1
        return rel

    async def place_id(name: str | None) -> uuid.UUID | None:
        """Return the id of the Place for ``name``, creating one per distinct
        name seen in this import."""
        if not name:
            return None
        if name in place_cache:
            return place_cache[name]
        p = Place(tree_id=tree.id, name=name)
        session.add(p)
        await session.flush()
        place_cache[name] = p.id
        counts["places"] += 1
        return p.id

    # Sources first (so citations can reference them).
    for rec in roots:
        if rec.tag == "SOUR" and rec.xref:
            src = Source(
                tree_id=tree.id,
                title=rec.text("TITL") or rec.text("ABBR") or "Untitled source",
                author=rec.text("AUTH"),
                publication_info=rec.text("PUBL"),
                citation_text=rec.text("TEXT"),
            )
            session.add(src)
            await session.flush()
            source_map[rec.xref] = src.id
            counts["sources"] += 1

    async def add_citations(holder: GedcomNode, **target) -> None:
        """Create a Citation for each SOUR link directly under ``holder``.

        ``target`` names the fact the citation sits on (person_id, event_id or
        relationship_id). Links to sources that were not imported are skipped.
        """
        for s in holder.all("SOUR"):
            sid = source_map.get(s.value.strip())
            if sid is None:
                continue
            session.add(Citation(tree_id=tree.id, source_id=sid, page=s.text("PAGE"), **target))
            counts["citations"] += 1

    def add_names(person_id: uuid.UUID, names: list[dict], *, set_primary: bool) -> None:
        """Attach extracted names; with ``set_primary`` False none is primary."""
        for nd in names:
            session.add(
                Name(
                    tree_id=tree.id,
                    person_id=person_id,
                    name_type=nd["type"],
                    given=nd["given"],
                    surname=nd["surname"],
                    nickname=nd.get("nickname"),
                    display_name=nd.get("display"),
                    is_primary=set_primary and nd.get("is_primary", False),
                    sort_order=nd.get("sort", 0),
                )
            )
            counts["names"] += 1

    async def add_events(rec: GedcomNode, person_id: uuid.UUID) -> None:
        """Create the INDI's events (with their citations) and note unmapped tags."""
        for child in rec.children:
            if child.tag in INDI_EVENTS:
                dv = child.text("DATE")
                # Attribute-style facts (RELI, OCCU, EDUC) carry their value on
                # the line itself; store it in detail.
                detail = child.value.strip() if child.tag in VALUE_EVENTS else None
                ev = Event(
                    tree_id=tree.id,
                    person_id=person_id,
                    event_type=INDI_EVENTS[child.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(child.text("PLAC")),
                    detail=detail or None,
                    notes=child.text("NOTE"),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                await add_citations(child, event_id=ev.id)
            elif child.tag in INDI_SKIP_TAGS:
                continue
            else:
                unmapped.add(child.tag)

    async def soft_delete_existing(person_id: uuid.UUID) -> None:
        """Soft-delete a person of this tree plus their edges, and unlink any
        user whose self-person they were."""
        p = (
            await session.execute(
                select(Person).where(
                    Person.id == person_id,
                    # Never touch a person outside the tree being imported into.
                    Person.tree_id == tree.id,
                    Person.deleted_at.is_(None),
                )
            )
        ).scalar_one_or_none()
        if p is None:
            return
        p.deleted_at = now
        rels = (
            await session.execute(
                select(Relationship).where(
                    Relationship.tree_id == tree.id,
                    Relationship.deleted_at.is_(None),
                    or_(
                        Relationship.person_from_id == person_id,
                        Relationship.person_to_id == person_id,
                    ),
                )
            )
        ).scalars().all()
        for r in rels:
            r.deleted_at = now
        await session.execute(
            update(User).where(User.self_person_id == person_id).values(self_person_id=None)
        )

    # Precompute the best match per incoming xref (for default-policy resolution).
    matches: dict[str, dict] = {}
    for rec in roots:
        if rec.tag == "INDI" and rec.xref:
            summ = _person_summary(rec)
            entry, _score = _best_match(summ["norm"], summ["year"], index)
            if entry is not None:
                matches[rec.xref] = entry

    def resolve(xref: str) -> tuple[str, uuid.UUID | None]:
        """Decide the (action, target person id) for one incoming xref."""
        ov = resolutions.get(xref)
        if ov:
            action = ov.get("action", "new")
            tid = ov.get("target_id")
            if tid:
                # Accept both UUID objects and their string form.
                target = tid if isinstance(tid, uuid.UUID) else uuid.UUID(str(tid))
            else:
                target = matches[xref]["id"] if xref in matches else None
            if target is not None and target not in existing_ids:
                target = None
            if action in ("skip", "merge", "overwrite") and target is None:
                return "new", None
            return action, target
        if default_action != "new" and xref in matches:
            return default_action, matches[xref]["id"]
        return "new", None

    # Individuals.
    for rec in roots:
        if rec.tag != "INDI" or not rec.xref:
            continue
        names = _extract_names(rec)
        action, target = resolve(rec.xref)
        if action == "skip" and target is not None:
            person_map[rec.xref] = target
            counts["skipped"] += 1
            continue
        if action == "merge" and target is not None:
            person_map[rec.xref] = target
            add_names(target, names, set_primary=False)
            await add_events(rec, target)
            await add_citations(rec, person_id=target)
            note = _notes_text(rec)
            if note:
                existing = (
                    await session.execute(select(Person).where(Person.id == target))
                ).scalar_one_or_none()
                if existing is not None:
                    existing.notes = "\n".join(filter(None, [existing.notes, note]))
            counts["merged"] += 1
            continue
        if action == "overwrite" and target is not None:
            await soft_delete_existing(target)
            counts["overwritten"] += 1
        person = Person(tree_id=tree.id, gender=_sex(rec.text("SEX")), notes=_notes_text(rec))
        session.add(person)
        await session.flush()
        person_map[rec.xref] = person.id
        counts["persons"] += 1
        add_names(person.id, names, set_primary=True)
        await add_citations(rec, person_id=person.id)
        await add_events(rec, person.id)

    # Families -> partnerships, parent-child edges, marriage events.
    for rec in roots:
        if rec.tag != "FAM":
            continue
        counts["families"] += 1
        husb = person_map.get((rec.text("HUSB") or "").strip())
        wife = person_map.get((rec.text("WIFE") or "").strip())
        partnership_id: uuid.UUID | None = None
        if husb and wife and husb != wife:
            rel = add_relationship(RelationshipType.partnership, husb, wife)
            if rel is not None:
                await session.flush()
            else:
                # Edge already existed (in the tree, or from an earlier FAM in
                # this file) — reuse it so events and citations can attach.
                rel = rel_by_key[_relkey(RelationshipType.partnership, husb, wife)]
            partnership_id = rel.id
        for fe in rec.children:
            if fe.tag in FAM_EVENTS and partnership_id is not None:
                dv = fe.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    relationship_id=partnership_id,
                    event_type=FAM_EVENTS[fe.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(fe.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                # Export writes SOUR links under family events; read them back.
                await add_citations(fe, event_id=ev.id)
        if partnership_id is not None:
            # SOUR links directly under FAM cite the partnership itself.
            await add_citations(rec, relationship_id=partnership_id)
        for chil in rec.all("CHIL"):
            cp = person_map.get(chil.value.strip())
            if cp is None:
                continue
            for parent in (husb, wife):
                if parent and parent != cp:
                    add_relationship(
                        RelationshipType.parent_child,
                        parent,
                        cp,
                        qualifier=ParentChildQualifier.biological,
                    )

    record_audit(
        session,
        action="import",
        entity_type="Gedcom",
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=dict(counts),
    )
    await session.commit()
    return {"counts": dict(counts), "unmapped_tags": sorted(unmapped)}
def _ged_date(value: str | None) -> str | None:
return value.strip() if value else None
async def export_gedcom(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> str:
    """Serialize ``tree`` to GEDCOM 5.5.1 text.

    Emits one INDI per non-deleted person (names, sex, events, citations,
    family links), one FAM per distinct parent set / partnership (partners,
    children, family events, citations) and one SOUR per non-deleted source.
    Values containing line breaks are continued with CONT lines so they cannot
    break the line grammar. Citations whose source is not exported are skipped.

    Raises:
        Forbidden: if the viewer may not view ``tree``.
    """
    if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
        raise Forbidden("not permitted to view this tree")

    async def load(stmt) -> list:
        """Run a select and return its ORM rows as a list."""
        return list((await session.execute(stmt)).scalars().all())

    persons = await load(
        select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
    )
    names = await load(select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None)))
    events = await load(
        select(Event).where(Event.tree_id == tree.id, Event.deleted_at.is_(None))
    )
    rels = await load(
        select(Relationship).where(
            Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
        )
    )
    sources = await load(
        select(Source).where(Source.tree_id == tree.id, Source.deleted_at.is_(None))
    )
    places = {p.id: p for p in await load(select(Place).where(Place.tree_id == tree.id))}
    citations = await load(
        select(Citation).where(Citation.tree_id == tree.id, Citation.deleted_at.is_(None))
    )

    pxref = {p.id: f"@I{i + 1}@" for i, p in enumerate(persons)}
    person_order = {p.id: i for i, p in enumerate(persons)}
    gender_by_id = {p.id: p.gender for p in persons}
    sxref = {s.id: f"@S{i + 1}@" for i, s in enumerate(sources)}

    # Citations grouped by the fact they sit on, so each fact can emit its SOUR
    # links. Skip any whose source didn't export.
    cite_by_person: dict[uuid.UUID, list[Citation]] = defaultdict(list)
    cite_by_name: dict[uuid.UUID, list[Citation]] = defaultdict(list)
    cite_by_event: dict[uuid.UUID, list[Citation]] = defaultdict(list)
    cite_by_rel: dict[uuid.UUID, list[Citation]] = defaultdict(list)
    for c in citations:
        if c.source_id not in sxref:
            continue
        if c.person_id:
            cite_by_person[c.person_id].append(c)
        elif c.event_id:
            cite_by_event[c.event_id].append(c)
        elif c.name_id:
            cite_by_name[c.name_id].append(c)
        elif c.relationship_id:
            cite_by_rel[c.relationship_id].append(c)

    out: list[str] = ["0 HEAD", "1 SOUR Provenance", "1 GEDC", "2 VERS 5.5.1", "1 CHAR UTF-8"]

    def emit(level: int, tag: str, value: str | None = None) -> None:
        """Append ``LEVEL TAG [VALUE]``; extra lines of a multi-line value
        become CONT sub-lines instead of raw line breaks in the output."""
        if not value:
            out.append(f"{level} {tag}")
            return
        first, *rest = str(value).replace("\r\n", "\n").replace("\r", "\n").split("\n")
        out.append(f"{level} {tag} {first}" if first else f"{level} {tag}")
        for piece in rest:
            out.append(f"{level + 1} CONT {piece}" if piece else f"{level + 1} CONT")

    def emit_citations(cites: list[Citation], depth: int) -> None:
        """Append a SOUR link (and its PAGE, when set) for each citation."""
        for c in cites:
            emit(depth, "SOUR", sxref[c.source_id])
            if c.page:
                emit(depth + 1, "PAGE", c.page)

    def emit_event(e: Event, tag: str) -> None:
        """Append one level-1 event with its DATE, PLAC and citations."""
        # Attribute-style facts (RELI, OCCU, EDUC) carry their value on the
        # event line itself, mirroring how import stores it in detail.
        emit(1, tag, e.detail if tag in VALUE_EVENTS else None)
        date_value = _ged_date(e.date_value)
        if date_value:
            emit(2, "DATE", date_value)
        if e.place_id and e.place_id in places:
            emit(2, "PLAC", places[e.place_id].name)
        emit_citations(cite_by_event.get(e.id, []), 2)

    names_by_person: dict[uuid.UUID, list[Name]] = defaultdict(list)
    for n in sorted(names, key=lambda n: (n.sort_order, not n.is_primary)):
        names_by_person[n.person_id].append(n)
    events_by_person: dict[uuid.UUID, list[Event]] = defaultdict(list)
    events_by_rel: dict[uuid.UUID, list[Event]] = defaultdict(list)
    for e in events:
        if e.person_id:
            events_by_person[e.person_id].append(e)
        elif e.relationship_id:
            events_by_rel[e.relationship_id].append(e)

    # Build families from parent-child + partnership edges (group by parent set).
    parents_of: dict[uuid.UUID, set[uuid.UUID]] = defaultdict(set)
    for r in rels:
        if r.type == RelationshipType.parent_child:
            parents_of[r.person_to_id].add(r.person_from_id)
    fams: dict[frozenset, dict] = {}
    for child, ps in parents_of.items():
        key = frozenset(ps)
        fams.setdefault(key, {"parents": set(ps), "children": [], "rel_id": None})
        fams[key]["children"].append(child)
    for r in rels:
        if r.type == RelationshipType.partnership:
            key = frozenset({r.person_from_id, r.person_to_id})
            fam = fams.setdefault(
                key,
                {"parents": {r.person_from_id, r.person_to_id}, "children": [], "rel_id": None},
            )
            fam["rel_id"] = r.id
    fam_list = list(fams.values())
    fxref = {id(f): f"@F{i + 1}@" for i, f in enumerate(fam_list)}

    # person -> the families they are a spouse in / a child in
    spouse_fams: dict[uuid.UUID, list[str]] = defaultdict(list)
    child_fams: dict[uuid.UUID, str] = {}
    for f in fam_list:
        x = fxref[id(f)]
        for pid in f["parents"]:
            spouse_fams[pid].append(x)
        for cid in f["children"]:
            child_fams[cid] = x

    for p in persons:
        out.append(f"0 {pxref[p.id]} INDI")
        for n in names_by_person.get(p.id, []):
            display = n.display_name or f"{n.given or ''} /{n.surname or ''}/".strip()
            emit(1, "NAME", display)
            ged_type = EXPORT_TYPE_MAP.get(n.name_type)
            if ged_type:
                emit(2, "TYPE", ged_type)
            emit_citations(cite_by_name.get(n.id, []), 2)
        sex = {"male": "M", "female": "F"}.get(p.gender or "")
        if sex:
            emit(1, "SEX", sex)
        for e in events_by_person.get(p.id, []):
            tag = EVENT_TO_GED.get(e.event_type)
            if not tag:
                continue
            emit_event(e, tag)
        emit_citations(cite_by_person.get(p.id, []), 1)
        if p.id in child_fams:
            emit(1, "FAMC", child_fams[p.id])
        for x in spouse_fams.get(p.id, []):
            emit(1, "FAMS", x)

    for f in fam_list:
        out.append(f"0 {fxref[id(f)]} FAM")
        # Only people that were exported can be referenced; keep their order stable.
        partners = sorted(
            (pid for pid in f["parents"] if pid in pxref), key=person_order.__getitem__
        )
        # HUSB/WIFE by recorded gender where possible. The remaining partners
        # fill whichever role is still empty, so one person is never written
        # as both HUSB and WIFE and a same-gender partner is not dropped.
        husb = next((pid for pid in partners if gender_by_id.get(pid) == "male"), None)
        wife = next((pid for pid in partners if gender_by_id.get(pid) == "female"), None)
        unassigned = [pid for pid in partners if pid not in (husb, wife)]
        if husb is None and unassigned:
            husb = unassigned.pop(0)
        if wife is None and unassigned:
            wife = unassigned.pop(0)
        if husb:
            emit(1, "HUSB", pxref[husb])
        if wife:
            emit(1, "WIFE", pxref[wife])
        for cid in f["children"]:
            if cid in pxref:
                emit(1, "CHIL", pxref[cid])
        if f["rel_id"]:
            for e in events_by_rel.get(f["rel_id"], []):
                tag = EVENT_TO_GED.get(e.event_type)
                if not tag:
                    continue
                emit_event(e, tag)
            emit_citations(cite_by_rel.get(f["rel_id"], []), 1)

    for s in sources:
        out.append(f"0 {sxref[s.id]} SOUR")
        if s.title:
            emit(1, "TITL", s.title)
        if s.author:
            emit(1, "AUTH", s.author)
        if s.publication_info:
            emit(1, "PUBL", s.publication_info)
        # Import reads TEXT into citation_text; write it back for round trips.
        if s.citation_text:
            emit(1, "TEXT", s.citation_text)

    out.append("0 TRLR")
    return "\n".join(out) + "\n"