Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions synapseclient/core/constants/concrete_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
GRID_RECORD_SET_EXPORT_REQUEST = (
"org.sagebionetworks.repo.model.grid.GridRecordSetExportRequest"
)
GRID_CSV_IMPORT_REQUEST = "org.sagebionetworks.repo.model.grid.GridCsvImportRequest"
LIST_GRID_SESSIONS_REQUEST = (
"org.sagebionetworks.repo.model.grid.ListGridSessionsRequest"
)
Expand Down
226 changes: 226 additions & 0 deletions synapseclient/models/curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
data or metadata in Synapse.
"""

import os
from dataclasses import dataclass, field, replace
from typing import Any, AsyncGenerator, Dict, Generator, Optional, Protocol, Union

Expand All @@ -28,6 +29,7 @@
from synapseclient.core.constants.concrete_types import (
CREATE_GRID_REQUEST,
FILE_BASED_METADATA_TASK_PROPERTIES,
GRID_CSV_IMPORT_REQUEST,
GRID_RECORD_SET_EXPORT_REQUEST,
LIST_GRID_SESSIONS_REQUEST,
LIST_GRID_SESSIONS_RESPONSE,
Expand Down Expand Up @@ -1078,6 +1080,64 @@ def to_synapse_request(self) -> Dict[str, Any]:
return request_dict


@dataclass
class GridCsvImportRequest(AsynchronousCommunicator):
"""
Request to import a CSV file into an existing grid session.

Represents a Synapse GridCsvImportRequest:
POST /grid/import/csv/async/start
GET /grid/import/csv/async/get/{asyncToken}

Attributes:
concrete_type: The concrete type for the request
session_id: The grid session ID to import the CSV into
file_handle_id: The file handle ID of the CSV file to import
response_session_id: The session ID from the import response (populated from response)
"""

concrete_type: str = GRID_CSV_IMPORT_REQUEST
"""The concrete type for the request"""

session_id: Optional[str] = None
"""The grid session ID to import the CSV into"""

file_handle_id: Optional[str] = None
"""The file handle ID of the CSV file to import"""

response_session_id: Optional[str] = None
"""The session ID from the import response (populated from response)"""

def fill_from_dict(
self, synapse_response: Dict[str, Any]
) -> "GridCsvImportRequest":
"""
Converts a response from the REST API into this dataclass.

Arguments:
synapse_response: The response from the REST API.

Returns:
The GridCsvImportRequest object.
"""
self.response_session_id = synapse_response.get("sessionId", None)
return self

def to_synapse_request(self) -> Dict[str, Any]:
"""
Converts this dataclass to a dictionary suitable for a Synapse REST API request.

Returns:
A dictionary representation of this object for API requests.
"""
request_dict: Dict[str, Any] = {"concreteType": self.concrete_type}
if self.session_id is not None:
request_dict["sessionId"] = self.session_id
if self.file_handle_id is not None:
request_dict["fileHandleId"] = self.file_handle_id
return request_dict


@dataclass
class GridSession:
"""
Expand Down Expand Up @@ -1373,6 +1433,69 @@ def delete(self, *, synapse_client: Optional[Synapse] = None) -> None:
"""
return None

def import_csv(
self,
file_handle_id: Optional[str] = None,
path: Optional[str] = None,
*,
timeout: int = 120,
synapse_client: Optional[Synapse] = None,
) -> "Grid":
"""
Import a CSV file into the grid session.

Accepts either a pre-uploaded `file_handle_id` or a local `path` that will
be uploaded automatically before importing.

Arguments:
file_handle_id: The Synapse file handle ID of the CSV to import.
Mutually exclusive with `path`.
path: Local path to a CSV file. If provided, the file will be uploaded
via multipart upload and the resulting file handle ID used for import.
Mutually exclusive with `file_handle_id`.
timeout: The number of seconds to wait for the async job to complete.
Defaults to 120.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.

Returns:
The Grid object (self).

Raises:
ValueError: If `session_id` is not set.
ValueError: If neither `file_handle_id` nor `path` is provided.

Example: Import CSV by file handle ID
 

```python
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

grid = Grid(session_id="abc-123-def")
grid = grid.import_csv(file_handle_id="12345678")
```

Example: Import CSV from a local path
 

```python
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

grid = Grid(session_id="abc-123-def")
grid = grid.import_csv(path="/path/to/data.csv")
```
"""
return self

@classmethod
def list(
cls,
Expand Down Expand Up @@ -1838,3 +1961,106 @@ async def main():
await delete_grid_session(
session_id=self.session_id, synapse_client=synapse_client
)

async def import_csv_async(
self,
file_handle_id: Optional[str] = None,
path: Optional[str] = None,
*,
timeout: int = 120,
synapse_client: Optional[Synapse] = None,
) -> "Grid":
"""
Import a CSV file into the grid session.

Accepts either a pre-uploaded `file_handle_id` or a local `path` that will
be uploaded automatically before importing.

Arguments:
file_handle_id: The Synapse file handle ID of the CSV to import.
Mutually exclusive with `path`.
path: Local path to a CSV file. If provided, the file will be uploaded
via multipart upload and the resulting file handle ID used for import.
Mutually exclusive with `file_handle_id`.
timeout: The number of seconds to wait for the async job to complete.
Defaults to 120.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.

Returns:
The Grid object (self).

Raises:
ValueError: If `session_id` is not set.
ValueError: If neither `file_handle_id` nor `path` is provided.

Example: Import CSV by file handle ID asynchronously
 

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

async def main():
grid = Grid(session_id="abc-123-def")
grid = await grid.import_csv_async(file_handle_id="12345678")

asyncio.run(main())
```

Example: Import CSV from a local path asynchronously
 

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

async def main():
grid = Grid(session_id="abc-123-def")
grid = await grid.import_csv_async(path="/path/to/data.csv")

asyncio.run(main())
```
"""
if not self.session_id:
raise ValueError("session_id is required to import CSV into a GridSession")
if not file_handle_id and not path:
raise ValueError(
"Either file_handle_id or path must be provided to import a CSV"
)

trace.get_current_span().set_attributes(
{
"synapse.session_id": self.session_id or "",
}
)

client = Synapse.get_client(synapse_client=synapse_client)

if path:
from synapseclient.core.upload.multipart_upload_async import (
multipart_upload_file_async,
)

path = os.path.expanduser(path)
file_handle_id = await multipart_upload_file_async(
syn=client, file_path=path, content_type="text/csv"
)

import_request = GridCsvImportRequest(
session_id=self.session_id, file_handle_id=file_handle_id
)
await import_request.send_job_and_wait_async(
timeout=timeout, synapse_client=client
)

return self
2 changes: 2 additions & 0 deletions synapseclient/models/mixins/asynchronous_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CREATE_GRID_REQUEST,
CREATE_SCHEMA_REQUEST,
GET_VALIDATION_SCHEMA_REQUEST,
GRID_CSV_IMPORT_REQUEST,
GRID_RECORD_SET_EXPORT_REQUEST,
QUERY_BUNDLE_REQUEST,
QUERY_TABLE_CSV_REQUEST,
Expand All @@ -29,6 +30,7 @@
ASYNC_JOB_URIS = {
AGENT_CHAT_REQUEST: "/agent/chat/async",
CREATE_GRID_REQUEST: "/grid/session/async",
GRID_CSV_IMPORT_REQUEST: "/grid/import/csv/async",
GRID_RECORD_SET_EXPORT_REQUEST: "/grid/export/recordset/async",
TABLE_UPDATE_TRANSACTION_REQUEST: "/entity/{entityId}/table/transaction/async",
GET_VALIDATION_SCHEMA_REQUEST: "/schema/type/validation/async",
Expand Down
78 changes: 78 additions & 0 deletions tests/integration/synapseclient/models/async/test_grid_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,81 @@ async def test_delete_grid_session_validation_error_async(self) -> None:
match="session_id is required to delete a GridSession",
):
await grid.delete_async(synapse_client=self.syn)

async def test_import_csv_async_with_file_handle_id(
self, record_set_fixture: RecordSet
) -> None:
# GIVEN: A grid session and the file handle ID from the record set
grid = Grid(record_set_id=record_set_fixture.id)
created_grid = await grid.create_async(
timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn
)
self.schedule_for_cleanup(created_grid.session_id)

assert created_grid.session_id is not None
assert record_set_fixture.data_file_handle_id is not None

# WHEN: Importing the CSV using the record set's file handle ID
result = await created_grid.import_csv_async(
file_handle_id=record_set_fixture.data_file_handle_id,
timeout=ASYNC_JOB_TIMEOUT_SEC,
synapse_client=self.syn,
)

# THEN: The import should complete without error and return the same grid
assert result is created_grid

async def test_import_csv_async_with_local_path(
self, record_set_fixture: RecordSet
) -> None:
# GIVEN: A grid session and a local CSV file
grid = Grid(record_set_id=record_set_fixture.id)
created_grid = await grid.create_async(
timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn
)
self.schedule_for_cleanup(created_grid.session_id)

assert created_grid.session_id is not None

# Create a local CSV with matching columns
import_data = pd.DataFrame(
{
"id": [10, 20],
"name": ["Zeta", "Eta"],
"value": [99.9, 88.8],
"category": ["D", "E"],
"active": [True, False],
}
)
temp_fd, filename = tempfile.mkstemp(suffix=".csv")
try:
os.close(temp_fd)
import_data.to_csv(filename, index=False)
self.schedule_for_cleanup(filename)

# WHEN: Importing the CSV from a local path
result = await created_grid.import_csv_async(
path=filename,
timeout=ASYNC_JOB_TIMEOUT_SEC,
synapse_client=self.syn,
)

# THEN: The import should complete without error and return the same grid
assert result is created_grid
except Exception:
if os.path.exists(filename):
os.unlink(filename)
raise

async def test_import_csv_async_validation_error_no_session(self) -> None:
# GIVEN: A Grid instance with no session_id
grid = Grid()

# WHEN/THEN: Importing CSV without session_id should raise ValueError
with pytest.raises(
ValueError,
match="session_id is required to import CSV",
):
await grid.import_csv_async(
file_handle_id="12345678", synapse_client=self.syn
)
Loading
Loading