from __future__ import annotations

from hdbcli import dbapi
from mcp.server.fastmcp import Context

from hana_performance_mcp.formatting import format_bytes
from hana_performance_mcp.server import get_pool


async def get_table_statistics(
    schema_name: str,
    table_name: str | None = None,
    # FIX: was `ctx: Context = None` — the default is None, so the
    # annotation must admit it.
    ctx: Context | None = None,
) -> str:
    """Retrieve SAP HANA table statistics for a schema, or column-level detail
    for a specific table.

    Args:
        schema_name: The database schema to inspect.
        table_name: Optional table name. If provided, returns column-level
            detail. If omitted, returns an overview of the top 50 tables
            by size.
        ctx: MCP request context, passed through to ``get_pool`` to obtain
            the connection pool.

    Returns:
        A markdown-formatted report string. Errors are returned as text
        rather than raised, since this is an MCP tool boundary.
    """
    pool = get_pool(ctx)
    try:
        if table_name is None:
            return _schema_overview(pool, schema_name)
        return _table_detail(pool, schema_name, table_name)
    except dbapi.Error as e:
        # Surface the HANA error code/text so the caller can diagnose
        # permission or connectivity problems.
        return f"HANA Error ({e.errorcode}): {e.errortext}"
    except Exception as e:
        # Deliberately broad: an MCP tool must hand back text, not raise.
        return f"Error: {e}"


def _schema_overview(pool, schema_name: str, limit: int = 50) -> str:
    """Render a fixed-width table of the largest tables in *schema_name*.

    Args:
        pool: Connection pool exposing ``get_cursor()`` as a context manager.
        schema_name: Schema whose tables are listed.
        limit: Maximum number of tables to show, ordered by size descending
            (generalized from the former hard-coded 50).

    Returns:
        A markdown report, or a "not found" message when the schema is
        empty or unknown.
    """
    with pool.get_cursor() as cursor:
        # LIMIT cannot be a bind parameter here; int() keeps the
        # interpolation injection-safe. Default limit=50 reproduces the
        # original SQL text exactly.
        cursor.execute(
            "SELECT TABLE_NAME, TABLE_TYPE, IS_COLUMN_TABLE, RECORD_COUNT, "
            "TABLE_SIZE, IS_PARTITIONED, READ_COUNT, WRITE_COUNT, LOADED "
            "FROM M_TABLES WHERE SCHEMA_NAME = ? "
            f"ORDER BY TABLE_SIZE DESC LIMIT {int(limit)}",
            (schema_name,),
        )
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchall()

    if not rows:
        return f"No tables found in schema '{schema_name}'."

    lines = [
        f"## Table Statistics for Schema: {schema_name} (top {len(rows)} by size)\n"
    ]
    lines.append(
        f"{'Table':<40} {'Type':<8} {'Col?':<5} {'Rows':>12} "
        f"{'Size':>12} {'Part?':<6} {'Reads':>10} {'Writes':>10} {'Loaded':<8}"
    )
    lines.append("-" * 125)
    for row in rows:
        t = dict(zip(columns, row))
        name = t["TABLE_NAME"] or "N/A"
        if len(name) > 38:
            # Truncate long names so the fixed-width columns stay aligned.
            name = name[:35] + "..."
        ttype = t["TABLE_TYPE"] or "N/A"
        # M_TABLES boolean-ish columns are the strings 'TRUE'/'FALSE'.
        col = "Yes" if t["IS_COLUMN_TABLE"] == "TRUE" else "No"
        records = t["RECORD_COUNT"] if t["RECORD_COUNT"] is not None else 0
        size = format_bytes(t["TABLE_SIZE"])
        part = "Yes" if t["IS_PARTITIONED"] == "TRUE" else "No"
        reads = t["READ_COUNT"] if t["READ_COUNT"] is not None else 0
        writes = t["WRITE_COUNT"] if t["WRITE_COUNT"] is not None else 0
        loaded = t["LOADED"] or "N/A"
        lines.append(
            f"{name:<40} {ttype:<8} {col:<5} {records:>12} "
            f"{size:>12} {part:<6} {reads:>10} {writes:>10} {loaded:<8}"
        )
    return "\n".join(lines)


def _table_detail(pool, schema_name: str, table_name: str) -> str:
    """Render table-level stats plus per-column detail for one table.

    Args:
        pool: Connection pool exposing ``get_cursor()`` as a context manager.
        schema_name: Schema containing the table.
        table_name: Table to inspect.

    Returns:
        A markdown report, or a "not found" message when the table does
        not exist. Column detail comes from M_CS_COLUMNS and is therefore
        only present for column-store tables.
    """
    with pool.get_cursor() as cursor:
        cursor.execute(
            "SELECT TABLE_NAME, TABLE_TYPE, IS_COLUMN_TABLE, RECORD_COUNT, "
            "TABLE_SIZE, IS_PARTITIONED, READ_COUNT, WRITE_COUNT, LOADED "
            "FROM M_TABLES WHERE SCHEMA_NAME = ? AND TABLE_NAME = ?",
            (schema_name, table_name),
        )
        table_cols = [desc[0] for desc in cursor.description]
        table_row = cursor.fetchone()
        if not table_row:
            return f"Table '{schema_name}.{table_name}' not found."

        cursor.execute(
            "SELECT COLUMN_NAME, DATA_TYPE_NAME, DISTINCT_COUNT, "
            "MEMORY_SIZE_IN_TOTAL, COMPRESSION_TYPE, LOADED, "
            "INTERNAL_ATTRIBUTE_TYPE "
            "FROM M_CS_COLUMNS "
            "WHERE SCHEMA_NAME = ? AND TABLE_NAME = ? "
            "ORDER BY MEMORY_SIZE_IN_TOTAL DESC",
            (schema_name, table_name),
        )
        col_columns = [desc[0] for desc in cursor.description]
        col_rows = cursor.fetchall()

    t = dict(zip(table_cols, table_row))
    lines = [f"## Table Detail: {schema_name}.{table_name}\n"]
    lines.append(f"- Type: {t['TABLE_TYPE']} | Column Store: {'Yes' if t['IS_COLUMN_TABLE'] == 'TRUE' else 'No'}")
    lines.append(f"- Records: {t['RECORD_COUNT']} | Size: {format_bytes(t['TABLE_SIZE'])}")
    lines.append(f"- Partitioned: {'Yes' if t['IS_PARTITIONED'] == 'TRUE' else 'No'} | Loaded: {t['LOADED'] or 'N/A'}")
    lines.append(f"- Reads: {t['READ_COUNT']} | Writes: {t['WRITE_COUNT']}")

    if not col_rows:
        # M_CS_COLUMNS only covers column-store tables, so an empty result
        # here is expected for row-store tables, not an error.
        lines.append("\nNo column statistics available (table may not be column-store).")
        return "\n".join(lines)

    lines.append(f"\n### Column Details ({len(col_rows)} columns)\n")
    lines.append(
        f"{'Column':<35} {'Data Type':<18} {'Distinct':>10} "
        f"{'Memory':>12} {'Compression':<15} {'Loaded':<8} {'Attr Type':<10}"
    )
    lines.append("-" * 120)
    for row in col_rows:
        c = dict(zip(col_columns, row))
        name = c["COLUMN_NAME"] or "N/A"
        if len(name) > 33:
            # Truncate long column names to preserve the layout.
            name = name[:30] + "..."
        dtype = c["DATA_TYPE_NAME"] or "N/A"
        distinct = c["DISTINCT_COUNT"] if c["DISTINCT_COUNT"] is not None else "N/A"
        mem = format_bytes(c["MEMORY_SIZE_IN_TOTAL"])
        compression = c["COMPRESSION_TYPE"] or "N/A"
        loaded = c["LOADED"] or "N/A"
        attr_type = c["INTERNAL_ATTRIBUTE_TYPE"] or "N/A"
        lines.append(
            f"{name:<35} {dtype:<18} {distinct:>10} "
            f"{mem:>12} {compression:<15} {loaded:<8} {attr_type:<10}"
        )
    return "\n".join(lines)