import os
from pathlib import Path

import numpy as np
import pandas as pd
import ray

from utils import get_header, window_signal

# Initialize Ray and report what the cluster looks like
ray.init()
print(ray.cluster_resources())
print(ray.available_resources())
print(ray.nodes())

# Feature columns read from the raw CSVs; assumed to be the same power
# features that name the windowed output columns in process_file below.
FEAT_HEADERS = ['pwr-delta', 'pwr-theta', 'pwr-low-alpha', 'pwr-high-alpha',
                'pwr-low-beta', 'pwr-high-beta', 'pwr-delta/beta',
                'pwr-theta/beta', 'pwr-alpha/beta', 'pwr-alpha-theta']


def windowing(filename, window_size):
    """Read one raw CSV and return its windowed features and labels."""
    data = pd.read_csv(filename)

    # Map the zero/one 'lost' column to -1/+1 labels
    y = data['lost'].to_numpy()
    mask = np.full(shape=y.shape, fill_value=-1, dtype=int)
    mask[np.where(y == 1)] = 1
    y = mask

    # Extract the feature matrix and window it
    X = data[FEAT_HEADERS].to_numpy()
    windows, lbls = window_signal(X, y, mode='center', window_size=window_size)
    return windows, lbls


# Multiprocess the windowing of the entire directory for each window size
@ray.remote
def process_file(file_path, output_dir, window_size):
    """Window a single file; one Ray task per file."""
    print(f"Processing file: {file_path} for window size {window_size}")

    windows, lbls = windowing(file_path, window_size)

    # Create a DataFrame with dynamic column names for the windowed features
    feature_cols = get_header(FEAT_HEADERS, window_size).split(',')
    windowed_df = pd.DataFrame(np.hstack((windows, lbls)), columns=feature_cols)

    # Save with the window size in the filename
    output_path = output_dir / f"{file_path.stem}_windowed_{window_size}.csv"
    windowed_df.to_csv(output_path, index=False)
    return output_path


@ray.remote
def window_files(input_dir, output_dir, window_size):
    """Window every CSV under input_dir for one window size."""
    output_dir = Path(output_dir) / f"window_size_{window_size}"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Collect all CSV paths
    csv_paths = []
    for root, _, files in os.walk(input_dir):
        for f in files:
            if f.endswith('.csv'):
                csv_paths.append(Path(root) / f)

    # Process files in parallel; ray.get blocks until every file task finishes
    file_tasks = [process_file.remote(path, output_dir, window_size)
                  for path in csv_paths]
    results = ray.get(file_tasks)
    print(f"Processed {len(results)} files for window size {window_size}")
    return results


def main():
    window_sizes = [50]
    input_dirs = ['train/raw', 'test/raw']
    output_dirs = ['train', 'test']
    tasks = [(input_dir, output_dir, window_size)
             for window_size in window_sizes
             for input_dir, output_dir in zip(input_dirs, output_dirs)]

    # Outer level of parallelism: one window_files task per (dir, window size)
    outer_tasks = [window_files.remote(input_dir, output_dir, window_size)
                   for input_dir, output_dir, window_size in tasks]
    all_results = ray.get(outer_tasks)

    # Flatten the per-task lists of output paths
    flat_results = [item for sublist in all_results for item in sublist]
    print(f"Total processed files: {len(flat_results)}")


if __name__ == '__main__':
    main()
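

# ---------------------------------------------------------------------------
# Reference sketch (illustration only). The script depends on two helpers
# from utils.py whose implementations are not shown here. The minimal
# versions below are inferred from their call sites above; the real
# windowing logic, flattening order, and label column name ('label' here
# is a guess) may differ.

def _sketch_window_signal(X, y, mode='center', window_size=50):
    """Slide a length-`window_size` window over X of shape
    (n_samples, n_features).

    Each window is flattened into one row; its label is taken from the
    sample at the window's `mode` position ('center' -> middle sample).
    Returns windows of shape (n_windows, window_size * n_features) and
    labels of shape (n_windows, 1), so np.hstack can join them.
    """
    windows, lbls = [], []
    for start in range(len(X) - window_size + 1):
        windows.append(X[start:start + window_size].ravel())
        anchor = start + window_size // 2 if mode == 'center' else start
        lbls.append(y[anchor])
    return np.asarray(windows), np.asarray(lbls).reshape(-1, 1)


def _sketch_get_header(feats, window_size):
    """Comma-separated header: one column per feature per time step, plus a
    trailing label column, matching _sketch_window_signal's row-major
    flattening order (all features at t0, then t1, ...)."""
    cols = [f"{feat}_t{t}" for t in range(window_size) for feat in feats]
    cols.append('label')
    return ','.join(cols)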