AgentOp

Data Analysis Agent

by ozzo · Dec 30, 2025 Public
Download
14 downloads
0 forks
0.0 rating

Description

test

Source Code

agent.py
import pandas as pd
import numpy as np

# Template variables - users can customize these.
# Each `[[[NAME|default]]]` token is a placeholder; presumably the hosting
# platform substitutes a concrete value before this script runs (TODO confirm:
# left unsubstituted, the unquoted forms would raise NameError on NAME).
# NOTE(review): none of the functions visible in this file read these
# module-level names; `max_rows` and `chart_type` are shadowed by the
# same-named parameters of dataframe_to_markdown() and create_chart().
max_rows = [[[MAX_ROWS|1000]]]
precision = [[[PRECISION|2]]]
chart_type = "[[[CHART_TYPE|bar]]]"
include_summary = [[[INCLUDE_SUMMARY|True]]]

# Module-level state shared by the tool functions in this file.
# In the code visible here, both names are assigned only by load_csv_data();
# the other functions read current_data and treat None as "nothing loaded".
# DataFrame parsed from the most recently loaded CSV (None until a load succeeds).
current_data = None
# Filename that was passed to load_csv_data() for that CSV (None until then).
current_filename = None

def dataframe_to_markdown(df, max_rows=10):
    """Render a pandas DataFrame as a Markdown table.

    The first table column holds the DataFrame index (its header cell is
    left blank); the remaining columns follow ``df.columns`` in order.

    Args:
        df: DataFrame to render.
        max_rows: Maximum number of data rows to emit. When ``df`` has more
            rows, only the first ``max_rows`` are shown and a footer line
            reports how many rows were shown out of the full total.

    Returns:
        The Markdown table as a single newline-joined string.
    """
    # Capture the real size *before* truncating, so the footer can report
    # the original row count rather than the size of the truncated frame.
    total_rows = len(df)
    truncated = total_rows > max_rows
    if truncated:
        df = df.head(max_rows)

    headers = [''] + list(df.columns)
    lines = [
        '| ' + ' | '.join(str(h) for h in headers) + ' |',
        '|' + '|'.join(' --- ' for _ in headers) + '|',
    ]
    for idx, row in df.iterrows():
        cells = [str(idx)] + [str(v) for v in row]
        lines.append('| ' + ' | '.join(cells) + ' |')

    if truncated:
        lines.append(f'\n*Showing first {max_rows} rows of {total_rows} total*')
    return '\n'.join(lines)

def load_csv_data(csv_content: str, filename: str = "data.csv"):
    """Parse CSV text and store it as the module's active dataset.

    On success the parsed DataFrame and ``filename`` are stored in the
    module-level ``current_data`` / ``current_filename``. On failure the
    previously stored values are left untouched.

    Args:
        csv_content: Raw CSV text.
        filename: Name to associate with the data and echo in the message.

    Returns:
        A status string: a success line with the row/column counts, or an
        error line carrying the exception text.
    """
    global current_data, current_filename
    from io import StringIO

    try:
        frame = pd.read_csv(StringIO(csv_content))
        # Only publish to the globals once parsing has succeeded.
        current_data, current_filename = frame, filename
        n_rows, n_cols = frame.shape
        return f"✅ Loaded {n_rows} rows and {n_cols} columns from {filename}"
    except Exception as exc:
        return f"❌ Error loading CSV: {exc!s}"

def get_data_summary() -> str:
    """Build a Markdown overview of the currently loaded dataset.

    The report lists the shape, the column names, each column's dtype, the
    columns that contain missing values (with count and percentage), and,
    when at least one numeric column exists, the ``describe()`` statistics
    rendered as a Markdown table.

    Returns:
        The report as a newline-joined string, or an error message when no
        dataset has been loaded.
    """
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."

    n_rows, n_cols = current_data.shape
    report = [
        "## Dataset Overview",
        f"Shape: {n_rows} rows × {n_cols} columns",
        f"\nColumns: {', '.join(current_data.columns.tolist())}",
        "\n### Data Types:",
    ]
    report.extend(
        f"- **{name}**: {current_data[name].dtype!s}"
        for name in current_data.columns
    )

    report.append("\n### Missing Values:")
    null_counts = current_data.isnull().sum()
    missing_lines = []
    for name, n_missing in null_counts.items():
        if n_missing > 0:
            share = (n_missing / len(current_data)) * 100
            missing_lines.append(f"- **{name}**: {n_missing} missing ({share:.1f}%)")
    # Fall back to a single "all clear" bullet when nothing is missing.
    report.extend(missing_lines or ["- ✅ No missing values found"])

    numeric_part = current_data.select_dtypes(include=[np.number])
    if numeric_part.shape[1] > 0:
        report.append("\n### Summary Statistics:")
        report.append(dataframe_to_markdown(current_data.describe()))

    return "\n".join(report)

def get_column_info() -> str:
    """Describe every column of the loaded dataset as a Markdown table.

    Each table row gives the column name, dtype, non-null count, missing
    count, and the percentage of rows that are missing.

    Returns:
        The Markdown report as a newline-joined string, or an error message
        when no dataset has been loaded.
    """
    global current_data
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."

    total = len(current_data)
    result = [f"## Column Information\n"]
    result.append(f"Dataset has **{len(current_data.columns)} columns** and **{total} rows**:\n")
    result.append("| Column | Type | Non-Null | Missing | % Missing |")
    result.append("| --- | --- | --- | --- | --- |")
    for col in current_data.columns:
        dtype = str(current_data[col].dtype)
        non_null = current_data[col].count()
        missing = total - non_null
        # A header-only CSV has zero rows; report 0.0% instead of dividing
        # by zero (which would print "nan%" and emit a RuntimeWarning).
        pct_missing = (missing / total) * 100 if total else 0.0
        result.append(f"| {col} | {dtype} | {non_null} | {missing} | {pct_missing:.1f}% |")
    return "\n".join(result)

def get_value_counts(column: str) -> str:
    """Tabulate the most frequent values of one column as Markdown.

    At most the 15 most frequent values are listed. Percentages are taken
    relative to the total number of rows in the dataset.

    Args:
        column: Name of the column to summarise.

    Returns:
        The Markdown table plus a footer line with the unique-value count,
        or an error message when no data is loaded or the column does not
        exist.
    """
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."
    if column not in current_data.columns:
        return f"❌ Column '{column}' not found. Available columns: {', '.join(current_data.columns)}"

    series = current_data[column]
    row_total = len(current_data)
    top_values = series.value_counts().head(15)

    lines = [
        f"## Value counts for '{column}'\n",
        "| Value | Count | Percentage |",
        "| --- | --- | --- |",
    ]
    lines.extend(
        f"| {value} | {count} | {(count / row_total) * 100:.1f}% |"
        for value, count in top_values.items()
    )

    distinct = series.nunique()
    if distinct > 15:
        footer = f"\n*Showing top 15 of {distinct} unique values*"
    else:
        footer = f"\n*Total unique values: {distinct}*"
    lines.append(footer)
    return "\n".join(lines)

def create_chart(column: str, chart_type: str = "histogram") -> str:
    """Render a chart for one column and return it as embeddable HTML.

    Supported chart types (case-insensitive):
      * ``"bar"``: counts of the 10 most frequent values.
      * ``"histogram"``: 20-bin histogram; numeric columns only.

    Args:
        column: Name of the column to plot.
        chart_type: ``"bar"`` or ``"histogram"``.

    Returns:
        On success, a status line followed by an ``<img>`` tag whose source
        is the PNG encoded as a base64 data URI. Otherwise an error message
        (no data loaded, unknown column, matplotlib unavailable, unsupported
        chart type, or an unexpected failure with its traceback).
    """
    global current_data
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."
    if column not in current_data.columns:
        return f"❌ Column '{column}' not found. Available columns: {', '.join(current_data.columns)}"
    try:
        try:
            import matplotlib
            matplotlib.use('Agg')  # CRITICAL: Use Agg backend for Pyodide
            import matplotlib.pyplot as plt
            import base64
            import html
            from io import BytesIO
            plt.ioff()
        except ImportError as e:
            return f"❌ Chart creation unavailable: {str(e)}"

        fig, ax = plt.subplots(figsize=(10, 6))
        # try/finally guarantees the figure is released on every path,
        # including early returns and exceptions raised while drawing or
        # saving; otherwise failed calls would accumulate open figures.
        try:
            kind = chart_type.lower()
            if kind == "bar":
                value_counts = current_data[column].value_counts().head(10)
                ax.bar(range(len(value_counts)), value_counts.values, color='#059669')
                ax.set_xticks(range(len(value_counts)))
                ax.set_xticklabels(value_counts.index, rotation=45, ha='right')
                ax.set_ylabel('Count')
                ax.set_title(f'Bar Chart: {column}', fontsize=14, fontweight='bold')
                ax.grid(axis='y', alpha=0.3)
            elif kind == "histogram":
                if not pd.api.types.is_numeric_dtype(current_data[column]):
                    return f"❌ Cannot create histogram for non-numeric column '{column}'. Try 'bar' chart instead."
                ax.hist(current_data[column].dropna(), bins=20, alpha=0.7, color='#059669', edgecolor='white')
                ax.set_xlabel(column)
                ax.set_ylabel('Frequency')
                ax.set_title(f'Histogram: {column}', fontsize=14, fontweight='bold')
                ax.grid(axis='y', alpha=0.3)
            else:
                return f"❌ Unsupported chart type '{chart_type}'. Use: bar or histogram"

            plt.tight_layout()

            # Save to an in-memory buffer and encode to base64
            buffer = BytesIO()
            plt.savefig(buffer, format='png', dpi=100, bbox_inches='tight')
            image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
        finally:
            plt.close(fig)

        # The column name comes from the uploaded CSV header, so escape it
        # before placing it inside an HTML attribute; a stray quote would
        # otherwise terminate the attribute and allow markup injection.
        alt_text = html.escape(f"{chart_type.title()} chart for {column}", quote=True)

        # Return HTML with embedded base64 image
        return f"""✅ Chart created successfully for '{column}' ({chart_type} chart).

<img src="data:image/png;base64,{image_base64}" alt="{alt_text}" style="max-width: 100%; height: auto; margin: 10px 0; border-radius: 8px; box-shadow: 0 2px 8px rgba(0,0,0,0.1);">

Chart shows the distribution of values in the '{column}' column."""
    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        return f"❌ Error creating chart: {str(e)}\n\nDetails:\n{error_details}"

def get_correlation_analysis() -> str:
    """Report pairwise correlations between the numeric columns.

    The report contains the full correlation matrix as a Markdown table,
    followed by one bullet for every distinct column pair whose absolute
    correlation exceeds 0.7.

    Returns:
        The Markdown report as a newline-joined string, or an error message
        when no data is loaded or fewer than two numeric columns exist.
    """
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."

    numeric_cols = current_data.select_dtypes(include=[np.number]).columns
    n_numeric = len(numeric_cols)
    if n_numeric < 2:
        return "❌ Need at least 2 numerical columns to calculate correlations."

    corr_matrix = current_data[numeric_cols].corr()
    report = [
        "## Correlation Analysis\n",
        "### Correlation Matrix:\n",
        dataframe_to_markdown(corr_matrix, max_rows=20),
        "\n### Key Insights:",
    ]

    insights = []
    # Walk the upper triangle only, so each unordered pair is seen once.
    for row_pos, left in enumerate(numeric_cols):
        for col_pos in range(row_pos + 1, n_numeric):
            r = corr_matrix.iloc[row_pos, col_pos]
            # Written as "not >" so a NaN coefficient is skipped as well.
            if not abs(r) > 0.7:
                continue
            if r > 0:
                emoji, strength = "📈", "strong positive"
            else:
                emoji, strength = "📉", "strong negative"
            right = numeric_cols[col_pos]
            insights.append(f"- {emoji} **{left}** and **{right}**: {strength} correlation ({r:.3f})")

    report.extend(insights or ["- ℹ️ No strong correlations found (|r| > 0.7)"])
    return "\n".join(report)