Introduce core modules: device management, bus communication, and discovery protocol. Adds system device interface, virtual hardware bus, and device discovery logic. Includes tests for all components.

This commit is contained in:
geoffsee
2025-07-03 12:24:25 -04:00
parent 2311f43d97
commit 72bf4b53c6
7 changed files with 1810 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
[package]
name = "hardware"
version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
tokio = { version = "1.0", features = ["full"] }
uuid = { version = "1.0", features = ["v4"] }
tracing = "0.1"
[dev-dependencies]
tokio-test = "0.4"

366
crates/hardware/README.md Normal file
View File

@@ -0,0 +1,366 @@
# Virtual Hardware Abstraction Layer - Integration Guide
This document provides detailed instructions on how to integrate the virtual hardware abstraction layer into yachtpit systems.
## Overview
The virtual hardware abstraction layer consists of three main components:
1. **Hardware Bus** - Communication infrastructure for virtual devices
2. **System Device** - Interface and base implementation for virtual hardware devices
3. **Discovery Protocol** - Device discovery and capability advertisement
## Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ Yachtpit Application │
├─────────────────────────────────────────────────────────────┤
│ Systems Crate │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ GPS System │ │Radar System │ │ AIS System │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Hardware Abstraction Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │Hardware Bus │ │System Device│ │Discovery │ │
│ │ │ │Interface │ │Protocol │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
## Integration Steps
### Step 1: Add Hardware Dependency
Add the hardware crate as a dependency to the systems crate:
```toml
# crates/systems/Cargo.toml
[dependencies]
hardware = { path = "../hardware" }
```
### Step 2: Create Hardware-Aware System Implementations
Modify existing systems to implement the `SystemDevice` trait:
```rust
// crates/systems/src/gps/gps_system.rs
use hardware::prelude::*;
pub struct GpsSystemDevice {
base: BaseSystemDevice,
// GPS-specific fields
position: Option<Position>,
satellites: u8,
}
#[async_trait::async_trait]
impl SystemDevice for GpsSystemDevice {
async fn initialize(&mut self) -> Result<()> {
self.base.initialize().await?;
// GPS-specific initialization
self.satellites = 0;
Ok(())
}
async fn process(&mut self) -> Result<Vec<BusMessage>> {
// Generate GPS data messages
let mut messages = Vec::new();
if let Some(position) = &self.position {
let payload = serde_json::to_vec(&position)?;
let message = BusMessage::Data {
from: self.base.info.address.clone(),
to: BusAddress::new("navigation_system"), // Example target
payload,
message_id: Uuid::new_v4(),
};
messages.push(message);
}
Ok(messages)
}
// Implement other required methods...
}
```
### Step 3: Set Up Hardware Bus
Create a central hardware bus manager:
```rust
// crates/systems/src/hardware_manager.rs
use hardware::prelude::*;
use std::sync::Arc;
pub struct HardwareManager {
bus: Arc<HardwareBus>,
device_manager: DeviceManager,
discovery_protocol: DiscoveryProtocol,
}
impl HardwareManager {
pub async fn new() -> Result<Self> {
let bus = Arc::new(HardwareBus::new());
let device_manager = DeviceManager::new();
// Create discovery protocol for the manager itself
let manager_info = DeviceInfo {
address: BusAddress::new("hardware_manager"),
config: DeviceConfig {
name: "Hardware Manager".to_string(),
capabilities: vec![DeviceCapability::Communication],
..Default::default()
},
status: DeviceStatus::Online,
last_seen: SystemTime::now(),
version: "1.0.0".to_string(),
manufacturer: "Yachtpit".to_string(),
};
let discovery_protocol = DiscoveryProtocol::new(
manager_info,
DiscoveryConfig::default(),
);
Ok(Self {
bus,
device_manager,
discovery_protocol,
})
}
pub async fn add_system_device(&mut self, device: Box<dyn SystemDevice>) -> Result<()> {
let address = device.get_info().address.clone();
// Connect device to bus
let connection = self.bus.connect_device(address.clone()).await?;
// Add to device manager
self.device_manager.add_device(device);
Ok(())
}
pub async fn start_all_systems(&mut self) -> Result<()> {
self.device_manager.start_all().await?;
self.discovery_protocol.start().await?;
Ok(())
}
}
```
### Step 4: Integrate with Existing Systems
Modify the existing vessel systems to use the hardware abstraction:
```rust
// crates/systems/src/vessel/vessel_systems.rs
use crate::hardware_manager::HardwareManager;
pub async fn create_vessel_systems_with_hardware() -> Result<HardwareManager> {
let mut hardware_manager = HardwareManager::new().await?;
// Create GPS system
let gps_config = DeviceConfig {
name: "GPS System".to_string(),
capabilities: vec![DeviceCapability::Gps],
update_interval_ms: 1000,
..Default::default()
};
let gps_device = Box::new(GpsSystemDevice::new(gps_config));
hardware_manager.add_system_device(gps_device).await?;
// Create Radar system
let radar_config = DeviceConfig {
name: "Radar System".to_string(),
capabilities: vec![DeviceCapability::Radar],
update_interval_ms: 500,
..Default::default()
};
let radar_device = Box::new(RadarSystemDevice::new(radar_config));
hardware_manager.add_system_device(radar_device).await?;
// Create AIS system
let ais_config = DeviceConfig {
name: "AIS System".to_string(),
capabilities: vec![DeviceCapability::Ais],
update_interval_ms: 2000,
..Default::default()
};
let ais_device = Box::new(AisSystemDevice::new(ais_config));
hardware_manager.add_system_device(ais_device).await?;
hardware_manager.start_all_systems().await?;
Ok(hardware_manager)
}
```
### Step 5: Update Main Application
Integrate the hardware manager into the main yachtpit application:
```rust
// crates/yachtpit/src/core/system_manager.rs
use systems::vessel::vessel_systems::create_vessel_systems_with_hardware;
pub struct SystemManager {
hardware_manager: Option<HardwareManager>,
}
impl SystemManager {
pub async fn initialize_with_hardware(&mut self) -> Result<()> {
let hardware_manager = create_vessel_systems_with_hardware().await?;
self.hardware_manager = Some(hardware_manager);
Ok(())
}
pub async fn discover_devices(&self) -> Result<Vec<DeviceInfo>> {
if let Some(ref manager) = self.hardware_manager {
// Use discovery protocol to find devices
manager.discovery_protocol.discover_devices(None).await?;
tokio::time::sleep(Duration::from_millis(100)).await; // Wait for responses
Ok(manager.discovery_protocol.get_known_devices().await)
} else {
Ok(vec![])
}
}
}
```
## Message Flow Examples
### GPS Data Flow
```
GPS Device → Hardware Bus → Navigation System
→ Discovery Protocol (heartbeat)
→ Other interested devices
```
### Device Discovery Flow
```
New Device → Announce Message → Hardware Bus → All Devices
Discovery Request → Hardware Bus → Matching Devices → Response
```
## Configuration
### Device Configuration
Each device can be configured with:
- Update intervals
- Capabilities
- Custom configuration parameters
- Message queue sizes
### Discovery Configuration
- Heartbeat intervals
- Device timeout periods
- Cleanup intervals
- Maximum tracked devices
## Testing Integration
### Unit Tests
Run tests for individual components:
```bash
cargo test -p hardware
cargo test -p systems
```
### Integration Tests
Create integration tests that verify the complete flow:
```rust
#[tokio::test]
async fn test_complete_hardware_integration() {
let mut hardware_manager = HardwareManager::new().await.unwrap();
// Add test devices
let gps_device = Box::new(create_test_gps_device());
hardware_manager.add_system_device(gps_device).await.unwrap();
// Start systems
hardware_manager.start_all_systems().await.unwrap();
// Verify device discovery
let devices = hardware_manager.discovery_protocol.get_known_devices().await;
assert!(!devices.is_empty());
// Test message passing
// ... additional test logic
}
```
## Performance Considerations
1. **Message Throughput**: The hardware bus uses unbounded channels for high throughput
2. **Device Limits**: Configure maximum device limits based on system resources
3. **Update Intervals**: Balance between data freshness and system load
4. **Memory Usage**: Monitor device registry size and message history
## Error Handling
The hardware abstraction layer provides comprehensive error handling:
- **Device Errors**: Automatic retry and fallback mechanisms
- **Bus Errors**: Connection recovery and message queuing
- **Discovery Errors**: Timeout handling and device cleanup
## Migration Strategy
### Phase 1: Parallel Implementation
- Keep existing systems running
- Implement hardware abstraction alongside
- Gradual migration of individual systems
### Phase 2: Feature Parity
- Ensure all existing functionality is available
- Add comprehensive testing
- Performance validation
### Phase 3: Full Migration
- Switch to hardware abstraction as primary
- Remove legacy system implementations
- Optimize performance
## Troubleshooting
### Common Issues
1. **Device Not Found**: Check device registration and bus connection
2. **Message Delivery Failures**: Verify device addresses and bus connectivity
3. **Discovery Timeouts**: Adjust discovery configuration parameters
4. **Performance Issues**: Monitor message queue sizes and update intervals
### Debugging Tools
```rust
// Enable debug logging
use tracing::{info, debug, warn};
// Check device status
// let device_info = hardware_manager.get_device_info(&address).await;
// debug!("Device status: {:?}", device_info.status);
// Monitor message history
// let messages = hardware_bus.get_message_history().await;
// info!("Recent messages: {}", messages.len());
```
## Future Enhancements
1. **Network Discovery**: Extend discovery protocol to work across network boundaries
2. **Device Simulation**: Add comprehensive device simulators for testing
3. **Hot-Plugging**: Support for dynamic device addition/removal
4. **Load Balancing**: Distribute device processing across multiple threads
5. **Persistence**: Save and restore device configurations and state
## Conclusion
The virtual hardware abstraction layer provides a robust foundation for managing yacht systems. By following this integration guide, you can gradually migrate existing systems while maintaining full functionality and adding new capabilities for device discovery and communication.
For questions or issues during integration, refer to the individual module documentation in the hardware crate or create an issue in the project repository.

338
crates/hardware/src/bus.rs Normal file
View File

@@ -0,0 +1,338 @@
//! Virtual Hardware Bus Module
//!
//! Provides a communication infrastructure for virtual hardware devices
use crate::{HardwareError, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
/// Unique address for devices on the hardware bus
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct BusAddress {
pub id: Uuid,
pub name: String,
}
impl BusAddress {
/// Create a new bus address with a generated UUID
pub fn new(name: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
name: name.into(),
}
}
/// Create a bus address with a specific UUID
pub fn with_id(id: Uuid, name: impl Into<String>) -> Self {
Self {
id,
name: name.into(),
}
}
}
/// Message types that can be sent over the hardware bus
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BusMessage {
/// Data message with payload
Data {
from: BusAddress,
to: BusAddress,
payload: Vec<u8>,
message_id: Uuid,
},
/// Control message for bus management
Control {
from: BusAddress,
command: ControlCommand,
message_id: Uuid,
},
/// Broadcast message to all devices
Broadcast {
from: BusAddress,
payload: Vec<u8>,
message_id: Uuid,
},
/// Acknowledgment message
Ack {
to: BusAddress,
original_message_id: Uuid,
message_id: Uuid,
},
}
/// Control commands for bus management
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ControlCommand {
/// Register a device on the bus
Register { address: BusAddress },
/// Unregister a device from the bus
Unregister { address: BusAddress },
/// Ping a device
Ping { target: BusAddress },
/// Pong response to ping
Pong { from: BusAddress },
/// Request device list
ListDevices,
/// Response with device list
DeviceList { devices: Vec<BusAddress> },
}
impl BusMessage {
/// Get the message ID
pub fn message_id(&self) -> Uuid {
match self {
BusMessage::Data { message_id, .. } => *message_id,
BusMessage::Control { message_id, .. } => *message_id,
BusMessage::Broadcast { message_id, .. } => *message_id,
BusMessage::Ack { message_id, .. } => *message_id,
}
}
/// Get the sender address if available
pub fn from(&self) -> Option<&BusAddress> {
match self {
BusMessage::Data { from, .. } => Some(from),
BusMessage::Control { from, .. } => Some(from),
BusMessage::Broadcast { from, .. } => Some(from),
BusMessage::Ack { .. } => None,
}
}
}
/// Device connection handle for the hardware bus
pub struct DeviceConnection {
pub address: BusAddress,
pub sender: mpsc::UnboundedSender<BusMessage>,
pub receiver: mpsc::UnboundedReceiver<BusMessage>,
}
/// Virtual Hardware Bus implementation
pub struct HardwareBus {
devices: Arc<RwLock<HashMap<BusAddress, mpsc::UnboundedSender<BusMessage>>>>,
message_log: Arc<RwLock<Vec<BusMessage>>>,
}
impl Default for HardwareBus {
fn default() -> Self {
Self::new()
}
}
impl HardwareBus {
/// Create a new hardware bus
pub fn new() -> Self {
Self {
devices: Arc::new(RwLock::new(HashMap::new())),
message_log: Arc::new(RwLock::new(Vec::new())),
}
}
/// Connect a device to the bus
pub async fn connect_device(&self, address: BusAddress) -> Result<DeviceConnection> {
let (tx, rx) = mpsc::unbounded_channel();
{
let mut devices = self.devices.write().await;
if devices.contains_key(&address) {
return Err(HardwareError::generic(format!(
"Device {} already connected", address.name
)));
}
devices.insert(address.clone(), tx.clone());
}
info!("Device {} connected to bus", address.name);
// Send registration message to all other devices
let register_msg = BusMessage::Control {
from: address.clone(),
command: ControlCommand::Register {
address: address.clone(),
},
message_id: Uuid::new_v4(),
};
self.broadcast_message(register_msg).await?;
Ok(DeviceConnection {
address,
sender: tx,
receiver: rx,
})
}
/// Disconnect a device from the bus
pub async fn disconnect_device(&self, address: &BusAddress) -> Result<()> {
{
let mut devices = self.devices.write().await;
devices.remove(address);
}
info!("Device {} disconnected from bus", address.name);
// Send unregistration message to all other devices
let unregister_msg = BusMessage::Control {
from: address.clone(),
command: ControlCommand::Unregister {
address: address.clone(),
},
message_id: Uuid::new_v4(),
};
self.broadcast_message(unregister_msg).await?;
Ok(())
}
/// Send a message to a specific device
pub async fn send_message(&self, message: BusMessage) -> Result<()> {
// Log the message
{
let mut log = self.message_log.write().await;
log.push(message.clone());
}
match &message {
BusMessage::Data { to, .. } => {
let devices = self.devices.read().await;
if let Some(sender) = devices.get(to) {
sender.send(message).map_err(|_| {
HardwareError::bus_communication("Failed to send message to device")
})?;
} else {
return Err(HardwareError::device_not_found(&to.name));
}
}
BusMessage::Broadcast { .. } => {
self.broadcast_message(message).await?;
}
BusMessage::Control { .. } => {
self.broadcast_message(message).await?;
}
BusMessage::Ack { to, .. } => {
let devices = self.devices.read().await;
if let Some(sender) = devices.get(to) {
sender.send(message).map_err(|_| {
HardwareError::bus_communication("Failed to send ACK to device")
})?;
} else {
warn!("Attempted to send ACK to unknown device: {}", to.name);
}
}
}
Ok(())
}
/// Broadcast a message to all connected devices
async fn broadcast_message(&self, message: BusMessage) -> Result<()> {
let devices = self.devices.read().await;
let sender_address = message.from();
for (address, sender) in devices.iter() {
// Don't send message back to sender
if let Some(from) = sender_address {
if address == from {
continue;
}
}
if let Err(_) = sender.send(message.clone()) {
error!("Failed to broadcast message to device: {}", address.name);
}
}
Ok(())
}
/// Get list of connected devices
pub async fn get_connected_devices(&self) -> Vec<BusAddress> {
let devices = self.devices.read().await;
devices.keys().cloned().collect()
}
/// Get message history
pub async fn get_message_history(&self) -> Vec<BusMessage> {
let log = self.message_log.read().await;
log.clone()
}
/// Clear message history
pub async fn clear_message_history(&self) {
let mut log = self.message_log.write().await;
log.clear();
}
/// Check if a device is connected
pub async fn is_device_connected(&self, address: &BusAddress) -> bool {
let devices = self.devices.read().await;
devices.contains_key(address)
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio_test;
#[tokio::test]
async fn test_bus_creation() {
let bus = HardwareBus::new();
assert_eq!(bus.get_connected_devices().await.len(), 0);
}
#[tokio::test]
async fn test_device_connection() {
let bus = HardwareBus::new();
let address = BusAddress::new("test_device");
let connection = bus.connect_device(address.clone()).await.unwrap();
assert_eq!(connection.address, address);
assert!(bus.is_device_connected(&address).await);
}
#[tokio::test]
async fn test_device_disconnection() {
let bus = HardwareBus::new();
let address = BusAddress::new("test_device");
let _connection = bus.connect_device(address.clone()).await.unwrap();
assert!(bus.is_device_connected(&address).await);
bus.disconnect_device(&address).await.unwrap();
assert!(!bus.is_device_connected(&address).await);
}
#[tokio::test]
async fn test_message_sending() {
let bus = HardwareBus::new();
let addr1 = BusAddress::new("device1");
let addr2 = BusAddress::new("device2");
let mut conn1 = bus.connect_device(addr1.clone()).await.unwrap();
let _conn2 = bus.connect_device(addr2.clone()).await.unwrap();
let message = BusMessage::Data {
from: addr2.clone(),
to: addr1.clone(),
payload: b"test data".to_vec(),
message_id: Uuid::new_v4(),
};
bus.send_message(message.clone()).await.unwrap();
// Check if message was received
let received = conn1.receiver.recv().await.unwrap();
match received {
BusMessage::Data { payload, .. } => {
assert_eq!(payload, b"test data");
}
_ => panic!("Expected data message"),
}
}
}

View File

@@ -0,0 +1,430 @@
//! System Device Module
//!
//! Defines the interface and behavior for virtual hardware devices
use crate::{BusAddress, BusMessage, HardwareError, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Device capabilities that can be advertised
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DeviceCapability {
/// GPS positioning capability
Gps,
/// Radar detection capability
Radar,
/// AIS (Automatic Identification System) capability
Ais,
/// Engine monitoring capability
Engine,
/// Navigation capability
Navigation,
/// Communication capability
Communication,
/// Sensor data capability
Sensor,
/// Custom capability with name
Custom(String),
}
impl DeviceCapability {
/// Get the capability name as a string
pub fn name(&self) -> &str {
match self {
DeviceCapability::Gps => "GPS",
DeviceCapability::Radar => "Radar",
DeviceCapability::Ais => "AIS",
DeviceCapability::Engine => "Engine",
DeviceCapability::Navigation => "Navigation",
DeviceCapability::Communication => "Communication",
DeviceCapability::Sensor => "Sensor",
DeviceCapability::Custom(name) => name,
}
}
}
/// Current status of a device
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum DeviceStatus {
/// Device is initializing
Initializing,
/// Device is online and operational
Online,
/// Device is offline
Offline,
/// Device has encountered an error
Error { message: String },
/// Device is in maintenance mode
Maintenance,
}
/// Device configuration parameters
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceConfig {
/// Device name
pub name: String,
/// Device capabilities
pub capabilities: Vec<DeviceCapability>,
/// Update interval in milliseconds
pub update_interval_ms: u64,
/// Maximum message queue size
pub max_queue_size: usize,
/// Device-specific configuration
pub custom_config: HashMap<String, String>,
}
impl Default for DeviceConfig {
fn default() -> Self {
Self {
name: "Unknown Device".to_string(),
capabilities: vec![],
update_interval_ms: 1000,
max_queue_size: 100,
custom_config: HashMap::new(),
}
}
}
/// Device information for discovery and identification
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceInfo {
/// Device address
pub address: BusAddress,
/// Device configuration
pub config: DeviceConfig,
/// Current status
pub status: DeviceStatus,
/// Last seen timestamp
pub last_seen: SystemTime,
/// Device version
pub version: String,
/// Manufacturer information
pub manufacturer: String,
}
/// Trait for implementing system devices
#[async_trait::async_trait]
pub trait SystemDevice: Send + Sync {
/// Initialize the device
async fn initialize(&mut self) -> Result<()>;
/// Start the device operation
async fn start(&mut self) -> Result<()>;
/// Stop the device operation
async fn stop(&mut self) -> Result<()>;
/// Get device information
fn get_info(&self) -> DeviceInfo;
/// Get current device status
fn get_status(&self) -> DeviceStatus;
/// Handle incoming bus message
async fn handle_message(&mut self, message: BusMessage) -> Result<Option<BusMessage>>;
/// Process device-specific logic (called periodically)
async fn process(&mut self) -> Result<Vec<BusMessage>>;
/// Get device capabilities
fn get_capabilities(&self) -> Vec<DeviceCapability>;
/// Update device configuration
async fn update_config(&mut self, config: DeviceConfig) -> Result<()>;
}
/// Base implementation for system devices
pub struct BaseSystemDevice {
pub info: DeviceInfo,
pub message_sender: Option<mpsc::UnboundedSender<BusMessage>>,
pub message_receiver: Option<mpsc::UnboundedReceiver<BusMessage>>,
pub is_running: bool,
}
impl BaseSystemDevice {
/// Create a new base system device
pub fn new(config: DeviceConfig) -> Self {
let address = BusAddress::new(&config.name);
let info = DeviceInfo {
address,
config,
status: DeviceStatus::Initializing,
last_seen: SystemTime::now(),
version: "1.0.0".to_string(),
manufacturer: "Virtual Hardware".to_string(),
};
Self {
info,
message_sender: None,
message_receiver: None,
is_running: false,
}
}
/// Set the message channels
pub fn set_message_channels(
&mut self,
sender: mpsc::UnboundedSender<BusMessage>,
receiver: mpsc::UnboundedReceiver<BusMessage>,
) {
self.message_sender = Some(sender);
self.message_receiver = Some(receiver);
}
/// Send a message through the bus
pub async fn send_message(&self, message: BusMessage) -> Result<()> {
if let Some(sender) = &self.message_sender {
sender.send(message).map_err(|_| {
HardwareError::bus_communication("Failed to send message from device")
})?;
} else {
return Err(HardwareError::generic("Device not connected to bus"));
}
Ok(())
}
/// Update device status
pub fn set_status(&mut self, status: DeviceStatus) {
self.info.status = status;
self.info.last_seen = SystemTime::now();
}
/// Check if device is running
pub fn is_running(&self) -> bool {
self.is_running
}
}
#[async_trait::async_trait]
impl SystemDevice for BaseSystemDevice {
async fn initialize(&mut self) -> Result<()> {
info!("Initializing device: {}", self.info.config.name);
self.set_status(DeviceStatus::Initializing);
// Simulate initialization delay
tokio::time::sleep(Duration::from_millis(100)).await;
self.set_status(DeviceStatus::Online);
Ok(())
}
async fn start(&mut self) -> Result<()> {
info!("Starting device: {}", self.info.config.name);
self.is_running = true;
self.set_status(DeviceStatus::Online);
Ok(())
}
async fn stop(&mut self) -> Result<()> {
info!("Stopping device: {}", self.info.config.name);
self.is_running = false;
self.set_status(DeviceStatus::Offline);
Ok(())
}
fn get_info(&self) -> DeviceInfo {
self.info.clone()
}
fn get_status(&self) -> DeviceStatus {
self.info.status.clone()
}
async fn handle_message(&mut self, message: BusMessage) -> Result<Option<BusMessage>> {
debug!("Device {} received message: {:?}", self.info.config.name, message);
match message {
BusMessage::Control { command, .. } => {
match command {
crate::bus::ControlCommand::Ping { target } => {
if target == self.info.address {
let pong = BusMessage::Control {
from: self.info.address.clone(),
command: crate::bus::ControlCommand::Pong {
from: self.info.address.clone(),
},
message_id: Uuid::new_v4(),
};
return Ok(Some(pong));
}
}
_ => {}
}
}
_ => {}
}
Ok(None)
}
async fn process(&mut self) -> Result<Vec<BusMessage>> {
// Base implementation does nothing
Ok(vec![])
}
fn get_capabilities(&self) -> Vec<DeviceCapability> {
self.info.config.capabilities.clone()
}
async fn update_config(&mut self, config: DeviceConfig) -> Result<()> {
info!("Updating config for device: {}", self.info.config.name);
self.info.config = config;
Ok(())
}
}
/// Device manager for handling multiple devices
pub struct DeviceManager {
devices: HashMap<BusAddress, Box<dyn SystemDevice>>,
}
impl DeviceManager {
/// Create a new device manager
pub fn new() -> Self {
Self {
devices: HashMap::new(),
}
}
/// Add a device to the manager
pub fn add_device(&mut self, device: Box<dyn SystemDevice>) {
let address = device.get_info().address.clone();
self.devices.insert(address, device);
}
/// Remove a device from the manager
pub fn remove_device(&mut self, address: &BusAddress) -> Option<Box<dyn SystemDevice>> {
self.devices.remove(address)
}
/// Get a device by address
pub fn get_device(&self, address: &BusAddress) -> Option<&dyn SystemDevice> {
self.devices.get(address).map(|d| d.as_ref())
}
/// Get a mutable device by address
pub fn get_device_mut(&mut self, address: &BusAddress) -> Option<&mut Box<dyn SystemDevice>> {
self.devices.get_mut(address)
}
/// Initialize all devices
pub async fn initialize_all(&mut self) -> Result<()> {
for device in self.devices.values_mut() {
device.initialize().await?;
}
Ok(())
}
/// Start all devices
pub async fn start_all(&mut self) -> Result<()> {
for device in self.devices.values_mut() {
device.start().await?;
}
Ok(())
}
/// Stop all devices
pub async fn stop_all(&mut self) -> Result<()> {
for device in self.devices.values_mut() {
device.stop().await?;
}
Ok(())
}
/// Get all device information
pub fn get_all_device_info(&self) -> Vec<DeviceInfo> {
self.devices.values().map(|d| d.get_info()).collect()
}
/// Process all devices
pub async fn process_all(&mut self) -> Result<Vec<BusMessage>> {
let mut messages = Vec::new();
for device in self.devices.values_mut() {
let device_messages = device.process().await?;
messages.extend(device_messages);
}
Ok(messages)
}
}
impl Default for DeviceManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_device_creation() {
let config = DeviceConfig {
name: "Test Device".to_string(),
capabilities: vec![DeviceCapability::Gps],
..Default::default()
};
let device = BaseSystemDevice::new(config);
assert_eq!(device.info.config.name, "Test Device");
assert_eq!(device.info.status, DeviceStatus::Initializing);
}
#[tokio::test]
async fn test_device_initialization() {
let config = DeviceConfig {
name: "Test Device".to_string(),
..Default::default()
};
let mut device = BaseSystemDevice::new(config);
device.initialize().await.unwrap();
assert_eq!(device.get_status(), DeviceStatus::Online);
}
#[tokio::test]
async fn test_device_start_stop() {
let config = DeviceConfig {
name: "Test Device".to_string(),
..Default::default()
};
let mut device = BaseSystemDevice::new(config);
device.start().await.unwrap();
assert!(device.is_running());
assert_eq!(device.get_status(), DeviceStatus::Online);
device.stop().await.unwrap();
assert!(!device.is_running());
assert_eq!(device.get_status(), DeviceStatus::Offline);
}
#[tokio::test]
async fn test_device_manager() {
let mut manager = DeviceManager::new();
let config = DeviceConfig {
name: "Test Device".to_string(),
..Default::default()
};
let device = Box::new(BaseSystemDevice::new(config));
let address = device.get_info().address.clone();
manager.add_device(device);
assert!(manager.get_device(&address).is_some());
manager.initialize_all().await.unwrap();
manager.start_all().await.unwrap();
let info = manager.get_all_device_info();
assert_eq!(info.len(), 1);
assert_eq!(info[0].status, DeviceStatus::Online);
}
}

View File

@@ -0,0 +1,561 @@
//! Discovery Protocol Module
//!
//! Provides device discovery and capability advertisement functionality
use crate::{BusAddress, BusMessage, DeviceCapability, DeviceInfo, HardwareError, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
use uuid::Uuid;
use std::sync::Arc;
/// Discovery message types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DiscoveryMessage {
/// Announce device presence and capabilities
Announce {
device_info: DeviceInfo,
timestamp: SystemTime,
},
/// Request device information
Discover {
requester: BusAddress,
filter: Option<DiscoveryFilter>,
timestamp: SystemTime,
},
/// Response to discovery request
DiscoverResponse {
devices: Vec<DeviceInfo>,
responder: BusAddress,
timestamp: SystemTime,
},
/// Heartbeat to maintain presence
Heartbeat {
device: BusAddress,
timestamp: SystemTime,
},
/// Device going offline notification
Goodbye {
device: BusAddress,
timestamp: SystemTime,
},
}
/// Filter criteria for device discovery
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryFilter {
/// Filter by device capabilities
pub capabilities: Option<Vec<DeviceCapability>>,
/// Filter by device name pattern
pub name_pattern: Option<String>,
/// Filter by manufacturer
pub manufacturer: Option<String>,
/// Filter by minimum version
pub min_version: Option<String>,
}
impl DiscoveryFilter {
/// Create a new empty filter
pub fn new() -> Self {
Self {
capabilities: None,
name_pattern: None,
manufacturer: None,
min_version: None,
}
}
/// Filter by capabilities
pub fn with_capabilities(mut self, capabilities: Vec<DeviceCapability>) -> Self {
self.capabilities = Some(capabilities);
self
}
/// Filter by name pattern
pub fn with_name_pattern(mut self, pattern: impl Into<String>) -> Self {
self.name_pattern = Some(pattern.into());
self
}
/// Filter by manufacturer
pub fn with_manufacturer(mut self, manufacturer: impl Into<String>) -> Self {
self.manufacturer = Some(manufacturer.into());
self
}
/// Check if device matches this filter
pub fn matches(&self, device_info: &DeviceInfo) -> bool {
// Check capabilities
if let Some(required_caps) = &self.capabilities {
let device_caps = &device_info.config.capabilities;
if !required_caps.iter().all(|cap| device_caps.contains(cap)) {
return false;
}
}
// Check name pattern (simple substring match)
if let Some(pattern) = &self.name_pattern {
if !device_info.config.name.contains(pattern) {
return false;
}
}
// Check manufacturer
if let Some(manufacturer) = &self.manufacturer {
if device_info.manufacturer != *manufacturer {
return false;
}
}
// Check version (simple string comparison for now)
if let Some(min_version) = &self.min_version {
if device_info.version < *min_version {
return false;
}
}
true
}
}
impl Default for DiscoveryFilter {
fn default() -> Self {
Self::new()
}
}
/// Discovery protocol configuration
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
/// How often to send heartbeat messages (in seconds)
pub heartbeat_interval: Duration,
/// How long to wait before considering a device offline (in seconds)
pub device_timeout: Duration,
/// How often to clean up expired devices (in seconds)
pub cleanup_interval: Duration,
/// Maximum number of devices to track
pub max_devices: usize,
}
impl Default for DiscoveryConfig {
fn default() -> Self {
Self {
heartbeat_interval: Duration::from_secs(30),
device_timeout: Duration::from_secs(90),
cleanup_interval: Duration::from_secs(60),
max_devices: 1000,
}
}
}
/// Discovery protocol implementation
pub struct DiscoveryProtocol {
/// Local device information
local_device: DeviceInfo,
/// Known devices registry
known_devices: Arc<RwLock<HashMap<BusAddress, DeviceInfo>>>,
/// Configuration
config: DiscoveryConfig,
/// Message sender for bus communication
message_sender: Option<mpsc::UnboundedSender<BusMessage>>,
/// Discovery message receiver
discovery_receiver: Option<mpsc::UnboundedReceiver<DiscoveryMessage>>,
/// Running state
is_running: bool,
}
impl DiscoveryProtocol {
/// Create a new discovery protocol instance
pub fn new(local_device: DeviceInfo, config: DiscoveryConfig) -> Self {
Self {
local_device,
known_devices: Arc::new(RwLock::new(HashMap::new())),
config,
message_sender: None,
discovery_receiver: None,
is_running: false,
}
}
/// Set the message sender for bus communication
pub fn set_message_sender(&mut self, sender: mpsc::UnboundedSender<BusMessage>) {
self.message_sender = Some(sender);
}
/// Set the discovery message receiver
pub fn set_discovery_receiver(&mut self, receiver: mpsc::UnboundedReceiver<DiscoveryMessage>) {
self.discovery_receiver = Some(receiver);
}
/// Start the discovery protocol
pub async fn start(&mut self) -> Result<()> {
info!("Starting discovery protocol for device: {}", self.local_device.config.name);
self.is_running = true;
// Send initial announcement
self.announce_device().await?;
Ok(())
}
/// Stop the discovery protocol
pub async fn stop(&mut self) -> Result<()> {
info!("Stopping discovery protocol for device: {}", self.local_device.config.name);
self.is_running = false;
// Send goodbye message
self.send_goodbye().await?;
Ok(())
}
/// Announce this device to the network
pub async fn announce_device(&self) -> Result<()> {
let announcement = DiscoveryMessage::Announce {
device_info: self.local_device.clone(),
timestamp: SystemTime::now(),
};
self.send_discovery_message(announcement).await
}
/// Send heartbeat to maintain presence
pub async fn send_heartbeat(&self) -> Result<()> {
let heartbeat = DiscoveryMessage::Heartbeat {
device: self.local_device.address.clone(),
timestamp: SystemTime::now(),
};
self.send_discovery_message(heartbeat).await
}
/// Send goodbye message when going offline
pub async fn send_goodbye(&self) -> Result<()> {
let goodbye = DiscoveryMessage::Goodbye {
device: self.local_device.address.clone(),
timestamp: SystemTime::now(),
};
self.send_discovery_message(goodbye).await
}
/// Discover devices on the network
pub async fn discover_devices(&self, filter: Option<DiscoveryFilter>) -> Result<()> {
let discover_msg = DiscoveryMessage::Discover {
requester: self.local_device.address.clone(),
filter,
timestamp: SystemTime::now(),
};
self.send_discovery_message(discover_msg).await
}
/// Get all known devices
pub async fn get_known_devices(&self) -> Vec<DeviceInfo> {
let devices = self.known_devices.read().await;
devices.values().cloned().collect()
}
/// Get devices matching a filter
pub async fn get_devices_by_filter(&self, filter: &DiscoveryFilter) -> Vec<DeviceInfo> {
let devices = self.known_devices.read().await;
devices
.values()
.filter(|device| filter.matches(device))
.cloned()
.collect()
}
/// Get device by address
pub async fn get_device(&self, address: &BusAddress) -> Option<DeviceInfo> {
let devices = self.known_devices.read().await;
devices.get(address).cloned()
}
/// Handle incoming discovery message
pub async fn handle_discovery_message(&self, message: DiscoveryMessage) -> Result<()> {
match message {
DiscoveryMessage::Announce { device_info, .. } => {
self.handle_device_announcement(device_info).await
}
DiscoveryMessage::Discover { requester, filter, .. } => {
self.handle_discovery_request(requester, filter).await
}
DiscoveryMessage::DiscoverResponse { devices, .. } => {
self.handle_discovery_response(devices).await
}
DiscoveryMessage::Heartbeat { device, timestamp } => {
self.handle_heartbeat(device, timestamp).await
}
DiscoveryMessage::Goodbye { device, .. } => {
self.handle_goodbye(device).await
}
}
}
/// Handle device announcement
async fn handle_device_announcement(&self, device_info: DeviceInfo) -> Result<()> {
info!("Device announced: {}", device_info.config.name);
let mut devices = self.known_devices.write().await;
devices.insert(device_info.address.clone(), device_info);
Ok(())
}
/// Handle discovery request
async fn handle_discovery_request(
&self,
requester: BusAddress,
filter: Option<DiscoveryFilter>,
) -> Result<()> {
debug!("Discovery request from: {}", requester.name);
let devices = self.known_devices.read().await;
let mut matching_devices = vec![self.local_device.clone()]; // Include self
// Add matching known devices
for device in devices.values() {
if let Some(ref filter) = filter {
if filter.matches(device) {
matching_devices.push(device.clone());
}
} else {
matching_devices.push(device.clone());
}
}
drop(devices); // Release the lock
let response = DiscoveryMessage::DiscoverResponse {
devices: matching_devices,
responder: self.local_device.address.clone(),
timestamp: SystemTime::now(),
};
self.send_discovery_message(response).await
}
/// Handle discovery response
async fn handle_discovery_response(&self, devices: Vec<DeviceInfo>) -> Result<()> {
debug!("Received discovery response with {} devices", devices.len());
let mut known_devices = self.known_devices.write().await;
for device in devices {
// Don't add ourselves
if device.address != self.local_device.address {
known_devices.insert(device.address.clone(), device);
}
}
Ok(())
}
/// Handle heartbeat message
async fn handle_heartbeat(&self, device: BusAddress, timestamp: SystemTime) -> Result<()> {
debug!("Heartbeat from device: {}", device.name);
let mut devices = self.known_devices.write().await;
if let Some(device_info) = devices.get_mut(&device) {
device_info.last_seen = timestamp;
}
Ok(())
}
/// Handle goodbye message
async fn handle_goodbye(&self, device: BusAddress) -> Result<()> {
info!("Device going offline: {}", device.name);
let mut devices = self.known_devices.write().await;
devices.remove(&device);
Ok(())
}
/// Clean up expired devices
pub async fn cleanup_expired_devices(&self) -> Result<()> {
let now = SystemTime::now();
let timeout = self.config.device_timeout;
let mut devices = self.known_devices.write().await;
let mut expired_devices = Vec::new();
for (address, device_info) in devices.iter() {
if let Ok(elapsed) = now.duration_since(device_info.last_seen) {
if elapsed > timeout {
expired_devices.push(address.clone());
}
}
}
for address in expired_devices {
warn!("Removing expired device: {}", address.name);
devices.remove(&address);
}
Ok(())
}
/// Send discovery message over the bus
async fn send_discovery_message(&self, discovery_msg: DiscoveryMessage) -> Result<()> {
if let Some(sender) = &self.message_sender {
let payload = serde_json::to_vec(&discovery_msg)?;
let bus_message = BusMessage::Broadcast {
from: self.local_device.address.clone(),
payload,
message_id: Uuid::new_v4(),
};
sender.send(bus_message).map_err(|_| {
HardwareError::bus_communication("Failed to send discovery message")
})?;
} else {
return Err(HardwareError::generic("Discovery protocol not connected to bus"));
}
Ok(())
}
/// Run the discovery protocol main loop
pub async fn run(&mut self) -> Result<()> {
let mut heartbeat_timer = tokio::time::interval(self.config.heartbeat_interval);
let mut cleanup_timer = tokio::time::interval(self.config.cleanup_interval);
while self.is_running {
tokio::select! {
_ = heartbeat_timer.tick() => {
if let Err(e) = self.send_heartbeat().await {
warn!("Failed to send heartbeat: {}", e);
}
}
_ = cleanup_timer.tick() => {
if let Err(e) = self.cleanup_expired_devices().await {
warn!("Failed to cleanup expired devices: {}", e);
}
}
// Handle incoming discovery messages if receiver is set
msg = async {
if let Some(ref mut receiver) = self.discovery_receiver {
receiver.recv().await
} else {
std::future::pending().await
}
} => {
if let Some(discovery_msg) = msg {
if let Err(e) = self.handle_discovery_message(discovery_msg).await {
warn!("Failed to handle discovery message: {}", e);
}
}
}
}
}
Ok(())
}
/// Check if the protocol is running
pub fn is_running(&self) -> bool {
self.is_running
}
/// Get the number of known devices
pub async fn device_count(&self) -> usize {
let devices = self.known_devices.read().await;
devices.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{DeviceConfig, DeviceStatus};
fn create_test_device_info(name: &str) -> DeviceInfo {
DeviceInfo {
address: BusAddress::new(name),
config: DeviceConfig {
name: name.to_string(),
capabilities: vec![DeviceCapability::Gps],
..Default::default()
},
status: DeviceStatus::Online,
last_seen: SystemTime::now(),
version: "1.0.0".to_string(),
manufacturer: "Test Manufacturer".to_string(),
}
}
#[tokio::test]
async fn test_discovery_protocol_creation() {
let device_info = create_test_device_info("test_device");
let config = DiscoveryConfig::default();
let protocol = DiscoveryProtocol::new(device_info, config);
assert!(!protocol.is_running());
assert_eq!(protocol.device_count().await, 0);
}
#[tokio::test]
async fn test_device_announcement() {
let device_info = create_test_device_info("test_device");
let config = DiscoveryConfig::default();
let protocol = DiscoveryProtocol::new(device_info.clone(), config);
let other_device = create_test_device_info("other_device");
protocol.handle_device_announcement(other_device.clone()).await.unwrap();
let known_devices = protocol.get_known_devices().await;
assert_eq!(known_devices.len(), 1);
assert_eq!(known_devices[0].config.name, "other_device");
}
#[tokio::test]
async fn test_discovery_filter() {
let filter = DiscoveryFilter::new()
.with_capabilities(vec![DeviceCapability::Gps])
.with_name_pattern("test");
let device_info = create_test_device_info("test_device");
assert!(filter.matches(&device_info));
let other_device = DeviceInfo {
address: BusAddress::new("other"),
config: DeviceConfig {
name: "other".to_string(),
capabilities: vec![DeviceCapability::Radar],
..Default::default()
},
status: DeviceStatus::Online,
last_seen: SystemTime::now(),
version: "1.0.0".to_string(),
manufacturer: "Test".to_string(),
};
assert!(!filter.matches(&other_device));
}
#[tokio::test]
async fn test_device_cleanup() {
let device_info = create_test_device_info("test_device");
let mut config = DiscoveryConfig::default();
config.device_timeout = Duration::from_millis(100);
let protocol = DiscoveryProtocol::new(device_info, config);
// Add a device with old timestamp
let mut old_device = create_test_device_info("old_device");
old_device.last_seen = SystemTime::now() - Duration::from_secs(200);
protocol.handle_device_announcement(old_device).await.unwrap();
assert_eq!(protocol.device_count().await, 1);
// Wait and cleanup
tokio::time::sleep(Duration::from_millis(150)).await;
protocol.cleanup_expired_devices().await.unwrap();
assert_eq!(protocol.device_count().await, 0);
}
}

View File

@@ -0,0 +1,72 @@
//! Error types for the hardware abstraction layer
use thiserror::Error;
/// Result type alias for hardware operations
pub type Result<T> = std::result::Result<T, HardwareError>;
/// Common error types for hardware operations
#[derive(Error, Debug)]
pub enum HardwareError {
/// Device not found on the bus
#[error("Device not found: {device_id}")]
DeviceNotFound { device_id: String },
/// Bus communication error
#[error("Bus communication error: {message}")]
BusCommunicationError { message: String },
/// Device is not responding
#[error("Device not responding: {device_id}")]
DeviceNotResponding { device_id: String },
/// Invalid device capability
#[error("Invalid device capability: {capability}")]
InvalidCapability { capability: String },
/// Discovery protocol error
#[error("Discovery protocol error: {message}")]
DiscoveryError { message: String },
/// Device initialization error
#[error("Device initialization failed: {device_id}, reason: {reason}")]
InitializationError { device_id: String, reason: String },
/// Serialization/Deserialization error
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
/// Generic hardware error
#[error("Hardware error: {message}")]
Generic { message: String },
}
impl HardwareError {
/// Create a new generic hardware error
pub fn generic(message: impl Into<String>) -> Self {
Self::Generic {
message: message.into(),
}
}
/// Create a new bus communication error
pub fn bus_communication(message: impl Into<String>) -> Self {
Self::BusCommunicationError {
message: message.into(),
}
}
/// Create a new device not found error
pub fn device_not_found(device_id: impl Into<String>) -> Self {
Self::DeviceNotFound {
device_id: device_id.into(),
}
}
/// Create a new discovery error
pub fn discovery_error(message: impl Into<String>) -> Self {
Self::DiscoveryError {
message: message.into(),
}
}
}

View File

@@ -0,0 +1,27 @@
//! Virtual Hardware Abstraction Layer
//!
//! This crate provides a common abstraction for virtual hardware components
//! including a hardware bus, system devices, and discovery protocols.
#![allow(clippy::type_complexity)]
pub mod bus;
pub mod device;
pub mod discovery_protocol;
pub mod error;
// Re-export main types
pub use bus::{HardwareBus, BusMessage, BusAddress};
pub use device::{SystemDevice, DeviceCapability, DeviceStatus, DeviceInfo, DeviceConfig};
pub use discovery_protocol::{DiscoveryProtocol, DiscoveryMessage};
pub use error::{HardwareError, Result};
/// Common traits and types used throughout the hardware abstraction layer
pub mod prelude {
pub use crate::{
HardwareBus, BusMessage, BusAddress,
SystemDevice, DeviceCapability, DeviceStatus, DeviceInfo, DeviceConfig,
DiscoveryProtocol, DiscoveryMessage,
HardwareError, Result,
};
}