From cc0f553a195dcd39d429412d086048e7c0d64b3f Mon Sep 17 00:00:00 2001 From: Parker Henderson Date: Wed, 8 Apr 2026 14:58:09 -0700 Subject: [PATCH 1/6] refactor(cli): extract shared project command context --- src/experiments/mod.rs | 36 +++++++++------------ src/functions/mod.rs | 52 ++++++++++-------------------- src/main.rs | 1 + src/project_context.rs | 71 +++++++++++++++++++++++++++++++++++++++++ src/projects/context.rs | 44 ------------------------- src/projects/mod.rs | 1 - src/prompts/mod.rs | 35 ++++++++------------ 7 files changed, 118 insertions(+), 122 deletions(-) create mode 100644 src/project_context.rs delete mode 100644 src/projects/context.rs diff --git a/src/experiments/mod.rs b/src/experiments/mod.rs index 4908032..c03fb81 100644 --- a/src/experiments/mod.rs +++ b/src/experiments/mod.rs @@ -4,7 +4,7 @@ use clap::{Args, Subcommand}; use crate::{ args::BaseArgs, http::ApiClient, - projects::context::{resolve_project_context, ProjectContext}, + project_context::resolve_project_command_context_with_auth_mode, ui::{self, with_spinner}, }; @@ -15,7 +15,7 @@ mod view; use api::{self as experiments_api, Experiment}; -pub(crate) type ResolvedContext = ProjectContext; +pub(crate) use crate::project_context::ProjectContext as ResolvedContext; #[derive(Debug, Clone, Args)] #[command(after_help = "\ @@ -106,33 +106,27 @@ pub(crate) async fn select_experiment_interactive( } pub async fn run(base: BaseArgs, args: ExperimentsArgs) -> Result<()> { + let read_only = experiments_command_is_read_only(args.command.as_ref()); + let ctx = resolve_project_command_context_with_auth_mode(&base, read_only).await?; + match args.command { - None | Some(ExperimentsCommands::List) => { - let ctx = resolve_project_context(&base, true).await?; - list::run(&ctx, base.json).await - } - Some(ExperimentsCommands::View(v)) => { - let ctx = resolve_project_context(&base, true).await?; - view::run(&ctx, v.name(), base.json, v.web).await - } - Some(ExperimentsCommands::Delete(d)) => { - let ctx = resolve_project_context(&base, false).await?; - delete::run(&ctx, d.name(), d.force).await - } + None | Some(ExperimentsCommands::List) => list::run(&ctx, base.json).await, + Some(ExperimentsCommands::View(v)) => view::run(&ctx, v.name(), base.json, v.web).await, + Some(ExperimentsCommands::Delete(d)) => delete::run(&ctx, d.name(), d.force).await, } } +fn experiments_command_is_read_only(command: Option<&ExperimentsCommands>) -> bool { + matches!( + command, + None | Some(ExperimentsCommands::List) | Some(ExperimentsCommands::View(_)) + ) +} + #[cfg(test)] mod tests { use super::*; - fn experiments_command_is_read_only(command: Option<&ExperimentsCommands>) -> bool { - matches!( - command, - None | Some(ExperimentsCommands::List) | Some(ExperimentsCommands::View(_)) - ) - } - #[test] fn experiments_routes_list_and_view_to_read_only_auth() { assert!(experiments_command_is_read_only(None)); diff --git a/src/functions/mod.rs b/src/functions/mod.rs index 845c3ef..da6def6 100644 --- a/src/functions/mod.rs +++ b/src/functions/mod.rs @@ -1,15 +1,15 @@ use std::path::PathBuf; -use anyhow::{anyhow, bail, Result}; +use anyhow::{bail, Result}; use clap::{builder::BoolishValueParser, Args, Subcommand, ValueEnum}; use crate::{ args::BaseArgs, auth::{login, AvailableOrg}, - config, http::ApiClient, - projects::api::{get_project_by_name, Project}, - ui::{self, is_interactive, select_project_interactive, with_spinner}, + project_context::{resolve_project_optional, resolve_required_project}, + projects::api::Project, + ui::{self, 
with_spinner},
 };
 
 pub(crate) mod api;
@@ -456,11 +456,7 @@ pub(crate) struct AuthContext {
     pub org_id: String,
 }
 
-pub(crate) struct ResolvedContext {
-    pub client: ApiClient,
-    pub app_url: String,
-    pub project: Project,
-}
+pub(crate) use crate::project_context::ProjectContext as ResolvedContext;
 
 pub(crate) async fn resolve_auth_context(base: &BaseArgs) -> Result<AuthContext> {
     let ctx = login(base).await?;
@@ -512,9 +508,7 @@ pub(crate) async fn resolve_project_context(
     base: &BaseArgs,
     auth_ctx: &AuthContext,
 ) -> Result<Project> {
-    resolve_project_context_optional(base, auth_ctx, true)
-        .await?
-        .ok_or_else(|| anyhow!("--project required (or set BRAINTRUST_DEFAULT_PROJECT)"))
+    resolve_required_project(base, &auth_ctx.client, true).await
 }
 
 pub(crate) async fn resolve_project_context_optional(
@@ -522,22 +516,7 @@ pub(crate) async fn resolve_project_context_optional(
     auth_ctx: &AuthContext,
     allow_interactive_selection: bool,
 ) -> Result<Option<Project>> {
-    let config_project = config::load().ok().and_then(|c| c.project);
-    let project_name = match base.project.as_deref().or(config_project.as_deref()) {
-        Some(p) => Some(p.to_string()),
-        None if allow_interactive_selection && is_interactive() => {
-            Some(select_project_interactive(&auth_ctx.client, None, None).await?)
-        }
-        None => None,
-    };
-
-    match project_name {
-        Some(project_name) => get_project_by_name(&auth_ctx.client, &project_name)
-            .await?
-            .map(Some)
-            .ok_or_else(|| anyhow!("project '{project_name}' not found")),
-        None => Ok(None),
-    }
+    resolve_project_optional(base, &auth_ctx.client, allow_interactive_selection).await
 }
 
 async fn resolve_context(base: &BaseArgs) -> Result<ResolvedContext> {
@@ -995,13 +974,15 @@ mod tests {
             .expect("parse");
         assert!(function_command_is_read_only(parsed.args.command.as_ref()));
 
-        let parsed =
-            FunctionArgsHarness::try_parse_from(["bt-tools", "delete", "--slug", "my-tool", "--force"])
-                .expect("parse");
+        let parsed = FunctionArgsHarness::try_parse_from([
+            "bt-tools", "delete", "--slug", "my-tool", "--force",
+        ])
+        .expect("parse");
         assert!(!function_command_is_read_only(parsed.args.command.as_ref()));
 
-        let parsed = FunctionArgsHarness::try_parse_from(["bt-tools", "invoke", "--slug", "my-tool"])
-            .expect("parse");
+        let parsed =
+            FunctionArgsHarness::try_parse_from(["bt-tools", "invoke", "--slug", "my-tool"])
+                .expect("parse");
         assert!(!function_command_is_read_only(parsed.args.command.as_ref()));
     }
 
@@ -1014,8 +995,9 @@ mod tests {
         let parsed = FunctionsArgsHarness::try_parse_from(["bt-functions", "list"]).expect("parse");
         assert!(functions_command_is_read_only(parsed.args.command.as_ref()));
 
-        let parsed = FunctionsArgsHarness::try_parse_from(["bt-functions", "view", "--slug", "my-fn"])
-            .expect("parse");
+        let parsed =
+            FunctionsArgsHarness::try_parse_from(["bt-functions", "view", "--slug", "my-fn"])
+                .expect("parse");
         assert!(functions_command_is_read_only(parsed.args.command.as_ref()));
 
         let parsed = FunctionsArgsHarness::try_parse_from([
diff --git a/src/main.rs b/src/main.rs
index d5b838b..b9ac457 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,6 +14,7 @@ mod functions;
 mod http;
 mod init;
 mod js_runner;
+mod project_context;
 mod projects;
 mod prompts;
 mod python_runner;
diff --git a/src/project_context.rs b/src/project_context.rs
new file mode 100644
index 0000000..86d9e82
--- /dev/null
+++ b/src/project_context.rs
@@ -0,0 +1,71 @@
+use anyhow::{anyhow, Result};
+
+use crate::{
+    args::BaseArgs,
+    auth::{login, login_read_only},
+    config,
+    http::ApiClient,
+    projects::api::{get_project_by_name, Project},
+    ui::{is_interactive, select_project_interactive},
+};
+
+pub(crate) struct ProjectContext {
+    pub client: ApiClient,
+    pub app_url: String,
+    pub project: Project,
+}
+
+pub(crate) async fn resolve_project_optional(
+    base: &BaseArgs,
+    client: &ApiClient,
+    allow_interactive_selection: bool,
+) -> Result<Option<Project>> {
+    let config_project = config::load().ok().and_then(|config| config.project);
+    let project_name = match base.project.as_deref().or(config_project.as_deref()) {
+        Some(project_name) => Some(project_name.to_string()),
+        None if allow_interactive_selection && is_interactive() => {
+            Some(select_project_interactive(client, None, None).await?)
+        }
+        None => None,
+    };
+
+    match project_name {
+        Some(project_name) => get_project_by_name(client, &project_name)
+            .await?
+            .map(Some)
+            .ok_or_else(|| anyhow!("project '{project_name}' not found")),
+        None => Ok(None),
+    }
+}
+
+pub(crate) async fn resolve_required_project(
+    base: &BaseArgs,
+    client: &ApiClient,
+    allow_interactive_selection: bool,
+) -> Result<Project> {
+    resolve_project_optional(base, client, allow_interactive_selection)
+        .await?
+        .ok_or_else(|| anyhow!("--project required (or set BRAINTRUST_DEFAULT_PROJECT)"))
+}
+
+pub(crate) async fn resolve_project_command_context(base: &BaseArgs) -> Result<ProjectContext> {
+    resolve_project_command_context_with_auth_mode(base, false).await
+}
+
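+/// Shared login-plus-project resolution for project-scoped commands.
+/// `read_only` selects a read-only login when the invoked subcommand only
+/// reads data (for example `list`/`view`); anything that mutates state gets
+/// a full login. Command modules resolve this once up front instead of
+/// resolving auth and project separately in every subcommand arm.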
+pub(crate) async fn resolve_project_command_context_with_auth_mode(
+    base: &BaseArgs,
+    read_only: bool,
+) -> Result<ProjectContext> {
+    let auth = if read_only {
+        login_read_only(base).await?
+    } else {
+        login(base).await?
+    };
+    let client = ApiClient::new(&auth)?;
+    let project = resolve_required_project(base, &client, true).await?;
+    Ok(ProjectContext {
+        client,
+        app_url: auth.app_url,
+        project,
+    })
+}
diff --git a/src/projects/context.rs b/src/projects/context.rs
deleted file mode 100644
index e635267..0000000
--- a/src/projects/context.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-use anyhow::{anyhow, bail, Result};
-
-use crate::{
-    args::BaseArgs,
-    auth::{login, login_read_only},
-    config,
-    http::ApiClient,
-    ui::{is_interactive, select_project_interactive},
-};
-
-use super::api::{get_project_by_name, Project};
-
-pub(crate) struct ProjectContext {
-    pub client: ApiClient,
-    pub app_url: String,
-    pub project: Project,
-}
-
-pub(crate) async fn resolve_project_context(
-    base: &BaseArgs,
-    read_only: bool,
-) -> Result<ProjectContext> {
-    let auth = if read_only {
-        login_read_only(base).await?
-    } else {
-        login(base).await?
-    };
-    let client = ApiClient::new(&auth)?;
-    let config_project = config::load().ok().and_then(|c| c.project);
-    let project_name = match base.project.as_deref().or(config_project.as_deref()) {
-        Some(p) => p.to_string(),
-        None if is_interactive() => select_project_interactive(&client, None, None).await?,
-        None => bail!("--project required (or set BRAINTRUST_DEFAULT_PROJECT)"),
-    };
-    let project = get_project_by_name(&client, &project_name)
-        .await?
- .ok_or_else(|| anyhow!("project '{project_name}' not found"))?; - - Ok(ProjectContext { - client, - app_url: auth.app_url, - project, - }) -} diff --git a/src/projects/mod.rs b/src/projects/mod.rs index 2ee1dd8..10fd85d 100644 --- a/src/projects/mod.rs +++ b/src/projects/mod.rs @@ -6,7 +6,6 @@ use crate::auth::login; use crate::http::ApiClient; pub(crate) mod api; -pub(crate) mod context; mod create; mod delete; mod list; diff --git a/src/prompts/mod.rs b/src/prompts/mod.rs index 0a87fde..96c60f4 100644 --- a/src/prompts/mod.rs +++ b/src/prompts/mod.rs @@ -1,12 +1,9 @@ use anyhow::Result; use clap::{Args, Subcommand}; -use crate::{ - args::BaseArgs, - projects::context::{resolve_project_context, ProjectContext}, -}; +use crate::{args::BaseArgs, project_context::resolve_project_command_context_with_auth_mode}; -pub(crate) type ResolvedContext = ProjectContext; +pub(crate) use crate::project_context::ProjectContext as ResolvedContext; mod api; mod delete; @@ -86,33 +83,29 @@ impl DeleteArgs { } pub async fn run(base: BaseArgs, args: PromptsArgs) -> Result<()> { + let read_only = prompts_command_is_read_only(args.command.as_ref()); + let ctx = resolve_project_command_context_with_auth_mode(&base, read_only).await?; + match args.command { - None | Some(PromptsCommands::List) => { - let ctx = resolve_project_context(&base, true).await?; - list::run(&ctx, base.json).await - } + None | Some(PromptsCommands::List) => list::run(&ctx, base.json).await, Some(PromptsCommands::View(p)) => { - let ctx = resolve_project_context(&base, true).await?; view::run(&ctx, p.slug(), base.json, p.web, p.verbose).await } - Some(PromptsCommands::Delete(p)) => { - let ctx = resolve_project_context(&base, false).await?; - delete::run(&ctx, p.slug(), p.force).await - } + Some(PromptsCommands::Delete(p)) => delete::run(&ctx, p.slug(), p.force).await, } } +fn prompts_command_is_read_only(command: Option<&PromptsCommands>) -> bool { + matches!( + command, + None | Some(PromptsCommands::List) | Some(PromptsCommands::View(_)) + ) +} + #[cfg(test)] mod tests { use super::*; - fn prompts_command_is_read_only(command: Option<&PromptsCommands>) -> bool { - matches!( - command, - None | Some(PromptsCommands::List) | Some(PromptsCommands::View(_)) - ) - } - #[test] fn prompts_routes_list_and_view_to_read_only_auth() { assert!(prompts_command_is_read_only(None)); From bfb711b5f3cfe6e17ec22173bc641e4a10a3564b Mon Sep 17 00:00:00 2001 From: Parker Henderson Date: Wed, 8 Apr 2026 14:59:45 -0700 Subject: [PATCH 2/6] feat(datasets): add remote dataset management commands --- README.md | 41 +- src/datasets/api.rs | 196 +++++++ src/datasets/create.rs | 87 +++ src/datasets/delete.rs | 62 ++ src/datasets/list.rs | 63 +++ src/datasets/mod.rs | 463 +++++++++++++++ src/datasets/records.rs | 476 ++++++++++++++++ src/datasets/refresh.rs | 114 ++++ src/datasets/upload.rs | 123 ++++ src/datasets/view.rs | 190 +++++++ src/http.rs | 4 + src/main.rs | 6 + .../create-inputs/fixture.json | 63 +++ .../create-inputs/rows.jsonl | 2 + .../upload-refresh/fixture.json | 77 +++ .../upload-refresh/refresh.jsonl | 2 + .../upload-refresh/upload.jsonl | 2 + tests/datasets.rs | 531 ++++++++++++++++++ 18 files changed, 2489 insertions(+), 13 deletions(-) create mode 100644 src/datasets/api.rs create mode 100644 src/datasets/create.rs create mode 100644 src/datasets/delete.rs create mode 100644 src/datasets/list.rs create mode 100644 src/datasets/mod.rs create mode 100644 src/datasets/records.rs create mode 100644 src/datasets/refresh.rs create mode 100644 
src/datasets/upload.rs
 create mode 100644 src/datasets/view.rs
 create mode 100644 tests/datasets-fixtures/create-inputs/fixture.json
 create mode 100644 tests/datasets-fixtures/create-inputs/rows.jsonl
 create mode 100644 tests/datasets-fixtures/upload-refresh/fixture.json
 create mode 100644 tests/datasets-fixtures/upload-refresh/refresh.jsonl
 create mode 100644 tests/datasets-fixtures/upload-refresh/upload.jsonl
 create mode 100644 tests/datasets.rs

diff --git a/README.md b/README.md
index 92a972e..caaa770 100644
--- a/README.md
+++ b/README.md
@@ -108,19 +108,20 @@ Remove-Item -Recurse -Force (Join-Path $env:APPDATA "bt") -ErrorAction SilentlyC
 
 ## Commands
 
-| Command          | Description                                                        |
-| ---------------- | ------------------------------------------------------------------ |
-| `bt init`        | Initialize `.bt/` config directory and link to a project           |
-| `bt auth`        | Authenticate with Braintrust                                       |
-| `bt switch`      | Switch org and project context                                     |
-| `bt status`      | Show current org and project context                               |
-| `bt eval`        | Run eval files (Unix only)                                         |
-| `bt sql`         | Run SQL queries against Braintrust                                 |
-| `bt view`        | View logs, traces, and spans                                       |
-| `bt projects`    | Manage projects (list, create, view, delete)                       |
-| `bt prompts`     | Manage prompts (list, view, delete)                                |
-| `bt sync`        | Synchronize project logs between Braintrust and local NDJSON files |
-| `bt self update` | Update bt in-place                                                 |
+| Command          | Description                                                           |
+| ---------------- | --------------------------------------------------------------------- |
+| `bt init`        | Initialize `.bt/` config directory and link to a project              |
+| `bt auth`        | Authenticate with Braintrust                                          |
+| `bt switch`      | Switch org and project context                                        |
+| `bt status`      | Show current org and project context                                  |
+| `bt eval`        | Run eval files (Unix only)                                            |
+| `bt sql`         | Run SQL queries against Braintrust                                    |
+| `bt view`        | View logs, traces, and spans                                          |
+| `bt projects`    | Manage projects (list, create, view, delete)                          |
+| `bt datasets`    | Manage remote datasets (list, create, upload, refresh, view, delete)  |
+| `bt prompts`     | Manage prompts (list, view, delete)                                   |
+| `bt sync`        | Synchronize project logs between Braintrust and local NDJSON files    |
+| `bt self update` | Update bt in-place                                                    |
 
 ## `bt eval`
@@ -151,6 +152,20 @@ Use `--` to forward extra arguments to the eval file via `process.argv`:
 
 ```
 bt eval foo.eval.ts -- --description "Prod" --shard=1/4
 ```
 
+## `bt datasets`
+
+- `bt datasets` works directly against remote Braintrust datasets — no local `bt sync` artifact flow is required.
+- `bt datasets create my-dataset` — create an empty remote dataset in the current project.
+- `bt datasets create my-dataset --file records.jsonl` — create the remote dataset and seed it from a JSON/JSONL file.
+- `cat records.jsonl | bt datasets create my-dataset` — create the dataset and seed it from stdin.
+- `bt datasets create my-dataset --rows '[{"id":"case-1","input":{"text":"hi"},"expected":"hello"}]'` — create the dataset from inline JSON rows.
+- `bt datasets add my-dataset --file records.jsonl` — add rows to an existing remote dataset.
+- `bt datasets append my-dataset --rows '[{"id":"case-2","input":{"text":"bye"},"expected":"goodbye"}]'` — alias for `add`/`upload` when you want to append rows explicitly.
+- `bt datasets upload my-dataset --file records.jsonl` — canonical subcommand behind the `add`/`append`/`update` aliases.
+- `bt datasets refresh my-dataset --file records.jsonl --id-field metadata.case_id --prune` — deterministically upsert rows by stable record id and optionally prune stale remote rows.
+- `bt datasets view my-dataset` — show dataset metadata and the important row fields by default; pass `--verbose` to inspect full row payloads.
+- Accepted row fields for create/upload/update/refresh are `id` (or your `--id-field` path), `input`, `expected`, `metadata`, and `tags`.
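+
+For example, a minimal `records.jsonl` seed file might look like this (rows here are illustrative; the same shapes work inline via `--rows` or piped to stdin):
+
+```
+{"id":"case-1","input":{"text":"hi"},"expected":"hello","tags":["smoke"]}
+{"id":"case-2","input":{"text":"bye"},"expected":"goodbye","metadata":{"topic":"farewell"}}
+```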
+
 ## `bt sql`
 
 - Runs interactively on TTY by default.
diff --git a/src/datasets/api.rs b/src/datasets/api.rs
new file mode 100644
index 0000000..8aece2e
--- /dev/null
+++ b/src/datasets/api.rs
@@ -0,0 +1,196 @@
+use anyhow::Result;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use urlencoding::encode;
+
+use crate::http::ApiClient;
+
+const MAX_DATASET_ROWS_PAGE_LIMIT: usize = 1000;
+const DATASET_ROWS_SINCE: &str = "1970-01-01T00:00:00Z";
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Dataset {
+    pub id: String,
+    pub name: String,
+    #[serde(default)]
+    pub project_id: Option<String>,
+    #[serde(default)]
+    pub description: Option<String>,
+    #[serde(default)]
+    pub created: Option<String>,
+    #[serde(default)]
+    pub created_at: Option<String>,
+    #[serde(default)]
+    pub metadata: Option<Value>,
+}
+
+pub type DatasetRow = Map<String, Value>;
+
+impl Dataset {
+    pub fn description_text(&self) -> Option<&str> {
+        self.description
+            .as_deref()
+            .filter(|description| !description.is_empty())
+            .or_else(|| {
+                self.metadata
+                    .as_ref()
+                    .and_then(|metadata| metadata.get("description"))
+                    .and_then(|description| description.as_str())
+                    .filter(|description| !description.is_empty())
+            })
+    }
+
+    pub fn created_text(&self) -> Option<&str> {
+        self.created
+            .as_deref()
+            .filter(|created| !created.is_empty())
+            .or_else(|| {
+                self.created_at
+                    .as_deref()
+                    .filter(|created| !created.is_empty())
+            })
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct ListResponse {
+    objects: Vec<Dataset>,
+}
+
+#[derive(Debug, Deserialize)]
+struct DatasetRowsResponse {
+    #[serde(default)]
+    data: Vec<DatasetRow>,
+    #[serde(default)]
+    cursor: Option<String>,
+}
+
+pub async fn list_datasets(client: &ApiClient, project_id: &str) -> Result<Vec<Dataset>> {
+    let path = format!(
+        "/v1/dataset?org_name={}&project_id={}",
+        encode(client.org_name()),
+        encode(project_id)
+    );
+    let list: ListResponse = client.get(&path).await?;
+    Ok(list.objects)
+}
+
+pub async fn get_dataset_by_name(
+    client: &ApiClient,
+    project_id: &str,
+    name: &str,
+) -> Result<Option<Dataset>> {
+    let datasets = list_datasets(client, project_id).await?;
+    Ok(datasets.into_iter().find(|dataset| dataset.name == name))
+}
+
+pub async fn list_dataset_rows(client: &ApiClient, dataset_id: &str) -> Result<Vec<DatasetRow>> {
+    let mut rows = Vec::new();
+    let mut cursor: Option<String> = None;
+
+    loop {
+        let query =
+            build_dataset_rows_query(dataset_id, MAX_DATASET_ROWS_PAGE_LIMIT, cursor.as_deref());
+        let body = serde_json::json!({
+            "query": query,
+            "fmt": "json",
+        });
+        let org_name = client.org_name();
+        let headers = if !org_name.is_empty() {
+            vec![("x-bt-org-name", org_name)]
+        } else {
+            Vec::new()
+        };
+        let response: DatasetRowsResponse =
+            client.post_with_headers("/btql", &body, &headers).await?;
+
+        if response.data.is_empty() {
+            break;
+        }
+
+        rows.extend(response.data);
+
+        let next_cursor = response.cursor.filter(|cursor| !cursor.is_empty());
+        if next_cursor.is_none() {
+            break;
+        }
+        cursor = next_cursor;
+    }
+
+    Ok(rows)
+}
+
+pub async fn create_dataset(client: &ApiClient, project_id: &str, name: &str) -> Result<Dataset> {
+    let body = serde_json::json!({
+        "name": name,
+        "project_id": project_id,
+        "org_name": client.org_name(),
+    });
+    client.post("/v1/dataset", &body).await
+}
+
+pub async fn get_or_create_dataset(
+    client: &ApiClient,
+    
project_id: &str, + name: &str, +) -> Result<(Dataset, bool)> { + if let Some(dataset) = get_dataset_by_name(client, project_id, name).await? { + return Ok((dataset, false)); + } + + let dataset = create_dataset(client, project_id, name).await?; + Ok((dataset, true)) +} + +pub async fn delete_dataset(client: &ApiClient, dataset_id: &str) -> Result<()> { + let path = format!("/v1/dataset/{}", encode(dataset_id)); + client.delete(&path).await +} + +fn build_dataset_rows_query(dataset_id: &str, limit: usize, cursor: Option<&str>) -> String { + let cursor_clause = cursor + .map(|cursor| format!(" | cursor: {}", btql_quote(cursor))) + .unwrap_or_default(); + format!( + "select: * | from: dataset({}) | filter: created >= {} | limit: {}{}", + sql_quote(dataset_id), + btql_quote(DATASET_ROWS_SINCE), + limit, + cursor_clause + ) +} + +fn btql_quote(value: &str) -> String { + serde_json::to_string(value) + .unwrap_or_else(|_| format!("\"{}\"", value.replace('\\', "\\\\").replace('"', "\\\""))) +} + +fn sql_quote(value: &str) -> String { + format!("'{}'", value.replace('\'', "''")) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn dataset_rows_query_includes_required_filter() { + let query = build_dataset_rows_query("dataset-id", 1000, None); + assert_eq!( + query, + "select: * | from: dataset('dataset-id') | filter: created >= \"1970-01-01T00:00:00Z\" | limit: 1000" + ); + } + + #[test] + fn dataset_rows_query_quotes_cursor() { + let query = build_dataset_rows_query("dataset-id", 200, Some("cursor-123")); + assert!(query.contains("limit: 200 | cursor: \"cursor-123\"")); + } + + #[test] + fn dataset_rows_query_escapes_dataset_id() { + let query = build_dataset_rows_query("dataset'with-quote", 10, None); + assert!(query.contains("from: dataset('dataset''with-quote')")); + } +} diff --git a/src/datasets/create.rs b/src/datasets/create.rs new file mode 100644 index 0000000..1eb30eb --- /dev/null +++ b/src/datasets/create.rs @@ -0,0 +1,87 @@ +use std::{path::Path, time::Duration}; + +use anyhow::{bail, Result}; +use serde_json::json; + +use crate::ui::{print_command_status, with_spinner, with_spinner_visible, CommandStatus}; + +use super::{api, records::load_optional_upload_records, upload, ResolvedContext}; + +pub async fn run( + ctx: &ResolvedContext, + name: Option<&str>, + input_path: Option<&Path>, + inline_rows: Option<&str>, + id_field: &str, + json_output: bool, +) -> Result<()> { + let name = upload::resolve_dataset_name(name, "create")?; + + let exists = with_spinner( + "Checking dataset...", + api::get_dataset_by_name(&ctx.client, &ctx.project.id, &name), + ) + .await?; + if exists.is_some() { + bail!( + "dataset '{name}' already exists in project '{}'; use `bt datasets upload {name}` to add rows", + ctx.project.name + ); + } + + let records = load_optional_upload_records(input_path, inline_rows, id_field)?; + let uploaded = records.as_ref().map_or(0, |records| records.len()); + + let dataset = match with_spinner_visible( + "Creating dataset...", + api::create_dataset(&ctx.client, &ctx.project.id, &name), + Duration::from_millis(300), + ) + .await + { + Ok(dataset) => dataset, + Err(error) => { + print_command_status(CommandStatus::Error, &format!("Failed to create '{name}'")); + return Err(error); + } + }; + + if let Some(records) = records.as_ref() { + if let Err(error) = upload::submit_prepared_records( + ctx, + &dataset.id, + records, + "Uploading dataset rows...", + "dataset upload failed", + ) + .await + { + print_command_status( + CommandStatus::Error, + &format!("Created 
'{name}' but failed to upload initial rows"),
+            );
+            return Err(error);
+        }
+    }
+
+    if json_output {
+        println!(
+            "{}",
+            serde_json::to_string(&json!({
+                "dataset": dataset,
+                "created_dataset": true,
+                "uploaded": uploaded,
+                "mode": "create",
+            }))?
+        );
+        return Ok(());
+    }
+
+    let detail = if uploaded == 0 {
+        format!("Successfully created '{name}'")
+    } else {
+        format!("Created '{}' and uploaded {} records.", name, uploaded)
+    };
+    print_command_status(CommandStatus::Success, &detail);
+    Ok(())
+}
diff --git a/src/datasets/delete.rs b/src/datasets/delete.rs
new file mode 100644
index 0000000..10b6b05
--- /dev/null
+++ b/src/datasets/delete.rs
@@ -0,0 +1,62 @@
+use anyhow::{anyhow, bail, Result};
+use dialoguer::Confirm;
+
+use crate::ui::{is_interactive, is_quiet, print_command_status, with_spinner, CommandStatus};
+
+use super::{api, ResolvedContext};
+
+pub async fn run(ctx: &ResolvedContext, name: Option<&str>, force: bool) -> Result<()> {
+    if force && name.is_none() {
+        bail!("name required when using --force. Use: bt datasets delete <name> --force");
+    }
+
+    let dataset = match name {
+        Some(name) => api::get_dataset_by_name(&ctx.client, &ctx.project.id, name)
+            .await?
+            .ok_or_else(|| anyhow!("dataset '{name}' not found"))?,
+        None => {
+            if !is_interactive() {
+                bail!("dataset name required. Use: bt datasets delete <name>");
+            }
+            super::select_dataset_interactive(&ctx.client, &ctx.project.id).await?
+        }
+    };
+
+    if !force && is_interactive() {
+        let confirm = Confirm::new()
+            .with_prompt(format!(
+                "Delete dataset '{}' from {}?",
+                &dataset.name, &ctx.project.name
+            ))
+            .default(false)
+            .interact()?;
+        if !confirm {
+            return Ok(());
+        }
+    }
+
+    match with_spinner(
+        "Deleting dataset...",
+        api::delete_dataset(&ctx.client, &dataset.id),
+    )
+    .await
+    {
+        Ok(_) => {
+            print_command_status(
+                CommandStatus::Success,
+                &format!("Deleted '{}'", dataset.name),
+            );
+            if !is_quiet() {
+                eprintln!("Run `bt datasets list` to see remaining datasets.");
+            }
+            Ok(())
+        }
+        Err(error) => {
+            print_command_status(
+                CommandStatus::Error,
+                &format!("Failed to delete '{}'", dataset.name),
+            );
+            Err(error)
+        }
+    }
+}
diff --git a/src/datasets/list.rs b/src/datasets/list.rs
new file mode 100644
index 0000000..12538be
--- /dev/null
+++ b/src/datasets/list.rs
@@ -0,0 +1,63 @@
+use std::fmt::Write as _;
+
+use anyhow::Result;
+use dialoguer::console;
+
+use crate::{
+    ui::{apply_column_padding, header, print_with_pager, styled_table, truncate, with_spinner},
+    utils::pluralize,
+};
+
+use super::{api, ResolvedContext};
+
+pub async fn run(ctx: &ResolvedContext, json: bool) -> Result<()> {
+    let datasets = with_spinner(
+        "Loading datasets...",
+        api::list_datasets(&ctx.client, &ctx.project.id),
+    )
+    .await?;
+
+    if json {
+        println!("{}", serde_json::to_string(&datasets)?);
+        return Ok(());
+    }
+
+    let mut output = String::new();
+    let count = format!(
+        "{} {}",
+        datasets.len(),
+        pluralize(datasets.len(), "dataset", None)
+    );
+    writeln!(
+        output,
+        "{} found in {} {} {}\n",
+        console::style(count),
+        console::style(ctx.client.org_name()).bold(),
+        console::style("/").dim().bold(),
+        console::style(&ctx.project.name).bold()
+    )?;
+
+    let mut table = styled_table();
+    table.set_header(vec![
+        header("Name"),
+        header("Description"),
+        header("Created"),
+    ]);
+    apply_column_padding(&mut table, (0, 6));
+
+    for dataset in &datasets {
+        let description = dataset
+            .description_text()
+            .map(|description| truncate(description, 60))
+            .unwrap_or_else(|| "-".to_string());
+        let created = dataset
+            .created_text()
+            .map(|created| truncate(created, 10))
+            .unwrap_or_else(|| "-".to_string());
+        table.add_row(vec![&dataset.name, &description, &created]);
+    }
+
+    write!(output, "{table}")?;
+    print_with_pager(&output)?;
+    Ok(())
+}
diff --git a/src/datasets/mod.rs b/src/datasets/mod.rs
new file mode 100644
index 0000000..4caa51d
--- /dev/null
+++ b/src/datasets/mod.rs
@@ -0,0 +1,463 @@
+use std::path::PathBuf;
+
+use anyhow::{bail, Result};
+use clap::{builder::BoolishValueParser, Args, Subcommand};
+
+use crate::{
+    args::BaseArgs,
+    http::ApiClient,
+    project_context::resolve_project_command_context,
+    ui::{self, with_spinner},
+};
+
+pub(crate) mod api;
+mod create;
+mod delete;
+mod list;
+mod records;
+mod refresh;
+mod upload;
+mod view;
+
+use api::{self as datasets_api, Dataset};
+
+pub(crate) use crate::project_context::ProjectContext as ResolvedContext;
+
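+// A dataset name may be given positionally or via --name/-n, but not both;
+// clap's `conflicts_with` turns the ambiguous combination into a parse error
+// (covered by `dataset_name_rejects_positional_and_name_flag_together`).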
+#[derive(Debug, Clone, Args)]
+struct DatasetNameArgs {
+    /// Dataset name (positional)
+    #[arg(value_name = "NAME", conflicts_with = "name_flag")]
+    name_positional: Option<String>,
+
+    /// Dataset name (flag)
+    #[arg(long = "name", short = 'n', conflicts_with = "name_positional")]
+    name_flag: Option<String>,
+}
+
+impl DatasetNameArgs {
+    fn name(&self) -> Option<&str> {
+        self.name_positional
+            .as_deref()
+            .or(self.name_flag.as_deref())
+    }
+}
+
+#[derive(Debug, Clone, Args)]
+struct DatasetInputArgs {
+    /// JSON/JSONL input file. If omitted, bt reads dataset rows from --rows or stdin.
+    #[arg(
+        long,
+        env = "BT_DATASETS_FILE",
+        value_name = "PATH",
+        conflicts_with = "rows"
+    )]
+    file: Option<PathBuf>,
+
+    /// Inline dataset rows as JSON, such as an array of row objects.
+    #[arg(
+        long,
+        env = "BT_DATASETS_ROWS",
+        value_name = "JSON",
+        conflicts_with = "file"
+    )]
+    rows: Option<String>,
+
+    /// Dot-separated field path used to read stable record ids.
+    #[arg(
+        long,
+        env = "BT_DATASETS_ID_FIELD",
+        value_name = "PATH",
+        default_value = "id"
+    )]
+    id_field: String,
+}
+
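+// Example (illustrative): with `--id-field metadata.case_id`, an input row
+// like {"metadata":{"case_id":"case-7"},"input":{"text":"hi"}} keeps the
+// stable id "case-7". Rows missing the path get a deterministic generated id
+// on create/upload, while refresh rejects them because it diffs against
+// remote rows by stable id.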
+#[derive(Debug, Clone, Args)]
+#[command(after_help = r#"Examples:
+  bt datasets list
+  bt datasets create my-dataset
+  bt datasets create my-dataset --file records.jsonl
+  cat records.jsonl | bt datasets create my-dataset
+  bt datasets create my-dataset --rows '[{"id":"case-1","input":{"text":"hi"},"expected":"hello"}]'
+  bt datasets add my-dataset --file more-records.jsonl
+  bt datasets append my-dataset --rows '[{"id":"case-2","input":{"text":"bye"},"expected":"goodbye"}]'
+  bt datasets refresh my-dataset --file records.jsonl --id-field metadata.case_id --prune
+  bt datasets view my-dataset
+  bt datasets delete my-dataset
+"#)]
+pub struct DatasetsArgs {
+    #[command(subcommand)]
+    command: Option<DatasetsCommands>,
+}
+
+#[derive(Debug, Clone, Subcommand)]
+enum DatasetsCommands {
+    /// List all datasets
+    List,
+    /// Create a new dataset, optionally seeding rows from a file, --rows, or stdin
+    Create(CreateArgs),
+    /// Add rows to a remote Braintrust dataset
+    #[command(visible_aliases = ["add", "append", "update"])]
+    Upload(UploadArgs),
+    /// Deterministically refresh a remote Braintrust dataset by record id
+    Refresh(RefreshArgs),
+    /// View a dataset
+    View(ViewArgs),
+    /// Delete a dataset
+    Delete(DeleteArgs),
+}
+
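+// `add`, `append`, and `update` are visible aliases of `upload`; all four
+// spellings parse to `DatasetsCommands::Upload` (see the
+// `upload_visible_aliases_parse` test below).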
+#[derive(Debug, Clone, Args)]
+struct CreateArgs {
+    #[command(flatten)]
+    name: DatasetNameArgs,
+
+    #[command(flatten)]
+    input: DatasetInputArgs,
+}
+
+impl CreateArgs {
+    fn name(&self) -> Option<&str> {
+        self.name.name()
+    }
+}
+
+#[derive(Debug, Clone, Args)]
+struct UploadArgs {
+    #[command(flatten)]
+    name: DatasetNameArgs,
+
+    #[command(flatten)]
+    input: DatasetInputArgs,
+}
+
+#[derive(Debug, Clone, Args)]
+struct RefreshArgs {
+    #[command(flatten)]
+    name: DatasetNameArgs,
+
+    #[command(flatten)]
+    input: DatasetInputArgs,
+
+    /// Delete remote rows whose ids are not present in the input.
+    #[arg(
+        long,
+        env = "BT_DATASETS_REFRESH_PRUNE",
+        value_parser = BoolishValueParser::new(),
+        default_value_t = false
+    )]
+    prune: bool,
+}
+
+#[derive(Debug, Clone, Args)]
+struct ViewArgs {
+    #[command(flatten)]
+    name: DatasetNameArgs,
+
+    /// Open in browser
+    #[arg(
+        long,
+        env = "BT_DATASETS_WEB",
+        value_parser = BoolishValueParser::new(),
+        default_value_t = false
+    )]
+    web: bool,
+
+    /// Show full dataset row payloads
+    #[arg(
+        long,
+        env = "BT_DATASETS_VERBOSE",
+        value_parser = BoolishValueParser::new(),
+        default_value_t = false
+    )]
+    verbose: bool,
+}
+
+impl ViewArgs {
+    fn name(&self) -> Option<&str> {
+        self.name.name()
+    }
+}
+
+#[derive(Debug, Clone, Args)]
+struct DeleteArgs {
+    #[command(flatten)]
+    name: DatasetNameArgs,
+
+    /// Skip confirmation
+    #[arg(
+        long,
+        short = 'f',
+        env = "BT_DATASETS_FORCE",
+        value_parser = BoolishValueParser::new(),
+        default_value_t = false
+    )]
+    force: bool,
+}
+
+impl DeleteArgs {
+    fn name(&self) -> Option<&str> {
+        self.name.name()
+    }
+}
+
+pub(crate) async fn select_dataset_interactive(
+    client: &ApiClient,
+    project_id: &str,
+) -> Result<Dataset> {
+    let mut datasets = with_spinner(
+        "Loading datasets...",
+        datasets_api::list_datasets(client, project_id),
+    )
+    .await?;
+
+    if datasets.is_empty() {
+        bail!("no datasets found");
+    }
+
+    datasets.sort_by(|a, b| a.name.cmp(&b.name));
+    let names: Vec<&str> = datasets
+        .iter()
+        .map(|dataset| dataset.name.as_str())
+        .collect();
+    let selection = ui::fuzzy_select("Select dataset", &names, 0)?;
+    Ok(datasets[selection].clone())
+}
+
+pub async fn run(base: BaseArgs, args: DatasetsArgs) -> Result<()> {
+    let ctx = resolve_project_command_context(&base).await?;
+
+    match args.command {
+        None | Some(DatasetsCommands::List) => list::run(&ctx, base.json).await,
+        Some(DatasetsCommands::Create(create_args)) => {
+            create::run(
+                &ctx,
+                create_args.name(),
+                create_args.input.file.as_deref(),
+                create_args.input.rows.as_deref(),
+                &create_args.input.id_field,
+                base.json,
+            )
+            .await
+        }
+        Some(DatasetsCommands::Upload(upload_args)) => {
+            upload::run(
+                &ctx,
+                upload_args.name.name(),
+                upload_args.input.file.as_deref(),
+                upload_args.input.rows.as_deref(),
+                &upload_args.input.id_field,
+                base.json,
+            )
+            .await
+        }
+        Some(DatasetsCommands::Refresh(refresh_args)) => {
+            refresh::run(
+                &ctx,
+                refresh_args.name.name(),
+                refresh_args.input.file.as_deref(),
+                refresh_args.input.rows.as_deref(),
+                &refresh_args.input.id_field,
+                refresh_args.prune,
+                base.json,
+            )
+            .await
+        }
+        Some(DatasetsCommands::View(view_args)) => {
+            view::run(
+                &ctx,
+                view_args.name(),
+                base.json,
+                view_args.web,
+                view_args.verbose,
+            )
+            .await
+        }
+        Some(DatasetsCommands::Delete(delete_args)) => {
+            delete::run(&ctx, delete_args.name(), delete_args.force).await
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::PathBuf;
+
+    use clap::{Parser, Subcommand};
+
+    use super::*;
+
+    #[derive(Debug, Parser)]
+    struct CliHarness {
+        #[command(subcommand)]
+        command: Commands,
+    }
+
+    #[derive(Debug, Subcommand)]
+    enum Commands {
+        Datasets(DatasetsArgs),
+    }
+
+    fn parse(args: &[&str]) -> anyhow::Result<DatasetsArgs> {
+        let mut argv = vec!["bt"];
+        argv.extend_from_slice(args);
+        let parsed = CliHarness::try_parse_from(argv)?;
+        match parsed.command {
+            Commands::Datasets(args) => Ok(args),
+        }
+    }
+
+    #[test]
+    fn datasets_without_subcommand_defaults_to_list() {
+        let parsed = parse(&["datasets"]).expect("parse datasets");
+        assert!(parsed.command.is_none());
+    }
+
+    #[test]
+    fn 
create_parses_positional_name() { + let parsed = parse(&["datasets", "create", "my-dataset"]).expect("parse create"); + let DatasetsCommands::Create(create) = parsed.command.expect("subcommand") else { + panic!("expected create command"); + }; + assert_eq!(create.name(), Some("my-dataset")); + } + + #[test] + fn create_parses_file_rows_and_id_field() { + let parsed = parse(&[ + "datasets", + "create", + "my-dataset", + "--file", + "records.jsonl", + "--id-field", + "metadata.case_id", + ]) + .expect("parse create with file"); + let DatasetsCommands::Create(create) = parsed.command.expect("subcommand") else { + panic!("expected create command"); + }; + assert_eq!(create.name(), Some("my-dataset")); + assert_eq!(create.input.file, Some(PathBuf::from("records.jsonl"))); + assert_eq!(create.input.id_field, "metadata.case_id"); + assert!(create.input.rows.is_none()); + + let parsed = parse(&[ + "datasets", + "create", + "my-dataset", + "--rows", + r#"[{"id":"case-1"}]"#, + ]) + .expect("parse create with rows"); + let DatasetsCommands::Create(create) = parsed.command.expect("subcommand") else { + panic!("expected create command"); + }; + assert_eq!(create.input.rows.as_deref(), Some(r#"[{"id":"case-1"}]"#)); + assert!(create.input.file.is_none()); + } + + #[test] + fn upload_parses_file_and_id_field() { + let parsed = parse(&[ + "datasets", + "upload", + "my-dataset", + "--file", + "records.jsonl", + "--id-field", + "metadata.case_id", + ]) + .expect("parse upload"); + let DatasetsCommands::Upload(upload) = parsed.command.expect("subcommand") else { + panic!("expected upload command"); + }; + assert_eq!(upload.name.name(), Some("my-dataset")); + assert_eq!(upload.input.file, Some(PathBuf::from("records.jsonl"))); + assert_eq!(upload.input.id_field, "metadata.case_id"); + } + + #[test] + fn upload_visible_aliases_parse() { + for alias in ["add", "append", "update"] { + let parsed = parse(&[ + "datasets", + alias, + "my-dataset", + "--rows", + r#"[{"id":"case-1"}]"#, + ]) + .unwrap_or_else(|err| panic!("parse {alias} alias: {err}")); + let DatasetsCommands::Upload(upload) = parsed.command.expect("subcommand") else { + panic!("expected upload command"); + }; + assert_eq!(upload.name.name(), Some("my-dataset")); + assert_eq!(upload.input.rows.as_deref(), Some(r#"[{"id":"case-1"}]"#)); + } + } + + #[test] + fn refresh_parses_prune() { + let parsed = parse(&[ + "datasets", + "refresh", + "my-dataset", + "--file", + "records.jsonl", + "--prune", + ]) + .expect("parse refresh"); + let DatasetsCommands::Refresh(refresh) = parsed.command.expect("subcommand") else { + panic!("expected refresh command"); + }; + assert_eq!(refresh.name.name(), Some("my-dataset")); + assert_eq!(refresh.input.file, Some(PathBuf::from("records.jsonl"))); + assert!(refresh.prune); + assert_eq!(refresh.input.id_field, "id"); + } + + #[test] + fn view_parses_name_flag_web_and_verbose() { + let parsed = parse(&[ + "datasets", + "view", + "--name", + "my-dataset", + "--web", + "--verbose", + ]) + .expect("parse view"); + let DatasetsCommands::View(view) = parsed.command.expect("subcommand") else { + panic!("expected view command"); + }; + assert_eq!(view.name(), Some("my-dataset")); + assert!(view.web); + assert!(view.verbose); + } + + #[test] + fn delete_parses_name_and_force() { + let parsed = parse(&["datasets", "delete", "my-dataset", "--force"]).expect("parse delete"); + let DatasetsCommands::Delete(delete) = parsed.command.expect("subcommand") else { + panic!("expected delete command"); + }; + assert_eq!(delete.name(), 
Some("my-dataset"));
+        assert!(delete.force);
+    }
+
+    #[test]
+    fn dataset_name_rejects_positional_and_name_flag_together() {
+        let err = parse(&[
+            "datasets",
+            "delete",
+            "positional-name",
+            "--name",
+            "flag-name",
+            "--force",
+        ])
+        .expect_err("name should be ambiguous when both positional and --name are set");
+        let rendered = err.to_string();
+        assert!(rendered.contains("cannot be used with"));
+        assert!(rendered.contains("--name"));
+    }
+}
diff --git a/src/datasets/records.rs b/src/datasets/records.rs
new file mode 100644
index 0000000..5d68e6a
--- /dev/null
+++ b/src/datasets/records.rs
@@ -0,0 +1,476 @@
+use std::{
+    collections::{HashMap, HashSet},
+    fs,
+    io::{self, IsTerminal, Read},
+    path::Path,
+};
+
+use anyhow::{anyhow, bail, Context, Result};
+use chrono::Utc;
+use serde_json::{Map, Value};
+use sha2::{Digest, Sha256};
+
+use super::api::DatasetRow;
+
+pub(crate) const DATASET_UPLOAD_BATCH_SIZE: usize = 1000;
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct PreparedDatasetRecord {
+    pub id: String,
+    pub input: Option<Value>,
+    pub expected: Option<Value>,
+    pub metadata: Option<Map<String, Value>>,
+    pub tags: Option<Vec<String>>,
+}
+
+impl PreparedDatasetRecord {
+    pub fn to_upload_row(&self, dataset_id: &str) -> Map<String, Value> {
+        let mut row = Map::new();
+        row.insert("id".to_string(), Value::String(self.id.clone()));
+        row.insert(
+            "dataset_id".to_string(),
+            Value::String(dataset_id.to_string()),
+        );
+        row.insert(
+            "created".to_string(),
+            Value::String(Utc::now().to_rfc3339()),
+        );
+        if let Some(input) = &self.input {
+            row.insert("input".to_string(), input.clone());
+        }
+        if let Some(expected) = &self.expected {
+            row.insert("expected".to_string(), expected.clone());
+        }
+        if let Some(metadata) = &self.metadata {
+            row.insert("metadata".to_string(), Value::Object(metadata.clone()));
+        }
+        if let Some(tags) = &self.tags {
+            row.insert(
+                "tags".to_string(),
+                Value::Array(tags.iter().cloned().map(Value::String).collect()),
+            );
+        }
+        row
+    }
+}
+
+pub(crate) fn delete_row(id: &str, dataset_id: &str) -> Map<String, Value> {
+    let mut row = Map::new();
+    row.insert("id".to_string(), Value::String(id.to_string()));
+    row.insert(
+        "dataset_id".to_string(),
+        Value::String(dataset_id.to_string()),
+    );
+    row.insert(
+        "created".to_string(),
+        Value::String(Utc::now().to_rfc3339()),
+    );
+    row.insert("_object_delete".to_string(), Value::Bool(true));
+    row
+}
+
+pub(crate) fn load_upload_records(
+    input_path: Option<&Path>,
+    inline_rows: Option<&str>,
+    id_field: &str,
+) -> Result<Vec<PreparedDatasetRecord>> {
+    let raw = load_required_record_objects(input_path, inline_rows)?;
+    prepare_records(raw, id_field, false)
+}
+
+pub(crate) fn load_optional_upload_records(
+    input_path: Option<&Path>,
+    inline_rows: Option<&str>,
+    id_field: &str,
+) -> Result<Option<Vec<PreparedDatasetRecord>>> {
+    let Some(raw) = load_optional_record_objects(input_path, inline_rows)? else {
+        return Ok(None);
+    };
+    Ok(Some(prepare_records(raw, id_field, false)?))
+}
+
+pub(crate) fn load_refresh_records(
+    input_path: Option<&Path>,
+    inline_rows: Option<&str>,
+    id_field: &str,
+) -> Result<Vec<PreparedDatasetRecord>> {
+    let raw = load_required_record_objects(input_path, inline_rows)?;
+    prepare_records(raw, id_field, true)
+}
+
+pub(crate) fn remote_records_by_id(
+    rows: Vec<DatasetRow>,
+) -> Result<HashMap<String, PreparedDatasetRecord>> {
+    let mut records = HashMap::new();
+    for row in rows {
+        if let Some(record) = prepared_record_from_remote_row(&row)? {
+            records.insert(record.id.clone(), record);
+        }
+    }
+    Ok(records)
+}
+
+fn load_required_record_objects(
+    input_path: Option<&Path>,
+    inline_rows: Option<&str>,
+) -> Result<Vec<Map<String, Value>>> {
+    load_optional_record_objects(input_path, inline_rows)?.ok_or_else(|| {
+        anyhow!(
+            "dataset input required. Pass --file <path>, --rows <json>, or pipe JSON/JSONL into stdin"
+        )
+    })
+}
+
+fn load_optional_record_objects(
+    input_path: Option<&Path>,
+    inline_rows: Option<&str>,
+) -> Result<Option<Vec<Map<String, Value>>>> {
+    let Some(contents) = read_input_contents(input_path, inline_rows)? else {
+        return Ok(None);
+    };
+    if contents.trim().is_empty() && input_path.is_none() && inline_rows.is_none() {
+        return Ok(None);
+    }
+    Ok(Some(parse_record_objects(&contents)?))
+}
+
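+// Input detection (see the parser tests below): a leading '[' is parsed as
+// a JSON array; a leading '{' is first tried as a single JSON object or an
+// object with a `rows` array, falling back to JSONL when later non-empty
+// lines exist; anything else is parsed line by line as JSONL.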
+fn parse_record_objects(contents: &str) -> Result<Vec<Map<String, Value>>> {
+    let trimmed = contents.trim();
+    if trimmed.is_empty() {
+        bail!("dataset input is empty");
+    }
+
+    if trimmed.starts_with('[') {
+        return parse_json_records(trimmed);
+    }
+
+    if trimmed.starts_with('{') {
+        let json_result = parse_json_records(trimmed);
+        if json_result.is_ok() {
+            return json_result;
+        }
+
+        if trimmed.lines().skip(1).any(|line| !line.trim().is_empty()) {
+            if let Ok(records) = parse_jsonl_records(trimmed) {
+                return Ok(records);
+            }
+        }
+
+        return json_result;
+    }
+
+    parse_jsonl_records(trimmed)
+}
+
+fn read_input_contents(
+    input_path: Option<&Path>,
+    inline_rows: Option<&str>,
+) -> Result<Option<String>> {
+    match (input_path, inline_rows) {
+        (Some(path), None) => fs::read_to_string(path)
+            .with_context(|| format!("failed to read dataset input {}", path.display()))
+            .map(Some),
+        (None, Some(rows)) => Ok(Some(rows.to_string())),
+        (Some(_), Some(_)) => bail!("pass either --file or --rows, not both"),
+        (None, None) => {
+            if io::stdin().is_terminal() {
+                return Ok(None);
+            }
+            let mut buffer = String::new();
+            io::stdin()
+                .read_to_string(&mut buffer)
+                .context("failed to read dataset input from stdin")?;
+            Ok(Some(buffer))
+        }
+    }
+}
+
+fn parse_json_records(contents: &str) -> Result<Vec<Map<String, Value>>> {
+    let value: Value = serde_json::from_str(contents).context("invalid dataset JSON input")?;
+    match value {
+        Value::Array(values) => values
+            .into_iter()
+            .enumerate()
+            .map(|(index, value)| expect_record_object(value, Some(index + 1)))
+            .collect(),
+        Value::Object(mut object) => {
+            if let Some(rows) = object.remove("rows") {
+                match rows {
+                    Value::Array(values) => values
+                        .into_iter()
+                        .enumerate()
+                        .map(|(index, value)| expect_record_object(value, Some(index + 1)))
+                        .collect(),
+                    _ => bail!("dataset JSON field 'rows' must be an array of objects"),
+                }
+            } else {
+                Ok(vec![object])
+            }
+        }
+        _ => bail!("dataset JSON input must be an object, an array of objects, or an object with a 'rows' array"),
+    }
+}
+
+fn parse_jsonl_records(contents: &str) -> Result<Vec<Map<String, Value>>> {
+    let mut rows = Vec::new();
+    for (line_index, raw_line) in contents.lines().enumerate() {
+        let line = raw_line.trim();
+        if line.is_empty() {
+            continue;
+        }
+        let value: Value = serde_json::from_str(line)
+            .with_context(|| format!("invalid JSON on line {}", line_index + 1))?;
+        rows.push(expect_record_object(value, Some(line_index + 1))?);
+    }
+
+    if rows.is_empty() {
+        bail!("dataset input did not contain any records");
+    }
+
+    Ok(rows)
+}
+
+fn expect_record_object(value: Value, line_number: Option<usize>) -> Result<Map<String, Value>> {
+    match value {
+        Value::Object(object) => Ok(object),
+        _ => match line_number {
+            Some(line_number) => {
+                bail!("dataset record on line {line_number} must be a JSON object")
+            }
+            None => bail!("dataset record must be a JSON object"),
+        },
+    }
+}
+
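+// prepare_records is shared by create/upload (require_ids = false, so rows
+// without an id fall back to a generated one) and refresh (require_ids =
+// true); duplicate ids within a single input are always rejected.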
+fn prepare_records(
+    raw_records: Vec<Map<String, Value>>,
+    id_field: &str,
+    require_ids: bool,
+) -> Result<Vec<PreparedDatasetRecord>> {
+    let id_path = parse_id_field_path(id_field)?;
+    let mut records = Vec::with_capacity(raw_records.len());
+    let mut seen_ids = HashSet::new();
+
+    for (row_index, raw_record) in raw_records.into_iter().enumerate() {
+        let record =
+            prepared_record_from_input_object(raw_record, &id_path, require_ids, row_index)?;
+        if !seen_ids.insert(record.id.clone()) {
+            bail!("duplicate dataset record id '{}' in input", record.id);
+        }
+        records.push(record);
+    }
+
+    if records.is_empty() {
+        bail!("dataset input did not contain any records");
+    }
+
+    Ok(records)
+}
+
+fn prepared_record_from_input_object(
+    object: Map<String, Value>,
+    id_path: &[String],
+    require_id: bool,
+    row_index: usize,
+) -> Result<PreparedDatasetRecord> {
+    let explicit_id = lookup_object_path(&object, id_path)
+        .map(coerce_id_value)
+        .transpose()?;
+
+    let id = match explicit_id {
+        Some(id) => id,
+        None if require_id => bail!(
+            "dataset record {} is missing a stable id at '{}'; pass --id-field or include an id field",
+            row_index + 1,
+            id_path.join(".")
+        ),
+        None => generated_record_id(&object, row_index)?,
+    };
+
+    Ok(PreparedDatasetRecord {
+        id,
+        input: object.get("input").cloned(),
+        expected: object
+            .get("expected")
+            .cloned()
+            .or_else(|| object.get("output").cloned()),
+        metadata: parse_metadata(object.get("metadata"))?,
+        tags: parse_tags(object.get("tags"))?,
+    })
+}
+
+fn prepared_record_from_remote_row(row: &DatasetRow) -> Result<Option<PreparedDatasetRecord>> {
+    let Some(id_value) = row.get("id").or_else(|| row.get("span_id")) else {
+        return Ok(None);
+    };
+    let id = coerce_id_value(id_value)?;
+    Ok(Some(PreparedDatasetRecord {
+        id,
+        input: row.get("input").cloned(),
+        expected: row
+            .get("expected")
+            .cloned()
+            .or_else(|| row.get("output").cloned()),
+        metadata: parse_metadata(row.get("metadata"))?,
+        tags: parse_tags(row.get("tags"))?,
+    }))
+}
+
+fn parse_id_field_path(id_field: &str) -> Result<Vec<String>> {
+    let path = id_field
+        .split('.')
+        .map(str::trim)
+        .filter(|part| !part.is_empty())
+        .map(ToOwned::to_owned)
+        .collect::<Vec<String>>();
+    if path.is_empty() {
+        bail!("id field path cannot be empty");
+    }
+    Ok(path)
+}
+
+fn lookup_object_path<'a>(object: &'a Map<String, Value>, path: &[String]) -> Option<&'a Value> {
+    let mut current = object.get(path.first()?.as_str())?;
+    for part in path.iter().skip(1) {
+        current = current.as_object()?.get(part.as_str())?;
+    }
+    Some(current)
+}
+
+fn coerce_id_value(value: &Value) -> Result<String> {
+    match value {
+        Value::String(value) => {
+            let trimmed = value.trim();
+            if trimmed.is_empty() {
+                bail!("dataset record id cannot be empty");
+            }
+            Ok(trimmed.to_string())
+        }
+        Value::Number(value) => Ok(value.to_string()),
+        Value::Bool(value) => Ok(value.to_string()),
+        Value::Null => bail!("dataset record id cannot be null"),
+        _ => Err(anyhow!(
+            "dataset record id must be a string, number, or boolean"
+        )),
+    }
+}
+
+fn parse_metadata(value: Option<&Value>) -> Result<Option<Map<String, Value>>> {
+    match value {
+        None | Some(Value::Null) => Ok(None),
+        Some(Value::Object(metadata)) => Ok(Some(metadata.clone())),
+        Some(_) => bail!("dataset record metadata must be a JSON object"),
+    }
+}
+
+fn parse_tags(value: Option<&Value>) -> Result<Option<Vec<String>>> {
+    match value {
+        None | Some(Value::Null) => Ok(None),
+        Some(Value::Array(values)) => values
+            .iter()
+            .enumerate()
+            .map(|(index, value)| match value {
+                Value::String(value) => Ok(value.clone()),
+                _ => bail!("dataset record tags[{index}] must be a string"),
+            })
+            .collect::<Result<Vec<String>>>()
+            .map(Some),
+        Some(_) => bail!("dataset record tags must be an array of strings"),
+    }
+}
+
+fn generated_record_id(object: &Map<String, Value>, row_index: usize) -> Result<String> {
+    let payload = serde_json::to_vec(object).context("failed to serialize dataset record")?;
+    let digest = Sha256::digest(payload);
+    let mut short_hash = String::with_capacity(16);
+    for byte in digest.iter().take(8) {
+        short_hash.push_str(&format!("{byte:02x}"));
+    }
+    Ok(format!("bt-dataset-{row_index:06}-{short_hash}"))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn parse_json_rows_wrapper_extracts_rows() {
+        let records =
+            parse_json_records(r#"{"dataset":{"id":"ds"},"rows":[{"id":"a"},{"id":"b"}]}"#)
+                .expect("parse rows wrapper");
+        assert_eq!(records.len(), 2);
+        assert_eq!(records[0].get("id"), Some(&Value::String("a".to_string())));
+    }
+
+    #[test]
+    fn parse_record_objects_accepts_jsonl_objects() {
+        let records = parse_record_objects(
+            r#"{"id":"a"}
+{"id":"b"}
+"#,
+        )
+        .expect("parse jsonl records");
+        assert_eq!(records.len(), 2);
+        assert_eq!(records[1].get("id"), Some(&Value::String("b".to_string())));
+    }
+
+    #[test]
+    fn read_input_contents_prefers_inline_rows() {
+        let records = read_input_contents(None, Some(r#"[{"id":"case-1"}]"#))
+            .expect("read inline rows")
+            .expect("inline rows present");
+        assert_eq!(records, r#"[{"id":"case-1"}]"#);
+    }
+
+    #[test]
+    fn prepare_records_uses_nested_id_field() {
+        let records = prepare_records(
+            vec![serde_json::from_value(serde_json::json!({
+                "metadata": {"case_id": "case-1"},
+                "input": {"text": "hello"},
+                "expected": "world"
+            }))
+            .expect("map")],
+            "metadata.case_id",
+            true,
+        )
+        .expect("prepare records");
+        assert_eq!(records[0].id, "case-1");
+    }
+
+    #[test]
+    fn prepare_records_generates_stable_id_when_missing() {
+        let source: Map<String, Value> = serde_json::from_value(serde_json::json!({
+            "input": {"text": "hello"},
+            "expected": "world"
+        }))
+        .expect("map");
+        let first = prepare_records(vec![source.clone()], "id", false).expect("first");
+        let second = prepare_records(vec![source], "id", false).expect("second");
+        assert_eq!(first[0].id, second[0].id);
+    }
+
+    #[test]
+    fn remote_record_prefers_expected_over_output() {
+        let row = serde_json::from_value::<Map<String, Value>>(serde_json::json!({
+            "id": "case-1",
+            "expected": "expected",
+            "output": "output"
+        }))
+        .expect("map");
+        let record = prepared_record_from_remote_row(&row)
+            .expect("parse remote")
+            .expect("record");
+        assert_eq!(record.expected, Some(Value::String("expected".to_string())));
+    }
+
+    #[test]
+    fn prepare_records_rejects_duplicate_ids() {
+        let first = serde_json::from_value(serde_json::json!({"id": "dup"})).expect("map");
+        let second = serde_json::from_value(serde_json::json!({"id": "dup"})).expect("map");
+        let err = prepare_records(vec![first, second], "id", true).expect_err("duplicate ids");
+        assert!(err
+            .to_string()
+            .contains("duplicate dataset record id 'dup'"));
+    }
+}
diff --git a/src/datasets/refresh.rs b/src/datasets/refresh.rs
new file mode 100644
index 0000000..e958f5f
--- /dev/null
+++ b/src/datasets/refresh.rs
@@ -0,0 +1,114 @@
+use std::path::Path;
+
+use anyhow::Result;
+use serde_json::json;
+
+use crate::ui::{print_command_status, with_spinner, CommandStatus};
+
+use super::{
+    api,
+    records::{delete_row, load_refresh_records, remote_records_by_id},
+    upload, ResolvedContext,
+};
+
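+// Refresh matches local rows to remote rows by stable id: new ids are
+// created, changed rows are re-uploaded, identical rows are left untouched,
+// and with --prune remote ids absent from the input are tombstoned through
+// `_object_delete` rows.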
+pub async fn run(
+    ctx: &ResolvedContext,
+    name: Option<&str>,
+    input_path: Option<&Path>,
+    inline_rows: Option<&str>,
+    id_field: &str,
+    prune: bool,
+    json_output: bool,
+) -> Result<()> {
+    let 
dataset_name = upload::resolve_dataset_name(name, "refresh")?; + let local_records = load_refresh_records(input_path, inline_rows, id_field)?; + + let (dataset, created_dataset) = crate::ui::with_spinner_visible( + "Resolving remote dataset...", + api::get_or_create_dataset(&ctx.client, &ctx.project.id, &dataset_name), + std::time::Duration::from_millis(300), + ) + .await?; + + let remote_records = if created_dataset { + Default::default() + } else { + let remote_rows = with_spinner( + "Loading remote dataset rows...", + api::list_dataset_rows(&ctx.client, &dataset.id), + ) + .await?; + remote_records_by_id(remote_rows)? + }; + + let mut upload_rows = Vec::new(); + let mut local_ids = std::collections::HashSet::new(); + let mut created = 0usize; + let mut updated = 0usize; + let mut unchanged = 0usize; + let mut deleted = 0usize; + + for record in &local_records { + local_ids.insert(record.id.clone()); + match remote_records.get(&record.id) { + None => { + created += 1; + upload_rows.push(record.to_upload_row(&dataset.id)); + } + Some(existing) if existing == record => { + unchanged += 1; + } + Some(_) => { + updated += 1; + upload_rows.push(record.to_upload_row(&dataset.id)); + } + } + } + + if prune { + for remote_id in remote_records.keys() { + if !local_ids.contains(remote_id) { + deleted += 1; + upload_rows.push(delete_row(remote_id, &dataset.id)); + } + } + } + + if !upload_rows.is_empty() { + upload::submit_rows( + ctx, + &upload_rows, + "Refreshing remote dataset...", + "dataset refresh failed", + ) + .await?; + } + + if json_output { + println!( + "{}", + serde_json::to_string(&json!({ + "dataset": dataset, + "created_dataset": created_dataset, + "created": created, + "updated": updated, + "deleted": deleted, + "unchanged": unchanged, + "pruned": prune, + "mode": "refresh", + }))? 
+        );
+        return Ok(());
+    }
+
+    let detail = if upload_rows.is_empty() {
+        format!("'{}' is already up to date.", dataset.name)
+    } else {
+        format!(
+            "Refreshed '{}' (created {}, updated {}, deleted {}, unchanged {}).",
+            dataset.name, created, updated, deleted, unchanged
+        )
+    };
+    print_command_status(CommandStatus::Success, &detail);
+    Ok(())
+}
diff --git a/src/datasets/upload.rs b/src/datasets/upload.rs
new file mode 100644
index 0000000..63a9468
--- /dev/null
+++ b/src/datasets/upload.rs
@@ -0,0 +1,123 @@
+use std::{path::Path, time::Duration};
+
+use anyhow::{anyhow, bail, Result};
+use braintrust_sdk_rust::Logs3BatchUploader;
+use dialoguer::Input;
+use serde_json::{json, Map, Value};
+
+use crate::ui::{is_interactive, print_command_status, with_spinner_visible, CommandStatus};
+
+use super::{
+    api,
+    records::{load_upload_records, PreparedDatasetRecord, DATASET_UPLOAD_BATCH_SIZE},
+    ResolvedContext,
+};
+
+pub async fn run(
+    ctx: &ResolvedContext,
+    name: Option<&str>,
+    input_path: Option<&Path>,
+    inline_rows: Option<&str>,
+    id_field: &str,
+    json_output: bool,
+) -> Result<()> {
+    let dataset_name = resolve_dataset_name(name, "upload")?;
+    let records = load_upload_records(input_path, inline_rows, id_field)?;
+    let record_count = records.len();
+
+    let (dataset, created) = with_spinner_visible(
+        "Resolving remote dataset...",
+        api::get_or_create_dataset(&ctx.client, &ctx.project.id, &dataset_name),
+        Duration::from_millis(300),
+    )
+    .await?;
+
+    submit_prepared_records(
+        ctx,
+        &dataset.id,
+        &records,
+        "Uploading dataset rows...",
+        "dataset upload failed",
+    )
+    .await?;
+
+    if json_output {
+        println!(
+            "{}",
+            serde_json::to_string(&json!({
+                "dataset": dataset,
+                "created_dataset": created,
+                "uploaded": record_count,
+                "mode": "upload",
+            }))?
+        );
+        return Ok(());
+    }
+
+    let detail = if created {
+        format!(
+            "Uploaded {record_count} records to '{}' and created the remote dataset.",
+            dataset.name
+        )
+    } else {
+        format!("Uploaded {record_count} records to '{}'.", dataset.name)
+    };
+    print_command_status(CommandStatus::Success, &detail);
+    Ok(())
+}
+
+pub(crate) fn resolve_dataset_name(name: Option<&str>, command: &str) -> Result<String> {
+    match name {
+        Some(name) if !name.trim().is_empty() => Ok(name.trim().to_string()),
+        _ => {
+            if !is_interactive() {
+                bail!("dataset name required. Use: bt datasets {command} <name>");
+            }
+            Ok(Input::new().with_prompt("Dataset name").interact_text()?)
+        }
+    }
+}
+
+pub(crate) async fn submit_prepared_records(
+    ctx: &ResolvedContext,
+    dataset_id: &str,
+    records: &[PreparedDatasetRecord],
+    spinner_label: &str,
+    error_context: &str,
+) -> Result<()> {
+    let rows = records
+        .iter()
+        .map(|record| record.to_upload_row(dataset_id))
+        .collect::<Vec<_>>();
+    submit_rows(ctx, &rows, spinner_label, error_context).await
+}
+
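+// All rows (including refresh --prune tombstones) are flushed through the
+// SDK's Logs3BatchUploader in batches of DATASET_UPLOAD_BATCH_SIZE (1000).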
+ } + } +} + +pub(crate) async fn submit_prepared_records( + ctx: &ResolvedContext, + dataset_id: &str, + records: &[PreparedDatasetRecord], + spinner_label: &str, + error_context: &str, +) -> Result<()> { + let rows = records + .iter() + .map(|record| record.to_upload_row(dataset_id)) + .collect::<Vec<_>>(); + submit_rows(ctx, &rows, spinner_label, error_context).await +} + +pub(crate) async fn submit_rows( + ctx: &ResolvedContext, + rows: &[Map<String, Value>], + spinner_label: &str, + error_context: &str, +) -> Result<()> { + let mut uploader = dataset_uploader(ctx)?; + with_spinner_visible( + spinner_label, + async { + uploader + .upload_rows(rows, DATASET_UPLOAD_BATCH_SIZE) + .await + .map_err(|err| anyhow!("{error_context}: {err}")) + }, + Duration::from_millis(300), + ) + .await?; + Ok(()) +} + +fn dataset_uploader(ctx: &ResolvedContext) -> Result<Logs3BatchUploader> { + Logs3BatchUploader::new( + ctx.client.base_url(), + ctx.client.api_key().to_string(), + (!ctx.client.org_name().trim().is_empty()).then_some(ctx.client.org_name().to_string()), + ) + .map_err(|err| anyhow!("failed to initialize dataset uploader: {err}")) +} diff --git a/src/datasets/view.rs b/src/datasets/view.rs new file mode 100644 index 0000000..ca1bf84 --- /dev/null +++ b/src/datasets/view.rs @@ -0,0 +1,190 @@ +use std::fmt::Write as _; + +use anyhow::{anyhow, bail, Result}; +use dialoguer::console; +use serde_json::{Map, Value}; +use urlencoding::encode; + +use crate::ui::{ + is_interactive, print_command_status, print_with_pager, with_spinner, CommandStatus, +}; + +use super::{api, ResolvedContext}; + +pub async fn run( + ctx: &ResolvedContext, + name: Option<&str>, + json: bool, + web: bool, + verbose: bool, +) -> Result<()> { + let dataset = match name { + Some(name) => with_spinner( + "Loading dataset...", + api::get_dataset_by_name(&ctx.client, &ctx.project.id, name), + ) + .await? + .ok_or_else(|| anyhow!("dataset '{name}' not found"))?, + None => { + if !is_interactive() { + bail!("dataset name required. Use: bt datasets view <name>"); + } + super::select_dataset_interactive(&ctx.client, &ctx.project.id).await? + } + }; + + let url = format!( + "{}/app/{}/p/{}/datasets/{}", + ctx.app_url.trim_end_matches('/'), + encode(ctx.client.org_name()), + encode(&ctx.project.name), + encode(&dataset.name) + ); + + if web { + open::that(&url)?; + print_command_status(CommandStatus::Success, &format!("Opened {url} in browser")); + return Ok(()); + } + + let rows = with_spinner( + "Loading dataset rows...", + api::list_dataset_rows(&ctx.client, &dataset.id), + ) + .await?; + + if json { + println!( + "{}", + serde_json::to_string(&serde_json::json!({ + "dataset": dataset, + "rows": rows, + }))?
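submit_rows batches the prepared rows through the SDK uploader, and the records module resolves --id-field dot paths such as metadata.case_id before rows ever reach it. A hedged sketch of that dot-path lookup over serde_json values (lookup_dot_path is a hypothetical helper, not the actual records.rs implementation):

    use serde_json::{json, Value};

    // Walk a dot-separated path like "metadata.case_id" through nested JSON objects.
    fn lookup_dot_path<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
        path.split('.').try_fold(root, |node, key| node.get(key))
    }

    fn main() {
        let row = json!({"metadata": {"case_id": "case-7"}, "input": {"text": "hi"}});
        let id = lookup_dot_path(&row, "metadata.case_id").and_then(Value::as_str);
        assert_eq!(id, Some("case-7"));
        assert!(lookup_dot_path(&row, "metadata.missing").is_none());
    }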
+ ); + return Ok(()); + } + + let display_rows = if verbose { + rows.clone() + } else { + rows.iter().map(compact_row_for_display).collect() + }; + + let mut output = String::new(); + writeln!(output, "Viewing {}", console::style(&dataset.name).bold())?; + + if let Some(description) = dataset.description_text() { + writeln!( + output, + "{} {}", + console::style("Description:").dim(), + description + )?; + } + writeln!(output, "{} {}", console::style("ID:").dim(), dataset.id)?; + writeln!( + output, + "{} {}", + console::style("Project:").dim(), + ctx.project.name + )?; + if let Some(created) = dataset.created_text() { + writeln!(output, "{} {}", console::style("Created:").dim(), created)?; + } + writeln!(output, "{} {}", console::style("Rows:").dim(), rows.len())?; + + writeln!( + output, + "\n{} {}", + console::style("View dataset:").dim(), + console::style(&url).underlined() + )?; + + writeln!(output, "\n{}", console::style("Dataset rows:").dim())?; + if !verbose { + writeln!( + output, + "{}", + console::style( + "Showing row id/input/expected/output/metadata/tags only. Re-run with --verbose to inspect full row payloads." + ) + .dim() + )?; + } + writeln!(output, "{}", serde_json::to_string_pretty(&display_rows)?)?; + + print_with_pager(&output)?; + Ok(()) +} + +fn compact_row_for_display(row: &Map<String, Value>) -> Map<String, Value> { + let mut compact = Map::new(); + + if let Some(value) = row.get("id").cloned() { + compact.insert("id".to_string(), value); + } else if let Some(value) = row.get("span_id").cloned() { + compact.insert("id".to_string(), value); + } + + for key in ["input", "expected", "output", "metadata", "tags"] { + if let Some(value) = row.get(key).cloned() { + compact.insert(key.to_string(), value); + } + } + + compact +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn compact_row_for_display_keeps_user_facing_fields() { + let row = serde_json::from_value::<Map<String, Value>>(serde_json::json!({ + "id": "case-1", + "dataset_id": "dataset_1", + "created": "2026-01-01T00:00:00Z", + "input": {"prompt": "hello"}, + "expected": "world", + "metadata": {"topic": "math"}, + "tags": ["smoke"], + "span_id": "span-1" + })) + .expect("row map"); + + let compact = compact_row_for_display(&row); + assert_eq!(compact.len(), 5); + assert_eq!( + compact.get("id"), + Some(&Value::String("case-1".to_string())) + ); + assert!(compact.get("dataset_id").is_none()); + assert!(compact.get("created").is_none()); + assert!(compact.get("input").is_some()); + assert!(compact.get("expected").is_some()); + assert!(compact.get("metadata").is_some()); + assert!(compact.get("tags").is_some()); + } + + #[test] + fn compact_row_for_display_falls_back_to_span_id_and_output() { + let row = serde_json::from_value::<Map<String, Value>>(serde_json::json!({ + "span_id": "span-1", + "output": "value", + "created": "2026-01-01T00:00:00Z" + })) + .expect("row map"); + + let compact = compact_row_for_display(&row); + assert_eq!( + compact.get("id"), + Some(&Value::String("span-1".to_string())) + ); + assert_eq!( + compact.get("output"), + Some(&Value::String("value".to_string())) + ); + assert!(compact.get("created").is_none()); + } +} diff --git a/src/http.rs b/src/http.rs index e077add..6fd0c5d 100644 --- a/src/http.rs +++ b/src/http.rs @@ -60,6 +60,10 @@ impl ApiClient { &self.api_key } + pub fn base_url(&self) -> &str { + &self.base_url + } + pub fn org_name(&self) -> &str { &self.org_name } diff --git a/src/main.rs b/src/main.rs index b9ac457..6fc4052 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ mod args; mod auth; #[allow(dead_code)] mod
config; +mod datasets; mod env; #[cfg(unix)] mod eval; @@ -60,6 +61,7 @@ Core Projects & resources projects Manage projects + datasets Manage datasets prompts Manage prompts functions Manage functions (tools, scorers, and more) tools Manage tools @@ -130,6 +132,8 @@ enum Commands { Eval(CLIArgs), /// Manage projects Projects(CLIArgs), + /// Manage datasets + Datasets(CLIArgs), /// Manage prompts Prompts(CLIArgs), #[command(name = "self")] @@ -167,6 +171,7 @@ impl Commands { #[cfg(unix)] Commands::Eval(cmd) => &cmd.base, Commands::Projects(cmd) => &cmd.base, + Commands::Datasets(cmd) => &cmd.base, Commands::Prompts(cmd) => &cmd.base, Commands::SelfCommand(cmd) => &cmd.base, Commands::Tools(cmd) => &cmd.base, @@ -221,6 +226,7 @@ async fn try_main() -> Result<()> { #[cfg(unix)] Commands::Eval(cmd) => eval::run(cmd.base, cmd.args).await?, Commands::Projects(cmd) => projects::run(cmd.base, cmd.args).await?, + Commands::Datasets(cmd) => datasets::run(cmd.base, cmd.args).await?, Commands::Prompts(cmd) => prompts::run(cmd.base, cmd.args).await?, Commands::Tools(cmd) => tools::run(cmd.base, cmd.args).await?, Commands::Scorers(cmd) => scorers::run(cmd.base, cmd.args).await?, diff --git a/tests/datasets-fixtures/create-inputs/fixture.json b/tests/datasets-fixtures/create-inputs/fixture.json new file mode 100644 index 0000000..c2ed952 --- /dev/null +++ b/tests/datasets-fixtures/create-inputs/fixture.json @@ -0,0 +1,63 @@ +{ + "env": { + "BRAINTRUST_API_KEY": "fixture-api-key", + "BRAINTRUST_API_URL": "__MOCK_SERVER_URL__", + "BRAINTRUST_APP_URL": "__MOCK_SERVER_URL__", + "BRAINTRUST_ORG_NAME": "test-org", + "BRAINTRUST_DEFAULT_PROJECT": "fixtures-project" + }, + "steps": [ + { + "command": [ + "datasets", + "--json", + "--no-input", + "create", + "file-fixture", + "--file", + "rows.jsonl" + ], + "stdout_contains": [ + "\"name\":\"file-fixture\"", + "\"uploaded\":2", + "\"mode\":\"create\"" + ] + }, + { + "command": [ + "datasets", + "--json", + "--no-input", + "create", + "stdin-fixture" + ], + "stdin_file": "rows.jsonl", + "stdout_contains": [ + "\"name\":\"stdin-fixture\"", + "\"uploaded\":2", + "\"mode\":\"create\"" + ] + }, + { + "command": [ + "datasets", + "--json", + "--no-input", + "create", + "inline-fixture", + "--rows", + "[{\"id\":\"inline-1\",\"input\":{\"prompt\":\"inline\"},\"expected\":\"ok\"}]" + ], + "stdout_contains": [ + "\"name\":\"inline-fixture\"", + "\"uploaded\":1", + "\"mode\":\"create\"" + ] + }, + { + "command": ["datasets", "--json", "--no-input", "view", "inline-fixture"], + "stdout_contains": ["\"id\":\"inline-1\"", "\"expected\":\"ok\""] + } + ], + "expected_logs3_requests": 3 +} diff --git a/tests/datasets-fixtures/create-inputs/rows.jsonl b/tests/datasets-fixtures/create-inputs/rows.jsonl new file mode 100644 index 0000000..678d246 --- /dev/null +++ b/tests/datasets-fixtures/create-inputs/rows.jsonl @@ -0,0 +1,2 @@ +{"id":"case-1","input":{"prompt":"one"},"expected":"A1"} +{"id":"case-2","input":{"prompt":"two"},"expected":"B1"} diff --git a/tests/datasets-fixtures/upload-refresh/fixture.json b/tests/datasets-fixtures/upload-refresh/fixture.json new file mode 100644 index 0000000..06f06f5 --- /dev/null +++ b/tests/datasets-fixtures/upload-refresh/fixture.json @@ -0,0 +1,77 @@ +{ + "env": { + "BRAINTRUST_API_KEY": "fixture-api-key", + "BRAINTRUST_API_URL": "__MOCK_SERVER_URL__", + "BRAINTRUST_APP_URL": "__MOCK_SERVER_URL__", + "BRAINTRUST_ORG_NAME": "test-org", + "BRAINTRUST_DEFAULT_PROJECT": "fixtures-project" + }, + "steps": [ + { + "command": [ + "datasets", + 
"--json", + "--no-input", + "append", + "qa-fixture", + "--file", + "upload.jsonl" + ], + "stdout_contains": [ + "\"uploaded\":2", + "\"created_dataset\":true", + "\"mode\":\"upload\"" + ] + }, + { + "command": [ + "datasets", + "--json", + "--no-input", + "refresh", + "qa-fixture", + "--file", + "refresh.jsonl", + "--prune" + ], + "stdout_contains": [ + "\"created\":1", + "\"updated\":1", + "\"deleted\":1", + "\"unchanged\":0", + "\"pruned\":true", + "\"mode\":\"refresh\"" + ] + }, + { + "command": [ + "datasets", + "--json", + "--no-input", + "refresh", + "qa-fixture", + "--file", + "refresh.jsonl", + "--prune" + ], + "stdout_contains": [ + "\"created\":0", + "\"updated\":0", + "\"deleted\":0", + "\"unchanged\":2", + "\"pruned\":true", + "\"mode\":\"refresh\"" + ] + }, + { + "command": ["datasets", "--json", "--no-input", "view", "qa-fixture"], + "stdout_contains": [ + "\"id\":\"case-1\"", + "\"expected\":\"A2\"", + "\"id\":\"case-3\"" + ], + "stdout_not_contains": ["\"id\":\"case-2\""] + } + ], + "expected_logs3_requests": 2 +} diff --git a/tests/datasets-fixtures/upload-refresh/refresh.jsonl b/tests/datasets-fixtures/upload-refresh/refresh.jsonl new file mode 100644 index 0000000..42cede4 --- /dev/null +++ b/tests/datasets-fixtures/upload-refresh/refresh.jsonl @@ -0,0 +1,2 @@ +{"id":"case-1","input":{"prompt":"one"},"expected":"A2","metadata":{"topic":"math"},"tags":["smoke","updated"]} +{"id":"case-3","input":{"prompt":"three"},"expected":"C1"} diff --git a/tests/datasets-fixtures/upload-refresh/upload.jsonl b/tests/datasets-fixtures/upload-refresh/upload.jsonl new file mode 100644 index 0000000..f20df98 --- /dev/null +++ b/tests/datasets-fixtures/upload-refresh/upload.jsonl @@ -0,0 +1,2 @@ +{"id":"case-1","input":{"prompt":"one"},"expected":"A1","metadata":{"topic":"math"},"tags":["smoke"]} +{"id":"case-2","input":{"prompt":"two"},"expected":"B1"} diff --git a/tests/datasets.rs b/tests/datasets.rs new file mode 100644 index 0000000..0243e18 --- /dev/null +++ b/tests/datasets.rs @@ -0,0 +1,531 @@ +use std::collections::BTreeMap; +use std::fs::{self, File}; +use std::net::TcpListener; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use std::sync::{Arc, Mutex}; + +use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer}; +use serde::Deserialize; +use serde_json::{Map, Value}; + +#[derive(Debug, Deserialize)] +struct FixtureConfig { + #[serde(default)] + env: BTreeMap, + steps: Vec, + #[serde(default)] + expected_logs3_requests: Option, +} + +#[derive(Debug, Deserialize)] +struct FixtureStep { + command: Vec, + #[serde(default)] + stdin_file: Option, + #[serde(default = "default_expect_success")] + expect_success: bool, + #[serde(default)] + stdout_contains: Vec, + #[serde(default)] + stderr_contains: Vec, + #[serde(default)] + stdout_not_contains: Vec, + #[serde(default)] + stderr_not_contains: Vec, +} + +fn default_expect_success() -> bool { + true +} + +fn repo_root() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) +} + +fn bt_binary_path() -> PathBuf { + if let Ok(path) = std::env::var("CARGO_BIN_EXE_bt") { + return PathBuf::from(path); + } + + let root = repo_root(); + let candidate = root.join("target").join("debug").join("bt"); + if !candidate.is_file() { + build_bt_binary(&root); + } + candidate +} + +fn build_bt_binary(root: &Path) { + let status = Command::new("cargo") + .args(["build", "--bin", "bt"]) + .current_dir(root) + .status() + .expect("cargo build --bin bt"); + if !status.success() { + panic!("cargo build --bin bt failed"); + } 
+} + +fn read_fixture_config(path: &Path) -> FixtureConfig { + let raw = fs::read_to_string(path).expect("read fixture.json"); + serde_json::from_str(&raw).expect("parse fixture.json") +} + +fn sanitize_dataset_env(cmd: &mut Command) { + for (key, _) in std::env::vars_os() { + if key + .to_str() + .is_some_and(|name| name.starts_with("BRAINTRUST_") || name.starts_with("BT_DATASETS_")) + { + cmd.env_remove(&key); + } + } +} + +fn expand_fixture_value(value: &str, mock_server_url: &str) -> String { + value.replace("__MOCK_SERVER_URL__", mock_server_url) +} + +#[derive(Debug, Clone)] +struct MockProject { + id: String, + name: String, + org_id: String, +} + +#[derive(Debug, Clone)] +struct MockDataset { + id: String, + name: String, + project_id: String, + created: String, +} + +#[derive(Debug)] +struct MockServerState { + requests: Mutex<Vec<String>>, + projects: Mutex<Vec<MockProject>>, + datasets: Mutex<Vec<MockDataset>>, + dataset_rows: Mutex<BTreeMap<String, BTreeMap<String, Map<String, Value>>>>, +} + +impl MockServerState { + fn seeded() -> Self { + Self { + requests: Mutex::new(Vec::new()), + projects: Mutex::new(vec![MockProject { + id: "proj_fixture".to_string(), + name: "fixtures-project".to_string(), + org_id: "org_mock".to_string(), + }]), + datasets: Mutex::new(Vec::new()), + dataset_rows: Mutex::new(BTreeMap::new()), + } + } +} + +struct MockServer { + base_url: String, + handle: actix_web::dev::ServerHandle, +} + +impl MockServer { + async fn start(state: Arc<MockServerState>) -> Self { + let listener = TcpListener::bind(("127.0.0.1", 0)).expect("bind mock server"); + let addr = listener.local_addr().expect("mock server addr"); + let base_url = format!("http://{addr}"); + let data = web::Data::new(state); + + let server = HttpServer::new(move || { + App::new() + .app_data(data.clone()) + .route("/api/apikey/login", web::post().to(mock_login)) + .route("/v1/project", web::get().to(mock_list_projects)) + .route("/v1/dataset", web::get().to(mock_list_datasets)) + .route("/v1/dataset", web::post().to(mock_create_dataset)) + .route("/btql", web::post().to(mock_btql)) + .route("/version", web::get().to(mock_version)) + .route("/logs3", web::post().to(mock_logs3)) + }) + .workers(1) + .listen(listener) + .expect("listen mock server") + .run(); + let handle = server.handle(); + tokio::spawn(server); + + Self { base_url, handle } + } + + async fn stop(&self) { + self.handle.stop(true).await; + } +} + +async fn mock_login(state: web::Data<Arc<MockServerState>>, req: HttpRequest) -> HttpResponse { + log_request(state.get_ref(), &req); + let base = request_base_url(&req); + HttpResponse::Ok().json(serde_json::json!({ + "org_info": [ + { + "id": "org_mock", + "name": "test-org", + "api_url": base + } + ] + })) +} + +async fn mock_list_projects( + state: web::Data<Arc<MockServerState>>, + req: HttpRequest, +) -> HttpResponse { + log_request(state.get_ref(), &req); + let query = parse_query(req.query_string()); + let requested_name = query.get("project_name").cloned(); + let projects = state.projects.lock().expect("projects lock").clone(); + let objects = projects + .into_iter() + .filter(|project| { + requested_name + .as_deref() + .is_none_or(|name| project.name == name) + }) + .map(|project| { + serde_json::json!({ + "id": project.id, + "name": project.name, + "org_id": project.org_id + }) + }) + .collect::<Vec<_>>(); + HttpResponse::Ok().json(serde_json::json!({ "objects": objects })) +} + +async fn mock_list_datasets( + state: web::Data<Arc<MockServerState>>, + req: HttpRequest, +) -> HttpResponse { + log_request(state.get_ref(), &req); + let query = parse_query(req.query_string()); + let requested_project_id = query.get("project_id").cloned(); + let datasets =
state.datasets.lock().expect("datasets lock").clone(); + let objects = datasets + .into_iter() + .filter(|dataset| { + requested_project_id + .as_deref() + .is_none_or(|project_id| dataset.project_id == project_id) + }) + .map(|dataset| { + serde_json::json!({ + "id": dataset.id, + "name": dataset.name, + "project_id": dataset.project_id, + "created": dataset.created + }) + }) + .collect::<Vec<_>>(); + HttpResponse::Ok().json(serde_json::json!({ "objects": objects })) +} + +#[derive(Debug, Deserialize)] +struct CreateDatasetRequest { + name: String, + project_id: String, +} + +async fn mock_create_dataset( + state: web::Data<Arc<MockServerState>>, + req: HttpRequest, + body: web::Json<CreateDatasetRequest>, +) -> HttpResponse { + log_request(state.get_ref(), &req); + let mut datasets = state.datasets.lock().expect("datasets lock"); + if let Some(existing) = datasets + .iter() + .find(|dataset| dataset.project_id == body.project_id && dataset.name == body.name) + { + return HttpResponse::Ok().json(serde_json::json!({ + "id": existing.id, + "name": existing.name, + "project_id": existing.project_id, + "created": existing.created + })); + } + + let created = MockDataset { + id: format!("dataset_{}", datasets.len() + 1), + name: body.name.clone(), + project_id: body.project_id.clone(), + created: "2026-01-01T00:00:00Z".to_string(), + }; + datasets.push(created.clone()); + HttpResponse::Ok().json(serde_json::json!({ + "id": created.id, + "name": created.name, + "project_id": created.project_id, + "created": created.created + })) +} + +#[derive(Debug, Deserialize)] +struct BtqlRequest { + query: String, +} + +async fn mock_btql( + state: web::Data<Arc<MockServerState>>, + req: HttpRequest, + body: web::Json<BtqlRequest>, +) -> HttpResponse { + log_request(state.get_ref(), &req); + if !body.query.contains("filter: created >=") { + return HttpResponse::BadRequest().body("BTQL query must include a timestamp filter"); + } + + let Some(dataset_id) = extract_dataset_id_from_query(&body.query) else { + return HttpResponse::BadRequest().body("missing dataset(...)
source in BTQL query"); + }; + + let rows = state + .dataset_rows + .lock() + .expect("dataset rows lock") + .get(&dataset_id) + .map(|rows| rows.values().cloned().collect::>()) + .unwrap_or_default(); + + HttpResponse::Ok().json(serde_json::json!({ + "data": rows, + "cursor": null, + })) +} + +async fn mock_version(state: web::Data>, req: HttpRequest) -> HttpResponse { + log_request(state.get_ref(), &req); + HttpResponse::Ok().json(serde_json::json!({})) +} + +async fn mock_logs3( + state: web::Data>, + req: HttpRequest, + body: web::Bytes, +) -> HttpResponse { + log_request(state.get_ref(), &req); + + let payload: Value = match serde_json::from_slice(&body) { + Ok(payload) => payload, + Err(err) => { + return HttpResponse::BadRequest().body(format!("invalid logs3 body: {err}")); + } + }; + + let Some(rows) = payload.get("rows").and_then(Value::as_array) else { + return HttpResponse::BadRequest().body("logs3 request body must contain a rows array"); + }; + + let mut dataset_rows = state.dataset_rows.lock().expect("dataset rows lock"); + for row in rows { + let Some(object) = row.as_object() else { + return HttpResponse::BadRequest().body("logs3 rows must be objects"); + }; + let Some(dataset_id) = object.get("dataset_id").and_then(Value::as_str) else { + return HttpResponse::BadRequest().body("logs3 rows must include dataset_id"); + }; + let Some(row_id) = object.get("id").and_then(Value::as_str) else { + return HttpResponse::BadRequest().body("logs3 rows must include id"); + }; + + let rows_for_dataset = dataset_rows.entry(dataset_id.to_string()).or_default(); + if object + .get("_object_delete") + .and_then(Value::as_bool) + .unwrap_or(false) + { + rows_for_dataset.remove(row_id); + } else { + rows_for_dataset.insert(row_id.to_string(), object.clone()); + } + } + + HttpResponse::Ok().json(serde_json::json!({})) +} + +fn extract_dataset_id_from_query(query: &str) -> Option { + let marker = "from: dataset('"; + let start = query.find(marker)? 
+ marker.len(); + let rest = &query[start..]; + let end = rest.find("')")?; + Some(rest[..end].replace("''", "'")) +} + +fn log_request(state: &Arc<MockServerState>, req: &HttpRequest) { + let entry = if req.query_string().is_empty() { + req.path().to_string() + } else { + format!("{}?{}", req.path(), req.query_string()) + }; + state.requests.lock().expect("requests lock").push(entry); +} + +fn request_base_url(req: &HttpRequest) -> String { + let info = req.connection_info(); + format!("{}://{}", info.scheme(), info.host()) +} + +fn parse_query(query: &str) -> BTreeMap<String, String> { + let mut values = BTreeMap::new(); + for pair in query.split('&') { + if pair.is_empty() { + continue; + } + let (raw_key, raw_value) = pair.split_once('=').unwrap_or((pair, "")); + let key = urlencoding::decode(raw_key) + .map(|value| value.into_owned()) + .unwrap_or_else(|_| raw_key.to_string()); + let value = urlencoding::decode(raw_value) + .map(|value| value.into_owned()) + .unwrap_or_else(|_| raw_value.to_string()); + values.insert(key, value); + } + values +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn datasets_fixtures() { + let root = repo_root(); + let fixtures_root = root.join("tests").join("datasets-fixtures"); + if !fixtures_root.exists() { + eprintln!("No dataset fixtures found."); + return; + } + + let bt_path = bt_binary_path(); + let mut fixture_dirs: Vec<PathBuf> = fs::read_dir(&fixtures_root) + .expect("read datasets fixture root") + .filter_map(|entry| entry.ok()) + .map(|entry| entry.path()) + .filter(|path| path.is_dir()) + .collect(); + fixture_dirs.sort(); + + let mut ran_any = false; + for dir in fixture_dirs { + let config_path = dir.join("fixture.json"); + if !config_path.is_file() { + continue; + } + ran_any = true; + + let fixture_name = dir + .file_name() + .map(|name| name.to_string_lossy().to_string()) + .expect("fixture directory name"); + let config = read_fixture_config(&config_path); + if config.steps.is_empty() { + panic!("Fixture {fixture_name} has no configured steps."); + } + + let state = Arc::new(MockServerState::seeded()); + let server = MockServer::start(Arc::clone(&state)).await; + + for (index, step) in config.steps.iter().enumerate() { + if step.command.is_empty() { + panic!( + "Fixture {fixture_name} step {} has an empty command.", + index + 1 + ); + } + + let mut cmd = Command::new(&bt_path); + cmd.args(&step.command).current_dir(&dir); + sanitize_dataset_env(&mut cmd); + for (key, value) in &config.env { + cmd.env(key, expand_fixture_value(value, &server.base_url)); + } + if let Some(stdin_file) = &step.stdin_file { + let stdin_path = dir.join(stdin_file); + let stdin = File::open(&stdin_path).unwrap_or_else(|err| { + panic!( + "failed to open fixture {fixture_name} step {} stdin file {}: {err}", + index + 1, + stdin_path.display(), + ) + }); + cmd.stdin(Stdio::from(stdin)); + } + + let output = cmd.output().unwrap_or_else(|err| { + panic!( + "failed to run fixture {fixture_name} step {} {:?}: {err}", + index + 1, + step.command, + ) + }); + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + if output.status.success() != step.expect_success { + panic!( + "Fixture {fixture_name} step {} command {:?} had status {} (expected success={})\nstdout:\n{}\nstderr:\n{}", + index + 1, + step.command, + output.status, + step.expect_success, + stdout, + stderr + ); + } + + for expected in &step.stdout_contains { + assert!( + stdout.contains(expected), + "Fixture {fixture_name} step {}: stdout missing expected text:
{expected}\nstdout:\n{stdout}", + index + 1, + ); + } + for expected in &step.stderr_contains { + assert!( + stderr.contains(expected), + "Fixture {fixture_name} step {}: stderr missing expected text: {expected}\nstderr:\n{stderr}", + index + 1, + ); + } + for unexpected in &step.stdout_not_contains { + assert!( + !stdout.contains(unexpected), + "Fixture {fixture_name} step {}: stdout unexpectedly contained text: {unexpected}\nstdout:\n{stdout}", + index + 1, + ); + } + for unexpected in &step.stderr_not_contains { + assert!( + !stderr.contains(unexpected), + "Fixture {fixture_name} step {}: stderr unexpectedly contained text: {unexpected}\nstderr:\n{stderr}", + index + 1, + ); + } + } + + if let Some(expected_logs3_requests) = config.expected_logs3_requests { + let actual_logs3_requests = state + .requests + .lock() + .expect("requests lock") + .iter() + .filter(|request| request.as_str() == "/logs3") + .count(); + assert_eq!( + actual_logs3_requests, expected_logs3_requests, + "Fixture {fixture_name}: expected {expected_logs3_requests} /logs3 requests, saw {actual_logs3_requests}" + ); + } + + server.stop().await; + } + + if !ran_any { + eprintln!("No datasets fixtures with fixture.json found."); + } +} From fa97620061c5f7ce41eb7218b553a59388a1059c Mon Sep 17 00:00:00 2001 From: Parker Henderson Date: Thu, 9 Apr 2026 15:33:57 -0700 Subject: [PATCH 3/6] feat(datasets): add spinner feedback and improve CLI arg handling --- src/datasets/delete.rs | 9 ++++++--- src/datasets/mod.rs | 17 +++++++++-------- src/datasets/refresh.rs | 10 ++++++---- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/datasets/delete.rs b/src/datasets/delete.rs index 10b6b05..8ff941e 100644 --- a/src/datasets/delete.rs +++ b/src/datasets/delete.rs @@ -11,9 +11,12 @@ pub async fn run(ctx: &ResolvedContext, name: Option<&str>, force: bool) -> Resu } let dataset = match name { - Some(name) => api::get_dataset_by_name(&ctx.client, &ctx.project.id, name) - .await? - .ok_or_else(|| anyhow!("dataset '{name}' not found"))?, + Some(name) => with_spinner( + "Loading dataset...", + api::get_dataset_by_name(&ctx.client, &ctx.project.id, name), + ) + .await? + .ok_or_else(|| anyhow!("dataset '{name}' not found"))?, None => { if !is_interactive() { bail!("dataset name required. 
Use: bt datasets delete <name>"); diff --git a/src/datasets/mod.rs b/src/datasets/mod.rs index 4caa51d..0848a9d 100644 --- a/src/datasets/mod.rs +++ b/src/datasets/mod.rs @@ -26,11 +26,11 @@ pub(crate) use crate::project_context::ProjectContext as ResolvedContext; #[derive(Debug, Clone, Args)] struct DatasetNameArgs { /// Dataset name (positional) - #[arg(value_name = "NAME", conflicts_with = "name_flag")] + #[arg(value_name = "NAME")] name_positional: Option<String>, /// Dataset name (flag) - #[arg(long = "name", short = 'n', conflicts_with = "name_positional")] + #[arg(long = "name", short = 'n')] name_flag: Option<String>, } @@ -446,8 +446,8 @@ mod tests { } #[test] - fn dataset_name_rejects_positional_and_name_flag_together() { - let err = parse(&[ + fn dataset_name_positional_takes_precedence_over_flag() { + let parsed = parse(&[ "datasets", "delete", "positional-name", "--name", "flag-name", "--force", ]) - .expect_err("name should be ambiguous when both positional and --name are set"); - let rendered = err.to_string(); - assert!(rendered.contains("cannot be used with")); - assert!(rendered.contains("--name")); + .expect("both positional and --name should parse"); + let DatasetsCommands::Delete(delete) = parsed.command.expect("subcommand") else { + panic!("expected delete command"); + }; + assert_eq!(delete.name(), Some("positional-name")); } } diff --git a/src/datasets/refresh.rs b/src/datasets/refresh.rs index e958f5f..fee507a 100644 --- a/src/datasets/refresh.rs +++ b/src/datasets/refresh.rs @@ -1,9 +1,11 @@ +use std::collections::HashSet; use std::path::Path; +use std::time::Duration; use anyhow::Result; use serde_json::json; -use crate::ui::{print_command_status, with_spinner, CommandStatus}; +use crate::ui::{print_command_status, with_spinner, with_spinner_visible, CommandStatus}; use super::{ api, @@ -23,10 +25,10 @@ pub async fn run( let dataset_name = upload::resolve_dataset_name(name, "refresh")?; let local_records = load_refresh_records(input_path, inline_rows, id_field)?; - let (dataset, created_dataset) = crate::ui::with_spinner_visible( + let (dataset, created_dataset) = with_spinner_visible( "Resolving remote dataset...", api::get_or_create_dataset(&ctx.client, &ctx.project.id, &dataset_name), - std::time::Duration::from_millis(300), + Duration::from_millis(300), ) .await?; @@ -42,7 +44,7 @@ pub async fn run( }; let mut upload_rows = Vec::new(); - let mut local_ids = std::collections::HashSet::new(); + let mut local_ids = HashSet::new(); let mut created = 0usize; let mut updated = 0usize; let mut unchanged = 0usize; From d29c4bc762a0a8bea55ad5e455df81739b3a6596 Mon Sep 17 00:00:00 2001 From: Parker Henderson Date: Fri, 10 Apr 2026 14:04:20 -0700 Subject: [PATCH 4/6] refactor(datasets): consolidate dataset update commands and add row limits --- README.md | 39 ++++--- src/datasets/api.rs | 87 +++++-- src/datasets/create.rs | 2 +- src/datasets/mod.rs | 230 ++++++++++++------ src/datasets/records.rs | 50 ++-- src/datasets/refresh.rs | 38 ++- src/datasets/upload.rs | 62 +---- src/datasets/view.rs | 26 +- src/project_context.rs | 4 - .../upload-refresh/fixture.json | 23 +- 10 files changed, 326 insertions(+), 235 deletions(-) diff --git a/README.md b/README.md index caaa770..f19e469 100644 --- a/README.md +++ b/README.md @@ -108,20 +108,20 @@ Remove-Item -Recurse -Force (Join-Path $env:APPDATA "bt") -ErrorAction SilentlyC ## Commands -| Command | Description | -| ---------------- | -------------------------------------------------------------------- | -| `bt init` | Initialize
`.bt/` config directory and link to a project | -| `bt auth` | Authenticate with Braintrust | -| `bt switch` | Switch org and project context | -| `bt status` | Show current org and project context | -| `bt eval` | Run eval files (Unix only) | -| `bt sql` | Run SQL queries against Braintrust | -| `bt view` | View logs, traces, and spans | -| `bt projects` | Manage projects (list, create, view, delete) | -| `bt datasets` | Manage remote datasets (list, create, upload, refresh, view, delete) | -| `bt prompts` | Manage prompts (list, view, delete) | -| `bt sync` | Synchronize project logs between Braintrust and local NDJSON files | -| `bt self update` | Update bt in-place | +| Command | Description | +| ---------------- | ------------------------------------------------------------------ | +| `bt init` | Initialize `.bt/` config directory and link to a project | +| `bt auth` | Authenticate with Braintrust | +| `bt switch` | Switch org and project context | +| `bt status` | Show current org and project context | +| `bt eval` | Run eval files (Unix only) | +| `bt sql` | Run SQL queries against Braintrust | +| `bt view` | View logs, traces, and spans | +| `bt projects` | Manage projects (list, create, view, delete) | +| `bt datasets` | Manage remote datasets (list, create, update, view, delete) | +| `bt prompts` | Manage prompts (list, view, delete) | +| `bt sync` | Synchronize project logs between Braintrust and local NDJSON files | +| `bt self update` | Update bt in-place | ## `bt eval` @@ -159,12 +159,11 @@ bt eval foo.eval.ts -- --description "Prod" --shard=1/4 - `bt datasets create my-dataset --file records.jsonl` — create the remote dataset and seed it from a JSON/JSONL file. - `cat records.jsonl | bt datasets create my-dataset` — create the dataset and seed it from stdin. - `bt datasets create my-dataset --rows '[{"id":"case-1","input":{"text":"hi"},"expected":"hello"}]'` — create the dataset from inline JSON rows. -- `bt datasets add my-dataset --file records.jsonl` — add rows to an existing remote dataset. -- `bt datasets append my-dataset --rows '[{"id":"case-2","input":{"text":"bye"},"expected":"goodbye"}]'` — alias for `add`/`upload` when you want to append rows explicitly. -- `bt datasets upload my-dataset --file records.jsonl` — legacy-compatible alias for `add`. -- `bt datasets refresh my-dataset --file records.jsonl --id-field metadata.case_id --prune` — deterministically upsert rows by stable record id and optionally prune stale remote rows. -- `bt datasets view my-dataset` — show dataset metadata and the important row fields by default; pass `--verbose` to inspect full row payloads. -- Accepted row fields for create/upload/update/refresh are `id` (or your `--id-field` path), `input`, `expected`, `metadata`, and `tags`. +- `bt datasets update my-dataset --file records.jsonl` — deterministically upsert rows by stable record id. +- `bt datasets add my-dataset --rows '[{"id":"case-2","input":{"text":"bye"},"expected":"goodbye"}]'` — alias for `update`. +- `bt datasets refresh my-dataset --file records.jsonl --id-field metadata.case_id` — alias for `update` with explicit id path (fails if the dataset does not exist, and does not delete remote rows missing from the input). +- `bt datasets view my-dataset` — show dataset metadata and row payloads; defaults to loading up to 200 rows. Use `--limit <n>` to adjust or `--all-rows` to load everything. +- `update`/`add`/`refresh` require stable IDs (via `id` or your `--id-field` path).
Accepted record fields are `id`, `input`, `expected`, `metadata`, and `tags`. ## `bt sql` diff --git a/src/datasets/api.rs b/src/datasets/api.rs index 8aece2e..bb786dc 100644 --- a/src/datasets/api.rs +++ b/src/datasets/api.rs @@ -1,4 +1,6 @@ -use anyhow::Result; +use std::collections::HashSet; + +use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use urlencoding::encode; @@ -6,6 +8,7 @@ use crate::http::ApiClient; const MAX_DATASET_ROWS_PAGE_LIMIT: usize = 1000; +const MAX_DATASET_ROWS_PAGES: usize = 10_000; const DATASET_ROWS_SINCE: &str = "1970-01-01T00:00:00Z"; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -85,10 +88,43 @@ pub async fn get_dataset_by_name( } pub async fn list_dataset_rows(client: &ApiClient, dataset_id: &str) -> Result<Vec<Map<String, Value>>> { + let (rows, _truncated) = list_dataset_rows_limited(client, dataset_id, None).await?; + Ok(rows) +} + +pub async fn list_dataset_rows_limited( + client: &ApiClient, + dataset_id: &str, + max_rows: Option<usize>, +) -> Result<(Vec<Map<String, Value>>, bool)> { + if matches!(max_rows, Some(0)) { + return Ok((Vec::new(), false)); + } + let mut rows = Vec::new(); let mut cursor: Option<String> = None; + let mut seen_cursors = HashSet::new(); + let mut page_count = 0usize; + let mut truncated = false; loop { + page_count += 1; + if page_count > MAX_DATASET_ROWS_PAGES { + bail!( "dataset rows pagination exceeded {} pages for dataset '{}'", MAX_DATASET_ROWS_PAGES, dataset_id ); + } + if let Some(current_cursor) = cursor.as_ref() { + if !seen_cursors.insert(current_cursor.clone()) { + bail!( "dataset rows pagination loop detected for dataset '{}'", dataset_id ); + } + } + let query = build_dataset_rows_query(dataset_id, MAX_DATASET_ROWS_PAGE_LIMIT, cursor.as_deref()); let body = serde_json::json!({ @@ -104,20 +140,46 @@ pub async fn list_dataset_rows(client: &ApiClient, dataset_id: &str) -> Result<Vec<Map<String, Value>>> { + let next_cursor = response.cursor.filter(|cursor| !cursor.is_empty()); + + if let Some(max_rows) = max_rows { + let remaining = max_rows - rows.len(); + if response.data.len() > remaining { + rows.extend(response.data.into_iter().take(remaining)); + truncated = true; + break; + } + } + rows.extend(response.data); - let next_cursor = response.cursor.filter(|cursor| !cursor.is_empty()); - if next_cursor.is_none() { - break; + match next_cursor { + Some(next_cursor) => { + if max_rows.is_some_and(|max_rows| rows.len() >= max_rows) { + truncated = true; + break; + } + cursor = Some(next_cursor); + } + None => break, } - cursor = next_cursor; } - Ok(rows) + Ok((rows, truncated)) } pub async fn create_dataset(client: &ApiClient, project_id: &str, name: &str) -> Result<Dataset> { @@ -129,19 +191,6 @@ pub async fn create_dataset(client: &ApiClient, project_id: &str, name: &str) -> Result<Dataset> { client.post("/v1/dataset", &body).await } -pub async fn get_or_create_dataset( - client: &ApiClient, - project_id: &str, - name: &str, -) -> Result<(Dataset, bool)> { - if let Some(dataset) = get_dataset_by_name(client, project_id, name).await?
{ - return Ok((dataset, false)); - } - - let dataset = create_dataset(client, project_id, name).await?; - Ok((dataset, true)) -} - pub async fn delete_dataset(client: &ApiClient, dataset_id: &str) -> Result<()> { let path = format!("/v1/dataset/{}", encode(dataset_id)); client.delete(&path).await diff --git a/src/datasets/create.rs b/src/datasets/create.rs index 1eb30eb..637082d 100644 --- a/src/datasets/create.rs +++ b/src/datasets/create.rs @@ -24,7 +24,7 @@ pub async fn run( .await?; if exists.is_some() { bail!( - "dataset '{name}' already exists in project '{}'; use `bt datasets upload {name}` to add rows", + "dataset '{name}' already exists in project '{}'; use `bt datasets update {name}` to add rows", ctx.project.name ); } diff --git a/src/datasets/mod.rs b/src/datasets/mod.rs index 0848a9d..3f91a39 100644 --- a/src/datasets/mod.rs +++ b/src/datasets/mod.rs @@ -6,7 +6,7 @@ use clap::{builder::BoolishValueParser, Args, Subcommand}; use crate::{ args::BaseArgs, http::ApiClient, - project_context::resolve_project_command_context, + project_context::resolve_project_command_context_with_auth_mode, ui::{self, with_spinner}, }; @@ -22,6 +22,7 @@ mod view; use api::{self as datasets_api, Dataset}; pub(crate) use crate::project_context::ProjectContext as ResolvedContext; +const DEFAULT_DATASETS_VIEW_ROW_LIMIT: usize = 200; #[derive(Debug, Clone, Args)] struct DatasetNameArgs { @@ -79,9 +80,9 @@ struct DatasetInputArgs { bt datasets create my-dataset --file records.jsonl cat records.jsonl | bt datasets create my-dataset bt datasets create my-dataset --rows '[{"id":"case-1","input":{"text":"hi"},"expected":"hello"}]' - bt datasets add my-dataset --file more-records.jsonl - bt datasets append my-dataset --rows '[{"id":"case-2","input":{"text":"bye"},"expected":"goodbye"}]' - bt datasets refresh my-dataset --file records.jsonl --id-field metadata.case_id --prune + bt datasets update my-dataset --file records.jsonl + bt datasets add my-dataset --rows '[{"id":"case-2","input":{"text":"bye"},"expected":"goodbye"}]' + bt datasets refresh my-dataset --file records.jsonl --id-field metadata.case_id bt datasets view my-dataset bt datasets delete my-dataset "#)] @@ -96,11 +97,9 @@ enum DatasetsCommands { List, /// Create a new dataset, optionally seeding rows from a file, --rows, or stdin Create(CreateArgs), - /// Add rows to a remote Braintrust dataset - #[command(visible_aliases = ["add", "append", "update"])] - Upload(UploadArgs), - /// Deterministically refresh a remote Braintrust dataset by record id - Refresh(RefreshArgs), + /// Deterministically update remote dataset rows by record id + #[command(visible_aliases = ["add", "refresh"])] + Update(UpdateArgs), /// View a dataset View(ViewArgs), /// Delete a dataset @@ -123,7 +122,7 @@ impl CreateArgs { } #[derive(Debug, Clone, Args)] -struct UploadArgs { +struct UpdateArgs { #[command(flatten)] name: DatasetNameArgs, @@ -131,24 +130,6 @@ struct UploadArgs { input: DatasetInputArgs, } -#[derive(Debug, Clone, Args)] -struct RefreshArgs { - #[command(flatten)] - name: DatasetNameArgs, - - #[command(flatten)] - input: DatasetInputArgs, - - /// Delete remote rows whose ids are not present in the input. - #[arg( - long, - env = "BT_DATASETS_REFRESH_PRUNE", - value_parser = BoolishValueParser::new(), - default_value_t = false - )] - prune: bool, -} - #[derive(Debug, Clone, Args)] struct ViewArgs { #[command(flatten)] @@ -171,6 +152,20 @@ struct ViewArgs { default_value_t = false )] verbose: bool, + + /// Maximum number of rows to load. 
Defaults to 200 unless --all-rows is passed. + #[arg(long, env = "BT_DATASETS_VIEW_LIMIT", value_name = "N")] + limit: Option<usize>, + + /// Load all rows (can be expensive for large datasets). + #[arg( + long = "all-rows", + env = "BT_DATASETS_VIEW_ALL", + value_parser = BoolishValueParser::new(), + default_value_t = false, + conflicts_with = "limit" + )] + all_rows: bool, } impl ViewArgs { @@ -225,7 +220,8 @@ pub(crate) async fn select_dataset_interactive( } pub async fn run(base: BaseArgs, args: DatasetsArgs) -> Result<()> { - let ctx = resolve_project_command_context(&base).await?; + let read_only = datasets_command_is_read_only(args.command.as_ref()); + let ctx = resolve_project_command_context_with_auth_mode(&base, read_only).await?; match args.command { None | Some(DatasetsCommands::List) => list::run(&ctx, base.json).await, @@ -240,25 +236,13 @@ pub async fn run(base: BaseArgs, args: DatasetsArgs) -> Result<()> { ) .await } - Some(DatasetsCommands::Upload(upload_args)) => { - upload::run( - &ctx, - upload_args.name.name(), - upload_args.input.file.as_deref(), - upload_args.input.rows.as_deref(), - &upload_args.input.id_field, - base.json, - ) - .await - } - Some(DatasetsCommands::Refresh(refresh_args)) => { + Some(DatasetsCommands::Update(update_args)) => { refresh::run( &ctx, - refresh_args.name.name(), - refresh_args.input.file.as_deref(), - refresh_args.input.rows.as_deref(), - &refresh_args.input.id_field, - refresh_args.prune, + update_args.name.name(), + update_args.input.file.as_deref(), + update_args.input.rows.as_deref(), + &update_args.input.id_field, base.json, ) .await @@ -270,6 +254,7 @@ pub async fn run(base: BaseArgs, args: DatasetsArgs) -> Result<()> { base.json, view_args.web, view_args.verbose, + resolve_view_row_limit(&view_args), ) .await } @@ -279,6 +264,21 @@ pub async fn run(base: BaseArgs, args: DatasetsArgs) -> Result<()> { } } +fn datasets_command_is_read_only(command: Option<&DatasetsCommands>) -> bool { + matches!( + command, + None | Some(DatasetsCommands::List) | Some(DatasetsCommands::View(_)) + ) +} + +fn resolve_view_row_limit(args: &ViewArgs) -> Option<usize> { + if args.all_rows { + None + } else { + Some(args.limit.unwrap_or(DEFAULT_DATASETS_VIEW_ROW_LIMIT)) + } +} + #[cfg(test)] mod tests { use std::path::PathBuf; @@ -358,28 +358,28 @@ mod tests { } #[test] - fn upload_parses_file_and_id_field() { + fn update_parses_file_and_id_field() { let parsed = parse(&[ "datasets", - "upload", + "update", "my-dataset", "--file", "records.jsonl", "--id-field", "metadata.case_id", ]) - .expect("parse upload"); - let DatasetsCommands::Upload(upload) = parsed.command.expect("subcommand") else { - panic!("expected upload command"); + .expect("parse update"); + let DatasetsCommands::Update(update) = parsed.command.expect("subcommand") else { + panic!("expected update command"); }; - assert_eq!(upload.name.name(), Some("my-dataset")); - assert_eq!(upload.input.file, Some(PathBuf::from("records.jsonl"))); - assert_eq!(upload.input.id_field, "metadata.case_id"); + assert_eq!(update.name.name(), Some("my-dataset")); + assert_eq!(update.input.file, Some(PathBuf::from("records.jsonl"))); + assert_eq!(update.input.id_field, "metadata.case_id"); } #[test] - fn upload_visible_aliases_parse() { - for alias in ["add", "append", "update"] { + fn update_visible_aliases_parse() { + for alias in ["add", "refresh"] { let parsed = parse(&[ "datasets", alias, "my-dataset", "--rows", r#"[{"id":"case-1"}]"#, ]) .unwrap_or_else(|err| panic!("parse {alias} alias: {err}")); - let
DatasetsCommands::Upload(upload) = parsed.command.expect("subcommand") else { - panic!("expected upload command"); + let DatasetsCommands::Update(update) = parsed.command.expect("subcommand") else { + panic!("expected update command"); }; - assert_eq!(upload.name.name(), Some("my-dataset")); - assert_eq!(upload.input.rows.as_deref(), Some(r#"[{"id":"case-1"}]"#)); + assert_eq!(update.name.name(), Some("my-dataset")); + assert_eq!(update.input.rows.as_deref(), Some(r#"[{"id":"case-1"}]"#)); } } #[test] - fn refresh_parses_prune() { + fn refresh_alias_parses_file_and_default_id_field() { let parsed = parse(&[ "datasets", "refresh", "my-dataset", "--file", "records.jsonl", - "--prune", ]) .expect("parse refresh"); - let DatasetsCommands::Refresh(refresh) = parsed.command.expect("subcommand") else { - panic!("expected refresh command"); + let DatasetsCommands::Update(update) = parsed.command.expect("subcommand") else { + panic!("expected update command"); }; - assert_eq!(refresh.name.name(), Some("my-dataset")); - assert_eq!(refresh.input.file, Some(PathBuf::from("records.jsonl"))); - assert!(refresh.prune); - assert_eq!(refresh.input.id_field, "id"); + assert_eq!(update.name.name(), Some("my-dataset")); + assert_eq!(update.input.file, Some(PathBuf::from("records.jsonl"))); + assert_eq!(update.input.id_field, "id"); } #[test] @@ -425,6 +423,8 @@ mod tests { "my-dataset", "--web", "--verbose", + "--limit", + "25", ]) .expect("parse view"); let DatasetsCommands::View(view) = parsed.command.expect("subcommand") else { @@ -433,6 +433,43 @@ mod tests { assert_eq!(view.name(), Some("my-dataset")); assert!(view.web); assert!(view.verbose); + assert_eq!(view.limit, Some(25)); + assert!(!view.all_rows); + } + + #[test] + fn view_parses_all_rows() { + let parsed = parse(&["datasets", "view", "my-dataset", "--all-rows"]).expect("parse view"); + let DatasetsCommands::View(view) = parsed.command.expect("subcommand") else { + panic!("expected view command"); + }; + assert_eq!(view.name(), Some("my-dataset")); + assert!(view.all_rows); + assert!(view.limit.is_none()); + } + + #[test] + fn view_limit_defaults_and_all_rows_override() { + let default_args = ViewArgs { + name: DatasetNameArgs { + name_positional: Some("dataset".to_string()), + name_flag: None, + }, + web: false, + verbose: false, + limit: None, + all_rows: false, + }; + assert_eq!( + resolve_view_row_limit(&default_args), + Some(DEFAULT_DATASETS_VIEW_ROW_LIMIT) + ); + + let all_rows_args = ViewArgs { + all_rows: true, + ..default_args + }; + assert_eq!(resolve_view_row_limit(&all_rows_args), None); } #[test] @@ -461,4 +498,61 @@ mod tests { }; assert_eq!(delete.name(), Some("positional-name")); } + + #[test] + fn datasets_routes_list_and_view_to_read_only_auth() { + assert!(datasets_command_is_read_only(None)); + assert!(datasets_command_is_read_only(Some(&DatasetsCommands::List))); + assert!(datasets_command_is_read_only(Some( + &DatasetsCommands::View(ViewArgs { + name: DatasetNameArgs { + name_positional: Some("dataset".to_string()), + name_flag: None, + }, + web: false, + verbose: false, + limit: None, + all_rows: false, + }) + ))); + } + + #[test] + fn datasets_routes_write_commands_to_validated_auth() { + assert!(!datasets_command_is_read_only(Some( + &DatasetsCommands::Create(CreateArgs { + name: DatasetNameArgs { + name_positional: Some("dataset".to_string()), + name_flag: None, + }, + input: DatasetInputArgs { + file: None, + rows: Some("[]".to_string()), + id_field: "id".to_string(), + }, + }) + ))); + 
assert!(!datasets_command_is_read_only(Some( + &DatasetsCommands::Update(UpdateArgs { + name: DatasetNameArgs { + name_positional: Some("dataset".to_string()), + name_flag: None, + }, + input: DatasetInputArgs { + file: None, + rows: Some("[]".to_string()), + id_field: "id".to_string(), + }, + }) + ))); + assert!(!datasets_command_is_read_only(Some( + &DatasetsCommands::Delete(DeleteArgs { + name: DatasetNameArgs { + name_positional: Some("dataset".to_string()), + name_flag: None, + }, + force: true, + }) + ))); + } } diff --git a/src/datasets/records.rs b/src/datasets/records.rs index 5d68e6a..b27f950 100644 --- a/src/datasets/records.rs +++ b/src/datasets/records.rs @@ -54,30 +54,6 @@ impl PreparedDatasetRecord { } } -pub(crate) fn delete_row(id: &str, dataset_id: &str) -> Map<String, Value> { - let mut row = Map::new(); - row.insert("id".to_string(), Value::String(id.to_string())); - row.insert( - "dataset_id".to_string(), - Value::String(dataset_id.to_string()), - ); - row.insert( - "created".to_string(), - Value::String(Utc::now().to_rfc3339()), - ); - row.insert("_object_delete".to_string(), Value::Bool(true)); - row -} - -pub(crate) fn load_upload_records( - input_path: Option<&Path>, - inline_rows: Option<&str>, - id_field: &str, -) -> Result<Vec<PreparedDatasetRecord>> { - let raw = load_required_record_objects(input_path, inline_rows)?; - prepare_records(raw, id_field, false) -} - pub(crate) fn load_optional_upload_records( input_path: Option<&Path>, inline_rows: Option<&str>, @@ -104,7 +80,10 @@ pub(crate) fn remote_records_by_id( let mut records = HashMap::new(); for row in rows { if let Some(record) = prepared_record_from_remote_row(&row)? { - records.insert(record.id.clone(), record); + let record_id = record.id.clone(); + if records.insert(record_id.clone(), record).is_some() { + bail!("remote dataset contains duplicate record id '{record_id}'"); + } } } Ok(records) @@ -450,6 +429,13 @@ mod tests { assert_eq!(first[0].id, second[0].id); } + #[test] + fn load_refresh_records_requires_explicit_ids() { + let err = load_refresh_records(None, Some(r#"[{"input":{"text":"hello"}}]"#), "id") + .expect_err("refresh should require explicit ids"); + assert!(err.to_string().contains("missing a stable id at 'id'")); + } + #[test] fn remote_record_prefers_expected_over_output() { let row = serde_json::from_value::<Map<String, Value>>(serde_json::json!({ "id": "row", "expected": "expected", "output": "output" })) .expect("row map"); @@ -464,6 +450,20 @@ mod tests { assert_eq!(record.expected, Some(Value::String("expected".to_string()))); } + #[test] + fn remote_records_by_id_rejects_duplicate_ids() { + let rows = vec![ + serde_json::from_value(serde_json::json!({"id": "dup", "expected": "a"})) + .expect("first row"), + serde_json::from_value(serde_json::json!({"id": "dup", "expected": "b"})) + .expect("second row"), + ]; + let err = remote_records_by_id(rows).expect_err("duplicate remote ids"); + assert!(err + .to_string() + .contains("remote dataset contains duplicate record id 'dup'")); + } + #[test] fn prepare_records_rejects_duplicate_ids() { let first = serde_json::from_value(serde_json::json!({"id": "dup"})).expect("map"); diff --git a/src/datasets/refresh.rs b/src/datasets/refresh.rs index fee507a..15ac698 100644 --- a/src/datasets/refresh.rs +++ b/src/datasets/refresh.rs @@ -1,15 +1,14 @@ -use std::collections::HashSet; use std::path::Path; use std::time::Duration; -use anyhow::Result; +use anyhow::{bail, Result}; use serde_json::json; use crate::ui::{print_command_status, with_spinner, with_spinner_visible, CommandStatus}; use super::{ api, - records::{delete_row, load_refresh_records, remote_records_by_id}, +
records::{load_refresh_records, remote_records_by_id}, upload, ResolvedContext, }; @@ -19,18 +18,25 @@ pub async fn run( input_path: Option<&Path>, inline_rows: Option<&str>, id_field: &str, - prune: bool, json_output: bool, ) -> Result<()> { let dataset_name = upload::resolve_dataset_name(name, "refresh")?; let local_records = load_refresh_records(input_path, inline_rows, id_field)?; - let (dataset, created_dataset) = with_spinner_visible( + let existing_dataset = with_spinner_visible( "Resolving remote dataset...", - api::get_or_create_dataset(&ctx.client, &ctx.project.id, &dataset_name), + api::get_dataset_by_name(&ctx.client, &ctx.project.id, &dataset_name), Duration::from_millis(300), ) .await?; + let (dataset, created_dataset) = match existing_dataset { + Some(dataset) => (dataset, false), + None => bail!( + "dataset '{}' not found in project '{}'", + dataset_name, + ctx.project.name + ), + }; let remote_records = if created_dataset { Default::default() @@ -44,14 +50,11 @@ pub async fn run( }; let mut upload_rows = Vec::new(); - let mut local_ids = HashSet::new(); let mut created = 0usize; let mut updated = 0usize; let mut unchanged = 0usize; - let mut deleted = 0usize; for record in &local_records { - local_ids.insert(record.id.clone()); match remote_records.get(&record.id) { None => { created += 1; @@ -67,15 +70,6 @@ pub async fn run( } } - if prune { - for remote_id in remote_records.keys() { - if !local_ids.contains(remote_id) { - deleted += 1; - upload_rows.push(delete_row(remote_id, &dataset.id)); - } - } - } - if !upload_rows.is_empty() { upload::submit_rows( ctx, @@ -94,10 +88,8 @@ pub async fn run( "created_dataset": created_dataset, "created": created, "updated": updated, - "deleted": deleted, "unchanged": unchanged, - "pruned": prune, - "mode": "refresh", + "mode": "update", }))? 
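With --json, the update path prints a machine-readable summary. A sketch of consuming it from another Rust tool (UpdateSummary is a hypothetical struct; the field names mirror the json! block above, and serde skips the extra dataset field by default):

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct UpdateSummary {
        created: usize,
        updated: usize,
        unchanged: usize,
        mode: String,
    }

    fn main() {
        // Shape mirrors what `bt datasets update --json` writes to stdout.
        let raw = r#"{"created":1,"updated":1,"unchanged":0,"created_dataset":false,"mode":"update"}"#;
        let summary: UpdateSummary = serde_json::from_str(raw).expect("parse summary");
        assert_eq!(summary.mode, "update");
        assert_eq!(summary.created + summary.updated, 2);
    }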
); + return Ok(()); @@ -107,8 +99,8 @@ pub async fn run( format!("'{}' is already up to date.", dataset.name) } else { format!( - "Refreshed '{}' (created {}, updated {}, deleted {}, unchanged {}).", - dataset.name, created, updated, deleted, unchanged + "Updated '{}' (created {}, updated {}, unchanged {}).", + dataset.name, created, updated, unchanged ) }; print_command_status(CommandStatus::Success, &detail); diff --git a/src/datasets/upload.rs b/src/datasets/upload.rs index 63a9468..373a06f 100644 --- a/src/datasets/upload.rs +++ b/src/datasets/upload.rs @@ -1,71 +1,17 @@ -use std::{path::Path, time::Duration}; +use std::time::Duration; use anyhow::{anyhow, bail, Result}; use braintrust_sdk_rust::Logs3BatchUploader; use dialoguer::Input; -use serde_json::{json, Map, Value}; +use serde_json::{Map, Value}; -use crate::ui::{is_interactive, print_command_status, with_spinner_visible, CommandStatus}; +use crate::ui::{is_interactive, with_spinner_visible}; use super::{ - api, - records::{load_upload_records, PreparedDatasetRecord, DATASET_UPLOAD_BATCH_SIZE}, + records::{PreparedDatasetRecord, DATASET_UPLOAD_BATCH_SIZE}, ResolvedContext, }; -pub async fn run( - ctx: &ResolvedContext, - name: Option<&str>, - input_path: Option<&Path>, - inline_rows: Option<&str>, - id_field: &str, - json_output: bool, -) -> Result<()> { - let dataset_name = resolve_dataset_name(name, "upload")?; - let records = load_upload_records(input_path, inline_rows, id_field)?; - let record_count = records.len(); - - let (dataset, created) = with_spinner_visible( - "Resolving remote dataset...", - api::get_or_create_dataset(&ctx.client, &ctx.project.id, &dataset_name), - Duration::from_millis(300), - ) - .await?; - - submit_prepared_records( - ctx, - &dataset.id, - &records, - "Uploading dataset rows...", - "dataset upload failed", - ) - .await?; - - if json_output { - println!( - "{}", - serde_json::to_string(&json!({ - "dataset": dataset, - "created_dataset": created, - "uploaded": record_count, - "mode": "upload", - }))? - ); - return Ok(()); - } - - let detail = if created { - format!( - "Uploaded {record_count} records to '{}' and created the remote dataset.", - dataset.name - ) - } else { - format!("Uploaded {record_count} records to '{}'.", dataset.name) - }; - print_command_status(CommandStatus::Success, &detail); - Ok(()) -} - pub(crate) fn resolve_dataset_name(name: Option<&str>, command: &str) -> Result<String> { match name { Some(name) if !name.trim().is_empty() => Ok(name.trim().to_string()), diff --git a/src/datasets/view.rs b/src/datasets/view.rs index ca1bf84..7bdd4b6 100644 --- a/src/datasets/view.rs +++ b/src/datasets/view.rs @@ -17,6 +17,7 @@ pub async fn run( json: bool, web: bool, verbose: bool, + max_rows: Option<usize>, ) -> Result<()> { let dataset = match name { Some(name) => with_spinner( @@ -47,9 +48,9 @@ pub async fn run( return Ok(()); } - let rows = with_spinner( + let (rows, rows_truncated) = with_spinner( "Loading dataset rows...", - api::list_dataset_rows(&ctx.client, &dataset.id), + api::list_dataset_rows_limited(&ctx.client, &dataset.id, max_rows), ) .await?; @@ -59,6 +60,8 @@ pub async fn run( serde_json::to_string(&serde_json::json!({ "dataset": dataset, "rows": rows, + "rows_truncated": rows_truncated, + "row_limit": max_rows, }))?
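list_dataset_rows_limited caps row loading across BTQL pages and reports whether output was truncated. The cap-and-flag pattern, sketched over an in-memory page source (simplified: the real helper also threads cursors and guards against pagination loops):

    // Take up to `max` items across pages, reporting whether anything was cut off.
    fn take_limited<T>(pages: Vec<Vec<T>>, max: Option<usize>) -> (Vec<T>, bool) {
        let mut out = Vec::new();
        for page in pages {
            match max {
                Some(max) if out.len() + page.len() > max => {
                    let remaining = max - out.len();
                    out.extend(page.into_iter().take(remaining));
                    return (out, true);
                }
                _ => out.extend(page),
            }
        }
        (out, false)
    }

    fn main() {
        let pages = vec![vec![1, 2, 3], vec![4, 5, 6]];
        assert_eq!(take_limited(pages.clone(), Some(4)), (vec![1, 2, 3, 4], true));
        assert_eq!(take_limited(pages, None), (vec![1, 2, 3, 4, 5, 6], false));
    }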
); + return Ok(()); } @@ -91,7 +94,14 @@ pub async fn run( if let Some(created) = dataset.created_text() { writeln!(output, "{} {}", console::style("Created:").dim(), created)?; } - writeln!(output, "{} {}", console::style("Rows:").dim(), rows.len())?; + if rows_truncated { + let label = max_rows + .map(|max_rows| format!("{} (truncated to {})", rows.len(), max_rows)) + .unwrap_or_else(|| rows.len().to_string()); + writeln!(output, "{} {}", console::style("Rows:").dim(), label)?; + } else { + writeln!(output, "{} {}", console::style("Rows:").dim(), rows.len())?; + } writeln!( output, @@ -111,6 +121,16 @@ pub async fn run( .dim() )?; } + if rows_truncated { + writeln!( + output, + "{}", + console::style( + "Row output was truncated. Re-run with --all-rows or a larger --limit to inspect more rows." + ) + .dim() + )?; + } writeln!(output, "{}", serde_json::to_string_pretty(&display_rows)?)?; print_with_pager(&output)?; diff --git a/src/project_context.rs b/src/project_context.rs index 86d9e82..7f9c8f0 100644 --- a/src/project_context.rs +++ b/src/project_context.rs @@ -48,10 +48,6 @@ pub(crate) async fn resolve_required_project( .ok_or_else(|| anyhow!("--project required (or set BRAINTRUST_DEFAULT_PROJECT)")) } -pub(crate) async fn resolve_project_command_context(base: &BaseArgs) -> Result<ProjectContext> { - resolve_project_command_context_with_auth_mode(base, false).await -} - pub(crate) async fn resolve_project_command_context_with_auth_mode( base: &BaseArgs, read_only: bool, diff --git a/tests/datasets-fixtures/upload-refresh/fixture.json b/tests/datasets-fixtures/upload-refresh/fixture.json index 06f06f5..6cbbdfc 100644 --- a/tests/datasets-fixtures/upload-refresh/fixture.json +++ b/tests/datasets-fixtures/upload-refresh/fixture.json @@ -12,7 +12,7 @@ "datasets", "--json", "--no-input", - "append", + "create", "qa-fixture", "--file", "upload.jsonl" @@ -20,7 +20,7 @@ "stdout_contains": [ "\"uploaded\":2", "\"created_dataset\":true", - "\"mode\":\"upload\"" + "\"mode\":\"create\"" ] }, { @@ -31,16 +31,13 @@ "refresh", "qa-fixture", "--file", - "refresh.jsonl", - "--prune" + "refresh.jsonl" ], "stdout_contains": [ "\"created\":1", "\"updated\":1", - "\"deleted\":1", "\"unchanged\":0", - "\"pruned\":true", - "\"mode\":\"refresh\"" + "\"mode\":\"update\"" ] }, { @@ -51,16 +48,13 @@ "refresh", "qa-fixture", "--file", - "refresh.jsonl", - "--prune" + "refresh.jsonl" ], "stdout_contains": [ "\"created\":0", "\"updated\":0", - "\"deleted\":0", "\"unchanged\":2", - "\"pruned\":true", - "\"mode\":\"refresh\"" + "\"mode\":\"update\"" ] }, { @@ -68,9 +62,10 @@ "command": ["datasets", "--json", "--no-input", "view", "qa-fixture"], "stdout_contains": [ "\"id\":\"case-1\"", "\"expected\":\"A2\"", - "\"id\":\"case-3\"" + "\"id\":\"case-3\"", + "\"id\":\"case-2\"" ], - "stdout_not_contains": ["\"id\":\"case-2\""] + "stdout_not_contains": [] } ], "expected_logs3_requests": 2 From 20e9b29dd3f8575ab46896d446c8df2febe283d5 Mon Sep 17 00:00:00 2001 From: Parker Henderson Date: Mon, 13 Apr 2026 10:02:55 -0700 Subject: [PATCH 5/6] feat(datasets): add description support for dataset creation --- README.md | 1 + src/datasets/api.rs | 12 ++++++++++-- src/datasets/create.rs | 3 ++- src/datasets/mod.rs | 18 ++++++++++++++++++ 4 files changed, 31 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f19e469..456b5d4 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,7 @@ bt eval foo.eval.ts -- --description "Prod" --shard=1/4 - `bt datasets` works directly against remote Braintrust datasets — no local `bt sync` artifact flow is required, and the record shape is the same across create, update, and view (see the sketch below).
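A hedged sketch of assembling one such record as JSON (the field set matches the docs that follow; example_record is an illustrative helper, and each record becomes one JSONL line when fed to --file or stdin):

    use serde_json::{json, Value};

    // A dataset record with the accepted top-level fields.
    fn example_record() -> Value {
        json!({
            "id": "case-1",
            "input": {"text": "hi"},
            "expected": "hello",
            "metadata": {"topic": "greetings"},
            "tags": ["smoke"]
        })
    }

    fn main() {
        // Prints a single JSONL-ready line.
        println!("{}", example_record());
    }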
 - `bt datasets create my-dataset` — create an empty remote dataset in the current project.
+- `bt datasets create my-dataset --description "Dataset for smoke tests"` — create a dataset with a description.
 - `bt datasets create my-dataset --file records.jsonl` — create the remote dataset and seed it from a JSON/JSONL file.
 - `cat records.jsonl | bt datasets create my-dataset` — create the dataset and seed it from stdin.
 - `bt datasets create my-dataset --rows '[{"id":"case-1","input":{"text":"hi"},"expected":"hello"}]'` — create the dataset from inline JSON rows.
diff --git a/src/datasets/api.rs b/src/datasets/api.rs
index bb786dc..8f9f48d 100644
--- a/src/datasets/api.rs
+++ b/src/datasets/api.rs
@@ -182,12 +182,20 @@ pub async fn list_dataset_rows_limited(
     Ok((rows, truncated))
 }
 
-pub async fn create_dataset(client: &ApiClient, project_id: &str, name: &str) -> Result<Dataset> {
-    let body = serde_json::json!({
+pub async fn create_dataset(
+    client: &ApiClient,
+    project_id: &str,
+    name: &str,
+    description: Option<&str>,
+) -> Result<Dataset> {
+    let mut body = serde_json::json!({
         "name": name,
         "project_id": project_id,
         "org_name": client.org_name(),
     });
+    if let Some(description) = description.filter(|description| !description.is_empty()) {
+        body["description"] = serde_json::Value::String(description.to_string());
+    }
     client.post("/v1/dataset", &body).await
 }
 
diff --git a/src/datasets/create.rs b/src/datasets/create.rs
index 637082d..c6485d3 100644
--- a/src/datasets/create.rs
+++ b/src/datasets/create.rs
@@ -10,6 +10,7 @@ use super::{api, records::load_optional_upload_records, upload, ResolvedContext}
 pub async fn run(
     ctx: &ResolvedContext,
     name: Option<&str>,
+    description: Option<&str>,
     input_path: Option<&Path>,
     inline_rows: Option<&str>,
     id_field: &str,
@@ -34,7 +35,7 @@ pub async fn run(
 
     let dataset = match with_spinner_visible(
         "Creating dataset...",
-        api::create_dataset(&ctx.client, &ctx.project.id, &name),
+        api::create_dataset(&ctx.client, &ctx.project.id, &name, description),
         Duration::from_millis(300),
     )
     .await
diff --git a/src/datasets/mod.rs b/src/datasets/mod.rs
index 3f91a39..6c83a29 100644
--- a/src/datasets/mod.rs
+++ b/src/datasets/mod.rs
@@ -77,6 +77,7 @@ struct DatasetInputArgs {
 #[command(after_help = r#"Examples:
   bt datasets list
   bt datasets create my-dataset
+  bt datasets create my-dataset --description "Dataset for smoke tests"
   bt datasets create my-dataset --file records.jsonl
   cat records.jsonl | bt datasets create my-dataset
   bt datasets create my-dataset --rows '[{"id":"case-1","input":{"text":"hi"},"expected":"hello"}]'
@@ -111,6 +112,15 @@ struct CreateArgs {
     #[command(flatten)]
     name: DatasetNameArgs,
 
+    /// Optional dataset description.
+    #[arg(
+        long,
+        short = 'd',
+        env = "BT_DATASETS_DESCRIPTION",
+        value_name = "TEXT"
+    )]
+    description: Option<String>,
+
     #[command(flatten)]
     input: DatasetInputArgs,
 }
@@ -229,6 +239,7 @@ pub async fn run(base: BaseArgs, args: DatasetsArgs) -> Result<()> {
             create::run(
                 &ctx,
                 create_args.name(),
+                create_args.description.as_deref(),
                 create_args.input.file.as_deref(),
                 create_args.input.rows.as_deref(),
                 &create_args.input.id_field,
@@ -328,6 +339,8 @@ mod tests {
            "datasets",
            "create",
            "my-dataset",
+           "--description",
+           "Dataset for smoke tests",
            "--file",
            "records.jsonl",
            "--id-field",
@@ -338,6 +351,10 @@ mod tests {
             panic!("expected create command");
         };
         assert_eq!(create.name(), Some("my-dataset"));
+        assert_eq!(
+            create.description.as_deref(),
+            Some("Dataset for smoke tests")
+        );
         assert_eq!(create.input.file, Some(PathBuf::from("records.jsonl")));
         assert_eq!(create.input.id_field, "metadata.case_id");
         assert!(create.input.rows.is_none());
@@ -525,6 +542,7 @@ mod tests {
                 name_positional: Some("dataset".to_string()),
                 name_flag: None,
             },
+            description: None,
             input: DatasetInputArgs {
                 file: None,
                 rows: Some("[]".to_string()),

From 97b2a6aef1c2c64cc9d554d2674a6f57e39d1218 Mon Sep 17 00:00:00 2001
From: Parker Henderson
Date: Mon, 13 Apr 2026 10:31:47 -0700
Subject: [PATCH 6/6] docs: update dataset records documentation and validation

---
 README.md                       |  4 +-
 src/datasets/records.rs         | 62 ++++++++++++++++++-
 .../upload-refresh/fixture.json | 15 +++++
 3 files changed, 79 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 456b5d4..53acbc0 100644
--- a/README.md
+++ b/README.md
@@ -160,11 +160,13 @@
 - `bt datasets create my-dataset --file records.jsonl` — create the remote dataset and seed it from a JSON/JSONL file.
 - `cat records.jsonl | bt datasets create my-dataset` — create the dataset and seed it from stdin.
 - `bt datasets create my-dataset --rows '[{"id":"case-1","input":{"text":"hi"},"expected":"hello"}]'` — create the dataset from inline JSON rows.
+- `bt datasets create my-dataset --rows '[{"input":{"text":"hi"},"expected":"hello"}]'` — create a dataset when rows do not include `id`; bt auto-generates deterministic record IDs.
 - `bt datasets update my-dataset --file records.jsonl` — deterministically upsert rows by stable record id.
 - `bt datasets add my-dataset --rows '[{"id":"case-2","input":{"text":"bye"},"expected":"goodbye"}]'` — alias for `update`.
 - `bt datasets refresh my-dataset --file records.jsonl --id-field metadata.case_id` — alias for `update` with explicit id path (fails if the dataset does not exist, and does not delete remote rows missing from the input).
 - `bt datasets view my-dataset` — show dataset metadata and row payloads; defaults to loading up to 200 rows. Use `--limit <n>` to adjust or `--all-rows` to load everything.
-- `update`/`add`/`refresh` require stable IDs (via `id` or your `--id-field` path). Accepted record fields are `id`, `input`, `expected`, `metadata`, and `tags`.
+- `update`/`add`/`refresh` require explicit stable IDs via `id` or `--id-field`.
+- Accepted top-level record fields are `id`, `input`, `expected`, `output`, `metadata`, and `tags` (plus the root field referenced by `--id-field`, if different).
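To make the accepted record shape concrete, here is a minimal sketch of the update flow these bullets describe. The file name, dataset name, and row contents are illustrative only, and `custom.record_id` mirrors the `prepare_records_allows_custom_id_root_field` test added later in this patch rather than any documented convention:

```sh
# records.jsonl: one JSON record per line (illustrative contents):
#   {"custom":{"record_id":"case-1"},"input":{"text":"hi"},"expected":"hello","tags":["smoke"]}
#   {"custom":{"record_id":"case-2"},"input":{"text":"bye"},"expected":"goodbye","metadata":{"source":"qa"}}

# Upsert by the nested id. The root field "custom" passes validation only
# because it is the --id-field root; any other unknown top-level field is
# rejected with an "unsupported top-level field(s)" error.
bt datasets update my-dataset --file records.jsonl --id-field custom.record_id
```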
 
 ## `bt sql`
 
diff --git a/src/datasets/records.rs b/src/datasets/records.rs
index b27f950..bda5ecc 100644
--- a/src/datasets/records.rs
+++ b/src/datasets/records.rs
@@ -227,10 +227,12 @@ fn prepare_records(
     require_ids: bool,
 ) -> Result<Vec<PreparedDatasetRecord>> {
     let id_path = parse_id_field_path(id_field)?;
+    let id_root = id_path.first().cloned().expect("id path must be non-empty");
     let mut records = Vec::with_capacity(raw_records.len());
     let mut seen_ids = HashSet::new();
 
     for (row_index, raw_record) in raw_records.into_iter().enumerate() {
+        validate_supported_fields(&raw_record, &id_root, row_index)?;
         let record =
             prepared_record_from_input_object(raw_record, &id_path, require_ids, row_index)?;
         if !seen_ids.insert(record.id.clone()) {
@@ -246,6 +248,37 @@ fn prepare_records(
     Ok(records)
 }
 
+fn validate_supported_fields(
+    object: &Map<String, Value>,
+    id_root: &str,
+    row_index: usize,
+) -> Result<()> {
+    const BASE_ALLOWED_FIELDS: [&str; 6] =
+        ["id", "input", "expected", "output", "metadata", "tags"];
+    let mut unsupported = object
+        .keys()
+        .filter(|field| !BASE_ALLOWED_FIELDS.contains(&field.as_str()) && field.as_str() != id_root)
+        .cloned()
+        .collect::<Vec<_>>();
+
+    if unsupported.is_empty() {
+        return Ok(());
+    }
+
+    unsupported.sort();
+    let id_clause = if BASE_ALLOWED_FIELDS.contains(&id_root) {
+        String::new()
+    } else {
+        format!(", and id root '{}'", id_root)
+    };
+    bail!(
+        "dataset record {} contains unsupported top-level field(s): {}. Allowed fields are id, input, expected, output, metadata, tags{}",
+        row_index + 1,
+        unsupported.join(", "),
+        id_clause
+    );
+}
+
 fn prepared_record_from_input_object(
     object: Map<String, Value>,
     id_path: &[String],
@@ -259,7 +292,7 @@ fn prepared_record_from_input_object(
     let id = match explicit_id {
         Some(id) => id,
         None if require_id => bail!(
-            "dataset record {} is missing a stable id at '{}'; pass --id-field or include an id field",
+            "dataset record {} is missing a stable id at '{}'. `bt datasets update`/`add`/`refresh` require explicit ids; include an id field or pass --id-field",
             row_index + 1,
             id_path.join(".")
         ),
@@ -434,6 +467,9 @@ mod tests {
         let err = load_refresh_records(None, Some(r#"[{"input":{"text":"hello"}}]"#), "id")
             .expect_err("refresh should require explicit ids");
         assert!(err.to_string().contains("missing a stable id at 'id'"));
+        assert!(err
+            .to_string()
+            .contains("update`/`add`/`refresh` require explicit ids"));
     }
 
     #[test]
@@ -473,4 +509,28 @@ mod tests {
             .to_string()
             .contains("duplicate dataset record id 'dup'"));
     }
+
+    #[test]
+    fn prepare_records_rejects_unsupported_top_level_fields() {
+        let record =
+            serde_json::from_value(serde_json::json!({"id": "case-1", "foo": "bar"})).expect("map");
+        let err = prepare_records(vec![record], "id", true)
+            .expect_err("unsupported top-level field should error");
+        assert!(err
+            .to_string()
+            .contains("unsupported top-level field(s): foo"));
+    }
+
+    #[test]
+    fn prepare_records_allows_custom_id_root_field() {
+        let record = serde_json::from_value(serde_json::json!({
+            "custom": {"record_id": "case-1"},
+            "input": {"prompt": "hello"},
+            "expected": "world"
+        }))
+        .expect("map");
+        let prepared = prepare_records(vec![record], "custom.record_id", true)
+            .expect("custom id-field root should be allowed");
+        assert_eq!(prepared[0].id, "case-1");
+    }
 }
diff --git a/tests/datasets-fixtures/upload-refresh/fixture.json b/tests/datasets-fixtures/upload-refresh/fixture.json
index 6cbbdfc..fa4a119 100644
--- a/tests/datasets-fixtures/upload-refresh/fixture.json
+++ b/tests/datasets-fixtures/upload-refresh/fixture.json
@@ -23,6 +23,21 @@
       "\"mode\":\"create\""
     ]
   },
+  {
+    "command": [
+      "datasets",
+      "--no-input",
+      "update",
+      "qa-fixture",
+      "--rows",
+      "[{\"input\":{\"prompt\":\"missing-id\"},\"expected\":\"nope\"}]"
+    ],
+    "expect_success": false,
+    "stderr_contains": [
+      "missing a stable id at 'id'",
+      "update`/`add`/`refresh` require explicit ids"
+    ]
+  },
   {
     "command": [
       "datasets",