#!/usr/bin/env python3 import argparse import json import math import os from pathlib import Path try: import pandas as pd except ImportError: raise SystemExit( 'pandas is required. Run with the code-interpreter venv:\n' ' ~/.openclaw/workspace/.venv-code-interpreter/bin/python analyze_data.py ...' ) try: import matplotlib import matplotlib.pyplot as plt HAS_MPL = True except Exception: HAS_MPL = False ZH_FONT_CANDIDATES = [ '/home/selig/.openclaw/workspace/skills/code-interpreter/assets/fonts/NotoSansCJKtc-Regular.otf', '/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf', ] def configure_matplotlib_fonts() -> tuple[str | None, object | None]: if not HAS_MPL: return None, None chosen = None chosen_prop = None for path in ZH_FONT_CANDIDATES: if Path(path).exists(): try: from matplotlib import font_manager font_manager.fontManager.addfont(path) font_prop = font_manager.FontProperties(fname=path) font_name = font_prop.get_name() matplotlib.rcParams['font.family'] = [font_name] matplotlib.rcParams['axes.unicode_minus'] = False chosen = font_name chosen_prop = font_prop break except Exception: continue return chosen, chosen_prop def apply_font(ax, font_prop) -> None: if not font_prop: return title = ax.title if title: title.set_fontproperties(font_prop) ax.xaxis.label.set_fontproperties(font_prop) ax.yaxis.label.set_fontproperties(font_prop) for label in ax.get_xticklabels(): label.set_fontproperties(font_prop) for label in ax.get_yticklabels(): label.set_fontproperties(font_prop) legend = ax.get_legend() if legend: for text in legend.get_texts(): text.set_fontproperties(font_prop) legend.get_title().set_fontproperties(font_prop) def detect_format(path: Path) -> str: ext = path.suffix.lower() if ext in {'.csv', '.tsv', '.txt'}: return 'delimited' if ext == '.json': return 'json' if ext in {'.xlsx', '.xls'}: return 'excel' raise SystemExit(f'Unsupported file type: {ext}') def load_df(path: Path) -> pd.DataFrame: fmt = detect_format(path) if fmt == 'delimited': sep = '\t' if path.suffix.lower() == '.tsv' else ',' return pd.read_csv(path, sep=sep) if fmt == 'json': try: return pd.read_json(path) except ValueError: return pd.DataFrame(json.loads(path.read_text(encoding='utf-8'))) if fmt == 'excel': return pd.read_excel(path) raise SystemExit('Unsupported format') def safe_name(s: str) -> str: keep = [] for ch in s: if ch.isalnum() or ch in ('-', '_'): keep.append(ch) elif ch in (' ', '/'): keep.append('_') out = ''.join(keep).strip('_') return out[:80] or 'column' def series_stats(s: pd.Series) -> dict: non_null = s.dropna() result = { 'dtype': str(s.dtype), 'nonNull': int(non_null.shape[0]), 'nulls': int(s.isna().sum()), 'unique': int(non_null.nunique()) if len(non_null) else 0, } if pd.api.types.is_numeric_dtype(s): result.update({ 'min': None if non_null.empty else float(non_null.min()), 'max': None if non_null.empty else float(non_null.max()), 'mean': None if non_null.empty else float(non_null.mean()), 'sum': None if non_null.empty else float(non_null.sum()), }) else: top = non_null.astype(str).value_counts().head(5) result['topValues'] = [{ 'value': str(idx), 'count': int(val), } for idx, val in top.items()] return result def maybe_parse_dates(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]: parsed = [] out = df.copy() for col in out.columns: if out[col].dtype == 'object': sample = out[col].dropna().astype(str).head(20) if sample.empty: continue parsed_col = pd.to_datetime(out[col], errors='coerce') success_ratio = float(parsed_col.notna().mean()) if len(out[col]) else 0.0 if success_ratio >= 0.6: out[col] = parsed_col parsed.append(str(col)) return out, parsed def write_report(df: pd.DataFrame, summary: dict, out_dir: Path) -> Path: lines = [] lines.append('# Data Analysis Report') lines.append('') lines.append(f"- Source: `{summary['source']}`") lines.append(f"- Rows: **{summary['rows']}**") lines.append(f"- Columns: **{summary['columns']}**") lines.append(f"- Generated plots: **{len(summary['plots'])}**") if summary['parsedDateColumns']: lines.append(f"- Parsed date columns: {', '.join(summary['parsedDateColumns'])}") lines.append('') lines.append('## Columns') lines.append('') for name, meta in summary['columnProfiles'].items(): lines.append(f"### {name}") lines.append(f"- dtype: `{meta['dtype']}`") lines.append(f"- non-null: {meta['nonNull']}") lines.append(f"- nulls: {meta['nulls']}") lines.append(f"- unique: {meta['unique']}") if 'mean' in meta: lines.append(f"- min / max: {meta['min']} / {meta['max']}") lines.append(f"- mean / sum: {meta['mean']} / {meta['sum']}") elif meta.get('topValues'): preview = ', '.join([f"{x['value']} ({x['count']})" for x in meta['topValues'][:5]]) lines.append(f"- top values: {preview}") lines.append('') report = out_dir / 'report.md' report.write_text('\n'.join(lines).strip() + '\n', encoding='utf-8') return report def generate_plots(df: pd.DataFrame, out_dir: Path, font_prop=None) -> list[str]: if not HAS_MPL: return [] plots = [] numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])] date_cols = [c for c in df.columns if pd.api.types.is_datetime64_any_dtype(df[c])] cat_cols = [c for c in df.columns if not pd.api.types.is_numeric_dtype(df[c]) and not pd.api.types.is_datetime64_any_dtype(df[c])] if numeric_cols: col = numeric_cols[0] plt.figure(figsize=(7, 4)) bins = min(20, max(5, int(math.sqrt(max(1, df[col].dropna().shape[0]))))) df[col].dropna().hist(bins=bins) plt.title(f'Histogram of {col}', fontproperties=font_prop) plt.xlabel(str(col), fontproperties=font_prop) plt.ylabel('Count', fontproperties=font_prop) apply_font(plt.gca(), font_prop) path = out_dir / f'hist_{safe_name(str(col))}.png' plt.tight_layout() plt.savefig(path, dpi=160) plt.close() plots.append(str(path)) if cat_cols and numeric_cols: cat, num = cat_cols[0], numeric_cols[0] grp = df.groupby(cat, dropna=False)[num].sum().sort_values(ascending=False).head(12) if not grp.empty: plt.figure(figsize=(8, 4.5)) grp.plot(kind='bar') plt.title(f'{num} by {cat}', fontproperties=font_prop) plt.xlabel(str(cat), fontproperties=font_prop) plt.ylabel(f'Sum of {num}', fontproperties=font_prop) apply_font(plt.gca(), font_prop) plt.tight_layout() path = out_dir / f'bar_{safe_name(str(num))}_by_{safe_name(str(cat))}.png' plt.savefig(path, dpi=160) plt.close() plots.append(str(path)) if date_cols and numeric_cols: date_col, num = date_cols[0], numeric_cols[0] grp = df[[date_col, num]].dropna().sort_values(date_col) if not grp.empty: plt.figure(figsize=(8, 4.5)) plt.plot(grp[date_col], grp[num], marker='o') plt.title(f'{num} over time', fontproperties=font_prop) plt.xlabel(str(date_col), fontproperties=font_prop) plt.ylabel(str(num), fontproperties=font_prop) apply_font(plt.gca(), font_prop) plt.tight_layout() path = out_dir / f'line_{safe_name(str(num))}_over_time.png' plt.savefig(path, dpi=160) plt.close() plots.append(str(path)) return plots def main() -> int: parser = argparse.ArgumentParser(description='Automatic data analysis report generator') parser.add_argument('input', help='Input data file (csv/json/xlsx)') parser.add_argument('--artifact-dir', required=True, help='Output artifact directory') args = parser.parse_args() input_path = Path(args.input).expanduser().resolve() artifact_dir = Path(args.artifact_dir).expanduser().resolve() artifact_dir.mkdir(parents=True, exist_ok=True) df = load_df(input_path) original_columns = [str(c) for c in df.columns] df, parsed_dates = maybe_parse_dates(df) chosen_font, chosen_font_prop = configure_matplotlib_fonts() preview_path = artifact_dir / 'preview.csv' df.head(50).to_csv(preview_path, index=False) summary = { 'source': str(input_path), 'rows': int(df.shape[0]), 'columns': int(df.shape[1]), 'columnNames': original_columns, 'parsedDateColumns': parsed_dates, 'columnProfiles': {str(c): series_stats(df[c]) for c in df.columns}, 'plots': [], 'plotFont': chosen_font, } summary['plots'] = generate_plots(df, artifact_dir, chosen_font_prop) summary_path = artifact_dir / 'summary.json' summary_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8') report_path = write_report(df, summary, artifact_dir) result = { 'ok': True, 'input': str(input_path), 'artifactDir': str(artifact_dir), 'summary': str(summary_path), 'report': str(report_path), 'preview': str(preview_path), 'plots': summary['plots'], } print(json.dumps(result, ensure_ascii=False, indent=2)) return 0 if __name__ == '__main__': raise SystemExit(main())