From 45175e754ae2c2c893c37a5eda999c1614d6dd47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Wed, 25 Feb 2026 14:50:04 -0300 Subject: [PATCH 01/17] fix: reduce memory spent on initial sync --- constants/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/constants/index.ts b/constants/index.ts index de1daea1..aaf71fde 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -277,9 +277,9 @@ export const MEMPOOL_PROCESS_DELAY = 100 // On chronik, the max allowed is 200 export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200 -export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 128 -export const TX_EMIT_BATCH_SIZE = 2_000 // for our generator, not chronik -export const DB_COMMIT_BATCH_SIZE = 2_000 // tamanho dos lotes para commit no DB +export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 64 +export const TX_EMIT_BATCH_SIZE = 1_000 // for our generator, not chronik +export const DB_COMMIT_BATCH_SIZE = 1_000 // tamanho dos lotes para commit no DB export const TRIGGER_POST_CONCURRENCY = 100 export const TRIGGER_EMAIL_CONCURRENCY = 100 From 2b817c74b28d8aa3cd787fc40354f9dff2f9f786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Wed, 25 Feb 2026 21:54:42 -0300 Subject: [PATCH 02/17] fix: constants values --- constants/index.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/constants/index.ts b/constants/index.ts index aaf71fde..96b2a838 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -276,10 +276,9 @@ export const MEMPOOL_PROCESS_DELAY = 100 // When fetching some address transactions, number of transactions to fetch at a time. // On chronik, the max allowed is 200 export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200 - -export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 64 -export const TX_EMIT_BATCH_SIZE = 1_000 // for our generator, not chronik -export const DB_COMMIT_BATCH_SIZE = 1_000 // tamanho dos lotes para commit no DB +export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 5 +export const TX_EMIT_BATCH_SIZE = 500 +export const DB_COMMIT_BATCH_SIZE = 500 export const TRIGGER_POST_CONCURRENCY = 100 export const TRIGGER_EMAIL_CONCURRENCY = 100 From 07815ebd67d8af6d36ec309440fa504bac0871b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 06:51:14 -0300 Subject: [PATCH 03/17] wip WIP --- constants/index.ts | 2 +- prisma-local/clientInstance.ts | 13 +++++++++++-- scripts/paybutton-server-start.sh | 15 +++++++++------ 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/constants/index.ts b/constants/index.ts index 96b2a838..e9557ee1 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -276,7 +276,7 @@ export const MEMPOOL_PROCESS_DELAY = 100 // When fetching some address transactions, number of transactions to fetch at a time. // On chronik, the max allowed is 200 export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200 -export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 5 +export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 1 export const TX_EMIT_BATCH_SIZE = 500 export const DB_COMMIT_BATCH_SIZE = 500 diff --git a/prisma-local/clientInstance.ts b/prisma-local/clientInstance.ts index b018e07f..52e3cb2c 100644 --- a/prisma-local/clientInstance.ts +++ b/prisma-local/clientInstance.ts @@ -2,16 +2,25 @@ import { PrismaClient } from '@prisma/client' let prisma: PrismaClient +const CONNECTION_LIMIT = 5 +const POOL_TIMEOUT = 30 + +function buildDatasourceUrl (): string { + const baseUrl = process.env.DATABASE_URL ?? '' + const separator = baseUrl.includes('?') ? '&' : '?' + return `${baseUrl}${separator}connection_limit=${CONNECTION_LIMIT}&pool_timeout=${POOL_TIMEOUT}` +} + interface CustomNodeJsGlobal extends NodeJS.Global { prisma: PrismaClient } declare const global: CustomNodeJsGlobal if (process.env.NODE_ENV === 'production') { - prisma = new PrismaClient() + prisma = new PrismaClient({ datasources: { db: { url: buildDatasourceUrl() } } }) } else { if (global.prisma === undefined) { - global.prisma = new PrismaClient() + global.prisma = new PrismaClient({ datasources: { db: { url: buildDatasourceUrl() } } }) } prisma = global.prisma diff --git a/scripts/paybutton-server-start.sh b/scripts/paybutton-server-start.sh index 1028c14f..4e8f28cb 100755 --- a/scripts/paybutton-server-start.sh +++ b/scripts/paybutton-server-start.sh @@ -12,6 +12,12 @@ echo Connected to the db. yarn || exit 1 # Clear logs +start_processes() { + pm2 start yarn --time --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs + pm2 start yarn --time --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer + pm2 start yarn --time --name next --output logs/next.log --error logs/next.log -- "$1" +} + logtime=$(date +%Y-%m-%d@%H:%M) [ -e logs/next.log ] && mv logs/next.log logs/history/next_"$logtime".log [ -e logs/jobs.log ] && mv logs/jobs.log logs/history/jobs_"$logtime".log @@ -20,14 +26,11 @@ logtime=$(date +%Y-%m-%d@%H:%M) if [ "$ENVIRONMENT" = "production" ]; then yarn prisma migrate deploy || exit 1 yarn prisma generate || exit 1 - pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs || exit 1 - pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer || exit 1 - pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- prod || exit 1 + start_processes prod else yarn prisma migrate dev || exit 1 yarn prisma db seed || exit 1 - pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs || exit 1 - pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer || exit 1 - pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- dev || exit 1 + start_processes dev fi pm2 logs next + From b3d6db8acd0a1a6b2266fc63b36869a55256513e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 06:55:10 -0300 Subject: [PATCH 04/17] WIP --- services/transactionService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/transactionService.ts b/services/transactionService.ts index 825772b9..95c29a21 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -259,7 +259,7 @@ export async function fetchTransactionsWithPaybuttonsAndPricesForIdList (txIdLis }) } -export async function * generateTransactionsWithPaybuttonsAndPricesForAddress (addressId: string, pageSize = 5000): AsyncGenerator { +export async function * generateTransactionsWithPaybuttonsAndPricesForAddress (addressId: string, pageSize = 500): AsyncGenerator { let page = 0 while (true) { From f4da4ddeeac1d05840bdd6e2d3c9bbfcbb21c32d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 06:56:27 -0300 Subject: [PATCH 05/17] WIP --- redis/paymentCache.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/redis/paymentCache.ts b/redis/paymentCache.ts index c59d6fea..9911e570 100755 --- a/redis/paymentCache.ts +++ b/redis/paymentCache.ts @@ -33,6 +33,7 @@ export async function * getUserUncachedAddresses (userId: string): AsyncGenerato export const getPaymentList = async (userId: string): Promise => { const uncachedAddressStream = getUserUncachedAddresses(userId) for await (const address of uncachedAddressStream) { + console.log('[CACHE]: Creating cache for uncached address', address.address) void await CacheSet.addressCreation(address) } return await getCachedPaymentsForUser(userId) @@ -283,6 +284,7 @@ export const clearRecentAddressCache = async (addressString: string, timestamps: export const initPaymentCache = async (address: Address): Promise => { const cachedKeys = await getCachedWeekKeysForAddress(address.address) if (cachedKeys.length === 0) { + console.log('[CACHE]: Initializing cache for address', address.address) await CacheSet.addressCreation(address) return true } From 6768d0601ef1c9254cec3c03e0e550afd4c7428f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 07:09:27 -0300 Subject: [PATCH 06/17] WIP --- scripts/paybutton-server-start.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/paybutton-server-start.sh b/scripts/paybutton-server-start.sh index 4e8f28cb..799d7e8c 100755 --- a/scripts/paybutton-server-start.sh +++ b/scripts/paybutton-server-start.sh @@ -13,9 +13,9 @@ yarn || exit 1 # Clear logs start_processes() { - pm2 start yarn --time --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs - pm2 start yarn --time --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer - pm2 start yarn --time --name next --output logs/next.log --error logs/next.log -- "$1" + pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs + pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer + pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- "$1" } logtime=$(date +%Y-%m-%d@%H:%M) From 0e185edce2842740208b4ebb29a120f048c2ddeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 07:32:24 -0300 Subject: [PATCH 07/17] smaller prisma txs --- services/transactionService.ts | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/services/transactionService.ts b/services/transactionService.ts index 95c29a21..7c1625ab 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -568,10 +568,16 @@ export async function createManyTransactions ( ): Promise { const insertedTransactionsDistinguished: TxDistinguished[] = [] - await prisma.$transaction( - async (prisma) => { - const BATCH_SIZE = 50 - for (let i = 0; i < transactionsData.length; i += BATCH_SIZE) { + const txStart = Date.now() + console.log(`[createManyTransactions] Starting transaction for ${transactionsData.length} transactions`) + + const BATCH_SIZE = 50 + const totalBatches = Math.ceil(transactionsData.length / BATCH_SIZE) + for (let i = 0; i < transactionsData.length; i += BATCH_SIZE) { + await prisma.$transaction( + async (prisma) => { + const batchNum = Math.floor(i / BATCH_SIZE) + 1 + const batchStart = Date.now() const batch = transactionsData.slice(i, i + BATCH_SIZE) const results = await Promise.all( @@ -593,18 +599,20 @@ export async function createManyTransactions ( ) ) + const batchElapsed = Date.now() - batchStart + const totalElapsed = Date.now() - txStart + console.log(`[createManyTransactions] Batch ${batchNum}/${totalBatches}: ${batch.length} upserts in ${batchElapsed}ms (total: ${totalElapsed}ms / ${UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT}ms timeout)`) + for (const upsertedTx of results) { insertedTransactionsDistinguished.push({ tx: upsertedTx, isCreated: upsertedTx.createdAt.getTime() === upsertedTx.updatedAt.getTime() }) } - } - }, - { - timeout: UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT - } - ) + }, { timeout: UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT } + ) + console.log(`[createManyTransactions] Transaction completed in ${Date.now() - txStart}ms`) + } const insertedTransactions = insertedTransactionsDistinguished .filter((txD) => txD.isCreated) From f868ad86f710b682edb44e38dbe086a1ceb14691 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 07:33:21 -0300 Subject: [PATCH 08/17] prisma logs --- prisma-local/clientInstance.ts | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/prisma-local/clientInstance.ts b/prisma-local/clientInstance.ts index 52e3cb2c..8916330c 100644 --- a/prisma-local/clientInstance.ts +++ b/prisma-local/clientInstance.ts @@ -1,9 +1,10 @@ -import { PrismaClient } from '@prisma/client' +import { PrismaClient, Prisma } from '@prisma/client' let prisma: PrismaClient const CONNECTION_LIMIT = 5 const POOL_TIMEOUT = 30 +const SLOW_QUERY_THRESHOLD_MS = 10000 function buildDatasourceUrl (): string { const baseUrl = process.env.DATABASE_URL ?? '' @@ -11,21 +12,42 @@ function buildDatasourceUrl (): string { return `${baseUrl}${separator}connection_limit=${CONNECTION_LIMIT}&pool_timeout=${POOL_TIMEOUT}` } +function createPrismaClient (): PrismaClient { + const logLevels: Prisma.LogLevel[] = process.env.PRISMA_LOG === 'true' + ? ['query', 'info', 'warn', 'error'] + : ['warn', 'error'] + + const client = new PrismaClient({ + datasources: { db: { url: buildDatasourceUrl() } }, + log: [ + ...logLevels.map(level => ({ level, emit: 'event' as const })), + { level: 'query' as const, emit: 'event' as const } + ] + }) + + // Log slow queries even when PRISMA_LOG is off + client.$on('query', (e: Prisma.QueryEvent) => { + if (e.duration >= SLOW_QUERY_THRESHOLD_MS) { + console.warn(`[Prisma SLOW QUERY] ${e.duration}ms: ${e.query.slice(0, 200)}...`) + } + }) + + return client +} + interface CustomNodeJsGlobal extends NodeJS.Global { prisma: PrismaClient } declare const global: CustomNodeJsGlobal if (process.env.NODE_ENV === 'production') { - prisma = new PrismaClient({ datasources: { db: { url: buildDatasourceUrl() } } }) + prisma = createPrismaClient() } else { if (global.prisma === undefined) { - global.prisma = new PrismaClient({ datasources: { db: { url: buildDatasourceUrl() } } }) + global.prisma = createPrismaClient() } prisma = global.prisma } export default prisma - -// https://github.com/prisma/prisma/issues/1983#issuecomment-620621213 From a40dd06e5657520e57d39afcb9d4ffe450019045 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 07:57:10 -0300 Subject: [PATCH 09/17] one network at a time --- services/chronikService.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/services/chronikService.ts b/services/chronikService.ts index 63b708e0..28421df9 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -1178,10 +1178,8 @@ class MultiBlockchainClient { public async syncMissedTransactions (): Promise { await this.waitForStart() - await Promise.all([ - this.clients.ecash.syncMissedTransactions(), - this.clients.bitcoincash.syncMissedTransactions() - ]) + await this.clients.ecash.syncMissedTransactions() + await this.clients.bitcoincash.syncMissedTransactions() } public async syncAndSubscribeAddresses (addresses: Address[]): Promise { From 86104fccac742fc4f973720601a71c3a1bdb7bf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 07:57:35 -0300 Subject: [PATCH 10/17] wip --- constants/index.ts | 6 +++-- jobs/workers.ts | 8 +++---- services/transactionService.ts | 42 +++++++++++++++++++--------------- 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/constants/index.ts b/constants/index.ts index e9557ee1..aa3a5228 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -174,7 +174,9 @@ export const SYNC_TXS_JOBS_RETRY_DELAY = 2000 export const BCH_TIMESTAMP_THRESHOLD = 1501588800 // 2017 Aug 1, 12PM export const XEC_TIMESTAMP_THRESHOLD = 1605398400 // 2020 Nov 15, 12AM -export const DEFAULT_WORKER_LOCK_DURATION = 120000 +export const PRICE_SYNC_WORKER_LOCK_DURATION = 180000 +export const BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION = 900000 +export const CLEANUP_WORKER_LOCK_DURATION = 120000 // Wait time (in ms) between sync of current prices export const CURRENT_PRICE_REPEAT_DELAY = 60000 @@ -197,7 +199,7 @@ export const QUOTE_IDS: KeyValueT = { USD: 1, CAD: 2 } export type BLOCKCHAIN_CLIENT_OPTIONS = 'chronik' -export const UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT = 45000 +export const UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT = 90000 export const DEFAULT_TX_PAGE_SIZE = 100 export const PAYMENT_WEEK_KEY_FORMAT = 'YYYY:WW' diff --git a/jobs/workers.ts b/jobs/workers.ts index d913176f..460f2bc4 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -1,6 +1,6 @@ import { Worker } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' -import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index' +import { PRICE_SYNC_WORKER_LOCK_DURATION, BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION, CLEANUP_WORKER_LOCK_DURATION } from 'constants/index' import { multiBlockchainClient } from 'services/chronikService' import { connectAllTransactionsToPrices } from 'services/transactionService' import { cleanupExpiredClientPayments } from 'services/clientPaymentService' @@ -16,7 +16,7 @@ export const syncCurrentPricesWorker = async (queueName: string): Promise }, { connection: redisBullMQ, - lockDuration: DEFAULT_WORKER_LOCK_DURATION + lockDuration: PRICE_SYNC_WORKER_LOCK_DURATION } ) worker.on('completed', job => { @@ -42,7 +42,7 @@ export const syncBlockchainAndPricesWorker = async (queueName: string): Promise< }, { connection: redisBullMQ, - lockDuration: DEFAULT_WORKER_LOCK_DURATION + lockDuration: BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION } ) @@ -76,7 +76,7 @@ export const cleanupClientPaymentsWorker = async (queueName: string): Promise - await prisma.transaction.upsert({ - create: tx, - where: { - Transaction_hash_addressId_unique_constraint: { - hash: tx.hash, - addressId: tx.addressId - } - }, - update: { - confirmed: tx.confirmed, - timestamp: tx.timestamp - }, - include: includeNetwork - }) + const results: TransactionWithNetwork[] = [] + for (let j = 0; j < batch.length; j += UPSERT_PARALLELISM) { + const upsertSlice = batch.slice(j, j + UPSERT_PARALLELISM) + const sliceResults = await Promise.all( + upsertSlice.map(async (tx) => + await prisma.transaction.upsert({ + create: tx, + where: { + Transaction_hash_addressId_unique_constraint: { + hash: tx.hash, + addressId: tx.addressId + } + }, + update: { + confirmed: tx.confirmed, + timestamp: tx.timestamp + }, + include: includeNetwork + }) + ) ) - ) + results.push(...sliceResults) + } const batchElapsed = Date.now() - batchStart const totalElapsed = Date.now() - txStart - console.log(`[createManyTransactions] Batch ${batchNum}/${totalBatches}: ${batch.length} upserts in ${batchElapsed}ms (total: ${totalElapsed}ms / ${UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT}ms timeout)`) + console.log(`[createManyTransactions] Batch ${batchNum}/${totalBatches}: ${batch.length} upserts in ${batchElapsed}ms (parallelism=${UPSERT_PARALLELISM}, total: ${totalElapsed}ms / ${UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT}ms timeout)`) for (const upsertedTx of results) { insertedTransactionsDistinguished.push({ From cca880975dcb7bb93a357f084054d303662246b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 08:37:42 -0300 Subject: [PATCH 11/17] batch addresses --- constants/index.ts | 1 + services/chronikService.ts | 38 ++++++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/constants/index.ts b/constants/index.ts index aa3a5228..d7424834 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -281,6 +281,7 @@ export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200 export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 1 export const TX_EMIT_BATCH_SIZE = 500 export const DB_COMMIT_BATCH_SIZE = 500 +export const SYNC_MISSED_TXS_BATCH_SIZE = 50 export const TRIGGER_POST_CONCURRENCY = 100 export const TRIGGER_EMAIL_CONCURRENCY = 100 diff --git a/services/chronikService.ts b/services/chronikService.ts index 28421df9..f29dae1b 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -1,7 +1,7 @@ import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client' import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs' import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes' -import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE } from 'constants/index' +import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, SYNC_MISSED_TXS_BATCH_SIZE } from 'constants/index' import { productionAddresses } from 'prisma-local/seeds/addresses' import { TransactionWithAddressAndPrices, @@ -958,20 +958,30 @@ export class ChronikBlockchainClient { public async syncMissedTransactions (): Promise { const addresses = await fetchAllAddressesForNetworkId(this.networkId) - try { - const { failedAddressesWithErrors, successfulAddressesWithCount } = await this.syncAddresses(addresses, true) - Object.keys(failedAddressesWithErrors).forEach((addr) => { - console.error(`${this.CHRONIK_MSG_PREFIX}: When syncing missing addresses for address ${addr} encountered error: ${failedAddressesWithErrors[addr]}`) - }) - console.log(`${this.CHRONIK_MSG_PREFIX}: Missed txs successfully synced per address:`) - Object.keys(successfulAddressesWithCount).forEach((addr) => { - if (successfulAddressesWithCount[addr] > 0) { - console.log(`${this.CHRONIK_MSG_PREFIX}:> ${addr} — ${successfulAddressesWithCount[addr]}.`) - } - }) - } catch (err: any) { - console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR: (skipping anyway) initial missing transactions sync failed: ${err.message as string} ${err.stack as string}`) + const failedAddressesWithErrors: KeyValueT = {} + const successfulAddressesWithCount: KeyValueT = {} + + for (let i = 0; i < addresses.length; i += SYNC_MISSED_TXS_BATCH_SIZE) { + const batch = addresses.slice(i, i + SYNC_MISSED_TXS_BATCH_SIZE) + console.log(`${this.CHRONIK_MSG_PREFIX}: Syncing missed txs batch ${Math.floor(i / SYNC_MISSED_TXS_BATCH_SIZE) + 1}/${Math.ceil(addresses.length / SYNC_MISSED_TXS_BATCH_SIZE)} (${batch.length} addresses)`) + try { + const result = await this.syncAddresses(batch, true) + Object.assign(failedAddressesWithErrors, result.failedAddressesWithErrors) + Object.assign(successfulAddressesWithCount, result.successfulAddressesWithCount) + } catch (err: any) { + console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR: (skipping batch) missed transactions sync failed for batch starting at index ${i}: ${err.message as string} ${err.stack as string}`) + } } + + Object.keys(failedAddressesWithErrors).forEach((addr) => { + console.error(`${this.CHRONIK_MSG_PREFIX}: When syncing missing addresses for address ${addr} encountered error: ${failedAddressesWithErrors[addr]}`) + }) + console.log(`${this.CHRONIK_MSG_PREFIX}: Missed txs successfully synced per address:`) + Object.keys(successfulAddressesWithCount).forEach((addr) => { + if (successfulAddressesWithCount[addr] > 0) { + console.log(`${this.CHRONIK_MSG_PREFIX}:> ${addr} — ${successfulAddressesWithCount[addr]}.`) + } + }) } public async subscribeInitialAddresses (): Promise { From e0fe3db0574aa8c7200b4d19de3defccc84225b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 08:42:27 -0300 Subject: [PATCH 12/17] logs wip --- services/chronikService.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/services/chronikService.ts b/services/chronikService.ts index f29dae1b..fcd4ba2a 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -844,32 +844,38 @@ export class ChronikBlockchainClient { for await (const batch of this.fetchLatestTxsForAddresses(addresses)) { if (batch.addressesSynced.length > 0) { - // marcador de slice => desmarca syncing + console.log(`${pfx} marking batch as syncing...`) await setSyncingBatch(batch.addressesSynced, false) + console.log(`${pfx} marked batch as syncing.`) continue } const involvedAddrIds = new Set(batch.chronikTxs.map(({ address }) => address.id)) try { + console.log(`${pfx} getting ${batch.chronikTxs.length} txs from chronik...`) const pairsFromBatch: RowWithRaw[] = await Promise.all( batch.chronikTxs.map(async ({ tx, address }) => { const row = await this.getTransactionFromChronikTransaction(tx, address) return { row, raw: tx } }) ) + console.log(`${pfx} got txs from chronik.`) for (const { row } of pairsFromBatch) { perAddrCount.set(row.addressId, (perAddrCount.get(row.addressId) ?? 0) + 1) } + console.log(`${pfx} added ${pairsFromBatch.length} to commit buffer ${toCommit.length}`) toCommit.push(...pairsFromBatch) if (toCommit.length >= DB_COMMIT_BATCH_SIZE) { + console.log(`${pfx} ${toCommit.length} reached commit batch size of ${DB_COMMIT_BATCH_SIZE}, committing to DB...`) const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE) toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE) const rows = commitPairs.map(p => p.row) + console.log(`${pfx} creating txs from ${rows.length} rows...`) const createdTxs = await createManyTransactions(rows) console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}`) From 9133b71e63c815b938a81dce5d6151e20030b83e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 08:56:47 -0300 Subject: [PATCH 13/17] const --- prisma-local/clientInstance.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prisma-local/clientInstance.ts b/prisma-local/clientInstance.ts index 8916330c..c815a70b 100644 --- a/prisma-local/clientInstance.ts +++ b/prisma-local/clientInstance.ts @@ -2,8 +2,8 @@ import { PrismaClient, Prisma } from '@prisma/client' let prisma: PrismaClient -const CONNECTION_LIMIT = 5 -const POOL_TIMEOUT = 30 +const CONNECTION_LIMIT = 10 +const POOL_TIMEOUT = 60 const SLOW_QUERY_THRESHOLD_MS = 10000 function buildDatasourceUrl (): string { From f431409155cf69d40825cdc3e73ce09dd1f56047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 09:03:51 -0300 Subject: [PATCH 14/17] logs --- services/chronikService.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/services/chronikService.ts b/services/chronikService.ts index fcd4ba2a..30af77c7 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -857,6 +857,7 @@ export class ChronikBlockchainClient { const pairsFromBatch: RowWithRaw[] = await Promise.all( batch.chronikTxs.map(async ({ tx, address }) => { const row = await this.getTransactionFromChronikTransaction(tx, address) + console.log(`${pfx} got 1 row from chronik for address ${address.address}`) return { row, raw: tx } }) ) From caa02f736c0e31ced81d87d62330598b24da43d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 09:25:31 -0300 Subject: [PATCH 15/17] don't upsert addresses --- services/chronikService.ts | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/services/chronikService.ts b/services/chronikService.ts index 30af77c7..1e8dfc86 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -20,7 +20,7 @@ import { import { Address, Prisma, ClientPaymentStatus } from '@prisma/client' import xecaddr from 'xecaddrjs' import { getAddressPrefix, satoshisToUnit } from 'utils/index' -import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced, updateManyLastSynced, upsertAddress } from './addressService' +import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced, updateManyLastSynced } from './addressService' import * as ws from 'ws' import { BroadcastTxData } from 'ws-service/types' import config from 'config' @@ -287,23 +287,7 @@ export class ChronikBlockchainClient { const { amount, opReturn } = await this.getTransactionAmountAndData(transaction, address.address) const inputAddresses = this.getSortedInputAddresses(transaction) const outputAddresses = this.getSortedOutputAddresses(transaction) - - const uniqueAddressStrings = [...new Set([ - ...inputAddresses.map(({ address: addr }) => addr), - ...outputAddresses.map(({ address: addr }) => addr) - ])] const addressIdMap = new Map() - await Promise.all( - uniqueAddressStrings.map(async (addrStr) => { - try { - const parsed = parseAddress(addrStr) - const addr = await upsertAddress(parsed) - addressIdMap.set(parsed, addr.id) - } catch { - // Skip invalid addresses: don't upsert, don't add to map - } - }) - ) const getAddressId = (addr: string): string | undefined => { try { From 15004df2ca8005505ca3b77c1d136511ee5d0af5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 09:49:44 -0300 Subject: [PATCH 16/17] wip --- jobs/initJobs.ts | 38 +++++++++++++++++++++++++++----------- services/chronikService.ts | 1 - 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/jobs/initJobs.ts b/jobs/initJobs.ts index 0a25b755..a1d968f4 100644 --- a/jobs/initJobs.ts +++ b/jobs/initJobs.ts @@ -1,5 +1,5 @@ import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY } from 'constants/index' -import { Queue } from 'bullmq' +import { Queue, QueueEvents } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' import EventEmitter from 'events' import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker } from './workers' @@ -16,6 +16,32 @@ const main = async (): Promise => { await blockchainQueue.obliterate({ force: true }) await cleanupQueue.obliterate({ force: true }) + await blockchainQueue.add('syncBlockchainAndPrices', + {}, + { + jobId: 'syncBlockchainAndPrices', + removeOnComplete: true, + removeOnFail: true + } + ) + await syncBlockchainAndPricesWorker(blockchainQueue.name) + + const blockchainEvents = new QueueEvents('blockchainSync', { connection: redisBullMQ }) + await new Promise((resolve) => { + blockchainEvents.on('completed', async ({ jobId }) => { + if (jobId === 'syncBlockchainAndPrices') { + await blockchainEvents.close() + resolve() + } + }) + blockchainEvents.on('failed', async ({ jobId }) => { + if (jobId === 'syncBlockchainAndPrices') { + await blockchainEvents.close() + resolve() + } + }) + }) + await pricesQueue.add('syncCurrentPrices', {}, { @@ -29,16 +55,6 @@ const main = async (): Promise => { await syncCurrentPricesWorker(pricesQueue.name) - await blockchainQueue.add('syncBlockchainAndPrices', - {}, - { - jobId: 'syncBlockchainAndPrices', - removeOnComplete: true, - removeOnFail: true - } - ) - await syncBlockchainAndPricesWorker(blockchainQueue.name) - await cleanupQueue.add( 'cleanupClientPayments', {}, diff --git a/services/chronikService.ts b/services/chronikService.ts index 1e8dfc86..b94827aa 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -841,7 +841,6 @@ export class ChronikBlockchainClient { const pairsFromBatch: RowWithRaw[] = await Promise.all( batch.chronikTxs.map(async ({ tx, address }) => { const row = await this.getTransactionFromChronikTransaction(tx, address) - console.log(`${pfx} got 1 row from chronik for address ${address.address}`) return { row, raw: tx } }) ) From 69fd5d7c74403d043fed999097c599f720754b37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Estev=C3=A3o?= Date: Thu, 26 Feb 2026 10:35:07 -0300 Subject: [PATCH 17/17] wip --- jobs/initJobs.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/jobs/initJobs.ts b/jobs/initJobs.ts index a1d968f4..41401a95 100644 --- a/jobs/initJobs.ts +++ b/jobs/initJobs.ts @@ -28,16 +28,14 @@ const main = async (): Promise => { const blockchainEvents = new QueueEvents('blockchainSync', { connection: redisBullMQ }) await new Promise((resolve) => { - blockchainEvents.on('completed', async ({ jobId }) => { + blockchainEvents.on('completed', ({ jobId }) => { if (jobId === 'syncBlockchainAndPrices') { - await blockchainEvents.close() - resolve() + void blockchainEvents.close().then(() => resolve()) } }) - blockchainEvents.on('failed', async ({ jobId }) => { + blockchainEvents.on('failed', ({ jobId }) => { if (jobId === 'syncBlockchainAndPrices') { - await blockchainEvents.close() - resolve() + void blockchainEvents.close().then(() => resolve()) } }) })