Preview: SQL Database Agent Template
Connect to SQLite databases and query them using natural language. Supports schema exploration, data analysis, and SQL query generation.
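Schema exploration against SQLite can be sketched with the standard library alone. This is a minimal illustration, not the template's own code: the `explore_schema` helper and the `customers` and `orders` tables are hypothetical names chosen for the example.

```python
import sqlite3


def explore_schema(conn: sqlite3.Connection) -> dict:
    """Return {table_name: [(column_name, declared_type), ...]} for user tables."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
    )
    schema = {}
    for (table,) in cursor.fetchall():
        # Double any embedded quotes so the table name is a valid quoted identifier.
        quoted = table.replace('"', '""')
        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
        cursor.execute(f'PRAGMA table_info("{quoted}")')
        schema[table] = [(row[1], row[2]) for row in cursor.fetchall()]
    return schema


# Hypothetical sample database, built in memory for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, total REAL)")
schema = explore_schema(conn)
print(schema)
```

A schema dictionary like this is the kind of context an agent can hand to a language model before asking it to write SQL.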
Preview Mode
This is a preview with sample data. The template uses {{...}} placeholders, such as {{agent_name}} in the source below, which are replaced with actual agent data.
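Placeholder replacement of this kind can be sketched in a few lines of Python. The snippet is an assumption for illustration only: `render_template` is a hypothetical helper, not the generator's actual code, and the agent name is made up.

```python
import re

# Matches {{name}} with optional inner whitespace. Hypothetical pattern, chosen
# to fit placeholders such as {{agent_name}} in the template source.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_template(source: str, values: dict) -> str:
    """Replace each {{name}} with values[name]; unknown names are left untouched."""
    def substitute(match):
        name = match.group(1)
        return str(values.get(name, match.group(0)))

    return PLACEHOLDER.sub(substitute, source)


html = "<title>{{agent_name}} - SQL Database Assistant</title>"
rendered = render_template(html, {"agent_name": "Sales DB Helper"})
print(rendered)
```

Leaving unknown placeholders in place, rather than blanking them, makes a missing value easy to spot in the rendered page.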
Template Metadata
- Slug: sql-database-agent-template
- Created By: ozzo
- Created: Nov 02, 2025
- Usage Count: 0
Tags
database
sql-agent
sql
data
Code Statistics
- HTML Lines: 138
- CSS Lines: 312
- JS Lines: 285
- Python Lines: 418
Source Code
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<title>{{agent_name}} - SQL Database Assistant</title>
<style>
{{css_styles}}
</style>
<!-- Pyodide for Python -->
<script src="https://cdn.jsdelivr.net/pyodide/v0.29.0/full/pyodide.js"></script>
<script>
const PROVIDER = "{{embedded_provider}}";
const API_KEY = "{{api_key}}";
const NEEDS_PYODIDE = true;
const PYODIDE_VERSION = "0.29.0";
</script>
</head>
<body>
<div class="dashboard">
<div class="sidebar">
<div class="agent-header">
<div class="agent-icon">🗄️</div>
<div>
<h2 style="margin: 0; font-size: 1.1rem;">{{agent_name}}</h2>
<p style="margin: 0; color: #64748b; font-size: 0.875rem;">SQL Database</p>
</div>
</div>
<h3>Database Connection</h3>
<div class="input-section" style="margin-bottom: 2rem;">
<select id="db-type" class="db-input" style="margin-bottom: 0.75rem;">
<option value="sqlite">SQLite (File)</option>
<option value="mysql">MySQL</option>
<option value="postgresql">PostgreSQL</option>
</select>
<div id="sqlite-connection" class="connection-form">
<div class="upload-area" id="file-upload-area"
ondragenter="handleDragOver(event)"
ondragover="handleDragOver(event)"
ondragleave="handleDragLeave(event)"
ondrop="handleDrop(event)"
onclick="document.getElementById('db-file-input').click()">
<div style="color: var(--text-muted);">
<div style="font-size: 2rem; margin-bottom: 0.5rem;">📁</div>
<p style="margin: 0; font-weight: 500;">Drop SQLite file here</p>
<p style="margin: 0.25rem 0 0 0; font-size: 0.75rem;">or click to browse</p>
</div>
<input type="file" id="db-file-input" accept=".db,.sqlite,.sqlite3" style="display: none;" onchange="handleFileSelect(event)">
</div>
</div>
<div id="remote-connection" class="connection-form" style="display: none;">
<input type="text" id="db-host" class="db-input" placeholder="Host (e.g., localhost)" style="margin-bottom: 0.75rem;" />
<input type="number" id="db-port" class="db-input" placeholder="Port" style="margin-bottom: 0.75rem;" />
<input type="text" id="db-name" class="db-input" placeholder="Database Name" style="margin-bottom: 0.75rem;" />
<input type="text" id="db-user" class="db-input" placeholder="Username" style="margin-bottom: 0.75rem;" />
<input type="password" id="db-password" class="db-input" placeholder="Password" style="margin-bottom: 0.75rem;" />
</div>
<button class="analyze-button" id="connectBtn" onclick="connectDatabase()" style="margin-bottom: 1rem;">
<span>🔌</span>
<span>Connect to Database</span>
</button>
<div id="connection-status" style="display: none; padding: 0.75rem; border-radius: 6px; margin-bottom: 1rem; font-size: 0.875rem;"></div>
</div>
<div id="query-section" style="display: none;">
<h3>SQL Query</h3>
<div class="input-section">
<textarea id="queryInput"
class="query-input"
placeholder="Examples:
• Show me all tables in the database
• What columns does the users table have?
• Find the top 10 customers by sales
• Show sales trends over time"
rows="4">
</textarea>
<button class="analyze-button" id="analyzeBtn" onclick="processQuery()">
<span>🔍</span>
<span>Execute Query</span>
</button>
</div>
<div class="sample-queries">
<h4 style="margin-bottom: 0.75rem; font-size: 0.9rem; color: var(--text-heading);">Sample Queries</h4>
<div class="sample-query" onclick="fillSampleQuery('Show me all tables')">📋 Show me all tables</div>
<div class="sample-query" onclick="fillSampleQuery('Describe the schema')">📊 Describe the schema</div>
<div class="sample-query" onclick="fillSampleQuery('Show the first 10 rows from each table')">👁️ Show the first 10 rows from each table</div>
<div class="sample-query" onclick="fillSampleQuery('What are the relationships between tables?')">🔗 What are the relationships between tables?</div>
</div>
</div>
</div>
<div class="main-content">
<div class="agent-header">
<h1 style="margin: 0; font-size: 1.5rem; color: var(--text-heading);">Query Results</h1>
<div style="margin-left: auto; font-size: 0.875rem; color: #64748b;">Powered by AI</div>
</div>
<div class="result-area">
<div id="results-container" class="result-content">
<div style="text-align: center; color: var(--text-muted); padding: 3rem 0;">
<div style="font-size: 3rem; margin-bottom: 1rem;">🗄️</div>
<h3 style="margin: 0 0 0.5rem 0;">Ready for Database Queries</h3>
<p style="margin: 0;">Connect to a database and ask questions in natural language</p>
</div>
</div>
<div class="loading-indicator" id="loading-indicator">
<div class="spinner"></div>
<p>Analyzing your query...</p>
</div>
</div>
</div>
</div>
<script>
{{js_includes}}
</script>
<!-- Hidden Python code -->
<script type="text/python" id="python-code">
{{python_code}}
</script>
</body>
</html>
:root {
--primary: #3b82f6;
--primary-hover: #2563eb;
--secondary: #8b5cf6;
--surface-bg: #f8fafc;
--surface-border: #e2e8f0;
--text-heading: #1e293b;
--text-body: #475569;
--text-muted: #64748b;
--radius-lg: 12px;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #eff6ff 0%, #f8fafc 100%);
color: var(--text-body);
line-height: 1.6;
}
.dashboard {
max-width: 1400px;
margin: 0 auto;
padding: 1.5rem;
display: grid;
grid-template-columns: 350px 1fr;
gap: 1.5rem;
min-height: 100vh;
}
.sidebar {
background: white;
border-radius: var(--radius-lg);
padding: 1.5rem;
height: fit-content;
border: 1px solid var(--surface-border);
box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}
.main-content {
background: white;
border-radius: var(--radius-lg);
padding: 2rem;
border: 1px solid var(--surface-border);
box-shadow: 0 1px 3px rgba(0,0,0,0.1);
position: relative;
}
.agent-header {
display: flex;
align-items: center;
gap: 1rem;
margin-bottom: 2rem;
padding-bottom: 1rem;
border-bottom: 1px solid var(--surface-border);
}
.agent-icon {
width: 48px;
height: 48px;
background: linear-gradient(135deg, var(--primary), var(--secondary));
border-radius: 12px;
display: flex;
align-items: center;
justify-content: center;
color: white;
font-weight: bold;
font-size: 1.25rem;
}
.sidebar h3 {
margin: 0 0 1rem 0;
color: var(--text-heading);
font-size: 1.1rem;
}
.db-input {
width: 100%;
padding: 0.75rem;
border: 1px solid var(--surface-border);
border-radius: 8px;
font-family: inherit;
font-size: 0.875rem;
outline: none;
transition: border-color 0.2s;
}
.db-input:focus {
border-color: var(--primary);
box-shadow: 0 0 0 3px rgba(59, 130, 246, 0.1);
}
.upload-area {
border: 2px dashed var(--surface-border);
border-radius: 8px;
padding: 2rem;
text-align: center;
cursor: pointer;
transition: all 0.3s;
margin-bottom: 1rem;
}
.upload-area:hover, .upload-area.drag-over {
border-color: var(--primary);
background: rgba(59, 130, 246, 0.05);
}
.query-input {
width: 100%;
min-height: 100px;
max-height: 200px;
padding: 0.75rem;
border: 1px solid var(--surface-border);
border-radius: 8px;
font-family: inherit;
font-size: 0.875rem;
resize: vertical;
margin-bottom: 0.75rem;
outline: none;
transition: border-color 0.2s;
}
.query-input:focus {
border-color: var(--primary);
box-shadow: 0 0 0 3px rgba(59, 130, 246, 0.1);
}
.analyze-button {
width: 100%;
background: var(--primary);
color: white;
padding: 0.75rem 1rem;
border: none;
border-radius: 8px;
cursor: pointer;
font-weight: 500;
transition: background-color 0.2s;
display: flex;
align-items: center;
justify-content: center;
gap: 0.5rem;
}
.analyze-button:hover:not(:disabled) {
background: var(--primary-hover);
}
.analyze-button:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.sample-queries {
margin-top: 1.5rem;
}
.sample-query {
background: #f8fafc;
border: 1px solid var(--surface-border);
border-radius: 6px;
padding: 0.5rem 0.75rem;
margin-bottom: 0.5rem;
font-size: 0.875rem;
cursor: pointer;
transition: background-color 0.2s;
}
.sample-query:hover {
background: #e2e8f0;
}
.result-area {
min-height: 500px;
background: var(--surface-bg);
border-radius: 8px;
padding: 1.5rem;
margin-top: 1rem;
border: 1px solid var(--surface-border);
position: relative;
}
.result-content {
line-height: 1.6;
}
.message {
display: flex;
gap: 1rem;
margin-bottom: 1.5rem;
padding: 1rem;
border-radius: 8px;
background: white;
border: 1px solid var(--surface-border);
}
.message-user {
background: #eff6ff;
border-color: #bfdbfe;
}
.message-assistant {
background: #f0fdf4;
border-color: #bbf7d0;
}
.message-error {
background: #fef2f2;
border-color: #fecaca;
}
.message-system {
background: #fefce8;
border-color: #fde047;
}
.message-icon {
font-size: 1.5rem;
flex-shrink: 0;
}
.message-content {
flex: 1;
overflow-wrap: break-word;
}
.loading-indicator {
display: none;
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
color: #64748b;
}
.loading-indicator.show {
display: block;
}
.spinner {
width: 40px;
height: 40px;
border: 3px solid #e2e8f0;
border-top: 3px solid var(--primary);
border-radius: 50%;
animation: spin 1s linear infinite;
margin: 0 auto 1rem;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
table {
border-collapse: collapse;
width: 100%;
margin: 1rem 0;
font-family: 'Courier New', monospace;
font-size: 0.85rem;
background: white;
border-radius: 8px;
overflow: hidden;
max-width: 100%;
display: block;
overflow-x: auto;
}
th, td {
border: 1px solid var(--surface-border);
padding: 0.5rem;
text-align: left;
white-space: nowrap;
}
th {
background: var(--surface-bg);
font-weight: 600;
color: var(--text-heading);
position: sticky;
top: 0;
}
tr:nth-child(even) {
background: #f8fafc;
}
.sql-code {
background: #1e293b;
color: #e2e8f0;
padding: 1rem;
border-radius: 8px;
font-family: 'Courier New', monospace;
font-size: 0.875rem;
overflow-x: auto;
margin: 1rem 0;
}
.sql-keyword {
color: #60a5fa;
font-weight: bold;
}
@media (max-width: 968px) {
.dashboard {
grid-template-columns: 1fr;
gap: 1rem;
padding: 1rem;
}
}
// Note: This template now uses the generator's universal initialization system.
// The generator (apps/core/utils.py) handles:
// - Pyodide loading
// - Package installation (built-in and PyPI via micropip)
// - WebLLM bridge injection (for local provider)
// - API key setup
//
// Template-specific code should focus on business logic, not initialization.
// Database connection state
let dbConnected = false;
let dbType = 'sqlite';
let dbFile = null;
// Database type selector
document.getElementById('db-type').addEventListener('change', function(e) {
dbType = e.target.value;
const sqliteForm = document.getElementById('sqlite-connection');
const remoteForm = document.getElementById('remote-connection');
if (dbType === 'sqlite') {
sqliteForm.style.display = 'block';
remoteForm.style.display = 'none';
} else {
sqliteForm.style.display = 'none';
remoteForm.style.display = 'block';
// Set default ports
const portInput = document.getElementById('db-port');
if (dbType === 'mysql') {
portInput.value = '3306';
} else if (dbType === 'postgresql') {
portInput.value = '5432';
}
}
});
// File upload handlers
function handleDragOver(event) {
event.preventDefault();
event.dataTransfer.dropEffect = 'copy';
document.getElementById('file-upload-area').classList.add('drag-over');
}
function handleDragLeave(event) {
event.preventDefault();
document.getElementById('file-upload-area').classList.remove('drag-over');
}
function handleDrop(event) {
event.preventDefault();
document.getElementById('file-upload-area').classList.remove('drag-over');
const files = event.dataTransfer.files;
if (files.length > 0) {
handleFile(files[0]);
}
}
function handleFileSelect(event) {
const file = event.target.files[0];
if (file) {
handleFile(file);
}
}
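function handleFile(file) {
    dbFile = file;
    // Update only the label block. Replacing the whole upload area would also
    // remove the hidden #db-file-input and break later clicks on the area.
    const label = document.querySelector('#file-upload-area > div');
    label.style.color = 'var(--primary)';
    label.innerHTML = `
        <div style="font-size: 2rem; margin-bottom: 0.5rem;">✓</div>
        <p class="file-name" style="margin: 0; font-weight: 500;"></p>
        <p style="margin: 0.25rem 0 0 0; font-size: 0.75rem;">${formatFileSize(file.size)}</p>
    `;
    // Set the name through textContent so it is never parsed as HTML.
    label.querySelector('.file-name').textContent = file.name;
}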
function formatFileSize(bytes) {
if (bytes === 0) return '0 Bytes';
const k = 1024;
const sizes = ['Bytes', 'KB', 'MB', 'GB'];
const i = Math.floor(Math.log(bytes) / Math.log(k));
return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
}
// Database connection
async function connectDatabase() {
    const connectBtn = document.getElementById('connectBtn');
    const statusDiv = document.getElementById('connection-status');
    connectBtn.disabled = true;
    connectBtn.innerHTML = '<span>⏳</span><span>Connecting...</span>';
    try {
        let result;
        if (dbType === 'sqlite') {
            if (!dbFile) {
                throw new Error('Please select a SQLite database file');
            }
            // Read the file and pass its bytes to Python. Awaiting the read here,
            // instead of using a FileReader callback, keeps any failure inside
            // this try/catch so the button and status are always reset.
            const bytes = new Uint8Array(await dbFile.arrayBuffer());
            pyodide.globals.set('db_bytes', bytes);
            pyodide.globals.set('db_filename', dbFile.name);
            result = await pyodide.runPythonAsync('connect_to_sqlite(db_bytes, db_filename)');
        } else {
            // Remote database connection
            const host = document.getElementById('db-host').value;
            const port = document.getElementById('db-port').value;
            const name = document.getElementById('db-name').value;
            const user = document.getElementById('db-user').value;
            const password = document.getElementById('db-password').value;
            if (!host || !name || !user) {
                throw new Error('Please fill in all required fields');
            }
            pyodide.globals.set('db_type', dbType);
            pyodide.globals.set('db_host', host);
            pyodide.globals.set('db_port', port);
            pyodide.globals.set('db_name', name);
            pyodide.globals.set('db_user', user);
            pyodide.globals.set('db_password', password);
            result = await pyodide.runPythonAsync('connect_to_remote_db(db_type, db_host, db_port, db_name, db_user, db_password)');
        }
        // The Python helpers return a status string that starts with ✅ on success.
        if (!result.startsWith('✅')) {
            throw new Error(result);
        }
        dbConnected = true;
        statusDiv.style.display = 'block';
        statusDiv.style.background = '#f0fdf4';
        statusDiv.style.border = '1px solid #bbf7d0';
        statusDiv.style.color = '#15803d';
        statusDiv.textContent = result;
        document.getElementById('query-section').style.display = 'block';
        connectBtn.innerHTML = '<span>✓</span><span>Connected</span>';
        addMessage('system', result);
    } catch (error) {
        // Python failures already carry the "Connection failed" prefix; drop it
        // so the message is not shown twice.
        const detail = error.message.replace(/^❌ Connection failed: /, '');
        statusDiv.style.display = 'block';
        statusDiv.style.background = '#fef2f2';
        statusDiv.style.border = '1px solid #fecaca';
        statusDiv.style.color = '#dc2626';
        statusDiv.textContent = '❌ Connection failed: ' + detail;
        connectBtn.innerHTML = '<span>🔌</span><span>Connect to Database</span>';
        connectBtn.disabled = false;
        addMessage('error', 'Connection failed: ' + detail);
    }
}
// Query processing
async function processQuery() {
if (!dbConnected) {
addMessage('error', 'Please connect to a database first');
return;
}
const input = document.getElementById('queryInput');
const query = input.value.trim();
if (!query) return;
console.log('[QUERY START]', query);
const startTime = Date.now();
input.value = '';
addMessage('user', query);
const loadingEl = document.getElementById('loading-indicator');
const analyzeBtn = document.getElementById('analyzeBtn');
if (loadingEl) loadingEl.classList.add('show');
if (analyzeBtn) {
analyzeBtn.disabled = true;
analyzeBtn.innerHTML = '<span>ā³</span><span>Processing...</span>';
}
try {
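// API_KEY is declared with const in the page head, so it is a global binding
// but not a property of window; check both.
const apiKey = (typeof API_KEY !== 'undefined' && API_KEY) || window.API_KEY;
if (apiKey) {
pyodide.globals.set('current_api_key', apiKey);
}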
pyodide.globals.set('user_query', query);
const result = await pyodide.runPythonAsync('process_user_query(user_query)');
console.log('[QUERY COMPLETE]', Date.now() - startTime, 'ms');
addMessage('assistant', result);
} catch (error) {
console.error('[ERROR]', error);
addMessage('error', `Error: ${error.message}`);
} finally {
if (loadingEl) loadingEl.classList.remove('show');
if (analyzeBtn) {
analyzeBtn.disabled = false;
analyzeBtn.innerHTML = '<span>š</span><span>Execute Query</span>';
}
}
}
function fillSampleQuery(query) {
document.getElementById('queryInput').value = query;
document.getElementById('queryInput').focus();
}
// Message display
function addMessage(type, content) {
    const resultsContainer = document.getElementById('results-container');
    // Remove the welcome message if present (system messages leave it in place)
    const welcome = resultsContainer.querySelector('div[style*="text-align: center"]');
    if (welcome && type !== 'system') {
        welcome.remove();
    }
    const messageEl = document.createElement('div');
    messageEl.className = `message message-${type}`;
    // Escape HTML first so database values and model output render as text,
    // then apply the lightweight Markdown formatting.
    let formattedContent = String(content)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;')
        .replace(/\*\*(.*?)\*\*/g, '<strong>$1</strong>')
        .replace(/\*(.*?)\*/g, '<em>$1</em>')
        .replace(/### (.*?)(\n|$)/g, '<h3>$1</h3>')
        .replace(/## (.*?)(\n|$)/g, '<h2>$1</h2>')
        .replace(/# (.*?)(\n|$)/g, '<h1>$1</h1>')
        .replace(/```sql([\s\S]*?)```/g, '<div class="sql-code">$1</div>')
        .replace(/```([\s\S]*?)```/g, '<pre style="background: #f8fafc; padding: 1rem; border-radius: 6px; overflow-x: auto;">$1</pre>')
        .replace(/\n/g, '<br>');
    messageEl.innerHTML = `
        <div class="message-icon">
            ${type === 'user' ? '👤' : type === 'assistant' ? '🤖' : type === 'error' ? '❌' : '🔧'}
        </div>
        <div class="message-content">${formattedContent}</div>
    `;
    resultsContainer.appendChild(messageEl);
    resultsContainer.scrollTop = resultsContainer.scrollHeight;
}
// Keyboard shortcuts
document.addEventListener('DOMContentLoaded', function() {
const queryInput = document.getElementById('queryInput');
if (queryInput) {
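// keydown fires reliably for Ctrl+Enter (and Cmd+Enter on macOS); keypress is deprecated
queryInput.addEventListener('keydown', function(e) {
if (e.key === 'Enter' && (e.ctrlKey || e.metaKey)) {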
e.preventDefault();
processQuery();
}
});
}
});
# SQL Database Agent - Database Query and Analysis
# Supports SQLite, MySQL, and PostgreSQL databases
import sqlite3
import json
# Template Variables
[[[MAX_ROWS|100]]]
[[[SHOW_SQL|True]]]
[[[AUTO_VISUALIZE|False]]]
# Global database connection
db_conn = None
db_cursor = None
db_type = None
db_schema = {}
# Import LangChain components
try:
    from langchain_core.tools import tool
    from langchain_core.messages import HumanMessage, ToolMessage
    from langchain_openai import ChatOpenAI
    langchain_available = True
    print("[OK] LangChain imports successful")
except ImportError as e:
    langchain_available = False
    print(f"[WARNING] LangChain not available: {e}")

    def tool(func):
        """Fallback so the @tool definitions below still load without LangChain."""
        return func
def connect_to_sqlite(db_bytes, filename):
    """Connect to SQLite database from uploaded file bytes"""
    global db_conn, db_cursor, db_type, db_schema
    try:
        # SQLite needs a real path, so write the upload to Pyodide's
        # virtual filesystem before opening it.
        db_path = f'/tmp/{filename}'
        with open(db_path, 'wb') as f:
            f.write(bytes(db_bytes))
        db_conn = sqlite3.connect(db_path)
        db_cursor = db_conn.cursor()
        db_type = 'sqlite'
        # Get schema information
        db_schema = get_database_schema()
        table_count = len(db_schema.get('tables', []))
        return f"✅ Connected to SQLite database: {filename}\n📊 Found {table_count} table(s)"
    except Exception as e:
        return f"❌ Connection failed: {str(e)}"
def connect_to_remote_db(db_type_str, host, port, name, user, password):
    """Connect to remote MySQL or PostgreSQL database"""
    global db_conn, db_cursor, db_type, db_schema
    try:
        if db_type_str == 'mysql':
            try:
                import mysql.connector
                db_conn = mysql.connector.connect(
                    host=host,
                    port=int(port),
                    database=name,
                    user=user,
                    password=password
                )
                db_cursor = db_conn.cursor()
                db_type = 'mysql'
            except ImportError:
                return "❌ MySQL connector not available in this environment. Please use SQLite instead."
        elif db_type_str == 'postgresql':
            try:
                import psycopg2
                db_conn = psycopg2.connect(
                    host=host,
                    port=int(port),
                    database=name,
                    user=user,
                    password=password
                )
                db_cursor = db_conn.cursor()
                db_type = 'postgresql'
            except ImportError:
                return "❌ PostgreSQL connector not available in this environment. Please use SQLite instead."
        else:
            return f"❌ Unsupported database type: {db_type_str}"
        # Get schema information
        db_schema = get_database_schema()
        table_count = len(db_schema.get('tables', []))
        return f"✅ Connected to {db_type_str.upper()} database: {name}\n📊 Found {table_count} table(s)"
    except Exception as e:
        return f"❌ Connection failed: {str(e)}"
def get_database_schema():
    """Extract schema information from connected database"""
    global db_conn, db_cursor, db_type
    schema = {'tables': [], 'columns': {}}
    try:
        if db_type == 'sqlite':
            # Get all tables
            db_cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in db_cursor.fetchall()]
            schema['tables'] = tables
            # Get columns for each table; quote the name so tables with
            # spaces or reserved words still work
            for table in tables:
                quoted = table.replace('"', '""')
                db_cursor.execute(f'PRAGMA table_info("{quoted}")')
                # PRAGMA rows: (cid, name, type, notnull, dflt_value, pk)
                columns = [{'name': row[1], 'type': row[2], 'nullable': not row[3]}
                           for row in db_cursor.fetchall()]
                schema['columns'][table] = columns
        elif db_type == 'mysql':
            db_cursor.execute("SHOW TABLES")
            tables = [row[0] for row in db_cursor.fetchall()]
            schema['tables'] = tables
            for table in tables:
                db_cursor.execute(f"DESCRIBE `{table}`")
                columns = [{'name': row[0], 'type': row[1], 'nullable': row[2] == 'YES'}
                           for row in db_cursor.fetchall()]
                schema['columns'][table] = columns
        elif db_type == 'postgresql':
            db_cursor.execute("""
                SELECT table_name FROM information_schema.tables
                WHERE table_schema = 'public'
            """)
            tables = [row[0] for row in db_cursor.fetchall()]
            schema['tables'] = tables
            for table in tables:
                # Pass the table name as a bound parameter instead of
                # interpolating it into the SQL text
                db_cursor.execute("""
                    SELECT column_name, data_type, is_nullable
                    FROM information_schema.columns
                    WHERE table_schema = 'public' AND table_name = %s
                """, (table,))
                columns = [{'name': row[0], 'type': row[1], 'nullable': row[2] == 'YES'}
                           for row in db_cursor.fetchall()]
                schema['columns'][table] = columns
        return schema
    except Exception as e:
        print(f"Error getting schema: {e}")
        return schema
def execute_sql_query(sql, params=None):
    """Execute SQL query and return results"""
    global db_cursor, db_conn
    try:
        if params:
            db_cursor.execute(sql, params)
        else:
            db_cursor.execute(sql)
        # Get column names
        if db_cursor.description:
            columns = [desc[0] for desc in db_cursor.description]
            rows = db_cursor.fetchall()
            return {'columns': columns, 'rows': rows, 'count': len(rows)}
        else:
            db_conn.commit()
            return {'message': 'Query executed successfully', 'rowcount': db_cursor.rowcount}
    except Exception as e:
        return {'error': str(e)}
@tool
def list_tables() -> str:
    """Get list of all tables in the database"""
    global db_schema
    if not db_schema.get('tables'):
        return "No tables found in database"
    result = ["## Database Tables\n"]
    result.append(f"Found **{len(db_schema['tables'])}** table(s):\n")
    for i, table in enumerate(db_schema['tables'], 1):
        col_count = len(db_schema['columns'].get(table, []))
        result.append(f"{i}. **{table}** ({col_count} columns)")
    return "\n".join(result)
@tool
def describe_table(table_name: str) -> str:
    """Get detailed schema information for a specific table"""
    global db_schema
    if table_name not in db_schema['columns']:
        available = ", ".join(db_schema['tables'])
        return f"❌ Table '{table_name}' not found. Available tables: {available}"
    columns = db_schema['columns'][table_name]
    result = [f"## Schema: {table_name}\n"]
    result.append("| Column | Type | Nullable |")
    result.append("| --- | --- | --- |")
    for col in columns:
        nullable = "✓" if col['nullable'] else "✗"
        result.append(f"| {col['name']} | {col['type']} | {nullable} |")
    # Get row count; the count is optional, so a failure here is ignored
    try:
        count_result = execute_sql_query(f"SELECT COUNT(*) FROM {table_name}")
        if 'rows' in count_result:
            row_count = count_result['rows'][0][0]
            result.append(f"\n**Total rows:** {row_count:,}")
    except Exception:
        pass
    return "\n".join(result)
@tool
def run_sql_query(sql: str) -> str:
    """Execute a SQL query and return formatted results"""
    global db_cursor
    # Safety check - block dangerous operations
    sql_upper = sql.upper().strip()
    dangerous = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE', 'INSERT', 'UPDATE']
    if any(sql_upper.startswith(cmd) for cmd in dangerous):
        return f"❌ Query blocked: {sql_upper.split()[0]} operations are not allowed for safety"
    result = execute_sql_query(sql)
    if 'error' in result:
        return f"❌ SQL Error: {result['error']}"
    if 'message' in result:
        return f"✅ {result['message']}"
    # Format results as table
    output = ["## Query Results\n"]
    if [[[SHOW_SQL]]]:
        output.append(f"```sql\n{sql}\n```\n")
    if result['count'] == 0:
        output.append("_No results found_")
    else:
        # Create markdown table
        output.append("| " + " | ".join(result['columns']) + " |")
        output.append("| " + " | ".join(["---"] * len(result['columns'])) + " |")
        max_rows = min(result['count'], [[[MAX_ROWS]]])
        for row in result['rows'][:max_rows]:
            output.append("| " + " | ".join(str(val) if val is not None else 'NULL' for val in row) + " |")
        if result['count'] > max_rows:
            output.append(f"\n_Showing {max_rows} of {result['count']:,} rows_")
        else:
            output.append(f"\n_Total: {result['count']:,} row(s)_")
    return "\n".join(output)
@tool
def get_sample_data(table_name: str, limit: int = 10) -> str:
    """Get sample rows from a table"""
    global db_schema
    if table_name not in db_schema['tables']:
        return f"❌ Table '{table_name}' not found"
    # Coerce limit to int so only a number can reach the SQL text
    sql = f"SELECT * FROM {table_name} LIMIT {int(limit)}"
    return run_sql_query.invoke({"sql": sql})
def process_user_query(query: str) -> str:
    """Process natural language queries about the database"""
    if not langchain_available:
        return "❌ AI query processing not available. LangChain not installed."
    # Check provider first - for local WebLLM, no API key needed
    provider = globals().get('PROVIDER', '')
    if provider == 'local':
        # Using WebLLM for local inference - call the model with context about available tools
        query_lower = query.lower()
        # For simple commands, use direct tool calling (fast)
        if 'tables' in query_lower or 'show all' in query_lower:
            return list_tables.invoke({})
        elif 'describe' in query_lower or 'schema' in query_lower:
            # Try to extract table name
            for table in db_schema['tables']:
                if table.lower() in query_lower:
                    return describe_table.invoke({"table_name": table})
            return list_tables.invoke({})
        # For complex queries, call WebLLM with context
        try:
            # Build context about available tools and schema
            schema_info = "\n".join([f"- {t}" for t in db_schema['tables'][:10]])
            system_prompt = f"""You are a SQL database assistant. The database contains these tables:
{schema_info}
You have these tools available:
- list_tables(): Show all tables
- describe_table(table_name): Get table schema
- run_sql_query(sql): Execute SELECT queries
- get_sample_data(table_name, limit): Get sample rows
Based on the user's question, suggest which tool to use or provide a helpful response.
Be concise and specific."""
            # Call WebLLM (this is an async function available globally)
            import asyncio
            response = asyncio.run(call_webllm_from_python(query, system_prompt))
            return response
        except Exception as e:
            # Fallback to helpful message
            return f"🤖 Using local WebLLM model.\n\nAvailable commands:\n- 'Show all tables'\n- 'Describe [table_name]'\n- 'Show first 10 rows from [table_name]'\n\nError: {str(e)}"
    # For OpenAI/Anthropic providers, require API key
    api_key = globals().get('current_api_key', '')
    if not api_key:
        # Fallback to basic command parsing
        query_lower = query.lower()
        if 'tables' in query_lower or 'show all' in query_lower:
            return list_tables.invoke({})
        elif 'describe' in query_lower or 'schema' in query_lower:
            # Try to extract table name
            for table in db_schema['tables']:
                if table.lower() in query_lower:
                    return describe_table.invoke({"table_name": table})
            return list_tables.invoke({})
        else:
            return "⚠️ Please provide an OpenAI API key for AI-powered natural language queries.\n\nOr try these commands:\n- 'Show all tables'\n- 'Describe [table_name]'"
    try:
        llm = ChatOpenAI(model="gpt-4o-mini", api_key=api_key)
        # Test API key
        try:
            test = llm.invoke([HumanMessage(content="Test")])
        except Exception as e:
            return f"❌ API key error: {str(e)}"
        tools = [list_tables, describe_table, run_sql_query, get_sample_data]
        llm_with_tools = llm.bind_tools(tools)
        # Create context-aware prompt
        schema_info = "\n".join([f"- {t}" for t in db_schema['tables'][:10]])
        system_prompt = f"""You are a SQL database assistant. The database contains these tables:
{schema_info}
You have these tools available:
- list_tables(): Show all tables in database
- describe_table(table_name): Get schema of a specific table
- run_sql_query(sql): Execute SELECT queries (read-only)
- get_sample_data(table_name, limit): Get sample rows
Always use tools to interact with the database. When writing SQL:
- Use proper table and column names from the schema
- Keep queries simple and efficient
- Only use SELECT statements (no modifications)
User question: {query}"""
        messages = [HumanMessage(content=system_prompt)]
        # Tool calling loop with immediate return
        for _ in range(3):
            response = llm_with_tools.invoke(messages)
            messages.append(response)
            if not response.tool_calls:
                break
            for tool_call in response.tool_calls:
                try:
                    tool_name = tool_call["name"]
                    tool_args = tool_call["args"]
                    selected_tool = None
                    for t in tools:
                        if t.name == tool_name:
                            selected_tool = t
                            break
                    if selected_tool:
                        tool_result = selected_tool.invoke(tool_args)
                        messages.append(ToolMessage(content=str(tool_result), tool_call_id=tool_call["id"]))
                    else:
                        messages.append(ToolMessage(content=f"Tool {tool_name} not found", tool_call_id=tool_call["id"]))
                except Exception as e:
                    error_msg = f"Error executing {tool_call['name']}: {str(e)}"
                    messages.append(ToolMessage(content=error_msg, tool_call_id=tool_call["id"]))
            # Return tool results immediately
            tool_results = [msg.content for msg in messages if isinstance(msg, ToolMessage)]
            if tool_results:
                return tool_results[-1]
        # No tools were called: the model's last reply is already the answer,
        # so return it instead of making another request
        return response.content
    except Exception as e:
        return f"❌ Error: {str(e)}"
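
The safety check in run_sql_query rejects statements by their first keyword only. A stricter option for SQLite, sketched here as an assumption and not as part of the template, is to let the database enforce read-only access through an authorizer callback. The helper names `read_only_authorizer` and `run_read_only` and the `users` table are hypothetical.

```python
import sqlite3

# Action codes a plain SELECT needs; every other action is denied.
READ_ONLY_ACTIONS = {
    sqlite3.SQLITE_SELECT,
    sqlite3.SQLITE_READ,
    sqlite3.SQLITE_FUNCTION,
}


def read_only_authorizer(action, arg1, arg2, db_name, source):
    """Allow read actions and deny everything else (writes, DDL, PRAGMA, ...)."""
    return sqlite3.SQLITE_OK if action in READ_ONLY_ACTIONS else sqlite3.SQLITE_DENY


def run_read_only(conn, sql):
    """Run sql; return the rows, or an error string if SQLite refuses it."""
    try:
        return conn.execute(sql).fetchall()
    except sqlite3.DatabaseError as exc:
        return f"Query blocked: {exc}"


# Hypothetical sample data, created before the authorizer is installed.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (name) VALUES ('Ada'), ('Linus')")
conn.commit()
conn.set_authorizer(read_only_authorizer)

rows = run_read_only(conn, "SELECT name FROM users ORDER BY id")
blocked = run_read_only(conn, "DELETE FROM users")
remaining = run_read_only(conn, "SELECT COUNT(*) FROM users")
print(rows)
print(blocked)
print(remaining)
```

Because the check runs inside SQLite when the statement is compiled, it does not depend on how the SQL text is spelled, which a prefix test cannot guarantee. This sketch covers SQLite only; MySQL and PostgreSQL would need a read-only database account instead.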