Build a Modern Async Configuration Management System with Type Safety and Hot Reloading

by Brenden Burgess

In this tutorial, we walk through the design and functionality of AsyncConfig, a modern, async-first configuration management library for Python. We build it from scratch to support powerful features, including type-safe configuration loading based on dataclasses, multiple configuration sources (such as environment variables, files, and dictionaries), and hot reloading using watchdog. With a clean API and strong validation capabilities, AsyncConfig is well suited to both development and production environments. Throughout the tutorial, we demonstrate its capabilities with simple, advanced, and validation-focused use cases, all powered by asyncio to support non-blocking workflows.

import asyncio
import json
import os
import yaml
from pathlib import Path
from typing import Any, Dict, Optional, Type, TypeVar, Union, get_type_hints
from dataclasses import dataclass, field, MISSING
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging


__version__ = "0.1.0"
__author__ = "AsyncConfig Team"


T = TypeVar('T')


logger = logging.getLogger(__name__)

We begin by importing the essential Python modules required for our configuration system. These include asyncio for asynchronous operations, yaml and json for file parsing, dataclasses for structured configuration, and watchdog for hot reloading. We also define some metadata and set up a logger to track events throughout the system.

class ConfigError(Exception):
    """Base exception for configuration errors."""
    pass




class ValidationError(ConfigError):
    """Raised when configuration validation fails."""
    pass




class LoadError(ConfigError):
    """Raised when configuration loading fails."""
    pass




@dataclass
class ConfigSource:
    """Represents a configuration source with priority and reload capabilities."""
    path: Optional[Path] = None
    env_prefix: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    priority: int = 0
    watch: bool = False
   
    def __post_init__(self):
        if self.path:
            self.path = Path(self.path)

We define a hierarchy of custom exceptions to handle different configuration-related errors, with ConfigError as the base class and more specific ones, such as ValidationError and LoadError, for targeted troubleshooting. We also create a ConfigSource dataclass to represent a single configuration source, which can be a file, environment variables, or a dictionary, and which includes support for prioritization and optional hot reloading.
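
To illustrate how the hierarchy and the source dataclass might be used together, here is a minimal sketch. The classes are re-declared so the snippet runs on its own, and `describe` is a hypothetical helper introduced only for this example, not part of the library.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional


class ConfigError(Exception):
    """Base exception for configuration errors."""


class LoadError(ConfigError):
    """Raised when configuration loading fails."""


@dataclass
class ConfigSource:
    path: Optional[Path] = None
    env_prefix: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    priority: int = 0
    watch: bool = False

    def __post_init__(self):
        # Accept plain strings and normalize them to Path objects.
        if self.path:
            self.path = Path(self.path)


def describe(source: ConfigSource) -> str:
    """Hypothetical helper: report which kind of source this is."""
    if source.data is not None:
        return "dict"
    if source.path is not None:
        return "file"
    if source.env_prefix is not None:
        return "env"
    raise LoadError("Source has no data, path, or env_prefix")


file_source = ConfigSource(path="settings.yaml", priority=10, watch=True)
print(describe(file_source), isinstance(file_source.path, Path))

try:
    describe(ConfigSource())
except ConfigError as exc:  # catching the base class also catches LoadError
    print(f"Caught {type(exc).__name__}: {exc}")
```

Catching `ConfigError` at the application boundary handles every library-specific failure in one place, while narrower handlers can still single out `LoadError` where a fallback makes sense.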

class ConfigWatcher(FileSystemEventHandler):
    """File system event handler for configuration hot reloading."""
   
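    def __init__(self, config_manager, paths: list[Path]):
        self.config_manager = config_manager
        self.paths = {str(p.resolve()) for p in paths}
        # watchdog calls handlers from its own thread, so remember the event
        # loop that created this watcher (it must be built inside a coroutine).
        self.loop = asyncio.get_running_loop()
        super().__init__()

    def on_modified(self, event):
        changed = str(Path(os.fsdecode(event.src_path)).resolve())
        if not event.is_directory and changed in self.paths:
            logger.info(f"Configuration file changed: {event.src_path}")
            # create_task() needs a running loop in the current thread, which
            # the watchdog thread lacks; hand the coroutine to the stored loop.
            asyncio.run_coroutine_threadsafe(
                self.config_manager._reload_config(), self.loop
            )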

We create the ConfigWatcher class by extending FileSystemEventHandler to enable hot reloading of configuration files. This class monitors the specified file paths and triggers an asynchronous reload of the configuration via the associated manager whenever a file is modified. This lets our application adapt to configuration changes in real time without a restart.
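
Because watchdog invokes handlers from its observer thread rather than from the event loop's thread, scheduling a coroutine from a handler needs a thread-safe handoff. The following is a minimal sketch of that handoff using `asyncio.run_coroutine_threadsafe`. A plain `threading.Thread` stands in for watchdog here, and `reload_config`, `watcher_thread`, and `reloads` are hypothetical names used only for illustration.

```python
import asyncio
import threading

reloads = []


async def reload_config(path: str) -> None:
    """Stand-in for an async reload routine."""
    reloads.append(path)


def watcher_thread(loop: asyncio.AbstractEventLoop, path: str) -> None:
    """Simulates a file-change callback running outside the event loop thread."""
    future = asyncio.run_coroutine_threadsafe(reload_config(path), loop)
    future.result(timeout=5)  # block this worker thread until the reload finishes


async def main() -> None:
    loop = asyncio.get_running_loop()
    worker = threading.Thread(target=watcher_thread, args=(loop, "config.yaml"))
    worker.start()
    # Wait for the thread without blocking the event loop.
    await asyncio.to_thread(worker.join)


asyncio.run(main())
print(reloads)
```

The key design point is that the loop is captured where it is known to be running and then passed to the thread, instead of being looked up from inside the thread.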

class AsyncConfigManager:
    """
    Modern async configuration manager with type safety and hot reloading.
   
    Features:
    - Async-first design
    - Type-safe configuration classes
    - Environment variable support
    - Hot reloading
    - Multiple source merging
    - Validation with detailed error messages
    """
   
    def __init__(self):
        self.sources: list[ConfigSource] = []
        self.observers: list[Observer] = []
        self.config_cache: Dict[str, Any] = {}
        self.reload_callbacks: list[callable] = []
        self._lock = asyncio.Lock()
   
    def add_source(self, source: ConfigSource) -> "AsyncConfigManager":
        """Add a configuration source."""
        self.sources.append(source)
        self.sources.sort(key=lambda x: x.priority, reverse=True)
        return self
   
    def add_file(self, path: Union[str, Path], priority: int = 0, watch: bool = False) -> "AsyncConfigManager":
        """Add a file-based configuration source."""
        return self.add_source(ConfigSource(path=path, priority=priority, watch=watch))
   
    def add_env(self, prefix: str, priority: int = 100) -> "AsyncConfigManager":
        """Add environment variable source."""
        return self.add_source(ConfigSource(env_prefix=prefix, priority=priority))
   
    def add_dict(self, data: Dict[str, Any], priority: int = 50) -> "AsyncConfigManager":
        """Add dictionary-based configuration source."""
        return self.add_source(ConfigSource(data=data, priority=priority))
   
    async def load_config(self, config_class: Type[T]) -> T:
        """Load and validate configuration into a typed dataclass."""
        async with self._lock:
            config_data = await self._merge_sources()
           
            try:
                return self._validate_and_convert(config_data, config_class)
            except Exception as e:
                raise ValidationError(f"Failed to validate configuration: {e}")
   
    async def _merge_sources(self) -> Dict[str, Any]:
        """Merge configuration from all sources based on priority."""
        merged = {}
       
        for source in reversed(self.sources):  
            try:
                data = await self._load_source(source)
                if data:
                    merged.update(data)
            except Exception as e:
                logger.warning(f"Failed to load source {source}: {e}")
       
        return merged
   
    async def _load_source(self, source: ConfigSource) -> Optional[Dict[str, Any]]:
        """Load data from a single configuration source."""
        if source.data:
            return source.data.copy()
       
        if source.path:
            return await self._load_file(source.path)
       
        if source.env_prefix:
            return self._load_env_vars(source.env_prefix)
       
        return None
   
    async def _load_file(self, path: Path) -> Dict[str, Any]:
        """Load configuration from a file."""
        if not path.exists():
            raise LoadError(f"Configuration file not found: {path}")
       
        try:
            content = await asyncio.to_thread(path.read_text)
           
            if path.suffix.lower() == '.json':
                return json.loads(content)
            elif path.suffix.lower() in ('.yml', '.yaml'):
                return yaml.safe_load(content) or {}
            else:
                raise LoadError(f"Unsupported file format: {path.suffix}")
       
        except Exception as e:
            raise LoadError(f"Failed to load {path}: {e}")
   
    def _load_env_vars(self, prefix: str) -> Dict[str, Any]:
        """Load environment variables with given prefix."""
        env_vars = {}
        prefix = prefix.upper() + '_'
       
        for key, value in os.environ.items():
            if key.startswith(prefix):
                config_key = key[len(prefix):].lower()
                env_vars[config_key] = self._convert_env_value(value)
       
        return env_vars
   
    def _convert_env_value(self, value: str) -> Any:
        """Convert environment variable string to appropriate type."""
        if value.lower() in ('true', 'false'):
            return value.lower() == 'true'
       
        try:
            if '.' in value:
                return float(value)
            return int(value)
        except ValueError:
            pass
       
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            pass
       
        return value
   
    def _validate_and_convert(self, data: Dict[str, Any], config_class: Type[T]) -> T:
        """Validate and convert data to the specified configuration class."""
        if not hasattr(config_class, '__dataclass_fields__'):
            raise ValidationError(f"{config_class.__name__} must be a dataclass")
       
        type_hints = get_type_hints(config_class)
        field_values = {}
       
        for field_name, field_info in config_class.__dataclass_fields__.items():
            if field_name in data:
                field_value = data[field_name]
               
                if hasattr(field_info.type, '__dataclass_fields__'):
                    if isinstance(field_value, dict):
                        field_value = self._validate_and_convert(field_value, field_info.type)
               
                field_values[field_name] = field_value
            elif field_info.default is not MISSING:
                field_values[field_name] = field_info.default
            elif field_info.default_factory is not MISSING:
                field_values[field_name] = field_info.default_factory()
            else:
                raise ValidationError(f"Required field '{field_name}' not found in configuration")
       
        return config_class(**field_values)
   
    async def start_watching(self):
        """Start watching configuration files for changes."""
        watch_paths = []
       
        for source in self.sources:
            if source.watch and source.path:
                watch_paths.append(source.path)
       
        if watch_paths:
            observer = Observer()
            watcher = ConfigWatcher(self, watch_paths)
           
            for path in watch_paths:
                observer.schedule(watcher, str(path.parent), recursive=False)
           
            observer.start()
            self.observers.append(observer)
            logger.info(f"Started watching {len(watch_paths)} configuration files")
   
    async def stop_watching(self):
        """Stop watching configuration files."""
        for observer in self.observers:
            observer.stop()
            observer.join()
        self.observers.clear()
   
    async def _reload_config(self):
        """Reload configuration from all sources."""
        try:
            self.config_cache.clear()
            for callback in self.reload_callbacks:
                await callback()
            logger.info("Configuration reloaded successfully")
        except Exception as e:
            logger.error(f"Failed to reload configuration: {e}")
   
    def on_reload(self, callback: callable):
        """Register a callback to be called when configuration is reloaded."""
        self.reload_callbacks.append(callback)
   
    async def __aenter__(self):
        await self.start_watching()
        return self
   
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop_watching()

We now implement the core of our system in the AsyncConfigManager class. It acts as the central controller for all configuration operations: adding sources (files, environment variables, dictionaries), merging them by priority, loading files asynchronously, and validating the result against typed dataclasses. We adopt an async-first design, which allows non-blocking I/O, and include a locking mechanism to ensure safe concurrent access. In addition, we enable hot reloading by watching the specified configuration files and triggering callbacks whenever a change is detected. This setup provides a flexible, robust, and modern foundation for dynamically managing application configuration.
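
The priority-based merge can be sketched in isolation with plain dictionaries. In the snippet below, `merge_by_priority` mirrors the manager's approach of applying sources from lowest to highest priority with `dict.update`, which is a shallow merge. `deep_merge` is a hypothetical alternative, not something the manager does, shown only to make the difference visible for nested sections.

```python
from typing import Any, Dict, List, Tuple


def merge_by_priority(sources: List[Tuple[int, Dict[str, Any]]]) -> Dict[str, Any]:
    """Apply sources from lowest to highest priority so later ones win."""
    merged: Dict[str, Any] = {}
    for _, data in sorted(sources, key=lambda item: item[0]):
        merged.update(data)  # shallow: nested dicts are replaced, not combined
    return merged


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical alternative that combines nested dicts key by key."""
    result = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


defaults = {"debug": False, "database": {"host": "localhost", "port": 5432}}
overrides = {"debug": True, "database": {"host": "db.internal"}}

shallow = merge_by_priority([(100, overrides), (0, defaults)])
deep = deep_merge(defaults, overrides)

print(shallow["database"])  # only the override's keys survive
print(deep["database"])     # the default port is kept alongside the new host
```

With the shallow merge, a higher-priority source that sets only part of a nested section drops the rest of that section from the merged dictionary; the dataclass defaults then fill in whatever is missing during validation.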

async def load_config(config_class: Type[T],
                      config_file: Optional[Union[str, Path]] = None,
                      env_prefix: Optional[str] = None,
                      watch: bool = False) -> T:
    """
    Convenience function to quickly load configuration.
   
    Args:
        config_class: Dataclass to load configuration into
        config_file: Optional configuration file path
        env_prefix: Optional environment variable prefix
        watch: Whether to watch for file changes
   
    Returns:
        Configured instance of config_class
    """
    manager = AsyncConfigManager()
   
    if config_file:
        manager.add_file(config_file, priority=0, watch=watch)
   
    if env_prefix:
        manager.add_env(env_prefix, priority=100)
   
    return await manager.load_config(config_class)

We add a convenience helper function, load_config, to streamline configuration setup. With a single call, we can load settings from a file, environment variables, or both into a typed dataclass, optionally enabling hot reloading. This utility makes the library beginner-friendly while still supporting advanced use cases under the hood.
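
To make the environment variable path more concrete, here is a standalone sketch of the prefix filtering and value conversion logic, following the same rules as the manager's `_load_env_vars` and `_convert_env_value` methods. It reads from a plain dictionary (`fake_environ`, a made-up stand-in for `os.environ`) so the result does not depend on the machine it runs on.

```python
import json
from typing import Any, Dict, Mapping


def convert_env_value(value: str) -> Any:
    """Turn an environment string into a bool, number, JSON value, or str."""
    if value.lower() in ("true", "false"):
        return value.lower() == "true"
    try:
        return float(value) if "." in value else int(value)
    except ValueError:
        pass
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        return value


def load_env_vars(prefix: str, environ: Mapping[str, str]) -> Dict[str, Any]:
    """Keep keys that start with PREFIX_, strip the prefix, and lowercase them."""
    full_prefix = prefix.upper() + "_"
    return {
        key[len(full_prefix):].lower(): convert_env_value(value)
        for key, value in environ.items()
        if key.startswith(full_prefix)
    }


fake_environ = {
    "MYAPP_DEBUG": "true",
    "MYAPP_MAX_WORKERS": "8",
    "MYAPP_TIMEOUT": "2.5",
    "MYAPP_TAGS": '["a", "b"]',
    "MYAPP_LOG_LEVEL": "DEBUG",
    "OTHER_VALUE": "ignored",
}

settings = load_env_vars("myapp", fake_environ)
print(settings)
```

Note that keys stay flat under this scheme: a variable such as `MYAPP_DATABASE_HOST` becomes `database_host` rather than a nested `database.host` entry.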

@dataclass
class DatabaseConfig:
    """Example database configuration."""
    host: str = "localhost"
    port: int = 5432
    username: str = "admin"
    password: str = ""
    database: str = "myapp"
    ssl_enabled: bool = False
    pool_size: int = 10




@dataclass
class AppConfig:
    """Example application configuration."""
    debug: bool = False
    log_level: str = "INFO"
    secret_key: str = ""
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    redis_url: str = "redis://localhost:6379"
    max_workers: int = 4




async def demo_simple_config():
    """Demo simple configuration loading."""
   
    sample_config = {
        "debug": True,
        "log_level": "DEBUG",
        "secret_key": "dev-secret-key",
        "database": {
            "host": "localhost",
            "port": 5432,
            "username": "testuser",
            "password": "testpass",
            "database": "testdb"
        },
        "max_workers": 8
    }
   
    manager = AsyncConfigManager()
    manager.add_dict(sample_config, priority=0)
   
    config = await manager.load_config(AppConfig)
   
    print("=== Simple Configuration Demo ===")
    print(f"Debug mode: {config.debug}")
    print(f"Log level: {config.log_level}")
    print(f"Database host: {config.database.host}")
    print(f"Database port: {config.database.port}")
    print(f"Max workers: {config.max_workers}")
   
    return config

We define two example configuration classes, DatabaseConfig and AppConfig, which show how nested, typed configurations are structured. To demonstrate real-world usage, we write demo_simple_config(), where we load a basic dictionary into our configuration manager. This illustrates how easily we can map structured data onto typed Python objects, which keeps configuration handling clean, readable, and maintainable.
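
The mapping from a nested dictionary to nested dataclasses can be sketched on its own. The example below is a simplified illustration rather than the manager's exact logic: `CacheConfig`, `ServiceConfig`, and `from_dict` are hypothetical names, and missing keys are simply left to the dataclass defaults.

```python
from dataclasses import dataclass, field, fields, is_dataclass
from typing import Any, Dict, Type, TypeVar

T = TypeVar("T")


@dataclass
class CacheConfig:
    url: str = "redis://localhost:6379"
    ttl_seconds: int = 60


@dataclass
class ServiceConfig:
    name: str
    cache: CacheConfig = field(default_factory=CacheConfig)


def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
    """Build a dataclass instance, recursing into nested dataclass fields."""
    kwargs: Dict[str, Any] = {}
    for f in fields(cls):
        if f.name not in data:
            continue  # let the dataclass apply its own default, if any
        value = data[f.name]
        # A dict supplied for a dataclass-typed field becomes a nested instance.
        if is_dataclass(f.type) and isinstance(value, dict):
            value = from_dict(f.type, value)
        kwargs[f.name] = value
    return cls(**kwargs)


service = from_dict(ServiceConfig, {"name": "api", "cache": {"ttl_seconds": 300}})
print(service)
```

This sketch assumes field annotations are real classes at runtime. With `from __future__ import annotations`, `f.type` would be a string, and the annotations would need to be resolved first, for example with `typing.get_type_hints`.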

async def demo_advanced_config():
    """Demo advanced configuration with multiple sources."""
   
    base_config = {
        "debug": False,
        "log_level": "INFO",
        "secret_key": "production-secret",
        "max_workers": 4
    }
   
    override_config = {
        "debug": True,
        "log_level": "DEBUG",
        "database": {
            "host": "dev-db.example.com",
            "port": 5433
        }
    }
   
    env_config = {
        "secret_key": "env-secret-key",
        "redis_url": "redis://prod-redis:6379"
    }
   
    print("\n=== Advanced Configuration Demo ===")
   
    manager = AsyncConfigManager()
   
    manager.add_dict(base_config, priority=0)    
    manager.add_dict(override_config, priority=50)  
    manager.add_dict(env_config, priority=100)    
   
    config = await manager.load_config(AppConfig)
   
    print("Configuration sources merged:")
    print(f"Debug mode: {config.debug} (from override)")
    print(f"Log level: {config.log_level} (from override)")
    print(f"Secret key: {config.secret_key} (from env)")
    print(f"Database host: {config.database.host} (from override)")
    print(f"Redis URL: {config.redis_url} (from env)")
   
    return config




async def demo_validation():
    """Demo configuration validation."""
   
    print("\n=== Configuration Validation Demo ===")
   
    valid_config = {
        "debug": True,
        "log_level": "DEBUG",
        "secret_key": "test-key",
        "database": {
            "host": "localhost",
            "port": 5432
        }
    }
   
    manager = AsyncConfigManager()
    manager.add_dict(valid_config, priority=0)
   
    try:
        config = await manager.load_config(AppConfig)
        print("✓ Valid configuration loaded successfully")
        print(f"  Database SSL: {config.database.ssl_enabled} (default value)")
        print(f"  Database pool size: {config.database.pool_size} (default value)")
    except ValidationError as e:
        print(f"✗ Validation error: {e}")
   
    incomplete_config = {
        "debug": True,
        "log_level": "DEBUG"
    }
   
    manager2 = AsyncConfigManager()
    manager2.add_dict(incomplete_config, priority=0)
   
    try:
        config2 = await manager2.load_config(AppConfig)
        print("✓ Configuration with defaults loaded successfully")
        print(f"  Secret key: '{config2.secret_key}' (default empty string)")
    except ValidationError as e:
        print(f"✗ Validation error: {e}")

We showcase advanced features of our configuration system through two examples. In demo_advanced_config(), we demonstrate how several configuration sources (base, override, and environment) are merged according to their priority, with higher-priority sources taking precedence. This highlights the flexibility in handling environment-specific overrides. In demo_validation(), we validate both complete and partial configurations. The system automatically fills in missing fields with default values where possible, and it raises a clear ValidationError when a required field is missing, which supports robust configuration management in real-world applications.
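
The validation shown in this tutorial focuses on required fields and defaults. As a sketch of how a stricter check might look, the hypothetical `find_problems` function below also compares values against simple annotated types. It is an illustration under stated assumptions, not part of the manager: `ServerConfig` is a made-up class, and only plain classes such as `str`, `int`, and `bool` are checked.

```python
from dataclasses import MISSING, dataclass, fields
from typing import Any, Dict, List, get_type_hints


@dataclass
class ServerConfig:
    host: str  # required: no default
    port: int = 8080
    debug: bool = False


def find_problems(cls: type, data: Dict[str, Any]) -> List[str]:
    """Report missing required fields and values of the wrong simple type."""
    problems: List[str] = []
    hints = get_type_hints(cls)
    for f in fields(cls):
        if f.name not in data:
            if f.default is MISSING and f.default_factory is MISSING:
                problems.append(f"missing required field '{f.name}'")
            continue
        expected = hints[f.name]
        value = data[f.name]
        # Generic hints such as Optional[str] are skipped in this sketch.
        if isinstance(expected, type) and not isinstance(value, expected):
            problems.append(
                f"field '{f.name}' expected {expected.__name__}, "
                f"got {type(value).__name__}"
            )
    return problems


print(find_problems(ServerConfig, {"host": "example.org"}))
print(find_problems(ServerConfig, {"port": "80"}))
```

One caveat of an `isinstance`-based check is that `bool` is a subclass of `int` in Python, so `True` would pass as a valid `port` here.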

async def run_demos():
    """Run all demonstration functions."""
    try:
        await demo_simple_config()
        await demo_advanced_config()
        await demo_validation()
        print("\n=== All demos completed successfully! ===")
    except Exception as e:
        print(f"Demo error: {e}")
        import traceback
        traceback.print_exc()






# In Jupyter/IPython, where an event loop is already running, call:
#     await run_demos()


if __name__ == "__main__":
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running: this is a standard Python script.
        asyncio.run(run_demos())
    else:
        print("Running in Jupyter/IPython environment")
        print("Use: await run_demos()")

We conclude the tutorial with run_demos(), a utility that runs all the demonstration functions in sequence, covering simple loading, multi-source merging, and validation. To support both Jupyter and standard Python environments, we include conditional logic that runs the demos appropriately in each. This makes our configuration system easy to test, showcase, and integrate into a variety of workflows out of the box.
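
The environment check boils down to asking whether an event loop is already running in the current thread. A minimal sketch of that test follows, using `asyncio.get_running_loop()`, which raises `RuntimeError` when no loop is active. `loop_is_running` and `check_inside` are hypothetical helper names.

```python
import asyncio


def loop_is_running() -> bool:
    """Return True when called from code executing inside an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


async def check_inside() -> bool:
    return loop_is_running()


outside = loop_is_running()            # called from plain synchronous code
inside = asyncio.run(check_inside())   # called from within a coroutine
print(outside, inside)
```

In a notebook, the first call would report a running loop as well, which is the signal to use `await` directly instead of `asyncio.run()`.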

In conclusion, we have demonstrated how AsyncConfig provides a robust and extensible foundation for managing configuration in modern Python applications. We have seen how easy it is to merge multiple sources, validate configurations against typed schemas, and respond to file changes in real time. Whether we are building microservices, async backends, or CLI tools, this library offers a flexible and developer-friendly way to manage configuration safely and efficiently.


Check out the full code. All credit for this research goes to the researchers of this project.


