From b89bb6aa997bc02f46d958db11e06b4973b92044 Mon Sep 17 00:00:00 2001 From: Sirshendu Ganguly Date: Sat, 7 Mar 2026 21:57:47 -0500 Subject: [PATCH 1/6] Added streaming support --- requirements.txt | 1 + runware/base.py | 64 ++++++++++++++++++++++++++++++++++++++++++++++-- runware/types.py | 11 +++++++++ runware/utils.py | 17 +++++++++++++ 4 files changed, 91 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 611060a..f202503 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ aiofiles==23.2.1 +httpx>=0.27.0 python-dotenv==1.0.1 websockets>=12.0 \ No newline at end of file diff --git a/runware/base.py b/runware/base.py index ef819d6..1453ac3 100644 --- a/runware/base.py +++ b/runware/base.py @@ -1,13 +1,15 @@ import asyncio import inspect +import json import logging import os import re from asyncio import gather from dataclasses import asdict from random import uniform -from typing import List, Optional, Union, Callable, Any, Dict, Tuple +from typing import List, Optional, Union, Callable, Any, Dict, Tuple, AsyncIterator +import httpx from websockets.protocol import State from .logging_config import configure_logging @@ -58,11 +60,14 @@ IUploadMediaRequest, ITextInference, IText, + ITextInferenceUsage, + ITextStreamChunk, ) from .types import IImage, IError, SdkType, ListenerType from .utils import ( BASE_RUNWARE_URLS, getUUID, + get_http_url_from_ws_url, fileToBase64, createImageFromResponse, createImageToTextFromResponse, @@ -2028,7 +2033,12 @@ async def _inference3d(self, request3d: I3dInference) -> Union[List[I3d], IAsync await self.ensureConnection() return await self._request3d(request3d) - async def textInference(self, requestText: ITextInference) -> Union[List[IText], IAsyncTaskResponse]: + async def textInference( + self, requestText: ITextInference + ) -> Union[List[IText], IAsyncTaskResponse, AsyncIterator[Union[str, ITextStreamChunk]]]: + delivery = getattr(requestText, "deliveryMethod", "sync") + if isinstance(delivery, str) and delivery.lower() == "stream": + return self._requestTextStream(requestText) async with self._request_semaphore: return await self._retry_async_with_reconnect( self._requestText, @@ -2253,6 +2263,56 @@ def _buildTextRequest(self, requestText: ITextInference) -> Dict[str, Any]: self._addTextProviderSettings(request_object, requestText) return request_object + async def _requestTextStream( + self, requestText: ITextInference + ) -> AsyncIterator[Union[str, ITextStreamChunk]]: + """Stream text inference via HTTP SSE. Yields content (str), then one ITextStreamChunk with cost/finishReason.""" + requestText.taskUUID = requestText.taskUUID or getUUID() + request_object = self._buildTextRequest(requestText) + body = [request_object] + http_url = get_http_url_from_ws_url(self._url or "") + headers = { + "Accept": "text/event-stream", + "Authorization": f"Bearer {self._apiKey}", + "Content-Type": "application/json", + } + async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client: + async with client.stream( + "POST", + http_url, + json=body, + headers=headers, + ) as response: + response.raise_for_status() + async for line in response.aiter_lines(): + line = line.strip().strip("\r") + if not line or not line.startswith("data:"): + continue + _, _, payload = line.partition("data:") + payload = payload.strip() + if not payload: + continue + try: + obj = json.loads(payload) + except json.JSONDecodeError: + continue + choices = obj.get("choices") or [] + if choices and isinstance(choices[0], dict): + delta = choices[0].get("delta") or {} + content = delta.get("content") or delta.get("text") or "" + if content: + yield content + if choices and choices[0].get("finish_reason") is not None: + usage_data = obj.get("usage") + usage = instantiateDataclass(ITextInferenceUsage, usage_data) if usage_data else None + yield ITextStreamChunk( + cost=obj.get("cost"), + finishReason=choices[0].get("finish_reason"), + usage=usage, + taskUUID=obj.get("taskUUID"), + ) + return + async def _requestText(self, requestText: ITextInference) -> Union[List[IText], IAsyncTaskResponse]: await self.ensureConnection() requestText.taskUUID = requestText.taskUUID or getUUID() diff --git a/runware/types.py b/runware/types.py index 7d32aa8..964a8ff 100644 --- a/runware/types.py +++ b/runware/types.py @@ -106,6 +106,7 @@ class EOpenPosePreProcessor(Enum): class EDeliveryMethod(Enum): SYNC = "sync" ASYNC = "async" + STREAM = "stream" class OperationState(Enum): """State machine for pending operations.""" @@ -1667,6 +1668,16 @@ class ITextInference: webhookURL: Optional[str] = None +@dataclass +class ITextStreamChunk: + """One chunk of a streaming text inference response (SSE).""" + content: Optional[str] = None + finishReason: Optional[str] = None + usage: Optional[ITextInferenceUsage] = None + cost: Optional[float] = None + taskUUID: Optional[str] = None + + @dataclass class IText: taskType: str diff --git a/runware/utils.py b/runware/utils.py index f590ffe..dc4ed3e 100644 --- a/runware/utils.py +++ b/runware/utils.py @@ -42,6 +42,23 @@ Environment.TEST: "ws://localhost:8080", } +# HTTP REST base URL for streaming (e.g. textInference with deliveryMethod=stream) +BASE_RUNWARE_HTTP_URLS = { + Environment.PRODUCTION: "https://api.runware.ai/v1", + Environment.TEST: "http://localhost:8080", +} + + +def get_http_url_from_ws_url(ws_url: str) -> str: + """Derive HTTP API URL from WebSocket URL for streaming requests.""" + if not ws_url: + return BASE_RUNWARE_HTTP_URLS[Environment.PRODUCTION] + if "ws-api.runware.ai" in ws_url: + return "https://api.runware.ai/v1" + if "localhost" in ws_url or "127.0.0.1" in ws_url: + return ws_url.replace("wss://", "https://", 1).replace("ws://", "http://", 1) + return ws_url.replace("wss://", "https://", 1).replace("ws://", "http://", 1) + RETRY_SDK_COUNTS = { "GLOBAL": 2, From 270d439d33a8744268044cb984d8a520048f996b Mon Sep 17 00:00:00 2001 From: Sirshendu Ganguly Date: Sat, 7 Mar 2026 22:40:51 -0500 Subject: [PATCH 2/6] Removed redundant class types --- runware/base.py | 22 +++++++++++++--------- runware/types.py | 10 ---------- runware/utils.py | 14 ++++++++------ 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/runware/base.py b/runware/base.py index 1453ac3..8c20433 100644 --- a/runware/base.py +++ b/runware/base.py @@ -61,7 +61,6 @@ ITextInference, IText, ITextInferenceUsage, - ITextStreamChunk, ) from .types import IImage, IError, SdkType, ListenerType from .utils import ( @@ -2035,9 +2034,13 @@ async def _inference3d(self, request3d: I3dInference) -> Union[List[I3d], IAsync async def textInference( self, requestText: ITextInference - ) -> Union[List[IText], IAsyncTaskResponse, AsyncIterator[Union[str, ITextStreamChunk]]]: - delivery = getattr(requestText, "deliveryMethod", "sync") - if isinstance(delivery, str) and delivery.lower() == "stream": + ) -> Union[List[IText], IAsyncTaskResponse, AsyncIterator[Union[str, IText]]]: + delivery_method_enum = ( + requestText.deliveryMethod + if isinstance(requestText.deliveryMethod, EDeliveryMethod) + else EDeliveryMethod(requestText.deliveryMethod) + ) + if delivery_method_enum == EDeliveryMethod.STREAM: return self._requestTextStream(requestText) async with self._request_semaphore: return await self._retry_async_with_reconnect( @@ -2265,8 +2268,8 @@ def _buildTextRequest(self, requestText: ITextInference) -> Dict[str, Any]: async def _requestTextStream( self, requestText: ITextInference - ) -> AsyncIterator[Union[str, ITextStreamChunk]]: - """Stream text inference via HTTP SSE. Yields content (str), then one ITextStreamChunk with cost/finishReason.""" + ) -> AsyncIterator[Union[str, IText]]: + requestText.taskUUID = requestText.taskUUID or getUUID() request_object = self._buildTextRequest(requestText) body = [request_object] @@ -2305,11 +2308,12 @@ async def _requestTextStream( if choices and choices[0].get("finish_reason") is not None: usage_data = obj.get("usage") usage = instantiateDataclass(ITextInferenceUsage, usage_data) if usage_data else None - yield ITextStreamChunk( - cost=obj.get("cost"), + yield IText( + taskType=ETaskType.TEXT_INFERENCE.value, + taskUUID=obj.get("taskUUID") or "", finishReason=choices[0].get("finish_reason"), usage=usage, - taskUUID=obj.get("taskUUID"), + cost=obj.get("cost"), ) return diff --git a/runware/types.py b/runware/types.py index 964a8ff..c19c54c 100644 --- a/runware/types.py +++ b/runware/types.py @@ -1668,16 +1668,6 @@ class ITextInference: webhookURL: Optional[str] = None -@dataclass -class ITextStreamChunk: - """One chunk of a streaming text inference response (SSE).""" - content: Optional[str] = None - finishReason: Optional[str] = None - usage: Optional[ITextInferenceUsage] = None - cost: Optional[float] = None - taskUUID: Optional[str] = None - - @dataclass class IText: taskType: str diff --git a/runware/utils.py b/runware/utils.py index dc4ed3e..d798e6e 100644 --- a/runware/utils.py +++ b/runware/utils.py @@ -48,16 +48,18 @@ Environment.TEST: "http://localhost:8080", } +# Map each WebSocket base URL to its HTTP counterpart (for streaming requests). +_WS_TO_HTTP = { + BASE_RUNWARE_URLS[Environment.PRODUCTION]: BASE_RUNWARE_HTTP_URLS[Environment.PRODUCTION], + BASE_RUNWARE_URLS[Environment.TEST]: BASE_RUNWARE_HTTP_URLS[Environment.TEST], +} + def get_http_url_from_ws_url(ws_url: str) -> str: - """Derive HTTP API URL from WebSocket URL for streaming requests.""" + """Return the HTTP URL for this ws_url from _WS_TO_HTTP.""" if not ws_url: return BASE_RUNWARE_HTTP_URLS[Environment.PRODUCTION] - if "ws-api.runware.ai" in ws_url: - return "https://api.runware.ai/v1" - if "localhost" in ws_url or "127.0.0.1" in ws_url: - return ws_url.replace("wss://", "https://", 1).replace("ws://", "http://", 1) - return ws_url.replace("wss://", "https://", 1).replace("ws://", "http://", 1) + return _WS_TO_HTTP.get(ws_url, BASE_RUNWARE_HTTP_URLS[Environment.PRODUCTION]) RETRY_SDK_COUNTS = { From 319ade7df6edc07a4f5a8de9693d088c2be0d261 Mon Sep 17 00:00:00 2001 From: Sirshendu Ganguly Date: Sat, 7 Mar 2026 23:22:20 -0500 Subject: [PATCH 3/6] Simplified logic --- runware/base.py | 76 +++++++++++++++++++++++------------------------- runware/utils.py | 8 +++++ 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/runware/base.py b/runware/base.py index 8c20433..44fd415 100644 --- a/runware/base.py +++ b/runware/base.py @@ -85,6 +85,7 @@ createAsyncTaskResponse, VIDEO_INITIAL_TIMEOUT, TEXT_INITIAL_TIMEOUT, + TEXT_STREAM_READ_TIMEOUT, VIDEO_POLLING_DELAY, WEBHOOK_TIMEOUT, IMAGE_INFERENCE_TIMEOUT, @@ -2041,7 +2042,11 @@ async def textInference( else EDeliveryMethod(requestText.deliveryMethod) ) if delivery_method_enum == EDeliveryMethod.STREAM: - return self._requestTextStream(requestText) + async def stream_with_semaphore() -> AsyncIterator[Union[str, IText]]: + async with self._request_semaphore: + async for chunk in self._requestTextStream(requestText): + yield chunk + return stream_with_semaphore() async with self._request_semaphore: return await self._retry_async_with_reconnect( self._requestText, @@ -2269,7 +2274,6 @@ def _buildTextRequest(self, requestText: ITextInference) -> Dict[str, Any]: async def _requestTextStream( self, requestText: ITextInference ) -> AsyncIterator[Union[str, IText]]: - requestText.taskUUID = requestText.taskUUID or getUUID() request_object = self._buildTextRequest(requestText) body = [request_object] @@ -2279,43 +2283,37 @@ async def _requestTextStream( "Authorization": f"Bearer {self._apiKey}", "Content-Type": "application/json", } - async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client: - async with client.stream( - "POST", - http_url, - json=body, - headers=headers, - ) as response: - response.raise_for_status() - async for line in response.aiter_lines(): - line = line.strip().strip("\r") - if not line or not line.startswith("data:"): - continue - _, _, payload = line.partition("data:") - payload = payload.strip() - if not payload: - continue - try: - obj = json.loads(payload) - except json.JSONDecodeError: - continue - choices = obj.get("choices") or [] - if choices and isinstance(choices[0], dict): - delta = choices[0].get("delta") or {} - content = delta.get("content") or delta.get("text") or "" - if content: - yield content - if choices and choices[0].get("finish_reason") is not None: - usage_data = obj.get("usage") - usage = instantiateDataclass(ITextInferenceUsage, usage_data) if usage_data else None - yield IText( - taskType=ETaskType.TEXT_INFERENCE.value, - taskUUID=obj.get("taskUUID") or "", - finishReason=choices[0].get("finish_reason"), - usage=usage, - cost=obj.get("cost"), - ) - return + try: + async with httpx.AsyncClient(timeout=TEXT_STREAM_READ_TIMEOUT / 1000) as client: + async with client.stream( + "POST", + http_url, + json=body, + headers=headers, + ) as response: + response.raise_for_status() + async for line in response.aiter_lines(): + try: + line = json.loads(line.replace("data:", "", 1)) + except json.JSONDecodeError: + continue + data = line.get("data") or line + choice = (data.get("choices") or [{}])[0] + delta = choice.get("delta") or {} + if delta.get("content"): + yield delta.get("content") + if choice.get("finish_reason") is not None: + usage = instantiateDataclass(ITextInferenceUsage, data.get("usage")) + yield IText( + taskType=ETaskType.TEXT_INFERENCE.value, + taskUUID=data.get("taskUUID") or "", + finishReason=choice.get("finish_reason"), + usage=usage, + cost=data.get("cost"), + ) + return + except Exception as e: + raise RunwareAPIError({"message": str(e)}) async def _requestText(self, requestText: ITextInference) -> Union[List[IText], IAsyncTaskResponse]: await self.ensureConnection() diff --git a/runware/utils.py b/runware/utils.py index d798e6e..212a0f7 100644 --- a/runware/utils.py +++ b/runware/utils.py @@ -144,6 +144,14 @@ def get_http_url_from_ws_url(ws_url: str) -> str: 30000 )) +# Text streaming read timeout (milliseconds) +# Maximum time to wait for data on the SSE stream; long to avoid ReadTimeout mid-stream +# Used in: _requestTextStream() for deliveryMethod=stream +TEXT_STREAM_READ_TIMEOUT = int(os.environ.get( + "RUNWARE_TEXT_STREAM_TIMEOUT", + 600000 +)) + # Audio generation timeout (milliseconds) # Maximum time to wait for audio generation completion # Used in: _waitForAudioCompletion() for single audio generation From 24c4f18ce585d82f71789d3021476c0edad64a2c Mon Sep 17 00:00:00 2001 From: Sirshendu Ganguly Date: Sat, 7 Mar 2026 23:39:52 -0500 Subject: [PATCH 4/6] Added RUNWARE_TEXT_STREAM_TIMEOUT to readme.md --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index fa5faf3..d631e5e 100644 --- a/README.md +++ b/README.md @@ -1106,6 +1106,9 @@ RUNWARE_AUDIO_INFERENCE_TIMEOUT=300000 # Audio generation (default: 5 min) RUNWARE_AUDIO_POLLING_DELAY=1000 # Delay between status checks (default: 1 sec) RUNWARE_MAX_POLLS_AUDIO_GENERATION=240 # Max polling attempts for audio inference (default: 240, ~4 min total) +# Text Operations (milliseconds) +RUNWARE_TEXT_STREAM_TIMEOUT=600000 # Text inference streaming (SSE) read timeout (default: 10 min) + # Other Operations (milliseconds) RUNWARE_PROMPT_ENHANCE_TIMEOUT=60000 # Prompt enhancement (default: 1 min) RUNWARE_WEBHOOK_TIMEOUT=30000 # Webhook acknowledgment (default: 30 sec) From b418a61cb6245ae06b192942cb425d44ecaafef1 Mon Sep 17 00:00:00 2001 From: Sirshendu Ganguly Date: Sat, 7 Mar 2026 23:44:54 -0500 Subject: [PATCH 5/6] Fixed readme with usage --- README.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/README.md b/README.md index d631e5e..75cc51b 100644 --- a/README.md +++ b/README.md @@ -870,6 +870,37 @@ The `IAudioInference` class supports the following parameters: - `duration`: Duration of the generated audio in seconds - `includeCost`: Whether to include cost information in the response +### Text inference streaming + +To stream text inference (e.g. LLM chat) over HTTP SSE, set `deliveryMethod="stream"`. The SDK yields content chunks (strings) and a final `IText` with usage and cost: + +```python +import asyncio +from runware import Runware, ITextInference, ITextInferenceMessage + +async def main() -> None: + runware = Runware(api_key=RUNWARE_API_KEY) + await runware.connect() + + request = ITextInference( + model="runware:qwen3-thinking@1", + messages=[ITextInferenceMessage(role="user", content="Explain photosynthesis in one sentence.")], + deliveryMethod="stream", + includeCost=True, + ) + + stream = await runware.textInference(request) + async for chunk in stream: + if isinstance(chunk, str): + print(chunk, end="", flush=True) + else: + print(chunk) + +asyncio.run(main()) +``` + +Streaming uses the same concurrency limit as other requests (`RUNWARE_MAX_CONCURRENT_REQUESTS`). To allow longer streams, set `RUNWARE_TEXT_STREAM_TIMEOUT` (milliseconds; default 600000). + ### Model Upload To upload model using the Runware API, you can use the `uploadModel` method of the `Runware` class. Here are examples: From 744ea79c537de6a48ac5cc187bfdf49ec4adde31 Mon Sep 17 00:00:00 2001 From: Sirshendu Ganguly Date: Mon, 9 Mar 2026 20:08:48 -0400 Subject: [PATCH 6/6] Fixed error message --- runware/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runware/base.py b/runware/base.py index 44fd415..563f8cd 100644 --- a/runware/base.py +++ b/runware/base.py @@ -2298,6 +2298,8 @@ async def _requestTextStream( except json.JSONDecodeError: continue data = line.get("data") or line + if data.get("error") is not None: + raise RunwareAPIError(data["error"]) choice = (data.get("choices") or [{}])[0] delta = choice.get("delta") or {} if delta.get("content"):