diff --git a/packages/uipath-platform/pyproject.toml b/packages/uipath-platform/pyproject.toml index b9180b83c..39b933efa 100644 --- a/packages/uipath-platform/pyproject.toml +++ b/packages/uipath-platform/pyproject.toml @@ -108,7 +108,7 @@ show_missing = true source = ["src"] [tool.uv.sources] -uipath-core = { path = "../uipath-core", editable = true } +uipath-core = { path = "../uipath-core", editable = false } [[tool.uv.index]] name = "testpypi" diff --git a/packages/uipath-platform/src/uipath/platform/_uipath.py b/packages/uipath-platform/src/uipath/platform/_uipath.py index 2143e5dd6..1665e2235 100644 --- a/packages/uipath-platform/src/uipath/platform/_uipath.py +++ b/packages/uipath-platform/src/uipath/platform/_uipath.py @@ -32,6 +32,7 @@ QueuesService, ) from .resource_catalog import ResourceCatalogService +from .studio_web import StudioWebService def _has_valid_client_credentials( @@ -164,3 +165,7 @@ def agenthub(self) -> AgentHubService: @property def automation_tracker(self) -> AutomationTrackerService: return AutomationTrackerService(self._config, self._execution_context) + + @property + def studio_web(self) -> StudioWebService: + return StudioWebService(self._config, self._execution_context) diff --git a/packages/uipath-platform/src/uipath/platform/auth/__init__.py b/packages/uipath-platform/src/uipath/platform/auth/__init__.py new file mode 100644 index 000000000..f0ba12b68 --- /dev/null +++ b/packages/uipath-platform/src/uipath/platform/auth/__init__.py @@ -0,0 +1,41 @@ +"""UiPath Auth package. + +Provides reusable authentication building blocks: token acquisition, +token management, portal API calls, OIDC configuration, and URL utilities. +""" + +from ._auth_service import AuthService +from ._errors import AuthenticationError +from ._models import ( + AccessTokenData, + AuthConfig, + AuthorizationRequest, + OrganizationInfo, + TenantInfo, + TenantsAndOrganizationInfoResponse, +) +from ._url_utils import build_service_url, extract_org_tenant, resolve_domain +from ._utils import ( + get_auth_data, + get_parsed_token_data, + parse_access_token, + update_auth_file, +) + +__all__ = [ + "AuthService", + "AuthenticationError", + "AuthConfig", + "AuthorizationRequest", + "AccessTokenData", + "TenantInfo", + "OrganizationInfo", + "TenantsAndOrganizationInfoResponse", + "build_service_url", + "extract_org_tenant", + "resolve_domain", + "get_auth_data", + "get_parsed_token_data", + "parse_access_token", + "update_auth_file", +] diff --git a/packages/uipath-platform/src/uipath/platform/auth/_auth_service.py b/packages/uipath-platform/src/uipath/platform/auth/_auth_service.py new file mode 100644 index 000000000..353885ecd --- /dev/null +++ b/packages/uipath-platform/src/uipath/platform/auth/_auth_service.py @@ -0,0 +1,321 @@ +import json +import logging +import os +import time +from functools import cached_property +from urllib.parse import urlencode + +import httpx + +from uipath.platform.common._http_config import get_httpx_client_kwargs +from uipath.platform.common.auth import TokenData + +from ._errors import AuthenticationError +from ._models import ( + AuthConfig, + AuthorizationRequest, + TenantsAndOrganizationInfoResponse, +) +from ._oidc_utils import ( + _select_config_file, + generate_code_verifier_and_challenge, + get_state_param, +) +from ._url_utils import build_service_url +from ._utils import parse_access_token + +_logger = logging.getLogger(__name__) + + +class AuthService: + """Service for UiPath OAuth2 authentication and portal API operations. + + Provides the full OAuth2 Authorization Code + PKCE flow for obtaining + user tokens, as well as token refresh and tenant/organization discovery. + + This is a standalone service that does not inherit from ``BaseService`` + because it operates before an access token is available (i.e., it is used + to *obtain* the token that other services require). + + Args: + domain: The UiPath domain (e.g., ``https://cloud.uipath.com``). + + Examples: + **Obtain a user token using the OAuth2 PKCE flow:** + + ```python + import asyncio + from uipath.platform.auth import AuthService + + auth = AuthService("https://cloud.uipath.com") + + # 1. Build the authorization URL + redirect_uri = "http://localhost:8104/oidc/login" + auth_request = auth.get_authorization_url(redirect_uri) + print(f"Open this URL in your browser: {auth_request.url}") + + # 2. After user authorizes, exchange the code for tokens + token_data = asyncio.run( + auth.exchange_authorization_code( + code="", + code_verifier=auth_request.code_verifier, + redirect_uri=redirect_uri, + ) + ) + print(f"Access token: {token_data.access_token}") + ``` + + **Refresh an expired token:** + + ```python + import asyncio + from uipath.platform.auth import AuthService + + auth = AuthService("https://cloud.uipath.com") + + # ensure_valid_token returns the same token if still valid, + # or refreshes it automatically if expired + refreshed = asyncio.run(auth.ensure_valid_token(token_data)) + ``` + + **Discover available tenants:** + + ```python + import asyncio + from uipath.platform.auth import AuthService + + auth = AuthService("https://cloud.uipath.com") + info = asyncio.run( + auth.get_tenants_and_organizations(token_data.access_token) + ) + for tenant in info["tenants"]: + print(f"{tenant['name']} ({tenant['id']})") + ``` + """ + + def __init__(self, domain: str): + self.domain = domain + + @cached_property + def auth_config(self) -> AuthConfig: + """Get the OIDC auth configuration for this domain. + + The configuration is automatically selected based on the domain + and the server version (cloud vs. on-premise 25.10). + The result is cached after the first access. + + Returns: + AuthConfig with client_id and scope. + """ + config_file = _select_config_file(self.domain) + config_path = os.path.join(os.path.dirname(__file__), config_file) + with open(config_path, "r") as f: + raw = json.load(f) + return AuthConfig(client_id=raw["client_id"], scope=raw["scope"]) + + def get_authorization_url(self, redirect_uri: str) -> AuthorizationRequest: + """Build the authorization URL for the OAuth2 PKCE flow. + + Generates a PKCE code verifier/challenge pair and a random state + parameter, then constructs the full authorization URL. + + Args: + redirect_uri: The redirect URI for the OAuth callback + (e.g., ``http://localhost:8104/oidc/login``). + + Returns: + AuthorizationRequest containing the authorization URL, + the code verifier (needed for token exchange), and the state. + + Examples: + ```python + from uipath.platform.auth import AuthService + + auth = AuthService("https://cloud.uipath.com") + request = auth.get_authorization_url("http://localhost:8104/oidc/login") + + # Open request.url in the browser + # After redirect, use request.code_verifier to exchange the code + ``` + """ + code_verifier, code_challenge = generate_code_verifier_and_challenge() + state = get_state_param() + query_params = { + "client_id": self.auth_config.client_id, + "redirect_uri": redirect_uri, + "response_type": "code", + "scope": self.auth_config.scope, + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + } + url = build_service_url( + self.domain, f"/identity_/connect/authorize?{urlencode(query_params)}" + ) + return AuthorizationRequest(url=url, code_verifier=code_verifier, state=state) + + async def exchange_authorization_code( + self, code: str, code_verifier: str, redirect_uri: str + ) -> TokenData: + """Exchange an authorization code for tokens (PKCE flow). + + Args: + code: The authorization code received from the OAuth callback. + code_verifier: The PKCE code verifier from ``get_authorization_url``. + redirect_uri: The redirect URI (must match the one used in the auth URL). + + Returns: + TokenData with access_token, refresh_token, expires_in, etc. + + Raises: + AuthenticationError: If the token exchange fails. + + Examples: + ```python + import asyncio + from uipath.platform.auth import AuthService + + auth = AuthService("https://cloud.uipath.com") + token_data = asyncio.run( + auth.exchange_authorization_code( + code="abc123", + code_verifier=auth_request.code_verifier, + redirect_uri="http://localhost:8104/oidc/login", + ) + ) + ``` + """ + url = build_service_url(self.domain, "/identity_/connect/token") + data = { + "grant_type": "authorization_code", + "code": code, + "code_verifier": code_verifier, + "redirect_uri": redirect_uri, + "client_id": self.auth_config.client_id, + } + headers = {"Content-Type": "application/x-www-form-urlencoded"} + async with httpx.AsyncClient(**get_httpx_client_kwargs()) as client: + response = await client.post(url, data=data, headers=headers) + + if response.status_code >= 400: + raise AuthenticationError( + f"Failed to exchange authorization code: {response.status_code}" + ) + + return TokenData.model_validate(response.json()) + + async def ensure_valid_token(self, token_data: TokenData) -> TokenData: + """Check if the token is still valid; refresh it if expired. + + Parses the JWT ``exp`` claim from the access token. If the token + is still valid, returns it as-is. If expired, uses the refresh + token to obtain a new one. + + Args: + token_data: The current token data to validate. + + Returns: + The same TokenData if still valid, or a freshly refreshed one. + + Raises: + AuthenticationError: If no refresh token is available or the + refresh request fails. + + Examples: + ```python + import asyncio + from uipath.platform.auth import AuthService, get_auth_data + + auth = AuthService("https://cloud.uipath.com") + current_token = get_auth_data() + valid_token = asyncio.run(auth.ensure_valid_token(current_token)) + ``` + """ + claims = parse_access_token(token_data.access_token) + exp = claims.get("exp") + + if exp is not None and float(exp) > time.time(): + return token_data + + if not token_data.refresh_token: + raise AuthenticationError("No refresh token found. Please re-authenticate.") + + return await self._refresh_access_token(token_data.refresh_token) + + async def get_tenants_and_organizations( + self, access_token: str + ) -> TenantsAndOrganizationInfoResponse: + """Get available tenants and organization info for the authenticated user. + + Args: + access_token: A valid access token. + + Returns: + Response containing a list of tenants and the organization info. + + Raises: + AuthenticationError: If the access token is invalid or the + request fails. + + Examples: + ```python + import asyncio + from uipath.platform.auth import AuthService + + auth = AuthService("https://cloud.uipath.com") + info = asyncio.run( + auth.get_tenants_and_organizations(token_data.access_token) + ) + org = info["organization"] + print(f"Organization: {org['name']}") + for tenant in info["tenants"]: + print(f" Tenant: {tenant['name']} ({tenant['id']})") + ``` + """ + claims = parse_access_token(access_token) + prt_id = claims.get("prt_id") + + url = build_service_url( + self.domain, + f"/{prt_id}/portal_/api/filtering/leftnav/tenantsAndOrganizationInfo", + ) + async with httpx.AsyncClient(**get_httpx_client_kwargs()) as client: + response = await client.get( + url, headers={"Authorization": f"Bearer {access_token}"} + ) + + if response.status_code == 401: + raise AuthenticationError( + "Unauthorized: access token is invalid or expired." + ) + + if response.status_code >= 400: + raise AuthenticationError( + f"Failed to get tenants and organizations: {response.status_code} {response.text}" + ) + + return response.json() + + async def _refresh_access_token(self, refresh_token: str) -> TokenData: + """Refresh an access token using a refresh token.""" + url = build_service_url(self.domain, "/identity_/connect/token") + data = { + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_id": self.auth_config.client_id, + } + headers = {"Content-Type": "application/x-www-form-urlencoded"} + async with httpx.AsyncClient(**get_httpx_client_kwargs()) as client: + response = await client.post(url, data=data, headers=headers) + + if response.status_code == 401: + raise AuthenticationError( + "Unauthorized: refresh token is invalid or expired." + ) + + if response.status_code >= 400: + raise AuthenticationError( + f"Failed to refresh token: {response.status_code}" + ) + + return TokenData.model_validate(response.json()) diff --git a/packages/uipath-platform/src/uipath/platform/auth/_errors.py b/packages/uipath-platform/src/uipath/platform/auth/_errors.py new file mode 100644 index 000000000..44cf111a8 --- /dev/null +++ b/packages/uipath-platform/src/uipath/platform/auth/_errors.py @@ -0,0 +1,4 @@ +class AuthenticationError(Exception): + """Raised when authentication fails or token operations cannot be completed.""" + + pass diff --git a/packages/uipath/src/uipath/_cli/_auth/_models.py b/packages/uipath-platform/src/uipath/platform/auth/_models.py similarity index 71% rename from packages/uipath/src/uipath/_cli/_auth/_models.py rename to packages/uipath-platform/src/uipath/platform/auth/_models.py index 4d71a2c5e..ff6ea5f44 100644 --- a/packages/uipath/src/uipath/_cli/_auth/_models.py +++ b/packages/uipath-platform/src/uipath/platform/auth/_models.py @@ -1,15 +1,23 @@ from typing import TypedDict +from pydantic import BaseModel -class AuthConfig(TypedDict): - """TypedDict for auth_config.json structure.""" + +class AuthConfig(BaseModel): + """OIDC auth configuration.""" client_id: str - port: int - redirect_uri: str scope: str +class AuthorizationRequest(BaseModel): + """Result of building an OAuth2 PKCE authorization URL.""" + + url: str + code_verifier: str + state: str + + class AccessTokenData(TypedDict): """TypedDict for access token data structure.""" diff --git a/packages/uipath-platform/src/uipath/platform/auth/_oidc_utils.py b/packages/uipath-platform/src/uipath/platform/auth/_oidc_utils.py new file mode 100644 index 000000000..09721223b --- /dev/null +++ b/packages/uipath-platform/src/uipath/platform/auth/_oidc_utils.py @@ -0,0 +1,69 @@ +import base64 +import hashlib +import os +from urllib.parse import urlparse + +import httpx + +from uipath.platform.common._http_config import get_httpx_client_kwargs + +from ._url_utils import build_service_url + + +def generate_code_verifier_and_challenge() -> tuple[str, str]: + """Generate PKCE code verifier and challenge.""" + code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8").rstrip("=") + + code_challenge_bytes = hashlib.sha256(code_verifier.encode("utf-8")).digest() + code_challenge = ( + base64.urlsafe_b64encode(code_challenge_bytes).decode("utf-8").rstrip("=") + ) + + return code_verifier, code_challenge + + +def get_state_param() -> str: + return base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8").rstrip("=") + + +def _get_version_from_api(domain: str) -> str | None: + """Fetch the version from the UiPath orchestrator API.""" + try: + version_url = build_service_url(domain, "/orchestrator_/api/status/version") + client_kwargs = get_httpx_client_kwargs() + client_kwargs["timeout"] = 5.0 + + with httpx.Client(**client_kwargs) as client: + response = client.get(version_url) + response.raise_for_status() + data = response.json() + return data.get("version") + except Exception: + return None + + +def _is_cloud_domain(domain: str) -> bool: + """Check if the domain is a cloud domain (alpha, staging, or cloud.uipath.com).""" + parsed = urlparse(domain) + netloc = parsed.netloc.lower() + return netloc in [ + "alpha.uipath.com", + "staging.uipath.com", + "cloud.uipath.com", + ] + + +def _select_config_file(domain: str) -> str: + """Select the appropriate auth config file based on domain and version.""" + if _is_cloud_domain(domain): + return "auth_config_cloud.json" + + version = _get_version_from_api(domain) + + if version is None: + return "auth_config_cloud.json" + + if version.startswith("25.10"): + return "auth_config_25_10.json" + + return "auth_config_cloud.json" diff --git a/packages/uipath/src/uipath/_cli/_auth/_url_utils.py b/packages/uipath-platform/src/uipath/platform/auth/_url_utils.py similarity index 78% rename from packages/uipath/src/uipath/_cli/_auth/_url_utils.py rename to packages/uipath-platform/src/uipath/platform/auth/_url_utils.py index 844968762..7bb7039d3 100644 --- a/packages/uipath/src/uipath/_cli/_auth/_url_utils.py +++ b/packages/uipath-platform/src/uipath/platform/auth/_url_utils.py @@ -2,10 +2,6 @@ from typing import Tuple from urllib.parse import urlparse -from .._utils._console import ConsoleLogger - -console = ConsoleLogger() - def resolve_domain(base_url: str | None, environment: str | None) -> str: """Resolve the UiPath domain, giving priority to base_url when valid. @@ -13,12 +9,13 @@ def resolve_domain(base_url: str | None, environment: str | None) -> str: Args: base_url: The base URL explicitly provided. environment: The environment name (e.g., 'alpha', 'staging', 'cloud'). - force: Whether to ignore UIPATH_URL from environment variables when base_url is set. Returns: A valid base URL for UiPath services. + + Raises: + ValueError: If UIPATH_URL env var is set but malformed. """ - # If base_url is a real URL, prefer it if base_url and base_url.startswith("http"): parsed = urlparse(base_url) domain = f"{parsed.scheme}://{parsed.netloc}" @@ -30,15 +27,13 @@ def resolve_domain(base_url: str | None, environment: str | None) -> str: if uipath_url: parsed = urlparse(uipath_url) if parsed.scheme and parsed.netloc: - domain = f"{parsed.scheme}://{parsed.netloc}" - return domain + return f"{parsed.scheme}://{parsed.netloc}" else: - console.error( + raise ValueError( f"Malformed UIPATH_URL: '{uipath_url}'. " "Please ensure it includes scheme and netloc (e.g., 'https://cloud.uipath.com')." ) - # Otherwise, fall back to environment return f"https://{environment or 'cloud'}.uipath.com" @@ -61,15 +56,12 @@ def extract_org_tenant(uipath_url: str) -> Tuple[str | None, str | None]: Accepts values like: - https://cloud.uipath.com/myOrg/myTenant - https://alpha.uipath.com/myOrg/myTenant/anything_else - - cloud.uipath.com/myOrg/myTenant (scheme will be assumed https) Args: uipath_url: The UiPath URL to parse Returns: - A tuple of (organization, tenant) where: - - organization: 'myOrg' or None - - tenant: 'myTenant' or None + A tuple of (organization, tenant) where either can be None. Example: >>> extract_org_tenant('https://cloud.uipath.com/myOrg/myTenant') diff --git a/packages/uipath-platform/src/uipath/platform/auth/_utils.py b/packages/uipath-platform/src/uipath/platform/auth/_utils.py new file mode 100644 index 000000000..169228831 --- /dev/null +++ b/packages/uipath-platform/src/uipath/platform/auth/_utils.py @@ -0,0 +1,41 @@ +import base64 +import json +import os +from pathlib import Path +from typing import Optional + +from uipath.platform.common.auth import TokenData + +from ._models import AccessTokenData + + +def parse_access_token(access_token: str) -> AccessTokenData: + token_parts = access_token.split(".") + if len(token_parts) < 2: + raise ValueError("Invalid access token: expected a JWT with at least 2 parts") + payload = base64.urlsafe_b64decode( + token_parts[1] + "=" * (-len(token_parts[1]) % 4) + ) + return json.loads(payload) + + +def update_auth_file(token_data: TokenData) -> None: + os.makedirs(Path.cwd() / ".uipath", exist_ok=True) + auth_file = Path.cwd() / ".uipath" / ".auth.json" + with open(auth_file, "w") as f: + json.dump(token_data.model_dump(exclude_none=True), f) + + +def get_auth_data() -> TokenData: + auth_file = Path.cwd() / ".uipath" / ".auth.json" + if not auth_file.exists(): + raise FileNotFoundError( + "No authentication file found. Run 'uipath auth' first." + ) + return TokenData.model_validate(json.load(open(auth_file))) + + +def get_parsed_token_data(token_data: Optional[TokenData] = None) -> AccessTokenData: + if not token_data: + token_data = get_auth_data() + return parse_access_token(token_data.access_token) diff --git a/packages/uipath/src/uipath/_cli/_auth/auth_config_25_10.json b/packages/uipath-platform/src/uipath/platform/auth/auth_config_25_10.json similarity index 100% rename from packages/uipath/src/uipath/_cli/_auth/auth_config_25_10.json rename to packages/uipath-platform/src/uipath/platform/auth/auth_config_25_10.json diff --git a/packages/uipath/src/uipath/_cli/_auth/auth_config_cloud.json b/packages/uipath-platform/src/uipath/platform/auth/auth_config_cloud.json similarity index 100% rename from packages/uipath/src/uipath/_cli/_auth/auth_config_cloud.json rename to packages/uipath-platform/src/uipath/platform/auth/auth_config_cloud.json diff --git a/packages/uipath-platform/src/uipath/platform/studio_web/__init__.py b/packages/uipath-platform/src/uipath/platform/studio_web/__init__.py new file mode 100644 index 000000000..80663592a --- /dev/null +++ b/packages/uipath-platform/src/uipath/platform/studio_web/__init__.py @@ -0,0 +1,5 @@ +"""Studio Web service for UiPath Platform.""" + +from ._studio_web_service import StudioWebService + +__all__ = ["StudioWebService"] diff --git a/packages/uipath-platform/src/uipath/platform/studio_web/_studio_web_service.py b/packages/uipath-platform/src/uipath/platform/studio_web/_studio_web_service.py new file mode 100644 index 000000000..1f51fad0b --- /dev/null +++ b/packages/uipath-platform/src/uipath/platform/studio_web/_studio_web_service.py @@ -0,0 +1,30 @@ +from ..common._base_service import BaseService +from ..common._config import UiPathApiConfig +from ..common._execution_context import UiPathExecutionContext + + +class StudioWebService(BaseService): + """Service for enabling UiPath Studio Web.""" + + def __init__( + self, config: UiPathApiConfig, execution_context: UiPathExecutionContext + ) -> None: + super().__init__(config=config, execution_context=execution_context) + + def enable(self) -> None: + """Enable Studio Web (TryEnableFirstRun + AcquireLicense).""" + urls = [ + "/orchestrator_/api/StudioWeb/TryEnableFirstRun", + "/orchestrator_/api/StudioWeb/AcquireLicense", + ] + for url in urls: + self.request("POST", url) + + async def enable_async(self) -> None: + """Enable Studio Web (TryEnableFirstRun + AcquireLicense).""" + urls = [ + "/orchestrator_/api/StudioWeb/TryEnableFirstRun", + "/orchestrator_/api/StudioWeb/AcquireLicense", + ] + for url in urls: + await self.request_async("POST", url) diff --git a/packages/uipath-platform/tests/services/auth/__init__.py b/packages/uipath-platform/tests/services/auth/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/uipath/tests/cli/test_oidc_utils.py b/packages/uipath-platform/tests/services/auth/test_oidc_utils.py similarity index 68% rename from packages/uipath/tests/cli/test_oidc_utils.py rename to packages/uipath-platform/tests/services/auth/test_oidc_utils.py index 8dfd23c50..e5cb1d197 100644 --- a/packages/uipath/tests/cli/test_oidc_utils.py +++ b/packages/uipath-platform/tests/services/auth/test_oidc_utils.py @@ -1,5 +1,5 @@ """ -Unit tests for OidcUtils.get_auth_config() method. +Unit tests for OIDC utility functions and AuthService config/URL methods. IMPORTANT: Backwards Compatibility Notice ========================================= @@ -13,8 +13,8 @@ import pytest -from uipath._cli._auth._oidc_utils import ( - OidcUtils, +from uipath.platform.auth._auth_service import AuthService +from uipath.platform.auth._oidc_utils import ( _get_version_from_api, _is_cloud_domain, _select_config_file, @@ -22,34 +22,27 @@ class TestOidcUtils: - """Test suite for OidcUtils class.""" + """Test suite for OIDC utility functions.""" def test_auth_config_backwards_compatibility_v2025_10(self): """ Test that auth_config_25_10.json maintains backwards compatibility with release/v2025.10. - - This test validates that the authentication configuration values remain - unchanged to ensure compatibility with release/v2025.10 and later branches. - - CRITICAL: Any failure indicates a breaking change that requires coordination - across all supported release branches. """ - # Read the actual auth_config_25_10.json file config_path = os.path.join( os.path.dirname(__file__), "..", "..", + "..", "src", "uipath", - "_cli", - "_auth", + "platform", + "auth", "auth_config_25_10.json", ) with open(config_path, "r") as f: actual_config = json.load(f) - # Assert exact values for non-scope fields assert actual_config["client_id"] == "36dea5b8-e8bb-423d-8e7b-c808df8f1c00", ( f"BACKWARDS COMPATIBILITY VIOLATION: client_id has changed! " f"Expected: 36dea5b8-e8bb-423d-8e7b-c808df8f1c00, Got: {actual_config['client_id']}" @@ -68,7 +61,6 @@ def test_auth_config_backwards_compatibility_v2025_10(self): f"Expected: 8104, Got: {actual_config['port']}" ) - # For scopes, ensure actual scopes are a subset of the allowed scopes (no new scopes allowed) allowed_scopes = set( [ "offline_access", @@ -111,7 +103,7 @@ def test_auth_config_backwards_compatibility_v2025_10(self): ("https://alpha.uipath.com", True), ("https://staging.uipath.com", True), ("https://cloud.uipath.com", True), - ("https://ALPHA.UIPATH.COM", True), # Test case insensitivity + ("https://ALPHA.UIPATH.COM", True), ("https://custom.domain.com", False), ("https://cloud.uipath.dev", False), ("https://alpha-test.uipath.com", False), @@ -165,69 +157,64 @@ def test_get_version_from_api_network_error(self): @pytest.mark.parametrize( "domain,mock_version,expected_config", [ - # Cloud domains should always use auth_config_cloud.json ("https://alpha.uipath.com", None, "auth_config_cloud.json"), ("https://staging.uipath.com", None, "auth_config_cloud.json"), ("https://cloud.uipath.com", None, "auth_config_cloud.json"), - # Version 25.10.* should use auth_config_25_10.json ( "https://custom.domain.com", "25.10.0-beta.415", "auth_config_25_10.json", ), ("https://custom.domain.com", "25.10.1", "auth_config_25_10.json"), - # Other versions should fallback to cloud config ("https://custom.domain.com", "24.10.0", "auth_config_cloud.json"), ("https://custom.domain.com", "26.1.0", "auth_config_cloud.json"), - # Unable to determine version should fallback to cloud config ("https://custom.domain.com", None, "auth_config_cloud.json"), ], ) def test_select_config_file(self, domain, mock_version, expected_config): """Test _select_config_file selects the correct config based on domain and version.""" with patch( - "uipath._cli._auth._oidc_utils._get_version_from_api", + "uipath.platform.auth._oidc_utils._get_version_from_api", return_value=mock_version, ): config_file = _select_config_file(domain) assert config_file == expected_config - def test_get_auth_config_without_domain(self): - """Test get_auth_config without domain parameter uses default config.""" - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils._find_free_port", return_value=8104 - ): - config = OidcUtils.get_auth_config() - assert config["client_id"] == "36dea5b8-e8bb-423d-8e7b-c808df8f1c00" - assert config["port"] == 8104 - def test_get_auth_config_with_cloud_domain(self): - """Test get_auth_config with cloud domain uses auth_config_cloud.json.""" - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils._find_free_port", return_value=8104 - ): - config = OidcUtils.get_auth_config("https://alpha.uipath.com") - assert config["client_id"] == "36dea5b8-e8bb-423d-8e7b-c808df8f1c00" - assert config["port"] == 8104 +class TestAuthServiceConfig: + """Test suite for AuthService auth_config and get_authorization_url.""" - def test_get_auth_config_with_25_10_version(self): - """Test get_auth_config with version 25.10 uses auth_config_25_10.json.""" - mock_response = MagicMock() - mock_response.json.return_value = {"version": "25.10.0-beta.415"} - mock_response.raise_for_status = MagicMock() + def test_auth_config_cloud_domain(self): + """Test auth_config with cloud domain.""" + service = AuthService("https://cloud.uipath.com") + config = service.auth_config + assert config.client_id == "36dea5b8-e8bb-423d-8e7b-c808df8f1c00" + assert "offline_access" in config.scope - mock_client = MagicMock() - mock_client.get.return_value = mock_response - mock_client.__enter__.return_value = mock_client - mock_client.__exit__ = MagicMock() + def test_auth_config_cached(self): + """Test auth_config is cached (same object returned).""" + service = AuthService("https://cloud.uipath.com") + config1 = service.auth_config + config2 = service.auth_config + assert config1 is config2 - with ( - patch("httpx.Client", return_value=mock_client), - patch( - "uipath._cli._auth._oidc_utils.OidcUtils._find_free_port", - return_value=8104, - ), - ): - config = OidcUtils.get_auth_config("https://custom.domain.com") - assert config["client_id"] == "36dea5b8-e8bb-423d-8e7b-c808df8f1c00" - assert config["port"] == 8104 + def test_get_authorization_url_returns_model(self): + """Test get_authorization_url returns AuthorizationRequest model.""" + service = AuthService("https://cloud.uipath.com") + result = service.get_authorization_url("http://localhost:8104/oidc/login") + + assert result.url.startswith( + "https://cloud.uipath.com/identity_/connect/authorize" + ) + assert "client_id=" in result.url + assert "redirect_uri=" in result.url + assert "code_challenge=" in result.url + assert len(result.code_verifier) > 0 + assert len(result.state) > 0 + + def test_get_authorization_url_uses_provided_redirect_uri(self): + """Test that redirect_uri param is included in the auth URL.""" + service = AuthService("https://alpha.uipath.com") + redirect = "http://localhost:9999/callback" + result = service.get_authorization_url(redirect) + assert "localhost%3A9999" in result.url or "localhost:9999" in result.url diff --git a/packages/uipath-platform/tests/services/auth/test_portal_service_ensure_valid_token.py b/packages/uipath-platform/tests/services/auth/test_portal_service_ensure_valid_token.py new file mode 100644 index 000000000..bda088ccb --- /dev/null +++ b/packages/uipath-platform/tests/services/auth/test_portal_service_ensure_valid_token.py @@ -0,0 +1,144 @@ +""" +Tests for AuthService.ensure_valid_token method. +""" + +import time +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from uipath.platform.auth import AuthenticationError +from uipath.platform.auth._auth_service import AuthService +from uipath.platform.common import TokenData + + +@pytest.fixture +def sample_token_data(): + """Sample refreshed token data.""" + return { + "access_token": "new_access_token_123", + "refresh_token": "new_refresh_token_456", + "expires_in": 3600, + "token_type": "Bearer", + "scope": "openid profile offline_access", + "id_token": "id_token_789", + } + + +def _make_async_client_mock(response_mock): + mock_client = MagicMock() + mock_client.post = AsyncMock(return_value=response_mock) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + return mock_client + + +class TestAuthServiceEnsureValidToken: + """Test class for AuthService.ensure_valid_token.""" + + @pytest.mark.asyncio + async def test_returns_same_token_if_still_valid(self): + """Valid token is returned as-is, no refresh.""" + future_exp = time.time() + 3600 + token_data = TokenData( + access_token=_make_jwt({"exp": future_exp, "prt_id": "p1"}), + refresh_token="rt", + expires_in=3600, + token_type="Bearer", + scope="openid", + ) + + service = AuthService("https://cloud.uipath.com") + + with patch("httpx.AsyncClient") as mock_cls: + result = await service.ensure_valid_token(token_data) + + mock_cls.assert_not_called() + assert result is token_data + + @pytest.mark.asyncio + async def test_refreshes_expired_token(self, sample_token_data): + """Expired token triggers refresh and returns new token.""" + past_exp = time.time() - 3600 + old_token = TokenData( + access_token=_make_jwt({"exp": past_exp, "prt_id": "p1"}), + refresh_token="valid_refresh", + expires_in=3600, + token_type="Bearer", + scope="openid", + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = sample_token_data + mock_client = _make_async_client_mock(mock_response) + + service = AuthService("https://cloud.uipath.com") + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await service.ensure_valid_token(old_token) + + assert result.access_token == sample_token_data["access_token"] + mock_client.post.assert_called_once() + + @pytest.mark.asyncio + async def test_raises_if_no_refresh_token(self): + """Expired token without refresh_token raises AuthenticationError.""" + past_exp = time.time() - 3600 + token_data = TokenData( + access_token=_make_jwt({"exp": past_exp, "prt_id": "p1"}), + expires_in=3600, + token_type="Bearer", + scope="openid", + ) + + service = AuthService("https://cloud.uipath.com") + + with pytest.raises(AuthenticationError, match="No refresh token found"): + await service.ensure_valid_token(token_data) + + @pytest.mark.parametrize( + "domain", + [ + "https://cloud.uipath.com", + "https://alpha.uipath.com", + "https://staging.uipath.com", + "https://custom.automationsuite.org", + ], + ) + @pytest.mark.asyncio + async def test_refresh_uses_correct_domain(self, domain, sample_token_data): + """Ensure refresh call goes to the correct domain's token endpoint.""" + past_exp = time.time() - 3600 + old_token = TokenData( + access_token=_make_jwt({"exp": past_exp, "prt_id": "p1"}), + refresh_token="rt", + expires_in=3600, + token_type="Bearer", + scope="openid", + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = sample_token_data + mock_client = _make_async_client_mock(mock_response) + + service = AuthService(domain) + + with patch("httpx.AsyncClient", return_value=mock_client): + await service.ensure_valid_token(old_token) + + expected_url = f"{domain}/identity_/connect/token" + actual_url = mock_client.post.call_args[0][0] + assert actual_url == expected_url + + +def _make_jwt(claims: dict[str, Any]) -> str: + """Create a minimal JWT with the given claims for testing.""" + import base64 + import json + + header = base64.urlsafe_b64encode(json.dumps({"alg": "none"}).encode()).rstrip(b"=") + payload = base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b"=") + return f"{header.decode()}.{payload.decode()}.sig" diff --git a/packages/uipath-platform/tests/services/auth/test_portal_service_refresh_token.py b/packages/uipath-platform/tests/services/auth/test_portal_service_refresh_token.py new file mode 100644 index 000000000..e064a798b --- /dev/null +++ b/packages/uipath-platform/tests/services/auth/test_portal_service_refresh_token.py @@ -0,0 +1,199 @@ +""" +Unit tests for AuthService._refresh_access_token and ensure_valid_token methods. +""" + +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from uipath.platform.auth import AuthenticationError +from uipath.platform.auth._auth_service import AuthService +from uipath.platform.auth._url_utils import resolve_domain + + +@pytest.fixture +def mock_auth_config(): + """Mock auth config fixture.""" + return { + "client_id": "test_client_id", + "port": 8104, + "redirect_uri": "http://localhost:8104/callback", + "scope": "openid profile offline_access", + } + + +@pytest.fixture +def sample_token_data(): + """Sample token data for testing.""" + return { + "access_token": "new_access_token_123", + "refresh_token": "new_refresh_token_456", + "expires_in": 3600, + "token_type": "Bearer", + "scope": "openid profile offline_access", + "id_token": "id_token_789", + } + + +def _make_async_client_mock(response_mock): + """Create an AsyncClient context manager mock returning the given response.""" + mock_client = MagicMock() + mock_client.post = AsyncMock(return_value=response_mock) + mock_client.get = AsyncMock(return_value=response_mock) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + return mock_client + + +class TestAuthServiceRefreshToken: + """Test class for AuthService refresh token functionality.""" + + @pytest.mark.parametrize( + "environment, expected_token_url", + [ + ("cloud", "https://cloud.uipath.com/identity_/connect/token"), + ("alpha", "https://alpha.uipath.com/identity_/connect/token"), + ("staging", "https://staging.uipath.com/identity_/connect/token"), + ], + ) + @pytest.mark.asyncio + async def test_refresh_token_different_domains( + self, environment, expected_token_url, sample_token_data + ): + """Test refresh token request with different domain configurations.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = sample_token_data + + mock_client = _make_async_client_mock(mock_response) + + domain = resolve_domain(None, environment) + service = AuthService(domain) + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await service._refresh_access_token("test_refresh_token") + + mock_client.post.assert_called_once_with( + expected_token_url, + data={ + "grant_type": "refresh_token", + "refresh_token": "test_refresh_token", + "client_id": service.auth_config.client_id, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + assert result.access_token == sample_token_data["access_token"] + assert result.refresh_token == sample_token_data["refresh_token"] + + @pytest.mark.parametrize( + "env_var_url, environment, expected_token_url", + [ + ( + "https://custom.automationsuite.org/org/tenant", + None, + "https://custom.automationsuite.org/identity_/connect/token", + ), + ( + "https://mycompany.uipath.com/org/tenant/", + None, + "https://mycompany.uipath.com/identity_/connect/token", + ), + ( + "https://custom.automationsuite.org/org/tenant", + "alpha", + "https://alpha.uipath.com/identity_/connect/token", + ), + ( + "https://custom.automationsuite.org/org/tenant", + "staging", + "https://staging.uipath.com/identity_/connect/token", + ), + ( + "https://custom.automationsuite.org/org/tenant", + "cloud", + "https://cloud.uipath.com/identity_/connect/token", + ), + ], + ) + @pytest.mark.asyncio + async def test_refresh_token_with_uipath_url_env( + self, + env_var_url, + environment, + expected_token_url, + sample_token_data, + ): + """Test refresh token request with UIPATH_URL environment variable.""" + original_env = os.environ.get("UIPATH_URL") + os.environ["UIPATH_URL"] = env_var_url + + try: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = sample_token_data + + mock_client = _make_async_client_mock(mock_response) + + domain = resolve_domain(None, environment) + service = AuthService(domain) + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await service._refresh_access_token("test_refresh_token") + + assert result.access_token == sample_token_data["access_token"] + assert result.refresh_token == sample_token_data["refresh_token"] + finally: + if original_env is not None: + os.environ["UIPATH_URL"] = original_env + elif "UIPATH_URL" in os.environ: + del os.environ["UIPATH_URL"] + + @pytest.mark.asyncio + async def test_refresh_token_unauthorized(self): + """Test refresh token request with 401 raises AuthenticationError.""" + mock_response = MagicMock() + mock_response.status_code = 401 + + mock_client = _make_async_client_mock(mock_response) + + service = AuthService("https://cloud.uipath.com") + + with patch("httpx.AsyncClient", return_value=mock_client): + with pytest.raises(AuthenticationError, match="Unauthorized"): + await service._refresh_access_token("test_refresh_token") + + @pytest.mark.asyncio + async def test_refresh_token_server_error(self): + """Test refresh token request with 500 raises AuthenticationError.""" + mock_response = MagicMock() + mock_response.status_code = 500 + + mock_client = _make_async_client_mock(mock_response) + + service = AuthService("https://cloud.uipath.com") + + with patch("httpx.AsyncClient", return_value=mock_client): + with pytest.raises( + AuthenticationError, match="Failed to refresh token: 500" + ): + await service._refresh_access_token("test_refresh_token") + + @pytest.mark.asyncio + async def test_refresh_token_response_format(self, sample_token_data): + """Test successful refresh returns proper TokenData.""" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = sample_token_data + + mock_client = _make_async_client_mock(mock_response) + + service = AuthService("https://cloud.uipath.com") + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await service._refresh_access_token("test_refresh_token") + + assert result.access_token == sample_token_data["access_token"] + assert result.refresh_token == sample_token_data["refresh_token"] + assert result.expires_in is not None + assert result.token_type is not None diff --git a/packages/uipath-platform/uv.lock b/packages/uipath-platform/uv.lock index cc5073ff8..46906e878 100644 --- a/packages/uipath-platform/uv.lock +++ b/packages/uipath-platform/uv.lock @@ -1048,15 +1048,33 @@ wheels = [ [[package]] name = "uipath-core" version = "0.5.6" -source = { registry = "https://pypi.org/simple" } +source = { directory = "../uipath-core" } dependencies = [ { name = "opentelemetry-instrumentation" }, { name = "opentelemetry-sdk" }, { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ea/8a/d129d33a81865f99d9134391a52f8691f557d95a18a38df4d88917b3e235/uipath_core-0.5.6.tar.gz", hash = "sha256:bebaf2e62111e844739e4f4e4dc47c48bac93b7e6fce6754502a9f4979c41888", size = 112659, upload-time = "2026-03-04T18:04:42.963Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/54/8f/77ab712518aa2a8485a558a0de245ac425e07fd8b74cfa8951550f0aea63/uipath_core-0.5.6-py3-none-any.whl", hash = "sha256:4a741fc760605165b0541b3abb6ade728bfa386e000ace00054bc43995720e5b", size = 42047, upload-time = "2026-03-04T18:04:41.606Z" }, + +[package.metadata] +requires-dist = [ + { name = "opentelemetry-instrumentation", specifier = ">=0.60b0,<1.0.0" }, + { name = "opentelemetry-sdk", specifier = ">=1.39.0,<2.0.0" }, + { name = "pydantic", specifier = ">=2.12.5,<3.0.0" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "bandit", specifier = ">=1.8.2" }, + { name = "mypy", specifier = ">=1.14.1" }, + { name = "pre-commit", specifier = ">=4.1.0" }, + { name = "pytest", specifier = ">=7.4.0" }, + { name = "pytest-asyncio", specifier = ">=1.0.0" }, + { name = "pytest-cov", specifier = ">=4.1.0" }, + { name = "pytest-httpx", specifier = ">=0.35.0" }, + { name = "pytest-mock", specifier = ">=3.11.1" }, + { name = "pytest-trio", specifier = ">=0.8.0" }, + { name = "ruff", specifier = ">=0.9.4" }, + { name = "rust-just", specifier = ">=1.39.0" }, ] [[package]] @@ -1092,7 +1110,7 @@ requires-dist = [ { name = "pydantic-function-models", specifier = ">=0.1.11" }, { name = "tenacity", specifier = ">=9.0.0" }, { name = "truststore", specifier = ">=0.10.1" }, - { name = "uipath-core", specifier = ">=0.5.4,<0.6.0" }, + { name = "uipath-core", directory = "../uipath-core" }, ] [package.metadata.requires-dev] diff --git a/packages/uipath/docs/core/auth.md b/packages/uipath/docs/core/auth.md new file mode 100644 index 000000000..ac97e7a54 --- /dev/null +++ b/packages/uipath/docs/core/auth.md @@ -0,0 +1 @@ +::: uipath.platform.auth._auth_service diff --git a/packages/uipath/mkdocs.yml b/packages/uipath/mkdocs.yml index ea36fccc0..8776e8d6a 100644 --- a/packages/uipath/mkdocs.yml +++ b/packages/uipath/mkdocs.yml @@ -61,6 +61,7 @@ nav: - CLI Reference: cli/index.md - Tracing: core/traced.md - Services: + - Auth: core/auth.md - Assets: core/assets.md - Attachments: core/attachments.md - Buckets: core/buckets.md diff --git a/packages/uipath/pyproject.toml b/packages/uipath/pyproject.toml index a1e30f8e6..4cb1264f0 100644 --- a/packages/uipath/pyproject.toml +++ b/packages/uipath/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.11" dependencies = [ "uipath-core>=0.5.2, <0.6.0", "uipath-runtime>=0.9.1, <0.10.0", - "uipath-platform>=0.0.4, <0.1.0", + "uipath-platform>=0.0.17, <0.1.0", "click>=8.3.1", "httpx>=0.28.1", "pyjwt>=2.10.1", @@ -148,8 +148,8 @@ show_missing = true source = ["src"] [tool.uv.sources] -uipath-core = { path = "../uipath-core", editable = true } -uipath-platform = { path = "../uipath-platform", editable = true } +uipath-core = { path = "../uipath-core", editable = false } +uipath-platform = { path = "../uipath-platform", editable = false } [[tool.uv.index]] name = "testpypi" diff --git a/packages/uipath/src/uipath/_cli/_auth/_auth_service.py b/packages/uipath/src/uipath/_cli/_auth/_auth_service.py index b160b6978..125ffa466 100644 --- a/packages/uipath/src/uipath/_cli/_auth/_auth_service.py +++ b/packages/uipath/src/uipath/_cli/_auth/_auth_service.py @@ -1,22 +1,72 @@ -import asyncio import os import webbrowser +from pathlib import Path +from socket import AF_INET, SOCK_STREAM, error, socket +from typing import Any + +import click from uipath._cli._auth._auth_server import HTTPServer -from uipath._cli._auth._oidc_utils import OidcUtils -from uipath._cli._auth._portal_service import ( - PortalService, -) -from uipath._cli._auth._url_utils import extract_org_tenant, resolve_domain -from uipath._cli._auth._utils import get_parsed_token_data from uipath._cli._utils._console import ConsoleLogger -from uipath._utils._auth import update_env_file +from uipath.platform import UiPath +from uipath.platform.auth import ( + AuthService, + TenantsAndOrganizationInfoResponse, + extract_org_tenant, + get_auth_data, + get_parsed_token_data, + resolve_domain, + update_auth_file, +) from uipath.platform.common import ExternalApplicationService, TokenData -from ._utils import update_auth_file +def _find_free_port(candidates: list[int]) -> int | None: + """Find the first free port from the given candidates.""" + + def is_free(port: int) -> bool: + with socket(AF_INET, SOCK_STREAM) as s: + try: + s.bind(("localhost", port)) + return True + except error: + return False + + return next((p for p in candidates if is_free(p)), None) + + +def _get_redirect_uri_and_port() -> tuple[str, int]: + """Resolve a free port and build the redirect URI.""" + custom_port = os.getenv("UIPATH_AUTH_PORT") + candidates = [int(custom_port)] if custom_port else [8104, 8055, 42042] -class AuthService: + port = _find_free_port(candidates) + if port is None: + ports_str = ", ".join(str(p) for p in candidates) + raise ValueError( + f"All configured ports ({ports_str}) are in use. " + "Please close applications using these ports or configure different ports via UIPATH_AUTH_PORT." + ) + + redirect_uri = f"http://localhost:{port}/oidc/login" + return redirect_uri, port + + +def update_env_file(env_contents: dict[str, Any]) -> None: + env_path = Path.cwd() / ".env" + if env_path.exists(): + with open(env_path, "r") as f: + for line in f: + if "=" in line: + key, value = line.strip().split("=", 1) + if key not in env_contents: + env_contents[key] = value + lines = [f"{key}={value}\n" for key, value in env_contents.items()] + with open(env_path, "w") as f: + f.writelines(lines) + + +class AuthHandler: def __init__( self, environment: str | None, @@ -36,15 +86,16 @@ def __init__( self._tenant = tenant self._domain = resolve_domain(self._base_url, environment) self._scope = scope + self._auth_service = AuthService(self._domain) - def authenticate(self) -> None: + async def authenticate(self) -> None: if self._client_id and self._client_secret: - self._authenticate_client_credentials() + await self._authenticate_client_credentials() return - self._authenticate_authorization_code() + await self._authenticate_authorization_code() - def _authenticate_client_credentials(self): + async def _authenticate_client_credentials(self): assert self._client_id and self._client_secret, ( "Client ID and Client Secret must be provided." ) @@ -72,50 +123,56 @@ def _authenticate_client_credentials(self): if tenant_name: self._tenant = tenant_name - with PortalService(self._domain) as portal_service: - portal_service.update_token_data(token_data) - tenant_info = portal_service.resolve_tenant_info(self._tenant) - env_vars["UIPATH_TENANT_ID"] = tenant_info["tenant_id"] + data = await self._auth_service.get_tenants_and_organizations( + token_data.access_token + ) + tenant_info = self._find_tenant(data, self._tenant) + env_vars["UIPATH_TENANT_ID"] = tenant_info["tenant_id"] else: self._console.warning("Could not extract tenant from --base-url.") update_env_file(env_vars) - def _authenticate_authorization_code(self) -> None: - with PortalService(self._domain) as portal_service: - if not self._force and self._can_reuse_existing_token(portal_service): - return - - token_data = self._perform_oauth_flow() - portal_service.update_token_data(token_data) - update_auth_file(token_data) - - tenant_info = portal_service.resolve_tenant_info(self._tenant) - uipath_url = portal_service.build_tenant_url() - - update_env_file( - { - "UIPATH_ACCESS_TOKEN": token_data.access_token, - "UIPATH_URL": uipath_url, - "UIPATH_TENANT_ID": tenant_info["tenant_id"], - "UIPATH_ORGANIZATION_ID": tenant_info["organization_id"], - } - ) + async def _authenticate_authorization_code(self) -> None: + if not self._force and await self._can_reuse_existing_token(): + return - try: - portal_service.enable_studio_web(uipath_url) - except Exception: - self._console.error( - "Could not prepare the environment. Please try again." - ) + token_data = await self._perform_oauth_flow() + update_auth_file(token_data) + + data = await self._auth_service.get_tenants_and_organizations( + token_data.access_token + ) + tenant_info = await self._resolve_tenant_info(data, self._tenant) + organization_name = data["organization"]["name"] + uipath_url = f"{self._domain}/{organization_name}/{tenant_info['tenant_name']}" + + update_env_file( + { + "UIPATH_ACCESS_TOKEN": token_data.access_token, + "UIPATH_URL": uipath_url, + "UIPATH_TENANT_ID": tenant_info["tenant_id"], + "UIPATH_ORGANIZATION_ID": tenant_info["organization_id"], + } + ) + + try: + client = UiPath(base_url=uipath_url, secret=token_data.access_token) + await client.studio_web.enable_async() + except Exception: + self._console.error("Could not prepare the environment. Please try again.") - def _can_reuse_existing_token(self, portal_service: PortalService) -> bool: + async def _can_reuse_existing_token(self) -> bool: if ( os.getenv("UIPATH_URL") and os.getenv("UIPATH_TENANT_ID") and os.getenv("UIPATH_ORGANIZATION_ID") ): try: - portal_service.ensure_valid_token() + auth_data = get_auth_data() + token_data = await self._auth_service.ensure_valid_token(auth_data) + if token_data is not auth_data: + update_auth_file(token_data) + update_env_file({"UIPATH_ACCESS_TOKEN": token_data.access_token}) return True except Exception: self._console.error( @@ -123,19 +180,19 @@ def _can_reuse_existing_token(self, portal_service: PortalService) -> bool: ) return False - def _perform_oauth_flow(self) -> TokenData: - auth_config = OidcUtils.get_auth_config(self._domain) - auth_url, code_verifier, state = OidcUtils.get_auth_url( - self._domain, auth_config - ) - self._open_browser(auth_url) + async def _perform_oauth_flow(self) -> TokenData: + redirect_uri, port = _get_redirect_uri_and_port() + auth_request = self._auth_service.get_authorization_url(redirect_uri) + self._open_browser(auth_request.url) server = HTTPServer( - port=auth_config["port"], - redirect_uri=auth_config["redirect_uri"], - client_id=auth_config["client_id"], + port=port, + redirect_uri=redirect_uri, + client_id=self._auth_service.auth_config.client_id, + ) + token_data = await server.start( + auth_request.state, auth_request.code_verifier, self._domain ) - token_data = asyncio.run(server.start(state, code_verifier, self._domain)) if not token_data: self._console.error( @@ -144,8 +201,52 @@ def _perform_oauth_flow(self) -> TokenData: return TokenData.model_validate(token_data) + async def _resolve_tenant_info( + self, data: TenantsAndOrganizationInfoResponse, tenant: str | None + ) -> dict[str, Any]: + if tenant: + return self._find_tenant(data, tenant) + return self._select_tenant(data) + + def _find_tenant( + self, data: TenantsAndOrganizationInfoResponse, tenant_name: str + ) -> dict[str, Any]: + """Find a tenant by name from the tenants/org response.""" + organization = data["organization"] + tenants = data["tenants"] + tenant = next((t for t in tenants if t["name"] == tenant_name), None) + if not tenant: + raise ValueError(f"Tenant '{tenant_name}' not found.") + return { + "tenant_id": tenant["id"], + "organization_id": organization["id"], + "tenant_name": tenant["name"], + } + + def _select_tenant( + self, data: TenantsAndOrganizationInfoResponse + ) -> dict[str, Any]: + """Interactively select a tenant from the list.""" + organization = data["organization"] + tenants = data["tenants"] + tenant_names = [t["name"] for t in tenants] + + self._console.display_options(tenant_names, "Select tenant:") + tenant_idx = ( + 0 + if len(tenant_names) == 1 + else self._console.prompt("Select tenant number", type=int) + ) + + tenant = tenants[tenant_idx] + self._console.info(f"Selected tenant: {click.style(tenant['name'], fg='cyan')}") + return { + "tenant_id": tenant["id"], + "organization_id": organization["id"], + "tenant_name": tenant["name"], + } + def _open_browser(self, url: str) -> None: - # Try to open browser. Always print the fallback link. webbrowser.open(url, new=1) self._console.link( "If a browser window did not open, please open the following URL in your browser:", diff --git a/packages/uipath/src/uipath/_cli/_auth/_oidc_utils.py b/packages/uipath/src/uipath/_cli/_auth/_oidc_utils.py deleted file mode 100644 index b6acdd6a5..000000000 --- a/packages/uipath/src/uipath/_cli/_auth/_oidc_utils.py +++ /dev/null @@ -1,195 +0,0 @@ -import base64 -import hashlib -import json -import os -from urllib.parse import urlencode, urlparse - -import httpx - -from ..._utils._ssl_context import get_httpx_client_kwargs -from .._utils._console import ConsoleLogger -from ._models import AuthConfig -from ._url_utils import build_service_url - - -def generate_code_verifier_and_challenge(): - """Generate PKCE code verifier and challenge.""" - code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8").rstrip("=") - - code_challenge_bytes = hashlib.sha256(code_verifier.encode("utf-8")).digest() - code_challenge = ( - base64.urlsafe_b64encode(code_challenge_bytes).decode("utf-8").rstrip("=") - ) - - return code_verifier, code_challenge - - -def get_state_param() -> str: - return base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8").rstrip("=") - - -def _get_version_from_api(domain: str) -> str | None: - """Fetch the version from the UiPath orchestrator API. - - Args: - domain: The UiPath domain (e.g., 'https://alpha.uipath.com') - - Returns: - The version string (e.g., '25.10.0-beta.415') or None if unable to fetch - """ - try: - version_url = build_service_url(domain, "/orchestrator_/api/status/version") - client_kwargs = get_httpx_client_kwargs() - # Override timeout for version check - client_kwargs["timeout"] = 5.0 - - with httpx.Client(**client_kwargs) as client: - response = client.get(version_url) - response.raise_for_status() - data = response.json() - return data.get("version") - except Exception: - # Silently fail and return None if we can't fetch the version - return None - - -def _is_cloud_domain(domain: str) -> bool: - """Check if the domain is a cloud domain (alpha, staging, or cloud.uipath.com). - - Args: - domain: The domain string (e.g., 'https://alpha.uipath.com') - - Returns: - True if it's a cloud domain, False otherwise - """ - parsed = urlparse(domain) - netloc = parsed.netloc.lower() - return netloc in [ - "alpha.uipath.com", - "staging.uipath.com", - "cloud.uipath.com", - ] - - -def _select_config_file(domain: str) -> str: - """Select the appropriate auth config file based on domain and version. - - Logic: - 1. If domain is alpha/staging/cloud.uipath.com -> use auth_config_cloud.json - 2. Otherwise, try to get version from API - 3. If version starts with '25.10' -> use auth_config_25_10.json - 4. If version can't be determined -> fallback to auth_config_cloud.json - 5. Otherwise -> fallback to auth_config_cloud.json - - Args: - domain: The UiPath domain - - Returns: - The filename of the config to use - """ - # Check if it's a known cloud domain - if _is_cloud_domain(domain): - return "auth_config_cloud.json" - - # Try to get version from API - version = _get_version_from_api(domain) - - # If we can't determine version, fallback to cloud config - if version is None: - return "auth_config_cloud.json" - - # Check if version is 25.10.* - if version.startswith("25.10"): - return "auth_config_25_10.json" - - # Default fallback to cloud config - return "auth_config_cloud.json" - - -class OidcUtils: - _console = ConsoleLogger() - - @classmethod - def _find_free_port(cls, candidates: list[int]): - from socket import AF_INET, SOCK_STREAM, error, socket - - def is_free(port: int) -> bool: - with socket(AF_INET, SOCK_STREAM) as s: - try: - s.bind(("localhost", port)) - return True - except error: - return False - - return next((p for p in candidates if is_free(p)), None) - - @classmethod - def get_auth_config(cls, domain: str | None = None) -> AuthConfig: - """Get the appropriate auth configuration based on domain. - - Args: - domain: The UiPath domain (e.g., 'https://cloud.uipath.com'). - If None, uses default auth_config_cloud.json - - Returns: - AuthConfig with the appropriate configuration - """ - # Select the appropriate config file based on domain - if domain: - config_file = _select_config_file(domain) - else: - config_file = "auth_config_cloud.json" - - config_path = os.path.join(os.path.dirname(__file__), config_file) - with open(config_path, "r") as f: - auth_config = json.load(f) - - custom_port = os.getenv("UIPATH_AUTH_PORT") - candidates = [int(custom_port)] if custom_port else [8104, 8055, 42042] - - port = cls._find_free_port(candidates) - if port is None: - ports_str = ", ".join(str(p) for p in candidates) - cls._console.error( - f"All configured ports ({ports_str}) are in use. Please close applications using these ports or configure different ports." - ) - - redirect_uri = auth_config["redirect_uri"].replace( - "__PY_REPLACE_PORT__", str(port) - ) - - return AuthConfig( - client_id=auth_config["client_id"], - redirect_uri=redirect_uri, - scope=auth_config["scope"], - port=port, - ) - - @classmethod - def get_auth_url(cls, domain: str, auth_config: AuthConfig) -> tuple[str, str, str]: - """Get the authorization URL for OAuth2 PKCE flow. - - Args: - domain (str): The UiPath domain to authenticate against (e.g. 'alpha', 'cloud') - auth_config (AuthConfig): The authentication configuration to use - - Returns: - tuple[str, str]: A tuple containing: - - The authorization URL with query parameters - - The code verifier for PKCE flow - """ - code_verifier, code_challenge = generate_code_verifier_and_challenge() - state = get_state_param() - query_params = { - "client_id": auth_config["client_id"], - "redirect_uri": auth_config["redirect_uri"], - "response_type": "code", - "scope": auth_config["scope"], - "state": state, - "code_challenge": code_challenge, - "code_challenge_method": "S256", - } - - query_string = urlencode(query_params) - url = build_service_url(domain, f"/identity_/connect/authorize?{query_string}") - return url, code_verifier, state diff --git a/packages/uipath/src/uipath/_cli/_auth/_portal_service.py b/packages/uipath/src/uipath/_cli/_auth/_portal_service.py deleted file mode 100644 index c3fc95cc9..000000000 --- a/packages/uipath/src/uipath/_cli/_auth/_portal_service.py +++ /dev/null @@ -1,228 +0,0 @@ -import time - -import click -import httpx - -from uipath.platform.common import TokenData -from uipath.runtime.errors import ( - UiPathErrorCategory, - UiPathErrorCode, - UiPathRuntimeError, -) - -from ..._utils._auth import update_env_file -from ..._utils._ssl_context import get_httpx_client_kwargs -from .._utils._console import ConsoleLogger -from ._models import OrganizationInfo, TenantInfo, TenantsAndOrganizationInfoResponse -from ._oidc_utils import OidcUtils -from ._url_utils import build_service_url -from ._utils import get_auth_data, get_parsed_token_data, update_auth_file - - -class PortalService: - """Service for interacting with the UiPath Portal API.""" - - access_token: str | None = None - prt_id: str | None = None - domain: str - selected_tenant: str | None = None - - _client: httpx.Client | None = None - _tenants_and_organizations: TenantsAndOrganizationInfoResponse | None = None - - def __init__( - self, - domain: str, - access_token: str | None = None, - prt_id: str | None = None, - ): - self.domain = domain - self.access_token = access_token - self.prt_id = prt_id - self._console = ConsoleLogger() - self._tenants_and_organizations = None - self._client = None - - @property - def client(self) -> httpx.Client: - if self._client is None: - self._client = httpx.Client(**get_httpx_client_kwargs()) - return self._client - - def close(self): - """Explicitly close the HTTP client.""" - if self._client: - self._client.close() - self._client = None - - def __enter__(self): - """Enter the runtime context related to this object.""" - return self - - def __exit__(self, exc_type, exc_value, traceback): - """Exit the runtime context and close the HTTP client.""" - self.close() - - def update_token_data(self, token_data: TokenData): - self.access_token = token_data.access_token - self.prt_id = get_parsed_token_data(token_data).get("prt_id") - - def get_tenants_and_organizations( - self, - ): - if self._tenants_and_organizations is not None: - return self._tenants_and_organizations - - url = build_service_url( - self.domain, - f"/{self.prt_id}/portal_/api/filtering/leftnav/tenantsAndOrganizationInfo", - ) - response = self.client.get( - url, headers={"Authorization": f"Bearer {self.access_token}"} - ) - if response.status_code < 400: - self._tenants_and_organizations = response.json() - return self._tenants_and_organizations - - if response.status_code == 401: - self._console.error("Unauthorized") - - self._console.error( - f"Failed to get tenants and organizations: {response.status_code} {response.text}" - ) - - def refresh_access_token(self, refresh_token: str) -> TokenData: - url = build_service_url(self.domain, "/identity_/connect/token") - client_id = OidcUtils.get_auth_config(self.domain).get("client_id") - - data = { - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "client_id": client_id, - } - - headers = {"Content-Type": "application/x-www-form-urlencoded"} - response = self.client.post(url, data=data, headers=headers) - if response.status_code < 400: - return TokenData.model_validate(response.json()) - - if response.status_code == 401: - self._console.error("Unauthorized") - - self._console.error(f"Failed to refresh token: {response.status_code}") - raise Exception(f"Failed to refresh token: {response.status_code}") - - def ensure_valid_token(self): - """Ensure the access token is valid and refresh it if necessary. - - This function should be called when running CLI commands to verify authentication. - It checks if an auth file exists and contains a valid non-expired token. - If the token is expired, it will attempt to refresh it. - If no auth file exists, it will raise an exception. - - Raises: - Exception: If no auth file exists or token refresh fails - """ - auth_data = get_auth_data() - claims = get_parsed_token_data(auth_data) - exp = claims.get("exp") - - def finalize(token_data: TokenData): - self.update_token_data(token_data) - update_auth_file(token_data) - update_env_file({"UIPATH_ACCESS_TOKEN": token_data.access_token}) - - if exp is not None and float(exp) > time.time(): - finalize(auth_data) - return - - refresh_token = auth_data.refresh_token - if not refresh_token: - raise UiPathRuntimeError( - UiPathErrorCode.EXECUTION_ERROR, - "No refresh token found", - "The refresh token could not be retrieved. Please retry authenticating.", - UiPathErrorCategory.SYSTEM, - ) - - token_data = self.refresh_access_token(refresh_token) - finalize(token_data) - - def enable_studio_web(self, base_url: str) -> None: - or_base_url = self.build_orchestrator_url(base_url) - - urls = [ - f"{or_base_url}/api/StudioWeb/TryEnableFirstRun", - f"{or_base_url}/api/StudioWeb/AcquireLicense", - ] - - for url in urls: - try: - resp = self.client.post( - url, headers={"Authorization": f"Bearer {self.access_token}"} - ) - if resp.status_code >= 400: - self._console.warning(f"Call to {url} failed: {resp.status_code}") - except httpx.HTTPError as e: - self._console.warning( - f"Exception during enable_studio_web request to {url}: {e}" - ) - - def _set_tenant(self, tenant: TenantInfo, organization: OrganizationInfo): - self.selected_tenant = tenant["name"] - return {"tenant_id": tenant["id"], "organization_id": organization["id"]} - - def _select_tenant(self): - data = self.get_tenants_and_organizations() - organization = data["organization"] - tenants = data["tenants"] - - tenant_names = [tenant["name"] for tenant in tenants] - - self._console.display_options(tenant_names, "Select tenant:") - tenant_idx = ( - 0 - if len(tenant_names) == 1 - else self._console.prompt("Select tenant number", type=int) - ) - - tenant = data["tenants"][tenant_idx] - - self._console.info(f"Selected tenant: {click.style(tenant['name'], fg='cyan')}") - return self._set_tenant(tenant, organization) - - def _retrieve_tenant( - self, - tenant_name: str, - ): - data = self.get_tenants_and_organizations() - organization = data["organization"] - tenants = data["tenants"] - - tenant = next((t for t in tenants if t["name"] == tenant_name), None) - if not tenant: - self._console.error(f"Tenant '{tenant_name}' not found.") - raise Exception(f"Tenant '{tenant_name}' not found.") - - return self._set_tenant(tenant, organization) - - def resolve_tenant_info(self, tenant: str | None = None): - if tenant: - return self._retrieve_tenant(tenant) - return self._select_tenant() - - def build_tenant_url(self) -> str: - data = self.get_tenants_and_organizations() - organization_name = data["organization"]["name"] - return f"{self.domain}/{organization_name}/{self.selected_tenant}" - - def build_orchestrator_url(self, base_url: str) -> str: - if base_url: - return f"{base_url}/orchestrator_" - data = self.get_tenants_and_organizations() - organization = data.get("organization") - if organization is None: - self._console.error("Organization not found.") - return "" - organization_name = organization.get("name") - return f"{self.domain}/{organization_name}/{self.selected_tenant}/orchestrator_" diff --git a/packages/uipath/src/uipath/_cli/_auth/_utils.py b/packages/uipath/src/uipath/_cli/_auth/_utils.py deleted file mode 100644 index 86f0cddc0..000000000 --- a/packages/uipath/src/uipath/_cli/_auth/_utils.py +++ /dev/null @@ -1,28 +0,0 @@ -import json -import os -from pathlib import Path - -from uipath.platform.common import TokenData - -from ..._utils._auth import parse_access_token -from ._models import AccessTokenData - - -def update_auth_file(token_data: TokenData): - os.makedirs(Path.cwd() / ".uipath", exist_ok=True) - auth_file = Path.cwd() / ".uipath" / ".auth.json" - with open(auth_file, "w") as f: - json.dump(token_data.model_dump(exclude_none=True), f) - - -def get_auth_data() -> TokenData: - auth_file = Path.cwd() / ".uipath" / ".auth.json" - if not auth_file.exists(): - raise Exception("No authentication file found") - return TokenData.model_validate(json.load(open(auth_file))) - - -def get_parsed_token_data(token_data: TokenData | None = None) -> AccessTokenData: - if not token_data: - token_data = get_auth_data() - return parse_access_token(token_data.access_token) diff --git a/packages/uipath/src/uipath/_cli/cli_auth.py b/packages/uipath/src/uipath/_cli/cli_auth.py index a014f0d2e..08847ac86 100644 --- a/packages/uipath/src/uipath/_cli/cli_auth.py +++ b/packages/uipath/src/uipath/_cli/cli_auth.py @@ -1,6 +1,8 @@ +import asyncio + import click -from ._auth._auth_service import AuthService +from ._auth._auth_service import AuthHandler from ._utils._common import environment_options from ._utils._console import ConsoleLogger @@ -67,18 +69,22 @@ def auth( - Set REQUESTS_CA_BUNDLE to specify a custom CA bundle for SSL verification - Set UIPATH_DISABLE_SSL_VERIFY to disable SSL verification (not recommended) """ - auth_service = AuthService( - environment=environment, - force=force, - client_id=client_id, - client_secret=client_secret, - base_url=base_url, - tenant=tenant, - scope=scope, - ) + try: + auth_handler = AuthHandler( + environment=environment, + force=force, + client_id=client_id, + client_secret=client_secret, + base_url=base_url, + tenant=tenant, + scope=scope, + ) + except ValueError as e: + console.error(str(e)) + with console.spinner("Authenticating with UiPath ..."): try: - auth_service.authenticate() + asyncio.run(auth_handler.authenticate()) console.success( "Authentication successful.", ) diff --git a/packages/uipath/tests/cli/test_auth.py b/packages/uipath/tests/cli/test_auth.py index 7550c2aba..338bad16a 100644 --- a/packages/uipath/tests/cli/test_auth.py +++ b/packages/uipath/tests/cli/test_auth.py @@ -1,5 +1,5 @@ import os -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest from click.testing import CliRunner @@ -33,6 +33,31 @@ """ +def _mock_auth_service(domain="https://cloud.uipath.com"): + """Create a mock AuthService with default behavior.""" + mock = MagicMock() + mock.domain = domain + mock.auth_config.client_id = "test-client-id" + mock.auth_config.scope = "openid offline_access" + + def _get_auth_url(redirect_uri): + auth_request = MagicMock() + auth_request.url = f"{mock.domain}/identity_/connect/authorize?client_id=test-client-id&redirect_uri={redirect_uri}" + auth_request.code_verifier = "test-verifier" + auth_request.state = "test-state" + return auth_request + + mock.get_authorization_url = MagicMock(side_effect=_get_auth_url) + mock.get_tenants_and_organizations = AsyncMock( + return_value={ + "tenants": [{"name": "DefaultTenant", "id": "tenant-id"}], + "organization": {"name": "DefaultOrg", "id": "org-id"}, + } + ) + mock.ensure_valid_token = AsyncMock() + return mock + + class TestAuth: @pytest.mark.parametrize( "scenario_name, cli_args, env_vars, expected_url_part, expected_select_tenant_return", @@ -105,20 +130,20 @@ def test_auth_scenarios( patch("uipath._cli._auth._auth_service.webbrowser.open") as mock_open, patch("uipath._cli._auth._auth_service.HTTPServer") as mock_server, patch( - "uipath._cli._auth._auth_service.PortalService" - ) as mock_portal_service, + "uipath._cli._auth._auth_service.AuthService" + ) as mock_auth_service_cls, + patch("uipath._cli._auth._auth_service.UiPath") as mock_uipath_cls, ): + + def _create_mock_auth_service(domain): + return _mock_auth_service(domain) + + mock_auth_service_cls.side_effect = _create_mock_auth_service + mock_uipath_cls.return_value.studio_web.enable_async = AsyncMock() + mock_server.return_value.start = AsyncMock( return_value={"access_token": "test_token"} ) - mock_portal_service.return_value.__enter__.return_value.get_tenants_and_organizations.return_value = { - "tenants": [{"name": "DefaultTenant", "id": "tenant-id"}], - "organization": {"name": "DefaultOrg", "id": "org-id"}, - } - mock_portal_service.return_value.__enter__.return_value._select_tenant.return_value = { - "tenant_id": "tenant-id", - "organization_id": "org-id", - } with runner.isolated_filesystem(): for key, value in env_vars.items(): @@ -160,30 +185,31 @@ def test_auth_with_tenant_flag(self): patch("uipath._cli._auth._auth_service.webbrowser.open") as mock_open, patch("uipath._cli._auth._auth_service.HTTPServer") as mock_server, patch( - "uipath._cli._auth._auth_service.PortalService" - ) as mock_portal_service, - patch( - "uipath._cli._auth._url_utils.resolve_domain", - return_value="https://alpha.uipath.com", - ), + "uipath._cli._auth._auth_service.AuthService" + ) as mock_auth_service_cls, + patch("uipath._cli._auth._auth_service.UiPath") as mock_uipath_cls, ): + + def _create_mock_auth_service(domain): + svc = _mock_auth_service(domain) + svc.get_tenants_and_organizations = AsyncMock( + return_value={ + "tenants": [ + {"name": "MyTenantName", "id": "tenant-id"}, + {"name": "OtherTenant", "id": "other-id"}, + ], + "organization": {"name": "MyOrg", "id": "org-id"}, + } + ) + return svc + + mock_auth_service_cls.side_effect = _create_mock_auth_service + mock_uipath_cls.return_value.studio_web.enable_async = AsyncMock() + mock_server.return_value.start = AsyncMock( return_value={"access_token": "test_token"} ) - portal = mock_portal_service.return_value.__enter__.return_value - portal.get_tenants_and_organizations.return_value = { - "tenants": [ - {"name": "MyTenantName", "id": "tenant-id"}, - {"name": "OtherTenant", "id": "other-id"}, - ], - "organization": {"name": "MyOrg", "id": "org-id"}, - } - portal.resolve_tenant_info.return_value = { - "tenant_id": "tenant-id", - "organization_id": "org-id", - } - portal.selected_tenant = "MyTenantName" with runner.isolated_filesystem(): result = runner.invoke( cli, ["auth", "--alpha", "--tenant", "MyTenantName", "--force"] @@ -191,5 +217,3 @@ def test_auth_with_tenant_flag(self): assert result.exit_code == 0, result.output mock_open.assert_called_once() - - portal.resolve_tenant_info.assert_called_once_with("MyTenantName") diff --git a/packages/uipath/tests/cli/test_portal_service_ensure_valid_token.py b/packages/uipath/tests/cli/test_portal_service_ensure_valid_token.py deleted file mode 100644 index 0f9455353..000000000 --- a/packages/uipath/tests/cli/test_portal_service_ensure_valid_token.py +++ /dev/null @@ -1,343 +0,0 @@ -""" -Integration tests for PortalService.ensure_valid_token method. - -This test suite ensures that the ensure_valid_token method properly integrates -with the post_refresh_token_request method across all the domain scenarios -described in test_auth.py. -""" - -import os -import time -from unittest.mock import Mock, patch - -import pytest -from click.testing import CliRunner - -from uipath._cli._auth._portal_service import PortalService -from uipath.platform.common import TokenData -from uipath.runtime.errors import UiPathRuntimeError - - -@pytest.fixture -def mock_auth_config(): - """Mock auth config fixture.""" - return { - "client_id": "test_client_id", - "port": 8104, - "redirect_uri": "http://localhost:8104/callback", - "scope": "openid profile offline_access", - } - - -@pytest.fixture -def sample_token_data(): - """Sample token data for testing.""" - return { - "access_token": "new_access_token_123", - "refresh_token": "new_refresh_token_456", - "expires_in": 3600, - "token_type": "Bearer", - "scope": "openid profile offline_access", - "id_token": "id_token_789", - } - - -@pytest.fixture -def expired_auth_data(): - """Sample expired auth data.""" - return { - "access_token": "old_access_token", - "refresh_token": "valid_refresh_token", - "expires_in": 3600, - "token_type": "Bearer", - "scope": "openid profile offline_access", - "id_token": "old_id_token", - } - - -@pytest.fixture -def valid_auth_data(): - """Sample valid (non-expired) auth data.""" - future_time = time.time() + 3600 # 1 hour in the future - return { - "access_token": "valid_access_token", - "refresh_token": "valid_refresh_token", - "expires_in": 3600, - "token_type": "Bearer", - "scope": "openid profile offline_access", - "id_token": "valid_id_token", - "exp": future_time, - } - - -class TestPortalServiceEnsureValidToken: - """Test class for PortalService ensure_valid_token functionality.""" - - @pytest.mark.parametrize( - "domain, expected_token_url", - [ - ( - "https://cloud.uipath.com", - "https://cloud.uipath.com/identity_/connect/token", - ), - ( - "https://alpha.uipath.com", - "https://alpha.uipath.com/identity_/connect/token", - ), - ( - "https://staging.uipath.com", - "https://staging.uipath.com/identity_/connect/token", - ), - ( - "https://custom.automationsuite.org", - "https://custom.automationsuite.org/identity_/connect/token", - ), - ], - ) - def test_ensure_valid_token_refresh_flow_different_domains( - self, - domain, - expected_token_url, - mock_auth_config, - sample_token_data, - expired_auth_data, - ): - """Test ensure_valid_token refresh flow with different domain configurations.""" - - # Mock the token as expired - past_time = time.time() - 3600 # 1 hour ago - expired_token_claims = {"exp": past_time, "prt_id": "test_prt_id"} - - with ( - patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ), - patch( - "uipath._cli._auth._portal_service.get_auth_data", - return_value=TokenData.model_validate(expired_auth_data), - ), - patch( - "uipath._cli._auth._portal_service.get_parsed_token_data", - return_value=expired_token_claims, - ), - patch( - "uipath._cli._auth._portal_service.update_auth_file" - ) as mock_update_auth, - patch( - "uipath._cli._auth._portal_service.update_env_file" - ) as mock_update_env, - patch.object(PortalService, "_select_tenant"), - ): - # Create a mock HTTP client - mock_client = Mock() - - # Mock the refresh token response - mock_refresh_response = Mock() - mock_refresh_response.status_code = 200 - mock_refresh_response.json.return_value = sample_token_data - - # Mock the tenants response - mock_tenants_response = Mock() - mock_tenants_response.status_code = 200 - mock_tenants_response.json.return_value = { - "tenants": [{"name": "DefaultTenant", "id": "tenant-id"}], - "organization": {"name": "DefaultOrg", "id": "org-id"}, - } - - mock_client.post.return_value = mock_refresh_response - mock_client.get.return_value = mock_tenants_response - - # Create PortalService instance - portal_service = PortalService(domain) - portal_service._client = mock_client - - # Test ensure_valid_token - portal_service.ensure_valid_token() - - # Verify refresh token request was made to correct URL - mock_client.post.assert_called_with( - expected_token_url, - data={ - "grant_type": "refresh_token", - "refresh_token": expired_auth_data["refresh_token"], - "client_id": mock_auth_config["client_id"], - }, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - - # Verify auth file was updated - mock_update_auth.assert_called_once() - call_args = mock_update_auth.call_args[0][0] - assert call_args.access_token == sample_token_data["access_token"] - - # Verify env file was updated - mock_update_env.assert_called_with( - { - "UIPATH_ACCESS_TOKEN": sample_token_data["access_token"], - } - ) - - def test_ensure_valid_token_with_valid_token_and_uipath_url_set( - self, mock_auth_config, valid_auth_data - ): - """Test ensure_valid_token when token is still valid and UIPATH_URL is set.""" - - # Mock environment variable - os.environ["UIPATH_URL"] = "https://test.example.com/org/tenant" - - try: - runner = CliRunner() - with runner.isolated_filesystem(): - with ( - patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ), - patch( - "uipath._cli._auth._portal_service.get_auth_data", - return_value=TokenData.model_validate(valid_auth_data), - ), - patch( - "uipath._cli._auth._portal_service.get_parsed_token_data", - return_value=valid_auth_data, - ), - ): - # Create PortalService instance - portal_service = PortalService("cloud") - mock_client = Mock() - portal_service._client = mock_client - - # Test ensure_valid_token - portal_service.ensure_valid_token() - - # Verify no refresh request was made (token is still valid) - mock_client.post.assert_not_called() - - finally: - if "UIPATH_URL" in os.environ: - del os.environ["UIPATH_URL"] - - def test_ensure_valid_token_missing_refresh_token_raises_exception( - self, mock_auth_config - ): - """Test ensure_valid_token when refresh token is missing.""" - - # Auth data without refresh token - auth_data_no_refresh = { - "access_token": "old_access_token", - "expires_in": 3600, - "token_type": "Bearer", - "scope": "openid profile offline_access", - "id_token": "old_id_token", - } - - # Mock the token as expired - past_time = time.time() - 3600 - expired_token_claims = {"exp": past_time, "prt_id": "test_prt_id"} - - with ( - patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ), - patch( - "uipath._cli._auth._portal_service.get_auth_data", - return_value=TokenData.model_validate(auth_data_no_refresh), - ), - patch( - "uipath._cli._auth._portal_service.get_parsed_token_data", - return_value=expired_token_claims, - ), - ): - # Create PortalService instance - portal_service = PortalService("cloud") - portal_service._client = Mock() - - # Test should raise exception - with pytest.raises( - UiPathRuntimeError, match="The refresh token could not be retrieved" - ): - portal_service.ensure_valid_token() - - @pytest.mark.parametrize( - "env_vars, domain", - [ - ({"UIPATH_URL": "https://custom.automationsuite.org/org/tenant"}, "cloud"), - ({"UIPATH_URL": "https://custom.uipath.com/org/tenant"}, "alpha"), - ({}, "staging"), - ({}, "cloud"), - ], - ) - def test_ensure_valid_token_integration_with_auth_scenarios( - self, env_vars, domain, mock_auth_config, sample_token_data, expired_auth_data - ): - """Test ensure_valid_token integration with auth command scenarios.""" - - # Store original environment variables - original_env_vars = {} - for key in env_vars: - original_env_vars[key] = os.environ.get(key) - - try: - # Set test environment variables - for key, value in env_vars.items(): - os.environ[key] = value - - # Mock the token as expired - past_time = time.time() - 3600 - expired_token_claims = {"exp": past_time, "prt_id": "test_prt_id"} - - with ( - patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ), - patch( - "uipath._cli._auth._portal_service.get_auth_data", - return_value=TokenData.model_validate(expired_auth_data), - ), - patch( - "uipath._cli._auth._portal_service.get_parsed_token_data", - return_value=expired_token_claims, - ), - patch("uipath._cli._auth._portal_service.update_auth_file"), - patch("uipath._cli._auth._portal_service.update_env_file"), - patch.object(PortalService, "_select_tenant"), - ): - # Create a mock HTTP client - mock_client = Mock() - - # Mock the refresh token response - mock_refresh_response = Mock() - mock_refresh_response.status_code = 200 - mock_refresh_response.json.return_value = sample_token_data - - # Mock the tenants response - mock_tenants_response = Mock() - mock_tenants_response.status_code = 200 - mock_tenants_response.json.return_value = { - "tenants": [{"name": "DefaultTenant", "id": "tenant-id"}], - "organization": {"name": "DefaultOrg", "id": "org-id"}, - } - - mock_client.post.return_value = mock_refresh_response - mock_client.get.return_value = mock_tenants_response - - # Create PortalService instance - portal_service = PortalService(domain) - portal_service._client = mock_client - - # Test ensure_valid_token - portal_service.ensure_valid_token() - - # Verify refresh was attempted - assert mock_client.post.called - - finally: - # Restore original environment variables - for key, original_value in original_env_vars.items(): - if original_value is not None: - os.environ[key] = original_value - elif key in os.environ: - del os.environ[key] diff --git a/packages/uipath/tests/cli/test_portal_service_refresh_token.py b/packages/uipath/tests/cli/test_portal_service_refresh_token.py deleted file mode 100644 index 26f93cfd8..000000000 --- a/packages/uipath/tests/cli/test_portal_service_refresh_token.py +++ /dev/null @@ -1,498 +0,0 @@ -""" -Unit tests for the PortalService.post_refresh_token_request method. - -This test suite covers the following scenarios for the token refresh logic: - -1. **UIPATH_URL Environment Variable**: - Ensures the refresh token request correctly uses the domain from the UIPATH_URL - environment variable when calling the token endpoint. - -2. **Alpha, Staging, Cloud Domains**: - Verifies that the refresh token request works correctly with different UiPath domains - (alpha.uipath.com, staging.uipath.com, cloud.uipath.com). - -3. **Custom Domain**: - Tests that the refresh token request works with custom automation suite domains. - -4. **Error Handling**: - Tests proper error handling for various HTTP status codes (401, 500, etc.). - -5. **Client Initialization**: - Tests that the method properly handles uninitialized HTTP client scenarios. -""" - -import os -from unittest.mock import Mock, patch - -import httpx -import pytest - -from uipath._cli._auth._auth_service import AuthService -from uipath._cli._auth._portal_service import PortalService - - -@pytest.fixture -def mock_auth_config(): - """Mock auth config fixture.""" - return { - "client_id": "test_client_id", - "port": 8104, - "redirect_uri": "http://localhost:8104/callback", - "scope": "openid profile offline_access", - } - - -@pytest.fixture -def sample_token_data(): - """Sample token data for testing.""" - return { - "access_token": "new_access_token_123", - "refresh_token": "new_refresh_token_456", - "expires_in": 3600, - "token_type": "Bearer", - "scope": "openid profile offline_access", - "id_token": "id_token_789", - } - - -class TestPortalServiceRefreshToken: - """Test class for PortalService refresh token functionality.""" - - @pytest.mark.parametrize( - "environment, expected_token_url", - [ - # Standard UiPath domains - ("cloud", "https://cloud.uipath.com/identity_/connect/token"), - ("alpha", "https://alpha.uipath.com/identity_/connect/token"), - ("staging", "https://staging.uipath.com/identity_/connect/token"), - ], - ) - def test_post_refresh_token_request_different_domains( - self, environment, expected_token_url, mock_auth_config, sample_token_data - ): - """Test refresh token request with different domain configurations.""" - - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ): - # Create a mock HTTP client - mock_client = Mock(spec=httpx.Client) - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = sample_token_data - mock_client.post.return_value = mock_response - - # Create AuthService instance - auth_service = AuthService(environment=environment, force=False) - - # Create PortalService instance - portal_service = PortalService(auth_service._domain) - portal_service._client = mock_client - - # Test refresh token request - refresh_token = "test_refresh_token" - result = portal_service.refresh_access_token(refresh_token) - - # Verify the correct URL was called - mock_client.post.assert_called_once_with( - expected_token_url, - data={ - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "client_id": mock_auth_config["client_id"], - }, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - - # Verify the response - assert result.access_token == sample_token_data["access_token"] - assert result.refresh_token == sample_token_data["refresh_token"] - - @pytest.mark.parametrize( - "env_var_url, environment, expected_token_url", - [ - # UIPATH_URL should be used when environment is None (no flag specified) - ( - "https://custom.automationsuite.org/org/tenant", - None, - "https://custom.automationsuite.org/identity_/connect/token", - ), - ( - "https://mycompany.uipath.com/org/tenant/", - None, - "https://mycompany.uipath.com/identity_/connect/token", - ), - # Explicit environment flags should override UIPATH_URL - ( - "https://custom.automationsuite.org/org/tenant", - "alpha", - "https://alpha.uipath.com/identity_/connect/token", - ), - ( - "https://custom.automationsuite.org/org/tenant", - "staging", - "https://staging.uipath.com/identity_/connect/token", - ), - ( - "https://custom.automationsuite.org/org/tenant", - "cloud", - "https://cloud.uipath.com/identity_/connect/token", - ), - ], - ) - def test_post_refresh_token_request_with_uipath_url_env( - self, - env_var_url, - environment, - expected_token_url, - mock_auth_config, - sample_token_data, - ): - """Test refresh token request with UIPATH_URL environment variable.""" - - # Set the environment variable - original_env = os.environ.get("UIPATH_URL") - os.environ["UIPATH_URL"] = env_var_url - - try: - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ): - # Create a mock HTTP client - mock_client = Mock(spec=httpx.Client) - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = sample_token_data - mock_client.post.return_value = mock_response - - # Create AuthService instance - auth_service = AuthService(environment=environment, force=False) - - # Create PortalService instance - portal_service = PortalService(auth_service._domain) - portal_service._client = mock_client - - # Test refresh token request - refresh_token = "test_refresh_token" - result = portal_service.refresh_access_token(refresh_token) - - # Verify the correct URL was called - mock_client.post.assert_called_once_with( - expected_token_url, - data={ - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "client_id": mock_auth_config["client_id"], - }, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - - # Verify the response - assert result.access_token == sample_token_data["access_token"] - assert result.refresh_token == sample_token_data["refresh_token"] - - finally: - # Clean up environment variable - if original_env is not None: - os.environ["UIPATH_URL"] = original_env - elif "UIPATH_URL" in os.environ: - del os.environ["UIPATH_URL"] - - def test_post_refresh_token_request_unauthorized(self, mock_auth_config): - """Test refresh token request with 401 Unauthorized response.""" - - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ): - with patch( - "uipath._cli._auth._portal_service.ConsoleLogger" - ) as mock_logger_cls: - mock_console = Mock() - mock_console.error.side_effect = SystemExit(1) - mock_logger_cls.return_value = mock_console - - # Create a mock HTTP client - mock_client = Mock(spec=httpx.Client) - mock_response = Mock() - mock_response.status_code = 401 - mock_client.post.return_value = mock_response - - # Create AuthService instance - auth_service = AuthService(environment="cloud", force=False) - - # Create PortalService instance - portal_service = PortalService(auth_service._domain) - portal_service._client = mock_client - - # Test refresh token request - should raise exception due to console.error - with pytest.raises(SystemExit): - portal_service.refresh_access_token("test_refresh_token") - - # Verify error was logged - mock_console.error.assert_called_once_with("Unauthorized") - - def test_post_refresh_token_request_server_error(self, mock_auth_config): - """Test refresh token request with 500 server error response.""" - - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ): - with patch( - "uipath._cli._auth._portal_service.ConsoleLogger" - ) as mock_logger_cls: - mock_console = Mock() - mock_console.error.side_effect = SystemExit(1) - mock_logger_cls.return_value = mock_console - - # Create a mock HTTP client - mock_client = Mock(spec=httpx.Client) - mock_response = Mock() - mock_response.status_code = 500 - mock_client.post.return_value = mock_response - - # Create AuthService instance - auth_service = AuthService(environment="cloud", force=False) - - # Create PortalService instance - portal_service = PortalService(auth_service._domain) - portal_service._client = mock_client - - # Test refresh token request - should raise exception due to console.error - with pytest.raises(SystemExit): - portal_service.refresh_access_token("test_refresh_token") - - # Verify error was logged - mock_console.error.assert_called_once_with( - "Failed to refresh token: 500" - ) - - def test_refresh_token_lazy_initializes_client( - self, mock_auth_config, sample_token_data - ): - portal_service = PortalService("https://cloud.uipath.com") - assert portal_service._client is None - - with patch("httpx.Client") as mock_client_cls: - mock_client = Mock() - mock_client.post.return_value.status_code = 200 - mock_client.post.return_value.json.return_value = sample_token_data - mock_client_cls.return_value = mock_client - - result = portal_service.refresh_access_token("test_refresh_token") - - assert portal_service._client is mock_client - assert result.access_token == sample_token_data["access_token"] - - def test_post_refresh_token_request_success_response_format( - self, mock_auth_config, sample_token_data - ): - """Test that successful refresh token request returns proper TokenData format.""" - - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ): - # Create a mock HTTP client - mock_client = Mock(spec=httpx.Client) - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = sample_token_data - mock_client.post.return_value = mock_response - - # Create AuthService instance - auth_service = AuthService(environment="cloud", force=False) - - # Create PortalService instance - portal_service = PortalService(auth_service._domain) - portal_service._client = mock_client - - # Test refresh token request - result = portal_service.refresh_access_token("test_refresh_token") - - # Verify result is a TokenData model with all expected fields - assert result.access_token is not None - assert result.refresh_token is not None - assert result.expires_in is not None - assert result.token_type is not None - assert result.scope is not None - assert result.id_token is not None - - # Verify values match expected - assert result.access_token == sample_token_data["access_token"] - assert result.refresh_token == sample_token_data["refresh_token"] - - def test_post_refresh_token_request_malformed_domain_handling( - self, mock_auth_config, sample_token_data - ): - """Test refresh token request with various domain formats.""" - - test_cases = [ - # Domain with trailing slash should not create double slash - ( - "https://example.uipath.com/", - None, - "https://example.uipath.com/identity_/connect/token", - ), - # Domain without scheme gets .uipath.com appended (current behavior) - ( - "https://example.com/", - "example", - "https://example.uipath.com/identity_/connect/token", - ), - # Domain with path should use base only - ( - "https://example.com/some/path", - "example", - "https://example.uipath.com/identity_/connect/token", - ), - ] - - for uipath_url, environment, expected_url in test_cases: - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ): - os.environ["UIPATH_URL"] = uipath_url - - # Create a mock HTTP client - mock_client = Mock(spec=httpx.Client) - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = sample_token_data - mock_client.post.return_value = mock_response - - # Create AuthService instance - auth_service = AuthService(environment=environment, force=False) - - # Create PortalService instance - portal_service = PortalService(auth_service._domain) - portal_service._client = mock_client - - # Test refresh token request - portal_service.refresh_access_token("test_refresh_token") - - # Verify the correct URL was called - mock_client.post.assert_called_with( - expected_url, - data={ - "grant_type": "refresh_token", - "refresh_token": "test_refresh_token", - "client_id": mock_auth_config["client_id"], - }, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - - # Reset mock for next iteration - mock_client.reset_mock() - - @pytest.mark.parametrize( - "scenario_name, env_vars, environment, expected_token_url", - [ - # These scenarios mirror the test_auth.py test cases but focus on the refresh token endpoint - ( - "refresh_with_uipath_url_env_variable", - {"UIPATH_URL": "https://custom.automationsuite.org/org/tenant"}, - None, # None when no flag specified - "https://custom.automationsuite.org/identity_/connect/token", - ), - ( - "refresh_with_uipath_url_env_variable_with_trailing_slash", - {"UIPATH_URL": "https://custom.uipath.com/org/tenant/"}, - None, - "https://custom.uipath.com/identity_/connect/token", - ), - ( - "refresh_with_alpha_flag_overrides_env", - {"UIPATH_URL": "https://custom.uipath.com/org/tenant"}, - "alpha", # alpha flag overrides UIPATH_URL - "https://alpha.uipath.com/identity_/connect/token", - ), - ( - "refresh_with_staging_flag_overrides_env", - {"UIPATH_URL": "https://custom.uipath.com/org/tenant"}, - "staging", # staging flag overrides UIPATH_URL - "https://staging.uipath.com/identity_/connect/token", - ), - ( - "refresh_with_cloud_flag_overrides_env", - {"UIPATH_URL": "https://custom.uipath.com/org/tenant"}, - "cloud", # cloud flag overrides UIPATH_URL - "https://cloud.uipath.com/identity_/connect/token", - ), - ( - "refresh_default_to_cloud", - {}, - None, # None defaults to cloud - "https://cloud.uipath.com/identity_/connect/token", - ), - ], - ) - def test_post_refresh_token_request_auth_scenarios_integration( - self, - scenario_name, - env_vars, - environment, - expected_token_url, - mock_auth_config, - sample_token_data, - ): - """Test refresh token request integration with all auth command scenarios from test_auth.py.""" - - # Store original environment variables - original_env_vars = {} - for key in env_vars: - original_env_vars[key] = os.environ.get(key) - - try: - # Set test environment variables - for key, value in env_vars.items(): - os.environ[key] = value - - with patch( - "uipath._cli._auth._oidc_utils.OidcUtils.get_auth_config", - return_value=mock_auth_config, - ): - # Create a mock HTTP client - mock_client = Mock(spec=httpx.Client) - mock_response = Mock() - mock_response.status_code = 200 - mock_response.json.return_value = sample_token_data - mock_client.post.return_value = mock_response - - # Create AuthService instance - auth_service = AuthService(environment=environment, force=False) - - # Create PortalService instance with the domain that would be determined - # by the auth command logic - portal_service = PortalService(auth_service._domain) - portal_service._client = mock_client - - # Test refresh token request - result = portal_service.refresh_access_token("test_refresh_token") - - # Verify the correct URL was called - mock_client.post.assert_called_once_with( - expected_token_url, - data={ - "grant_type": "refresh_token", - "refresh_token": "test_refresh_token", - "client_id": mock_auth_config["client_id"], - }, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - ) - - # Verify the response - assert result.access_token == sample_token_data["access_token"] - assert result.refresh_token == sample_token_data["refresh_token"] - - finally: - # Restore original environment variables - for key, original_value in original_env_vars.items(): - if original_value is not None: - os.environ[key] = original_value - elif key in os.environ: - del os.environ[key]