"""GEDCOM import/export.

A pragmatic parser + mapper for the common subset of GEDCOM (5.5.1 / 7 share
the line grammar): INDI, FAM, SOUR. Import maps records into a tree and
returns a mapping report (counts + unmapped tags); export serializes the tree
back to GEDCOM.

Runs inline for now — large files should move to the worker later.
"""

import re
import uuid
from collections import defaultdict
from datetime import date

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.enums import ParentChildQualifier, RelationshipType
from app.models.event import Event
from app.models.person import Name, Person
from app.models.place import Place
from app.models.relationship import Relationship
from app.models.source import Citation, Source
from app.models.tree import Tree
from app.models.user import User
from app.services import privacy
from app.services.audit import record_audit
from app.services.exceptions import Forbidden

# GEDCOM event tag -> our event_type (INDI-level).
# The mapping is many-to-one: BAPM and BAPL both become "baptism", so any
# reverse lookup (event_type -> tag) has to pick one of the two tags.
INDI_EVENTS = {
    "BIRT": "birth",
    "DEAT": "death",
    "BAPM": "baptism",
    "CHR": "christening",
    "BURI": "burial",
    "CREM": "cremation",
    "RESI": "residence",
    "CENS": "census",
    "IMMI": "immigration",
    "EMIG": "emigration",
    "OCCU": "occupation",
    "EDUC": "education",
    "GRAD": "graduation",
    "RETI": "retirement",
    "NATU": "naturalization",
    "BAPL": "baptism",
}

# FAM-level events.
FAM_EVENTS = {"MARR": "marriage", "DIV": "divorce", "ENGA": "engagement"}

# Reverse map used on export (event_type -> GEDCOM tag). Several tags may share
# one event_type (BAPM and BAPL are both "baptism"); keep the FIRST tag that
# was declared so the generic tag wins over a later, more specialised alias.
EVENT_TO_GED: dict[str, str] = {}
for _tag, _event_type in {**INDI_EVENTS, **FAM_EVENTS}.items():
    EVENT_TO_GED.setdefault(_event_type, _tag)


class GedcomNode:
    """One GEDCOM line plus the lines nested beneath it.

    Attributes are plain data: ``level`` (the leading integer), ``tag``,
    ``value`` (text after the tag, "" when absent), ``xref`` (the ``@...@``
    record id including the ``@`` signs, or None) and ``children``.
    """

    __slots__ = ("level", "tag", "value", "xref", "children")

    def __init__(self, level: int, tag: str, value: str = "", xref: str | None = None):
        self.level = level
        self.tag = tag
        self.value = value
        self.xref = xref
        self.children: list[GedcomNode] = []

    def first(self, tag: str) -> "GedcomNode | None":
        """Return the first direct child with ``tag``, or None."""
        return next((c for c in self.children if c.tag == tag), None)

    def all(self, tag: str) -> list["GedcomNode"]:
        """Return every direct child with ``tag``, in file order."""
        return [c for c in self.children if c.tag == tag]

    def text(self, tag: str, default: str | None = None) -> str | None:
        """Return the value of the first direct child with ``tag``, else ``default``."""
        n = self.first(tag)
        return n.value if n is not None else default


def parse_records(text: str) -> list[GedcomNode]:
    """Parse GEDCOM text into a forest of :class:`GedcomNode` records.

    Lines are nested by their level number. ``CONC``/``CONT`` lines are folded
    into the value of the enclosing node (``CONT`` inserts a newline first)
    instead of becoming nodes of their own. Lines that do not start with a
    non-negative integer level, or that carry no tag, are skipped.

    Returns the nodes that have no parent (normally the level-0 records).
    """
    roots: list[GedcomNode] = []
    stack: list[GedcomNode] = []
    for raw in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        # Drop a UTF-8 BOM and any leading indentation: either would make the
        # level number unparseable and silently lose the line (with a BOM, the
        # very first record of the file).
        line = raw.lstrip("\ufeff \t").rstrip()
        if not line:
            continue
        parts = line.split(" ", 1)
        try:
            level = int(parts[0])
        except ValueError:
            continue
        if level < 0:
            continue
        rest = parts[1] if len(parts) > 1 else ""

        # Optional record id: "0 @I1@ INDI".
        xref: str | None = None
        if rest.startswith("@"):
            end = rest.find("@", 1)
            if end != -1:
                xref = rest[: end + 1]
                rest = rest[end + 1:].strip()

        tparts = rest.split(" ", 1)
        tag = tparts[0]
        if not tag:
            # A bare level number carries no information; ignore it before it
            # can disturb the nesting stack.
            continue
        value = tparts[1] if len(tparts) > 1 else ""

        # Unwind to the nearest ancestor with a strictly smaller level.
        while stack and stack[-1].level >= level:
            stack.pop()
        parent = stack[-1] if stack else None

        if tag in ("CONC", "CONT") and parent is not None:
            parent.value += ("" if tag == "CONC" else "\n") + value
            continue

        node = GedcomNode(level, tag, value, xref)
        if parent is None:
            roots.append(node)
        else:
            parent.children.append(node)
        stack.append(node)
    return roots


def _parse_name(value: str) -> tuple[str | None, str | None]:
    """Split a GEDCOM NAME value ("Given /Surname/") into (given, surname).

    Without slashes the whole value is treated as the given name. Empty parts
    become None.
    """
    if "/" in value:
        given, _, rest = value.partition("/")
        surname = rest.split("/", 1)[0]
        return given.strip() or None, surname.strip() or None
    return value.strip() or None, None


def _year(date_value: str | None) -> str | None:
    """Return the first standalone 3- or 4-digit number in ``date_value``, if any."""
    if not date_value:
        return None
    m = re.search(r"\b(\d{3,4})\b", date_value)
    return m.group(1) if m else None


def _date_start(date_value: str | None) -> date | None:
    """Return 1 January of the year found in ``date_value``, or None.

    Only the year is interpreted; a year ``date`` cannot represent yields None.
    """
    y = _year(date_value)
    if not y:
        return None
    try:
        return date(int(y), 1, 1)
    except ValueError:
        return None


def _sex(value: str | None) -> str | None:
    """Map a GEDCOM SEX value to a gender string.

    "M"/"F" (any case) become "male"/"female"; any other non-empty value is
    passed through stripped and lower-cased; empty input gives None.
    """
    if not value:
        return None
    v = value.strip().upper()
    return {"M": "male", "F": "female"}.get(v, value.strip().lower() or None)


async def import_gedcom(
    session: AsyncSession, *, actor: User, tree: Tree, text: str
) -> dict:
    """Import GEDCOM ``text`` into ``tree`` and commit.

    Creates, in order: sources, then individuals (with names, events, places
    and citations), then families (a partnership when both HUSB and WIFE
    resolve to two different imported persons, family events on that
    partnership, and parent-child edges to each CHIL). An audit entry with the
    counts is recorded before the commit.

    Returns ``{"counts": {...}, "unmapped_tags": [...]}`` where
    ``unmapped_tags`` lists INDI-level tags that were neither mapped nor
    deliberately ignored.

    Raises:
        Forbidden: if ``actor`` may not edit ``tree``.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")

    roots = parse_records(text)
    counts: defaultdict[str, int] = defaultdict(int)
    unmapped: set[str] = set()
    place_cache: dict[str, uuid.UUID] = {}
    source_map: dict[str, uuid.UUID] = {}
    person_map: dict[str, uuid.UUID] = {}

    async def place_id(name: str | None) -> uuid.UUID | None:
        """Return the id of the Place created for ``name`` (one per distinct name)."""
        if not name:
            return None
        if name in place_cache:
            return place_cache[name]
        p = Place(tree_id=tree.id, name=name)
        session.add(p)
        await session.flush()
        place_cache[name] = p.id
        counts["places"] += 1
        return p.id

    # Sources first (so citations can reference them).
    for rec in roots:
        if rec.tag == "SOUR" and rec.xref:
            src = Source(
                tree_id=tree.id,
                title=rec.text("TITL") or rec.text("ABBR") or "Untitled source",
                author=rec.text("AUTH"),
                publication_info=rec.text("PUBL"),
                citation_text=rec.text("TEXT"),
            )
            session.add(src)
            await session.flush()
            source_map[rec.xref] = src.id
            counts["sources"] += 1

    async def add_citations(holder: GedcomNode, **target) -> None:
        """Add a Citation for each SOUR pointer under ``holder`` that resolves."""
        for s in holder.all("SOUR"):
            sid = source_map.get(s.value.strip())
            if sid is None:
                continue
            session.add(
                Citation(tree_id=tree.id, source_id=sid, page=s.text("PAGE"), **target)
            )
            counts["citations"] += 1

    # Individuals.
    for rec in roots:
        if rec.tag != "INDI" or not rec.xref:
            continue
        person = Person(tree_id=tree.id, gender=_sex(rec.text("SEX")))
        session.add(person)
        await session.flush()
        person_map[rec.xref] = person.id
        counts["persons"] += 1

        for i, nm in enumerate(rec.all("NAME")):
            given, surname = _parse_name(nm.value)
            session.add(
                Name(
                    tree_id=tree.id,
                    person_id=person.id,
                    name_type="birth",
                    given=given,
                    surname=surname,
                    display_name=nm.value or None,
                    is_primary=(i == 0),
                    sort_order=i,
                )
            )
            counts["names"] += 1

        await add_citations(rec, person_id=person.id)

        for child in rec.children:
            if child.tag in INDI_EVENTS:
                dv = child.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    person_id=person.id,
                    event_type=INDI_EVENTS[child.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(child.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                await add_citations(child, event_id=ev.id)
            elif child.tag in ("NAME", "SEX", "SOUR", "FAMC", "FAMS", "CHAN", "OBJE", "_UID"):
                continue
            else:
                unmapped.add(child.tag)

    # Families -> partnerships, parent-child edges, marriage events.
    # Remember the (parent, child) pairs already written so repeated CHIL lines
    # or overlapping FAM records do not create duplicate edges.
    seen_edges: set[tuple[uuid.UUID, uuid.UUID]] = set()
    for rec in roots:
        if rec.tag != "FAM":
            continue
        counts["families"] += 1
        husb = person_map.get((rec.text("HUSB") or "").strip())
        wife = person_map.get((rec.text("WIFE") or "").strip())

        partnership_id: uuid.UUID | None = None
        # A file that names the same person as HUSB and WIFE must not produce
        # a partnership of that person with themselves.
        if husb and wife and husb != wife:
            rel = Relationship(
                tree_id=tree.id,
                type=RelationshipType.partnership,
                person_from_id=husb,
                person_to_id=wife,
            )
            session.add(rel)
            await session.flush()
            partnership_id = rel.id
            counts["relationships"] += 1

        # Family events hang off the partnership, so they need one to exist.
        for fe in rec.children:
            if fe.tag in FAM_EVENTS and partnership_id is not None:
                dv = fe.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    relationship_id=partnership_id,
                    event_type=FAM_EVENTS[fe.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(fe.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1

        # dict.fromkeys keeps order while collapsing HUSB == WIFE to one parent.
        parents = [pid for pid in dict.fromkeys((husb, wife)) if pid]
        for chil in rec.all("CHIL"):
            cp = person_map.get(chil.value.strip())
            if cp is None:
                continue
            for parent in parents:
                if parent == cp or (parent, cp) in seen_edges:
                    continue
                seen_edges.add((parent, cp))
                session.add(
                    Relationship(
                        tree_id=tree.id,
                        type=RelationshipType.parent_child,
                        person_from_id=parent,
                        person_to_id=cp,
                        qualifier=ParentChildQualifier.biological,
                    )
                )
                counts["relationships"] += 1

    record_audit(
        session,
        action="import",
        entity_type="Gedcom",
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=dict(counts),
    )
    await session.commit()
    return {"counts": dict(counts), "unmapped_tags": sorted(unmapped)}


def _ged_date(value: str | None) -> str | None:
    """Return ``value`` stripped, or None when it is empty/None."""
    return value.strip() if value else None


def _ged_lines(level: int, tag: str, value: str | None = None) -> list[str]:
    """Render ``tag``/``value`` as GEDCOM lines at ``level``.

    A value containing line breaks is continued on ``CONT`` lines one level
    deeper, so it can never inject arbitrary lines into the output. A value
    without line breaks renders as the single line ``"<level> <tag> <value>"``.
    """
    if not value:
        return [f"{level} {tag}"]
    first, *rest = value.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    lines = [f"{level} {tag} {first}" if first else f"{level} {tag}"]
    lines.extend(
        f"{level + 1} CONT {part}" if part else f"{level + 1} CONT" for part in rest
    )
    return lines


def _assign_spouse_roles(parent_ids, gender_by_id):
    """Pick (husb, wife) from ``parent_ids``; the two are never the same person.

    A parent recorded as "male" is preferred for HUSB and one recorded as
    "female" for WIFE; roles still empty are filled from the remaining parents
    in order. Either role may be None.
    """
    husb = next((pid for pid in parent_ids if gender_by_id.get(pid) == "male"), None)
    wife = next((pid for pid in parent_ids if gender_by_id.get(pid) == "female"), None)
    # Materialise the leftovers now: they must be computed from the choices
    # made above, not from the reassignments below.
    spare = iter([pid for pid in parent_ids if pid not in (husb, wife)])
    if husb is None:
        husb = next(spare, None)
    if wife is None:
        wife = next(spare, None)
    return husb, wife


async def export_gedcom(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> str:
    """Serialize ``tree`` to GEDCOM 5.5.1 text.

    Emits a HEAD, one INDI per non-deleted person (names, sex, events,
    FAMC/FAMS links), one FAM per distinct parent set / partnership (HUSB,
    WIFE, CHIL, family events) and one SOUR per non-deleted source, then TRLR.
    Relationships that reference a person not included in the export are
    skipped. Citations are not exported.

    Raises:
        Forbidden: if the viewer may not view ``tree``.
    """
    if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
        raise Forbidden("not permitted to view this tree")

    async def rows(stmt) -> list:
        return list((await session.execute(stmt)).scalars().all())

    persons = await rows(
        select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
    )
    names = await rows(
        select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
    )
    events = await rows(
        select(Event).where(Event.tree_id == tree.id, Event.deleted_at.is_(None))
    )
    rels = await rows(
        select(Relationship).where(
            Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
        )
    )
    sources = await rows(
        select(Source).where(Source.tree_id == tree.id, Source.deleted_at.is_(None))
    )
    places = {p.id: p for p in await rows(select(Place).where(Place.tree_id == tree.id))}

    person_order = {p.id: i for i, p in enumerate(persons)}
    pxref = {pid: f"@I{i + 1}@" for pid, i in person_order.items()}
    gender_by_id = {p.id: p.gender for p in persons}
    sxref = {s.id: f"@S{i + 1}@" for i, s in enumerate(sources)}

    names_by_person: dict[uuid.UUID, list[Name]] = defaultdict(list)
    for n in sorted(names, key=lambda n: (n.sort_order, not n.is_primary)):
        names_by_person[n.person_id].append(n)

    events_by_person: dict[uuid.UUID, list[Event]] = defaultdict(list)
    events_by_rel: dict[uuid.UUID, list[Event]] = defaultdict(list)
    for e in events:
        if e.person_id:
            events_by_person[e.person_id].append(e)
        elif e.relationship_id:
            events_by_rel[e.relationship_id].append(e)

    def event_lines(e: Event) -> list[str]:
        """Render one event (tag, DATE, PLAC) at level 1; [] if the type has no tag."""
        tag = EVENT_TO_GED.get(e.event_type)
        if not tag:
            return []
        lines = [f"1 {tag}"]
        dv = _ged_date(e.date_value)
        if dv:
            lines.extend(_ged_lines(2, "DATE", dv))
        place = places.get(e.place_id) if e.place_id else None
        if place is not None and place.name:
            lines.extend(_ged_lines(2, "PLAC", place.name))
        return lines

    def exported(r: Relationship) -> bool:
        # Both ends must have an INDI record, otherwise the pointer would
        # dangle (e.g. the relationship outlived a soft-deleted person).
        return r.person_from_id in pxref and r.person_to_id in pxref

    # Build families from parent-child + partnership edges (group by parent set).
    parents_of: dict[uuid.UUID, set[uuid.UUID]] = defaultdict(set)
    for r in rels:
        if r.type == RelationshipType.parent_child and exported(r):
            parents_of[r.person_to_id].add(r.person_from_id)

    fams: dict[frozenset, dict] = {}
    for child, ps in parents_of.items():
        fam = fams.setdefault(
            frozenset(ps), {"parents": set(ps), "children": [], "rel_id": None}
        )
        fam["children"].append(child)
    for r in rels:
        if r.type == RelationshipType.partnership and exported(r):
            couple = {r.person_from_id, r.person_to_id}
            fam = fams.setdefault(
                frozenset(couple), {"parents": couple, "children": [], "rel_id": None}
            )
            fam["rel_id"] = r.id

    fam_list = list(fams.values())
    for i, fam in enumerate(fam_list, start=1):
        fam["xref"] = f"@F{i}@"

    # person -> the families they are a spouse in / a child in
    spouse_fams: dict[uuid.UUID, list[str]] = defaultdict(list)
    child_fams: dict[uuid.UUID, str] = {}
    for fam in fam_list:
        for pid in fam["parents"]:
            spouse_fams[pid].append(fam["xref"])
        for cid in fam["children"]:
            child_fams[cid] = fam["xref"]

    out: list[str] = ["0 HEAD", "1 SOUR Provenance", "1 GEDC", "2 VERS 5.5.1", "1 CHAR UTF-8"]

    for p in persons:
        out.append(f"0 {pxref[p.id]} INDI")
        for n in names_by_person.get(p.id, []):
            display = n.display_name or f"{n.given or ''} /{n.surname or ''}/".strip()
            out.extend(_ged_lines(1, "NAME", display))
        sex = {"male": "M", "female": "F"}.get(p.gender or "")
        if sex:
            out.append(f"1 SEX {sex}")
        for e in events_by_person.get(p.id, []):
            out.extend(event_lines(e))
        if p.id in child_fams:
            out.append(f"1 FAMC {child_fams[p.id]}")
        for x in spouse_fams.get(p.id, []):
            out.append(f"1 FAMS {x}")

    for fam in fam_list:
        out.append(f"0 {fam['xref']} FAM")
        # Order parents like the INDI records so role assignment is stable.
        ps = sorted(fam["parents"], key=person_order.__getitem__)
        # HUSB/WIFE by recorded gender where possible.
        husb, wife = _assign_spouse_roles(ps, gender_by_id)
        if husb is not None:
            out.append(f"1 HUSB {pxref[husb]}")
        if wife is not None:
            out.append(f"1 WIFE {pxref[wife]}")
        for cid in fam["children"]:
            out.append(f"1 CHIL {pxref[cid]}")
        if fam["rel_id"]:
            for e in events_by_rel.get(fam["rel_id"], []):
                out.extend(event_lines(e))

    for s in sources:
        out.append(f"0 {sxref[s.id]} SOUR")
        if s.title:
            out.extend(_ged_lines(1, "TITL", s.title))
        if s.author:
            out.extend(_ged_lines(1, "AUTH", s.author))
        if s.publication_info:
            out.extend(_ged_lines(1, "PUBL", s.publication_info))

    out.append("0 TRLR")
    return "\n".join(out) + "\n"