Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 14 additions & 239 deletions benchmarks/scripts/cccl/bench/bench.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import itertools
import json
import os
import signal
import subprocess
import time

import fpzip
import numpy as np

from .bench_result import BenchResult, get_axis_name
from .cmake import CMake
from .config import BasePoint, Config
from .json_cache import device_json, json_benches
from .logger import Logger
from .process_runner import ProcessRunner
from .score import compute_axes_ids, compute_weight_matrices, get_workload_weight
from .storage import Storage, get_bench_table_name

Expand All @@ -29,45 +30,6 @@ def first_val(my_dict):
return first_value


class JsonCache:
_instance = None

def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.bench_cache = {}
cls._instance.device_cache = {}
return cls._instance

def get_bench(self, algname):
if algname not in self.bench_cache:
result = subprocess.check_output(
[os.path.join(".", "bin", algname + ".base"), "--jsonlist-benches"]
)
self.bench_cache[algname] = json.loads(result)
return self.bench_cache[algname]

def get_device(self, algname):
if algname not in self.device_cache:
result = subprocess.check_output(
[os.path.join(".", "bin", algname + ".base"), "--jsonlist-devices"]
)
devices = json.loads(result)["devices"]

if len(devices) != 1:
raise Exception(
"NVBench doesn't work well with multiple GPUs, use `CUDA_VISIBLE_DEVICES`"
)

self.device_cache[algname] = devices[0]

return self.device_cache[algname]


def json_benches(algname):
return JsonCache().get_bench(algname)


def create_benches_tables(conn, subbench, bench_axes):
with conn:
conn.execute("""
Expand Down Expand Up @@ -117,163 +79,12 @@ def create_benches_tables(conn, subbench, bench_axes):
)


def read_json(filename):
with open(filename, "r") as f:
file_root = json.load(f)
return file_root


def extract_filename(summary):
summary_data = summary["data"]
value_data = next(filter(lambda v: v["name"] == "filename", summary_data))
assert value_data["type"] == "string"
return value_data["value"]


def extract_size(summary):
summary_data = summary["data"]
value_data = next(filter(lambda v: v["name"] == "size", summary_data))
assert value_data["type"] == "int64"
return int(value_data["value"])


def extract_bw(summary):
summary_data = summary["data"]
value_data = next(filter(lambda v: v["name"] == "value", summary_data))
assert value_data["type"] == "float64"
return float(value_data["value"])


def parse_samples_meta(state):
summaries = state["summaries"]
if not summaries:
return None, None

summary = next(
filter(lambda s: s["tag"] == "nv/json/bin:nv/cold/sample_times", summaries),
None,
)
if not summary:
return None, None

sample_filename = extract_filename(summary)
sample_count = extract_size(summary)
return sample_count, sample_filename


def parse_samples(state):
sample_count, samples_filename = parse_samples_meta(state)
if not sample_count or not samples_filename:
return np.array([], dtype=np.float32)

with open(samples_filename, "rb") as f:
samples = np.fromfile(f, "<f4")

samples.sort()

assert sample_count == len(samples)
return samples


def parse_bw(state):
bwutil = next(
filter(
lambda s: s["tag"] == "nv/cold/bw/global/utilization", state["summaries"]
),
None,
)
if not bwutil:
return None

return extract_bw(bwutil)


class SubBenchState:
def __init__(self, state, axes_names, axes_values):
self.samples = parse_samples(state)
self.bw = parse_bw(state)

self.point = {}
for axis in state["axis_values"]:
name = axes_names[axis["name"]]
value = axes_values[axis["name"]][axis["value"]]
self.point[name] = value

def __repr__(self):
return str(self.__dict__)

def name(self):
return " ".join(f"{k}={v}" for k, v in self.point.items())

def center(self, estimator):
return estimator(self.samples)


class SubBenchResult:
def __init__(self, bench):
axes_names = {}
axes_values = {}
for axis in bench["axes"]:
short_name = axis["name"]
full_name = get_axis_name(axis)
axes_names[short_name] = full_name
axes_values[short_name] = {}
for value in axis["values"]:
if "value" in value:
axes_values[axis["name"]][str(value["value"])] = value[
"input_string"
]
else:
axes_values[axis["name"]][value["input_string"]] = value[
"input_string"
]

self.states = []
for state in bench["states"]:
if not state["is_skipped"]:
self.states.append(SubBenchState(state, axes_names, axes_values))

def __repr__(self):
return str(self.__dict__)

def centers(self, estimator):
result = {}
for state in self.states:
result[state.name()] = state.center(estimator)
return result


class BenchResult:
def __init__(self, json_path, code, elapsed):
self.code = code
self.elapsed = elapsed

if json_path:
self.subbenches = {}
if code == 0:
for bench in read_json(json_path)["benchmarks"]:
self.subbenches[bench["name"]] = SubBenchResult(bench)

def __repr__(self):
return str(self.__dict__)

def centers(self, estimator):
result = {}
for subbench in self.subbenches:
result[subbench] = self.subbenches[subbench].centers(estimator)
return result


def device_json(algname):
return JsonCache().get_device(algname)


def get_device_name(device):
gpu_name = device["name"]
bus_width = device["global_memory_bus_width"]
sms = device["number_of_sms"]
ecc = "eccon" if device["ecc_state"] else "eccoff"
name = "{} ({}, {}, {})".format(gpu_name, bus_width, sms, ecc)
name = f"{gpu_name} ({bus_width}, {sms}, {ecc})"
return name.replace("NVIDIA ", "")


Expand All @@ -287,7 +98,7 @@ def state_to_rt_workload(bench, state):
name, value = param.split("=")
if is_ct_axis(name):
continue
rt_workload.append("{}={}".format(name, value))
rt_workload.append(f"{name}={value}")
return rt_workload


Expand Down Expand Up @@ -352,7 +163,7 @@ def __new__(cls, *args, **kwargs):

return cls._instance

def create_table_if_not_exists(self, conn, bench):
def ensure_table_exists(self, conn, bench):
bench_base = bench.get_base()
alg_name = bench_base.algorithm_name()

Expand All @@ -368,10 +179,10 @@ def push_bench_centers(self, bench, result, estimator):
config = Config()
ctk = config.ctk
cccl = config.cccl
gpu = get_device_name(device_json(bench.algname))
gpu = get_device_name(device_json(bench.algorithm_name()))
conn = Storage().connection()

self.create_table_if_not_exists(conn, bench)
self.ensure_table_exists(conn, bench)

centers = {}
with conn:
Expand All @@ -385,7 +196,7 @@ def push_bench_centers(self, bench, result, estimator):

for name in state.point:
value = state.point[name]
columns = columns + ', "{}"'.format(name)
columns = columns + f', "{name}"'
placeholders = placeholders + ", ?"
values.append(value)

Expand Down Expand Up @@ -421,7 +232,7 @@ def pull_bench_centers(self, bench, ct_workload_point, rt_values):
gpu = get_device_name(device_json(bench.algname))
conn = Storage().connection()

self.create_table_if_not_exists(conn, bench)
self.ensure_table_exists(conn, bench)

centers = {}

Expand Down Expand Up @@ -457,13 +268,6 @@ def pull_bench_centers(self, bench, ct_workload_point, rt_values):
return centers


def get_axis_name(axis):
name = axis["name"]
if axis["flags"]:
name = name + "[{}]".format(axis["flags"])
return name


def speedup(base, variant):
# If one of the runs failed, dict is empty
if not base or not variant:
Expand Down Expand Up @@ -496,37 +300,6 @@ def values_to_space(axes):
return list(itertools.product(*result))


class ProcessRunner:
_instance = None

def __new__(cls, *args, **kwargs):
if not isinstance(cls._instance, cls):
cls._instance = super(ProcessRunner, cls).__new__(cls, *args, **kwargs)
return cls._instance

def __init__(self):
self.process = None
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)

def new_process(self, cmd):
self.process = subprocess.Popen(
cmd,
start_new_session=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return self.process

def signal_handler(self, signum, frame):
self.kill_process()
raise SystemExit("search was interrupted")

def kill_process(self):
if self.process is not None:
self.process.kill()


class Bench:
def __init__(self, algorithm_name, variant, ct_workload):
self.algname = algorithm_name
Expand Down Expand Up @@ -603,9 +376,11 @@ def ct_axes_value_descriptions(self):
continue
if axis["flags"]:
name = name + "[{}]".format(axis["flags"])
descriptions[name] = {}
axis_descr = {}
for value in axis["values"]:
descriptions[name][value["input_string"]] = value["description"]
k, v = value["input_string"], value["description"]
axis_descr[k] = v
descriptions[name] = axis_descr

subbench_descriptions[bench["name"]] = descriptions
return first_val(subbench_descriptions)
Expand Down
Loading
Loading