Skip to content
14 changes: 8 additions & 6 deletions constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ export const SYNC_TXS_JOBS_RETRY_DELAY = 2000
export const BCH_TIMESTAMP_THRESHOLD = 1501588800 // 2017 Aug 1, 12PM
export const XEC_TIMESTAMP_THRESHOLD = 1605398400 // 2020 Nov 15, 12AM

export const DEFAULT_WORKER_LOCK_DURATION = 120000
export const PRICE_SYNC_WORKER_LOCK_DURATION = 180000
export const BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION = 900000
export const CLEANUP_WORKER_LOCK_DURATION = 120000
// Wait time (in ms) between sync of current prices
export const CURRENT_PRICE_REPEAT_DELAY = 60000

Expand All @@ -197,7 +199,7 @@ export const QUOTE_IDS: KeyValueT<number> = { USD: 1, CAD: 2 }

export type BLOCKCHAIN_CLIENT_OPTIONS = 'chronik'

export const UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT = 45000
export const UPSERT_TRANSACTION_PRICES_ON_DB_TIMEOUT = 90000
export const DEFAULT_TX_PAGE_SIZE = 100

export const PAYMENT_WEEK_KEY_FORMAT = 'YYYY:WW'
Expand Down Expand Up @@ -276,10 +278,10 @@ export const MEMPOOL_PROCESS_DELAY = 100
// When fetching some address transactions, number of transactions to fetch at a time.
// On chronik, the max allowed is 200
export const CHRONIK_FETCH_N_TXS_PER_PAGE = 200

export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 128
export const TX_EMIT_BATCH_SIZE = 2_000 // for our generator, not chronik
export const DB_COMMIT_BATCH_SIZE = 2_000 // batch size for DB commits
export const INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY = 1
export const TX_EMIT_BATCH_SIZE = 500
export const DB_COMMIT_BATCH_SIZE = 500
export const SYNC_MISSED_TXS_BATCH_SIZE = 50

export const TRIGGER_POST_CONCURRENCY = 100
export const TRIGGER_EMAIL_CONCURRENCY = 100
Expand Down
36 changes: 25 additions & 11 deletions jobs/initJobs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CLIENT_PAYMENT_EXPIRATION_TIME, CURRENT_PRICE_REPEAT_DELAY } from 'constants/index'
import { Queue } from 'bullmq'
import { Queue, QueueEvents } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import EventEmitter from 'events'
import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker, cleanupClientPaymentsWorker } from './workers'
Expand All @@ -16,6 +16,30 @@ const main = async (): Promise<void> => {
await blockchainQueue.obliterate({ force: true })
await cleanupQueue.obliterate({ force: true })

await blockchainQueue.add('syncBlockchainAndPrices',
{},
{
jobId: 'syncBlockchainAndPrices',
removeOnComplete: true,
removeOnFail: true
}
)
await syncBlockchainAndPricesWorker(blockchainQueue.name)

const blockchainEvents = new QueueEvents('blockchainSync', { connection: redisBullMQ })
await new Promise<void>((resolve) => {
blockchainEvents.on('completed', ({ jobId }) => {
if (jobId === 'syncBlockchainAndPrices') {
void blockchainEvents.close().then(() => resolve())
}
})
blockchainEvents.on('failed', ({ jobId }) => {
if (jobId === 'syncBlockchainAndPrices') {
void blockchainEvents.close().then(() => resolve())
}
})
})

await pricesQueue.add('syncCurrentPrices',
{},
{
Expand All @@ -29,16 +53,6 @@ const main = async (): Promise<void> => {

await syncCurrentPricesWorker(pricesQueue.name)

await blockchainQueue.add('syncBlockchainAndPrices',
{},
{
jobId: 'syncBlockchainAndPrices',
removeOnComplete: true,
removeOnFail: true
}
)
await syncBlockchainAndPricesWorker(blockchainQueue.name)

await cleanupQueue.add(
'cleanupClientPayments',
{},
Expand Down
8 changes: 4 additions & 4 deletions jobs/workers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Worker } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index'
import { PRICE_SYNC_WORKER_LOCK_DURATION, BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION, CLEANUP_WORKER_LOCK_DURATION } from 'constants/index'
import { multiBlockchainClient } from 'services/chronikService'
import { connectAllTransactionsToPrices } from 'services/transactionService'
import { cleanupExpiredClientPayments } from 'services/clientPaymentService'
Expand All @@ -16,7 +16,7 @@ export const syncCurrentPricesWorker = async (queueName: string): Promise<void>
},
{
connection: redisBullMQ,
lockDuration: DEFAULT_WORKER_LOCK_DURATION
lockDuration: PRICE_SYNC_WORKER_LOCK_DURATION
}
)
worker.on('completed', job => {
Expand All @@ -42,7 +42,7 @@ export const syncBlockchainAndPricesWorker = async (queueName: string): Promise<
},
{
connection: redisBullMQ,
lockDuration: DEFAULT_WORKER_LOCK_DURATION
lockDuration: BLOCKCHAIN_SYNC_WORKER_LOCK_DURATION
}
)

Expand Down Expand Up @@ -76,7 +76,7 @@ export const cleanupClientPaymentsWorker = async (queueName: string): Promise<vo
},
{
connection: redisBullMQ,
lockDuration: DEFAULT_WORKER_LOCK_DURATION
lockDuration: CLEANUP_WORKER_LOCK_DURATION
}
)

Expand Down
41 changes: 36 additions & 5 deletions prisma-local/clientInstance.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,53 @@
import { PrismaClient } from '@prisma/client'
import { PrismaClient, Prisma } from '@prisma/client'

let prisma: PrismaClient

// Pool settings appended to DATABASE_URL by buildDatasourceUrl below.
const CONNECTION_LIMIT = 10
const POOL_TIMEOUT = 60
// Queries at or above this duration (ms) are reported even with PRISMA_LOG off.
const SLOW_QUERY_THRESHOLD_MS = 10000

/**
 * Returns DATABASE_URL with Prisma connection-pool tuning
 * (`connection_limit`, `pool_timeout`) appended as query parameters,
 * using `?` or `&` depending on whether the URL already has a query string.
 */
function buildDatasourceUrl (): string {
  const base = process.env.DATABASE_URL ?? ''
  const joiner = base.includes('?') ? '&' : '?'
  const poolParams = `connection_limit=${CONNECTION_LIMIT}&pool_timeout=${POOL_TIMEOUT}`
  return base + joiner + poolParams
}

/**
 * Creates a PrismaClient wired for logging:
 * - 'query' is always emitted as an event so the handler below can inspect
 *   duration (slow-query detection works even when PRISMA_LOG is off);
 * - other levels are printed to stdout directly, so they remain visible
 *   without requiring extra $on listeners.
 *
 * Fixes two issues in the previous version: 'query' was duplicated in the
 * log config when PRISMA_LOG=true, and warn/error were emitted as events
 * with no listener registered, silently dropping them.
 */
function createPrismaClient (): PrismaClient {
  const verbose = process.env.PRISMA_LOG === 'true'
  const stdoutLevels: Prisma.LogLevel[] = verbose
    ? ['info', 'warn', 'error']
    : ['warn', 'error']

  const client = new PrismaClient({
    datasources: { db: { url: buildDatasourceUrl() } },
    log: [
      { level: 'query' as const, emit: 'event' as const },
      ...stdoutLevels.map(level => ({ level, emit: 'stdout' as const }))
    ]
  })

  client.$on('query', (e: Prisma.QueryEvent) => {
    // Verbose mode prints every query; slow queries are warned about always.
    if (verbose) {
      console.log(`[Prisma] ${e.duration}ms: ${e.query}`)
    }
    if (e.duration >= SLOW_QUERY_THRESHOLD_MS) {
      console.warn(`[Prisma SLOW QUERY] ${e.duration}ms: ${e.query.slice(0, 200)}...`)
    }
  })

  return client
}

interface CustomNodeJsGlobal extends NodeJS.Global {
prisma: PrismaClient
}
declare const global: CustomNodeJsGlobal

if (process.env.NODE_ENV === 'production') {
  // Production: a single client for the process lifetime.
  prisma = createPrismaClient()
} else {
  // Development: cache the client on the global object so Next.js hot
  // reloads reuse one client instead of exhausting the connection pool.
  if (global.prisma === undefined) {
    global.prisma = createPrismaClient()
  }

  prisma = global.prisma
}

export default prisma

// https://github.com/prisma/prisma/issues/1983#issuecomment-620621213
2 changes: 2 additions & 0 deletions redis/paymentCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export async function * getUserUncachedAddresses (userId: string): AsyncGenerato
export const getPaymentList = async (userId: string): Promise<Payment[]> => {
const uncachedAddressStream = getUserUncachedAddresses(userId)
for await (const address of uncachedAddressStream) {
console.log('[CACHE]: Creating cache for uncached address', address.address)
void await CacheSet.addressCreation(address)
}
return await getCachedPaymentsForUser(userId)
Expand Down Expand Up @@ -283,6 +284,7 @@ export const clearRecentAddressCache = async (addressString: string, timestamps:
export const initPaymentCache = async (address: Address): Promise<boolean> => {
const cachedKeys = await getCachedWeekKeysForAddress(address.address)
if (cachedKeys.length === 0) {
console.log('[CACHE]: Initializing cache for address', address.address)
await CacheSet.addressCreation(address)
return true
}
Expand Down
15 changes: 9 additions & 6 deletions scripts/paybutton-server-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ echo Connected to the db.
yarn || exit 1
# Clear logs

start_processes() {
  # Launch all app processes under pm2.
  # $1: yarn script to run for the next.js process ("prod" or "dev").
  # Each start is fatal on failure (|| exit 1), matching the behavior of the
  # pre-refactor inline commands; `exit` inside a function exits the script.
  pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs || exit 1
  pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer || exit 1
  pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- "$1" || exit 1
}

logtime=$(date +%Y-%m-%d@%H:%M)
[ -e logs/next.log ] && mv logs/next.log logs/history/next_"$logtime".log
[ -e logs/jobs.log ] && mv logs/jobs.log logs/history/jobs_"$logtime".log
Expand All @@ -20,14 +26,11 @@ logtime=$(date +%Y-%m-%d@%H:%M)
if [ "$ENVIRONMENT" = "production" ]; then
yarn prisma migrate deploy || exit 1
yarn prisma generate || exit 1
pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs || exit 1
pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer || exit 1
pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- prod || exit 1
start_processes prod
else
yarn prisma migrate dev || exit 1
yarn prisma db seed || exit 1
pm2 start yarn --time --interpreter ash --name jobs --output logs/jobs.log --error logs/jobs.log -- initJobs || exit 1
pm2 start yarn --time --interpreter ash --name WSServer --output logs/ws-server.log --error logs/ws-server.log -- initWSServer || exit 1
pm2 start yarn --time --interpreter ash --name next --output logs/next.log --error logs/next.log -- dev || exit 1
start_processes dev
fi
pm2 logs next

70 changes: 34 additions & 36 deletions services/chronikService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { BlockInfo, ChronikClient, ConnectionStrategy, ScriptUtxo, Tx, WsConfig, WsEndpoint, WsMsgClient, WsSubScriptClient } from 'chronik-client'
import { encodeCashAddress, decodeCashAddress } from 'ecashaddrjs'
import { AddressWithTransaction, BlockchainInfo, TransactionDetails, ProcessedMessages, SubbedAddressesLog, SyncAndSubscriptionReturn, SubscriptionReturn, SimpleBlockInfo } from 'types/chronikTypes'
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE } from 'constants/index'
import { CHRONIK_MESSAGE_CACHE_DELAY, RESPONSE_MESSAGES, XEC_TIMESTAMP_THRESHOLD, XEC_NETWORK_ID, BCH_NETWORK_ID, BCH_TIMESTAMP_THRESHOLD, CHRONIK_FETCH_N_TXS_PER_PAGE, KeyValueT, NETWORK_IDS_FROM_SLUGS, SOCKET_MESSAGES, NETWORK_IDS, NETWORK_TICKERS, MainNetworkSlugsType, MAX_MEMPOOL_TXS_TO_PROCESS_AT_A_TIME, MEMPOOL_PROCESS_DELAY, CHRONIK_INITIALIZATION_DELAY, LATENCY_TEST_CHECK_DELAY, INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY, TX_EMIT_BATCH_SIZE, DB_COMMIT_BATCH_SIZE, SYNC_MISSED_TXS_BATCH_SIZE } from 'constants/index'
import { productionAddresses } from 'prisma-local/seeds/addresses'
import {
TransactionWithAddressAndPrices,
Expand All @@ -20,7 +20,7 @@ import {
import { Address, Prisma, ClientPaymentStatus } from '@prisma/client'
import xecaddr from 'xecaddrjs'
import { getAddressPrefix, satoshisToUnit } from 'utils/index'
import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced, updateManyLastSynced, upsertAddress } from './addressService'
import { fetchAddressesArray, fetchAllAddressesForNetworkId, getEarliestUnconfirmedTxTimestampForAddress, getLatestConfirmedTxTimestampForAddress, setSyncing, setSyncingBatch, updateLastSynced, updateManyLastSynced } from './addressService'
import * as ws from 'ws'
import { BroadcastTxData } from 'ws-service/types'
import config from 'config'
Expand Down Expand Up @@ -287,23 +287,7 @@ export class ChronikBlockchainClient {
const { amount, opReturn } = await this.getTransactionAmountAndData(transaction, address.address)
const inputAddresses = this.getSortedInputAddresses(transaction)
const outputAddresses = this.getSortedOutputAddresses(transaction)

const uniqueAddressStrings = [...new Set([
...inputAddresses.map(({ address: addr }) => addr),
...outputAddresses.map(({ address: addr }) => addr)
])]
const addressIdMap = new Map<string, string>()
await Promise.all(
uniqueAddressStrings.map(async (addrStr) => {
try {
const parsed = parseAddress(addrStr)
const addr = await upsertAddress(parsed)
addressIdMap.set(parsed, addr.id)
} catch {
// Skip invalid addresses: don't upsert, don't add to map
}
})
)

const getAddressId = (addr: string): string | undefined => {
try {
Expand Down Expand Up @@ -844,32 +828,38 @@ export class ChronikBlockchainClient {

for await (const batch of this.fetchLatestTxsForAddresses(addresses)) {
if (batch.addressesSynced.length > 0) {
// slice marker batch: clears the syncing flag for these addresses
console.log(`${pfx} marking batch as syncing...`)
await setSyncingBatch(batch.addressesSynced, false)
console.log(`${pfx} marked batch as syncing.`)
continue
}

const involvedAddrIds = new Set(batch.chronikTxs.map(({ address }) => address.id))

try {
console.log(`${pfx} getting ${batch.chronikTxs.length} txs from chronik...`)
const pairsFromBatch: RowWithRaw[] = await Promise.all(
batch.chronikTxs.map(async ({ tx, address }) => {
const row = await this.getTransactionFromChronikTransaction(tx, address)
return { row, raw: tx }
})
)
console.log(`${pfx} got txs from chronik.`)

for (const { row } of pairsFromBatch) {
perAddrCount.set(row.addressId, (perAddrCount.get(row.addressId) ?? 0) + 1)
}

console.log(`${pfx} added ${pairsFromBatch.length} to commit buffer ${toCommit.length}`)
toCommit.push(...pairsFromBatch)

if (toCommit.length >= DB_COMMIT_BATCH_SIZE) {
console.log(`${pfx} ${toCommit.length} reached commit batch size of ${DB_COMMIT_BATCH_SIZE}, committing to DB...`)
const commitPairs = toCommit.slice(0, DB_COMMIT_BATCH_SIZE)
toCommit = toCommit.slice(DB_COMMIT_BATCH_SIZE)

const rows = commitPairs.map(p => p.row)
console.log(`${pfx} creating txs from ${rows.length} rows...`)
const createdTxs = await createManyTransactions(rows)
console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}`)

Expand Down Expand Up @@ -958,20 +948,30 @@ export class ChronikBlockchainClient {

public async syncMissedTransactions (): Promise<void> {
const addresses = await fetchAllAddressesForNetworkId(this.networkId)
try {
const { failedAddressesWithErrors, successfulAddressesWithCount } = await this.syncAddresses(addresses, true)
Object.keys(failedAddressesWithErrors).forEach((addr) => {
console.error(`${this.CHRONIK_MSG_PREFIX}: When syncing missing addresses for address ${addr} encountered error: ${failedAddressesWithErrors[addr]}`)
})
console.log(`${this.CHRONIK_MSG_PREFIX}: Missed txs successfully synced per address:`)
Object.keys(successfulAddressesWithCount).forEach((addr) => {
if (successfulAddressesWithCount[addr] > 0) {
console.log(`${this.CHRONIK_MSG_PREFIX}:> ${addr} — ${successfulAddressesWithCount[addr]}.`)
}
})
} catch (err: any) {
console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR: (skipping anyway) initial missing transactions sync failed: ${err.message as string} ${err.stack as string}`)
const failedAddressesWithErrors: KeyValueT<string> = {}
const successfulAddressesWithCount: KeyValueT<number> = {}

for (let i = 0; i < addresses.length; i += SYNC_MISSED_TXS_BATCH_SIZE) {
const batch = addresses.slice(i, i + SYNC_MISSED_TXS_BATCH_SIZE)
console.log(`${this.CHRONIK_MSG_PREFIX}: Syncing missed txs batch ${Math.floor(i / SYNC_MISSED_TXS_BATCH_SIZE) + 1}/${Math.ceil(addresses.length / SYNC_MISSED_TXS_BATCH_SIZE)} (${batch.length} addresses)`)
try {
const result = await this.syncAddresses(batch, true)
Object.assign(failedAddressesWithErrors, result.failedAddressesWithErrors)
Object.assign(successfulAddressesWithCount, result.successfulAddressesWithCount)
} catch (err: any) {
console.error(`${this.CHRONIK_MSG_PREFIX}: ERROR: (skipping batch) missed transactions sync failed for batch starting at index ${i}: ${err.message as string} ${err.stack as string}`)
}
}

Object.keys(failedAddressesWithErrors).forEach((addr) => {
console.error(`${this.CHRONIK_MSG_PREFIX}: When syncing missing addresses for address ${addr} encountered error: ${failedAddressesWithErrors[addr]}`)
})
console.log(`${this.CHRONIK_MSG_PREFIX}: Missed txs successfully synced per address:`)
Object.keys(successfulAddressesWithCount).forEach((addr) => {
if (successfulAddressesWithCount[addr] > 0) {
console.log(`${this.CHRONIK_MSG_PREFIX}:> ${addr} — ${successfulAddressesWithCount[addr]}.`)
}
})
}

public async subscribeInitialAddresses (): Promise<void> {
Expand Down Expand Up @@ -1178,10 +1178,8 @@ class MultiBlockchainClient {

/**
 * Runs the missed-transaction sweep for every network client.
 * Networks are synced sequentially (not via Promise.all) so the two sweeps
 * do not compete for the DB connection pool at the same time.
 */
public async syncMissedTransactions (): Promise<void> {
  await this.waitForStart()
  await this.clients.ecash.syncMissedTransactions()
  await this.clients.bitcoincash.syncMissedTransactions()
}

public async syncAndSubscribeAddresses (addresses: Address[]): Promise<SyncAndSubscriptionReturn> {
Expand Down
Loading
Loading