module mqttd.tcp;


/**
 * Couples a vibe.d `TCPConnection` with an `MqttStream` so that an
 * `MqttServer` can exchange MQTT traffic with one TCP client.
 *
 * The struct is checked at compile time (see the static asserts at the
 * bottom) to satisfy both `isMqttConnection` and `isMqttInput`.
 */
struct MqttTcpConnection {

    import mqttd.server: MqttServer, isMqttConnection;
    import mqttd.stream: MqttStream, isMqttInput;
    import vibe.d: TCPConnection;

    /**
     * Wraps an already-established TCP connection and marks it as connected.
     *
     * Params:
     *   tcpConnection = the vibe.d connection to read from and write to
     */
    this(TCPConnection tcpConnection) @safe nothrow {
        _tcpConnection = tcpConnection;
        _connected = true;
        // Size handed to MqttStream; the private read(server) below refuses
        // any pending chunk larger than `_stream.bufferSize`.
        enum bufferSize = 1024 * 512;
        _stream = MqttStream(bufferSize);
    }

    /**
     * Fills `bytes` from the underlying TCP connection.
     *
     * Params:
     *   bytes = destination buffer, forwarded as-is to `TCPConnection.read`
     */
    void read(ubyte[] bytes) @safe {
        _tcpConnection.read(bytes);
    }

    /**
     * Writes `bytes` to the TCP connection.
     * Does nothing when the connection is no longer considered connected.
     *
     * Params:
     *   bytes = the raw bytes to write
     */
    void send(in ubyte[] bytes) @safe {
        if(connected) {
            _tcpConnection.write(bytes);
        }
    }

    /**
     * Serves the connection: waits for data and dispatches it to `server`
     * until the connection is no longer connected or no data arrives within
     * 60 seconds.
     *
     * The connection is always flagged as disconnected when this function
     * returns, including when an exception propagates out of it.
     *
     * Params:
     *   server = the server that handles the decoded messages
     */
    void run(ref MqttServer!(typeof(this)) server) @safe {
        import mqttd.log: error;
        import std.datetime: seconds;

        // Runs on every exit path (normal return, break, or exception) so
        // that `connected` never keeps reporting true after run() is done.
        scope(exit) _connected = false;

        while(connected) {
            if(!_tcpConnection.waitForData(60.seconds) ) {
                error("Persistent connection timeout!");
                break;
            }

            read(server);
        }
    }

    /**
     * True only while both the TCP connection reports being connected and
     * this wrapper has not been flagged as disconnected.
     */
    @property bool connected() @safe const {
        return _tcpConnection.connected && _connected;
    }

    /// Flags the connection as disconnected and closes the TCP connection.
    void disconnect() @safe {
        _connected = false;
        _tcpConnection.close();
    }

private:

    TCPConnection _tcpConnection;
    bool _connected;
    MqttStream _stream;

    /**
     * Drains the data currently pending on the TCP connection into the
     * stream and lets the stream hand complete messages to `server`.
     *
     * Params:
     *   server = the server that handles the decoded messages
     *
     * Throws:
     *   Exception if the pending chunk is larger than `_stream.bufferSize`.
     */
    void read(ref MqttServer!(typeof(this)) server) @safe {
        import std.conv: text;

        while(connected && !_tcpConnection.empty) {
            // Take a single snapshot of the pending size: querying it again
            // after the bounds check could yield a larger value if more data
            // arrived in the meantime, defeating the check.
            auto available = _tcpConnection.leastSize;

            if(available > _stream.bufferSize) {
                throw new Exception(
                    text("Too many bytes (", available,
                         ") for puny stream buffer (", _stream.bufferSize, ")"));
            }

            _stream.read(this, available);
            _stream.handleMessages(server, this);
        }
    }

    static assert(isMqttConnection!(typeof(this)));
    static assert(isMqttInput!(typeof(this)));
}