In this tutorial, we show how to build an automated knowledge graph (KG) pipeline using LangGraph and NetworkX. The pipeline simulates a sequence of intelligent agents that collaboratively perform tasks such as data gathering, entity extraction, relation identification, entity resolution, and graph validation. Starting from a user-provided topic, such as "Artificial Intelligence", the system methodically extracts relevant entities and relationships, resolves duplicates, and integrates the information into a coherent graph structure. By visualizing the final knowledge graph, developers and data scientists gain clear insight into complex interrelations among concepts, which makes this approach useful for applications in semantic analysis, natural language processing, and knowledge management.
!pip install langgraph langchain_core
We install two essential Python libraries: LangGraph, which is used to create and orchestrate agent-based computational workflows, and LangChain Core, which provides the foundational classes and utilities for building applications powered by language models. These libraries enable seamless integration of agents into intelligent data pipelines. The code below also imports NetworkX and Matplotlib; if your environment does not already provide them, install them as well.
import re
import networkx as nx
import matplotlib.pyplot as plt
from typing import TypedDict, List, Tuple, Dict, Any
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, END
We import the libraries needed to build the automated knowledge graph pipeline: re for regular-expression-based text processing, NetworkX and Matplotlib for creating and visualizing graphs, TypedDict and other typing annotations for structured data handling, and LangGraph together with LangChain Core for orchestrating the interaction between AI agents in the workflow.
class KGState(TypedDict):
    topic: str
    raw_text: str
    entities: List[str]
    relations: List[Tuple[str, str, str]]
    resolved_relations: List[Tuple[str, str, str]]
    graph: Any
    validation: Dict[str, Any]
    messages: List[Any]
    current_agent: str
We define a structured data type, KGState, using Python's TypedDict. It describes the schema for managing state across the different stages of the knowledge graph pipeline. It includes the chosen topic, the gathered text, the identified entities and relations, the resolved relations, the constructed graph object, the validation results, the interaction messages, and the currently active agent.
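As a minimal sketch of how such a schema behaves at runtime, the snippet below uses a trimmed-down state type and a helper that builds an empty state. Both `MiniKGState` and `make_empty_state` are hypothetical names introduced only for illustration; they are not part of the tutorial's code. The point it shows is that a TypedDict instance is an ordinary dict: the annotations help readers and type checkers but are not enforced when the code runs.

```python
from typing import Any, Dict, List, Tuple, TypedDict


class MiniKGState(TypedDict):
    # Trimmed-down, illustrative version of the tutorial's KGState.
    topic: str
    entities: List[str]
    relations: List[Tuple[str, str, str]]
    validation: Dict[str, Any]


def make_empty_state(topic: str) -> MiniKGState:
    # Hypothetical helper: build a fresh state with empty containers.
    return {"topic": topic, "entities": [], "relations": [], "validation": {}}


state = make_empty_state("Artificial Intelligence")
state["entities"].append("EntityA")  # plain dict operations work as usual
print(type(state).__name__, state["entities"])
```

Because nothing is validated at runtime, each agent is responsible for writing values of the expected shape into the state.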
def data_gatherer(state: KGState) -> KGState:
    topic = state["topic"]
    print(f"📚 Data Gatherer: Searching for information about '{topic}'")
    # Simulated text; a real pipeline would query a data source here.
    collected_text = f"{topic} is an important concept. It relates to various entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. EntityC is a type of EntityB."
    state["messages"].append(AIMessage(content=f"Collected raw text about {topic}"))
    state["raw_text"] = collected_text
    state["current_agent"] = "entity_extractor"
    return state
The data_gatherer function acts as the first stage of the pipeline. It simulates collecting raw text about the provided topic (stored in state["topic"]). It then stores this simulated data in state["raw_text"], appends a message indicating that data collection is complete, and updates the pipeline state by setting the next agent (entity_extractor) as active.
def entity_extractor(state: KGState) -> KGState:
    print("🔍 Entity Extractor: Identifying entities in the text")
    text = state["raw_text"]
    # Match the literal "Entity" followed by a single capital letter.
    entities = re.findall(r"Entity[A-Z]", text)
    entities = [state["topic"]] + entities
    state["entities"] = list(set(entities))
    state["messages"].append(AIMessage(content=f"Extracted entities: {state['entities']}"))
    print(f" Found entities: {state['entities']}")
    state["current_agent"] = "relation_extractor"
    return state
The entity_extractor function identifies entities in the collected raw text using a simple regular expression pattern that matches terms such as "EntityA" and "EntityB". It also includes the main topic as an entity and ensures uniqueness by converting the list to a set. The extracted entities are stored in the state, an AI message logs the result, and the pipeline advances to the relation_extractor agent.
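One side effect of deduplicating through a set is that the order of the entities is not guaranteed. If a stable order matters (for example, to get reproducible log output), an order-preserving alternative is possible. The sketch below is an assumption about what one might prefer, not the tutorial's method; it uses `dict.fromkeys`, which keeps the first occurrence of each item in insertion order.

```python
import re

text = (
    "Artificial Intelligence is an important concept. It relates to various "
    "entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. "
    "EntityC is a type of EntityB."
)

# Same pattern as the tutorial: the literal "Entity" followed by one capital letter.
matches = re.findall(r"Entity[A-Z]", text)

# dict.fromkeys drops repeats while preserving first-seen order.
unique_entities = list(dict.fromkeys(["Artificial Intelligence"] + matches))
print(unique_entities)
# ['Artificial Intelligence', 'EntityA', 'EntityB', 'EntityC']
```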
def relation_extractor(state: KGState) -> KGState:
    print("🔗 Relation Extractor: Identifying relationships between entities")
    text = state["raw_text"]
    entities = state["entities"]
    relations = []
    relation_patterns = [
        (r"([A-Za-z]+) relates to ([A-Za-z]+)", "relates_to"),
        (r"([A-Za-z]+) influences ([A-Za-z]+)", "influences"),
        (r"([A-Za-z]+) is a type of ([A-Za-z]+)", "is_type_of")
    ]
    # Check every ordered pair of distinct entities against each relation type.
    for e1 in entities:
        for e2 in entities:
            if e1 != e2:
                for pattern, rel_type in relation_patterns:
                    # Note: the second search only requires e1 to appear before e2
                    # somewhere in the text, so it is a very loose match.
                    if (re.search(f"{e1}.*{rel_type}.*{e2}", text.replace("_", " "), re.IGNORECASE)
                            or re.search(f"{e1}.*{e2}", text, re.IGNORECASE)):
                        relations.append((e1, rel_type, e2))
    state["relations"] = relations
    state["messages"].append(AIMessage(content=f"Extracted relations: {relations}"))
    print(f" Found relations: {relations}")
    state["current_agent"] = "entity_resolver"
    return state
The relation_extractor function detects semantic relationships between entities in the raw text. It pairs each relation type ("relates_to", "influences", "is_type_of") with a regex pattern for phrases such as "influences" or "is a type of", then iterates over every ordered pair of distinct entities and searches the text for them. When a search succeeds, it appends the corresponding triple (subject, predicate, object) to the list of relations. The extracted relations are stored in the state, a message is logged for agent communication, and control passes to the next agent, entity_resolver.
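The pair-based search in the function above can also match entity pairs that merely co-occur in the text. As a hedged alternative, the sketch below applies the relation patterns directly and keeps only matches whose subject and object are known entities. The function name `extract_relations_strict` is hypothetical, and this is one possible tightening rather than the tutorial's approach.

```python
import re

text = (
    "Artificial Intelligence is an important concept. It relates to various "
    "entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. "
    "EntityC is a type of EntityB."
)
entities = {"Artificial Intelligence", "EntityA", "EntityB", "EntityC"}

relation_patterns = [
    (r"([A-Za-z]+) relates to ([A-Za-z]+)", "relates_to"),
    (r"([A-Za-z]+) influences ([A-Za-z]+)", "influences"),
    (r"([A-Za-z]+) is a type of ([A-Za-z]+)", "is_type_of"),
]


def extract_relations_strict(text, entities):
    # Hypothetical helper: use each pattern's capture groups as subject/object
    # and discard matches that do not involve two known entities.
    relations = []
    for pattern, rel_type in relation_patterns:
        for subject, obj in re.findall(pattern, text):
            if subject in entities and obj in entities:
                relations.append((subject, rel_type, obj))
    return relations


print(extract_relations_strict(text, entities))
# [('EntityA', 'influences', 'EntityB'), ('EntityC', 'is_type_of', 'EntityB')]
```

Here "It relates to various" matches the first pattern but is dropped, because neither "It" nor "various" is a known entity. The trade-off is that single-word capture groups cannot match a multi-word topic such as "Artificial Intelligence".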
def entity_resolver(state: KGState) -> KGState:
    print("🔄 Entity Resolver: Resolving duplicate entities")
    # Map each entity to a canonical form: lowercase, spaces replaced by underscores.
    entity_map = {}
    for entity in state["entities"]:
        canonical_name = entity.lower().replace(" ", "_")
        entity_map[entity] = canonical_name
    resolved_relations = []
    for s, p, o in state["relations"]:
        s_resolved = entity_map.get(s, s)
        o_resolved = entity_map.get(o, o)
        resolved_relations.append((s_resolved, p, o_resolved))
    state["resolved_relations"] = resolved_relations
    state["messages"].append(AIMessage(content=f"Resolved relations: {resolved_relations}"))
    state["current_agent"] = "graph_integrator"
    return state
The entity_resolver function standardizes entity names to avoid duplication and inconsistencies. It creates a mapping (entity_map) by converting each entity to lowercase and replacing spaces with underscores. This mapping is then applied to the subject and object of every extracted relation to produce the resolved relations. These normalized triples are added to the state, a confirmation message is logged, and control passes to the graph_integrator agent.
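To make the effect of this normalization concrete, the sketch below applies the same rule to a few hand-written triples in which one entity appears with two different casings. The `canonicalize` helper and the sample triples are made up for illustration, and the final deduplication step is an optional extra that the tutorial's function does not perform.

```python
def canonicalize(name: str) -> str:
    # Same rule as the tutorial: lowercase and replace spaces with underscores.
    return name.lower().replace(" ", "_")


raw_relations = [
    ("Artificial Intelligence", "relates_to", "EntityA"),
    ("artificial intelligence", "relates_to", "EntityA"),  # same triple, different casing
    ("EntityA", "influences", "EntityB"),
]

resolved = [(canonicalize(s), p, canonicalize(o)) for s, p, o in raw_relations]

# Optional extra step (not in the tutorial): drop triples that became identical.
deduplicated = list(dict.fromkeys(resolved))
print(deduplicated)
# [('artificial_intelligence', 'relates_to', 'entitya'), ('entitya', 'influences', 'entityb')]
```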
def graph_integrator(state: KGState) -> KGState:
    print("📊 Graph Integrator: Building the knowledge graph")
    G = nx.DiGraph()
    for s, p, o in state["resolved_relations"]:
        if not G.has_node(s):
            G.add_node(s)
        if not G.has_node(o):
            G.add_node(o)
        # Store the predicate as an edge attribute.
        G.add_edge(s, o, relation=p)
    state["graph"] = G
    state["messages"].append(AIMessage(content=f"Built graph with {len(G.nodes)} nodes and {len(G.edges)} edges"))
    state["current_agent"] = "graph_validator"
    return state
The graph_integrator function builds the actual knowledge graph using networkx.DiGraph(), which supports directed relationships. It iterates over the resolved triples (subject, predicate, object), ensures both nodes exist, and then adds a directed edge with the relation stored as metadata. The resulting graph is saved in the state, a summary message is appended, and the pipeline moves on to the graph_validator agent for final validation.
def graph_validator(state: KGState) -> KGState:
    print("✅ Graph Validator: Validating knowledge graph")
    G = state["graph"]
    # Guard against an empty graph before running the connectivity and cycle checks.
    validation_report = {
        "num_nodes": len(G.nodes),
        "num_edges": len(G.edges),
        "is_connected": nx.is_weakly_connected(G) if G.nodes else False,
        "has_cycles": not nx.is_directed_acyclic_graph(G) if G.nodes else False
    }
    state["validation"] = validation_report
    state["messages"].append(AIMessage(content=f"Validation report: {validation_report}"))
    print(f" Validation report: {validation_report}")
    state["current_agent"] = END
    return state
The graph_validator function performs a basic health check on the constructed knowledge graph. It compiles a validation report containing the number of nodes and edges, whether the graph is weakly connected (that is, every node is reachable when edge direction is ignored), and whether the graph contains cycles. This report is added to the state and logged as an AI message. Once validation is complete, the pipeline is marked as finished by setting current_agent to END.
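To clarify what the two structural checks mean, here is a plain-Python sketch that computes them on a list of nodes and directed edges without NetworkX. The function names and sample data are illustrative assumptions, and the sketch makes no claim about how NetworkX implements these checks internally. It assumes every edge endpoint is listed in `nodes`.

```python
from collections import defaultdict


def is_weakly_connected(nodes, edges):
    # Treat every directed edge as undirected, then check reachability from one node.
    neighbors = defaultdict(set)
    for s, o in edges:
        neighbors[s].add(o)
        neighbors[o].add(s)
    nodes = set(nodes)
    if not nodes:
        return False
    start = next(iter(nodes))
    seen, stack = {start}, [start]
    while stack:
        for nxt in neighbors[stack.pop()] - seen:
            seen.add(nxt)
            stack.append(nxt)
    return seen == nodes


def has_cycle(nodes, edges):
    # Kahn's algorithm: repeatedly remove nodes that have no incoming edges.
    # Nodes that can never be removed lie on a directed cycle or downstream of one.
    successors = defaultdict(list)
    indegree = {n: 0 for n in nodes}
    for s, o in edges:
        successors[s].append(o)
        indegree[o] += 1
    ready = [n for n, d in indegree.items() if d == 0]
    removed = 0
    while ready:
        node = ready.pop()
        removed += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return removed != len(indegree)


nodes = ["entitya", "entityb", "entityc"]
acyclic_edges = [("entitya", "entityb"), ("entityc", "entityb")]
cyclic_edges = acyclic_edges + [("entityb", "entitya")]

print(is_weakly_connected(nodes, acyclic_edges))  # True
print(has_cycle(nodes, acyclic_edges))            # False
print(has_cycle(nodes, cyclic_edges))             # True
```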
def router(state: KGState) -> str:
    # The next node is whatever the last agent wrote into current_agent.
    return state["current_agent"]

def visualize_graph(graph):
    plt.figure(figsize=(10, 6))
    pos = nx.spring_layout(graph)
    nx.draw(graph, pos, with_labels=True, node_color="skyblue", node_size=1500, font_size=10)
    edge_labels = nx.get_edge_attributes(graph, 'relation')
    nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)
    plt.title("Knowledge Graph")
    plt.tight_layout()
    plt.show()
The router function directs the pipeline to the next agent based on the current_agent field in the state. The visualize_graph function, in turn, uses Matplotlib and NetworkX to display the final knowledge graph, showing the nodes, edges, and labeled relationships for an intuitive visual understanding.
def build_kg_graph():
    workflow = StateGraph(KGState)
    workflow.add_node("data_gatherer", data_gatherer)
    workflow.add_node("entity_extractor", entity_extractor)
    workflow.add_node("relation_extractor", relation_extractor)
    workflow.add_node("entity_resolver", entity_resolver)
    workflow.add_node("graph_integrator", graph_integrator)
    workflow.add_node("graph_validator", graph_validator)
    workflow.add_conditional_edges("data_gatherer", router,
                                   {"entity_extractor": "entity_extractor"})
    workflow.add_conditional_edges("entity_extractor", router,
                                   {"relation_extractor": "relation_extractor"})
    workflow.add_conditional_edges("relation_extractor", router,
                                   {"entity_resolver": "entity_resolver"})
    workflow.add_conditional_edges("entity_resolver", router,
                                   {"graph_integrator": "graph_integrator"})
    workflow.add_conditional_edges("graph_integrator", router,
                                   {"graph_validator": "graph_validator"})
    workflow.add_conditional_edges("graph_validator", router,
                                   {END: END})
    workflow.set_entry_point("data_gatherer")
    return workflow.compile()
The build_kg_graph function defines the complete knowledge graph workflow using LangGraph. It adds each agent as a node in sequence, from data gathering to graph validation, and connects them through conditional transitions based on the current agent. The entry point is set to data_gatherer, and the graph is compiled into an executable workflow that guides the automated pipeline from start to finish.
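The routing idea behind these conditional transitions can be sketched in plain Python: each agent writes the name of its successor into the state, and a loop keeps dispatching until a sentinel is reached. Everything in the snippet (the two toy agents, the `AGENTS` table, the `run` loop, and the local `END` string) is a simplified stand-in for illustration. LangGraph itself does considerably more than this loop, so treat it as an analogy and not as a description of the library's internals.

```python
END = "__end__"  # stand-in sentinel for this sketch; LangGraph exports its own END constant


def gather(state):
    # Toy agent: record that it ran, then name its successor.
    state["log"].append("gather")
    state["current_agent"] = "extract"
    return state


def extract(state):
    # Toy agent: record that it ran, then signal completion.
    state["log"].append("extract")
    state["current_agent"] = END
    return state


AGENTS = {"gather": gather, "extract": extract}


def router(state):
    # Same idea as the tutorial's router: read the next agent from the state.
    return state["current_agent"]


def run(state):
    # Ask the router where to go, call that agent, and repeat until END.
    while (next_agent := router(state)) != END:
        state = AGENTS[next_agent](state)
    return state


final = run({"current_agent": "gather", "log": []})
print(final["log"])
# ['gather', 'extract']
```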
def run_knowledge_graph_pipeline(topic):
    print(f"🚀 Starting knowledge graph pipeline for: {topic}")
    initial_state = {
        "topic": topic,
        "raw_text": "",
        "entities": [],
        "relations": [],
        "resolved_relations": [],
        "graph": None,
        "validation": {},
        "messages": [HumanMessage(content=f"Build a knowledge graph about {topic}")],
        "current_agent": "data_gatherer"
    }
    kg_app = build_kg_graph()
    final_state = kg_app.invoke(initial_state)
    print(f"✨ Knowledge graph construction complete for: {topic}")
    return final_state
The run_knowledge_graph_pipeline function initializes the pipeline by setting up an empty state dictionary with the provided topic. It builds the workflow using build_kg_graph(), then runs it by invoking the compiled graph with the initial state. As each agent processes the data, the state evolves, and the final result contains the complete, validated, ready-to-use knowledge graph.
if __name__ == "__main__":
    topic = "Artificial Intelligence"
    result = run_knowledge_graph_pipeline(topic)
    visualize_graph(result["graph"])
Finally, this block serves as the script's entry point. When executed directly, it triggers the knowledge graph pipeline for the topic "Artificial Intelligence", runs through all the agent stages, and finally visualizes the resulting graph using the visualize_graph() function. It provides an end-to-end demonstration of automated knowledge graph generation.
In conclusion, we have learned how to integrate several specialized agents into a cohesive knowledge graph pipeline through this structured approach, using LangGraph and NetworkX. The workflow automates entity and relation extraction and visualizes complex relationships, providing a clear and usable representation of the collected information. By adjusting and improving individual agents, for example by using more sophisticated entity recognition methods or integrating real-time data sources, this foundational framework can be scaled and customized for advanced knowledge graph construction tasks in various domains.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of artificial intelligence for social good. His most recent endeavor is the launch of an artificial intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform has more than 2 million monthly views, illustrating its popularity among audiences.
