From c7462135a93a8cfdaf282b0317ae343242d74b00 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Thu, 19 Mar 2026 13:36:03 +0100 Subject: [PATCH] performance --- .../migrations/003_blob_index_unique.sql | 2 + pkg/store/sqlite.go | 41 +++++++++++++++---- 2 files changed, 35 insertions(+), 8 deletions(-) create mode 100644 pkg/store/migrations/003_blob_index_unique.sql diff --git a/pkg/store/migrations/003_blob_index_unique.sql b/pkg/store/migrations/003_blob_index_unique.sql new file mode 100644 index 0000000..6a9bbf7 --- /dev/null +++ b/pkg/store/migrations/003_blob_index_unique.sql @@ -0,0 +1,2 @@ +CREATE UNIQUE INDEX IF NOT EXISTS idx_blobs_ns_height_index ON blobs(namespace, height, blob_index); +DROP INDEX IF EXISTS idx_blobs_ns_height; diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go index 8659966..852ef68 100644 --- a/pkg/store/sqlite.go +++ b/pkg/store/sqlite.go @@ -89,6 +89,18 @@ func configureSQLite(ctx context.Context, db *sql.DB) error { if _, err := db.ExecContext(ctx, "PRAGMA foreign_keys=ON"); err != nil { return fmt.Errorf("set foreign_keys: %w", err) } + // NORMAL is crash-safe with WAL and avoids an extra fsync per commit. + if _, err := db.ExecContext(ctx, "PRAGMA synchronous=NORMAL"); err != nil { + return fmt.Errorf("set synchronous: %w", err) + } + // 64 MB page cache (negative value = KiB). + if _, err := db.ExecContext(ctx, "PRAGMA cache_size=-65536"); err != nil { + return fmt.Errorf("set cache_size: %w", err) + } + // Keep temp tables and sort spills in memory. + if _, err := db.ExecContext(ctx, "PRAGMA temp_store=MEMORY"); err != nil { + return fmt.Errorf("set temp_store: %w", err) + } return nil } @@ -102,6 +114,7 @@ type migrationStep struct { var allMigrations = []migrationStep{ {version: 1, file: "migrations/001_init.sql"}, {version: 2, file: "migrations/002_commitment_index.sql"}, + {version: 3, file: "migrations/003_blob_index_unique.sql"}, } func (s *SQLiteStore) migrate() error { @@ -170,14 +183,23 @@ func (s *SQLiteStore) PutBlobs(ctx context.Context, blobs []types.Blob) error { for i := range blobs { b := &blobs[i] - if err := ensureSQLiteBlobInvariant(ctx, tx, b); err != nil { - return err - } - if _, err := stmt.ExecContext(ctx, + res, err := stmt.ExecContext(ctx, b.Height, b.Namespace[:], b.Commitment, b.Data, b.ShareVersion, b.Signer, b.Index, - ); err != nil { + ) + if err != nil { return fmt.Errorf("insert blob at height %d index %d: %w", b.Height, b.Index, err) } + n, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("rows affected at height %d index %d: %w", b.Height, b.Index, err) + } + if n == 0 { + // INSERT OR IGNORE skipped this row — a unique constraint matched. + // Verify it is an idempotent re-insert, not a data conflict. + if err := verifyBlobNotConflicting(ctx, tx, b); err != nil { + return err + } + } } return tx.Commit() @@ -367,12 +389,15 @@ func scanBlobRow(rows *sql.Rows) (types.Blob, error) { return b, nil } -func ensureSQLiteBlobInvariant(ctx context.Context, tx *sql.Tx, b *types.Blob) error { - existingByIndex, err := queryBlobByIndex(ctx, tx, b.Namespace, b.Height, b.Index) +// verifyBlobNotConflicting is called only when INSERT OR IGNORE skipped a row. +// It distinguishes an idempotent re-insert (same data) from a true conflict +// (different data at the same position or commitment). +func verifyBlobNotConflicting(ctx context.Context, tx *sql.Tx, b *types.Blob) error { + existing, err := queryBlobByIndex(ctx, tx, b.Namespace, b.Height, b.Index) if err != nil { return err } - if existingByIndex != nil && !sameBlob(existingByIndex, b) { + if existing != nil && !sameBlob(existing, b) { return fmt.Errorf("blob conflict at height %d namespace %s index %d", b.Height, b.Namespace, b.Index) }