5824e70895
Duplicate detection (the "merge / skip / overwrite" the user asked for):
- New POST /gedcom/preview dry-runs the file and flags incoming people that
resemble existing ones (name similarity via difflib + birth-year guard;
high/medium score). No writes.
- /gedcom/import takes default_action (new|skip|merge|overwrite) + per-xref
resolutions {xref: {action, target_id}}:
  new       - create as a new person (current behavior)
  skip      - link families to the existing person, copy nothing
  merge     - attach the incoming names (as alternates), events, citations,
              and notes onto the existing person
  overwrite - soft-delete the existing person, import the incoming one fresh
Relationship creation is deduped so a merge can't double an edge.
Richer record mapping (covers the user's repo's GEDCOM):
- Multiple NAME records honor their TYPE; _MARNM (and NICK) import as typed
alternate names — maiden stays primary, married becomes a "married" Name.
- RELI -> a "religion" event with the value in detail; OCCU/EDUC values too.
- NOTE -> person notes (and event notes); NOTE/RELI are no longer "unmapped".
- Export round-trips name TYPE.
Verified against the user's 2185-person export: 0 unmapped tags. 48 tests pass.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
802 lines
30 KiB
Python
802 lines
30 KiB
Python
"""GEDCOM import/export.
|
||
|
||
A pragmatic parser + mapper for the common subset of GEDCOM (5.5.1 / 7 share
|
||
the line grammar): INDI, FAM, SOUR. Import maps records into a tree and returns
|
||
a mapping report (counts + unmapped tags); export serializes the tree back to
|
||
GEDCOM. Runs inline for now — large files should move to the worker later.
|
||
|
||
Import is duplicate-aware: ``preview_gedcom`` reports incoming people that look
|
||
like existing ones, and ``import_gedcom`` applies a per-record resolution
|
||
(new / skip / merge / overwrite). Names carry their GEDCOM type (a married name
|
||
imports as a typed alternate, not a second primary).
|
||
"""
|
||
|
||
import re
|
||
import uuid
|
||
from collections import defaultdict
|
||
from datetime import UTC, date, datetime
|
||
from difflib import SequenceMatcher
|
||
|
||
from sqlalchemy import or_, select, update
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.models.enums import ParentChildQualifier, RelationshipType
|
||
from app.models.event import Event
|
||
from app.models.person import Name, Person
|
||
from app.models.place import Place
|
||
from app.models.relationship import Relationship
|
||
from app.models.source import Citation, Source
|
||
from app.models.tree import Tree
|
||
from app.models.user import User
|
||
from app.services import privacy
|
||
from app.services.audit import record_audit
|
||
from app.services.exceptions import Forbidden
|
||
|
||
# GEDCOM event tag -> our event_type (INDI-level).
INDI_EVENTS = {
    "BIRT": "birth", "DEAT": "death", "BAPM": "baptism", "CHR": "christening",
    "BURI": "burial", "CREM": "cremation", "RESI": "residence", "CENS": "census",
    "IMMI": "immigration", "EMIG": "emigration", "OCCU": "occupation",
    "EDUC": "education", "GRAD": "graduation", "RETI": "retirement",
    "NATU": "naturalization", "BAPL": "baptism", "RELI": "religion",
}

# INDI attribute tags whose line VALUE is the fact (no date), stored in detail.
VALUE_EVENTS = {"RELI", "OCCU", "EDUC"}

# INDI sub-tags consumed elsewhere or intentionally ignored (not "unmapped").
INDI_SKIP_TAGS = {
    "NAME", "SEX", "SOUR", "FAMC", "FAMS", "CHAN", "OBJE", "_UID", "_MARNM", "NOTE",
}

# FAM-level events.
FAM_EVENTS = {"MARR": "marriage", "DIV": "divorce", "ENGA": "engagement"}

# Our event_type -> GEDCOM tag, used on export. Several tags can share one
# event_type (BAPM and BAPL are both "baptism"); walking the items in reverse
# lets the FIRST tag listed above win, so a baptism exports as the generic BAPM
# instead of the LDS-ordinance tag BAPL.
EVENT_TO_GED = {
    event_type: tag
    for tag, event_type in reversed({**INDI_EVENTS, **FAM_EVENTS}.items())
}

# GEDCOM NAME TYPE (or _MARNM-derived) -> our Name.name_type vocabulary.
NAME_TYPE_MAP = {
    "birth": "birth", "maiden": "birth", "married": "married",
    "aka": "alias", "also known as": "alias", "nickname": "nickname",
    "religious": "religious", "immigrant": "immigration",
    "immigration": "immigration", "professional": "alias", "other": "alias",
}

# Our type -> GEDCOM TYPE on export (birth is the default; emit nothing).
EXPORT_TYPE_MAP = {
    "married": "married", "alias": "aka", "nickname": "nickname",
    "religious": "religious", "immigration": "immigrant",
}
|
||
|
||
|
||
class GedcomNode:
|
||
__slots__ = ("level", "tag", "value", "xref", "children")
|
||
|
||
def __init__(self, level: int, tag: str, value: str = "", xref: str | None = None):
|
||
self.level = level
|
||
self.tag = tag
|
||
self.value = value
|
||
self.xref = xref
|
||
self.children: list[GedcomNode] = []
|
||
|
||
def first(self, tag: str) -> "GedcomNode | None":
|
||
return next((c for c in self.children if c.tag == tag), None)
|
||
|
||
def all(self, tag: str) -> list["GedcomNode"]:
|
||
return [c for c in self.children if c.tag == tag]
|
||
|
||
def text(self, tag: str, default: str | None = None) -> str | None:
|
||
n = self.first(tag)
|
||
return n.value if n is not None else default
|
||
|
||
|
||
def parse_records(text: str) -> list[GedcomNode]:
    """Parse GEDCOM text into a forest of top-level records.

    Every line has the shape ``LEVEL [@XREF@] TAG [VALUE]``. A line becomes a
    child of the closest preceding line with a smaller level; lines with no
    such ancestor are returned as roots. ``CONC``/``CONT`` lines are folded
    into their parent's value (``CONT`` inserts a newline first) instead of
    becoming nodes. Lines whose level is not an integer are skipped.

    A UTF-8 byte-order mark and any leading whitespace are stripped before the
    level is read, so neither causes the line to be dropped.
    """
    roots: list[GedcomNode] = []
    stack: list[GedcomNode] = []  # currently open ancestors, outermost first
    for raw in text.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        # Without this strip, "\ufeff0 HEAD" or " 1 NAME ..." would fail the
        # int() below and the line would vanish silently.
        line = raw.lstrip("\ufeff \t").rstrip()
        if not line:
            continue
        head, _, rest = line.partition(" ")
        try:
            level = int(head)
        except ValueError:
            continue
        # Tolerate extra spaces between the level and the xref/tag.
        rest = rest.lstrip()
        xref: str | None = None
        if rest.startswith("@"):
            end = rest.find("@", 1)
            if end != -1:
                xref = rest[: end + 1]
                rest = rest[end + 1:].strip()
        tag, _, value = rest.partition(" ")

        # Close every open node that is not an ancestor of this line.
        while stack and stack[-1].level >= level:
            stack.pop()
        parent = stack[-1] if stack else None

        if tag in ("CONC", "CONT") and parent is not None:
            parent.value += ("" if tag == "CONC" else "\n") + value
            continue

        node = GedcomNode(level, tag, value, xref)
        if parent is None:
            roots.append(node)
        else:
            parent.children.append(node)
        stack.append(node)
    return roots
|
||
|
||
|
||
def _parse_name(value: str) -> tuple[str | None, str | None]:
|
||
if "/" in value:
|
||
given, _, rest = value.partition("/")
|
||
surname = rest.split("/", 1)[0]
|
||
return given.strip() or None, surname.strip() or None
|
||
return value.strip() or None, None
|
||
|
||
|
||
def _parse_marnm(value: str, base_given: str | None) -> tuple[str | None, str | None]:
|
||
"""A _MARNM value is sometimes a full name ("Jane /Smith/") and sometimes
|
||
just the married surname ("Smith"). Keep the given name from the base name
|
||
in the latter case."""
|
||
v = (value or "").strip()
|
||
if "/" in v:
|
||
g, s = _parse_name(v)
|
||
return (g or base_given), s
|
||
return base_given, (v or None)
|
||
|
||
|
||
def _extract_names(rec: GedcomNode) -> list[dict]:
    """Collect every name of an INDI record as typed dicts.

    Each NAME child (with its optional TYPE) yields one entry, followed by one
    "married" entry per _MARNM nested under it; _MARNM tags directly under the
    INDI are appended afterwards. The first entry typed "birth" is flagged
    primary (the very first entry if none is), and ``sort`` is the entry's
    position. Returns an empty list when the record has no names.
    """

    def married_entry(raw: str, base_given: str | None) -> dict:
        given, surname = _parse_marnm(raw, base_given)
        return {"type": "married", "given": given, "surname": surname,
                "display": raw or None, "nickname": None}

    collected: list[dict] = []
    for name_node in rec.all("NAME"):
        given, surname = _parse_name(name_node.value)
        raw_type = (name_node.text("TYPE") or "").strip().lower()
        collected.append({
            # Unknown TYPE strings pass through; a missing TYPE means birth.
            "type": NAME_TYPE_MAP.get(raw_type, raw_type or "birth"),
            "given": given,
            "surname": surname,
            "display": name_node.value or None,
            "nickname": name_node.text("NICK"),
        })
        for nested in name_node.all("_MARNM"):
            collected.append(married_entry(nested.value, given))

    for loose in rec.all("_MARNM"):
        # The base given name is taken from whatever entry is first right now.
        base = collected[0]["given"] if collected else None
        collected.append(married_entry(loose.value, base))

    if not collected:
        return collected

    birth_positions = [i for i, entry in enumerate(collected) if entry["type"] == "birth"]
    primary_at = birth_positions[0] if birth_positions else 0
    for position, entry in enumerate(collected):
        entry["is_primary"] = position == primary_at
        entry["sort"] = position
    return collected
|
||
|
||
|
||
def _norm(given: str | None, surname: str | None) -> str:
|
||
return re.sub(r"\s+", " ", f"{given or ''} {surname or ''}".strip().lower())
|
||
|
||
|
||
def _year(date_value: str | None) -> str | None:
|
||
if not date_value:
|
||
return None
|
||
m = re.search(r"\b(\d{3,4})\b", date_value)
|
||
return m.group(1) if m else None
|
||
|
||
|
||
def _date_start(date_value: str | None) -> date | None:
    """Return January 1st of the year found in a GEDCOM date string.

    Gives None when no year can be found or when ``date`` rejects the year.
    """
    year_text = _year(date_value)
    if year_text:
        try:
            return date(int(year_text), 1, 1)
        except ValueError:
            pass
    return None
|
||
|
||
|
||
def _sex(value: str | None) -> str | None:
|
||
if not value:
|
||
return None
|
||
v = value.strip().upper()
|
||
return {"M": "male", "F": "female"}.get(v, value.strip().lower() or None)
|
||
|
||
|
||
def _notes_text(rec: GedcomNode) -> str | None:
    """Combine a record's non-blank NOTE values into one newline-separated
    string for the person's notes field; None when there is nothing to keep."""
    pieces: list[str] = []
    for note in rec.all("NOTE"):
        if note.value and note.value.strip():
            pieces.append(note.value.strip())
    joined = "\n".join(pieces)
    return joined or None
|
||
|
||
|
||
def _person_summary(rec: GedcomNode) -> dict:
    """Summarize an incoming INDI for duplicate matching.

    Returns the extracted ``names``, the normalized primary name (``norm``),
    a human-readable ``name`` ("(no name)" as a last resort) and the birth
    ``year`` taken from the first BIRT's DATE.
    """
    names = _extract_names(rec)
    primary = None
    if names:
        primary = next((entry for entry in names if entry.get("is_primary")), names[0])

    given = surname = None
    if primary:
        given, surname = primary["given"], primary["surname"]

    label = " ".join(part for part in (given, surname) if part)
    if not label and primary:
        # No structured parts: fall back to the raw NAME value.
        label = primary.get("display") or ""

    birth = rec.first("BIRT")
    birth_year = _year(birth.text("DATE")) if birth else None
    return {
        "names": names,
        "norm": _norm(given, surname),
        "name": label or "(no name)",
        "year": birth_year,
    }
|
||
|
||
|
||
async def _build_existing_index(session: AsyncSession, tree: Tree) -> list[dict]:
    """Build the lookup list that incoming records are matched against.

    One entry per non-deleted person of the tree, holding the person's id, the
    normalized and display form of their preferred name, and their birth year
    (None when unknown).
    """

    async def fetch(statement) -> list:
        result = await session.execute(statement)
        return list(result.scalars().all())

    people = await fetch(
        select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
    )
    all_names = await fetch(
        select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
    )
    birth_events = await fetch(
        select(Event).where(
            Event.tree_id == tree.id,
            Event.deleted_at.is_(None),
            Event.event_type == "birth",
        )
    )

    # Preferred name per person: primary names first, then lowest sort_order.
    preferred_name: dict[uuid.UUID, Name] = {}
    for candidate in sorted(all_names, key=lambda nm: (not nm.is_primary, nm.sort_order)):
        if candidate.person_id not in preferred_name:
            preferred_name[candidate.person_id] = candidate

    # First birth event that yields a year wins for each person.
    birth_year: dict[uuid.UUID, str] = {}
    for ev in birth_events:
        owner = ev.person_id
        if not owner or owner in birth_year:
            continue
        found = str(ev.date_start.year) if ev.date_start else _year(ev.date_value)
        if found:
            birth_year[owner] = found

    def describe(person: Person) -> dict:
        nm = preferred_name.get(person.id)
        given = nm.given if nm else None
        surname = nm.surname if nm else None
        label = " ".join(part for part in (given, surname) if part)
        if not label:
            label = nm.display_name if nm else None
        return {
            "id": person.id,
            "norm": _norm(given, surname),
            "name": label or "(no name)",
            "year": birth_year.get(person.id),
        }

    return [describe(person) for person in people]
|
||
|
||
|
||
def _best_match(norm: str, year: str | None, index: list[dict]) -> tuple[dict | None, str | None]:
|
||
"""Closest existing person by name similarity, rejecting clear birth-year
|
||
conflicts. Returns (entry, "high"|"medium") or (None, None)."""
|
||
if not norm:
|
||
return None, None
|
||
best: dict | None = None
|
||
best_r = 0.0
|
||
for e in index:
|
||
if not e["norm"]:
|
||
continue
|
||
r = SequenceMatcher(None, norm, e["norm"]).ratio()
|
||
if r < 0.88:
|
||
continue
|
||
if year and e["year"] and abs(int(year) - int(e["year"])) > 1:
|
||
continue # same-ish name but different birth year — not a duplicate
|
||
if r > best_r:
|
||
best_r = r
|
||
best = e
|
||
if best is None:
|
||
return None, None
|
||
year_match = bool(year and best["year"] and abs(int(year) - int(best["year"])) <= 1)
|
||
both_unknown = not year and not best["year"]
|
||
score = "high" if best_r >= 0.93 and (year_match or both_unknown) else "medium"
|
||
return best, score
|
||
|
||
|
||
def _relkey(rtype: RelationshipType, a: uuid.UUID, b: uuid.UUID) -> tuple:
    """Return the de-duplication key for an edge between two people.

    Parent-child edges are directional and keep the (from, to) order; every
    other type is keyed on the sorted pair so direction does not matter.
    """
    first, second = str(a), str(b)
    if rtype != RelationshipType.parent_child:
        low, high = sorted((first, second))
        return (rtype.value, low, high)
    return ("pc", first, second)
|
||
|
||
|
||
def _count_incoming(roots: list[GedcomNode]) -> tuple[dict, list[str]]:
    """Tally what a GEDCOM file contains without writing anything.

    Returns ``(counts, unmapped_tags)``: counts of persons, names, events,
    families and sources (a key appears only once something was counted), and
    the sorted INDI sub-tags that are neither events nor deliberately skipped.
    """
    tally: dict[str, int] = defaultdict(int)
    unknown_tags: set[str] = set()
    for record in roots:
        kind = record.tag
        if kind == "INDI" and record.xref:
            tally["persons"] += 1
            tally["names"] += len(_extract_names(record))
            for sub in record.children:
                if sub.tag in INDI_EVENTS:
                    tally["events"] += 1
                elif sub.tag not in INDI_SKIP_TAGS:
                    unknown_tags.add(sub.tag)
        elif kind == "FAM":
            tally["families"] += 1
            for sub in record.children:
                if sub.tag in FAM_EVENTS:
                    tally["events"] += 1
        elif kind == "SOUR" and record.xref:
            tally["sources"] += 1
    return dict(tally), sorted(unknown_tags)
|
||
|
||
|
||
async def preview_gedcom(session: AsyncSession, *, actor: User, tree: Tree, text: str) -> dict:
    """Dry-run an import and report likely duplicates. Writes nothing.

    Returns the incoming ``counts``, the ``potential_duplicates`` (one entry
    per incoming INDI that resembles an existing person, with a score) and the
    ``unmapped_tags``.

    Raises:
        Forbidden: if ``actor`` may not edit ``tree``.
    """
    allowed = await privacy.can_edit_tree(session, user_id=actor.id, tree=tree)
    if not allowed:
        raise Forbidden("not an editor of this tree")

    roots = parse_records(text)
    counts, unmapped = _count_incoming(roots)
    index = await _build_existing_index(session, tree)

    duplicates: list[dict] = []
    individuals = (rec for rec in roots if rec.tag == "INDI" and rec.xref)
    for rec in individuals:
        summary = _person_summary(rec)
        match, score = _best_match(summary["norm"], summary["year"], index)
        if match is not None:
            duplicates.append({
                "xref": rec.xref,
                "incoming_name": summary["name"],
                "incoming_birth_year": summary["year"],
                "existing_person_id": match["id"],
                "existing_name": match["name"],
                "existing_birth_year": match["year"],
                "score": score,
            })

    return {
        "counts": counts,
        "potential_duplicates": duplicates,
        "unmapped_tags": unmapped,
    }
|
||
|
||
|
||
async def import_gedcom(
    session: AsyncSession,
    *,
    actor: User,
    tree: Tree,
    text: str,
    default_action: str = "new",
    resolutions: dict | None = None,
) -> dict:
    """Import GEDCOM records into ``tree`` and commit.

    ``default_action`` (new|skip|merge|overwrite) applies to incoming people
    that match an existing one; ``resolutions`` overrides it per GEDCOM xref
    (``{xref: {action, target_id}}``). 'skip' links families to the existing
    person but copies nothing; 'merge' also copies the incoming names (as
    alternates), events, citations and notes onto them; 'overwrite'
    soft-deletes the existing person and imports the incoming one fresh.

    A target is honoured only if it is a live person of THIS tree. A
    ``target_id`` pointing anywhere else (another tree, a deleted person, a
    person already overwritten during this run) is treated as "no target" and
    the record is imported as new.

    Returns ``{"counts": ..., "unmapped_tags": ...}``.

    Raises:
        Forbidden: if ``actor`` may not edit ``tree``.
        ValueError: if a ``target_id`` is not a valid UUID.
    """
    if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
        raise Forbidden("not an editor of this tree")

    resolutions = resolutions or {}
    roots = parse_records(text)
    counts: dict[str, int] = defaultdict(int)
    unmapped: set[str] = set()
    place_cache: dict[str, uuid.UUID] = {}
    source_map: dict[str, uuid.UUID] = {}
    person_map: dict[str, uuid.UUID] = {}
    now = datetime.now(UTC)

    index = await _build_existing_index(session, tree)
    # The index only holds live people of this tree, so it doubles as the
    # allow-list for caller-supplied target ids.
    targetable_ids: set[uuid.UUID] = {e["id"] for e in index}

    # Pre-load existing relationship keys so a merge doesn't create dup edges.
    existing_rels = list(
        (
            await session.execute(
                select(Relationship).where(
                    Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
                )
            )
        ).scalars().all()
    )
    rel_keys = {_relkey(r.type, r.person_from_id, r.person_to_id) for r in existing_rels}
    # Partnership edge ids by key, covering pre-existing edges and the ones
    # created below, so a family's events can always find their partnership.
    partnership_ids: dict[tuple, uuid.UUID] = {}
    for r in existing_rels:
        if r.type == RelationshipType.partnership:
            partnership_ids.setdefault(
                _relkey(r.type, r.person_from_id, r.person_to_id), r.id
            )

    def add_relationship(
        rtype: RelationshipType, a: uuid.UUID, b: uuid.UUID, **kw
    ) -> Relationship | None:
        """Create the edge unless an equivalent one is already known."""
        key = _relkey(rtype, a, b)
        if key in rel_keys:
            return None
        rel = Relationship(tree_id=tree.id, type=rtype, person_from_id=a, person_to_id=b, **kw)
        session.add(rel)
        rel_keys.add(key)
        counts["relationships"] += 1
        return rel

    async def place_id(name: str | None) -> uuid.UUID | None:
        """Return the id of the Place created for ``name`` during this import."""
        if not name:
            return None
        if name in place_cache:
            return place_cache[name]
        p = Place(tree_id=tree.id, name=name)
        session.add(p)
        await session.flush()
        place_cache[name] = p.id
        counts["places"] += 1
        return p.id

    # Sources first (so citations can reference them).
    for rec in roots:
        if rec.tag == "SOUR" and rec.xref:
            src = Source(
                tree_id=tree.id,
                title=rec.text("TITL") or rec.text("ABBR") or "Untitled source",
                author=rec.text("AUTH"),
                publication_info=rec.text("PUBL"),
                citation_text=rec.text("TEXT"),
            )
            session.add(src)
            await session.flush()
            source_map[rec.xref] = src.id
            counts["sources"] += 1

    async def add_citations(holder: GedcomNode, **target) -> None:
        """Attach a Citation for each SOUR pointer under ``holder`` that
        refers to a source imported above."""
        for s in holder.all("SOUR"):
            sid = source_map.get(s.value.strip())
            if sid is None:
                continue
            session.add(Citation(tree_id=tree.id, source_id=sid, page=s.text("PAGE"), **target))
            counts["citations"] += 1

    def add_names(person_id: uuid.UUID, names: list[dict], *, set_primary: bool) -> None:
        """Add Name rows; with ``set_primary=False`` none of them is primary."""
        for nd in names:
            session.add(
                Name(
                    tree_id=tree.id,
                    person_id=person_id,
                    name_type=nd["type"],
                    given=nd["given"],
                    surname=nd["surname"],
                    nickname=nd.get("nickname"),
                    display_name=nd.get("display"),
                    is_primary=set_primary and nd.get("is_primary", False),
                    sort_order=nd.get("sort", 0),
                )
            )
            counts["names"] += 1

    async def add_events(rec: GedcomNode, person_id: uuid.UUID) -> None:
        """Create an Event per mapped INDI sub-tag and record unmapped tags."""
        for child in rec.children:
            if child.tag in INDI_EVENTS:
                dv = child.text("DATE")
                # Attribute-style facts (RELI, OCCU, EDUC) carry their value on
                # the line itself; store it in detail.
                detail = child.value.strip() if child.tag in VALUE_EVENTS else None
                ev = Event(
                    tree_id=tree.id,
                    person_id=person_id,
                    event_type=INDI_EVENTS[child.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(child.text("PLAC")),
                    detail=detail or None,
                    notes=child.text("NOTE"),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1
                await add_citations(child, event_id=ev.id)
            elif child.tag in INDI_SKIP_TAGS:
                continue
            else:
                unmapped.add(child.tag)

    async def soft_delete_existing(person_id: uuid.UUID) -> None:
        """Soft-delete a person of this tree along with their relationships,
        and clear any user's self-person link to them."""
        p = (
            await session.execute(
                select(Person).where(
                    Person.id == person_id,
                    # Never touch a person that belongs to another tree.
                    Person.tree_id == tree.id,
                    Person.deleted_at.is_(None),
                )
            )
        ).scalar_one_or_none()
        if p is None:
            return
        p.deleted_at = now
        rels = (
            await session.execute(
                select(Relationship).where(
                    Relationship.tree_id == tree.id,
                    Relationship.deleted_at.is_(None),
                    or_(
                        Relationship.person_from_id == person_id,
                        Relationship.person_to_id == person_id,
                    ),
                )
            )
        ).scalars().all()
        for r in rels:
            r.deleted_at = now
        await session.execute(
            update(User).where(User.self_person_id == person_id).values(self_person_id=None)
        )

    # Precompute the best match per incoming xref (for default-policy resolution).
    matches: dict[str, dict] = {}
    for rec in roots:
        if rec.tag == "INDI" and rec.xref:
            summ = _person_summary(rec)
            entry, _score = _best_match(summ["norm"], summ["year"], index)
            if entry is not None:
                matches[rec.xref] = entry

    def usable(target: uuid.UUID | None) -> uuid.UUID | None:
        """Return ``target`` only if it is still a live person of this tree."""
        return target if target in targetable_ids else None

    def resolve(xref: str) -> tuple[str, uuid.UUID | None]:
        """Decide the action and target person for one incoming xref."""
        matched = matches[xref]["id"] if xref in matches else None
        ov = resolutions.get(xref)
        if ov:
            action = ov.get("action", "new")
            tid = ov.get("target_id")
            if tid:
                # Accept both UUID objects and their string form.
                target = tid if isinstance(tid, uuid.UUID) else uuid.UUID(str(tid))
            else:
                target = matched
            target = usable(target)
            if action in ("skip", "merge", "overwrite") and target is None:
                return "new", None
            return action, target
        if default_action != "new":
            target = usable(matched)
            if target is not None:
                return default_action, target
        return "new", None

    # Individuals.
    for rec in roots:
        if rec.tag != "INDI" or not rec.xref:
            continue
        names = _extract_names(rec)
        action, target = resolve(rec.xref)

        if action == "skip" and target is not None:
            person_map[rec.xref] = target
            counts["skipped"] += 1
            continue
        if action == "merge" and target is not None:
            person_map[rec.xref] = target
            add_names(target, names, set_primary=False)
            await add_events(rec, target)
            await add_citations(rec, person_id=target)
            note = _notes_text(rec)
            if note:
                existing = (
                    await session.execute(
                        select(Person).where(Person.id == target, Person.tree_id == tree.id)
                    )
                ).scalar_one_or_none()
                if existing is not None:
                    existing.notes = "\n".join(filter(None, [existing.notes, note]))
            counts["merged"] += 1
            continue
        if action == "overwrite" and target is not None:
            await soft_delete_existing(target)
            # A deleted person must not be picked as a target by later records.
            targetable_ids.discard(target)
            counts["overwritten"] += 1

        person = Person(tree_id=tree.id, gender=_sex(rec.text("SEX")), notes=_notes_text(rec))
        session.add(person)
        await session.flush()
        person_map[rec.xref] = person.id
        counts["persons"] += 1
        add_names(person.id, names, set_primary=True)
        await add_citations(rec, person_id=person.id)
        await add_events(rec, person.id)

    # Families -> partnerships, parent-child edges, marriage events.
    for rec in roots:
        if rec.tag != "FAM":
            continue
        counts["families"] += 1
        husb = person_map.get((rec.text("HUSB") or "").strip())
        wife = person_map.get((rec.text("WIFE") or "").strip())
        partnership_id: uuid.UUID | None = None
        if husb and wife and husb != wife:
            key = _relkey(RelationshipType.partnership, husb, wife)
            rel = add_relationship(RelationshipType.partnership, husb, wife)
            if rel is not None:
                await session.flush()
                partnership_ids[key] = rel.id
            # The edge either was just created or already existed (in the
            # database or from an earlier FAM of this file).
            partnership_id = partnership_ids.get(key)

        for fe in rec.children:
            if fe.tag in FAM_EVENTS and partnership_id is not None:
                dv = fe.text("DATE")
                ev = Event(
                    tree_id=tree.id,
                    relationship_id=partnership_id,
                    event_type=FAM_EVENTS[fe.tag],
                    date_value=dv,
                    date_start=_date_start(dv),
                    place_id=await place_id(fe.text("PLAC")),
                )
                session.add(ev)
                await session.flush()
                counts["events"] += 1

        for chil in rec.all("CHIL"):
            cp = person_map.get(chil.value.strip())
            if cp is None:
                continue
            for parent in (husb, wife):
                if parent and parent != cp:
                    add_relationship(
                        RelationshipType.parent_child,
                        parent,
                        cp,
                        qualifier=ParentChildQualifier.biological,
                    )

    record_audit(
        session,
        action="import",
        entity_type="Gedcom",
        tree_id=tree.id,
        actor_user_id=actor.id,
        after=dict(counts),
    )
    await session.commit()
    return {"counts": dict(counts), "unmapped_tags": sorted(unmapped)}
|
||
|
||
|
||
def _ged_date(value: str | None) -> str | None:
|
||
return value.strip() if value else None
|
||
|
||
|
||
async def export_gedcom(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tree) -> str:
    """Serialize the tree's non-deleted people, families and sources as
    GEDCOM text.

    Families are reconstructed from relationship edges: children are grouped
    by their set of parents, and a partnership edge supplies the family's
    events. Name TYPE is emitted for non-birth names, and the primary name is
    written first.

    Raises:
        Forbidden: if the viewer may not view ``tree``.
    """
    if not await privacy.can_view_tree(session, user_id=viewer_id, tree=tree):
        raise Forbidden("not permitted to view this tree")

    persons = list(
        (
            await session.execute(
                select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    names = list(
        (
            await session.execute(
                select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    events = list(
        (
            await session.execute(
                select(Event).where(Event.tree_id == tree.id, Event.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    rels = list(
        (
            await session.execute(
                select(Relationship).where(
                    Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
                )
            )
        ).scalars().all()
    )
    sources = list(
        (
            await session.execute(
                select(Source).where(Source.tree_id == tree.id, Source.deleted_at.is_(None))
            )
        ).scalars().all()
    )
    places = {
        p.id: p
        for p in (
            await session.execute(select(Place).where(Place.tree_id == tree.id))
        ).scalars().all()
    }

    pxref = {p.id: f"@I{i + 1}@" for i, p in enumerate(persons)}
    gender_by_id = {p.id: p.gender for p in persons}
    sxref = {s.id: f"@S{i + 1}@" for i, s in enumerate(sources)}
    names_by_person: dict[uuid.UUID, list[Name]] = defaultdict(list)
    # Primary name first (readers treat the first NAME as the preferred one),
    # matching the ordering used when building the duplicate index.
    for n in sorted(names, key=lambda n: (not n.is_primary, n.sort_order)):
        names_by_person[n.person_id].append(n)
    events_by_person: dict[uuid.UUID, list[Event]] = defaultdict(list)
    events_by_rel: dict[uuid.UUID, list[Event]] = defaultdict(list)
    for e in events:
        if e.person_id:
            events_by_person[e.person_id].append(e)
        elif e.relationship_id:
            events_by_rel[e.relationship_id].append(e)

    # An edge touching a person we are not exporting has no xref to point at;
    # skipping it avoids a KeyError further down.
    live_rels = [
        r for r in rels if r.person_from_id in pxref and r.person_to_id in pxref
    ]

    # Build families from parent-child + partnership edges (group by parent set).
    parents_of: dict[uuid.UUID, set[uuid.UUID]] = defaultdict(set)
    for r in live_rels:
        if r.type == RelationshipType.parent_child:
            parents_of[r.person_to_id].add(r.person_from_id)
    fams: dict[frozenset, dict] = {}
    for child, ps in parents_of.items():
        key = frozenset(ps)
        fams.setdefault(key, {"parents": set(ps), "children": [], "rel_id": None})
        fams[key]["children"].append(child)
    for r in live_rels:
        if r.type == RelationshipType.partnership:
            key = frozenset({r.person_from_id, r.person_to_id})
            fam = fams.setdefault(
                key,
                {"parents": {r.person_from_id, r.person_to_id}, "children": [], "rel_id": None},
            )
            fam["rel_id"] = r.id
    fam_list = list(fams.values())
    fxref = {id(f): f"@F{i + 1}@" for i, f in enumerate(fam_list)}
    # person -> the families they are a spouse in / a child in
    spouse_fams: dict[uuid.UUID, list[str]] = defaultdict(list)
    child_fams: dict[uuid.UUID, str] = {}
    for f in fam_list:
        x = fxref[id(f)]
        for pid in f["parents"]:
            spouse_fams[pid].append(x)
        for cid in f["children"]:
            child_fams[cid] = x

    out: list[str] = ["0 HEAD", "1 SOUR Provenance", "1 GEDC", "2 VERS 5.5.1", "1 CHAR UTF-8"]

    for p in persons:
        out.append(f"0 {pxref[p.id]} INDI")
        for n in names_by_person.get(p.id, []):
            structured = f"{n.given or ''} /{n.surname or ''}/".strip()
            display = n.display_name
            # A display value without surname slashes (e.g. a bare _MARNM
            # surname) would re-import as a given name; use the structured
            # parts when we have them.
            if not display or ("/" not in display and (n.given or n.surname)):
                display = structured
            out.append(f"1 NAME {display}")
            ged_type = EXPORT_TYPE_MAP.get(n.name_type)
            if ged_type:
                out.append(f"2 TYPE {ged_type}")
        sex = {"male": "M", "female": "F"}.get(p.gender or "")
        if sex:
            out.append(f"1 SEX {sex}")
        for e in events_by_person.get(p.id, []):
            tag = EVENT_TO_GED.get(e.event_type)
            if not tag:
                continue
            detail = (e.detail or "").strip() if tag in VALUE_EVENTS else ""
            if detail:
                # Attribute facts carry their value on the tag line; extra
                # lines go out as CONT so the file stays line-structured.
                first_line, *more = detail.splitlines()
                out.append(f"1 {tag} {first_line}")
                out.extend(f"2 CONT {extra}" for extra in more)
            else:
                out.append(f"1 {tag}")
            dv = _ged_date(e.date_value)
            if dv:
                out.append(f"2 DATE {dv}")
            if e.place_id and e.place_id in places:
                out.append(f"2 PLAC {places[e.place_id].name}")
        if p.id in child_fams:
            out.append(f"1 FAMC {child_fams[p.id]}")
        for x in spouse_fams.get(p.id, []):
            out.append(f"1 FAMS {x}")

    for f in fam_list:
        x = fxref[id(f)]
        out.append(f"0 {x} FAM")
        # Sorted so slot assignment is stable across runs.
        ps = sorted(f["parents"], key=str)
        # HUSB/WIFE by recorded gender where possible, never the same person
        # in both slots and never dropping a second parent.
        males = [pid for pid in ps if gender_by_id.get(pid) == "male"]
        females = [pid for pid in ps if gender_by_id.get(pid) == "female"]
        husb = males[0] if males else next((pid for pid in ps if pid not in females), None)
        wife = next((pid for pid in females if pid != husb), None)
        if wife is None:
            wife = next((pid for pid in ps if pid != husb), None)
        if husb is None and wife is not None:
            # Only women recorded: a second parent still needs the free slot.
            husb = next((pid for pid in ps if pid != wife), None)
        if husb:
            out.append(f"1 HUSB {pxref[husb]}")
        if wife:
            out.append(f"1 WIFE {pxref[wife]}")
        for cid in f["children"]:
            out.append(f"1 CHIL {pxref[cid]}")
        if f["rel_id"]:
            for e in events_by_rel.get(f["rel_id"], []):
                tag = EVENT_TO_GED.get(e.event_type)
                if not tag:
                    continue
                out.append(f"1 {tag}")
                dv = _ged_date(e.date_value)
                if dv:
                    out.append(f"2 DATE {dv}")
                # Import stores a place for family events; write it back out.
                if e.place_id and e.place_id in places:
                    out.append(f"2 PLAC {places[e.place_id].name}")

    for s in sources:
        out.append(f"0 {sxref[s.id]} SOUR")
        if s.title:
            out.append(f"1 TITL {s.title}")
        if s.author:
            out.append(f"1 AUTH {s.author}")
        if s.publication_info:
            out.append(f"1 PUBL {s.publication_info}")

    out.append("0 TRLR")
    return "\n".join(out) + "\n"
|