Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions synapseclient/core/constants/concrete_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
GRID_RECORD_SET_EXPORT_REQUEST = (
"org.sagebionetworks.repo.model.grid.GridRecordSetExportRequest"
)
GRID_CSV_IMPORT_REQUEST = "org.sagebionetworks.repo.model.grid.GridCsvImportRequest"
LIST_GRID_SESSIONS_REQUEST = (
"org.sagebionetworks.repo.model.grid.ListGridSessionsRequest"
)
Expand Down
223 changes: 223 additions & 0 deletions synapseclient/models/curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
from synapseclient.core.constants.concrete_types import (
CREATE_GRID_REQUEST,
FILE_BASED_METADATA_TASK_PROPERTIES,
GRID_CSV_IMPORT_REQUEST,
GRID_RECORD_SET_EXPORT_REQUEST,
LIST_GRID_SESSIONS_REQUEST,
LIST_GRID_SESSIONS_RESPONSE,
RECORD_BASED_METADATA_TASK_PROPERTIES,
)
from synapseclient.core.upload.multipart_upload_async import multipart_upload_file_async
from synapseclient.core.utils import delete_none_keys, merge_dataclass_entities
from synapseclient.models.mixins.asynchronous_job import AsynchronousCommunicator
from synapseclient.models.recordset import ValidationSummary
Expand Down Expand Up @@ -1078,6 +1080,58 @@ def to_synapse_request(self) -> Dict[str, Any]:
return request_dict


@dataclass
class GridCsvImportRequest(AsynchronousCommunicator):
"""
A request to import a CSV file into an active grid session.

Represents a [Synapse GridCsvImportRequest](https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/GridCsvImportRequest.html).

Attributes:
concrete_type: The concrete type for the request
session_id: The grid session ID to import the CSV into
file_handle_id: The file handle ID of the CSV to import
"""

concrete_type: str = GRID_CSV_IMPORT_REQUEST
"""The concrete type for the request"""

session_id: Optional[str] = None
"""The grid session ID to import the CSV into"""

file_handle_id: Optional[str] = None
"""The file handle ID of the CSV to import"""

def fill_from_dict(
self, synapse_response: Union[Dict[str, Any], Any]
) -> "GridCsvImportRequest":
"""
Converts a response from the REST API into this dataclass.

Arguments:
synapse_response: The response from the REST API.

Returns:
The GridCsvImportRequest object.
"""
self.session_id = synapse_response.get("sessionId", None)
return self

def to_synapse_request(self) -> Dict[str, Any]:
"""
Converts this dataclass to a dictionary suitable for a Synapse REST API request.

Returns:
A dictionary representation of this object for API requests.
"""
request_dict = {"concreteType": self.concrete_type}
if self.session_id is not None:
request_dict["sessionId"] = self.session_id
if self.file_handle_id is not None:
request_dict["fileHandleId"] = self.file_handle_id
return request_dict


@dataclass
class GridSession:
"""
Expand Down Expand Up @@ -1373,6 +1427,70 @@ def delete(self, *, synapse_client: Optional[Synapse] = None) -> None:
"""
return None

def import_csv(
self,
file_handle_id: Optional[str] = None,
path: Optional[str] = None,
*,
timeout: int = 120,
synapse_client: Optional[Synapse] = None,
) -> "Grid":
"""
Import a CSV file into the active grid session.

Either `file_handle_id` or `path` must be provided. If `path` is provided,
the file will be uploaded via multipart upload and the resulting file handle
ID will be used.

Arguments:
file_handle_id: The file handle ID of the CSV to import.
Mutually exclusive with `path`.
path: Local path to a CSV file to upload and import.
Mutually exclusive with `file_handle_id`.
timeout: The number of seconds to wait for the job to complete or
progress before raising a SynapseTimeoutError. Defaults to 120.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.

Returns:
Grid: This Grid instance.

Raises:
ValueError: If `session_id` is not set.
ValueError: If neither `file_handle_id` nor `path` is provided.
ValueError: If both `file_handle_id` and `path` are provided.

Example: Import a CSV by file handle ID
 

```python
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

grid = Grid(session_id="abc-123-def")
grid = grid.import_csv(file_handle_id="123456")
```

Example: Import a CSV from a local path
 

```python
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

grid = Grid(session_id="abc-123-def")
grid = grid.import_csv(path="/path/to/data.csv")
```
"""
return self

@classmethod
def list(
cls,
Expand Down Expand Up @@ -1838,3 +1956,108 @@ async def main():
await delete_grid_session(
session_id=self.session_id, synapse_client=synapse_client
)

async def import_csv_async(
self,
file_handle_id: Optional[str] = None,
path: Optional[str] = None,
*,
timeout: int = 120,
synapse_client: Optional[Synapse] = None,
) -> "Grid":
"""
Import a CSV file into the active grid session.

Either `file_handle_id` or `path` must be provided. If `path` is provided,
the file will be uploaded via multipart upload and the resulting file handle
ID will be used.

Arguments:
file_handle_id: The file handle ID of the CSV to import.
Mutually exclusive with `path`.
path: Local path to a CSV file to upload and import.
Mutually exclusive with `file_handle_id`.
timeout: The number of seconds to wait for the job to complete or
progress before raising a SynapseTimeoutError. Defaults to 120.
synapse_client: If not passed in and caching was not disabled by
`Synapse.allow_client_caching(False)` this will use the last created
instance from the Synapse class constructor.

Returns:
Grid: This Grid instance.

Raises:
ValueError: If `session_id` is not set.
ValueError: If neither `file_handle_id` nor `path` is provided.
ValueError: If both `file_handle_id` and `path` are provided.

Example: Import a CSV by file handle ID asynchronously
 

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

async def main():
grid = Grid(session_id="abc-123-def")
grid = await grid.import_csv_async(file_handle_id="123456")

asyncio.run(main())
```

Example: Import a CSV from a local path asynchronously
 

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

async def main():
grid = Grid(session_id="abc-123-def")
grid = await grid.import_csv_async(path="/path/to/data.csv")

asyncio.run(main())
```
"""
if not self.session_id:
raise ValueError("session_id is required to import a CSV into a Grid")
if file_handle_id is None and path is None:
raise ValueError(
"Either file_handle_id or path must be provided to import a CSV"
)
if file_handle_id is not None and path is not None:
raise ValueError(
"Only one of file_handle_id or path may be provided, not both"
)

trace.get_current_span().set_attributes(
{
"synapse.session_id": self.session_id or "",
}
)

if path is not None:
client = Synapse.get_client(synapse_client=synapse_client)
file_handle_id = await multipart_upload_file_async(
syn=client,
file_path=path,
content_type="text/csv",
)

import_request = GridCsvImportRequest(
session_id=self.session_id,
file_handle_id=file_handle_id,
)
await import_request.send_job_and_wait_async(
timeout=timeout, synapse_client=synapse_client
)

return self
2 changes: 2 additions & 0 deletions synapseclient/models/mixins/asynchronous_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
CREATE_GRID_REQUEST,
CREATE_SCHEMA_REQUEST,
GET_VALIDATION_SCHEMA_REQUEST,
GRID_CSV_IMPORT_REQUEST,
GRID_RECORD_SET_EXPORT_REQUEST,
QUERY_BUNDLE_REQUEST,
QUERY_TABLE_CSV_REQUEST,
Expand All @@ -29,6 +30,7 @@
ASYNC_JOB_URIS = {
AGENT_CHAT_REQUEST: "/agent/chat/async",
CREATE_GRID_REQUEST: "/grid/session/async",
GRID_CSV_IMPORT_REQUEST: "/grid/import/csv/async",
GRID_RECORD_SET_EXPORT_REQUEST: "/grid/export/recordset/async",
TABLE_UPDATE_TRANSACTION_REQUEST: "/entity/{entityId}/table/transaction/async",
GET_VALIDATION_SCHEMA_REQUEST: "/schema/type/validation/async",
Expand Down
87 changes: 87 additions & 0 deletions tests/integration/synapseclient/models/async/test_grid_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,90 @@ async def test_delete_grid_session_validation_error_async(self) -> None:
match="session_id is required to delete a GridSession",
):
await grid.delete_async(synapse_client=self.syn)

async def test_import_csv_with_file_handle_id_async(
self, record_set_fixture: RecordSet
) -> None:
# GIVEN: A grid session and a file handle from an existing record set
grid = Grid(record_set_id=record_set_fixture.id)
created_grid = await grid.create_async(
timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn
)
self.schedule_for_cleanup(
lambda: self.syn.restDELETE(f"/grid/session/{created_grid.session_id}")
)

# WHEN: Importing a CSV using the file handle ID from the record set
result = await created_grid.import_csv_async(
file_handle_id=record_set_fixture.data_file_handle_id,
timeout=ASYNC_JOB_TIMEOUT_SEC,
synapse_client=self.syn,
)

# THEN: The import should succeed and return the same Grid instance
assert result is created_grid
assert result.session_id == created_grid.session_id

async def test_import_csv_with_path_async(
self, record_set_fixture: RecordSet
) -> None:
# GIVEN: A grid session and a local CSV file
grid = Grid(record_set_id=record_set_fixture.id)
created_grid = await grid.create_async(
timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn
)
self.schedule_for_cleanup(
lambda: self.syn.restDELETE(f"/grid/session/{created_grid.session_id}")
)

# Create a temporary CSV file with the same schema as the record set
test_data = pd.DataFrame(
{
"id": [6, 7],
"name": ["Zeta", "Eta"],
"value": [60.1, 70.2],
"category": ["A", "B"],
"active": [True, False],
}
)
temp_fd, filename = tempfile.mkstemp(suffix=".csv")
try:
os.close(temp_fd)
test_data.to_csv(filename, index=False)
self.schedule_for_cleanup(filename)

# WHEN: Importing the CSV from a local path
result = await created_grid.import_csv_async(
path=filename,
timeout=ASYNC_JOB_TIMEOUT_SEC,
synapse_client=self.syn,
)

# THEN: The import should succeed and return the same Grid instance
assert result is created_grid
assert result.session_id == created_grid.session_id
except Exception:
if os.path.exists(filename):
os.unlink(filename)
raise

async def test_import_csv_validation_errors_async(self) -> None:
# GIVEN: A Grid instance with no session_id
grid = Grid()

# WHEN/THEN: Calling import_csv_async without session_id raises ValueError
with pytest.raises(
ValueError,
match="session_id is required",
):
await grid.import_csv_async(file_handle_id="123", synapse_client=self.syn)

# GIVEN: A Grid with session_id but no file source
grid_with_session = Grid(session_id="some-session-id")

# WHEN/THEN: Calling without file_handle_id or path raises ValueError
with pytest.raises(
ValueError,
match="Either file_handle_id or path",
):
await grid_with_session.import_csv_async(synapse_client=self.syn)
Loading
Loading