
C++ Implementation: Financial Data Processing with Threading and Optimization

Overview

This C++ implementation demonstrates high-performance financial data processing using multithreading and compiler optimizations. It calculates 101 quantitative features for each row of a large financial dataset (3.9M+ records), relying on pre-allocated storage and linear-time algorithms to keep per-row overhead low.

Code Breakdown

1. Header Includes and Standard Library

#include <iostream>    // Input/output streams
#include <fstream>     // File stream operations
#include <vector>      // Dynamic arrays
#include <string>      // String operations
#include <sstream>     // String stream for parsing
#include <chrono>      // Time measurement utilities
#include <numeric>     // Numeric algorithms (std::accumulate)
#include <unordered_map> // Hash map for storing results
#include <algorithm>   // Standard algorithms
#include <cmath>       // Mathematical functions
#include <thread>      // Threading support
#include <future>      // Future/promise for asynchronous operations
#include <valarray>    // Value arrays for mathematical operations
  • Each header provides specific functionality needed for the implementation
  • <thread> and <future> enable multi-threading capabilities
  • <chrono> provides high-resolution timing for performance measurement

2. Data Structure Definition

struct Candlestick {
    std::string timestamp;  // Time of the candlestick bar
    double open;            // Opening price
    double high;            // Highest price during the period
    double low;             // Lowest price during the period
    double close;           // Closing price
    double volume;          // Trading volume
};
  • Simple struct for efficient memory layout
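  • double gives about 15-16 significant decimal digits, ample for analytics on prices and volumes, although it cannot represent most decimal prices exactly
  • std::string for the timestamp, which is not used in any of the calculations shown here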

3. CSV Reading Function

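std::vector<Candlestick> readCSV(const std::string& filename) {
    std::vector<Candlestick> data;
    std::ifstream file(filename);
    if (!file) {                              // Report a missing/unreadable file instead of silently returning nothing
        std::cerr << "Error: cannot open " << filename << '\n';
        return data;                          // Empty vector
    }
    std::string line;
    
    // Skip header
    std::getline(file, line);
    
    while (std::getline(file, line)) {        // Read each line
        std::stringstream ss(line);           // Create string stream for parsing
        std::string cell;
        std::vector<std::string> row;
        
        while (std::getline(ss, cell, ',')) { // Parse comma-separated values
            row.push_back(cell);
        }
        
        if (row.size() >= 6) {                // Skip rows with too few columns
            try {
                Candlestick candle;
                candle.timestamp = row[0];
                candle.open = std::stod(row[1]);  // Convert string to double
                candle.high = std::stod(row[2]);
                candle.low = std::stod(row[3]);
                candle.close = std::stod(row[4]);
                candle.volume = std::stod(row[5]);
                data.push_back(std::move(candle)); // Move to avoid copying the timestamp string
            } catch (const std::exception&) {
                // std::stod throws std::invalid_argument or std::out_of_range on a bad cell; skip the row
            }
        }
    }
    
    return data;                              // Return populated vector
}
  • Uses std::ifstream for buffered, line-by-line file reading, and checks that the file actually opened
  • std::stringstream splits each line on commas
  • std::stod() converts strings to doubles but throws std::invalid_argument or std::out_of_range on malformed cells, so each row is parsed inside try/catch and bad rows are skipped
  • The column-count check skips rows with fewer than six fields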
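Row-level validation can be pulled into a small function that reports failure instead of throwing, which makes it easy to test. The sketch below is illustrative only. `parseCandleLine` is a hypothetical helper that is not part of the implementation, and the struct simply mirrors `Candlestick`. It also assumes that a cell with trailing junk (for example `1.5abc`) should be rejected rather than truncated, which `std::stod` alone would do silently.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Mirrors the Candlestick struct described above.
struct Candlestick {
    std::string timestamp;
    double open, high, low, close, volume;
};

// Hypothetical helper: parses one CSV line, returning std::nullopt for rows
// with fewer than six fields or with any non-numeric price/volume cell.
std::optional<Candlestick> parseCandleLine(std::string line) {
    if (!line.empty() && line.back() == '\r') line.pop_back();  // tolerate Windows line endings

    std::vector<std::string> row;
    std::stringstream ss(line);
    std::string cell;
    while (std::getline(ss, cell, ',')) row.push_back(cell);
    if (row.size() < 6) return std::nullopt;

    // Converts one cell; rejects trailing garbage such as "1.5abc".
    auto toDouble = [](const std::string& s, double& out) {
        try {
            std::size_t pos = 0;
            out = std::stod(s, &pos);
            return pos == s.size();
        } catch (const std::exception&) {  // std::invalid_argument or std::out_of_range
            return false;
        }
    };

    Candlestick c;
    c.timestamp = row[0];
    if (!toDouble(row[1], c.open) || !toDouble(row[2], c.high) ||
        !toDouble(row[3], c.low) || !toDouble(row[4], c.close) ||
        !toDouble(row[5], c.volume)) {
        return std::nullopt;
    }
    return c;
}
```

Inside `readCSV`, the loop body would then reduce to `if (auto c = parseCandleLine(line)) data.push_back(std::move(*c));`.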

4. Parallel Feature Calculation Function

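std::vector<std::vector<double>> calculateQuantitativeFeaturesParallel(const std::vector<Candlestick>& data) {
    size_t n = data.size();
    std::vector<std::vector<double>> features(n, std::vector<double>(101, 0.0)); // 101 features for each row
    if (n == 0) return features;          // Nothing to compute; also avoids dividing by zero below
    
    // Determine number of threads to use
    unsigned int num_threads = std::thread::hardware_concurrency();
    if (num_threads == 0) num_threads = 4; // fallback: hardware_concurrency() returns 0 when unknown
    if (num_threads > n) num_threads = static_cast<unsigned int>(n); // Never create empty chunks
    
    // Calculate chunk size (at least 1, since num_threads <= n)
    size_t chunk_size = n / num_threads;
    
    // Vector to hold futures for thread synchronization
    std::vector<std::future<void>> futures;
    futures.reserve(num_threads);
  • std::thread::hardware_concurrency(): Returns a hint for the number of hardware threads (logical cores), or 0 if it cannot be determined
  • Splits the rows into equal contiguous chunks (static partitioning); the last thread also takes the remainder of n / num_threads
  • Capping num_threads at n keeps every start_idx in bounds; without the cap, inputs with fewer rows than threads would read past the end of data
  • std::future<void>: Used to track completion of asynchronous tasks
  • Pre-allocates all n × 101 feature values up front, so tasks only write into existing rows and never resize a shared container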
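The partitioning step can be isolated and checked on its own. The sketch below is a hypothetical helper, `partitionRows`, and not the document's code. It uses a slightly different scheme from the `n / num_threads` split above: instead of giving the whole remainder to the last thread, it spreads the remainder one row at a time across the first chunks, so chunk sizes differ by at most one. With 3.9M rows the difference is negligible, but the balanced split keeps small inputs even.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: splits n rows into at most `parts` contiguous [start, end)
// ranges whose sizes differ by at most one. Never returns an empty range.
std::vector<std::pair<std::size_t, std::size_t>> partitionRows(std::size_t n, std::size_t parts) {
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    if (n == 0 || parts == 0) return ranges;
    if (parts > n) parts = n;                  // more parts than rows would create empty chunks
    const std::size_t base = n / parts;
    const std::size_t extra = n % parts;       // the first `extra` chunks get one more row
    std::size_t start = 0;
    for (std::size_t t = 0; t < parts; ++t) {
        const std::size_t len = base + (t < extra ? 1 : 0);
        ranges.emplace_back(start, start + len);
        start += len;
    }
    return ranges;
}
```

For example, `partitionRows(10, 3)` yields the ranges [0, 4), [4, 7) and [7, 10).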

5. Threading Implementation

    for (unsigned int t = 0; t < num_threads; ++t) {
        size_t start_idx = t * chunk_size;
        size_t end_idx = (t == num_threads - 1) ? n : (t + 1) * chunk_size;  // Last thread gets remainder
        
        futures.push_back(std::async(std::launch::async, [&data, &features, start_idx, end_idx]() {
            for (size_t i = start_idx; i < end_idx; ++i) {
                const auto& candle = data[i];  // Reference to avoid copying
  • std::async(): Launches asynchronous tasks
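  • std::launch::async: Runs each task as if on a new thread, rather than allowing deferred (lazy) execution on the calling thread
  • Lambda captures data and features by reference; this is safe because data is only read and each task writes only the rows in its own [start_idx, end_idx) range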
  • Proper boundary calculation ensures all elements are processed
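The std::async pattern above can be reduced to a generic "parallel for" over disjoint index ranges. The sketch below uses `parallelFor`, a hypothetical name. It assumes the output container has already been sized, as the feature matrix is, so that `body(i)` writes only to element `i` and needs no locking.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <thread>
#include <vector>

// Hypothetical helper: runs body(i) for every i in [0, n) on up to
// hardware_concurrency() async tasks, each owning a disjoint contiguous range.
void parallelFor(std::size_t n, const std::function<void(std::size_t)>& body) {
    if (n == 0) return;
    std::size_t threads = std::thread::hardware_concurrency();
    if (threads == 0) threads = 4;                         // value unknown: fall back
    threads = std::min(threads, n);                        // no empty ranges
    const std::size_t chunk = (n + threads - 1) / threads; // ceiling division

    std::vector<std::future<void>> futures;
    futures.reserve(threads);
    for (std::size_t start = 0; start < n; start += chunk) {
        const std::size_t end = std::min(start + chunk, n);
        futures.push_back(std::async(std::launch::async, [start, end, &body] {
            for (std::size_t i = start; i < end; ++i) body(i);
        }));
    }
    for (auto& f : futures) f.get();                       // join; rethrows task exceptions
}
```

Usage: with a pre-sized `std::vector<double> out(n)`, calling `parallelFor(n, [&](std::size_t i) { out[i] = f(i); });` fills every slot without any mutex.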

6. Mathematical Feature Calculations

                // Basic price features (0-4)
                features[i][0] = candle.close;           // Close price
                features[i][1] = candle.open;            // Open price
                features[i][2] = candle.high;            // High price
                features[i][3] = candle.low;             // Low price
                features[i][4] = candle.volume;          // Volume
                
                // Return calculations (5-7)
                features[i][5] = (candle.open != 0.0) ? (candle.close - candle.open) / candle.open : 0.0; // Return
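                features[i][6] = (candle.open != 0.0) ? (candle.high - candle.low) / candle.open : 0.0;   // Normalized high-low range
                features[i][7] = (candle.high != candle.low) ? 
                    (candle.close - candle.low) / (candle.high - candle.low) : 0.5; // Stochastic (close position within range)
  • Ternary operators prevent division by zero
  • These are standard single-bar indicators:
    • Return: fractional change from open to close (0.01 means 1%)
    • Range: (high - low) / open, a measure of intrabar volatility; unlike Wilder's true range, it ignores the previous bar's close
    • Stochastic: where the close sits within the bar's high-low range (0 at the low, 1 at the high), a single-bar form of %K; 0.5 when high == low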
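The three per-bar formulas are easy to test in isolation. The sketch below restates them as a standalone function, `barFeatures`, with the same zero guards. For contrast, it adds Wilder's true range, which does depend on the previous close. Both `barFeatures` and `trueRange` are hypothetical helpers that are not part of the implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>

// Same formulas and guards as features[5..7] above.
struct BarFeatures {
    double ret;       // (close - open) / open, a fraction (0.01 == 1%)
    double range;     // (high - low) / open, normalized high-low range
    double closePos;  // (close - low) / (high - low), 0.5 when high == low
};

BarFeatures barFeatures(double open, double high, double low, double close) {
    BarFeatures f{};
    f.ret = (open != 0.0) ? (close - open) / open : 0.0;
    f.range = (open != 0.0) ? (high - low) / open : 0.0;
    f.closePos = (high != low) ? (close - low) / (high - low) : 0.5;
    return f;
}

// Wilder's true range: max(high - low, |high - prevClose|, |low - prevClose|),
// which also captures gaps between the previous close and this bar.
double trueRange(double high, double low, double prevClose) {
    return std::max({high - low, std::abs(high - prevClose), std::abs(low - prevClose)});
}
```

On a bar that gaps up (previous close 95, then a low of 101 and a high of 105), the high-low range is 4 but the true range is 10.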

7. Rolling-Window Moving Average and Volatility

                // Simple moving averages and volatilities for different periods (8-22)
                if (i >= 4) {                               // A 5-period window needs rows i-4 .. i
                    size_t start_ma_idx = i - 4;            // Exactly 5 closes: [i-4, i]
                    std::vector<double> recent_prices;
                    recent_prices.reserve(5);
                    for (size_t j = start_ma_idx; j <= i; ++j) {
                        recent_prices.push_back(data[j].close); // May read rows from another chunk; safe, data is read-only
                    }
                    
                    double sum = std::accumulate(recent_prices.begin(), recent_prices.end(), 0.0);
                    features[i][8] = sum / recent_prices.size(); // 5-period SMA
                    
                    double mean = features[i][8];
                    double variance = 0.0;
                    for (double p : recent_prices) {
                        variance += (p - mean) * (p - mean);
                    }
                    variance /= recent_prices.size();       // Population variance (divides by N, not N-1)
                    features[i][9] = std::sqrt(variance);   // 5-period volatility (std. dev. of closes)
  • Fixed-size rolling window: each row recomputes its own 5-close window (O(period) work per row), unlike the incremental sliding-window update in calculateMovingAverage
  • std::accumulate() sums the window
  • Volatility is the population standard deviation of the closes in the window
  • The i >= 4 guard prevents reading before the start of data; rows 0-3 keep their initial 0.0
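Feature slots 8-22 repeat this computation for several periods, which are not listed in the excerpt. The sketch below generalizes the window logic to any period as a hypothetical helper, `windowMeanStd`. It assumes the closes have been copied into a plain `std::vector<double>`, and it returns zeros for incomplete windows to match the zero-initialized feature rows.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: mean and population standard deviation of
// closes[i - period + 1 .. i]. Returns {0.0, 0.0} if the window is incomplete.
std::pair<double, double> windowMeanStd(const std::vector<double>& closes,
                                        std::size_t i, std::size_t period) {
    if (period == 0 || i >= closes.size() || i + 1 < period) return {0.0, 0.0};
    const std::size_t start = i + 1 - period;   // first index in the window
    double sum = 0.0;
    for (std::size_t j = start; j <= i; ++j) sum += closes[j];
    const double mean = sum / static_cast<double>(period);
    double var = 0.0;
    for (std::size_t j = start; j <= i; ++j) {
        const double d = closes[j] - mean;
        var += d * d;
    }
    var /= static_cast<double>(period);         // population variance (divide by N)
    return {mean, std::sqrt(var)};
}
```

For closes {1, 2, 3, 4, 5} at i = 4 with period 5, the mean is 3 and the variance is (4 + 1 + 0 + 1 + 4) / 5 = 2, so the volatility is √2.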

8. Threading Synchronization

        }));
    }
    
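    // Wait for all threads to complete
    for (auto& future : futures) {
        future.get();  // Block until the task completes; rethrows any exception it threw
    }
    
    return features;
}
  • future.get(): Waits for each task and propagates any exception thrown inside it (future.wait() only waits, leaving such exceptions unobserved)
  • No locks are needed: tasks read the shared data and write disjoint rows of features
  • Waiting on every future before returning guarantees that all writes to features are visible to the caller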
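The choice between `get()` and `wait()` matters when a worker fails. The sketch below is a hypothetical demo, `runChunks`. It assumes a worker signals a bad chunk by throwing, and shows that `future.get()` rethrows the stored exception in the waiting thread. It keeps the first error but still joins every task.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical demo: launches numChunks tasks, one of which throws.
// Returns the first error message seen by the caller, or "" if all succeeded.
std::string runChunks(int numChunks, int failingChunk) {
    std::vector<std::future<void>> futures;
    for (int t = 0; t < numChunks; ++t) {
        futures.push_back(std::async(std::launch::async, [t, failingChunk] {
            if (t == failingChunk)
                throw std::runtime_error("bad row in chunk " + std::to_string(t));
        }));
    }
    std::string error;
    for (auto& f : futures) {
        try {
            f.get();                                 // blocks, then rethrows if the task threw
        } catch (const std::exception& e) {
            if (error.empty()) error = e.what();     // keep the first failure, keep joining
        }
    }
    return error;
}
```

Replacing `f.get()` with `f.wait()` here would make `runChunks(4, 2)` return an empty string, because the exception would never be retrieved.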

9. Optimized Moving Average Calculation

std::vector<double> calculateMovingAverage(const std::vector<double>& prices, int period) {
    std::vector<double> ma;
    if (period <= 0 || prices.size() < static_cast<size_t>(period)) {
        return ma;  // Early return for a non-positive period or insufficient data
    }
    
    ma.reserve(prices.size() - period + 1);  // Pre-allocate memory
    
    // Calculate initial sum for the first window
    double sum = 0.0;
    for (int i = 0; i < period; ++i) {
        sum += prices[i];
    }
    ma.push_back(sum / period);
    
    // Use sliding window technique for efficiency
    for (size_t i = period; i < prices.size(); ++i) {
        sum = sum - prices[i - period] + prices[i];  // Subtract old, add new
        ma.push_back(sum / period);
    }
    
    return ma;
}
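  • Sliding window technique: O(n) total work instead of O(n*p), where p is the period
  • reserve(): Pre-allocates memory to avoid repeated allocations
  • Efficient sum update: subtracts the value leaving the window and adds the value entering it
  • Much faster than recalculating the entire sum at each step, especially for long periods; the running sum can, however, accumulate small floating-point rounding errors over very long series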
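A simple way to confirm that the sliding update matches a direct computation is to compare it with an O(n*p) reference. The sketch below is illustrative. `movingAverageNaive`, `movingAverageSliding` and `sameWithin` are hypothetical names, and the period is taken as `std::size_t` rather than `int` to sidestep signed/unsigned comparisons. The comparison uses a tolerance because the running sum and a fresh sum can round differently.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// O(n * p) reference: recomputes every window sum from scratch.
std::vector<double> movingAverageNaive(const std::vector<double>& prices, std::size_t p) {
    std::vector<double> ma;
    if (p == 0 || prices.size() < p) return ma;
    for (std::size_t end = p; end <= prices.size(); ++end) {
        double sum = 0.0;
        for (std::size_t j = end - p; j < end; ++j) sum += prices[j];
        ma.push_back(sum / static_cast<double>(p));
    }
    return ma;
}

// O(n) sliding window, same idea as calculateMovingAverage.
std::vector<double> movingAverageSliding(const std::vector<double>& prices, std::size_t p) {
    std::vector<double> ma;
    if (p == 0 || prices.size() < p) return ma;
    ma.reserve(prices.size() - p + 1);
    double sum = 0.0;
    for (std::size_t i = 0; i < p; ++i) sum += prices[i];
    ma.push_back(sum / static_cast<double>(p));
    for (std::size_t i = p; i < prices.size(); ++i) {
        sum += prices[i] - prices[i - p];          // add entering value, drop leaving value
        ma.push_back(sum / static_cast<double>(p));
    }
    return ma;
}

// True when both series have the same length and agree within tol.
bool sameWithin(const std::vector<double>& a, const std::vector<double>& b, double tol) {
    if (a.size() != b.size()) return false;
    for (std::size_t i = 0; i < a.size(); ++i)
        if (std::abs(a[i] - b[i]) > tol) return false;
    return true;
}
```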

10. Parallel Moving Average Calculation

std::unordered_map<std::string, std::vector<double>> calculateMultipleMovingAveragesParallel(
    const std::vector<double>& prices, 
    const std::vector<int>& periods) {
    
    std::unordered_map<std::string, std::vector<double>> results;
    
    #ifdef _OPENMP
    #pragma omp parallel for
    #endif
    for (int idx = 0; idx < static_cast<int>(periods.size()); ++idx) {
        int period = periods[idx];
        std::string key = "MA_" + std::to_string(period);
        std::vector<double> ma_values = calculateMovingAverage(prices, period);
        
        #ifdef _OPENMP
        #pragma omp critical
        #endif
        {
            results[key] = std::move(ma_values);  // Move to avoid copying
        }
    }
    
    return results;
}
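  • Uses OpenMP pragmas for parallelization when OpenMP is enabled at compile time (e.g. -fopenmp on GCC/Clang, which defines _OPENMP); otherwise the loop runs serially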
  • #pragma omp parallel for: Parallelizes the loop
  • #pragma omp critical: Ensures thread-safe access to shared map
  • std::move(): Transfers ownership without copying data
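The critical section serializes only the map insertions, but it can be avoided altogether. One alternative, sketched here under the assumption that the number of periods is small, gives each loop iteration its own slot in a pre-sized vector and builds the map afterwards on one thread. `movingAverage` and `multipleMovingAverages` are hypothetical names, and the code compiles and runs serially when OpenMP is not enabled.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Sliding-window moving average, same logic as calculateMovingAverage.
std::vector<double> movingAverage(const std::vector<double>& prices, int period) {
    std::vector<double> ma;
    if (period <= 0 || prices.size() < static_cast<std::size_t>(period)) return ma;
    const std::size_t p = static_cast<std::size_t>(period);
    ma.reserve(prices.size() - p + 1);
    double sum = 0.0;
    for (std::size_t i = 0; i < p; ++i) sum += prices[i];
    ma.push_back(sum / period);
    for (std::size_t i = p; i < prices.size(); ++i) {
        sum += prices[i] - prices[i - p];
        ma.push_back(sum / period);
    }
    return ma;
}

// Lock-free variant: iteration idx writes only slots[idx]; the map is filled serially.
std::unordered_map<std::string, std::vector<double>> multipleMovingAverages(
    const std::vector<double>& prices, const std::vector<int>& periods) {
    std::vector<std::vector<double>> slots(periods.size());
    #ifdef _OPENMP
    #pragma omp parallel for
    #endif
    for (int idx = 0; idx < static_cast<int>(periods.size()); ++idx) {
        slots[idx] = movingAverage(prices, periods[idx]);  // distinct element per iteration
    }
    std::unordered_map<std::string, std::vector<double>> results;
    for (std::size_t idx = 0; idx < periods.size(); ++idx) {
        results["MA_" + std::to_string(periods[idx])] = std::move(slots[idx]);
    }
    return results;
}
```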

Key C++ Concepts Demonstrated

1. Memory Management

  • RAII (Resource Acquisition Is Initialization): std::ifstream closes its file and std::vector frees its memory automatically when they go out of scope
  • std::vector for dynamic arrays with automatic memory management
  • reserve() to pre-allocate memory and avoid reallocations
  • References to avoid unnecessary copying

2. Performance Optimizations

  • Sliding window technique for O(n) moving average calculation
  • Pre-allocation of memory with reserve()
  • Efficient algorithms from <numeric> and <algorithm>
  • Move semantics to avoid expensive copies

3. Threading and Concurrency

  • std::async() for asynchronous task execution
  • std::future for thread synchronization
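  • Disjoint row ranges per task, so no two threads ever write the same element
  • Read-only sharing of the input data, plus a critical section around the shared std::unordered_map in the OpenMP version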

4. Modern C++ Features

  • Range-based for loops for cleaner code
  • auto for type inference
  • Lambda expressions for inline functions
  • const correctness for safety

Why This Approach Was Taken

1. Performance Optimization Strategy

  • Sliding window technique reduces moving-average cost from O(n*p) to O(n), where p is the period
  • Threading allows parallel processing across CPU cores
  • Memory pre-allocation minimizes allocation overhead
  • Efficient algorithms from standard library

2. Memory Layout Considerations

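  • std::vector for cache-friendly contiguous storage; note that the vector<vector<double>> feature matrix is contiguous only within each row and costs one heap allocation per row
  • References to avoid unnecessary copying
  • Pre-allocation (sizing vectors up front, or reserve()) to avoid repeated reallocation and copying as vectors grow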
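A common way to make the whole feature matrix contiguous is a single row-major buffer. The sketch below is a hypothetical `FeatureMatrix` class and not the implementation's type. It stores rows × cols doubles in one `std::vector`, so row i occupies indices [i * cols, (i + 1) * cols). With 3.9M rows this means one allocation instead of millions. Parallel tasks can still write their own rows without locks, because no element is shared between rows.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical flat replacement for std::vector<std::vector<double>>.
class FeatureMatrix {
public:
    FeatureMatrix(std::size_t rows, std::size_t cols)
        : rows_(rows), cols_(cols), data_(rows * cols, 0.0) {}

    double& at(std::size_t row, std::size_t col) {
        assert(row < rows_ && col < cols_);      // bounds check in debug builds
        return data_[row * cols_ + col];
    }
    double at(std::size_t row, std::size_t col) const {
        assert(row < rows_ && col < cols_);
        return data_[row * cols_ + col];
    }
    double* rowPtr(std::size_t row) { return data_.data() + row * cols_; }  // start of a row
    std::size_t rows() const { return rows_; }
    std::size_t cols() const { return cols_; }

private:
    std::size_t rows_, cols_;
    std::vector<double> data_;                   // one contiguous allocation
};
```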

3. Error Handling

  • Bounds checking prevents array access violations
  • Early returns for invalid inputs
  • Proper resource cleanup through RAII

4. Portability

  • Standard library functions ensure cross-platform compatibility
  • Compiler directives for optional OpenMP support
  • Standard C++ features that work across different compilers

Compiler Optimizations

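When compiled with flags like -O3 -march=native -flto (where -march=native targets the build machine's CPU, so the resulting binary may not run on older processors), the compiler can: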

  • Inlines small functions to reduce call overhead
  • Vectorizes loops when possible
  • Optimizes memory access patterns
  • Eliminates dead code and redundant calculations
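Whether a given flag set actually helps is best checked by timing. The sketch below is a hypothetical `timeMs` helper built on `std::chrono::steady_clock`, which is monotonic and so unaffected by system clock adjustments. It could wrap a stage such as `readCSV` or the feature calculation in builds compiled with different flags. A single run is noisy, so repeating the measurement and comparing medians is advisable.

```cpp
#include <cassert>
#include <chrono>
#include <utility>

// Hypothetical helper: runs fn once and returns elapsed wall-clock time in milliseconds.
template <typename Fn>
double timeMs(Fn&& fn) {
    const auto start = std::chrono::steady_clock::now();
    std::forward<Fn>(fn)();
    const auto stop = std::chrono::steady_clock::now();
    return std::chrono::duration<double, std::milli>(stop - start).count();
}
```

Usage: `double ms = timeMs([&] { features = calculateQuantitativeFeaturesParallel(data); });`.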

This implementation demonstrates C++'s strengths in systems programming: fine-grained control over performance, efficient memory usage, and the ability to write both high-level abstractions and low-level optimizations.