109 lines
2.8 KiB
JavaScript
109 lines
2.8 KiB
JavaScript
const aedes = require('aedes')();
|
|
const net = require('net');
|
|
const ws = require('ws');
|
|
const http = require('http');
|
|
const port = 1883;
|
|
const wsPort = 9001;
|
|
|
|
// TCP Server for MQTT
|
|
const server = net.createServer(aedes.handle);
|
|
|
|
// Logging für alle Nachrichten
|
|
aedes.on('publish', (packet, client) => {
|
|
if (client) {
|
|
console.log(`[MQTT] Client ${client.id} published to topic: ${packet.topic}`);
|
|
console.log(`[MQTT] Payload: ${packet.payload.toString()}`);
|
|
} else {
|
|
console.log(`[MQTT] Published to topic: ${packet.topic}`);
|
|
console.log(`[MQTT] Payload: ${packet.payload.toString()}`);
|
|
}
|
|
});
|
|
|
|
// Client-Verbindungen
|
|
aedes.on('client', (client) => {
|
|
console.log(`[MQTT] Client connected: ${client.id}`);
|
|
});
|
|
|
|
aedes.on('clientDisconnect', (client) => {
|
|
console.log(`[MQTT] Client disconnected: ${client.id}`);
|
|
});
|
|
|
|
// Fehlerbehandlung
|
|
aedes.on('clientError', (client, err) => {
|
|
console.error(`[MQTT] Client error for ${client.id}:`, err);
|
|
});
|
|
|
|
// WebSocket Server for browser connections
|
|
const httpServer = http.createServer();
|
|
const wsServer = new ws.Server({
|
|
server: httpServer,
|
|
path: '/mqtt'
|
|
});
|
|
|
|
wsServer.on('connection', (socket, req) => {
|
|
// Create a proper stream adapter for Aedes
|
|
const { Duplex } = require('stream');
|
|
|
|
const stream = new Duplex({
|
|
write(chunk, encoding, callback) {
|
|
if (socket.readyState === ws.OPEN) {
|
|
socket.send(chunk);
|
|
callback();
|
|
} else {
|
|
callback(new Error('WebSocket is not open'));
|
|
}
|
|
},
|
|
read() {
|
|
// No-op: we push data when we receive it
|
|
}
|
|
});
|
|
|
|
// Handle incoming WebSocket messages
|
|
socket.on('message', (data) => {
|
|
stream.push(data);
|
|
});
|
|
|
|
socket.on('error', (err) => {
|
|
console.error('[MQTT] WebSocket error:', err);
|
|
stream.destroy(err);
|
|
});
|
|
|
|
socket.on('close', () => {
|
|
console.log('[MQTT] WebSocket client disconnected');
|
|
stream.push(null); // End the stream
|
|
});
|
|
|
|
// Handle stream errors
|
|
stream.on('error', (err) => {
|
|
console.error('[MQTT] Stream error:', err);
|
|
if (socket.readyState === ws.OPEN) {
|
|
socket.close();
|
|
}
|
|
});
|
|
|
|
// Pass the stream to Aedes
|
|
aedes.handle(stream);
|
|
});
|
|
|
|
server.listen(port, () => {
|
|
console.log(`[MQTT] TCP Broker started and listening on port ${port}`);
|
|
console.log(`[MQTT] Ready to accept TCP connections`);
|
|
});
|
|
|
|
httpServer.listen(wsPort, () => {
|
|
console.log(`[MQTT] WebSocket Broker started and listening on port ${wsPort}`);
|
|
console.log(`[MQTT] Ready to accept WebSocket connections at ws://localhost:${wsPort}/mqtt`);
|
|
});
|
|
|
|
// Graceful shutdown
|
|
process.on('SIGINT', () => {
|
|
console.log('\n[MQTT] Shutting down broker...');
|
|
server.close(() => {
|
|
console.log('[MQTT] TCP server closed');
|
|
});
|
|
httpServer.close(() => {
|
|
console.log('[MQTT] WebSocket server closed');
|
|
process.exit(0);
|
|
});
|
|
});
|