Domain Kernel Development Guide
Overview
This guide explains how to build a new domain kernel for the AIOSx Kernel Mesh using the template structure.
Architecture Layers
Every domain kernel follows this structure:
Perception → Reasoning → Risk → Execution → Telemetry → Self-Healing → AKO
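The flow above can be sketched as a chain of async calls. This is only an illustration under stated assumptions, not the AIOSx implementation: `run_pipeline` and the four stub functions are hypothetical names, the payload fields are invented for the example, and the Self-Healing and AKO stages are left out because this guide does not describe their interfaces.

```python
import asyncio
from typing import Any, Dict, List


# Hypothetical stand-ins for the real layers; names and payloads are illustrative.
async def perceive(raw: Dict[str, Any]) -> Dict[str, Any]:
    # Perception: normalize the raw input into a typed observation
    return {"price": float(raw["price"])}


async def reason(observation: Dict[str, Any]) -> Dict[str, Any]:
    # Reasoning: pick an action from the observation
    side = "buy" if observation["price"] < 100.0 else "hold"
    return {"action": side, "price": observation["price"]}


async def assess_risk(action: Dict[str, Any]) -> Dict[str, Any]:
    # Risk: block anything with a non-positive price
    return {"approved": action["price"] > 0}


async def execute(action: Dict[str, Any]) -> Dict[str, Any]:
    # Execution: pretend the action was carried out
    return {"status": "success", "action": action["action"]}


async def run_pipeline(raw: Dict[str, Any], metrics: List[str]) -> Dict[str, Any]:
    observation = await perceive(raw)
    action = await reason(observation)
    verdict = await assess_risk(action)
    if not verdict["approved"]:
        # Telemetry: record the blocked action and stop before execution
        metrics.append("blocked")
        return {"status": "blocked", "action": action["action"]}
    result = await execute(action)
    metrics.append("executed")
    return result
```

The point of the sketch is the ordering: execution is reached only when the risk stage approves, and telemetry is recorded on both paths.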
1. Perception Layer
Purpose: Data ingestion and normalization
Responsibilities:
- Ingest data from external sources (APIs, databases, streams)
- Normalize data into standard format
- Validate data quality
- Emit perception events
Example:
```python
from typing import Any, Dict


class PerceptionLayer:
    async def ingest(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]:
        # Normalize data (_normalize is a domain-specific helper)
        normalized = self._normalize(data)
        # Validate (_validate is a domain-specific helper)
        if not self._validate(normalized):
            raise ValueError("Data validation failed")
        return normalized
```
Domain Examples:
- Trading: Market data ingestion (prices, volumes, order books)
- DeFi-FX: On-chain data (pool reserves, prices, liquidity)
- LLM: Prompt ingestion, context building
- VoiceOS: Audio stream ingestion (RTP/WebRTC)
- Security: Log ingestion, event normalization
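To make the normalize-then-validate step concrete, here is a minimal sketch for the Trading case. The vendor field names (`sym`, `px`, `qty`), the standard field names, and both function names are assumptions made for illustration; a real kernel would use whatever schema its data sources provide.

```python
from typing import Any, Dict

# Hypothetical standard schema for a market tick
REQUIRED_FIELDS = ("symbol", "price", "volume")


def normalize_tick(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Map a raw vendor payload onto the standard shape (field names are assumed)."""
    return {
        "symbol": str(raw.get("sym", raw.get("symbol", ""))).upper(),
        "price": float(raw.get("px", raw.get("price", 0.0))),
        "volume": float(raw.get("qty", raw.get("volume", 0.0))),
    }


def validate_tick(tick: Dict[str, Any]) -> bool:
    """Reject ticks with missing fields, an empty symbol, or impossible values."""
    return (
        all(field in tick for field in REQUIRED_FIELDS)
        and bool(tick["symbol"])
        and tick["price"] > 0
        and tick["volume"] >= 0
    )
```

Keeping normalization and validation as separate functions lets the same validator run on data from every source once it has been mapped to the standard shape.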
2. Reasoning Layer
Purpose: Decision making and strategy
Responsibilities:
- Analyze perception data
- Generate decisions/strategies
- Score options
- Select best action
Example:
```python
from typing import Any, Dict


class ReasoningLayer:
    async def reason(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Analyze context
        analysis = await self._analyze(context)
        # Generate options
        options = await self._generate_options(analysis)
        # Score options
        scored = await self._score_options(options, context)
        # Select best
        return self._select_best(scored)
```
Domain Examples:
- Trading: Strategy execution (trend following, mean reversion)
- DeFi-FX: Strategy selection (carry, basis, arbitrage)
- LLM: Model selection, prompt optimization
- VoiceOS: Dialog management, intent recognition
- Security: Threat detection, risk scoring
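The "score options, select best" steps could look like the sketch below. The scoring formula (expected return minus a risk penalty), the option fields, and the function names are all assumptions chosen for illustration; each domain would substitute its own scoring model.

```python
from typing import Any, Dict, List


def score_option(option: Dict[str, Any], risk_aversion: float) -> float:
    """Hypothetical score: expected return penalized by risk."""
    return option["expected_return"] - risk_aversion * option["risk"]


def select_best(
    options: List[Dict[str, Any]],
    risk_aversion: float = 0.5,
) -> Dict[str, Any]:
    """Return the highest-scoring option; fail loudly if there is nothing to pick."""
    if not options:
        raise ValueError("No options to select from")
    return max(options, key=lambda option: score_option(option, risk_aversion))
```

With a higher `risk_aversion`, a lower-return but safer option can win over a higher-return one, which is the trade-off the Reasoning layer hands to the Risk layer for a final check.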
3. Risk Layer
Purpose: Risk assessment and mitigation
Responsibilities:
- Assess risk of proposed actions
- Apply risk limits
- Mitigate risks
- Block high-risk actions
Example:
```python
from typing import Any, Dict


class RiskLayer:
    async def assess_risk(
        self,
        action: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        risk_score = await self._calculate_risk_score(action, context)
        within_limits = self._check_limits(risk_score)
        mitigation = None
        if not within_limits:
            # Out of limits: try to generate a mitigation for the action
            mitigation = await self._generate_mitigation(action, risk_score)
        return {
            "risk_score": risk_score,
            # Approved if within limits, or if a mitigation was generated
            "approved": within_limits or mitigation is not None,
        }
```
Domain Examples:
- Trading: Position sizing, drawdown limits, risk-adjusted returns
- DeFi-FX: Slippage caps, liquidity checks, protocol risk scoring
- LLM: Content safety, token limits, cost controls
- VoiceOS: Call quality thresholds, provider reliability
- Security: Threat severity, false positive rates
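As one way to picture "apply limits, mitigate, or block", the sketch below caps an oversized position instead of rejecting it outright. `MAX_POSITION`, `assess_position`, and the `size` field are hypothetical and exist only for this example.

```python
from typing import Any, Dict

# Hypothetical limit; a real kernel would load this from configuration
MAX_POSITION = 1_000.0


def assess_position(
    action: Dict[str, Any],
    max_position: float = MAX_POSITION,
) -> Dict[str, Any]:
    size = float(action["size"])
    if size <= 0:
        # Block: no sensible mitigation exists for a non-positive size
        return {"approved": False, "action": None, "reason": "non-positive size"}
    if size <= max_position:
        return {"approved": True, "action": action, "reason": "within limits"}
    # Mitigate: approve a copy of the action with the size capped at the limit
    mitigated = {**action, "size": max_position}
    return {"approved": True, "action": mitigated, "reason": "size capped"}
```

Returning the mitigated action alongside the verdict lets the Execution layer run what was actually approved rather than the original request.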
4. Execution Layer
Purpose: Action execution
Responsibilities:
- Execute approved actions
- Handle execution errors
- Track execution results
- Emit execution events
Example:
```python
from typing import Any, Dict


class ExecutionLayer:
    async def execute(
        self,
        action: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        try:
            result = await self._execute_action(action, context)
            self._record_execution(action, result, success=True)
            return result
        except Exception as e:
            # Record the failure, then re-raise so the caller can react
            self._record_execution(action, {"error": str(e)}, success=False)
            raise
```
Domain Examples:
- Trading: Order execution, position management
- DeFi-FX: Multi-hop swaps, liquidity provision
- LLM: Inference execution, streaming responses
- VoiceOS: TTS generation, call routing
- Security: Threat response, isolation actions
5. Telemetry Layer
Purpose: Metrics and observability
Responsibilities:
- Emit metrics to metrics bus
- Track performance indicators
- Log events
- Export observability data
Example:
```python
from typing import Dict, Optional


class TelemetryLayer:
    async def emit_metric(
        self,
        metric_name: str,
        value: float,
        tags: Optional[Dict[str, str]] = None,
    ) -> None:
        # Emit to metrics bus
        pass
```
Domain Examples:
- Trading: PnL, win rate, drawdown
- DeFi-FX: Yield, slippage, gas costs
- LLM: Latency, cost per request, quality score
- VoiceOS: Call resolution rate, jitter, packet loss
- Security: Detection rate, false positive rate
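The `emit_metric` stub leaves the metrics bus unspecified. The sketch below fills it in with an in-memory stand-in so the tagging behaviour can be seen end to end. `InMemoryMetricsBus`, its `publish` method, and `SketchTelemetryLayer` are hypothetical; the real AIOSx metrics bus API is not described in this guide.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# A metric series is identified by its name plus its sorted tag pairs
MetricKey = Tuple[str, Tuple[Tuple[str, str], ...]]


class InMemoryMetricsBus:
    """Hypothetical stand-in for the metrics bus; stores samples per series."""

    def __init__(self) -> None:
        self.samples: Dict[MetricKey, List[float]] = defaultdict(list)

    async def publish(self, name: str, value: float, tags: Dict[str, str]) -> None:
        key: MetricKey = (name, tuple(sorted(tags.items())))
        self.samples[key].append(value)


class SketchTelemetryLayer:
    def __init__(self, kernel_id: str, bus: InMemoryMetricsBus) -> None:
        self.kernel_id = kernel_id
        self.bus = bus

    async def emit_metric(
        self,
        metric_name: str,
        value: float,
        tags: Optional[Dict[str, str]] = None,
    ) -> None:
        # Always attach the kernel ID so series from different kernels stay separate
        merged = {"kernel_id": self.kernel_id, **(tags or {})}
        await self.bus.publish(metric_name, value, merged)
```

An in-memory bus like this is also convenient in unit tests, where assertions can be made directly on the recorded samples.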
Implementation Steps
Step 1: Create Kernel Class
```python
from typing import Any

from aiosx.kernel.core.base_kernel import BaseKernel


class MyDomainKernel(BaseKernel):
    # metrics_bus is taken as a constructor argument here so that the name is
    # defined; adapt this to however your deployment provides the metrics bus.
    def __init__(self, kernel_id: str = "my_kernel_1", metrics_bus: Any = None):
        super().__init__(kernel_id, domain="my-domain")
        # Initialize layers
        self.perception = MyPerceptionLayer(kernel_id)
        self.reasoning = MyReasoningLayer(kernel_id)
        self.risk = MyRiskLayer(kernel_id)
        self.execution = MyExecutionLayer(kernel_id)
        self.telemetry = MyTelemetryLayer(kernel_id, metrics_bus)
```
Step 2: Implement Lifecycle Hooks
```python
# Methods of MyDomainKernel

async def initialize(self) -> None:
    """Initialize kernel resources"""
    # Connect to data sources
    # Load configuration
    pass

async def start(self) -> None:
    """Start kernel operations"""
    await super().start()
    # Start background tasks

async def stop(self) -> None:
    """Stop kernel operations"""
    await super().stop()
    # Cleanup resources

async def pause(self) -> None:
    """Pause kernel operations"""
    await super().pause()
    # Pause processing

async def resume(self) -> None:
    """Resume kernel operations"""
    await super().resume()
    # Resume processing

async def enter_safe_mode(self) -> None:
    """Enter safe mode"""
    await super().enter_safe_mode()
    # Disable risky operations

async def exit_safe_mode(self) -> None:
    """Exit safe mode"""
    await super().exit_safe_mode()
    # Re-enable operations
```
Step 3: Register Health Probes
```python
# Method of MyDomainKernel

def _register_health_probes(self) -> None:
    """Register domain-specific health probes"""
    from aiosx.health.probes.probe_interface import HealthProbe

    # Create custom probe (MyDomainProbe is your own HealthProbe implementation)
    probe = MyDomainProbe(self.kernel_id)
    self.register_probe(probe)
```
Step 4: Implement Domain-Specific Health
```python
# Method of MyDomainKernel (requires: from typing import Any, Dict)

async def get_domain_specific_health(self) -> Dict[str, Any]:
    """Get domain-specific health metrics"""
    return {
        "perception_status": "healthy",
        "reasoning_status": "healthy",
        "risk_status": "healthy",
        "execution_status": "healthy",
        "custom_metric": 42.0,
    }
```
Step 5: Define SLOs
Add domain-specific SLOs to aiosx/ako/slo_monitor.py:
```python
self.add_slo(SLODefinition(
    domain="my-domain",
    name="max_processing_latency",
    metric="processing_latency_p95",
    threshold=1000.0,  # 1s
    window_minutes=60,
    severity="critical",
))
```
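To show what a p95 latency SLO means in practice, the sketch below computes a nearest-rank 95th percentile over a window of samples and compares it with the threshold. `percentile` and `slo_breached` are hypothetical helpers written for this example; how `slo_monitor.py` actually evaluates SLOs is not covered in this guide, and the example assumes latencies are reported in milliseconds.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100.0))
    return ordered[rank - 1]


def slo_breached(
    samples: Sequence[float],
    threshold: float,
    pct: float = 95.0,
) -> bool:
    """True when the chosen percentile of the window exceeds the threshold."""
    return percentile(samples, pct) > threshold
```

A percentile-based SLO tolerates a small share of slow requests: one outlier in twenty samples does not breach a p95 threshold, but two do.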
Step 6: Add Business KPIs
Add domain-specific KPIs to aiosx/business/kpis.py:
```python
# Entry to add to the KPI definitions mapping
"my-domain": {
    "processing_success_rate": {"unit": "%", "aggregation": "avg"},
    "custom_kpi": {"unit": "count", "aggregation": "sum"},
}
```
Step 7: Create Workflow Templates
Create workflow templates in aiosx/workflows/:
```python
from aiosx.workflows.workflow_engine import WorkflowDefinition, WorkflowStep


def create_my_domain_workflow() -> WorkflowDefinition:
    return WorkflowDefinition(
        name="My Domain Workflow",
        steps=[
            WorkflowStep(
                step_id="step1",
                kernel="my-domain",
                operation="process",
                inputs={"data": "${input_data}"},
                outputs=["result"],
            ),
        ],
    )
```
Testing
Unit Tests
```python
import pytest

from aiosx.kernel.domains.my_domain.my_domain_kernel import MyDomainKernel


@pytest.mark.asyncio
async def test_kernel_initialization():
    kernel = MyDomainKernel("test_kernel")
    await kernel.initialize()
    assert kernel.kernel_id == "test_kernel"
```
Integration Tests
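```python
import pytest

from aiosx.kernel.domains.my_domain.my_domain_kernel import MyDomainKernel


@pytest.mark.asyncio
async def test_kernel_workflow():
    kernel = MyDomainKernel("test_kernel")
    await kernel.start()
    try:
        result = await kernel.process({"input": "data"})
        assert result["status"] == "success"
    finally:
        # Stop the kernel even if the assertion fails
        await kernel.stop()
```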
Best Practices
- Async/Await: Use async/await throughout for non-blocking operations
- Error Handling: Implement comprehensive error handling with retries
- Health Probes: Register probes for all critical components
- Metrics: Emit metrics for all important operations
- SLOs: Define SLOs for latency, error rate, availability
- Documentation: Document all public methods and classes
- Type Hints: Use type hints for better code clarity
- Testing: Write unit and integration tests
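The error-handling practice above mentions retries. One common way to implement them is exponential backoff, sketched below with only the standard library. `with_retries` and its parameters are hypothetical names for this example, not part of AIOSx.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Run an async operation, retrying on failure with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == attempts:
                # Out of attempts: surface the last error to the caller
                raise
            # Wait base_delay, then 2x, then 4x, ... before the next attempt
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

In a real kernel it is usually better to catch only the exception types known to be transient (timeouts, connection resets) rather than `Exception`, so that programming errors fail fast.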
Example: Healthcare Domain Kernel
```python
from typing import Any

from aiosx.kernel.core.base_kernel import BaseKernel


class HealthcareKernel(BaseKernel):
    # metrics_bus is taken as a constructor argument here so that the name is defined
    def __init__(self, kernel_id: str = "healthcare_kernel_1", metrics_bus: Any = None):
        super().__init__(kernel_id, domain="healthcare")
        # Perception: Lab results, patient data
        self.perception = HealthcarePerceptionLayer(kernel_id)
        # Reasoning: Clinical decision models
        self.reasoning = ClinicalReasoningLayer(kernel_id)
        # Risk: Patient safety, medication interactions
        self.risk = PatientSafetyRiskLayer(kernel_id)
        # Execution: Order set execution
        self.execution = OrderSetExecutionLayer(kernel_id)
        # Telemetry: Patient outcomes, quality metrics
        self.telemetry = HealthcareTelemetryLayer(kernel_id, metrics_bus)
```
Mapping:
- MarketSnapshot → LabResult
- Strategy → ClinicalDecisionModel
- Execution → OrderSetExecution
- VenueConnector → HealthcareProvider
Example: Logistics Domain Kernel
```python
from typing import Any

from aiosx.kernel.core.base_kernel import BaseKernel


class LogisticsKernel(BaseKernel):
    # metrics_bus is taken as a constructor argument here so that the name is defined
    def __init__(self, kernel_id: str = "logistics_kernel_1", metrics_bus: Any = None):
        super().__init__(kernel_id, domain="logistics")
        # Perception: Shipment data, location tracking
        self.perception = LogisticsPerceptionLayer(kernel_id)
        # Reasoning: Routing heuristics, optimization
        self.reasoning = RoutingReasoningLayer(kernel_id)
        # Risk: Delivery delays, cost overruns
        self.risk = LogisticsRiskLayer(kernel_id)
        # Execution: Transport provider execution
        self.execution = TransportExecutionLayer(kernel_id)
        # Telemetry: On-time delivery, cost metrics
        self.telemetry = LogisticsTelemetryLayer(kernel_id, metrics_bus)
```
Mapping:
- Order → Shipment
- VenueConnector → TransportProvider
- Strategy → RoutingHeuristic
Next Steps
- Review existing domain kernels (Trading, DeFi-FX, LLM) for reference
- Use the template (aiosx/kernel/domains/template/domain_kernel_template.py)
- Implement each layer incrementally
- Add health probes and metrics
- Define SLOs and KPIs
- Create workflow templates
- Write tests
- Document your implementation
VoiceOS Kernel Example
Implementation Overview
The VoiceOS Kernel demonstrates a complete implementation with:
- CallSessionManager: Manages call lifecycle with state transitions
- AudioStreamHandler: Handles RTP/WebRTC audio with jitter buffering
- Provider Abstractions: STT/TTS/NLU with fallback support
- DialogOrchestrator: Integrates with LLM Kernel for reasoning
- Health Probes: Jitter, packet loss, latency, dropped calls
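A call lifecycle with state transitions can be modelled as a small state machine. The sketch below is an assumption-laden illustration, not the code in `call_session.py`: the state names, the allowed-transition table, and the `SketchCallSession` class are all hypothetical.

```python
from enum import Enum
from typing import Dict, List, Set


class CallState(Enum):
    RINGING = "ringing"
    ACTIVE = "active"
    ON_HOLD = "on_hold"
    ENDED = "ended"


# Hypothetical transition table: which states may follow each state
ALLOWED: Dict[CallState, Set[CallState]] = {
    CallState.RINGING: {CallState.ACTIVE, CallState.ENDED},
    CallState.ACTIVE: {CallState.ON_HOLD, CallState.ENDED},
    CallState.ON_HOLD: {CallState.ACTIVE, CallState.ENDED},
    CallState.ENDED: set(),
}


class SketchCallSession:
    def __init__(self, call_id: str) -> None:
        self.call_id = call_id
        self.state = CallState.RINGING
        self.history: List[CallState] = [CallState.RINGING]

    def transition(self, new_state: CallState) -> None:
        """Move to new_state, rejecting transitions the table does not allow."""
        if new_state not in ALLOWED[self.state]:
            raise ValueError(
                f"Illegal transition {self.state.value} -> {new_state.value}"
            )
        self.state = new_state
        self.history.append(new_state)
```

Keeping the transition rules in one table makes illegal sequences, such as reviving an ended call, fail immediately instead of corrupting session state.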
Key Files
- aiosx/kernel/domains/voiceos/call_session.py
- aiosx/kernel/domains/voiceos/providers.py
- aiosx/kernel/domains/voiceos/audio_handler.py
- aiosx/kernel/domains/voiceos/dialog_orchestrator.py
- aiosx/kernel/domains/voiceos/voiceos_kernel.py
SentinelX Kernel Example
Implementation Overview
The SentinelX Security Kernel demonstrates:
- SecurityEventIngestor: Normalizes events from all sources
- DetectionEngine: Rule-based and ML-based threat detection
- ResponseEngine: Executes security response actions
- Integration: Works with KernelRegistry, TenantManager, MeshCircuitBreaker
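The rule-based half of threat detection can be pictured as a list of named predicates evaluated against each normalized event. This is a sketch under assumptions: the `Rule` dataclass, the two example rules, the event fields, and `detect` are hypothetical and do not reflect the contents of `detection_engine.py`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class Rule:
    name: str
    severity: str
    matches: Callable[[Dict[str, Any]], bool]


# Hypothetical rules over hypothetical event fields
RULES: List[Rule] = [
    Rule(
        name="excessive_failed_logins",
        severity="high",
        matches=lambda e: e.get("type") == "login_failed" and e.get("count", 0) >= 5,
    ),
    Rule(
        name="cross_tenant_access",
        severity="critical",
        matches=lambda e: e.get("type") == "access"
        and e.get("tenant") != e.get("resource_tenant"),
    ),
]


def detect(event: Dict[str, Any], rules: List[Rule] = RULES) -> List[str]:
    """Return the names of all rules that match the event, in rule order."""
    return [rule.name for rule in rules if rule.matches(event)]
```

Because every rule sees the same normalized event shape, new rules can be added without touching the ingestion code.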
Key Files
- aiosx/kernel/domains/security/event_ingestion.py
- aiosx/kernel/domains/security/detection_engine.py
- aiosx/kernel/domains/security/response_engine.py
- aiosx/kernel/domains/security/sentinelx_kernel.py
Security Integration Points
- Event Ingestion: All kernels emit security events
- Threat Detection: Rules and ML detect threats
- Response Actions: Automatic isolation, throttling, blocking
- AKO Integration: Security events influence optimization decisions
- Self-Healing: Security-driven recovery policies