Building a GPU-Accelerated Ollama + LangChain Workflow with RAG Agents and Multi-Session Chat Performance Monitoring

by Brenden Burgess



In this tutorial, we build a GPU-capable local LLM stack that unifies Ollama and LangChain. We install the required libraries, launch the Ollama server, pull a model, and wrap it in a custom LangChain LLM, which lets us control temperature, token limits, and context. We add a retrieval-augmented generation layer that ingests PDFs or text files, chunks them, embeds them with sentence-transformers, and serves grounded answers. We manage multi-session chat memory, register tools (web search + RAG query), and run an agent that reasons about when to call them.

import os
import sys
import subprocess
import time
import threading
import queue
import json
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from contextlib import contextmanager
import asyncio
from concurrent.futures import ThreadPoolExecutor


def install_packages():
    """Install required packages for Colab environment"""
    packages = [
        "langchain",
        "langchain-community",
        "langchain-core",
        "chromadb",
        "sentence-transformers",
        "faiss-cpu",
        "pypdf",
        "python-docx",
        "requests",
        "psutil",
        "pyngrok",
        "gradio",
        "duckduckgo-search"  # required by DuckDuckGoSearchRun used below
    ]

    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])


install_packages()


import requests
import psutil
import threading
from queue import Queue
from langchain.llms.base import LLM
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryBufferMemory
from langchain.chains import ConversationChain, RetrievalQA
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma
from langchain.agents import AgentType, initialize_agent, Tool
from langchain.tools import DuckDuckGoSearchRun

We import the Python utilities we need in Colab for concurrency, system calls, and JSON handling. We define and run install_packages() to pull in LangChain, embeddings, vector stores, document loaders, monitoring, and UI dependencies. We then import LangChain's LLM, memory, retrieval, and agent tooling (including DuckDuckGo search) to build an extensible RAG workflow.


@dataclass
class OllamaConfig:
    """Configuration for Ollama setup"""
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    max_tokens: int = 2048
    temperature: float = 0.7
    gpu_layers: int = -1  
    context_window: int = 4096
    batch_size: int = 512
    threads: int = 4

We define an OllamaConfig dataclass so we keep all Ollama runtime settings in one clean place. We set the model name and the local API endpoint, as well as the generation behavior (max_tokens, temperature, and context_window). We control performance with gpu_layers (-1 = load everything onto the GPU when possible), batch_size, and threads for parallelism.

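As a quick illustration, here is a minimal sketch of how we might override the defaults for a lighter setup; the model tag and numbers below are illustrative choices, not requirements of the tutorial.

# Sketch: overriding OllamaConfig defaults for a constrained Colab VM.
# The model tag "llama2:7b" and the values below are illustrative, not prescriptive.
custom_config = OllamaConfig(
    model_name="llama2:7b",   # any model tag available in the Ollama registry
    temperature=0.2,          # more deterministic generations
    max_tokens=1024,          # cap response length
    context_window=2048,      # smaller context to save memory
    threads=2                 # fewer parallel requests on a small VM
)
print(custom_config)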

class OllamaManager:
    """Advanced Ollama manager for Colab environment"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.process = None
        self.is_running = False
        self.models_cache = {}
        self.performance_monitor = PerformanceMonitor()
       
    def install_ollama(self):
        """Install Ollama in Colab environment"""
        try:
            subprocess.run([
                "curl", "-fsSL", "https://ollama.com/install.sh", "-o", "/tmp/install.sh"
            ], check=True)

            subprocess.run(["bash", "/tmp/install.sh"], check=True)
            print("✅ Ollama installed successfully")
           
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to install Ollama: {e}")
            raise
   
    def start_server(self):
        """Start Ollama server with GPU support"""
        if self.is_running:
            print("Ollama server is already running")
            return
           
        try:
            env = os.environ.copy()
            env("OLLAMA_NUM_PARALLEL") = str(self.config.threads)
            env("OLLAMA_MAX_LOADED_MODELS") = "3"
           
            self.process = subprocess.Popen(
                ("ollama", "serve"),
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
           
            time.sleep(5)
           
            if self.health_check():
                self.is_running = True
                print("✅ Ollama server started successfully")
                self.performance_monitor.start()
            else:
                raise Exception("Server failed to start properly")
               
        except Exception as e:
            print(f"❌ Failed to start Ollama server: {e}")
            raise
   
    def health_check(self) -> bool:
        """Check if Ollama server is healthy"""
        try:
            response = requests.get(f"{self.config.base_url}/api/tags", timeout=10)
            return response.status_code == 200
        except:
            return False
   
    def pull_model(self, model_name: str) -> bool:
        """Pull a model from Ollama registry"""
        try:
            print(f"🔄 Pulling model: {model_name}")
            result = subprocess.run(
                ("ollama", "pull", model_name),
                capture_output=True,
                text=True,
                timeout=1800  
            )
           
            if result.returncode == 0:
                print(f"✅ Model {model_name} pulled successfully")
                self.models_cache[model_name] = True
                return True
            else:
                print(f"❌ Failed to pull model {model_name}: {result.stderr}")
                return False
               
        except subprocess.TimeoutExpired:
            print(f"❌ Timeout pulling model {model_name}")
            return False
        except Exception as e:
            print(f"❌ Error pulling model {model_name}: {e}")
            return False
   
    def list_models(self) -> List[str]:
        """List available local models"""
        try:
            result = subprocess.run(
                ["ollama", "list"],
                capture_output=True,
                text=True
            )

            models = []
            for line in result.stdout.split('\n')[1:]:
                if line.strip():
                    model_name = line.split()[0]
                    models.append(model_name)

            return models

        except Exception as e:
            print(f"❌ Error listing models: {e}")
            return []
   
    def stop_server(self):
        """Stop Ollama server"""
        if self.process:
            self.process.terminate()
            self.process.wait()
            self.is_running = False
            self.performance_monitor.stop()
            print("✅ Ollama server stopped")

We create the OllamaManager class to install, start, monitor, and manage the Ollama server in the Colab environment. We set environment variables for GPU parallelism, run the server in the background, and verify it is up with a health check. We pull models on demand, cache them, list those available locally, and gracefully stop the server when the job is done, all while tracking performance.
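To see the manager in isolation, here is a hedged sketch of a typical lifecycle, assuming a fresh Colab runtime where Ollama is not yet installed.

# Sketch: OllamaManager lifecycle (assumes a fresh Colab runtime).
config = OllamaConfig(model_name="llama2")
manager = OllamaManager(config)

manager.install_ollama()          # runs the official install script
manager.start_server()            # launches `ollama serve` and checks health

if manager.pull_model("llama2"):  # download the model weights if needed
    print("Local models:", manager.list_models())

manager.stop_server()             # terminate the server and stop monitoring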


class PerformanceMonitor:
    """Monitor system performance and resource usage"""
   
    def __init__(self):
        self.monitoring = False
        self.stats = {
            "cpu_usage": [],
            "memory_usage": [],
            "gpu_usage": [],
            "inference_times": []
        }
        self.monitor_thread = None
   
    def start(self):
        """Start performance monitoring"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
   
    def stop(self):
        """Stop performance monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
   
    def _monitor_loop(self):
        """Main monitoring loop"""
        while self.monitoring:
            try:
                cpu_percent = psutil.cpu_percent(interval=1)
                memory = psutil.virtual_memory()
               
                self.stats("cpu_usage").append(cpu_percent)
                self.stats("memory_usage").append(memory.percent)
               
                for key in ("cpu_usage", "memory_usage"):
                    if len(self.stats(key)) > 100:
                        self.stats(key) = self.stats(key)(-100:)
               
                time.sleep(5)
               
            except Exception as e:
                print(f"Monitoring error: {e}")
   
    def get_stats(self) -> Dict[str, Any]:
        """Get current performance statistics"""
        return {
            "avg_cpu": sum(self.stats["cpu_usage"][-10:]) / max(len(self.stats["cpu_usage"][-10:]), 1),
            "avg_memory": sum(self.stats["memory_usage"][-10:]) / max(len(self.stats["memory_usage"][-10:]), 1),
            "total_inferences": len(self.stats["inference_times"]),
            "avg_inference_time": sum(self.stats["inference_times"]) / max(len(self.stats["inference_times"]), 1)
        }

We define a PerformanceMonitor class to track CPU, memory, and inference times in real time while the Ollama server is running. We launch a background thread that collects statistics every few seconds, keeps only the most recent measurements, and reports rolling averages. This helps us watch system load and tune performance during model inference.
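As a standalone check, a sketch like the following exercises the monitor on its own; the sleep interval is arbitrary and only gives the background thread time to collect a few samples.

# Sketch: running PerformanceMonitor by itself for a few seconds.
monitor = PerformanceMonitor()
monitor.start()

time.sleep(12)                    # let the background thread gather samples

print(json.dumps(monitor.get_stats(), indent=2))
monitor.stop()                    # join the monitoring thread cleanly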


class OllamaLLM(LLM):
    """Custom LangChain LLM for Ollama"""
   
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    temperature: float = 0.7
    max_tokens: int = 2048
    performance_monitor: Optional[PerformanceMonitor] = None
   
    @property
    def _llm_type(self) -> str:
        return "ollama"
   
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Make API call to Ollama"""
        start_time = time.time()
       
        try:
            payload = {
                "model": self.model_name,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": self.temperature,
                    "num_predict": self.max_tokens,
                    "stop": stop or ()
                }
            }
           
            response = requests.post(
                f"{self.base_url}/api/generate",
                json=payload,
                timeout=120
            )
           
            response.raise_for_status()
            result = response.json()
           
            inference_time = time.time() - start_time
           
            if self.performance_monitor:
                self.performance_monitor.stats["inference_times"].append(inference_time)
           
            return result.get("response", "")
           
        except Exception as e:
            print(f"❌ Ollama API error: {e}")
            return f"Error: {str(e)}"

We wrap the Ollama API in a custom OllamaLLM class compatible with LangChain's LLM interface. We define how prompts are sent to the Ollama server and record each inference time for performance monitoring. This lets us plug Ollama directly into LangChain chains, agents, and memory components while keeping an eye on efficiency.
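Assuming the Ollama server is already running and the model is pulled, a quick smoke test of the wrapper might look like the sketch below; depending on the LangChain version, a plain call or invoke() routes through _call above.

# Sketch: calling the custom OllamaLLM directly (server must already be running).
llm = OllamaLLM(
    model_name="llama2",
    base_url="http://localhost:11434",
    temperature=0.3,
    max_tokens=256
)

# Classic LangChain releases allow calling the LLM like a function;
# newer ones prefer llm.invoke(...). Either way, the request goes through _call.
print(llm("Explain retrieval-augmented generation in one sentence."))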

class RAGSystem:
    """Retrieval-Augmented Generation system"""
   
    def __init__(self, llm: OllamaLLM, embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.llm = llm
        self.embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        self.vector_store = None
        self.qa_chain = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )
   
    def add_documents(self, file_paths: List[str]):
        """Add documents to the vector store"""
        documents = []
       
        for file_path in file_paths:
            try:
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                else:
                    loader = TextLoader(file_path)
               
                docs = loader.load()
                documents.extend(docs)
               
            except Exception as e:
                print(f"❌ Error loading {file_path}: {e}")
       
        if documents:
            splits = self.text_splitter.split_documents(documents)
           
            if self.vector_store is None:
                self.vector_store = FAISS.from_documents(splits, self.embeddings)
            else:
                self.vector_store.add_documents(splits)
           
            self.qa_chain = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=self.vector_store.as_retriever(search_kwargs={"k": 3}),
                return_source_documents=True
            )
           
            print(f"✅ Added {len(splits)} document chunks to vector store")
   
    def query(self, question: str) -> Dict[str, Any]:
        """Query the RAG system"""
        if not self.qa_chain:
            return {"answer": "No documents loaded. Please add documents first."}
       
        try:
            result = self.qa_chain({"query": question})
            return {
                "answer": result("result"),
                "sources": (doc.metadata for doc in result.get("source_documents", ()))
            }
        except Exception as e:
            return {"answer": f"Error: {str(e)}"}

We build a RAGSystem class that loads PDF and text documents, splits them into overlapping chunks, embeds them with a sentence-transformers model, and indexes them in a FAISS vector store. A RetrievalQA chain then retrieves the top matching chunks for each question and lets the LLM answer with the source documents attached, so responses stay grounded in our own data.
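A minimal usage sketch, assuming we already have the llm wrapper from above and a local file to index; "notes.pdf" is a placeholder path, not part of the tutorial.

# Sketch: indexing a local document and asking a grounded question.
# "notes.pdf" is a placeholder path; swap in any PDF or text file in Colab.
rag = RAGSystem(llm)
rag.add_documents(["notes.pdf"])

result = rag.query("What are the key takeaways from the document?")
print(result["answer"])
for source in result.get("sources", []):
    print("source:", source)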

class ConversationManager:
    """Manage conversation history and memory"""
   
    def __init__(self, llm: OllamaLLM, memory_type: str = "buffer"):
        self.llm = llm
        self.conversations = {}
        self.memory_type = memory_type
       
    def get_conversation(self, session_id: str) -> ConversationChain:
        """Get or create conversation for session"""
        if session_id not in self.conversations:
            if self.memory_type == "buffer":
                memory = ConversationBufferWindowMemory(k=10)
            elif self.memory_type == "summary":
                memory = ConversationSummaryBufferMemory(
                    llm=self.llm,
                    max_token_limit=1000
                )
            else:
                memory = ConversationBufferWindowMemory(k=10)
           
            self.conversations[session_id] = ConversationChain(
                llm=self.llm,
                memory=memory,
                verbose=True
            )
       
        return self.conversations[session_id]
   
    def chat(self, session_id: str, message: str) -> str:
        """Chat with specific session"""
        conversation = self.get_conversation(session_id)
        return conversation.predict(input=message)
   
    def clear_session(self, session_id: str):
        """Clear conversation history for session"""
        if session_id in self.conversations:
            del self.conversations[session_id]


class OllamaLangChainSystem:
    """Main system integrating all components"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.manager = OllamaManager(config)
        self.llm = None
        self.rag_system = None
        self.conversation_manager = None
        self.tools = []
        self.agent = None
       
    def setup(self):
        """Complete system setup"""
        print("🚀 Setting up Ollama + LangChain system...")
       
        self.manager.install_ollama()
        self.manager.start_server()
       
        if not self.manager.pull_model(self.config.model_name):
            print("❌ Failed to pull default model")
            return False
       
        self.llm = OllamaLLM(
            model_name=self.config.model_name,
            base_url=self.config.base_url,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens,
            performance_monitor=self.manager.performance_monitor
        )
       
        self.rag_system = RAGSystem(self.llm)
       
        self.conversation_manager = ConversationManager(self.llm)
       
        self._setup_tools()
       
        print("✅ System setup complete!")
        return True
   
    def _setup_tools(self):
        """Setup tools for the agent"""
        search = DuckDuckGoSearchRun()
       
        self.tools = [
            Tool(
                name="Search",
                func=search.run,
                description="Search the internet for current information"
            ),
            Tool(
                name="RAG_Query",
                func=lambda q: self.rag_system.query(q)["answer"],
                description="Query loaded documents using RAG"
            )
        ]
       
        self.agent = initialize_agent(
            tools=self.tools,
            llm=self.llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True
        )
   
    def chat(self, message: str, session_id: str = "default") -> str:
        """Simple chat interface"""
        return self.conversation_manager.chat(session_id, message)
   
    def rag_chat(self, question: str) -> Dict[str, Any]:
        """RAG-based chat"""
        return self.rag_system.query(question)
   
    def agent_chat(self, message: str) -> str:
        """Agent-based chat with tools"""
        return self.agent.run(message)
   
    def switch_model(self, model_name: str) -> bool:
        """Switch to different model"""
        if self.manager.pull_model(model_name):
            self.llm.model_name = model_name
            print(f"✅ Switched to model: {model_name}")
            return True
        return False
   
    def load_documents(self, file_paths: List[str]):
        """Load documents into RAG system"""
        self.rag_system.add_documents(file_paths)
   
    def get_performance_stats(self) -> Dict[str, Any]:
        """Get system performance statistics"""
        return self.manager.performance_monitor.get_stats()
   
    def cleanup(self):
        """Clean up resources"""
        self.manager.stop_server()
        print("✅ System cleanup complete")

We use ConversationManager to maintain separate chat sessions, each with its own memory type, buffer-based or summary-based, letting us either preserve or condense context as needed. In OllamaLangChainSystem, we tie everything together: we install and launch Ollama, pull the desired model, wrap it in a LangChain-compatible LLM, attach the RAG system, initialize chat memory, and register external tools such as web search.
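To illustrate the multi-session behavior in isolation, a sketch like the following keeps two independent histories, one per session ID; the IDs are arbitrary labels.

# Sketch: two chat sessions with separate memories (session IDs are arbitrary).
conversations = ConversationManager(llm, memory_type="buffer")

print(conversations.chat("alice", "My favorite language is Python."))
print(conversations.chat("bob", "My favorite language is Rust."))

# Each session only remembers its own history.
print(conversations.chat("alice", "What is my favorite language?"))

conversations.clear_session("bob")   # drop one history without touching the other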

def main():
    """Main function demonstrating the system"""
   
    config = OllamaConfig(
        model_name="llama2",
        temperature=0.7,
        max_tokens=2048
    )
   
    system = OllamaLangChainSystem(config)
   
    try:
        if not system.setup():
            return
       
        print("\n🗣️ Testing basic chat:")
        response = system.chat("Hello! How are you?")
        print(f"Response: {response}")
       
        print("\n🔄 Testing model switching:")
        models = system.manager.list_models()
        print(f"Available models: {models}")
       
       
        print("\n🤖 Testing agent:")
        agent_response = system.agent_chat("What's the current weather like?")
        print(f"Agent Response: {agent_response}")
       
        print("\n📊 Performance Statistics:")
        stats = system.get_performance_stats()
        print(json.dumps(stats, indent=2))
       
    except KeyboardInterrupt:
        print("\n⏹️ Interrupted by user")
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        system.cleanup()


def create_gradio_interface(system: OllamaLangChainSystem):
    """Create a Gradio interface for easy interaction"""
    try:
        import gradio as gr
       
        def chat_interface(message, history, mode):
            if mode == "Basic Chat":
                response = system.chat(message)
            elif mode == "RAG Chat":
                result = system.rag_chat(message)
                response = result["answer"]
            elif mode == "Agent Chat":
                response = system.agent_chat(message)
            else:
                response = "Unknown mode"
           
            history.append((message, response))
            return "", history
       
        def upload_docs(files):
            if files:
                file_paths = [f.name for f in files]
                system.load_documents(file_paths)
                return f"Loaded {len(file_paths)} documents into RAG system"
            return "No files uploaded"
       
        def get_stats():
            stats = system.get_performance_stats()
            return json.dumps(stats, indent=2)
       
        with gr.Blocks(title="Ollama + LangChain System") as demo:
            gr.Markdown("# 🦙 Ollama + LangChain Advanced System")
           
            with gr.Tab("Chat"):
                chatbot = gr.Chatbot()
                mode = gr.Dropdown(
                    ["Basic Chat", "RAG Chat", "Agent Chat"],
                    value="Basic Chat",
                    label="Chat Mode"
                )
                msg = gr.Textbox(label="Message")
                clear = gr.Button("Clear")
               
                msg.submit(chat_interface, [msg, chatbot, mode], [msg, chatbot])
                clear.click(lambda: ([], ""), outputs=[chatbot, msg])
           
            with gr.Tab("Document Upload"):
                file_upload = gr.File(file_count="multiple", label="Upload Documents")
                upload_btn = gr.Button("Upload to RAG System")
                upload_status = gr.Textbox(label="Status")
               
                upload_btn.click(upload_docs, file_upload, upload_status)
           
            with gr.Tab("Performance"):
                stats_btn = gr.Button("Get Performance Stats")
                stats_output = gr.Textbox(label="Performance Statistics")
               
                stats_btn.click(get_stats, outputs=stats_output)
       
        return demo
       
    except ImportError:
        print("Gradio not installed. Skipping interface creation.")
        return None


if __name__ == "__main__":
    print("🚀 Ollama + LangChain System for Google Colab")
    print("=" * 50)
   
    main()
   
    # Or create a system instance for interactive use
    # config = OllamaConfig(model_name="llama2")
    # system = OllamaLangChainSystem(config)
    # system.setup()
   
    # # Create Gradio interface
    # demo = create_gradio_interface(system)
    # if demo:
    #     demo.launch(share=True)  # share=True for public link

We wrap everything in a main() function that runs a complete demo, setting up the system and testing chat, model listing, agent tools, and performance statistics. Then, in create_gradio_interface(), we build a user-friendly Gradio app with tabs for chatting, uploading documents into the RAG system, and monitoring performance. Finally, we call main() in the __main__ block for direct execution in Colab, or optionally launch the Gradio UI for interactive exploration and public sharing.

In conclusion, we have a flexible playground: we switch Ollama models, converse with buffer or summary memory, query our own documents, fall back to web search when context is missing, and monitor basic resource statistics to stay within Colab limits. The code is modular, so we can extend the tool list (see the sketch below), adjust inference options (temperature, max tokens, concurrency) in OllamaConfig, or adapt the RAG pipeline to larger corpora or different embedding models. We launch the Gradio app with share=True to collaborate or embed these components in our own projects. We now have an extensible template for fast local LLM experimentation.
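For example, extending the tool list might look like the hedged sketch below: assuming an initialized system instance (as in the commented-out interactive snippet above), we add a trivial word-count tool (a hypothetical helper, not part of the tutorial) and rebuild the agent so it can use it.

# Sketch: registering an extra tool after setup (word_count is a hypothetical helper).
def word_count(text: str) -> str:
    return f"The input contains {len(text.split())} words."

system.tools.append(
    Tool(
        name="WordCount",
        func=word_count,
        description="Count the number of words in a piece of text"
    )
)

# Rebuild the agent so the new tool sits alongside Search and RAG_Query.
system.agent = initialize_agent(
    tools=system.tools,
    llm=system.llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)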





Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent venture is the launch of an artificial intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among readers.
