106 lines
3.1 KiB
Python
106 lines
3.1 KiB
Python
import ray
|
|
from pathlib import Path
|
|
import os
|
|
import pandas as pd
|
|
import numpy as np
|
|
from utils import *
|
|
|
|
# Initialize Ray
|
|
ray.init()
|
|
print(ray.cluster_resources())
|
|
print(ray.available_resources())
|
|
|
|
print(ray.nodes())
|
|
|
|
# feature list
|
|
def windowing(filename, window_size):
|
|
data = pd.read_csv(filename)
|
|
|
|
feat_headers = []
|
|
|
|
# Get the zero one classification
|
|
y = data['lost'].to_numpy()
|
|
indxs = np.where(y==1)
|
|
mask = np.full(shape=y.shape, fill_value=-1, dtype=int)
|
|
mask[indxs] = 1
|
|
y = mask
|
|
|
|
|
|
# Get the features
|
|
X = data[feat_headers].to_numpy()
|
|
|
|
# Get the windowed data
|
|
windows, lbls = window_signal(X, y, mode='center', window_size=window_size)
|
|
|
|
return windows, lbls
|
|
|
|
# Multiprocessing the windowing of the entire directory for each window size
|
|
@ray.remote
|
|
def process_file(file_path, output_dir, window_size):
|
|
print(f"Processing file: {file_path}")
|
|
|
|
out_feats = ['pwr-delta', 'pwr-theta', 'pwr-low-alpha', 'pwr-high-alpha',
|
|
'pwr-low-beta', 'pwr-high-beta',
|
|
'pwr-delta/beta', 'pwr-theta/beta', 'pwr-alpha/beta', 'pwr-alpha-theta']
|
|
|
|
"""Process individual files in parallel"""
|
|
print(f"Processing file: {file_path} for window size {window_size}")
|
|
|
|
# Thread-safe windowing operation
|
|
windows, lbls = windowing(file_path, window_size)
|
|
|
|
# Create DataFrame with dynamic column names
|
|
feature_cols = get_header(out_feats, window_size).split(',')
|
|
windowed_df = pd.DataFrame(
|
|
np.hstack((windows, lbls)),
|
|
columns=feature_cols
|
|
)
|
|
|
|
# Save with window size in filename
|
|
output_path = output_dir / f"{file_path.stem}_windowed_{window_size}.csv"
|
|
windowed_df.to_csv(output_path, index=False)
|
|
|
|
return output_path
|
|
|
|
@ray.remote
|
|
def window_files(input_dir, output_dir, window_size):
|
|
output_dir = Path(output_dir) / f"window_size_{window_size}"
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Collect all CSV paths
|
|
csv_paths = []
|
|
for root, _, files in os.walk(input_dir):
|
|
for f in files:
|
|
if f.endswith('.csv'):
|
|
csv_paths.append(Path(root)/f)
|
|
|
|
# Process files in parallel
|
|
file_tasks = [process_file.remote(path, output_dir, window_size) for path in csv_paths]
|
|
results = ray.get(file_tasks)
|
|
|
|
print(f"Processed {len(results)} files for window size {window_size}")
|
|
return results
|
|
|
|
def main():
|
|
window_sizes = [50]
|
|
input_dirs = ['train/raw', 'test/raw']
|
|
output_dirs = ['train', 'test']
|
|
|
|
tasks = [(input_dir, output_dir, window_size)
|
|
for window_size in window_sizes
|
|
for input_dir, output_dir in zip(input_dirs, output_dirs)]
|
|
|
|
# Use Ray for outer parallelization
|
|
outer_tasks = [window_files.remote(input_dir, output_dir, window_size)
|
|
for input_dir, output_dir, window_size in tasks]
|
|
|
|
# Get all results
|
|
all_results = ray.get(outer_tasks)
|
|
|
|
# Flatten results if needed
|
|
flat_results = [item for sublist in all_results for item in sublist]
|
|
print(f"Total processed files: {len(flat_results)}")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|