import logging
import os
import sys
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import matplotlib
# Use a non-interactive backend for matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import time
import mplfinance as mpf  # For candlestick charts
from datetime import datetime
from deap import base, creator, tools, algorithms
import random
import warnings
from multiprocessing import cpu_count
from functools import partial
import gc
import xgboost as xgb
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import cross_val_score, train_test_split, StratifiedKFold
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error, r2_score,
    mean_absolute_percentage_error, classification_report, roc_auc_score
)
from ta.volatility import AverageTrueRange, BollingerBands
from ta.momentum import RSIIndicator, StochasticOscillator, WilliamsRIndicator, ROCIndicator
from ta.trend import MACD, EMAIndicator, CCIIndicator, ADXIndicator, IchimokuIndicator
from sklearn.ensemble import StackingRegressor, RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
import joblib
import seaborn as sns
import shap
from statsmodels.tsa.stattools import adfuller
from tabulate import tabulate

warnings.filterwarnings('ignore')

# --------------------- Logging Configuration ---------------------
class ExcludeInfoFilter(logging.Filter):
    def filter(self, record):
        exclude_keywords = ['Classification Report', 'ROC AUC Score']
        return not any(keyword in record.getMessage() for keyword in exclude_keywords)

# Remove existing handlers if any
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# Create logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)  # Capture all logs

# Create FileHandler
file_handler = logging.FileHandler("strategy_optimization.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))

# Create StreamHandler
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
stream_handler.addFilter(ExcludeInfoFilter())

logger.addHandler(file_handler)
logger.addHandler(stream_handler)

# --------------------- Parameters and Global Variables ---------------------
start_date = datetime(2015, 1, 1)
end_date = datetime.today()
split_date = datetime(2021, 1, 1)
backtest_start_date = datetime(2022, 1, 1)
backtest_end_date = datetime.today()
transaction_cost = 0.0002
slippage = 0.0001
risk_free_rate = 0.02 / 252
time_steps = 30
timeframes_to_process = [
    mt5.TIMEFRAME_D1,
]
show_plots = True
my_directory = '1BBa'
plot_base_dir = "plots"
current_date = datetime.today().strftime('%Y-%m-%d')
num_cores_to_use = 6
currency_pairs = ["AUDUSD"]
timeframes_dict = {
    mt5.TIMEFRAME_M1: "M1", mt5.TIMEFRAME_M2: "M2", mt5.TIMEFRAME_M3: "M3",
    mt5.TIMEFRAME_M4: "M4", mt5.TIMEFRAME_M5: "M5", mt5.TIMEFRAME_M6: "M6",
    mt5.TIMEFRAME_M10: "M10", mt5.TIMEFRAME_M12: "M12", mt5.TIMEFRAME_M15: "M15",
    mt5.TIMEFRAME_M20: "M20", mt5.TIMEFRAME_M30: "M30", mt5.TIMEFRAME_H1: "H1",
    mt5.TIMEFRAME_H2: "H2", mt5.TIMEFRAME_H3: "H3", mt5.TIMEFRAME_H4: "H4",
    mt5.TIMEFRAME_H6: "H6", mt5.TIMEFRAME_H8: "H8", mt5.TIMEFRAME_H12: "H12",
    mt5.TIMEFRAME_D1: "D1", mt5.TIMEFRAME_W1: "W1", mt5.TIMEFRAME_MN1: "MN1",
}
ga_base_seed = 41
ga_pop_size = 150
ga_num_generations = 10
ga_cxpb = 0.7
ga_mutpb = 0.2
ga_mutate_indpb = 0.3
ga_mate_indpb = 0.9
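# Note (added commentary): with DEAP's algorithms.varAnd, ga_cxpb is the
# probability that a pair of offspring is crossed over and ga_mutpb the
# probability that an offspring is mutated; ga_mate_indpb and ga_mutate_indpb
# are the per-gene probabilities used inside cxUniform and custom_mutate below.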
xgb_n_estimators = 200
xgb_max_depth = 20
xgb_learning_rate = 0.1
xgb_random_state = ga_base_seed

# Initialize DEAP framework
creator.create('FitnessMax', base.Fitness, weights=(1.0,))
creator.create('Individual', list, fitness=creator.FitnessMax)

sl_features = [
    'RSI', 'MACD', 'ATR', 'BB_High', 'BB_Low', 'EMA_14', 'SMA_14',
    'DayOfWeek', 'WeekOfYear', 'Month', 'ROC_14', 'Momentum_14', 'WilliamsR',
    'Ichimoku_A', 'Ichimoku_B', 'Corr_Close_RSI_14',
    'Return_Lag_1', 'Return_Lag_2', 'Return_Lag_3',
    'RSI_MACD', 'ATR_Close',
    'Close_Stationary', 'High_Stationary', 'Low_Stationary'
]
window_values_ga = np.arange(20, 252, 1)
multiplier_values_ga = np.arange(0.8, 3.0, 0.1)
tp_factor_values_ga = np.arange(1.2, 5.61, 0.05)
sl_factor_values_ga = np.arange(0.1, 1.21, 0.05)
sl_max_depth_values = [3, 5, 7]
sl_learning_rate_values = [0.01, 0.05, 0.1]
sl_n_estimators_values = [100, 200, 300]
window_values_ga_list = window_values_ga.tolist()
multiplier_values_ga_list = multiplier_values_ga.tolist()
tp_factor_values_ga_list = tp_factor_values_ga.tolist()
sl_factor_values_ga_list = sl_factor_values_ga.tolist()
sl_max_depth_list = sl_max_depth_values
sl_learning_rate_list = sl_learning_rate_values
sl_n_estimators_list = sl_n_estimators_values
scaler_X_selected_dict = {}
pip_scale_factor = 100000

# --------------------- Helper Functions ---------------------
def get_conversion_rate(from_cur, to_cur):
    direct_symbol = to_cur + from_cur
    reverse_symbol = from_cur + to_cur
    symbol_info_direct = mt5.symbol_info(direct_symbol)
    if symbol_info_direct is not None:
        mt5.symbol_select(direct_symbol, True)
        tick = mt5.symbol_info_tick(direct_symbol)
        if tick is not None:
            if direct_symbol.startswith(to_cur):
                rate = 1.0 / tick.ask
                return rate
            else:
                return tick.ask
    symbol_info_reverse = mt5.symbol_info(reverse_symbol)
    if symbol_info_reverse is not None:
        mt5.symbol_select(reverse_symbol, True)
        tick = mt5.symbol_info_tick(reverse_symbol)
        if tick is not None:
            if reverse_symbol.startswith(to_cur):
                rate = 1.0 / tick.ask
                return rate
            else:
                return tick.ask
    return None

def apply_transformations(series):
    transformations = {}
    transformations['first_diff'] = series.diff().fillna(0)
    transformations['pct_change'] = series.pct_change().fillna(0)
    transformations['log'] = np.log(series.replace(0, np.nan)).fillna(0)
    return transformations

def replace_inf_nan(series):
    series = series.replace([np.inf, -np.inf], np.nan)
    series = series.fillna(method='ffill').fillna(method='bfill')
    return series

def select_best_transformation(series, transformations):
    adf_results = {}
    for name, transformed_series in transformations.items():
        transformed_series_clean = transformed_series.dropna()
        if len(transformed_series_clean) < 10:
            adf_results[name] = 1.0
            continue
        result = adfuller(transformed_series_clean, autolag='AIC')
        adf_results[name] = result[1]
    stationary_transforms = {k: v for k, v in adf_results.items() if v < 0.05}
    if not stationary_transforms:
        logging.warning("No stationary transformation found. Using first_diff as default.")
        best_transform = 'first_diff'
    else:
        best_transform = min(stationary_transforms, key=stationary_transforms.get)
    best_series = transformations[best_transform]
    return best_transform, best_series
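# --- Illustrative sketch (added; not called by the pipeline) ---
# Quick self-check of the ADF-based selection above on synthetic data: a random
# walk is non-stationary (high ADF p-value), while its first difference and
# percentage change are stationary, so a differencing-type transformation
# should be selected.
def _demo_select_best_transformation():
    rng = np.random.default_rng(0)
    walk = pd.Series(np.cumsum(rng.normal(size=500)) + 1000.0)
    name, series = select_best_transformation(walk, apply_transformations(walk))
    logging.info(f"Demo: selected transformation '{name}' ({len(series)} observations)")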
Using first_diff as default.") best_transform = 'first_diff' else: best_transform = min(stationary_transforms, key=stationary_transforms.get) best_series = transformations[best_transform] return best_transform, best_series def get_mt5_data(symbol, timeframe, start, end): timeframe_name = timeframes_dict.get(timeframe, 'UNKNOWN') file_name = f"{symbol}_{timeframe_name}_{start.date()}_to_{end.date()}.csv" if os.path.exists(file_name): logging.info(f"Loading data for {symbol} from {file_name}...") try: data = pd.read_csv(file_name, index_col='Date', parse_dates=True) if 'Close' not in data.columns and symbol not in data.columns: logging.error(f"Columns 'Close' or '{symbol}' not found in CSV file {file_name}") return pd.DataFrame() return data except Exception as e: logging.error(f"Error reading CSV file {file_name}: {e}") return pd.DataFrame() else: logging.info(f"Loading data for {symbol} from MT5...") try: rates = mt5.copy_rates_range(symbol, timeframe, start, end) if rates is None or len(rates) == 0: logging.error(f"No data for symbol {symbol}.") return pd.DataFrame() data = pd.DataFrame(rates) data['time'] = pd.to_datetime(data['time'], unit='s') data.rename(columns={ "time": "Date", "open": "Open", "high": "High", "low": "Low", "close": "Close", "tick_volume": "Volume", "spread": "Spread", "real_volume": "RealVolume" }, inplace=True) data.set_index('Date', inplace=True) return data except Exception as e: logging.error(f"Error loading data for {symbol}: {e}") return pd.DataFrame() # --------------------- StrategyEvaluator Class --------------------- class StrategyEvaluator: def __init__(self, symbol, df, train, models, scaler_y_high, scaler_y_low, scaler_y_close, prediction_columns=['Ensemble_Predicted_Close_Future']): self.symbol = symbol self.df = df self.train = train self.models = models self.scaler_y_high = scaler_y_high self.scaler_y_low = scaler_y_low self.scaler_y_close = scaler_y_close self.prediction_columns = prediction_columns self.trades_dict = {col: [] for col in prediction_columns} self.symbol_info = mt5.symbol_info(self.symbol) if self.symbol_info is None: logging.error(f"Cannot retrieve symbol info for {self.symbol}. Using default digits=5.") self.point = 0.00001 self.spread = 0 self.volume_min = 0.01 self.volume_max = 500.0 self.volume_step = 0.01 self.swap_long = 0.0 self.swap_short = 0.0 self.trade_tick_value = 0.0 self.trade_tick_value_profit = 0.0 self.trade_tick_value_loss = 0.0 self.contract_size = 100000.0 self.leverage = 30.0 self.deposit_currency = "EUR" self.currency_margin = "EUR" else: self.point = self.symbol_info.point self.spread = self.symbol_info.spread * self.point self.volume_min = self.symbol_info.volume_min self.volume_max = self.symbol_info.volume_max self.volume_step = self.symbol_info.volume_step self.swap_long = self.symbol_info.swap_long self.swap_short = self.symbol_info.swap_short self.trade_tick_value = self.symbol_info.trade_tick_value self.trade_tick_value_profit = self.symbol_info.trade_tick_value_profit self.trade_tick_value_loss = self.symbol_info.trade_tick_value_loss self.contract_size = self.symbol_info.trade_contract_size self.leverage = 30.0 self.deposit_currency = "EUR" self.currency_margin = self.symbol_info.currency_margin global pip_scale_factor if self.symbol_info.digits: digits = self.symbol_info.digits pip_scale_factor = 10 ** digits point_in_string = f"{self.point:f}" point_in_string = point_in_string.rstrip('0') if point_in_string.endswith('.'): point_in_string += '0' logging.info(f"Using {self.symbol} digits = {digits}. 
Point: {point_in_string}") if self.deposit_currency == "EUR" and self.currency_margin != "EUR": conversion_rate = get_conversion_rate(self.currency_margin, self.deposit_currency) if conversion_rate is None: logging.warning( f"Cannot find conversion rate from {self.currency_margin} to {self.deposit_currency}. Using 1.0 as fallback.") self.currency_conversion_rate = 1.0 else: self.currency_conversion_rate = conversion_rate logging.info( f"Conversion rate from {self.currency_margin} to {self.deposit_currency}: {self.currency_conversion_rate}") else: self.currency_conversion_rate = 1.0 def evaluate(self, individual_params, classifier_sl=None, sl_threshold=0.7, backtest_data=None, data_source=None): sortino_ratios = [] try: logging.debug(f"Evaluating individual parameters: {individual_params}") window, multiplier, tp_factor, sl_factor = map(float, individual_params) window = int(window) if window >= len(self.df) or window < 10: logging.warning(f"Invalid window size: {window}. Assigning minimal fitness.") sortino_ratios = [1e-10] * len(self.prediction_columns) return tuple(sortino_ratios) df_to_use = backtest_data if backtest_data is not None else self.df if 'Ask' not in df_to_use.columns or 'Bid' not in df_to_use.columns: logging.error("Missing 'Ask' or 'Bid' column in DataFrame.") return tuple([1e-10] * len(self.prediction_columns)) for prediction_column in self.prediction_columns: rolling_mean = df_to_use[prediction_column].rolling(window).mean().shift(1).values rolling_std = df_to_use[prediction_column].rolling(window).std().shift(1).values upper = rolling_mean + multiplier * rolling_std lower = rolling_mean - multiplier * rolling_std pc_shift2 = df_to_use[prediction_column].shift(2).values pc_shift1 = df_to_use[prediction_column].shift(1).values pc = df_to_use[prediction_column].values condition_buy = ( (pc_shift2 < pc_shift1) & (pc_shift1 < pc) & (pc <= upper) & (lower < np.roll(lower, 1)) ) condition_sell = ( (pc_shift2 > pc_shift1) & (pc_shift1 > pc) & (pc >= lower) & (upper > np.roll(upper, 1)) ) signals = np.zeros(len(df_to_use)) signals[condition_buy] = 1 signals[condition_sell] = -1 positions = np.zeros(len(df_to_use)) open_position = False entry_price = 0.0 take_profit = 0.0 stop_loss = 0.0 entry_index = 0 trade_type = 0 trades_list = [] pnl_history = [] time_in_profit = 0 time_in_loss = 0 for i in range(len(signals)): if i < 2: positions[i] = 0 continue if not open_position: signal = signals[i] if signal != 0: if classifier_sl is not None: if not set(sl_features).issubset(df_to_use.columns): logging.error("SL features missing.") positions[i] = 0 continue entry_features = df_to_use.iloc[i][sl_features].values.reshape(1, -1) if 'Close_SL' not in scaler_X_selected_dict: logging.error("SL scaler not found.") positions[i] = 0 continue entry_features_scaled = scaler_X_selected_dict['Close_SL'].transform(entry_features) sl_proba = classifier_sl.predict_proba(entry_features_scaled)[0, 1] if sl_proba >= sl_threshold: positions[i] = 0 continue else: positions[i] = signal else: positions[i] = signal if signal == 1: entry_price = df_to_use['Ask'].iloc[i] else: entry_price = df_to_use['Bid'].iloc[i] stops_level_points = self.symbol_info.trade_stops_level * self.point if self.symbol_info else self.point * 10 if stops_level_points < self.point * 10: stops_level_points = self.point * 10 take_profit = entry_price + (tp_factor * rolling_std[i] * signal) stop_loss = entry_price - (sl_factor * rolling_std[i] * signal) if (take_profit - entry_price) * signal < stops_level_points * signal: take_profit = 
                            open_position = True
                            entry_index = i
                            trade_type = signal
                            time_in_profit = 0
                            time_in_loss = 0
                            pnl_history = []
                        else:
                            positions[i] = 0
                    else:
                        positions[i] = positions[i - 1]
                        # The high/low of the current bar bound the intrabar
                        # excursion for both trade directions.
                        current_high = df_to_use['High'].iloc[i]
                        current_low = df_to_use['Low'].iloc[i]
                        if trade_type == 1:
                            unrealized_pnl = (current_high - entry_price) / self.point
                        else:
                            unrealized_pnl = (entry_price - current_low) / self.point
                        if unrealized_pnl > 0:
                            pnl_history.append(1)
                            time_in_profit += 1
                        elif unrealized_pnl < 0:
                            pnl_history.append(-1)
                            time_in_loss += 1
                        else:
                            pnl_history.append(0)
                        exit_reason = None
                        exit_price = None
                        exit_index = None
                        if trade_type == 1:
                            if current_high >= take_profit:
                                exit_reason = 'TP'
                                exit_price = take_profit
                                exit_index = i
                            elif current_low <= stop_loss:
                                exit_reason = 'SL'
                                exit_price = stop_loss
                                exit_index = i
                            elif signals[i] == -1:
                                exit_reason = 'Opposite Signal'
                                exit_price = current_low
                                exit_index = i
                            elif i == len(signals) - 1:
                                exit_reason = 'Close at End'
                                exit_price = current_high
                                exit_index = i
                        else:
                            if current_low <= take_profit:
                                exit_reason = 'TP'
                                exit_price = take_profit
                                exit_index = i
                            elif current_high >= stop_loss:
                                exit_reason = 'SL'
                                exit_price = stop_loss
                                exit_index = i
                            elif signals[i] == 1:
                                exit_reason = 'Opposite Signal'
                                exit_price = current_high
                                exit_index = i
                            elif i == len(signals) - 1:
                                exit_reason = 'Close at End'
                                exit_price = current_low
                                exit_index = i
                        if exit_reason:
                            positions[i] = 0
                            open_position = False
                            trade_duration = exit_index - entry_index
                            trade_duration = trade_duration if trade_duration > 0 else 1
                            half_duration = (trade_duration + 1) // 2
                            first_half = pnl_history[:half_duration]
                            second_half = pnl_history[half_duration:]
                            time_in_profit_first_half = first_half.count(1)
                            time_in_loss_first_half = first_half.count(-1)
                            time_in_profit_second_half = second_half.count(1)
                            time_in_loss_second_half = second_half.count(-1)
                            if trade_type == 1:
                                points = (exit_price - entry_price) / self.point
                            else:
                                points = (entry_price - exit_price) / self.point
                            points_eur = points * self.currency_conversion_rate
                            total_time_first_half = len(first_half)
                            total_time_second_half = len(second_half)
                            profit_time_ratio_first_half = (
                                time_in_profit_first_half / total_time_first_half) if total_time_first_half > 0 else 0
                            loss_time_ratio_first_half = (
                                time_in_loss_first_half / total_time_first_half) if total_time_first_half > 0 else 0
                            profit_time_ratio_second_half = (
                                time_in_profit_second_half / total_time_second_half) if total_time_second_half > 0 else 0
                            loss_time_ratio_second_half = (
                                time_in_loss_second_half / total_time_second_half) if total_time_second_half > 0 else 0
                            trades_list.append({
                                'Entry_Index': entry_index,
                                'Exit_Index': exit_index,
                                'Entry_Date': df_to_use.index[entry_index],
                                'Exit_Date': df_to_use.index[exit_index],
                                'Entry_Price': entry_price,
                                'Exit_Price': exit_price,
                                'Trade_Type': trade_type,
                                'Exit_Reason': exit_reason,
                                'Hit_SL': 1 if exit_reason == 'SL' else 0,
                                'Points': points_eur,
                                'Duration': trade_duration,
                                'Time_in_Profit_First_Half': time_in_profit_first_half,
                                'Time_in_Loss_First_Half': time_in_loss_first_half,
                                'Time_in_Profit_Second_Half': time_in_profit_second_half,
                                'Time_in_Loss_Second_Half': time_in_loss_second_half,
                                'Profit_Time_Ratio_First_Half': profit_time_ratio_first_half,
                                'Loss_Time_Ratio_First_Half': loss_time_ratio_first_half,
                                'Profit_Time_Ratio_Second_Half': profit_time_ratio_second_half,
                                'Loss_Time_Ratio_Second_Half': loss_time_ratio_second_half,
                                'Profit_Time_Ratio': (time_in_profit / trade_duration) if trade_duration > 0 else 0,
                                'Loss_Time_Ratio': (time_in_loss / trade_duration) if trade_duration > 0 else 0,
                                'Time_in_Profit': time_in_profit,
                                'Time_in_Loss': time_in_loss
                            })
                market_returns = df_to_use['Close'].pct_change().fillna(0).values
                strategy_returns = np.roll(positions, 1) * market_returns
                trades = np.abs(np.diff(positions, prepend=0))
                net_returns = (strategy_returns * self.currency_conversion_rate
                               - trades * transaction_cost - trades * slippage)
                # Change made here: Include data_source in column name
                df_to_use[f'Strategy_Return_Net_{prediction_column}_({data_source})'] = net_returns
                self.trades_dict[prediction_column] = trades_list
                net_returns_series = pd.Series(net_returns, index=df_to_use.index)
                target_return = 0
                downside_returns = net_returns_series[net_returns_series < target_return]
                downside_std = downside_returns.std()
                sortino_ratio = (
                    (net_returns_series.mean() - risk_free_rate) / downside_std
                    if downside_std != 0 else 0
                )
                if sortino_ratio <= 0:
                    sortino_ratio = 1e-10
                sortino_ratios.append(sortino_ratio)
            if len(sortino_ratios) != len(self.prediction_columns):
                logging.error("Sortino ratios length mismatch. Assigning minimal fitness.")
                sortino_ratios = [1e-10] * len(self.prediction_columns)
            return tuple(sortino_ratios)
        except KeyError as ke:
            logging.error(f"KeyError in evaluate function: {ke}")
            return tuple([1e-10] * len(self.prediction_columns))
        except Exception as e:
            logging.error(f"Error in evaluate function: {e}")
            return tuple([1e-10] * len(self.prediction_columns))
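# --- Illustrative sketch (added; simplified, per-period, no annualization) ---
# The GA fitness above is a Sortino ratio over net strategy returns. This
# stand-alone helper mirrors that computation for a hypothetical return series
# and applies the same 1e-10 minimal-fitness floor used in evaluate().
def _sortino_ratio_sketch(net_returns, target_return=0.0):
    net_returns = pd.Series(net_returns)
    downside_std = net_returns[net_returns < target_return].std()
    if downside_std == 0 or np.isnan(downside_std):
        return 1e-10
    ratio = (net_returns.mean() - risk_free_rate) / downside_std
    return ratio if ratio > 0 else 1e-10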
# --------------------- Plotting Functions ---------------------
if show_plots:
    def plot_error_distribution(test, symbol, timeframe_name, prediction_method, plot_dir, data_source):
        try:
            # Adjusted column name to include data_source
            error_col = f'Strategy_Return_Net_{prediction_method}_({data_source})'
            if error_col not in test.columns:
                logging.error(f"'{error_col}' column is missing in the test DataFrame.")
                return
            errors = test[error_col]
            errors_in_pips = errors * pip_scale_factor
            plt.figure(figsize=(10, 6))
            plt.hist(errors_in_pips, bins=50, alpha=0.7, color='blue', edgecolor='black')
            plt.title(
                f'Distribution of Prediction Errors for {symbol} ({timeframe_name})\n'
                f'Prediction Method: {prediction_method}_({data_source})')
            plt.xlabel('Error (Actual - Predicted) in Pips')
            plt.ylabel('Frequency')
            plt.grid(True)
            plt.savefig(os.path.join(
                plot_dir,
                f'error_distribution_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
            plt.close()
        except Exception as e:
            logging.error(f"Error plotting error distribution: {e}")

    def plot_residuals(test, symbol, timeframe_name, prediction_method, plot_dir, data_source):
        try:
            # Adjusted column name to include data_source
            error_col = f'Strategy_Return_Net_{prediction_method}_({data_source})'
            if error_col not in test.columns:
                logging.error(f"'{error_col}' column is missing in the test DataFrame.")
                return
            residuals = test[error_col]
            residuals_in_pips = residuals * pip_scale_factor
            plt.figure(figsize=(14, 7))
            plt.scatter(test.index, residuals_in_pips, alpha=0.5)
            plt.title(f'Residuals Over Time for {symbol} ({timeframe_name}) - {prediction_method}_({data_source})')
            plt.xlabel('Date')
            plt.ylabel('Residual (Actual - Predicted) in Pips')
            plt.grid(True)
            plt.savefig(os.path.join(
                plot_dir,
                f'residuals_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
            plt.close()
        except Exception as e:
            logging.error(f"Error plotting residuals: {e}")

def backtest_strategy(test, symbol, timeframe_name, prediction_method, label_suffix, plot_dir, data_source):
    try:
        net_return_column = f'Strategy_Return_Net_{prediction_method}_({data_source})'
        if net_return_column not in test.columns:
            logging.error(f"'{net_return_column}' column is missing in the test DataFrame.")
            return
        cumulative_returns = (1 + test[net_return_column].fillna(0)).cumprod() - 1
        plt.figure(figsize=(14, 7))
        plt.plot(test.index, cumulative_returns, label=f'Cumulative Return ({label_suffix})', color='purple')
        plt.title(f'Cumulative Return for {symbol} ({timeframe_name}) - {prediction_method}_({data_source})')
        plt.xlabel('Date')
        plt.ylabel('Cumulative Return')
        plt.legend()
        plt.grid(True)
        plt.savefig(os.path.join(
            plot_dir,
            f'cumulative_returns_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()
    except Exception as e:
        logging.error(f"Error in backtesting: {e}")
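# --- Illustrative sketch (added; not called by the pipeline) ---
# backtest_strategy() compounds per-period net returns, i.e.
#   cumulative_t = (1 + r_1)(1 + r_2)...(1 + r_t) - 1
def _demo_cumulative_return():
    r = pd.Series([0.01, -0.005, 0.02])
    compounded = (1 + r).cumprod() - 1
    assert abs(compounded.iloc[-1] - (1.01 * 0.995 * 1.02 - 1)) < 1e-12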
EXIT_REASON_PALETTE = {
    'TP': 'green',
    'SL': 'red',
    'Opposite Signal (Profit)': 'lime',
    'Opposite Signal (Loss)': 'darkred',
    'Close at End': 'blue',
}
PROFIT_LOSS_PALETTE = {
    'Profit': 'green',
    'Loss': 'red'
}

def plot_trade_metrics(trades_list, symbol, timeframe_name, prediction_method, plot_dir, data_source):
    try:
        df_trades = pd.DataFrame(trades_list)
        if df_trades.empty:
            logging.warning(f"No trades to plot for {symbol} ({timeframe_name}) - "
                            f"{prediction_method}_({data_source}).")
            return
        df_trades['Exit_Reason_Detailed'] = df_trades.apply(
            lambda row: 'Opposite Signal (Profit)'
            if row['Exit_Reason'] == 'Opposite Signal' and row['Points'] > 0
            else ('Opposite Signal (Loss)'
                  if row['Exit_Reason'] == 'Opposite Signal' and row['Points'] <= 0
                  else row['Exit_Reason']),
            axis=1
        )
        plt.figure(figsize=(12, 6))
        sns.barplot(x='Exit_Reason_Detailed', y='Points', data=df_trades, palette=EXIT_REASON_PALETTE)
        plt.title(f'Position Lengths in Pips for {symbol} ({timeframe_name}) - {prediction_method}_({data_source})')
        plt.xlabel('Exit Reason')
        plt.ylabel('Points')
        plt.tight_layout()
        plt.savefig(os.path.join(
            plot_dir,
            f'position_lengths_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()

        # Number of Winning and Losing Trades
        num_winning_trades = len(df_trades[df_trades['Points'] > 0])
        num_losing_trades = len(df_trades[df_trades['Points'] <= 0])
        total_pips_winning = df_trades[df_trades['Points'] > 0]['Points'].sum()
        total_pips_losing = df_trades[df_trades['Points'] <= 0]['Points'].sum()
        fig, ax1 = plt.subplots(figsize=(10, 6))
        bar_width = 0.35
        indices = np.arange(2)
        bars1 = ax1.bar(indices - bar_width / 2, [num_winning_trades, num_losing_trades], bar_width,
                        label='Number of Trades', color=['green', 'red'], alpha=0.7, edgecolor='black')
        ax1.set_xlabel('Trade Outcome', fontsize=14)
        ax1.set_ylabel('Number of Trades', color='black', fontsize=14)
        ax1.set_title(
            f'Number of Trades and Total Pips for Winning and Losing Trades\n'
            f'{symbol} ({timeframe_name}) - {prediction_method}_({data_source})',
            fontsize=16)
        ax1.set_xticks(indices)
        ax1.set_xticklabels(['Winning Trades', 'Losing Trades'])
        ax1.tick_params(axis='y', labelcolor='black')
        ax2 = ax1.twinx()
        bars2 = ax2.bar(indices + bar_width / 2, [total_pips_winning, total_pips_losing], bar_width,
                        label='Total Pips', color=['darkgreen', 'darkred'], alpha=0.7, edgecolor='black')
        ax2.set_ylabel('Total Pips', color='black', fontsize=14)
        ax2.tick_params(axis='y', labelcolor='black')
        lines1, labels1 = ax1.get_legend_handles_labels()
        lines2, labels2 = ax2.get_legend_handles_labels()
        ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper right')
        for bar in bars1:
            height = bar.get_height()
            ax1.annotate(f'{height}',
                         xy=(bar.get_x() + bar.get_width() / 2, height),
                         xytext=(0, 3), textcoords="offset points",
                         ha='center', va='bottom', fontsize=10, fontweight='bold')
        for bar in bars2:
            height = bar.get_height()
            ax2.annotate(f'{height:.1f}',
                         xy=(bar.get_x() + bar.get_width() / 2, height),
                         xytext=(0, 3), textcoords="offset points",
                         ha='center', va='bottom', fontsize=10, fontweight='bold')
        plt.grid(True, axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(os.path.join(
            plot_dir,
            f'winning_losing_trades_and_pips_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()

        # Profit and Loss Distribution
        df_trades['Profit_Loss'] = df_trades['Points'].apply(lambda x: 'Profit' if x > 0 else 'Loss')
        plt.figure(figsize=(12, 6))
        sns.histplot(
            data=df_trades, x='Points', hue='Profit_Loss', multiple='stack',
            bins=50, palette=PROFIT_LOSS_PALETTE, alpha=0.7
        )
        plt.title(f'Profit and Loss Distribution (in Points)\n'
                  f'{symbol} ({timeframe_name}) - {prediction_method}_({data_source})', fontsize=16)
        plt.xlabel('Profit/Loss (Points)', fontsize=14)
        plt.ylabel('Number of Trades', fontsize=14)
        plt.legend(title='Trade Outcome')
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(os.path.join(
            plot_dir,
            f'profit_loss_distribution_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()

        # Cumulative Profit Over Time
        df_trades_sorted = df_trades.sort_values('Exit_Date')
        df_trades_sorted['Cumulative_Points'] = df_trades_sorted['Points'].cumsum()
        plt.figure(figsize=(14, 7))
        plt.plot(df_trades_sorted['Exit_Date'], df_trades_sorted['Cumulative_Points'],
                 label='Cumulative Profit (Points)', color='purple', linewidth=2)
        plt.title(f'Cumulative Profit Over Time\n'
                  f'{symbol} ({timeframe_name}) - {prediction_method}_({data_source})', fontsize=16)
        plt.xlabel('Date', fontsize=14)
        plt.ylabel('Cumulative Profit (Points)', fontsize=14)
        plt.legend()
        plt.grid(True)
        plt.savefig(os.path.join(
            plot_dir,
            f'cumulative_profit_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()

        # Average Time in Profit and Loss for Profitable Trades
        profitable_trades = df_trades[df_trades['Points'] > 0]
        if not profitable_trades.empty:
            avg_time_in_profit = profitable_trades['Time_in_Profit'].mean()
            avg_time_in_loss = profitable_trades['Time_in_Loss'].mean()
            labels = ['Time in Profit', 'Time in Loss']
            times = [avg_time_in_profit, avg_time_in_loss]
            plt.figure(figsize=(8, 6))
            plt.bar(labels, times, color=['green', 'red'], alpha=0.7, edgecolor='black')
            plt.title(
                f'Average Time in Profit and Loss for Profitable Trades\n'
                f'{symbol} ({timeframe_name}) - {prediction_method}_({data_source})',
                fontsize=16)
            plt.ylabel('Time (Number of Periods)', fontsize=14)
            plt.grid(True, axis='y', linestyle='--', alpha=0.7)
            plt.tight_layout()
            plt.savefig(os.path.join(
                plot_dir,
                f'avg_time_profit_loss_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
            plt.close()

        # Average Time in Profit and Loss for Losing Trades
        losing_trades = df_trades[df_trades['Points'] <= 0]
        if not losing_trades.empty:
            avg_time_in_profit_loss = losing_trades['Time_in_Profit'].mean()
            avg_time_in_loss_loss = losing_trades['Time_in_Loss'].mean()
            labels = ['Time in Profit', 'Time in Loss']
            times = [avg_time_in_profit_loss, avg_time_in_loss_loss]
            plt.figure(figsize=(8, 6))
            plt.bar(labels, times, color=['green', 'red'], alpha=0.7, edgecolor='black')
            plt.title(
                f'Average Time in Profit and Loss for Losing Trades\n'
                f'{symbol} ({timeframe_name}) - {prediction_method}_({data_source})',
                fontsize=16)
            plt.ylabel('Time (Number of Periods)', fontsize=14)
            plt.grid(True, axis='y', linestyle='--', alpha=0.7)
            plt.tight_layout()
            plt.savefig(os.path.join(
                plot_dir,
                f'avg_time_profit_loss_losing_trades_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
            plt.close()

        # Trade Duration Distribution
        plt.figure(figsize=(12, 6))
        sns.histplot(df_trades['Duration'], bins=50, kde=True, color='orange', alpha=0.7)
        plt.title(f'Trade Duration Distribution\n'
                  f'{symbol} ({timeframe_name}) - {prediction_method}_({data_source})', fontsize=16)
        plt.xlabel('Duration (Number of Periods)', fontsize=14)
        plt.ylabel('Number of Trades', fontsize=14)
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(os.path.join(
            plot_dir,
            f'trade_duration_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()

        # Boxplot for Time in Profit and Loss - First and Second Half
        plt.figure(figsize=(14, 7))
        sns.boxplot(data=df_trades[['Time_in_Profit_First_Half', 'Time_in_Loss_First_Half',
                                    'Time_in_Profit_Second_Half', 'Time_in_Loss_Second_Half']],
                    palette='Set3')
        plt.title(
            f'Time in Profit and Loss - First and Second Half\n'
            f'{symbol} ({timeframe_name}) - {prediction_method}_({data_source})',
            fontsize=16)
        plt.xlabel('Trade Phase')
        plt.ylabel('Time (Number of Periods)')
        plt.tight_layout()
        plt.savefig(os.path.join(
            plot_dir,
            f'time_profit_loss_half_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()

        # Average Profit and Loss Ratios by Phase
        avg_profit_ratio_first_half = df_trades['Profit_Time_Ratio_First_Half'].mean()
        avg_loss_ratio_first_half = df_trades['Loss_Time_Ratio_First_Half'].mean()
        avg_profit_ratio_second_half = df_trades['Profit_Time_Ratio_Second_Half'].mean()
        avg_loss_ratio_second_half = df_trades['Loss_Time_Ratio_Second_Half'].mean()
        ratios = pd.DataFrame({
            'Phase': ['First Half', 'Second Half'],
            'Profit Ratio': [avg_profit_ratio_first_half, avg_profit_ratio_second_half],
            'Loss Ratio': [avg_loss_ratio_first_half, avg_loss_ratio_second_half]
        })
        ratios_melted = ratios.melt(id_vars='Phase', value_vars=['Profit Ratio', 'Loss Ratio'],
                                    var_name='Type', value_name='Ratio')
        plt.figure(figsize=(10, 6))
        sns.barplot(x='Phase', y='Ratio', hue='Type', data=ratios_melted, palette=['green', 'red'])
        plt.title(f'Average Profit and Loss Ratios by Phase\n'
                  f'{symbol} ({timeframe_name}) - {prediction_method}_({data_source})', fontsize=16)
        plt.xlabel('Phase of Trade')
        plt.ylabel('Average Ratio')
        plt.ylim(0, 1)
        plt.legend(title='Type')
        plt.tight_layout()
        plt.savefig(os.path.join(
            plot_dir,
            f'avg_profit_loss_ratio_half_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()
    except Exception as e:
        logging.error(f"Error plotting trade metrics: {e}")

def plot_trade_execution(test, trades_list, symbol, timeframe_name, prediction_method, plot_dir, data_source):
    try:
        data_ohlc = test[['Open', 'High', 'Low', 'Close']].copy()
        df_trades = pd.DataFrame(trades_list)
        if df_trades.empty:
            logging.warning(f"No trades to plot for {symbol} ({timeframe_name}) - "
                            f"{prediction_method}_({data_source}).")
            return
        df_trades['Exit_Reason_Detailed'] = df_trades.apply(
            lambda row: 'Opposite Signal (Profit)'
            if row['Exit_Reason'] == 'Opposite Signal' and row['Points'] > 0
            else ('Opposite Signal (Loss)'
                  if row['Exit_Reason'] == 'Opposite Signal' and row['Points'] <= 0
                  else row['Exit_Reason']),
            axis=1
        )
        plt.figure(figsize=(14, 7))
        plt.plot(data_ohlc.index, data_ohlc['Close'], label='Close Price', color='black', linewidth=0.5)
        exit_reason_styles = {
            'TP': {'color': EXIT_REASON_PALETTE['TP'], 'marker': 'o'},
            'SL': {'color': EXIT_REASON_PALETTE['SL'], 'marker': 'x'},
            'Close at End': {'color': EXIT_REASON_PALETTE['Close at End'], 'marker': 's'},
            'Opposite Signal (Profit)': {'color': EXIT_REASON_PALETTE['Opposite Signal (Profit)'], 'marker': '^'},
            'Opposite Signal (Loss)': {'color': EXIT_REASON_PALETTE['Opposite Signal (Loss)'], 'marker': 'v'},
        }
        for reason, style in exit_reason_styles.items():
            reason_trades = df_trades[df_trades['Exit_Reason_Detailed'] == reason]
            if not reason_trades.empty:
                plt.scatter(reason_trades['Exit_Date'], reason_trades['Exit_Price'],
                            color=style['color'], marker=style['marker'], label=f'Exit by {reason}')
        plt.title(f'Trade Exits on Price Chart\n{symbol} ({timeframe_name}) - {prediction_method}_({data_source})')
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend(title='Exit Reason')
        plt.grid(True)
        plt.savefig(os.path.join(
            plot_dir,
            f'trade_exits_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()
    except Exception as e:
        logging.error(f"Error plotting trade execution: {e}")

def plot_predicted_vs_actual(test, symbol, timeframe_name, prediction_method, plot_dir, data_source):
    try:
        plt.figure(figsize=(14, 7))
        plt.plot(test.index, test['Close'], label='Actual Close Price', color='blue')
        plt.plot(test.index, test[prediction_method], label='Predicted Close Price', color='red', linewidth=0.5)
        plt.title(
            f'Actual vs Predicted Close Price for {symbol} ({timeframe_name})\n'
            f'Prediction Method: {prediction_method}_({data_source})')
        plt.xlabel('Date')
        plt.ylabel('Price')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(os.path.join(
            plot_dir,
            f'actual_vs_predicted_close_{symbol}_{timeframe_name}_{prediction_method}_({data_source}).png'))
        plt.close()
    except Exception as e:
        logging.error(f"Error plotting Actual vs Predicted Close Price: {e}")

if show_plots:
    def plot_sl_features_shap(sl_model, X_sl_scaled, feature_names, symbol, timeframe_name, plot_dir, data_source):
        try:
            explainer_sl = shap.TreeExplainer(sl_model)
            shap_values_sl = explainer_sl.shap_values(X_sl_scaled)
            plt.figure(figsize=(12, 8))
            shap.summary_plot(shap_values_sl, X_sl_scaled, feature_names=feature_names, show=False)
            plt.title(f'SHAP Summary Plot for SL Features\n{symbol} ({timeframe_name}) - {data_source}',
                      fontsize=16)
            plt.tight_layout()
            plt.savefig(os.path.join(
                plot_dir,
                f'shap_sl_features_importance_{symbol}_{timeframe_name}_({data_source}).png'))
            plt.close()
            return shap_values_sl
        except Exception as e:
            logging.error(f"Error plotting SHAP values for SL features: {e}")
            return None

# --------------------- Genetic Algorithm Functions ---------------------
def random_window_ga():
    return random.choice(window_values_ga_list)

def random_multiplier_ga():
    return random.choice(multiplier_values_ga_list)

def random_tp_factor_ga():
    return random.choice(tp_factor_values_ga_list)

def random_sl_factor_ga():
    return random.choice(sl_factor_values_ga_list)

def random_sl_max_depth_ga():
    return random.choice(sl_max_depth_list)

def random_sl_learning_rate_ga():
    return random.choice(sl_learning_rate_list)

def random_sl_n_estimators_ga():
    return random.choice(sl_n_estimators_list)

def custom_mutate(individual, indpb):
    mutation_mask = np.random.rand(len(individual)) < indpb
    for i, mutate in enumerate(mutation_mask):
        if mutate:
            if i == 0:
                new_window = random.choice(window_values_ga_list)
                if 10 <= new_window < 252:
                    individual[i] = new_window
            elif i == 1:
                individual[i] = random.choice(multiplier_values_ga_list)
            elif i == 2:
                individual[i] = random.choice(tp_factor_values_ga_list)
            elif i == 3:
                individual[i] = random.choice(sl_factor_values_ga_list)
            elif i == 4:
                individual[i] = random.choice(sl_max_depth_list)
            elif i == 5:
                individual[i] = random.choice(sl_learning_rate_list)
            elif i == 6:
                individual[i] = random.choice(sl_n_estimators_list)
    return (individual,)
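# --- Illustrative sketch (added; not called by the pipeline) ---
# custom_mutate() resamples each gene independently with probability indpb.
# Gene layout: [window, multiplier, tp_factor, sl_factor, sl_max_depth,
# sl_learning_rate, sl_n_estimators]; with indpb=1.0 every gene is redrawn
# from its admissible grid.
def _demo_custom_mutate():
    random.seed(0)
    np.random.seed(0)
    individual = [60, 1.5, 2.0, 0.5, 5, 0.05, 200]
    (mutated,) = custom_mutate(list(individual), indpb=1.0)
    logging.info(f"Demo mutate: {individual} -> {mutated}")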
def global_evaluate(individual, evaluator_instance, sl_features, ga_base_seed,
                    backtest_data=None, data_source=None):
    try:
        if len(individual) != 7:
            logging.error(f"Expected 7 parameters, got {len(individual)}: {individual}")
            return tuple([1e-10] * len(evaluator_instance.prediction_columns))
        window, multiplier, tp_factor, sl_factor, sl_max_depth, sl_learning_rate, sl_n_estimators = map(
            float, individual)
        window = int(window)
        if window <= 0 or multiplier <= 0:
            logging.warning("Invalid parameter values. Returning minimal fitness.")
            return tuple([1e-10] * len(evaluator_instance.prediction_columns))
        sortino_ratios_no_sl = evaluator_instance.evaluate(
            [window, multiplier, tp_factor, sl_factor],
            classifier_sl=None,
            sl_threshold=0.7,
            backtest_data=backtest_data,
            data_source=data_source  # Ensure data_source is passed
        )
        trade_entries = []
        for trade in evaluator_instance.trades_dict.get('Ensemble_Predicted_Close_Future', []):
            entry_date = trade.get('Entry_Date')
            hit_sl = trade.get('Hit_SL')
            if entry_date is None or hit_sl is None:
                continue
            try:
                entry_features = evaluator_instance.df.loc[entry_date, sl_features].to_dict()
                trade_entries.append({'Hit_SL': hit_sl, **entry_features})
            except KeyError:
                continue
        df_trade_entries = pd.DataFrame(trade_entries)
        if not df_trade_entries.empty:
            df_trade_entries.fillna(method='ffill', inplace=True)
            df_trade_entries.fillna(method='bfill', inplace=True)
            X_sl_new = df_trade_entries[sl_features].values
            y_sl_new = df_trade_entries['Hit_SL'].values
            scaler_sl = MinMaxScaler()
            X_sl_scaled = scaler_sl.fit_transform(X_sl_new)
            try:
                X_train_sl, X_val_sl, y_train_sl, y_val_sl = train_test_split(
                    X_sl_scaled, y_sl_new, test_size=0.2, random_state=ga_base_seed, stratify=y_sl_new
                )
            except ValueError:
                X_train_sl, X_val_sl, y_train_sl, y_val_sl = train_test_split(
                    X_sl_scaled, y_sl_new, test_size=0.2, random_state=ga_base_seed
                )
            classifier_sl = xgb.XGBClassifier(
                objective='binary:logistic',
                n_estimators=int(sl_n_estimators),
                max_depth=int(sl_max_depth),
                learning_rate=sl_learning_rate,
                random_state=ga_base_seed,
                use_label_encoder=False,
                eval_metric='logloss',
                n_jobs=-1
            )
            classifier_sl.fit(X_train_sl, y_train_sl)
            joblib.dump(classifier_sl, 'sl_prediction_model.joblib')
            joblib.dump(scaler_sl, 'sl_scaler.joblib')
            scaler_X_selected_dict['Close_SL'] = scaler_sl
            sortino_ratios = evaluator_instance.evaluate(
                [window, multiplier, tp_factor, sl_factor],
                classifier_sl=classifier_sl,
                sl_threshold=0.7,
                backtest_data=backtest_data,
                data_source=data_source  # Ensure data_source is passed
            )
            return tuple(sortino_ratios)
        logging.warning("No trades generated for this individual. Returning minimal fitness.")
        return tuple([1e-10] * len(evaluator_instance.prediction_columns))
    except Exception as e:
        logging.exception("Unexpected error in global_evaluate")
        return tuple([1e-10] * len(evaluator_instance.prediction_columns))
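# Note (added commentary): global_evaluate() is a two-pass fitness. Pass 1 runs
# the strategy without the SL filter to harvest trade entries; pass 2 trains an
# XGBoost classifier on those entries and re-runs the strategy, skipping any
# entry whose predicted probability of hitting the stop-loss is >= sl_threshold.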
Returning minimal fitness.") return tuple([1e-10] * len(evaluator_instance.prediction_columns)) except Exception as e: logging.exception("Unexpected error in global_evaluate") return tuple([1e-10] * len(evaluator_instance.prediction_columns)) def run_ga(toolbox, ga_num_generations, ga_pop_size, label, patience=10, backtest_data=None, data_source=None): try: random.seed(ga_base_seed) np.random.seed(ga_base_seed) pop = toolbox.population(n=ga_pop_size) logging.info(f"Initial Population Size: {len(pop)}") logging.info(f"Starting genetic algorithm optimization for {label}...") stats = tools.Statistics(lambda ind: ind.fitness.values) stats.register('max', np.max) stats.register('avg', np.mean) hof = tools.HallOfFame(1) logbook = tools.Logbook() best_fitness = -np.inf patience_counter = patience gen = 0 while gen < ga_num_generations: offspring = algorithms.varAnd(pop, toolbox, cxpb=ga_cxpb, mutpb=ga_mutpb) #logging.info(f"Generation {gen}: Offspring Size: {len(offspring)}") fits = [toolbox.evaluate(ind, backtest_data=backtest_data, data_source=data_source) for ind in offspring] for fit, ind in zip(fits, offspring): ind.fitness.values = fit if not offspring: logging.error(f"Generation {gen}: Offspring population is empty.") break pop = toolbox.select(offspring, k=len(pop)) hof.update(pop) #logging.info(f"Generation {gen}: Population Size after selection: {len(pop)}") record = stats.compile(pop) logbook.record(gen=gen, **record) logging.info(f"{label} - Generation {gen}: Max {record['max']}, Avg {record['avg']}") current_max = pow(record['max'], 0.6) * pow(record['avg'], 0.4) if current_max > best_fitness: best_fitness = current_max patience_counter = patience logging.info(f"Generation {gen}: Improvement found. Resetting patience.") else: patience_counter -= 1 logging.info(f"Generation {gen}: No improvement. 
Patience remaining: {patience_counter}") if gen == ga_num_generations - 1 and patience_counter >= patience * 0.5: old_ga_num_generations = ga_num_generations ga_num_generations += patience patience_counter = patience - (ga_num_generations / 10) logging.info( f"Maximum generations reached and no improvement for GA, increasing generations to {ga_num_generations} (from {old_ga_num_generations})") # No continue needed here, the while loop condition will handle it if patience_counter <= 0: logging.info(f"Early stopping triggered after {gen + 1} generations for {label}.") break gen += 1 # Important: Increment gen at the end of the loop logging.info(f"Final Population Size: {len(pop)}") return hof[0] if len(hof) > 0 else None except Exception as e: logging.error(f"Error during GA optimization: {e}") return None def create_toolbox(evaluator_instance, sl_features, ga_base_seed): def evaluate_with_backtest(ind, backtest_data=None, data_source=None): return global_evaluate(ind, evaluator_instance=evaluator_instance, sl_features=sl_features, ga_base_seed=ga_base_seed, backtest_data=backtest_data, data_source=data_source) toolbox = base.Toolbox() toolbox.register('window', random_window_ga) toolbox.register('multiplier', random_multiplier_ga) toolbox.register('tp_factor', random_tp_factor_ga) toolbox.register('sl_factor', random_sl_factor_ga) toolbox.register('sl_max_depth', random_sl_max_depth_ga) toolbox.register('sl_learning_rate', random_sl_learning_rate_ga) toolbox.register('sl_n_estimators', random_sl_n_estimators_ga) toolbox.register('individual', tools.initCycle, creator.Individual, (toolbox.window, toolbox.multiplier, toolbox.tp_factor, toolbox.sl_factor, toolbox.sl_max_depth, toolbox.sl_learning_rate, toolbox.sl_n_estimators), n=1) toolbox.register('population', tools.initRepeat, list, toolbox.individual) toolbox.register('mate', tools.cxUniform, indpb=ga_mate_indpb) toolbox.register('mutate', custom_mutate, indpb=ga_mutate_indpb) toolbox.register('select', tools.selTournament, tournsize=3) toolbox.register('evaluate', evaluate_with_backtest) return toolbox # --------------------- Main Function --------------------- def main_function(): print("Starting main function...") if not mt5.initialize(): logging.error("Failed to initialize MT5.") print("Failed to initialize MT5.") return print("MT5 initialized successfully.") total_cores = cpu_count() cpu_cores = min(num_cores_to_use, total_cores) logging.info(f"Using {cpu_cores} cores for multiprocessing.") os.makedirs(plot_base_dir, exist_ok=True) for timeframe in timeframes_to_process: timeframe_name = timeframes_dict.get(timeframe, 'UNKNOWN') logging.info(f"\nProcessing timeframe: {timeframe_name}") data_dict = {} for selected_pair in currency_pairs: logging.info(f"\nLoading and processing data for pair: {selected_pair}") data = get_mt5_data(selected_pair, timeframe, start_date, end_date) if data.empty: logging.warning(f"Data for {selected_pair} in timeframe {timeframe_name} is empty. Skipping.") continue data_dict[selected_pair] = data symbol = my_directory if my_directory else selected_pair plot_dir_symbol_date = os.path.join(plot_base_dir, symbol, current_date) os.makedirs(plot_dir_symbol_date, exist_ok=True) top_pairs = [(selected_pair,)] for pair in top_pairs: symbol = pair[0] logging.info(f"\nOptimizing strategy for pair: {symbol}") df = data_dict[symbol].copy() # Calculate 'Ask' and 'Bid' symbol_info = mt5.symbol_info(symbol) if symbol_info is None: logging.error(f"Cannot retrieve symbol info for {symbol}. 
Using default digits=5.") point = 0.00001 spread = 0 df['Bid'] = df['Close'] df['Ask'] = df['Close'] + spread else: point = symbol_info.point spread = symbol_info.spread * point df['Bid'] = df['Close'] df['Ask'] = df['Close'] + (symbol_info.spread * point) # Corrected from 'df['Spread']' # Feature Engineering lag_features = ['Close', 'High', 'Low', 'Open'] lags = range(1, 6) for feature in lag_features: for lag in lags: df[f"{feature}_Lag_{lag}"] = df[feature].shift(lag) df['High_Future'] = df['High'].shift(-1) df['Low_Future'] = df['Low'].shift(-1) df['Close_Future'] = df['Close'].shift(-1) df.dropna(inplace=True) for price_col in ['Close', 'High', 'Low']: transformations = apply_transformations(df[price_col]) best_transform_name, best_transform_series = select_best_transformation(df[price_col], transformations) best_transform_series = replace_inf_nan(best_transform_series) df[f"{price_col}_Stationary"] = best_transform_series df['Range'] = df['High'] - df['Low'] df['Close_Open'] = df['Close'] - df['Open'] df['High_Close_Prev'] = df['High'] - df['Close'].shift(1) df['Low_Close_Prev'] = df['Low'] - df['Close'].shift(1) atr = AverageTrueRange(high=df['High'], low=df['Low'], close=df['Close'], window=14) df['ATR'] = atr.average_true_range() rsi = RSIIndicator(close=df['Close'], window=14) df['RSI'] = rsi.rsi() bb = BollingerBands(close=df['Close'], window=20, window_dev=2) df['BB_High'] = bb.bollinger_hband() df['BB_Low'] = bb.bollinger_lband() df['BB_Mid'] = bb.bollinger_mavg() df['BB_Width'] = bb.bollinger_wband() macd = MACD(close=df['Close'], window_slow=26, window_fast=12, window_sign=9) df['MACD'] = macd.macd() df['MACD_Signal'] = macd.macd_signal() df['MACD_Diff'] = macd.macd_diff() stochastic = StochasticOscillator(high=df['High'], low=df['Low'], close=df['Close'], window=14, smooth_window=3) df['Stochastic'] = stochastic.stoch() df['Stochastic_Signal'] = stochastic.stoch_signal() cci = CCIIndicator(high=df['High'], low=df['Low'], close=df['Close'], window=20) df['CCI'] = cci.cci() adx = ADXIndicator(high=df['High'], low=df['Low'], close=df['Close'], window=14) df['ADX'] = adx.adx() df['ADX_Pos'] = adx.adx_pos() df['ADX_Neg'] = adx.adx_neg() ema = EMAIndicator(close=df['Close'], window=14) df['EMA_14'] = ema.ema_indicator() df['SMA_14'] = df['Close'].rolling(window=14).mean() df['DayOfWeek'] = df.index.dayofweek df['WeekOfYear'] = df.index.isocalendar().week.astype(int) df['Month'] = df.index.month roc = ROCIndicator(close=df['Close'], window=14) df['ROC_14'] = roc.roc() df['Momentum_14'] = df['Close'] - df['Close'].shift(14) williams_r = WilliamsRIndicator(high=df['High'], low=df['Low'], close=df['Close'], lbp=14) df['WilliamsR'] = williams_r.williams_r() ichimoku = IchimokuIndicator(high=df['High'], low=df['Low'], window1=9, window2=26, window3=52) df['Ichimoku_A'] = ichimoku.ichimoku_a() df['Ichimoku_B'] = ichimoku.ichimoku_b() df['Corr_Close_RSI_14'] = df['Close'].rolling(window=14).corr(df['RSI']) df['Return_Lag_1'] = df['Close'].pct_change(1).shift(1) df['Return_Lag_2'] = df['Close'].pct_change(2).shift(1) df['Return_Lag_3'] = df['Close'].pct_change(3).shift(1) df['RSI_MACD'] = df['RSI'] * df['MACD'] df['ATR_Close'] = df['ATR'] / df['Close'] df.dropna(inplace=True) # Split data train = df[:split_date].copy() test = df[split_date:backtest_start_date] # Removed .copy() here backtest = df[backtest_start_date:backtest_end_date] # Removed .copy() here df.fillna(method='ffill', inplace=True) df.fillna(method='bfill', inplace=True) close_mean = df['Close'].mean() close_std = 
                for price_col in ['Close', 'High', 'Low']:
                    transformations = apply_transformations(df[price_col])
                    best_transform_name, best_transform_series = select_best_transformation(
                        df[price_col], transformations)
                    best_transform_series = replace_inf_nan(best_transform_series)
                    df[f"{price_col}_Stationary"] = best_transform_series
                df['Range'] = df['High'] - df['Low']
                df['Close_Open'] = df['Close'] - df['Open']
                df['High_Close_Prev'] = df['High'] - df['Close'].shift(1)
                df['Low_Close_Prev'] = df['Low'] - df['Close'].shift(1)
                atr = AverageTrueRange(high=df['High'], low=df['Low'], close=df['Close'], window=14)
                df['ATR'] = atr.average_true_range()
                rsi = RSIIndicator(close=df['Close'], window=14)
                df['RSI'] = rsi.rsi()
                bb = BollingerBands(close=df['Close'], window=20, window_dev=2)
                df['BB_High'] = bb.bollinger_hband()
                df['BB_Low'] = bb.bollinger_lband()
                df['BB_Mid'] = bb.bollinger_mavg()
                df['BB_Width'] = bb.bollinger_wband()
                macd = MACD(close=df['Close'], window_slow=26, window_fast=12, window_sign=9)
                df['MACD'] = macd.macd()
                df['MACD_Signal'] = macd.macd_signal()
                df['MACD_Diff'] = macd.macd_diff()
                stochastic = StochasticOscillator(high=df['High'], low=df['Low'], close=df['Close'],
                                                  window=14, smooth_window=3)
                df['Stochastic'] = stochastic.stoch()
                df['Stochastic_Signal'] = stochastic.stoch_signal()
                cci = CCIIndicator(high=df['High'], low=df['Low'], close=df['Close'], window=20)
                df['CCI'] = cci.cci()
                adx = ADXIndicator(high=df['High'], low=df['Low'], close=df['Close'], window=14)
                df['ADX'] = adx.adx()
                df['ADX_Pos'] = adx.adx_pos()
                df['ADX_Neg'] = adx.adx_neg()
                ema = EMAIndicator(close=df['Close'], window=14)
                df['EMA_14'] = ema.ema_indicator()
                df['SMA_14'] = df['Close'].rolling(window=14).mean()
                df['DayOfWeek'] = df.index.dayofweek
                df['WeekOfYear'] = df.index.isocalendar().week.astype(int)
                df['Month'] = df.index.month
                roc = ROCIndicator(close=df['Close'], window=14)
                df['ROC_14'] = roc.roc()
                df['Momentum_14'] = df['Close'] - df['Close'].shift(14)
                williams_r = WilliamsRIndicator(high=df['High'], low=df['Low'], close=df['Close'], lbp=14)
                df['WilliamsR'] = williams_r.williams_r()
                ichimoku = IchimokuIndicator(high=df['High'], low=df['Low'], window1=9, window2=26, window3=52)
                df['Ichimoku_A'] = ichimoku.ichimoku_a()
                df['Ichimoku_B'] = ichimoku.ichimoku_b()
                df['Corr_Close_RSI_14'] = df['Close'].rolling(window=14).corr(df['RSI'])
                df['Return_Lag_1'] = df['Close'].pct_change(1).shift(1)
                df['Return_Lag_2'] = df['Close'].pct_change(2).shift(1)
                df['Return_Lag_3'] = df['Close'].pct_change(3).shift(1)
                df['RSI_MACD'] = df['RSI'] * df['MACD']
                df['ATR_Close'] = df['ATR'] / df['Close']
                df.dropna(inplace=True)
                # Split data
                train = df[:split_date].copy()
                test = df[split_date:backtest_start_date]  # Removed .copy() here
                backtest = df[backtest_start_date:backtest_end_date]  # Removed .copy() here
                df.fillna(method='ffill', inplace=True)
                df.fillna(method='bfill', inplace=True)
                close_mean = df['Close'].mean()
                close_std = df['Close'].std()
                df = df[np.abs(df['Close'] - close_mean) < 3 * close_std]
                feature_columns = [
                    'Open', 'High', 'Low', 'Close',
                    'Close_Lag_1', 'Close_Lag_2', 'Close_Lag_3', 'Close_Lag_4', 'Close_Lag_5',
                    'High_Lag_1', 'High_Lag_2', 'High_Lag_3', 'High_Lag_4', 'High_Lag_5',
                    'Low_Lag_1', 'Low_Lag_2', 'Low_Lag_3', 'Low_Lag_4', 'Low_Lag_5',
                    'Open_Lag_1', 'Open_Lag_2', 'Open_Lag_3', 'Open_Lag_4', 'Open_Lag_5',
                    'Range', 'Close_Open', 'High_Close_Prev', 'Low_Close_Prev',
                    'ATR', 'RSI', 'BB_High', 'BB_Low', 'BB_Mid', 'BB_Width',
                    'MACD', 'MACD_Signal', 'MACD_Diff', 'Stochastic', 'Stochastic_Signal',
                    'CCI', 'ADX', 'ADX_Pos', 'ADX_Neg', 'EMA_14', 'SMA_14',
                    'DayOfWeek', 'WeekOfYear', 'Month', 'ROC_14', 'Momentum_14', 'WilliamsR',
                    'Ichimoku_A', 'Ichimoku_B', 'Corr_Close_RSI_14',
                    'Return_Lag_1', 'Return_Lag_2', 'Return_Lag_3',
                    'RSI_MACD', 'ATR_Close',
                    'Close_Stationary', 'High_Stationary', 'Low_Stationary'
                ]
                # Scaling
                scaler_X = MinMaxScaler()
                scaler_y_high = MinMaxScaler()
                scaler_y_low = MinMaxScaler()
                scaler_y_close = MinMaxScaler()
                features = train[feature_columns].values
                target_high = train[['High_Future']].values
                target_low = train[['Low_Future']].values
                target_close = train[['Close_Future']].values
                X_train = scaler_X.fit_transform(features)
                y_train_high = scaler_y_high.fit_transform(target_high)
                y_train_low = scaler_y_low.fit_transform(target_low)
                y_train_close = scaler_y_close.fit_transform(target_close)
                test_features = test[feature_columns].values
                X_test = scaler_X.transform(test_features)
                y_test = test[['High_Future', 'Low_Future', 'Close_Future']].values
                backtest_features = backtest[feature_columns].values
                X_backtest = scaler_X.transform(backtest_features)
                y_backtest = backtest[['High_Future', 'Low_Future', 'Close_Future']].values
                # Initial Model Training for feature importance
                models_initial = {}
                shap_values_dict = {}
                for col, scaler_y in zip(['High', 'Low', 'Close'],
                                         [scaler_y_high, scaler_y_low, scaler_y_close]):
                    model = xgb.XGBRegressor(
                        objective='reg:squarederror',
                        n_estimators=xgb_n_estimators,
                        max_depth=xgb_max_depth,
                        learning_rate=xgb_learning_rate,
                        random_state=xgb_random_state,
                        n_jobs=-1
                    )
                    model.fit(X_train, scaler_y.transform(train[[f"{col}_Future"]]).ravel())
                    models_initial[col] = model
                    explainer = shap.TreeExplainer(model)
                    shap_values = explainer.shap_values(X_train)
                    shap_values_dict[col] = shap_values
                # Feature Selection based on SHAP
                selected_features_dict = {}
                excluded_features_dict = {}
                for col in ['High', 'Low', 'Close']:
                    shap_values = shap_values_dict[col]
                    shap_importance = np.abs(shap_values).mean(axis=0)
                    shap_importance_df = pd.DataFrame({
                        'Feature': feature_columns,
                        'SHAP Importance': shap_importance
                    })
                    shap_importance_df.sort_values(by='SHAP Importance', ascending=False, inplace=True)
                    N = 30
                    selected_features = shap_importance_df['Feature'].head(N).tolist()
                    excluded_features = shap_importance_df['Feature'].tail(len(feature_columns) - N).tolist()
                    selected_features_dict[col] = selected_features
                    excluded_features_dict[col] = excluded_features
                    if show_plots:
                        plt.figure(figsize=(12, 8))
                        sns.barplot(x='SHAP Importance', y='Feature', data=shap_importance_df.head(20))
                        plt.title(f'SHAP Feature Importance for {symbol} ({timeframe_name}) - {col}')
                        plt.xlabel('Mean Absolute SHAP Value')
                        plt.ylabel('Feature')
                        plt.tight_layout()
                        plt.savefig(os.path.join(
                            plot_dir_symbol_date,
                            f'shap_feature_importance_{symbol}_{timeframe_name}_{col}.png'))
                        plt.close()
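                # Note (added commentary): features are ranked by mean |SHAP|
                # value per target and only the top N = 30 survive; the selected
                # sets feed the per-target scalers and models below, so train,
                # test and backtest all pass through one scaler per target.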
                X_train_selected_dict = {}
                X_test_selected_dict = {}
                X_backtest_selected_dict = {}
                scaler_X_selected_dict_local = {}
                for col in ['High', 'Low', 'Close']:
                    selected_features = selected_features_dict[col]
                    features_selected = train[selected_features].values
                    test_features_selected = test[selected_features].values
                    backtest_features_selected = backtest[selected_features].values
                    scaler_X_selected = MinMaxScaler()
                    X_train_selected = scaler_X_selected.fit_transform(features_selected)
                    X_test_selected = scaler_X_selected.transform(test_features_selected)
                    X_backtest_selected = scaler_X_selected.transform(backtest_features_selected)
                    X_train_selected_dict[col] = X_train_selected
                    X_test_selected_dict[col] = X_test_selected
                    X_backtest_selected_dict[col] = X_backtest_selected
                    scaler_X_selected_dict_local[col] = scaler_X_selected
                scaler_X_selected_dict.update(scaler_X_selected_dict_local)
                models = {}
                for idx, col in enumerate(['High', 'Low', 'Close']):
                    scaler_y = [scaler_y_high, scaler_y_low, scaler_y_close][idx]
                    X_train_selected = X_train_selected_dict[col]
                    model = xgb.XGBRegressor(
                        objective='reg:squarederror',
                        n_estimators=xgb_n_estimators,
                        max_depth=xgb_max_depth,
                        learning_rate=xgb_learning_rate,
                        random_state=xgb_random_state,
                        n_jobs=-1
                    )
                    model.fit(X_train_selected, scaler_y.transform(train[[f"{col}_Future"]]).ravel())
                    models[col] = model
                # Generate predictions only for Test period
                predictions_test = {}
                for idx, col in enumerate(['High', 'Low', 'Close']):
                    scaler_y = [scaler_y_high, scaler_y_low, scaler_y_close][idx]
                    X_test_selected = X_test_selected_dict[col]
                    preds_scaled = models[col].predict(X_test_selected)
                    preds_test = scaler_y.inverse_transform(preds_scaled.reshape(-1, 1)).flatten()
                    predictions_test[col] = preds_test
                # Generate predictions only for Backtest period
                predictions_backtest = {}
                for idx, col in enumerate(['High', 'Low', 'Close']):
                    scaler_y = [scaler_y_high, scaler_y_low, scaler_y_close][idx]
                    X_backtest_selected = X_backtest_selected_dict[col]
                    preds_scaled = models[col].predict(X_backtest_selected)
                    preds_backtest = scaler_y.inverse_transform(preds_scaled.reshape(-1, 1)).flatten()
                    predictions_backtest[col] = preds_backtest
                test['Predicted_High_Future'] = predictions_test['High']
                test['Predicted_Low_Future'] = predictions_test['Low']
                test['Predicted_Close_Future'] = predictions_test['Close']
                backtest['Predicted_High_Future'] = predictions_backtest['High']
                backtest['Predicted_Low_Future'] = predictions_backtest['Low']
                backtest['Predicted_Close_Future'] = predictions_backtest['Close']
                if show_plots and False:
                    plot_predicted_vs_actual(test, symbol, timeframe_name, 'Predicted_Close_Future',
                                             plot_dir_symbol_date, data_source='test')
                    plot_predicted_vs_actual(backtest, symbol, timeframe_name, 'Predicted_Close_Future',
                                             plot_dir_symbol_date, data_source='backtest')
                X_train_selected_close = X_train_selected_dict['Close']
                scores = cross_val_score(models['Close'], X_train_selected_close, y_train_close.ravel(),
                                         cv=50, scoring='neg_mean_squared_error', n_jobs=-1)
                logging.info(f"Cross-validated MSE for Close model: {-scores.mean():.6f}")
                estimators_close = [
                    ('xgb', xgb.XGBRegressor(
                        objective='reg:squarederror',
                        n_estimators=xgb_n_estimators,
                        max_depth=xgb_max_depth,
                        learning_rate=xgb_learning_rate,
                        random_state=xgb_random_state,
                        n_jobs=-1
                    )),
                    ('rf', RandomForestRegressor(n_estimators=100, random_state=xgb_random_state, n_jobs=-1)),
                    ('gb', GradientBoostingRegressor(n_estimators=100, random_state=xgb_random_state)),
                    ('lr', LinearRegression())
                ]
                stacking_regressor_close = StackingRegressor(
                    estimators=estimators_close,
                    final_estimator=LinearRegression(),
                    n_jobs=-1
                )
                stacking_regressor_close.fit(X_train_selected_dict['Close'], y_train_close.ravel())
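                # Note (added commentary): the stacked ensemble feeds the same
                # selected Close features to XGBoost, random forest, gradient
                # boosting and OLS base learners; scikit-learn then fits the
                # linear meta-learner on their cross-validated predictions.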
                predictions_scaled_close = stacking_regressor_close.predict(X_test_selected_dict['Close'])
                predictions_close = scaler_y_close.inverse_transform(
                    predictions_scaled_close.reshape(-1, 1)).ravel()
                predictions_scaled_close_backtest = stacking_regressor_close.predict(
                    X_backtest_selected_dict['Close'])
                predictions_close_backtest = scaler_y_close.inverse_transform(
                    predictions_scaled_close_backtest.reshape(-1, 1)).ravel()
                test['Ensemble_Predicted_Close_Future'] = predictions_close
                backtest['Ensemble_Predicted_Close_Future'] = predictions_close_backtest
                if show_plots and False:
                    plot_error_distribution(test, symbol, timeframe_name, 'Ensemble_Predicted_Close_Future',
                                            plot_dir_symbol_date, data_source='test')
                    plot_residuals(test, symbol, timeframe_name, 'Ensemble_Predicted_Close_Future',
                                   plot_dir_symbol_date, data_source='test')
                    plot_error_distribution(backtest, symbol, timeframe_name, 'Ensemble_Predicted_Close_Future',
                                            plot_dir_symbol_date, data_source='backtest')
                    plot_residuals(backtest, symbol, timeframe_name, 'Ensemble_Predicted_Close_Future',
                                   plot_dir_symbol_date, data_source='backtest')
                # Change made here: Pass test and backtest directly without .copy()
                evaluator_ensemble_close_test = StrategyEvaluator(
                    symbol=symbol,
                    df=test,  # Changed from test.copy() to test
                    train=train.copy(),
                    models=models,
                    scaler_y_high=scaler_y_high,
                    scaler_y_low=scaler_y_low,
                    scaler_y_close=scaler_y_close,
                    prediction_columns=['Ensemble_Predicted_Close_Future']
                )
                # Define default parameters before using
                window_default = 20
                multiplier_default = 1.0
                tp_factor_default = 2.0
                sl_factor_default = 0.5
                # Evaluate Strategy on Test Data
                evaluator_ensemble_close_test.evaluate(
                    [window_default, multiplier_default, tp_factor_default, sl_factor_default],
                    classifier_sl=None,
                    sl_threshold=0.7,
                    backtest_data=None,
                    data_source='test'  # Ensure data_source is passed
                )
                evaluator_ensemble_close = StrategyEvaluator(
                    symbol=symbol,
                    df=backtest,  # Changed from backtest.copy() to backtest
                    train=train.copy(),
                    models=models,
                    scaler_y_high=scaler_y_high,
                    scaler_y_low=scaler_y_low,
                    scaler_y_close=scaler_y_close,
                    prediction_columns=['Ensemble_Predicted_Close_Future']
                )
                # Evaluate Strategy on Backtest Data
                evaluator_ensemble_close.evaluate(
                    [window_default, multiplier_default, tp_factor_default, sl_factor_default],
                    classifier_sl=None,
                    sl_threshold=0.7,
                    backtest_data=backtest,
                    data_source='backtest'  # Ensure data_source is passed
                )
                df_trades = pd.DataFrame(evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future'])
                if df_trades.empty:
                    # Try fallback parameters
                    fallback_window = 10
                    fallback_multiplier = 0.5
                    fallback_tp_factor = 1.0
                    fallback_sl_factor = 0.2
                    evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future'].clear()
                    evaluator_ensemble_close.evaluate(
                        [fallback_window, fallback_multiplier, fallback_tp_factor, fallback_sl_factor],
                        classifier_sl=None,
                        sl_threshold=0.7,
                        backtest_data=backtest,
                        data_source='backtest'  # Ensure data_source is passed
                    )
                    df_trades = pd.DataFrame(
                        evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future'])
                    if df_trades.empty:
                        logging.warning("No trades even with fallback parameters.")
                        continue
                if df_trades.empty:
                    logging.warning("No trades to train SL model.")
                    continue
                trade_entries = []
                for trade in evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future']:
                    entry_date = trade['Entry_Date']
                    hit_sl = trade['Hit_SL']
                    if entry_date in train.index:
                        entry_features = train.loc[entry_date, sl_features].to_dict()
                    elif entry_date in test.index:
                        entry_features = test.loc[entry_date, sl_features].to_dict()
                    elif entry_date in backtest.index:
                        entry_features = backtest.loc[entry_date, sl_features].to_dict()
                    else:
                        continue
                    trade_entries.append({
                        'Hit_SL': hit_sl,
                        **entry_features
                    })
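                # Note (added commentary): SL training rows are looked up by
                # Entry_Date across train, test and backtest, so the classifier
                # sees each trade's features exactly as they stood at its entry
                # bar.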
trade_entries = []
for trade in evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future']:
    entry_date = trade['Entry_Date']
    hit_sl = trade['Hit_SL']
    # Look up the entry bar's features in whichever split contains that date
    if entry_date in train.index:
        entry_features = train.loc[entry_date, sl_features].to_dict()
    elif entry_date in test.index:
        entry_features = test.loc[entry_date, sl_features].to_dict()
    elif entry_date in backtest.index:
        entry_features = backtest.loc[entry_date, sl_features].to_dict()
    else:
        continue
    trade_entries.append({
        'Hit_SL': hit_sl,
        **entry_features
    })

df_trade_entries = pd.DataFrame(trade_entries)
if df_trade_entries.empty:
    logging.warning("No valid trade entries for SL model.")
    continue

# Forward- then back-fill gaps (fillna(method=...) is deprecated in pandas)
df_trade_entries.ffill(inplace=True)
df_trade_entries.bfill(inplace=True)

X_sl = df_trade_entries[sl_features].values
y_sl = df_trade_entries['Hit_SL'].values
scaler_sl = MinMaxScaler()
X_sl_scaled = scaler_sl.fit_transform(X_sl)

sl_max_depth_default = 5
sl_learning_rate_default = 0.1
sl_n_estimators_default = 100

try:
    # Stratify when both classes are present; otherwise fall back to a
    # plain random split
    X_train_sl, X_val_sl, y_train_sl, y_val_sl = train_test_split(
        X_sl_scaled, y_sl, test_size=0.2, random_state=ga_base_seed, stratify=y_sl
    )
except ValueError:
    X_train_sl, X_val_sl, y_train_sl, y_val_sl = train_test_split(
        X_sl_scaled, y_sl, test_size=0.2, random_state=ga_base_seed
    )

classifier_sl = xgb.XGBClassifier(
    objective='binary:logistic',
    n_estimators=int(sl_n_estimators_default),
    max_depth=int(sl_max_depth_default),
    learning_rate=sl_learning_rate_default,
    random_state=xgb_random_state,
    use_label_encoder=False,
    eval_metric='logloss',
    n_jobs=-1
)
# Note: the held-out split (X_val_sl, y_val_sl) is currently unused; it could
# be passed as an eval_set to monitor logloss.
classifier_sl.fit(X_train_sl, y_train_sl)

joblib.dump(classifier_sl, 'sl_prediction_model.joblib')
joblib.dump(scaler_sl, 'sl_scaler.joblib')
scaler_X_selected_dict['Close_SL'] = scaler_sl

if show_plots:
    plot_sl_features_shap(
        classifier_sl, X_sl_scaled, sl_features, symbol,
        timeframe_name, plot_dir_symbol_date, data_source='test'
    )

toolbox_ga = create_toolbox(evaluator_instance=evaluator_ensemble_close,
                            sl_features=sl_features,
                            ga_base_seed=ga_base_seed)
best_individual = run_ga(toolbox_ga, ga_num_generations, ga_pop_size,
                         "Ensemble Predictions Close - GA Optimization",
                         patience=10, backtest_data=backtest,
                         data_source='backtest')

if best_individual is not None:
    best_params = [float(x) for x in best_individual]
    logging.info(f"Best GA Individual: {best_params}")

    # Re-evaluate the strategy with the GA-optimized parameters
    evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future'].clear()
    evaluator_ensemble_close.evaluate(
        [int(best_params[0]), best_params[1], best_params[2], best_params[3]],
        classifier_sl=None,
        sl_threshold=0.7,
        backtest_data=backtest,
        data_source='backtest'
    )

    # Rebuild the SL training set from the trades the optimized strategy took
    trade_entries_best = []
    for trade in evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future']:
        entry_date = trade['Entry_Date']
        hit_sl = trade['Hit_SL']
        try:
            entry_features = evaluator_ensemble_close.df.loc[entry_date, sl_features].to_dict()
            trade_entries_best.append({'Hit_SL': hit_sl, **entry_features})
        except KeyError:
            continue

    df_trade_entries_best = pd.DataFrame(trade_entries_best)
    if not df_trade_entries_best.empty:
        df_trade_entries_best.ffill(inplace=True)
        df_trade_entries_best.bfill(inplace=True)
        X_sl_best = df_trade_entries_best[sl_features].values
        y_sl_best = df_trade_entries_best['Hit_SL'].values
        scaler_sl_best = MinMaxScaler()
        X_sl_scaled_best = scaler_sl_best.fit_transform(X_sl_best)

        # GA genome layout (from the indexing below): [0]=window,
        # [1]=multiplier, [2]=tp_factor, [3]=sl_factor, [4]=max_depth,
        # [5]=learning_rate, [6]=n_estimators
        classifier_sl_best = xgb.XGBClassifier(
            objective='binary:logistic',
            n_estimators=int(best_params[6]),
            max_depth=int(best_params[4]),
            learning_rate=best_params[5],
            random_state=xgb_random_state,
            use_label_encoder=False,
            eval_metric='logloss',
            n_jobs=-1
        )
        classifier_sl_best.fit(X_sl_scaled_best, y_sl_best)
        scaler_X_selected_dict['Close_SL'] = scaler_sl_best
        joblib.dump(classifier_sl_best, 'sl_prediction_model_best.joblib')
        joblib.dump(scaler_sl_best, 'sl_scaler_best.joblib')
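        # A minimal usage sketch (an assumed workflow, not called anywhere in
        # this script): the persisted SL model and scaler can be reloaded for
        # out-of-process inference, e.g. in a live-trading process.
        #
        #     clf = joblib.load('sl_prediction_model_best.joblib')
        #     sc = joblib.load('sl_scaler_best.joblib')
        #     # `latest_features` is a hypothetical (1, len(sl_features)) array
        #     p_hit_sl = clf.predict_proba(sc.transform(latest_features))[:, 1]
        #     # a candidate trade would presumably be skipped when p_hit_sl
        #     # exceeds the sl_threshold used here (0.7)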
        # Final evaluation with the trained SL filter active
        evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future'].clear()
        evaluator_ensemble_close.evaluate(
            [int(best_params[0]), best_params[1], best_params[2], best_params[3]],
            classifier_sl=classifier_sl_best,
            sl_threshold=0.7,
            backtest_data=backtest,
            data_source='backtest'
        )
        logging.info("Best GA Individual Strategy Evaluation Complete.")
    else:
        logging.warning("No trade entries found for the best GA individual.")
        continue
else:
    logging.warning(f"No valid GA individual found for {symbol} ({timeframe_name}). Skipping.")
    continue

if show_plots:
    # Test-split diagnostics
    plot_error_distribution(test, symbol, timeframe_name,
                            'Ensemble_Predicted_Close_Future',
                            plot_dir_symbol_date, data_source='test')
    plot_residuals(test, symbol, timeframe_name,
                   'Ensemble_Predicted_Close_Future',
                   plot_dir_symbol_date, data_source='test')
    backtest_strategy(evaluator_ensemble_close_test.df, symbol, timeframe_name,
                      'Ensemble_Predicted_Close_Future',
                      'Ensemble_Predicted_Close_Future',
                      plot_dir_symbol_date, data_source='test')
    trades_list_test = evaluator_ensemble_close_test.trades_dict['Ensemble_Predicted_Close_Future']
    plot_trade_metrics(trades_list_test, symbol, timeframe_name,
                       'Ensemble_Predicted_Close_Future',
                       plot_dir_symbol_date, data_source='test')
    plot_trade_execution(evaluator_ensemble_close_test.df, trades_list_test,
                         symbol, timeframe_name,
                         'Ensemble_Predicted_Close_Future',
                         plot_dir_symbol_date, data_source='test')
    plot_predicted_vs_actual(evaluator_ensemble_close_test.df, symbol,
                             timeframe_name, 'Ensemble_Predicted_Close_Future',
                             plot_dir_symbol_date, data_source='test')

    # Backtest-split diagnostics
    plot_error_distribution(backtest, symbol, timeframe_name,
                            'Ensemble_Predicted_Close_Future',
                            plot_dir_symbol_date, data_source='backtest')
    plot_residuals(backtest, symbol, timeframe_name,
                   'Ensemble_Predicted_Close_Future',
                   plot_dir_symbol_date, data_source='backtest')
    backtest_strategy(backtest, symbol, timeframe_name,
                      'Ensemble_Predicted_Close_Future',
                      'Ensemble_Predicted_Close_Future',
                      plot_dir_symbol_date, data_source='backtest')
    trades_list = evaluator_ensemble_close.trades_dict['Ensemble_Predicted_Close_Future']
    plot_trade_metrics(trades_list, symbol, timeframe_name,
                       'Ensemble_Predicted_Close_Future',
                       plot_dir_symbol_date, data_source='backtest')
    plot_trade_execution(backtest, trades_list, symbol, timeframe_name,
                         'Ensemble_Predicted_Close_Future',
                         plot_dir_symbol_date, data_source='backtest')
    plot_predicted_vs_actual(backtest, symbol, timeframe_name,
                             'Ensemble_Predicted_Close_Future',
                             plot_dir_symbol_date, data_source='backtest')

# Regression metrics for the ensemble on the backtest split
mse_ensemble = mean_squared_error(backtest['Close_Future'],
                                  backtest['Ensemble_Predicted_Close_Future'])
mae_ensemble = mean_absolute_error(backtest['Close_Future'],
                                   backtest['Ensemble_Predicted_Close_Future'])
r2_ensemble = r2_score(backtest['Close_Future'],
                       backtest['Ensemble_Predicted_Close_Future'])
mape_ensemble = mean_absolute_percentage_error(backtest['Close_Future'],
                                               backtest['Ensemble_Predicted_Close_Future'])

logging.info(f"Ensemble Model Performance for {symbol} ({timeframe_name}):")
logging.info(f"MSE: {mse_ensemble:.4f}")
logging.info(f"MAE: {mae_ensemble:.4f}")
logging.info(f"R² Score: {r2_ensemble:.4f}")
logging.info(f"MAPE: {mape_ensemble:.4f}")

# Free per-symbol objects before the next iteration
del df, train, test, backtest, models, predictions_test, predictions_backtest
gc.collect()


if __name__ == '__main__':
    # The __main__ guard matters here: the GA evaluation can spawn worker
    # processes, which re-import this module and must not re-run the pipeline.
    try:
        main_function()
    finally:
        mt5.shutdown()
        logging.info("MT5 shutdown successfully.")
        print("MT5 shutdown successfully.")
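# For reference, a minimal sketch of the MT5 session lifecycle this script
# assumes (initialization presumably happens inside main_function; the
# try/finally above guarantees shutdown either way):
#
#     if not mt5.initialize():
#         raise RuntimeError(f"MT5 initialize failed: {mt5.last_error()}")
#     try:
#         rates = mt5.copy_rates_range("AUDUSD", mt5.TIMEFRAME_D1,
#                                      start_date, end_date)
#     finally:
#         mt5.shutdown()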