Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ export * from "./graphql/modules/MerkleWitnessResolver";
export * from "./graphql/modules/LinkedMerkleWitnessResolver";
export * from "./graphql/VanillaGraphqlModules";
export * from "./metrics/OpenTelemetryServer";
export * from "./metrics/OpenTelemetryTracer";
export * from "./metrics/ModularizedInstrumentation";
97 changes: 97 additions & 0 deletions packages/api/src/metrics/ModularizedInstrumentation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { injectable, injectAll } from "tsyringe";
import { PollInstrumentation, PushInstrumentation } from "@proto-kit/sequencer";
import { InstrumentationBase } from "@opentelemetry/instrumentation";
import { mapSequential, splitArray } from "@proto-kit/common";

const INSTRUMENTATION_PREFIX = "protokit";

@injectable()
export class ModularizedInstrumentation extends InstrumentationBase<{}> {
// private readonly pushInstrumentations: PushInstrumentation[];

// private readonly pollInstrumentations: PollInstrumentation[];

public constructor(
@injectAll("Instrumentation")
private readonly instrumentations: (
| PushInstrumentation
| PollInstrumentation
)[]
) {
super("protokit", "canary", {});
}

public async start() {
const { pushInstrumentations } = this.splitInstrumentations();
await mapSequential(pushInstrumentations, async (i) => await i.start());
}

public splitInstrumentations() {
// PushInstrumentation is abstract, therefore we can filter like this, while Poll
// isn't. But all remainders are of that type (bcs of the annotation) so it's fine
const split = splitArray(this.instrumentations, (i) =>
i instanceof PushInstrumentation ? "push" : "poll"
);
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const pushInstrumentations = (split.push as PushInstrumentation[]) ?? [];
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const pollInstrumentations = (split.poll as PollInstrumentation[]) ?? [];

return {
pushInstrumentations,
pollInstrumentations,
};
}

public initialize() {
const { pushInstrumentations, pollInstrumentations } =
this.splitInstrumentations();

pollInstrumentations.forEach((instrumentation) => {
const observableCounter = this.meter.createObservableCounter(
`${INSTRUMENTATION_PREFIX}_${instrumentation.name}`,
{
description: instrumentation.description,
}
);

this.meter.addBatchObservableCallback(
async (observableResult) => {
const metric = await instrumentation.poll();

observableResult.observe(observableCounter, metric);
},
[observableCounter]
);
});

pushInstrumentations.forEach((instrumentation) => {
const gauges = instrumentation.names.map((name) => {
const gauge = this.meter.createGauge(
`${INSTRUMENTATION_PREFIX}_${name}`,
{
description: instrumentation.description,
}
);
return [name, gauge] as const;
});

instrumentation.setPushFn((name, v) => {
const gauge = gauges.find(([candidate]) => candidate === name);
gauge![1].record(v);
});
});
}

// Called when a new `MeterProvider` is set
// the Meter (result of @opentelemetry/api's getMeter) is
// available as this.meter within this method
// eslint-disable-next-line no-underscore-dangle
override _updateMetricInstruments() {
if (this.instrumentations !== undefined) {
this.initialize();
}
}

init() {}
}
7 changes: 4 additions & 3 deletions packages/api/src/metrics/OpenTelemetryServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import { diag, DiagConsoleLogger, DiagLogLevel } from "@opentelemetry/api";
import { inject } from "tsyringe";
import { dependencyFactory, DependencyRecord, log } from "@proto-kit/common";

import { SequencerInstrumentation } from "./SequencerInstrumentation";
import { OpenTelemetryTracer } from "./OpenTelemetryTracer";
import { ModularizedInstrumentation } from "./ModularizedInstrumentation";

export type OpenTelemetryServerConfig = {
metrics?: {
Expand Down Expand Up @@ -54,9 +54,8 @@ export class OpenTelemetryServer extends SequencerModule<OpenTelemetryServerConf
config: { metrics, tracing },
} = this;

// TODO Modularize Instrumentations
const seqMetrics = this.sequencer.dependencyContainer.resolve(
SequencerInstrumentation
ModularizedInstrumentation
);

const metricReader =
Expand Down Expand Up @@ -91,6 +90,8 @@ export class OpenTelemetryServer extends SequencerModule<OpenTelemetryServerConf

sdk.start();

await seqMetrics.start();

// TODO Write logger to directly integrate with our logging library
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.ERROR);

Expand Down
53 changes: 0 additions & 53 deletions packages/api/src/metrics/SequencerInstrumentation.ts

This file was deleted.

3 changes: 3 additions & 0 deletions packages/sequencer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,6 @@ export * from "./appChain/AppChain";
export * from "./appChain/AppChainModule";
export * from "./appChain/AreProofsEnabledFactory";
export * from "./appChain/SharedDependencyFactory";
export * from "./metrics/Instrumentation";
export * from "./metrics/BlockProductionInstrumentation";
export * from "./metrics/MempoolInstrumentation";
12 changes: 11 additions & 1 deletion packages/sequencer/src/mempool/private/PrivateMempool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventEmitter, log, noop } from "@proto-kit/common";
import { dependencyFactory, EventEmitter, log, noop } from "@proto-kit/common";
import { inject } from "tsyringe";

import type { Mempool, MempoolEvents } from "../Mempool";
Expand All @@ -14,13 +14,15 @@ import { trace } from "../../logging/trace";
import { IncomingMessagesService } from "../../settlement/messages/IncomingMessagesService";
import { MempoolSorting } from "../sorting/MempoolSorting";
import { DefaultMempoolSorting } from "../sorting/DefaultMempoolSorting";
import { MempoolInstrumentation } from "../../metrics/MempoolInstrumentation";

type PrivateMempoolConfig = {
type?: "hybrid" | "private" | "based";
targetBlockSize?: number;
};

@sequencerModule()
@dependencyFactory()
export class PrivateMempool
extends SequencerModule<PrivateMempoolConfig>
implements Mempool
Expand All @@ -43,6 +45,14 @@ export class PrivateMempool
this.mempoolSorting = mempoolSorting ?? new DefaultMempoolSorting();
}

public static dependencies() {
return {
MempoolInstrumentation: {
useClass: MempoolInstrumentation,
},
};
}

private type() {
return this.config.type ?? "hybrid";
}
Expand Down
37 changes: 37 additions & 0 deletions packages/sequencer/src/metrics/BlockProductionInstrumentation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { inject, injectable } from "tsyringe";

import type { BlockTriggerBase } from "../protocol/production/trigger/BlockTrigger";

import { instrumentation, PushInstrumentation } from "./Instrumentation";

@instrumentation()
@injectable()
export class BlockProductionInstrumentation extends PushInstrumentation {
names = ["block_height", "block_result", "batch_height"];

description = "L2 block height";

public constructor(
@inject("BlockTrigger")
private readonly trigger: BlockTriggerBase
) {
super();
}

public async start() {
this.trigger.events.on("block-produced", (block) => {
this.pushValue("block_height", parseInt(block.height.toString(), 10));
});

this.trigger.events.on("block-metadata-produced", (block) => {
this.pushValue(
"block_result",
parseInt(block.block.height.toString(), 10)
);
});

this.trigger.events.on("batch-produced", (batch) => {
this.pushValue("batch_height", parseInt(batch.height.toString(), 10));
});
}
}
41 changes: 41 additions & 0 deletions packages/sequencer/src/metrics/Instrumentation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { implement, Startable } from "@proto-kit/common";

export function instrumentation() {
return implement<PollInstrumentation | PushInstrumentation>(
"Instrumentation"
);
}

export interface InstrumentationModule {
name: string;

description?: string;
}

export interface PollInstrumentation extends InstrumentationModule {
poll(): Promise<number>;
}

export abstract class PushInstrumentation implements Startable {
abstract names: string[];

abstract description?: string;

private pushFn?: (name: string, value: number) => void;

public setPushFn(f: (name: string, value: number) => void) {
this.pushFn = f;
}

protected pushValue(name: string, value: number) {
if (this.pushFn === undefined) {
throw new Error("Pushfn not initialized");
}
if (!this.names.includes(name)) {
throw new Error(`Name ${name} not specified in declared name list`);
}
this.pushFn(name, value);
}

abstract start(): Promise<void>;
}
22 changes: 22 additions & 0 deletions packages/sequencer/src/metrics/MempoolInstrumentation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { inject, injectable } from "tsyringe";

import type { PrivateMempool } from "../mempool/private/PrivateMempool";

import { instrumentation, PollInstrumentation } from "./Instrumentation";

@instrumentation()
@injectable()
export class MempoolInstrumentation implements PollInstrumentation {
name = "mempool_size";

description = "The size of the mempool";

public constructor(
@inject("Mempool")
private readonly mempool: PrivateMempool
) {}

async poll() {
return await this.mempool.length();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { inject, injectable } from "tsyringe";
import { log } from "@proto-kit/common";
import { dependencyFactory, log } from "@proto-kit/common";

import { closeable, Closeable } from "../../../sequencer/builder/Closeable";
import { BatchProducerModule } from "../BatchProducerModule";
Expand All @@ -13,6 +13,7 @@ import {
} from "../../../settlement/BridgingModule";
import { ensureNotBusy } from "../../../helpers/BusyGuard";
import { SequencerStartupModule } from "../../../sequencer/SequencerStartupModule";
import { BlockProductionInstrumentation } from "../../../metrics/BlockProductionInstrumentation";

import { BlockTriggerBase } from "./BlockTrigger";

Expand All @@ -26,6 +27,7 @@ export interface TimedBlockTriggerConfig {

@injectable()
@closeable()
@dependencyFactory()
export class TimedBlockTrigger
extends BlockTriggerBase<TimedBlockTriggerConfig>
implements Closeable
Expand Down Expand Up @@ -59,6 +61,14 @@ export class TimedBlockTrigger
);
}

public static dependencies() {
return {
BlockProductionInstrumentation: {
useClass: BlockProductionInstrumentation,
},
};
}

public async start(): Promise<void> {
log.info("Starting timed block trigger");
const { settlementInterval, blockInterval } = this.config;
Expand Down
6 changes: 5 additions & 1 deletion packages/stack/src/scripts/graphql/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ export async function startServer() {
},
metrics: {
enabled: true,
prometheus: {
port: 9464,
host: "0.0.0.0",
},
},
},

Expand Down Expand Up @@ -239,7 +243,7 @@ export async function startServer() {
let nonce = Number(as?.nonce.toString() ?? "0");

setInterval(async () => {
const random = Math.floor(Math.random() * 5);
const random = 0; //Math.floor(Math.random() * 5);
await mapSequential(range(0, random), async () => {
const tx = await appChain.transaction(
priv.toPublicKey(),
Expand Down
Loading