This commit is contained in:
108
mock-server/mqtt_broker.js
Normal file
108
mock-server/mqtt_broker.js
Normal file
@@ -0,0 +1,108 @@
|
||||
const aedes = require('aedes')();
|
||||
const net = require('net');
|
||||
const ws = require('ws');
|
||||
const http = require('http');
|
||||
const port = 1883;
|
||||
const wsPort = 9001;
|
||||
|
||||
// TCP Server for MQTT
|
||||
const server = net.createServer(aedes.handle);
|
||||
|
||||
// Logging für alle Nachrichten
|
||||
aedes.on('publish', (packet, client) => {
|
||||
if (client) {
|
||||
console.log(`[MQTT] Client ${client.id} published to topic: ${packet.topic}`);
|
||||
console.log(`[MQTT] Payload: ${packet.payload.toString()}`);
|
||||
} else {
|
||||
console.log(`[MQTT] Published to topic: ${packet.topic}`);
|
||||
console.log(`[MQTT] Payload: ${packet.payload.toString()}`);
|
||||
}
|
||||
});
|
||||
|
||||
// Client-Verbindungen
|
||||
aedes.on('client', (client) => {
|
||||
console.log(`[MQTT] Client connected: ${client.id}`);
|
||||
});
|
||||
|
||||
aedes.on('clientDisconnect', (client) => {
|
||||
console.log(`[MQTT] Client disconnected: ${client.id}`);
|
||||
});
|
||||
|
||||
// Fehlerbehandlung
|
||||
aedes.on('clientError', (client, err) => {
|
||||
console.error(`[MQTT] Client error for ${client.id}:`, err);
|
||||
});
|
||||
|
||||
// WebSocket Server for browser connections
|
||||
const httpServer = http.createServer();
|
||||
const wsServer = new ws.Server({
|
||||
server: httpServer,
|
||||
path: '/mqtt'
|
||||
});
|
||||
|
||||
wsServer.on('connection', (socket, req) => {
|
||||
// Create a proper stream adapter for Aedes
|
||||
const { Duplex } = require('stream');
|
||||
|
||||
const stream = new Duplex({
|
||||
write(chunk, encoding, callback) {
|
||||
if (socket.readyState === ws.OPEN) {
|
||||
socket.send(chunk);
|
||||
callback();
|
||||
} else {
|
||||
callback(new Error('WebSocket is not open'));
|
||||
}
|
||||
},
|
||||
read() {
|
||||
// No-op: we push data when we receive it
|
||||
}
|
||||
});
|
||||
|
||||
// Handle incoming WebSocket messages
|
||||
socket.on('message', (data) => {
|
||||
stream.push(data);
|
||||
});
|
||||
|
||||
socket.on('error', (err) => {
|
||||
console.error('[MQTT] WebSocket error:', err);
|
||||
stream.destroy(err);
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
console.log('[MQTT] WebSocket client disconnected');
|
||||
stream.push(null); // End the stream
|
||||
});
|
||||
|
||||
// Handle stream errors
|
||||
stream.on('error', (err) => {
|
||||
console.error('[MQTT] Stream error:', err);
|
||||
if (socket.readyState === ws.OPEN) {
|
||||
socket.close();
|
||||
}
|
||||
});
|
||||
|
||||
// Pass the stream to Aedes
|
||||
aedes.handle(stream);
|
||||
});
|
||||
|
||||
server.listen(port, () => {
|
||||
console.log(`[MQTT] TCP Broker started and listening on port ${port}`);
|
||||
console.log(`[MQTT] Ready to accept TCP connections`);
|
||||
});
|
||||
|
||||
httpServer.listen(wsPort, () => {
|
||||
console.log(`[MQTT] WebSocket Broker started and listening on port ${wsPort}`);
|
||||
console.log(`[MQTT] Ready to accept WebSocket connections at ws://localhost:${wsPort}/mqtt`);
|
||||
});
|
||||
|
||||
// Graceful shutdown
|
||||
process.on('SIGINT', () => {
|
||||
console.log('\n[MQTT] Shutting down broker...');
|
||||
server.close(() => {
|
||||
console.log('[MQTT] TCP server closed');
|
||||
});
|
||||
httpServer.close(() => {
|
||||
console.log('[MQTT] WebSocket server closed');
|
||||
process.exit(0);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user