Source code for pypop.extrae

#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause-Clear
# Copyright (c) 2019, The Numerical Algorithms Group, Ltd. All rights reserved.

"""\
Extrae Trace Utilities
----------------------

Routines for analysis and management of Extrae traces, including Paramedir automation.
"""

import re
import os
from os.path import basename, dirname, normpath, splitext
from tempfile import mkdtemp, mkstemp
import subprocess as sp

from pkg_resources import resource_filename

import pandas as pd
import numpy as np

from .prv import get_prv_header_info, _parse_paraver_headerline, zipopen

floatmatch = re.compile(r"[0-9,]+\.[0-9]+")
keymatch = re.compile(r"@.*@")
tabmatch = re.compile(r"\t+")

ROI_FILTER_XML = "filters/tracing_state.xml"
CUTTER_SINGLE_SKEL = "cutters/single_region.skel"


def remove_trace(tracefile, failure_is_error=False):
    """Remove a prv or dim file and its associated .row and .pcf files

    The function should be passed a filename ending in .dim or .prv and
    will attempt to delete it along with the matching pcf and row files,
    whose names are constructed by direct substitution of the file
    extension.

    Parameters
    ----------
    tracefile: str
        Path to tracefile, should end with .dim or .prv
    failure_is_error: bool
        If True, raise FileNotFoundError when a file to be removed is
        missing; otherwise missing files are silently skipped.
    """
    if not (tracefile.endswith(".prv") or tracefile.endswith(".dim")):
        raise ValueError("Expected a .prv or .dim file")

    try:
        os.remove(tracefile)
    except FileNotFoundError as err:
        if failure_is_error:
            raise err

    for ext in [".pcf", ".row"]:
        try:
            os.remove(splitext(tracefile)[0] + ext)
        except FileNotFoundError as err:
            if failure_is_error:
                raise err

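# Usage sketch (hypothetical filename): remove trace.prv together with
# trace.pcf and trace.row, silently ignoring any that are missing:
#
#     remove_trace("trace.prv")
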
def sort_traces_by_commsize(tracelist):
    """Take a list of traces and return them sorted in ascending order of
    their MPI commsize

    Parameters
    ----------
    tracelist: iterable of str
        List of traces in arbitrary order

    Returns
    -------
    commsizes: tuple of int
        Sorted comm sizes
    traces: tuple of str
        Tracefiles sorted by comm size
    """
    tracelist = list(tracelist)
    nproc_list = [
        get_prv_header_info(x).application_layout.commsize for x in tracelist
    ]

    return zip(*sorted(zip(nproc_list, tracelist)))

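# A minimal usage sketch (hypothetical trace names whose headers report
# commsizes 4, 8 and 16): the returned zip unpacks into parallel tuples.
#
#     sizes, traces = sort_traces_by_commsize(
#         ["run.16.prv", "run.4.prv", "run.8.prv"]
#     )
#     # sizes  -> (4, 8, 16)
#     # traces -> ("run.4.prv", "run.8.prv", "run.16.prv")
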
def chop_prv_to_roi(prv_file, outfile=None):
    """Cut down a prv trace to just the region of interest

    This will cut a trace to include only the region of interest, which is
    assumed to be bracketed by calls to `Extrae_restart()` and
    `Extrae_shutdown()`, located via their corresponding events (40000012)
    in the trace.

    Parameters
    ----------
    prv_file: str
        Trace file to be cut
    outfile: str or None
        Optional output file for chopped trace. (If not specified, will be
        created in a temporary folder.)

    Returns
    -------
    chopped: str
        Path to chopped tracefile in prv format.
    """
    if outfile:
        workdir = dirname(normpath(outfile))
    else:
        tgtname = ".chop".join(splitext(basename(prv_file)))
        workdir = mkdtemp()
        outfile = os.path.join(workdir, tgtname)

    roi_filter = resource_filename(__name__, ROI_FILTER_XML)
    roi_prv = os.path.join(
        workdir, ".roifilter".join(splitext(basename(prv_file)))
    )

    filter_cmds = [
        "paramedir",
        "--filter",
        roi_filter,
        "--output-name",
        roi_prv,
        prv_file,
    ]

    result = sp.run(filter_cmds, stdout=sp.PIPE, stderr=sp.STDOUT)

    if result.returncode != 0 or not os.path.exists(roi_prv):
        raise RuntimeError(
            "Failed to filter ROI file:\n{}".format(result.stdout.decode())
        )

    starttime, endtime = _get_roi_times(roi_prv)

    remove_trace(roi_prv)

    wfh, cutter_xml = mkstemp(dir=workdir, suffix=".xml", text=True)
    os.close(wfh)

    cutter_skel = resource_filename(__name__, CUTTER_SINGLE_SKEL)

    with open(cutter_skel, "rt") as rfh, open(cutter_xml, "wt") as wfh:
        for line in rfh:
            line = line.replace("@MIN_TIME@", "{:d}".format(starttime))
            line = line.replace("@MAX_TIME@", "{:d}".format(endtime))
            wfh.write(line)

    cutter_cmds = [
        "paramedir",
        "--cutter",
        prv_file,
        cutter_xml,
        "--output-name",
        outfile,
    ]

    # Merge stderr into stdout so the error message below captures any
    # paramedir error text
    result = sp.run(cutter_cmds, stdout=sp.PIPE, stderr=sp.STDOUT)

    if result.returncode != 0 or not os.path.exists(outfile):
        raise RuntimeError(
            "Failed to cut prv file {}:\n{}".format(
                outfile, result.stdout.decode()
            )
        )

    return outfile

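# A minimal end-to-end sketch (hypothetical paths; assumes paramedir is on
# PATH and that the trace brackets its ROI with Extrae restart/shutdown
# events):
def _example_chop_to_roi():  # pragma: no cover - illustrative only
    chopped = chop_prv_to_roi("app.64.prv", outfile="app.64.chop.prv")
    print("ROI-only trace written to", chopped)
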
def _get_roi_times(roi_prv):
    """Extract ROI timing information from a filtered trace

    Expects a trace containing only Extrae On/Off events and returns a
    tuple bracketing the interval during which tracing was enabled on all
    ranks (latest restart time, earliest shutdown time).
    """
    with zipopen(roi_prv, "rt") as fh:
        headerline = fh.readline()
        prvheader = _parse_paraver_headerline(headerline)

        # First want commsize
        commsize = prvheader.application_layout.commsize

        # Skip over communicator definition lines; save the position
        # before each read so the first non-communicator line can be
        # seeked back to and re-read below
        while True:
            linestart = fh.tell()
            line = fh.readline().strip()
            if not line.startswith("c"):
                fh.seek(linestart)
                break

        # Now skip over the commsize shutdown events at the beginning of
        # the trace
        for _ in range(commsize):
            line = fh.readline().strip()
            if not line.endswith("40000012:0"):
                raise ValueError(
                    "Unexpected event or misordered events: {}\n".format(line)
                )

        # Can now grab the start events, keeping the latest restart time
        starttime = None
        for _ in range(commsize):
            line = fh.readline().strip()
            if not line.endswith("40000012:1"):
                raise ValueError(
                    "Unexpected event or misordered events: {}\n".format(line)
                )
            tmptime = int(line.split(":")[5])
            starttime = max(starttime, tmptime) if starttime else tmptime

        # and the end events, keeping the earliest shutdown time
        endtime = None
        for _ in range(commsize):
            line = fh.readline().strip()
            if not line.endswith("40000012:0"):
                raise ValueError(
                    "Unexpected event or misordered events: {}\n".format(line)
                )
            tmptime = int(line.split(":")[5])
            endtime = min(endtime, tmptime) if endtime else tmptime

    return (starttime, endtime - 1)

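# The event records consumed above are standard prv type-2 lines, e.g.
# (illustrative values):
#
#     2:cpu:appl:task:thread:time:40000012:1
#
# so field index 5 of the colon-split line is the timestamp, and the
# trailing type:value pair distinguishes restart (40000012:1) from
# shutdown (40000012:0) events.
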
def paramedir_analyze(
    tracefile, config, variables=None, index_by_thread=False, statistic_names=None
):
    """Analyze a tracefile with paramedir

    Parameters
    ----------
    tracefile: str
        Path to `*.prv` tracefile from Extrae
    config: str
        Path to Paraver/Paramedir `*.cfg`
    variables: dict or None
        Optional dict of key-value pairs for replacement in the config
        file prior to running paramedir.
    index_by_thread: bool
        If True, return data organised by a multilevel index of MPI ranks
        and threads. Note that this discards Paramedir-calculated
        statistical info.
    statistic_names: list of str or None
        Optional list of string names for the statistics returned by the
        config file. If not provided, names will be taken from the
        paramedir output.

    Returns
    -------
    result: pandas.DataFrame
        Result data loaded from the resulting csv.
    """
    with open(config, "r") as fh:
        confstring = " ".join(fh)

    if "Analyzer2D.3D" in confstring:
        datatype = "Hist3D"
    elif "Analyzer2D" in confstring:
        return _analyze_hist2D(
            tracefile, config, variables, index_by_thread, statistic_names
        )
    else:
        datatype = "Raw counts"

    raise ValueError('Unsupported analysis type "{}"'.format(datatype))

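# Usage sketch (hypothetical paths; the cfg must define a Paraver
# "Analyzer2D" 2D histogram, otherwise a ValueError is raised):
def _example_paramedir_analyze():  # pragma: no cover - illustrative only
    stats = paramedir_analyze(
        "app.64.chop.prv",
        "cfgs/useful_duration.cfg",
        index_by_thread=True,
    )
    # Columns are now a (rank, thread) MultiIndex, e.g. stats[(1, 1)] for
    # the first rank's first thread (Paraver labels count from 1)
    return stats
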
def _analyze_hist2D(tracefile, config, variables, index_by_thread, stat_names):
    """Run config producing a 2D histogram and return result DataFrame"""
    histfile = run_paramedir(tracefile, config, variables=variables)
    data = load_paraver_histdata(histfile)
    os.remove(histfile)

    if stat_names:
        data.index = pd.Index(stat_names)

    if index_by_thread:
        return reindex_by_thread(data)

    return data

def reindex_by_thread(stats_dframe, thread_prefix="THREAD"):
    """Convert stats DataFrame columns to a (rank, thread) MultiIndex

    Returns a new DataFrame containing only the per-thread columns,
    reindexed by MPI rank and thread number.

    Parameters
    ----------
    stats_dframe: pd.DataFrame
        Dataframe to reindex. Typically this will have been produced using
        paramedir_analyze().
    thread_prefix: str
        Prefix before the thread number pattern in the current column
        labels. Should almost always be "THREAD": the Paraver/Paramedir
        default is "THREAD a.r.t" with r the rank number and t the thread
        number.
    """
    if not isinstance(stats_dframe, pd.DataFrame):
        raise TypeError("stats_dframe must be a Pandas DataFrame")

    oc_select = [c for c in stats_dframe.columns if c.startswith(thread_prefix)]

    newcols = pd.MultiIndex.from_tuples(
        [tuple(int(x) for x in y.split(".")[1:]) for y in oc_select]
    )

    stats_dframe = stats_dframe[oc_select].set_axis(
        newcols, axis="columns", inplace=False
    )
    stats_dframe.columns.rename(["rank", "thread"], inplace=True)

    return stats_dframe

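# A self-contained sketch of the reindexing on a toy frame with
# Paraver-style "THREAD a.r.t" column labels:
def _example_reindex_by_thread():  # pragma: no cover - illustrative only
    toy = pd.DataFrame(
        {"THREAD 1.1.1": [1.0], "THREAD 1.2.1": [2.0], "Total": [3.0]}
    )
    reindexed = reindex_by_thread(toy)
    # The "Total" column is dropped and the remaining columns become a
    # (rank, thread) MultiIndex: (1, 1) and (2, 1)
    return reindexed
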
def run_paramedir(tracefile, config, outfile=None, variables=None):
    """Run paramedir on a tracefile

    Parameters
    ----------
    tracefile: str
        Path to `*.prv` tracefile from Extrae
    config: str
        Path to Paraver/Paramedir `*.cfg`
    outfile: str or None
        Path to output file. If None or "" a randomly named temporary file
        will be used.
    variables: dict of str: str
        Dict of variables to replace in the config file. For a key-value
        pair "key": "val", any occurrence of @key@ in the file will be
        replaced with "val".

    Returns
    -------
    outfile: str
        Path to the output file.
    """
    tmpdir = mkdtemp()

    # If variables is None, still substitute with an empty dict
    variables = variables if variables else {}
    tmp_config = _write_substituted_config(config, tmpdir, variables)

    if not outfile:
        outfile = os.path.join(
            tmpdir, os.path.splitext(os.path.basename(config))[0]
        )

    paramedir_params = ["paramedir", tracefile, tmp_config, outfile]

    result = sp.run(paramedir_params, stdout=sp.PIPE, stderr=sp.STDOUT)

    if not os.path.exists(outfile) or result.returncode != 0:
        raise RuntimeError(
            "Paramedir execution failed:\n{}".format(result.stdout.decode())
        )

    return outfile

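# Usage sketch (hypothetical cfg containing @MIN_TIME@ and @MAX_TIME@
# placeholders; assumes paramedir is on PATH):
def _example_run_paramedir():  # pragma: no cover - illustrative only
    outfile = run_paramedir(
        "app.64.prv",
        "cfgs/windowed_stats.cfg",
        variables={"MIN_TIME": "0", "MAX_TIME": "1000000"},
    )
    return outfile  # path to the paramedir output file
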
def _write_substituted_config(config, tmpdir, variables):
    """Copy config to a tempfile, substituting placeholders from the
    variables dict
    """
    newconfig = os.path.join(tmpdir, os.path.basename(config))

    with open(newconfig, "w") as newfh, open(config, "r") as oldfh:
        for line in oldfh:
            newline = line
            for match in keymatch.findall(line):
                if match[1:-1] in variables:
                    newline = newline.replace(match, variables[match[1:-1]])
                else:
                    raise ValueError(
                        "Unhandled key {} in {}".format(match, config)
                    )
            newfh.write(newline)

    return newconfig


def _split_binline(binline):
    """Internal function to read the line of bin specs

    Returns an array of bin lower edges
    """
    bin_strings = tabmatch.split(binline.strip())

    # Grab the first float from each binspec, we'll return lower edges.
    # Note that thousands-separator commas must be stripped from the
    # numbers...
    try:
        bins = np.fromiter(
            (floatmatch.findall(x)[0].replace(",", "") for x in bin_strings),
            dtype=np.float64,
        )
    except IndexError:
        # Non-numeric bin labels: keep them as strings
        bins = np.asarray(bin_strings)

    return bins


def _split_countline(countline, bins):
    """Internal function to read a line of counts

    Returns the count name and a Series of counts indexed by bin
    """
    num_values = len(bins)
    count_strings = tabmatch.split(countline.strip())

    # There must be at least one more string than values, to be the label
    if len(count_strings) <= num_values:
        raise ValueError("Malformed count line")

    extra_strings = len(count_strings) - num_values
    if extra_strings > 1:
        print(
            "Warning, got more label strings ({}) than expected (1)"
            "".format(extra_strings)
        )

    # However many extra strings there are, join them to make the name
    count_name = " ".join(count_strings[0:extra_strings])
    counts = np.asarray(count_strings[extra_strings:], dtype=np.float64)

    return (count_name, pd.Series(counts, index=bins))

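# A self-contained sketch of the histogram-line parsing, using made-up
# Paraver-style tab-separated bin labels and a count row:
def _example_split_lines():  # pragma: no cover - illustrative only
    bins = _split_binline("[0.00,10.00)\t[10.00,20.00)\n")
    # bins -> array([ 0., 10.]): the lower edge of each bin
    name, counts = _split_countline("THREAD 1.1.1\t3\t5\n", bins)
    # name -> "THREAD 1.1.1"; counts is a Series indexed by bin edge
    return name, counts
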
def load_paraver_histdata(hist_file):
    """Read a Paraver histogram file and return a pandas DataFrame
    containing the data.

    Parameters
    ----------
    hist_file: str
        Path to the histogram data file
    """
    data_dict = {}
    with open(hist_file, "r") as fh:
        # First line should be the bins
        bins = _split_binline(fh.readline())

        # Now process the count lines. Tuple targets are assigned left to
        # right, so `name` is bound before it is used as the dict key.
        for count_line in fh:
            if count_line.strip():
                name, data_dict[name] = _split_countline(count_line, bins)

    return pd.DataFrame.from_dict(data_dict)
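# Usage sketch (hypothetical filename produced by run_paramedir): each
# histogram row becomes a DataFrame column keyed by its label, indexed by
# the bin lower edges.
#
#     hist = load_paraver_histdata("useful_duration.cfg.out")
#     per_bin_mean = hist.mean(axis="columns")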