Files
DRAMSys/scripts/simulation/simulation.py

329 lines
9.1 KiB
Python
Executable File

import argparse
import json
import os
import re
import sqlite3
import subprocess
import sys
from dataclasses import dataclass, field, fields
from multiprocessing.pool import ThreadPool
from pathlib import Path
import polars as pl
from tqdm import tqdm
sys.path.append("extensions/apps/traceAnalyzer/scripts")
from metrics import (average_response_latency_in_ns,
max_response_latency_in_ns, maximum_data_rate,
memory_active_in_percent)
@dataclass
class Options:
    """Parsed command-line options for the DRAMSys simulation utility.

    Field names mirror the argparse destinations (see get_argparser), so the
    instance can be built directly via Options(**vars(args)).
    """

    dramsys: Path | None  # path to the DRAMSys executable (optional positional)
    override: bool  # force-overwrite existing simulation artifacts
    out_dir: Path  # output directory for simulation artifacts and statistics
    simulate: bool  # run the simulations, generating artifacts
    metrics: bool  # recompute metrics from existing artifacts only
    base_config: Path | None  # config template containing <token> placeholders
    resource_dir: Path | None  # optional resource directory passed to DRAMSys
    jobs: int | None  # parallel job count (None lets ThreadPool pick a default)
@dataclass(frozen=True)
class Statistics:
    """Metrics extracted from one .tdb trace database of a simulation run."""

    filename: str  # name of the .tdb file the metrics were read from
    databus_utilization: float  # memory-active fraction in [0, 1]
    bandwidth: float  # achieved bandwidth (databus_utilization * max_bandwidth)
    max_bandwidth: float  # maximum data rate, scaled by 1/1000 at creation
    avg_latency: float  # average response latency in ns
    max_latency: float  # maximum response latency in ns
@dataclass(frozen=True)
class ConfigTokens:
    """A named set of placeholder substitutions for the base config template."""

    name: str  # simulation name; also used as its directory name
    tokens: dict[str, str | int]  # placeholder key -> value substituted for <key>
@dataclass
class Simulation:
    """One simulation: its config tokens, working directory, and results."""

    config_tokens: ConfigTokens
    # Filled in by populate_simulation_directories before the run.
    directory: Path | None = None
    # Filled in by calculate_simulation_metrics after the run.
    statistics: list[Statistics] = field(default_factory=list)
def run_dramsys(dramsys: Path, simulation_dir: Path, resource_dir: Path | None):
    """Execute DRAMSys on the config.json inside *simulation_dir*.

    Both stdout and stderr are captured into stdout.txt in the simulation
    directory. Raises CalledProcessError if DRAMSys exits non-zero.
    """
    command = [dramsys.absolute().as_posix(), "config.json"]
    if resource_dir is not None:
        command.append(resource_dir.absolute().as_posix())
    log_path = simulation_dir / "stdout.txt"
    with open(log_path, "w", encoding="utf-8") as log_file:
        subprocess.run(
            command,
            cwd=simulation_dir,
            stdout=log_file,
            stderr=log_file,
            check=True,
        )
def calculate_simulation_metrics(simulation: Simulation):
    """Compute Statistics for every .tdb trace database in the simulation directory.

    Opens each trace database, derives bandwidth/latency metrics via the
    traceAnalyzer helpers, and stores the results on simulation.statistics.
    Files whose metrics cannot be computed are skipped with a warning so the
    remaining databases are still processed (deliberate best-effort).
    """
    simulation_dir = simulation.directory
    stats: list[Statistics] = []
    for file in os.listdir(simulation_dir):
        if not file.endswith(".tdb"):
            continue
        connection = sqlite3.connect(f"{simulation_dir}/{file}")
        try:
            # /1000 and /100 presumably convert the traceAnalyzer units
            # (rate scaling and percent -> fraction) — TODO confirm upstream.
            max_bandwidth = maximum_data_rate(connection) / 1000
            avg_latency = average_response_latency_in_ns(connection)
            max_latency = max_response_latency_in_ns(connection)
            databus_utilization = memory_active_in_percent(connection) / 100
            bandwidth = databus_utilization * max_bandwidth
            stats.append(
                Statistics(
                    file,
                    databus_utilization,
                    bandwidth,
                    max_bandwidth,
                    avg_latency,
                    max_latency,
                )
            )
        except Exception as error:
            print(
                f"Warning: Could not calculate metrics for {simulation_dir}/{file}: {error}"
            )
        finally:
            # Bug fix: the connection was previously never closed, leaking a
            # database handle per .tdb file.
            connection.close()
    simulation.statistics = stats
def replace_placeholders(config_json: str, tokens: dict) -> str:
    """Substitute every "<key>" placeholder in *config_json*.

    Each token key is looked up as the literal text "<key>" and replaced with
    str(value). The substituted string is returned; the input is not mutated.
    """
    result = config_json
    for name, replacement in tokens.items():
        result = result.replace(f"<{name}>", str(replacement))
    return result
@dataclass
class WorkItem:
    """Arguments for one simulate() call, bundled for ThreadPool.imap_unordered."""

    dramsys: Path  # path to the DRAMSys executable
    simulation: Simulation  # simulation to run (its directory must be set)
    base_config: Path  # config template containing <token> placeholders
    resource_dir: Path | None  # optional resource directory for DRAMSys
def simulate(
    work_item: WorkItem,
):
    """Render the config for one simulation, run DRAMSys, and compute metrics.

    Reads the base config template, substitutes the simulation's placeholder
    tokens, writes the rendered config.json into the simulation directory,
    executes DRAMSys there, and fills in the simulation's statistics.
    """
    target_dir = work_item.simulation.directory
    assert target_dir
    with open(work_item.base_config, encoding="utf-8") as template_file:
        template_text = template_file.read()
    rendered = replace_placeholders(
        template_text, work_item.simulation.config_tokens.tokens
    )
    parsed_config = json.loads(rendered)
    # Persist the rendered configuration inside the simulation directory.
    config_path = target_dir / "config.json"
    with open(config_path, "w", encoding="utf-8") as config_file:
        json.dump(parsed_config, config_file, indent=4)
    run_dramsys(work_item.dramsys, target_dir, work_item.resource_dir)
    calculate_simulation_metrics(work_item.simulation)
def generate_dataframe(simulations: list[Simulation], out_dir: Path) -> pl.DataFrame:
    """Pack all simulations' statistics into a DataFrame and write statistics.csv.

    Columns: name, channel, one column per config token key, then every
    Statistics field (the entry construction below must stay in this order).
    Assumes at least one simulation and that all simulations share the same
    token keys (taken from the first one).
    """
    labels = ["name", "channel"]
    statistic_labels = [field.name for field in fields(Statistics)]
    # All simulations share the same token keys; take them from the first one.
    config_keys = simulations[0].config_tokens.tokens.keys()
    labels.extend(config_keys)
    labels.extend(statistic_labels)
    entries = []
    for simulation in simulations:
        config_values = simulation.config_tokens.tokens.values()
        for stat in simulation.statistics:
            m = re.search(r"(?<=ch)[0-9]+", stat.filename)
            # Bug fix: cast the matched channel to int so the column stays
            # homogeneous — mixing str matches with the int sentinel -1 breaks
            # DataFrame column type inference.
            channel = int(m.group(0)) if m else -1
            entries.append(
                [
                    simulation.config_tokens.name,
                    channel,
                    *config_values,
                    stat.filename,
                    stat.databus_utilization,
                    stat.bandwidth,
                    stat.max_bandwidth,
                    stat.avg_latency,
                    stat.max_latency,
                ]
            )
    df = pl.DataFrame(data=entries, schema=labels)
    df.write_csv(out_dir / "statistics.csv")
    return df
def populate_simulation_directories(
    simulations: list[Simulation], out_dir: Path, override: bool
):
    """Create one working directory per simulation under out_dir/simulations.

    Each simulation's directory attribute is set to its created directory.
    If a directory already exists and *override* is False, the process exits
    with an error message instead of clobbering previous artifacts.
    """
    base_dir = out_dir / "simulations"
    for simulation in simulations:
        target = base_dir / simulation.config_tokens.name
        try:
            target.mkdir(parents=True, exist_ok=override)
        except FileExistsError:
            print(
                "Previous simulations artifacts found. To continue, enable the force override flag."
            )
            sys.exit(-1)
        simulation.directory = target
def calculate_metrics(
    simulations: list[Simulation], out_dir: Path, jobs: int | None
) -> pl.DataFrame:
    """Recompute metrics from existing simulation artifacts and summarize them.

    Re-resolves each simulation's directory (override=True is safe here: the
    directories already exist), computes metrics in parallel with *jobs*
    workers, and returns the summary DataFrame.
    """
    populate_simulation_directories(simulations, out_dir, override=True)
    with ThreadPool(jobs) as pool:
        progress = tqdm(
            pool.imap_unordered(calculate_simulation_metrics, simulations),
            total=len(simulations),
        )
        # Drain the iterator so tqdm shows progress as workers finish.
        for _ in progress:
            pass
    return generate_dataframe(simulations, out_dir)
def run_simulations(simulations: list[Simulation], options: Options) -> pl.DataFrame:
    """Run every simulation in parallel and return the summarized metrics.

    Validates that a configuration list, the DRAMSys executable, and a base
    config were supplied (exiting otherwise), creates the simulation
    directories, runs DRAMSys for each config, then computes the metrics.
    """
    if not simulations:
        print("Must specify at least one simulation configuration!")
        sys.exit(-1)
    if options.dramsys is None:
        print("Must specify DRAMSys executable!")
        sys.exit(-1)
    if options.base_config is None:
        print("Must specify a base config!")
        sys.exit(-1)
    print("Create simulation directories...")
    populate_simulation_directories(simulations, options.out_dir, options.override)
    print("Run simulations...")
    work_items = [
        WorkItem(
            options.dramsys, simulation, options.base_config, options.resource_dir
        )
        for simulation in simulations
    ]
    with ThreadPool(options.jobs) as pool:
        for _ in tqdm(
            pool.imap_unordered(simulate, work_items), total=len(work_items)
        ):
            pass
    print("Calculate metrics...")
    return calculate_metrics(simulations, options.out_dir, options.jobs)
def get_argparser() -> argparse.ArgumentParser:
    """Build the command-line interface for the DRAMSys simulation utility."""
    parser = argparse.ArgumentParser(description="DRAMSys simulation utility")
    # Optional positional: the simulator binary itself.
    parser.add_argument(
        "dramsys", nargs="?", type=Path, help="path to the DRAMSys executable"
    )
    # Mode flags: run simulations and/or recompute metrics.
    parser.add_argument(
        "--simulate",
        action="store_true",
        default=False,
        help="run the simulations generating simulation artifacts",
    )
    parser.add_argument(
        "--metrics",
        action="store_true",
        default=False,
        help="calculate the metrics from existing simulation artifacts",
    )
    parser.add_argument(
        "-f",
        "--force",
        dest="override",
        action="store_true",
        default=False,
        help="force override existing simulation artifacts",
    )
    # Paths and parallelism.
    parser.add_argument(
        "--out-dir", type=Path, default="out", help="path to the output directory"
    )
    parser.add_argument(
        "--base-config", type=Path, help="path to the base configuration file"
    )
    parser.add_argument(
        "--resource-dir", type=Path, help="path to the resource directory"
    )
    parser.add_argument(
        "-j", "--jobs", metavar="N", type=int, default=None, help="run N jobs in parallel"
    )
    return parser
def get_options(args: argparse.Namespace) -> Options:
    """Convert a parsed argparse namespace into a typed Options record."""
    option_values = vars(args)
    return Options(**option_values)
def simulation_results(
    options: Options,
    simulations: list[Simulation],
) -> pl.DataFrame:
    """Return the statistics DataFrame according to the selected mode.

    --simulate runs everything from scratch; --metrics recomputes metrics from
    existing artifacts; otherwise a previously written statistics.csv is
    loaded (exiting with an error when it does not exist yet).
    """
    if options.simulate:
        return run_simulations(simulations, options)
    if options.metrics:
        return calculate_metrics(simulations, options.out_dir, options.jobs)
    print("Summarizing simulation results in statistics.csv...")
    statistics_file = options.out_dir / "statistics.csv"
    # Idiom fix: statistics_file is a Path, so use pathlib's is_file()
    # instead of os.path.isfile.
    if not statistics_file.is_file():
        print("Run the simulations first to generate simulation artifacts")
        sys.exit(-1)
    return pl.read_csv(statistics_file)