"""DRAMSys simulation utility.

Generates per-configuration simulation directories from a templated base
config, runs the DRAMSys executable in parallel, extracts per-channel
statistics from the resulting .tdb trace databases, and summarizes
everything into a polars DataFrame / statistics.csv.
"""
import argparse
import json
import os
import re
import sqlite3
import subprocess
import sys
from contextlib import closing
from dataclasses import dataclass, field, fields
from multiprocessing.pool import ThreadPool
from pathlib import Path

import polars as pl
from tqdm import tqdm

sys.path.append("extensions/apps/traceAnalyzer/scripts")
from metrics import (
    average_response_latency_in_ns,
    max_response_latency_in_ns,
    maximum_data_rate,
    memory_active_in_percent,
)


@dataclass
class Options:
    """Parsed command-line options (see get_argparser / get_options)."""

    dramsys: Path | None  # path to the DRAMSys executable
    override: bool  # force-override existing simulation artifacts
    out_dir: Path  # output directory root
    simulate: bool  # run the simulations
    metrics: bool  # recompute metrics from existing artifacts only
    base_config: Path | None  # templated base configuration file
    resource_dir: Path | None  # optional resource directory for DRAMSys
    jobs: int | None  # parallel worker count (None = ThreadPool default)


@dataclass(frozen=True)
class Statistics:
    """Per-channel metrics extracted from one .tdb trace database."""

    filename: str  # trace database filename (e.g. "...ch0.tdb")
    databus_utilization: float  # fraction in [0, 1]
    bandwidth: float  # effective bandwidth = utilization * max_bandwidth
    max_bandwidth: float  # peak data rate (GB/s, scaled from metrics helper)
    avg_latency: float  # average response latency in ns
    max_latency: float  # maximum response latency in ns


@dataclass(frozen=True)
class ConfigTokens:
    """A named set of placeholder substitutions for the base config."""

    name: str  # unique simulation name, used as directory name
    tokens: dict[str, str | int]  # placeholder key -> substituted value


@dataclass
class Simulation:
    """One simulation run: its config tokens, directory, and results."""

    config_tokens: ConfigTokens
    directory: Path | None = None  # set by populate_simulation_directories
    statistics: list[Statistics] = field(default_factory=list)


def run_dramsys(dramsys: Path, simulation_dir: Path, resource_dir: Path | None):
    """Run the DRAMSys executable inside *simulation_dir*.

    stdout and stderr are captured to simulation_dir/stdout.txt.
    Raises subprocess.CalledProcessError on a non-zero exit (check=True).
    """
    with open(simulation_dir / "stdout.txt", "w", encoding="utf-8") as output_file:
        command = [dramsys.absolute().as_posix(), "config.json"]
        if resource_dir is not None:
            command.append(resource_dir.absolute().as_posix())
        subprocess.run(
            command,
            cwd=simulation_dir,
            stdout=output_file,
            stderr=output_file,
            check=True,
        )


def calculate_simulation_metrics(simulation: Simulation):
    """Extract Statistics from every .tdb database in the simulation directory.

    Results are stored on simulation.statistics. Files that cannot be
    analyzed are skipped with a warning (best-effort, never raises).
    """
    simulation_dir = simulation.directory
    stats: list[Statistics] = []
    for file in os.listdir(simulation_dir):
        if not file.endswith(".tdb"):
            continue
        try:
            # closing() guarantees the connection is released even when a
            # metrics query fails (the original code leaked it).
            with closing(sqlite3.connect(f"{simulation_dir}/{file}")) as connection:
                max_bandwidth = maximum_data_rate(connection) / 1000
                avg_latency = average_response_latency_in_ns(connection)
                max_latency = max_response_latency_in_ns(connection)
                databus_utilization = memory_active_in_percent(connection) / 100
                bandwidth = databus_utilization * max_bandwidth
                stats.append(
                    Statistics(
                        file,
                        databus_utilization,
                        bandwidth,
                        max_bandwidth,
                        avg_latency,
                        max_latency,
                    )
                )
        except Exception as error:
            # Best-effort: a corrupt/partial trace must not abort the sweep.
            print(
                f"Warning: Could not calculate metrics for {simulation_dir}/{file}: {error}"
            )
    simulation.statistics = stats


# Replace placeholders with actual values
def replace_placeholders(config_json: str, tokens: dict) -> str:
    """Substitute every ``<key>`` placeholder in *config_json* with its value."""
    for key, value in tokens.items():
        placeholder = f"<{key}>"
        config_json = config_json.replace(placeholder, str(value))
    return config_json


@dataclass
class WorkItem:
    """Everything one worker thread needs to run a single simulation."""

    dramsys: Path
    simulation: Simulation
    base_config: Path
    resource_dir: Path | None


def simulate(
    work_item: WorkItem,
):
    """Instantiate the config, run DRAMSys, and compute metrics for one item."""
    simulation_dir = work_item.simulation.directory
    assert simulation_dir
    with open(work_item.base_config, encoding="utf-8") as config_file:
        config_string = config_file.read()
    config_string = replace_placeholders(
        config_string, work_item.simulation.config_tokens.tokens
    )
    # Parse before writing so a bad substitution fails loudly here.
    json_config = json.loads(config_string)
    simulation_json = simulation_dir / "config.json"
    # Save config besides simulation directory
    with open(simulation_json, "w", encoding="utf-8") as config_file:
        json.dump(json_config, config_file, indent=4)
    run_dramsys(work_item.dramsys, simulation_dir, work_item.resource_dir)
    calculate_simulation_metrics(work_item.simulation)


def generate_dataframe(simulations: list[Simulation], out_dir: Path) -> pl.DataFrame:
    """Flatten all simulation statistics into a DataFrame and write statistics.csv.

    Columns: name, channel, one column per config token, then the
    Statistics fields. The channel number is parsed from the ``chN``
    part of the trace filename (-1 when absent).
    """
    # Pack results in dataframe
    labels = ["name", "channel"]
    statistic_labels = [f.name for f in fields(Statistics)]
    # All simulations share the same token keys; take them from the first.
    config_keys = simulations[0].config_tokens.tokens.keys()
    labels.extend(config_keys)
    labels.extend(statistic_labels)
    entries = []
    for simulation in simulations:
        config_values = simulation.config_tokens.tokens.values()
        for stat in simulation.statistics:
            m = re.search("(?<=ch)[0-9]+", stat.filename)
            # int() keeps the column homogeneously typed (the -1 fallback
            # is an int; a str/int mix can break polars' schema inference).
            channel = int(m.group(0)) if m else -1
            entries.append(
                [
                    simulation.config_tokens.name,
                    channel,
                    *config_values,
                    stat.filename,
                    stat.databus_utilization,
                    stat.bandwidth,
                    stat.max_bandwidth,
                    stat.avg_latency,
                    stat.max_latency,
                ]
            )
    df = pl.DataFrame(data=entries, schema=labels)
    df.write_csv(out_dir / "statistics.csv")
    return df


def populate_simulation_directories(
    simulations: list[Simulation], out_dir: Path, override: bool
):
    """Create out_dir/simulations/<name> for each simulation and record it.

    Exits the process when a directory already exists and *override* is
    False, to avoid silently clobbering previous results.
    """
    for simulation in simulations:
        simulation_dir = out_dir / "simulations" / simulation.config_tokens.name
        try:
            simulation_dir.mkdir(parents=True, exist_ok=override)
        except FileExistsError:
            print(
                "Previous simulations artifacts found. To continue, enable the force override flag."
            )
            sys.exit(-1)
        simulation.directory = simulation_dir


def calculate_metrics(
    simulations: list[Simulation], out_dir: Path, jobs: int | None
) -> pl.DataFrame:
    """Recompute metrics from existing simulation artifacts, in parallel.

    Used by the --metrics path; override=True because the directories
    are expected to exist already.
    """
    populate_simulation_directories(simulations, out_dir, override=True)
    with ThreadPool(jobs) as thread_pool:
        for _ in tqdm(
            thread_pool.imap_unordered(calculate_simulation_metrics, simulations),
            total=len(simulations),
        ):
            pass
    return generate_dataframe(simulations, out_dir)


def run_simulations(simulations: list[Simulation], options: Options) -> pl.DataFrame:
    """Run every simulation in parallel and return the summary DataFrame.

    Validates the required options first and exits with an error message
    when any is missing.
    """
    if len(simulations) == 0:
        print("Must specify at least one simulation configuration!")
        sys.exit(-1)
    if options.dramsys is None:
        print("Must specify DRAMSys executable!")
        sys.exit(-1)
    if options.base_config is None:
        print("Must specify a base config!")
        sys.exit(-1)
    print("Create simulation directories...")
    populate_simulation_directories(simulations, options.out_dir, options.override)
    print("Run simulations...")
    with ThreadPool(options.jobs) as thread_pool:
        args = [
            WorkItem(
                options.dramsys, simulation, options.base_config, options.resource_dir
            )
            for simulation in simulations
        ]
        for _ in tqdm(thread_pool.imap_unordered(simulate, args), total=len(args)):
            pass
    print("Calculate metrics...")
    # simulate() already computed each simulation's statistics; only the
    # summary dataframe remains (previously the metrics were recomputed
    # a second time here via calculate_metrics).
    return generate_dataframe(simulations, options.out_dir)


def get_argparser() -> argparse.ArgumentParser:
    """Build the command-line interface for the simulation utility."""
    parser = argparse.ArgumentParser(description="DRAMSys simulation utility")
    parser.add_argument(
        "dramsys", type=Path, nargs="?", help="path to the DRAMSys executable"
    )
    parser.add_argument(
        "--simulate",
        default=False,
        action="store_true",
        help="run the simulations generating simulation artifacts",
    )
    parser.add_argument(
        "--metrics",
        default=False,
        action="store_true",
        help="calculate the metrics from existing simulation artifacts",
    )
    parser.add_argument(
        "-f",
        "--force",
        dest="override",
        default=False,
        action="store_true",
        help="force override existing simulation artifacts",
    )
    parser.add_argument(
        "--out-dir",
        type=Path,
        default="out",
        help="path to the output directory",
    )
    parser.add_argument(
        "--base-config",
        type=Path,
        help="path to the base configuration file",
    )
    parser.add_argument(
        "--resource-dir",
        type=Path,
        help="path to the resource directory",
    )
    parser.add_argument(
        "-j",
        "--jobs",
        metavar="N",
        type=int,
        default=None,
        help="run N jobs in parallel",
    )
    return parser


def get_options(args: argparse.Namespace) -> Options:
    """Convert a parsed argparse namespace into an Options dataclass."""
    return Options(**vars(args))


def simulation_results(
    options: Options,
    simulations: list[Simulation],
) -> pl.DataFrame:
    """Dispatch on the selected mode and return the results DataFrame.

    --simulate runs everything; --metrics recomputes from artifacts;
    otherwise the previously written statistics.csv is loaded (exiting
    with an error when it does not exist yet).
    """
    if options.simulate:
        return run_simulations(simulations, options)
    if options.metrics:
        return calculate_metrics(simulations, options.out_dir, options.jobs)
    print("Summarizing simulation results in statistics.csv...")
    statistics_file = options.out_dir / "statistics.csv"
    if not statistics_file.is_file():
        print("Run the simulations first to generate simulation artifacts")
        sys.exit(-1)
    return pl.read_csv(statistics_file)