GEDCOM: duplicate-aware import + typed name/attribute mapping
Duplicate detection (the "merge / skip / overwrite" the user asked for):
- New POST /gedcom/preview dry-runs the file and flags incoming people that
resemble existing ones (name similarity via difflib + birth-year guard;
high/medium score). No writes.
- /gedcom/import takes default_action (new|skip|merge|overwrite) + per-xref
resolutions {xref: {action, target_id}}:
new create as a new person (current behavior)
skip link families to the existing person, copy nothing
merge attach the incoming names (as alternates), events, citations,
and notes onto the existing person
overwrite soft-delete the existing person, import the incoming one fresh
Relationship creation is deduped so a merge can't double an edge.
Richer record mapping (covers the user's repo's GEDCOM):
- Multiple NAME records honor their TYPE; _MARNM (and NICK) import as typed
alternate names — maiden stays primary, married becomes a "married" Name.
- RELI -> a "religion" event with the value in detail; OCCU/EDUC values too.
- NOTE -> person notes (and event notes); NOTE/RELI are no longer "unmapped".
- Export round-trips name TYPE.
Verified against the user's 2185-person export: 0 unmapped tags. 48 tests pass.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+400
-50
@@ -4,14 +4,20 @@ A pragmatic parser + mapper for the common subset of GEDCOM (5.5.1 / 7 share
|
||||
the line grammar): INDI, FAM, SOUR. Import maps records into a tree and returns
|
||||
a mapping report (counts + unmapped tags); export serializes the tree back to
|
||||
GEDCOM. Runs inline for now — large files should move to the worker later.
|
||||
|
||||
Import is duplicate-aware: ``preview_gedcom`` reports incoming people that look
|
||||
like existing ones, and ``import_gedcom`` applies a per-record resolution
|
||||
(new / skip / merge / overwrite). Names carry their GEDCOM type (a married name
|
||||
imports as a typed alternate, not a second primary).
|
||||
"""
|
||||
|
||||
import re
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
from datetime import date
|
||||
from datetime import UTC, date, datetime
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import or_, select, update
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.models.enums import ParentChildQualifier, RelationshipType
|
||||
@@ -32,12 +38,31 @@ INDI_EVENTS = {
|
||||
"BURI": "burial", "CREM": "cremation", "RESI": "residence", "CENS": "census",
|
||||
"IMMI": "immigration", "EMIG": "emigration", "OCCU": "occupation",
|
||||
"EDUC": "education", "GRAD": "graduation", "RETI": "retirement",
|
||||
"NATU": "naturalization", "BAPL": "baptism",
|
||||
"NATU": "naturalization", "BAPL": "baptism", "RELI": "religion",
|
||||
}
|
||||
# INDI attribute tags whose line VALUE is the fact (no date), stored in detail.
|
||||
VALUE_EVENTS = {"RELI", "OCCU", "EDUC"}
|
||||
# INDI sub-tags consumed elsewhere or intentionally ignored (not "unmapped").
|
||||
INDI_SKIP_TAGS = {
|
||||
"NAME", "SEX", "SOUR", "FAMC", "FAMS", "CHAN", "OBJE", "_UID", "_MARNM", "NOTE",
|
||||
}
|
||||
# FAM-level events.
|
||||
FAM_EVENTS = {"MARR": "marriage", "DIV": "divorce", "ENGA": "engagement"}
|
||||
EVENT_TO_GED = {v: k for k, v in {**INDI_EVENTS, **FAM_EVENTS}.items()}
|
||||
|
||||
# GEDCOM NAME TYPE (or _MARNM-derived) -> our Name.name_type vocabulary.
|
||||
NAME_TYPE_MAP = {
|
||||
"birth": "birth", "maiden": "birth", "married": "married",
|
||||
"aka": "alias", "also known as": "alias", "nickname": "nickname",
|
||||
"religious": "religious", "immigrant": "immigration",
|
||||
"immigration": "immigration", "professional": "alias", "other": "alias",
|
||||
}
|
||||
# Our type -> GEDCOM TYPE on export (birth is the default; emit nothing).
|
||||
EXPORT_TYPE_MAP = {
|
||||
"married": "married", "alias": "aka", "nickname": "nickname",
|
||||
"religious": "religious", "immigration": "immigrant",
|
||||
}
|
||||
|
||||
|
||||
class GedcomNode:
|
||||
__slots__ = ("level", "tag", "value", "xref", "children")
|
||||
@@ -108,6 +133,50 @@ def _parse_name(value: str) -> tuple[str | None, str | None]:
|
||||
return value.strip() or None, None
|
||||
|
||||
|
||||
def _parse_marnm(value: str, base_given: str | None) -> tuple[str | None, str | None]:
|
||||
"""A _MARNM value is sometimes a full name ("Jane /Smith/") and sometimes
|
||||
just the married surname ("Smith"). Keep the given name from the base name
|
||||
in the latter case."""
|
||||
v = (value or "").strip()
|
||||
if "/" in v:
|
||||
g, s = _parse_name(v)
|
||||
return (g or base_given), s
|
||||
return base_given, (v or None)
|
||||
|
||||
|
||||
def _extract_names(rec: GedcomNode) -> list[dict]:
|
||||
"""All names for an INDI, typed. Multiple NAME records (each with an optional
|
||||
TYPE) plus any _MARNM (married name) subtags become separate Name rows. The
|
||||
first birth/maiden name is primary."""
|
||||
out: list[dict] = []
|
||||
for nm in rec.all("NAME"):
|
||||
g, s = _parse_name(nm.value)
|
||||
t = (nm.text("TYPE") or "").strip().lower()
|
||||
ntype = NAME_TYPE_MAP.get(t, t or "birth")
|
||||
out.append({"type": ntype, "given": g, "surname": s, "display": nm.value or None,
|
||||
"nickname": nm.text("NICK")})
|
||||
for mar in nm.all("_MARNM"):
|
||||
mg, ms = _parse_marnm(mar.value, g)
|
||||
out.append({"type": "married", "given": mg, "surname": ms,
|
||||
"display": mar.value or None, "nickname": None})
|
||||
for mar in rec.all("_MARNM"):
|
||||
base_g = out[0]["given"] if out else None
|
||||
mg, ms = _parse_marnm(mar.value, base_g)
|
||||
out.append({"type": "married", "given": mg, "surname": ms,
|
||||
"display": mar.value or None, "nickname": None})
|
||||
if not out:
|
||||
return out
|
||||
primary_idx = next((i for i, n in enumerate(out) if n["type"] == "birth"), 0)
|
||||
for i, n in enumerate(out):
|
||||
n["is_primary"] = i == primary_idx
|
||||
n["sort"] = i
|
||||
return out
|
||||
|
||||
|
||||
def _norm(given: str | None, surname: str | None) -> str:
|
||||
return re.sub(r"\s+", " ", f"{given or ''} {surname or ''}".strip().lower())
|
||||
|
||||
|
||||
def _year(date_value: str | None) -> str | None:
|
||||
if not date_value:
|
||||
return None
|
||||
@@ -132,18 +201,215 @@ def _sex(value: str | None) -> str | None:
|
||||
return {"M": "male", "F": "female"}.get(v, value.strip().lower() or None)
|
||||
|
||||
|
||||
def _notes_text(rec: GedcomNode) -> str | None:
|
||||
"""Join an INDI's NOTE lines (which pack confidence / findagrave / fs_pid /
|
||||
free text) into the person's notes field."""
|
||||
vals = [n.value.strip() for n in rec.all("NOTE") if n.value and n.value.strip()]
|
||||
return "\n".join(vals) or None
|
||||
|
||||
|
||||
def _person_summary(rec: GedcomNode) -> dict:
|
||||
"""Display name + birth year for an incoming INDI, for duplicate matching."""
|
||||
names = _extract_names(rec)
|
||||
primary = next((n for n in names if n.get("is_primary")), names[0] if names else None)
|
||||
g = primary["given"] if primary else None
|
||||
s = primary["surname"] if primary else None
|
||||
disp = " ".join(x for x in (g, s) if x)
|
||||
if not disp and primary:
|
||||
disp = primary.get("display") or ""
|
||||
birth = rec.first("BIRT")
|
||||
year = _year(birth.text("DATE")) if birth else None
|
||||
return {"names": names, "norm": _norm(g, s), "name": disp or "(no name)", "year": year}
|
||||
|
||||
|
||||
async def _build_existing_index(session: AsyncSession, tree: Tree) -> list[dict]:
|
||||
"""Existing (non-deleted) people with a display name + birth year, for
|
||||
matching incoming records against."""
|
||||
persons = list(
|
||||
(
|
||||
await session.execute(
|
||||
select(Person).where(Person.tree_id == tree.id, Person.deleted_at.is_(None))
|
||||
)
|
||||
).scalars().all()
|
||||
)
|
||||
names = list(
|
||||
(
|
||||
await session.execute(
|
||||
select(Name).where(Name.tree_id == tree.id, Name.deleted_at.is_(None))
|
||||
)
|
||||
).scalars().all()
|
||||
)
|
||||
name_by_person: dict[uuid.UUID, Name] = {}
|
||||
for n in sorted(names, key=lambda n: (not n.is_primary, n.sort_order)):
|
||||
name_by_person.setdefault(n.person_id, n)
|
||||
births = list(
|
||||
(
|
||||
await session.execute(
|
||||
select(Event).where(
|
||||
Event.tree_id == tree.id,
|
||||
Event.deleted_at.is_(None),
|
||||
Event.event_type == "birth",
|
||||
)
|
||||
)
|
||||
).scalars().all()
|
||||
)
|
||||
year_by_person: dict[uuid.UUID, str] = {}
|
||||
for e in births:
|
||||
if e.person_id and e.person_id not in year_by_person:
|
||||
y = str(e.date_start.year) if e.date_start else _year(e.date_value)
|
||||
if y:
|
||||
year_by_person[e.person_id] = y
|
||||
|
||||
index: list[dict] = []
|
||||
for p in persons:
|
||||
nm = name_by_person.get(p.id)
|
||||
g = nm.given if nm else None
|
||||
s = nm.surname if nm else None
|
||||
disp = " ".join(x for x in (g, s) if x) or (nm.display_name if nm else None)
|
||||
index.append({
|
||||
"id": p.id,
|
||||
"norm": _norm(g, s),
|
||||
"name": disp or "(no name)",
|
||||
"year": year_by_person.get(p.id),
|
||||
})
|
||||
return index
|
||||
|
||||
|
||||
def _best_match(norm: str, year: str | None, index: list[dict]) -> tuple[dict | None, str | None]:
|
||||
"""Closest existing person by name similarity, rejecting clear birth-year
|
||||
conflicts. Returns (entry, "high"|"medium") or (None, None)."""
|
||||
if not norm:
|
||||
return None, None
|
||||
best: dict | None = None
|
||||
best_r = 0.0
|
||||
for e in index:
|
||||
if not e["norm"]:
|
||||
continue
|
||||
r = SequenceMatcher(None, norm, e["norm"]).ratio()
|
||||
if r < 0.88:
|
||||
continue
|
||||
if year and e["year"] and abs(int(year) - int(e["year"])) > 1:
|
||||
continue # same-ish name but different birth year — not a duplicate
|
||||
if r > best_r:
|
||||
best_r = r
|
||||
best = e
|
||||
if best is None:
|
||||
return None, None
|
||||
year_match = bool(year and best["year"] and abs(int(year) - int(best["year"])) <= 1)
|
||||
both_unknown = not year and not best["year"]
|
||||
score = "high" if best_r >= 0.93 and (year_match or both_unknown) else "medium"
|
||||
return best, score
|
||||
|
||||
|
||||
def _relkey(rtype: RelationshipType, a: uuid.UUID, b: uuid.UUID) -> tuple:
|
||||
if rtype == RelationshipType.parent_child:
|
||||
return ("pc", str(a), str(b))
|
||||
return (rtype.value, *sorted([str(a), str(b)]))
|
||||
|
||||
|
||||
def _count_incoming(roots: list[GedcomNode]) -> tuple[dict, list[str]]:
|
||||
counts: dict[str, int] = defaultdict(int)
|
||||
unmapped: set[str] = set()
|
||||
for rec in roots:
|
||||
if rec.tag == "INDI" and rec.xref:
|
||||
counts["persons"] += 1
|
||||
counts["names"] += len(_extract_names(rec))
|
||||
for child in rec.children:
|
||||
if child.tag in INDI_EVENTS:
|
||||
counts["events"] += 1
|
||||
elif child.tag not in INDI_SKIP_TAGS:
|
||||
unmapped.add(child.tag)
|
||||
elif rec.tag == "FAM":
|
||||
counts["families"] += 1
|
||||
for child in rec.children:
|
||||
if child.tag in FAM_EVENTS:
|
||||
counts["events"] += 1
|
||||
elif rec.tag == "SOUR" and rec.xref:
|
||||
counts["sources"] += 1
|
||||
return dict(counts), sorted(unmapped)
|
||||
|
||||
|
||||
async def preview_gedcom(session: AsyncSession, *, actor: User, tree: Tree, text: str) -> dict:
|
||||
"""Dry run: what would import, and which incoming people look like existing
|
||||
ones. No writes."""
|
||||
if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
|
||||
raise Forbidden("not an editor of this tree")
|
||||
roots = parse_records(text)
|
||||
counts, unmapped = _count_incoming(roots)
|
||||
index = await _build_existing_index(session, tree)
|
||||
|
||||
duplicates: list[dict] = []
|
||||
for rec in roots:
|
||||
if rec.tag != "INDI" or not rec.xref:
|
||||
continue
|
||||
summ = _person_summary(rec)
|
||||
entry, score = _best_match(summ["norm"], summ["year"], index)
|
||||
if entry is None:
|
||||
continue
|
||||
duplicates.append({
|
||||
"xref": rec.xref,
|
||||
"incoming_name": summ["name"],
|
||||
"incoming_birth_year": summ["year"],
|
||||
"existing_person_id": entry["id"],
|
||||
"existing_name": entry["name"],
|
||||
"existing_birth_year": entry["year"],
|
||||
"score": score,
|
||||
})
|
||||
return {"counts": counts, "potential_duplicates": duplicates, "unmapped_tags": unmapped}
|
||||
|
||||
|
||||
async def import_gedcom(
|
||||
session: AsyncSession, *, actor: User, tree: Tree, text: str
|
||||
session: AsyncSession,
|
||||
*,
|
||||
actor: User,
|
||||
tree: Tree,
|
||||
text: str,
|
||||
default_action: str = "new",
|
||||
resolutions: dict | None = None,
|
||||
) -> dict:
|
||||
"""Import records. ``default_action`` (new|skip|merge|overwrite) applies to
|
||||
incoming people that match an existing one; ``resolutions`` overrides it per
|
||||
GEDCOM xref ({xref: {action, target_id}}). 'skip' links families to the
|
||||
existing person but copies nothing; 'merge' also copies the incoming names
|
||||
(as alternates), events and citations onto them; 'overwrite' deletes the
|
||||
existing person and imports the incoming one fresh."""
|
||||
if not await privacy.can_edit_tree(session, user_id=actor.id, tree=tree):
|
||||
raise Forbidden("not an editor of this tree")
|
||||
|
||||
resolutions = resolutions or {}
|
||||
roots = parse_records(text)
|
||||
counts = defaultdict(int)
|
||||
counts: dict[str, int] = defaultdict(int)
|
||||
unmapped: set[str] = set()
|
||||
place_cache: dict[str, uuid.UUID] = {}
|
||||
source_map: dict[str, uuid.UUID] = {}
|
||||
person_map: dict[str, uuid.UUID] = {}
|
||||
now = datetime.now(UTC)
|
||||
|
||||
index = await _build_existing_index(session, tree)
|
||||
|
||||
# Pre-load existing relationship keys so a merge doesn't create dup edges.
|
||||
existing_rels = list(
|
||||
(
|
||||
await session.execute(
|
||||
select(Relationship).where(
|
||||
Relationship.tree_id == tree.id, Relationship.deleted_at.is_(None)
|
||||
)
|
||||
)
|
||||
).scalars().all()
|
||||
)
|
||||
rel_keys = {_relkey(r.type, r.person_from_id, r.person_to_id) for r in existing_rels}
|
||||
|
||||
def add_relationship(
|
||||
rtype: RelationshipType, a: uuid.UUID, b: uuid.UUID, **kw
|
||||
) -> Relationship | None:
|
||||
key = _relkey(rtype, a, b)
|
||||
if key in rel_keys:
|
||||
return None
|
||||
rel = Relationship(tree_id=tree.id, type=rtype, person_from_id=a, person_to_id=b, **kw)
|
||||
session.add(rel)
|
||||
rel_keys.add(key)
|
||||
counts["relationships"] += 1
|
||||
return rel
|
||||
|
||||
async def place_id(name: str | None) -> uuid.UUID | None:
|
||||
if not name:
|
||||
@@ -177,59 +443,139 @@ async def import_gedcom(
|
||||
sid = source_map.get(s.value.strip())
|
||||
if sid is None:
|
||||
continue
|
||||
session.add(
|
||||
Citation(tree_id=tree.id, source_id=sid, page=s.text("PAGE"), **target)
|
||||
)
|
||||
session.add(Citation(tree_id=tree.id, source_id=sid, page=s.text("PAGE"), **target))
|
||||
counts["citations"] += 1
|
||||
|
||||
# Individuals.
|
||||
for rec in roots:
|
||||
if rec.tag != "INDI" or not rec.xref:
|
||||
continue
|
||||
person = Person(tree_id=tree.id, gender=_sex(rec.text("SEX")))
|
||||
session.add(person)
|
||||
await session.flush()
|
||||
person_map[rec.xref] = person.id
|
||||
counts["persons"] += 1
|
||||
|
||||
for i, nm in enumerate(rec.all("NAME")):
|
||||
given, surname = _parse_name(nm.value)
|
||||
def add_names(person_id: uuid.UUID, names: list[dict], *, set_primary: bool) -> None:
|
||||
for nd in names:
|
||||
session.add(
|
||||
Name(
|
||||
tree_id=tree.id,
|
||||
person_id=person.id,
|
||||
name_type="birth",
|
||||
given=given,
|
||||
surname=surname,
|
||||
display_name=nm.value or None,
|
||||
is_primary=(i == 0),
|
||||
sort_order=i,
|
||||
person_id=person_id,
|
||||
name_type=nd["type"],
|
||||
given=nd["given"],
|
||||
surname=nd["surname"],
|
||||
nickname=nd.get("nickname"),
|
||||
display_name=nd.get("display"),
|
||||
is_primary=set_primary and nd.get("is_primary", False),
|
||||
sort_order=nd.get("sort", 0),
|
||||
)
|
||||
)
|
||||
counts["names"] += 1
|
||||
|
||||
await add_citations(rec, person_id=person.id)
|
||||
|
||||
async def add_events(rec: GedcomNode, person_id: uuid.UUID) -> None:
|
||||
for child in rec.children:
|
||||
if child.tag in INDI_EVENTS:
|
||||
dv = child.text("DATE")
|
||||
# Attribute-style facts (RELI, OCCU, EDUC) carry their value on
|
||||
# the line itself; store it in detail.
|
||||
detail = child.value.strip() if child.tag in VALUE_EVENTS else None
|
||||
ev = Event(
|
||||
tree_id=tree.id,
|
||||
person_id=person.id,
|
||||
person_id=person_id,
|
||||
event_type=INDI_EVENTS[child.tag],
|
||||
date_value=dv,
|
||||
date_start=_date_start(dv),
|
||||
place_id=await place_id(child.text("PLAC")),
|
||||
detail=detail or None,
|
||||
notes=child.text("NOTE"),
|
||||
)
|
||||
session.add(ev)
|
||||
await session.flush()
|
||||
counts["events"] += 1
|
||||
await add_citations(child, event_id=ev.id)
|
||||
elif child.tag in ("NAME", "SEX", "SOUR", "FAMC", "FAMS", "CHAN", "OBJE", "_UID"):
|
||||
elif child.tag in INDI_SKIP_TAGS:
|
||||
continue
|
||||
else:
|
||||
unmapped.add(child.tag)
|
||||
|
||||
async def soft_delete_existing(person_id: uuid.UUID) -> None:
|
||||
p = (
|
||||
await session.execute(
|
||||
select(Person).where(Person.id == person_id, Person.deleted_at.is_(None))
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
if p is None:
|
||||
return
|
||||
p.deleted_at = now
|
||||
rels = (
|
||||
await session.execute(
|
||||
select(Relationship).where(
|
||||
Relationship.tree_id == tree.id,
|
||||
Relationship.deleted_at.is_(None),
|
||||
or_(
|
||||
Relationship.person_from_id == person_id,
|
||||
Relationship.person_to_id == person_id,
|
||||
),
|
||||
)
|
||||
)
|
||||
).scalars().all()
|
||||
for r in rels:
|
||||
r.deleted_at = now
|
||||
await session.execute(
|
||||
update(User).where(User.self_person_id == person_id).values(self_person_id=None)
|
||||
)
|
||||
|
||||
# Precompute the best match per incoming xref (for default-policy resolution).
|
||||
matches: dict[str, dict] = {}
|
||||
for rec in roots:
|
||||
if rec.tag == "INDI" and rec.xref:
|
||||
summ = _person_summary(rec)
|
||||
entry, _score = _best_match(summ["norm"], summ["year"], index)
|
||||
if entry is not None:
|
||||
matches[rec.xref] = entry
|
||||
|
||||
def resolve(xref: str) -> tuple[str, uuid.UUID | None]:
|
||||
ov = resolutions.get(xref)
|
||||
if ov:
|
||||
action = ov.get("action", "new")
|
||||
tid = ov.get("target_id")
|
||||
target = uuid.UUID(tid) if tid else (matches[xref]["id"] if xref in matches else None)
|
||||
if action in ("skip", "merge", "overwrite") and target is None:
|
||||
return "new", None
|
||||
return action, target
|
||||
if default_action != "new" and xref in matches:
|
||||
return default_action, matches[xref]["id"]
|
||||
return "new", None
|
||||
|
||||
# Individuals.
|
||||
for rec in roots:
|
||||
if rec.tag != "INDI" or not rec.xref:
|
||||
continue
|
||||
names = _extract_names(rec)
|
||||
action, target = resolve(rec.xref)
|
||||
|
||||
if action == "skip" and target is not None:
|
||||
person_map[rec.xref] = target
|
||||
counts["skipped"] += 1
|
||||
continue
|
||||
if action == "merge" and target is not None:
|
||||
person_map[rec.xref] = target
|
||||
add_names(target, names, set_primary=False)
|
||||
await add_events(rec, target)
|
||||
await add_citations(rec, person_id=target)
|
||||
note = _notes_text(rec)
|
||||
if note:
|
||||
existing = (
|
||||
await session.execute(select(Person).where(Person.id == target))
|
||||
).scalar_one_or_none()
|
||||
if existing is not None:
|
||||
existing.notes = "\n".join(filter(None, [existing.notes, note]))
|
||||
counts["merged"] += 1
|
||||
continue
|
||||
if action == "overwrite" and target is not None:
|
||||
await soft_delete_existing(target)
|
||||
counts["overwritten"] += 1
|
||||
|
||||
person = Person(tree_id=tree.id, gender=_sex(rec.text("SEX")), notes=_notes_text(rec))
|
||||
session.add(person)
|
||||
await session.flush()
|
||||
person_map[rec.xref] = person.id
|
||||
counts["persons"] += 1
|
||||
add_names(person.id, names, set_primary=True)
|
||||
await add_citations(rec, person_id=person.id)
|
||||
await add_events(rec, person.id)
|
||||
|
||||
# Families -> partnerships, parent-child edges, marriage events.
|
||||
for rec in roots:
|
||||
if rec.tag != "FAM":
|
||||
@@ -238,17 +584,22 @@ async def import_gedcom(
|
||||
husb = person_map.get((rec.text("HUSB") or "").strip())
|
||||
wife = person_map.get((rec.text("WIFE") or "").strip())
|
||||
partnership_id: uuid.UUID | None = None
|
||||
if husb and wife:
|
||||
rel = Relationship(
|
||||
tree_id=tree.id,
|
||||
type=RelationshipType.partnership,
|
||||
person_from_id=husb,
|
||||
person_to_id=wife,
|
||||
if husb and wife and husb != wife:
|
||||
rel = add_relationship(RelationshipType.partnership, husb, wife)
|
||||
if rel is not None:
|
||||
await session.flush()
|
||||
partnership_id = rel.id
|
||||
if partnership_id is None and husb and wife:
|
||||
# Edge already existed — find it so marriage events can attach.
|
||||
existing = next(
|
||||
(
|
||||
r for r in existing_rels
|
||||
if r.type == RelationshipType.partnership
|
||||
and {r.person_from_id, r.person_to_id} == {husb, wife}
|
||||
),
|
||||
None,
|
||||
)
|
||||
session.add(rel)
|
||||
await session.flush()
|
||||
partnership_id = rel.id
|
||||
counts["relationships"] += 1
|
||||
partnership_id = existing.id if existing else None
|
||||
|
||||
for fe in rec.children:
|
||||
if fe.tag in FAM_EVENTS and partnership_id is not None:
|
||||
@@ -271,16 +622,12 @@ async def import_gedcom(
|
||||
continue
|
||||
for parent in (husb, wife):
|
||||
if parent and parent != cp:
|
||||
session.add(
|
||||
Relationship(
|
||||
tree_id=tree.id,
|
||||
type=RelationshipType.parent_child,
|
||||
person_from_id=parent,
|
||||
person_to_id=cp,
|
||||
qualifier=ParentChildQualifier.biological,
|
||||
)
|
||||
add_relationship(
|
||||
RelationshipType.parent_child,
|
||||
parent,
|
||||
cp,
|
||||
qualifier=ParentChildQualifier.biological,
|
||||
)
|
||||
counts["relationships"] += 1
|
||||
|
||||
record_audit(
|
||||
session,
|
||||
@@ -397,6 +744,9 @@ async def export_gedcom(session: AsyncSession, *, viewer_id: uuid.UUID, tree: Tr
|
||||
for n in names_by_person.get(p.id, []):
|
||||
display = n.display_name or f"{n.given or ''} /{n.surname or ''}/".strip()
|
||||
out.append(f"1 NAME {display}")
|
||||
ged_type = EXPORT_TYPE_MAP.get(n.name_type)
|
||||
if ged_type:
|
||||
out.append(f"2 TYPE {ged_type}")
|
||||
sex = {"male": "M", "female": "F"}.get(p.gender or "")
|
||||
if sex:
|
||||
out.append(f"1 SEX {sex}")
|
||||
|
||||
Reference in New Issue
Block a user