Domain Kernel Development Guide

Overview

This guide explains how to build a new domain kernel for the AIOSx Kernel Mesh using the template structure.

Architecture Layers

Every domain kernel follows this structure:

Perception → Reasoning → Risk → Execution → Telemetry → Self-Healing → AKO
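
As a rough illustration of this flow, the sketch below passes one input through the first five stages in order. It is not the AIOSx implementation: `run_pipeline`, the `size` field, and `max_size` are hypothetical names chosen for the example, and the Self-Healing and AKO stages are left out.

```python
import asyncio
from typing import Any, Dict, List


async def run_pipeline(raw: Dict[str, Any], max_size: float = 100.0) -> Dict[str, Any]:
    """Pass one input through the layers in the documented order (illustrative only)."""
    trace: List[str] = []

    # Perception: normalize keys of the raw input.
    observation = {str(k).lower(): v for k, v in raw.items()}
    trace.append("perception")

    # Reasoning: propose an action from the observation.
    action = {"type": "process", "size": float(observation.get("size", 0.0))}
    trace.append("reasoning")

    # Risk: approve only actions within the (hypothetical) size limit.
    approved = action["size"] <= max_size
    trace.append("risk")

    # Execution: runs only for approved actions.
    if approved:
        result = {"status": "success", "processed_size": action["size"]}
        trace.append("execution")
    else:
        result = {"status": "blocked"}

    # Telemetry: always runs, so blocked actions are observable too.
    trace.append("telemetry")
    return {"result": result, "trace": trace}
```

The point of the sketch is the ordering: the Risk stage sits between Reasoning and Execution, so a rejected action never reaches Execution but still produces telemetry.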

1. Perception Layer

Purpose: Data ingestion and normalization

Responsibilities:

  • Ingest data from external sources (APIs, databases, streams)
  • Normalize data into standard format
  • Validate data quality
  • Emit perception events

Example:

python
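from typing import Any, Dict


class PerceptionLayer:
    async def ingest(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]:
        # Normalize the raw payload into the kernel's standard format
        normalized = self._normalize(data)
        # Reject payloads that fail the quality checks
        if not self._validate(normalized):
            raise ValueError(f"Data validation failed for source {source!r}")
        return normalized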

Domain Examples:

  • Trading: Market data ingestion (prices, volumes, order books)
  • DeFi-FX: On-chain data (pool reserves, prices, liquidity)
  • LLM: Prompt ingestion, context building
  • VoiceOS: Audio stream ingestion (RTP/WebRTC)
  • Security: Log ingestion, event normalization
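
The `_normalize` and `_validate` helpers in the example are left to each domain. A minimal sketch of what they might look like for a trading-style tick follows; the schema (`symbol`, `price`, `volume`) and the function names are assumptions for illustration, not part of the template.

```python
from typing import Any, Dict

REQUIRED_FIELDS = ("symbol", "price", "volume")  # hypothetical schema


def normalize(data: Dict[str, Any]) -> Dict[str, Any]:
    """Lower-case keys, tidy the symbol, and coerce numeric fields to float."""
    normalized = {str(k).lower(): v for k, v in data.items()}
    if "symbol" in normalized:
        normalized["symbol"] = str(normalized["symbol"]).strip().upper()
    for field in ("price", "volume"):
        if field in normalized:
            # float() raises ValueError on non-numeric input
            normalized[field] = float(normalized[field])
    return normalized


def validate(normalized: Dict[str, Any]) -> bool:
    """Require every field, a positive price, and a non-negative volume."""
    if any(field not in normalized for field in REQUIRED_FIELDS):
        return False
    return normalized["price"] > 0 and normalized["volume"] >= 0
```

Keeping normalization separate from validation lets the validation rules assume a single canonical shape regardless of which source produced the data.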

2. Reasoning Layer

Purpose: Decision making and strategy

Responsibilities:

  • Analyze perception data
  • Generate decisions/strategies
  • Score options
  • Select best action

Example:

python
from typing import Any, Dict


class ReasoningLayer:
    async def reason(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Analyze context
        analysis = await self._analyze(context)
        # Generate candidate options
        options = await self._generate_options(analysis)
        # Score each option against the context
        scored = await self._score_options(options, context)
        # Select the best-scoring option
        return self._select_best(scored)

Domain Examples:

  • Trading: Strategy execution (trend following, mean reversion)
  • DeFi-FX: Strategy selection (carry, basis, arbitrage)
  • LLM: Model selection, prompt optimization
  • VoiceOS: Dialog management, intent recognition
  • Security: Threat detection, risk scoring
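
The scoring and selection steps can be sketched as below. The scoring formula (expected value minus a risk penalty) and the `expected_value` and `risk` fields are assumptions made for the example; a real domain would substitute its own criteria.

```python
from typing import Any, Dict, List


def score_options(options: List[Dict[str, Any]], risk_aversion: float) -> List[Dict[str, Any]]:
    """Attach score = expected_value - risk_aversion * risk to each option."""
    return [
        {**opt, "score": opt["expected_value"] - risk_aversion * opt["risk"]}
        for opt in options
    ]


def select_best(scored: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return the highest-scoring option; ties resolve to the earliest one."""
    if not scored:
        raise ValueError("No options to select from")
    return max(scored, key=lambda opt: opt["score"])
```

Returning new dictionaries instead of mutating the inputs keeps the original options available for logging or later re-scoring.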

3. Risk Layer

Purpose: Risk assessment and mitigation

Responsibilities:

  • Assess risk of proposed actions
  • Apply risk limits
  • Mitigate risks
  • Block high-risk actions

Example:

python
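from typing import Any, Dict


class RiskLayer:
    async def assess_risk(
        self,
        action: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        risk_score = await self._calculate_risk_score(action, context)
        within_limits = self._check_limits(risk_score)
        mitigation = None
        if not within_limits:
            # Try to produce a safer variant of the action
            mitigation = await self._generate_mitigation(action, risk_score)
        return {
            "risk_score": risk_score,
            # Approved if within limits, or if a mitigation was found
            "approved": within_limits or mitigation is not None,
            # Returned so the caller can apply the mitigation it was approved under
            "mitigation": mitigation,
        }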

Domain Examples:

  • Trading: Position sizing, drawdown limits, risk-adjusted returns
  • DeFi-FX: Slippage caps, liquidity checks, protocol risk scoring
  • LLM: Content safety, token limits, cost controls
  • VoiceOS: Call quality thresholds, provider reliability
  • Security: Threat severity, false positive rates
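
One possible shape for the limit check and mitigation helpers is sketched below. The limit value, the `size` field, and the assumption that risk scales linearly with size are all hypothetical; they only illustrate how an over-limit action might be reduced rather than rejected outright.

```python
from typing import Any, Dict, Optional

MAX_RISK_SCORE = 0.5  # hypothetical limit


def check_limits(risk_score: float, limit: float = MAX_RISK_SCORE) -> bool:
    """True when the score is at or below the limit."""
    return risk_score <= limit


def scale_down(
    action: Dict[str, Any],
    risk_score: float,
    limit: float = MAX_RISK_SCORE,
) -> Optional[Dict[str, Any]]:
    """Shrink 'size' so a risk proportional to size lands on the limit.

    Returns None when the action has no size to reduce, which the caller
    can treat as "no mitigation available".
    """
    size = action.get("size")
    if size is None or risk_score <= 0:
        return None
    return {**action, "size": size * limit / risk_score}
```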

4. Execution Layer

Purpose: Action execution

Responsibilities:

  • Execute approved actions
  • Handle execution errors
  • Track execution results
  • Emit execution events

Example:

python
from typing import Any, Dict


class ExecutionLayer:
    async def execute(
        self,
        action: Dict[str, Any],
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        try:
            result = await self._execute_action(action, context)
            self._record_execution(action, result, success=True)
            return result
        except Exception as e:
            # Record the failure, then re-raise so the caller can react
            self._record_execution(action, {"error": str(e)}, success=False)
            raise

Domain Examples:

  • Trading: Order execution, position management
  • DeFi-FX: Multi-hop swaps, liquidity provision
  • LLM: Inference execution, streaming responses
  • VoiceOS: TTS generation, call routing
  • Security: Threat response, isolation actions

5. Telemetry Layer

Purpose: Metrics and observability

Responsibilities:

  • Emit metrics to metrics bus
  • Track performance indicators
  • Log events
  • Export observability data

Example:

python
from typing import Dict, Optional


class TelemetryLayer:
    async def emit_metric(
        self,
        metric_name: str,
        value: float,
        tags: Optional[Dict[str, str]] = None,
    ) -> None:
        # Placeholder: emit the metric to the metrics bus here
        pass

Domain Examples:

  • Trading: PnL, win rate, drawdown
  • DeFi-FX: Yield, slippage, gas costs
  • LLM: Latency, cost per request, quality score
  • VoiceOS: Call resolution rate, jitter, packet loss
  • Security: Detection rate, false positive rate
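
To make the placeholder concrete, the sketch below pairs a telemetry layer with a toy in-memory bus. `InMemoryMetricsBus` and its `publish` method are invented for the example; the real metrics bus interface is not shown in this guide and may differ.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, Dict, List, Optional, Tuple

MetricKey = Tuple[str, Tuple[Tuple[str, str], ...]]


class InMemoryMetricsBus:
    """Hypothetical stand-in for the metrics bus; stores samples per (name, tags)."""

    def __init__(self) -> None:
        self.samples: DefaultDict[MetricKey, List[float]] = defaultdict(list)

    async def publish(self, name: str, value: float, tags: Dict[str, str]) -> None:
        # Sort tags so the same tag set always maps to the same key
        key = (name, tuple(sorted(tags.items())))
        self.samples[key].append(value)


class TelemetryLayer:
    def __init__(self, kernel_id: str, metrics_bus: InMemoryMetricsBus) -> None:
        self.kernel_id = kernel_id
        self.metrics_bus = metrics_bus

    async def emit_metric(
        self,
        metric_name: str,
        value: float,
        tags: Optional[Dict[str, str]] = None,
    ) -> None:
        # Always tag with the kernel ID so metrics can be grouped per kernel
        merged = {"kernel_id": self.kernel_id, **(tags or {})}
        await self.metrics_bus.publish(metric_name, value, merged)
```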

Implementation Steps

Step 1: Create Kernel Class

python
from aiosx.kernel.core.base_kernel import BaseKernel


class MyDomainKernel(BaseKernel):
    def __init__(self, kernel_id: str = "my_kernel_1"):
        super().__init__(kernel_id, domain="my-domain")
        # Initialize layers
        self.perception = MyPerceptionLayer(kernel_id)
        self.reasoning = MyReasoningLayer(kernel_id)
        self.risk = MyRiskLayer(kernel_id)
        self.execution = MyExecutionLayer(kernel_id)
        # metrics_bus must already be in scope here (for example, imported
        # or passed in); this snippet does not define it
        self.telemetry = MyTelemetryLayer(kernel_id, metrics_bus)

Step 2: Implement Lifecycle Hooks

python
# Methods of MyDomainKernel (class body omitted)

async def initialize(self) -> None:
    """Initialize kernel resources"""
    # Connect to data sources
    # Load configuration
    pass

async def start(self) -> None:
    """Start kernel operations"""
    await super().start()
    # Start background tasks

async def stop(self) -> None:
    """Stop kernel operations"""
    await super().stop()
    # Cleanup resources

async def pause(self) -> None:
    """Pause kernel operations"""
    await super().pause()
    # Pause processing

async def resume(self) -> None:
    """Resume kernel operations"""
    await super().resume()
    # Resume processing

async def enter_safe_mode(self) -> None:
    """Enter safe mode"""
    await super().enter_safe_mode()
    # Disable risky operations

async def exit_safe_mode(self) -> None:
    """Exit safe mode"""
    await super().exit_safe_mode()
    # Re-enable operations

Step 3: Register Health Probes

python
def _register_health_probes(self) -> None:
    """Register domain-specific health probes"""
    from aiosx.health.probes.probe_interface import HealthProbe

    # MyDomainProbe is a HealthProbe subclass that you define for your domain
    probe = MyDomainProbe(self.kernel_id)
    self.register_probe(probe)
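
The guide does not show the `HealthProbe` interface, so the following is only a guess at what a custom probe might do: time a dependency check and report the outcome. The class is deliberately standalone; the `check` method, the `ping` callable, and the result keys are assumptions, and a real probe would subclass `HealthProbe` and follow whatever methods it requires.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict


class MyDomainProbe:
    """Stand-in probe (hypothetical interface, not the real HealthProbe)."""

    def __init__(
        self,
        kernel_id: str,
        ping: Callable[[], Awaitable[None]],
        timeout_s: float = 1.0,
    ) -> None:
        self.kernel_id = kernel_id
        self._ping = ping
        self._timeout_s = timeout_s

    async def check(self) -> Dict[str, Any]:
        started = time.monotonic()
        try:
            # Fail the probe if the dependency does not answer in time
            await asyncio.wait_for(self._ping(), timeout=self._timeout_s)
        except Exception as exc:  # includes the timeout raised by wait_for
            return {
                "kernel_id": self.kernel_id,
                "healthy": False,
                "error": type(exc).__name__,
            }
        latency_ms = (time.monotonic() - started) * 1000.0
        return {"kernel_id": self.kernel_id, "healthy": True, "latency_ms": latency_ms}
```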

Step 4: Implement Domain-Specific Health

python
async def get_domain_specific_health(self) -> Dict[str, Any]:
    """Get domain-specific health metrics"""
    return {
        "perception_status": "healthy",
        "reasoning_status": "healthy",
        "risk_status": "healthy",
        "execution_status": "healthy",
        "custom_metric": 42.0,
    }
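
The status strings in the example are hard-coded. One way to derive them from measurements is sketched below, assuming each layer tracks an error rate; the thresholds, the `degraded` and `unhealthy` labels, and the function name are illustrative choices, not values defined by AIOSx.

```python
from typing import Any, Dict


def summarize_health(
    layer_error_rates: Dict[str, float],
    degraded_at: float = 0.01,
    unhealthy_at: float = 0.05,
) -> Dict[str, Any]:
    """Map per-layer error rates to '<layer>_status' entries."""

    def status(rate: float) -> str:
        if rate >= unhealthy_at:
            return "unhealthy"
        if rate >= degraded_at:
            return "degraded"
        return "healthy"

    report: Dict[str, Any] = {
        f"{layer}_status": status(rate) for layer, rate in layer_error_rates.items()
    }
    # Include the worst rate so dashboards can rank kernels at a glance
    report["worst_error_rate"] = max(layer_error_rates.values(), default=0.0)
    return report
```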

Step 5: Define SLOs

Add domain-specific SLOs to aiosx/ako/slo_monitor.py:

python
self.add_slo(SLODefinition(
    domain="my-domain",
    name="max_processing_latency",
    metric="processing_latency_p95",
    threshold=1000.0,  # milliseconds (1 s)
    window_minutes=60,
    severity="critical",
))
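
To show what such an SLO means in practice, the sketch below checks latency samples against the threshold. The `SLODefinition` dataclass here is a stand-in that mirrors only the fields used above, and the nearest-rank percentile and `is_breached` helper are assumptions; the real monitor in `aiosx/ako/slo_monitor.py` may compute this differently.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class SLODefinition:  # stand-in with the fields used in the example above
    domain: str
    name: str
    metric: str
    threshold: float
    window_minutes: int
    severity: str


def p95(samples: Sequence[float]) -> float:
    """Nearest-rank 95th percentile of the samples."""
    if not samples:
        raise ValueError("No samples in window")
    ordered = sorted(samples)
    # Integer form of ceil(0.95 * n), avoiding floating-point rounding
    rank = (95 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def is_breached(slo: SLODefinition, samples: Sequence[float]) -> bool:
    """True when the window's p95 exceeds the SLO threshold."""
    return p95(samples) > slo.threshold
```

With a p95 target, a single slow request in a window of twenty does not breach the SLO, but two do.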

Step 6: Add Business KPIs

Add domain-specific KPIs to aiosx/business/kpis.py:

python
"my-domain": {
    "processing_success_rate": {"unit": "%", "aggregation": "avg"},
    "custom_kpi": {"unit": "count", "aggregation": "sum"},
}
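
The `aggregation` field suggests that samples are combined per KPI. A minimal sketch of that idea follows; `KPI_SPECS`, `AGGREGATORS`, and `aggregate_kpi` are hypothetical names, and how `aiosx/business/kpis.py` actually aggregates is not shown here.

```python
from math import fsum
from statistics import fmean
from typing import Callable, Dict, Sequence

# Hypothetical registry mirroring the entry above
KPI_SPECS: Dict[str, Dict[str, Dict[str, str]]] = {
    "my-domain": {
        "processing_success_rate": {"unit": "%", "aggregation": "avg"},
        "custom_kpi": {"unit": "count", "aggregation": "sum"},
    }
}

AGGREGATORS: Dict[str, Callable[[Sequence[float]], float]] = {
    "avg": fmean,  # arithmetic mean
    "sum": fsum,   # floating-point sum
}


def aggregate_kpi(domain: str, kpi: str, samples: Sequence[float]) -> float:
    """Combine samples using the aggregation named in the KPI spec."""
    spec = KPI_SPECS[domain][kpi]
    return AGGREGATORS[spec["aggregation"]](samples)
```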

Step 7: Create Workflow Templates

Create workflow templates in aiosx/workflows/:

python
from aiosx.workflows.workflow_engine import WorkflowDefinition, WorkflowStep


def create_my_domain_workflow() -> WorkflowDefinition:
    return WorkflowDefinition(
        name="My Domain Workflow",
        steps=[
            WorkflowStep(
                step_id="step1",
                kernel="my-domain",
                operation="process",
                inputs={"data": "${input_data}"},
                outputs=["result"],
            ),
        ],
    )
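
The `"${input_data}"` value looks like a placeholder that the workflow engine fills in at run time. The engine's actual substitution rules are not documented here, so the sketch below is only one plausible reading: an input that consists entirely of `${name}` is replaced by the named workflow variable. `resolve_inputs` is a hypothetical helper.

```python
import re
from typing import Any, Dict

# Matches a value that is exactly one placeholder, e.g. "${input_data}"
_PLACEHOLDER = re.compile(r"^\$\{(\w+)\}$")


def resolve_inputs(inputs: Dict[str, Any], variables: Dict[str, Any]) -> Dict[str, Any]:
    """Replace whole-value placeholders with workflow variables."""
    resolved: Dict[str, Any] = {}
    for key, value in inputs.items():
        match = _PLACEHOLDER.match(value) if isinstance(value, str) else None
        if match is None:
            resolved[key] = value  # literal input, passed through unchanged
        elif match.group(1) in variables:
            resolved[key] = variables[match.group(1)]
        else:
            raise KeyError(f"Unknown workflow variable: {match.group(1)}")
    return resolved
```

Replacing the whole value, instead of formatting it into a string, lets non-string data such as lists or dictionaries pass between steps intact.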

Testing

Unit Tests

python
import pytest
from aiosx.kernel.domains.my_domain.my_domain_kernel import MyDomainKernel


@pytest.mark.asyncio
async def test_kernel_initialization():
    kernel = MyDomainKernel("test_kernel")
    await kernel.initialize()
    assert kernel.kernel_id == "test_kernel"

Integration Tests

python
import pytest
from aiosx.kernel.domains.my_domain.my_domain_kernel import MyDomainKernel


@pytest.mark.asyncio
async def test_kernel_workflow():
    kernel = MyDomainKernel("test_kernel")
    await kernel.start()
    result = await kernel.process({"input": "data"})
    assert result["status"] == "success"
    await kernel.stop()

Best Practices

  1. Async/Await: Use async/await throughout for non-blocking operations
  2. Error Handling: Implement comprehensive error handling with retries
  3. Health Probes: Register probes for all critical components
  4. Metrics: Emit metrics for all important operations
  5. SLOs: Define SLOs for latency, error rate, availability
  6. Documentation: Document all public methods and classes
  7. Type Hints: Use type hints for better code clarity
  8. Testing: Write unit and integration tests
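
As one illustration of the error-handling practice, the sketch below retries an async operation with exponential backoff. `with_retries` and its parameters are hypothetical and not part of AIOSx; production code would usually also restrict which exception types are retried.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay_s: float = 0.1,
) -> T:
    """Run operation, retrying on any exception with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Wait base_delay_s, then 2x, 4x, ... before the next try
            await asyncio.sleep(base_delay_s * 2 ** attempt)
    raise AssertionError("unreachable")
```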

Example: Healthcare Domain Kernel

python
class HealthcareKernel(BaseKernel):
    def __init__(self, kernel_id: str = "healthcare_kernel_1"):
        super().__init__(kernel_id, domain="healthcare")
        # Perception: Lab results, patient data
        self.perception = HealthcarePerceptionLayer(kernel_id)
        # Reasoning: Clinical decision models
        self.reasoning = ClinicalReasoningLayer(kernel_id)
        # Risk: Patient safety, medication interactions
        self.risk = PatientSafetyRiskLayer(kernel_id)
        # Execution: Order set execution
        self.execution = OrderSetExecutionLayer(kernel_id)
        # Telemetry: Patient outcomes, quality metrics
        self.telemetry = HealthcareTelemetryLayer(kernel_id, metrics_bus)

Mapping (Trading kernel concept → Healthcare equivalent):

  • MarketSnapshot → LabResult
  • Strategy → ClinicalDecisionModel
  • Execution → OrderSetExecution
  • VenueConnector → HealthcareProvider

Example: Logistics Domain Kernel

python
class LogisticsKernel(BaseKernel):
    def __init__(self, kernel_id: str = "logistics_kernel_1"):
        super().__init__(kernel_id, domain="logistics")
        # Perception: Shipment data, location tracking
        self.perception = LogisticsPerceptionLayer(kernel_id)
        # Reasoning: Routing heuristics, optimization
        self.reasoning = RoutingReasoningLayer(kernel_id)
        # Risk: Delivery delays, cost overruns
        self.risk = LogisticsRiskLayer(kernel_id)
        # Execution: Transport provider execution
        self.execution = TransportExecutionLayer(kernel_id)
        # Telemetry: On-time delivery, cost metrics
        self.telemetry = LogisticsTelemetryLayer(kernel_id, metrics_bus)

Mapping (Trading kernel concept → Logistics equivalent):

  • Order → Shipment
  • VenueConnector → TransportProvider
  • Strategy → RoutingHeuristic

Next Steps

  1. Review existing domain kernels (Trading, DeFi-FX, LLM) for reference
  2. Use the template (aiosx/kernel/domains/template/domain_kernel_template.py)
  3. Implement each layer incrementally
  4. Add health probes and metrics
  5. Define SLOs and KPIs
  6. Create workflow templates
  7. Write tests
  8. Document your implementation

VoiceOS Kernel Example

Implementation Overview

The VoiceOS Kernel demonstrates a complete implementation with:

  • CallSessionManager: Manages call lifecycle with state transitions
  • AudioStreamHandler: Handles RTP/WebRTC audio with jitter buffering
  • Provider Abstractions: STT/TTS/NLU with fallback support
  • DialogOrchestrator: Integrates with LLM Kernel for reasoning
  • Health Probes: Jitter, packet loss, latency, dropped calls
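
The fallback behavior of the provider abstractions can be pictured roughly as below. This is not the code in `providers.py`; the `Provider` type and `transcribe_with_fallback` are invented for the example, which simply tries each speech-to-text provider in order until one succeeds.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence

# Hypothetical provider shape: takes audio bytes, returns a transcript
Provider = Callable[[bytes], Awaitable[str]]


async def transcribe_with_fallback(audio: bytes, providers: Sequence[Provider]) -> str:
    """Return the first successful transcript, trying providers in order."""
    errors: List[Exception] = []
    for provider in providers:
        try:
            return await provider(audio)
        except Exception as exc:
            errors.append(exc)  # remember the failure and try the next provider
    raise RuntimeError(f"All {len(errors)} providers failed")
```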

Key Files

  • aiosx/kernel/domains/voiceos/call_session.py
  • aiosx/kernel/domains/voiceos/providers.py
  • aiosx/kernel/domains/voiceos/audio_handler.py
  • aiosx/kernel/domains/voiceos/dialog_orchestrator.py
  • aiosx/kernel/domains/voiceos/voiceos_kernel.py

SentinelX Kernel Example

Implementation Overview

The SentinelX Security Kernel demonstrates:

  • SecurityEventIngestor: Normalizes events from all sources
  • DetectionEngine: Rule-based and ML-based threat detection
  • ResponseEngine: Executes security response actions
  • Integration: Works with KernelRegistry, TenantManager, MeshCircuitBreaker
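
For a sense of what a rule-based detection might look like, the sketch below flags sources with repeated authentication failures. The event fields (`type`, `source`), the rule name, and the threshold are assumptions for illustration; the actual DetectionEngine rules are not shown in this guide.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List


def detect_repeated_failures(
    events: Iterable[Dict[str, Any]],
    threshold: int = 3,
) -> List[Dict[str, Any]]:
    """Flag each source with at least `threshold` 'auth_failure' events."""
    failures = Counter(
        event["source"] for event in events if event.get("type") == "auth_failure"
    )
    return [
        {"source": source, "count": count, "rule": "repeated_auth_failure"}
        for source, count in sorted(failures.items())
        if count >= threshold
    ]
```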

Key Files

  • aiosx/kernel/domains/security/event_ingestion.py
  • aiosx/kernel/domains/security/detection_engine.py
  • aiosx/kernel/domains/security/response_engine.py
  • aiosx/kernel/domains/security/sentinelx_kernel.py

Security Integration Points

  1. Event Ingestion: All kernels emit security events
  2. Threat Detection: Rules and ML detect threats
  3. Response Actions: Automatic isolation, throttling, blocking
  4. AKO Integration: Security events influence optimization decisions
  5. Self-Healing: Security-driven recovery policies
