old musi stuff
windowing.py (new file, 105 lines)
@@ -0,0 +1,105 @@
import ray
from pathlib import Path
import os
import pandas as pd
import numpy as np
from utils import *  # provides window_signal() and get_header()

# Initialize Ray and report what the cluster has available
ray.init()
print(ray.cluster_resources())
print(ray.available_resources())
print(ray.nodes())
# Build windowed features and labels for a single recording
def windowing(filename, window_size, feat_headers):
    data = pd.read_csv(filename)

    # Map the 0/1 'lost' classification onto -1/+1 labels
    y = data['lost'].to_numpy()
    indxs = np.where(y == 1)
    mask = np.full(shape=y.shape, fill_value=-1, dtype=int)
    mask[indxs] = 1
    y = mask

    # Get the features (feat_headers names the columns to read)
    X = data[feat_headers].to_numpy()

    # Get the windowed data
    windows, lbls = window_signal(X, y, mode='center', window_size=window_size)

    return windows, lbls
# One Ray task per file: window it and write the result to disk
@ray.remote
def process_file(file_path, output_dir, window_size):
    """Process an individual file in parallel."""
    print(f"Processing file: {file_path} for window size {window_size}")

    # Power-band features (and band ratios) used as input columns
    out_feats = ['pwr-delta', 'pwr-theta', 'pwr-low-alpha', 'pwr-high-alpha',
                 'pwr-low-beta', 'pwr-high-beta',
                 'pwr-delta/beta', 'pwr-theta/beta', 'pwr-alpha/beta', 'pwr-alpha-theta']

    # Window the file
    windows, lbls = windowing(file_path, window_size, out_feats)

    # Create DataFrame with dynamic column names
    feature_cols = get_header(out_feats, window_size).split(',')
    windowed_df = pd.DataFrame(
        np.hstack((windows, lbls)),
        columns=feature_cols
    )

    # Save with window size in filename
    output_path = output_dir / f"{file_path.stem}_windowed_{window_size}.csv"
    windowed_df.to_csv(output_path, index=False)

    return output_path
# Multiprocess the windowing of an entire directory for one window size
@ray.remote
def window_files(input_dir, output_dir, window_size):
    output_dir = Path(output_dir) / f"window_size_{window_size}"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Collect all CSV paths under input_dir
    csv_paths = []
    for root, _, files in os.walk(input_dir):
        for f in files:
            if f.endswith('.csv'):
                csv_paths.append(Path(root) / f)

    # Process files in parallel, one Ray task per file
    file_tasks = [process_file.remote(path, output_dir, window_size) for path in csv_paths]
    results = ray.get(file_tasks)

    print(f"Processed {len(results)} files for window size {window_size}")
    return results
def main():
    window_sizes = [50]
    input_dirs = ['train/raw', 'test/raw']
    output_dirs = ['train', 'test']

    tasks = [(input_dir, output_dir, window_size)
             for window_size in window_sizes
             for input_dir, output_dir in zip(input_dirs, output_dirs)]

    # Use Ray for the outer parallelization: one task per (directory, window size) pair
    outer_tasks = [window_files.remote(input_dir, output_dir, window_size)
                   for input_dir, output_dir, window_size in tasks]

    # Get all results
    all_results = ray.get(outer_tasks)

    # Flatten the per-directory lists of output paths
    flat_results = [item for sublist in all_results for item in sublist]
    print(f"Total processed files: {len(flat_results)}")


if __name__ == '__main__':
    main()
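Note: window_signal() and get_header() come from utils, which is not part of this commit. For readers of the diff, a minimal sketch of what a center-mode windowing helper could look like, under the assumption that each window is flattened into one row and labeled by its center sample; the name window_signal_sketch is hypothetical, not the real utils API:

import numpy as np

def window_signal_sketch(X, y, mode='center', window_size=50):
    """Slide a fixed-size window over X; label each window by its center sample.

    X: (n_samples, n_features) array, y: (n_samples,) array of -1/+1 labels.
    Returns (windows, labels), one flattened row per window.
    """
    half = window_size // 2
    windows, labels = [], []
    for center in range(half, len(X) - half):
        # window_size consecutive samples centered on `center`
        win = X[center - half:center - half + window_size]
        windows.append(win.reshape(-1))   # flatten samples x features into one row
        labels.append(y[center])          # label taken from the center sample
    # labels returned as a column so they can be hstack-ed next to the windows
    return np.asarray(windows), np.asarray(labels).reshape(-1, 1)

This shape convention is assumed because windowing() passes the two arrays straight into np.hstack((windows, lbls)) before saving.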