1 module mqttd.tcp;
2 
3 
4 struct MqttTcpConnection {
5 
6     import mqttd.server: MqttServer, isMqttConnection;
7     import mqttd.stream: MqttStream, isMqttInput;
8     import vibe.d: TCPConnection;
9 
10     this(TCPConnection tcpConnection) @safe nothrow {
11         _tcpConnection = tcpConnection;
12         _connected = true;
13         enum bufferSize = 1024 * 512;
14         _stream = MqttStream(bufferSize);
15     }
16 
17     void read(ubyte[] bytes) @safe {
18         _tcpConnection.read(bytes);
19     }
20 
21     void send(in ubyte[] bytes) @safe {
22         if(connected) {
23             _tcpConnection.write(bytes);
24         }
25     }
26 
27     void run(ref MqttServer!(typeof(this)) server) @safe {
28         import mqttd.log: error;
29         import std.datetime: seconds;
30 
31         while(connected) {
32             if(!_tcpConnection.waitForData(60.seconds) ) {
33                 error("Persistent connection timeout!");
34                 _connected = false;
35                 break;
36             }
37 
38             read(server);
39         }
40 
41         _connected = false;
42     }
43 
44     @property bool connected() @safe const {
45         return _tcpConnection.connected && _connected;
46     }
47 
48     void disconnect() @safe {
49         _connected = false;
50         _tcpConnection.close();
51     }
52 
53 private:
54 
55     TCPConnection _tcpConnection;
56     bool _connected;
57     MqttStream _stream;
58 
59     void read(ref MqttServer!(typeof(this)) server) @safe {
60         import std.conv: text;
61 
62         while(connected && !_tcpConnection.empty) {
63             if(_tcpConnection.leastSize > _stream.bufferSize) {
64                 throw new Exception(
65                     text("Too many bytes (", _tcpConnection.leastSize,
66                          " for puny stream buffer (", _stream.bufferSize, ")"));
67             }
68             _stream.read(this, _tcpConnection.leastSize);
69             _stream.handleMessages(server, this);
70         }
71     }
72 
73     static assert(isMqttConnection!(typeof(this)));
74     static assert(isMqttInput!(typeof(this)));
75 }