Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## Unreleased

### Added

- Added missing block detection and recovery in the indexer.[#488](https://github.com/proto-kit/framework/pull/488)
- `@dependencyFactory` for static dependency factory type safety
- Added Mempool sorting [#395](https://github.com/proto-kit/framework/pull/395)
- Introduced dynamic block building and JIT transaction fetching [#394](https://github.com/proto-kit/framework/pull/394)
Expand Down
149 changes: 96 additions & 53 deletions packages/indexer/src/IndexerNotifier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {
BlockTriggerBase,
BlockStorage,
BlockWithResult,
Sequencer,
sequencerModule,
SequencerModule,
Expand All @@ -12,7 +14,8 @@ import {
import { log } from "@proto-kit/common";
import { inject } from "tsyringe";

import { IndexBlockTask } from "./tasks/IndexBlockTask";
import { IndexBlockTask, IndexBlockResult } from "./tasks/IndexBlockTask";
import { IndexMissingBlocksTask } from "./tasks/IndexMissingBlocksTask";
import { IndexPendingTxTask } from "./tasks/IndexPendingTxTask";
import { IndexSettlementTask } from "./tasks/IndexSettlementTask";
import { IndexBatchTask } from "./tasks/IndexBatchTask";
Expand All @@ -30,7 +33,10 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
public sequencer: Sequencer<NotifierMandatorySequencerModules>,
@inject("TaskQueue")
public taskQueue: TaskQueue,
@inject("BlockStorage")
private readonly blockStorage: BlockStorage,
public indexBlockTask: IndexBlockTask,
public indexMissingBlocksTask: IndexMissingBlocksTask,
public indexPendingTxTask: IndexPendingTxTask,
public indexBatchTask: IndexBatchTask,
public indexSettlementTask: IndexSettlementTask,
Expand All @@ -39,6 +45,67 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
super();
}

private async pushTask(
queueName: string,
name: string,
payload: string
): Promise<void> {
const queue = await this.taskQueue.getQueue(queueName);
await queue.addTask({
name,
payload,
flowId: "",
sequencerId: this.sequencerIdProvider.getSequencerId(),
});
}

private async handleIndexBlockTaskCompleted(
payload: TaskPayload
): Promise<void> {
if (payload.name !== this.indexBlockTask.name) {
return;
}

try {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const result = JSON.parse(payload.payload) as IndexBlockResult;

if (
result.status !== "missing-blocks" ||
result.missingHeights.length === 0
) {
return;
}

const heights = [...result.missingHeights, result.incomingHeight];

const blocks = await Promise.all(
heights.map((h) => this.blockStorage.getBlockWithResultAt(h))
);

const filteredBlocks = blocks.filter(
(block): block is BlockWithResult => block !== undefined
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is filterNonUndefined in the common package that we can reuse for this

);

if (filteredBlocks.length === 0) {
log.warn("No blocks found to re-send");
return;
}

const serialized = await this.indexMissingBlocksTask
.inputSerializer()
.toJSON(filteredBlocks);

await this.pushTask(
this.indexMissingBlocksTask.name,
this.indexMissingBlocksTask.name,
serialized
);
} catch (error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave that to the queue implementation to handle errors that come from this. Generally, catching stuff early and not handling it (just emitting it is not handling it imo) lead to weird and non-recoverable behaviour.
Throwing stuff is okay, sometimes stuff just breaks down, but then the sequencer should crash and restart.
At least that's the ideal philosophy I try to approach these things nowadays. Idk I just thought I'd share this - here it's definitely not missing critical so it might be unwise to let the sequencer crash because of a peripherial system. Yeah idk let's talk about it more i guess

log.error("Failed to handle block task completion result", error);
}
}

public async propagateEventsAsTasks() {
const queue = await this.taskQueue.getQueue(this.indexBlockTask.name);
const inputSerializer = this.indexBlockTask.inputSerializer();
Expand All @@ -47,86 +114,62 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> {
const settlementInputSerializer =
this.indexSettlementTask.inputSerializer();

await queue.onCompleted(
async (payload) => await this.handleIndexBlockTaskCompleted(payload)
);

this.sequencer.events.on("block-metadata-produced", async (block) => {
log.debug(
"Notifiying the indexer about block",
block.block.height.toBigInt()
);
const payload = await inputSerializer.toJSON(block);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexBlockTask.name,
payload,
flowId: "", // empty for now
sequencerId,
};

await queue.addTask(task);
await this.pushTask(
this.indexBlockTask.name,
this.indexBlockTask.name,
payload
);
});

this.sequencer.events.on("mempool-transaction-added", async (tx) => {
try {
const txQueue = await this.taskQueue.getQueue(
this.indexPendingTxTask.name
);
const payload = await txInputSerializer.toJSON(tx);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexPendingTxTask.name,
payload,
flowId: "",
sequencerId,
};

await txQueue.addTask(task);
await this.pushTask(
this.indexPendingTxTask.name,
this.indexPendingTxTask.name,
payload
);
} catch (err) {
log.error("Failed to add pending-tx task", err);
}
});

this.sequencer.events.on("batch-produced", async (batch) => {
log.debug("Notifiying the indexer about batch", batch?.height);
try {
const batchQueue = await this.taskQueue.getQueue(
this.indexBatchTask.name
);

const payload = await batchInputSerializer.toJSON(batch);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexBatchTask.name,
payload,
flowId: "",
sequencerId,
};

await batchQueue.addTask(task);
await this.pushTask(
this.indexBatchTask.name,
this.indexBatchTask.name,
payload
);
} catch (err) {
log.error(`Failed to index batch ${batch?.height} ${err}`);
}
});

this.sequencer.events.on("settlement-submitted", async (settlement) => {
log.debug(
"Notifiying the indexer about settlement",
"Notifying the indexer about settlement",
settlement.transactionHash
);
try {
const settlementQueue = await this.taskQueue.getQueue(
this.indexSettlementTask.name
);

const payload = await settlementInputSerializer.toJSON(settlement);
const sequencerId = this.sequencerIdProvider.getSequencerId();

const task: TaskPayload = {
name: this.indexSettlementTask.name,
payload,
flowId: "",
sequencerId,
};

await settlementQueue.addTask(task);
await this.pushTask(
this.indexSettlementTask.name,
this.indexSettlementTask.name,
payload
);
} catch (err) {
log.error(
`Failed to add index settlement: ${settlement.transactionHash} ${err}`
Expand Down
1 change: 1 addition & 0 deletions packages/indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./tasks/IndexBlockTaskParameters";
export * from "./tasks/IndexPendingTxTask";
export * from "./tasks/IndexBatchTask";
export * from "./tasks/IndexSettlementTask";
export * from "./tasks/IndexMissingBlocksTask";
45 changes: 34 additions & 11 deletions packages/indexer/src/tasks/IndexBlockTask.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
BlockQueue,
BlockStorage,
Task,
TaskSerializer,
TaskWorkerModule,
Expand All @@ -12,17 +13,25 @@ import {
IndexBlockTaskParametersSerializer,
} from "./IndexBlockTaskParameters";

export interface IndexBlockResult {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I think for this we can type it as a union type
{ status: "ok" } | { status: "missing-blocks", missingHeights: .... } if we want.
But I leave it to you if we want to do it that way

status: "ok" | "missing-blocks";
missingHeights: number[];
incomingHeight: number;
}

@injectable()
export class IndexBlockTask
extends TaskWorkerModule
implements Task<IndexBlockTaskParameters, string | void>
implements Task<IndexBlockTaskParameters, IndexBlockResult>
{
public name = "index-block";

public constructor(
public taskSerializer: IndexBlockTaskParametersSerializer,
@inject("BlockQueue")
public blockStorage: BlockQueue
public blockStorage: BlockQueue,
@inject("BlockStorage")
private readonly blockRepository: BlockStorage
) {
super();
}
Expand All @@ -32,27 +41,41 @@ export class IndexBlockTask

public async compute(
input: IndexBlockTaskParameters
): Promise<string | void> {
): Promise<IndexBlockResult> {
const incomingHeight = Number(input.block.height.toBigInt());
try {
const currentHeight = await this.blockRepository.getCurrentBlockHeight();

if (incomingHeight > currentHeight) {
const missingHeights = Array.from(
{ length: incomingHeight - currentHeight },
(_, i) => currentHeight + i
);

return { status: "missing-blocks", missingHeights, incomingHeight };
}
await this.blockStorage.pushBlock(input.block);
await this.blockStorage.pushResult(input.result);

log.info(`Block ${incomingHeight} indexed successfully`);
return { status: "ok", missingHeights: [], incomingHeight };
} catch (error) {
log.error("Failed to index block", input.block.height.toBigInt(), error);
return undefined;
log.error("Failed to index block", incomingHeight, error);
return { status: "ok", missingHeights: [], incomingHeight };
}

log.info(`Block ${input.block.height.toBigInt()} indexed sucessfully`);
return "";
}

public inputSerializer(): TaskSerializer<IndexBlockTaskParameters> {
return this.taskSerializer;
}

public resultSerializer(): TaskSerializer<string | void> {
public resultSerializer(): TaskSerializer<IndexBlockResult> {
return {
fromJSON: async () => {},
toJSON: async () => "",
toJSON: async (input: IndexBlockResult) => JSON.stringify(input),

fromJSON: async (json: string) =>
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
JSON.parse(json) as IndexBlockResult,
};
}
}
74 changes: 74 additions & 0 deletions packages/indexer/src/tasks/IndexMissingBlocksTask.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import {
BlockQueue,
BlockWithResult,
Task,
TaskSerializer,
TaskWorkerModule,
} from "@proto-kit/sequencer";
import { log } from "@proto-kit/common";
import { inject, injectable } from "tsyringe";

import { IndexBlockTaskParametersSerializer } from "./IndexBlockTaskParameters";

@injectable()
export class IndexMissingBlocksTask
extends TaskWorkerModule
implements Task<BlockWithResult[], string | void>
{
public name = "index-missing-blocks";

public constructor(
public taskSerializer: IndexBlockTaskParametersSerializer,
@inject("BlockQueue")
public blockStorage: BlockQueue
) {
super();
}

// eslint-disable-next-line @typescript-eslint/no-empty-function
public async prepare(): Promise<void> {}

public async compute(input: BlockWithResult[]): Promise<string | void> {
const lastHeight = Number(input[input.length - 1].block.height.toBigInt());
for (const blockWithResult of input) {
const height = Number(blockWithResult.block.height.toBigInt());
const isLast = height === lastHeight;
try {
// eslint-disable-next-line no-await-in-loop
await this.blockStorage.pushBlock(blockWithResult.block);
// eslint-disable-next-line no-await-in-loop
await this.blockStorage.pushResult(blockWithResult.result);
log.info(
`${isLast ? "" : "Missing "}block ${height} indexed successfully`
);
} catch (error) {
log.error(
`Failed to index ${isLast ? "" : "missing "}block at height ${height}`,
error
);
return undefined;
}
}
return "";
}

public inputSerializer(): TaskSerializer<BlockWithResult[]> {
return {
toJSON: (blocks: BlockWithResult[]): string =>
JSON.stringify(blocks.map((b) => this.taskSerializer.toJSON(b))),

fromJSON: (json: string): BlockWithResult[] => {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const items = JSON.parse(json) as string[];
return items.map((item) => this.taskSerializer.fromJSON(item));
},
};
}

public resultSerializer(): TaskSerializer<string | void> {
return {
fromJSON: async () => {},
toJSON: async () => "",
};
}
}
Loading
Loading