d48029a407
A pragmatic GEDCOM parser + mapper: import reads INDI/FAM/SOUR and creates people, names, life events, partnership + qualified parent-child relationships, marriage events, places (deduped), sources, and citations from SOUR refs — returning a mapping report (counts + unmapped tags). Export serializes the tree back to GEDCOM (families derived from the edge model). Import is additive (no merge) and runs inline for now. Round-trip test passes; 29 tests total. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Justin Paul <justin@jpaul.me>
452 lines
16 KiB
Python
452 lines
16 KiB
Python
"""GEDCOM import/export.
|
||
|
||
A pragmatic parser + mapper for the common subset of GEDCOM (5.5.1 / 7 share
|
||
the line grammar): INDI, FAM, SOUR. Import maps records into a tree and returns
|
||
a mapping report (counts + unmapped tags); export serializes the tree back to
|
||
GEDCOM. Runs inline for now — large files should move to the worker later.
|
||
"""
|
||
|
||
import re
|
||
import uuid
|
||
from collections import defaultdict
|
||
from datetime import date
|
||
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.models.enums import ParentChildQualifier, RelationshipType
|
||
from app.models.event import Event
|
||
from app.models.person import Name, Person
|
||
from app.models.place import Place
|
||
from app.models.relationship import Relationship
|
||
from app.models.source import Citation, Source
|
||
from app.models.tree import Tree
|
||
from app.models.user import User
|
||
from app.services import privacy
|
||
from app.services.audit import record_audit
|
||
from app.services.exceptions import Forbidden
|
||
|
||
# GEDCOM event tag -> our event_type (INDI-level).
# Note that two tags (BAPM and the LDS ordinance BAPL) both map to "baptism".
INDI_EVENTS = {
    "BIRT": "birth", "DEAT": "death", "BAPM": "baptism", "CHR": "christening",
    "BURI": "burial", "CREM": "cremation", "RESI": "residence", "CENS": "census",
    "IMMI": "immigration", "EMIG": "emigration", "OCCU": "occupation",
    "EDUC": "education", "GRAD": "graduation", "RETI": "retirement",
    "NATU": "naturalization", "BAPL": "baptism",
}

# FAM-level events.
FAM_EVENTS = {"MARR": "marriage", "DIV": "divorce", "ENGA": "engagement"}

# Reverse map used on export: our event_type -> GEDCOM tag.
# Iterating the combined table in reverse makes the FIRST tag listed for an
# event type the one that is kept, so "baptism" exports as the generic BAPM
# rather than being overwritten by the later BAPL entry.
EVENT_TO_GED = {
    event_type: tag
    for tag, event_type in reversed({**INDI_EVENTS, **FAM_EVENTS}.items())
}
|
||
|
||
|
||
class GedcomNode:
|
||
__slots__ = ("level", "tag", "value", "xref", "children")
|
||
|
||
def __init__(self, level: int, tag: str, value: str = "", xref: str | None = None):
|
||
self.level = level
|
||
self.tag = tag
|
||
self.value = value
|
||
self.xref = xref
|
||
self.children: list[GedcomNode] = []
|
||
|
||
def first(self, tag: str) -> "GedcomNode | None":
|
||
return next((c for c in self.children if c.tag == tag), None)
|
||
|
||
def all(self, tag: str) -> list["GedcomNode"]:
|
||
return [c for c in self.children if c.tag == tag]
|
||
|
||
def text(self, tag: str, default: str | None = None) -> str | None:
|
||
n = self.first(tag)
|
||
return n.value if n is not None else default
|
||
|
||
|
||
def parse_records(text: str) -> list[GedcomNode]:
    """Parse GEDCOM text into a forest of ``GedcomNode`` records.

    Each non-blank line is read as ``LEVEL [@XREF@] TAG [VALUE]``. Nesting is
    rebuilt from the level numbers: a line becomes a child of the closest
    preceding line with a lower level. ``CONC`` / ``CONT`` lines are folded
    into their parent's value (``CONT`` inserts a newline first) instead of
    becoming nodes.

    Tolerated input quirks:
      * ``\\r\\n`` and bare ``\\r`` line endings;
      * a byte-order mark and/or leading indentation before the level number;
      * any run of spaces/tabs between the level number and the rest.
    Lines whose first token is not an integer are skipped.

    Args:
        text: the full GEDCOM document as a string.

    Returns:
        The top-level records (lines with no enclosing parent), in file order.
    """
    roots: list[GedcomNode] = []
    stack: list[GedcomNode] = []
    for raw in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        # Drop a BOM and any indentation in front of the level number; without
        # this int() below fails and the whole line would be silently lost.
        line = raw.lstrip("\ufeff \t").rstrip()
        if not line:
            continue

        # Split on any whitespace run so tab- or multi-space-delimited levels parse.
        parts = line.split(None, 1)
        try:
            level = int(parts[0])
        except ValueError:
            continue
        rest = parts[1] if len(parts) > 1 else ""

        # An "@...@" token directly after the level is the record's own xref.
        xref: str | None = None
        if rest.startswith("@"):
            end = rest.find("@", 1)
            if end != -1:
                xref = rest[: end + 1]
                rest = rest[end + 1:].strip()

        # Only one delimiter is consumed here so that leading spaces inside a
        # value (significant for CONC) are preserved.
        tag, _, value = rest.partition(" ")

        # Unwind to the nearest open node with a strictly lower level.
        while stack and stack[-1].level >= level:
            stack.pop()
        parent = stack[-1] if stack else None

        if tag in ("CONC", "CONT") and parent is not None:
            parent.value += ("" if tag == "CONC" else "\n") + value
            continue

        node = GedcomNode(level, tag, value, xref)
        if parent is None:
            roots.append(node)
        else:
            parent.children.append(node)
        stack.append(node)
    return roots
|
||
|
||
|
||
def _parse_name(value: str) -> tuple[str | None, str | None]:
|
||
if "/" in value:
|
||
given, _, rest = value.partition("/")
|
||
surname = rest.split("/", 1)[0]
|
||
return given.strip() or None, surname.strip() or None
|
||
return value.strip() or None, None
|
||
|
||
|
||
def _year(date_value: str | None) -> str | None:
|
||
if not date_value:
|
||
return None
|
||
m = re.search(r"\b(\d{3,4})\b", date_value)
|
||
return m.group(1) if m else None
|
||
|
||
|
||
def _date_start(date_value: str | None) -> date | None:
    """Return 1 January of the year found in ``date_value``.

    Only the year (as extracted by ``_year``) is used. ``None`` is returned
    when no year is found or when ``date`` rejects it (e.g. year 0).
    """
    year_text = _year(date_value)
    if year_text:
        try:
            return date(int(year_text), 1, 1)
        except ValueError:
            pass
    return None
|
||
|
||
|
||
def _sex(value: str | None) -> str | None:
|
||
if not value:
|
||
return None
|
||
v = value.strip().upper()
|
||
return {"M": "male", "F": "female"}.get(v, value.strip().lower() or None)
|
||
|
||
|
||
async def import_gedcom(
    session: AsyncSession, *, actor: User, tree: Tree, text: str
) -> dict:
    """Import GEDCOM ``text`` into ``tree`` and return a mapping report.

    Records are processed in three passes so that references resolve:
    SOUR records first, then INDI records (person, names, events, citations),
    then FAM records (partnership, family events, parent-child edges).
    Everything is added to ``session`` and committed once at the end, after an
    audit entry carrying the counts has been recorded. The import is additive:
    nothing already in the tree is matched or merged. Places are de-duplicated
    by exact name within this one import only.

    Args:
        session: async database session used for all writes.
        actor: the user performing the import; must be allowed to edit ``tree``.
        tree: the tree that receives the imported rows.
        text: the GEDCOM document.

    Returns:
        ``{"counts": {...}, "unmapped_tags": [...]}`` where ``counts`` holds the
        number of rows created per kind (plus ``families`` seen) and
        ``unmapped_tags`` is the sorted set of INDI/FAM sub-tags that were not
        imported. A family event tag also appears there when the event had to
        be dropped because the family did not yield a partnership to attach
        it to.

    Raises:
        Forbidden: if ``actor`` may not edit ``tree``.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")

    roots = parse_records(text)
    counts: defaultdict[str, int] = defaultdict(int)
    unmapped: set[str] = set()
    place_cache: dict[str, uuid.UUID] = {}
    source_map: dict[str, uuid.UUID] = {}
    person_map: dict[str, uuid.UUID] = {}

    # INDI sub-tags that are handled elsewhere in this function or knowingly ignored.
    indi_known = ("NAME", "SEX", "SOUR", "FAMC", "FAMS", "CHAN", "OBJE", "_UID")
    # FAM sub-tags that are handled below or knowingly ignored.
    fam_known = ("HUSB", "WIFE", "CHIL", "CHAN", "OBJE", "_UID")

    async def place_id(name: str | None) -> uuid.UUID | None:
        """Return the id of the Place called ``name``, creating it on first use."""
        if not name:
            return None
        if name in place_cache:
            return place_cache[name]
        p = Place(tree_id=tree.id, name=name)
        session.add(p)
        await session.flush()  # flush so the generated id is available
        place_cache[name] = p.id
        counts["places"] += 1
        return p.id

    # Sources first (so citations can reference them).
    for rec in roots:
        if rec.tag == "SOUR" and rec.xref:
            src = Source(
                tree_id=tree.id,
                title=rec.text("TITL") or rec.text("ABBR") or "Untitled source",
                author=rec.text("AUTH"),
                publication_info=rec.text("PUBL"),
                citation_text=rec.text("TEXT"),
            )
            session.add(src)
            await session.flush()
            source_map[rec.xref] = src.id
            counts["sources"] += 1

    async def add_citations(holder: GedcomNode, **target) -> None:
        """Create a Citation for each SOUR pointer under ``holder`` that resolves.

        ``target`` carries the foreign key naming what is cited
        (``person_id=...`` or ``event_id=...``). Pointers to unknown sources
        are skipped.
        """
        for s in holder.all("SOUR"):
            sid = source_map.get(s.value.strip())
            if sid is None:
                continue
            session.add(
                Citation(tree_id=tree.id, source_id=sid, page=s.text("PAGE"), **target)
            )
            counts["citations"] += 1

    # Individuals.
    for rec in roots:
        if rec.tag != "INDI" or not rec.xref:
            continue
        person = Person(tree_id=tree.id, gender=_sex(rec.text("SEX")))
        session.add(person)
        await session.flush()
        person_map[rec.xref] = person.id
        counts["persons"] += 1

        # The first NAME line becomes the primary name.
        for i, nm in enumerate(rec.all("NAME")):
            given, surname = _parse_name(nm.value)
            session.add(
                Name(
                    tree_id=tree.id,
                    person_id=person.id,
                    name_type="birth",
                    given=given,
                    surname=surname,
                    display_name=nm.value or None,
                    is_primary=(i == 0),
                    sort_order=i,
                )
            )
            counts["names"] += 1

        await add_citations(rec, person_id=person.id)

        for child in rec.children:
            if child.tag in INDI_EVENTS:
                dv = child.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    person_id=person.id,
                    event_type=INDI_EVENTS[child.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(child.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                await add_citations(child, event_id=ev.id)
            elif child.tag not in indi_known:
                unmapped.add(child.tag)

    # Families -> partnerships, parent-child edges, marriage events.
    for rec in roots:
        if rec.tag != "FAM":
            continue
        counts["families"] += 1
        husb = person_map.get((rec.text("HUSB") or "").strip())
        wife = person_map.get((rec.text("WIFE") or "").strip())
        if wife == husb:
            # Same individual in both roles: keep one, otherwise we would
            # create a self-partnership and duplicate parent edges.
            wife = None

        partnership_id: uuid.UUID | None = None
        if husb and wife:
            rel = Relationship(
                tree_id=tree.id,
                type=RelationshipType.partnership,
                person_from_id=husb,
                person_to_id=wife,
            )
            session.add(rel)
            await session.flush()
            partnership_id = rel.id
            counts["relationships"] += 1

        for fe in rec.children:
            if fe.tag in FAM_EVENTS:
                if partnership_id is None:
                    # No partnership to hang the event on; surface the loss
                    # in the report rather than dropping it silently.
                    unmapped.add(fe.tag)
                    continue
                dv = fe.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    relationship_id=partnership_id,
                    event_type=FAM_EVENTS[fe.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(fe.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                # Same treatment as individual events: cite sources on the event.
                await add_citations(fe, event_id=ev.id)
            elif fe.tag not in fam_known:
                # Includes family-level SOUR, which has no citation target here.
                unmapped.add(fe.tag)

        for chil in rec.all("CHIL"):
            cp = person_map.get(chil.value.strip())
            if cp is None:
                continue
            for parent in (husb, wife):
                if parent and parent != cp:
                    session.add(
                        Relationship(
                            tree_id=tree.id,
                            type=RelationshipType.parent_child,
                            person_from_id=parent,
                            person_to_id=cp,
                            qualifier=ParentChildQualifier.biological,
                        )
                    )
                    counts["relationships"] += 1

    record_audit(
        session,
        action="import",
        entity_type="Gedcom",
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=dict(counts),
    )
    await session.commit()
    return {"counts": dict(counts), "unmapped_tags": sorted(unmapped)}
|
||
|
||
|
||
def _ged_date(value: str | None) -> str | None:
|
||
return value.strip() if value else None
|
||
|
||
|
||
def _ged_lines(level: int, tag: str, value: str) -> list[str]:
    """Render ``value`` as a GEDCOM line, spilling embedded newlines into CONT lines."""
    first, *rest = value.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    lines = [f"{level} {tag} {first}" if first else f"{level} {tag}"]
    lines.extend(
        f"{level + 1} CONT {part}" if part else f"{level + 1} CONT" for part in rest
    )
    return lines


def _spouse_roles(parents: list, gender_by_id: dict) -> tuple:
    """Pick ``(husb, wife)`` from ``parents`` without using one person for both roles.

    Recorded gender decides first (male -> HUSB, female -> WIFE); any role
    still open is then filled from the remaining parents in order. Either
    element may be ``None``.
    """
    pool = list(parents)
    husb = next((pid for pid in pool if gender_by_id.get(pid) == "male"), None)
    if husb is not None:
        pool.remove(husb)
    wife = next((pid for pid in pool if gender_by_id.get(pid) == "female"), None)
    if wife is not None:
        pool.remove(wife)
    if husb is None and pool:
        husb = pool.pop(0)
    if wife is None and pool:
        wife = pool.pop(0)
    return husb, wife


def _event_lines(event, places: dict) -> list[str]:
    """Render one event as level-1 tag plus DATE/PLAC sub-lines; ``[]`` if its type has no tag."""
    tag = EVENT_TO_GED.get(event.event_type)
    if not tag:
        return []
    lines = [f"1 {tag}"]
    when = _ged_date(event.date_value)
    if when:
        lines.append(f"2 DATE {when}")
    place = places.get(event.place_id) if event.place_id else None
    if place is not None and place.name:
        lines.extend(_ged_lines(2, "PLAC", place.name))
    return lines


async def export_gedcom(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> str:
    """Serialize ``tree`` to a GEDCOM 5.5.1 document.

    Non-deleted persons, names, events, relationships and sources of the tree
    are loaded and written as INDI, FAM and SOUR records between a fixed HEAD
    and a TRLR line. FAM records do not exist in the data model; they are
    derived here: children are grouped by their exact set of parents, and each
    partnership edge contributes (or attaches to) the family of its two
    members. Edges pointing at a person that is not part of the export are
    ignored instead of raising.

    Args:
        session: async database session used for the reads.
        viewer_id: id of the user requesting the export.
        tree: the tree to export.

    Returns:
        The GEDCOM text, newline-separated and newline-terminated.

    Raises:
        Forbidden: if the viewer may not view ``tree``.
    """
    if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
        raise Forbidden("not permitted to view this tree")

    async def live_rows(model) -> list:
        """Load the tree's rows of ``model`` that are not soft-deleted."""
        result = await session.execute(
            select(model).where(model.tree_id == tree.id, model.deleted_at.is_(None))
        )
        return list(result.scalars().all())

    persons = await live_rows(Person)
    names = await live_rows(Name)
    events = await live_rows(Event)
    rels = await live_rows(Relationship)
    sources = await live_rows(Source)
    places = {
        p.id: p
        for p in (
            await session.execute(select(Place).where(Place.tree_id == tree.id))
        ).scalars().all()
    }

    # Position of each exported person; doubles as "is this person exported?".
    order = {p.id: i for i, p in enumerate(persons)}
    pxref = {pid: f"@I{i + 1}@" for pid, i in order.items()}
    gender_by_id = {p.id: p.gender for p in persons}
    sxref = {s.id: f"@S{i + 1}@" for i, s in enumerate(sources)}

    names_by_person: dict[uuid.UUID, list[Name]] = defaultdict(list)
    for n in sorted(names, key=lambda n: (n.sort_order, not n.is_primary)):
        names_by_person[n.person_id].append(n)

    events_by_person: dict[uuid.UUID, list[Event]] = defaultdict(list)
    events_by_rel: dict[uuid.UUID, list[Event]] = defaultdict(list)
    for e in events:
        if e.person_id:
            events_by_person[e.person_id].append(e)
        elif e.relationship_id:
            events_by_rel[e.relationship_id].append(e)

    # Build families from parent-child + partnership edges (group by parent set).
    parents_of: dict[uuid.UUID, set[uuid.UUID]] = defaultdict(set)
    for r in rels:
        if r.type == RelationshipType.parent_child:
            parents_of[r.person_to_id].add(r.person_from_id)
    fams: dict[frozenset, dict] = {}
    for child, ps in parents_of.items():
        key = frozenset(ps)
        fams.setdefault(key, {"parents": set(ps), "children": [], "rel_id": None})
        fams[key]["children"].append(child)
    for r in rels:
        if r.type == RelationshipType.partnership:
            key = frozenset({r.person_from_id, r.person_to_id})
            fam = fams.setdefault(
                key,
                {"parents": {r.person_from_id, r.person_to_id}, "children": [], "rel_id": None},
            )
            fam["rel_id"] = r.id
    fam_list = list(fams.values())

    # Resolve each family's xref, spouse roles and exportable children once, so
    # the INDI back-pointers (FAMS/FAMC) and the FAM records always agree.
    spouse_fams: dict[uuid.UUID, list[str]] = defaultdict(list)
    child_fams: dict[uuid.UUID, str] = {}
    for i, fam in enumerate(fam_list):
        fam["xref"] = f"@F{i + 1}@"
        # Sorting by export position makes the role fallback deterministic
        # (set iteration order is not).
        exported_parents = sorted(
            (pid for pid in fam["parents"] if pid in order), key=order.__getitem__
        )
        fam["husb"], fam["wife"] = _spouse_roles(exported_parents, gender_by_id)
        fam["children"] = [cid for cid in fam["children"] if cid in order]
        for pid in (fam["husb"], fam["wife"]):
            if pid is not None:
                spouse_fams[pid].append(fam["xref"])
        for cid in fam["children"]:
            child_fams[cid] = fam["xref"]

    out: list[str] = ["0 HEAD", "1 SOUR Provenance", "1 GEDC", "2 VERS 5.5.1", "1 CHAR UTF-8"]

    for p in persons:
        out.append(f"0 {pxref[p.id]} INDI")
        for n in names_by_person.get(p.id, []):
            display = n.display_name or f"{n.given or ''} /{n.surname or ''}/".strip()
            out.extend(_ged_lines(1, "NAME", display))
        sex = {"male": "M", "female": "F"}.get(p.gender or "")
        if sex:
            out.append(f"1 SEX {sex}")
        for e in events_by_person.get(p.id, []):
            out.extend(_event_lines(e, places))
        if p.id in child_fams:
            out.append(f"1 FAMC {child_fams[p.id]}")
        for x in spouse_fams.get(p.id, []):
            out.append(f"1 FAMS {x}")

    for fam in fam_list:
        out.append(f"0 {fam['xref']} FAM")
        if fam["husb"] is not None:
            out.append(f"1 HUSB {pxref[fam['husb']]}")
        if fam["wife"] is not None:
            out.append(f"1 WIFE {pxref[fam['wife']]}")
        for cid in fam["children"]:
            out.append(f"1 CHIL {pxref[cid]}")
        if fam["rel_id"]:
            # Family events (with DATE and PLAC) hang off the partnership edge.
            for e in events_by_rel.get(fam["rel_id"], []):
                out.extend(_event_lines(e, places))

    for s in sources:
        out.append(f"0 {sxref[s.id]} SOUR")
        if s.title:
            out.extend(_ged_lines(1, "TITL", s.title))
        if s.author:
            out.extend(_ged_lines(1, "AUTH", s.author))
        if s.publication_info:
            out.extend(_ged_lines(1, "PUBL", s.publication_info))
        if s.citation_text:
            # Import fills citation_text from TEXT; write it back for round-trips.
            out.extend(_ged_lines(1, "TEXT", s.citation_text))

    out.append("0 TRLR")
    return "\n".join(out) + "\n"
|