Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,11 @@ const EnvironmentSchema = z
BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB

MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional().default(500),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(1_000), // 1 second
QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
QUEUE_SIZE_CACHE_ENABLED: z.coerce.number().int().optional().default(1),
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),

Expand Down Expand Up @@ -591,6 +594,16 @@ const EnvironmentSchema = z
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(),

// TTL System settings for automatic run expiration
RUN_ENGINE_TTL_SYSTEM_DISABLED: BoolEnv.default(false),
RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(),
RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000),
RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100),

/** Optional maximum TTL for all runs (e.g. "14d"). If set, runs without an explicit TTL
* will use this as their TTL, and runs with a TTL larger than this will be clamped. */
RUN_ENGINE_DEFAULT_MAX_TTL: z.string().optional(),

RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { marqs } from "~/v3/marqs/index.server";
import { engine } from "~/v3/runEngine.server";
import { getQueueSizeLimit } from "~/v3/utils/queueLimits.server";
import { BasePresenter } from "./basePresenter.server";

export type Environment = {
Expand All @@ -9,6 +10,7 @@ export type Environment = {
concurrencyLimit: number;
burstFactor: number;
runsEnabled: boolean;
queueSizeLimit: number | null;
};

export class EnvironmentQueuePresenter extends BasePresenter {
Expand All @@ -30,19 +32,24 @@ export class EnvironmentQueuePresenter extends BasePresenter {
},
select: {
runsEnabled: true,
maximumDevQueueSize: true,
maximumDeployedQueueSize: true,
},
});

if (!organization) {
throw new Error("Organization not found");
}

const queueSizeLimit = getQueueSizeLimit(environment.type, organization);

return {
running,
queued,
concurrencyLimit: environment.maximumConcurrencyLimit,
burstFactor: environment.concurrencyLimitBurstFactor.toNumber(),
runsEnabled: environment.type === "DEVELOPMENT" || organization.runsEnabled,
queueSizeLimit,
};
}
}
51 changes: 36 additions & 15 deletions apps/webapp/app/presenters/v3/LimitsPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Ratelimit } from "@upstash/ratelimit";
import { RuntimeEnvironmentType } from "@trigger.dev/database";
import { createHash } from "node:crypto";
import { env } from "~/env.server";
import { getCurrentPlan } from "~/services/platform.v3.server";
Expand All @@ -12,6 +13,8 @@ import { BasePresenter } from "./basePresenter.server";
import { singleton } from "~/utils/singleton";
import { logger } from "~/services/logger.server";
import { CheckScheduleService } from "~/v3/services/checkSchedule.server";
import { engine } from "~/v3/runEngine.server";
import { getQueueSizeLimit, getQueueSizeLimitSource } from "~/v3/utils/queueLimits.server";

// Create a singleton Redis client for rate limit queries
const rateLimitRedisClient = singleton("rateLimitQueryRedisClient", () =>
Expand Down Expand Up @@ -66,8 +69,7 @@ export type LimitsResult = {
logRetentionDays: QuotaInfo | null;
realtimeConnections: QuotaInfo | null;
batchProcessingConcurrency: QuotaInfo;
devQueueSize: QuotaInfo;
deployedQueueSize: QuotaInfo;
queueSize: QuotaInfo;
};
features: {
hasStagingEnvironment: FeatureInfo;
Expand All @@ -84,11 +86,13 @@ export class LimitsPresenter extends BasePresenter {
organizationId,
projectId,
environmentId,
environmentType,
environmentApiKey,
}: {
organizationId: string;
projectId: string;
environmentId: string;
environmentType: RuntimeEnvironmentType;
environmentApiKey: string;
}): Promise<LimitsResult> {
// Get organization with all limit-related fields
Expand Down Expand Up @@ -167,6 +171,30 @@ export class LimitsPresenter extends BasePresenter {
batchRateLimitConfig
);

// Get current queue size for this environment
// We need the runtime environment fields for the engine query
const runtimeEnv = await this._replica.runtimeEnvironment.findFirst({
where: { id: environmentId },
select: {
id: true,
maximumConcurrencyLimit: true,
concurrencyLimitBurstFactor: true,
},
});

let currentQueueSize = 0;
if (runtimeEnv) {
const engineEnv = {
id: runtimeEnv.id,
type: environmentType,
maximumConcurrencyLimit: runtimeEnv.maximumConcurrencyLimit,
concurrencyLimitBurstFactor: runtimeEnv.concurrencyLimitBurstFactor,
organization: { id: organizationId },
project: { id: projectId },
};
currentQueueSize = (await engine.lengthOfEnvQueue(engineEnv)) ?? 0;
}

// Get plan-level limits
const schedulesLimit = limits?.schedules?.number ?? null;
const teamMembersLimit = limits?.teamMembers?.number ?? null;
Expand Down Expand Up @@ -282,19 +310,12 @@ export class LimitsPresenter extends BasePresenter {
canExceed: true,
isUpgradable: true,
},
devQueueSize: {
name: "Dev queue size",
description: "Maximum pending runs in development environments",
limit: organization.maximumDevQueueSize ?? null,
currentUsage: 0, // Would need to query Redis for this
source: organization.maximumDevQueueSize ? "override" : "default",
},
deployedQueueSize: {
name: "Deployed queue size",
description: "Maximum pending runs in deployed environments",
limit: organization.maximumDeployedQueueSize ?? null,
currentUsage: 0, // Would need to query Redis for this
source: organization.maximumDeployedQueueSize ? "override" : "default",
queueSize: {
name: "Max queued runs",
description: "Maximum pending runs per individual queue in this environment",
limit: getQueueSizeLimit(environmentType, organization),
currentUsage: currentQueueSize,
source: getQueueSizeLimitSource(environmentType, organization),
},
},
features: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
organizationId: project.organizationId,
projectId: project.id,
environmentId: environment.id,
environmentType: environment.type,
environmentApiKey: environment.apiKey,
})
);
Expand Down Expand Up @@ -507,9 +508,8 @@ function QuotasSection({
// Include batch processing concurrency
quotaRows.push(quotas.batchProcessingConcurrency);

// Add queue size quotas if set
if (quotas.devQueueSize.limit !== null) quotaRows.push(quotas.devQueueSize);
if (quotas.deployedQueueSize.limit !== null) quotaRows.push(quotas.deployedQueueSize);
// Add queue size quota if set
if (quotas.queueSize.limit !== null) quotaRows.push(quotas.queueSize);

return (
<div className="flex flex-col gap-3">
Expand Down Expand Up @@ -556,9 +556,12 @@ function QuotaRow({
billingPath: string;
}) {
// For log retention, we don't show current usage as it's a duration, not a count
// For queue size, we don't show current usage as the limit is per-queue, not environment-wide
const isRetentionQuota = quota.name === "Log retention";
const isQueueSizeQuota = quota.name === "Max queued runs";
const hideCurrentUsage = isRetentionQuota || isQueueSizeQuota;
const percentage =
!isRetentionQuota && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null;
!hideCurrentUsage && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null;

// Special handling for Log retention
if (quota.name === "Log retention") {
Expand Down Expand Up @@ -657,10 +660,10 @@ function QuotaRow({
alignment="right"
className={cn(
"tabular-nums",
isRetentionQuota ? "text-text-dimmed" : getUsageColorClass(percentage, "usage")
hideCurrentUsage ? "text-text-dimmed" : getUsageColorClass(percentage, "usage")
)}
>
{isRetentionQuota ? "–" : formatNumber(quota.currentUsage)}
{hideCurrentUsage ? "–" : formatNumber(quota.currentUsage)}
</TableCell>
<TableCell alignment="right">
<SourceBadge source={quota.source} />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ export default function Page() {
<BigNumber
title="Queued"
value={environment.queued}
suffix={env.paused && environment.queued > 0 ? "paused" : undefined}
suffix={env.paused ? <span className="text-warning">paused</span> : undefined}
animate
accessory={
<div className="flex items-start gap-1">
Expand All @@ -364,7 +364,7 @@ export default function Page() {
/>
</div>
}
valueClassName={cn(env.paused ? "text-warning" : undefined, "tabular-nums")}
valueClassName={env.paused ? "text-warning tabular-nums" : "tabular-nums"}
compactThreshold={1000000}
/>
<BigNumber
Expand Down Expand Up @@ -509,7 +509,10 @@ export default function Page() {
{queues.length > 0 ? (
queues.map((queue) => {
const limit = queue.concurrencyLimit ?? environment.concurrencyLimit;
const isAtLimit = queue.running >= limit;
const isAtConcurrencyLimit = queue.running >= limit;
const isAtQueueLimit =
environment.queueSizeLimit !== null &&
queue.queued >= environment.queueSizeLimit;
const queueFilterableName = `${queue.type === "task" ? "task/" : ""}${
queue.name
}`;
Expand All @@ -535,7 +538,12 @@ export default function Page() {
Paused
</Badge>
) : null}
{isAtLimit ? (
{isAtQueueLimit ? (
<Badge variant="extra-small" className="text-error">
At queue limit
</Badge>
) : null}
{isAtConcurrencyLimit ? (
<Badge variant="extra-small" className="text-warning">
At concurrency limit
</Badge>
Expand All @@ -546,7 +554,8 @@ export default function Page() {
alignment="right"
className={cn(
"w-[1%] pl-16 tabular-nums",
queue.paused ? "opacity-50" : undefined
queue.paused ? "opacity-50" : undefined,
isAtQueueLimit && "text-error"
)}
>
{queue.queued}
Expand All @@ -557,7 +566,7 @@ export default function Page() {
"w-[1%] pl-16 tabular-nums",
queue.paused ? "opacity-50" : undefined,
queue.running > 0 && "text-text-bright",
isAtLimit && "text-warning"
isAtConcurrencyLimit && "text-warning"
)}
>
{queue.running}
Expand All @@ -577,7 +586,7 @@ export default function Page() {
className={cn(
"w-[1%] pl-16",
queue.paused ? "opacity-50" : undefined,
isAtLimit && "text-warning",
isAtConcurrencyLimit && "text-warning",
queue.concurrency?.overriddenAt && "font-medium text-text-bright"
)}
>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,18 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
organizationSlug,
runParam,
spanParam,
error,
linkedRunId,
error:
error instanceof Error
? {
name: error.name,
message: error.message,
stack: error.stack,
cause: error.cause instanceof Error
? { name: error.cause.name, message: error.cause.message }
: error.cause,
}
: error,
});
return redirectWithErrorMessage(
v3RunPath(
Expand Down
25 changes: 20 additions & 5 deletions apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,26 @@ export class IdempotencyKeyConcern {
}

// We have an idempotent run, so we return it
const associatedWaitpoint = existingRun.associatedWaitpoint;
const parentRunId = request.body.options?.parentRunId;
const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;

//We're using `andWait` so we need to block the parent run with a waitpoint
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
if (resumeParentOnCompletion && parentRunId) {
// Get or create waitpoint lazily (existing run may not have one if it was standalone)
let associatedWaitpoint = existingRun.associatedWaitpoint;
if (!associatedWaitpoint) {
associatedWaitpoint = await this.engine.getOrCreateRunWaitpoint({
runId: existingRun.id,
projectId: request.environment.projectId,
environmentId: request.environment.id,
});
}

// If run already completed, return without blocking
if (!associatedWaitpoint) {
return { isCached: true, run: existingRun };
}

await this.traceEventConcern.traceIdempotentRun(
request,
parentStore,
Expand All @@ -98,13 +113,13 @@ export class IdempotencyKeyConcern {
request.options?.parentAsLinkType === "replay"
? event.spanId
: event.traceparent?.spanId
? `${event.traceparent.spanId}:${event.spanId}`
: event.spanId;
? `${event.traceparent.spanId}:${event.spanId}`
: event.spanId;

//block run with waitpoint
await this.engine.blockRunWithWaitpoint({
runId: RunId.fromFriendlyId(parentRunId),
waitpoints: associatedWaitpoint.id,
waitpoints: associatedWaitpoint!.id,
spanIdToComplete: spanId,
batch: request.options?.batchId
? {
Expand Down
Loading