Files
DRAMSys/scripts/simulation/simulation.py

332 lines
9.1 KiB
Python
Executable File

import argparse
import subprocess
import sys
import sqlite3
import json
import os
import re
from pathlib import Path
from dataclasses import dataclass, fields
from typing import Optional
from multiprocessing.pool import ThreadPool
from tqdm import tqdm
import pandas as pd
sys.path.append("extensions/apps/traceAnalyzer/scripts")
from metrics import (
average_response_latency_in_ns,
max_response_latency_in_ns,
memory_active_in_percent,
maximum_data_rate,
)
@dataclass
class Options:
dramsys: Path
override: bool
out_dir: Path
simulate: bool
metrics: bool
base_config: Path | None
resource_dir: Path | None = None
jobs: int | None = None
@dataclass(frozen=True)
class SubConfig:
    """A named set of parameters.

    NOTE(review): not referenced anywhere in this file — presumably consumed
    by an external script that builds Configuration tokens; confirm with callers.
    """

    # sub-configuration name
    name: str
    # parameter-name -> value mapping
    parameters: dict[str, str]
@dataclass(frozen=True)
class Statistics:
    """Metrics extracted from one .tdb trace database of a simulation."""

    # name of the .tdb file the metrics were read from
    filename: str
    # fraction of time the memory was active, scaled to 0..1
    databus_utilization: float
    # achieved bandwidth: databus_utilization * max_bandwidth
    bandwidth: float
    # peak data rate (maximum_data_rate / 1000 — units depend on the
    # metrics module; presumably GB/s — TODO confirm)
    max_bandwidth: float
    # average response latency in nanoseconds
    avg_latency: float
    # maximum response latency in nanoseconds
    max_latency: float
@dataclass(frozen=True)
class Configuration:
    """One simulation configuration: a name plus placeholder tokens."""

    # also used as the simulation sub-directory name
    name: str
    # token -> value; each "<token>" occurrence in the base config JSON is
    # replaced by str(value) (see replace_placeholders)
    tokens: dict[str, str | int]
@dataclass
class Simulation:
    """Mutable record tying a Configuration to its run artifacts."""

    config: Configuration
    # populated by populate_simulation_directories(); annotation corrected
    # from str to Path — the code assigns a Path and uses the / operator on it
    directory: Optional[Path] = None
    # populated by calculate_simulation_metrics()
    statistics: Optional[list[Statistics]] = None
def run_dramsys(dramsys: Path, simulation_dir: Path, resource_dir: Path | None):
    """Launch the DRAMSys executable inside *simulation_dir*.

    stdout is captured to out.txt in the simulation directory; a non-zero
    exit status raises subprocess.CalledProcessError (check=True).
    """
    argv = [dramsys.absolute(), "config.json"]
    if resource_dir is not None:
        argv.append(resource_dir)
    with open(f"{simulation_dir}/out.txt", "w", encoding="utf-8") as stdout_sink:
        subprocess.run(argv, cwd=simulation_dir, stdout=stdout_sink, check=True)
def calculate_simulation_metrics(simulation: Simulation):
    """Compute Statistics for every .tdb database in the simulation directory.

    Stores the results on simulation.statistics. A database that fails to
    yield metrics is skipped with a warning rather than aborting the batch.
    """
    simulation_dir = simulation.directory
    stats: list[Statistics] = []
    for file in os.listdir(simulation_dir):
        if file.endswith(".tdb"):
            connection = sqlite3.connect(f"{simulation_dir}/{file}")
            try:
                # /1000 and /100 convert the raw metric values to a
                # bandwidth unit and a 0..1 utilization fraction
                # (presumably — confirm against the metrics module).
                max_bandwidth = maximum_data_rate(connection) / 1000
                avg_latency = average_response_latency_in_ns(connection)
                max_latency = max_response_latency_in_ns(connection)
                databus_utilization = memory_active_in_percent(connection) / 100
                bandwidth = databus_utilization * max_bandwidth
                stats.append(
                    Statistics(
                        file,
                        databus_utilization,
                        bandwidth,
                        max_bandwidth,
                        avg_latency,
                        max_latency,
                    )
                )
            except Exception as error:
                # Best effort: a broken or partial database must not kill
                # the whole metrics pass.
                print(
                    f"Warning: Could not calculate metrics for {simulation_dir}/{file}: {error}"
                )
            finally:
                # Bug fix: the connection was previously never closed,
                # leaking one file handle per .tdb database.
                connection.close()
    simulation.statistics = stats
def replace_placeholders(config_json: str, tokens: dict) -> str:
    """Return *config_json* with every "<key>" placeholder replaced by str(value)."""
    substituted = config_json
    for token_name in tokens:
        substituted = substituted.replace(f"<{token_name}>", str(tokens[token_name]))
    return substituted
@dataclass
class WorkItem:
    """Bundle of everything simulate() needs to run one simulation."""

    # path to the DRAMSys executable
    dramsys: Path
    # target simulation; its directory must already be populated
    simulation: Simulation
    # template config containing <token> placeholders
    base_config: Path
    # optional resource directory forwarded to DRAMSys
    resource_dir: Path | None
def simulate(
    work_item: WorkItem,
):
    """Materialize the config for one simulation, run DRAMSys, collect metrics."""
    simulation_dir = work_item.simulation.directory
    # Expand the <token> placeholders of the base config template.
    with open(work_item.base_config, encoding="utf-8") as template_file:
        expanded = replace_placeholders(
            template_file.read(), work_item.simulation.config.tokens
        )
    config_object = json.loads(expanded)
    # Save config besides simulation directory
    target_json = simulation_dir / "config.json"
    with open(target_json, "w", encoding="utf-8") as config_out:
        json.dump(config_object, config_out, indent=4)
    run_dramsys(work_item.dramsys, simulation_dir, work_item.resource_dir)
    calculate_simulation_metrics(work_item.simulation)
def generate_dataframe(simulations: list[Simulation], out_dir: str) -> pd.DataFrame:
    """Flatten per-simulation statistics into a DataFrame and write statistics.csv.

    One row per (simulation, .tdb file); columns are the simulation name,
    channel number, the configuration token values, and the Statistics fields.
    """
    # Pack results in a panda dataframe
    labels = ["name", "channel"]
    statistic_labels = list(map(lambda field: field.name, fields(Statistics)))
    # Get one simulation... — assumes every simulation shares the same token
    # keys in the same order (TODO confirm with callers).
    config_keys, _ = zip(*simulations[0].config.tokens.items())
    labels.extend(config_keys)
    labels.extend(statistic_labels)
    # Fix: compile the channel pattern once instead of once per statistic
    # inside the inner loop.
    channel_pattern = re.compile("(?<=ch)[0-9]+")
    entries = []
    for simulation in simulations:
        _, config_values = zip(*simulation.config.tokens.items())
        for stat in simulation.statistics:
            # Extract the channel number embedded in the filename ("...chN...").
            channel = int(channel_pattern.search(stat.filename)[0])
            entries.append(
                [
                    simulation.config.name,
                    channel,
                    *config_values,
                    stat.filename,
                    stat.databus_utilization,
                    stat.bandwidth,
                    stat.max_bandwidth,
                    stat.avg_latency,
                    stat.max_latency,
                ]
            )
    dataframe = pd.DataFrame(data=entries, columns=labels)
    dataframe.to_csv(f"{out_dir}/statistics.csv", sep=";")
    return dataframe
def populate_simulation_directories(
    simulations: list[Simulation], out_dir: str, override: bool
):
    """Create <out_dir>/simulations/<name> for each simulation and record it.

    With override=False an existing directory aborts the run, so previous
    artifacts are never silently overwritten.
    """
    for simulation in simulations:
        target = Path(f"{out_dir}/simulations/{simulation.config.name}")
        try:
            target.mkdir(parents=True, exist_ok=override)
        except FileExistsError:
            print(
                "Previous simulations artifacts found. To continue, enable the force override flag."
            )
            sys.exit(-1)
        simulation.directory = target
def calculate_metrics(
    simulations: list[Simulation], out_dir: str, jobs: int | None
) -> pd.DataFrame:
    """Recompute metrics from existing simulation artifacts and summarize them."""
    # Directories already exist from a previous run, hence override=True.
    populate_simulation_directories(simulations, out_dir, override=True)
    with ThreadPool(jobs) as workers:
        progress = tqdm(
            workers.imap_unordered(calculate_simulation_metrics, simulations),
            total=len(simulations),
        )
        # Drain the iterator so the progress bar advances as jobs finish.
        for _ in progress:
            pass
    return generate_dataframe(simulations, out_dir)
def run_simulations(simulations: list[Simulation], options: Options) -> pd.DataFrame:
    """Run every simulation in a thread pool, then compute and summarize metrics."""
    if not simulations:
        print("Must specify at least one simulation configuration!")
        sys.exit(-1)
    if options.base_config is None:
        print("Must specify a base config")
        sys.exit(-1)
    print("Create simulation directories...")
    populate_simulation_directories(simulations, options.out_dir, options.override)
    print("Run simulations...")
    work_items = [
        WorkItem(options.dramsys, simulation, options.base_config, options.resource_dir)
        for simulation in simulations
    ]
    with ThreadPool(options.jobs) as workers:
        # Drain the iterator so the progress bar advances as jobs finish.
        for _ in tqdm(
            workers.imap_unordered(simulate, work_items), total=len(work_items)
        ):
            pass
    print("Calculate metrics...")
    return calculate_metrics(simulations, options.out_dir, options.jobs)
def get_options_from_args() -> Options:
    """Parse command-line arguments into an Options instance."""
    cli = argparse.ArgumentParser(description="DRAMSys simulation utility")
    cli.add_argument("dramsys", type=Path, help="path to the DRAMSys executable")
    cli.add_argument(
        "--simulate",
        default=False,
        action="store_true",
        help="run the simulations generating simulation artifacts",
    )
    cli.add_argument(
        "--metrics",
        default=False,
        action="store_true",
        help="calculate the metrics from existing simulation artifacts",
    )
    cli.add_argument(
        "-f",
        "--force",
        default=False,
        action="store_true",
        help="force override existing simulation artifacts",
    )
    cli.add_argument(
        "--out-dir",
        type=Path,
        default="out",
        help="path to the output directory",
    )
    cli.add_argument(
        "--base-config",
        type=Path,
        help="path to the base configuration file",
    )
    cli.add_argument(
        "--resource-dir",
        type=Path,
        help="path to the resource directory",
    )
    cli.add_argument(
        "-j",
        "--jobs",
        metavar="N",
        type=int,
        default=None,
        help="run N jobs in parallel",
    )
    parsed = cli.parse_args()
    return Options(
        dramsys=parsed.dramsys,
        override=parsed.force,
        out_dir=parsed.out_dir,
        simulate=parsed.simulate,
        metrics=parsed.metrics,
        base_config=parsed.base_config,
        resource_dir=parsed.resource_dir,
        jobs=parsed.jobs,
    )
def simulation_results(
    options: Options,
    simulations: list[Simulation],
) -> pd.DataFrame:
    """Dispatch on the selected mode and return the statistics DataFrame.

    --simulate runs the simulations (and computes metrics); --metrics
    recomputes metrics from existing artifacts; otherwise a previously
    written statistics.csv is loaded from the output directory.
    """
    if options.simulate:
        return run_simulations(simulations, options)
    if options.metrics:
        return calculate_metrics(simulations, options.out_dir, options.jobs)
    print("Summarizing simulation results in statistics.csv...")
    statistics_file = f"{options.out_dir}/statistics.csv"
    if not os.path.isfile(statistics_file):
        print("Run the simulations first to generate simulation artifacts")
        sys.exit(-1)
    # Fix: reuse the already-built path instead of rebuilding the f-string.
    return pd.read_csv(statistics_file, sep=";")