-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmoving_average_python_quant.py
More file actions
295 lines (246 loc) · 13.5 KB
/
moving_average_python_quant.py
File metadata and controls
295 lines (246 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import numpy as np
import pandas as pd
from numba import jit
import time
import talib # Technical analysis library
@jit(nopython=True)
def calculate_basic_features(opens, highs, lows, closes, volumes):
    """Build an (n_rows, 50) matrix of per-bar quantitative features.

    Column layout (window-based columns stay 0 until enough history exists):
      0-3   raw close/open/high/low
      4-7   bar return, range/open, stochastic-%K-like position, volume
      8-16  SMA / volatility / window percentile over ~5/10/20 bars
      17-19 1/3/5-bar momentum returns
      20-22 close/high/low position inside the recent 10-bar range
      23-24 10-bar return volatility and mean return
      25-28 candle geometry (range, shadows, body), all normalized by open
      29    1-bar log return
      30-31 rolling skewness / excess kurtosis of ~20 returns
      32-33 volume ratio and absolute volume
      34    RSI-style oscillator (simple averages)
      35    Bollinger z-score of close
      36-38 direction flags (bullish bar, new high, new low)
      39-41 open gap size and gap up/down flags
      42-45 log price, body difference, normalized shadows
      46    return acceleration
      47    least-squares trend slope over ~10 bars
      48-49 volatility / mean-absolute-change proxies

    Parameters
    ----------
    opens, highs, lows, closes : 1-D float arrays of OHLC prices.
    volumes : 1-D float array; indices beyond its length are treated as 0.

    Notes
    -----
    Loop-based with no Python containers so the body stays valid under
    Numba's nopython mode (the module applies ``@jit`` to this function).
    NOTE(review): the "k-period" windows use slices like ``closes[i-k:i+1]``,
    which contain k+1 bars — preserved as-is so outputs do not change.
    """
    n = len(closes)
    features = np.zeros((n, 50))  # 50 basic features per row
    for i in range(n):
        # Raw price levels
        features[i, 0] = closes[i]  # Close price
        features[i, 1] = opens[i]   # Open price
        features[i, 2] = highs[i]   # High price
        features[i, 3] = lows[i]    # Low price
        # Basic ratios and differences (guarded against zero denominators)
        features[i, 4] = (closes[i] - opens[i]) / opens[i] if opens[i] != 0 else 0  # Return
        features[i, 5] = (highs[i] - lows[i]) / opens[i] if opens[i] != 0 else 0  # True range
        features[i, 6] = (closes[i] - lows[i]) / (highs[i] - lows[i]) if (highs[i] - lows[i]) != 0 else 0.5  # Stochastic
        features[i, 7] = volumes[i] if i < len(volumes) else 0  # Volume
        # Simple moving statistics (if enough history)
        if i >= 5:
            recent_close = closes[max(0, i-5):i+1]
            features[i, 8] = np.mean(recent_close)  # 5-period SMA
            features[i, 9] = np.std(recent_close)   # 5-period volatility
            features[i, 10] = (closes[i] - np.min(recent_close)) / (np.max(recent_close) - np.min(recent_close)) if (np.max(recent_close) - np.min(recent_close)) != 0 else 0  # 5-period percentile
        if i >= 10:
            recent_close_10 = closes[max(0, i-10):i+1]
            features[i, 11] = np.mean(recent_close_10)  # 10-period SMA
            features[i, 12] = np.std(recent_close_10)   # 10-period volatility
            features[i, 13] = (closes[i] - np.min(recent_close_10)) / (np.max(recent_close_10) - np.min(recent_close_10)) if (np.max(recent_close_10) - np.min(recent_close_10)) != 0 else 0  # 10-period percentile
        if i >= 20:
            recent_close_20 = closes[max(0, i-20):i+1]
            features[i, 14] = np.mean(recent_close_20)  # 20-period SMA
            features[i, 15] = np.std(recent_close_20)   # 20-period volatility
            features[i, 16] = (closes[i] - np.min(recent_close_20)) / (np.max(recent_close_20) - np.min(recent_close_20)) if (np.max(recent_close_20) - np.min(recent_close_20)) != 0 else 0  # 20-period percentile
        # Momentum indicators (k-bar simple returns)
        if i >= 1:
            features[i, 17] = (closes[i] - closes[i-1]) / closes[i-1] if closes[i-1] != 0 else 0  # 1-period return
        if i >= 3:
            features[i, 18] = (closes[i] - closes[i-3]) / closes[i-3] if closes[i-3] != 0 else 0  # 3-period return
        if i >= 5:
            features[i, 19] = (closes[i] - closes[i-5]) / closes[i-5] if closes[i-5] != 0 else 0  # 5-period return
        # Price position relative to recent highs/lows
        if i >= 10:
            recent_high_10 = np.max(highs[max(0, i-10):i+1])
            recent_low_10 = np.min(lows[max(0, i-10):i+1])
            features[i, 20] = (closes[i] - recent_low_10) / (recent_high_10 - recent_low_10) if (recent_high_10 - recent_low_10) != 0 else 0.5  # Position in 10-day range
            features[i, 21] = (highs[i] - recent_low_10) / (recent_high_10 - recent_low_10) if (recent_high_10 - recent_low_10) != 0 else 0.5  # High position in 10-day range
            features[i, 22] = (lows[i] - recent_low_10) / (recent_high_10 - recent_low_10) if (recent_high_10 - recent_low_10) != 0 else 0.5  # Low position in 10-day range
        # Volatility measures over the recent 10-bar window
        if i >= 10:
            returns = np.diff(closes[max(0, i-10):i+1])
            features[i, 23] = np.std(returns) if len(returns) > 1 else 0  # 10-period return volatility
            features[i, 24] = np.mean(returns)  # 10-period average return
        # Range-based candle geometry, all normalized by the open
        features[i, 25] = (highs[i] - lows[i]) / opens[i] if opens[i] != 0 else 0  # Daily range
        features[i, 26] = (highs[i] - closes[i]) / opens[i] if opens[i] != 0 else 0  # Upper shadow
        features[i, 27] = (closes[i] - lows[i]) / opens[i] if opens[i] != 0 else 0  # Lower shadow
        features[i, 28] = abs(opens[i] - closes[i]) / opens[i] if opens[i] != 0 else 0  # Body size
        # Log returns (guard against the first bar and zero prices)
        if i >= 1 and closes[i-1] != 0:
            features[i, 29] = np.log(closes[i] / closes[i-1])
        else:
            features[i, 29] = 0
        # Higher moments — manual skew/kurtosis since Numba lacks scipy.stats.
        # The window length is bound to a fresh name (m) rather than reusing
        # the outer row-count n, which the original shadowed.
        if i >= 20:
            recent_returns = np.diff(closes[max(0, i-20):i+1])
            if len(recent_returns) > 2:
                mean_ret = np.mean(recent_returns)
                std_ret = np.std(recent_returns)
                if std_ret != 0:
                    m = len(recent_returns)
                    skew_num = np.sum(((recent_returns - mean_ret) / std_ret) ** 3)
                    features[i, 30] = skew_num * m / ((m - 1) * (m - 2))  # Adjusted skewness
                    kurt = np.mean(((recent_returns - mean_ret) / std_ret) ** 4)
                    features[i, 31] = kurt - 3  # Excess kurtosis
                else:
                    features[i, 30] = 0
                    features[i, 31] = 0
            else:
                features[i, 30] = 0
                features[i, 31] = 0
        # Volume features (if available)
        if i < len(volumes) and i >= 5:
            recent_volumes = volumes[max(0, i-5):i+1]
            features[i, 32] = volumes[i] / np.mean(recent_volumes) if np.mean(recent_volumes) != 0 else 1  # Volume ratio to recent avg
            features[i, 33] = volumes[i]  # Absolute volume
        # RSI-like indicator over the last 14 bar-to-bar changes
        if i >= 14:
            total_gain = 0.0
            total_loss = 0.0
            count = 0
            for j in range(i-13, i+1):
                if j > 0:
                    change = closes[j] - closes[j-1]
                    if change > 0:
                        total_gain += change
                    else:
                        total_loss += abs(change)
                    count += 1
            avg_gain = total_gain / count if count > 0 else 0
            avg_loss = total_loss / count if count > 0 else 0
            if avg_loss != 0:
                features[i, 34] = 100 - (100 / (1 + avg_gain / avg_loss))  # RSI approximation
            else:
                features[i, 34] = 100  # no losses in window -> RSI saturates
        # Bollinger Bands component: z-score of the close in a ~20-bar window
        if i >= 20:
            bb_mean = np.mean(closes[max(0, i-20):i+1])
            bb_std = np.std(closes[max(0, i-20):i+1])
            if bb_std != 0:
                features[i, 35] = (closes[i] - bb_mean) / bb_std  # Bollinger Band position
            else:
                features[i, 35] = 0
        # Directional flags (at i == 0 the self-comparison yields 0)
        features[i, 36] = 1 if closes[i] > opens[i] else 0       # Bullish/Bearish
        features[i, 37] = 1 if highs[i] > highs[max(0, i-1)] else 0  # New high
        features[i, 38] = 1 if lows[i] < lows[max(0, i-1)] else 0    # New low
        # Gap features (open vs previous close)
        if i > 0:
            features[i, 39] = (opens[i] - closes[i-1]) / closes[i-1] if closes[i-1] != 0 else 0  # Gap up/down
            features[i, 40] = 1 if opens[i] > closes[i-1] else 0  # Gap up indicator
            features[i, 41] = 1 if opens[i] < closes[i-1] else 0  # Gap down indicator
        # Price level features
        features[i, 42] = np.log(closes[i]) if closes[i] > 0 else 0  # Log price
        features[i, 43] = closes[i] - opens[i]  # Body difference
        features[i, 44] = (highs[i] - max(opens[i], closes[i])) / opens[i] if opens[i] != 0 else 0  # Upper shadow normalized
        features[i, 45] = (min(opens[i], closes[i]) - lows[i]) / opens[i] if opens[i] != 0 else 0   # Lower shadow normalized
        # Acceleration: change in consecutive 1-bar returns
        if i >= 2:
            prev_return = (closes[i-1] - closes[i-2]) / closes[i-2] if closes[i-2] != 0 else 0
            curr_return = (closes[i] - closes[i-1]) / closes[i-1] if closes[i-1] != 0 else 0
            features[i, 46] = curr_return - prev_return  # Return acceleration
        # Trend strength: manual least-squares slope (Numba lacks scipy.stats).
        # Window length again uses m, not the outer n.
        if i >= 10:
            recent_closes = closes[max(0, i-10):i+1]
            m = len(recent_closes)
            if m > 1:
                x = np.arange(m)
                y = recent_closes
                # slope = (m*sum(xy) - sum(x)*sum(y)) / (m*sum(x^2) - sum(x)^2)
                sum_x = np.sum(x)
                sum_y = np.sum(y)
                sum_xy = np.sum(x * y)
                sum_x2 = np.sum(x * x)
                denominator = m * sum_x2 - sum_x * sum_x
                if denominator != 0:
                    slope = (m * sum_xy - sum_x * sum_y) / denominator
                    features[i, 47] = slope  # Trend direction and strength
                else:
                    features[i, 47] = 0
            else:
                features[i, 47] = 0
        # Support/resistance proxies
        if i >= 10:
            features[i, 48] = np.std(closes[max(0, i-10):i+1])  # Volatility as resistance strength proxy
            features[i, 49] = np.mean(np.abs(np.diff(closes[max(0, i-10):i+1])))  # Mean absolute change
    return features
def calculate_additional_features(opens, highs, lows, closes, volumes):
    """Compute a second (n_rows, 51) block of technical-analysis features.

    Only a few columns are currently populated; the rest are reserved (0):
      0  RSI (14-period, simple-average variant)
      1  MACD line approximation (12- vs 26-bar simple means)
      8  absolute volume, 9 volume ratio to ~5-bar average
      11 ATR approximation (mean true range over the last 14 bars)

    Parameters
    ----------
    opens, highs, lows, closes : 1-D float arrays of OHLC prices.
    volumes : 1-D float array; rows past its length get no volume features.

    Returns
    -------
    np.ndarray of shape (len(closes), 51).

    Notes
    -----
    This function is plain Python/NumPy (it is not jitted and uses a Python
    list internally), unlike calculate_basic_features.
    """
    n = len(closes)
    additional_features = np.zeros((n, 51))  # Additional 51 features
    for i in range(n):
        # --- RSI (14-period) ---
        if i >= 14:
            gains = 0.0
            losses = 0.0
            for j in range(i-13, i+1):  # last 14 bar-to-bar changes
                if j > 0:
                    change = closes[j] - closes[j-1]
                    if change > 0:
                        gains += change
                    else:
                        losses += abs(change)
            avg_gain = gains / 14
            avg_loss = losses / 14
            if avg_loss != 0:
                rs = avg_gain / avg_loss
                additional_features[i, 0] = 100 - (100 / (1 + rs))  # RSI
            else:
                # Fix: no losses over the window means maximal strength, so
                # standard RSI saturates at 100 (was 50 "neutral", which also
                # disagreed with the RSI branch in calculate_basic_features).
                additional_features[i, 0] = 100
        # --- MACD line approximation (12 vs 26 period simple means) ---
        if i >= 12:
            ema_short = np.mean(closes[max(0, i-11):i+1])  # Simplified EMA
            # NOTE(review): for i < 25 the long window is truncated, biasing
            # early MACD values — preserved for output compatibility.
            ema_long = np.mean(closes[max(0, i-25):i+1])  # Simplified EMA
            additional_features[i, 1] = ema_short - ema_long  # MACD line approximation
        # --- ATR approximation (14-bar mean true range) ---
        if i >= 14:
            tr_list = []
            # Fix: start at i-13 so the window holds exactly 14 true ranges
            # (the original max(1, i-14) spanned 15 bars — off by one).
            for j in range(max(1, i-13), i+1):
                h_l = highs[j] - lows[j]
                h_pc = abs(highs[j] - closes[j-1])
                l_pc = abs(lows[j] - closes[j-1])
                tr_list.append(max(h_l, h_pc, l_pc))
            if tr_list:
                additional_features[i, 11] = np.mean(tr_list)  # ATR approximation
        # --- Volume indicators ---
        if i >= 5 and len(volumes) > i:
            additional_features[i, 8] = volumes[i]  # Volume
            mean_vol = np.mean(volumes[max(0, i-5):i+1])  # hoisted (was computed twice)
            additional_features[i, 9] = volumes[i] / mean_vol if mean_vol != 0 else 1  # Volume ratio
        # More features can be added here...
    return additional_features
def main():
    """Load OHLCV data from USDJPY2.csv, build the feature matrix, and
    compute a batch of long-period moving averages, timing the whole run.

    Side effects: reads "USDJPY2.csv" from the working directory and prints
    progress/timing to stdout. Nothing is written to disk.
    """
    start_time = time.time()
    print("Reading CSV file with pandas...")
    df = pd.read_csv("USDJPY2.csv")
    print(f"Loaded {len(df)} records.")
    # Extract price arrays as float64 (the jitted feature code expects floats).
    opens = df['Open'].values.astype(np.float64)
    highs = df['High'].values.astype(np.float64)
    lows = df['Low'].values.astype(np.float64)
    closes = df['Close'].values.astype(np.float64)
    # Fix: accept either 'volume' or 'Volume'. The OHLC columns above are
    # capitalized, so a capitalized volume column is likely; the original
    # only checked lowercase and silently fell back to the dummy constant.
    if 'volume' in df.columns:
        volumes = df['volume'].values.astype(np.float64)
    elif 'Volume' in df.columns:
        volumes = df['Volume'].values.astype(np.float64)
    else:
        volumes = np.ones(len(df)) * 1000  # dummy constant volume fallback
    print("Calculating 100+ quantitative features for each row...")
    basic_features = calculate_basic_features(opens, highs, lows, closes, volumes)
    # Additional TA features are computed in plain Python/NumPy (no TA-Lib).
    ta_features = calculate_additional_features(opens, highs, lows, closes, volumes)
    # Combine all features column-wise
    all_features = np.concatenate([basic_features, ta_features], axis=1)
    print(f"Calculated {all_features.shape[1]} quantitative features for {all_features.shape[0]} rows.")
    # Moving averages for periods 200-220 via an O(n) sliding-window sum.
    # NOTE(review): results are only printed, not stored — presumably a
    # timing/benchmark exercise; confirm before relying on the MA values.
    print("Calculating moving averages for periods 200-220...")
    ma_periods = list(range(200, 221))  # 200 to 220 inclusive
    for period in ma_periods:
        if len(closes) >= period:
            ma_values = np.zeros(len(closes) - period + 1)
            window_sum = np.sum(closes[:period])
            ma_values[0] = window_sum / period
            for i in range(1, len(ma_values)):
                # Slide the window: drop the oldest bar, add the newest.
                window_sum = window_sum - closes[i - 1] + closes[i + period - 1]
                ma_values[i] = window_sum / period
            print(f"Calculated {len(ma_values)} MA_{period} values.")
    end_time = time.time()
    duration = (end_time - start_time) * 1000  # Convert to milliseconds
    print(f"Total execution time: {duration:.2f} ms")
    print(f"Features shape: {all_features.shape}")
if __name__ == "__main__":
    main()