In this tutorial, we walk through the design and implementation of a custom agent framework built on PyTorch and core Python tooling, ranging from web intelligence and data science modules to advanced code generators. We learn to wrap core capabilities in monitored CustomTool classes, to orchestrate multiple agents with tailored system prompts, and to define end-to-end workflows that automate tasks such as competitive website analysis and data processing pipelines. Along the way, we demonstrate real-world examples complete with retry logic, logging, and performance metrics, so you can deploy and scale these agents on your organization's existing infrastructure.
!pip install -q torch transformers datasets pillow requests beautifulsoup4 pandas numpy scikit-learn openai
import os, json, asyncio, threading, time
import torch, pandas as pd, numpy as np
from PIL import Image
import requests
from io import BytesIO, StringIO
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, lru_cache
from typing import Dict, List, Optional, Any, Callable, Union
import logging
from dataclasses import dataclass
import inspect
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_TIMEOUT = 15
MAX_RETRIES = 3
We start by installing and importing all of the core libraries, including PyTorch and Transformers, data processing libraries such as pandas and NumPy, and utilities like BeautifulSoup for web scraping and scikit-learn for machine learning. We configure a standardized logging setup to capture information and error messages, and define global constants for API timeouts and retry limits, ensuring our tools behave predictably in production.
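The constants above are used throughout the tools below; as a quick illustration, here is a minimal sketch, using our own hypothetical helper names with_retries and fetch_page, of how MAX_RETRIES and API_TIMEOUT could back a reusable retry decorator with exponential backoff:

def with_retries(func):
    """Retry a flaky call up to MAX_RETRIES times with exponential backoff."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        for attempt in range(MAX_RETRIES):
            try:
                return func(*args, **kwargs)
            except requests.exceptions.RequestException as e:
                if attempt == MAX_RETRIES - 1:
                    raise
                logger.warning(f"{func.__name__} attempt {attempt + 1} failed: {e}")
                time.sleep(2 ** attempt)  # back off 1s, 2s, 4s between attempts
    return wrapper

@with_retries
def fetch_page(url: str) -> str:
    # API_TIMEOUT bounds every request so a slow endpoint cannot stall the agent
    return requests.get(url, timeout=API_TIMEOUT).text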
@dataclass
class ToolResult:
"""Standardized tool result structure"""
success: bool
data: Any
    error: Optional[str] = None
execution_time: float = 0.0
    metadata: Optional[Dict[str, Any]] = None
class CustomTool:
"""Base class for custom tools"""
def __init__(self, name: str, description: str, func: Callable):
self.name = name
self.description = description
self.func = func
self.calls = 0
self.avg_execution_time = 0.0
self.error_rate = 0.0
def execute(self, *args, **kwargs) -> ToolResult:
"""Execute tool with monitoring"""
start_time = time.time()
self.calls += 1
try:
result = self.func(*args, **kwargs)
            execution_time = time.time() - start_time
            self.avg_execution_time = ((self.avg_execution_time * (self.calls - 1)) + execution_time) / self.calls
            self.error_rate = (self.error_rate * (self.calls - 1)) / self.calls  # keep the error rate current on success too
return ToolResult(
success=True,
data=result,
execution_time=execution_time,
metadata={'tool_name': self.name, 'call_count': self.calls}
)
except Exception as e:
execution_time = time.time() - start_time
self.error_rate = (self.error_rate * (self.calls - 1) + 1) / self.calls
logger.error(f"Tool {self.name} failed: {str(e)}")
return ToolResult(
success=False,
data=None,
error=str(e),
execution_time=execution_time,
metadata={'tool_name': self.name, 'call_count': self.calls}
)
We define a ToolResult dataclass to encapsulate every execution outcome: whether it succeeded, how long it took, the data returned, and error details if it failed. Our CustomTool base class then wraps individual functions with a unified execute method that tracks call counts, measures execution time, computes a rolling average execution time, and logs any errors. By standardizing tool results and performance metrics this way, we ensure consistency and observability across all of our custom utilities.
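As a quick sanity check, here is a hedged usage sketch, with word_count as our own toy function, showing how any plain callable becomes a monitored tool:

def word_count(text: str) -> int:
    return len(text.split())

wc_tool = CustomTool("word_count", "Counts words in a string", word_count)
res = wc_tool.execute("the quick brown fox")
# res.success is True, res.data is 4, and res.metadata records the call count
print(res.success, res.data, res.metadata)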
class CustomAgent:
    """Custom agent implementation with tool management"""
    def __init__(self, name: str, system_prompt: str = "", max_iterations: int = 5):
        self.name = name
        self.system_prompt = system_prompt
        self.max_iterations = max_iterations
        self.tools = {}
        self.conversation_history = []
        self.performance_metrics = {}
    def add_tool(self, tool: CustomTool):
        """Add a tool to the agent"""
        self.tools[tool.name] = tool
    def run(self, task: str) -> Dict[str, Any]:
        """Execute a task using available tools"""
        logger.info(f"Agent {self.name} executing task: {task}")
        task_lower = task.lower()
        results = []
        # Route by keywords, but only into branches whose tool this agent actually owns
        if any(keyword in task_lower for keyword in ['analyze', 'website', 'url', 'web']) and 'advanced_web_intelligence' in self.tools:
            import re
            url_pattern = r'https?://[^\s]+'
            urls = re.findall(url_pattern, task)
            if urls:
                result = self.tools['advanced_web_intelligence'].execute(urls[0])
                results.append(result)
        elif any(keyword in task_lower for keyword in ['data', 'analyze', 'stats', 'csv']) and 'advanced_data_science_toolkit' in self.tools:
            if 'name,age,salary' in task:
                data_start = task.find('name,age,salary')
                data_part = task[data_start:]
                result = self.tools['advanced_data_science_toolkit'].execute(data_part, 'stats')
                results.append(result)
        elif any(keyword in task_lower for keyword in ['generate', 'code', 'api', 'client']) and 'advanced_code_generator' in self.tools:
            result = self.tools['advanced_code_generator'].execute(task)
            results.append(result)
        return {
            'agent': self.name,
            'task': task,
            'results': [r.data if r.success else {'error': r.error} for r in results],
            'execution_summary': {
                'tools_used': len(results),
                'success_rate': sum(1 for r in results if r.success) / len(results) if results else 0,
                'total_time': sum(r.execution_time for r in results)
            }
        }
We encapsulate our AI logic in a CustomAgent class that holds a set of tools, a system prompt, and execution history, then routes each incoming task to the right tool based on simple keyword matching; each branch also verifies that the agent actually owns the matching tool, so overlapping keywords never strand a task on an agent without that capability. In the run() method, we log the task, select the appropriate tool (web intelligence, data analysis, or code generation), and aggregate the results into a standardized response that includes success rates and timing metrics. This design lets us extend agents easily by registering new tools and keeps our orchestration both transparent and measurable.
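To make the routing concrete, here is a minimal sketch using a stubbed tool of our own; the tool name must match the routing key checked inside run():

stub_web_tool = CustomTool(
    name="advanced_web_intelligence",  # must match the key checked inside run()
    description="Stubbed web tool for illustration only",
    func=lambda url: {'url': url, 'note': 'stubbed analysis'}
)
demo_agent = CustomAgent("demo", system_prompt="You are a demo agent.")
demo_agent.add_tool(stub_web_tool)
summary = demo_agent.run("Analyze the website https://example.com for me")
print(json.dumps(summary['execution_summary'], indent=2))  # 1 tool used, success_rate 1.0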
print("🏗️ Building Advanced Tool Architecture")
def performance_monitor(func):
"""Decorator for monitoring tool performance"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
logger.info(f"{func.__name__} executed in {execution_time:.2f}s")
return result
except Exception as e:
logger.error(f"{func.__name__} failed: {str(e)}")
raise
return wrapper
@performance_monitor
def advanced_web_intelligence(url: str, analysis_type: str = "comprehensive") -> Dict[str, Any]:
"""
Advanced web intelligence gathering with multiple analysis modes.
Args:
url: Target URL for analysis
analysis_type: Type of analysis (comprehensive, sentiment, technical, seo)
Returns:
Dict containing structured analysis results
"""
try:
response = requests.get(url, timeout=API_TIMEOUT, headers={
'User-Agent': 'Mozilla/5.0'
})
from bs4 import BeautifulSoup
soup = BeautifulSoup(response.content, 'html.parser')
title = soup.find('title').text if soup.find('title') else 'No title'
meta_desc = soup.find('meta', attrs={'name': 'description'})
meta_desc = meta_desc.get('content') if meta_desc else 'No description'
if analysis_type == "comprehensive":
return {
'title': title,
'description': meta_desc,
'word_count': len(soup.get_text().split()),
'image_count': len(soup.find_all('img')),
'link_count': len(soup.find_all('a')),
                'headers': [h.text.strip() for h in soup.find_all(['h1', 'h2', 'h3'])[:5]],
'status_code': response.status_code,
'content_type': response.headers.get('content-type', 'unknown'),
'page_size': len(response.content)
}
elif analysis_type == "sentiment":
            text = soup.get_text()[:2000]
            positive_words = ['good', 'great', 'excellent', 'amazing', 'wonderful', 'fantastic']
            negative_words = ['bad', 'terrible', 'awful', 'horrible', 'disappointing']
            pos_count = sum(text.lower().count(word) for word in positive_words)
            neg_count = sum(text.lower().count(word) for word in negative_words)
            return {
                'sentiment_score': pos_count - neg_count,
                'positive_indicators': pos_count,
                'negative_indicators': neg_count,
                'text_sample': text[:200],
                'analysis_type': 'sentiment'
            }
except Exception as e:
return {'error': f"Analysis failed: {str(e)}"}
@performance_monitor
def advanced_data_science_toolkit(data: str, operation: str) -> Dict[str, Any]:
"""
Comprehensive data science operations with statistical analysis.
Args:
data: CSV-like string or JSON data
operation: Type of analysis (stats, correlation, forecast, clustering)
Returns:
Dict with analysis results
"""
try:
        if data.startswith('{') or data.startswith('['):
parsed_data = json.loads(data)
df = pd.DataFrame(parsed_data)
else:
df = pd.read_csv(StringIO(data))
if operation == "stats":
            numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
result = {
'shape': df.shape,
'columns': df.columns.tolist(),
'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
'missing_values': df.isnull().sum().to_dict(),
'numeric_columns': numeric_columns
}
            if len(numeric_columns) > 0:
                result['summary_stats'] = df[numeric_columns].describe().to_dict()
            if len(numeric_columns) > 1:
                result['correlation_matrix'] = df[numeric_columns].corr().to_dict()
return result
elif operation == "clustering":
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
            numeric_df = df.select_dtypes(include=[np.number])
            if numeric_df.shape[1] < 2:
                return {'error': 'Clustering requires at least 2 numeric columns'}
            scaled_data = StandardScaler().fit_transform(numeric_df.fillna(0))
            n_clusters = min(3, len(numeric_df))
            kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=42)
            labels = kmeans.fit_predict(scaled_data)
            return {
                'n_clusters': n_clusters,
                'cluster_labels': labels.tolist(),
                'inertia': float(kmeans.inertia_)
            }
    except Exception as e:
        return {'error': f"Data analysis failed: {str(e)}"}

@performance_monitor
def advanced_code_generator(task_description: str, language: str = "python") -> Dict[str, str]:
"""
Advanced code generation with multiple language support and optimization.
Args:
task_description: Description of coding task
language: Target programming language
Returns:
Dict with generated code and metadata
"""
templates = {
'python': {
'api_client': '''
import requests
import json
import time
from typing import Dict, Any, Optional
class APIClient:
"""Production-ready API client with retry logic and error handling"""
    def __init__(self, base_url: str, api_key: Optional[str] = None, timeout: int = 30):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.session = requests.Session()
if api_key:
self.session.headers.update({'Authorization': f'Bearer {api_key}'})
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'CustomAPIClient/1.0'
})
    def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
"""Make HTTP request with retry logic"""
url = f'{self.base_url}/{endpoint.lstrip("/")}'
for attempt in range(3):
try:
response = self.session.request(method, url, timeout=self.timeout, **kwargs)
response.raise_for_status()
return response.json() if response.content else {}
except requests.exceptions.RequestException as e:
if attempt == 2: # Last attempt
raise
time.sleep(2 ** attempt) # Exponential backoff
    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
        return self._make_request('GET', endpoint, params=params)
    def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        return self._make_request('POST', endpoint, json=data)
    def put(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        return self._make_request('PUT', endpoint, json=data)
    def delete(self, endpoint: str) -> Dict[str, Any]:
return self._make_request('DELETE', endpoint)
''',
'data_processor': '''
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Optional
import logging
logger = logging.getLogger(__name__)
class DataProcessor:
"""Advanced data processor with comprehensive cleaning and analysis"""
def __init__(self, data: pd.DataFrame):
self.original_data = data.copy()
self.processed_data = data.copy()
        self.processing_log = []
def clean_data(self, strategy: str="auto") -> 'DataProcessor':
"""Clean data with configurable strategies"""
initial_shape = self.processed_data.shape
# Remove duplicates
self.processed_data = self.processed_data.drop_duplicates()
# Handle missing values based on strategy
if strategy == 'auto':
# For numeric columns, use mean
            numeric_cols = self.processed_data.select_dtypes(include=[np.number]).columns
            self.processed_data[numeric_cols] = self.processed_data[numeric_cols].fillna(
                self.processed_data[numeric_cols].mean()
            )
            # For categorical columns, use mode
            categorical_cols = self.processed_data.select_dtypes(include=['object']).columns
            for col in categorical_cols:
                mode_value = self.processed_data[col].mode()
                if len(mode_value) > 0:
                    self.processed_data[col] = self.processed_data[col].fillna(mode_value[0])
final_shape = self.processed_data.shape
self.processing_log.append(f"Cleaned data: {initial_shape} -> {final_shape}")
return self
    def normalize(self, method: str = "minmax", columns: Optional[List[str]] = None) -> 'DataProcessor':
        """Normalize numerical columns"""
        cols = columns or self.processed_data.select_dtypes(include=[np.number]).columns.tolist()
        if method == 'minmax':
            # Min-max normalization
            for col in cols:
                col_min, col_max = self.processed_data[col].min(), self.processed_data[col].max()
                if col_max != col_min:
                    self.processed_data[col] = (self.processed_data[col] - col_min) / (col_max - col_min)
        elif method == 'zscore':
            # Z-score normalization
            for col in cols:
                mean_val, std_val = self.processed_data[col].mean(), self.processed_data[col].std()
                if std_val != 0:
                    self.processed_data[col] = (self.processed_data[col] - mean_val) / std_val
self.processing_log.append(f"Normalized columns {cols} using {method}")
return self
    def get_insights(self) -> Dict[str, Any]:
"""Generate comprehensive data insights"""
insights = {
'basic_info': {
'shape': self.processed_data.shape,
'columns': self.processed_data.columns.tolist(),
'dtypes': {col: str(dtype) for col, dtype in self.processed_data.dtypes.items()}
},
'data_quality': {
'missing_values': self.processed_data.isnull().sum().to_dict(),
'duplicate_rows': self.processed_data.duplicated().sum(),
'memory_usage': self.processed_data.memory_usage(deep=True).to_dict()
},
'processing_log': self.processing_log
}
# Add statistical summary for numeric columns
        numeric_data = self.processed_data.select_dtypes(include=[np.number])
        if len(numeric_data.columns) > 0:
            insights['statistical_summary'] = numeric_data.describe().to_dict()
return insights
'''
}
}
task_lower = task_description.lower()
if any(keyword in task_lower for keyword in ('api', 'client', 'http', 'request')):
        code = templates[language]['api_client']
description = "Production-ready API client with retry logic and comprehensive error handling"
elif any(keyword in task_lower for keyword in ('data', 'process', 'clean', 'analyze')):
        code = templates[language]['data_processor']
description = "Advanced data processor with cleaning, normalization, and insight generation"
else:
code = f'''# Generated code template for: {task_description}
# Language: {language}
class CustomSolution:
"""Auto-generated solution template"""
def __init__(self):
self.initialized = True
def execute(self, *args, **kwargs):
"""Main execution method - implement your logic here"""
return {{"message": "Implement your custom logic here", "task": "{task_description}"}}
# Usage example:
# solution = CustomSolution()
# result = solution.execute()
'''
description = f"Custom template for {task_description}"
return {
'code': code,
'language': language,
'description': description,
'complexity': 'production-ready',
'estimated_lines': len(code.split('\n')),
        'features': ['error_handling', 'logging', 'type_hints', 'documentation']
}
We wrap each core function in the @performance_monitor decorator so we can log execution times and capture failures, then implement three specialized tools: advanced_web_intelligence for comprehensive or sentiment-focused web analysis, advanced_data_science_toolkit for statistical analysis and clustering of CSV or JSON data, and advanced_code_generator for producing production-ready API client and data processor templates. This gives us monitored, reusable building blocks for all of our analysis and code generation workflows.
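Before wiring these tools into agents, we can exercise one directly; this is a small sketch with our own inline sample, calling advanced_data_science_toolkit on a tiny CSV to confirm the stats operation:

csv_sample = "x,y\n1,2\n2,4\n3,6"
stats = advanced_data_science_toolkit(csv_sample, "stats")
print(stats['shape'], stats['numeric_columns'])  # (3, 2) ['x', 'y']
print(stats['correlation_matrix']['x']['y'])     # 1.0, since y is perfectly linear in x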
print("🤖 Setting up Custom Agent Framework")
class AgentOrchestrator:
"""Manages multiple specialized agents with workflow coordination"""
def __init__(self):
self.agents = {}
self.workflows = {}
self.results_cache = {}
self.performance_metrics = {}
    def create_specialist_agent(self, name: str, tools: List[CustomTool], system_prompt: str = None):
"""Create domain-specific agents"""
agent = CustomAgent(
name=name,
system_prompt=system_prompt or f"You are a specialist {name} agent.",
max_iterations=5
)
for tool in tools:
agent.add_tool(tool)
        self.agents[name] = agent
return agent
def execute_workflow(self, workflow_name: str, inputs: Dict) -> Dict:
"""Execute multi-step workflows across agents"""
if workflow_name not in self.workflows:
raise ValueError(f"Workflow {workflow_name} not found")
        workflow = self.workflows[workflow_name]
        results = {}
        workflow_start = time.time()
        for step in workflow['steps']:
            agent_name = step['agent']
            task = step['task'].format(**inputs, **results)
            if agent_name in self.agents:
                step_start = time.time()
                result = self.agents[agent_name].run(task)
                step_time = time.time() - step_start
                results[step['output_key']] = result
                results[f"{step['output_key']}_time"] = step_time
total_time = time.time() - workflow_start
return {
'workflow': workflow_name,
'inputs': inputs,
'results': results,
'metadata': {
'total_execution_time': total_time,
                'steps_completed': len(workflow['steps']),
'success': True
}
}
    def get_system_status(self) -> Dict[str, Any]:
"""Get comprehensive system status"""
return {
'agents': {name: {'tools': len(agent.tools)} for name, agent in self.agents.items()},
'workflows': list(self.workflows.keys()),
'cache_size': len(self.results_cache),
'total_tools': sum(len(agent.tools) for agent in self.agents.values())
}
orchestrator = AgentOrchestrator()
web_tool = CustomTool(
name="advanced_web_intelligence",
description="Advanced web analysis and intelligence gathering",
func=advanced_web_intelligence
)
data_tool = CustomTool(
name="advanced_data_science_toolkit",
description="Comprehensive data science and statistical analysis",
func=advanced_data_science_toolkit
)
code_tool = CustomTool(
name="advanced_code_generator",
description="Advanced code generation and architecture",
func=advanced_code_generator
)
web_agent = orchestrator.create_specialist_agent(
"web_analyst",
    [web_tool],
"You are a web analysis specialist. Provide comprehensive website analysis and insights."
)
data_agent = orchestrator.create_specialist_agent(
"data_scientist",
    [data_tool],
"You are a data science expert. Perform statistical analysis and machine learning tasks."
)
code_agent = orchestrator.create_specialist_agent(
"code_architect",
    [code_tool],
"You are a senior software architect. Generate optimized, production-ready code."
)
We initialize an AgentOrchestrator to manage our suite of AI agents, register each CustomTool implementation for web intelligence, data science, and code generation, then spin up three domain-specific agents: web_analyst, data_scientist, and code_architect. Each agent is seeded with its respective tool set and a clear system prompt. This setup lets us coordinate and execute multi-step workflows across specialized areas of expertise within a single unified framework.
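At this point we can already query the orchestrator for a snapshot; a minimal check, whose exact output depends on your run, looks like this:

status = orchestrator.get_system_status()
print(json.dumps(status, indent=2))
# Expect three agents with one tool each and an empty workflow list, since workflows are registered next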
print("⚡ Defining Advanced Workflows")
orchestrator.workflows['competitive_analysis'] = {
    'steps': [
{
'agent': 'web_analyst',
'task': 'Analyze website {target_url} with comprehensive analysis',
'output_key': 'website_analysis'
},
{
'agent': 'code_architect',
'task': 'Generate monitoring code for website analysis automation',
'output_key': 'monitoring_code'
}
    ]
}
orchestrator.workflows['data_pipeline'] = {
    'steps': [
{
'agent': 'data_scientist',
'task': 'Analyze the following CSV data with stats operation: {data_input}',
'output_key': 'data_analysis'
},
{
'agent': 'code_architect',
'task': 'Generate data processing pipeline code',
'output_key': 'pipeline_code'
}
    ]
}
We define two key multi-agent workflows: competitive_analysis, which has our web analyst fetch and analyze a target URL before handing the findings to our code architect to generate monitoring scripts, and data_pipeline, where our data scientist runs statistical analysis on CSV input and our code architect then generates the corresponding data processing pipeline code. These declarative step sequences let us orchestrate complex, end-to-end tasks with minimal glue code.
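The demos below exercise the competitive_analysis workflow; for completeness, here is a hedged sketch, with our own inline CSV sample, of invoking data_pipeline the same way:

pipeline_inputs = {'data_input': 'name,age,salary\nAlice,25,50000\nBob,30,60000'}
pipeline_result = orchestrator.execute_workflow('data_pipeline', pipeline_inputs)
meta = pipeline_result['metadata']
print(f"data_pipeline completed {meta['steps_completed']} steps in {meta['total_execution_time']:.2f}s")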
print("🚀 Running Production Examples")
print("\n📊 Advanced Web Intelligence Demo")
try:
web_result = web_agent.run("Analyze https://httpbin.org/html with comprehensive analysis type")
print(f"✅ Web Analysis Success: {json.dumps(web_result, indent=2)}")
except Exception as e:
print(f"❌ Web analysis error: {e}")
print("\n🔬 Data Science Pipeline Demo")
sample_data = """name,age,salary,department
Alice,25,50000,Engineering
Bob,30,60000,Engineering
Carol,35,70000,Marketing
David,28,55000,Engineering
Eve,32,65000,Marketing"""
try:
data_result = data_agent.run(f"Analyze this data with stats operation: {sample_data}")
print(f"✅ Data Analysis Success: {json.dumps(data_result, indent=2)}")
except Exception as e:
print(f"❌ Data analysis error: {e}")
print("\n💻 Code Architecture Demo")
try:
code_result = code_agent.run("Generate an API client for data processing tasks")
print(f"✅ Code Generation Success: Generated {len(code_result('results')(0)('code').split())} lines of code")
except Exception as e:
print(f"❌ Code generation error: {e}")
print("\n🔄 Multi-Agent Workflow Demo")
try:
workflow_inputs = {'target_url': 'https://httpbin.org/html'}
workflow_result = orchestrator.execute_workflow('competitive_analysis', workflow_inputs)
print(f"✅ Workflow Success: Completed in {workflow_result('metadata')('total_execution_time'):.2f}s")
except Exception as e:
print(f"❌ Workflow error: {e}")
We run a series of production demos to validate each component: first our web_analyst performs a comprehensive site analysis, then our data_scientist crunches sample CSV statistics, then our code_architect generates an API client, and finally we orchestrate the competitive analysis workflow end to end, capturing success indicators, outputs, and execution timing for every step.
print("\n📈 System Performance Metrics")
system_status = orchestrator.get_system_status()
print(f"System Status: {json.dumps(system_status, indent=2)}")
print("\nTool Performance:")
for agent_name, agent in orchestrator.agents.items():
print(f"\n{agent_name}:")
for tool_name, tool in agent.tools.items():
print(f" - {tool_name}: {tool.calls} calls, {tool.avg_execution_time:.3f}s avg, {tool.error_rate:.1%} error rate")
print("\n✅ Advanced Custom Agent Framework Complete!")
print("🚀 Production-ready implementation with full monitoring and error handling!")
We finish by retrieving and printing our orchestrator's overall system status, listing the registered agents, workflows, and cache size, then iterate over each agent's tools to display call counts, average execution times, and error rates. This gives us a real-time view of performance and reliability before we log a final confirmation that our production-ready agent framework is complete.
In conclusion, we now have a blueprint for building specialized AI agents that perform complex analyses and generate production-quality code, while also self-reporting their health and resource usage. The AgentOrchestrator ties everything together, letting you coordinate multi-step workflows and capture granular performance insights across agents. Whether you are automating market research, ETL tasks, or API client generation, this framework provides the extensibility, reliability, and observability required for enterprise-grade AI deployments.
