# agent.py
import pandas as pd
import numpy as np
# Template Variables - Users can customize these
# NOTE(review): the [[[NAME|default]]] tokens look like pre-run template
# placeholders (value after '|' is the default); this file is not valid
# Python until they are substituted — confirm against the templating step.
max_rows = [[[MAX_ROWS|1000]]]  # NOTE(review): not referenced in this file — presumably a display row cap; verify
precision = [[[PRECISION|2]]]  # NOTE(review): not referenced in this file — presumably decimal places; verify
chart_type = "[[[CHART_TYPE|bar]]]"  # NOTE(review): shadowed by create_chart()'s parameter; never read here
include_summary = [[[INCLUDE_SUMMARY|True]]]  # NOTE(review): not referenced in this file
# Global variables to store loaded data
current_data = None  # pandas DataFrame set by load_csv_data(); None until a CSV is loaded
current_filename = None  # name of the most recently loaded CSV, set by load_csv_data()
def dataframe_to_markdown(df, max_rows=10):
    """Convert a pandas DataFrame to a markdown table string.

    Args:
        df: DataFrame to render; the index is emitted as the first
            (unnamed) column.
        max_rows: Maximum number of data rows to include. Longer frames
            are truncated to their first `max_rows` rows and a trailing
            note reports the true total.

    Returns:
        A single newline-joined markdown table string.
    """
    # Capture the real length BEFORE truncating — the footer must report the
    # original row count. (Bug fix: the old code measured len(df) after
    # df.head(max_rows), so the footer always claimed the total was max_rows.)
    total_rows = len(df)
    truncated = total_rows > max_rows
    if truncated:
        df = df.head(max_rows)
    lines = []
    # Leading empty header cell accounts for the index column.
    headers = [''] + list(df.columns)
    lines.append('| ' + ' | '.join(str(h) for h in headers) + ' |')
    lines.append('|' + '|'.join([' --- ' for _ in range(len(headers))]) + '|')
    for idx, row in df.iterrows():
        row_values = [str(idx)] + [str(v) for v in row]
        lines.append('| ' + ' | '.join(row_values) + ' |')
    if truncated:
        lines.append(f'\n*Showing first {max_rows} rows of {total_rows} total*')
    return '\n'.join(lines)
def load_csv_data(csv_content: str, filename: str = "data.csv"):
    """Parse CSV text and cache the result in the module-level globals.

    Args:
        csv_content: Raw CSV text to parse.
        filename: Display name for the data source; echoed in the success
            message and stored in `current_filename`.

    Returns:
        A human-readable success message including the loaded shape, or an
        error message (prefixed with ❌) if parsing fails.
    """
    global current_data, current_filename
    try:
        from io import StringIO
        current_data = pd.read_csv(StringIO(csv_content))
        current_filename = filename
        # Bug fix: the message previously hard-coded the literal "(unknown)"
        # instead of interpolating the filename parameter it just stored.
        return f"✅ Loaded {current_data.shape[0]} rows and {current_data.shape[1]} columns from {filename}"
    except Exception as e:
        return f"❌ Error loading CSV: {str(e)}"
def get_data_summary() -> str:
    """Summarize the loaded dataset as markdown.

    Covers shape, column names, per-column dtypes, missing-value counts,
    and (when numeric columns exist) `describe()` statistics.

    Returns:
        A markdown report string, or an error message if no data is loaded.
    """
    global current_data
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."
    n_rows = len(current_data)
    report = [
        f"## Dataset Overview",
        f"Shape: {current_data.shape[0]} rows × {current_data.shape[1]} columns",
        f"\nColumns: {', '.join(current_data.columns.tolist())}",
        "\n### Data Types:",
    ]
    report.extend(f"- **{name}**: {current_data[name].dtype}" for name in current_data.columns)
    # Per-column null counts; only columns with at least one gap are listed.
    report.append("\n### Missing Values:")
    missing = current_data.isnull().sum()
    gap_lines = [
        f"- **{name}**: {missing[name]} missing ({(missing[name] / n_rows) * 100:.1f}%)"
        for name in current_data.columns
        if missing[name] > 0
    ]
    if gap_lines:
        report.extend(gap_lines)
    else:
        report.append("- ✅ No missing values found")
    # describe() is only meaningful when at least one numeric column exists.
    if current_data.select_dtypes(include=[np.number]).shape[1] > 0:
        report.append("\n### Summary Statistics:")
        report.append(dataframe_to_markdown(current_data.describe()))
    return "\n".join(report)
def get_column_info() -> str:
    """Report each column's dtype and missing-value stats as a markdown table.

    Returns:
        A markdown table string, or an error message if no data is loaded.
    """
    global current_data
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."
    total = len(current_data)
    table = [
        f"## Column Information\n",
        f"Dataset has **{len(current_data.columns)} columns** and **{total} rows**:\n",
        "| Column | Type | Non-Null | Missing | % Missing |",
        "| --- | --- | --- | --- | --- |",
    ]
    for name in current_data.columns:
        # count() excludes NaN, so the gap is exactly the null count.
        non_null = current_data[name].count()
        gap = total - non_null
        table.append(
            f"| {name} | {current_data[name].dtype} | {non_null} | {gap} | {(gap / total) * 100:.1f}% |"
        )
    return "\n".join(table)
def get_value_counts(column: str) -> str:
    """Tabulate the most frequent values of one column as markdown.

    Args:
        column: Name of the column to count.

    Returns:
        A markdown table of the top 15 values with counts and percentages,
        or an error message if no data is loaded / the column is unknown.
    """
    global current_data
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."
    if column not in current_data.columns:
        return f"❌ Column '{column}' not found. Available columns: {', '.join(current_data.columns)}"
    total = len(current_data)
    table = [
        f"## Value counts for '{column}'\n",
        "| Value | Count | Percentage |",
        "| --- | --- | --- |",
    ]
    # Percentages are relative to ALL rows (NaN rows included in the base).
    top = current_data[column].value_counts().head(15)
    for value, count in top.items():
        table.append(f"| {value} | {count} | {(count / total) * 100:.1f}% |")
    distinct = current_data[column].nunique()
    if distinct > 15:
        table.append(f"\n*Showing top 15 of {distinct} unique values*")
    else:
        table.append(f"\n*Total unique values: {distinct}*")
    return "\n".join(table)
def create_chart(column: str, chart_type: str = "histogram") -> str:
    """Render a bar chart or histogram of one column as inline HTML.

    Args:
        column: Name of the column to plot.
        chart_type: "bar" (top-10 value counts) or "histogram"
            (numeric columns only); case-insensitive.

    Returns:
        An HTML snippet embedding the chart as a base64 PNG, or an error
        message if data/column/chart type is unavailable or invalid.
    """
    global current_data
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."
    if column not in current_data.columns:
        return f"❌ Column '{column}' not found. Available columns: {', '.join(current_data.columns)}"
    try:
        try:
            import matplotlib
            matplotlib.use('Agg')  # CRITICAL: Use Agg backend for Pyodide
            import matplotlib.pyplot as plt
            import base64
            from io import BytesIO
            plt.ioff()
        except ImportError as e:
            return f"❌ Chart creation unavailable: {str(e)}"
        kind = chart_type.lower()
        fig, ax = plt.subplots(figsize=(10, 6))
        if kind == "bar":
            counts = current_data[column].value_counts().head(10)
            positions = range(len(counts))
            ax.bar(positions, counts.values, color='#059669')
            ax.set_xticks(positions)
            ax.set_xticklabels(counts.index, rotation=45, ha='right')
            ax.set_ylabel('Count')
            ax.set_title(f'Bar Chart: {column}', fontsize=14, fontweight='bold')
            ax.grid(axis='y', alpha=0.3)
        elif kind == "histogram":
            # Histograms only make sense for numeric data.
            if not pd.api.types.is_numeric_dtype(current_data[column]):
                plt.close(fig)
                return f"❌ Cannot create histogram for non-numeric column '{column}'. Try 'bar' chart instead."
            ax.hist(current_data[column].dropna(), bins=20, alpha=0.7, color='#059669', edgecolor='white')
            ax.set_xlabel(column)
            ax.set_ylabel('Frequency')
            ax.set_title(f'Histogram: {column}', fontsize=14, fontweight='bold')
            ax.grid(axis='y', alpha=0.3)
        else:
            plt.close(fig)
            return f"❌ Unsupported chart type '{chart_type}'. Use: bar or histogram"
        plt.tight_layout()
        # Serialize the figure into memory and base64-encode it so it can be
        # embedded directly into an HTML <img> tag (no filesystem needed).
        buffer = BytesIO()
        plt.savefig(buffer, format='png', dpi=100, bbox_inches='tight')
        buffer.seek(0)
        image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
        plt.close(fig)
        return f"""✅ Chart created successfully for '{column}' ({chart_type} chart).
<img src="data:image/png;base64,{image_base64}" alt="{chart_type.title()} chart for {column}" style="max-width: 100%; height: auto; margin: 10px 0; border-radius: 8px; box-shadow: 0 2px 8px rgba(0,0,0,0.1);">
Chart shows the distribution of values in the '{column}' column."""
    except Exception as e:
        import traceback
        return f"❌ Error creating chart: {str(e)}\n\nDetails:\n{traceback.format_exc()}"
def get_correlation_analysis() -> str:
    """Produce a markdown correlation matrix with strong-pair highlights.

    Computes Pearson correlations over all numeric columns and lists every
    pair whose |r| exceeds 0.7.

    Returns:
        A markdown report string, or an error message if no data is loaded
        or fewer than two numeric columns exist.
    """
    global current_data
    if current_data is None:
        return "❌ No data loaded. Please upload a CSV file first."
    numeric_cols = current_data.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) < 2:
        return "❌ Need at least 2 numerical columns to calculate correlations."
    corr_matrix = current_data[numeric_cols].corr()
    report = [
        "## Correlation Analysis\n",
        "### Correlation Matrix:\n",
        dataframe_to_markdown(corr_matrix, max_rows=20),
        "\n### Key Insights:",
    ]
    # Walk only the upper triangle so each pair is reported once.
    highlights = []
    n = len(numeric_cols)
    for i in range(n):
        for j in range(i + 1, n):
            r = corr_matrix.iloc[i, j]
            if abs(r) > 0.7:
                direction = "positive" if r > 0 else "negative"
                marker = "📈" if r > 0 else "📉"
                highlights.append(
                    f"- {marker} **{numeric_cols[i]}** and **{numeric_cols[j]}**: strong {direction} correlation ({r:.3f})"
                )
    if highlights:
        report.extend(highlights)
    else:
        report.append("- ℹ️ No strong correlations found (|r| > 0.7)")
    return "\n".join(report)