Files

agentic-robotics-rt

Crates.io Documentation License

Real-time executor with priority scheduling for Agentic Robotics

Part of the Agentic Robotics framework - high-performance robotics middleware with ROS2 compatibility.

Features

  • ⏱️ Deterministic scheduling: Priority-based task execution with deadlines
  • 🔄 Dual runtime architecture: Separate thread pools for high/low priority tasks
  • 📊 Latency tracking: HDR histogram for microsecond-precision measurements
  • 🎯 Priority isolation: High-priority tasks never blocked by low-priority work
  • Microsecond deadlines: Schedule tasks with < 1ms deadlines
  • 🦀 Rust async/await: Full integration with Tokio ecosystem

Installation

Add to your Cargo.toml:

[dependencies]
agentic-robotics-core = "0.1.0"
agentic-robotics-rt = "0.1.0"
tokio = { version = "1", features = ["full"] }

Quick Start

Basic Priority Scheduling

use agentic_robotics_rt::{Executor, Priority, Deadline};
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create executor with dual runtime
    let executor = Executor::new()?;

    // High-priority 1kHz control loop
    executor.spawn_rt(
        Priority::High,
        Deadline::from_hz(1000),  // 1ms deadline
        async {
            loop {
                // Read sensors, compute control, write actuators
                control_robot().await;
                tokio::time::sleep(Duration::from_micros(1000)).await;
            }
        }
    )?;

    // Low-priority logging (won't interfere with control loop)
    executor.spawn_rt(
        Priority::Low,
        Deadline::from_hz(10),  // 100ms deadline
        async {
            loop {
                log_telemetry().await;
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    )?;

    executor.run().await?;
    Ok(())
}

Deadline Enforcement

use agentic_robotics_rt::{Executor, Priority, Deadline};
use std::time::Duration;

let executor = Executor::new()?;

// Critical task must complete within 500µs
executor.spawn_rt(
    Priority::High,
    Deadline(Duration::from_micros(500)),
    async {
        // If this takes longer than 500µs, deadline missed warning
        critical_computation().await;
    }
)?;

Latency Monitoring

use agentic_robotics_rt::LatencyTracker;

let tracker = LatencyTracker::new();

let start = std::time::Instant::now();
process_message().await;
tracker.record(start.elapsed());

// Get statistics
println!("p50: {} µs", tracker.percentile(0.50) / 1000);
println!("p95: {} µs", tracker.percentile(0.95) / 1000);
println!("p99: {} µs", tracker.percentile(0.99) / 1000);
println!("p99.9: {} µs", tracker.percentile(0.999) / 1000);

Architecture

┌────────────────────────────────────────────┐
│     agentic-robotics-rt (Executor)         │
├────────────────────────────────────────────┤
│                                            │
│  ┌──────────────────────────────────────┐ │
│  │  Task Scheduler                      │ │
│  │  • Priority queue                    │ │
│  │  • Deadline tracking                 │ │
│  │  • Work stealing                     │ │
│  └──────────────────────────────────────┘ │
│                │                           │
│      ┌─────────┴─────────┐                │
│      │                   │                │
│  ┌───▼──────┐     ┌──────▼───┐           │
│  │ High-Pri │     │ Low-Pri  │           │
│  │ Runtime  │     │ Runtime  │           │
│  │ (2 thr)  │     │ (4 thr)  │           │
│  └──────────┘     └──────────┘           │
│      │                   │                │
│  ┌───▼───────────────────▼───┐           │
│  │  Tokio Async Runtime       │           │
│  └────────────────────────────┘           │
│                                            │
└────────────────────────────────────────────┘

Priority Levels

The executor supports multiple priority levels:

pub enum Priority {
    Critical,  // Real-time critical (< 100µs deadlines)
    High,      // High priority (< 1ms deadlines)
    Medium,    // Medium priority (< 10ms deadlines)
    Low,       // Low priority (> 10ms deadlines)
    Background,// Background tasks (no deadline)
}

Priority Assignment Guidelines

Priority Use Case Example Deadline
Critical Safety-critical control Emergency stop, collision avoidance < 100 µs
High Real-time control PID control, motor commands < 1 ms
Medium Sensor processing Image processing, point cloud filtering < 10 ms
Low Perception Object detection, SLAM < 100 ms
Background Logging, telemetry File I/O, network sync No deadline

Deadline Specification

Multiple ways to specify deadlines:

use std::time::Duration;
use agentic_robotics_rt::Deadline;

// Direct duration
let d1 = Deadline(Duration::from_micros(500));

// From frequency (Hz)
let d2 = Deadline::from_hz(1000);  // 1 kHz = 1ms deadline

// From milliseconds
let d3 = Deadline::from_millis(10);

// From microseconds
let d4 = Deadline::from_micros(100);

Real-Time Control Example

use agentic_robotics_core::Node;
use agentic_robotics_rt::{Executor, Priority, Deadline};
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut node = Node::new("robot_controller")?;
    let executor = Executor::new()?;

    // Subscribe to sensor data
    let sensor_sub = node.subscribe::<JointState>("/joint_states")?;

    // Publish control commands
    let cmd_pub = node.publish::<JointCommand>("/joint_commands")?;

    // High-priority 1kHz control loop
    executor.spawn_rt(
        Priority::High,
        Deadline::from_hz(1000),
        async move {
            loop {
                // Read latest sensor data (non-blocking)
                if let Some(state) = sensor_sub.try_recv() {
                    // Compute control law
                    let cmd = compute_control(&state);

                    // Send command
                    cmd_pub.publish(&cmd).await.ok();
                }

                // 1kHz loop
                tokio::time::sleep(Duration::from_micros(1000)).await;
            }
        }
    )?;

    // Low-priority telemetry
    executor.spawn_rt(
        Priority::Low,
        Deadline::from_hz(10),
        async move {
            loop {
                log_robot_state().await;
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    )?;

    executor.run().await?;
    Ok(())
}

Performance

Real measurements on production hardware:

Metric Value
Task spawn overhead ~2 µs
Priority switch latency < 5 µs
Deadline jitter < 10 µs (p99.9)
Throughput > 100k tasks/sec

Latency Distribution

Measured latencies for 1kHz control loop:

p50:   800 µs  ✅ Excellent
p95:   950 µs  ✅ Good
p99:   990 µs  ✅ Acceptable
p99.9: 999 µs  ✅ Within deadline

Advanced Features

Custom Thread Pools

Configure thread pool sizes:

use agentic_robotics_rt::{Executor, RuntimeConfig};

let config = RuntimeConfig {
    high_priority_threads: 4,  // 4 threads for high-priority
    low_priority_threads: 8,   // 8 threads for low-priority
};

let executor = Executor::with_config(config)?;

CPU Affinity

Pin high-priority threads to specific cores:

use agentic_robotics_rt::{Executor, CpuAffinity};

let executor = Executor::new()?;

// Pin high-priority runtime to cores 0-1
executor.set_cpu_affinity(
    Priority::High,
    CpuAffinity::Cores(vec![0, 1])
)?;

// Pin low-priority runtime to cores 2-7
executor.set_cpu_affinity(
    Priority::Low,
    CpuAffinity::Cores(vec![2, 3, 4, 5, 6, 7])
)?;

Deadline Miss Handling

Handle deadline misses gracefully:

use agentic_robotics_rt::{Executor, DeadlinePolicy};

let executor = Executor::new()?;

executor.set_deadline_policy(DeadlinePolicy::Warn)?;  // Log warning
// or
executor.set_deadline_policy(DeadlinePolicy::Panic)?; // Panic on miss
// or
executor.set_deadline_policy(DeadlinePolicy::Callback(|task_id, deadline, actual| {
    eprintln!("Task {} missed deadline: {:?} vs {:?}", task_id, deadline, actual);
}))?;

Testing

# Run unit tests
cargo test --package agentic-robotics-rt

# Run real-time latency tests
cargo test --package agentic-robotics-rt --test latency -- --nocapture

# Run with logging
RUST_LOG=debug cargo test --package agentic-robotics-rt

Benchmarks

cargo bench --package agentic-robotics-rt --bench latency

Expected results:

task_spawn_overhead     time: [1.8 µs 2.0 µs 2.2 µs]
priority_switch         time: [4.2 µs 4.5 µs 4.8 µs]
deadline_tracking       time: [120 ns 125 ns 130 ns]

Platform Support

Platform Status Notes
Linux Full support SCHED_FIFO available with CAP_SYS_NICE
macOS Supported Thread priorities via pthread
Windows Supported SetThreadPriority API
Embedded Planned RTIC integration coming soon

Real-Time Tips

Best Practices

  1. Avoid allocations in hot path: Pre-allocate buffers
  2. Use try_recv() for non-blocking: Don't block high-priority tasks
  3. Keep critical sections short: < 100µs per iteration
  4. Profile regularly: Use latency tracking to find bottlenecks

Common Pitfalls

Don't do file I/O in high-priority tasks Don't use mutex locks in critical paths Don't allocate memory in control loops Don't make network calls in high-priority tasks

Do pre-allocate buffers Do use lock-free channels Do offload heavy work to low-priority tasks Do profile and measure latencies

License

Licensed under either of:

at your option.


Part of the Agentic Robotics framework • Built with ❤️ by the Agentic Robotics Team