#!/usr/bin/env python3
"""llama.cpp cross-backend profiler analysis tool.

Usage:
    python -m tools.profiler.profiler profile.json
    python -m tools.profiler.profiler profile.json --chrome-trace trace.json
"""
from __future__ import annotations

import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

# Profiler event types; must match the integer codes in the JSON dump.
OP_EVENT = 0
COPY_EVENT = 1
TYPE_NAMES = {0: "OP", 1: "COPY"}


@dataclass
class ProfileRecord:
    """A single profiler event: one op execution or one buffer copy."""

    type: int             # OP_EVENT or COPY_EVENT
    name: str             # op/kernel name
    backend_id: int       # backend that executed the event
    split_id: int         # graph split the event belongs to
    start_ns: int         # start timestamp, nanoseconds
    duration_ns: int      # duration, nanoseconds
    bytes: int            # bytes moved/touched (0 if unknown)
    extra: Optional[str]  # free-form extra info from the profiler, if any
    # Source-tensor dimensions (4 entries each); zeros mean "unused dim".
    ne_src0: list[int] = field(default_factory=lambda: [0, 0, 0, 0])
    ne_src1: list[int] = field(default_factory=lambda: [0, 0, 0, 0])
    ne_src2: list[int] = field(default_factory=lambda: [0, 0, 0, 0])

    @property
    def type_name(self) -> str:
        """Human-readable event type name."""
        return TYPE_NAMES.get(self.type, f"UNKNOWN({self.type})")

    @property
    def duration_us(self) -> float:
        """Duration in microseconds."""
        return self.duration_ns / 1000.0

    @property
    def duration_ms(self) -> float:
        """Duration in milliseconds."""
        return self.duration_ns / 1_000_000.0

    @property
    def bandwidth_gbps(self) -> float:
        """Bandwidth in GB/s (bytes/ns == GB/s); 0.0 when size or time is unknown."""
        if self.duration_ns == 0 or self.bytes == 0:
            return 0.0
        return self.bytes / self.duration_ns

    @staticmethod
    def _fmt_ne(ne: list[int]) -> str:
        """Format a dims list as "[a, b, ...]", dropping non-positive (unused) dims."""
        dims = [n for n in ne if n > 0]
        if not dims:
            return ""
        return "[" + ", ".join(str(d) for d in dims) + "]"

    @property
    def shape_str(self) -> str:
        """Human-readable tensor shapes, e.g. '[4096, 4096] x [4096, 1] x [8, 1]'."""
        s0 = self._fmt_ne(self.ne_src0)
        s1 = self._fmt_ne(self.ne_src1)
        s2 = self._fmt_ne(self.ne_src2)
        parts = [s for s in (s0, s1, s2) if s]
        return " x ".join(parts)

    def to_dict(self) -> dict:
        """Serialize back to the profiler JSON record layout."""
        return {
            "type": self.type,
            "name": self.name,
            "backend_id": self.backend_id,
            "split_id": self.split_id,
            "start_ns": self.start_ns,
            "duration_ns": self.duration_ns,
            "bytes": self.bytes,
            "extra": self.extra,
            "ne_src0": self.ne_src0,
            "ne_src1": self.ne_src1,
            "ne_src2": self.ne_src2,
        }


@dataclass
class OpStats:
    """Aggregated timing statistics for one (name, event type, backend) group."""

    name: str
    event_type: int
    backend_id: int
    count: int = 0
    total_ns: int = 0
    min_ns: int = 0
    max_ns: int = 0
    total_bytes: int = 0
    # ne_src0 of the longest individual call in the group.
    representative_ne: list[int] = field(default_factory=lambda: [0, 0, 0, 0])

    @property
    def avg_ns(self) -> float:
        # Fix: return 0.0 (not int 0) so the declared float type holds.
        return self.total_ns / self.count if self.count > 0 else 0.0

    @property
    def avg_us(self) -> float:
        return self.avg_ns / 1000.0

    @property
    def total_ms(self) -> float:
        return self.total_ns / 1_000_000.0

    @property
    def min_us(self) -> float:
        return self.min_ns / 1000.0

    @property
    def max_us(self) -> float:
        return self.max_ns / 1000.0

    @property
    def bandwidth_gbps(self) -> float:
        """Aggregate bandwidth in GB/s; 0.0 when bytes or time are unknown."""
        if self.total_ns == 0 or self.total_bytes == 0:
            return 0.0
        return self.total_bytes / self.total_ns

    @property
    def time_per_byte_ns(self) -> float:
        """Time per byte (lower = more efficient); inf when byte count is unknown."""
        if self.total_bytes == 0:
            return float("inf")
        return self.total_ns / self.total_bytes

    @property
    def type_name(self) -> str:
        """Human-readable event type name."""
        return TYPE_NAMES.get(self.event_type, f"UNKNOWN({self.event_type})")


class ProfileData:
    """A loaded profiler dump: raw records plus file-level metadata."""

    def __init__(self, records: list[ProfileRecord], metadata: dict):
        self.records = records
        self.metadata = metadata

    @classmethod
    def load(cls, filepath: str | Path) -> ProfileData:
        """Load a profiler JSON file.

        Fix: open with an explicit UTF-8 encoding so parsing does not depend
        on the platform locale (JSON is UTF-8 by specification).
        """
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)

        if data.get("profiler") != "ggml":
            print(f"Warning: file may not be a ggml profiler output (profiler={data.get('profiler')})")

        records = []

        def _pad_ne(v):
            # Normalize a dims value to a list of at least 4 ints.
            if isinstance(v, list) and len(v) < 4:
                return v + [0] * (4 - len(v))
            if not isinstance(v, list):
                return [0, 0, 0, 0]
            return v

        for r in data.get("records", []):
            # Support both old "ne" format and new "ne_src0"/"ne_src1" format
            ne_src0 = _pad_ne(r.get("ne_src0", r.get("ne", [0, 0, 0, 0])))
            ne_src1 = _pad_ne(r.get("ne_src1", [0, 0, 0, 0]))
            ne_src2 = _pad_ne(r.get("ne_src2", [0, 0, 0, 0]))
            records.append(ProfileRecord(
                type=r.get("type", 0),
                name=r.get("name", "unknown"),
                backend_id=r.get("backend_id", 0),
                split_id=r.get("split_id", 0),
                start_ns=r.get("start_ns", 0),
                duration_ns=r.get("duration_ns", 0),
                bytes=r.get("bytes", 0),
                extra=r.get("extra"),
                ne_src0=ne_src0,
                ne_src1=ne_src1,
                ne_src2=ne_src2,
            ))

        backends_raw = data.get("backends", [])
        backends = []
        for b in backends_raw:
            backends.append({
                "id": b.get("id", 0),
                "name": b.get("name", "unknown"),
                "device": b.get("device", "unknown"),
                "device_type": b.get("device_type", 0),
            })

        metadata = {
            "version": data.get("version", 0),
            "total_records": data.get("total_records", len(records)),
            # Fall back to summing durations when the writer omitted totals.
            "total_ns": data.get("total_ns", sum(r.duration_ns for r in records)),
            "backends": backends,
        }
        return cls(records, metadata)
and len(v) < 4: return v + [0] * (4 - len(v)) if not isinstance(v, list): return [0, 0, 0, 0] return v for r in data.get("records", []): # Support both old "ne" format and new "ne_src0"/"ne_src1" format ne_src0 = _pad_ne(r.get("ne_src0", r.get("ne", [0, 0, 0, 0]))) ne_src1 = _pad_ne(r.get("ne_src1", [0, 0, 0, 0])) ne_src2 = _pad_ne(r.get("ne_src2", [0, 0, 0, 0])) records.append(ProfileRecord( type=r.get("type", 0), name=r.get("name", "unknown"), backend_id=r.get("backend_id", 0), split_id=r.get("split_id", 0), start_ns=r.get("start_ns", 0), duration_ns=r.get("duration_ns", 0), bytes=r.get("bytes", 0), extra=r.get("extra"), ne_src0=ne_src0, ne_src1=ne_src1, ne_src2=ne_src2, )) backends_raw = data.get("backends", []) backends = [] for b in backends_raw: backends.append({ "id": b.get("id", 0), "name": b.get("name", "unknown"), "device": b.get("device", "unknown"), "device_type": b.get("device_type", 0), }) metadata = { "version": data.get("version", 0), "total_records": data.get("total_records", len(records)), "total_ns": data.get("total_ns", sum(r.duration_ns for r in records)), "backends": backends, } return cls(records, metadata) @property def total_ns(self) -> int: return sum(r.duration_ns for r in self.records) @property def total_ms(self) -> float: return self.total_ns / 1_000_000.0 def stats(self) -> list[OpStats]: """Aggregate stats grouped by (name, type, backend_id).""" groups: dict[tuple, OpStats] = {} for rec in self.records: key = (rec.name, rec.type, rec.backend_id) if key not in groups: groups[key] = OpStats( name=rec.name, event_type=rec.type, backend_id=rec.backend_id, min_ns=rec.duration_ns, max_ns=rec.duration_ns, representative_ne=list(rec.ne_src0), ) s = groups[key] s.count += 1 s.total_ns += rec.duration_ns s.min_ns = min(s.min_ns, rec.duration_ns) s.max_ns = max(s.max_ns, rec.duration_ns) s.total_bytes += rec.bytes # Track the ne from the longest individual call if rec.duration_ns >= s.max_ns: s.representative_ne = list(rec.ne_src0) return 
sorted(groups.values(), key=lambda s: s.total_ns, reverse=True) def top_operations(self, n: int = 10) -> list[OpStats]: """Return the N most time-consuming operations (aggregated).""" return self.stats()[:n] def top_kernels(self, n: int = 10) -> list[ProfileRecord]: """Return the N longest individual kernel executions.""" return sorted(self.records, key=lambda r: r.duration_ns, reverse=True)[:n] def by_backend(self) -> dict[int, list[ProfileRecord]]: """Group records by backend ID.""" groups: dict[int, list[ProfileRecord]] = {} for rec in self.records: groups.setdefault(rec.backend_id, []).append(rec) return dict(sorted(groups.items())) def timeline(self) -> list[ProfileRecord]: """Return records sorted by start_ns for timeline visualization.""" return sorted(self.records, key=lambda r: r.start_ns) def inefficiency_ranking(self, n: int = 10) -> list[OpStats]: """Rank operations by time per byte (inefficiency). Lower is better.""" all_stats = [s for s in self.stats() if s.total_bytes > 0 and s.event_type == OP_EVENT] return sorted(all_stats, key=lambda s: s.time_per_byte_ns, reverse=True)[:n] def summary(self) -> None: """Print a formatted summary table to stdout.""" print(f"\n{'='*80}") print(f" ggml Profiler Summary") print(f"{'='*80}") print(f" Total records: {len(self.records)}") print(f" Total time: {self.total_ms:.2f} ms") print(f" Unique ops: {len(set((r.name, r.type, r.backend_id) for r in self.records))}") print(f"{'='*80}\n") stats = self.stats() if not stats: print(" No profiling data.\n") return print(f" {'TYPE':<5} {'BKND':>4} {'Operation':<28} {'%Time':>7} {'Count':>6} " f"{'Total':>10} {'Avg':>10} {'Min':>10} {'Max':>10} {'Bandwidth':>12}") print(f" {'':->5} {'':->4} {'':->28} {'':->7} {'':->6} " f"{'(ms)':>10} {'(us)':>10} {'(us)':>10} {'(us)':>10} {'':->12}") for s in stats: pct = 100.0 * s.total_ns / self.total_ns if self.total_ns > 0 else 0 line = (f" {s.type_name:<5} {s.backend_id:>4} {s.name:<28} {pct:>6.1f}% " f"{s.count:>6} {s.total_ms:>10.2f} 
{s.avg_us:>10.2f} " f"{s.min_us:>10.2f} {s.max_us:>10.2f}") if s.total_bytes > 0 and s.total_ns > 0: bw = s.bandwidth_gbps if bw >= 1000.0: line += f" {bw / 1000.0:>9.2f} TB/s" else: line += f" {bw:>9.2f} GB/s" else: line += f" {'':>12}" # Tensor shape from longest call shape_dims = [n for n in s.representative_ne if n > 0] if shape_dims: line += f" [{', '.join(str(d) for d in shape_dims)}]" print(line) backend_groups = self.by_backend() if len(backend_groups) > 1: print(f"\n --- By Backend ---") for bid, recs in sorted(backend_groups.items()): bk_total = sum(r.duration_ns for r in recs) bk_pct = 100.0 * bk_total / self.total_ns if self.total_ns > 0 else 0 print(f" Backend {bid}: {bk_total / 1e6:.2f} ms ({bk_pct:.1f}%) — {len(recs)} records") inef = self.inefficiency_ranking(5) if inef: print(f"\n --- Top 5 Inefficient Operations (time/byte) ---") for s in inef: print(f" {s.name:<28} {s.time_per_byte_ns / 1000:.2f} us/byte " f"({s.count} calls, {s.total_bytes / 1e6:.1f} MB)") top_k = self.top_kernels(5) print(f"\n --- Top 5 Longest Kernels ---") for rec in top_k: shape = f" {rec.shape_str}" if rec.shape_str else "" print(f" {rec.type_name:<5} {rec.name:<28} {rec.duration_us:>10.2f} us{shape} " f"(split={rec.split_id}, backend={rec.backend_id})") print() def export_chrome_trace(self, filepath: str | Path) -> None: """Export as Chrome Trace Event format for chrome://tracing.""" events = [] # Build backend name mapping and remap to non-negative PIDs # (Chrome cannot handle negative PIDs) backend_ids = sorted(set(rec.backend_id for rec in self.records)) backend_names: dict[int, str] = {} pid_map: dict[int, int] = {} # Use metadata from JSON if available metadata_backends = self.metadata.get("backends", []) backend_by_id: dict[int, dict] = {b["id"]: b for b in metadata_backends} device_type_names = {0: "CPU", 1: "GPU", 2: "ACCEL"} for idx, bid in enumerate(backend_ids): pid_map[bid] = idx if bid in backend_by_id: binfo = backend_by_id[bid] dev_type = 
binfo.get("device_type", 0) dev_name = binfo.get("device", "") type_name = device_type_names.get(dev_type, "Device") if dev_name and dev_name != "unknown": backend_names[bid] = f"{type_name}: {dev_name}" else: backend_names[bid] = f"{type_name}: {binfo.get('name', f'Backend {bid}')}" else: backend_names[bid] = f"Backend {bid}" # Process metadata events for bid in backend_ids: pid = pid_map[bid] events.append({ "ph": "M", # metadata "pid": pid, "name": "process_name", "args": {"name": backend_names[bid]}, }) # Use real timestamps, but prevent overlaps within each track. # GPU kernels are launched rapidly (small start_ns gaps) but have long # durations, so naive real timestamps overlap. Sweep-line per track: # sort by start_ns, then place each event at max(start, prev_end). from collections import defaultdict tracks: dict[tuple, list[ProfileRecord]] = defaultdict(list) for rec in self.records: tracks[(rec.backend_id, rec.split_id)].append(rec) for key in tracks: tracks[key].sort(key=lambda r: r.start_ns) for key, recs in tracks.items(): pid = pid_map[key[0]] tid = f"split_{key[1]}" cursor = 0.0 for rec in recs: ts = max(rec.start_ns / 1000.0, cursor) dur = rec.duration_ns / 1000.0 cat = "copy" if rec.type == COPY_EVENT else "compute" events.append({ "ph": "X", # complete event "pid": pid, "tid": tid, "name": rec.name, "ts": ts, "dur": dur, "cat": cat, "args": { "bytes": rec.bytes, "duration_us": dur, "shape": rec.shape_str, }, }) cursor = ts + dur trace = {"traceEvents": events} with open(filepath, "w") as f: json.dump(trace, f, indent=2) print(f"Chrome trace exported to: {filepath}") print(f"Open chrome://tracing in Chrome/Edge and load this file.") def export_html_viewer(self, filepath: str | Path, max_records: int = 0) -> None: """Export a self-contained interactive HTML timeline viewer using Canvas.""" import json as json_mod metadata_backends = self.metadata.get("backends", []) backend_by_id: dict[int, dict] = {b["id"]: b for b in metadata_backends} 
backend_names: dict[int, str] = {} for bid in sorted(set(rec.backend_id for rec in self.records)): binfo = backend_by_id.get(bid, {}) name = binfo.get("name", f"Backend {bid}") device = binfo.get("device", "") backend_names[bid] = device if device and device != "unknown" else name events: list[dict] = [] cum_us = 0.0 for rec in self.records: dur_us = rec.duration_ns / 1000.0 events.append({ "n": rec.name, "d": dur_us, "s": rec.shape_str, "b": rec.bytes, "t": rec.type, "bid": rec.backend_id, "start": cum_us, }) cum_us += dur_us total_us = cum_us if max_records > 0 and len(events) > max_records: stride = len(events) // max_records events = events[::stride][:max_records] if total_us == 0: print("No profiling data to export.") return header_stats = str(len(events)) + ' events | ' + f'{total_us/1000:.1f}' + ' ms' # Build backend name map with string keys for JSON bn_str = {str(k): v for k, v in backend_names.items()} # --- HTML --- html = ( '\n
' '