This C++ implementation demonstrates high-performance financial data processing using threading and compiler optimizations. It calculates 100+ quantitative features for each row in a large financial dataset (3.9M+ records) using cache-efficient algorithms.
#include <iostream> // Input/output streams
#include <fstream> // File stream operations
#include <vector> // Dynamic arrays
#include <string> // String operations
#include <sstream> // String stream for parsing
#include <chrono> // Time measurement utilities
#include <numeric> // Numeric algorithms (std::accumulate)
#include <unordered_map> // Hash map for storing results
#include <algorithm> // Standard algorithms
#include <cmath> // Mathematical functions
#include <thread> // Threading support
#include <future> // Future/promise for asynchronous operations
#include <valarray> // Value arrays for mathematical operations

Each header provides specific functionality needed for the implementation: <thread> and <future> enable multi-threading capabilities, while <chrono> provides high-resolution timing for performance measurement.
// One OHLCV candlestick bar — a simple aggregate chosen for an efficient,
// contiguous memory layout. `double` provides sufficient precision for the
// price/volume fields; the timestamp is kept as a string because it is not
// used in the numeric calculations. Fields are zero-initialized so a
// default-constructed bar never carries indeterminate values.
struct Candlestick {
    std::string timestamp;  // Time of the candlestick bar
    double open{0.0};       // Opening price
    double high{0.0};       // Highest price during the period
    double low{0.0};        // Lowest price during the period
    double close{0.0};      // Closing price
    double volume{0.0};     // Trading volume
};
- Using double for financial data ensures sufficient precision; std::string is used for the timestamp (though it is not heavily used in calculations).
// Parses an OHLCV CSV file into Candlestick records.
// Expected columns per row: timestamp, open, high, low, close, volume; the
// first line is treated as a header and skipped. std::ifstream handles the
// file reading and std::stringstream splits each comma-separated record.
// Robustness: an unreadable file yields an empty vector, rows with fewer
// than six columns are skipped, and rows whose numeric fields fail to parse
// are skipped as well (std::stod throws on bad input, which previously
// aborted the whole load).
std::vector<Candlestick> readCSV(const std::string& filename) {
    std::vector<Candlestick> data;
    std::ifstream file(filename);
    if (!file.is_open()) {
        return data; // cannot open file -> empty result, caller decides
    }
    std::string line;
    std::getline(file, line); // skip header row
    while (std::getline(file, line)) {          // read each line
        std::stringstream ss(line);             // parse one record
        std::string cell;
        std::vector<std::string> row;
        while (std::getline(ss, cell, ',')) {   // split on commas
            row.push_back(cell);
        }
        if (row.size() < 6) {
            continue; // malformed line: not enough columns
        }
        try {
            Candlestick candle;
            candle.timestamp = row[0];
            candle.open = std::stod(row[1]);    // string -> double
            candle.high = std::stod(row[2]);
            candle.low = std::stod(row[3]);
            candle.close = std::stod(row[4]);
            candle.volume = std::stod(row[5]);
            data.push_back(std::move(candle));
        } catch (const std::exception&) {
            // std::stod throws invalid_argument/out_of_range on bad
            // numerics; drop the offending row rather than crashing.
        }
    }
    return data; // populated vector (possibly empty)
}
std::vector<std::vector<double>> calculateQuantitativeFeaturesParallel(const std::vector<Candlestick>& data) {
size_t n = data.size();
std::vector<std::vector<double>> features(n, std::vector<double>(101, 0.0)); // 101 features for each row
// Determine number of threads to use
unsigned int num_threads = std::thread::hardware_concurrency();
if (num_threads == 0) num_threads = 4; // fallback
// Calculate chunk size
size_t chunk_size = n / num_threads;
if (chunk_size == 0) chunk_size = 1; // Ensure at least 1 element per thread
// Vector to hold futures for thread synchronization
std::vector<std::future<void>> futures;std::thread::hardware_concurrency(): Gets the number of CPU cores- Calculates optimal chunk size for load balancing
std::future<void>: Used to track completion of asynchronous tasks- Pre-allocates memory with known dimensions for efficiency
for (unsigned int t = 0; t < num_threads; ++t) {
size_t start_idx = t * chunk_size;
size_t end_idx = (t == num_threads - 1) ? n : (t + 1) * chunk_size; // Last thread gets remainder
futures.push_back(std::async(std::launch::async, [&data, &features, start_idx, end_idx]() {
for (size_t i = start_idx; i < end_idx; ++i) {
const auto& candle = data[i]; // Reference to avoid copyingstd::async(): Launches asynchronous tasksstd::launch::async: Ensures tasks run in separate threads- Lambda capture by reference (
&data,&features) to share data - Proper boundary calculation ensures all elements are processed
// Basic price features (0-4)
features[i][0] = candle.close; // Close price
features[i][1] = candle.open; // Open price
features[i][2] = candle.high; // High price
features[i][3] = candle.low; // Low price
features[i][4] = candle.volume; // Volume
// Return calculations (5-7)
features[i][5] = (candle.open != 0.0) ? (candle.close - candle.open) / candle.open : 0.0; // Return
features[i][6] = (candle.open != 0.0) ? (candle.high - candle.low) / candle.open : 0.0; // True range
features[i][7] = (candle.high != candle.low) ?
(candle.close - candle.low) / (candle.high - candle.low) : 0.5; // Stochastic- Ternary operators prevent division by zero
- These are standard financial indicators:
- Return: percentage change from open to close
- True range: measure of volatility
- Stochastic: momentum indicator
// Simple moving averages and volatilities for different periods (8-22)
if (i >= 5) {
size_t start_ma_idx = (i >= 5) ? i - 5 : 0;
std::vector<double> recent_prices;
for (size_t j = start_ma_idx; j <= i; ++j) {
recent_prices.push_back(data[j].close);
}
double sum = std::accumulate(recent_prices.begin(), recent_prices.end(), 0.0);
features[i][8] = sum / recent_prices.size(); // 5-period SMA
double mean = features[i][8];
double variance = 0.0;
for (double p : recent_prices) {
variance += (p - mean) * (p - mean);
}
variance /= recent_prices.size();
features[i][9] = std::sqrt(variance); // 5-period volatility- Sliding window technique for efficient moving average calculation
std::accumulate()for efficient summation- Variance calculation for volatility measurement
- Bounds checking prevents index out of bounds errors
}));
}
// Wait for all threads to complete
for (auto& future : futures) {
future.wait(); // Block until thread completes
}
return features;
}future.wait(): Ensures all threads complete before continuing- Proper synchronization prevents race conditions
- All threads must complete before returning results
std::vector<double> calculateMovingAverage(const std::vector<double>& prices, int period) {
std::vector<double> ma;
if (prices.size() < period) {
return ma; // Early return if insufficient data
}
ma.reserve(prices.size() - period + 1); // Pre-allocate memory
// Calculate initial sum for the first window
double sum = 0.0;
for (int i = 0; i < period; ++i) {
sum += prices[i];
}
ma.push_back(sum / period);
// Use sliding window technique for efficiency
for (size_t i = period; i < prices.size(); ++i) {
sum = sum - prices[i - period] + prices[i]; // Subtract old, add new
ma.push_back(sum / period);
}
return ma;
}- Sliding window technique: O(n) complexity instead of O(n*m)
reserve(): Pre-allocates memory to avoid repeated allocations- Efficient sum update: subtracts old value, adds new value
- Significantly faster than recalculating the entire sum each time
// Computes one moving-average series per entry in `periods`, keyed in the
// result map as "MA_<period>". When compiled with OpenMP the loop over
// periods runs in parallel; insertion into the shared map is serialized
// with a critical section because std::unordered_map is not safe for
// concurrent writes. Without OpenMP the pragmas compile away and this is a
// plain sequential loop with identical results.
std::unordered_map<std::string, std::vector<double>> calculateMultipleMovingAveragesParallel(
    const std::vector<double>& prices,
    const std::vector<int>& periods) {
    std::unordered_map<std::string, std::vector<double>> results;
    const int period_count = static_cast<int>(periods.size());
#ifdef _OPENMP
#pragma omp parallel for
#endif
    for (int p = 0; p < period_count; ++p) {
        const int window = periods[p];
        // The expensive computation happens outside the critical section.
        std::vector<double> averages = calculateMovingAverage(prices, window);
        const std::string label = "MA_" + std::to_string(window);
#ifdef _OPENMP
#pragma omp critical
#endif
        {
            results[label] = std::move(averages); // move: no copy of the series
        }
    }
    return results;
}
- #pragma omp parallel for: parallelizes the loop
- #pragma omp critical: ensures thread-safe access to the shared map
- std::move(): transfers ownership without copying data
- RAII (Resource Acquisition Is Initialization) for automatic cleanup
- std::vector for dynamic arrays with automatic memory management; reserve() to pre-allocate memory and avoid reallocations; references to avoid unnecessary copying
- Sliding window technique for O(n) moving average calculation
- Pre-allocation of memory with reserve()
- Efficient algorithms from <numeric> and <algorithm>
- Move semantics to avoid expensive copies
- std::async() for asynchronous task execution; std::future for thread synchronization; proper bounds checking to prevent race conditions
- Thread-safe access patterns
- Range-based for loops for cleaner code
- auto for type inference; lambda expressions for inline functions
- const correctness for safety
- Sliding window technique reduces complexity from O(n*m) to O(n)
- Threading allows parallel processing across CPU cores
- Memory pre-allocation minimizes allocation overhead
- Efficient algorithms from standard library
- Using std::vector for cache-friendly contiguous storage
- References to avoid unnecessary copying
- Pre-allocation to prevent memory fragmentation
- Bounds checking prevents array access violations
- Early returns for invalid inputs
- Proper resource cleanup through RAII
- Standard library functions ensure cross-platform compatibility
- Compiler directives for optional OpenMP support
- Standard C++ features that work across different compilers
When compiled with flags like -O3 -march=native -flto, the compiler:
- Inlines small functions to reduce call overhead
- Vectorizes loops when possible
- Optimizes memory access patterns
- Eliminates dead code and redundant calculations
This implementation demonstrates C++'s strengths in systems programming: fine-grained control over performance, efficient memory usage, and the ability to write both high-level abstractions and low-level optimizations.