Source code for goodvibes.output

"""Output formatting and printing functions for GoodVibes."""
import json
import logging
import math
import os.path
import sys
from datetime import datetime, timedelta, timezone

from .utils import display_name, get_console_stdout, get_console_dat
from .selectivity import get_selectivity
from .constants import GAS_CONSTANT, ATMOS, J_TO_AU, KCAL_TO_AU, __version__
from .io import qcdata_to_dict

from .pes import get_pes, graph_reaction_profile
from .thermo import calc_bbe
from .media import solvents


# --------------------------------------------------------------------------
# Structured (JSON) output
# --------------------------------------------------------------------------
# Schema is now stable from v1.0 onward — see goodvibes.schema for the
# single source of truth (TypedDicts, version constants, validator).
# Re-exported here as a module attribute for back-compat with anything
# that imports `JSON_SCHEMA_VERSION` directly.
from .schema import SCHEMA_VERSION as JSON_SCHEMA_VERSION

# Run-level option keys that should appear in the JSON header, mapped from
# their Namespace attribute names. Any options not in this list are dropped
# (avoids leaking nuisance flags like --output, --custom_ext, etc.).
_JSON_OPTION_KEYS = (
    'temperature', 'temperature_interval', 'conc',
    'QS', 'QH', 'freq_cutoff', 'S_freq_cutoff', 'H_freq_cutoff',
    'freq_scale_factor', 'media', 'freespace', 'spc',
    'invert', 'symm', 'duplicate', 'boltz', 'inertia',
    'mm_freq_scale_factor',
)

# calc_bbe attributes whose names match exactly between the object and the
# JSON output. Optional — None when calc_bbe didn't compute them (SP-only).
_THERMO_FIELDS = (
    'scf_energy', 'sp_energy', 'zpe', 'zero_point_corr',
    'enthalpy', 'qh_enthalpy',
    'entropy', 'qh_entropy',
    'gibbs_free_energy', 'qh_gibbs_free_energy',
    'frequency_wn', 'im_frequency_wn', 'inverted_freqs',
    'point_group', 'symmno', 'linear_mol',
    'multiplicity', 'job_type', 'applied_freq_scale_factor',
)


def _options_to_json(options):
    """Project the argparse Namespace down to the JSON-relevant subset."""
    out = {}
    for key in _JSON_OPTION_KEYS:
        out[key] = getattr(options, key, None)
    return out


def _bbe_to_json(bbe, file_path, media_conc=None, boltz_factor=None):
    """Serialize one calc_bbe instance + its underlying QCData to a dict."""
    entry = {
        'file': os.path.abspath(file_path),
        'name': display_name(file_path),
    }
    qcdata = getattr(bbe, 'xyz', None)
    if qcdata is not None:
        d = qcdata_to_dict(qcdata)
        d.pop('_cache_version', None)
        entry['qcdata'] = d
    thermo = {}
    for field_name in _THERMO_FIELDS:
        thermo[field_name] = getattr(bbe, field_name, None)
    entry['thermo'] = thermo
    entry['media_conc'] = media_conc
    entry['boltzmann_factor'] = boltz_factor
    return entry






def _format_ratio(populations, labels, scale=100):
    """Format populations as integer-percent ratio string ('60:40' or '40:30:20:10')."""
    rounded = [round(populations[label] * scale) for label in labels]
    return ':'.join(str(v) for v in rounded)


def _print_selectivity_single(result, method=""):
    """Per-species table + summary footer for a single SelectivityResult."""
    HA_TO_KCAL = 627.509541
    suffix = f", {method}" if method else ""
    log.info(f"\n   Selectivity{suffix} ({result.key}, T = {result.temperature:.2f} K)")

    # Per-species table: leading marker column (★ for major) | Species |
    # Files | Population (%) | ΔΔG (kcal/mol, vs. major).
    table = Table(
        box=rich_box.SIMPLE_HEAD,
        show_header=True,
        show_edge=False,
        pad_edge=False,
        highlight=False,
        header_style="",
    )
    table.add_column("", width=3)             # marker col, mirrors print_results
    table.add_column("Species", no_wrap=True)
    table.add_column("Files", justify="right")
    table.add_column("Population (%)", justify="right")
    table.add_column("ΔΔG (kcal/mol)", justify="right")

    # ΔΔG = -RT ln(p / p_max); the major species is 0 by construction,
    # others are positive (less stable). Skip when p == 0 (printed as "—").
    ref_pop = max(result.populations.values())
    rt_kcal = 8.3144621 * result.temperature / 1000.0 / 4.184  # kcal/mol
    for label in result.labels:
        p = result.populations[label]
        n_files = len(result.files_per_label.get(label, []))
        marker = "★" if label == result.preferred else " "
        if p > 0:
            dG = -rt_kcal * math.log(p / ref_pop)
            # The major species's dG can be -0.0 from float math; force +0.000.
            if abs(dG) < 1e-9:
                dG = 0.0
            dG_str = f"{dG:.3f}"
        else:
            dG_str = "—"
        table.add_row(marker, label, str(n_files), f"{p * 100:.2f}", dG_str)

    _print_rich_table(table)

    # Summary line: ratio (always); excess + ΔΔG for N=2.
    n = len(result.labels)
    ratio_str = _format_ratio(result.populations, result.labels)
    pieces = [f"Ratio {':'.join(result.labels)} = {ratio_str}"]
    pieces.append(f"Major: {result.preferred}")
    if n == 2 and result.ee is not None:
        pieces.append(f"excess = {result.ee:.2f}%")
    if n == 2 and result.ddG is not None:
        pieces.append(f"ΔΔG = {result.ddG * HA_TO_KCAL:.2f} kcal/mol")
    log.info("\n   " + "   ".join(pieces) + "\n")


def _print_selectivity_scan(results, method=""):
    """One row per temperature: excess/ΔΔG for N=2, populations only for N>2."""
    HA_TO_KCAL = 627.509541
    labels = results[0].labels
    n = len(labels)
    suffix = f", {method}" if method else ""
    log.info(f"\n   Selectivity scan{suffix} ({results[0].key})")

    table = Table(
        box=rich_box.SIMPLE_HEAD,
        show_header=True,
        show_edge=False,
        pad_edge=False,
        highlight=False,
        header_style="",
    )
    table.add_column("", width=3)             # leading indent, mirrors single-T
    table.add_column("T (K)", justify="right")
    for label in labels:
        table.add_column(f"{label} (%)", justify="right")
    if n == 2:
        table.add_column("excess (%)", justify="right")
        table.add_column("ΔΔG (kcal/mol)", justify="right")
    table.add_column("Major", justify="center")

    for r in results:
        row = ["", f"{r.temperature:g}"]
        for label in labels:
            row.append(f"{r.populations[label] * 100:.2f}")
        if n == 2:
            row.append(f"{r.ee:.2f}" if r.ee is not None else "—")
            ddG_kc = r.ddG * HA_TO_KCAL if r.ddG is not None else None
            row.append(f"{ddG_kc:.2f}" if ddG_kc is not None else "—")
        row.append(r.preferred)
        table.add_row(*row)

    _print_rich_table(table)
    log.info("\n")


def _selectivity_to_json(selectivity_results):
    """Serialize a list of SelectivityResult instances for the JSON payload.

    For N=2 each entry includes ee + ddG; for N>2 those fields are null
    and consumers derive any ratios they need from `populations`.
    """
    if not selectivity_results:
        return None
    first = selectivity_results[0]
    return {
        'labels': list(first.labels),
        'key': first.key,
        'results': [
            {
                'temperature': r.temperature,
                'populations': dict(r.populations),
                'raw_boltzmann': dict(r.raw_boltzmann),
                'preferred': r.preferred,
                'ee': r.ee,
                'ddG': r.ddG,
                'files_per_label': {k: list(v) for k, v in r.files_per_label.items()},
            }
            for r in selectivity_results
        ],
    }


def _pes_to_json(result, temperature):
    """Serialize a PESResult into the v0.4 JSON `pes` block.

    Per-pathway, per-point: the original label, the species breakdown
    (with coefficient and resolved files), and the relative thermo bundle
    (Hartree internally, converted to user units in the `units` field).
    """
    if result is None or not result.pathways:
        return None
    pathways_out = []
    units_factor = result.options.to_user_units(1.0)   # Hartree → kcal/mol or kJ/mol
    lowest_only = getattr(result.options, 'lowest_only', False)
    rollup_kw = dict(gconf=result.options.gconf, QH=result.options.QH,
                     lowest_only=lowest_only)
    for pathway in result.pathways:
        zero_th = pathway.zero.thermo(temperature, **rollup_kw)
        points_out = []
        for point in pathway.points:
            pt_th = point.thermo(temperature, **rollup_kw)
            rel = pt_th - zero_th
            points_out.append({
                'label': point.label,
                'species': [
                    {
                        'coefficient': coeff,
                        'name': cset.name,
                        'files': list(cset.files),
                    }
                    for coeff, cset in point.species
                ],
                'relative': {
                    'scf': rel.scf_energy * units_factor,
                    'zpe': rel.zpe * units_factor,
                    'h': rel.enthalpy * units_factor,
                    'qh_h': rel.qh_enthalpy * units_factor,
                    'ts': temperature * rel.entropy * units_factor,
                    'qh_ts': temperature * rel.qh_entropy * units_factor,
                    'g': rel.gibbs * units_factor,
                    'qh_g': rel.qh_gibbs * units_factor,
                    'spc': (rel.sp_energy * units_factor) if rel.sp_energy is not None else None,
                },
            })
        pathways_out.append({
            'name': pathway.name,
            'temperature': temperature,
            'units': result.options.units,
            'zero_label': pathway.zero.label,
            'points': points_out,
        })
    return {'pathways': pathways_out}


[docs] def write_json_results(thermo_data, options, path, media_conc_per_file=None, boltz_facs=None, selectivity_results=None, selectivity_lowest_results=None, pes_result=None): """Write structured run results to *path* as JSON. Captures every parsed QCData field plus computed thermo per file, the schema version, the goodvibes version, and the run options. Schema is preview/v0.x — fields may shift before v5.0 stabilizes it. Parameters: thermo_data (dict): file path -> calc_bbe. options (Namespace): the parsed CLI options. path (str): output JSON file path. media_conc_per_file (dict, optional): file path -> neat solvent concentration applied to that specific file (None for files where --media didn't match). boltz_facs (dict, optional): file path -> Boltzmann factor. selectivity_results (list[SelectivityResult], optional): one entry per temperature; included as a top-level "selectivity" block. """ media_conc_per_file = media_conc_per_file or {} boltz_facs = boltz_facs or {} payload = { 'schema_version': JSON_SCHEMA_VERSION, 'goodvibes_version': __version__, 'generated_at': datetime.now(timezone.utc).isoformat(timespec='seconds'), 'options': _options_to_json(options), 'results': [ _bbe_to_json( bbe, file, media_conc=media_conc_per_file.get(file), boltz_factor=boltz_facs.get(file), ) for file, bbe in thermo_data.items() ], } sel_block = _selectivity_to_json(selectivity_results) if sel_block is not None: payload['selectivity'] = sel_block sel_low_block = _selectivity_to_json(selectivity_lowest_results) if sel_low_block is not None: payload['selectivity_lowest'] = sel_low_block pes_block = _pes_to_json(pes_result, getattr(options, 'temperature', 298.15)) if pes_block is not None: payload['pes'] = pes_block # default=str catches anything stringifiable that json doesn't natively # know about (e.g., the '!' sentinel value calc_bbe uses for failed SPC). with open(path, 'w', encoding='utf-8') as f: json.dump(payload, f, indent=2, default=str) log.info(f"\n ✔ Structured results written to {path} (schema v{JSON_SCHEMA_VERSION}).")
try: from rich.table import Table from rich import box as rich_box except ImportError: Table = None rich_box = None log = logging.getLogger('goodvibes') def _print_rich_table(table: "Table") -> None: """Print a Rich Table to both stdout and .dat file consoles. Renders to a buffer per console, strips Rich's trailing blank padding row (`box.SIMPLE` always adds one), and writes directly so the separator that follows lands immediately under the data. """ if Table is None: return import re from io import StringIO from rich.console import Console ANSI_RE = re.compile(r"\x1b\[[0-9;]*[mGK]") def _render(console): buf = StringIO() Console( file=buf, force_terminal=console.is_terminal, color_system=console.color_system, width=console.width, ).print(table) text = buf.getvalue().rstrip("\n") lines = text.split("\n") # Drop trailing visually-blank lines (Rich SIMPLE-box padding). while lines and not ANSI_RE.sub("", lines[-1]).strip(): lines.pop() # Leading "\n\n" gives exactly one blank line before the table: # the logger uses terminator='' so the prior message has no trailing # \n; the first \n closes that line, the second is the blank line. return "\n\n" + "\n".join(lines) + "\n" for console in (get_console_stdout(), get_console_dat()): console.file.write(_render(console)) console.file.flush() # Measure visible width via a plain render (no escape codes). stdout_console = get_console_stdout() plain_buf = StringIO() Console(file=plain_buf, color_system=None, width=stdout_console.width).print(table) line_width = max( (len(line.rstrip()) for line in plain_buf.getvalue().split("\n")), default=0, ) log.info("─" * line_width) log.info("\n") # --------------------------------------------------------------------------- # PES tables (v4.2 Rich renderer; see Sub-plan B in ROADMAP.md) # --------------------------------------------------------------------------- def _pes_column_spec(spc_used: bool, QH: bool): """Column headers + which ThermoVector field each one reads. Returns a list of (header, field, scale_by_T) tuples. `field` is the attribute on a ThermoVector; `scale_by_T` indicates the column is T·S rather than raw S/H/G (i.e. needs an extra multiplication at render). Without --spc the energy/H/G columns use their plain names; with --spc, H and G are SPC-substituted in calc_bbe so the labels are annotated `_SPC` to signal that to the reader (the values themselves are taken from the same fields). """ cols = [] if spc_used: cols.append(("ΔE_SPC", "sp_energy", False)) cols.append(("ΔE", "scf_energy", False)) cols.append(("ΔZPE", "zpe", False)) h_label = "ΔH_SPC" if spc_used else "ΔH" cols.append((h_label, "enthalpy", False)) if QH: qh_h_label = "Δqh-H_SPC" if spc_used else "Δqh-H" cols.append((qh_h_label, "qh_enthalpy", False)) cols.append(("T·ΔS", "entropy", True)) cols.append(("T·Δqh-S", "qh_entropy", True)) g_label = "ΔG(T)_SPC" if spc_used else "ΔG(T)" cols.append((g_label, "gibbs", False)) qhg_label = "Δqh-G(T)_SPC" if spc_used else "Δqh-G(T)" cols.append((qhg_label, "qh_gibbs", False)) return cols def _build_pes_table(pathway, options, temperature, pes_options) -> "Table": """Build a Rich Table for one Pathway at one temperature.""" spc_used = bool(getattr(options, 'spc', None)) cols = _pes_column_spec(spc_used, options.QH) # Concentration: when --conc isn't set the default is gas-phase 1 atm, # so report "p = 1 atm" rather than its numeric P/RT equivalent. user_conc = getattr(options, 'conc', None) state_str = f"c = {user_conc:.4g} mol/L" if user_conc else "p = 1 atm" if getattr(pes_options, 'lowest_only', False): mode_str = " — lowest conformer per species" elif not pes_options.gconf: mode_str = " — Boltzmann-averaged (no gconf)" else: mode_str = "" title = ( f"RXN: {pathway.name} ({pes_options.units}) " f"at T = {temperature:g} K, {state_str}{mode_str}" ) table = Table(title=title, box=rich_box.SIMPLE, header_style="bold") table.add_column("", justify="left", no_wrap=True) # leading marker (matches selectivity tables) table.add_column("Species", justify="left", no_wrap=True) for header, _, _ in cols: table.add_column(header, justify="right") units_factor = pes_options.to_user_units(1.0) fmt = f"{{:.{pes_options.decimals}f}}" rollup_kw = dict( gconf=pes_options.gconf, QH=pes_options.QH, lowest_only=getattr(pes_options, 'lowest_only', False), ) zero_th = pathway.zero.thermo(temperature, **rollup_kw) for point in pathway.points: pt_th = point.thermo(temperature, **rollup_kw) rel = pt_th - zero_th row = ["", point.label] for _header, field, scale_by_T in cols: value = getattr(rel, field) if value is None: row.append("—") continue if scale_by_T: value = temperature * value row.append(fmt.format(value * units_factor)) table.add_row(*row) return table def _build_results_table(options) -> "Table": """ Create a Rich Table configured for thermochemistry result columns based on the provided options. Columns included depend on the options flags: QH, spc, boltz, symm/pg, and imag_freq. Column widths and numeric precision are derived from options.dp when present. Parameters: options: An object with attributes used to determine the table layout (commonly includes `dp`, `QH`, `spc`, `boltz`, `symm`, `pg`, and `imag_freq`). Returns: table (Table | None): A configured rich.table.Table ready for rows, or `None` if Rich is not available. """ if Table is None: return None dp = getattr(options, 'dp', 6) dw = dp - 6 ew = 11 + dw # wide numeric column (energies) sw = 2 + dp # small column (ZPE, T.S, T.qh-S - always 0.xxx) table = Table( box=rich_box.SIMPLE_HEAD, show_header=True, show_edge=False, pad_edge=False, highlight=False, header_style="", # Disable bold header styling ) # Status column (o/x markers) table.add_column("", width=3) table.add_column("Structure", no_wrap=True) if options.spc is not None: table.add_column("E_SPC", min_width=ew, justify="right") table.add_column("E", min_width=ew, justify="right") table.add_column("ZPE", min_width=sw, justify="right") h_col = "H_SPC" if options.spc is not None else "H" if options.QH: table.add_column(h_col, min_width=ew, justify="right") table.add_column("qh-H", min_width=ew, justify="right") else: table.add_column(h_col, min_width=ew, justify="right") table.add_column("T.S", min_width=sw, justify="right") table.add_column("T.qh-S", min_width=sw, justify="right") if options.spc is not None: table.add_column("G(T)_SPC", min_width=ew, justify="right") table.add_column("qh-G(T)_SPC", min_width=ew, justify="right") else: table.add_column("G(T)", min_width=ew, justify="right") table.add_column("qh-G(T)", min_width=ew, justify="right") if options.boltz: table.add_column("Boltz", min_width=7, justify="right") if options.symm or options.pg: table.add_column("Point Group", min_width=13, justify="right") if options.imag_freq is True: table.add_column("im freq", min_width=8, justify="right") if _selectivity_active(options): table.add_column("Grel (kcal/mol)", min_width=8, justify="right") return table def _selectivity_active(options): """True when any selectivity flag is on (--label / --selectivity / --ee).""" return bool(getattr(options, 'labels', None) or getattr(options, 'selectivity_spec', None) or getattr(options, 'ee', None)) # --graph rendering moved up to GoodVibes.main() in v5.0 so it # fires in single-T mode too (v4.2's print_pes_tables doesn't go # through this function). The legacy T-interval path still calls # print_pes_results, but main() handles the graph either way.