Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/store/migrations/003_blob_index_unique.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE UNIQUE INDEX IF NOT EXISTS idx_blobs_ns_height_index ON blobs(namespace, height, blob_index);
DROP INDEX IF EXISTS idx_blobs_ns_height;
41 changes: 33 additions & 8 deletions pkg/store/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ func configureSQLite(ctx context.Context, db *sql.DB) error {
if _, err := db.ExecContext(ctx, "PRAGMA foreign_keys=ON"); err != nil {
return fmt.Errorf("set foreign_keys: %w", err)
}
// NORMAL is crash-safe with WAL and avoids an extra fsync per commit.
if _, err := db.ExecContext(ctx, "PRAGMA synchronous=NORMAL"); err != nil {
return fmt.Errorf("set synchronous: %w", err)
}
// 64 MB page cache (negative value = KiB).
if _, err := db.ExecContext(ctx, "PRAGMA cache_size=-65536"); err != nil {
return fmt.Errorf("set cache_size: %w", err)
}
// Keep temp tables and sort spills in memory.
if _, err := db.ExecContext(ctx, "PRAGMA temp_store=MEMORY"); err != nil {
return fmt.Errorf("set temp_store: %w", err)
}
return nil
}

Expand All @@ -102,6 +114,7 @@ type migrationStep struct {
var allMigrations = []migrationStep{
{version: 1, file: "migrations/001_init.sql"},
{version: 2, file: "migrations/002_commitment_index.sql"},
{version: 3, file: "migrations/003_blob_index_unique.sql"},
}

func (s *SQLiteStore) migrate() error {
Expand Down Expand Up @@ -170,14 +183,23 @@ func (s *SQLiteStore) PutBlobs(ctx context.Context, blobs []types.Blob) error {

for i := range blobs {
b := &blobs[i]
if err := ensureSQLiteBlobInvariant(ctx, tx, b); err != nil {
return err
}
if _, err := stmt.ExecContext(ctx,
res, err := stmt.ExecContext(ctx,
b.Height, b.Namespace[:], b.Commitment, b.Data, b.ShareVersion, b.Signer, b.Index,
); err != nil {
)
if err != nil {
return fmt.Errorf("insert blob at height %d index %d: %w", b.Height, b.Index, err)
}
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("rows affected at height %d index %d: %w", b.Height, b.Index, err)
}
if n == 0 {
// INSERT OR IGNORE skipped this row — a unique constraint matched.
// Verify it is an idempotent re-insert, not a data conflict.
if err := verifyBlobNotConflicting(ctx, tx, b); err != nil {
return err
}
}
}

return tx.Commit()
Expand Down Expand Up @@ -367,12 +389,15 @@ func scanBlobRow(rows *sql.Rows) (types.Blob, error) {
return b, nil
}

func ensureSQLiteBlobInvariant(ctx context.Context, tx *sql.Tx, b *types.Blob) error {
existingByIndex, err := queryBlobByIndex(ctx, tx, b.Namespace, b.Height, b.Index)
// verifyBlobNotConflicting is called only when INSERT OR IGNORE skipped a row.
// It distinguishes an idempotent re-insert (same data) from a true conflict
// (different data at the same position or commitment).
func verifyBlobNotConflicting(ctx context.Context, tx *sql.Tx, b *types.Blob) error {
existing, err := queryBlobByIndex(ctx, tx, b.Namespace, b.Height, b.Index)
if err != nil {
return err
}
if existingByIndex != nil && !sameBlob(existingByIndex, b) {
if existing != nil && !sameBlob(existing, b) {
return fmt.Errorf("blob conflict at height %d namespace %s index %d", b.Height, b.Namespace, b.Index)
Comment on lines +400 to 401
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Use hex formatting for namespace in error message.

b.Namespace is a byte array. Using %s may produce non-printable or garbled output. Use %x for consistent hex representation.

Proposed fix
 	if existing != nil && !sameBlob(existing, b) {
-		return fmt.Errorf("blob conflict at height %d namespace %s index %d", b.Height, b.Namespace, b.Index)
+		return fmt.Errorf("blob conflict at height %d namespace %x index %d", b.Height, b.Namespace, b.Index)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if existing != nil && !sameBlob(existing, b) {
return fmt.Errorf("blob conflict at height %d namespace %s index %d", b.Height, b.Namespace, b.Index)
if existing != nil && !sameBlob(existing, b) {
return fmt.Errorf("blob conflict at height %d namespace %x index %d", b.Height, b.Namespace, b.Index)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/sqlite.go` around lines 400 - 401, The error message uses %s for
b.Namespace (a byte slice) which can produce garbled output; update the
fmt.Errorf call inside the conflict check (the branch with existing != nil &&
!sameBlob(existing, b)) to format the namespace as hex by replacing %s with %x
so the call becomes fmt.Errorf("blob conflict at height %d namespace %x index
%d", b.Height, b.Namespace, b.Index).

}

Expand Down
Loading