ADR-016: Delta-Behavior System - Domain-Driven Design Architecture
Status: Proposed
Date: 2026-01-28
Parent: ADR-001 RuVector Core Architecture
Author: System Architecture Designer
Abstract
This ADR defines a comprehensive Domain-Driven Design (DDD) architecture for a "Delta-Behavior" system using RuVector WASM modules. The system captures, propagates, aggregates, and applies differential changes (deltas) to vector representations, enabling efficient incremental updates, temporal versioning, and distributed state synchronization.
1. Executive Summary
The Delta-Behavior system models state changes as first-class domain objects rather than simple mutations. By treating deltas as immutable, causally ordered events, the system enables:
- Efficient incremental updates: Only transmit/store changes, not full states
- Temporal queries: Reconstruct any historical state via delta replay
- Conflict detection: Identify and resolve concurrent modifications
- Distributed sync: Propagate deltas across nodes with eventual consistency
- WASM portability: Core logic runs in browser, edge, and server environments
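As a concrete illustration of the incremental-update and temporal-query bullets above, any historical state can be rebuilt by replaying sparse deltas onto a checkpoint. The `SparseDelta` type and `replay` helper below are simplified stand-ins for the domain types defined in Section 3, not part of the actual design:

```rust
/// Toy sketch: reconstruct a historical state by replaying sparse deltas
/// onto a full-state checkpoint. `SparseDelta` is a simplified stand-in
/// for the DeltaVector type defined later in this ADR.
struct SparseDelta {
    indices: Vec<usize>, // changed dimensions
    values: Vec<f32>,    // additive change per dimension
}

fn replay(checkpoint: &[f32], deltas: &[SparseDelta]) -> Vec<f32> {
    let mut state = checkpoint.to_vec();
    for d in deltas {
        for (&i, &v) in d.indices.iter().zip(d.values.iter()) {
            state[i] += v; // apply only the changed dimensions
        }
    }
    state
}
```

Only the changed dimensions travel over the wire; the full vector is materialized on demand.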
2. Domain Analysis
2.1 Strategic Domain Design
The Delta-Behavior system spans five bounded contexts, each representing a distinct subdomain.
2.2 Core Domain Concepts
| Domain Concept | Definition |
|---|---|
| Delta | An immutable record of a differential change between two states |
| DeltaStream | Ordered sequence of deltas forming a causal chain |
| DeltaGraph | DAG structure representing delta dependencies and branches |
| DeltaWindow | Temporal container for batching deltas within a time/count boundary |
| DeltaVector | Sparse representation of the actual change data (vector diff) |
| DeltaCheckpoint | Full state snapshot at a specific delta sequence point |
3. Bounded Context Definitions
3.1 Delta Capture Domain
Purpose: Detect state changes and extract delta representations from source systems.
Ubiquitous Language
| Term | Definition |
|---|---|
| Observer | Component that monitors a source for state changes |
| ChangeEvent | Raw notification that a state modification occurred |
| Detector | Algorithm that identifies meaningful changes vs. noise |
| Extractor | Component that computes the delta between old and new state |
| CapturePolicy | Rules governing when/how deltas are captured |
| SourceBinding | Connection between observer and monitored resource |
Aggregate Roots
/// Delta Capture Domain - Aggregate Roots and Entities
pub mod delta_capture {
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
// ============================================================
// VALUE OBJECTS
// ============================================================
/// Unique identifier for a delta
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
pub struct DeltaId(pub u128);
impl DeltaId {
pub fn new() -> Self {
Self(uuid::Uuid::new_v4().as_u128())
}
pub fn from_bytes(bytes: [u8; 16]) -> Self {
Self(u128::from_be_bytes(bytes))
}
pub fn to_bytes(&self) -> [u8; 16] {
self.0.to_be_bytes()
}
}
/// Logical timestamp for causal ordering
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub struct DeltaTimestamp {
/// Logical clock (Lamport timestamp)
pub logical: u64,
/// Physical wall-clock time (milliseconds since epoch)
pub physical: u64,
/// Node identifier for tie-breaking
pub node_id: u32,
}
impl DeltaTimestamp {
pub fn new(logical: u64, physical: u64, node_id: u32) -> Self {
Self { logical, physical, node_id }
}
/// Advance logical clock, ensuring it's ahead of physical time
pub fn tick(&self) -> Self {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
logical: self.logical + 1,
physical: now_ms.max(self.physical),
node_id: self.node_id,
}
}
/// Merge with another timestamp (for receiving events)
pub fn merge(&self, other: &DeltaTimestamp) -> Self {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Self {
logical: self.logical.max(other.logical) + 1,
physical: now_ms.max(self.physical).max(other.physical),
node_id: self.node_id,
}
}
}
/// Checksum for delta integrity verification
#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct DeltaChecksum(pub [u8; 32]);
impl DeltaChecksum {
/// Compute SHA-256 hash of the delta payload
pub fn compute(data: &[u8]) -> Self {
use sha2::{Sha256, Digest};
let mut hasher = Sha256::new();
hasher.update(data);
let result = hasher.finalize();
let mut bytes = [0u8; 32];
bytes.copy_from_slice(&result);
Self(bytes)
}
/// Chain with previous checksum for tamper-evidence
pub fn chain(&self, previous: &DeltaChecksum, data: &[u8]) -> Self {
use sha2::{Sha256, Digest};
let mut hasher = Sha256::new();
hasher.update(&previous.0);
hasher.update(data);
let result = hasher.finalize();
let mut bytes = [0u8; 32];
bytes.copy_from_slice(&result);
Self(bytes)
}
}
/// Magnitude/size metric for a delta
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct DeltaMagnitude {
/// Number of dimensions changed
pub dimensions_changed: u32,
/// Total L2 norm of the change
pub l2_norm: f32,
/// Maximum single-dimension change
pub max_component: f32,
/// Sparsity ratio (changed/total dimensions)
pub sparsity: f32,
}
impl DeltaMagnitude {
pub fn compute(old: &[f32], new: &[f32]) -> Self {
assert_eq!(old.len(), new.len());
let mut dims_changed = 0u32;
let mut l2_sum = 0.0f32;
let mut max_comp = 0.0f32;
for (o, n) in old.iter().zip(new.iter()) {
let diff = (n - o).abs();
if diff > f32::EPSILON {
dims_changed += 1;
l2_sum += diff * diff;
max_comp = max_comp.max(diff);
}
}
Self {
dimensions_changed: dims_changed,
l2_norm: l2_sum.sqrt(),
max_component: max_comp,
sparsity: dims_changed as f32 / old.len() as f32,
}
}
/// Check if delta is significant enough to record
pub fn is_significant(&self, threshold: f32) -> bool {
self.l2_norm > threshold
}
}
/// Sparse representation of vector changes
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DeltaVector {
/// Total dimensions of the full vector
pub total_dims: u32,
/// Indices of changed dimensions
pub indices: Vec<u32>,
/// Delta values (new - old) for each changed index
pub values: Vec<f32>,
/// Magnitude metrics
pub magnitude: DeltaMagnitude,
}
impl DeltaVector {
/// Create from old and new vectors
pub fn from_diff(old: &[f32], new: &[f32], min_diff: f32) -> Self {
assert_eq!(old.len(), new.len());
let mut indices = Vec::new();
let mut values = Vec::new();
for (i, (o, n)) in old.iter().zip(new.iter()).enumerate() {
let diff = n - o;
if diff.abs() > min_diff {
indices.push(i as u32);
values.push(diff);
}
}
Self {
total_dims: old.len() as u32,
indices,
values,
// Note: magnitude counts all per-component changes above f32::EPSILON,
// which may include components the min_diff filter dropped above
magnitude: DeltaMagnitude::compute(old, new),
}
}
/// Apply delta to a base vector
pub fn apply(&self, base: &mut [f32]) {
assert_eq!(base.len(), self.total_dims as usize);
for (&idx, &val) in self.indices.iter().zip(self.values.iter()) {
base[idx as usize] += val;
}
}
/// Invert delta (for rollback)
pub fn invert(&self) -> Self {
Self {
total_dims: self.total_dims,
indices: self.indices.clone(),
values: self.values.iter().map(|v| -v).collect(),
magnitude: self.magnitude,
}
}
/// Compose two deltas (this then other)
pub fn compose(&self, other: &DeltaVector) -> Self {
assert_eq!(self.total_dims, other.total_dims);
let mut combined: HashMap<u32, f32> = HashMap::new();
for (&idx, &val) in self.indices.iter().zip(self.values.iter()) {
*combined.entry(idx).or_insert(0.0) += val;
}
for (&idx, &val) in other.indices.iter().zip(other.values.iter()) {
*combined.entry(idx).or_insert(0.0) += val;
}
// Filter out zero changes and sort by dimension index
let mut filtered: Vec<(u32, f32)> = combined.into_iter()
.filter(|(_, v)| v.abs() > f32::EPSILON)
.collect();
filtered.sort_by_key(|&(i, _)| i);
let (indices, values): (Vec<u32>, Vec<f32>) = filtered.into_iter().unzip();
Self {
total_dims: self.total_dims,
magnitude: DeltaMagnitude {
dimensions_changed: indices.len() as u32,
l2_norm: values.iter().map(|v| v * v).sum::<f32>().sqrt(),
max_component: values.iter().map(|v| v.abs()).fold(0.0, f32::max),
sparsity: indices.len() as f32 / self.total_dims as f32,
},
indices,
values,
}
}
/// Serialize to bytes
pub fn to_bytes(&self) -> Vec<u8> {
bincode::serialize(self).unwrap()
}
/// Deserialize from bytes
pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::Error> {
bincode::deserialize(bytes)
}
}
// ============================================================
// AGGREGATES
// ============================================================
/// Source identifier being observed
#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
pub struct SourceId(pub String);
/// Observer configuration and state
#[derive(Clone, Debug)]
pub struct Observer {
pub id: ObserverId,
pub source_id: SourceId,
pub capture_policy: CapturePolicy,
pub status: ObserverStatus,
pub last_capture: Option<DeltaTimestamp>,
pub metrics: ObserverMetrics,
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct ObserverId(pub u64);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ObserverStatus {
Active,
Paused,
Error,
Terminated,
}
#[derive(Clone, Debug)]
pub struct CapturePolicy {
/// Minimum time between captures (milliseconds)
pub min_interval_ms: u64,
/// Minimum magnitude threshold for capture
pub magnitude_threshold: f32,
/// Maximum deltas to buffer before force-flush
pub buffer_limit: usize,
/// Whether to capture zero-deltas as heartbeats
pub capture_heartbeats: bool,
}
impl Default for CapturePolicy {
fn default() -> Self {
Self {
min_interval_ms: 100,
magnitude_threshold: 1e-6,
buffer_limit: 1000,
capture_heartbeats: false,
}
}
}
#[derive(Clone, Debug, Default)]
pub struct ObserverMetrics {
pub deltas_captured: u64,
pub deltas_filtered: u64,
pub bytes_processed: u64,
pub avg_magnitude: f32,
pub last_error: Option<String>,
}
// ============================================================
// DELTA AGGREGATE ROOT
// ============================================================
/// The core Delta entity - immutable once created
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Delta {
/// Unique identifier
pub id: DeltaId,
/// Source that produced this delta
pub source_id: SourceId,
/// Causal timestamp
pub timestamp: DeltaTimestamp,
/// Previous delta in the chain (None for genesis)
pub parent_id: Option<DeltaId>,
/// The actual change data
pub vector: DeltaVector,
/// Integrity checksum (chained with parent)
pub checksum: DeltaChecksum,
/// Additional metadata
pub metadata: HashMap<String, String>,
}
impl Delta {
/// Create a new delta
pub fn new(
source_id: SourceId,
timestamp: DeltaTimestamp,
parent: Option<&Delta>,
vector: DeltaVector,
metadata: HashMap<String, String>,
) -> Self {
let id = DeltaId::new();
let parent_id = parent.map(|p| p.id);
let payload = vector.to_bytes();
let checksum = match parent {
Some(p) => p.checksum.chain(&p.checksum, &payload),
None => DeltaChecksum::compute(&payload),
};
Self {
id,
source_id,
timestamp,
parent_id,
vector,
checksum,
metadata,
}
}
/// Verify checksum chain integrity
pub fn verify_chain(&self, parent: Option<&Delta>) -> bool {
let payload = self.vector.to_bytes();
let expected = match parent {
Some(p) => p.checksum.chain(&p.checksum, &payload),
None => DeltaChecksum::compute(&payload),
};
self.checksum == expected
}
/// Check if this delta is a direct child of another
/// (a full descendant check requires walking the parent chain)
pub fn is_descendant_of(&self, ancestor_id: DeltaId) -> bool {
self.parent_id == Some(ancestor_id)
}
}
}
Domain Events
| Event | Payload | Published When |
|---|---|---|
| ChangeDetected | source_id, old_state_hash, new_state_hash | Observer detects state modification |
| DeltaExtracted | delta_id, source_id, magnitude | Delta computed from change |
| DeltaCaptured | delta_id, timestamp, checksum | Delta committed to capture buffer |
| CaptureBufferFlushed | delta_count, batch_id | Buffer contents sent downstream |
| ObserverError | observer_id, error_type, message | Capture failure |
| ObserverPaused | observer_id, reason | Observer temporarily stopped |
Domain Services
/// Domain services for Delta Capture
pub mod capture_services {
use super::delta_capture::*;
/// Trait for change detection algorithms
pub trait ChangeDetector: Send + Sync {
/// Compare states and determine if change is significant
fn detect(&self, old: &[f32], new: &[f32], policy: &CapturePolicy) -> bool;
/// Get detector configuration
fn config(&self) -> DetectorConfig;
}
#[derive(Clone, Debug)]
pub struct DetectorConfig {
pub algorithm: String,
pub threshold: f32,
pub use_cosine: bool,
}
/// Default detector using L2 norm threshold
pub struct L2ThresholdDetector {
pub threshold: f32,
}
impl ChangeDetector for L2ThresholdDetector {
fn detect(&self, old: &[f32], new: &[f32], policy: &CapturePolicy) -> bool {
let magnitude = DeltaMagnitude::compute(old, new);
magnitude.l2_norm > self.threshold.max(policy.magnitude_threshold)
}
fn config(&self) -> DetectorConfig {
DetectorConfig {
algorithm: "l2_threshold".to_string(),
threshold: self.threshold,
use_cosine: false,
}
}
}
/// Trait for delta extraction
pub trait DeltaExtractor: Send + Sync {
/// Extract delta from state transition
fn extract(
&self,
source_id: &SourceId,
old_state: &[f32],
new_state: &[f32],
timestamp: DeltaTimestamp,
parent: Option<&Delta>,
) -> Delta;
}
/// Default sparse delta extractor
pub struct SparseDeltaExtractor {
pub min_component_diff: f32,
}
impl DeltaExtractor for SparseDeltaExtractor {
fn extract(
&self,
source_id: &SourceId,
old_state: &[f32],
new_state: &[f32],
timestamp: DeltaTimestamp,
parent: Option<&Delta>,
) -> Delta {
let vector = DeltaVector::from_diff(old_state, new_state, self.min_component_diff);
Delta::new(
source_id.clone(),
timestamp,
parent,
vector,
std::collections::HashMap::new(),
)
}
}
/// Capture orchestration service
pub trait CaptureService: Send + Sync {
/// Register an observer for a source
fn register_observer(
&mut self,
source_id: SourceId,
policy: CapturePolicy,
) -> Result<ObserverId, CaptureError>;
/// Process a state change notification
fn on_state_change(
&mut self,
observer_id: ObserverId,
old_state: &[f32],
new_state: &[f32],
) -> Result<Option<Delta>, CaptureError>;
/// Flush buffered deltas
fn flush(&mut self, observer_id: ObserverId) -> Result<Vec<Delta>, CaptureError>;
}
#[derive(Debug)]
pub enum CaptureError {
ObserverNotFound(ObserverId),
PolicyViolation(String),
ExtractionFailed(String),
BufferOverflow,
}
}
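To show how the services above cooperate, here is a condensed sketch of the capture pipeline: a detector decides whether a change is significant, and an extractor then produces the sparse diff. The free functions below are simplified stand-ins for `L2ThresholdDetector` and `SparseDeltaExtractor`, not the real trait implementations:

```rust
/// Simplified capture pipeline sketch: detect, then extract.
fn l2_norm_diff(old: &[f32], new: &[f32]) -> f32 {
    old.iter().zip(new).map(|(o, n)| (n - o) * (n - o)).sum::<f32>().sqrt()
}

/// Returns Some((indices, values)) only if the change exceeds `threshold`.
fn capture(old: &[f32], new: &[f32], threshold: f32, min_diff: f32)
    -> Option<(Vec<u32>, Vec<f32>)> {
    if l2_norm_diff(old, new) <= threshold {
        return None; // change is noise under the policy; do not capture
    }
    let mut indices = Vec::new();
    let mut values = Vec::new();
    for (i, (o, n)) in old.iter().zip(new).enumerate() {
        let diff = n - o;
        if diff.abs() > min_diff {
            indices.push(i as u32); // record only changed dimensions
            values.push(diff);
        }
    }
    Some((indices, values))
}
```

The threshold check maps to `ChangeDetector::detect`, the sparse loop to `DeltaVector::from_diff`.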
3.2 Delta Propagation Domain
Purpose: Route deltas through the system to interested subscribers with ordering guarantees.
Ubiquitous Language
| Term | Definition |
|---|---|
| Channel | Named conduit for delta transmission |
| Subscriber | Consumer registered to receive deltas from channels |
| Router | Component that directs deltas to appropriate channels |
| RoutingPolicy | Rules for delta channel assignment |
| Backpressure | Flow control mechanism when subscribers are slow |
| DeliveryGuarantee | At-least-once, at-most-once, or exactly-once semantics |
Aggregate Roots
/// Delta Propagation Domain
pub mod delta_propagation {
use super::delta_capture::*;
use std::collections::{HashMap, HashSet};
// ============================================================
// VALUE OBJECTS
// ============================================================
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ChannelId(pub String);
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct SubscriberId(pub u64);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryGuarantee {
/// Fire and forget
AtMostOnce,
/// Retry until acknowledged
AtLeastOnce,
/// Deduplicated delivery
ExactlyOnce,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SubscriberStatus {
Active,
Paused,
Backpressured,
Disconnected,
}
/// Filter for selective subscription
#[derive(Clone, Debug)]
pub struct SubscriptionFilter {
/// Source patterns to match (glob-style)
pub source_patterns: Vec<String>,
/// Minimum magnitude to receive
pub min_magnitude: Option<f32>,
/// Metadata key-value matches
pub metadata_filters: HashMap<String, String>,
}
impl SubscriptionFilter {
pub fn matches(&self, delta: &Delta) -> bool {
// Check source pattern
let source_match = self.source_patterns.is_empty() ||
self.source_patterns.iter().any(|pat| {
glob_match(pat, &delta.source_id.0)
});
// Check magnitude
let magnitude_match = self.min_magnitude
.map(|min| delta.vector.magnitude.l2_norm >= min)
.unwrap_or(true);
// Check metadata
let metadata_match = self.metadata_filters.iter().all(|(k, v)| {
delta.metadata.get(k).map(|mv| mv == v).unwrap_or(false)
});
source_match && magnitude_match && metadata_match
}
}
fn glob_match(pattern: &str, text: &str) -> bool {
// Simple glob matching: a single '*' wildcard is supported
if pattern == "*" { return true; }
if let Some(star) = pattern.find('*') {
let prefix = &pattern[..star];
let suffix = &pattern[star + 1..];
// Length check prevents prefix and suffix from overlapping in short texts
return text.len() >= prefix.len() + suffix.len()
&& text.starts_with(prefix)
&& text.ends_with(suffix);
}
pattern == text
}
// ============================================================
// AGGREGATES
// ============================================================
/// Channel for delta distribution
#[derive(Clone, Debug)]
pub struct Channel {
pub id: ChannelId,
pub name: String,
pub delivery_guarantee: DeliveryGuarantee,
pub subscribers: HashSet<SubscriberId>,
pub metrics: ChannelMetrics,
pub created_at: u64,
}
#[derive(Clone, Debug, Default)]
pub struct ChannelMetrics {
pub deltas_published: u64,
pub deltas_delivered: u64,
pub avg_latency_ms: f32,
pub subscriber_count: u32,
}
impl Channel {
pub fn new(id: ChannelId, name: String, guarantee: DeliveryGuarantee) -> Self {
Self {
id,
name,
delivery_guarantee: guarantee,
subscribers: HashSet::new(),
metrics: ChannelMetrics::default(),
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
}
}
pub fn add_subscriber(&mut self, sub_id: SubscriberId) -> bool {
self.subscribers.insert(sub_id)
}
pub fn remove_subscriber(&mut self, sub_id: &SubscriberId) -> bool {
self.subscribers.remove(sub_id)
}
}
/// Subscriber registration
#[derive(Clone, Debug)]
pub struct Subscriber {
pub id: SubscriberId,
pub name: String,
pub channels: HashSet<ChannelId>,
pub filter: SubscriptionFilter,
pub status: SubscriberStatus,
pub cursor: SubscriberCursor,
pub metrics: SubscriberMetrics,
}
/// Tracks subscriber progress through delta stream
#[derive(Clone, Debug)]
pub struct SubscriberCursor {
/// Last acknowledged delta per channel
pub last_acked: HashMap<ChannelId, DeltaId>,
/// Last timestamp received
pub last_timestamp: Option<DeltaTimestamp>,
/// Pending deltas awaiting acknowledgment
pub pending_count: u32,
}
#[derive(Clone, Debug, Default)]
pub struct SubscriberMetrics {
pub deltas_received: u64,
pub deltas_acked: u64,
pub avg_processing_time_ms: f32,
pub backpressure_events: u32,
}
/// Routing decision for a delta
#[derive(Clone, Debug)]
pub struct RoutingDecision {
pub delta_id: DeltaId,
pub target_channels: Vec<ChannelId>,
pub priority: RoutingPriority,
pub ttl_ms: Option<u64>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum RoutingPriority {
Low = 0,
Normal = 1,
High = 2,
Critical = 3,
}
/// Routing policy definition
#[derive(Clone, Debug)]
pub struct RoutingPolicy {
pub id: String,
pub source_pattern: String,
pub target_channels: Vec<ChannelId>,
pub priority: RoutingPriority,
pub conditions: Vec<RoutingCondition>,
}
#[derive(Clone, Debug)]
pub enum RoutingCondition {
MinMagnitude(f32),
MetadataEquals(String, String),
MetadataExists(String),
TimeOfDay { start_hour: u8, end_hour: u8 },
}
}
Domain Events
| Event | Payload | Published When |
|---|---|---|
| DeltaRouted | delta_id, channel_ids, priority | Router assigns delta to channels |
| DeltaPublished | delta_id, channel_id, subscriber_count | Delta sent to channel |
| DeltaDelivered | delta_id, subscriber_id, latency_ms | Subscriber receives delta |
| DeltaAcknowledged | delta_id, subscriber_id | Subscriber confirms processing |
| SubscriberBackpressured | subscriber_id, pending_count | Subscriber overwhelmed |
| ChannelCreated | channel_id, delivery_guarantee | New channel registered |
| SubscriptionChanged | subscriber_id, added_channels, removed_channels | Subscription modified |
Domain Services
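The service layer for this context follows the same trait-based pattern as the capture domain. A minimal routing sketch, using the single-wildcard glob semantics of `glob_match` above (the `route` helper and its channel-to-pattern map are illustrative, not part of the domain model):

```rust
use std::collections::HashMap;

/// Simplified routing sketch: match a delta's source id against per-channel
/// glob patterns (single '*' wildcard) and return the matching channels.
fn glob_match(pattern: &str, text: &str) -> bool {
    if pattern == "*" { return true; }
    if let Some(star) = pattern.find('*') {
        let (pre, post) = (&pattern[..star], &pattern[star + 1..]);
        return text.len() >= pre.len() + post.len()
            && text.starts_with(pre)
            && text.ends_with(post);
    }
    pattern == text
}

/// `policies` maps channel name -> source pattern.
fn route(source_id: &str, policies: &HashMap<String, String>) -> Vec<String> {
    let mut channels: Vec<String> = policies.iter()
        .filter(|(_, pat)| glob_match(pat, source_id))
        .map(|(ch, _)| ch.clone())
        .collect();
    channels.sort(); // deterministic order for downstream processing
    channels
}
```

A production router would additionally evaluate `RoutingCondition`s and attach a `RoutingPriority`, as the aggregates above define.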
3.3 Delta Aggregation Domain
Purpose: Combine, batch, and compress deltas for efficient storage and transmission.
Ubiquitous Language
| Term | Definition |
|---|---|
| DeltaWindow | Temporal container grouping deltas by time or count |
| Batch | Collection of deltas packaged for bulk processing |
| Compaction | Process of merging sequential deltas into fewer |
| Compression | Reducing delta byte size through encoding |
| WindowPolicy | Rules for window boundaries (time, count, size) |
| AggregatedDelta | Result of combining multiple deltas |
Aggregate Roots
/// Delta Aggregation Domain
pub mod delta_aggregation {
use super::delta_capture::*;
use std::collections::HashMap;
// ============================================================
// VALUE OBJECTS
// ============================================================
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct WindowId(pub u64);
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct BatchId(pub u64);
/// Window boundary policy
#[derive(Clone, Debug)]
pub struct WindowPolicy {
/// Maximum time span in milliseconds
pub max_duration_ms: u64,
/// Maximum delta count
pub max_count: usize,
/// Maximum aggregate size in bytes
pub max_bytes: usize,
/// Force window close on these events
pub close_on_metadata: Vec<String>,
}
impl Default for WindowPolicy {
fn default() -> Self {
Self {
max_duration_ms: 1000,
max_count: 100,
max_bytes: 1024 * 1024, // 1MB
close_on_metadata: vec![],
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowStatus {
Open,
Closing,
Closed,
Compacted,
}
// ============================================================
// AGGREGATES
// ============================================================
/// Temporal window for delta collection
#[derive(Clone, Debug)]
pub struct DeltaWindow {
pub id: WindowId,
pub source_id: SourceId,
pub policy: WindowPolicy,
pub status: WindowStatus,
/// Deltas in this window (ordered by timestamp)
pub deltas: Vec<Delta>,
/// Window start timestamp
pub started_at: DeltaTimestamp,
/// Window close timestamp (if closed)
pub closed_at: Option<DeltaTimestamp>,
/// Aggregate metrics
pub metrics: WindowMetrics,
}
#[derive(Clone, Debug, Default)]
pub struct WindowMetrics {
pub delta_count: u32,
pub total_bytes: u64,
pub total_magnitude: f32,
pub dimensions_touched: u32,
}
impl DeltaWindow {
pub fn new(id: WindowId, source_id: SourceId, policy: WindowPolicy, start: DeltaTimestamp) -> Self {
Self {
id,
source_id,
policy,
status: WindowStatus::Open,
deltas: Vec::new(),
started_at: start,
closed_at: None,
metrics: WindowMetrics::default(),
}
}
/// Check if window should close
pub fn should_close(&self, current_time_ms: u64) -> bool {
if self.status != WindowStatus::Open {
return false;
}
// Time limit (saturating subtraction guards against clock skew)
let elapsed = current_time_ms.saturating_sub(self.started_at.physical);
if elapsed >= self.policy.max_duration_ms {
return true;
}
// Count limit
if self.deltas.len() >= self.policy.max_count {
return true;
}
// Size limit
if self.metrics.total_bytes as usize >= self.policy.max_bytes {
return true;
}
false
}
/// Add delta to window
pub fn add(&mut self, delta: Delta) -> Result<(), WindowError> {
if self.status != WindowStatus::Open {
return Err(WindowError::WindowClosed);
}
// Check for close-on-metadata triggers
for key in &self.policy.close_on_metadata {
if delta.metadata.contains_key(key) {
self.status = WindowStatus::Closing;
}
}
let delta_bytes = delta.vector.to_bytes().len() as u64;
self.metrics.delta_count += 1;
self.metrics.total_bytes += delta_bytes;
self.metrics.total_magnitude += delta.vector.magnitude.l2_norm;
self.metrics.dimensions_touched = self.metrics.dimensions_touched
.max(delta.vector.magnitude.dimensions_changed);
self.deltas.push(delta);
Ok(())
}
/// Close the window
pub fn close(&mut self, timestamp: DeltaTimestamp) {
self.status = WindowStatus::Closed;
self.closed_at = Some(timestamp);
}
/// Compact all deltas into an aggregated delta
pub fn compact(&mut self) -> Option<AggregatedDelta> {
if self.deltas.is_empty() {
return None;
}
// Compose all deltas
let mut composed = self.deltas[0].vector.clone();
for delta in self.deltas.iter().skip(1) {
composed = composed.compose(&delta.vector);
}
let first = self.deltas.first().unwrap();
let last = self.deltas.last().unwrap();
// Compute the ratio before moving `composed` into the result
let compression_ratio = self.compute_compression_ratio(&composed);
self.status = WindowStatus::Compacted;
Some(AggregatedDelta {
window_id: self.id,
source_id: self.source_id.clone(),
first_delta_id: first.id,
last_delta_id: last.id,
delta_count: self.deltas.len() as u32,
composed_vector: composed,
time_span: TimeSpan {
start: first.timestamp,
end: last.timestamp,
},
compression_ratio,
})
}
fn compute_compression_ratio(&self, composed: &DeltaVector) -> f32 {
let original_bytes: usize = self.deltas.iter()
.map(|d| d.vector.to_bytes().len())
.sum();
let composed_bytes = composed.to_bytes().len();
if composed_bytes > 0 {
original_bytes as f32 / composed_bytes as f32
} else {
1.0
}
}
}
/// Result of window compaction
#[derive(Clone, Debug)]
pub struct AggregatedDelta {
pub window_id: WindowId,
pub source_id: SourceId,
pub first_delta_id: DeltaId,
pub last_delta_id: DeltaId,
pub delta_count: u32,
pub composed_vector: DeltaVector,
pub time_span: TimeSpan,
pub compression_ratio: f32,
}
#[derive(Clone, Debug)]
pub struct TimeSpan {
pub start: DeltaTimestamp,
pub end: DeltaTimestamp,
}
/// Batch of deltas for bulk operations
#[derive(Clone, Debug)]
pub struct DeltaBatch {
pub id: BatchId,
pub deltas: Vec<Delta>,
pub created_at: u64,
pub checksum: DeltaChecksum,
}
impl DeltaBatch {
pub fn new(deltas: Vec<Delta>) -> Self {
let id = BatchId(rand::random());
// Compute batch checksum
let mut data = Vec::new();
for delta in &deltas {
data.extend(&delta.id.to_bytes());
}
let checksum = DeltaChecksum::compute(&data);
Self {
id,
deltas,
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
checksum,
}
}
pub fn len(&self) -> usize {
self.deltas.len()
}
pub fn is_empty(&self) -> bool {
self.deltas.is_empty()
}
}
#[derive(Debug)]
pub enum WindowError {
WindowClosed,
PolicyViolation(String),
}
}
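The compaction arithmetic can be illustrated on its own: composing consecutive sparse deltas sums per-index contributions and drops entries that cancel, which is what `DeltaWindow::compact` does when folding a window. The `compose` helper below is a simplified stand-in for `DeltaVector::compose`:

```rust
use std::collections::BTreeMap;

/// Simplified compose: sum two sparse deltas index-wise and drop
/// entries that cancel to (near) zero. BTreeMap keeps the result
/// sorted by dimension index.
fn compose(a: &[(u32, f32)], b: &[(u32, f32)]) -> Vec<(u32, f32)> {
    let mut acc: BTreeMap<u32, f32> = BTreeMap::new();
    for &(i, v) in a.iter().chain(b.iter()) {
        *acc.entry(i).or_insert(0.0) += v; // accumulate per dimension
    }
    acc.into_iter().filter(|(_, v)| v.abs() > f32::EPSILON).collect()
}
```

Cancellation is where compaction wins: a dimension touched many times within a window collapses to a single entry, or disappears entirely.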
Domain Events
| Event | Payload | Published When |
|---|---|---|
| WindowOpened | window_id, source_id, policy | New aggregation window started |
| WindowClosed | window_id, delta_count, duration_ms | Window reached boundary |
| WindowCompacted | window_id, compression_ratio | Deltas merged within window |
| BatchCreated | batch_id, delta_count, checksum | Batch assembled |
| BatchCompressed | batch_id, original_size, compressed_size | Batch compressed for storage |
| AggregationPolicyChanged | source_id, old_policy, new_policy | Window policy updated |
3.4 Delta Application Domain
Purpose: Apply deltas to target states with validation and transformation.
Ubiquitous Language
| Term | Definition |
|---|---|
| Applicator | Component that applies deltas to target state |
| Target | State vector being modified by deltas |
| Validator | Component that verifies delta applicability |
| Transformer | Component that modifies deltas before application |
| ApplicationResult | Outcome of delta application (success/failure) |
| Rollback | Reverting applied deltas |
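Rollback relies on a simple invariant: applying a delta's component-wise inverse restores the prior state exactly for exactly representable values. A toy sketch with hypothetical helpers, not the domain types defined below:

```rust
/// Apply a sparse additive delta in place.
fn apply_sparse(state: &mut [f32], indices: &[usize], values: &[f32]) {
    for (&i, &v) in indices.iter().zip(values) {
        state[i] += v;
    }
}

/// Negate each component to obtain the rollback delta.
fn invert(values: &[f32]) -> Vec<f32> {
    values.iter().map(|v| -v).collect()
}
```

This round-trip property is why `DeltaTarget` (below) stores an inverted `rollback_delta` in its application history rather than full state snapshots.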
Aggregate Roots
/// Delta Application Domain
pub mod delta_application {
use super::delta_capture::*;
use std::collections::HashMap;
// ============================================================
// VALUE OBJECTS
// ============================================================
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct TargetId(pub u64);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ApplicationStatus {
Pending,
Applied,
Failed,
RolledBack,
}
#[derive(Clone, Debug)]
pub enum ValidationResult {
Valid,
Invalid { reason: String },
Warning { message: String },
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConflictResolution {
/// Last write wins
LastWriteWins,
/// First write wins
FirstWriteWins,
/// Merge by averaging
Merge,
/// Reject on conflict
Reject,
/// Custom resolution function
Custom,
}
// ============================================================
// AGGREGATES
// ============================================================
/// Target state that receives delta applications
#[derive(Clone, Debug)]
pub struct DeltaTarget {
pub id: TargetId,
pub source_id: SourceId,
/// Current state vector
pub state: Vec<f32>,
/// Last applied delta
pub last_delta_id: Option<DeltaId>,
/// Last application timestamp
pub last_applied_at: Option<DeltaTimestamp>,
/// Application policy
pub policy: ApplicationPolicy,
/// Application history (ring buffer)
pub history: ApplicationHistory,
}
#[derive(Clone, Debug)]
pub struct ApplicationPolicy {
/// How to handle conflicts
pub conflict_resolution: ConflictResolution,
/// Maximum magnitude allowed per delta
pub max_magnitude: Option<f32>,
/// Dimensions that are read-only
pub locked_dimensions: Vec<u32>,
/// Whether to validate checksum chain
pub verify_chain: bool,
/// Maximum history entries to keep
pub history_limit: usize,
}
impl Default for ApplicationPolicy {
fn default() -> Self {
Self {
conflict_resolution: ConflictResolution::LastWriteWins,
max_magnitude: None,
locked_dimensions: vec![],
verify_chain: true,
history_limit: 1000,
}
}
}
/// Ring buffer of recent applications
#[derive(Clone, Debug)]
pub struct ApplicationHistory {
pub entries: Vec<ApplicationEntry>,
pub capacity: usize,
pub head: usize,
}
#[derive(Clone, Debug)]
pub struct ApplicationEntry {
pub delta_id: DeltaId,
pub applied_at: DeltaTimestamp,
pub status: ApplicationStatus,
/// Delta for rollback (inverted)
pub rollback_delta: Option<DeltaVector>,
}
impl ApplicationHistory {
pub fn new(capacity: usize) -> Self {
Self {
entries: Vec::with_capacity(capacity),
capacity,
head: 0,
}
}
pub fn push(&mut self, entry: ApplicationEntry) {
if self.entries.len() < self.capacity {
self.entries.push(entry);
} else {
self.entries[self.head] = entry;
}
self.head = (self.head + 1) % self.capacity;
}
pub fn last(&self) -> Option<&ApplicationEntry> {
if self.entries.is_empty() {
None
} else {
let idx = if self.head == 0 {
self.entries.len() - 1
} else {
self.head - 1
};
self.entries.get(idx)
}
}
/// Get entries for rollback (most recent first)
pub fn rollback_entries(&self, count: usize) -> Vec<&ApplicationEntry> {
let mut result = Vec::with_capacity(count);
let len = self.entries.len().min(count);
for i in 0..len {
let idx = if self.head >= i + 1 {
self.head - i - 1
} else {
self.entries.len() - (i + 1 - self.head)
};
if let Some(entry) = self.entries.get(idx) {
if entry.status == ApplicationStatus::Applied {
result.push(entry);
}
}
}
result
}
}
impl DeltaTarget {
pub fn new(
id: TargetId,
source_id: SourceId,
initial_state: Vec<f32>,
policy: ApplicationPolicy,
) -> Self {
Self {
id,
source_id,
state: initial_state,
last_delta_id: None,
last_applied_at: None,
policy: policy.clone(),
history: ApplicationHistory::new(policy.history_limit),
}
}
/// Validate a delta before application
pub fn validate(&self, delta: &Delta) -> ValidationResult {
// Check source matches
if delta.source_id != self.source_id {
return ValidationResult::Invalid {
reason: "Source ID mismatch".to_string(),
};
}
// Check dimensions match
if delta.vector.total_dims as usize != self.state.len() {
return ValidationResult::Invalid {
reason: format!(
"Dimension mismatch: delta has {} dims, target has {}",
delta.vector.total_dims, self.state.len()
),
};
}
// Check magnitude limit
if let Some(max_mag) = self.policy.max_magnitude {
if delta.vector.magnitude.l2_norm > max_mag {
return ValidationResult::Invalid {
reason: format!(
"Magnitude {} exceeds limit {}",
delta.vector.magnitude.l2_norm, max_mag
),
};
}
}
// Check locked dimensions
for &locked_dim in &self.policy.locked_dimensions {
if delta.vector.indices.contains(&locked_dim) {
return ValidationResult::Invalid {
reason: format!("Dimension {} is locked", locked_dim),
};
}
}
// Check causal ordering (parent should be our last applied)
if self.policy.verify_chain {
if let Some(expected_parent) = self.last_delta_id {
if delta.parent_id != Some(expected_parent) {
return ValidationResult::Warning {
message: format!(
"Non-sequential delta: expected parent {:?}, got {:?}",
expected_parent, delta.parent_id
),
};
}
}
}
ValidationResult::Valid
}
/// Apply a delta to the target state
pub fn apply(&mut self, delta: &Delta) -> Result<ApplicationResult, ApplicationError> {
// Validate first
match self.validate(delta) {
ValidationResult::Invalid { reason } => {
return Err(ApplicationError::ValidationFailed(reason));
}
ValidationResult::Warning { message } => {
// Log warning but continue
eprintln!("Warning: {}", message);
}
ValidationResult::Valid => {}
}
// Store rollback delta
let rollback_delta = delta.vector.invert();
// Apply the delta
delta.vector.apply(&mut self.state);
// Update metadata
self.last_delta_id = Some(delta.id);
self.last_applied_at = Some(delta.timestamp);
// Record in history
self.history.push(ApplicationEntry {
delta_id: delta.id,
applied_at: delta.timestamp,
status: ApplicationStatus::Applied,
rollback_delta: Some(rollback_delta),
});
Ok(ApplicationResult {
delta_id: delta.id,
target_id: self.id,
status: ApplicationStatus::Applied,
new_state_hash: self.compute_state_hash(),
})
}
/// Rollback the last N applied deltas
pub fn rollback(&mut self, count: usize) -> Result<Vec<DeltaId>, ApplicationError> {
// Collect (id, inverse delta) pairs first so the borrow of history
// ends before state and history are mutated below
let to_undo: Vec<(DeltaId, DeltaVector)> = self.history
.rollback_entries(count)
.into_iter()
.filter_map(|e| e.rollback_delta.clone().map(|d| (e.delta_id, d)))
.collect();
if to_undo.is_empty() {
return Err(ApplicationError::NothingToRollback);
}
let mut rolled_back = Vec::with_capacity(to_undo.len());
for (delta_id, rollback_delta) in to_undo {
rollback_delta.apply(&mut self.state);
rolled_back.push(delta_id);
}
// Consume the rollback deltas so a repeated rollback cannot re-apply them
for entry in self.history.entries.iter_mut() {
if rolled_back.contains(&entry.delta_id) {
entry.rollback_delta = None;
}
}
// Update last_delta_id to the most recent still-applied delta
self.last_delta_id = self.history.entries
.iter()
.filter(|e| e.status == ApplicationStatus::Applied && !rolled_back.contains(&e.delta_id))
.last()
.map(|e| e.delta_id);
Ok(rolled_back)
}
fn compute_state_hash(&self) -> [u8; 32] {
use sha2::{Sha256, Digest};
let mut hasher = Sha256::new();
for val in &self.state {
hasher.update(&val.to_le_bytes());
}
let result = hasher.finalize();
let mut hash = [0u8; 32];
hash.copy_from_slice(&result);
hash
}
}
#[derive(Clone, Debug)]
pub struct ApplicationResult {
pub delta_id: DeltaId,
pub target_id: TargetId,
pub status: ApplicationStatus,
pub new_state_hash: [u8; 32],
}
#[derive(Debug)]
pub enum ApplicationError {
ValidationFailed(String),
TargetNotFound(TargetId),
ConflictDetected { delta_id: DeltaId, reason: String },
NothingToRollback,
StateCorrupted(String),
}
}
Domain Events
| Event | Payload | Published When |
| --- | --- | --- |
| DeltaApplied | delta_id, target_id, new_state_hash | Delta successfully applied |
| DeltaRejected | delta_id, target_id, reason | Delta failed validation |
| DeltaConflictDetected | delta_id, conflicting_delta_id | Concurrent modification detected |
| DeltaMerged | delta_ids, merged_delta_id | Conflict resolved by merging |
| DeltaRolledBack | delta_ids, target_id | Deltas reverted |
| TargetStateCorrupted | target_id, expected_hash, actual_hash | Integrity check failed |
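The events in the table above can be sketched as a single Rust enum for in-process dispatch. This is an illustrative sketch, not part of the domain model: the `DeltaId`/`TargetId` aliases below are simplified stand-ins for the newtypes defined in the delta_capture module.

```rust
// Illustrative sketch: the application-domain events as one enum.
// DeltaId/TargetId are simplified to integers here so the snippet is
// self-contained; the real newtypes live in the delta_capture module.
type DeltaId = u128;
type TargetId = u64;

#[derive(Clone, Debug)]
enum ApplicationEvent {
    DeltaApplied { delta_id: DeltaId, target_id: TargetId, new_state_hash: [u8; 32] },
    DeltaRejected { delta_id: DeltaId, target_id: TargetId, reason: String },
    DeltaConflictDetected { delta_id: DeltaId, conflicting_delta_id: DeltaId },
    DeltaMerged { delta_ids: Vec<DeltaId>, merged_delta_id: DeltaId },
    DeltaRolledBack { delta_ids: Vec<DeltaId>, target_id: TargetId },
    TargetStateCorrupted { target_id: TargetId, expected_hash: [u8; 32], actual_hash: [u8; 32] },
}

fn main() {
    let ev = ApplicationEvent::DeltaRejected {
        delta_id: 42,
        target_id: 7,
        reason: "Source ID mismatch".into(),
    };
    // Pattern-match to route the event, e.g. to a dead-letter handler.
    if let ApplicationEvent::DeltaRejected { reason, .. } = &ev {
        println!("rejected: {}", reason);
    }
}
```

A subscriber would typically match on the variant and forward the payload to the appropriate bounded context.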
3.5 Delta Versioning Domain
Purpose: Manage temporal ordering, history, branching, and state reconstruction.
Ubiquitous Language
| Term | Definition |
| --- | --- |
| DeltaStream | Linear sequence of causally-ordered deltas |
| DeltaGraph | DAG of deltas supporting branches and merges |
| Snapshot | Full state capture at a specific version |
| Branch | Named divergence from the main delta stream |
| Merge | Combining two branches into one |
| Replay | Reconstructing state by applying deltas from a point |
Aggregate Roots
/// Delta Versioning Domain
pub mod delta_versioning {
use super::delta_capture::*;
use super::delta_aggregation::*;
use std::collections::{HashMap, HashSet, BTreeMap};
// ============================================================
// VALUE OBJECTS
// ============================================================
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct StreamId(pub u64);
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub struct SnapshotId(pub u64);
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct BranchId(pub String);
/// Version identifier (sequence number in stream)
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct Version(pub u64);
impl Version {
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
pub fn genesis() -> Self {
Self(0)
}
}
// ============================================================
// AGGREGATES
// ============================================================
/// Linear delta stream (append-only log)
#[derive(Clone, Debug)]
pub struct DeltaStream {
pub id: StreamId,
pub source_id: SourceId,
/// Deltas indexed by version
pub deltas: BTreeMap<Version, Delta>,
/// Current head version
pub head: Version,
/// Periodic snapshots for fast replay
pub snapshots: HashMap<Version, SnapshotId>,
/// Snapshot interval
pub snapshot_interval: u64,
/// Stream metadata
pub metadata: StreamMetadata,
}
#[derive(Clone, Debug, Default)]
pub struct StreamMetadata {
pub created_at: u64,
pub last_updated: u64,
pub total_deltas: u64,
pub total_bytes: u64,
}
impl DeltaStream {
pub fn new(id: StreamId, source_id: SourceId, snapshot_interval: u64) -> Self {
Self {
id,
source_id,
deltas: BTreeMap::new(),
head: Version::genesis(),
snapshots: HashMap::new(),
snapshot_interval,
metadata: StreamMetadata::default(),
}
}
/// Append a delta to the stream
pub fn append(&mut self, delta: Delta) -> Version {
let version = self.head.next();
self.head = version;
self.metadata.total_deltas += 1;
self.metadata.total_bytes += delta.vector.to_bytes().len() as u64;
self.metadata.last_updated = delta.timestamp.physical;
self.deltas.insert(version, delta);
version
}
/// Get delta at specific version
pub fn get(&self, version: Version) -> Option<&Delta> {
self.deltas.get(&version)
}
/// Get delta range (inclusive)
pub fn range(&self, from: Version, to: Version) -> Vec<&Delta> {
self.deltas.range(from..=to)
.map(|(_, d)| d)
.collect()
}
/// Find nearest snapshot before version
pub fn nearest_snapshot(&self, version: Version) -> Option<(Version, SnapshotId)> {
self.snapshots.iter()
.filter(|(v, _)| **v <= version)
.max_by_key(|(v, _)| *v)
.map(|(v, s)| (*v, *s))
}
/// Check if snapshot is due
pub fn should_snapshot(&self) -> bool {
let last_snapshot_version = self.snapshots.keys().max().copied()
.unwrap_or(Version::genesis());
self.head.0 - last_snapshot_version.0 >= self.snapshot_interval
}
/// Record a snapshot
pub fn record_snapshot(&mut self, version: Version, snapshot_id: SnapshotId) {
self.snapshots.insert(version, snapshot_id);
}
}
/// DAG-based delta graph (supports branching)
#[derive(Clone, Debug)]
pub struct DeltaGraph {
pub source_id: SourceId,
/// All deltas by ID
pub nodes: HashMap<DeltaId, DeltaGraphNode>,
/// Child relationships (parent -> children)
pub edges: HashMap<DeltaId, Vec<DeltaId>>,
/// Named branches
pub branches: HashMap<BranchId, DeltaId>,
/// Main branch head
pub main_head: Option<DeltaId>,
/// Root deltas (no parent)
pub roots: HashSet<DeltaId>,
}
#[derive(Clone, Debug)]
pub struct DeltaGraphNode {
pub delta: Delta,
pub version: Version,
pub branch: Option<BranchId>,
pub is_merge: bool,
/// Second parent for merge commits
pub merge_parent: Option<DeltaId>,
}
impl DeltaGraph {
pub fn new(source_id: SourceId) -> Self {
Self {
source_id,
nodes: HashMap::new(),
edges: HashMap::new(),
branches: HashMap::new(),
main_head: None,
roots: HashSet::new(),
}
}
/// Add a delta to the graph
pub fn add(&mut self, delta: Delta, branch: Option<BranchId>) -> DeltaId {
let delta_id = delta.id;
let parent_id = delta.parent_id;
// Determine version
let version = match parent_id {
Some(pid) => self.nodes.get(&pid)
.map(|n| n.version.next())
.unwrap_or(Version(1)),
None => Version(1),
};
// Create node
let node = DeltaGraphNode {
delta,
version,
branch: branch.clone(),
is_merge: false,
merge_parent: None,
};
self.nodes.insert(delta_id, node);
// Update edges
if let Some(pid) = parent_id {
self.edges.entry(pid).or_default().push(delta_id);
} else {
self.roots.insert(delta_id);
}
// Update branch head
if let Some(ref b) = branch {
self.branches.insert(b.clone(), delta_id);
} else {
self.main_head = Some(delta_id);
}
delta_id
}
/// Create a branch from a delta
pub fn create_branch(&mut self, branch_id: BranchId, from_delta: DeltaId) -> Result<(), VersioningError> {
if !self.nodes.contains_key(&from_delta) {
return Err(VersioningError::DeltaNotFound(from_delta));
}
if self.branches.contains_key(&branch_id) {
return Err(VersioningError::BranchExists(branch_id));
}
self.branches.insert(branch_id, from_delta);
Ok(())
}
/// Merge two branches
pub fn merge(
&mut self,
source_branch: &BranchId,
target_branch: &BranchId,
merged_vector: DeltaVector,
timestamp: DeltaTimestamp,
) -> Result<DeltaId, VersioningError> {
let source_head = self.branches.get(source_branch)
.ok_or_else(|| VersioningError::BranchNotFound(source_branch.clone()))?;
let target_head = self.branches.get(target_branch)
.ok_or_else(|| VersioningError::BranchNotFound(target_branch.clone()))?;
// Create merge delta
let merge_delta = Delta::new(
self.source_id.clone(),
timestamp,
self.nodes.get(target_head).map(|n| &n.delta),
merged_vector,
HashMap::from([("merge".to_string(), "true".to_string())]),
);
let merge_id = merge_delta.id;
// Add merge node
let version = self.nodes.get(target_head)
.map(|n| n.version.next())
.unwrap_or(Version(1));
let node = DeltaGraphNode {
delta: merge_delta,
version,
branch: Some(target_branch.clone()),
is_merge: true,
merge_parent: Some(*source_head),
};
self.nodes.insert(merge_id, node);
// Update edges (merge has two parents)
self.edges.entry(*target_head).or_default().push(merge_id);
self.edges.entry(*source_head).or_default().push(merge_id);
// Update target branch head
self.branches.insert(target_branch.clone(), merge_id);
Ok(merge_id)
}
/// Get ancestry path from root to delta
pub fn ancestry(&self, delta_id: DeltaId) -> Vec<DeltaId> {
let mut path = Vec::new();
let mut current = Some(delta_id);
while let Some(id) = current {
path.push(id);
current = self.nodes.get(&id)
.and_then(|n| n.delta.parent_id);
}
path.reverse();
path
}
/// Find the nearest (lowest) common ancestor of two deltas
pub fn common_ancestor(&self, a: DeltaId, b: DeltaId) -> Option<DeltaId> {
let ancestry_a: HashSet<_> = self.ancestry(a).into_iter().collect();
// Walk b's ancestry from the delta back toward the root, so the first
// hit is the nearest common ancestor rather than the shared root
for ancestor in self.ancestry(b).into_iter().rev() {
if ancestry_a.contains(&ancestor) {
return Some(ancestor);
}
}
None
}
/// Get all deltas in topological order (Kahn's algorithm, so a merge
/// node is emitted only after both of its parents)
pub fn topological_order(&self) -> Vec<DeltaId> {
// Count incoming edges for every node
let mut in_degree: HashMap<DeltaId, usize> = HashMap::new();
for id in self.nodes.keys() {
in_degree.entry(*id).or_insert(0);
}
for children in self.edges.values() {
for child in children {
*in_degree.entry(*child).or_insert(0) += 1;
}
}
// Start from nodes with no parents (the roots)
let mut ready: Vec<DeltaId> = in_degree.iter()
.filter(|(_, d)| **d == 0)
.map(|(id, _)| *id)
.collect();
let mut result = Vec::with_capacity(self.nodes.len());
while let Some(id) = ready.pop() {
result.push(id);
if let Some(children) = self.edges.get(&id) {
for &child in children {
let d = in_degree.get_mut(&child).unwrap();
*d -= 1;
if *d == 0 {
ready.push(child);
}
}
}
}
result
}
}
/// Full state snapshot for fast replay
#[derive(Clone, Debug)]
pub struct DeltaSnapshot {
pub id: SnapshotId,
pub source_id: SourceId,
/// Full state vector at this point
pub state: Vec<f32>,
/// Delta that produced this state
pub delta_id: DeltaId,
/// Stream version (if from stream)
pub version: Version,
/// Timestamp of snapshot
pub created_at: DeltaTimestamp,
/// State checksum
pub checksum: DeltaChecksum,
}
impl DeltaSnapshot {
pub fn new(
source_id: SourceId,
state: Vec<f32>,
delta_id: DeltaId,
version: Version,
timestamp: DeltaTimestamp,
) -> Self {
let mut data = Vec::new();
for val in &state {
data.extend(&val.to_le_bytes());
}
let checksum = DeltaChecksum::compute(&data);
Self {
id: SnapshotId(rand::random()),
source_id,
state,
delta_id,
version,
created_at: timestamp,
checksum,
}
}
/// Verify snapshot integrity
pub fn verify(&self) -> bool {
let mut data = Vec::new();
for val in &self.state {
data.extend(&val.to_le_bytes());
}
let computed = DeltaChecksum::compute(&data);
computed == self.checksum
}
}
/// Index for efficient delta lookup
#[derive(Clone, Debug)]
pub struct DeltaIndex {
/// Delta by ID
by_id: HashMap<DeltaId, Delta>,
/// Deltas by source
by_source: HashMap<SourceId, Vec<DeltaId>>,
/// Deltas by time range
by_time: BTreeMap<u64, Vec<DeltaId>>,
/// Checksum -> Delta mapping
by_checksum: HashMap<DeltaChecksum, DeltaId>,
}
impl DeltaIndex {
pub fn new() -> Self {
Self {
by_id: HashMap::new(),
by_source: HashMap::new(),
by_time: BTreeMap::new(),
by_checksum: HashMap::new(),
}
}
/// Index a delta
pub fn insert(&mut self, delta: Delta) {
let id = delta.id;
let source = delta.source_id.clone();
let time = delta.timestamp.physical;
let checksum = delta.checksum;
self.by_source.entry(source).or_default().push(id);
self.by_time.entry(time).or_default().push(id);
self.by_checksum.insert(checksum, id);
self.by_id.insert(id, delta);
}
/// Lookup by ID
pub fn get(&self, id: &DeltaId) -> Option<&Delta> {
self.by_id.get(id)
}
/// Query by source
pub fn by_source(&self, source: &SourceId) -> Vec<&Delta> {
self.by_source.get(source)
.map(|ids| ids.iter().filter_map(|id| self.by_id.get(id)).collect())
.unwrap_or_default()
}
/// Query by time range
pub fn by_time_range(&self, start_ms: u64, end_ms: u64) -> Vec<&Delta> {
self.by_time.range(start_ms..=end_ms)
.flat_map(|(_, ids)| ids.iter())
.filter_map(|id| self.by_id.get(id))
.collect()
}
/// Verify by checksum
pub fn verify(&self, checksum: &DeltaChecksum) -> Option<&Delta> {
self.by_checksum.get(checksum)
.and_then(|id| self.by_id.get(id))
}
}
#[derive(Debug)]
pub enum VersioningError {
DeltaNotFound(DeltaId),
BranchNotFound(BranchId),
BranchExists(BranchId),
SnapshotNotFound(SnapshotId),
ReplayFailed(String),
MergeConflict { delta_a: DeltaId, delta_b: DeltaId },
}
}
Domain Events
| Event | Payload | Published When |
| --- | --- | --- |
| DeltaVersioned | delta_id, version, stream_id | Delta assigned a version number |
| SnapshotCreated | snapshot_id, version, state_hash | Full state captured |
| BranchCreated | branch_id, from_delta_id | New branch started |
| BranchMerged | source_branch, target_branch, merge_delta_id | Branches combined |
| StreamCompacted | stream_id, before_count, after_count | Old deltas archived |
| ReplayStarted | from_version, to_version | State reconstruction begun |
| ReplayCompleted | target_version, delta_count, duration_ms | State reconstruction finished |
4. Bounded Context Map
5. Anti-Corruption Layers
5.1 Propagation to Application ACL
/// ACL: Translate propagation concepts to application domain
pub mod propagation_to_application_acl {
use super::delta_propagation::*;
use super::delta_application::*;
use super::delta_capture::*;
/// Adapter that translates routing decisions into application requests
pub struct RoutingToApplicationAdapter;
impl RoutingToApplicationAdapter {
/// Convert a delivered delta into an application request
pub fn to_apply_request(
delta: &Delta,
routing: &RoutingDecision,
target_id: TargetId,
) -> ApplyRequest {
ApplyRequest {
delta: delta.clone(),
target_id,
priority: match routing.priority {
RoutingPriority::Critical => ApplicationPriority::Immediate,
RoutingPriority::High => ApplicationPriority::High,
RoutingPriority::Normal => ApplicationPriority::Normal,
RoutingPriority::Low => ApplicationPriority::Background,
},
timeout_ms: routing.ttl_ms,
retry_policy: match routing.priority {
RoutingPriority::Critical => RetryPolicy::Infinite,
RoutingPriority::High => RetryPolicy::Count(5),
RoutingPriority::Normal => RetryPolicy::Count(3),
RoutingPriority::Low => RetryPolicy::None,
},
}
}
/// Map application result back to acknowledgment
pub fn to_acknowledgment(
result: &ApplicationResult,
subscriber_id: SubscriberId,
) -> DeltaAcknowledgment {
DeltaAcknowledgment {
delta_id: result.delta_id,
subscriber_id,
success: matches!(result.status, ApplicationStatus::Applied),
new_version: Some(result.new_state_hash),
}
}
}
#[derive(Clone, Debug)]
pub struct ApplyRequest {
pub delta: Delta,
pub target_id: TargetId,
pub priority: ApplicationPriority,
pub timeout_ms: Option<u64>,
pub retry_policy: RetryPolicy,
}
#[derive(Clone, Copy, Debug)]
pub enum ApplicationPriority {
Immediate,
High,
Normal,
Background,
}
#[derive(Clone, Copy, Debug)]
pub enum RetryPolicy {
None,
Count(u32),
Infinite,
}
#[derive(Clone, Debug)]
pub struct DeltaAcknowledgment {
pub delta_id: DeltaId,
pub subscriber_id: SubscriberId,
pub success: bool,
pub new_version: Option<[u8; 32]>,
}
}
5.2 Aggregation to Versioning ACL
6. Repository Interfaces
/// Repository interfaces for persistence
pub mod repositories {
use super::delta_capture::*;
use super::delta_versioning::*;
use super::delta_aggregation::*;
use async_trait::async_trait;
// ============================================================
// DELTA REPOSITORY
// ============================================================
#[async_trait]
pub trait DeltaRepository: Send + Sync {
/// Store a delta
async fn save(&self, delta: &Delta) -> Result<(), RepositoryError>;
/// Retrieve delta by ID
async fn find_by_id(&self, id: &DeltaId) -> Result<Option<Delta>, RepositoryError>;
/// Find deltas by source
async fn find_by_source(
&self,
source_id: &SourceId,
limit: usize,
offset: usize,
) -> Result<Vec<Delta>, RepositoryError>;
/// Find deltas in time range
async fn find_by_time_range(
&self,
source_id: &SourceId,
start_ms: u64,
end_ms: u64,
) -> Result<Vec<Delta>, RepositoryError>;
/// Find deltas by parent (for graph traversal)
async fn find_children(&self, parent_id: &DeltaId) -> Result<Vec<Delta>, RepositoryError>;
/// Verify checksum chain
async fn verify_chain(
&self,
from_id: &DeltaId,
to_id: &DeltaId,
) -> Result<bool, RepositoryError>;
/// Bulk insert deltas
async fn save_batch(&self, deltas: &[Delta]) -> Result<usize, RepositoryError>;
/// Delete deltas older than a given timestamp (for compaction)
async fn delete_before(
&self,
source_id: &SourceId,
before_timestamp: u64,
) -> Result<u64, RepositoryError>;
}
// ============================================================
// SNAPSHOT REPOSITORY
// ============================================================
#[async_trait]
pub trait SnapshotRepository: Send + Sync {
/// Store a snapshot
async fn save(&self, snapshot: &DeltaSnapshot) -> Result<(), RepositoryError>;
/// Retrieve snapshot by ID
async fn find_by_id(&self, id: &SnapshotId) -> Result<Option<DeltaSnapshot>, RepositoryError>;
/// Find nearest snapshot before version
async fn find_nearest(
&self,
source_id: &SourceId,
before_version: Version,
) -> Result<Option<DeltaSnapshot>, RepositoryError>;
/// List snapshots for source
async fn list_by_source(
&self,
source_id: &SourceId,
) -> Result<Vec<DeltaSnapshot>, RepositoryError>;
/// Delete old snapshots (keep N most recent)
async fn cleanup(
&self,
source_id: &SourceId,
keep_count: usize,
) -> Result<u64, RepositoryError>;
}
// ============================================================
// STREAM REPOSITORY
// ============================================================
#[async_trait]
pub trait StreamRepository: Send + Sync {
/// Get or create stream
async fn get_or_create(
&self,
source_id: &SourceId,
) -> Result<DeltaStream, RepositoryError>;
/// Save stream metadata
async fn save_metadata(
&self,
stream: &DeltaStream,
) -> Result<(), RepositoryError>;
/// Append delta to stream
async fn append(
&self,
stream_id: &StreamId,
delta: &Delta,
) -> Result<Version, RepositoryError>;
/// Get deltas in version range
async fn get_range(
&self,
stream_id: &StreamId,
from: Version,
to: Version,
) -> Result<Vec<Delta>, RepositoryError>;
/// Get current head version
async fn get_head(&self, stream_id: &StreamId) -> Result<Version, RepositoryError>;
}
// ============================================================
// INDEX REPOSITORY (for search)
// ============================================================
#[async_trait]
pub trait IndexRepository: Send + Sync {
/// Index a delta
async fn index(&self, delta: &Delta) -> Result<(), RepositoryError>;
/// Search by checksum
async fn search_by_checksum(
&self,
checksum: &DeltaChecksum,
) -> Result<Option<DeltaId>, RepositoryError>;
/// Search by metadata
async fn search_by_metadata(
&self,
key: &str,
value: &str,
) -> Result<Vec<DeltaId>, RepositoryError>;
/// Full-text search in metadata
async fn search_text(
&self,
query: &str,
limit: usize,
) -> Result<Vec<DeltaId>, RepositoryError>;
}
// ============================================================
// ERROR TYPES
// ============================================================
#[derive(Debug)]
pub enum RepositoryError {
NotFound(String),
Conflict(String),
ConnectionError(String),
SerializationError(String),
IntegrityError(String),
StorageFull,
Timeout,
}
}
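To make the repository contract concrete, here is a synchronous in-memory sketch of the DeltaRepository lookups. It collapses the async_trait signatures to plain methods and reduces Delta to the fields the queries touch; `DeltaRecord` and `InMemoryDeltaRepo` are assumptions for illustration, not proposed types.

```rust
use std::collections::HashMap;

// Simplified delta record: only the fields the repository queries touch.
#[derive(Clone, Debug)]
struct DeltaRecord {
    id: u128,
    source_id: u64,
    timestamp_ms: u64,
}

/// In-memory stand-in for DeltaRepository (synchronous sketch).
#[derive(Default)]
struct InMemoryDeltaRepo {
    by_id: HashMap<u128, DeltaRecord>,
    by_source: HashMap<u64, Vec<u128>>,
}

impl InMemoryDeltaRepo {
    fn save(&mut self, delta: DeltaRecord) {
        self.by_source.entry(delta.source_id).or_default().push(delta.id);
        self.by_id.insert(delta.id, delta);
    }

    fn find_by_id(&self, id: u128) -> Option<&DeltaRecord> {
        self.by_id.get(&id)
    }

    /// Paged lookup, mirroring find_by_source(source_id, limit, offset).
    fn find_by_source(&self, source_id: u64, limit: usize, offset: usize) -> Vec<&DeltaRecord> {
        self.by_source
            .get(&source_id)
            .map(|ids| {
                ids.iter()
                    .skip(offset)
                    .take(limit)
                    .filter_map(|id| self.by_id.get(id))
                    .collect()
            })
            .unwrap_or_default()
    }
}

fn main() {
    let mut repo = InMemoryDeltaRepo::default();
    for i in 0..5u128 {
        repo.save(DeltaRecord { id: i, source_id: 1, timestamp_ms: 1000 + i as u64 });
    }
    // Second page of two deltas for source 1.
    let page = repo.find_by_source(1, 2, 2);
    println!("{:?}", page.iter().map(|d| d.id).collect::<Vec<_>>()); // [2, 3]
}
```

A production adapter would back the same shape with RuVector plus an append log, as recommended in the evaluation matrix below; the secondary index per source is what keeps paged queries cheap.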
7. Event Flow Diagram
8. WASM Integration Architecture
/// WASM bindings for Delta-Behavior system
pub mod wasm_bindings {
use wasm_bindgen::prelude::*;
use serde::{Deserialize, Serialize};
// ============================================================
// WASM-OPTIMIZED TYPES (minimal footprint)
// ============================================================
/// Compact delta for WASM (minimized size)
#[wasm_bindgen]
#[derive(Clone)]
pub struct WasmDelta {
id_high: u64,
id_low: u64,
parent_high: u64,
parent_low: u64,
timestamp: u64,
indices: Vec<u32>,
values: Vec<f32>,
}
#[wasm_bindgen]
impl WasmDelta {
#[wasm_bindgen(constructor)]
pub fn new(timestamp: u64, indices: Vec<u32>, values: Vec<f32>) -> Self {
let id = uuid::Uuid::new_v4().as_u128();
Self {
id_high: (id >> 64) as u64,
id_low: id as u64,
parent_high: 0,
parent_low: 0,
timestamp,
indices,
values,
}
}
/// Set parent delta ID
pub fn set_parent(&mut self, high: u64, low: u64) {
self.parent_high = high;
self.parent_low = low;
}
/// Get delta ID as hex string
pub fn id_hex(&self) -> String {
format!("{:016x}{:016x}", self.id_high, self.id_low)
}
/// Apply to a vector (in-place)
pub fn apply(&self, vector: &mut [f32]) {
for (&idx, &val) in self.indices.iter().zip(self.values.iter()) {
if (idx as usize) < vector.len() {
vector[idx as usize] += val;
}
}
}
/// Compute L2 magnitude
pub fn magnitude(&self) -> f32 {
self.values.iter().map(|v| v * v).sum::<f32>().sqrt()
}
/// Serialize to bytes
pub fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(
16 + 16 + 8 + 4 + self.indices.len() * 4 + self.values.len() * 4
);
bytes.extend(&self.id_high.to_le_bytes());
bytes.extend(&self.id_low.to_le_bytes());
bytes.extend(&self.parent_high.to_le_bytes());
bytes.extend(&self.parent_low.to_le_bytes());
bytes.extend(&self.timestamp.to_le_bytes());
bytes.extend(&(self.indices.len() as u32).to_le_bytes());
for &idx in &self.indices {
bytes.extend(&idx.to_le_bytes());
}
for &val in &self.values {
bytes.extend(&val.to_le_bytes());
}
bytes
}
/// Deserialize from bytes
pub fn from_bytes(bytes: &[u8]) -> Result<WasmDelta, JsValue> {
if bytes.len() < 44 {
return Err(JsValue::from_str("Buffer too small"));
}
let id_high = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
let id_low = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
let parent_high = u64::from_le_bytes(bytes[16..24].try_into().unwrap());
let parent_low = u64::from_le_bytes(bytes[24..32].try_into().unwrap());
let timestamp = u64::from_le_bytes(bytes[32..40].try_into().unwrap());
let count = u32::from_le_bytes(bytes[40..44].try_into().unwrap()) as usize;
let expected_len = 44 + count * 8;
if bytes.len() < expected_len {
return Err(JsValue::from_str("Buffer too small for data"));
}
let mut indices = Vec::with_capacity(count);
let mut values = Vec::with_capacity(count);
let mut offset = 44;
for _ in 0..count {
indices.push(u32::from_le_bytes(bytes[offset..offset+4].try_into().unwrap()));
offset += 4;
}
for _ in 0..count {
values.push(f32::from_le_bytes(bytes[offset..offset+4].try_into().unwrap()));
offset += 4;
}
Ok(Self {
id_high,
id_low,
parent_high,
parent_low,
timestamp,
indices,
values,
})
}
}
// ============================================================
// WASM DELTA STREAM
// ============================================================
#[wasm_bindgen]
pub struct WasmDeltaStream {
deltas: Vec<WasmDelta>,
head_idx: Option<usize>,
}
#[wasm_bindgen]
impl WasmDeltaStream {
#[wasm_bindgen(constructor)]
pub fn new() -> Self {
Self {
deltas: Vec::new(),
head_idx: None,
}
}
/// Append delta to stream
pub fn append(&mut self, mut delta: WasmDelta) -> usize {
// Set parent to current head
if let Some(head) = self.head_idx {
let parent = &self.deltas[head];
delta.set_parent(parent.id_high, parent.id_low);
}
let idx = self.deltas.len();
self.deltas.push(delta);
self.head_idx = Some(idx);
idx
}
/// Get delta count
pub fn len(&self) -> usize {
self.deltas.len()
}
/// Apply all deltas to a vector
pub fn apply_all(&self, vector: &mut [f32]) {
for delta in &self.deltas {
delta.apply(vector);
}
}
/// Apply deltas from index to head
pub fn apply_from(&self, start_idx: usize, vector: &mut [f32]) {
for delta in self.deltas.iter().skip(start_idx) {
delta.apply(vector);
}
}
/// Compact stream by composing deltas
pub fn compact(&mut self) -> WasmDelta {
let mut indices_map = std::collections::HashMap::new();
for delta in &self.deltas {
for (&idx, &val) in delta.indices.iter().zip(delta.values.iter()) {
*indices_map.entry(idx).or_insert(0.0f32) += val;
}
}
let mut sorted: Vec<_> = indices_map.into_iter().collect();
sorted.sort_by_key(|(idx, _)| *idx);
let indices: Vec<u32> = sorted.iter().map(|(i, _)| *i).collect();
let values: Vec<f32> = sorted.iter().map(|(_, v)| *v).collect();
let timestamp = self.deltas.last()
.map(|d| d.timestamp)
.unwrap_or(0);
WasmDelta::new(timestamp, indices, values)
}
/// Serialize entire stream
pub fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend(&(self.deltas.len() as u32).to_le_bytes());
for delta in &self.deltas {
let delta_bytes = delta.to_bytes();
bytes.extend(&(delta_bytes.len() as u32).to_le_bytes());
bytes.extend(&delta_bytes);
}
bytes
}
}
// ============================================================
// WASM DELTA DETECTOR (Change detection in WASM)
// ============================================================
#[wasm_bindgen]
pub struct WasmDeltaDetector {
threshold: f32,
min_component_diff: f32,
last_state: Option<Vec<f32>>,
}
#[wasm_bindgen]
impl WasmDeltaDetector {
#[wasm_bindgen(constructor)]
pub fn new(threshold: f32, min_component_diff: f32) -> Self {
Self {
threshold,
min_component_diff,
last_state: None,
}
}
/// Detect delta between last state and new state
pub fn detect(&mut self, new_state: Vec<f32>) -> Option<WasmDelta> {
let delta = match &self.last_state {
Some(old) => {
if old.len() != new_state.len() {
return None;
}
let mut indices = Vec::new();
let mut values = Vec::new();
let mut magnitude_sq = 0.0f32;
for (i, (&old_val, &new_val)) in old.iter().zip(new_state.iter()).enumerate() {
let diff = new_val - old_val;
if diff.abs() > self.min_component_diff {
indices.push(i as u32);
values.push(diff);
magnitude_sq += diff * diff;
}
}
let magnitude = magnitude_sq.sqrt();
if magnitude < self.threshold {
None
} else {
let timestamp = js_sys::Date::now() as u64;
Some(WasmDelta::new(timestamp, indices, values))
}
}
None => None,
};
self.last_state = Some(new_state);
delta
}
/// Reset detector state
pub fn reset(&mut self) {
self.last_state = None;
}
}
}
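The WasmDelta wire format above (16-byte id, 16-byte parent id, 8-byte timestamp, 4-byte count, then little-endian indices and values) can be exercised outside the browser. Below is a std-only round-trip sketch of the same layout; the free functions are illustrative helpers, not part of the bindings:

```rust
use std::convert::TryInto; // required on editions before 2021

// Std-only round-trip of the WasmDelta wire format:
// [id: 16][parent: 16][timestamp: 8][count: 4][indices: 4*n][values: 4*n]
fn encode(id_high: u64, id_low: u64, parent_high: u64, parent_low: u64,
          timestamp: u64, indices: &[u32], values: &[f32]) -> Vec<u8> {
    assert_eq!(indices.len(), values.len());
    let mut b = Vec::with_capacity(44 + indices.len() * 8);
    b.extend(&id_high.to_le_bytes());
    b.extend(&id_low.to_le_bytes());
    b.extend(&parent_high.to_le_bytes());
    b.extend(&parent_low.to_le_bytes());
    b.extend(&timestamp.to_le_bytes());
    b.extend(&(indices.len() as u32).to_le_bytes());
    for &i in indices { b.extend(&i.to_le_bytes()); }
    for &v in values { b.extend(&v.to_le_bytes()); }
    b
}

fn decode(bytes: &[u8]) -> Option<(u64, Vec<u32>, Vec<f32>)> {
    if bytes.len() < 44 { return None; } // fixed header is 44 bytes
    let timestamp = u64::from_le_bytes(bytes[32..40].try_into().ok()?);
    let count = u32::from_le_bytes(bytes[40..44].try_into().ok()?) as usize;
    if bytes.len() < 44 + count * 8 { return None; }
    let mut indices = Vec::with_capacity(count);
    let mut values = Vec::with_capacity(count);
    for k in 0..count {
        let o = 44 + k * 4;
        indices.push(u32::from_le_bytes(bytes[o..o + 4].try_into().ok()?));
    }
    let base = 44 + count * 4;
    for k in 0..count {
        let o = base + k * 4;
        values.push(f32::from_le_bytes(bytes[o..o + 4].try_into().ok()?));
    }
    Some((timestamp, indices, values))
}

fn main() {
    let bytes = encode(1, 2, 0, 0, 1_700_000_000_000, &[3, 7], &[0.5, -1.0]);
    let (ts, idx, vals) = decode(&bytes).unwrap();
    assert_eq!((ts, idx, vals), (1_700_000_000_000, vec![3, 7], vec![0.5, -1.0]));
    println!("round-trip ok, {} bytes", bytes.len()); // 60 bytes
}
```

Keeping the header fixed at 44 bytes is what lets `from_bytes` validate lengths before reading any variable-size data.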
9. Technology Evaluation Matrix
| Component | Option A | Option B | Recommendation | Rationale |
| --- | --- | --- | --- | --- |
| Delta Storage | PostgreSQL + JSONB | RuVector + Append Log | RuVector | Native vector support, HNSW for similarity |
| Checksum Chain | SHA-256 | Blake3 | Blake3 | ~3x faster, streaming support |
| Serialization | JSON | Bincode | Bincode (WASM), JSON (API) | ~60% smaller, ~10x faster |
| Timestamp | Wall Clock | Hybrid Logical | Hybrid Logical | Causality without clock sync |
| Conflict Resolution | LWW | Vector Clocks | Vector Clocks | Detects concurrent modifications |
| WASM Runtime | wasm-bindgen | wit-bindgen | wasm-bindgen | Mature, browser-compatible |
| Pub/Sub | Redis Streams | NATS | NATS (prod), in-process (embedded) | Persistence + at-least-once delivery |
| Graph Storage | Neo4j | ruvector-graph | ruvector-graph | Native integration |
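The checksum-chain row above relies on each delta's checksum committing to its predecessor's, so tampering with any delta invalidates every later link. A sketch of that chaining structure, using std's DefaultHasher purely as a placeholder for Blake3:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Placeholder hash standing in for Blake3 (the recommended choice above);
// the chaining structure, not the hash function, is the point here.
fn link_checksum(prev: u64, payload: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    prev.hash(&mut h);
    payload.hash(&mut h);
    h.finish()
}

fn main() {
    // Build a chain: each delta's checksum commits to its predecessor.
    let deltas: Vec<&[u8]> = vec![b"delta-1", b"delta-2", b"delta-3"];
    let mut chain = Vec::new();
    let mut prev = 0u64;
    for d in &deltas {
        prev = link_checksum(prev, d);
        chain.push(prev);
    }
    // Tampering with delta-2 changes its checksum and every later one.
    let tampered = link_checksum(chain[0], b"delta-2-tampered");
    assert_ne!(tampered, chain[1]);
    assert_ne!(link_checksum(tampered, b"delta-3"), chain[2]);
    println!("chain of {} links is tamper-evident", chain.len());
}
```

Verification on read (the mitigation listed for chain corruption below) is the same computation run forward from a trusted checkpoint.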
10. Consequences
Benefits
- Incremental Efficiency: Only transmit/store actual changes (typically 1-5% of full vector)
- Temporal Queries: Reconstruct any historical state via delta replay
- Conflict Visibility: Concurrent modifications explicitly tracked and resolved
- Audit Trail: Complete, tamper-evident history of all changes
- WASM Portability: Core delta logic runs anywhere (browser, edge, server)
- Composability: Deltas can be merged, compacted, or branched
- Clear Boundaries: Each domain has explicit responsibilities
Risks and Mitigations
| Risk | Impact | Probability | Mitigation |
| --- | --- | --- | --- |
| Replay performance at scale | High | Medium | Periodic snapshots (every N deltas) |
| Checksum chain corruption | High | Low | Redundant storage, verification on read |
| Window aggregation data loss | Medium | Low | WAL before window close |
| Branch merge conflicts | Medium | Medium | Clear resolution strategies per domain |
| WASM memory limits | Medium | Medium | Streaming delta application |
Trade-offs
- Storage vs Replay Speed: More frequent snapshots = faster replay, more storage
- Granularity vs Overhead: Fine-grained deltas = better precision, more metadata overhead
- Compression vs Latency: Window compaction = smaller storage, delayed visibility
- Consistency vs Availability: Strict ordering = stronger guarantees, potential blocking
11. Implementation Roadmap
Phase 1: Core Domain (4 weeks)
Phase 2: Propagation + Aggregation (3 weeks)
Phase 3: Application + Versioning (4 weeks)
Phase 4: Repositories + Integration (3 weeks)
Phase 5: Production Hardening (2 weeks)
12. References
- Evans, Eric. "Domain-Driven Design: Tackling Complexity in the Heart of Software" (2003)
- Vernon, Vaughn. "Implementing Domain-Driven Design" (2013)
- Kleppmann, Martin. "Designing Data-Intensive Applications" (2017) - Chapter 5: Replication
- RuVector Core: /workspaces/ruvector/crates/ruvector-core
- RuVector DAG: /workspaces/ruvector/crates/ruvector-dag
- RuVector Replication: /workspaces/ruvector/crates/ruvector-replication/src/conflict.rs
- ADR-CE-004: Signed Event Log
Revision History
| Version | Date | Author | Changes |
| --- | --- | --- | --- |
| 1.0 | 2026-01-28 | System Architecture Designer | Initial ADR |