diff --git a/constants/index.ts b/constants/index.ts index de1daea1..d7424834 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -174,7 +174,9 @@ export const SYNC_TXS_JOBS_RETRY_DELAY = 2000 export const BCH_TIMESTAMP_THRESHOLD = 1501588800 // 2017 Aug 1, 12PM export const XEC_TIMESTAMP_THRESHOLD = 1605398400 // 2020 Nov 15, 12AM -export const DEFAULT_WORKER_LOCK_DURATION = 120000 +export const PRICE_SYNC_WORKER_LOCK_DURATION = 180000 +export const BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION = 900000 +export const CLEANUP_WORKER_LOCK_DURATION = 120000 // Wait time (in ms) between sync of current prices export const CURRENT_PRICE_REPEAT_DELAY = 60000 @@ -197,7 +199,7 @@ export const QUOTE_IDS: KeyValueT = { USD: 1, CAD: 2 } export type BLOCKCHAIN_CLIENT_OPTIONS = 'chronik' -export const UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT = 45000 +export const UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT = 90000 export const DEFAULT_TX_PAGE_SIZE = 100 export const PAYMENT_WEEK_KEY_FORMAT = 'YYYY:WW' @@ -276,10 +278,10 @@ export const MEMPOOL_PROCESS_DELAY = 100 // When fetching some address transactions, number of transactions to fetch at a time. 
// On chronik, the max allowed is 200
 export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200
-
-export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 128
-export const TX_EMIT_BATCH_SIZE = 2_000 // for our generator, not chronik
-export const DB_COMMIT_BATCH_SIZE = 2_000 // tamanho dos lotes para commit no DB
+export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 1
+export const TX_EMIT_BATCH_SIZE = 500
+export const DB_COMMIT_BATCH_SIZE = 500
+export const SYNC_MISSED_TXS_BATCH_SIZE = 50
 export const TRIGGER_POST_CONCURRENCY = 100
 export const TRIGGER_EMAIL_CONCURRENCY = 100
diff --git a/jobs/initJobs.ts b/jobs/initJobs.ts
index 0a25b755..41401a95 100644
--- a/jobs/initJobs.ts
+++ b/jobs/initJobs.ts
@@ -1,5 +1,5 @@
 import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY } from 'constants/index'
-import { Queue } from 'bullmq'
+import { Queue, QueueEvents } from 'bullmq'
 import { redisBullMQ } from 'redis/clientInstance'
 import EventEmitter from 'events'
 import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker } from './workers'
@@ -16,6 +16,30 @@ const main = async (): Promise<void> => {
   await blockchainQueue.obliterate({ force: true })
   await cleanupQueue.obliterate({ force: true })
 
+  const blockchainEvents = new QueueEvents(blockchainQueue.name, { connection: redisBullMQ })
+  const blockchainSyncDone = new Promise<void>((resolve) => {
+    blockchainEvents.on('completed', ({ jobId }) => {
+      if (jobId === 'syncBlockchainAndPrices') {
+        void blockchainEvents.close().then(() => resolve())
+      }
+    })
+    blockchainEvents.on('failed', ({ jobId }) => {
+      if (jobId === 'syncBlockchainAndPrices') {
+        void blockchainEvents.close().then(() => resolve())
+      }
+    })
+  })
+  await blockchainQueue.add('syncBlockchainAndPrices',
+    {},
+    {
+      jobId: 'syncBlockchainAndPrices',
+      removeOnComplete: true,
+      removeOnFail: true
+    }
+  )
+  await syncBlockchainAndPricesWorker(blockchainQueue.name)
+  await blockchainSyncDone
+
   await pricesQueue.add('syncCurrentPrices',
     {},
     {
@@ -29,16 +53,6 @@ const main = async (): 
Promise => { await syncCurrentPricesWorker(pricesQueue.name) - await blockchainQueue.add('syncBlockchainAndPrices', - {}, - { - jobId: 'syncBlockchainAndPrices', - removeOnComplete: true, - removeOnFail: true - } - ) - await syncBlockchainAndPricesWorker(blockchainQueue.name) - await cleanupQueue.add( 'cleanupClientPayments', {}, diff --git a/jobs/workers.ts b/jobs/workers.ts index d913176f..460f2bc4 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -1,6 +1,6 @@ import { Worker } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' -import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index' +import { PRICE_SYNC_WORKER_LOCK_DURATION, BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION, CLEANUP_WORKER_LOCK_DURATION } from 'constants/index' import { multiBlockchainClient } from 'services/chronikService' import { connectAllTransactionsToPrices } from 'services/transactionService' import { cleanupExpiredClientPayments } from 'services/clientPaymentService' @@ -16,7 +16,7 @@ export const syncCurrentPricesWorker = async (queueName: string): Promise }, { connection: redisBullMQ, - lockDuration: DEFAULT_WORKER_LOCK_DURATION + lockDuration: PRICE_SYNC_WORKER_LOCK_DURATION } ) worker.on('completed', job => { @@ -42,7 +42,7 @@ export const syncBlockchainAndPricesWorker = async (queueName: string): Promise< }, { connection: redisBullMQ, - lockDuration: DEFAULT_WORKER_LOCK_DURATION + lockDuration: BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION } ) @@ -76,7 +76,7 @@ export const cleanupClientPaymentsWorker = async (queueName: string): Promise ({ level, emit: 'event' as const })), + { level: 'query' as const, emit: 'event' as const } + ] + }) + + // Log slow queries even when PRISMA_LOG is off + client.$on('query', (e: Prisma.QueryEvent) => { + if (e.duration >= SLOW_QUERY_THRESHOLD_MS) { + console.warn(`[Prisma SLOW QUERY] ${e.duration}ms: ${e.query.slice(0, 200)}...`) + } + }) + + return client +} + interface CustomNodeJsGlobal extends NodeJS.Global { prisma: PrismaClient } declare 
const global: CustomNodeJsGlobal if (process.env.NODE_ENV === 'production') { - prisma = new PrismaClient() + prisma = createPrismaClient() } else { if (global.prisma === undefined) { - global.prisma = new PrismaClient() + global.prisma = createPrismaClient() } prisma = global.prisma } export default prisma - -// https://github.com/prisma/prisma/issues/1983#issuecomment-620621213 diff --git a/redis/paymentCache.ts b/redis/paymentCache.ts index c59d6fea..9911e570 100755 --- a/redis/paymentCache.ts +++ b/redis/paymentCache.ts @@ -33,6 +33,7 @@ export async function * getUserUncachedAddresses (userId: string): AsyncGenerato export const getPaymentList = async (userId: string): Promise => { const uncachedAddressStream = getUserUncachedAddresses(userId) for await (const address of uncachedAddressStream) { + console.log('[CACHE]: Creating cache for uncached address', address.address) void await CacheSet.addressCreation(address) } return await getCachedPaymentsForUser(userId) @@ -283,6 +284,7 @@ export const clearRecentAddressCache = async (addressString: string, timestamps: export const initPaymentCache = async (address: Address): Promise => { const cachedKeys = await getCachedWeekKeysForAddress(address.address) if (cachedKeys.length === 0) { + console.log('[CACHE]: Initializing cache for address', address.address) await CacheSet.addressCreation(address) return true } diff --git a/scripts/paybutton-server-start.sh b/scripts/paybutton-server-start.sh index 1028c14f..799d7e8c 100755 --- a/scripts/paybutton-server-start.sh +++ b/scripts/paybutton-server-start.sh @@ -12,6 +12,12 @@ echo Connected to the db. 
yarn || exit 1
 
 # Clear logs
+start_processes() {
+  pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs || exit 1
+  pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer || exit 1
+  pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- "$1" || exit 1
+}
+
 logtime=$(date +%Y-%m-%d@%H:%M)
 [ -e logs/next.log ] && mv logs/next.log logs/history/next_"$logtime".log
 [ -e logs/jobs.log ] && mv logs/jobs.log logs/history/jobs_"$logtime".log
@@ -20,14 +26,11 @@
 if [ "$ENVIRONMENT" = "production" ]; then
   yarn prisma migrate deploy || exit 1
   yarn prisma generate || exit 1
-  pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs || exit 1
-  pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer || exit 1
-  pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- prod || exit 1
+  start_processes prod
 else
   yarn prisma migrate dev || exit 1
   yarn prisma db seed || exit 1
-  pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs || exit 1
-  pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer || exit 1
-  pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- dev || exit 1
+  start_processes dev
 fi
 pm2 logs next
+
diff --git a/services/chronikService.ts b/services/chronikService.ts
index 63b708e0..b94827aa 100644
--- a/services/chronikService.ts
+++ b/services/chronikService.ts
@@ -1,7 +1,7 @@
 import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client'
 import { encodeCashAddress, 
decodeCashAddress } from 'ecashaddrjs' import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes' -import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE } from 'constants/index' +import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, SYNC_MISSED_TXS_BATCH_SIZE } from 'constants/index' import { productionAddresses } from 'prisma-local/seeds/addresses' import { TransactionWithAddressAndPrices, @@ -20,7 +20,7 @@ import { import { Address, Prisma, ClientPaymentStatus } from '@prisma/client' import xecaddr from 'xecaddrjs' import { getAddressPrefix, satoshisToUnit } from 'utils/index' -import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced, updateManyLastSynced, upsertAddress } from './addressService' +import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, 
updateLastSynced, updateManyLastSynced } from './addressService'
 import * as ws from 'ws'
 import { BroadcastTxData } from 'ws-service/types'
 import config from 'config'
@@ -287,23 +287,7 @@ export class ChronikBlockchainClient {
     const { amount, opReturn } = await this.getTransactionAmountAndData(transaction, address.address)
     const inputAddresses = this.getSortedInputAddresses(transaction)
     const outputAddresses = this.getSortedOutputAddresses(transaction)
-
-    const uniqueAddressStrings = [...new Set([
-      ...inputAddresses.map(({ address: addr }) => addr),
-      ...outputAddresses.map(({ address: addr }) => addr)
-    ])]
     const addressIdMap = new Map()
-    await Promise.all(
-      uniqueAddressStrings.map(async (addrStr) => {
-        try {
-          const parsed = parseAddress(addrStr)
-          const addr = await upsertAddress(parsed)
-          addressIdMap.set(parsed, addr.id)
-        } catch {
-          // Skip invalid addresses: don't upsert, don't add to map
-        }
-      })
-    )
     const getAddressId = (addr: string): string | undefined => {
       try {
@@ -844,32 +828,38 @@ export class ChronikBlockchainClient {
     for await (const batch of this.fetchLatestTxsForAddresses(addresses)) {
       if (batch.addressesSynced.length > 0) {
-        // marcador de slice => desmarca syncing
+        console.log(`${pfx} clearing syncing flag for batch...`)
         await setSyncingBatch(batch.addressesSynced, false)
+        console.log(`${pfx} cleared syncing flag for batch.`)
         continue
       }
       const involvedAddrIds = new Set(batch.chronikTxs.map(({ address }) => address.id))
       try {
+        console.log(`${pfx} getting ${batch.chronikTxs.length} txs from chronik...`)
         const pairsFromBatch: RowWithRaw[] = await Promise.all(
           batch.chronikTxs.map(async ({ tx, address }) => {
             const row = await this.getTransactionFromChronikTransaction(tx, address)
             return { row, raw: tx }
           })
         )
+        console.log(`${pfx} got txs from chronik.`)
         for (const { row } of pairsFromBatch) {
           perAddrCount.set(row.addressId, (perAddrCount.get(row.addressId) ?? 
0) + 1) } + console.log(`${pfx} added ${pairsFromBatch.length} to commit buffer ${toCommit.length}`) toCommit.push(...pairsFromBatch) if (toCommit.length >= DB_COMMIT_BATCH_SIZE) { + console.log(`${pfx} ${toCommit.length} reached commit batch size of ${DB_COMMIT_BATCH_SIZE}, committing to DB...`) const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE) toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE) const rows = commitPairs.map(p => p.row) + console.log(`${pfx} creating txs from ${rows.length} rows...`) const createdTxs = await createManyTransactions(rows) console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}`) @@ -958,20 +948,30 @@ export class ChronikBlockchainClient { public async syncMissedTransactions (): Promise { const addresses = await fetchAllAddressesForNetworkId(this.networkId) - try { - const { failedAddressesWithErrors, successfulAddressesWithCount } = await this.syncAddresses(addresses, true) - Object.keys(failedAddressesWithErrors).forEach((addr) => { - console.error(`${this.CHRONIK_MSG_PREFIX}: When syncing missing addresses for address ${addr} encountered error: ${failedAddressesWithErrors[addr]}`) - }) - console.log(`${this.CHRONIK_MSG_PREFIX}: Missed txs successfully synced per address:`) - Object.keys(successfulAddressesWithCount).forEach((addr) => { - if (successfulAddressesWithCount[addr] > 0) { - console.log(`${this.CHRONIK_MSG_PREFIX}:> ${addr} — ${successfulAddressesWithCount[addr]}.`) - } - }) - } catch (err: any) { - console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR: (skipping anyway) initial missing transactions sync failed: ${err.message as string} ${err.stack as string}`) + const failedAddressesWithErrors: KeyValueT = {} + const successfulAddressesWithCount: KeyValueT = {} + + for (let i = 0; i < addresses.length; i += SYNC_MISSED_TXS_BATCH_SIZE) { + const batch = addresses.slice(i, i + SYNC_MISSED_TXS_BATCH_SIZE) + console.log(`${this.CHRONIK_MSG_PREFIX}: Syncing missed txs batch ${Math.floor(i / 
SYNC_MISSED_TXS_BATCH_SIZE) + 1}/${Math.ceil(addresses.length / SYNC_MISSED_TXS_BATCH_SIZE)} (${batch.length} addresses)`) + try { + const result = await this.syncAddresses(batch, true) + Object.assign(failedAddressesWithErrors, result.failedAddressesWithErrors) + Object.assign(successfulAddressesWithCount, result.successfulAddressesWithCount) + } catch (err: any) { + console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR: (skipping batch) missed transactions sync failed for batch starting at index ${i}: ${err.message as string} ${err.stack as string}`) + } } + + Object.keys(failedAddressesWithErrors).forEach((addr) => { + console.error(`${this.CHRONIK_MSG_PREFIX}: When syncing missing addresses for address ${addr} encountered error: ${failedAddressesWithErrors[addr]}`) + }) + console.log(`${this.CHRONIK_MSG_PREFIX}: Missed txs successfully synced per address:`) + Object.keys(successfulAddressesWithCount).forEach((addr) => { + if (successfulAddressesWithCount[addr] > 0) { + console.log(`${this.CHRONIK_MSG_PREFIX}:> ${addr} — ${successfulAddressesWithCount[addr]}.`) + } + }) } public async subscribeInitialAddresses (): Promise { @@ -1178,10 +1178,8 @@ class MultiBlockchainClient { public async syncMissedTransactions (): Promise { await this.waitForStart() - await Promise.all([ - this.clients.ecash.syncMissedTransactions(), - this.clients.bitcoincash.syncMissedTransactions() - ]) + await this.clients.ecash.syncMissedTransactions() + await this.clients.bitcoincash.syncMissedTransactions() } public async syncAndSubscribeAddresses (addresses: Address[]): Promise { diff --git a/services/transactionService.ts b/services/transactionService.ts index 825772b9..8d34c67b 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -259,7 +259,7 @@ export async function fetchTransactionsWithPaybuttonsAndPricesForIdList (txIdLis }) } -export async function * generateTransactionsWithPaybuttonsAndPricesForAddress (addressId: string, pageSize = 5000): 
AsyncGenerator { +export async function * generateTransactionsWithPaybuttonsAndPricesForAddress (addressId: string, pageSize = 500): AsyncGenerator { let page = 0 while (true) { @@ -568,30 +568,46 @@ export async function createManyTransactions ( ): Promise { const insertedTransactionsDistinguished: TxDistinguished[] = [] - await prisma.$transaction( - async (prisma) => { - const BATCH_SIZE = 50 - for (let i = 0; i < transactionsData.length; i += BATCH_SIZE) { + const txStart = Date.now() + console.log(`[createManyTransactions] Starting transaction for ${transactionsData.length} transactions`) + + const BATCH_SIZE = 50 + const UPSERT_PARALLELISM = 10 + const totalBatches = Math.ceil(transactionsData.length / BATCH_SIZE) + for (let i = 0; i < transactionsData.length; i += BATCH_SIZE) { + await prisma.$transaction( + async (prisma) => { + const batchNum = Math.floor(i / BATCH_SIZE) + 1 + const batchStart = Date.now() const batch = transactionsData.slice(i, i + BATCH_SIZE) - const results = await Promise.all( - batch.map(async (tx) => - await prisma.transaction.upsert({ - create: tx, - where: { - Transaction_hash_addressId_unique_constraint: { - hash: tx.hash, - addressId: tx.addressId - } - }, - update: { - confirmed: tx.confirmed, - timestamp: tx.timestamp - }, - include: includeNetwork - }) + const results: TransactionWithNetwork[] = [] + for (let j = 0; j < batch.length; j += UPSERT_PARALLELISM) { + const upsertSlice = batch.slice(j, j + UPSERT_PARALLELISM) + const sliceResults = await Promise.all( + upsertSlice.map(async (tx) => + await prisma.transaction.upsert({ + create: tx, + where: { + Transaction_hash_addressId_unique_constraint: { + hash: tx.hash, + addressId: tx.addressId + } + }, + update: { + confirmed: tx.confirmed, + timestamp: tx.timestamp + }, + include: includeNetwork + }) + ) ) - ) + results.push(...sliceResults) + } + + const batchElapsed = Date.now() - batchStart + const totalElapsed = Date.now() - txStart + console.log(`[createManyTransactions] 
Batch ${batchNum}/${totalBatches}: ${batch.length} upserts in ${batchElapsed}ms (parallelism=${UPSERT_PARALLELISM}, total: ${totalElapsed}ms / ${UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT}ms timeout)`) for (const upsertedTx of results) { insertedTransactionsDistinguished.push({ @@ -599,12 +615,10 @@ export async function createManyTransactions ( isCreated: upsertedTx.createdAt.getTime() === upsertedTx.updatedAt.getTime() }) } - } - }, - { - timeout: UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT - } - ) + }, { timeout: UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT } + ) + console.log(`[createManyTransactions] Transaction completed in ${Date.now() - txStart}ms`) + } const insertedTransactions = insertedTransactionsDistinguished .filter((txD) => txD.isCreated)