Files
provenance/backend/app/services/gedcom.py
justin d48029a407 Add GEDCOM import/export
A pragmatic GEDCOM parser + mapper: import reads INDI/FAM/SOUR and creates people, names, life events, partnership + qualified parent-child relationships, marriage events, places (deduped), sources, and citations from SOUR refs — returning a mapping report (counts + unmapped tags). Export serializes the tree back to GEDCOM (families derived from the edge model). Import is additive (no merge) and runs inline for now. Round-trip test passes; 29 tests total.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Justin Paul <justin@jpaul.me>
2026-06-06 22:46:48 -04:00

452 lines
16 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""GEDCOM import/export.
A pragmatic parser + mapper for the common subset of GEDCOM (5.5.1 / 7 share
the line grammar): INDI, FAM, SOUR. Import maps records into a tree and returns
a mapping report (counts + unmapped tags); export serializes the tree back to
GEDCOM. Runs inline for now — large files should move to the worker later.
"""
import re
import uuid
from collections import defaultdict
from datetime import date
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.enums import ParentChildQualifier, RelationshipType
from app.models.event import Event
from app.models.person import Name, Person
from app.models.place import Place
from app.models.relationship import Relationship
from app.models.source import Citation, Source
from app.models.tree import Tree
from app.models.user import User
from app.services import privacy
from app.services.audit import record_audit
from app.services.exceptions import Forbidden
# GEDCOM event tag -> our event_type (INDI-level).
# NOTE: BAPM and BAPL both map to "baptism", so this table is not invertible
# one-to-one; see EVENT_TO_GED below for how the reverse direction is resolved.
INDI_EVENTS = {
    "BIRT": "birth", "DEAT": "death", "BAPM": "baptism", "CHR": "christening",
    "BURI": "burial", "CREM": "cremation", "RESI": "residence", "CENS": "census",
    "IMMI": "immigration", "EMIG": "emigration", "OCCU": "occupation",
    "EDUC": "education", "GRAD": "graduation", "RETI": "retirement",
    "NATU": "naturalization", "BAPL": "baptism",
}
# FAM-level events.
FAM_EVENTS = {"MARR": "marriage", "DIV": "divorce", "ENGA": "engagement"}
# event_type -> GEDCOM tag, used by export. When several tags share one
# event_type the FIRST one declared wins (iterating in reverse lets earlier
# entries overwrite later ones), so "baptism" exports as BAPM rather than the
# LDS-specific BAPL that a plain last-wins inversion would pick.
EVENT_TO_GED = {
    event_type: tag
    for tag, event_type in reversed({**INDI_EVENTS, **FAM_EVENTS}.items())
}
class GedcomNode:
__slots__ = ("level", "tag", "value", "xref", "children")
def __init__(self, level: int, tag: str, value: str = "", xref: str | None = None):
self.level = level
self.tag = tag
self.value = value
self.xref = xref
self.children: list[GedcomNode] = []
def first(self, tag: str) -> "GedcomNode | None":
return next((c for c in self.children if c.tag == tag), None)
def all(self, tag: str) -> list["GedcomNode"]:
return [c for c in self.children if c.tag == tag]
def text(self, tag: str, default: str | None = None) -> str | None:
n = self.first(tag)
return n.value if n is not None else default
def parse_records(text: str) -> list[GedcomNode]:
    """Parse GEDCOM text into a forest of ``GedcomNode`` records.

    Each line is read as ``LEVEL [@XREF@] TAG [VALUE]``. A node is attached to
    the nearest preceding node with a lower level; nodes with no such ancestor
    are returned as roots. ``CONC``/``CONT`` lines are folded into the value of
    their parent (``CONT`` inserts a newline first) instead of becoming nodes.

    Lines that are blank, whose first field is not an integer level, or that
    carry no tag are skipped rather than raising.

    Args:
        text: the GEDCOM document, with any of ``\\n``, ``\\r\\n`` or ``\\r``
            as line terminator.

    Returns:
        The top-level records in file order.
    """
    roots: list[GedcomNode] = []
    stack: list[GedcomNode] = []
    for raw in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        # Drop a byte-order mark (written as an escape so it is visible in the
        # source) and surrounding whitespace; indented lines would otherwise
        # fail the level parse below and be silently discarded.
        line = raw.lstrip("\ufeff").strip()
        if not line:
            continue
        parts = line.split(None, 1)
        try:
            level = int(parts[0])
        except ValueError:
            continue
        rest = parts[1] if len(parts) > 1 else ""
        xref: str | None = None
        if rest.startswith("@"):
            end = rest.find("@", 1)
            if end != -1:
                xref = rest[: end + 1]
                rest = rest[end + 1:].strip()
        # Split on a single space only: leading spaces in a value can be
        # significant (e.g. in CONC continuations).
        tparts = rest.split(" ", 1)
        tag = tparts[0]
        if not tag:
            # A level (and maybe an xref) with no tag is not a usable line.
            continue
        value = tparts[1] if len(tparts) > 1 else ""
        # Unwind to the closest open node that is shallower than this line.
        while stack and stack[-1].level >= level:
            stack.pop()
        parent = stack[-1] if stack else None
        if tag in ("CONC", "CONT") and parent is not None:
            parent.value += ("" if tag == "CONC" else "\n") + value
            continue
        node = GedcomNode(level, tag, value, xref)
        if parent is None:
            roots.append(node)
        else:
            parent.children.append(node)
        stack.append(node)
    return roots
def _parse_name(value: str) -> tuple[str | None, str | None]:
if "/" in value:
given, _, rest = value.partition("/")
surname = rest.split("/", 1)[0]
return given.strip() or None, surname.strip() or None
return value.strip() or None, None
def _year(date_value: str | None) -> str | None:
if not date_value:
return None
m = re.search(r"\b(\d{3,4})\b", date_value)
return m.group(1) if m else None
def _date_start(date_value: str | None) -> date | None:
    """Return 1 January of the year found in ``date_value``, if any.

    Only the year (as extracted by ``_year``) is used. None is returned when
    there is no year or when ``date`` rejects it.
    """
    year_text = _year(date_value)
    if year_text:
        try:
            return date(int(year_text), 1, 1)
        except ValueError:
            # e.g. year 0, which ``date`` does not accept.
            pass
    return None
def _sex(value: str | None) -> str | None:
if not value:
return None
v = value.strip().upper()
return {"M": "male", "F": "female"}.get(v, value.strip().lower() or None)
async def import_gedcom(
    session: AsyncSession, *, actor: User, tree: Tree, text: str
) -> dict:
    """Import GEDCOM ``text`` into ``tree`` and return a mapping report.

    Records are handled in three passes so that cross references resolve:
    ``SOUR`` records first, then ``INDI`` (people, names, events, citations),
    then ``FAM`` (partnerships, family events, parent-child edges). The import
    only adds rows; nothing already in the tree is looked up or merged.

    Args:
        session: async session used for all inserts; committed at the end.
        actor: the user performing the import (checked for edit permission and
            recorded in the audit entry).
        tree: the tree that receives the imported records.
        text: the GEDCOM document.

    Returns:
        ``{"counts": {...}, "unmapped_tags": [...]}`` where ``counts`` may hold
        ``places``, ``sources``, ``citations``, ``persons``, ``names``,
        ``events``, ``families`` and ``relationships``, and ``unmapped_tags``
        is the sorted list of INDI/FAM sub-tags that were not imported.

    Raises:
        Forbidden: if ``actor`` may not edit ``tree``.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")
    roots = parse_records(text)
    counts: defaultdict[str, int] = defaultdict(int)
    unmapped: set[str] = set()
    place_cache: dict[str, uuid.UUID] = {}
    source_map: dict[str, uuid.UUID] = {}
    person_map: dict[str, uuid.UUID] = {}
    # Sub-tags that are consumed elsewhere or deliberately ignored, and so
    # must not be reported as unmapped.
    indi_ignored = ("NAME", "SEX", "SOUR", "FAMC", "FAMS", "CHAN", "OBJE", "_UID")
    fam_ignored = ("HUSB", "WIFE", "CHIL", "SOUR", "CHAN", "OBJE", "_UID")

    async def place_id(name: str | None) -> uuid.UUID | None:
        """Return the id of the Place for ``name``, creating it on first use."""
        # Normalise before the cache lookup so "Boston" and "Boston " share a
        # row and a whitespace-only PLAC does not create a nameless place.
        name = (name or "").strip()
        if not name:
            return None
        if name in place_cache:
            return place_cache[name]
        p = Place(tree_id=tree.id, name=name)
        session.add(p)
        await session.flush()  # flush so the generated id is available
        place_cache[name] = p.id
        counts["places"] += 1
        return p.id

    # Sources first (so citations can reference them).
    for rec in roots:
        if rec.tag == "SOUR" and rec.xref:
            src = Source(
                tree_id=tree.id,
                title=rec.text("TITL") or rec.text("ABBR") or "Untitled source",
                author=rec.text("AUTH"),
                publication_info=rec.text("PUBL"),
                citation_text=rec.text("TEXT"),
            )
            session.add(src)
            await session.flush()
            source_map[rec.xref] = src.id
            counts["sources"] += 1

    async def add_citations(holder: GedcomNode, **target) -> None:
        """Create a Citation for each SOUR pointer directly under ``holder``.

        ``target`` names what is being cited (e.g. ``person_id=...``). SOUR
        values that do not point at an imported source record are skipped.
        """
        for s in holder.all("SOUR"):
            sid = source_map.get(s.value.strip())
            if sid is None:
                continue
            session.add(
                Citation(tree_id=tree.id, source_id=sid, page=s.text("PAGE"), **target)
            )
            counts["citations"] += 1

    # Individuals.
    for rec in roots:
        if rec.tag != "INDI" or not rec.xref:
            continue
        person = Person(tree_id=tree.id, gender=_sex(rec.text("SEX")))
        session.add(person)
        await session.flush()
        person_map[rec.xref] = person.id
        counts["persons"] += 1
        for i, nm in enumerate(rec.all("NAME")):
            given, surname = _parse_name(nm.value)
            session.add(
                Name(
                    tree_id=tree.id,
                    person_id=person.id,
                    name_type="birth",
                    given=given,
                    surname=surname,
                    display_name=nm.value or None,
                    is_primary=(i == 0),  # the first NAME line is the primary
                    sort_order=i,
                )
            )
            counts["names"] += 1
        await add_citations(rec, person_id=person.id)
        for child in rec.children:
            if child.tag in INDI_EVENTS:
                dv = child.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    person_id=person.id,
                    event_type=INDI_EVENTS[child.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(child.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                await add_citations(child, event_id=ev.id)
            elif child.tag not in indi_ignored:
                unmapped.add(child.tag)

    # Families -> partnerships, parent-child edges, marriage events.
    for rec in roots:
        if rec.tag != "FAM":
            continue
        counts["families"] += 1
        husb = person_map.get((rec.text("HUSB") or "").strip())
        wife = person_map.get((rec.text("WIFE") or "").strip())
        partnership_id: uuid.UUID | None = None
        # A partnership needs two distinct people; a FAM naming the same
        # individual as HUSB and WIFE must not create a self-referencing edge.
        if husb is not None and wife is not None and husb != wife:
            rel = Relationship(
                tree_id=tree.id,
                type=RelationshipType.partnership,
                person_from_id=husb,
                person_to_id=wife,
            )
            session.add(rel)
            await session.flush()
            partnership_id = rel.id
            counts["relationships"] += 1
        for fe in rec.children:
            if fe.tag in FAM_EVENTS:
                if partnership_id is None:
                    # Family events hang off the partnership edge; without one
                    # there is nothing to attach the event to.
                    continue
                dv = fe.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    relationship_id=partnership_id,
                    event_type=FAM_EVENTS[fe.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(fe.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                # Same treatment as individual events: keep their SOUR refs.
                await add_citations(fe, event_id=ev.id)
            elif fe.tag not in fam_ignored:
                unmapped.add(fe.tag)
        # dict.fromkeys de-duplicates while keeping order, so a FAM whose HUSB
        # and WIFE are the same person yields one parent edge, not two.
        parents = [p for p in dict.fromkeys((husb, wife)) if p is not None]
        for chil in rec.all("CHIL"):
            cp = person_map.get(chil.value.strip())
            if cp is None:
                continue
            for parent in parents:
                if parent != cp:
                    session.add(
                        Relationship(
                            tree_id=tree.id,
                            type=RelationshipType.parent_child,
                            person_from_id=parent,
                            person_to_id=cp,
                            qualifier=ParentChildQualifier.biological,
                        )
                    )
                    counts["relationships"] += 1

    record_audit(
        session,
        action="import",
        entity_type="Gedcom",
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=dict(counts),
    )
    await session.commit()
    return {"counts": dict(counts), "unmapped_tags": sorted(unmapped)}
def _ged_date(value: str | None) -> str | None:
return value.strip() if value else None
async def export_gedcom(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> str:
    """Serialize ``tree`` to GEDCOM text.

    Emits a ``HEAD``, one ``INDI`` per non-deleted person (names, sex, events,
    family links), one ``FAM`` per derived family, one ``SOUR`` per non-deleted
    source, and a ``TRLR``. Families are not stored; they are derived by
    grouping children on their exact set of parents and then attaching each
    partnership edge to the family with the same two people (creating a
    childless family if there is none).

    Args:
        session: async session used for the read queries.
        viewer_id: id of the user requesting the export.
        tree: the tree to export.

    Returns:
        The GEDCOM document, newline-terminated.

    Raises:
        Forbidden: if the viewer may not view ``tree``.
    """
    if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
        raise Forbidden("not permitted to view this tree")

    async def fetch_live(model) -> list:
        """Load every non-deleted row of ``model`` belonging to ``tree``."""
        stmt = select(model).where(model.tree_id == tree.id, model.deleted_at.is_(None))
        return list((await session.execute(stmt)).scalars().all())

    persons = await fetch_live(Person)
    names = await fetch_live(Name)
    events = await fetch_live(Event)
    rels = await fetch_live(Relationship)
    sources = await fetch_live(Source)
    places = {
        p.id: p
        for p in (
            await session.execute(select(Place).where(Place.tree_id == tree.id))
        ).scalars().all()
    }

    pxref = {p.id: f"@I{i + 1}@" for i, p in enumerate(persons)}
    person_rank = {p.id: i for i, p in enumerate(persons)}
    gender_by_id = {p.id: p.gender for p in persons}
    sxref = {s.id: f"@S{i + 1}@" for i, s in enumerate(sources)}

    names_by_person: dict[uuid.UUID, list[Name]] = defaultdict(list)
    # NOTE(review): sort_order takes precedence over is_primary here, while
    # import marks the first NAME line as primary - confirm this is intended.
    for n in sorted(names, key=lambda n: (n.sort_order, not n.is_primary)):
        names_by_person[n.person_id].append(n)

    events_by_person: dict[uuid.UUID, list[Event]] = defaultdict(list)
    events_by_rel: dict[uuid.UUID, list[Event]] = defaultdict(list)
    for e in events:
        if e.person_id:
            events_by_person[e.person_id].append(e)
        elif e.relationship_id:
            events_by_rel[e.relationship_id].append(e)

    def exported(r) -> bool:
        # An edge can outlive one of its people (the person query filters
        # soft-deleted rows); such an edge has no xref to point at, so skip it
        # instead of failing with KeyError further down.
        return r.person_from_id in pxref and r.person_to_id in pxref

    # Build families from parent-child + partnership edges (group by parent set).
    parents_of: dict[uuid.UUID, set[uuid.UUID]] = defaultdict(set)
    for r in rels:
        if r.type == RelationshipType.parent_child and exported(r):
            parents_of[r.person_to_id].add(r.person_from_id)
    fams: dict[frozenset, dict] = {}
    for child, ps in parents_of.items():
        fam = fams.setdefault(
            frozenset(ps), {"parents": set(ps), "children": [], "rel_id": None}
        )
        fam["children"].append(child)
    for r in rels:
        if r.type == RelationshipType.partnership and exported(r):
            pair = {r.person_from_id, r.person_to_id}
            fam = fams.setdefault(
                frozenset(pair), {"parents": set(pair), "children": [], "rel_id": None}
            )
            fam["rel_id"] = r.id
    fam_list = list(fams.values())
    for i, fam in enumerate(fam_list):
        fam["xref"] = f"@F{i + 1}@"

    # person -> the families they are a spouse in / a child in
    spouse_fams: dict[uuid.UUID, list[str]] = defaultdict(list)
    child_fams: dict[uuid.UUID, str] = {}
    for fam in fam_list:
        for pid in fam["parents"]:
            spouse_fams[pid].append(fam["xref"])
        for cid in fam["children"]:
            child_fams[cid] = fam["xref"]

    out: list[str] = ["0 HEAD", "1 SOUR Provenance", "1 GEDC", "2 VERS 5.5.1", "1 CHAR UTF-8"]

    def emit(level: int, tag: str, value: str | None = None) -> None:
        """Append ``tag`` at ``level``; extra value lines become CONT lines.

        A raw newline inside a value would otherwise start a bogus GEDCOM line
        that the importer skips, silently truncating the value on round-trip.
        """
        if not value:
            out.append(f"{level} {tag}")
            return
        first, *more = str(value).replace("\r\n", "\n").replace("\r", "\n").split("\n")
        out.append(f"{level} {tag} {first}" if first else f"{level} {tag}")
        for extra in more:
            out.append(f"{level + 1} CONT {extra}" if extra else f"{level + 1} CONT")

    def emit_event(e: Event, tag: str) -> None:
        """Append one level-1 event with its DATE and PLAC, when present."""
        emit(1, tag)
        date_text = _ged_date(e.date_value)
        if date_text:
            emit(2, "DATE", date_text)
        if e.place_id and e.place_id in places:
            emit(2, "PLAC", places[e.place_id].name)

    for p in persons:
        out.append(f"0 {pxref[p.id]} INDI")
        for n in names_by_person.get(p.id, []):
            display = n.display_name or f"{n.given or ''} /{n.surname or ''}/".strip()
            emit(1, "NAME", display)
        sex = {"male": "M", "female": "F"}.get(p.gender or "")
        if sex:
            out.append(f"1 SEX {sex}")
        for e in events_by_person.get(p.id, []):
            tag = EVENT_TO_GED.get(e.event_type)
            if not tag:
                continue  # event types with no GEDCOM tag are not exported
            emit_event(e, tag)
        if p.id in child_fams:
            out.append(f"1 FAMC {child_fams[p.id]}")
        for x in spouse_fams.get(p.id, []):
            out.append(f"1 FAMS {x}")

    for fam in fam_list:
        out.append(f"0 {fam['xref']} FAM")
        # Order parents by their INDI position so output does not depend on
        # set iteration order.
        ps = sorted(fam["parents"], key=person_rank.__getitem__)
        # HUSB/WIFE by recorded gender where possible. The two slots are
        # always filled with different people, and a lone female parent is
        # written as WIFE only.
        males = [pid for pid in ps if gender_by_id.get(pid) == "male"]
        females = [pid for pid in ps if gender_by_id.get(pid) == "female"]
        wife = females[0] if females else None
        husb = males[0] if males else next((pid for pid in ps if pid != wife), None)
        if wife is None:
            wife = next((pid for pid in ps if pid != husb), None)
        if husb:
            out.append(f"1 HUSB {pxref[husb]}")
        if wife:
            out.append(f"1 WIFE {pxref[wife]}")
        for cid in fam["children"]:
            out.append(f"1 CHIL {pxref[cid]}")
        if fam["rel_id"]:
            for e in events_by_rel.get(fam["rel_id"], []):
                tag = EVENT_TO_GED.get(e.event_type)
                if not tag:
                    continue
                # Includes PLAC, which import reads for family events too.
                emit_event(e, tag)

    for s in sources:
        out.append(f"0 {sxref[s.id]} SOUR")
        if s.title:
            emit(1, "TITL", s.title)
        if s.author:
            emit(1, "AUTH", s.author)
        if s.publication_info:
            emit(1, "PUBL", s.publication_info)
        if s.citation_text:
            # Import fills citation_text from TEXT; write it back out.
            emit(1, "TEXT", s.citation_text)
    out.append("0 TRLR")
    return "\n".join(out) + "\n"