/** * @ruvector/edge-net Hybrid Sync Service * * Multi-device identity and ledger synchronization using: * - P2P sync via WebRTC (fast, direct when devices online together) * - Firestore sync (persistent fallback, cross-session) * - Identity linking via PiKey signatures * * @module @ruvector/edge-net/sync */ import { EventEmitter } from 'events'; import { createHash, randomBytes } from 'crypto'; // ============================================ // SYNC CONFIGURATION // ============================================ export const SYNC_CONFIG = { // Firestore endpoints (Genesis nodes) firestore: { projectId: 'ruvector-edge-net', collection: 'ledger-sync', identityCollection: 'identity-links', }, // Sync intervals intervals: { p2pHeartbeat: 5000, // 5s P2P sync check firestoreSync: 30000, // 30s Firestore sync staleThreshold: 60000, // 1min before considering state stale }, // CRDT merge settings crdt: { maxBatchSize: 1000, // Max entries per merge conflictResolution: 'lww', // Last-write-wins }, // Genesis node endpoints genesisNodes: [ { region: 'us-central1', url: 'https://edge-net-genesis-us.ruvector.dev' }, { region: 'europe-west1', url: 'https://edge-net-genesis-eu.ruvector.dev' }, { region: 'asia-east1', url: 'https://edge-net-genesis-asia.ruvector.dev' }, ], }; // ============================================ // IDENTITY LINKER // ============================================ /** * Links a PiKey identity across multiple devices * Uses cryptographic challenge-response to prove ownership */ export class IdentityLinker extends EventEmitter { constructor(piKey, options = {}) { super(); this.piKey = piKey; this.publicKeyHex = this.toHex(piKey.getPublicKey()); this.shortId = piKey.getShortId(); this.options = { genesisUrl: options.genesisUrl || SYNC_CONFIG.genesisNodes[0].url, ...options, }; this.linkedDevices = new Map(); this.authToken = null; this.deviceId = this.generateDeviceId(); } /** * Generate unique device ID */ generateDeviceId() { const platform = typeof window !== 'undefined' ? 'browser' : 'node'; const random = randomBytes(8).toString('hex'); const timestamp = Date.now().toString(36); return `${platform}-${timestamp}-${random}`; } /** * Authenticate with genesis node using PiKey signature */ async authenticate() { try { // Step 1: Request challenge const challengeRes = await this.fetchWithTimeout( `${this.options.genesisUrl}/api/v1/identity/challenge`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ publicKey: this.publicKeyHex, deviceId: this.deviceId, }), } ); if (!challengeRes.ok) { throw new Error(`Challenge request failed: ${challengeRes.status}`); } const { challenge, nonce } = await challengeRes.json(); // Step 2: Sign challenge with PiKey const challengeBytes = this.fromHex(challenge); const signature = this.piKey.sign(challengeBytes); // Step 3: Submit signature for verification const authRes = await this.fetchWithTimeout( `${this.options.genesisUrl}/api/v1/identity/verify`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ publicKey: this.publicKeyHex, deviceId: this.deviceId, nonce, signature: this.toHex(signature), }), } ); if (!authRes.ok) { throw new Error(`Authentication failed: ${authRes.status}`); } const { token, expiresAt, linkedDevices } = await authRes.json(); this.authToken = token; this.tokenExpiry = new Date(expiresAt); // Update linked devices for (const device of linkedDevices || []) { this.linkedDevices.set(device.deviceId, device); } this.emit('authenticated', { deviceId: this.deviceId, linkedDevices: this.linkedDevices.size, }); return { success: true, token, linkedDevices: this.linkedDevices.size }; } catch (error) { // Fallback: Generate local-only token for P2P sync console.warn('[Sync] Genesis authentication failed, using local mode:', error.message); this.authToken = this.generateLocalToken(); this.emit('authenticated', { deviceId: this.deviceId, mode: 'local' }); return { success: true, mode: 'local' }; } } /** * Generate local token for P2P-only mode */ generateLocalToken() { const payload = { sub: this.publicKeyHex, dev: this.deviceId, iat: Date.now(), mode: 'local', }; return Buffer.from(JSON.stringify(payload)).toString('base64'); } /** * Link a new device to this identity */ async linkDevice(deviceInfo) { if (!this.authToken) { await this.authenticate(); } try { const res = await this.fetchWithTimeout( `${this.options.genesisUrl}/api/v1/identity/link`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${this.authToken}`, }, body: JSON.stringify({ publicKey: this.publicKeyHex, newDevice: deviceInfo, }), } ); if (!res.ok) { throw new Error(`Link failed: ${res.status}`); } const result = await res.json(); this.linkedDevices.set(deviceInfo.deviceId, deviceInfo); this.emit('device_linked', { deviceId: deviceInfo.deviceId }); return result; } catch (error) { // P2P fallback: Store in local linked devices for gossip this.linkedDevices.set(deviceInfo.deviceId, { ...deviceInfo, linkedAt: Date.now(), mode: 'p2p', }); return { success: true, mode: 'p2p' }; } } /** * Get all linked devices */ getLinkedDevices() { return Array.from(this.linkedDevices.values()); } /** * Check if a device is linked to this identity */ isDeviceLinked(deviceId) { return this.linkedDevices.has(deviceId); } // Utility methods toHex(bytes) { return Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join(''); } fromHex(hex) { const bytes = new Uint8Array(hex.length / 2); for (let i = 0; i < hex.length; i += 2) { bytes[i / 2] = parseInt(hex.substr(i, 2), 16); } return bytes; } async fetchWithTimeout(url, options, timeout = 10000) { const controller = new AbortController(); const id = setTimeout(() => controller.abort(), timeout); try { const response = await fetch(url, { ...options, signal: controller.signal }); clearTimeout(id); return response; } catch (error) { clearTimeout(id); throw error; } } } // ============================================ // LEDGER SYNC SERVICE // ============================================ /** * Hybrid sync service for credit ledger * Combines P2P (fast) and Firestore (persistent) sync */ export class LedgerSyncService extends EventEmitter { constructor(identityLinker, ledger, options = {}) { super(); this.identity = identityLinker; this.ledger = ledger; this.options = { enableP2P: true, enableFirestore: true, syncInterval: SYNC_CONFIG.intervals.firestoreSync, ...options, }; // Sync state this.lastSyncTime = 0; this.syncInProgress = false; this.pendingChanges = []; this.peerStates = new Map(); // deviceId -> { earned, spent, timestamp } this.vectorClock = new Map(); // deviceId -> counter // P2P connections this.p2pPeers = new Map(); // Intervals this.syncIntervalId = null; this.heartbeatId = null; } /** * Start sync service */ async start() { // Authenticate first await this.identity.authenticate(); // Start periodic sync if (this.options.enableFirestore) { this.syncIntervalId = setInterval( () => this.syncWithFirestore(), this.options.syncInterval ); } // Start P2P heartbeat if (this.options.enableP2P) { this.heartbeatId = setInterval( () => this.p2pHeartbeat(), SYNC_CONFIG.intervals.p2pHeartbeat ); } // Initial sync await this.fullSync(); this.emit('started', { deviceId: this.identity.deviceId }); return this; } /** * Stop sync service */ stop() { if (this.syncIntervalId) { clearInterval(this.syncIntervalId); this.syncIntervalId = null; } if (this.heartbeatId) { clearInterval(this.heartbeatId); this.heartbeatId = null; } this.emit('stopped'); } /** * Full sync - fetch from all sources and merge */ async fullSync() { if (this.syncInProgress) return; this.syncInProgress = true; try { const results = await Promise.allSettled([ this.options.enableFirestore ? this.fetchFromFirestore() : null, this.options.enableP2P ? this.fetchFromP2PPeers() : null, ]); // Merge all fetched states for (const result of results) { if (result.status === 'fulfilled' && result.value) { await this.mergeState(result.value); } } // Push our state await this.pushState(); this.lastSyncTime = Date.now(); this.emit('synced', { timestamp: this.lastSyncTime, balance: this.ledger.balance(), }); } catch (error) { this.emit('sync_error', { error: error.message }); } finally { this.syncInProgress = false; } } /** * Fetch ledger state from Firestore */ async fetchFromFirestore() { if (!this.identity.authToken) return null; try { const res = await this.identity.fetchWithTimeout( `${this.identity.options.genesisUrl}/api/v1/ledger/${this.identity.publicKeyHex}`, { method: 'GET', headers: { 'Authorization': `Bearer ${this.identity.authToken}`, }, } ); if (!res.ok) { if (res.status === 404) return null; // No state yet throw new Error(`Firestore fetch failed: ${res.status}`); } const { states } = await res.json(); return states; // Array of { deviceId, earned, spent, timestamp } } catch (error) { console.warn('[Sync] Firestore fetch failed:', error.message); return null; } } /** * Fetch ledger state from P2P peers */ async fetchFromP2PPeers() { const states = []; for (const [peerId, peer] of this.p2pPeers) { try { if (peer.dataChannel?.readyState === 'open') { const state = await this.requestStateFromPeer(peer); if (state) { states.push({ deviceId: peerId, ...state }); } } } catch (error) { console.warn(`[Sync] P2P fetch from ${peerId} failed:`, error.message); } } return states.length > 0 ? states : null; } /** * Request state from a P2P peer */ requestStateFromPeer(peer) { return new Promise((resolve, reject) => { const requestId = randomBytes(8).toString('hex'); const timeout = setTimeout(() => { reject(new Error('P2P state request timeout')); }, 5000); const handler = (event) => { try { const msg = JSON.parse(event.data); if (msg.type === 'ledger_state' && msg.requestId === requestId) { clearTimeout(timeout); peer.dataChannel.removeEventListener('message', handler); resolve(msg.state); } } catch (e) { /* ignore */ } }; peer.dataChannel.addEventListener('message', handler); peer.dataChannel.send(JSON.stringify({ type: 'ledger_state_request', requestId, from: this.identity.deviceId, })); }); } /** * Merge remote state into local ledger (CRDT) */ async mergeState(states) { if (!states || !Array.isArray(states)) return; for (const state of states) { // Skip our own state if (state.deviceId === this.identity.deviceId) continue; // Check vector clock for freshness const lastSeen = this.vectorClock.get(state.deviceId) || 0; if (state.timestamp <= lastSeen) continue; // CRDT merge try { if (state.earned && state.spent) { const earned = typeof state.earned === 'string' ? JSON.parse(state.earned) : state.earned; const spent = typeof state.spent === 'string' ? JSON.parse(state.spent) : state.spent; this.ledger.merge( JSON.stringify(earned), JSON.stringify(spent) ); } // Update vector clock this.vectorClock.set(state.deviceId, state.timestamp); this.peerStates.set(state.deviceId, state); this.emit('state_merged', { deviceId: state.deviceId, newBalance: this.ledger.balance(), }); } catch (error) { console.warn(`[Sync] Merge failed for ${state.deviceId}:`, error.message); } } } /** * Push local state to sync destinations */ async pushState() { const state = this.exportState(); // Push to Firestore if (this.options.enableFirestore && this.identity.authToken) { await this.pushToFirestore(state); } // Broadcast to P2P peers if (this.options.enableP2P) { this.broadcastToP2P(state); } } /** * Export current ledger state */ exportState() { return { deviceId: this.identity.deviceId, publicKey: this.identity.publicKeyHex, earned: this.ledger.exportEarned(), spent: this.ledger.exportSpent(), balance: this.ledger.balance(), totalEarned: this.ledger.totalEarned(), totalSpent: this.ledger.totalSpent(), timestamp: Date.now(), }; } /** * Push state to Firestore */ async pushToFirestore(state) { try { const res = await this.identity.fetchWithTimeout( `${this.identity.options.genesisUrl}/api/v1/ledger/${this.identity.publicKeyHex}`, { method: 'PUT', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${this.identity.authToken}`, }, body: JSON.stringify({ deviceId: state.deviceId, earned: state.earned, spent: state.spent, timestamp: state.timestamp, }), } ); if (!res.ok) { throw new Error(`Firestore push failed: ${res.status}`); } return true; } catch (error) { console.warn('[Sync] Firestore push failed:', error.message); return false; } } /** * Broadcast state to P2P peers */ broadcastToP2P(state) { const message = JSON.stringify({ type: 'ledger_state_broadcast', state: { deviceId: state.deviceId, earned: state.earned, spent: state.spent, timestamp: state.timestamp, }, }); for (const [peerId, peer] of this.p2pPeers) { try { if (peer.dataChannel?.readyState === 'open') { peer.dataChannel.send(message); } } catch (error) { console.warn(`[Sync] P2P broadcast to ${peerId} failed:`, error.message); } } } /** * P2P heartbeat - discover and sync with nearby devices */ async p2pHeartbeat() { // Broadcast presence to linked devices const presence = { type: 'presence', deviceId: this.identity.deviceId, publicKey: this.identity.publicKeyHex, balance: this.ledger.balance(), timestamp: Date.now(), }; for (const [peerId, peer] of this.p2pPeers) { try { if (peer.dataChannel?.readyState === 'open') { peer.dataChannel.send(JSON.stringify(presence)); } } catch (error) { // Remove stale peer this.p2pPeers.delete(peerId); } } } /** * Register a P2P peer for sync */ registerP2PPeer(peerId, dataChannel) { this.p2pPeers.set(peerId, { dataChannel, connectedAt: Date.now() }); // Handle incoming messages dataChannel.addEventListener('message', (event) => { this.handleP2PMessage(peerId, event.data); }); this.emit('peer_registered', { peerId }); } /** * Handle incoming P2P message */ async handleP2PMessage(peerId, data) { try { const msg = JSON.parse(data); switch (msg.type) { case 'ledger_state_request': // Respond with our state const state = this.exportState(); const peer = this.p2pPeers.get(peerId); if (peer?.dataChannel?.readyState === 'open') { peer.dataChannel.send(JSON.stringify({ type: 'ledger_state', requestId: msg.requestId, state: { earned: state.earned, spent: state.spent, timestamp: state.timestamp, }, })); } break; case 'ledger_state_broadcast': // Merge incoming state if (msg.state) { await this.mergeState([{ deviceId: peerId, ...msg.state }]); } break; case 'presence': // Update peer info const existingPeer = this.p2pPeers.get(peerId); if (existingPeer) { existingPeer.lastSeen = Date.now(); existingPeer.balance = msg.balance; } break; } } catch (error) { console.warn(`[Sync] P2P message handling failed:`, error.message); } } /** * Sync with Firestore (called periodically) */ async syncWithFirestore() { if (this.syncInProgress) return; try { const states = await this.fetchFromFirestore(); if (states) { await this.mergeState(states); } await this.pushToFirestore(this.exportState()); } catch (error) { console.warn('[Sync] Periodic Firestore sync failed:', error.message); } } /** * Force immediate sync */ async forceSync() { return this.fullSync(); } /** * Get sync status */ getStatus() { return { deviceId: this.identity.deviceId, publicKey: this.identity.publicKeyHex, shortId: this.identity.shortId, linkedDevices: this.identity.getLinkedDevices().length, p2pPeers: this.p2pPeers.size, lastSyncTime: this.lastSyncTime, balance: this.ledger.balance(), totalEarned: this.ledger.totalEarned(), totalSpent: this.ledger.totalSpent(), syncEnabled: { p2p: this.options.enableP2P, firestore: this.options.enableFirestore, }, }; } } // ============================================ // SYNC MANAGER (CONVENIENCE WRAPPER) // ============================================ /** * High-level sync manager for easy integration */ export class SyncManager extends EventEmitter { constructor(piKey, ledger, options = {}) { super(); this.identityLinker = new IdentityLinker(piKey, options); this.syncService = new LedgerSyncService(this.identityLinker, ledger, options); // Forward events this.syncService.on('synced', (data) => this.emit('synced', data)); this.syncService.on('state_merged', (data) => this.emit('state_merged', data)); this.syncService.on('sync_error', (data) => this.emit('sync_error', data)); this.identityLinker.on('authenticated', (data) => this.emit('authenticated', data)); this.identityLinker.on('device_linked', (data) => this.emit('device_linked', data)); } /** * Start sync */ async start() { await this.syncService.start(); return this; } /** * Stop sync */ stop() { this.syncService.stop(); } /** * Force sync */ async sync() { return this.syncService.forceSync(); } /** * Register P2P peer */ registerPeer(peerId, dataChannel) { this.syncService.registerP2PPeer(peerId, dataChannel); } /** * Get status */ getStatus() { return this.syncService.getStatus(); } /** * Export identity for another device */ exportIdentity(password) { return this.identityLinker.piKey.createEncryptedBackup(password); } /** * Link devices via QR code data */ generateLinkData() { return { publicKey: this.identityLinker.publicKeyHex, shortId: this.identityLinker.shortId, genesisUrl: this.identityLinker.options.genesisUrl, timestamp: Date.now(), }; } } // ============================================ // EXPORTS // ============================================ export default SyncManager;