"""GEDCOM import/export. A pragmatic parser + mapper for the common subset of GEDCOM (5.5.1 / 7 share the line grammar): INDI, FAM, SOUR. Import maps records into a tree and returns a mapping report (counts + unmapped tags); export serializes the tree back to GEDCOM. Runs inline for now — large files should move to the worker later. Import is duplicate-aware: ``preview_gedcom`` reports incoming people that look like existing ones, and ``import_gedcom`` applies a per-record resolution (new / skip / merge / overwrite). Names carry their GEDCOM type (a married name imports as a typed alternate, not a second primary). """ import re import uuid from collections import defaultdict from datetime import UTC, date, datetime from difflib import SequenceMatcher from sqlalchemy import or_, select, update from sqlalchemy.ext.asyncio import AsyncSession from app.models.enums import ParentChildQualifier, RelationshipType from app.models.event import Event from app.models.person import Name, Person from app.models.place import Place from app.models.relationship import Relationship from app.models.source import Citation, Source from app.models.tree import Tree from app.models.user import User from app.services import privacy from app.services.audit import record_audit from app.services.exceptions import Forbidden # GEDCOM event tag -> our event_type (INDI-level). INDI_EVENTS = { "BIRT": "birth", "DEAT": "death", "BAPM": "baptism", "CHR": "christening", "BURI": "burial", "CREM": "cremation", "RESI": "residence", "CENS": "census", "IMMI": "immigration", "EMIG": "emigration", "OCCU": "occupation", "EDUC": "education", "GRAD": "graduation", "RETI": "retirement", "NATU": "naturalization", "BAPL": "baptism", "RELI": "religion", } # INDI attribute tags whose line VALUE is the fact (no date), stored in detail. VALUE_EVENTS = {"RELI", "OCCU", "EDUC"} # INDI sub-tags consumed elsewhere or intentionally ignored (not "unmapped"). 
INDI_SKIP_TAGS = {
    "NAME",
    "SEX",
    "SOUR",
    "FAMC",
    "FAMS",
    "CHAN",
    "OBJE",
    "_UID",
    "_MARNM",
    "NOTE",
}

# FAM-level events.
FAM_EVENTS = {"MARR": "marriage", "DIV": "divorce", "ENGA": "engagement"}


def _invert_first_wins(mapping: dict[str, str]) -> dict[str, str]:
    """Invert tag -> event_type, keeping the FIRST tag seen for each event type.

    Several GEDCOM tags can map to one event type (BAPM and BAPL both import as
    "baptism"). A plain dict inversion lets the last one win, which would export
    every baptism as BAPL; keeping the first preserves the generic tag.
    """
    inverted: dict[str, str] = {}
    for tag, event_type in mapping.items():
        inverted.setdefault(event_type, tag)
    return inverted


# Our event_type -> GEDCOM tag, used on export.
EVENT_TO_GED = _invert_first_wins({**INDI_EVENTS, **FAM_EVENTS})

# GEDCOM NAME TYPE (or _MARNM-derived) -> our Name.name_type vocabulary.
NAME_TYPE_MAP = {
    "birth": "birth",
    "maiden": "birth",
    "married": "married",
    "aka": "alias",
    "also known as": "alias",
    "nickname": "nickname",
    "religious": "religious",
    "immigrant": "immigration",
    "immigration": "immigration",
    "professional": "alias",
    "other": "alias",
}

# Our type -> GEDCOM TYPE on export (birth is the default; emit nothing).
EXPORT_TYPE_MAP = {
    "married": "married",
    "alias": "aka",
    "nickname": "nickname",
    "religious": "religious",
    "immigration": "immigrant",
}


class GedcomNode:
    """One parsed GEDCOM line plus the lines nested beneath it."""

    __slots__ = ("level", "tag", "value", "xref", "children")

    def __init__(self, level: int, tag: str, value: str = "", xref: str | None = None):
        self.level = level
        self.tag = tag
        self.value = value
        self.xref = xref
        self.children: list[GedcomNode] = []

    def first(self, tag: str) -> "GedcomNode | None":
        """Return the first direct child with ``tag``, or None."""
        return next((c for c in self.children if c.tag == tag), None)

    def all(self, tag: str) -> list["GedcomNode"]:
        """Return every direct child with ``tag``, in file order."""
        return [c for c in self.children if c.tag == tag]

    def text(self, tag: str, default: str | None = None) -> str | None:
        """Return the value of the first direct child with ``tag``, else ``default``."""
        n = self.first(tag)
        return n.value if n is not None else default


def parse_records(text: str) -> list[GedcomNode]:
    """Parse GEDCOM text into a forest of nodes (one root per top-level line).

    Lines whose first token is not an integer level are skipped. CONC/CONT
    lines are folded into their parent's value (CONT inserts a newline) rather
    than becoming nodes.
    """
    roots: list[GedcomNode] = []
    stack: list[GedcomNode] = []
    for raw in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        # Drop a leading BOM and indentation; otherwise the level token fails
        # int() and the whole line would be silently discarded.
        line = raw.lstrip("\ufeff \t").rstrip()
        if not line:
            continue
        parts = line.split(" ", 1)
        try:
            level = int(parts[0])
        except ValueError:
            continue
        rest = parts[1] if len(parts) > 1 else ""
        xref: str | None = None
        if rest.startswith("@"):
            end = rest.find("@", 1)
            if end != -1:
                xref = rest[: end + 1]
                rest = rest[end + 1:].strip()
        tparts = rest.split(" ", 1)
        tag = tparts[0]
        value = tparts[1] if len(tparts) > 1 else ""
        # Unwind to the nearest ancestor with a strictly lower level.
        while stack and stack[-1].level >= level:
            stack.pop()
        parent = stack[-1] if stack else None
        if tag in ("CONC", "CONT") and parent is not None:
            parent.value += ("" if tag == "CONC" else "\n") + value
            continue
        node = GedcomNode(level, tag, value, xref)
        if parent is None:
            roots.append(node)
        else:
            parent.children.append(node)
        stack.append(node)
    return roots


def _parse_name(value: str) -> tuple[str | None, str | None]:
    """Split a GEDCOM name ("Given /Surname/") into (given, surname).

    Without slashes the whole value is treated as the given name.
    """
    if "/" in value:
        given, _, rest = value.partition("/")
        surname = rest.split("/", 1)[0]
        return given.strip() or None, surname.strip() or None
    return value.strip() or None, None


def _parse_marnm(value: str, base_given: str | None) -> tuple[str | None, str | None]:
    """A _MARNM value is sometimes a full name ("Jane /Smith/") and sometimes just
    the married surname ("Smith"). Keep the given name from the base name in the
    latter case."""
    v = (value or "").strip()
    if "/" in v:
        g, s = _parse_name(v)
        return (g or base_given), s
    return base_given, (v or None)


def _extract_names(rec: GedcomNode) -> list[dict]:
    """All names for an INDI, typed. Multiple NAME records (each with an optional
    TYPE) plus any _MARNM (married name) subtags become separate Name rows. The
    first birth/maiden name is primary."""
    out: list[dict] = []
    for nm in rec.all("NAME"):
        g, s = _parse_name(nm.value)
        t = (nm.text("TYPE") or "").strip().lower()
        # Unknown TYPE values pass through verbatim; a missing TYPE means birth.
        ntype = NAME_TYPE_MAP.get(t, t or "birth")
        out.append({
            "type": ntype,
            "given": g,
            "surname": s,
            "display": nm.value or None,
            "nickname": nm.text("NICK"),
        })
        for mar in nm.all("_MARNM"):
            mg, ms = _parse_marnm(mar.value, g)
            out.append({
                "type": "married",
                "given": mg,
                "surname": ms,
                "display": mar.value or None,
                "nickname": None,
            })
    # INDI-level _MARNM borrows the given name of the first name collected.
    for mar in rec.all("_MARNM"):
        base_g = out[0]["given"] if out else None
        mg, ms = _parse_marnm(mar.value, base_g)
        out.append({
            "type": "married",
            "given": mg,
            "surname": ms,
            "display": mar.value or None,
            "nickname": None,
        })
    if not out:
        return out
    primary_idx = next((i for i, n in enumerate(out) if n["type"] == "birth"), 0)
    for i, n in enumerate(out):
        n["is_primary"] = i == primary_idx
        n["sort"] = i
    return out


def _norm(given: str | None, surname: str | None) -> str:
    """Lower-cased "given surname" with whitespace collapsed, for fuzzy matching."""
    return re.sub(r"\s+", " ", f"{given or ''} {surname or ''}".strip().lower())


def _year(date_value: str | None) -> str | None:
    """Return the first standalone 3-4 digit number in a GEDCOM date, if any."""
    if not date_value:
        return None
    m = re.search(r"\b(\d{3,4})\b", date_value)
    return m.group(1) if m else None


def _date_start(date_value: str | None) -> date | None:
    """Return 1 January of the year found in ``date_value``, or None."""
    y = _year(date_value)
    if not y:
        return None
    try:
        return date(int(y), 1, 1)
    except ValueError:
        # e.g. "000" is not a valid year for datetime.date.
        return None


def _sex(value: str | None) -> str | None:
    """Map GEDCOM SEX (M/F) to "male"/"female"; other values are lower-cased."""
    if not value:
        return None
    v = value.strip().upper()
    return {"M": "male", "F": "female"}.get(v, value.strip().lower() or None)


def _notes_text(rec: GedcomNode) -> str | None:
    """Join an INDI's NOTE lines (which pack confidence / findagrave / fs_pid /
    free text) into the person's notes field."""
    vals = [n.value.strip() for n in rec.all("NOTE") if n.value and n.value.strip()]
    return "\n".join(vals) or None


def _person_summary(rec: GedcomNode) -> dict:
    """Display name + birth year for an incoming INDI, for duplicate matching."""
    names = _extract_names(rec)
    primary = next((n for n in names if n.get("is_primary")), names[0] if names else None)
    g = primary["given"] if primary else None
    s = primary["surname"] if primary else None
    disp = " ".join(x for x in (g, s) if x)
    if not disp and primary:
        disp = primary.get("display") or ""
    birth = rec.first("BIRT")
    year = _year(birth.text("DATE")) if birth is not None else None
    return {"names": names, "norm": _norm(g, s), "name": disp or "(no name)", "year": year}


async def _build_existing_index(session: AsyncSession, tree: Tree) -> list[dict]:
    """Existing (non-deleted) people with a display name + birth year, for
    matching incoming records against."""
    persons = list(
        (
            await session.execute(
                select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    names = list(
        (
            await session.execute(
                select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    # Primary names sort first, so setdefault keeps the best name per person.
    name_by_person: dict[uuid.UUID, Name] = {}
    for n in sorted(names, key=lambda n: (not n.is_primary, n.sort_order)):
        name_by_person.setdefault(n.person_id, n)
    births = list(
        (
            await session.execute(
                select(Event).where(
                    Event.tree_id == tree.id,
                    Event.deleted_at.is_(None),
                    Event.event_type == "birth",
                )
            )
        ).scalars().all()
    )
    year_by_person: dict[uuid.UUID, str] = {}
    for e in births:
        if e.person_id and e.person_id not in year_by_person:
            y = str(e.date_start.year) if e.date_start else _year(e.date_value)
            if y:
                year_by_person[e.person_id] = y
    index: list[dict] = []
    for p in persons:
        nm = name_by_person.get(p.id)
        g = nm.given if nm else None
        s = nm.surname if nm else None
        disp = " ".join(x for x in (g, s) if x) or (nm.display_name if nm else None)
        index.append({
            "id": p.id,
            "norm": _norm(g, s),
            "name": disp or "(no name)",
            "year": year_by_person.get(p.id),
        })
    return index


def _best_match(norm: str, year: str | None, index: list[dict]) -> tuple[dict | None, str | None]:
    """Closest existing person by name similarity, rejecting clear birth-year
    conflicts. Returns (entry, "high"|"medium") or (None, None)."""
    if not norm:
        return None, None
    best: dict | None = None
    best_r = 0.0
    for e in index:
        if not e["norm"]:
            continue
        r = SequenceMatcher(None, norm, e["norm"]).ratio()
        if r < 0.88:
            continue
        if year and e["year"] and abs(int(year) - int(e["year"])) > 1:
            continue  # same-ish name but different birth year — not a duplicate
        if r > best_r:
            best_r = r
            best = e
    if best is None:
        return None, None
    year_match = bool(year and best["year"] and abs(int(year) - int(best["year"])) <= 1)
    both_unknown = not year and not best["year"]
    score = "high" if best_r >= 0.93 and (year_match or both_unknown) else "medium"
    return best, score


def _relkey(rtype: RelationshipType, a: uuid.UUID, b: uuid.UUID) -> tuple:
    """Dedup key for an edge: directed for parent-child, unordered otherwise."""
    if rtype == RelationshipType.parent_child:
        return ("pc", str(a), str(b))
    return (rtype.value, *sorted([str(a), str(b)]))


def _as_uuid(value: object) -> uuid.UUID | None:
    """Coerce a UUID or UUID string to ``uuid.UUID``; None if it is not one."""
    if isinstance(value, uuid.UUID):
        return value
    try:
        return uuid.UUID(str(value))
    except ValueError:
        return None


def _count_incoming(roots: list[GedcomNode]) -> tuple[dict, list[str]]:
    """Count what an import would create and collect unmapped INDI sub-tags."""
    counts: dict[str, int] = defaultdict(int)
    unmapped: set[str] = set()
    for rec in roots:
        if rec.tag == "INDI" and rec.xref:
            counts["persons"] += 1
            counts["names"] += len(_extract_names(rec))
            for child in rec.children:
                if child.tag in INDI_EVENTS:
                    counts["events"] += 1
                elif child.tag not in INDI_SKIP_TAGS:
                    unmapped.add(child.tag)
        elif rec.tag == "FAM":
            counts["families"] += 1
            for child in rec.children:
                if child.tag in FAM_EVENTS:
                    counts["events"] += 1
        elif rec.tag == "SOUR" and rec.xref:
            counts["sources"] += 1
    return dict(counts), sorted(unmapped)


async def preview_gedcom(session: AsyncSession, *, actor: User, tree: Tree, text: str) -> dict:
    """Dry run: what would import, and which incoming people look like existing
    ones. No writes.

    Raises Forbidden when ``actor`` may not edit ``tree``.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")
    roots = parse_records(text)
    counts, unmapped = _count_incoming(roots)
    index = await _build_existing_index(session, tree)
    duplicates: list[dict] = []
    for rec in roots:
        if rec.tag != "INDI" or not rec.xref:
            continue
        summ = _person_summary(rec)
        entry, score = _best_match(summ["norm"], summ["year"], index)
        if entry is None:
            continue
        duplicates.append({
            "xref": rec.xref,
            "incoming_name": summ["name"],
            "incoming_birth_year": summ["year"],
            "existing_person_id": entry["id"],
            "existing_name": entry["name"],
            "existing_birth_year": entry["year"],
            "score": score,
        })
    return {"counts": counts, "potential_duplicates": duplicates, "unmapped_tags": unmapped}


async def import_gedcom(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    text: str,
    default_action: str = "new",
    resolutions: dict | None = None,
) -> dict:
    """Import records. ``default_action`` (new|skip|merge|overwrite) applies to
    incoming people that match an existing one; ``resolutions`` overrides it per
    GEDCOM xref ({xref: {action, target_id}}). 'skip' links families to the
    existing person but copies nothing; 'merge' also copies the incoming names
    (as alternates), events and citations onto them; 'overwrite' deletes the
    existing person and imports the incoming one fresh.

    A ``target_id`` that is not a live person of ``tree`` (or is not a UUID) is
    ignored and the record imports as new, so a resolution can never touch a
    person in another tree.

    Returns ``{"counts": {...}, "unmapped_tags": [...]}`` and commits the
    session. Raises Forbidden when ``actor`` may not edit ``tree``.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")
    resolutions = resolutions or {}
    roots = parse_records(text)
    counts: dict[str, int] = defaultdict(int)
    unmapped: set[str] = set()
    place_cache: dict[str, uuid.UUID] = {}
    source_map: dict[str, uuid.UUID] = {}
    person_map: dict[str, uuid.UUID] = {}
    now = datetime.now(UTC)
    index = await _build_existing_index(session, tree)
    # Only these ids are acceptable merge/skip/overwrite targets.
    tree_person_ids = {e["id"] for e in index}

    # Pre-load existing relationship keys so a merge doesn't create dup edges.
    existing_rels = list(
        (
            await session.execute(
                select(Relationship).where(
                    Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
                )
            )
        ).scalars().all()
    )
    rel_keys = {_relkey(r.type, r.person_from_id, r.person_to_id) for r in existing_rels}
    # Partnership key -> relationship id, covering edges that pre-date this
    # import AND ones created during it, so a repeated FAM for the same couple
    # can still attach its marriage events.
    partnership_ids: dict[tuple, uuid.UUID] = {}
    for r in existing_rels:
        if r.type == RelationshipType.partnership:
            partnership_ids.setdefault(_relkey(r.type, r.person_from_id, r.person_to_id), r.id)

    def add_relationship(
        rtype: RelationshipType, a: uuid.UUID, b: uuid.UUID, **kw
    ) -> Relationship | None:
        """Add an edge unless an equivalent one exists; None when skipped."""
        key = _relkey(rtype, a, b)
        if key in rel_keys:
            return None
        rel = Relationship(tree_id=tree.id, type=rtype, person_from_id=a, person_to_id=b, **kw)
        session.add(rel)
        rel_keys.add(key)
        counts["relationships"] += 1
        return rel

    async def place_id(name: str | None) -> uuid.UUID | None:
        """Return the id of a Place for ``name``, creating one per distinct name
        within this import."""
        if not name:
            return None
        if name in place_cache:
            return place_cache[name]
        p = Place(tree_id=tree.id, name=name)
        session.add(p)
        await session.flush()
        place_cache[name] = p.id
        counts["places"] += 1
        return p.id

    # Sources first (so citations can reference them).
    for rec in roots:
        if rec.tag == "SOUR" and rec.xref:
            src = Source(
                tree_id=tree.id,
                title=rec.text("TITL") or rec.text("ABBR") or "Untitled source",
                author=rec.text("AUTH"),
                publication_info=rec.text("PUBL"),
                citation_text=rec.text("TEXT"),
            )
            session.add(src)
            await session.flush()
            source_map[rec.xref] = src.id
            counts["sources"] += 1

    async def add_citations(holder: GedcomNode, **target) -> None:
        """Create a Citation for each SOUR pointer under ``holder`` that refers
        to a source imported above; unknown pointers are ignored."""
        for s in holder.all("SOUR"):
            sid = source_map.get(s.value.strip())
            if sid is None:
                continue
            session.add(Citation(tree_id=tree.id, source_id=sid, page=s.text("PAGE"), **target))
            counts["citations"] += 1

    def add_names(person_id: uuid.UUID, names: list[dict], *, set_primary: bool) -> None:
        """Attach extracted names; with ``set_primary=False`` none is primary."""
        for nd in names:
            session.add(
                Name(
                    tree_id=tree.id,
                    person_id=person_id,
                    name_type=nd["type"],
                    given=nd["given"],
                    surname=nd["surname"],
                    nickname=nd.get("nickname"),
                    display_name=nd.get("display"),
                    is_primary=set_primary and nd.get("is_primary", False),
                    sort_order=nd.get("sort", 0),
                )
            )
            counts["names"] += 1

    async def add_events(rec: GedcomNode, person_id: uuid.UUID) -> None:
        """Create an Event per mapped INDI sub-tag; record unmapped tags."""
        for child in rec.children:
            if child.tag in INDI_EVENTS:
                dv = child.text("DATE")
                # Attribute-style facts (RELI, OCCU, EDUC) carry their value on
                # the line itself; store it in detail.
                detail = child.value.strip() if child.tag in VALUE_EVENTS else None
                ev = Event(
                    tree_id=tree.id,
                    person_id=person_id,
                    event_type=INDI_EVENTS[child.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(child.text("PLAC")),
                    detail=detail or None,
                    notes=child.text("NOTE"),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                await add_citations(child, event_id=ev.id)
            elif child.tag in INDI_SKIP_TAGS:
                continue
            else:
                unmapped.add(child.tag)

    async def soft_delete_existing(person_id: uuid.UUID) -> None:
        """Soft-delete a person of this tree and their edges, and clear any
        User.self_person_id pointing at them."""
        p = (
            await session.execute(
                select(Person).where(
                    Person.id == person_id,
                    Person.tree_id == tree.id,  # never reach into another tree
                    Person.deleted_at.is_(None),
                )
            )
        ).scalar_one_or_none()
        if p is None:
            return
        p.deleted_at = now
        rels = (
            await session.execute(
                select(Relationship).where(
                    Relationship.tree_id == tree.id,
                    Relationship.deleted_at.is_(None),
                    or_(
                        Relationship.person_from_id == person_id,
                        Relationship.person_to_id == person_id,
                    ),
                )
            )
        ).scalars().all()
        for r in rels:
            r.deleted_at = now
        await session.execute(
            update(User).where(User.self_person_id == person_id).values(self_person_id=None)
        )

    # Precompute the best match per incoming xref (for default-policy resolution).
    matches: dict[str, dict] = {}
    for rec in roots:
        if rec.tag == "INDI" and rec.xref:
            summ = _person_summary(rec)
            entry, _score = _best_match(summ["norm"], summ["year"], index)
            if entry is not None:
                matches[rec.xref] = entry

    def resolve(xref: str) -> tuple[str, uuid.UUID | None]:
        """Decide (action, target person id) for an incoming xref."""
        matched = matches.get(xref)
        matched_id = matched["id"] if matched is not None else None
        ov = resolutions.get(xref)
        if ov:
            action = ov.get("action", "new")
            tid = ov.get("target_id")
            if tid:
                # Caller-supplied id: accept only a live person of THIS tree.
                target = _as_uuid(tid)
                if target not in tree_person_ids:
                    target = None
            else:
                target = matched_id
            if action in ("skip", "merge", "overwrite") and target is None:
                return "new", None
            return action, target
        if default_action != "new" and matched_id is not None:
            return default_action, matched_id
        return "new", None

    # Individuals.
    for rec in roots:
        if rec.tag != "INDI" or not rec.xref:
            continue
        names = _extract_names(rec)
        action, target = resolve(rec.xref)
        if action == "skip" and target is not None:
            person_map[rec.xref] = target
            counts["skipped"] += 1
            continue
        if action == "merge" and target is not None:
            person_map[rec.xref] = target
            add_names(target, names, set_primary=False)
            await add_events(rec, target)
            await add_citations(rec, person_id=target)
            note = _notes_text(rec)
            if note:
                existing = (
                    await session.execute(
                        select(Person).where(Person.id == target, Person.tree_id == tree.id)
                    )
                ).scalar_one_or_none()
                if existing is not None:
                    existing.notes = "\n".join(filter(None, [existing.notes, note]))
            counts["merged"] += 1
            continue
        if action == "overwrite" and target is not None:
            await soft_delete_existing(target)
            counts["overwritten"] += 1
        # "new", "overwrite", and any unrecognised action all create a person.
        person = Person(tree_id=tree.id, gender=_sex(rec.text("SEX")), notes=_notes_text(rec))
        session.add(person)
        await session.flush()
        person_map[rec.xref] = person.id
        counts["persons"] += 1
        add_names(person.id, names, set_primary=True)
        await add_citations(rec, person_id=person.id)
        await add_events(rec, person.id)

    # Families -> partnerships, parent-child edges, marriage events.
    for rec in roots:
        if rec.tag != "FAM":
            continue
        counts["families"] += 1
        husb = person_map.get((rec.text("HUSB") or "").strip())
        wife = person_map.get((rec.text("WIFE") or "").strip())
        partnership_id: uuid.UUID | None = None
        if husb and wife:
            pkey = _relkey(RelationshipType.partnership, husb, wife)
            if husb != wife:
                rel = add_relationship(RelationshipType.partnership, husb, wife)
                if rel is not None:
                    await session.flush()
                    partnership_ids[pkey] = rel.id
            # Either just created, created by an earlier FAM in this file, or
            # already in the tree — find it so marriage events can attach.
            partnership_id = partnership_ids.get(pkey)
        for fe in rec.children:
            if fe.tag in FAM_EVENTS and partnership_id is not None:
                dv = fe.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    relationship_id=partnership_id,
                    event_type=FAM_EVENTS[fe.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(fe.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
        for chil in rec.all("CHIL"):
            cp = person_map.get(chil.value.strip())
            if cp is None:
                continue
            for parent in (husb, wife):
                if parent and parent != cp:
                    add_relationship(
                        RelationshipType.parent_child,
                        parent,
                        cp,
                        qualifier=ParentChildQualifier.biological,
                    )

    record_audit(
        session,
        action="import",
        entity_type="Gedcom",
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=dict(counts),
    )
    await session.commit()
    return {"counts": dict(counts), "unmapped_tags": sorted(unmapped)}


def _ged_date(value: str | None) -> str | None:
    """Return the stripped date text, or None when there is nothing to emit."""
    return value.strip() if value else None


def _ged_value_lines(level: int, tag: str, value: str | None) -> list[str]:
    """Render ``tag`` with a line value; extra lines of a multi-line value go on
    CONT lines one level deeper. An empty value yields the bare tag."""
    pieces = (value or "").strip().splitlines() or [""]
    lines = [f"{level} {tag} {pieces[0]}".rstrip()]
    lines.extend(f"{level + 1} CONT {piece}".rstrip() for piece in pieces[1:])
    return lines


async def export_gedcom(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> str:
    """Serialize the tree's non-deleted people, families and sources as GEDCOM.

    Families are derived from relationship edges: children are grouped by their
    set of parents, and each partnership contributes (or annotates) a family.
    Raises Forbidden when the viewer may not view ``tree``.
    """
    if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
        raise Forbidden("not permitted to view this tree")
    persons = list(
        (
            await session.execute(
                select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    names = list(
        (
            await session.execute(
                select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    events = list(
        (
            await session.execute(
                select(Event).where(Event.tree_id == tree.id, Event.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    rels = list(
        (
            await session.execute(
                select(Relationship).where(
                    Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
                )
            )
        ).scalars().all()
    )
    sources = list(
        (
            await session.execute(
                select(Source).where(Source.tree_id == tree.id, Source.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    places = {
        p.id: p
        for p in (
            await session.execute(select(Place).where(Place.tree_id == tree.id))
        ).scalars().all()
    }

    pxref = {p.id: f"@I{i + 1}@" for i, p in enumerate(persons)}
    gender_by_id = {p.id: p.gender for p in persons}
    sxref = {s.id: f"@S{i + 1}@" for i, s in enumerate(sources)}

    names_by_person: dict[uuid.UUID, list[Name]] = defaultdict(list)
    for n in sorted(names, key=lambda n: (n.sort_order, not n.is_primary)):
        names_by_person[n.person_id].append(n)
    events_by_person: dict[uuid.UUID, list[Event]] = defaultdict(list)
    events_by_rel: dict[uuid.UUID, list[Event]] = defaultdict(list)
    for e in events:
        if e.person_id:
            events_by_person[e.person_id].append(e)
        elif e.relationship_id:
            events_by_rel[e.relationship_id].append(e)

    # Edges whose endpoints are not both exported people have no xref to point
    # at; skip them instead of failing the whole export with a KeyError.
    live_rels = [r for r in rels if r.person_from_id in pxref and r.person_to_id in pxref]

    # Build families from parent-child + partnership edges (group by parent set).
    parents_of: dict[uuid.UUID, set[uuid.UUID]] = defaultdict(set)
    for r in live_rels:
        if r.type == RelationshipType.parent_child:
            parents_of[r.person_to_id].add(r.person_from_id)
    fams: dict[frozenset, dict] = {}
    for child, ps in parents_of.items():
        key = frozenset(ps)
        fams.setdefault(key, {"parents": set(ps), "children": [], "rel_id": None})
        fams[key]["children"].append(child)
    for r in live_rels:
        if r.type == RelationshipType.partnership:
            key = frozenset({r.person_from_id, r.person_to_id})
            fam = fams.setdefault(
                key,
                {"parents": {r.person_from_id, r.person_to_id}, "children": [], "rel_id": None},
            )
            fam["rel_id"] = r.id
    fam_list = list(fams.values())
    fxref = {id(f): f"@F{i + 1}@" for i, f in enumerate(fam_list)}

    # person -> the families they are a spouse in / a child in
    spouse_fams: dict[uuid.UUID, list[str]] = defaultdict(list)
    child_fams: dict[uuid.UUID, str] = {}
    for f in fam_list:
        x = fxref[id(f)]
        for pid in f["parents"]:
            spouse_fams[pid].append(x)
        for cid in f["children"]:
            child_fams[cid] = x

    out: list[str] = ["0 HEAD", "1 SOUR Provenance", "1 GEDC", "2 VERS 5.5.1", "1 CHAR UTF-8"]
    for p in persons:
        out.append(f"0 {pxref[p.id]} INDI")
        for n in names_by_person.get(p.id, []):
            display = n.display_name or f"{n.given or ''} /{n.surname or ''}/".strip()
            out.append(f"1 NAME {display}")
            ged_type = EXPORT_TYPE_MAP.get(n.name_type)
            if ged_type:
                out.append(f"2 TYPE {ged_type}")
        sex = {"male": "M", "female": "F"}.get(p.gender or "")
        if sex:
            out.append(f"1 SEX {sex}")
        for e in events_by_person.get(p.id, []):
            tag = EVENT_TO_GED.get(e.event_type)
            if not tag:
                continue
            if tag in VALUE_EVENTS:
                # Attribute-style facts keep their value on the tag line, which
                # is where import reads Event.detail from.
                out.extend(_ged_value_lines(1, tag, e.detail))
            else:
                out.append(f"1 {tag}")
            date_text = _ged_date(e.date_value)
            if date_text:
                out.append(f"2 DATE {date_text}")
            if e.place_id and e.place_id in places:
                out.append(f"2 PLAC {places[e.place_id].name}")
        if p.id in child_fams:
            out.append(f"1 FAMC {child_fams[p.id]}")
        for x in spouse_fams.get(p.id, []):
            out.append(f"1 FAMS {x}")

    for f in fam_list:
        x = fxref[id(f)]
        out.append(f"0 {x} FAM")
        ps = list(f["parents"])
        # HUSB/WIFE by recorded gender where possible. Each slot gets a distinct
        # person: the first male is HUSB, the first female is WIFE, and any
        # still-empty slot is filled from the remaining parents.
        husb = next((pid for pid in ps if gender_by_id.get(pid) == "male"), None)
        wife = next((pid for pid in ps if gender_by_id.get(pid) == "female"), None)
        leftovers = [pid for pid in ps if pid not in (husb, wife)]
        if husb is None and leftovers:
            husb = leftovers.pop(0)
        if wife is None and leftovers:
            wife = leftovers.pop(0)
        if husb:
            out.append(f"1 HUSB {pxref[husb]}")
        if wife:
            out.append(f"1 WIFE {pxref[wife]}")
        for cid in f["children"]:
            out.append(f"1 CHIL {pxref[cid]}")
        if f["rel_id"]:
            for e in events_by_rel.get(f["rel_id"], []):
                tag = EVENT_TO_GED.get(e.event_type)
                if not tag:
                    continue
                out.append(f"1 {tag}")
                date_text = _ged_date(e.date_value)
                if date_text:
                    out.append(f"2 DATE {date_text}")

    for s in sources:
        out.append(f"0 {sxref[s.id]} SOUR")
        if s.title:
            out.append(f"1 TITL {s.title}")
        if s.author:
            out.append(f"1 AUTH {s.author}")
        if s.publication_info:
            out.append(f"1 PUBL {s.publication_info}")

    out.append("0 TRLR")
    return "\n".join(out) + "\n"