import argparse
import json
import os
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass, fields
from multiprocessing.pool import ThreadPool
from pathlib import Path
from typing import Optional

import pandas as pd
from tqdm import tqdm

sys.path.append("extensions/apps/traceAnalyzer/scripts")
from metrics import (
    average_response_latency_in_ns,
    max_response_latency_in_ns,
    maximum_data_rate,
    memory_active_in_percent,
)


@dataclass
class Options:
    dramsys: Path
    override: bool
    out_dir: Path
    simulate: bool
    metrics: bool
    base_config: Path | None
    resource_dir: Path | None = None
    jobs: int | None = None


@dataclass(frozen=True)
class SubConfig:
    name: str
    parameters: dict[str, str]


@dataclass(frozen=True)
class Statistics:
    filename: str
    databus_utilization: float
    bandwidth: float
    max_bandwidth: float
    avg_latency: float
    max_latency: float


@dataclass(frozen=True)
class Configuration:
    name: str
    tokens: dict[str, str | int]


@dataclass
class Simulation:
    config: Configuration
    directory: Optional[Path] = None
    statistics: Optional[list[Statistics]] = None


def run_dramsys(dramsys: Path, simulation_dir: Path, resource_dir: Path | None):
    with open(f"{simulation_dir}/out.txt", "w", encoding="utf-8") as output_file:
        # The subprocess runs inside the simulation directory, so the
        # executable path must be absolute.
        command = [dramsys.absolute(), "config.json"]
        if resource_dir is not None:
            command.append(resource_dir)
        subprocess.run(command, cwd=simulation_dir, stdout=output_file, check=True)


def calculate_simulation_metrics(simulation: Simulation):
    simulation_dir = simulation.directory
    stats: list[Statistics] = []
    for file in os.listdir(simulation_dir):
        if not file.endswith(".tdb"):
            continue
        connection = sqlite3.connect(f"{simulation_dir}/{file}")
        try:
            max_bandwidth = maximum_data_rate(connection) / 1000
            avg_latency = average_response_latency_in_ns(connection)
            max_latency = max_response_latency_in_ns(connection)
            # memory_active_in_percent reports a percentage; convert to a fraction
            databus_utilization = memory_active_in_percent(connection) / 100
            bandwidth = databus_utilization * max_bandwidth
            stats.append(
                Statistics(
                    file,
                    databus_utilization,
                    bandwidth,
                    max_bandwidth,
                    avg_latency,
                    max_latency,
                )
            )
        except Exception as error:
            print(
                f"Warning: Could not calculate metrics for {simulation_dir}/{file}: {error}"
            )
        finally:
            connection.close()
    simulation.statistics = stats


# Replace <placeholder> tokens in the raw config text with actual values
def replace_placeholders(config_json: str, tokens: dict) -> str:
    for key, value in tokens.items():
        placeholder = f"<{key}>"
        config_json = config_json.replace(placeholder, str(value))
    return config_json
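# Example of the substitution performed by replace_placeholders (the base
# config fragment and token name below are hypothetical). Substitution happens
# on the raw text before JSON parsing, so a placeholder may stand for a
# non-string value:
#
#     base config:  { "simulation": { "clockMhz": <clock> } }
#     tokens:       {"clock": 3200}
#     result:       { "simulation": { "clockMhz": 3200 } }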
@dataclass
class WorkItem:
    dramsys: Path
    simulation: Simulation
    base_config: Path
    resource_dir: Path | None


def simulate(work_item: WorkItem):
    simulation_dir = work_item.simulation.directory
    with open(work_item.base_config, encoding="utf-8") as config_file:
        config_string = replace_placeholders(
            config_file.read(), work_item.simulation.config.tokens
        )
    json_config = json.loads(config_string)
    # Save the resolved config inside the simulation directory
    simulation_json = simulation_dir / "config.json"
    with open(simulation_json, "w", encoding="utf-8") as config_file:
        json.dump(json_config, config_file, indent=4)
    run_dramsys(work_item.dramsys, simulation_dir, work_item.resource_dir)
    calculate_simulation_metrics(work_item.simulation)


def generate_dataframe(simulations: list[Simulation], out_dir: str) -> pd.DataFrame:
    # Pack the results into a pandas DataFrame. All configurations are assumed
    # to share the same token keys, so take them from the first simulation.
    labels = ["name", "channel"]
    statistic_labels = [field.name for field in fields(Statistics)]
    config_keys, _ = zip(*simulations[0].config.tokens.items())
    labels.extend(config_keys)
    labels.extend(statistic_labels)
    # The channel index is encoded in the trace database filename, e.g. "..._ch0.tdb"
    channel_pattern = re.compile("(?<=ch)[0-9]+")
    entries = []
    for simulation in simulations:
        _, config_values = zip(*simulation.config.tokens.items())
        for stat in simulation.statistics:
            channel = int(channel_pattern.search(stat.filename)[0])
            entries.append(
                [
                    simulation.config.name,
                    channel,
                    *config_values,
                    stat.filename,
                    stat.databus_utilization,
                    stat.bandwidth,
                    stat.max_bandwidth,
                    stat.avg_latency,
                    stat.max_latency,
                ]
            )
    dataframe = pd.DataFrame(data=entries, columns=labels)
    dataframe.to_csv(f"{out_dir}/statistics.csv", sep=";")
    return dataframe


def populate_simulation_directories(
    simulations: list[Simulation], out_dir: str, override: bool
):
    for simulation in simulations:
        simulation_dir = Path(f"{out_dir}/simulations/{simulation.config.name}")
        try:
            simulation_dir.mkdir(parents=True, exist_ok=override)
        except FileExistsError:
            print(
                "Previous simulation artifacts found. Use -f/--force to override them."
            )
            sys.exit(-1)
        simulation.directory = simulation_dir


def calculate_metrics(
    simulations: list[Simulation], out_dir: str, jobs: int | None
) -> pd.DataFrame:
    populate_simulation_directories(simulations, out_dir, override=True)
    with ThreadPool(jobs) as thread_pool:
        for _ in tqdm(
            thread_pool.imap_unordered(calculate_simulation_metrics, simulations),
            total=len(simulations),
        ):
            pass
    return generate_dataframe(simulations, out_dir)


def run_simulations(simulations: list[Simulation], options: Options) -> pd.DataFrame:
    if len(simulations) == 0:
        print("Must specify at least one simulation configuration!")
        sys.exit(-1)
    if options.base_config is None:
        print("Must specify a base config")
        sys.exit(-1)
    print("Create simulation directories...")
    populate_simulation_directories(simulations, options.out_dir, options.override)
    print("Run simulations...")
    work_items = [
        WorkItem(options.dramsys, simulation, options.base_config, options.resource_dir)
        for simulation in simulations
    ]
    with ThreadPool(options.jobs) as thread_pool:
        for _ in tqdm(
            thread_pool.imap_unordered(simulate, work_items), total=len(work_items)
        ):
            pass
    print("Calculate metrics...")
    return calculate_metrics(simulations, options.out_dir, options.jobs)


def get_options_from_args() -> Options:
    parser = argparse.ArgumentParser(description="DRAMSys simulation utility")
    parser.add_argument("dramsys", type=Path, help="path to the DRAMSys executable")
    parser.add_argument(
        "--simulate",
        default=False,
        action="store_true",
        help="run the simulations, generating simulation artifacts",
    )
    parser.add_argument(
        "--metrics",
        default=False,
        action="store_true",
        help="calculate the metrics from existing simulation artifacts",
    )
    parser.add_argument(
        "-f",
        "--force",
        default=False,
        action="store_true",
        help="force override existing simulation artifacts",
    )
    parser.add_argument(
        "--out-dir",
        type=Path,
        default="out",
        help="path to the output directory",
    )
    parser.add_argument(
        "--base-config",
        type=Path,
        help="path to the base configuration file",
    )
    parser.add_argument(
        "--resource-dir",
        type=Path,
        help="path to the resource directory",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        metavar="N",
        type=int,
        default=None,
        help="run N jobs in parallel",
    )
    arguments = parser.parse_args()
    return Options(
        arguments.dramsys,
        arguments.force,
        arguments.out_dir,
        arguments.simulate,
        arguments.metrics,
        arguments.base_config,
        arguments.resource_dir,
        arguments.jobs,
    )
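# Typical invocations (sketch; the script name sweep.py is hypothetical):
#
#     python sweep.py build/DRAMSys --simulate --base-config base.json -j 8
#     python sweep.py build/DRAMSys --metrics --out-dir out
#
# Without --simulate or --metrics, simulation_results below only reads back
# the statistics.csv produced by an earlier run.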
def simulation_results(
    options: Options,
    simulations: list[Simulation],
) -> pd.DataFrame:
    if options.simulate:
        return run_simulations(simulations, options)
    if options.metrics:
        return calculate_metrics(simulations, options.out_dir, options.jobs)
    print("Summarizing simulation results in statistics.csv...")
    statistics_file = f"{options.out_dir}/statistics.csv"
    if not os.path.isfile(statistics_file):
        print("Run the simulations first to generate simulation artifacts")
        sys.exit(-1)
    return pd.read_csv(statistics_file, sep=";")
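# Minimal usage sketch (assumption: callers normally build the Configuration
# list in a separate sweep script; the token names and values below are
# hypothetical and must match the <...> placeholders in the chosen base config).
if __name__ == "__main__":
    example_simulations = [
        Simulation(Configuration(name="baseline", tokens={"clock": 3200})),
        Simulation(Configuration(name="fast", tokens={"clock": 4800})),
    ]
    print(simulation_results(get_options_from_args(), example_simulations))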