#!/usr/bin/env python3
"""
SuperMark Observability — local audit script.

Reads your Claude Code session logs (~/.claude/projects/*/*.jsonl) entirely
on your machine. Emits aggregated metrics as JSON. Raw conversation content
and file paths are NEVER included in the output — only counts, ratios, and
categorical flags.

Usage:
    python3 supermark-observability-audit.py --json > report.json
    python3 supermark-observability-audit.py --modules expert-usage --json
    python3 supermark-observability-audit.py --help

No dependencies beyond the Python standard library.
"""

from __future__ import annotations

import argparse
import glob
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path


SCRIPT_VERSION = "0.8.0"
SCRIPT_NAME = "supermark-observability-audit"
DEFAULT_PROJECTS_DIR = os.path.expanduser("~/.claude/projects")


# ═══════════════════════════════════════════════════════════════════════════
#  MODULE 1 — Expert Usage Audit
# ═══════════════════════════════════════════════════════════════════════════

EXPERT_USAGE_ID = "expert-usage"
EXPERT_USAGE_VERSION = "0.5.0"
EXPERT_USAGE_NAME = "Expert Usage Audit"

# Domain taxonomy — internal mapping only. Only the canonical domain names
# ever appear in the output; the pattern lists stay local, so classification
# never requires emitting file paths.
DOMAIN_MAP = {
    "Technical": [
        "technical-architect", "mobile-architect", "lead-developer",
        "technical-architecture",
    ],
    "UX & Design": [
        "ux-director", "ux-frontier", "journey-architect", "brand-designer",
        "accessibility-expert", "brand-book", "ux-manual",
    ],
    "Legal": [
        "legal-counsel", "legal-reviewer", "legal-adversary",
        "legal-playbook", "legal-process",
    ],
    "Content": [
        "publishing-director", "content-strategist", "marketing-strategist",
        "technical-writer", "seo-expert", "audio-producer", "course-architect",
        "content-playbook", "course-design-playbook", "documentation-playbook",
    ],
    "Strategy": [
        "strategic-advisor", "poc-expert", "creative-ideator", "davos-expert",
        "landscape-researcher", "product-manager", "founder-in-residence",
        "conference-curator",
    ],
    "Quality": ["qa-lead", "security-reviewer"],
    "Meta/System": [
        "expert-coach", "calibration-analyst", "expert-system-architect",
        "learning-designer", "expert-operating-system", "expert-protocol",
        "expert-onboarding", "development-tracker",
    ],
    "Art": ["art-director", "art-forensics", "art-critic", "digital-artist"],
    "Production": [
        "music-video-director", "casting-agent", "location-scout",
        "production-designer", "stylist", "colourist", "dop",
        "prompt-director", "editor", "music-producer", "music-lyricist",
        "music-architect", "music-ar",
    ],
    "Research": [
        "qual-research-lead", "research-synthesist", "research-analyst",
        "conference-scout", "expert-architect", "case-study-hunter",
        "landscape-analyst",
    ],
}
DOMAIN_ORDER = [
    "Technical", "UX & Design", "Legal", "Content", "Strategy",
    "Quality", "Meta/System", "Art", "Production", "Research",
]
EXPERT_SKILL_KEYWORDS = ["expert", "calibrat", "360", "coach", "drill", "review-session"]
META_EXPERT_KEYWORDS = ["coach", "calibrat", "system-architect", "learning-designer"]
EXPERT_FRAMING_RE = re.compile(
    r"(you are|act as|you're now|speaking as|I am)\s+"
    r"(a |an |the )?"
    r"(expert|specialist|advisor|counsel|architect|director|coach|"
    r"analyst|designer|strategist|producer|reviewer|scout|critic)",
    re.I,
)
MT_CODE_RE = re.compile(r"MT-[A-Z]{2}\d{2}")
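# Illustrative matches (synthetic strings, not from any real session):
#   >>> bool(EXPERT_FRAMING_RE.search("You are an expert reviewer for this."))
#   True
#   >>> MT_CODE_RE.search("tracked as MT-UX03 in the register").group(0)
#   'MT-UX03'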

# Slash-command invocations appear in user text as:
#   <command-name>/foo</command-name>
# This regex pulls the command name out.
SLASH_COMMAND_RE = re.compile(r"<command-name>/([a-zA-Z0-9_:-]+)</command-name>")
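# Example (synthetic):
#   >>> SLASH_COMMAND_RE.findall("<command-name>/expert-coach</command-name>")
#   ['expert-coach']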

# Expert-specific slash commands indicate an active expert workflow.
EXPERT_SLASH_RE = re.compile(
    r"^(expert-[a-z0-9-]+|research:[a-z0-9-]+|panel|focus-group|"
    r"find-cases|validate-cases|review-cases|"
    r"coach-expert|calibrate-expert|audit-expert)$"
)
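# e.g. "research:landscape" and "panel" match; "log" and "status" do not —
# those fall through to the OS-slash / builtin checks below.
#   >>> bool(EXPERT_SLASH_RE.match("research:landscape"))
#   True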

# Claude Code built-in slash commands — NOT OS signals, just tool operations.
# Everything NOT in this set (and not an expert slash) is a SuperMark-authored
# OS-layer command and counts as method-equipped usage.
CLAUDE_BUILTIN_SLASHES = frozenset({
    "exit", "cost", "login", "logout", "mcp", "compact", "context",
    "status", "fresh", "resume", "restart", "help", "bug", "extra-usage",
    "clear", "config", "model", "theme", "review", "pr-comments",
})
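# Dispatch sketch for a typed slash (using /brief, one of the OS slashes
# named below): it is neither an expert slash nor a builtin, so it
# promotes has_os_slash.
#   >>> cmd = "brief"
#   >>> bool(EXPERT_SLASH_RE.match(cmd)), cmd in CLAUDE_BUILTIN_SLASHES
#   (False, False)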

# MCP calls to the SuperMark observability/memory store — expert OS infrastructure.
MCP_EXPERT_OS_RE = re.compile(
    r"^mcp__supermark__(expert_|memory_|decision_|awe_)"
)
MCP_SESSION_RE = re.compile(r"^mcp__supermark__(session_|idea_)")

# OS-layer skills invoked via the Skill tool. When a user types /log, the
# transcript records a Skill tool_use for `log` rather than a <command-name>
# tag, so SLASH_COMMAND_RE never sees it. These skills are the same method
# equipment as the user-typed slashes, so they get the same `has_os_slash`
# promotion. Empirical basis: 5/20 L0 sessions in the 2026-04-17 sample hit
# this blind spot (sessions 12, 15, 16, 17, 20 in l0-tags-2026-04-17.md).
OS_SKILL_NAMES = frozenset({
    "log", "continue", "app-restart", "work", "idea", "brief", "focus",
    "lean", "url", "shelf", "unshelf", "init", "legal", "seo-on",
    "document", "focus-group",
})

# Method-equipped review skills — build-check, security-review, differential-
# review and friends. These do real code-review method work (audits, security
# scans, checklists) but aren't expert personas or OS slashes. Promote to the
# has_os_slash signal (L3 floor). Empirical basis: session 9 in the 2026-04-17
# sample ran a 47-turn code review invisibly via `build-check`.
METHOD_REVIEW_SKILLS = frozenset({
    "build-check", "security-review", "differential-review", "review",
    "requesting-code-review", "receiving-code-review",
    "test-driven-development", "verification-before-completion",
    "systematic-debugging", "web-design-guidelines",
    "second-opinion",
})

# Registry / relationship-map files. Reading these is method-equipped
# "consulting the expert index" behaviour, weaker than loading a full persona
# but stronger than L0. The existing startswith("EXPERT"/"AGENT") filter was
# dropping these on the floor. Empirical basis: session 17 in the sample.
REGISTRY_FILES = frozenset({
    "EXPERT-REGISTRY.md", "RELATIONSHIP-MAP.md",
})

# ── Work-type classifier (v0.5.0) ────────────────────────────────────────
# Buckets + minimum defensible L-level from
# `_shared/experts/calibration-cards/session-expert-usage.md` §2.
BUCKET_MIN_LEVEL = {
    "quick-ops":       0,
    "build-code":      2,
    "debug":           2,
    "ui-iteration":    3,
    "design-strategy": 2,
    "code-review":     4,
    "method-ops":      3,
}
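# Appropriateness check used by the aggregator (worked example, synthetic):
# a session classified "debug" that only reached L1 sits below its floor —
#   1 >= BUCKET_MIN_LEVEL["debug"]  →  False  →  counted as not appropriate.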
BUCKET_ORDER = [
    "quick-ops", "build-code", "debug", "ui-iteration",
    "design-strategy", "code-review", "method-ops", "unclassified",
]

# First-turn framing detectors — only the user's opening turn is scanned so
# long sessions don't get mis-classified because someone later mentioned a
# bug or asked a strategy question. Keep patterns narrow: whole words only,
# phrase-based where possible.
STRATEGY_FRAMING_RE = re.compile(
    r"\b(should we|what could we|can we (?:add|build|do|include|use|think)|"
    r"what if we|how should we|how could we|thinking of|do you think|"
    r"we might (?:add|build|do|include|use|try)|"
    r"what (?:is|are) the (?:benefit|tradeoff|downside|upside|pro|con)|"
    r"(?:lower|faster|cheaper) (?:cost|way)|"
    r"work out what (?:things|we)|"
    r"explore (?:options|ideas)|"
    r"looking for (?:options|ways|a way))\b",
    re.IGNORECASE,
)
BUG_FRAMING_RE = re.compile(
    r"\b(bug|broken|fixing|doesn'?t work|isn'?t working|not working|"
    r"failing|regression|debug|still (?:not|wrong|broken)|401|error)\b",
    re.IGNORECASE,
)
SECURITY_FRAMING_RE = re.compile(
    r"\b(security review|security audit|security scan|pen[- ]?test|"
    r"vulnerability|threat model|OWASP|CVE-\d)\b",
    re.IGNORECASE,
)

# "scr" shorthand or "screenshot" in a human turn — UI-iteration signal.
# Whole-word boundaries prevent matches inside "script", "screen", etc.
SCREENSHOT_LANG_RE = re.compile(r"\b(scr|screenshot)s?\b", re.IGNORECASE)

# Method-ops signals: handover reads and repo-infra paths.
HANDOVER_FILE_RE = re.compile(r"_handover/continue--.+\.md$", re.IGNORECASE)
INFRA_PATH_RE = re.compile(r"(projects/infra/|_shared/process/)", re.IGNORECASE)
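# e.g. (hypothetical paths):
#   >>> bool(HANDOVER_FILE_RE.search("projects/app/_handover/continue--auth.md"))
#   True
#   >>> bool(INFRA_PATH_RE.search("/repo/_shared/process/ports.md"))
#   True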


def _score_expert_session(filepath: str) -> dict:
    """Analyse one JSONL file for expert-usage signals. Returns level + signals."""
    signals = {
        "has_persona_read": False,
        "has_memory_read": False,
        "has_notepad_read": False,
        "has_playbook_read": False,
        "has_kp_read": False,
        "has_focus_group": False,
        "has_meta_expert": False,
        "has_cross_expert": False,
        "has_expert_skill": False,
        "has_inline_framing": False,
        "has_mt_code": False,
        "has_feedforward": False,
        "has_compliance": False,
        "has_registry_consult": False,  # read EXPERT-REGISTRY.md / RELATIONSHIP-MAP.md
        "has_expert_slash": False,  # /expert-*, /research:*, /panel, /focus-group
        "has_os_slash": False,      # /log, /logf, /continue, /brief
        "has_mcp_expert_os": False, # mcp__supermark__expert_/memory_/decision_/awe_
        "has_mcp_session": False,   # mcp__supermark__session_/idea_
        # Classifier signals (card §2) — telemetry-only, no intent inference.
        "has_handover_read": False,      # Read of _handover/continue--*.md
        "has_infra_touch": False,        # any tool touching projects/infra/ or _shared/process/
        "has_review_skill": False,       # Skill invocations of METHOD_REVIEW_SKILLS
        "has_strategy_framing": False,   # first-turn "should we" / "what could we"
        "has_bug_framing": False,        # first-turn "bug" / "broken" / "fix"
        "has_security_framing": False,   # first-turn "security review" / "pen-test"
        "persona_count": 0,
    }
    personas = set()
    domains_hit = set()
    timestamp = None
    # Session-shape counters — used to skip empty sessions from aggregate
    # stats (e.g. one user turn that's just /exit, or an abandoned tab).
    human_turns = 0
    tool_uses = 0
    # Classifier inputs — tool-name counts + user-turn-derived signals.
    edit_calls = 0
    write_calls = 0
    grep_calls = 0
    read_calls = 0
    screenshot_turns = 0  # human turns matching SCREENSHOT_LANG_RE
    first_turn_text: str | None = None

    try:
        with open(filepath, "r", encoding="utf-8", errors="replace") as fh:
            for line in fh:
                try:
                    obj = json.loads(line.strip())
                except (json.JSONDecodeError, ValueError):
                    continue

                ts = obj.get("timestamp")
                if ts and not timestamp:
                    timestamp = ts[:10]

                rtype = obj.get("type")
                msg = obj.get("message", {}) or {}
                content = msg.get("content", [])

                texts = []
                if isinstance(content, str):
                    texts.append(content)
                elif isinstance(content, list):
                    for block in content:
                        if isinstance(block, dict) and block.get("type") == "text":
                            texts.append(block.get("text", ""))
                        if isinstance(block, dict) and block.get("type") == "tool_use":
                            tool_uses += 1

                if rtype == "user":
                    joined = "\n".join(t for t in texts if isinstance(t, str)).strip()
                    if (joined and not joined.startswith("<")
                            and "[Request interrupted" not in joined[:80]):
                        human_turns += 1
                        # Screenshot language in this human turn? Scan only
                        # the first 300 chars so a long pasted blob doesn't
                        # accidentally false-match on "scr" inside other words.
                        if SCREENSHOT_LANG_RE.search(joined[:300]):
                            screenshot_turns += 1
                        # Capture the opening user turn verbatim (capped at
                        # 500 chars). First-turn framing is all we scan for
                        # design-strategy / debug / security signals.
                        if first_turn_text is None:
                            first_turn_text = joined[:500]

                for text in texts:
                    if EXPERT_FRAMING_RE.search(text):
                        signals["has_inline_framing"] = True
                    if MT_CODE_RE.search(text):
                        signals["has_mt_code"] = True
                    low = text.lower()
                    if "feedforward" in low:
                        signals["has_feedforward"] = True
                    if "compliance" in low and "register" in low:
                        signals["has_compliance"] = True
                    for cmd in SLASH_COMMAND_RE.findall(text):
                        if EXPERT_SLASH_RE.match(cmd):
                            signals["has_expert_slash"] = True
                            domains_hit.add("Meta/System")
                            if cmd.startswith("focus-group") or cmd.startswith("research:"):
                                domains_hit.add("Research")
                            if ("coach" in cmd or "calibrate" in cmd
                                    or "audit" in cmd):
                                signals["has_meta_expert"] = True
                        elif cmd not in CLAUDE_BUILTIN_SLASHES:
                            # Any non-builtin slash is a SuperMark OS command
                            # (/log, /logf, /continue, /url, /focus, /brief,
                            # /idea, /work, /app-restart, /document, /image,
                            # /brand-kit, /init, /legal, /lean, ...).
                            signals["has_os_slash"] = True

                if isinstance(content, list):
                    for block in content:
                        if not isinstance(block, dict):
                            continue
                        if block.get("type") != "tool_use":
                            continue

                        name = block.get("name", "")
                        inp = block.get("input", {}) or {}

                        # Tool-name counters for work-type classification.
                        if name == "Edit":
                            edit_calls += 1
                        elif name == "Write":
                            write_calls += 1
                        elif name == "Grep":
                            grep_calls += 1
                        elif name == "Read":
                            read_calls += 1

                        # Path-based method-ops signals — any tool touching
                        # a handover file or a repo-infra path counts.
                        fp_any = inp.get("file_path", "") or inp.get("path", "")
                        if fp_any:
                            if HANDOVER_FILE_RE.search(fp_any):
                                signals["has_handover_read"] = True
                            if INFRA_PATH_RE.search(fp_any):
                                signals["has_infra_touch"] = True

                        if name == "Read":
                            fp = inp.get("file_path", "")
                            fn = fp.split("/")[-1]
                            stem = (fn
                                    .replace(".md", "")
                                    .replace(".memory", "")
                                    .replace(".notepad", ""))

                            if "experts/" in fp:
                                if ".memory" in fn:
                                    signals["has_memory_read"] = True
                                elif ".notepad" in fn:
                                    signals["has_notepad_read"] = True
                                elif any(x in fn for x in ("playbook", "guide", "manual")):
                                    signals["has_playbook_read"] = True
                                elif "knowledge-pack" in fp:
                                    signals["has_kp_read"] = True
                                elif fn in REGISTRY_FILES:
                                    signals["has_registry_consult"] = True
                                elif fn.endswith(".md") and not fn.startswith(("EXPERT", "AGENT")):
                                    signals["has_persona_read"] = True
                                    personas.add(fn)
                                    if any(m in fn for m in META_EXPERT_KEYWORDS):
                                        signals["has_meta_expert"] = True

                                for domain, patterns in DOMAIN_MAP.items():
                                    if any(p in stem for p in patterns):
                                        domains_hit.add(domain)
                                        break

                            elif "focus-group" in fp:
                                signals["has_focus_group"] = True
                                domains_hit.add("Research")
                            elif "compliance-register" in fp:
                                signals["has_compliance"] = True

                        elif name == "Skill":
                            sk = inp.get("skill", "")
                            if any(e in sk for e in EXPERT_SKILL_KEYWORDS):
                                signals["has_expert_skill"] = True
                                domains_hit.add("Meta/System")
                            if "focus-group" in sk or "panel" in sk:
                                signals["has_focus_group"] = True
                                domains_hit.add("Research")
                            if sk.startswith("research:"):
                                signals["has_expert_skill"] = True
                                domains_hit.add("Research")
                            if any(m in sk for m in META_EXPERT_KEYWORDS):
                                signals["has_meta_expert"] = True
                                domains_hit.add("Meta/System")
                            if sk in OS_SKILL_NAMES:
                                signals["has_os_slash"] = True
                            if sk in METHOD_REVIEW_SKILLS:
                                signals["has_os_slash"] = True
                                signals["has_review_skill"] = True
                        elif name.startswith("mcp__supermark__"):
                            if MCP_EXPERT_OS_RE.match(name):
                                signals["has_mcp_expert_os"] = True
                                domains_hit.add("Meta/System")
                            elif MCP_SESSION_RE.match(name):
                                signals["has_mcp_session"] = True
    except OSError:
        # Unreadable file: mark as empty so the aggregator excludes it
        # instead of counting it as a real L0 session.
        return {"timestamp": None, "level": 0, "domains": [],
                "human_turns": 0, "tool_uses": 0, "is_empty": True, **signals}

    signals["persona_count"] = len(personas)
    signals["has_cross_expert"] = len(personas) >= 2

    # First-turn framing — applied only to the opening human turn so that
    # later conversational drift doesn't flip the bucket. (Card §6 guardrail
    # 2: triggers are behavioural thresholds, not intent inference.)
    if first_turn_text:
        if STRATEGY_FRAMING_RE.search(first_turn_text):
            signals["has_strategy_framing"] = True
        if BUG_FRAMING_RE.search(first_turn_text):
            signals["has_bug_framing"] = True
        if SECURITY_FRAMING_RE.search(first_turn_text):
            signals["has_security_framing"] = True

    level = 0
    if signals["has_inline_framing"]:
        level = max(level, 1)
    if signals["has_mt_code"] and level == 0:
        level = 1
    if signals["has_registry_consult"] and level == 0:
        level = 1
    if signals["has_persona_read"] or signals["has_memory_read"]:
        level = max(level, 2)
    if signals["has_playbook_read"] or signals["has_kp_read"]:
        level = max(level, 3)
    # L3: Method-equipped — OS-layer slash commands and session logging
    if signals["has_os_slash"] or signals["has_mcp_session"]:
        level = max(level, 3)
    # L4: Calibrated — expert skills, expert slash commands, MCP expert/memory store
    if (signals["has_expert_skill"] or signals["has_compliance"]
            or signals["has_expert_slash"] or signals["has_mcp_expert_os"]):
        level = max(level, 4)
    if signals["has_meta_expert"] and signals["has_cross_expert"]:
        level = max(level, 4)
    # L5: Operating system — feedforward or (meta-expert + any expert workflow trigger)
    l4_expert_trigger = (
        signals["has_expert_skill"]
        or signals["has_expert_slash"]
        or signals["has_mcp_expert_os"]
    )
    if signals["has_feedforward"] or (signals["has_meta_expert"] and l4_expert_trigger):
        level = max(level, 5)

    # "Empty" session — e.g. a tab where the user only typed /exit, or an
    # abandoned one. These drag the L0 bucket down without representing real
    # work. Mark them so the aggregator can exclude them from the denominator.
    is_empty = human_turns < 2 and tool_uses < 3

    return {
        "timestamp": timestamp,
        "level": level,
        "domains": sorted(domains_hit),
        "persona_count": signals["persona_count"],
        "human_turns": human_turns,
        "tool_uses": tool_uses,
        "edit_calls": edit_calls,
        "write_calls": write_calls,
        "grep_calls": grep_calls,
        "read_calls": read_calls,
        "screenshot_turns": screenshot_turns,
        "is_empty": is_empty,
        **signals,
    }


def _classify_work_type(r: dict) -> str:
    """Classify a scored session into a work-type bucket.

    Priority tuning (2026-04-18, spot-checked against the 20 L0 anchors
    in `projects/infra/ai_observability/taxonomy-evidence/`):

    INTENT signals (first-turn framing + dedicated skills) outrank
    BEHAVIOURAL signals (tool-count thresholds). A session that opens
    with "what could we do about X" stays design-strategy even if it
    later happens to touch 5 files. A session that opens with "bug in
    OAuth" stays debug even if the user pastes 27 screenshots to show
    the error. This keeps intent stable and prevents late-session drift
    from flipping the bucket.

    Priority:
      (1) code-review      — review skill OR security framing
      (2) design-strategy  — strategy framing in first turn
      (3) method-ops       — handover file read (explicit continuation)
      (4) debug            — bug framing + sustained edits
      (5) ui-iteration     — ≥2 scr turns + edits, or 1 scr + heavy edits
      (6) build-code       — edit+write pattern, sustained turns
      (7) debug secondary  — raw grep+edit cycle without bug framing
      (8) method-ops short — infra-path touch on a small session
      (9) quick-ops        — short session fallback
      else → unclassified.
    """
    turns = r.get("human_turns", 0)
    tools = r.get("tool_uses", 0)
    edits = r.get("edit_calls", 0)
    writes = r.get("write_calls", 0)
    greps = r.get("grep_calls", 0)
    scr = r.get("screenshot_turns", 0)

    # (1) code-review
    if r.get("has_review_skill") or r.get("has_security_framing"):
        return "code-review"

    # (2) design-strategy — first-turn intent is a strong signal. Cap at
    # turns ≤ 15 so a strategy opener that turned into a 40-turn build
    # doesn't stay in design-strategy.
    if r.get("has_strategy_framing") and turns <= 15:
        return "design-strategy"

    # (3) method-ops — explicit handover file read on a session that
    # didn't spin up into substantive work. Long /continue sessions
    # that went on to build/debug belong in those buckets, not here.
    if r.get("has_handover_read") and (turns <= 10 or edits <= 2):
        return "method-ops"

    # (4) debug — bug framing + sustained edits. Priority above UI so
    # bug-driven sessions with heavy screenshot use don't drift into
    # ui-iteration (anchors: OAuth 401, voice design bug).
    if r.get("has_bug_framing") and edits >= 3 and turns >= 5:
        return "debug"

    # (5) ui-iteration
    if scr >= 2 and edits >= 3:
        return "ui-iteration"
    if scr >= 1 and edits >= 10 and turns <= 10:
        return "ui-iteration"

    # (6) build-code. Fallback: edit+write share > 15% catches frontend
    # tweaks that mostly Edit without net-new Writes.
    if edits >= 3 and writes >= 1 and turns >= 10:
        return "build-code"
    if turns >= 10 and tools and (edits + writes) / tools > 0.15 and edits >= 5:
        return "build-code"

    # (7) method-ops short — infra-path touch on a small session (ports
    # fixes, process doc edits). Has to run BEFORE debug-secondary so a
    # short ports-fix session with grep+edit activity doesn't get
    # mislabelled as debug. Longer infra-touching sessions are already
    # classified by the substantive-work rules above.
    if r.get("has_infra_touch") and turns <= 8:
        return "method-ops"

    # (8) debug secondary — raw grep+edit cycle without bug framing.
    if greps >= 5 and edits >= 3:
        return "debug"

    # (9) quick-ops
    if turns <= 3 and tools <= 30:
        return "quick-ops"

    return "unclassified"

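# Worked example (synthetic counters — no framing signals set):
#   r = {"human_turns": 12, "tool_uses": 60, "edit_calls": 6,
#        "write_calls": 2, "grep_calls": 1, "screenshot_turns": 0}
#   _classify_work_type(r)  →  "build-code"  (rule 6: edits≥3, writes≥1, turns≥10)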

def _compute_red_flags(r: dict, bucket: str) -> list[str]:
    """Return the RF codes the session triggers (card §5).

    Each flag is a behavioural threshold + L0 check. Bucket is passed in
    for RF1 / RF6 / RF7 where the bucket name is part of the trigger.
    Sessions at L > 0 cannot trigger any flag — by design, all red flags
    are L0-conditioned (an equipped session is, by definition, not a
    calibration failure of the type this card catches).
    """
    if r.get("level", 0) != 0:
        return []
    turns = r.get("human_turns", 0)
    tools = r.get("tool_uses", 0)
    edits = r.get("edit_calls", 0)
    greps = r.get("grep_calls", 0)
    scr = r.get("screenshot_turns", 0)
    flags: list[str] = []

    # RF1: quick-ops mis-classify — both quick-ops thresholds exceeded.
    # The quick-ops bucket itself caps at turns ≤ 3 and tools ≤ 30, so in
    # practice this fires on `unclassified` sessions whose shape looks
    # like a lookup that "grew" into real work.
    if bucket in ("quick-ops", "unclassified") and turns > 5 and tools > 30:
        flags.append("RF1")

    # RF2: build-code at L0 with ≥10 turns + ≥3 edits.
    if turns >= 10 and edits >= 3:
        flags.append("RF2")

    # RF3: debug at L0 with sustained fix pattern (grep+edit).
    if turns >= 10 and greps >= 5 and edits >= 3:
        flags.append("RF3")

    # RF4: ui-iteration at L0 with screenshot language + sustained turns.
    if scr >= 1 and turns >= 10:
        flags.append("RF4")

    # RF5: design-strategy at L0 with strategy framing + ≥5 turns.
    if r.get("has_strategy_framing") and turns >= 5:
        flags.append("RF5")

    # RF6 — Disqualifying. Security/audit scope at L0.
    if r.get("has_security_framing") or r.get("has_review_skill"):
        flags.append("RF6")

    # RF7: method-ops at L0 — handover read or infra-path touch.
    if r.get("has_handover_read") or r.get("has_infra_touch"):
        flags.append("RF7")

    return flags

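# Worked example (synthetic, level 0): turns=12, greps=6, edits=4, scr=0,
# bucket="debug" → RF2 (≥10 turns, ≥3 edits) and RF3 (adds greps ≥ 5);
# no other flag fires → ["RF2", "RF3"].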

def _scan_expert_roster(session_dir: str) -> tuple[dict, int, int]:
    """Walk from session_dir project name back to the repo's expert directory.
    Returns (available_by_domain, memory_file_count, notepad_file_count)."""
    available = defaultdict(set)
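    # Claude Code names project dirs by replacing "/" with "-". Inverting
    # that is lossy (a real hyphen inside a path segment breaks it), so the
    # reconstructed repo path is best-effort; misses return (empty, 0, 0).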
    dir_name = Path(session_dir).name
    repo_path = "/" + dir_name.lstrip("-").replace("-", "/")

    candidates = [
        Path(repo_path) / "_shared" / "experts",
        Path(repo_path) / "experts",
        Path(repo_path) / ".experts",
    ]
    expert_dir = next((c for c in candidates if c.is_dir()), None)
    if not expert_dir:
        return available, 0, 0

    for f in expert_dir.glob("*.md"):
        fn = f.name
        if fn.startswith(("EXPERT", "AGENT", "INDEX", "README")):
            continue
        if ".memory" in fn or ".notepad" in fn:
            continue
        if any(x in fn for x in ("register", "protocol", "system", "operating",
                                 "onboarding", "checklist", "blueprint",
                                 "360", "store", "suite", "log", "health")):
            continue
        stem = fn.replace(".md", "")
        for domain, patterns in DOMAIN_MAP.items():
            if any(p in stem for p in patterns):
                available[domain].add(fn)
                break

    memory_count = len(list(expert_dir.glob("*.memory*.md")))
    notepad_count = len(list(expert_dir.glob("*.notepad.md")))
    return available, memory_count, notepad_count


def run_expert_usage(session_dir: str, session_files: list) -> dict:
    """Run Expert Usage module. Returns raw metrics dict (no scoring, no PII)."""
    all_results = [_score_expert_session(f) for f in session_files]
    empty_count = sum(1 for r in all_results if r.get("is_empty"))
    # Filter empties out of the scoring denominator — see `_score_expert_session`.
    results = [r for r in all_results if not r.get("is_empty")]
    total = len(results)
    if total == 0:
        return {"total_sessions": 0, "empty_sessions_excluded": empty_count}

    level_dist = defaultdict(int)
    for r in results:
        level_dist[r["level"]] += 1

    # Recent-window cutoff — same 7-day horizon as the Hook Activity module,
    # so "this period" means one thing across the dashboard.
    recent_window_days = 7
    cutoff_date = (datetime.now(timezone.utc)
                   - timedelta(days=recent_window_days)).strftime("%Y-%m-%d")

    domain_sessions = defaultdict(int)
    domain_sessions_recent = defaultdict(int)
    recent_total = 0
    for r in results:
        ts = r.get("timestamp") or ""
        is_recent = ts and ts >= cutoff_date
        if is_recent:
            recent_total += 1
        for d in r["domains"]:
            domain_sessions[d] += 1
            if is_recent:
                domain_sessions_recent[d] += 1

    available_by_domain, memory_count, notepad_count = _scan_expert_roster(session_dir)

    max_level = max((r["level"] for r in results), default=0)
    avg_level = sum(r["level"] for r in results) / total
    l2_plus = sum(1 for r in results if r["level"] >= 2)

    domain_coverage = {}
    for d in DOMAIN_ORDER:
        invoked = domain_sessions.get(d, 0)
        invoked_recent = domain_sessions_recent.get(d, 0)
        available = len(available_by_domain.get(d, set()))
        # Four-state semantics:
        #   gap              — no personas at all
        #   unused_all_time  — personas exist, never invoked
        #   dormant_recently — invoked historically, quiet in the recent window
        #   active_recently  — invoked in the recent window
        if available == 0 and invoked == 0:
            status = "gap"
        elif invoked == 0:
            status = "unused_all_time"
        elif invoked_recent == 0:
            status = "dormant_recently"
        else:
            status = "active_recently"
        domain_coverage[d] = {
            "available": available,
            "invoked_sessions": invoked,
            "invoked_sessions_recent": invoked_recent,
            "invoked_pct": round(invoked * 100 / total, 1),
            "invoked_pct_recent": round(invoked_recent * 100 / recent_total, 1) if recent_total else 0.0,
            "status": status,
        }

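    # Last 30 non-empty sessions. Assumes session_files arrived in
    # chronological order; if the caller passed an unsorted glob, this
    # window is approximate rather than strictly "most recent".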
    recent = results[-30:]
    recent_dist = defaultdict(int)
    for r in recent:
        recent_dist[r["level"]] += 1

    # ── Work-type classification (card §2) ────────────────────────────
    # Classify every non-empty session. The derived per-session data
    # (bucket + red flags) lives only in these aggregates — the
    # per-session dict is never emitted, preserving the same privacy
    # guarantee the rest of the module already holds.
    bucket_counts = {b: 0 for b in BUCKET_ORDER}
    bucket_appropriate = {b: 0 for b in BUCKET_ORDER}
    red_flag_counts = {f"RF{n}": 0 for n in range(1, 8)}
    recent_bucket_counts = {b: 0 for b in BUCKET_ORDER}
    recent_bucket_appropriate = {b: 0 for b in BUCKET_ORDER}

    appropriate_total = 0
    scorable_total = 0  # total minus "unclassified" — card §8 open Q1
    for r in results:
        bucket = _classify_work_type(r)
        bucket_counts[bucket] += 1
        for flag in _compute_red_flags(r, bucket):
            red_flag_counts[flag] += 1
        if bucket != "unclassified":
            scorable_total += 1
            if r.get("level", 0) >= BUCKET_MIN_LEVEL[bucket]:
                bucket_appropriate[bucket] += 1
                appropriate_total += 1

    for r in recent:
        bucket = _classify_work_type(r)
        recent_bucket_counts[bucket] += 1
        if bucket != "unclassified" and r.get("level", 0) >= BUCKET_MIN_LEVEL[bucket]:
            recent_bucket_appropriate[bucket] += 1

    def _pct(n: int, d: int) -> float:
        return round(n * 100 / d, 1) if d else 0.0

    bucket_distribution = {
        b: {
            "count": bucket_counts[b],
            "pct": _pct(bucket_counts[b], total),
        }
        for b in BUCKET_ORDER
    }
    per_bucket_appropriateness = {
        b: {
            "count": bucket_counts[b],
            "appropriate": bucket_appropriate[b],
            "appropriate_pct": _pct(bucket_appropriate[b], bucket_counts[b]),
            "min_level_required": BUCKET_MIN_LEVEL.get(b, 0),
        }
        for b in BUCKET_ORDER if b != "unclassified"
    }
    recent_30_appropriateness = {
        b: {
            "count": recent_bucket_counts[b],
            "appropriate": recent_bucket_appropriate[b],
            "appropriate_pct": _pct(recent_bucket_appropriate[b], recent_bucket_counts[b]),
        }
        for b in BUCKET_ORDER if b != "unclassified"
    }

    return {
        "total_sessions": total,
        "empty_sessions_excluded": empty_count,
        "overall": {
            "peak_level": max_level,
            "average_level": round(avg_level, 2),
            "l2_plus_sessions": l2_plus,
            "l2_plus_pct": round(l2_plus * 100 / total, 1),
            # Appropriateness % — card §3 headline metric. Denominator
            # excludes unclassified (card §8 open Q1, pilot v0.1 will
            # re-test this decision).
            "scorable_sessions": scorable_total,
            "appropriate_sessions": appropriate_total,
            "appropriateness_pct": _pct(appropriate_total, scorable_total),
        },
        "level_distribution": {
            f"L{lvl}": {
                "count": level_dist[lvl],
                "pct": round(level_dist[lvl] * 100 / total, 1),
            }
            for lvl in range(6)
        },
        "domain_coverage": domain_coverage,
        "recent_window_days": recent_window_days,
        "recent_total_sessions": recent_total,
        "recent_30": {f"L{lvl}": recent_dist[lvl] for lvl in range(6)},
        "roster": {
            "memory_files": memory_count,
            "notepad_files": notepad_count,
        },
        # Work-type classification output (card §3) — live alongside
        # the depth-score fields above, never replacing them.
        "bucket_distribution": bucket_distribution,
        "per_bucket_appropriateness": per_bucket_appropriateness,
        "recent_30_appropriateness": recent_30_appropriateness,
        "red_flag_counts": red_flag_counts,
    }


# ═══════════════════════════════════════════════════════════════════════════
#  MODULE 2 — Token Efficiency Audit
# ═══════════════════════════════════════════════════════════════════════════

TOKEN_EFF_ID = "token-efficiency"
TOKEN_EFF_VERSION = "0.3.0"
TOKEN_EFF_NAME = "Token Efficiency Audit"

# Upset-language detector — phrase-based, not single words. Single words
# like "error" / "fix" / "bug" are part of normal task descriptions and were
# producing a ~50% false-positive rate in v0.2. Only multi-word phrases and
# a short list of explicit frustration tokens (nope, stuck, wtf, !!!) match.
UPSET_RE = re.compile(
    r"(doesn'?t work|does not work|not working|did not work|didn'?t work|"
    r"still (?:not|broken|failing|wrong|doesn'?t|isn'?t)|"
    r"not fixed|fix failed|broken (?:now|again|already)|completely broken|"
    r"that'?s wrong|that is wrong|you (?:just )?broke|"
    r"annoy(?:ing|ed|s)?|\bwtf\b|\bffs\b|\bugh\b|\bargh\b|"
    r"!!!|"
    r"\bnope\b|\bstuck\b|"
    r"undo (?:this|that|it)|revert (?:this|that|it)|\brollback\b|"
    r"why (?:is this|isn'?t|doesn'?t|won'?t|can'?t|did you|are you|the hell))",
    re.IGNORECASE,
)
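# e.g. UPSET_RE hits "still doesn't work!!!" (via `still doesn't` and `!!!`)
# but not "please fix the login error" — single words stay out by design.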

# Bash-tool error detector — only scans results known to come from the Bash
# tool (via tool_use_id → tool_name mapping). Patterns chosen to match
# shell/interpreter failure output specifically, not generic occurrences of
# the word "error" in happy-path content.
BASH_ERROR_RE = re.compile(
    r"(traceback \(most recent call last\)|command not found|"
    r"no such file or directory|permission denied|"
    r"syntax error|segmentation fault|illegal (option|instruction)|"
    r"^error: |^fatal: |^error - |modulenotfounderror|"
    r"exit(ed)? (with )?(code |status )?[1-9])",
    re.IGNORECASE | re.MULTILINE,
)
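# e.g. matches "bash: foo: command not found" and "fatal: not a git
# repository" (line-anchored via ^fatal: ), but not a happy-path sentence
# that merely contains the word "error".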

# Slash-command expansions and system wrappers look like user text but are
# programmatic payloads. Skip them so upset-detection only scans real typed
# human messages.
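# e.g. _is_human_user_text("<command-name>/log</command-name>") → False
#      (programmatic payload); _is_human_user_text("still broken") → True.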
def _is_human_user_text(text: str) -> bool:
    text = text.strip()
    if not text:
        return False
    if text.startswith("<"):
        return False
    if "<command-name>" in text or "<command-message>" in text:
        return False
    if text.startswith(("# ", "## ", "### ")):
        return False
    if len(text) > 2000:
        return False
    return True


def _parse_iso(ts):
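    """Parse an ISO-8601 timestamp, tolerating a trailing 'Z'.
    e.g. _parse_iso("2026-04-17T09:00:00Z") → an aware UTC datetime;
    malformed input returns None rather than raising."""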
    if not ts:
        return None
    try:
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        return datetime.fromisoformat(ts)
    except (ValueError, TypeError):
        return None


def _analyse_token_session(path: str):
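    """Stream one session JSONL and accumulate token / health counters.
    Note: `turns` counts every user-type record — including tool_result
    carriers — so it tracks API round-trips; `human_user_msgs` is the
    human-typed subset used for upset detection."""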
    user_msgs: list[dict] = []
    text_user_msgs: list[dict] = []
    asst_msgs: list[dict] = []
    tool_results_total = 0
    tool_results_errors_any = 0     # any tool that flagged is_error (cheap signal)
    bash_results_total = 0          # tool_results whose originating tool was Bash
    bash_results_errors = 0         # Bash-scoped errors (is_error OR BASH_ERROR_RE)
    cache_read = 0
    cache_write = 0
    output_tokens = 0
    uncached_input = 0
    tool_name_by_id: dict[str, str] = {}  # tool_use_id → tool name

    try:
        with open(path, "r", encoding="utf-8", errors="replace") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    rec = json.loads(line)
                except json.JSONDecodeError:
                    continue
                rtype = rec.get("type")
                if rtype == "user":
                    content = rec.get("message", {}).get("content", "")
                    user_msgs.append({"ts": rec.get("timestamp")})
                    if isinstance(content, list):
                        text_blocks = []
                        for block in content:
                            if not isinstance(block, dict):
                                continue
                            btype = block.get("type")
                            if btype == "tool_result":
                                tool_results_total += 1
                                if block.get("is_error"):
                                    tool_results_errors_any += 1
                                tuse_id = block.get("tool_use_id", "")
                                tname = tool_name_by_id.get(tuse_id, "")
                                if tname == "Bash":
                                    bash_results_total += 1
                                    if block.get("is_error"):
                                        bash_results_errors += 1
                                    else:
                                        bc = block.get("content", "")
                                        if isinstance(bc, list):
                                            bc = " ".join(
                                                c.get("text", "") if isinstance(c, dict) else str(c)
                                                for c in bc
                                            )
                                        if not isinstance(bc, str):
                                            bc = str(bc)
                                        if BASH_ERROR_RE.search(bc[:1500]):
                                            bash_results_errors += 1
                            elif btype == "text":
                                t = block.get("text", "")
                                if isinstance(t, str):
                                    text_blocks.append(t)
                        text = " ".join(text_blocks).strip()
                        if text and _is_human_user_text(text):
                            text_user_msgs.append({"ts": rec.get("timestamp"), "text": text})
                    elif isinstance(content, str):
                        if content.strip() and _is_human_user_text(content):
                            text_user_msgs.append({"ts": rec.get("timestamp"), "text": content})
                elif rtype == "assistant":
                    asst_msgs.append({"ts": rec.get("timestamp")})
                    msg = rec.get("message", {}) or {}
                    usage = msg.get("usage", {}) or {}
                    cache_read += usage.get("cache_read_input_tokens", 0) or 0
                    cache_write += usage.get("cache_creation_input_tokens", 0) or 0
                    output_tokens += usage.get("output_tokens", 0) or 0
                    uncached_input += usage.get("input_tokens", 0) or 0
                    ac = msg.get("content", []) or []
                    if isinstance(ac, list):
                        for b in ac:
                            if isinstance(b, dict) and b.get("type") == "tool_use":
                                tid = b.get("id")
                                if tid:
                                    tool_name_by_id[tid] = b.get("name", "")
    except OSError:
        return None

    if not user_msgs and not asst_msgs:
        return None

    all_ts = sorted(filter(None, (_parse_iso(m.get("ts")) for m in user_msgs + asst_msgs)))
    # ISO year-week bucket for weekly time-series. Use the session's first
    # timestamp so a session that straddles a week boundary counts in the
    # week it began.
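    # e.g. a first timestamp of 2026-01-01 (a Thursday) gives
    # isocalendar() == (2026, 1, 4) and buckets as "2026-W01".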
    iso_week = ""
    if all_ts:
        y, w, _ = all_ts[0].isocalendar()
        iso_week = f"{y}-W{w:02d}"
    wall_clock_h = 0.0
    if len(all_ts) >= 2:
        wall_clock_h = (all_ts[-1] - all_ts[0]).total_seconds() / 3600.0
    active_h = 0.0
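    # Active time: sum inter-message gaps, treating any gap of 2h or more
    # as idle (tab left open) rather than work.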
    if len(all_ts) >= 2:
        for a, b in zip(all_ts, all_ts[1:]):
            gap = (b - a).total_seconds()
            if gap < 7200:
                active_h += gap / 3600.0
    idle_ratio = (wall_clock_h - active_h) / wall_clock_h if wall_clock_h > 0 else 0.0

    upset_count = sum(1 for m in text_user_msgs if UPSET_RE.search(m["text"]))
    upset_pct = (upset_count / len(text_user_msgs)) if text_user_msgs else 0.0
    bash_err_pct = (bash_results_errors / bash_results_total) if bash_results_total else 0.0

    return {
        "iso_week": iso_week,
        "turns": len(user_msgs),
        "asst_turns": len(asst_msgs),
        "wall_clock_h": round(wall_clock_h, 2),
        "active_h": round(active_h, 2),
        "idle_ratio": round(idle_ratio, 2),
        "cache_read": cache_read,
        "cache_write": cache_write,
        "output_tokens": output_tokens,
        "uncached_input": uncached_input,
        "upset_pct": round(upset_pct * 100, 1),
        "bash_err_pct": round(bash_err_pct * 100, 1),
        "tool_calls": tool_results_total,
        "tool_errors_any": tool_results_errors_any,
        "bash_calls": bash_results_total,
        "bash_errors": bash_results_errors,
        "human_user_msgs": len(text_user_msgs),
    }


def run_token_efficiency(session_dir: str, session_files: list) -> dict:
    """Run Token Efficiency module. Returns aggregate metrics only.
    No session paths, filenames, or conversation text are emitted."""
    sessions = []
    for f in session_files:
        s = _analyse_token_session(f)
        if s:
            sessions.append(s)

    if not sessions:
        return {"total_sessions": 0}

    total_turns = sum(s["turns"] for s in sessions)
    total_output = sum(s["output_tokens"] for s in sessions)
    total_cread = sum(s["cache_read"] for s in sessions)
    total_cwrite = sum(s["cache_write"] for s in sessions)
    total_uncached = sum(s["uncached_input"] for s in sessions)
    total_active_h = sum(s["active_h"] for s in sessions)
    avg_turns = total_turns / len(sessions) if sessions else 0

    top_n = 10
    whales = sorted(sessions, key=lambda s: s["cache_read"], reverse=True)[:top_n]
    whale_share = sum(w["cache_read"] for w in whales) / total_cread if total_cread else 0

    vital_pool = [s for s in sessions if s["turns"] >= 20]
    vital = sorted(
        vital_pool,
        key=lambda s: s["upset_pct"] + s["bash_err_pct"],
        reverse=True,
    )[:top_n]
    avg_upset = sum(v["upset_pct"] for v in vital) / len(vital) if vital else 0
    avg_bash_err = sum(v["bash_err_pct"] for v in vital) / len(vital) if vital else 0

    # Session-length histogram — counts only
    buckets = [("0-25", 0, 25), ("25-100", 25, 100), ("100-200", 100, 200),
               ("200-500", 200, 500), ("500+", 500, 10**9)]
    hist = {label: 0 for label, _, _ in buckets}
    for s in sessions:
        t = s["turns"]
        for label, lo, hi in buckets:
            if lo <= t < hi:
                hist[label] += 1
                break

    # Weekly time-series — ISO-week buckets, sorted ascending. For each week,
    # emit: session count, cache reads, output tokens, ratio, the week's top
    # session's share of cache reads (whale concentration that week), and the
    # week's average upset% / bash-err% across sessions with ≥20 turns.
    weekly_buckets: dict[str, list] = defaultdict(list)
    for s in sessions:
        wk = s.get("iso_week") or "unknown"
        weekly_buckets[wk].append(s)

    weekly = []
    for wk in sorted(weekly_buckets.keys()):
        wk_sess = weekly_buckets[wk]
        wk_cread = sum(s["cache_read"] for s in wk_sess)
        wk_output = sum(s["output_tokens"] for s in wk_sess)
        wk_active = sum(s["active_h"] for s in wk_sess)
        top_session = max(wk_sess, key=lambda s: s["cache_read"])
        top_share = (top_session["cache_read"] / wk_cread) if wk_cread else 0
        wk_vital = [s for s in wk_sess if s["turns"] >= 20]
        wk_upset = (sum(s["upset_pct"] for s in wk_vital) / len(wk_vital)) if wk_vital else 0
        wk_bash = (sum(s["bash_err_pct"] for s in wk_vital) / len(wk_vital)) if wk_vital else 0
        weekly.append({
            "week": wk,
            "sessions": len(wk_sess),
            "cache_reads": wk_cread,
            "output_tokens": wk_output,
            "ratio": round(wk_cread / wk_output, 1) if wk_output else 0,
            "top_session_share_pct": round(top_share * 100, 1),
            "active_hours": round(wk_active, 1),
            "avg_upset_pct": round(wk_upset, 1),
            "avg_bash_err_pct": round(wk_bash, 1),
        })

    return {
        "total_sessions": len(sessions),
        "totals": {
            "turns": total_turns,
            "output_tokens": total_output,
            "cache_reads": total_cread,
            "cache_writes": total_cwrite,
            "uncached_input": total_uncached,
            "active_hours": round(total_active_h, 1),
        },
        "ratios": {
            "avg_turns_per_session": round(avg_turns, 1),
            "cache_read_to_output": round(total_cread / total_output, 1) if total_output else 0,
        },
        "top_sessions_aggregate": {
            "top_n": len(whales),
            "cache_read_share_pct": round(whale_share * 100, 1),
            "avg_turns_in_top": round(sum(w["turns"] for w in whales) / len(whales), 1) if whales else 0,
            "avg_wall_clock_h_in_top": round(sum(w["wall_clock_h"] for w in whales) / len(whales), 2) if whales else 0,
            "avg_idle_ratio_in_top": round(sum(w["idle_ratio"] for w in whales) / len(whales), 2) if whales else 0,
        },
        "vital_signs_worst": {
            "top_n": len(vital),
            "avg_upset_pct": round(avg_upset, 1),
            "avg_bash_err_pct": round(avg_bash_err, 1),
        },
        "session_length_histogram": hist,
        "weekly": weekly,
    }


# ═══════════════════════════════════════════════════════════════════════════
#  MODULE 3 — Model Routing Audit
# ═══════════════════════════════════════════════════════════════════════════
#
# Measures compliance with Critical Friction Rules §Model Routing
# (~/.claude/CLAUDE.md): subagents default to Haiku.
#
# What it reports (aggregate only, no prompt text, no paths):
#   - models_used: per-model call counts and token totals (parent-model only;
#     subagent internal calls are server-side and not in local logs).
#   - baseline_replay_per_call: distribution (median / p90 / max) of the cache
#     reads on the FIRST assistant message of each session — this is the
#     structural floor that every call pays before any real work.
#   - agent_compliance: of all Agent/Task tool invocations in user-visible
#     tool_use blocks, percentage that passed model:"haiku" / sonnet / opus /
#     missing. Missing = subagent inherited parent model = rule violation.

MODEL_ROUTING_ID = "model-routing"
MODEL_ROUTING_VERSION = "0.1.0"
MODEL_ROUTING_NAME = "Model Routing Audit"

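# Hedged sketch of an Agent/Task tool_use block as it appears in an
# assistant message (field names per the parsing below, values invented):
#   {"type": "tool_use", "name": "Task",
#    "input": {"model": "haiku", "prompt": "..."}}
# → counts toward agent_calls["haiku"]; an input with no model key at all
# → agent_calls["missing"] (the rule violation this module measures).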

def _analyse_model_routing_session(path: str) -> dict | None:
    """Scan one session file. Return model + agent-compliance counts."""
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
    except OSError:
        return None

    per_model: dict[str, dict] = defaultdict(
        lambda: {"calls": 0, "output": 0, "cache_read": 0, "cache_write": 0}
    )
    agent_calls: dict[str, int] = defaultdict(int)  # haiku/sonnet/opus/missing/other
    first_call_cache_read: int | None = None

    for line in lines:
        try:
            r = json.loads(line)
        except json.JSONDecodeError:
            continue

        if r.get("type") != "assistant":
            continue
        # Skip sidechains from aggregation (we haven't seen any in this project
        # but stay defensive; future Claude Code versions may log them).
        if r.get("isSidechain"):
            continue

        msg = r.get("message") or {}
        model = msg.get("model") or "unknown"
        usage = msg.get("usage") or {}
        pm = per_model[model]
        pm["calls"] += 1
        pm["output"] += usage.get("output_tokens", 0) or 0
        pm["cache_read"] += usage.get("cache_read_input_tokens", 0) or 0
        pm["cache_write"] += usage.get("cache_creation_input_tokens", 0) or 0

        if first_call_cache_read is None:
            first_call_cache_read = usage.get("cache_read_input_tokens", 0) or 0

        # Scan tool_use blocks in this assistant message for Agent/Task calls
        content = msg.get("content") or []
        if isinstance(content, list):
            for block in content:
                if not isinstance(block, dict):
                    continue
                if block.get("type") != "tool_use":
                    continue
                name = block.get("name") or ""
                if name not in ("Agent", "Task"):
                    continue
                tinput = block.get("input") or {}
                submodel = (tinput.get("subagent_model") or tinput.get("model") or "").strip().lower()
                if not submodel:
                    agent_calls["missing"] += 1
                elif submodel == "haiku":
                    agent_calls["haiku"] += 1
                elif submodel == "sonnet":
                    agent_calls["sonnet"] += 1
                elif submodel == "opus":
                    agent_calls["opus"] += 1
                else:
                    agent_calls["other"] += 1

    if not per_model:
        return None

    return {
        "per_model": {m: dict(v) for m, v in per_model.items()},
        "agent_calls": dict(agent_calls),
        "first_call_cache_read": first_call_cache_read or 0,
    }


def _percentile(values: list[int], p: float) -> int:
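    """Nearest-rank percentile on unsorted ints.
    e.g. _percentile([40, 10, 30, 20], 0.5) → 30 (upper-middle for
    even-length input — coarse but fine for telemetry-grade stats)."""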
    if not values:
        return 0
    vs = sorted(values)
    k = int(len(vs) * p)
    if k >= len(vs):
        k = len(vs) - 1
    return vs[k]


def run_model_routing(session_dir: str, session_files: list) -> dict:
    """Run Model Routing module. Aggregate-only output."""
    sessions = []
    for f in session_files:
        s = _analyse_model_routing_session(f)
        if s:
            sessions.append(s)

    if not sessions:
        return {"total_sessions": 0}

    # Aggregate per-model
    models_agg: dict[str, dict] = defaultdict(
        lambda: {"calls": 0, "output": 0, "cache_read": 0, "cache_write": 0}
    )
    for s in sessions:
        for m, v in s["per_model"].items():
            ma = models_agg[m]
            ma["calls"] += v["calls"]
            ma["output"] += v["output"]
            ma["cache_read"] += v["cache_read"]
            ma["cache_write"] += v["cache_write"]

    total_calls = sum(m["calls"] for m in models_agg.values())
    models_used = {}
    for m, v in models_agg.items():
        models_used[m] = {
            **v,
            "share_of_calls_pct": round(v["calls"] / total_calls * 100, 1) if total_calls else 0,
        }

    # Baseline replay distribution (cache reads on call #1 of each session)
    firsts = [s["first_call_cache_read"] for s in sessions if s["first_call_cache_read"]]
    baseline = {
        "sessions_counted": len(firsts),
        "median_tokens": _percentile(firsts, 0.5),
        "p90_tokens": _percentile(firsts, 0.9),
        "max_tokens": max(firsts) if firsts else 0,
    }

    # Agent-call compliance
    agent_totals: dict[str, int] = defaultdict(int)
    for s in sessions:
        for k, v in s["agent_calls"].items():
            agent_totals[k] += v
    total_agent = sum(agent_totals.values())
    agent_compliance = {
        "total_agent_calls": total_agent,
        "counts": dict(agent_totals),
    }
    if total_agent:
        agent_compliance["haiku_compliance_pct"] = round(
            agent_totals.get("haiku", 0) / total_agent * 100, 1
        )
        agent_compliance["missing_model_pct"] = round(
            agent_totals.get("missing", 0) / total_agent * 100, 1
        )
        agent_compliance["opus_share_pct"] = round(
            agent_totals.get("opus", 0) / total_agent * 100, 1
        )

    return {
        "total_sessions": len(sessions),
        "models_used": models_used,
        "baseline_replay_per_call": baseline,
        "agent_compliance": agent_compliance,
    }


# ═══════════════════════════════════════════════════════════════════════════
#  MODULE 4 — Hook Activity
#
#  Scans ~/.claude/hooks/*.log and summarises guard fires. Emits counts,
#  distinct-session counts, last-fired ISO date, top patterns per guard.
#  Never emits excerpts, paths, or raw text — only categorical aggregates.
# ═══════════════════════════════════════════════════════════════════════════

HOOK_ACTIVITY_ID = "hook-activity"
HOOK_ACTIVITY_VERSION = "0.1.0"
HOOK_ACTIVITY_NAME = "Hook Activity"

# Guard registry: log filename → {slug, kind}
# kind=blocked → guard prevented waste; kind=warned → guard logged a leak
# that continued; kind=rewritten → guard forced a re-send (wasted output turn).
HOOK_GUARDS = {
    "bash-disallowed.log":           {"slug": "bash-disallowed",           "kind": "blocked"},
    "read-before-edit.log":          {"slug": "read-before-edit",          "kind": "blocked"},
    "agent-model-guard.log":         {"slug": "agent-model-guard",         "kind": "warned"},
    "human-time-guard.log":          {"slug": "human-time-guard",          "kind": "blocked"},
    "human-time-guard-stop.log":     {"slug": "human-time-guard-stop",     "kind": "rewritten"},
    "expert-binding.log":            {"slug": "expert-binding",            "kind": "warned"},
    "inner-voice-guard.log":         {"slug": "inner-voice-guard",         "kind": "warned"},
    "akira-alerts.log":              {"slug": "akira-alerts",              "kind": "warned"},
    "fast-mode-suggestion-guard.log":{"slug": "fast-mode-suggestion-guard","kind": "warned"},
    "brief-session-link.log":        {"slug": "brief-session-link",        "kind": "warned"},
}
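
# Log files not listed in the registry are ignored on purpose: only known
# guards are parsed, which keeps the emitted aggregates strictly categorical.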

# Tab-separated logs: col 0 = ISO timestamp, col 1 = pattern/category or
# session=<id> marker, col 2+ varies. We keep parsing conservative — a
# malformed line is skipped, not fatal.
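#
# Hypothetical example line ("\t" marks a real tab character; values invented):
#   2025-06-01T09:14:02+00:00\tcurl-pipe-bash\tsession=<uuid>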
_SESSION_RE = re.compile(r"session=([^\s\t]+)")


def _parse_hook_log(path: Path, cutoff_iso: str) -> dict:
    """Parse one guard log into aggregate counts. Raw lines never leave here."""
    fires = 0
    fires_recent = 0
    distinct_sessions: set[str] = set()
    distinct_sessions_recent: set[str] = set()
    last_ts = ""
    pattern_counts: dict[str, int] = defaultdict(int)
    pattern_counts_recent: dict[str, int] = defaultdict(int)
    try:
        with path.open("r", encoding="utf-8", errors="replace") as fh:
            for line in fh:
                line = line.rstrip("\n")
                if not line:
                    continue
                parts = line.split("\t")
                if len(parts) < 2:
                    continue
                ts = parts[0]
                # ISO-8601 timestamps in a consistent format compare correctly
                # as plain strings, so no datetime parsing is needed here.
                recent = ts >= cutoff_iso
                if ts > last_ts:
                    last_ts = ts
                fires += 1
                if recent:
                    fires_recent += 1
                p = parts[1]
                if not p.startswith("session="):
                    pattern_counts[p] += 1
                    if recent:
                        pattern_counts_recent[p] += 1
                m = _SESSION_RE.search(line)
                sid = m.group(1) if m else None
                if sid is None:
                    # Fallback: a UUID-shaped token (36 chars, 4 hyphens) in
                    # the first few columns is treated as a session id.
                    for token in parts[1:5]:
                        if len(token) == 36 and token.count("-") == 4:
                            sid = token
                            break
                if sid:
                    distinct_sessions.add(sid)
                    if recent:
                        distinct_sessions_recent.add(sid)
    except OSError:
        # Unreadable log file yields empty stats rather than a fatal error.
        return {
            "fires": 0, "fires_recent": 0,
            "distinct_sessions": 0, "distinct_sessions_recent": 0,
            "last_fired": "",
            "top_patterns": [], "top_patterns_recent": [],
        }

    top = sorted(pattern_counts.items(), key=lambda kv: (-kv[1], kv[0]))[:5]
    top_r = sorted(pattern_counts_recent.items(), key=lambda kv: (-kv[1], kv[0]))[:5]
    return {
        "fires": fires,
        "fires_recent": fires_recent,
        "distinct_sessions": len(distinct_sessions),
        "distinct_sessions_recent": len(distinct_sessions_recent),
        "last_fired": last_ts[:10] if last_ts else "",
        "top_patterns": [{"pattern": k, "count": v} for k, v in top],
        "top_patterns_recent": [{"pattern": k, "count": v} for k, v in top_r],
    }
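
# Usage sketch (illustrative; the log filename and cutoff are assumptions):
#   stats = _parse_hook_log(Path.home() / ".claude/hooks/read-before-edit.log",
#                           "2025-05-25T00:00:00+00:00")
#   stats["fires"], stats["last_fired"], stats["top_patterns"]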


RECENT_WINDOW_DAYS = 7
RECENT_WINDOW_SECONDS = RECENT_WINDOW_DAYS * 24 * 3600


def run_hook_activity(session_dir: str, session_files: list[str]) -> dict:
    """Run Hook Activity module. Aggregate-only output.

    session_dir/session_files are unused (this module scans the hooks
    directory) but the uniform module signature is kept for the registry.
    """
    from datetime import timedelta  # not in the module-top imports; keep local
    hooks_dir = Path(os.path.expanduser("~/.claude/hooks"))
    guards_out: dict[str, dict] = {}
    grand_fires = 0
    grand_fires_recent = 0

    if not hooks_dir.is_dir():
        return {
            "total_sessions": 0,
            "guards": {},
            "totals": {
                "fires": 0, "fires_recent": 0,
                "guards_active": 0, "guards_active_recent": 0,
                "guards_present": 0,
            },
            "recent_window_days": 7,
            "hooks_dir_present": False,
        }

    cutoff_dt = datetime.now(timezone.utc) - timedelta(seconds=RECENT_WINDOW_SECONDS)
    cutoff_iso = cutoff_dt.isoformat()

    for fname, meta in HOOK_GUARDS.items():
        path = hooks_dir / fname
        if not path.exists():
            guards_out[meta["slug"]] = {
                "present": False, "kind": meta["kind"],
                "fires": 0, "fires_recent": 0,
                "distinct_sessions": 0, "distinct_sessions_recent": 0,
                "last_fired": "",
                "top_patterns": [], "top_patterns_recent": [],
            }
            continue
        parsed = _parse_hook_log(path, cutoff_iso)
        guards_out[meta["slug"]] = {"present": True, "kind": meta["kind"], **parsed}
        grand_fires += parsed["fires"]
        grand_fires_recent += parsed["fires_recent"]

    active = sum(1 for g in guards_out.values() if g["present"] and g["fires"] > 0)
    active_recent = sum(
        1 for g in guards_out.values() if g["present"] and g.get("fires_recent", 0) > 0
    )
    present = sum(1 for g in guards_out.values() if g["present"])
    # total_sessions proxy: the busiest single guard's distinct-session count.
    # Raw session ids never leave _parse_hook_log, so a cross-guard union of
    # ids is unavailable by design.
    max_distinct = max(
        (g.get("distinct_sessions", 0) for g in guards_out.values()), default=0
    )

    return {
        "total_sessions": max_distinct,
        "guards": guards_out,
        "totals": {
            "fires": grand_fires,
            "fires_recent": grand_fires_recent,
            "guards_active": active,
            "guards_active_recent": active_recent,
            "guards_present": present,
        },
        "recent_window_days": 7,
        "hooks_dir_present": True,
    }


# ═══════════════════════════════════════════════════════════════════════════
#  Module registry + CLI
# ═══════════════════════════════════════════════════════════════════════════

MODULES = {
    EXPERT_USAGE_ID: {
        "id": EXPERT_USAGE_ID,
        "version": EXPERT_USAGE_VERSION,
        "name": EXPERT_USAGE_NAME,
        "run": run_expert_usage,
    },
    TOKEN_EFF_ID: {
        "id": TOKEN_EFF_ID,
        "version": TOKEN_EFF_VERSION,
        "name": TOKEN_EFF_NAME,
        "run": run_token_efficiency,
    },
    MODEL_ROUTING_ID: {
        "id": MODEL_ROUTING_ID,
        "version": MODEL_ROUTING_VERSION,
        "name": MODEL_ROUTING_NAME,
        "run": run_model_routing,
    },
    HOOK_ACTIVITY_ID: {
        "id": HOOK_ACTIVITY_ID,
        "version": HOOK_ACTIVITY_VERSION,
        "name": HOOK_ACTIVITY_NAME,
        "run": run_hook_activity,
    },
}
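
# Adding a module is registry-only: define ID/VERSION/NAME constants and a
# run_<name>(session_dir, session_files) function returning a JSON-safe dict
# of aggregates, then register it here; the CLI below picks it up unchanged.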


def _pick_project_dir(projects_dir: str, project_arg: str | None) -> Path | None:
    """Resolve the project folder to audit, or None if nothing usable exists."""
    base = Path(projects_dir)
    if not base.is_dir():
        return None
    if project_arg:
        candidate = base / project_arg
        return candidate if candidate.is_dir() else None
    candidates = [d for d in base.iterdir() if d.is_dir()]
    if not candidates:
        return None
    # Auto-detect: treat the folder holding the most session files as the
    # primary project; pass --project to override this heuristic.
    return max(candidates, key=lambda d: len(list(d.glob("*.jsonl"))))


def _available_module_list() -> str:
    return ", ".join(sorted(MODULES.keys()))


def main():
    parser = argparse.ArgumentParser(
        description=(
            f"{SCRIPT_NAME} v{SCRIPT_VERSION} — local audit of Claude Code session logs.\n"
            "Available modules: " + _available_module_list()
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--projects-dir", default=DEFAULT_PROJECTS_DIR,
                        help=f"Path to Claude projects dir (default {DEFAULT_PROJECTS_DIR})")
    parser.add_argument("--project", default=None,
                        help="Specific project folder name (default: auto-detect largest)")
    parser.add_argument("--modules", default="all",
                        help="Comma-separated module IDs to run (default: all)")
    parser.add_argument("--recent", type=int, default=0,
                        help="Only analyse the most recent N session files")
    parser.add_argument("--json", action="store_true",
                        help="Emit JSON (required for upload to SuperMark Observability)")
    parser.add_argument("--version", action="store_true",
                        help="Print version info and exit")
    args = parser.parse_args()

    if args.version:
        print(f"{SCRIPT_NAME} v{SCRIPT_VERSION}")
        for m in MODULES.values():
            print(f"  module: {m['id']:20s} v{m['version']}  — {m['name']}")
        return

    # Resolve module list
    if args.modules == "all":
        selected = list(MODULES.keys())
    else:
        selected = [m.strip() for m in args.modules.split(",") if m.strip()]
        unknown = [m for m in selected if m not in MODULES]
        if unknown:
            print(f"Unknown module(s): {', '.join(unknown)}", file=sys.stderr)
            print(f"Available: {_available_module_list()}", file=sys.stderr)
            sys.exit(2)

    # Resolve session directory
    session_dir = _pick_project_dir(args.projects_dir, args.project)
    if not session_dir:
        print(f"No Claude Code sessions found at {args.projects_dir}", file=sys.stderr)
        print("Is Claude Code installed and have you run at least one session?",
              file=sys.stderr)
        sys.exit(1)

    # Sort by modification time so --recent N selects the newest sessions;
    # session filenames are opaque ids and need not sort chronologically.
    session_files = [
        str(p)
        for p in sorted(session_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime)
    ]
    if args.recent:
        session_files = session_files[-args.recent:]
    if not session_files:
        print(f"No JSONL files found in {session_dir}", file=sys.stderr)
        sys.exit(1)

    # Run each selected module
    modules_out = {}
    for mid in selected:
        mod = MODULES[mid]
        try:
            raw_metrics = mod["run"](str(session_dir), session_files)
        except Exception as e:
            # One failing module should not sink the whole report.
            print(f"  ! module {mid} failed: {e}", file=sys.stderr)
            continue
        modules_out[mid] = {
            "module_version": mod["version"],
            "module_name": mod["name"],
            "raw_metrics": raw_metrics,
        }

    report = {
        "script_version": SCRIPT_VERSION,
        "script_name": SCRIPT_NAME,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "client_meta": {
            "session_count": len(session_files),
            "modules_requested": selected,
        },
        "modules": modules_out,
    }

    if args.json:
        print(json.dumps(report, indent=2))
        # Guidance to stderr so redirection still shows next steps
        print(file=sys.stderr)
        print(f"  ✓ {SCRIPT_NAME} v{SCRIPT_VERSION}", file=sys.stderr)
        print(f"  ✓ {len(session_files)} sessions scanned across "
              f"{len(modules_out)} module(s): {', '.join(modules_out.keys())}",
              file=sys.stderr)
        print(file=sys.stderr)
        print("  Next: return to SuperMark Observability in your browser",
              file=sys.stderr)
        print("        and upload the JSON (drop file or paste).", file=sys.stderr)
        print(file=sys.stderr)
    else:
        # Text preview — shape only, not scoring
        print(f"\n{SCRIPT_NAME} v{SCRIPT_VERSION}")
        print("=" * 62)
        print(f"  Scanned:  {session_dir.name}")
        print(f"  Sessions: {len(session_files)}")
        print(f"  Modules:  {', '.join(selected)}")
        print()
        for mid, mout in modules_out.items():
            rm = mout["raw_metrics"]
            print(f"  [{mid}] v{mout['module_version']}")
            if mid == EXPERT_USAGE_ID:
                o = rm.get("overall", {})
                print(f"    peak L{o.get('peak_level', 0)}, "
                      f"avg L{o.get('average_level', 0)}, "
                      f"L2+ {o.get('l2_plus_pct', 0)}%")
                print(f"    appropriateness {o.get('appropriateness_pct', 0)}% "
                      f"({o.get('appropriate_sessions', 0)}/"
                      f"{o.get('scorable_sessions', 0)} scorable)")
                bd = rm.get("bucket_distribution", {})
                if bd:
                    mix = ", ".join(
                        f"{b}:{bd[b]['count']}"
                        for b in BUCKET_ORDER if bd.get(b, {}).get("count", 0)
                    )
                    print(f"    buckets: {mix}")
                rf = rm.get("red_flag_counts", {})
                if rf and any(rf.values()):
                    rfs = ", ".join(f"{k}:{v}" for k, v in rf.items() if v)
                    print(f"    red flags: {rfs}")
            elif mid == TOKEN_EFF_ID:
                t = rm.get("totals", {})
                r = rm.get("ratios", {})
                print(f"    turns {t.get('turns', 0):,}, "
                      f"cache reads {t.get('cache_reads', 0):,}, "
                      f"cache:output {r.get('cache_read_to_output', 0)}x")
            elif mid == MODEL_ROUTING_ID:
                mu = rm.get("models_used", {})
                ac = rm.get("agent_compliance", {})
                br = rm.get("baseline_replay_per_call", {})
                def _short(m: str) -> str:
                    return m[7:] if m.startswith("claude-") else m
                models_summary = ", ".join(
                    f"{_short(m)}={v['share_of_calls_pct']}%"
                    for m, v in sorted(mu.items(), key=lambda x: -x[1]['calls'])[:3]
                )
                print(f"    models: {models_summary}")
                print(f"    baseline replay/call: median {br.get('median_tokens', 0):,}  "
                      f"p90 {br.get('p90_tokens', 0):,}")
                print(f"    agent calls: {ac.get('total_agent_calls', 0)}, "
                      f"haiku-compliance {ac.get('haiku_compliance_pct', 0)}%  "
                      f"missing-model {ac.get('missing_model_pct', 0)}%")
            print()
        print("  Re-run with --json to produce an uploadable report.")
        print()


if __name__ == "__main__":
    main()
