const aedes = require('aedes')(); const net = require('net'); const ws = require('ws'); const http = require('http'); const port = 1883; const wsPort = 9001; // TCP Server for MQTT const server = net.createServer(aedes.handle); // Logging für alle Nachrichten aedes.on('publish', (packet, client) => { if (client) { console.log(`[MQTT] Client ${client.id} published to topic: ${packet.topic}`); console.log(`[MQTT] Payload: ${packet.payload.toString()}`); } else { console.log(`[MQTT] Published to topic: ${packet.topic}`); console.log(`[MQTT] Payload: ${packet.payload.toString()}`); } }); // Client-Verbindungen aedes.on('client', (client) => { console.log(`[MQTT] Client connected: ${client.id}`); }); aedes.on('clientDisconnect', (client) => { console.log(`[MQTT] Client disconnected: ${client.id}`); }); // Fehlerbehandlung aedes.on('clientError', (client, err) => { console.error(`[MQTT] Client error for ${client.id}:`, err); }); // WebSocket Server for browser connections const httpServer = http.createServer(); const wsServer = new ws.Server({ server: httpServer, path: '/mqtt' }); wsServer.on('connection', (socket, req) => { // Create a proper stream adapter for Aedes const { Duplex } = require('stream'); const stream = new Duplex({ write(chunk, encoding, callback) { if (socket.readyState === ws.OPEN) { socket.send(chunk); callback(); } else { callback(new Error('WebSocket is not open')); } }, read() { // No-op: we push data when we receive it } }); // Handle incoming WebSocket messages socket.on('message', (data) => { stream.push(data); }); socket.on('error', (err) => { console.error('[MQTT] WebSocket error:', err); stream.destroy(err); }); socket.on('close', () => { console.log('[MQTT] WebSocket client disconnected'); stream.push(null); // End the stream }); // Handle stream errors stream.on('error', (err) => { console.error('[MQTT] Stream error:', err); if (socket.readyState === ws.OPEN) { socket.close(); } }); // Pass the stream to Aedes aedes.handle(stream); }); server.listen(port, () => { console.log(`[MQTT] TCP Broker started and listening on port ${port}`); console.log(`[MQTT] Ready to accept TCP connections`); }); httpServer.listen(wsPort, () => { console.log(`[MQTT] WebSocket Broker started and listening on port ${wsPort}`); console.log(`[MQTT] Ready to accept WebSocket connections at ws://localhost:${wsPort}/mqtt`); }); // Graceful shutdown process.on('SIGINT', () => { console.log('\n[MQTT] Shutting down broker...'); server.close(() => { console.log('[MQTT] TCP server closed'); }); httpServer.close(() => { console.log('[MQTT] WebSocket server closed'); process.exit(0); }); });