Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 128 additions & 4 deletions langchain-coordinode/langchain_coordinode/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import hashlib
import json
import re
from typing import Any

Expand Down Expand Up @@ -70,19 +72,107 @@ def refresh_schema(self) -> None:
structured = _parse_schema(text)
# Augment with relationship triples (start_label, type, end_label) via
# Cypher — get_schema_text() only lists edge types without direction.
try:
# CoordiNode: wildcard [r] returns no results; build typed pattern from
# the rel_props keys returned by _parse_schema().
rel_types = list(structured.get("rel_props", {}).keys())
if rel_types:
rel_filter = "|".join(_cypher_ident(t) for t in rel_types)
rows = self._client.cypher(
"MATCH (a)-[r]->(b) RETURN DISTINCT labels(a)[0] AS src, type(r) AS rel, labels(b)[0] AS dst"
f"MATCH (a)-[r:{rel_filter}]->(b) "
"RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst"
)
structured["relationships"] = [
{"start": row["src"], "type": row["rel"], "end": row["dst"]}
for row in rows
if row.get("src") and row.get("rel") and row.get("dst")
]
except Exception: # noqa: BLE001
pass # Graph may have no relationships yet; structured["relationships"] stays []
self._structured_schema = structured

def add_graph_documents(
self,
graph_documents: list[Any],
include_source: bool = False,
) -> None:
"""Store nodes and relationships extracted from ``GraphDocument`` objects.

Nodes are upserted by ``id`` (used as the ``name`` property) via
``MERGE``, so repeated calls are safe for nodes.

Relationships are created with unconditional ``CREATE`` because
CoordiNode does not yet support ``MERGE`` for edge patterns. Re-ingesting
the same ``GraphDocument`` will therefore produce duplicate edges.

Args:
graph_documents: List of ``langchain_community.graphs.graph_document.GraphDocument``.
include_source: If ``True``, also store the source ``Document`` as a
``__Document__`` node linked to every extracted entity via
``MENTIONS`` edges (also unconditional ``CREATE``).
"""
for doc in graph_documents:
for node in doc.nodes:
self._upsert_node(node)
for rel in doc.relationships:
self._create_edge(rel)
if include_source and doc.source:
self._link_document_to_entities(doc)

# Invalidate cached schema so next access reflects new data
self._schema = None
self._structured_schema = None

def _upsert_node(self, node: Any) -> None:
"""Upsert a single node by ``id`` via MERGE."""
label = _cypher_ident(node.type or "Entity")
props = dict(node.properties or {})
# Always enforce node.id as the merge key; incoming
# properties["name"] must not drift from the MERGE predicate.
props["name"] = node.id
self._client.cypher(
f"MERGE (n:{label} {{name: $name}}) SET n += $props",
params={"name": node.id, "props": props},
)

def _create_edge(self, rel: Any) -> None:
"""Create a relationship via unconditional CREATE.

CoordiNode does not support MERGE for edge patterns. Re-ingesting the
same relationship will create a duplicate edge. SET r += $props is
skipped when props is empty because SET r += {} is not supported by all
server versions.
"""
src_label = _cypher_ident(rel.source.type or "Entity")
dst_label = _cypher_ident(rel.target.type or "Entity")
rel_type = _cypher_ident(rel.type)
props = dict(rel.properties or {})
if props:
self._client.cypher(
f"MATCH (src:{src_label} {{name: $src}}) "
f"MATCH (dst:{dst_label} {{name: $dst}}) "
f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props",
params={"src": rel.source.id, "dst": rel.target.id, "props": props},
)
else:
self._client.cypher(
f"MATCH (src:{src_label} {{name: $src}}) "
f"MATCH (dst:{dst_label} {{name: $dst}}) "
f"CREATE (src)-[r:{rel_type}]->(dst)",
params={"src": rel.source.id, "dst": rel.target.id},
)

def _link_document_to_entities(self, doc: Any) -> None:
"""Upsert a ``__Document__`` node and CREATE ``MENTIONS`` edges to all entities."""
src_id = getattr(doc.source, "id", None) or _stable_document_id(doc.source)
self._client.cypher(
"MERGE (d:__Document__ {id: $id}) SET d.page_content = $text",
params={"id": src_id, "text": doc.source.page_content or ""},
)
for node in doc.nodes:
label = _cypher_ident(node.type or "Entity")
self._client.cypher(
f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) CREATE (d)-[:MENTIONS]->(n)",
params={"doc_id": src_id, "name": node.id},
)

def query(
self,
query: str,
Expand Down Expand Up @@ -116,6 +206,40 @@ def __exit__(self, *args: Any) -> None:
# ── Schema parser ─────────────────────────────────────────────────────────


def _stable_document_id(source: Any) -> str:
"""Return a deterministic ID for a LangChain Document.

Combines ``page_content`` and sorted ``metadata`` items so the same
document produces the same ``__Document__`` node ID across different
Python processes. This makes document-node creation stable when
``include_source=True`` is used, but does not make re-ingest fully
idempotent because ``MENTIONS`` edges are not deduplicated until edge
``MERGE``/dedup support is added to CoordiNode.
"""
content = getattr(source, "page_content", "") or ""
metadata = getattr(source, "metadata", {}) or {}
# Use canonical JSON encoding to avoid delimiter ambiguity and ensure
# determinism for nested/non-scalar metadata values. default=str converts
# non-JSON-serializable types (datetime, UUID, Path, …) to their string
# representation so the hash never raises TypeError.
canonical = json.dumps(
{"content": content, "metadata": metadata},
sort_keys=True,
separators=(",", ":"),
ensure_ascii=False,
default=str,
)
return hashlib.sha256(canonical.encode()).hexdigest()[:32]


def _cypher_ident(name: str) -> str:
"""Escape a label/type name for use as a Cypher identifier."""
# ASCII-only word characters: letter/digit/underscore, not starting with digit.
if re.match(r"^[A-Za-z_]\w*$", name, re.ASCII):
return name
return f"`{name.replace('`', '``')}`"


def _parse_schema(schema_text: str) -> dict[str, Any]:
"""Convert CoordiNode schema text into LangChain's structured format.

Expand Down
4 changes: 3 additions & 1 deletion llama-index-coordinode/llama_index/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
# namespace package
from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)
4 changes: 3 additions & 1 deletion llama-index-coordinode/llama_index/graph_stores/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
# namespace package
from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)
134 changes: 97 additions & 37 deletions llama-index-coordinode/llama_index/graph_stores/coordinode/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ def get_triplets(
properties: dict[str, Any] | None = None,
ids: list[str] | None = None,
) -> list[list[LabelledNode]]:
"""Retrieve triplets (subject, predicate, object) as node triples."""
"""Retrieve triplets (subject, predicate, object) as node triples.

Note:
``relation_names`` is **required**. CoordiNode does not support
untyped wildcard ``[r]`` relationship patterns — they silently return
no rows. Omitting ``relation_names`` raises ``NotImplementedError``.
"""
conditions: list[str] = []
params: dict[str, Any] = {}

Expand All @@ -131,20 +137,26 @@ def get_triplets(
rel_filter = "|".join(_cypher_ident(t) for t in relation_names)
rel_pattern = f"[r:{rel_filter}]"
else:
rel_pattern = "[r]"
# CoordiNode: wildcard [r] pattern returns no results.
# Callers must supply relation_names for the query to work.
raise NotImplementedError(
"CoordinodePropertyGraphStore.get_triplets() requires relation_names — "
"CoordiNode does not support untyped wildcard [r] patterns"
)

where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
# CoordiNode: use r.__type__ instead of type(r) — type() returns null.
cypher = (
f"MATCH (n)-{rel_pattern}->(m) {where} "
"RETURN n, type(r) AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
"RETURN n, r.__type__ AS rel_type, m, n.id AS _src_id, m.id AS _dst_id "
"LIMIT 1000"
)
result = self._client.cypher(cypher, params=params)

triplets: list[list[LabelledNode]] = []
for row in result:
src_data = row.get("n", {})
rel_type = row.get("rel_type", "RELATED")
rel_type = row.get("rel_type") or "RELATED"
dst_data = row.get("m", {})
src_id = str(row.get("_src_id", ""))
dst_id = str(row.get("_dst_id", ""))
Expand All @@ -158,30 +170,47 @@ def get_triplets(
def get_rel_map(
self,
graph_nodes: list[LabelledNode],
depth: int = 2,
depth: int = 1,
limit: int = 30,
ignore_rels: list[str] | None = None,
) -> list[list[LabelledNode]]:
"""Get relationship map for a set of nodes up to ``depth`` hops."""
"""Get relationship map for a set of nodes up to ``depth`` hops.

Note: only ``depth=1`` (single hop) is supported. ``depth > 1`` raises
``NotImplementedError`` because CoordiNode does not yet serialise
variable-length path results.
"""
if depth != 1:
raise NotImplementedError(
"CoordinodePropertyGraphStore.get_rel_map() currently supports depth=1 only; "
"variable-length path queries are not yet available in CoordiNode"
)

if not graph_nodes:
return []

node_ids = [n.id for n in graph_nodes]
ignored = list(ignore_rels) if ignore_rels else []
# CoordiNode: wildcard [r] pattern returns no results. Fetch all
# known edge types from the schema and build a typed pattern instead,
# e.g. [r:TYPE_A|TYPE_B|...].
schema_text = self._client.get_schema_text()
edge_types = _parse_edge_types_from_schema(schema_text)

# Push ignore_rels filter into Cypher so LIMIT applies after filtering;
# a Python-side filter after LIMIT would silently truncate valid results.
ignored = set(ignore_rels) if ignore_rels else set()
active_types = [t for t in edge_types if t not in ignored]

if not active_types:
return []

rel_filter = "|".join(_cypher_ident(t) for t in active_types)
node_ids = [n.id for n in graph_nodes]
safe_limit = int(limit) # coerce to int to prevent Cypher injection via non-integer input
params: dict[str, object] = {"ids": node_ids}
ignore_clause = ""
if ignored:
ignore_clause = " AND NONE(rel IN r WHERE type(rel) IN $ignored_rels)"
params["ignored_rels"] = ignored

cypher = (
f"MATCH (n)-[r*1..{depth}]->(m) "
f"WHERE n.id IN $ids{ignore_clause} "
f"RETURN n, r, m, n.id AS _src_id, m.id AS _dst_id "
f"LIMIT {limit}"
f"MATCH (n)-[r:{rel_filter}]->(m) "
f"WHERE n.id IN $ids "
f"RETURN n, r.__type__ AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id "
f"LIMIT {safe_limit}"
)
result = self._client.cypher(cypher, params=params)

Expand All @@ -191,13 +220,7 @@ def get_rel_map(
dst_data = row.get("m", {})
src_id = str(row.get("_src_id", ""))
dst_id = str(row.get("_dst_id", ""))
# Variable-length path [r*1..N] returns a list of relationship dicts.
rels = row.get("r", [])
if isinstance(rels, list) and rels:
first_rel = rels[0]
rel_label = first_rel.get("type", "RELATED") if isinstance(first_rel, dict) else str(first_rel)
else:
rel_label = "RELATED"
rel_label = str(row.get("_rel_type") or "RELATED")
src = _node_result_to_labelled(src_id, src_data)
dst = _node_result_to_labelled(dst_id, dst_data)
rel = Relation(label=rel_label, source_id=src_id, target_id=dst_id)
Expand All @@ -217,18 +240,29 @@ def upsert_relations(self, relations: list[Relation]) -> None:
"""Upsert relationships into the graph."""
for rel in relations:
props = rel.properties or {}
cypher = (
f"MATCH (src {{id: $src_id}}), (dst {{id: $dst_id}}) "
f"MERGE (src)-[r:{_cypher_ident(rel.label)}]->(dst) SET r += $props"
)
self._client.cypher(
cypher,
params={
"src_id": rel.source_id,
"dst_id": rel.target_id,
"props": props,
},
)
label = _cypher_ident(rel.label)
# CoordiNode does not yet support MERGE for edge patterns; use CREATE.
# A WHERE NOT (src)-[:TYPE]->(dst) guard was tested but returns 0
# rows silently in CoordiNode, making all CREATE statements no-ops.
# Until server-side MERGE or pattern predicates are supported,
# repeated calls will create duplicate edges.
# SET r += $props is skipped when props is empty — SET r += {} is
# not supported by all server versions.
if props:
cypher = (
f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) "
f"CREATE (src)-[r:{label}]->(dst) SET r += $props"
)
self._client.cypher(
cypher,
params={"src_id": rel.source_id, "dst_id": rel.target_id, "props": props},
)
else:
cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) CREATE (src)-[r:{label}]->(dst)"
self._client.cypher(
cypher,
params={"src_id": rel.source_id, "dst_id": rel.target_id},
)

def delete(
self,
Expand Down Expand Up @@ -342,3 +376,29 @@ def _node_label(node: LabelledNode) -> str:
if isinstance(node, EntityNode):
return node.label or "Entity"
return "Node"


def _parse_edge_types_from_schema(schema_text: str) -> list[str]:
"""Extract edge type names from CoordiNode schema text.

Parses the "Edge types:" section produced by ``get_schema_text()``.
"""
edge_types: list[str] = []
lines = iter(schema_text.splitlines())

# Advance to the "Edge types:" header.
for line in lines:
if line.strip().lower().startswith("edge types"):
break

# Collect bullet items until the first blank line.
for line in lines:
stripped = line.strip()
if not stripped:
break
if stripped.startswith(("-", "*")):
name = stripped.lstrip("-* ").split("(")[0].strip()
if name:
edge_types.append(name)

return edge_types
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ coordinode = { workspace = true }
dev = [
"build>=1.2",
"grpcio-tools>=1.60",
"langchain-community>=0.3",
"langchain-core>=0.3",
"llama-index-core>=0.12",
"pytest>=8",
"pytest-asyncio>=0.23",
"pytest-timeout>=2",
Expand Down
Empty file.
Loading
Loading