Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions synapseclient/core/constants/concrete_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
GRID_RECORD_SET_EXPORT_REQUEST = (
"org.sagebionetworks.repo.model.grid.GridRecordSetExportRequest"
)
# Fully-qualified Synapse model class name. GridCsvImportRequest sends it as the
# "concreteType" of a CSV-import job; the asynchronous job mixin maps it to the
# "/grid/import/csv/async" endpoint.
GRID_CSV_IMPORT_REQUEST = "org.sagebionetworks.repo.model.grid.GridCsvImportRequest"
LIST_GRID_SESSIONS_REQUEST = (
"org.sagebionetworks.repo.model.grid.ListGridSessionsRequest"
)
Expand Down
230 changes: 230 additions & 0 deletions synapseclient/models/curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
data or metadata in Synapse.
"""

import asyncio
import os
from dataclasses import dataclass, field, replace
from typing import Any, AsyncGenerator, Dict, Generator, Optional, Protocol, Union

Expand All @@ -28,11 +30,13 @@
from synapseclient.core.constants.concrete_types import (
CREATE_GRID_REQUEST,
FILE_BASED_METADATA_TASK_PROPERTIES,
GRID_CSV_IMPORT_REQUEST,
GRID_RECORD_SET_EXPORT_REQUEST,
LIST_GRID_SESSIONS_REQUEST,
LIST_GRID_SESSIONS_RESPONSE,
RECORD_BASED_METADATA_TASK_PROPERTIES,
)
from synapseclient.core.upload.multipart_upload_async import multipart_upload_file_async
from synapseclient.core.utils import delete_none_keys, merge_dataclass_entities
from synapseclient.models.mixins.asynchronous_job import AsynchronousCommunicator
from synapseclient.models.recordset import ValidationSummary
Expand Down Expand Up @@ -1078,6 +1082,88 @@ def to_synapse_request(self) -> Dict[str, Any]:
return request_dict


@dataclass
class GridCsvImportRequest(AsynchronousCommunicator):
    """
    A request to import a CSV file into an existing grid session.

    Represents a [Synapse GridCsvImportRequest](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/GridCsvImportRequest.html).

    Attributes:
        concrete_type: The concrete type for the request
        session_id: The grid session ID to import the CSV into
        file_handle_id: The file handle ID of the CSV file to import
    """

    concrete_type: str = GRID_CSV_IMPORT_REQUEST
    """The concrete type for the request"""

    session_id: Optional[str] = None
    """The grid session ID to import the CSV into"""

    file_handle_id: Optional[str] = None
    """The file handle ID of the CSV file to import"""

    _grid_session_data: Optional[Dict[str, Any]] = field(default=None, compare=False)
    """Internal storage of the full grid session data from the response for later use."""

    def fill_from_dict(
        self, synapse_response: Union[Dict[str, Any], Any]
    ) -> "GridCsvImportRequest":
        """
        Converts a response from the REST API into this dataclass.

        The `gridSession` object of the response is kept on the instance so it can
        later be copied onto a `Grid` via `fill_grid_session_from_response`.

        Arguments:
            synapse_response: The response from the REST API.

        Returns:
            The GridCsvImportRequest object.
        """
        # `or {}` also covers an explicit `"gridSession": null`, which a plain
        # `.get("gridSession", {})` would hand back as None and crash on below.
        grid_session_data = synapse_response.get("gridSession") or {}

        # The session ID is an input of this request: only replace it when the
        # response actually reports one, never wipe it out with None.
        response_session_id = grid_session_data.get("sessionId")
        if response_session_id is not None:
            self.session_id = response_session_id

        self._grid_session_data = grid_session_data
        return self

    def fill_grid_session_from_response(self, grid_session: "Grid") -> "Grid":
        """
        Fills a Grid object with data from the stored response.

        When no grid session data has been stored (no response processed yet, or
        the response carried none) the Grid is returned untouched. Otherwise every
        listed attribute is overwritten, using None for keys missing from the
        stored data.

        Arguments:
            grid_session: The Grid object to populate.

        Returns:
            The populated Grid object.
        """
        if not self._grid_session_data:
            return grid_session

        data = self._grid_session_data
        grid_session.session_id = data.get("sessionId")
        grid_session.started_by = data.get("startedBy")
        grid_session.started_on = data.get("startedOn")
        grid_session.etag = data.get("etag")
        grid_session.modified_on = data.get("modifiedOn")
        grid_session.last_replica_id_client = data.get("lastReplicaIdClient")
        grid_session.last_replica_id_service = data.get("lastReplicaIdService")
        grid_session.grid_json_schema_id = data.get("gridJsonSchema$Id")
        grid_session.source_entity_id = data.get("sourceEntityId")
        return grid_session

    def to_synapse_request(self) -> Dict[str, Any]:
        """
        Converts this dataclass to a dictionary suitable for a Synapse REST API request.

        `sessionId` and `fileHandleId` are only included when they are set.

        Returns:
            A dictionary representation of this object for API requests.
        """
        request_dict = {"concreteType": self.concrete_type}
        if self.session_id is not None:
            request_dict["sessionId"] = self.session_id
        if self.file_handle_id is not None:
            request_dict["fileHandleId"] = self.file_handle_id
        return request_dict


@dataclass
class GridSession:
"""
Expand Down Expand Up @@ -1427,6 +1513,66 @@ def list(
```
"""

def import_csv(
    self,
    file_handle_id: Optional[str] = None,
    local_path: Optional[str] = None,
    *,
    timeout: int = 120,
    synapse_client: Optional[Synapse] = None,
) -> "Grid":
    """
    Load the rows of a CSV file into this grid session.

    The CSV can be referenced by an existing file handle or read from local
    disk. When `local_path` is given the file is uploaded first and the
    resulting file handle is the one imported, even if `file_handle_id` was
    also passed.

    Arguments:
        file_handle_id: ID of a file handle that already holds the CSV.
            Required unless `local_path` is given.
        local_path: Location of a CSV file on the local machine. It is
            uploaded automatically before the import. Required unless
            `file_handle_id` is given.
        timeout: Seconds to wait for the import job to complete or report
            progress before a SynapseTimeoutError is raised. Defaults to 120.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        Grid: This Grid, refreshed with the session data returned by the import.

    Raises:
        ValueError: If `session_id` is not set.
        ValueError: If neither `file_handle_id` nor `local_path` is provided.

    Example: Import a CSV via file handle ID
         

        ```python
        from synapseclient import Synapse
        from synapseclient.models import Grid

        syn = Synapse()
        syn.login()

        grid = Grid(session_id="abc-123-def")
        grid = grid.import_csv(file_handle_id="12345678")
        ```

    Example: Import a CSV from a local file
         

        ```python
        from synapseclient import Synapse
        from synapseclient.models import Grid

        syn = Synapse()
        syn.login()

        grid = Grid(session_id="abc-123-def")
        grid = grid.import_csv(local_path="/path/to/data.csv")
        ```
    """
    # Signature-only placeholder: hands back the instance unchanged.
    # NOTE(review): presumably superseded at runtime by the synchronous wrapper
    # generated for `import_csv_async` - confirm against the enclosing class.
    return self


@dataclass
@async_to_sync
Expand Down Expand Up @@ -1694,6 +1840,90 @@ async def main():

return self

async def import_csv_async(
    self,
    file_handle_id: Optional[str] = None,
    local_path: Optional[str] = None,
    *,
    timeout: int = 120,
    synapse_client: Optional[Synapse] = None,
) -> "Grid":
    """
    Load the rows of a CSV file into this grid session.

    The CSV can be referenced by an existing file handle or read from local
    disk. When `local_path` is given the file is uploaded first and the
    resulting file handle is the one imported, even if `file_handle_id` was
    also passed.

    Arguments:
        file_handle_id: ID of a file handle that already holds the CSV.
            Required unless `local_path` is given.
        local_path: Location of a CSV file on the local machine (`~` is
            expanded). It is uploaded automatically before the import.
            Required unless `file_handle_id` is given.
        timeout: Seconds to wait for the import job to complete or report
            progress before a SynapseTimeoutError is raised. Defaults to 120.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        Grid: This Grid, refreshed with the session data returned by the import.

    Raises:
        ValueError: If `session_id` is not set.
        ValueError: If neither `file_handle_id` nor `local_path` is provided.

    Example: Import a CSV from a local file asynchronously
         

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Grid

        syn = Synapse()
        syn.login()

        async def main():
            grid = Grid(session_id="abc-123-def")
            grid = await grid.import_csv_async(local_path="/path/to/data.csv")
            print(f"Import complete, session etag: {grid.etag}")

        asyncio.run(main())
        ```
    """
    # Validate before touching the network so misuse fails fast.
    if not self.session_id:
        raise ValueError("session_id is required to import CSV into a GridSession")
    if not (file_handle_id or local_path):
        raise ValueError(
            "Either file_handle_id or local_path must be provided to import CSV"
        )

    current_span = trace.get_current_span()
    current_span.set_attributes({"synapse.session_id": self.session_id or ""})

    csv_file_handle_id = file_handle_id
    if local_path:
        # A local file always wins: upload it and import the new file handle.
        syn = Synapse.get_client(synapse_client=synapse_client)
        csv_path = os.path.expanduser(local_path)
        # The upload runs while holding the client's parallel file transfer
        # semaphore for the running event loop.
        transfer_slot = syn._get_parallel_file_transfer_semaphore(
            asyncio_event_loop=asyncio.get_running_loop()
        )
        async with transfer_slot:
            csv_file_handle_id = await multipart_upload_file_async(
                syn=syn,
                file_path=csv_path,
            )

    import_job = GridCsvImportRequest(
        session_id=self.session_id,
        file_handle_id=csv_file_handle_id,
    )
    finished_job = await import_job.send_job_and_wait_async(
        timeout=timeout, synapse_client=synapse_client
    )

    # Copy the grid session returned by the job onto this instance.
    finished_job.fill_grid_session_from_response(self)
    return self

def fill_from_dict(self, synapse_response: Dict[str, Any]) -> "Grid":
"""Converts a response from the REST API into this dataclass."""
self.session_id = synapse_response.get("sessionId", None)
Expand Down
2 changes: 2 additions & 0 deletions synapseclient/models/mixins/asynchronous_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CREATE_GRID_REQUEST,
CREATE_SCHEMA_REQUEST,
GET_VALIDATION_SCHEMA_REQUEST,
GRID_CSV_IMPORT_REQUEST,
GRID_RECORD_SET_EXPORT_REQUEST,
QUERY_BUNDLE_REQUEST,
QUERY_TABLE_CSV_REQUEST,
Expand All @@ -30,6 +31,7 @@
AGENT_CHAT_REQUEST: "/agent/chat/async",
CREATE_GRID_REQUEST: "/grid/session/async",
GRID_RECORD_SET_EXPORT_REQUEST: "/grid/export/recordset/async",
GRID_CSV_IMPORT_REQUEST: "/grid/import/csv/async",
TABLE_UPDATE_TRANSACTION_REQUEST: "/entity/{entityId}/table/transaction/async",
GET_VALIDATION_SCHEMA_REQUEST: "/schema/type/validation/async",
CREATE_SCHEMA_REQUEST: "/schema/type/create/async",
Expand Down
66 changes: 66 additions & 0 deletions tests/integration/synapseclient/models/async/test_grid_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,69 @@ async def test_delete_grid_session_validation_error_async(self) -> None:
match="session_id is required to delete a GridSession",
):
await grid.delete_async(synapse_client=self.syn)

async def test_import_csv_async(self, record_set_fixture: RecordSet) -> None:
    """Importing a local CSV into a freshly created grid returns that same grid."""
    # GIVEN: A grid session created from a record set
    created_grid = await Grid(record_set_id=record_set_fixture.id).create_async(
        timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn
    )
    assert created_grid.session_id is not None

    def _delete_session():
        # The session ID is read when cleanup runs, not when it is scheduled.
        return Grid(session_id=created_grid.session_id).delete(
            synapse_client=self.syn
        )

    self.schedule_for_cleanup(_delete_session)

    # AND: A new CSV to import
    rows_to_import = {
        "id": [6, 7],
        "name": ["Zeta", "Eta"],
        "value": [60.0, 70.0],
        "category": ["A", "B"],
        "active": [True, False],
    }
    csv_frame = pd.DataFrame(rows_to_import)
    csv_fd, csv_path = tempfile.mkstemp(suffix=".csv")
    try:
        # mkstemp hands back an open descriptor; release it before pandas writes.
        os.close(csv_fd)
        csv_frame.to_csv(csv_path, index=False)
        self.schedule_for_cleanup(csv_path)

        # WHEN: Importing the CSV via local path
        imported_grid = await created_grid.import_csv_async(
            local_path=csv_path,
            timeout=ASYNC_JOB_TIMEOUT_SEC,
            synapse_client=self.syn,
        )
    except Exception:
        # Remove the temp file right away on failure, then surface the error.
        if os.path.exists(csv_path):
            os.unlink(csv_path)
        raise

    # THEN: The grid should be updated
    assert imported_grid is created_grid
    assert imported_grid.session_id is not None

async def test_import_csv_async_validation_error_no_session_id(self) -> None:
    """A Grid without a session_id must refuse to import a CSV."""
    # GIVEN: A Grid instance with no session_id
    sessionless_grid = Grid()

    # WHEN/THEN: Importing CSV should raise ValueError
    expected_message = "session_id is required to import CSV"
    with pytest.raises(ValueError, match=expected_message):
        await sessionless_grid.import_csv_async(
            file_handle_id="12345", synapse_client=self.syn
        )

async def test_import_csv_async_validation_error_no_file_source(self) -> None:
    """Importing with neither a file handle nor a local path must be rejected."""
    # GIVEN: A Grid instance with a session_id but no file source
    grid_without_source = Grid(session_id="some-session-id")

    # WHEN/THEN: Importing CSV without file source should raise ValueError
    expected_message = "Either file_handle_id or local_path must be provided"
    with pytest.raises(ValueError, match=expected_message):
        await grid_without_source.import_csv_async(synapse_client=self.syn)
Loading
Loading