#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause-Clear
# Copyright (c) 2019, The Numerical Algorithms Group, Ltd. All rights reserved.
"""\
Extrae Trace Utilities
----------------------
Routines for analysis and management of Extrae traces, including Paramedir automation.
"""
import os
import re
import shutil
import subprocess as sp
from os.path import basename, dirname, normpath, splitext
from tempfile import mkdtemp, mkstemp

import numpy as np
import pandas as pd
from pkg_resources import resource_filename

from .prv import get_prv_header_info, _parse_paraver_headerline, zipopen
# Unsigned decimal number (no sign or exponent) whose integer part may contain
# commas; _split_binline strips the commas before converting to float.
floatmatch = re.compile(r"[0-9,]+\.[0-9]+")
# An @key@ placeholder in a Paraver config template. The key may not contain
# "@", so several placeholders on one line ("@A@ ... @B@") are matched
# separately; a greedy ".*" would treat "A@ ... @B" as a single key.
keymatch = re.compile(r"@[^@]*@")
# Field separator in Paramedir histogram output: one or more tabs.
tabmatch = re.compile(r"\t+")
# Packaged paramedir filter used by chop_prv_to_roi to reduce a trace to the
# events from which _get_roi_times reads the ROI bounds.
ROI_FILTER_XML = "filters/tracing_state.xml"
# Packaged cutter template; chop_prv_to_roi fills in its @MIN_TIME@ and
# @MAX_TIME@ placeholders.
CUTTER_SINGLE_SKEL = "cutters/single_region.skel"
def remove_trace(tracefile, failure_is_error=False):
    """Remove prv or dim file and its associated .row and .pcf files

    The function should be passed a filename ending in .dim or .prv and will
    attempt to delete this and the pcf and row file. The filenames of these
    will be constructed by direct substitution of the file extension.

    Parameters
    ----------
    tracefile: str
        Path to tracefile, should end with .dim or .prv
    failure_is_error: bool
        If True, re-raise FileNotFoundError as soon as one of the files is
        missing (files are visited in the order trace, .pcf, .row, so any
        later ones are then left in place). If False (default), missing
        files are skipped silently.

    Raises
    ------
    ValueError
        If `tracefile` does not end with .prv or .dim.
    FileNotFoundError
        If `failure_is_error` is True and one of the files does not exist.
    """
    if not tracefile.endswith((".prv", ".dim")):
        raise ValueError("Expected a .prv or .dim file")
    stem = splitext(tracefile)[0]
    for path in (tracefile, stem + ".pcf", stem + ".row"):
        try:
            os.remove(path)
        except FileNotFoundError:
            if failure_is_error:
                raise
def sort_traces_by_commsize(tracelist):
    """Take a list of traces and return them sorted in ascending order of
    their MPI commsize

    Traces with equal commsize are ordered by their path.

    Parameters
    ----------
    tracelist: iterable of str
        List of traces in arbitrary order

    Returns
    -------
    commsizes: list of int
        Sorted list of comm sizes
    traces: list of str
        Sorted list of tracefiles
    """
    tracelist = list(tracelist)
    nproc_list = [
        get_prv_header_info(x).application_layout.commsize for x in tracelist
    ]
    ordered = sorted(zip(nproc_list, tracelist))
    # Return real lists (not a zip iterator) so that
    # ``commsizes, traces = sort_traces_by_commsize(...)`` also works when
    # tracelist is empty.
    commsizes = [nproc for nproc, _ in ordered]
    traces = [trace for _, trace in ordered]
    return commsizes, traces
def chop_prv_to_roi(prv_file, outfile=None):
    """Cut down a prv trace to just the region of interest

    This will cut a trace to just include the region of interest, which is
    assumed to be bracketed by calls to `Extrae_Restart()` and
    `Extrae_Shutdown()` which are located by their corresponding events
    (40000012) in the trace.

    Parameters
    ----------
    prv_file: str
        Trace file to be cut
    outfile: str or None
        Optional output file for chopped trace. (If not specified will be
        created in a temporary folder.)

    Returns
    -------
    chopped: str
        Path to chopped tracefile in prv format.

    Raises
    ------
    RuntimeError
        If the paramedir filter or cutter step fails or produces no output.
    """
    if outfile:
        workdir = dirname(normpath(outfile))
    else:
        tgtname = ".chop".join(splitext(basename(prv_file)))
        workdir = mkdtemp()
        outfile = os.path.join(workdir, tgtname)

    # Step 1: filter the trace with the packaged ROI filter and read the ROI
    # time bounds from the resulting intermediate trace.
    roi_filter = resource_filename(__name__, ROI_FILTER_XML)
    roi_prv = os.path.join(workdir, ".roifilter".join(splitext(basename(prv_file))))
    filter_cmds = [
        "paramedir",
        "--filter",
        roi_filter,
        "--output-name",
        roi_prv,
        prv_file,
    ]
    result = sp.run(filter_cmds, stdout=sp.PIPE, stderr=sp.STDOUT)
    if result.returncode != 0 or not os.path.exists(roi_prv):
        raise RuntimeError(
            "Failed to filter ROI file:\n{}" "".format(result.stdout.decode())
        )
    try:
        starttime, endtime = _get_roi_times(roi_prv)
    finally:
        # Remove the intermediate trace even if its events could not be parsed.
        # NOTE(review): remove_trace only accepts *.prv/*.dim names, so an input
        # such as *.prv.gz yields a roi_prv it rejects -- confirm whether
        # compressed inputs are expected here.
        remove_trace(roi_prv)

    # Step 2: write the time window into a copy of the cutter template and
    # cut the original trace with it.
    cutter_skel = resource_filename(__name__, CUTTER_SINGLE_SKEL)
    min_time = "{:d}".format(starttime)
    max_time = "{:d}".format(endtime)
    fd, cutter_xml = mkstemp(dir=workdir, suffix=".xml", text=True)
    try:
        # Write through the descriptor mkstemp already opened.
        with os.fdopen(fd, "wt") as wfh, open(cutter_skel, "rt") as rfh:
            for line in rfh:
                wfh.write(
                    line.replace("@MIN_TIME@", min_time).replace(
                        "@MAX_TIME@", max_time
                    )
                )
        cutter_cmds = [
            "paramedir",
            "--cutter",
            prv_file,
            cutter_xml,
            "--output-name",
            outfile,
        ]
        # Merge stderr into stdout (as for the filter step) so that paramedir's
        # diagnostics appear in the error message below.
        result = sp.run(cutter_cmds, stdout=sp.PIPE, stderr=sp.STDOUT)
    finally:
        # The cutter config is only needed for the paramedir call above.
        os.remove(cutter_xml)
    if result.returncode != 0 or not os.path.exists(outfile):
        raise RuntimeError(
            "Failed to cut prv file {}:\n{}" "".format(prv_file, result.stdout.decode())
        )
    return outfile
def _get_roi_times(roi_prv):
    """Extract ROI timing information from a filtered trace

    Expects a trace containing only Extrae On/Off events (type 40000012).
    After the header and communicator ("c...") lines, the records must be,
    in order: `commsize` off events (value 0), `commsize` on events (value 1)
    and `commsize` off events (value 0).

    Parameters
    ----------
    roi_prv: str
        Path to the filtered trace.

    Returns
    -------
    (starttime, endtime): tuple of int
        `starttime` is the latest of the on-event times; `endtime` is one less
        than the earliest of the closing off-event times.

    Raises
    ------
    ValueError
        If a record is not the expected event, or the trace ends early.
    """
    roi_off = "40000012:0"
    roi_on = "40000012:1"
    with zipopen(roi_prv, "rt") as fh:
        prvheader = _parse_paraver_headerline(fh.readline())
        # One event of each kind is expected per rank
        commsize = prvheader.application_layout.commsize

        def event_records():
            """Yield stripped records following the communicator lines.

            The first non-communicator line is yielded directly rather than
            pushed back with tell()/seek(): arithmetic on tell() values is not
            valid for text-mode streams. A blank line also ends the
            communicator section and is dropped.
            """
            lines = (raw.strip() for raw in fh)
            for line in lines:
                if not line:
                    break
                if not line.startswith("c"):
                    yield line
                    break
            yield from lines

        records = event_records()

        def next_event(suffix):
            """Return the next record, which must end with `suffix`."""
            line = next(records, "")
            if not line.endswith(suffix):
                raise ValueError(
                    "Unexpected event or misordered events: " "{}\n".format(line)
                )
            return line

        def event_time(line):
            # Records are colon separated; field 5 holds the timestamp
            return int(line.split(":")[5])

        # Skip the leading off events; their times are not needed
        for _ in range(commsize):
            next_event(roi_off)
        # max/min over all ranks (a time of 0 is a valid value, not "unset")
        starttime = max(event_time(next_event(roi_on)) for _ in range(commsize))
        endtime = min(event_time(next_event(roi_off)) for _ in range(commsize))
    return (starttime, endtime - 1)
def paramedir_analyze(
    tracefile, config, variables=None, index_by_thread=False, statistic_names=None
):
    """Analyze a tracefile with paramedir

    The kind of analysis is detected from the config text. Only 2D
    histograms ("Analyzer2D") are supported; 3D histograms ("Analyzer2D.3D")
    and configs with neither marker are rejected.

    Parameters
    ----------
    tracefile: str
        Path to `*.prv` tracefile from Extrae
    config: str
        Path to Paraver/Paramedir `*.cfg`
    variables: dict or None
        Optional dict of key-value pairs for replacement in config file prior
        to running paramedir.
    index_by_thread: bool
        If True return data organised by a multilevel index of MPI ranks and
        threads. Note that this discards Paramedir calculated statistical
        info.
    statistic_names: list of str or None
        Optional list of string names for the statistics returned by the config
        file. If not provided names will be taken from paramedir output.

    Returns
    -------
    result: pandas.DataFrame
        Result data loaded from the resulting csv.

    Raises
    ------
    ValueError
        If the config describes an unsupported analysis type.
    """
    with open(config, "r") as cfg_fh:
        cfg_text = cfg_fh.read()

    # The 3D marker must be tested first: it also contains "Analyzer2D".
    if "Analyzer2D.3D" in cfg_text:
        unsupported = "Hist3D"
    elif "Analyzer2D" not in cfg_text:
        unsupported = "Raw counts"
    else:
        return _analyze_hist2D(
            tracefile, config, variables, index_by_thread, statistic_names
        )
    raise ValueError('Unsupported analysis type "{}"'.format(unsupported))
def _analyze_hist2D(tracefile, config, variables, index_by_thread, stat_names):
    """Run config producing a 2D histogram and return result DataFrame

    Arguments are as for `paramedir_analyze` (`stat_names` is its
    `statistic_names`).
    """
    # Have paramedir write into a private scratch directory so that the
    # output is removed afterwards, even if loading it fails.
    workdir = mkdtemp()
    try:
        histfile = run_paramedir(
            tracefile,
            config,
            outfile=os.path.join(workdir, splitext(basename(config))[0]),
            variables=variables,
        )
        data = load_paraver_histdata(histfile)
    finally:
        shutil.rmtree(workdir, ignore_errors=True)
    if stat_names:
        data.index = pd.Index(stat_names)
    if index_by_thread:
        return reindex_by_thread(data)
    return data
def reindex_by_thread(stats_dframe, thread_prefix="THREAD"):
    """Convert stats DataFrame columns to a (rank, thread) MultiIndex

    Only columns whose label starts with `thread_prefix` are kept, so any
    Paramedir-calculated statistics columns are discarded. The input frame
    is not modified; a new DataFrame is returned.

    Parameters
    ----------
    stats_dframe: pd.DataFrame
        Dataframe to reindex. Typically this will have been produced using
        paramedir_analyze().
    thread_prefix: str
        Prefix before thread number pattern in current column labels. Should
        almost always be "THREAD". Paraver/Paramedir default is "THREAD a.r.t"
        with r the rank number and t the thread number.

    Returns
    -------
    pd.DataFrame
        Copy of the selected columns, with a two-level column index named
        "rank" and "thread".

    Raises
    ------
    TypeError
        If `stats_dframe` is not a pandas DataFrame.
    """
    if not isinstance(stats_dframe, pd.DataFrame):
        raise TypeError("stats_dframe must be a Pandas DataFrame")
    thread_cols = [c for c in stats_dframe.columns if c.startswith(thread_prefix)]
    # "THREAD a.r.t" -> (r, t): keep the integer fields after the first "."
    # Passing names also allows an empty selection (from_tuples([]) alone
    # cannot infer the number of levels).
    newcols = pd.MultiIndex.from_tuples(
        [tuple(int(x) for x in col.split(".")[1:]) for col in thread_cols],
        names=["rank", "thread"],
    )
    # DataFrame.set_axis(..., inplace=...) no longer exists in pandas >= 2.0;
    # assign the new column index on an explicit copy instead.
    result = stats_dframe[thread_cols].copy()
    result.columns = newcols
    return result
def run_paramedir(tracefile, config, outfile=None, variables=None):
    """Run paramedir on a tracefile

    Parameters
    ----------
    tracefile: str
        Path to `*.prv` tracefile from Extrae
    config: str
        Path to Paraver/Paramedir `*.cfg`
    outfile: str or None
        Path to output file. If None or "" the output is written to a file
        named after `config` (without its extension) in a new temporary
        directory, which is left in place for the caller to remove.
    variables: dict of str:str
        Dict of variables to replace in the config file. For a key-value pair
        "key":val any occurrence of @key@ in the file will be replaced with
        "val"

    Returns
    -------
    outfile: str
        Path to the output file.

    Raises
    ------
    ValueError
        If the config contains an @key@ placeholder missing from `variables`.
    RuntimeError
        If paramedir fails or does not produce `outfile`.
    """
    # If variables is none, still sub with empty dict (so unhandled
    # placeholders are reported)
    variables = variables if variables else {}
    # The substituted config gets its own scratch directory, separate from
    # the output: it is removed once paramedir has run, and the default
    # output name (config basename without extension) can never coincide
    # with the config copy, even when config has no extension.
    cfgdir = mkdtemp()
    own_outdir = None
    try:
        tmp_config = _write_substituted_config(config, cfgdir, variables)
        if not outfile:
            own_outdir = mkdtemp()
            outfile = os.path.join(
                own_outdir, os.path.splitext(os.path.basename(config))[0]
            )
        paramedir_params = ["paramedir", tracefile, tmp_config, outfile]
        result = sp.run(paramedir_params, stdout=sp.PIPE, stderr=sp.STDOUT)
        if not os.path.exists(outfile) or result.returncode != 0:
            if own_outdir:
                # Don't leave an orphaned temporary output directory behind
                shutil.rmtree(own_outdir, ignore_errors=True)
            raise RuntimeError(
                "Paramedir execution failed:\n{}" "".format(result.stdout.decode())
            )
    finally:
        shutil.rmtree(cfgdir, ignore_errors=True)
    return outfile
def _write_substituted_config(config, tmpdir, variables):
    """Copy config into tmpdir, substituting @key@ placeholders from variables

    Parameters
    ----------
    config: str
        Path to the template config file.
    tmpdir: str
        Directory in which to write the copy (it keeps config's basename).
    variables: dict
        Mapping of placeholder key (without the surrounding "@") to its
        replacement; values are converted with str().

    Returns
    -------
    newconfig: str
        Path of the written copy.

    Raises
    ------
    ValueError
        If a placeholder in config has no entry in `variables`.
    """

    def _substitute(match):
        placeholder = match.group(0)
        key = placeholder[1:-1]
        if key not in variables:
            raise ValueError("Unhandled key {} in {}" "".format(placeholder, config))
        return str(variables[key])

    newconfig = os.path.join(tmpdir, os.path.basename(config))
    # Open the template first so a missing config does not leave an empty copy.
    with open(config, "r") as oldfh, open(newconfig, "w") as newfh:
        for line in oldfh:
            # Single pass over the original line: text coming from one
            # replacement value is never itself treated as a placeholder.
            newfh.write(keymatch.sub(_substitute, line))
    return newconfig
def _split_binline(binline):
    """Internal function to parse the header line of bin specifications

    The stripped line is split on runs of tabs. From each resulting bin spec
    the first decimal number (with commas removed) is taken as its lower
    edge, and these are returned as a float64 array. If any spec contains no
    such number, the spec strings themselves are returned as an array.
    """
    specs = tabmatch.split(binline.strip())

    def lower_edge(spec):
        # floatmatch allows commas in the digits; float() does not.
        return floatmatch.findall(spec)[0].replace(",", "")

    try:
        return np.fromiter(map(lower_edge, specs), dtype=np.float64)
    except IndexError:
        # Some spec had no number at all: keep the raw labels instead.
        return np.asarray(specs)
def _split_countline(countline, bins):
    """Internal function to parse one line of histogram counts

    The last ``len(bins)`` tab-separated fields are the counts; every field
    before them is joined with single spaces to form the row name.

    Returns
    -------
    (name, counts): tuple of (str, pandas.Series)
        Row name and float64 counts indexed by `bins`.

    Raises
    ------
    ValueError
        If no field is left over for the name.
    """
    fields = tabmatch.split(countline.strip())
    n_label = len(fields) - len(bins)
    # At least one field must precede the values to serve as the label
    if n_label < 1:
        raise ValueError("Malformed count line")
    if n_label != 1:
        print("Warning, got more label strings ({}) than expected (1)".format(n_label))
    name = " ".join(fields[:n_label])
    values = np.asarray(fields[n_label:], dtype=np.float64)
    return name, pd.Series(values, index=bins)
def load_paraver_histdata(hist_file):
    """Read a Paraver histogram file and return a pandas DataFrame of its data

    The first line holds the bin specifications, which index the rows. Every
    following non-blank line is one named series of counts and becomes a
    column; a later line with the same name replaces an earlier one.

    Parameters
    ----------
    hist_file : str
        Path to the histogram data file

    Returns
    -------
    pandas.DataFrame
    """
    columns = {}
    with open(hist_file, "r") as histfh:
        bins = _split_binline(histfh.readline())
        for line in histfh:
            if not line.strip():
                continue
            name, counts = _split_countline(line, bins)
            columns[name] = counts
    return pd.DataFrame.from_dict(columns)