-
Notifications
You must be signed in to change notification settings - Fork 16
feat(indexer): recover missing blocks #488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| import { | ||
| BlockTriggerBase, | ||
| BlockStorage, | ||
| BlockWithResult, | ||
| Sequencer, | ||
| sequencerModule, | ||
| SequencerModule, | ||
|
|
@@ -12,7 +14,8 @@ import { | |
| import { log } from "@proto-kit/common"; | ||
| import { inject } from "tsyringe"; | ||
|
|
||
| import { IndexBlockTask } from "./tasks/IndexBlockTask"; | ||
| import { IndexBlockTask, IndexBlockResult } from "./tasks/IndexBlockTask"; | ||
| import { IndexMissingBlocksTask } from "./tasks/IndexMissingBlocksTask"; | ||
| import { IndexPendingTxTask } from "./tasks/IndexPendingTxTask"; | ||
| import { IndexSettlementTask } from "./tasks/IndexSettlementTask"; | ||
| import { IndexBatchTask } from "./tasks/IndexBatchTask"; | ||
|
|
@@ -30,7 +33,10 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> { | |
| public sequencer: Sequencer<NotifierMandatorySequencerModules>, | ||
| @inject("TaskQueue") | ||
| public taskQueue: TaskQueue, | ||
| @inject("BlockStorage") | ||
| private readonly blockStorage: BlockStorage, | ||
| public indexBlockTask: IndexBlockTask, | ||
| public indexMissingBlocksTask: IndexMissingBlocksTask, | ||
| public indexPendingTxTask: IndexPendingTxTask, | ||
| public indexBatchTask: IndexBatchTask, | ||
| public indexSettlementTask: IndexSettlementTask, | ||
|
|
@@ -39,6 +45,67 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> { | |
| super(); | ||
| } | ||
|
|
||
| private async pushTask( | ||
| queueName: string, | ||
| name: string, | ||
| payload: string | ||
| ): Promise<void> { | ||
| const queue = await this.taskQueue.getQueue(queueName); | ||
| await queue.addTask({ | ||
| name, | ||
| payload, | ||
| flowId: "", | ||
| sequencerId: this.sequencerIdProvider.getSequencerId(), | ||
| }); | ||
| } | ||
|
|
||
| private async handleIndexBlockTaskCompleted( | ||
| payload: TaskPayload | ||
| ): Promise<void> { | ||
| if (payload.name !== this.indexBlockTask.name) { | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| // eslint-disable-next-line @typescript-eslint/consistent-type-assertions | ||
| const result = JSON.parse(payload.payload) as IndexBlockResult; | ||
|
|
||
| if ( | ||
| result.status !== "missing-blocks" || | ||
| result.missingHeights.length === 0 | ||
| ) { | ||
| return; | ||
| } | ||
|
|
||
| const heights = [...result.missingHeights, result.incomingHeight]; | ||
|
|
||
| const blocks = await Promise.all( | ||
| heights.map((h) => this.blockStorage.getBlockWithResultAt(h)) | ||
| ); | ||
|
|
||
| const filteredBlocks = blocks.filter( | ||
| (block): block is BlockWithResult => block !== undefined | ||
| ); | ||
|
|
||
| if (filteredBlocks.length === 0) { | ||
| log.warn("No blocks found to re-send"); | ||
| return; | ||
| } | ||
|
|
||
| const serialized = await this.indexMissingBlocksTask | ||
| .inputSerializer() | ||
| .toJSON(filteredBlocks); | ||
|
|
||
| await this.pushTask( | ||
| this.indexMissingBlocksTask.name, | ||
| this.indexMissingBlocksTask.name, | ||
| serialized | ||
| ); | ||
| } catch (error) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's leave that to the queue implementation to handle errors that come from this. Generally, catching stuff early and not handling it (just emitting it is not handling it imo) lead to weird and non-recoverable behaviour. |
||
| log.error("Failed to handle block task completion result", error); | ||
| } | ||
| } | ||
|
|
||
| public async propagateEventsAsTasks() { | ||
| const queue = await this.taskQueue.getQueue(this.indexBlockTask.name); | ||
| const inputSerializer = this.indexBlockTask.inputSerializer(); | ||
|
|
@@ -47,86 +114,62 @@ export class IndexerNotifier extends SequencerModule<Record<never, never>> { | |
| const settlementInputSerializer = | ||
| this.indexSettlementTask.inputSerializer(); | ||
|
|
||
| await queue.onCompleted( | ||
| async (payload) => await this.handleIndexBlockTaskCompleted(payload) | ||
| ); | ||
|
|
||
| this.sequencer.events.on("block-metadata-produced", async (block) => { | ||
| log.debug( | ||
| "Notifiying the indexer about block", | ||
| block.block.height.toBigInt() | ||
| ); | ||
| const payload = await inputSerializer.toJSON(block); | ||
| const sequencerId = this.sequencerIdProvider.getSequencerId(); | ||
|
|
||
| const task: TaskPayload = { | ||
| name: this.indexBlockTask.name, | ||
| payload, | ||
| flowId: "", // empty for now | ||
| sequencerId, | ||
| }; | ||
|
|
||
| await queue.addTask(task); | ||
| await this.pushTask( | ||
| this.indexBlockTask.name, | ||
| this.indexBlockTask.name, | ||
| payload | ||
| ); | ||
| }); | ||
|
|
||
| this.sequencer.events.on("mempool-transaction-added", async (tx) => { | ||
| try { | ||
| const txQueue = await this.taskQueue.getQueue( | ||
| this.indexPendingTxTask.name | ||
| ); | ||
| const payload = await txInputSerializer.toJSON(tx); | ||
| const sequencerId = this.sequencerIdProvider.getSequencerId(); | ||
|
|
||
| const task: TaskPayload = { | ||
| name: this.indexPendingTxTask.name, | ||
| payload, | ||
| flowId: "", | ||
| sequencerId, | ||
| }; | ||
|
|
||
| await txQueue.addTask(task); | ||
| await this.pushTask( | ||
| this.indexPendingTxTask.name, | ||
| this.indexPendingTxTask.name, | ||
| payload | ||
| ); | ||
| } catch (err) { | ||
| log.error("Failed to add pending-tx task", err); | ||
| } | ||
| }); | ||
|
|
||
| this.sequencer.events.on("batch-produced", async (batch) => { | ||
| log.debug("Notifiying the indexer about batch", batch?.height); | ||
| try { | ||
| const batchQueue = await this.taskQueue.getQueue( | ||
| this.indexBatchTask.name | ||
| ); | ||
|
|
||
| const payload = await batchInputSerializer.toJSON(batch); | ||
| const sequencerId = this.sequencerIdProvider.getSequencerId(); | ||
|
|
||
| const task: TaskPayload = { | ||
| name: this.indexBatchTask.name, | ||
| payload, | ||
| flowId: "", | ||
| sequencerId, | ||
| }; | ||
|
|
||
| await batchQueue.addTask(task); | ||
| await this.pushTask( | ||
| this.indexBatchTask.name, | ||
| this.indexBatchTask.name, | ||
| payload | ||
| ); | ||
| } catch (err) { | ||
| log.error(`Failed to index batch ${batch?.height} ${err}`); | ||
| } | ||
| }); | ||
|
|
||
| this.sequencer.events.on("settlement-submitted", async (settlement) => { | ||
| log.debug( | ||
| "Notifiying the indexer about settlement", | ||
| "Notifying the indexer about settlement", | ||
| settlement.transactionHash | ||
| ); | ||
| try { | ||
| const settlementQueue = await this.taskQueue.getQueue( | ||
| this.indexSettlementTask.name | ||
| ); | ||
|
|
||
| const payload = await settlementInputSerializer.toJSON(settlement); | ||
| const sequencerId = this.sequencerIdProvider.getSequencerId(); | ||
|
|
||
| const task: TaskPayload = { | ||
| name: this.indexSettlementTask.name, | ||
| payload, | ||
| flowId: "", | ||
| sequencerId, | ||
| }; | ||
|
|
||
| await settlementQueue.addTask(task); | ||
| await this.pushTask( | ||
| this.indexSettlementTask.name, | ||
| this.indexSettlementTask.name, | ||
| payload | ||
| ); | ||
| } catch (err) { | ||
| log.error( | ||
| `Failed to add index settlement: ${settlement.transactionHash} ${err}` | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| import { | ||
| BlockQueue, | ||
| BlockStorage, | ||
| Task, | ||
| TaskSerializer, | ||
| TaskWorkerModule, | ||
|
|
@@ -12,17 +13,25 @@ import { | |
| IndexBlockTaskParametersSerializer, | ||
| } from "./IndexBlockTaskParameters"; | ||
|
|
||
| export interface IndexBlockResult { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, I think for this we can type it as a union type |
||
| status: "ok" | "missing-blocks"; | ||
| missingHeights: number[]; | ||
| incomingHeight: number; | ||
| } | ||
|
|
||
| @injectable() | ||
| export class IndexBlockTask | ||
| extends TaskWorkerModule | ||
| implements Task<IndexBlockTaskParameters, string | void> | ||
| implements Task<IndexBlockTaskParameters, IndexBlockResult> | ||
| { | ||
| public name = "index-block"; | ||
|
|
||
| public constructor( | ||
| public taskSerializer: IndexBlockTaskParametersSerializer, | ||
| @inject("BlockQueue") | ||
| public blockStorage: BlockQueue | ||
| public blockStorage: BlockQueue, | ||
| @inject("BlockStorage") | ||
| private readonly blockRepository: BlockStorage | ||
| ) { | ||
| super(); | ||
| } | ||
|
|
@@ -32,27 +41,41 @@ export class IndexBlockTask | |
|
|
||
| public async compute( | ||
| input: IndexBlockTaskParameters | ||
| ): Promise<string | void> { | ||
| ): Promise<IndexBlockResult> { | ||
| const incomingHeight = Number(input.block.height.toBigInt()); | ||
| try { | ||
| const currentHeight = await this.blockRepository.getCurrentBlockHeight(); | ||
|
|
||
| if (incomingHeight > currentHeight) { | ||
| const missingHeights = Array.from( | ||
| { length: incomingHeight - currentHeight }, | ||
| (_, i) => currentHeight + i | ||
| ); | ||
|
|
||
| return { status: "missing-blocks", missingHeights, incomingHeight }; | ||
| } | ||
| await this.blockStorage.pushBlock(input.block); | ||
| await this.blockStorage.pushResult(input.result); | ||
|
|
||
| log.info(`Block ${incomingHeight} indexed successfully`); | ||
| return { status: "ok", missingHeights: [], incomingHeight }; | ||
| } catch (error) { | ||
| log.error("Failed to index block", input.block.height.toBigInt(), error); | ||
| return undefined; | ||
| log.error("Failed to index block", incomingHeight, error); | ||
| return { status: "ok", missingHeights: [], incomingHeight }; | ||
| } | ||
|
|
||
| log.info(`Block ${input.block.height.toBigInt()} indexed sucessfully`); | ||
| return ""; | ||
| } | ||
|
|
||
| public inputSerializer(): TaskSerializer<IndexBlockTaskParameters> { | ||
| return this.taskSerializer; | ||
| } | ||
|
|
||
| public resultSerializer(): TaskSerializer<string | void> { | ||
| public resultSerializer(): TaskSerializer<IndexBlockResult> { | ||
| return { | ||
| fromJSON: async () => {}, | ||
| toJSON: async () => "", | ||
| toJSON: async (input: IndexBlockResult) => JSON.stringify(input), | ||
|
|
||
| fromJSON: async (json: string) => | ||
| // eslint-disable-next-line @typescript-eslint/consistent-type-assertions | ||
| JSON.parse(json) as IndexBlockResult, | ||
| }; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| import { | ||
| BlockQueue, | ||
| BlockWithResult, | ||
| Task, | ||
| TaskSerializer, | ||
| TaskWorkerModule, | ||
| } from "@proto-kit/sequencer"; | ||
| import { log } from "@proto-kit/common"; | ||
| import { inject, injectable } from "tsyringe"; | ||
|
|
||
| import { IndexBlockTaskParametersSerializer } from "./IndexBlockTaskParameters"; | ||
|
|
||
| @injectable() | ||
| export class IndexMissingBlocksTask | ||
| extends TaskWorkerModule | ||
| implements Task<BlockWithResult[], string | void> | ||
| { | ||
| public name = "index-missing-blocks"; | ||
|
|
||
| public constructor( | ||
| public taskSerializer: IndexBlockTaskParametersSerializer, | ||
| @inject("BlockQueue") | ||
| public blockStorage: BlockQueue | ||
| ) { | ||
| super(); | ||
| } | ||
|
|
||
| // eslint-disable-next-line @typescript-eslint/no-empty-function | ||
| public async prepare(): Promise<void> {} | ||
|
|
||
| public async compute(input: BlockWithResult[]): Promise<string | void> { | ||
| const lastHeight = Number(input[input.length - 1].block.height.toBigInt()); | ||
| for (const blockWithResult of input) { | ||
| const height = Number(blockWithResult.block.height.toBigInt()); | ||
| const isLast = height === lastHeight; | ||
| try { | ||
| // eslint-disable-next-line no-await-in-loop | ||
| await this.blockStorage.pushBlock(blockWithResult.block); | ||
| // eslint-disable-next-line no-await-in-loop | ||
| await this.blockStorage.pushResult(blockWithResult.result); | ||
| log.info( | ||
| `${isLast ? "" : "Missing "}block ${height} indexed successfully` | ||
| ); | ||
| } catch (error) { | ||
| log.error( | ||
| `Failed to index ${isLast ? "" : "missing "}block at height ${height}`, | ||
| error | ||
| ); | ||
| return undefined; | ||
| } | ||
| } | ||
| return ""; | ||
| } | ||
|
|
||
| public inputSerializer(): TaskSerializer<BlockWithResult[]> { | ||
| return { | ||
| toJSON: (blocks: BlockWithResult[]): string => | ||
| JSON.stringify(blocks.map((b) => this.taskSerializer.toJSON(b))), | ||
|
|
||
| fromJSON: (json: string): BlockWithResult[] => { | ||
| // eslint-disable-next-line @typescript-eslint/consistent-type-assertions | ||
| const items = JSON.parse(json) as string[]; | ||
| return items.map((item) => this.taskSerializer.fromJSON(item)); | ||
| }, | ||
| }; | ||
| } | ||
|
|
||
| public resultSerializer(): TaskSerializer<string | void> { | ||
| return { | ||
| fromJSON: async () => {}, | ||
| toJSON: async () => "", | ||
| }; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is
filterNonUndefinedin the common package that we can reuse for this