1 module mqttd.stream;
2 
3 
/**
   Compile-time check: true if a value of type `T` has a `read`
   that can be called with a `ubyte[]` argument.
 */
enum isMqttInput(T) = is(typeof(() {
    auto input = T.init;
    ubyte[] buffer;
    input.read(buffer);
}));
9 
10 @safe:
11 
12 
13 /**
14    Abstracts a stream protocol such as TCP so that we can get discrete
15    MQTT messages out of it.
16  */
struct MqttStream {

    import mqttd.server: MqttServer;
    import mqttd.broker: isMqttSubscriber;

    /**
       Allocates the fixed-size backing buffer; no bytes are held initially.
       Params:
           bufferSize = capacity of the backing buffer in bytes
     */
    this(int bufferSize) pure nothrow {
        _buffer = new ubyte[bufferSize];
        _bytes = _buffer[0..0];
    }

    /**
       Appends `bytes` to the stream (`stream ~= bytes`) by wrapping them
       in an input object and delegating to `read`.
       Throws: Exception if `bytes` does not fit in the free part of the buffer.
     */
    void opOpAssign(string op: "~")(ubyte[] bytes) {
        struct Input {
            void read(ubyte[] buf) {
                import std.algorithm: copy;
                copy(bytes, buf);
            }
            static assert(isMqttInput!Input);
        }
        read(new Input, bytes.length);
    }

    /**
       Reads `size` bytes from `input` and appends them after the bytes
       currently held.
       Params:
           input = anything satisfying `isMqttInput`; its `read` is handed
                   the slice of the buffer to fill
           size = number of bytes to read
       Throws: Exception if `size` bytes don't fit in the free part of the buffer.
     */
    void read(T)(auto ref T input, size_t size) @trusted if(isMqttInput!T) {
        import std.conv: text;

        //move any held bytes to the front of the buffer; sets _bytesRead
        resetBuffer;

        //This function is @trusted, so the slice below is not guaranteed to be
        //bounds-checked in every build configuration. Check explicitly so that
        //an oversized read is reported instead of writing past the buffer.
        //_bytesRead <= _buffer.length always holds, so this can't underflow.
        immutable free = _buffer.length - _bytesRead;
        if(size > free) {
            throw new Exception(
                text("Cannot read ", size, " bytes into stream buffer: only ",
                     free, " of ", _buffer.length, " bytes are free"));
        }

        immutable end = _bytesRead + size;
        input.read(_buffer[_bytesRead .. end]);

        _bytesRead += size;
        _bytes = _buffer[0 .. _bytesRead];

        updateLastMessageSize;
    }


    /**
       Returns: true if the cached size of the next message is at least
       `MqttFixedHeader.SIZE` and at least that many bytes are held.
     */
    bool hasMessages() pure nothrow {
        import mqttd.message: MqttFixedHeader;
        return
            _lastMessageSize >= MqttFixedHeader.SIZE
            && _bytes.length >= _lastMessageSize
            ;
    }

    /**
       Removes the next message from the held bytes and returns it.
       Returns: the bytes of the next message, or an empty array if
       `hasMessages` is false. The returned slice refers to the internal
       buffer, whose front is overwritten by the next call to `read`.
     */
    const(ubyte)[] popNextMessageBytes() {
        if(!hasMessages) return [];

        auto ret = nextMessageBytes;
        _bytes = _bytes[ret.length .. $];

        updateLastMessageSize;
        return ret;
    }

    /**
       Pops every message currently held and passes each one to
       `server.send` together with `connection`.
     */
    void handleMessages(T)(ref MqttServer!T server, ref T connection) @trusted if(isMqttSubscriber!T) {
        while(hasMessages) server.send(connection, popNextMessageBytes);
    }

    /// Returns: the capacity of the backing buffer in bytes.
    auto bufferSize() const pure nothrow @safe {
        return _buffer.length;
    }

private:

    ubyte[] _buffer; //the underlying storage
    ubyte[] _bytes; //the current bytes held (always a slice of _buffer)
    int _lastMessageSize; //cached result of nextMessageSize
    int _bytesStart; //the starting position
    size_t _bytesRead; //number of bytes held at the front of _buffer; size_t because it's used as a slice index

    //refreshes the cached size; called every time _bytes changes
    void updateLastMessageSize() {
        _lastMessageSize = nextMessageSize;
    }

    //the bytes of the next message; only valid when hasMessages is true
    const(ubyte)[] nextMessageBytes() const {
        return _bytes[0 .. nextMessageSize];
    }

    //Decodes a fixed header from the front of the held bytes and returns
    //its `remaining` value plus MqttFixedHeader.SIZE, or 0 if fewer than
    //MqttFixedHeader.SIZE bytes are held.
    int nextMessageSize() const {
        import mqttd.message: MqttFixedHeader;
        import cerealed: Decerealiser;

        if(_bytes.length < MqttFixedHeader.SIZE) return 0;

        auto dec = Decerealiser(_bytes);
        return dec.value!MqttFixedHeader.remaining + MqttFixedHeader.SIZE;
    }

    //Moves the held bytes to the front of the buffer so that new data
    //can be appended after them.
    //@trusted because of copy
    void resetBuffer() @trusted pure nothrow {
        import std.algorithm: copy;

        //source and target may overlap: _bytes is a slice of _buffer
        copy(_bytes, _buffer);
        _bytesRead = _bytes.length;
        _bytes = _buffer[0 .. _bytesRead];
    }
}
112 
113 
114 /**
115    Satisfies the `isMqttConnection` interface for streams (e.g. TCP)
116    Delegates actually reading or writing data to a templated channel.
117  */
struct MqttStreamConnection(Channel) {

    import mqttd.server: MqttServer, isMqttConnection;
    import mqttd.stream: MqttStream, isMqttInput;

    /**
       Wraps `channel`, marks the connection as connected and creates
       the stream with a 512 KiB buffer.
     */
    this(Channel channel) {
        _channel = channel;
        _connected = true;
        enum bufferSize = 1024 * 512;
        _stream = MqttStream(bufferSize);
    }

    /**
       Forwards to the channel's `read` with `bytes`.
       This is what makes the connection usable as an `isMqttInput`.
     */
    void read(ubyte[] bytes) {
        _channel.read(bytes);
    }

    /// Writes `bytes` to the channel; does nothing if not connected.
    void send(in ubyte[] bytes) {
        if(connected) {
            _channel.write(bytes);
        }
    }

    /**
       Loops while connected: waits for data on the channel and feeds
       whatever arrives to `server`. Stops with an error log entry if
       no data arrives within 60 seconds. The connection is flagged as
       disconnected when the loop ends.
     */
    void run(ref MqttServer!(typeof(this)) server) {
        import mqttd.log: error;
        import std.datetime: seconds;

        while(connected) {
            if(!_channel.waitForData(60.seconds) ) {
                error("Persistent connection timeout!");
                _connected = false;
                break;
            }

            read(server);
        }

        _connected = false;
    }

    /// Returns: true if both the channel and this object report being connected.
    @property bool connected() const {
        return _channel.connected && _connected;
    }

    /// Flags the connection as disconnected and closes the channel.
    void disconnect() {
        _connected = false;
        _channel.close();
    }

private:

    Channel _channel;
    bool _connected;
    MqttStream _stream;

    //Reads the data the channel reports, one chunk per iteration, into the
    //stream and dispatches every complete message to the server.
    //Throws an Exception if a chunk is larger than the stream's buffer.
    void read(ref MqttServer!(typeof(this)) server) {
        import std.conv: text;

        while(connected && !_channel.empty) {
            //Query the size once: the value that is checked must be the
            //value that is read, even if the channel would report a
            //different size on a later call.
            const available = _channel.leastSize;

            //NOTE: this compares against the total capacity of the buffer,
            //not the space left after any bytes the stream still holds.
            if(available > _stream.bufferSize) {
                throw new Exception(
                    text("Too many bytes (", available,
                         " for puny stream buffer (", _stream.bufferSize, ")"));
            }
            _stream.read(this, available);
            _stream.handleMessages(server, this);
        }
    }

    static assert(isMqttConnection!(typeof(this)));
    static assert(isMqttInput!(typeof(this)));
}