1 module tests.server;
2 
3 import unit_threaded;
4 import mqttd.server;
5 import mqttd.message;
6 import mqttd.broker;
7 import std.stdio, std.conv, std.algorithm, std.array, std.range;
8 import cerealed;
9 
10 
/**
 * Raw bytes of the CONNECT message that the tests in this module send
 * whenever they need a well-formed connection request.
 *
 * Returns: a 2-byte fixed header followed by 42 (0x2a) bytes of
 *          variable header and payload.
 */
const (ubyte)[] connectionMsgBytes() pure nothrow {
    const(ubyte)[] packet = [
        // fixed header: first byte 0x10, remaining length 0x2a
        0x10, 0x2a,
        // protocol name, length-prefixed
        0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p',
        // protocol version
        0x03,
        // connection flags 0b1100_1100:
        // username, password, no will retain, will QoS 01, will flag, no clean session, reserved
        0xcc,
        // keepalive of 10
        0x00, 0x0a,
        // client ID
        0x00, 0x03, 'c', 'i', 'd',
        // will topic
        0x00, 0x04, 'w', 'i', 'l', 'l',
        // will message
        0x00, 0x04, 'w', 'm', 's', 'g',
        // username
        0x00, 0x07, 'g', 'l', 'i', 'f', 't', 'e', 'l',
        // password
        0x00, 0x02, 'p', 'w',
    ];
    return packet;
}
24 
/**
 * Test double used as the connection/subscriber type of the server under
 * test. It records what the server sends to it so that the tests can
 * inspect the outcome afterwards.
 */
struct TestMqttConnection {
    /// Element type of `payloads`.
    alias Payload = ubyte[];

    /**
     * Called with the bytes of a message addressed to this connection.
     *
     * The fixed header is decoded to find the message type:
     * a CONNACK stores its return code in `code` (and sets `connected`
     * when that code is ACCEPTED), a PUBLISH appends a copy of its payload
     * to `payloads`, and any other type is only remembered.
     * In every case a copy of the raw bytes is kept in `lastBytes`.
     */
    void send(in ubyte[] payload) {
        writelnUt(&this, "  message: ", payload);

        auto decoder = Decerealiser(payload);
        immutable header = decoder.value!MqttFixedHeader;
        // reset the decoder so the complete message is decoded below, header included
        decoder.reset;

        if(header.type == MqttType.CONNACK) {
            code = decoder.value!MqttConnack.code;
            if(code == MqttConnack.Code.ACCEPTED) connected = true;
        } else if(header.type == MqttType.PUBLISH) {
            auto published = decoder.value!MqttPublish;
            payloads ~= published.payload.dup;
        }

        lastBytes = payload.dup;
    }

    /// Marks this connection as no longer connected.
    void disconnect() { connected = false; }

    /**
     * Decodes the most recently received bytes as a message of type `T`.
     *
     * The fixed header is read first and used to construct the `T`,
     * which is then filled in from the same bytes after a decoder reset.
     */
    T lastMsg(T)() {
        auto decoder = Decerealiser(lastBytes);
        auto header = decoder.value!MqttFixedHeader;
        decoder.reset;

        auto msg = T(header);
        decoder.grain(msg);
        return msg;
    }

    /// Copy of the bytes passed to the latest `send` call.
    const(ubyte)[] lastBytes;
    /// Payload of every PUBLISH received, in arrival order.
    Payload[] payloads;
    /// Set by an accepted CONNACK, cleared by `disconnect`.
    bool connected = false;
    /// Not written by any method of this struct.
    MqttConnect connect;
    /// Return code of the latest CONNACK received.
    MqttConnack.Code code = MqttConnack.Code.SERVER_UNAVAILABLE;

    static assert(isMqttSubscriber!TestMqttConnection);
}
69 
70 
/// A well-formed connection request is answered with ACCEPTED and marks the connection as connected.
void testConnect() {
    auto mqttServer = MqttServer!TestMqttConnection();
    auto client = TestMqttConnection();

    // nothing has been sent yet
    shouldBeFalse(client.connected);

    mqttServer.send(client, connectionMsgBytes);

    shouldEqual(client.code, MqttConnack.Code.ACCEPTED);
    shouldBeTrue(client.connected);
}
79 
/// A connection request carrying a 24-character client id is expected to be refused with BAD_ID.
void testConnectBigId() {
    auto mqttServer = MqttServer!TestMqttConnection();
    auto client = TestMqttConnection();

    ubyte[] request = [
        // fixed header, remaining length 0x3f
        0x10, 0x3f,
        // protocol name
        0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p',
        // protocol version
        0x03,
        // connection flags 0b1100_1100:
        // username, password, no will retain, will QoS 01, will flag, no clean session, reserved
        0xcc,
        // keepalive of 10
        0x00, 0x0a,
        // client id of 24 characters (length prefix 0x18)
        0x00, 0x18,
        'c', 'i', 'd', 'd', 'd', 'd', 'd', 'd',
        'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd',
        'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd',
        // will topic
        0x00, 0x04, 'w', 'i', 'l', 'l',
        // will message
        0x00, 0x04, 'w', 'm', 's', 'g',
        // username
        0x00, 0x07, 'g', 'l', 'i', 'f', 't', 'e', 'l',
        // password
        0x00, 0x02, 'p', 'w',
    ];

    mqttServer.send(client, request);

    shouldBeTrue(client.connect.isBadClientId);
    shouldEqual(client.code, MqttConnack.Code.BAD_ID);
    shouldBeFalse(client.connected);
}
101 
102 
/// A connection request with an empty client id is expected to be refused with BAD_ID.
void testConnectSmallId() {
    auto mqttServer = MqttServer!TestMqttConnection();
    auto client = TestMqttConnection();

    ubyte[] request = [
        // fixed header, remaining length 0x27
        0x10, 0x27,
        // protocol name
        0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p',
        // protocol version
        0x03,
        // connection flags 0b1100_1100:
        // username, password, no will retain, will QoS 01, will flag, no clean session, reserved
        0xcc,
        // keepalive of 10
        0x00, 0x0a,
        // zero-length client id
        0x00, 0x00,
        // will topic
        0x00, 0x04, 'w', 'i', 'l', 'l',
        // will message
        0x00, 0x04, 'w', 'm', 's', 'g',
        // username
        0x00, 0x07, 'g', 'l', 'i', 'f', 't', 'e', 'l',
        // password
        0x00, 0x02, 'p', 'w',
    ];

    mqttServer.send(client, request);

    shouldBeTrue(client.connect.isBadClientId);
    shouldEqual(client.code, MqttConnack.Code.BAD_ID);
    shouldBeFalse(client.connected);
}
123 
/**
 * Test helper: serialises an `MqttPublish` built from `topic` and `payload`
 * and hands the resulting bytes to `server.send` on behalf of `connection`.
 */
void publish(S)(ref MqttServer!S server, ref S connection, in string topic, in ubyte[] payload) if(isMqttSubscriber!S) {
    cerealise!(encoded => server.send(connection, encoded))(MqttPublish(topic, payload));
}
127 
/**
 * Test helper: serialises an `MqttSubscribe` with id `msgId` requesting every
 * entry of `topics` at QoS 0, and hands the bytes to `server.send` on behalf
 * of `connection`.
 */
void subscribe(S)(ref MqttServer!S server, ref S connection, in ushort msgId, in string[] topics) if(isMqttSubscriber!S) {
    auto requested = topics.map!(name => MqttSubscribe.Topic(name, 0)).array;
    cerealise!(encoded => server.send(connection, encoded))(MqttSubscribe(msgId, requested));
}
131 
/**
 * Test helper: serialises an `MqttUnsubscribe` with id `msgId` for `topics`
 * and hands the bytes to `server.send` on behalf of `connection`.
 */
void unsubscribe(S)(ref MqttServer!S server, ref S connection, in ushort msgId, in string[] topics) if(isMqttSubscriber!S) {
    cerealise!(encoded => server.send(connection, encoded))(MqttUnsubscribe(msgId, topics));
}
135 
136 
/**
 * Subscribes with a hand-written SUBSCRIBE packet and checks both the SUBACK
 * and which of the subsequently published messages are delivered.
 */
void testSubscribeWithMessage() {
    auto server = MqttServer!TestMqttConnection();
    auto client = TestMqttConnection();

    server.send(client, connectionMsgBytes);
    client.connected.shouldBeTrue;

    // nothing is subscribed yet, so this publication must not be delivered
    server.publish(client, "foo/bar/baz", [1, 2, 3, 4, 5, 6]);
    client.payloads.shouldEqual([]);

    ubyte[] subscribePacket = [
        // fixed header, remaining length 0x13
        0x8b, 0x13,
        // message ID
        0x00, 0x21,
        // topic "first" requested at qos 1
        0x00, 0x05, 'f', 'i', 'r', 's', 't',
        0x01,
        // topic "second" requested at qos 2
        0x00, 0x06, 's', 'e', 'c', 'o', 'n', 'd',
        0x02,
    ];
    server.send(client, subscribePacket);

    const suback = client.lastMsg!MqttSuback;
    suback.msgId.shouldEqual(0x21);
    suback.qos.shouldEqual([1, 2]);

    ubyte[] publishFirst = [
        // fixed header
        0x3c, 0x0d,
        // topic name
        0x00, 0x05, 'f', 'i', 'r', 's', 't',
        // message ID
        0x00, 0x21,
        // payload
        1, 2, 3, 4,
    ];
    server.send(client, publishFirst);

    ubyte[] publishSecond = [
        // fixed header
        0x3c, 0x0d,
        // topic name
        0x00, 0x06, 's', 'e', 'c', 'o', 'n', 'd',
        // message ID
        0x00, 0x21,
        // payload
        9, 8, 7,
    ];
    server.send(client, publishSecond);

    // "third" was never subscribed to
    ubyte[] publishThird = [
        // fixed header
        0x3c, 0x0c,
        // topic name
        0x00, 0x05, 't', 'h', 'i', 'r', 'd',
        // message ID
        0x00, 0x21,
        // payload
        2, 4, 6,
    ];
    server.send(client, publishThird);

    client.payloads.shouldEqual([[1, 2, 3, 4], [9, 8, 7]]);
}
184 
185 
/// After a ping request, the last message received must be decodable as a ping response.
void testPingWithMessage() {
    auto server = MqttServer!TestMqttConnection();
    auto client = TestMqttConnection();
    server.send(client, connectionMsgBytes);

    ubyte[] pingRequest = [0xc0, 0x00];
    server.send(client, pingRequest);

    // the decode itself is the check: it shouldn't throw
    const pingResp = client.lastMsg!MqttPingResp;
}
194 
195 
/**
 * Unsubscribing from a topic that was never subscribed leaves delivery
 * untouched; unsubscribing from the real subscription stops delivery.
 */
void testUnsubscribe() {
    auto server = MqttServer!TestMqttConnection();
    auto client = TestMqttConnection();

    // one publication that matches "foo/bar/+" and one that does not
    void publishBoth() {
        server.publish(client, "foo/bar/baz", [1, 2, 3, 4]);
        server.publish(client, "foo/boogagoo", [9, 8, 7]);
    }

    server.send(client, connectionMsgBytes);

    server.subscribe(client, 42, ["foo/bar/+"]);
    const suback = client.lastMsg!MqttSuback;

    publishBoth();
    client.payloads.shouldEqual([[1, 2, 3, 4]]);

    // "boo" doesn't exist as a subscription, so this has no effect on delivery
    server.unsubscribe(client, 2, ["boo"]);
    const firstAck = client.lastMsg!MqttUnsuback;
    firstAck.msgId.shouldEqual(2);

    publishBoth();
    client.payloads.shouldEqual([[1, 2, 3, 4], [1, 2, 3, 4]]);

    server.unsubscribe(client, 3, ["foo/bar/+"]);
    const secondAck = client.lastMsg!MqttUnsuback;
    secondAck.msgId.shouldEqual(3);

    publishBoth();
    // shouldn't have changed
    client.payloads.shouldEqual([[1, 2, 3, 4], [1, 2, 3, 4]]);
}
225 
/// Same scenario as unsubscribing through the helper, but with a hand-written UNSUBSCRIBE packet.
void testUnsubscribeHandle() {
    auto server = MqttServer!TestMqttConnection();
    auto client = TestMqttConnection();

    // one publication that matches "foo/bar/+" and one that does not
    void publishBoth() {
        server.publish(client, "foo/bar/baz", [1, 2, 3, 4]);
        server.publish(client, "foo/boogagoo", [9, 8, 7]);
    }

    server.send(client, connectionMsgBytes);
    server.subscribe(client, 42, ["foo/bar/+"]);

    publishBoth();
    client.payloads.shouldEqual([[1, 2, 3, 4]]);

    ubyte[] unsubscribePacket = [
        // fixed header, remaining length 0x0d
        0xa2, 0x0d,
        // message ID (0x21 == 33)
        0x00, 0x21,
        // topic "foo/bar/+"
        0x00, 0x09, 'f', 'o', 'o', '/', 'b', 'a', 'r', '/', '+',
    ];
    server.send(client, unsubscribePacket);

    const unsuback = client.lastMsg!MqttUnsuback;
    unsuback.msgId.shouldEqual(33);

    publishBoth();
    // shouldn't have changed
    client.payloads.shouldEqual([[1, 2, 3, 4]]);
}
249 
/**
 * After the client sends a DISCONNECT (0xe0, 0x00), publications that match
 * its earlier subscription must no longer be delivered to it.
 *
 * No UNSUBSCRIBE packet is involved: the only message sent after the
 * subscription is the DISCONNECT.
 */
void testUnsubscribeAll() {
    auto server = MqttServer!TestMqttConnection();
    auto connection = TestMqttConnection();
    server.send(connection, connectionMsgBytes);
    server.subscribe(connection, 42, ["foo/bar/+"]);

    // only the first topic matches the "foo/bar/+" subscription
    server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]);
    server.publish(connection, "foo/boogagoo", [9, 8, 7]);
    shouldEqual(connection.payloads, [[1, 2, 3, 4]]);

    // disconnect
    server.send(connection, cast(ubyte[])[0xe0, 0]);

    server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]);
    server.publish(connection, "foo/boogagoo", [9, 8, 7]);
    shouldEqual(connection.payloads, [[1, 2, 3, 4]]); //shouldn't have changed, disconnected
}
271 
272 
/**
 * Several connections subscribe to per-pair request/reply topics, while
 * others subscribe to the wildcard "pingtest/0/#". After publishing to every
 * request and reply topic, each connection must hold exactly the payloads
 * matching its own subscription.
 */
void testSubscribeWildCard() {
    import std.conv;
    auto server = MqttServer!TestMqttConnection();
    immutable numPairs = 2;
    immutable numWilds = 2;
    immutable numMessages = 2;

    TestMqttConnection[numPairs] reqs;
    TestMqttConnection[numPairs] reps;
    TestMqttConnection[numWilds] wlds;

    foreach(idx, ref wildcard; wlds) {
        server.subscribe(wildcard, cast(ushort)(idx * 20 + 1), ["pingtest/0/#"]);
    }

    foreach(idx, ref requester; reqs) {
        server.subscribe(requester, cast(ushort)(idx * 2), [text("pingtest/", idx, "/request")]);
    }

    foreach(idx, ref replier; reps) {
        server.subscribe(replier, cast(ushort)(idx * 2 + 1), [text("pingtest/", idx, "/reply")]);
    }

    // start from empty payload lists on every connection
    foreach(ref conn; reqs) conn.payloads = [];
    foreach(ref conn; reps) conn.payloads = [];
    foreach(ref conn; wlds) conn.payloads = [];

    // every publication is sent on behalf of the first requester
    foreach(pair; 0 .. numPairs) {
        const requestTopic = text("pingtest/", pair, "/request");
        const replyTopic = text("pingtest/", pair, "/reply");
        foreach(_; 0 .. numMessages) {
            server.publish(reqs[0], requestTopic, [0, 1, 2, 3]);
            server.publish(reqs[0], replyTopic, [9, 8, 7]);
        }
    }

    foreach(ref requester; reqs) {
        writelnUt("checking payloads of ", &requester);
        requester.payloads.shouldEqual([0, 1, 2, 3].repeat.take(numMessages));
    }

    foreach(replier; reps) {
        replier.payloads.shouldEqual([9, 8, 7].repeat.take(numMessages));
    }

    // the wildcard only covers pair 0: its requests plus its replies
    foreach(wildcard; wlds) {
        wildcard.payloads.length.shouldEqual(numMessages * 2);
    }
}
314 
315 
/// A DISCONNECT message sent after a successful connection marks the connection as not connected again.
void testDisconnect() {
    auto mqttServer = MqttServer!TestMqttConnection();
    auto client = TestMqttConnection();

    shouldBeFalse(client.connected);
    mqttServer.send(client, connectionMsgBytes);
    shouldBeTrue(client.connected);

    ubyte[] disconnectPacket = [0xe0, 0x00];
    mqttServer.send(client, disconnectPacket);
    shouldBeFalse(client.connected);
}