@grandgular/rx extends RxJS with Angular-optimized operators, providing enhanced reactive programming capabilities for Angular applications.
- 🧠 Stateful Operators – Advanced operators with configurable memory and state management
- ⚡ Angular-First – Designed to work seamlessly with Angular's change detection and lifecycle
- 🏎️ Performance-Optimized – Minimal overhead, maximum efficiency for high-frequency streams
- 🛠️ Type-Safe – Full TypeScript support with strict interfaces
- 🔄 Reactive Patterns – Simplify common RxJS challenges in Angular apps
- 📦 Lightweight – Tiny bundle size with zero unnecessary dependencies
🔹 Angular Community:
Join our Telegram channel for Angular tips and discussions:
🔹 Author:
Connect with me on LinkedIn:
tapFirst
log
distinctUntilChangedWithMemory
redirectOnHttpError
npm install @grandgular/rxAn enhanced version of RxJS's tap() that executes side-effects only for the first emitted value, with optional state isolation for individual subscribers.
- Selective side-effects - Execute callbacks only for the first value in the stream
- Dual mode operation - Choose between shared or isolated state per subscriber
- Memory efficient - No additional subscriptions or complex state management
- Type-safe - Full TypeScript support with proper generics
- RxJS native - Seamlessly integrates with existing RxJS pipelines
import { of } from "rxjs";
import { tapFirst } from "./tap-first";
const shared$ = of(1, 2, 3);
shared$.pipe(tapFirst((value) => console.log("First value detected:", value))).subscribe(console.log);
// Output:
// First value detected: 1
// 1
// 2
// 3const numbers$ = of(10, 20, 30);
// Isolated state
const isolated$ = numbers$.pipe(tapFirst((val) => console.log("Isolated first:", val), true));
isolated$.subscribe(); // Logs: "Isolated first: 10"
isolated$.subscribe(); // Logs: "Isolated first: 10" - fresh state for each subscriberA custom RxJS operator for logging stream values, errors, and completion events.
Useful for inspecting observable streams without modifying their data flow.
import { of } from "rxjs";
import { log } from "@grandgular/rx";
// Logs values and completion as-is
of(1, 2, 3).pipe(log()).subscribe();
// output: 1
// output: 2
// output: 3
// output: Completed
// Logs with a prefix for easier identification
of(4, Error("not 5"), 6).pipe(log("MyStream")).subscribe();
// output: [MyStream] 4
// output: [MyStream] Error: not 5
// output: [MyStream] 6
// output: [MyStream] Completed
// Logs using a custom formatter function
of(7, 8, 9)
.pipe(log((v) => v * 1000))
.subscribe();
// output: 7000
// output: 8000
// output: 9000
// output: CompletedAn enhanced version of RxJS's distinctUntilChanged() that maintains a memory of previous values, allowing for more sophisticated duplicate detection.
- Configurable memory size - Remember last N values (default: Infinity)
- Custom comparison - Define your own duplicate detection logic
- Angular-optimized - Works seamlessly with Angular's change detection
- Type-safe - Full TypeScript support
const nums$ = of(1, 2, 3, 4, 1, 2, 5);
nums$
.pipe(
distinctUntilChangedWithMemory(), // Defaults to Infinity
)
.subscribe(console.log);
// Output: 1, 2, 3, 4, 5const source$ = of(1, 2, 3, 1, 2, 3, 4, 3, 5);
source$
.pipe(
distinctUntilChangedWithMemory(3), // Remember last 3 values
)
.subscribe(console.log);
// Output: 1, 2, 3, 4, 5
// Explanation: The second '1' and '2' are filtered because they're in the memoryconst words$ = of("Apple", "Banana", "apple", "orange", "Apple");
words$
.pipe(
distinctUntilChangedWithMemory({
memorySize: 4,
comparator: (prev, curr) => prev.some((word) => word.toLowerCase() === curr.toLowerCase()),
}),
)
.subscribe(console.log);
// Output: 'Apple', 'Banana', 'orange'An RxJS operator that handles HTTP errors by redirecting to specified routes based on error status codes, while maintaining the original error for downstream processing.
- Status-based routing - Map specific HTTP status codes to different routes
- Flexible configuration - Accepts either a route map or a single fallback route
- History control - Option to skip location change in browser history
- Error logging - Optional callback for error logging
- Type-safe - Full TypeScript support with HttpStatusCode enum
import { redirectOnHttpError } from "./path/to/operator";
this.http
.get("/api")
.pipe(
redirectOnHttpError(this.router, {
redirectConfig: "/error", // Single fallback route
}),
)
.subscribe({
next: (data) => handleData(data),
error: (err) => console.error("Original error preserved:", err),
});this.http
.get("/api")
.pipe(
redirectOnHttpError(this.router, {
redirectConfig: {
[HttpStatusCode.NotFound]: "/not-found",
[HttpStatusCode.Forbidden]: "/access-denied",
default: "/server-error",
},
skipLocationChange: true, // Won't add to browser history
}),
)
.subscribe();this.http
.post("/api", payload)
.pipe(
redirectOnHttpError(this.router, {
redirectConfig: {
[HttpStatusCode.BadRequest]: "/invalid-request",
default: "/error",
},
logError: (error) => this.errorService.log(error), // Custom logging
}),
)
.subscribe();| Option | Type | Description |
|---|---|---|
redirectConfig |
string OR ErrorRoutesTypes |
Either a single fallback route or a map of status codes to routes (must include 'default') |
skipLocationChange |
boolean (optional) |
If true, navigation won't push a new state into browser history (default: false) |
logError |
(error: unknown) => void (optional) |
Callback for custom error logging |
MIT © Grandgular