ftmemsim-valgrind/cachegrind/cg_annotate.in
Nicholas Nethercote 8765b3358f Overhaul cg_annotate output.
Most notably, the "Function summary" section, which printed one CC for each
`file:function` combination, has been replaced by two sections: "File:function
summary" and "Function:file summary".

These new sections both feature "deep CCs", which have an "outer CC" for the
file (or function), and one or more "inner CCs" for the paired functions (or
files).

Here is a file:function example, which helps show which files have a lot of
events, even if those events are spread across a lot of functions.
```
> 12,427,830 (5.4%, 26.3%)  /home/njn/moz/gecko-dev/js/src/ds/LifoAlloc.h:
   6,107,862 (2.7%)           js::frontend::ParseNodeVerifier::visit(js::frontend::ParseNode*)
   3,685,203 (1.6%)           js::detail::BumpChunk::setBump(unsigned char*)
   1,640,591 (0.7%)           js::LifoAlloc::alloc(unsigned long)
     711,008 (0.3%)           js::detail::BumpChunk::assertInvariants()
```
And here is a function:file example, which shows how heavy inlining can result
in a machine code function being derived from source code from multiple files:
```
>  1,343,736 (0.6%, 35.6%)  js::gc::TenuredCell::isMarkedGray() const:
     651,108 (0.3%)           /home/njn/moz/gecko-dev/js/src/d64/dist/include/js/HeapAPI.h
     292,672 (0.1%)           /home/njn/moz/gecko-dev/js/src/gc/Cell.h
     254,854 (0.1%)           /home/njn/moz/gecko-dev/js/src/gc/Heap.h
```
Previously these patterns were very hard to find, and it was easy to overlook a
hot piece of code because its counts were spread across multiple non-adjacent
entries. I have already found these changes very useful for profiling Rust
code.

Also, cumulative percentages on the outer CCs (e.g. the 26.3% and 35.6% in the
examples above) tell you what fraction of all events are covered by the entries
so far, something I've wanted for a long time.
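To make the two percentages concrete, here is a minimal sketch of the computation with invented counts and an invented `summary:` total (these numbers are hypothetical and not taken from the examples above, and this is not the script's actual printing code):

```python
# Hypothetical outer event counts, already sorted in descending order, and an
# invented "summary:" total. cg_annotate reads both from the cgout file; the
# numbers here are made up purely for illustration.
counts = [12_427_830, 1_343_736, 711_008]
total = 230_000_000

cumul = 0
for count in counts:
    cumul += count
    # First percentage: this entry's share of all events.
    # Second percentage: the running (cumulative) share so far.
    print(f"{count:,} ({count * 100 / total:.1f}%, {cumul * 100 / total:.1f}%)")
```

The cumulative figure is just a running total over the sorted outer CCs, divided by the summary total.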

Some other, related changes:
- Column event headers are now padded with `_`, e.g. `Ir__________`. This makes
  the column/event mapping clearer.
- The "Cachegrind profile" section is now called "Metadata", which is
  shorter and clearer.
- A few minor test tweaks, beyond those required for the output changes.
- I converted some doc comments to normal comments. Not standard Python, but
  nicer to read, and there are no public APIs here.
- Roughly 2x speedups to `cg_annotate` and smaller improvements for `cg_diff`
  and `cg_merge`, due to the following.
  - Change the `Cc` class to a type alias for `list[int]`, to avoid the class
    overhead (sigh).
  - Process event count lines in a single split, instead of a regex
    match + split.
  - Add the `add_cc_to_ccs` function, which does multiple CC additions in a
    single function call.
  - Better handling of dicts while reading input, minimizing lookups.
  - Pre-computing the missing CC string for each CcPrinter, instead of
    regenerating it each time.
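A simplified sketch of the first and third of those optimizations: `Cc` as a bare `list[int]`, plus a batched add that updates several accumulators in one call (two accumulators here for brevity, versus five in the script itself):

```python
# `Cc` is just a list of event counts: no class wrapper, so no instance or
# attribute-lookup overhead on this very hot type.
Cc = list[int]


def add_cc_to_ccs(a_cc: Cc, b_cc1: Cc, b_cc2: Cc) -> None:
    # One function call and one pass over `a_cc` updates both accumulators,
    # instead of calling a single-target add function once per accumulator.
    for i, a_count in enumerate(a_cc):
        b_cc1[i] += a_count
        b_cc2[i] += a_count


file_cc, fn_cc = [0, 0, 0], [0, 0, 0]
add_cc_to_ccs([5, 2, 9], file_cc, fn_cc)
print(file_cc, fn_cc)  # both accumulate the same counts
```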
2023-04-11 09:58:43 +10:00


#! /usr/bin/env python3
# pyright: strict
# --------------------------------------------------------------------
# --- Cachegrind's annotator. cg_annotate.in ---
# --------------------------------------------------------------------
# This file is part of Cachegrind, a Valgrind tool for cache
# profiling programs.
#
# Copyright (C) 2002-2023 Nicholas Nethercote
# njn@valgrind.org
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
#
# The GNU General Public License is contained in the file COPYING.

# This script reads Cachegrind output files and produces human-readable output.
#
# Use `make pyann` to "build" this script with `auxprogs/pybuild.rs` every time
# it is changed. This runs the formatters, type-checkers, and linters on
# `cg_annotate.in` and then generates `cg_annotate`.

from __future__ import annotations

import os
import re
import sys
from argparse import ArgumentParser, BooleanOptionalAction, Namespace
from collections import defaultdict
from typing import DefaultDict, NoReturn, TextIO
# A typed wrapper for parsed args.
class Args(Namespace):
    # None of these fields are modified after arg parsing finishes.
    show: list[str]
    sort: list[str]
    threshold: float  # a percentage
    show_percs: bool
    annotate: bool
    context: int
    include: list[str]
    cgout_filename: list[str]

    @staticmethod
    def parse() -> Args:
        def comma_separated_list(values: str) -> list[str]:
            return values.split(",")

        def threshold(n: str) -> float:
            f = float(n)
            if 0 <= f <= 20:
                return f
            raise ValueError

        # Add a bool argument that defaults to true.
        #
        # Supports these forms: `--foo`, `--no-foo`, `--foo=yes`, `--foo=no`.
        # The latter two were the forms supported by the old Perl version of
        # `cg_annotate`, and are now deprecated.
        def add_bool_argument(
            p: ArgumentParser, new_name: str, old_name: str, help_: str
        ) -> None:
            new_flag = "--" + new_name
            old_flag = "--" + old_name
            dest = new_name.replace("-", "_")

            # Note: the default value is always printed with
            # `BooleanOptionalAction`, due to an argparse bug:
            # https://github.com/python/cpython/issues/83137.
            p.add_argument(
                new_flag,
                default=True,
                action=BooleanOptionalAction,
                help=help_,
            )
            p.add_argument(
                f"{old_flag}=yes",
                dest=dest,
                action="store_true",
                help=f"(deprecated) same as --{new_name}",
            )
            p.add_argument(
                f"{old_flag}=no",
                dest=dest,
                action="store_false",
                help=f"(deprecated) same as --no-{new_name}",
            )

        p = ArgumentParser(description="Process a Cachegrind output file.")
        p.add_argument("--version", action="version", version="%(prog)s-@VERSION@")
        p.add_argument(
            "--show",
            type=comma_separated_list,
            metavar="A,B,C",
            help="only show figures for events A,B,C (default: all events)",
        )
        p.add_argument(
            "--sort",
            type=comma_separated_list,
            metavar="A,B,C",
            help="sort functions by events A,B,C (default: event column order)",
        )
        p.add_argument(
            "--threshold",
            type=threshold,
            default=0.1,
            metavar="N:[0,20]",
            help="only show file:function/function:file pairs with more than "
            "N%% of primary sort event counts (default: %(default)s)",
        )
        add_bool_argument(
            p,
            "show-percs",
            "show-percs",
            "show a percentage for each non-zero count",
        )
        add_bool_argument(
            p,
            "annotate",
            "auto",
            "annotate all source files containing functions that reached the "
            "event count threshold",
        )
        p.add_argument(
            "--context",
            type=int,
            default=8,
            metavar="N",
            help="print N lines of context before and after annotated lines "
            "(default: %(default)s)",
        )
        p.add_argument(
            "-I",
            "--include",
            action="append",
            default=[],
            metavar="D",
            help="add D to the list of searched source file directories",
        )
        p.add_argument(
            "cgout_filename",
            nargs=1,
            metavar="cachegrind-out-file",
            help="file produced by Cachegrind",
        )
        return p.parse_args(namespace=Args())


# Args are stored in a global for easy access.
args = Args.parse()
# A single instance of this class is constructed, from `args` and the `events:`
# line in the cgout file.
class Events:
    # The event names.
    events: list[str]

    # Equal to `len(self.events)`.
    num_events: int

    # The order in which we must traverse events for --show. Can be shorter
    # than `events`.
    show_events: list[str]

    # Like `show_events`, but indices into `events`, rather than names.
    show_indices: list[int]

    # The order in which we must traverse events for --sort. Can be shorter
    # than `events`.
    sort_events: list[str]

    # Like `sort_events`, but indices into `events`, rather than names.
    sort_indices: list[int]

    def __init__(self, text: str) -> None:
        self.events = text.split()
        self.num_events = len(self.events)

        # A temporary dict mapping events to indices, [0, n-1].
        event_indices = {event: n for n, event in enumerate(self.events)}

        # If --show is given, check it is valid. If --show is not given,
        # default to all events in the standard order.
        if args.show:
            for event in args.show:
                if event not in event_indices:
                    die(f"--show event `{event}` did not appear in `events:` line")
            self.show_events = args.show
        else:
            self.show_events = self.events
        self.show_indices = [event_indices[event] for event in self.show_events]

        # Likewise for --sort.
        if args.sort:
            for event in args.sort:
                if event not in event_indices:
                    die(f"--sort event `{event}` did not appear in `events:` line")
            self.sort_events = args.sort
        else:
            self.sort_events = self.events
        self.sort_indices = [event_indices[event] for event in self.sort_events]

    # Raises a `ValueError` exception on syntax error.
    def mk_cc(self, str_counts: list[str]) -> Cc:
        # This is slightly faster than a list comprehension.
        counts = list(map(int, str_counts))

        if len(counts) == self.num_events:
            pass
        elif len(counts) < self.num_events:
            # Add zeroes at the end for any missing numbers.
            counts.extend([0] * (self.num_events - len(counts)))
        else:
            raise ValueError

        return counts

    def mk_empty_cc(self) -> Cc:
        # This is much faster than a list comprehension.
        return [0] * self.num_events

    def mk_empty_dcc(self) -> Dcc:
        return Dcc(self.mk_empty_cc(), defaultdict(self.mk_empty_cc))
# A "cost centre", which is a dumb container for counts. Always the same length
# as `Events.events`, but it doesn't even know event names. `Events.mk_cc` and
# `Events.mk_empty_cc` are used for construction.
#
# This used to be a class with a single field `counts: list[int]`, but this
# type is very hot and just using a type alias is much faster.
Cc = list[int]


# Add the counts in `a_cc` to `b_cc`.
def add_cc_to_cc(a_cc: Cc, b_cc: Cc) -> None:
    for i, a_count in enumerate(a_cc):
        b_cc[i] += a_count


# Unrolled version of `add_cc_to_cc`, for speed.
def add_cc_to_ccs(
    a_cc: Cc, b_cc1: Cc, b_cc2: Cc, b_cc3: Cc, b_cc4: Cc, b_cc5: Cc
) -> None:
    for i, a_count in enumerate(a_cc):
        b_cc1[i] += a_count
        b_cc2[i] += a_count
        b_cc3[i] += a_count
        b_cc4[i] += a_count
        b_cc5[i] += a_count


# Update `min_cc` and `max_cc` with `self`.
def update_cc_extremes(self: Cc, min_cc: Cc, max_cc: Cc) -> None:
    for i, count in enumerate(self):
        if count > max_cc[i]:
            max_cc[i] = count
        elif count < min_cc[i]:
            min_cc[i] = count
# A deep cost centre with a dict for the inner names and CCs.
class Dcc:
    outer_cc: Cc
    inner_dict_name_cc: DictNameCc

    def __init__(self, outer_cc: Cc, inner_dict_name_cc: DictNameCc) -> None:
        self.outer_cc = outer_cc
        self.inner_dict_name_cc = inner_dict_name_cc


# A deep cost centre with a list for the inner names and CCs. Used during
# filtering and sorting.
class Lcc:
    outer_cc: Cc
    inner_list_name_cc: ListNameCc

    def __init__(self, outer_cc: Cc, inner_list_name_cc: ListNameCc) -> None:
        self.outer_cc = outer_cc
        self.inner_list_name_cc = inner_list_name_cc


# Per-file/function CCs. The list version is used during filtering and sorting.
DictNameCc = DefaultDict[str, Cc]
ListNameCc = list[tuple[str, Cc]]

# Per-file/function DCCs. The outer names are filenames and the inner names are
# function names, or vice versa. The list version is used during filtering and
# sorting.
DictNameDcc = DefaultDict[str, Dcc]
ListNameLcc = list[tuple[str, Lcc]]

# Per-line CCs, organised by filename and line number.
DictLineCc = DefaultDict[int, Cc]
DictFlDictLineCc = DefaultDict[str, DictLineCc]


def die(msg: str) -> NoReturn:
    print("cg_annotate: error:", msg, file=sys.stderr)
    sys.exit(1)
def read_cgout_file() -> tuple[
    str,
    str,
    Events,
    DictNameDcc,
    DictNameDcc,
    DictFlDictLineCc,
    Cc,
]:
    # The file format is described in Cachegrind's manual.
    try:
        cgout_file = open(args.cgout_filename[0], "r", encoding="utf-8")
    except OSError as err:
        die(f"{err}")

    with cgout_file:
        cgout_line_num = 0

        def parse_die(msg: str) -> NoReturn:
            die(f"{cgout_file.name}:{cgout_line_num}: {msg}")

        def readline() -> str:
            nonlocal cgout_line_num
            cgout_line_num += 1
            return cgout_file.readline()

        # Read "desc:" lines.
        desc = ""
        while line := readline():
            if m := re.match(r"desc:\s+(.*)", line):
                desc += m.group(1) + "\n"
            else:
                break

        # Read "cmd:" line. (`line` is already set from the "desc:" loop.)
        if m := re.match(r"cmd:\s+(.*)", line):
            cmd = m.group(1)
        else:
            parse_die("missing a `command:` line")

        # Read "events:" line.
        line = readline()
        if m := re.match(r"events:\s+(.*)", line):
            events = Events(m.group(1))
        else:
            parse_die("missing an `events:` line")

        def mk_empty_dict_line_cc() -> DictLineCc:
            return defaultdict(events.mk_empty_cc)

        # The current filename and function name.
        fl = ""
        fn = ""

        # Different places where we accumulate CC data.
        dict_fl_dcc: DictNameDcc = defaultdict(events.mk_empty_dcc)
        dict_fn_dcc: DictNameDcc = defaultdict(events.mk_empty_dcc)
        dict_fl_dict_line_cc: DictFlDictLineCc = defaultdict(mk_empty_dict_line_cc)
        summary_cc = None

        # These are refs into the dicts above, used to avoid repeated lookups.
        # They are all overwritten before first use.
        fl_dcc = events.mk_empty_dcc()
        fn_dcc = events.mk_empty_dcc()
        fl_dcc_inner_fn_cc = events.mk_empty_cc()
        fn_dcc_inner_fl_cc = events.mk_empty_cc()
        dict_line_cc = mk_empty_dict_line_cc()

        # Line matching is done in order of pattern frequency, for speed.
        while line := readline():
            if line[0].isdigit():
                split_line = line.split()
                try:
                    line_num = int(split_line[0])
                    cc = events.mk_cc(split_line[1:])
                except ValueError:
                    parse_die("malformed or too many event counts")

                # Record this CC at the file:function level, the function:file
                # level, and the file/line level.
                add_cc_to_ccs(
                    cc,
                    fl_dcc.outer_cc,
                    fn_dcc.outer_cc,
                    fl_dcc_inner_fn_cc,
                    fn_dcc_inner_fl_cc,
                    dict_line_cc[line_num],
                )
            elif line.startswith("fn="):
                fn = line[3:-1]
                # `fl_dcc` is unchanged.
                fn_dcc = dict_fn_dcc[fn]
                fl_dcc_inner_fn_cc = fl_dcc.inner_dict_name_cc[fn]
                fn_dcc_inner_fl_cc = fn_dcc.inner_dict_name_cc[fl]
            elif line.startswith("fl="):
                fl = line[3:-1]
                # A `fn=` line should follow, overwriting the function name.
                fn = "<unspecified>"
                fl_dcc = dict_fl_dcc[fl]
                fn_dcc = dict_fn_dcc[fn]
                fl_dcc_inner_fn_cc = fl_dcc.inner_dict_name_cc[fn]
                fn_dcc_inner_fl_cc = fn_dcc.inner_dict_name_cc[fl]
                dict_line_cc = dict_fl_dict_line_cc[fl]
            elif m := re.match(r"summary:\s+(.*)", line):
                try:
                    summary_cc = events.mk_cc(m.group(1).split())
                except ValueError:
                    parse_die("malformed or too many event counts")
            elif line == "\n" or line.startswith("#"):
                # Skip empty lines and comment lines.
                pass
            else:
                parse_die(f"malformed line: {line[:-1]}")

        # Check if summary line was present.
        if not summary_cc:
            parse_die("missing `summary:` line, aborting")

        # Check summary is correct. (Only using the outer CCs.)
        total_cc = events.mk_empty_cc()
        for dcc in dict_fl_dcc.values():
            add_cc_to_cc(dcc.outer_cc, total_cc)
        if summary_cc != total_cc:
            msg = (
                "`summary:` line doesn't match computed total\n"
                f"- summary: {summary_cc}\n"
                f"- total:   {total_cc}"
            )
            parse_die(msg)

        return (
            desc,
            cmd,
            events,
            dict_fl_dcc,
            dict_fn_dcc,
            dict_fl_dict_line_cc,
            summary_cc,
        )
# The width of a column, in three parts.
class Width:
    # Width of the widest commified event count.
    count: int

    # Width of the widest first percentage, of the form ` (n.n%)` or ` (n.n%,`.
    perc1: int

    # Width of the widest second percentage, of the form ` n.n%)`.
    perc2: int

    def __init__(self, count: int, perc1: int, perc2: int) -> None:
        self.count = count
        self.perc1 = perc1
        self.perc2 = perc2
class CcPrinter:
    # Note: every `CcPrinter` gets the same `Events` object.
    events: Events

    # Note: every `CcPrinter` gets the same summary CC.
    summary_cc: Cc

    # String to print before the event names.
    events_prefix: str

    # The widths of each event column. For simplicity, its length matches
    # `events.events`, even though not all events are necessarily shown.
    widths: list[Width]

    # Text of a missing CC, which can be computed in advance.
    missing_cc_str: str

    # Must call `init_ccs` or `init_list_name_lcc` after this.
    def __init__(self, events: Events, summary_cc: Cc) -> None:
        self.events = events
        self.summary_cc = summary_cc
        # Other fields initialized in `init_*`.

    def init_ccs(self, ccs: list[Cc]) -> None:
        self.events_prefix = ""

        # Find min and max count for each event. One of them will be the widest
        # value.
        min_cc = self.events.mk_empty_cc()
        max_cc = self.events.mk_empty_cc()
        for cc in ccs:
            update_cc_extremes(cc, min_cc, max_cc)

        self.init_widths(min_cc, max_cc, None, None)

    def init_list_name_lcc(self, list_name_lcc: ListNameLcc) -> None:
        self.events_prefix = " "

        cumul_cc = self.events.mk_empty_cc()

        # Find min and max value for each event. One of them will be the widest
        # value. Likewise for the cumulative counts.
        min_cc = self.events.mk_empty_cc()
        max_cc = self.events.mk_empty_cc()
        min_cumul_cc = self.events.mk_empty_cc()
        max_cumul_cc = self.events.mk_empty_cc()
        for _, lcc in list_name_lcc:
            # Consider both outer and inner CCs for `count` and `perc1`.
            update_cc_extremes(lcc.outer_cc, min_cc, max_cc)
            for _, inner_cc in lcc.inner_list_name_cc:
                update_cc_extremes(inner_cc, min_cc, max_cc)

            # Consider only outer CCs for `perc2`.
            add_cc_to_cc(lcc.outer_cc, cumul_cc)
            update_cc_extremes(cumul_cc, min_cumul_cc, max_cumul_cc)

        self.init_widths(min_cc, max_cc, min_cumul_cc, max_cumul_cc)

    def init_widths(
        self, min_cc1: Cc, max_cc1: Cc, min_cc2: Cc | None, max_cc2: Cc | None
    ) -> None:
        self.widths = [Width(0, 0, 0)] * self.events.num_events
        for i in range(len(self.events.events)):
            # Get count and percs widths of the min and max CCs.
            (min_count, min_perc1, min_perc2) = self.count_and_percs_strs(
                min_cc1, min_cc2, i
            )
            (max_count, max_perc1, max_perc2) = self.count_and_percs_strs(
                max_cc1, max_cc2, i
            )
            self.widths[i] = Width(
                max(len(min_count), len(max_count)),
                max(len(min_perc1), len(max_perc1)),
                max(len(min_perc2), len(max_perc2)),
            )

        self.missing_cc_str = ""
        for i in self.events.show_indices:
            self.missing_cc_str += self.count_and_percs_str(i, ".", "", "")

    # Get the count and perc string for `cc1[i]` and the perc string for
    # `cc2[i]`. (Unless `cc2` is `None`, in which case `perc2` will be "".)
    def count_and_percs_strs(
        self, cc1: Cc, cc2: Cc | None, i: int
    ) -> tuple[str, str, str]:
        count = f"{cc1[i]:,d}"  # commify
        if args.show_percs:
            summary_count = self.summary_cc[i]
            if cc2 is None:
                # A plain or inner CC, with a single percentage.
                if cc1[i] == 0:
                    # Don't show percentages for "0" entries, it's just clutter.
                    perc1 = ""
                elif summary_count == 0:
                    # Avoid dividing by zero.
                    perc1 = " (n/a)"
                else:
                    perc1 = f" ({cc1[i] * 100 / summary_count:.1f}%)"
                perc2 = ""
            else:
                # An outer CC, with two percentages.
                if summary_count == 0:
                    # Avoid dividing by zero.
                    perc1 = " (n/a,"
                    perc2 = " n/a)"
                else:
                    perc1 = f" ({cc1[i] * 100 / summary_count:.1f}%,"
                    perc2 = f" {cc2[i] * 100 / summary_count:.1f}%)"
        else:
            perc1 = ""
            perc2 = ""

        return (count, perc1, perc2)

    def count_and_percs_str(self, i: int, count: str, perc1: str, perc2: str) -> str:
        event_w = len(self.events.events[i])
        count_w = self.widths[i].count
        perc1_w = self.widths[i].perc1
        perc2_w = self.widths[i].perc2
        pre_w = max(0, event_w - count_w - perc1_w - perc2_w)
        return f"{'':>{pre_w}}{count:>{count_w}}{perc1:>{perc1_w}}{perc2:>{perc2_w}} "

    def print_events(self, suffix: str) -> None:
        print(self.events_prefix, end="")
        for i in self.events.show_indices:
            event = self.events.events[i]
            event_w = len(event)
            count_w = self.widths[i].count
            perc1_w = self.widths[i].perc1
            perc2_w = self.widths[i].perc2
            print(f"{event:_<{max(event_w, count_w + perc1_w + perc2_w)}} ", end="")
        print(suffix)

    def print_lcc(self, lcc: Lcc, outer_name: str, cumul_cc: Cc) -> None:
        print("> ", end="")
        if (
            len(lcc.inner_list_name_cc) == 1
            and lcc.outer_cc == lcc.inner_list_name_cc[0][1]
        ):
            # There is only one inner CC, it met the threshold, and it is equal
            # to the outer CC. Print the inner CC and outer CC in a single
            # line, because they are the same.
            inner_name = lcc.inner_list_name_cc[0][0]
            self.print_cc(lcc.outer_cc, cumul_cc, f"{outer_name}:{inner_name}")
        else:
            # There are multiple inner CCs, and at least one met the threshold.
            # Print the outer CC and then the inner CCs, indented.
            self.print_cc(lcc.outer_cc, cumul_cc, f"{outer_name}:")
            for inner_name, inner_cc in lcc.inner_list_name_cc:
                print(" ", end="")
                self.print_cc(inner_cc, None, f" {inner_name}")
        print()

    # If `cc2` is `None`, it's a vanilla CC or inner CC. Otherwise, it's an
    # outer CC.
    def print_cc(self, cc: Cc, cc2: Cc | None, suffix: str) -> None:
        for i in self.events.show_indices:
            (count, perc1, perc2) = self.count_and_percs_strs(cc, cc2, i)
            print(self.count_and_percs_str(i, count, perc1, perc2), end="")
        print("", suffix)

    def print_missing_cc(self, suffix: str) -> None:
        print(self.missing_cc_str, suffix)
# Used in various places in the output.
def print_fancy(text: str) -> None:
    fancy = "-" * 80
    print(fancy)
    print("--", text)
    print(fancy)


def print_metadata(desc: str, cmd: str, events: Events) -> None:
    print_fancy("Metadata")
    print(desc, end="")
    print("Command:         ", cmd)
    print("Data file:       ", args.cgout_filename[0])
    print("Events recorded: ", *events.events)
    print("Events shown:    ", *events.show_events)
    print("Event sort order:", *events.sort_events)
    print("Threshold:       ", args.threshold)
    if len(args.include) == 0:
        print("Include dirs:    ")
    else:
        print(f"Include dirs:     {args.include[0]}")
        for include_dirname in args.include[1:]:
            print(f"                  {include_dirname}")
    print("Annotation:      ", "on" if args.annotate else "off")
    print()


def print_summary(events: Events, summary_cc: Cc) -> None:
    printer = CcPrinter(events, summary_cc)
    printer.init_ccs([summary_cc])
    print_fancy("Summary")
    printer.print_events("")
    print()
    printer.print_cc(summary_cc, None, "PROGRAM TOTALS")
    print()
def print_name_summary(
    kind: str, events: Events, dict_name_dcc: DictNameDcc, summary_cc: Cc
) -> set[str]:
    # The primary sort event is used for the threshold.
    threshold_index = events.sort_indices[0]

    # Convert the threshold from a percentage to an event count.
    threshold = args.threshold * abs(summary_cc[threshold_index]) / 100

    def meets_threshold(name_and_cc: tuple[str, Cc]) -> bool:
        cc = name_and_cc[1]
        return abs(cc[threshold_index]) >= threshold

    # Create a list with the outer CC counts in sort order, so that
    # left-to-right list comparison does the right thing. Plus the outer name
    # at the end for deterministic output when all the event counts are
    # identical in two CCs.
    def key_name_and_lcc(name_and_lcc: tuple[str, Lcc]) -> tuple[list[int], str]:
        (outer_name, lcc) = name_and_lcc
        return (
            [abs(lcc.outer_cc[i]) for i in events.sort_indices],
            outer_name,
        )

    # Similar to `key_name_and_lcc`.
    def key_name_and_cc(name_and_cc: tuple[str, Cc]) -> tuple[list[int], str]:
        (name, cc) = name_and_cc
        return ([abs(cc[i]) for i in events.sort_indices], name)

    # This is a `filter_map` operation, which Python doesn't directly support.
    list_name_lcc: ListNameLcc = []
    for outer_name, dcc in dict_name_dcc.items():
        # Filter out inner CCs for which the primary sort event count is below
        # the threshold, and sort the remainder.
        inner_list_name_cc = sorted(
            filter(meets_threshold, dcc.inner_dict_name_cc.items()),
            key=key_name_and_cc,
            reverse=True,
        )

        # If no inner CCs meet the threshold, ignore the entire DCC, even if
        # the outer CC meets the threshold.
        if len(inner_list_name_cc) == 0:
            continue

        list_name_lcc.append((outer_name, Lcc(dcc.outer_cc, inner_list_name_cc)))

    list_name_lcc = sorted(list_name_lcc, key=key_name_and_lcc, reverse=True)

    printer = CcPrinter(events, summary_cc)
    printer.init_list_name_lcc(list_name_lcc)
    print_fancy(kind + " summary")
    printer.print_events(" " + kind.lower())
    print()

    # Print LCCs.
    threshold_names = set([])
    cumul_cc = events.mk_empty_cc()
    for name, lcc in list_name_lcc:
        add_cc_to_cc(lcc.outer_cc, cumul_cc)
        printer.print_lcc(lcc, name, cumul_cc)
        threshold_names.add(name)

    return threshold_names
class AnnotatedCcs:
    line_nums_known_cc: Cc
    line_nums_unknown_cc: Cc
    unreadable_cc: Cc
    below_threshold_cc: Cc
    files_unknown_cc: Cc

    labels = [
        "  annotated: files known & above threshold & readable, line numbers known",
        "  annotated: files known & above threshold & readable, line numbers unknown",
        "unannotated: files known & above threshold & unreadable",
        "unannotated: files known & below threshold",
        "unannotated: files unknown",
    ]

    def __init__(self, events: Events) -> None:
        self.line_nums_known_cc = events.mk_empty_cc()
        self.line_nums_unknown_cc = events.mk_empty_cc()
        self.unreadable_cc = events.mk_empty_cc()
        self.below_threshold_cc = events.mk_empty_cc()
        self.files_unknown_cc = events.mk_empty_cc()

    def ccs(self) -> list[Cc]:
        return [
            self.line_nums_known_cc,
            self.line_nums_unknown_cc,
            self.unreadable_cc,
            self.below_threshold_cc,
            self.files_unknown_cc,
        ]
def mk_warning(msg: str) -> str:
    return f"""\
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@ WARNING @@
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
{msg}\
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
"""


def warn_src_file_is_newer(src_filename: str, cgout_filename: str) -> None:
    msg = f"""\
@ Source file '{src_filename}' is newer than data file '{cgout_filename}'.
@ Annotations may not be correct.
"""
    print(mk_warning(msg))


def warn_bogus_lines(src_filename: str) -> None:
    msg = f"""\
@@ Information recorded about lines past the end of '{src_filename}'.
"""
    print(mk_warning(msg), end="")
def print_annotated_src_file(
    events: Events,
    dict_line_cc: DictLineCc,
    src_file: TextIO,
    annotated_ccs: AnnotatedCcs,
    summary_cc: Cc,
) -> None:
    # If the source file is more recent than the cgout file, issue warning.
    if os.stat(src_file.name).st_mtime_ns > os.stat(args.cgout_filename[0]).st_mtime_ns:
        warn_src_file_is_newer(src_file.name, args.cgout_filename[0])

    printer = CcPrinter(events, summary_cc)
    printer.init_ccs(list(dict_line_cc.values()))
    # The starting fancy has already been printed by the caller.
    printer.print_events("")
    print()

    # The CC for line 0 is special, holding counts attributed to the source
    # file but not to any particular line (due to incomplete debug info).
    # Annotate the start of the file with this info, if present.
    line0_cc = dict_line_cc.pop(0, None)
    if line0_cc:
        suffix = "<unknown (line 0)>"
        printer.print_cc(line0_cc, None, suffix)
        add_cc_to_cc(line0_cc, annotated_ccs.line_nums_unknown_cc)
        print()

    # Find interesting line ranges: all lines with a CC, and all lines within
    # `args.context` lines of a line with a CC.
    line_nums = list(sorted(dict_line_cc.keys()))
    pairs: list[tuple[int, int]] = []
    n = len(line_nums)
    i = 0
    context = args.context
    while i < n:
        lo = max(line_nums[i] - context, 1)  # `max` to prevent negatives
        while i < n - 1 and line_nums[i] + 2 * context >= line_nums[i + 1]:
            i += 1
        hi = line_nums[i] + context
        pairs.append((lo, hi))
        i += 1

    def print_lines(pairs: list[tuple[int, int]]) -> None:
        line_num = 0
        while pairs:
            src_line = ""
            (lo, hi) = pairs.pop(0)
            while line_num < lo - 1:
                src_line = src_file.readline()
                line_num += 1
                if not src_line:
                    return  # EOF

            # Print line number, unless start of file.
            if lo != 1:
                print("-- line", lo, "-" * 40)

            while line_num < hi:
                src_line = src_file.readline()
                line_num += 1
                if not src_line:
                    return  # EOF
                if line_nums and line_num == line_nums[0]:
                    printer.print_cc(dict_line_cc[line_num], None, src_line[:-1])
                    add_cc_to_cc(
                        dict_line_cc[line_num], annotated_ccs.line_nums_known_cc
                    )
                    del line_nums[0]
                else:
                    printer.print_missing_cc(src_line[:-1])

            # Print line number.
            print("-- line", hi, "-" * 40)

    # Annotate chosen lines, tracking total annotated counts.
    if pairs:
        print_lines(pairs)

    # If there was info on lines past the end of the file, warn.
    if line_nums:
        print()
        for line_num in line_nums:
            printer.print_cc(
                dict_line_cc[line_num], None, f"<bogus line {line_num}>"
            )
            add_cc_to_cc(dict_line_cc[line_num], annotated_ccs.line_nums_known_cc)
        print()
        warn_bogus_lines(src_file.name)

    print()
# This (partially) consumes `dict_fl_dict_line_cc`.
def print_annotated_src_files(
    events: Events,
    ann_src_filenames: set[str],
    dict_fl_dict_line_cc: DictFlDictLineCc,
    summary_cc: Cc,
) -> AnnotatedCcs:
    annotated_ccs = AnnotatedCcs(events)

    def add_dict_line_cc_to_cc(dict_line_cc: DictLineCc | None, accum_cc: Cc) -> None:
        if dict_line_cc:
            for line_cc in dict_line_cc.values():
                add_cc_to_cc(line_cc, accum_cc)

    # Exclude the unknown ("???") file, which is unannotatable.
    ann_src_filenames.discard("???")
    dict_line_cc = dict_fl_dict_line_cc.pop("???", None)
    add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.files_unknown_cc)

    # Prepend "" to the include dirnames so things work in the case where the
    # filename has the full path.
    include_dirnames = args.include.copy()
    include_dirnames.insert(0, "")

    def print_ann_fancy(src_filename: str) -> None:
        print_fancy(f"Annotated source file: {src_filename}")

    for src_filename in sorted(ann_src_filenames):
        readable = False
        for include_dirname in include_dirnames:
            if include_dirname == "":
                full_src_filename = src_filename
            else:
                full_src_filename = os.path.join(include_dirname, src_filename)
            try:
                with open(full_src_filename, "r", encoding="utf-8") as src_file:
                    dict_line_cc = dict_fl_dict_line_cc.pop(src_filename, None)
                    assert dict_line_cc is not None
                    print_ann_fancy(src_file.name)  # includes full path
                    print_annotated_src_file(
                        events,
                        dict_line_cc,
                        src_file,
                        annotated_ccs,
                        summary_cc,
                    )
                readable = True
                break
            except OSError:
                pass

        if not readable:
            dict_line_cc = dict_fl_dict_line_cc.pop(src_filename, None)
            add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.unreadable_cc)
            print_ann_fancy(src_filename)
            print("This file was unreadable")
            print()

    # Sum the CCs remaining in `dict_fl_dict_line_cc`, which are all in files
    # below the threshold.
    for dict_line_cc in dict_fl_dict_line_cc.values():
        add_dict_line_cc_to_cc(dict_line_cc, annotated_ccs.below_threshold_cc)

    return annotated_ccs
def print_annotation_summary(
    events: Events,
    annotated_ccs: AnnotatedCcs,
    summary_cc: Cc,
) -> None:
    # Show how many events were covered by annotated lines above.
    printer = CcPrinter(events, summary_cc)
    printer.init_ccs(annotated_ccs.ccs())
    print_fancy("Annotation summary")
    printer.print_events("")
    print()
    total_cc = events.mk_empty_cc()
    for (cc, label) in zip(annotated_ccs.ccs(), AnnotatedCcs.labels):
        printer.print_cc(cc, None, label)
        add_cc_to_cc(cc, total_cc)
    print()

    # Internal sanity check.
    if summary_cc != total_cc:
        msg = (
            "`summary:` line doesn't match computed annotated counts\n"
            f"- summary:   {summary_cc}\n"
            f"- annotated: {total_cc}"
        )
        die(msg)
def main() -> None:
    (
        desc,
        cmd,
        events,
        dict_fl_dcc,
        dict_fn_dcc,
        dict_fl_dict_line_cc,
        summary_cc,
    ) = read_cgout_file()

    # Each of the following calls prints a section of the output.
    print_metadata(desc, cmd, events)
    print_summary(events, summary_cc)
    ann_src_filenames = print_name_summary(
        "File:function", events, dict_fl_dcc, summary_cc
    )
    print_name_summary("Function:file", events, dict_fn_dcc, summary_cc)
    if args.annotate:
        annotated_ccs = print_annotated_src_files(
            events, ann_src_filenames, dict_fl_dict_line_cc, summary_cc
        )
        print_annotation_summary(events, annotated_ccs, summary_cc)


if __name__ == "__main__":
    main()