SSM-Audit — Mini Calculator (CLI) — Full Script (7.11)

CLI 💻 — try our mini Calculator (offline browser build) to identify drift (Mini CLI Download Page)


What this page is: the complete, copy-paste CLI script for computing lanes/bands, optional alerts, plots, and an SDI roll-up. The classical numbers stay identical (phi((m,a)) = m).
How to run (brief):

  • Demo mode (no CSV): python ssm_audit_mini_calc.py --demo --demo_days 10 --build_id demoW3 --sdi --sdi_plot
  • CSV mode: python ssm_audit_mini_calc.py input.csv output.csv --compute_a auto --promote 0.05 --demote -0.05 --sdi

No third-party data is embedded here; this tool can generate a tiny synthetic demo or read your CSV.
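
If you bring your own CSV, the columns below are the ones this script reads (a minimal, illustrative example; all values are made up). Leave the a column blank and fill the mapper columns to have it derived via --compute_a auto (the default behavior when a mapper column is present); weight is optional and feeds the rapidity fuse:

  time,kpi,m,a,weight,mapper,q,m1,m2,b,actual,forecast,s,k
  2025-01-01,AR_collected_issued,0.96,,1,coverage,0.91,,,,,,,
  2025-01-01,Refunds_agreement,0.021,,1,agreement,,0.021,0.020,0.005,,,,
  2025-01-01,Revenue_actual,,,1,residual,,,,,1020000,1000000,50000,1.2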

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SSM-Audit Mini Calculator (CLI)

Canonicals (ASCII):
  phi((m,a)) = m
  Clamp: a := clamp(a, -1+eps_a, +1-eps_a)
  Band thresholds: A++: a>=0.75; A+: a>=0.50; A0: a>=0.25; A-: a>=0.10; else A--
  Hysteresis: promote if delta_a >= +promote; demote if delta_a <= demote
  Rapidity fuse: a_out := tanh( (SUM w*atanh(a)) / max(SUM w, eps_w) )
  SDI (portfolio): a_sdi := tanh( (SUM atanh(a_i)) / n )
"""

import argparse, csv, json, hashlib, math, os, sys
from collections import defaultdict, deque
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Tuple, Optional

# ---------- helpers ----------
def clamp(x, lo, hi):
    return lo if x < lo else hi if x > hi else x

SCORE = {"A--":1,"A-":2,"A0":3,"A+":4,"A++":5}

def band_from_a(a: float) -> str:
    if a >= 0.75: return "A++"
    elif a >= 0.50: return "A+"
    elif a >= 0.25: return "A0"
    elif a >= 0.10: return "A-"
    else: return "A--"
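
# Threshold boundaries are inclusive (illustrative checks):
#   band_from_a(0.75) -> "A++"    band_from_a(0.7499) -> "A+"
#   band_from_a(0.10) -> "A-"     band_from_a(0.0999) -> "A--"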

def canonical_json(d: Dict) -> str:
    return json.dumps(d, sort_keys=True, separators=(",", ":"))

def sha256_ascii(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()

def knobs_hash_from(knobs: Dict) -> str:
    return sha256_ascii(canonical_json(knobs))
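
# The fingerprint is key-order independent because canonical_json sorts keys:
#   knobs_hash_from({"a": 1, "b": 2}) == knobs_hash_from({"b": 2, "a": 1})
# Changing any knob (e.g. the promote threshold) produces a different hash.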

def parse_iso(ts: str) -> datetime:
    if not ts:
        # no timestamp: default to today's UTC date (naive, at midnight)
        return datetime.now(timezone.utc).replace(hour=0, minute=0, second=0,
                                                  microsecond=0, tzinfo=None)
    if "T" in ts:
        return datetime.fromisoformat(ts.replace("Z","+00:00")).replace(tzinfo=None)
    try:
        return datetime.strptime(ts, "%Y-%m-%d")
    except Exception:
        return datetime.fromisoformat(ts)
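
# Accepted timestamp forms (all reduced to naive datetimes for sorting):
#   "2025-01-02"           -> datetime(2025, 1, 2)
#   "2025-01-02T08:30:00Z" -> datetime(2025, 1, 2, 8, 30)
#   ""                     -> today's UTC date at midnight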

# ---------- compute-a mappers ----------
def compute_a_from_mapper(row: Dict, eps_a: float) -> Tuple[Optional[float], str]:
    mapper = (row.get("mapper","") or "").strip().lower()
    try:
        if mapper == "coverage":
            q = float(row.get("q",""))
            a = 2.0*q - 1.0
            return (clamp(a, -1+eps_a, 1-eps_a), "coverage")
        elif mapper == "agreement":
            m1 = float(row.get("m1","")); m2 = float(row.get("m2","")); b = float(row.get("b",""))
            if b <= 0: return (None, "agreement:b<=0")
            d = abs(m1 - m2)
            a = math.tanh(1.0 - d/b)
            return (clamp(a, -1+eps_a, 1-eps_a), "agreement")
        elif mapper == "residual":
            actual = float(row.get("actual","")); forecast = float(row.get("forecast","")); s = float(row.get("s",""))
            if s <= 0: return (None, "residual:s<=0")
            k = float(row.get("k","1.0") or 1.0)
            r = actual - forecast
            a = math.tanh(k*(1.0 - abs(r)/s))
            return (clamp(a, -1+eps_a, 1-eps_a), "residual")
        else:
            return (None, "no-mapper")
    except Exception as e:
        return (None, f"mapper-error:{mapper}:{e}")

# ---------- hysteresis ----------
def next_band(prev_band: str, a_prev: float, a_now: float, promote: float, demote: float) -> str:
    raw = band_from_a(a_now)
    d = a_now - (a_prev if a_prev is not None else a_now)
    if SCORE[raw] > SCORE.get(prev_band, SCORE[raw]) and d >= promote: return raw
    if SCORE[raw] < SCORE.get(prev_band, SCORE[raw]) and d <= demote:  return raw
    return prev_band or raw
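
# Hysteresis example with defaults (promote = +0.05, demote = -0.05):
#   prev "A0", a 0.49 -> 0.51: raw is "A+" but delta = +0.02 < 0.05 -> stays "A0"
#   prev "A0", a 0.30 -> 0.52: raw is "A+" and delta = +0.22 >= 0.05 -> promotes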

# ---------- alerts ----------
def compute_alerts(rows_sorted: List[Dict], slope_7d: float=-0.02) -> List[Dict]:
    alerts, win = [], deque(maxlen=3)
    for r in rows_sorted:
        win.append(SCORE[r["band_hyst"]])
        if len(win) == 3:
            drops = (win[2] < win[1]) + (win[1] < win[0]) + (win[2] < win[0])
            if drops >= 2:
                alerts.append({"time": r["time"], "kpi": r["kpi"], "rule":"degrade_2of3"})
    if len(rows_sorted) >= 8:
        a0, a7 = rows_sorted[-8]["a"], rows_sorted[-1]["a"]
        slope = (a7 - a0) / 7.0
        if slope <= slope_7d:
            alerts.append({"time": rows_sorted[-1]["time"], "kpi": rows_sorted[-1]["kpi"],
                           "rule":"slope_7d", "value": round(slope,4)})
    return alerts
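
# Rule examples (illustrative):
#   degrade_2of3: band_hyst over three rows A+ -> A0 -> A0 gives scores [4,3,3];
#                 drops = (3<3)+(3<4)+(3<4) = 2 -> alert fires.
#   slope_7d:     assumes one row per day; a falling 0.50 -> 0.29 over the last
#                 8 rows gives slope = -0.03 <= -0.02 -> alert fires.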

# ---------- plotting ----------
try:
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    HAVE_PLOT = True
except Exception:
    HAVE_PLOT = False

def plot_kpi(rows_sorted: List[Dict], kpi: str, out_dir: str,
             color_m: str = "tab:blue", color_a: str = "tab:orange",
             tag: Optional[str] = None, keep_plain: bool = False) -> Optional[str]:
    if not HAVE_PLOT:
        print("[plot] matplotlib not available; skipping.")
        return None

    ts = [parse_iso(r["time"]) for r in rows_sorted]
    m  = [float(r["m"]) for r in rows_sorted]
    a  = [float(r["a"]) for r in rows_sorted]

    fig, ax1 = plt.subplots(figsize=(12,4))
    ax1.plot(ts, m, label=f"m (classical: {kpi})", linewidth=2, color=color_m, zorder=3)
    ax1.set_xlabel("time"); ax1.set_ylabel("m")
    ax1.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d"))

    ax2 = ax1.twinx()
    ax2.plot(ts, a, label="a (lane)", linewidth=2, linestyle="--", color=color_a, zorder=3)
    ax2.set_ylabel("a in (-1,+1)"); ax2.set_ylim(-1.0, 1.0)

    for th in [0.10, 0.25, 0.50, 0.75]:
        ax2.axhline(th, linestyle="--", linewidth=1, alpha=0.35, label="_nolegend_", zorder=1)

    lines = [l for l in (ax1.get_lines() + ax2.get_lines())
             if l.get_label() and not l.get_label().startswith("_")]
    labels = [l.get_label() for l in lines]
    leg = ax1.legend(lines, labels, loc="upper left",
                     frameon=True, framealpha=1.0, facecolor="white", edgecolor="0.8")
    leg.set_zorder(10)

    ax1.grid(True, linestyle="--", alpha=0.35, zorder=0)
    fig.tight_layout()

    os.makedirs(out_dir, exist_ok=True)
    base = f"{kpi}.png"
    tagged = f"{kpi}_{tag}.png" if tag else base
    path_tagged = os.path.join(out_dir, tagged)
    fig.savefig(path_tagged, dpi=150)
    if keep_plain and tagged != base:
        fig.savefig(os.path.join(out_dir, base), dpi=150)
    plt.close(fig)
    return path_tagged

# ---------- SDI helper (native) ----------
# a_sdi := tanh( (SUM atanh(a_i)) / n )
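# Example (illustrative): three lanes on one day with a = [0.6, 0.2, -0.1]:
#   u_sum = atanh(0.6) + atanh(0.2) + atanh(-0.1) ~= 0.6931 + 0.2027 - 0.1003
#   a_sdi = tanh(0.7955 / 3) = tanh(0.2652) ~= 0.259 -> band "A0"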
def append_sdi_rows_from_csv(output_csv_path: str, eps_a: float, sdi_kpis: str,
                             sdi_csv: str, do_plot: bool, plots_dir: str):
    def _clamp(a, eps):
        if a > 1.0 - eps: return 1.0 - eps
        if a < -1.0 + eps: return -1.0 + eps
        return a
    def _norm(name: str) -> str:
        return (name or "").strip().strip('"').strip("'")

    # read & purge existing SDI rows
    header, base_rows = None, []
    with open(output_csv_path, "r", encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        header = rdr.fieldnames or ["time","kpi","m","a","band","band_hyst","knobs_hash","build_id","stamp"]
        for r in rdr:
            if (r.get("kpi","") or "") == "SDI":
                continue
            base_rows.append(r)

    # robust filter: strip quotes/spaces around tokens
    filt = set(_norm(s) for s in (sdi_kpis or "").split(",") if _norm(s))

    # collect a-values per day after optional KPI filter
    per_day = defaultdict(list)
    for r in base_rows:
        kpi = _norm(r.get("kpi",""))
        if filt and kpi not in filt:
            continue
        t = r.get("time","")
        if not t:
            continue
        try:
            a_val = float(r.get("a",""))
        except Exception:
            continue
        per_day[t].append(a_val)

    # compute SDI rows
    sdi_rows = []
    for t in sorted(per_day.keys()):
        a_list = per_day[t]
        if not a_list:
            continue
        u_sum = 0.0
        for ai in a_list:
            u_sum += math.atanh(_clamp(ai, eps_a))
        a_sdi = math.tanh(u_sum / max(len(a_list), 1))
        sdi_rows.append({
            "time": t, "kpi": "SDI", "m": "0.0", "a": f"{a_sdi:.10f}",
            "band": band_from_a(a_sdi), "band_hyst": band_from_a(a_sdi),
            "knobs_hash": "", "build_id": "", "stamp": ""
        })

    # write purged + SDI
    with open(output_csv_path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=header)
        w.writeheader()
        for r in base_rows:
            w.writerow(r)
        for r in sdi_rows:
            w.writerow(r)

    # optional SDI-only CSV
    if sdi_csv:
        with open(sdi_csv, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=["time","kpi","m","a","band","band_hyst","knobs_hash","build_id","stamp"])
            w.writeheader()
            for r in sdi_rows:
                w.writerow(r)

    # optional SDI plot
    if do_plot and HAVE_PLOT and sdi_rows:
        try:
            xs = [r["time"] for r in sdi_rows]
            ys = [float(r["a"]) for r in sdi_rows]
            fig, ax = plt.subplots(figsize=(10,3.5))
            ax.plot(xs, ys, marker="o")
            ax.set_title("SDI (tanh-mean of lanes)")
            ax.set_xlabel("time"); ax.set_ylabel("a_sdi")
            ax.grid(True, linestyle="--", alpha=0.35)
            fig.tight_layout()
            os.makedirs(plots_dir, exist_ok=True)
            out_path = os.path.join(plots_dir, "SDI.png")
            fig.savefig(out_path, dpi=150)
            plt.close(fig)
            print(f"[OK] SDI plot: {out_path}")
        except Exception as e:
            print(f"[WARN] SDI plot skipped: {e}")

    print(f"[OK] SDI appended: {len(sdi_rows)} rows")

# ---------- DEMO generator ----------
def demo_rows(start_iso: str, days: int, eps_a: float) -> List[Dict]:
    start = parse_iso(start_iso) if start_iso else \
            datetime.now(timezone.utc).replace(hour=0, minute=0, second=0,
                                               microsecond=0, tzinfo=None)
    out = []
    k, s  = 1.2, 50_000.0
    b     = 0.005
    q0    = 0.88

    for i in range(days):
        t = (start + timedelta(days=i)).strftime("%Y-%m-%d")

        # Revenue_actual (smooth ramp; one mid-curve miss)
        forecast = 1_000_000 + 30_000*i
        actual   = forecast + (20_000 if i==6 else (-10_000 if i<3 else 5_000))
        r = actual - forecast
        a_res = math.tanh(k*(1 - abs(r)/s))
        out.append({"time":t,"kpi":"Revenue_actual","m":float(actual),
                    "a": clamp(a_res, -1+eps_a, 1-eps_a)})

        # AR_collected_issued (coverage easing then recovering)
        q = q0 + 0.01*i if i<3 else q0 + 0.03 - 0.008*(i-3)
        q = max(0.0, min(1.0, q))
        a_cov = 2*q - 1
        out.append({"time":t,"kpi":"AR_collected_issued","m":0.96,
                    "a": clamp(a_cov, -1+eps_a, 1-eps_a)})

        # Refunds_agreement (drift then re-align)
        m_platform = 0.021 + 0.0002*i
        m_bank     = 0.020 + (0.0004*i if i<5 else 0.00005*i)
        d = abs(m_platform - m_bank)
        a_agree = math.tanh(1 - d/b)
        out.append({"time":t,"kpi":"Refunds_agreement","m":m_platform,
                    "a": clamp(a_agree, -1+eps_a, 1-eps_a)})
    return out

# ---------- main ----------
def main():
    p = argparse.ArgumentParser(description="SSM-Audit Mini Calculator (values unchanged: phi((m,a)) = m)")

    # positional (optional in demo mode)
    p.add_argument("input_csv", nargs="?", default=None)
    p.add_argument("output_csv", nargs="?", default=None)

    # knobs
    p.add_argument("--build_id", default="demo")
    p.add_argument("--eps_a", type=float, default=1e-6)
    p.add_argument("--eps_w", type=float, default=1e-12)
    p.add_argument("--gamma", type=float, default=1.0)
    p.add_argument("--promote", type=float, default=0.05)
    p.add_argument("--demote",  type=float, default=-0.05)

    # alerts / plotting
    p.add_argument("--alerts_csv")
    p.add_argument("--slope_7d", type=float, default=-0.02)
    p.add_argument("--plot_kpi")
    p.add_argument("--plots_dir", default="Plots")
    p.add_argument("--color_m", default="tab:blue")
    p.add_argument("--color_a", default="tab:orange")
    p.add_argument("--plot_tag", default="", help="suffix for plot filename; defaults to DEMO/CSV by mode")
    p.add_argument("--plot_keep_plain", action="store_true", help="also write plain <kpi>.png")

    # SDI (portfolio roll-up)
    p.add_argument("--sdi", action="store_true", help="compute portfolio SDI across KPIs per day")
    p.add_argument("--sdi_kpis", default="", help="comma-separated KPI filter for SDI (default: all KPIs)")
    p.add_argument("--sdi_csv", default="", help="optional SDI-only CSV output")
    p.add_argument("--sdi_plot", action="store_true", help="also plot SDI")

    # demo controls
    p.add_argument("--demo", action="store_true", help="run built-in sample without CSV")
    p.add_argument("--demo_days", type=int, default=10)
    p.add_argument("--demo_start", default="", help="YYYY-MM-DD (optional)")

    # CSV mapper control
    p.add_argument("--compute_a", default="auto", choices=["auto","on","off"],
                   help="build 'a' from mapper columns (coverage/agreement/residual)")

    args = p.parse_args()

    # knobs hash (deterministic fingerprint)
    knobs = {
        "eps_a": args.eps_a, "eps_w": args.eps_w, "gamma": args.gamma,
        "division_policy": "strict",
        "bands": {"A++":0.75, "A+":0.50, "A0":0.25, "A-":0.10, "A--":-float("inf")},
        "hysteresis": {"promote": args.promote, "demote": args.demote},
        "mappings": "mini_calc"
    }
    khash = knobs_hash_from(knobs)

    # --- Build rows: DEMO or CSV ---
    rows: List[Dict] = []

    if args.demo:
        rows = demo_rows(args.demo_start, args.demo_days, args.eps_a)
        if not args.output_csv:
            args.output_csv = "mini_calc_output.csv"
    else:
        if not args.input_csv or not args.output_csv:
            print("[ERR] Provide input_csv and output_csv, or use --demo")
            sys.exit(2)
        with open(args.input_csv, newline="", encoding="utf-8") as f:
            rdr = csv.DictReader(f)
            header = [h.strip() for h in (rdr.fieldnames or [])]
            have_mapper = "mapper" in [h.lower() for h in header]
            compute_flag = (args.compute_a == "on") or (args.compute_a == "auto" and have_mapper)
            for r in rdr:
                r = {k: (r.get(k,"") or "").strip() for k in r.keys()}
                # fill a if needed
                a_value = r.get("a","")
                a_float = None
                if compute_flag and (a_value == "" or a_value.lower() == "na"):
                    a_float, _reason = compute_a_from_mapper(r, args.eps_a)
                else:
                    try:
                        a_float = float(a_value)
                    except Exception:
                        a_float = 0.0
                r["a"] = clamp(a_float if a_float is not None else 0.0, -1+args.eps_a, 1-args.eps_a)

                # default m if residual mapper present
                if r.get("m","") == "" and (r.get("mapper","") or "").lower() == "residual":
                    try:
                        r["m"] = float(r.get("actual","0") or 0.0)
                    except Exception:
                        r["m"] = 0.0
                rows.append(r)

    # --- Group by (time,kpi); fuse a if multiple rows share the key ---
    grouped: Dict[Tuple[str,str], List[Dict]] = defaultdict(list)
    for r in rows:
        key = (r.get("time",""), r.get("kpi",""))
        grouped[key].append(r)

    fused_rows: List[Dict] = []
    for (t,k), lst in grouped.items():
        lst_sorted = sorted(lst, key=lambda z: canonical_json(z))
        # classical m: prefer explicit m if present; else 0.0
        m_num = None
        for mv in [z.get("m","") for z in lst_sorted]:
            try:
                m_num = float(mv); break
            except Exception:
                continue
        if m_num is None: m_num = 0.0

        # fuse a in rapidity space with optional weights
        num, den = 0.0, 0.0
        for z in lst_sorted:
            a_i = float(z.get("a",0) or 0)
            w_i = float(z.get("weight",1) or 1)
            a_i = clamp(a_i, -1+args.eps_a, 1-args.eps_a)
            num += w_i * math.atanh(a_i)
            den += w_i
        a_out = math.tanh(num / max(den, args.eps_w))

        fused_rows.append({
            "time": t or datetime.now(timezone.utc).strftime("%Y-%m-%d"),
            "kpi": k, "m": m_num, "a": a_out
        })

    # --- Per-KPI time sort, hysteresis, hash/build tags ---
    by_kpi: Dict[str, List[Dict]] = defaultdict(list)
    for r in fused_rows:
        r["band"] = band_from_a(r["a"])
        by_kpi[r["kpi"]].append(r)

    latest = {}
    for kpi, lst in by_kpi.items():
        lst.sort(key=lambda z: parse_iso(z["time"]))
        prev_band, prev_a = None, None
        for r in lst:
            r["band_hyst"] = next_band(prev_band, prev_a, r["a"], args.promote, args.demote)
            r["knobs_hash"] = khash
            r["build_id"] = args.build_id
            prev_band, prev_a = r["band_hyst"], r["a"]
        latest[kpi] = lst[-1]

    # --- Write output CSV ---
    fieldnames = ["time","kpi","m","a","band","band_hyst","knobs_hash","build_id","stamp"]
    with open(args.output_csv, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames); w.writeheader()
        for kpi in sorted(by_kpi.keys()):
            for r in by_kpi[kpi]:
                out = {k: r.get(k,"") for k in fieldnames}
                out["a"] = f"{float(out['a']):.4f}"
                w.writerow(out)

    # --- Alerts (optional) ---
    if args.alerts_csv:
        all_alerts = []
        for kpi, lst in by_kpi.items():
            all_alerts.extend(compute_alerts(lst, args.slope_7d))
        with open(args.alerts_csv, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=["time","kpi","rule","value"])
            w.writeheader()
            for ar in all_alerts: w.writerow(ar)
        print(f"Alerts written to: {args.alerts_csv}")

    # --- Plot KPI (optional) ---
    if args.plot_kpi:
        if args.plot_kpi in by_kpi:
            tag = args.plot_tag or ("DEMO" if args.demo else "CSV")
            path = plot_kpi(by_kpi[args.plot_kpi], args.plot_kpi, args.plots_dir,
                            color_m=args.color_m, color_a=args.color_a,
                            tag=tag, keep_plain=args.plot_keep_plain)
            if path: print(f"Plot saved: {path}")
        else:
            print(f"[WARN] --plot_kpi '{args.plot_kpi}' not found in data; skipping plot.")

    # --- SDI (native; purge+append) ---
    if args.sdi:
        append_sdi_rows_from_csv(
            output_csv_path=args.output_csv,
            eps_a=args.eps_a,
            sdi_kpis=args.sdi_kpis,
            sdi_csv=args.sdi_csv,
            do_plot=args.sdi_plot,
            plots_dir=args.plots_dir
        )

    # --- Console summary ---
    print("=== Mini Calculator Summary (latest by KPI) ===")
    for kpi, r in latest.items():
        print(f"{kpi}: m={r['m']}, a={float(r['a']):.4f}, band={r['band']}, band_hyst={r['band_hyst']}")
    if args.sdi:
        try:
            last_sdi = None
            with open(args.output_csv, "r", encoding="utf-8") as f:
                for row in csv.DictReader(f):
                    if row.get("kpi") == "SDI":
                        last_sdi = row
            if last_sdi:
                print(f"\nSDI: m={last_sdi.get('m','0.0')}, a={float(last_sdi['a']):.4f}, "
                      f"band={last_sdi.get('band')}, band_hyst={last_sdi.get('band_hyst')}")
        except Exception:
            pass

    print(f"\nOutput written to: {args.output_csv}")
    print(f"knobs_hash: {knobs_hash_from(knobs)}")

if __name__ == "__main__":
    main()
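
Consuming the output (optional): a minimal sketch that reads the output CSV back and prints the latest lane per KPI. It assumes the demo-mode default filename mini_calc_output.csv; point it at your own output path in CSV mode.

  import csv

  # Last row per KPI wins: the calculator writes rows time-sorted within each
  # KPI, and appended SDI rows are likewise sorted by day.
  latest = {}
  with open("mini_calc_output.csv", encoding="utf-8") as f:
      for row in csv.DictReader(f):
          latest[row["kpi"]] = row
  for kpi, row in latest.items():
      print(f"{kpi}: a={float(row['a']):.4f} band_hyst={row['band_hyst']}")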


Navigation
Previous: SSM-Audit – Appendices: Kits, Templates, Verifiers (7.1–7.10)
Next: SSM-Audit – Conclusion


Directory of Pages
SSM-Audit – Table of Contents


Frequently asked questions
SSM-Audit – Q & A


Explore Further
https://github.com/OMPSHUNYAYA/Symbolic-Mathematical-Audit

Disclaimer
Research/observation only. Not for operational, safety-critical, or legal decision-making.