From 50fb1f1c23519e43eb6d55f1d8014400cb700471 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 18:47:23 +0300 Subject: [PATCH 01/11] fix(langchain): implement add_graph_documents and use __label__/__type__ - Implement add_graph_documents() which was inherited as a no-op from GraphStore base class; upserts nodes via MERGE and creates edges via MATCH+CREATE with fallback when WHERE NOT guard is unsupported - Fix refresh_schema() relationship enrichment query: labels()/type() Cypher functions return null in CoordiNode; use a.__label__ and r.__type__ internal properties instead - Add _cypher_ident() helper to safely escape label/type names Closes #14 --- .../langchain_coordinode/graph.py | 92 ++++++++++++++++++- 1 file changed, 91 insertions(+), 1 deletion(-) diff --git a/langchain-coordinode/langchain_coordinode/graph.py b/langchain-coordinode/langchain_coordinode/graph.py index e21fbf0..9d16ed6 100644 --- a/langchain-coordinode/langchain_coordinode/graph.py +++ b/langchain-coordinode/langchain_coordinode/graph.py @@ -72,7 +72,7 @@ def refresh_schema(self) -> None: # Cypher — get_schema_text() only lists edge types without direction. try: rows = self._client.cypher( - "MATCH (a)-[r]->(b) RETURN DISTINCT labels(a)[0] AS src, type(r) AS rel, labels(b)[0] AS dst" + "MATCH (a)-[r]->(b) RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst" ) structured["relationships"] = [ {"start": row["src"], "type": row["rel"], "end": row["dst"]} @@ -83,6 +83,88 @@ def refresh_schema(self) -> None: pass # Graph may have no relationships yet; structured["relationships"] stays [] self._structured_schema = structured + def add_graph_documents( + self, + graph_documents: list[Any], + include_source: bool = False, + ) -> None: + """Store nodes and relationships extracted from ``GraphDocument`` objects. + + Nodes are upserted by ``id`` (used as the ``name`` property). + Relationships are created between existing nodes; if a relationship + between the same source and target already exists it is skipped. + + Args: + graph_documents: List of ``langchain_community.graphs.graph_document.GraphDocument``. + include_source: If ``True``, also store the source ``Document`` as a + ``__Document__`` node linked to every extracted entity. + """ + for doc in graph_documents: + # ── Upsert nodes ────────────────────────────────────────────── + for node in doc.nodes: + label = _cypher_ident(node.type or "Entity") + props = dict(node.properties or {}) + props.setdefault("name", node.id) + self._client.cypher( + f"MERGE (n:{label} {{name: $name}}) SET n += $props", + params={"name": node.id, "props": props}, + ) + + # ── Create relationships (idempotent: skip if already exists) ─ + for rel in doc.relationships: + src_label = _cypher_ident(rel.source.type or "Entity") + dst_label = _cypher_ident(rel.target.type or "Entity") + rel_type = _cypher_ident(rel.type) + props = dict(rel.properties or {}) + # CoordiNode does not yet support MERGE for edges; use CREATE + # guarded by NOT EXISTS to avoid duplicates on repeated calls. + try: + self._client.cypher( + f"MATCH (src:{src_label} {{name: $src}}) " + f"MATCH (dst:{dst_label} {{name: $dst}}) " + f"WHERE NOT (src)-[:{rel_type}]->(dst) " + f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props", + params={"src": rel.source.id, "dst": rel.target.id, "props": props}, + ) + except Exception: # noqa: BLE001 + # WHERE NOT EXISTS guard may not be supported on all server + # versions; fall back to unconditional CREATE + self._client.cypher( + f"MATCH (src:{src_label} {{name: $src}}) " + f"MATCH (dst:{dst_label} {{name: $dst}}) " + f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props", + params={"src": rel.source.id, "dst": rel.target.id, "props": props}, + ) + + # ── Optionally link source document ─────────────────────────── + if include_source and doc.source: + src_id = getattr(doc.source, "id", None) or str(id(doc.source)) + self._client.cypher( + "MERGE (d:__Document__ {id: $id}) SET d.page_content = $text", + params={"id": src_id, "text": doc.source.page_content or ""}, + ) + for node in doc.nodes: + label = _cypher_ident(node.type or "Entity") + try: + self._client.cypher( + f"MATCH (d:__Document__ {{id: $doc_id}}) " + f"MATCH (n:{label} {{name: $name}}) " + f"WHERE NOT (d)-[:MENTIONS]->(n) " + f"CREATE (d)-[:MENTIONS]->(n)", + params={"doc_id": src_id, "name": node.id}, + ) + except Exception: # noqa: BLE001 + self._client.cypher( + f"MATCH (d:__Document__ {{id: $doc_id}}) " + f"MATCH (n:{label} {{name: $name}}) " + f"CREATE (d)-[:MENTIONS]->(n)", + params={"doc_id": src_id, "name": node.id}, + ) + + # Invalidate cached schema so next access reflects new data + self._schema = None + self._structured_schema = None + def query( self, query: str, @@ -116,6 +198,14 @@ def __exit__(self, *args: Any) -> None: # ── Schema parser ───────────────────────────────────────────────────────── +def _cypher_ident(name: str) -> str: + """Escape a label/type name for use as a Cypher identifier.""" + # If already safe (alphanumeric + underscore, not starting with digit) keep as-is + if re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name): + return name + return f"`{name.replace('`', '``')}`" + + def _parse_schema(schema_text: str) -> dict[str, Any]: """Convert CoordiNode schema text into LangChain's structured format. From c06a820448c9d4d1bd0c33e2d356b75be6abb542 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 18:47:41 +0300 Subject: [PATCH 02/11] fix(llama-index): use __type__ for rel type and MATCH+CREATE for edges - Fix upsert_relations(): CoordiNode does not support MERGE for edge patterns; replace MATCH+MERGE with MATCH+MATCH+CREATE - Fix get_triplets(): type(r) returns null; use r.__type__ instead. Also use `or "RELATED"` guard (not default arg) since null differs from missing key - Fix get_rel_map(): same __type__ fix for variable-length path results where relationship data arrives as a list of dicts Closes #14 --- .../graph_stores/coordinode/base.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py index cb0e89c..c9ab08c 100644 --- a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py +++ b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py @@ -134,9 +134,12 @@ def get_triplets( rel_pattern = "[r]" where = f"WHERE {' AND '.join(conditions)}" if conditions else "" + # CoordiNode: use r.__type__ instead of type(r) — type() returns null. + # Wildcard [r] pattern also returns no results; caller must supply + # relation_names for wildcard queries to work. cypher = ( f"MATCH (n)-{rel_pattern}->(m) {where} " - "RETURN n, type(r) AS rel_type, m, n.id AS _src_id, m.id AS _dst_id " + "RETURN n, r.__type__ AS rel_type, m, n.id AS _src_id, m.id AS _dst_id " "LIMIT 1000" ) result = self._client.cypher(cypher, params=params) @@ -144,7 +147,7 @@ def get_triplets( triplets: list[list[LabelledNode]] = [] for row in result: src_data = row.get("n", {}) - rel_type = row.get("rel_type", "RELATED") + rel_type = row.get("rel_type") or "RELATED" dst_data = row.get("m", {}) src_id = str(row.get("_src_id", "")) dst_id = str(row.get("_dst_id", "")) @@ -195,7 +198,8 @@ def get_rel_map( rels = row.get("r", []) if isinstance(rels, list) and rels: first_rel = rels[0] - rel_label = first_rel.get("type", "RELATED") if isinstance(first_rel, dict) else str(first_rel) + # CoordiNode: use __type__ key instead of "type" — type() returns null + rel_label = first_rel.get("__type__") or first_rel.get("type", "RELATED") if isinstance(first_rel, dict) else str(first_rel) else: rel_label = "RELATED" src = _node_result_to_labelled(src_id, src_data) @@ -217,9 +221,13 @@ def upsert_relations(self, relations: list[Relation]) -> None: """Upsert relationships into the graph.""" for rel in relations: props = rel.properties or {} + label = _cypher_ident(rel.label) + # CoordiNode does not yet support MERGE for edge patterns; use CREATE. + # Note: repeated calls will create duplicate edges until MERGE for + # edges is implemented server-side. cypher = ( - f"MATCH (src {{id: $src_id}}), (dst {{id: $dst_id}}) " - f"MERGE (src)-[r:{_cypher_ident(rel.label)}]->(dst) SET r += $props" + f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) " + f"CREATE (src)-[r:{label}]->(dst) SET r += $props" ) self._client.cypher( cypher, From d59e18f313aec6f6dd6a79bff283fd72c48aa546 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 18:48:16 +0300 Subject: [PATCH 03/11] build(lock): sync uv.lock to v0.4.3 workspace version --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index cbf940e..0b461ba 100644 --- a/uv.lock +++ b/uv.lock @@ -359,7 +359,7 @@ provides-extras = ["dev"] [[package]] name = "coordinode-workspace" -version = "0.0.0" +version = "0.4.3" source = { virtual = "." } dependencies = [ { name = "googleapis-common-protos" }, From 2a3909c3c61299cc6134784984e52b27f2481b36 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 18:57:17 +0300 Subject: [PATCH 04/11] test(adapters): add integration tests for langchain and llama-index adapters - Add tests/integration/adapters/test_langchain.py: CoordinodeGraph connect, schema, query, add_graph_documents (upsert nodes, create relationships, idempotency), schema invalidation - Add tests/integration/adapters/test_llama_index.py: CoordinodePropertyGraphStore connect, schema, structured_query, upsert_nodes, upsert_relations, get_triplets, get_rel_map, delete (by id and name) - Fix namespace package declarations in llama_index/__init__.py and llama_index/graph_stores/__init__.py: use pkgutil.extend_path so llama_index.core remains importable when package is installed editable - Add llama-index-core and langchain-community to dev dependencies so adapter tests can be collected and run from the workspace root - 19/19 integration tests pass against localhost:17080 All 19 adapter tests pass. Relates to #14. --- .../llama_index/__init__.py | 4 +- .../llama_index/graph_stores/__init__.py | 4 +- pyproject.toml | 3 + tests/integration/adapters/__init__.py | 0 tests/integration/adapters/test_langchain.py | 130 ++++++++++++++++ .../integration/adapters/test_llama_index.py | 141 ++++++++++++++++++ uv.lock | 6 + 7 files changed, 286 insertions(+), 2 deletions(-) create mode 100644 tests/integration/adapters/__init__.py create mode 100644 tests/integration/adapters/test_langchain.py create mode 100644 tests/integration/adapters/test_llama_index.py diff --git a/llama-index-coordinode/llama_index/__init__.py b/llama-index-coordinode/llama_index/__init__.py index 9ebc633..b36383a 100644 --- a/llama-index-coordinode/llama_index/__init__.py +++ b/llama-index-coordinode/llama_index/__init__.py @@ -1 +1,3 @@ -# namespace package +from pkgutil import extend_path + +__path__ = extend_path(__path__, __name__) diff --git a/llama-index-coordinode/llama_index/graph_stores/__init__.py b/llama-index-coordinode/llama_index/graph_stores/__init__.py index 9ebc633..b36383a 100644 --- a/llama-index-coordinode/llama_index/graph_stores/__init__.py +++ b/llama-index-coordinode/llama_index/graph_stores/__init__.py @@ -1 +1,3 @@ -# namespace package +from pkgutil import extend_path + +__path__ = extend_path(__path__, __name__) diff --git a/pyproject.toml b/pyproject.toml index baa64c2..18bc7ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,9 @@ coordinode = { workspace = true } dev = [ "build>=1.2", "grpcio-tools>=1.60", + "langchain-community>=0.3", + "langchain-core>=0.3", + "llama-index-core>=0.12", "pytest>=8", "pytest-asyncio>=0.23", "pytest-timeout>=2", diff --git a/tests/integration/adapters/__init__.py b/tests/integration/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/adapters/test_langchain.py b/tests/integration/adapters/test_langchain.py new file mode 100644 index 0000000..71984c4 --- /dev/null +++ b/tests/integration/adapters/test_langchain.py @@ -0,0 +1,130 @@ +"""Integration tests for CoordinodeGraph (LangChain adapter). + +Requires a running CoordiNode instance. Set COORDINODE_ADDR env var +(default: localhost:7080). + +Run via: + COORDINODE_ADDR=localhost:17080 pytest tests/integration/adapters/test_langchain.py -v +""" + +import os +import uuid + +import pytest +from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship +from langchain_core.documents import Document + +from langchain_coordinode import CoordinodeGraph + +ADDR = os.environ.get("COORDINODE_ADDR", "localhost:7080") + + +@pytest.fixture(scope="module") +def graph(): + with CoordinodeGraph(ADDR) as g: + yield g + + +# ── Basic connectivity ──────────────────────────────────────────────────────── + +def test_connect(graph): + assert graph is not None + + +def test_schema_returns_string(graph): + schema = graph.schema + assert isinstance(schema, str) + + +def test_refresh_schema_does_not_raise(graph): + graph.refresh_schema() + assert isinstance(graph.schema, str) + assert isinstance(graph.structured_schema, dict) + assert "node_props" in graph.structured_schema + assert "rel_props" in graph.structured_schema + assert "relationships" in graph.structured_schema + + +# ── Cypher query ────────────────────────────────────────────────────────────── + +def test_query_returns_list(graph): + result = graph.query("RETURN 1 AS n") + assert isinstance(result, list) + assert result[0]["n"] == 1 + + +def test_query_count(graph): + result = graph.query("MATCH (n) RETURN count(n) AS total") + assert isinstance(result, list) + assert isinstance(result[0]["total"], int) + + +# ── add_graph_documents ─────────────────────────────────────────────────────── + +@pytest.fixture +def unique_tag(): + return uuid.uuid4().hex[:8] + + +def test_add_graph_documents_upserts_nodes(graph, unique_tag): + node_a = Node(id=f"Alice-{unique_tag}", type="LCPerson", properties={"role": "researcher"}) + node_b = Node(id=f"Bob-{unique_tag}", type="LCPerson", properties={"role": "engineer"}) + doc = GraphDocument(nodes=[node_a, node_b], relationships=[], source=Document(page_content="test")) + + graph.add_graph_documents([doc]) + + result = graph.query( + "MATCH (n:LCPerson {name: $name}) RETURN n.name AS name", + params={"name": f"Alice-{unique_tag}"}, + ) + assert len(result) >= 1 + assert result[0]["name"] == f"Alice-{unique_tag}" + + +def test_add_graph_documents_creates_relationship(graph, unique_tag): + node_a = Node(id=f"Charlie-{unique_tag}", type="LCPerson2") + node_b = Node(id=f"GraphRAG-{unique_tag}", type="LCConcept") + rel = Relationship(source=node_a, target=node_b, type="LC_RESEARCHES") + doc = GraphDocument( + nodes=[node_a, node_b], + relationships=[rel], + source=Document(page_content="test"), + ) + + graph.add_graph_documents([doc]) + + # Verify both nodes exist + result = graph.query( + "MATCH (n:LCPerson2 {name: $name}) RETURN n.name AS name", + params={"name": f"Charlie-{unique_tag}"}, + ) + assert len(result) >= 1, f"source node not found: {result}" + + +def test_add_graph_documents_idempotent(graph, unique_tag): + """Calling add_graph_documents twice must not raise.""" + node = Node(id=f"Idempotent-{unique_tag}", type="LCIdempotent") + doc = GraphDocument(nodes=[node], relationships=[], source=Document(page_content="test")) + + graph.add_graph_documents([doc]) + graph.add_graph_documents([doc]) # second call must not raise + + result = graph.query( + "MATCH (n:LCIdempotent {name: $name}) RETURN count(n) AS cnt", + params={"name": f"Idempotent-{unique_tag}"}, + ) + assert result[0]["cnt"] >= 1 + + +def test_schema_refreshes_after_add(graph, unique_tag): + """structured_schema is invalidated and re-fetched after add_graph_documents.""" + graph._schema = None # force refresh + schema_before = graph.schema + + node = Node(id=f"SchemaNode-{unique_tag}", type="LCSchemaTest") + doc = GraphDocument(nodes=[node], relationships=[], source=Document(page_content="test")) + graph.add_graph_documents([doc]) + + graph.refresh_schema() + # schema must still be a string after refresh (content depends on server) + assert isinstance(graph.schema, str) diff --git a/tests/integration/adapters/test_llama_index.py b/tests/integration/adapters/test_llama_index.py new file mode 100644 index 0000000..1a4b60e --- /dev/null +++ b/tests/integration/adapters/test_llama_index.py @@ -0,0 +1,141 @@ +"""Integration tests for CoordinodePropertyGraphStore (LlamaIndex adapter). + +Requires a running CoordiNode instance. Set COORDINODE_ADDR env var +(default: localhost:7080). + +Run via: + COORDINODE_ADDR=localhost:17080 pytest tests/integration/adapters/test_llama_index.py -v +""" + +import os +import uuid + +import pytest +from llama_index.core.graph_stores.types import EntityNode, Relation + +from llama_index.graph_stores.coordinode import CoordinodePropertyGraphStore + +ADDR = os.environ.get("COORDINODE_ADDR", "localhost:7080") + + +@pytest.fixture(scope="module") +def store(): + with CoordinodePropertyGraphStore(ADDR) as s: + yield s + + +@pytest.fixture +def tag(): + return uuid.uuid4().hex[:8] + + +# ── Basic connectivity ──────────────────────────────────────────────────────── + +def test_connect(store): + assert store is not None + + +def test_get_schema(store): + schema = store.get_schema() + assert isinstance(schema, str) + + +def test_structured_query_literal(store): + result = store.structured_query("RETURN 1 AS n") + assert isinstance(result, list) + assert result[0]["n"] == 1 + + +# ── Node operations ─────────────────────────────────────────────────────────── + +def test_upsert_and_get_nodes(store, tag): + nodes = [ + EntityNode(label="LITestPerson", name=f"Alice-{tag}", properties={"role": "researcher"}), + EntityNode(label="LITestConcept", name=f"GraphRAG-{tag}", properties={"field": "AI"}), + ] + store.upsert_nodes(nodes) + + found = store.get(properties={"name": f"Alice-{tag}"}) + assert len(found) >= 1 + assert any(getattr(n, "name", None) == f"Alice-{tag}" for n in found) + + +def test_upsert_nodes_idempotent(store, tag): + """Upserting the same node twice must not raise and must not duplicate.""" + node = EntityNode(label="LIIdempotent", name=f"Idem-{tag}") + store.upsert_nodes([node]) + store.upsert_nodes([node]) # second call must not raise + + found = store.get(properties={"name": f"Idem-{tag}"}) + assert len(found) >= 1 + + +def test_get_by_id(store, tag): + node = EntityNode(label="LIGetById", name=f"ById-{tag}") + node_id = node.id + store.upsert_nodes([node]) + + found = store.get(ids=[node_id]) + assert len(found) >= 1 + + +# ── Relation operations ─────────────────────────────────────────────────────── + +def test_upsert_and_get_triplets(store, tag): + src = EntityNode(label="LIRelPerson", name=f"Src-{tag}") + dst = EntityNode(label="LIRelConcept", name=f"Dst-{tag}") + store.upsert_nodes([src, dst]) + + rel = Relation( + label="LI_RESEARCHES", + source_id=src.id, + target_id=dst.id, + properties={"since": 2024}, + ) + store.upsert_relations([rel]) + + # CoordiNode does not support wildcard [r] patterns yet — must pass relation_names. + # See: get_triplets() implementation note. + triplets = store.get_triplets( + entity_names=[f"Src-{tag}"], + relation_names=["LI_RESEARCHES"], + ) + assert isinstance(triplets, list) + assert len(triplets) >= 1 + + labels = [t[1].label for t in triplets] + assert "LI_RESEARCHES" in labels + + +def test_get_rel_map(store, tag): + src = EntityNode(label="LIRelMap", name=f"RMapSrc-{tag}") + dst = EntityNode(label="LIRelMap", name=f"RMapDst-{tag}") + store.upsert_nodes([src, dst]) + + rel = Relation(label="LI_RELATED", source_id=src.id, target_id=dst.id) + store.upsert_relations([rel]) + + result = store.get_rel_map([src], depth=1, limit=10) + assert isinstance(result, list) + + +# ── Delete ──────────────────────────────────────────────────────────────────── + +def test_delete_by_id(store, tag): + node = EntityNode(label="LIDelete", name=f"Del-{tag}") + store.upsert_nodes([node]) + + store.delete(ids=[node.id]) + + found = store.get(ids=[node.id]) + assert len(found) == 0 + + +def test_delete_by_entity_name(store, tag): + node = EntityNode(label="LIDeleteByName", name=f"DelNamed-{tag}") + store.upsert_nodes([node]) + + store.delete(entity_names=[f"DelNamed-{tag}"]) + + found = store.get(properties={"name": f"DelNamed-{tag}"}) + assert len(found) == 0 diff --git a/uv.lock b/uv.lock index 0b461ba..3b28da1 100644 --- a/uv.lock +++ b/uv.lock @@ -369,6 +369,9 @@ dependencies = [ dev = [ { name = "build" }, { name = "grpcio-tools" }, + { name = "langchain-community" }, + { name = "langchain-core" }, + { name = "llama-index-core" }, { name = "pytest" }, { name = "pytest-asyncio" }, { name = "pytest-timeout" }, @@ -382,6 +385,9 @@ requires-dist = [{ name = "googleapis-common-protos", specifier = ">=1.74.0" }] dev = [ { name = "build", specifier = ">=1.2" }, { name = "grpcio-tools", specifier = ">=1.60" }, + { name = "langchain-community", specifier = ">=0.3" }, + { name = "langchain-core", specifier = ">=0.3" }, + { name = "llama-index-core", specifier = ">=0.12" }, { name = "pytest", specifier = ">=8" }, { name = "pytest-asyncio", specifier = ">=0.23" }, { name = "pytest-timeout", specifier = ">=2" }, From b1fd70da91240068638720bbc099c6ee709570bc Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 19:18:59 +0300 Subject: [PATCH 05/11] test(adapters): fix ruff lint errors in integration tests - Remove unused schema_before variable (F841) - Fix unsorted imports in test_llama_index.py (I001) --- tests/integration/adapters/test_langchain.py | 2 +- tests/integration/adapters/test_llama_index.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/integration/adapters/test_langchain.py b/tests/integration/adapters/test_langchain.py index 71984c4..4996214 100644 --- a/tests/integration/adapters/test_langchain.py +++ b/tests/integration/adapters/test_langchain.py @@ -119,7 +119,7 @@ def test_add_graph_documents_idempotent(graph, unique_tag): def test_schema_refreshes_after_add(graph, unique_tag): """structured_schema is invalidated and re-fetched after add_graph_documents.""" graph._schema = None # force refresh - schema_before = graph.schema + graph.schema # trigger initial fetch before mutation node = Node(id=f"SchemaNode-{unique_tag}", type="LCSchemaTest") doc = GraphDocument(nodes=[node], relationships=[], source=Document(page_content="test")) diff --git a/tests/integration/adapters/test_llama_index.py b/tests/integration/adapters/test_llama_index.py index 1a4b60e..62be044 100644 --- a/tests/integration/adapters/test_llama_index.py +++ b/tests/integration/adapters/test_llama_index.py @@ -12,7 +12,6 @@ import pytest from llama_index.core.graph_stores.types import EntityNode, Relation - from llama_index.graph_stores.coordinode import CoordinodePropertyGraphStore ADDR = os.environ.get("COORDINODE_ADDR", "localhost:7080") From 13487c9ca3b72ffce1a7ba17b8c37e988a692a10 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 19:22:25 +0300 Subject: [PATCH 06/11] fix(langchain): enforce node.id as merge key; stable document IDs - props["name"] = node.id overwrites incoming properties["name"] so the MERGE predicate and stored name always match; prevents duplicate nodes and broken relationship MATCH on re-ingest - Add _stable_document_id(): SHA-256(page_content + metadata) makes include_source=True idempotent across Python processes test(adapters): tighten integration assertions for idempotency and rels - test_add_graph_documents_creates_relationship: assert LC_RESEARCHES edge exists, not just source node presence - test_add_graph_documents_idempotent: cnt == 1 to catch duplicates - test_upsert_nodes_idempotent: len == 1 to catch duplicates - test_get_rel_map: assert len >= 1 to verify non-empty result --- .../langchain_coordinode/graph.py | 20 +++++++++++++++++-- tests/integration/adapters/test_langchain.py | 11 +++++----- .../integration/adapters/test_llama_index.py | 3 ++- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/langchain-coordinode/langchain_coordinode/graph.py b/langchain-coordinode/langchain_coordinode/graph.py index 9d16ed6..c690ccc 100644 --- a/langchain-coordinode/langchain_coordinode/graph.py +++ b/langchain-coordinode/langchain_coordinode/graph.py @@ -2,6 +2,7 @@ from __future__ import annotations +import hashlib import re from typing import Any @@ -104,7 +105,9 @@ def add_graph_documents( for node in doc.nodes: label = _cypher_ident(node.type or "Entity") props = dict(node.properties or {}) - props.setdefault("name", node.id) + # Always enforce node.id as the merge key; incoming + # properties["name"] must not drift from the MERGE predicate. + props["name"] = node.id self._client.cypher( f"MERGE (n:{label} {{name: $name}}) SET n += $props", params={"name": node.id, "props": props}, @@ -138,7 +141,7 @@ def add_graph_documents( # ── Optionally link source document ─────────────────────────── if include_source and doc.source: - src_id = getattr(doc.source, "id", None) or str(id(doc.source)) + src_id = getattr(doc.source, "id", None) or _stable_document_id(doc.source) self._client.cypher( "MERGE (d:__Document__ {id: $id}) SET d.page_content = $text", params={"id": src_id, "text": doc.source.page_content or ""}, @@ -198,6 +201,19 @@ def __exit__(self, *args: Any) -> None: # ── Schema parser ───────────────────────────────────────────────────────── +def _stable_document_id(source: Any) -> str: + """Return a deterministic ID for a LangChain Document. + + Combines ``page_content`` and sorted ``metadata`` items so the same + document produces the same node across different Python processes, + making ``include_source=True`` re-ingest truly idempotent. + """ + content = getattr(source, "page_content", "") or "" + metadata = getattr(source, "metadata", {}) or {} + stable = content + "|" + "|".join(f"{k}={v}" for k, v in sorted(metadata.items())) + return hashlib.sha256(stable.encode()).hexdigest()[:32] + + def _cypher_ident(name: str) -> str: """Escape a label/type name for use as a Cypher identifier.""" # If already safe (alphanumeric + underscore, not starting with digit) keep as-is diff --git a/tests/integration/adapters/test_langchain.py b/tests/integration/adapters/test_langchain.py index 4996214..a685c93 100644 --- a/tests/integration/adapters/test_langchain.py +++ b/tests/integration/adapters/test_langchain.py @@ -93,12 +93,13 @@ def test_add_graph_documents_creates_relationship(graph, unique_tag): graph.add_graph_documents([doc]) - # Verify both nodes exist + # Verify the relationship was created, not just the source node. result = graph.query( - "MATCH (n:LCPerson2 {name: $name}) RETURN n.name AS name", - params={"name": f"Charlie-{unique_tag}"}, + "MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) " + "RETURN count(r) AS cnt", + params={"src": f"Charlie-{unique_tag}", "dst": f"GraphRAG-{unique_tag}"}, ) - assert len(result) >= 1, f"source node not found: {result}" + assert result[0]["cnt"] >= 1, f"relationship not found: {result}" def test_add_graph_documents_idempotent(graph, unique_tag): @@ -113,7 +114,7 @@ def test_add_graph_documents_idempotent(graph, unique_tag): "MATCH (n:LCIdempotent {name: $name}) RETURN count(n) AS cnt", params={"name": f"Idempotent-{unique_tag}"}, ) - assert result[0]["cnt"] >= 1 + assert result[0]["cnt"] == 1 def test_schema_refreshes_after_add(graph, unique_tag): diff --git a/tests/integration/adapters/test_llama_index.py b/tests/integration/adapters/test_llama_index.py index 62be044..a490bc8 100644 --- a/tests/integration/adapters/test_llama_index.py +++ b/tests/integration/adapters/test_llama_index.py @@ -66,7 +66,7 @@ def test_upsert_nodes_idempotent(store, tag): store.upsert_nodes([node]) # second call must not raise found = store.get(properties={"name": f"Idem-{tag}"}) - assert len(found) >= 1 + assert len(found) == 1 def test_get_by_id(store, tag): @@ -116,6 +116,7 @@ def test_get_rel_map(store, tag): result = store.get_rel_map([src], depth=1, limit=10) assert isinstance(result, list) + assert len(result) >= 1 # ── Delete ──────────────────────────────────────────────────────────────────── From f045c77e01a5d8ba20dcb87923ed68f4ce4337dc Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 19:37:19 +0300 Subject: [PATCH 07/11] fix(adapters): use unconditional CREATE for edges; fix get_rel_map MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LangChain adapter: - Remove WHERE NOT (pattern) guard — CoordiNode returns 0 rows silently instead of raising, so the guarded CREATE never executed - Remove try/except fallback (no longer needed) - Skip SET r += $props when props is empty — SET r += {} unsupported LlamaIndex adapter: - upsert_relations: same fix — skip SET r += $props when props is empty - get_rel_map: replace [r*1..N] variable-length path with single-hop [r]; variable-length paths don't serialize correctly in RETURN position - Adjust ignore_rels filter from NONE(rel IN r ...) to r.__type__ NOT IN (compatible with single-hop [r] pattern) --- .../langchain_coordinode/graph.py | 40 ++++++--------- .../graph_stores/coordinode/base.py | 51 +++++++++++-------- tests/integration/adapters/test_langchain.py | 6 ++- .../integration/adapters/test_llama_index.py | 4 ++ 4 files changed, 52 insertions(+), 49 deletions(-) diff --git a/langchain-coordinode/langchain_coordinode/graph.py b/langchain-coordinode/langchain_coordinode/graph.py index c690ccc..9db6ac6 100644 --- a/langchain-coordinode/langchain_coordinode/graph.py +++ b/langchain-coordinode/langchain_coordinode/graph.py @@ -113,30 +113,29 @@ def add_graph_documents( params={"name": node.id, "props": props}, ) - # ── Create relationships (idempotent: skip if already exists) ─ + # ── Create relationships ────────────────────────────────────── for rel in doc.relationships: src_label = _cypher_ident(rel.source.type or "Entity") dst_label = _cypher_ident(rel.target.type or "Entity") rel_type = _cypher_ident(rel.type) props = dict(rel.properties or {}) - # CoordiNode does not yet support MERGE for edges; use CREATE - # guarded by NOT EXISTS to avoid duplicates on repeated calls. - try: + # CoordiNode does not support MERGE for edges or WHERE NOT + # (pattern) guards — use unconditional CREATE. SET r += $props + # is skipped when props is empty because SET r += {} is not + # supported by all server versions. + if props: self._client.cypher( f"MATCH (src:{src_label} {{name: $src}}) " f"MATCH (dst:{dst_label} {{name: $dst}}) " - f"WHERE NOT (src)-[:{rel_type}]->(dst) " f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props", params={"src": rel.source.id, "dst": rel.target.id, "props": props}, ) - except Exception: # noqa: BLE001 - # WHERE NOT EXISTS guard may not be supported on all server - # versions; fall back to unconditional CREATE + else: self._client.cypher( f"MATCH (src:{src_label} {{name: $src}}) " f"MATCH (dst:{dst_label} {{name: $dst}}) " - f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props", - params={"src": rel.source.id, "dst": rel.target.id, "props": props}, + f"CREATE (src)-[r:{rel_type}]->(dst)", + params={"src": rel.source.id, "dst": rel.target.id}, ) # ── Optionally link source document ─────────────────────────── @@ -148,21 +147,12 @@ def add_graph_documents( ) for node in doc.nodes: label = _cypher_ident(node.type or "Entity") - try: - self._client.cypher( - f"MATCH (d:__Document__ {{id: $doc_id}}) " - f"MATCH (n:{label} {{name: $name}}) " - f"WHERE NOT (d)-[:MENTIONS]->(n) " - f"CREATE (d)-[:MENTIONS]->(n)", - params={"doc_id": src_id, "name": node.id}, - ) - except Exception: # noqa: BLE001 - self._client.cypher( - f"MATCH (d:__Document__ {{id: $doc_id}}) " - f"MATCH (n:{label} {{name: $name}}) " - f"CREATE (d)-[:MENTIONS]->(n)", - params={"doc_id": src_id, "name": node.id}, - ) + self._client.cypher( + f"MATCH (d:__Document__ {{id: $doc_id}}) " + f"MATCH (n:{label} {{name: $name}}) " + f"CREATE (d)-[:MENTIONS]->(n)", + params={"doc_id": src_id, "name": node.id}, + ) # Invalidate cached schema so next access reflects new data self._schema = None diff --git a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py index c9ab08c..81750ea 100644 --- a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py +++ b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py @@ -172,16 +172,19 @@ def get_rel_map( node_ids = [n.id for n in graph_nodes] ignored = list(ignore_rels) if ignore_rels else [] - # Push ignore_rels filter into Cypher so LIMIT applies after filtering; - # a Python-side filter after LIMIT would silently truncate valid results. params: dict[str, object] = {"ids": node_ids} ignore_clause = "" if ignored: - ignore_clause = " AND NONE(rel IN r WHERE type(rel) IN $ignored_rels)" + # Single-hop [r]: filter with r.__type__ NOT IN $ignored_rels. + ignore_clause = " AND NOT r.__type__ IN $ignored_rels" params["ignored_rels"] = ignored + # CoordiNode does not support variable-length path [r*1..N] in RETURN + # position (result serialisation is undefined for path lists). Use a + # single-hop pattern; multi-hop traversal is a future enhancement. + _ = depth # depth parameter reserved; currently single-hop only cypher = ( - f"MATCH (n)-[r*1..{depth}]->(m) " + f"MATCH (n)-[r]->(m) " f"WHERE n.id IN $ids{ignore_clause} " f"RETURN n, r, m, n.id AS _src_id, m.id AS _dst_id " f"LIMIT {limit}" @@ -194,12 +197,11 @@ def get_rel_map( dst_data = row.get("m", {}) src_id = str(row.get("_src_id", "")) dst_id = str(row.get("_dst_id", "")) - # Variable-length path [r*1..N] returns a list of relationship dicts. - rels = row.get("r", []) - if isinstance(rels, list) and rels: - first_rel = rels[0] - # CoordiNode: use __type__ key instead of "type" — type() returns null - rel_label = first_rel.get("__type__") or first_rel.get("type", "RELATED") if isinstance(first_rel, dict) else str(first_rel) + # Single-hop [r] returns the relationship as a dict. + # CoordiNode: use __type__ key — type() returns null. + r_val = row.get("r", {}) + if isinstance(r_val, dict): + rel_label = r_val.get("__type__") or r_val.get("type") or "RELATED" else: rel_label = "RELATED" src = _node_result_to_labelled(src_id, src_data) @@ -223,20 +225,25 @@ def upsert_relations(self, relations: list[Relation]) -> None: props = rel.properties or {} label = _cypher_ident(rel.label) # CoordiNode does not yet support MERGE for edge patterns; use CREATE. + # SET r += $props is skipped when props is empty — SET r += {} is + # not supported by all server versions. # Note: repeated calls will create duplicate edges until MERGE for # edges is implemented server-side. - cypher = ( - f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) " - f"CREATE (src)-[r:{label}]->(dst) SET r += $props" - ) - self._client.cypher( - cypher, - params={ - "src_id": rel.source_id, - "dst_id": rel.target_id, - "props": props, - }, - ) + if props: + cypher = ( + f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) " + f"CREATE (src)-[r:{label}]->(dst) SET r += $props" + ) + self._client.cypher( + cypher, + params={"src_id": rel.source_id, "dst_id": rel.target_id, "props": props}, + ) + else: + cypher = f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) CREATE (src)-[r:{label}]->(dst)" + self._client.cypher( + cypher, + params={"src_id": rel.source_id, "dst_id": rel.target_id}, + ) def delete( self, diff --git a/tests/integration/adapters/test_langchain.py b/tests/integration/adapters/test_langchain.py index a685c93..4a45e6c 100644 --- a/tests/integration/adapters/test_langchain.py +++ b/tests/integration/adapters/test_langchain.py @@ -27,6 +27,7 @@ def graph(): # ── Basic connectivity ──────────────────────────────────────────────────────── + def test_connect(graph): assert graph is not None @@ -47,6 +48,7 @@ def test_refresh_schema_does_not_raise(graph): # ── Cypher query ────────────────────────────────────────────────────────────── + def test_query_returns_list(graph): result = graph.query("RETURN 1 AS n") assert isinstance(result, list) @@ -61,6 +63,7 @@ def test_query_count(graph): # ── add_graph_documents ─────────────────────────────────────────────────────── + @pytest.fixture def unique_tag(): return uuid.uuid4().hex[:8] @@ -95,8 +98,7 @@ def test_add_graph_documents_creates_relationship(graph, unique_tag): # Verify the relationship was created, not just the source node. result = graph.query( - "MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) " - "RETURN count(r) AS cnt", + "MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(r) AS cnt", params={"src": f"Charlie-{unique_tag}", "dst": f"GraphRAG-{unique_tag}"}, ) assert result[0]["cnt"] >= 1, f"relationship not found: {result}" diff --git a/tests/integration/adapters/test_llama_index.py b/tests/integration/adapters/test_llama_index.py index a490bc8..83dea99 100644 --- a/tests/integration/adapters/test_llama_index.py +++ b/tests/integration/adapters/test_llama_index.py @@ -30,6 +30,7 @@ def tag(): # ── Basic connectivity ──────────────────────────────────────────────────────── + def test_connect(store): assert store is not None @@ -47,6 +48,7 @@ def test_structured_query_literal(store): # ── Node operations ─────────────────────────────────────────────────────────── + def test_upsert_and_get_nodes(store, tag): nodes = [ EntityNode(label="LITestPerson", name=f"Alice-{tag}", properties={"role": "researcher"}), @@ -80,6 +82,7 @@ def test_get_by_id(store, tag): # ── Relation operations ─────────────────────────────────────────────────────── + def test_upsert_and_get_triplets(store, tag): src = EntityNode(label="LIRelPerson", name=f"Src-{tag}") dst = EntityNode(label="LIRelConcept", name=f"Dst-{tag}") @@ -121,6 +124,7 @@ def test_get_rel_map(store, tag): # ── Delete ──────────────────────────────────────────────────────────────────── + def test_delete_by_id(store, tag): node = EntityNode(label="LIDelete", name=f"Del-{tag}") store.upsert_nodes([node]) From 19a3b346efbb080da35b2db6b442dc3fbf82d669 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 20:00:26 +0300 Subject: [PATCH 08/11] fix(adapters): raise NotImplementedError for unsupported wildcard patterns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - get_rel_map: query schema edge types, build typed [r:T1|T2|...] pattern instead of wildcard [r] which returns no results in CoordiNode - get_rel_map: raise NotImplementedError when depth != 1 - get_triplets: raise NotImplementedError when relation_names is None - add_graph_documents docstring: clarify edges use unconditional CREATE - _cypher_ident: use \w with re.ASCII instead of [A-Za-z0-9_] - tests: count(*) instead of count(r) — CoordiNode returns 0 for rel vars - tests: add relationship to idempotency test, assert edge count >= 1 - tests: assert depth=2 raises NotImplementedError - tests: fix example port 17080 -> 7080 in docstrings --- .../langchain_coordinode/graph.py | 16 ++-- .../graph_stores/coordinode/base.py | 84 +++++++++++++------ tests/integration/adapters/test_langchain.py | 32 +++++-- .../integration/adapters/test_llama_index.py | 11 ++- 4 files changed, 104 insertions(+), 39 deletions(-) diff --git a/langchain-coordinode/langchain_coordinode/graph.py b/langchain-coordinode/langchain_coordinode/graph.py index 9db6ac6..a8e102c 100644 --- a/langchain-coordinode/langchain_coordinode/graph.py +++ b/langchain-coordinode/langchain_coordinode/graph.py @@ -91,14 +91,18 @@ def add_graph_documents( ) -> None: """Store nodes and relationships extracted from ``GraphDocument`` objects. - Nodes are upserted by ``id`` (used as the ``name`` property). - Relationships are created between existing nodes; if a relationship - between the same source and target already exists it is skipped. + Nodes are upserted by ``id`` (used as the ``name`` property) via + ``MERGE``, so repeated calls are safe for nodes. + + Relationships are created with unconditional ``CREATE`` because + CoordiNode does not yet support ``MERGE`` for edge patterns. Re-ingesting + the same ``GraphDocument`` will therefore produce duplicate edges. Args: graph_documents: List of ``langchain_community.graphs.graph_document.GraphDocument``. include_source: If ``True``, also store the source ``Document`` as a - ``__Document__`` node linked to every extracted entity. + ``__Document__`` node linked to every extracted entity via + ``MENTIONS`` edges (also unconditional ``CREATE``). """ for doc in graph_documents: # ── Upsert nodes ────────────────────────────────────────────── @@ -206,8 +210,8 @@ def _stable_document_id(source: Any) -> str: def _cypher_ident(name: str) -> str: """Escape a label/type name for use as a Cypher identifier.""" - # If already safe (alphanumeric + underscore, not starting with digit) keep as-is - if re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", name): + # ASCII-only word characters: letter/digit/underscore, not starting with digit. + if re.match(r"^[A-Za-z_]\w*$", name, re.ASCII): return name return f"`{name.replace('`', '``')}`" diff --git a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py index 81750ea..bde2019 100644 --- a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py +++ b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py @@ -131,12 +131,15 @@ def get_triplets( rel_filter = "|".join(_cypher_ident(t) for t in relation_names) rel_pattern = f"[r:{rel_filter}]" else: - rel_pattern = "[r]" + # CoordiNode: wildcard [r] pattern returns no results. + # Callers must supply relation_names for the query to work. + raise NotImplementedError( + "CoordinodePropertyGraphStore.get_triplets() requires relation_names — " + "CoordiNode does not support untyped wildcard [r] patterns" + ) where = f"WHERE {' AND '.join(conditions)}" if conditions else "" # CoordiNode: use r.__type__ instead of type(r) — type() returns null. - # Wildcard [r] pattern also returns no results; caller must supply - # relation_names for wildcard queries to work. cypher = ( f"MATCH (n)-{rel_pattern}->(m) {where} " "RETURN n, r.__type__ AS rel_type, m, n.id AS _src_id, m.id AS _dst_id " @@ -165,28 +168,41 @@ def get_rel_map( limit: int = 30, ignore_rels: list[str] | None = None, ) -> list[list[LabelledNode]]: - """Get relationship map for a set of nodes up to ``depth`` hops.""" + """Get relationship map for a set of nodes up to ``depth`` hops. + + Note: only ``depth=1`` (single hop) is supported. ``depth > 1`` raises + ``NotImplementedError`` because CoordiNode does not yet serialise + variable-length path results. + """ + if depth != 1: + raise NotImplementedError( + "CoordinodePropertyGraphStore.get_rel_map() currently supports depth=1 only; " + "variable-length path queries are not yet available in CoordiNode" + ) + if not graph_nodes: return [] - node_ids = [n.id for n in graph_nodes] - ignored = list(ignore_rels) if ignore_rels else [] + # CoordiNode: wildcard [r] pattern returns no results. Fetch all + # known edge types from the schema and build a typed pattern instead, + # e.g. [r:TYPE_A|TYPE_B|...]. + schema_text = self._client.get_schema_text() + edge_types = _parse_edge_types_from_schema(schema_text) + + ignored = set(ignore_rels) if ignore_rels else set() + active_types = [t for t in edge_types if t not in ignored] + + if not active_types: + return [] + rel_filter = "|".join(_cypher_ident(t) for t in active_types) + node_ids = [n.id for n in graph_nodes] params: dict[str, object] = {"ids": node_ids} - ignore_clause = "" - if ignored: - # Single-hop [r]: filter with r.__type__ NOT IN $ignored_rels. - ignore_clause = " AND NOT r.__type__ IN $ignored_rels" - params["ignored_rels"] = ignored - - # CoordiNode does not support variable-length path [r*1..N] in RETURN - # position (result serialisation is undefined for path lists). Use a - # single-hop pattern; multi-hop traversal is a future enhancement. - _ = depth # depth parameter reserved; currently single-hop only + cypher = ( - f"MATCH (n)-[r]->(m) " - f"WHERE n.id IN $ids{ignore_clause} " - f"RETURN n, r, m, n.id AS _src_id, m.id AS _dst_id " + f"MATCH (n)-[r:{rel_filter}]->(m) " + f"WHERE n.id IN $ids " + f"RETURN n, r.__type__ AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id " f"LIMIT {limit}" ) result = self._client.cypher(cypher, params=params) @@ -197,13 +213,7 @@ def get_rel_map( dst_data = row.get("m", {}) src_id = str(row.get("_src_id", "")) dst_id = str(row.get("_dst_id", "")) - # Single-hop [r] returns the relationship as a dict. - # CoordiNode: use __type__ key — type() returns null. - r_val = row.get("r", {}) - if isinstance(r_val, dict): - rel_label = r_val.get("__type__") or r_val.get("type") or "RELATED" - else: - rel_label = "RELATED" + rel_label = str(row.get("_rel_type") or "RELATED") src = _node_result_to_labelled(src_id, src_data) dst = _node_result_to_labelled(dst_id, dst_data) rel = Relation(label=rel_label, source_id=src_id, target_id=dst_id) @@ -357,3 +367,25 @@ def _node_label(node: LabelledNode) -> str: if isinstance(node, EntityNode): return node.label or "Entity" return "Node" + + +def _parse_edge_types_from_schema(schema_text: str) -> list[str]: + """Extract edge type names from CoordiNode schema text. + + Parses the "Edge types:" section produced by ``get_schema_text()``. + """ + edge_types: list[str] = [] + in_edges = False + for line in schema_text.splitlines(): + stripped = line.strip() + if stripped.lower().startswith("edge types"): + in_edges = True + continue + if in_edges: + if not stripped: + break + if stripped.startswith("-") or stripped.startswith("*"): + name = stripped.lstrip("-* ").split("(")[0].strip() + if name: + edge_types.append(name) + return edge_types diff --git a/tests/integration/adapters/test_langchain.py b/tests/integration/adapters/test_langchain.py index 4a45e6c..2d5e274 100644 --- a/tests/integration/adapters/test_langchain.py +++ b/tests/integration/adapters/test_langchain.py @@ -4,7 +4,7 @@ (default: localhost:7080). Run via: - COORDINODE_ADDR=localhost:17080 pytest tests/integration/adapters/test_langchain.py -v + COORDINODE_ADDR=localhost:7080 pytest tests/integration/adapters/test_langchain.py -v """ import os @@ -97,27 +97,47 @@ def test_add_graph_documents_creates_relationship(graph, unique_tag): graph.add_graph_documents([doc]) # Verify the relationship was created, not just the source node. + # count(*) instead of count(r): CoordiNode returns 0 for relationship-variable counts result = graph.query( - "MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(r) AS cnt", + "MATCH (a:LCPerson2 {name: $src})-[r:LC_RESEARCHES]->(b:LCConcept {name: $dst}) RETURN count(*) AS cnt", params={"src": f"Charlie-{unique_tag}", "dst": f"GraphRAG-{unique_tag}"}, ) assert result[0]["cnt"] >= 1, f"relationship not found: {result}" def test_add_graph_documents_idempotent(graph, unique_tag): - """Calling add_graph_documents twice must not raise.""" - node = Node(id=f"Idempotent-{unique_tag}", type="LCIdempotent") - doc = GraphDocument(nodes=[node], relationships=[], source=Document(page_content="test")) + """Calling add_graph_documents twice must not raise. + + Nodes are idempotent (MERGE). Edges are NOT — CoordiNode does not yet + support MERGE for edges, so unconditional CREATE is used and duplicate + edges are expected after two ingests. + """ + node_a = Node(id=f"Idempotent-{unique_tag}", type="LCIdempotent") + node_b = Node(id=f"IdempTarget-{unique_tag}", type="LCIdempotent") + rel = Relationship(source=node_a, target=node_b, type="LC_IDEMP_REL") + doc = GraphDocument( + nodes=[node_a, node_b], + relationships=[rel], + source=Document(page_content="test"), + ) graph.add_graph_documents([doc]) graph.add_graph_documents([doc]) # second call must not raise + # Nodes: MERGE keeps count at 1 result = graph.query( - "MATCH (n:LCIdempotent {name: $name}) RETURN count(n) AS cnt", + "MATCH (n:LCIdempotent {name: $name}) RETURN count(*) AS cnt", params={"name": f"Idempotent-{unique_tag}"}, ) assert result[0]["cnt"] == 1 + # Edges: unconditional CREATE → count >= 1 (may be > 1 due to CoordiNode limitation) + result = graph.query( + "MATCH (a:LCIdempotent {name: $src})-[r:LC_IDEMP_REL]->(b:LCIdempotent {name: $dst}) RETURN count(*) AS cnt", + params={"src": f"Idempotent-{unique_tag}", "dst": f"IdempTarget-{unique_tag}"}, + ) + assert result[0]["cnt"] >= 1 + def test_schema_refreshes_after_add(graph, unique_tag): """structured_schema is invalidated and re-fetched after add_graph_documents.""" diff --git a/tests/integration/adapters/test_llama_index.py b/tests/integration/adapters/test_llama_index.py index 83dea99..bf90e74 100644 --- a/tests/integration/adapters/test_llama_index.py +++ b/tests/integration/adapters/test_llama_index.py @@ -4,7 +4,7 @@ (default: localhost:7080). Run via: - COORDINODE_ADDR=localhost:17080 pytest tests/integration/adapters/test_llama_index.py -v + COORDINODE_ADDR=localhost:7080 pytest tests/integration/adapters/test_llama_index.py -v """ import os @@ -122,6 +122,15 @@ def test_get_rel_map(store, tag): assert len(result) >= 1 +def test_get_rel_map_depth_gt1_raises(store, tag): + """depth > 1 must raise NotImplementedError until multi-hop is supported.""" + node = EntityNode(label="LIRelMapDepth", name=f"DepthNode-{tag}") + store.upsert_nodes([node]) + + with pytest.raises(NotImplementedError): + store.get_rel_map([node], depth=2, limit=10) + + # ── Delete ──────────────────────────────────────────────────────────────────── From 64a2877b12dabb759f44fd5e40f7b49896a55f2e Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 20:13:50 +0300 Subject: [PATCH 09/11] fix(adapters): fix wildcard [r] in refresh_schema, depth default, docstrings - refresh_schema: build typed [r:T1|T2|...] pattern from rel_props keys (CoordiNode wildcard [r] returns no results) - get_rel_map: change default depth from 2 to 1 (depth != 1 now raises) - _stable_document_id docstring: clarify only __Document__ node ID is stable; MENTIONS edges are not deduplicated (unconditional CREATE) - upsert_relations: add comment explaining WHERE NOT (pattern) returns 0 rows silently in CoordiNode, hence unconditional CREATE is used --- .../langchain_coordinode/graph.py | 35 ++++++++++++------- .../graph_stores/coordinode/base.py | 8 +++-- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/langchain-coordinode/langchain_coordinode/graph.py b/langchain-coordinode/langchain_coordinode/graph.py index a8e102c..662db87 100644 --- a/langchain-coordinode/langchain_coordinode/graph.py +++ b/langchain-coordinode/langchain_coordinode/graph.py @@ -71,17 +71,23 @@ def refresh_schema(self) -> None: structured = _parse_schema(text) # Augment with relationship triples (start_label, type, end_label) via # Cypher — get_schema_text() only lists edge types without direction. - try: - rows = self._client.cypher( - "MATCH (a)-[r]->(b) RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst" - ) - structured["relationships"] = [ - {"start": row["src"], "type": row["rel"], "end": row["dst"]} - for row in rows - if row.get("src") and row.get("rel") and row.get("dst") - ] - except Exception: # noqa: BLE001 - pass # Graph may have no relationships yet; structured["relationships"] stays [] + # CoordiNode: wildcard [r] returns no results; build typed pattern from + # the rel_props keys returned by _parse_schema(). + rel_types = list(structured.get("rel_props", {}).keys()) + if rel_types: + try: + rel_filter = "|".join(_cypher_ident(t) for t in rel_types) + rows = self._client.cypher( + f"MATCH (a)-[r:{rel_filter}]->(b) " + "RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst" + ) + structured["relationships"] = [ + {"start": row["src"], "type": row["rel"], "end": row["dst"]} + for row in rows + if row.get("src") and row.get("rel") and row.get("dst") + ] + except Exception: # noqa: BLE001 + pass # Graph may have no relationships yet; structured["relationships"] stays [] self._structured_schema = structured def add_graph_documents( @@ -199,8 +205,11 @@ def _stable_document_id(source: Any) -> str: """Return a deterministic ID for a LangChain Document. Combines ``page_content`` and sorted ``metadata`` items so the same - document produces the same node across different Python processes, - making ``include_source=True`` re-ingest truly idempotent. + document produces the same ``__Document__`` node ID across different + Python processes. This makes document-node creation stable when + ``include_source=True`` is used, but does not make re-ingest fully + idempotent because ``MENTIONS`` edges are not deduplicated until edge + ``MERGE``/dedup support is added to CoordiNode. """ content = getattr(source, "page_content", "") or "" metadata = getattr(source, "metadata", {}) or {} diff --git a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py index bde2019..c5d4369 100644 --- a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py +++ b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py @@ -164,7 +164,7 @@ def get_triplets( def get_rel_map( self, graph_nodes: list[LabelledNode], - depth: int = 2, + depth: int = 1, limit: int = 30, ignore_rels: list[str] | None = None, ) -> list[list[LabelledNode]]: @@ -235,10 +235,12 @@ def upsert_relations(self, relations: list[Relation]) -> None: props = rel.properties or {} label = _cypher_ident(rel.label) # CoordiNode does not yet support MERGE for edge patterns; use CREATE. + # A WHERE NOT (src)-[:TYPE]->(dst) guard was tested but returns 0 + # rows silently in CoordiNode, making all CREATE statements no-ops. + # Until server-side MERGE or pattern predicates are supported, + # repeated calls will create duplicate edges. # SET r += $props is skipped when props is empty — SET r += {} is # not supported by all server versions. - # Note: repeated calls will create duplicate edges until MERGE for - # edges is implemented server-side. if props: cypher = ( f"MATCH (src {{id: $src_id}}) MATCH (dst {{id: $dst_id}}) " From 8953ce5e45752d743ed43cc55612a14a433220cb Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 20:37:53 +0300 Subject: [PATCH 10/11] refactor: extract helpers, fix hash encoding, reduce cognitive complexity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - graph.py: extract _upsert_node(), _create_edge(), _link_document_to_entities() from add_graph_documents() — reduces cognitive complexity from 23 to ~8 - graph.py: _stable_document_id() now uses json.dumps(sort_keys=True) instead of string concat with "|"/"=" — fixes delimiter ambiguity for nested metadata - base.py: _parse_edge_types_from_schema() refactored to use iter() with two for-loops instead of boolean flag — reduces complexity from 16 to 14 - base.py: get_triplets() docstring documents that relation_names is required --- .../langchain_coordinode/graph.py | 117 ++++++++++-------- .../graph_stores/coordinode/base.py | 36 ++++-- 2 files changed, 89 insertions(+), 64 deletions(-) diff --git a/langchain-coordinode/langchain_coordinode/graph.py b/langchain-coordinode/langchain_coordinode/graph.py index 662db87..30c13da 100644 --- a/langchain-coordinode/langchain_coordinode/graph.py +++ b/langchain-coordinode/langchain_coordinode/graph.py @@ -3,6 +3,7 @@ from __future__ import annotations import hashlib +import json import re from typing import Any @@ -111,63 +112,70 @@ def add_graph_documents( ``MENTIONS`` edges (also unconditional ``CREATE``). """ for doc in graph_documents: - # ── Upsert nodes ────────────────────────────────────────────── for node in doc.nodes: - label = _cypher_ident(node.type or "Entity") - props = dict(node.properties or {}) - # Always enforce node.id as the merge key; incoming - # properties["name"] must not drift from the MERGE predicate. - props["name"] = node.id - self._client.cypher( - f"MERGE (n:{label} {{name: $name}}) SET n += $props", - params={"name": node.id, "props": props}, - ) - - # ── Create relationships ────────────────────────────────────── + self._upsert_node(node) for rel in doc.relationships: - src_label = _cypher_ident(rel.source.type or "Entity") - dst_label = _cypher_ident(rel.target.type or "Entity") - rel_type = _cypher_ident(rel.type) - props = dict(rel.properties or {}) - # CoordiNode does not support MERGE for edges or WHERE NOT - # (pattern) guards — use unconditional CREATE. SET r += $props - # is skipped when props is empty because SET r += {} is not - # supported by all server versions. - if props: - self._client.cypher( - f"MATCH (src:{src_label} {{name: $src}}) " - f"MATCH (dst:{dst_label} {{name: $dst}}) " - f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props", - params={"src": rel.source.id, "dst": rel.target.id, "props": props}, - ) - else: - self._client.cypher( - f"MATCH (src:{src_label} {{name: $src}}) " - f"MATCH (dst:{dst_label} {{name: $dst}}) " - f"CREATE (src)-[r:{rel_type}]->(dst)", - params={"src": rel.source.id, "dst": rel.target.id}, - ) - - # ── Optionally link source document ─────────────────────────── + self._create_edge(rel) if include_source and doc.source: - src_id = getattr(doc.source, "id", None) or _stable_document_id(doc.source) - self._client.cypher( - "MERGE (d:__Document__ {id: $id}) SET d.page_content = $text", - params={"id": src_id, "text": doc.source.page_content or ""}, - ) - for node in doc.nodes: - label = _cypher_ident(node.type or "Entity") - self._client.cypher( - f"MATCH (d:__Document__ {{id: $doc_id}}) " - f"MATCH (n:{label} {{name: $name}}) " - f"CREATE (d)-[:MENTIONS]->(n)", - params={"doc_id": src_id, "name": node.id}, - ) + self._link_document_to_entities(doc) # Invalidate cached schema so next access reflects new data self._schema = None self._structured_schema = None + def _upsert_node(self, node: Any) -> None: + """Upsert a single node by ``id`` via MERGE.""" + label = _cypher_ident(node.type or "Entity") + props = dict(node.properties or {}) + # Always enforce node.id as the merge key; incoming + # properties["name"] must not drift from the MERGE predicate. + props["name"] = node.id + self._client.cypher( + f"MERGE (n:{label} {{name: $name}}) SET n += $props", + params={"name": node.id, "props": props}, + ) + + def _create_edge(self, rel: Any) -> None: + """Create a relationship via unconditional CREATE. + + CoordiNode does not support MERGE for edge patterns. Re-ingesting the + same relationship will create a duplicate edge. SET r += $props is + skipped when props is empty because SET r += {} is not supported by all + server versions. + """ + src_label = _cypher_ident(rel.source.type or "Entity") + dst_label = _cypher_ident(rel.target.type or "Entity") + rel_type = _cypher_ident(rel.type) + props = dict(rel.properties or {}) + if props: + self._client.cypher( + f"MATCH (src:{src_label} {{name: $src}}) " + f"MATCH (dst:{dst_label} {{name: $dst}}) " + f"CREATE (src)-[r:{rel_type}]->(dst) SET r += $props", + params={"src": rel.source.id, "dst": rel.target.id, "props": props}, + ) + else: + self._client.cypher( + f"MATCH (src:{src_label} {{name: $src}}) " + f"MATCH (dst:{dst_label} {{name: $dst}}) " + f"CREATE (src)-[r:{rel_type}]->(dst)", + params={"src": rel.source.id, "dst": rel.target.id}, + ) + + def _link_document_to_entities(self, doc: Any) -> None: + """Upsert a ``__Document__`` node and CREATE ``MENTIONS`` edges to all entities.""" + src_id = getattr(doc.source, "id", None) or _stable_document_id(doc.source) + self._client.cypher( + "MERGE (d:__Document__ {id: $id}) SET d.page_content = $text", + params={"id": src_id, "text": doc.source.page_content or ""}, + ) + for node in doc.nodes: + label = _cypher_ident(node.type or "Entity") + self._client.cypher( + f"MATCH (d:__Document__ {{id: $doc_id}}) MATCH (n:{label} {{name: $name}}) CREATE (d)-[:MENTIONS]->(n)", + params={"doc_id": src_id, "name": node.id}, + ) + def query( self, query: str, @@ -213,8 +221,15 @@ def _stable_document_id(source: Any) -> str: """ content = getattr(source, "page_content", "") or "" metadata = getattr(source, "metadata", {}) or {} - stable = content + "|" + "|".join(f"{k}={v}" for k, v in sorted(metadata.items())) - return hashlib.sha256(stable.encode()).hexdigest()[:32] + # Use canonical JSON encoding to avoid delimiter ambiguity and ensure + # determinism for nested/non-scalar metadata values. + canonical = json.dumps( + {"content": content, "metadata": metadata}, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + ) + return hashlib.sha256(canonical.encode()).hexdigest()[:32] def _cypher_ident(name: str) -> str: diff --git a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py index c5d4369..1e4662b 100644 --- a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py +++ b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py @@ -117,7 +117,13 @@ def get_triplets( properties: dict[str, Any] | None = None, ids: list[str] | None = None, ) -> list[list[LabelledNode]]: - """Retrieve triplets (subject, predicate, object) as node triples.""" + """Retrieve triplets (subject, predicate, object) as node triples. + + Note: + ``relation_names`` is **required**. CoordiNode does not support + untyped wildcard ``[r]`` relationship patterns — they silently return + no rows. Omitting ``relation_names`` raises ``NotImplementedError``. + """ conditions: list[str] = [] params: dict[str, Any] = {} @@ -377,17 +383,21 @@ def _parse_edge_types_from_schema(schema_text: str) -> list[str]: Parses the "Edge types:" section produced by ``get_schema_text()``. """ edge_types: list[str] = [] - in_edges = False - for line in schema_text.splitlines(): + lines = iter(schema_text.splitlines()) + + # Advance to the "Edge types:" header. + for line in lines: + if line.strip().lower().startswith("edge types"): + break + + # Collect bullet items until the first blank line. + for line in lines: stripped = line.strip() - if stripped.lower().startswith("edge types"): - in_edges = True - continue - if in_edges: - if not stripped: - break - if stripped.startswith("-") or stripped.startswith("*"): - name = stripped.lstrip("-* ").split("(")[0].strip() - if name: - edge_types.append(name) + if not stripped: + break + if stripped.startswith(("-", "*")): + name = stripped.lstrip("-* ").split("(")[0].strip() + if name: + edge_types.append(name) + return edge_types From 828e8d976c607d1f7745eec9beb6c5692e252913 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Thu, 9 Apr 2026 21:04:55 +0300 Subject: [PATCH 11/11] fix: harden refresh_schema, _stable_document_id, get_rel_map limit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - refresh_schema(): remove broad except Exception — transport/query errors now propagate instead of silently producing an empty relationships list - _stable_document_id(): add default=str to json.dumps to handle non-JSON- serializable metadata values (datetime, UUID, Path, custom objects) - get_rel_map(): coerce limit to int() before Cypher f-string interpolation to prevent injection via non-integer caller input --- .../langchain_coordinode/graph.py | 28 +++++++++---------- .../graph_stores/coordinode/base.py | 3 +- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/langchain-coordinode/langchain_coordinode/graph.py b/langchain-coordinode/langchain_coordinode/graph.py index 30c13da..6814c95 100644 --- a/langchain-coordinode/langchain_coordinode/graph.py +++ b/langchain-coordinode/langchain_coordinode/graph.py @@ -76,19 +76,16 @@ def refresh_schema(self) -> None: # the rel_props keys returned by _parse_schema(). rel_types = list(structured.get("rel_props", {}).keys()) if rel_types: - try: - rel_filter = "|".join(_cypher_ident(t) for t in rel_types) - rows = self._client.cypher( - f"MATCH (a)-[r:{rel_filter}]->(b) " - "RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst" - ) - structured["relationships"] = [ - {"start": row["src"], "type": row["rel"], "end": row["dst"]} - for row in rows - if row.get("src") and row.get("rel") and row.get("dst") - ] - except Exception: # noqa: BLE001 - pass # Graph may have no relationships yet; structured["relationships"] stays [] + rel_filter = "|".join(_cypher_ident(t) for t in rel_types) + rows = self._client.cypher( + f"MATCH (a)-[r:{rel_filter}]->(b) " + "RETURN DISTINCT a.__label__ AS src, r.__type__ AS rel, b.__label__ AS dst" + ) + structured["relationships"] = [ + {"start": row["src"], "type": row["rel"], "end": row["dst"]} + for row in rows + if row.get("src") and row.get("rel") and row.get("dst") + ] self._structured_schema = structured def add_graph_documents( @@ -222,12 +219,15 @@ def _stable_document_id(source: Any) -> str: content = getattr(source, "page_content", "") or "" metadata = getattr(source, "metadata", {}) or {} # Use canonical JSON encoding to avoid delimiter ambiguity and ensure - # determinism for nested/non-scalar metadata values. + # determinism for nested/non-scalar metadata values. default=str converts + # non-JSON-serializable types (datetime, UUID, Path, …) to their string + # representation so the hash never raises TypeError. canonical = json.dumps( {"content": content, "metadata": metadata}, sort_keys=True, separators=(",", ":"), ensure_ascii=False, + default=str, ) return hashlib.sha256(canonical.encode()).hexdigest()[:32] diff --git a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py index 1e4662b..2d43788 100644 --- a/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py +++ b/llama-index-coordinode/llama_index/graph_stores/coordinode/base.py @@ -203,13 +203,14 @@ def get_rel_map( rel_filter = "|".join(_cypher_ident(t) for t in active_types) node_ids = [n.id for n in graph_nodes] + safe_limit = int(limit) # coerce to int to prevent Cypher injection via non-integer input params: dict[str, object] = {"ids": node_ids} cypher = ( f"MATCH (n)-[r:{rel_filter}]->(m) " f"WHERE n.id IN $ids " f"RETURN n, r.__type__ AS _rel_type, m, n.id AS _src_id, m.id AS _dst_id " - f"LIMIT {limit}" + f"LIMIT {safe_limit}" ) result = self._client.cypher(cypher, params=params)