1 module mqttd.runtime; 2 3 4 shared static this() nothrow { 5 try 6 mainLoop; 7 catch(Exception e) { 8 import std.experimental.logger: error; 9 try 10 error("static ctor ERROR: ", e.msg); 11 catch(Exception _) 12 assert(0, "Could not long error"); 13 } 14 } 15 16 void mainLoop() { 17 18 import mqttd.log: error; 19 import mqttd.server: MqttServer; 20 import mqttd.tcp: MqttTcpConnection; 21 import vibe.d: listenTCP, TCPConnection; 22 import std.stdio: writeln; 23 import std.typecons: Yes, No; 24 import std.datetime: seconds; 25 import core.runtime: Runtime; 26 27 const useCache = Runtime.args.length > 1 ? Yes.useCache : No.useCache; 28 if(useCache) writeln("Enabling the cache"); 29 30 // debug { 31 // setLogLevel(LogLevel.debugV); 32 // } 33 34 auto server = MqttServer!(MqttTcpConnection)(useCache); 35 36 listenTCP( 37 1883, 38 (TCPConnection tcpConnection) { 39 try { 40 if (!tcpConnection.waitForData(10.seconds())) { 41 error("Client didn't send the initial request in a timely manner. Closing connection."); 42 } 43 44 auto mqttConnection = MqttTcpConnection(tcpConnection); 45 mqttConnection.run(server); 46 if(tcpConnection.connected) tcpConnection.close(); 47 } catch(Exception e) 48 error("Fatal error: ", e.msg); 49 50 } 51 ); 52 } 53 54 55 56 int vibemain() { 57 import vibe.core.core : runEventLoop, lowerPrivileges; 58 import vibe.core.log; 59 import std.encoding : sanitize; 60 61 lowerPrivileges(); 62 63 logDiagnostic("Running event loop..."); 64 int status; 65 version (VibeDebugCatchAll) { 66 try { 67 status = runEventLoop(); 68 } catch( Throwable th ){ 69 logError("Unhandled exception in event loop: %s", th.msg); 70 logDiagnostic("Full exception: %s", th.toString().sanitize()); 71 return 1; 72 } 73 } else { 74 status = runEventLoop(); 75 } 76 77 logDiagnostic("Event loop exited with status %d.", status); 78 return status; 79 }