1 module tests.server; 2 3 import unit_threaded; 4 import mqttd.server; 5 import mqttd.message; 6 import mqttd.broker; 7 import std.stdio, std.conv, std.algorithm, std.array, std.range; 8 import cerealed; 9 10 11 const (ubyte)[] connectionMsgBytes() pure nothrow { 12 return [ 0x10, 0x2a, //fixed header 13 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', //protocol name 14 0x03, //protocol version 15 0xcc, //connection flags 1100111x username, pw, !wr, w(01), w, !c, x 16 0x00, 0x0a, //keepalive of 10 17 0x00, 0x03, 'c', 'i', 'd', //client ID 18 0x00, 0x04, 'w', 'i', 'l', 'l', //will topic 19 0x00, 0x04, 'w', 'm', 's', 'g', //will msg 20 0x00, 0x07, 'g', 'l', 'i', 'f', 't', 'e', 'l', //username 21 0x00, 0x02, 'p', 'w', //password 22 ]; 23 } 24 25 struct TestMqttConnection { 26 void send(in ubyte[] payload) { 27 writelnUt(&this, " message: ", payload); 28 auto dec = Decerealiser(payload); 29 immutable fixedHeader = dec.value!MqttFixedHeader; 30 dec.reset; 31 switch(fixedHeader.type) with(MqttType) { 32 case CONNACK: 33 code = dec.value!MqttConnack.code; 34 if(code == MqttConnack.Code.ACCEPTED) connected = true; 35 break; 36 case PUBLISH: 37 auto msg = dec.value!MqttPublish; 38 payloads ~= msg.payload.dup; 39 break; 40 41 default: 42 break; 43 } 44 45 lastBytes = payload.dup; 46 } 47 48 void disconnect() { connected = false; } 49 50 T lastMsg(T)() { 51 auto dec = Decerealiser(lastBytes); 52 auto fixedHeader = dec.value!MqttFixedHeader; 53 dec.reset; 54 55 auto t = T(fixedHeader); 56 dec.grain(t); 57 return t; 58 } 59 alias Payload = ubyte[]; 60 61 const(ubyte)[] lastBytes; 62 Payload[] payloads; 63 bool connected = false; 64 MqttConnect connect; 65 MqttConnack.Code code = MqttConnack.Code.SERVER_UNAVAILABLE; 66 67 static assert(isMqttSubscriber!TestMqttConnection); 68 } 69 70 71 void testConnect() { 72 auto server = MqttServer!TestMqttConnection(); 73 auto connection = TestMqttConnection(); 74 connection.connected.shouldBeFalse; 75 server.send(connection, connectionMsgBytes); 76 connection.code.shouldEqual(MqttConnack.Code.ACCEPTED); 77 connection.connected.shouldBeTrue; 78 } 79 80 void testConnectBigId() { 81 auto server = MqttServer!TestMqttConnection(); 82 ubyte[] bytes = [ 0x10, 0x3f, //fixed header 83 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', //protocol name 84 0x03, //protocol version 85 0xcc, //connection flags 1100111x username, pw, !wr, w(01), w, !c, x 86 0x00, 0x0a, //keepalive of 10 87 0x00, 0x18, 'c', 'i', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 88 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', //24 char client id 89 0x00, 0x04, 'w', 'i', 'l', 'l', //will topic 90 0x00, 0x04, 'w', 'm', 's', 'g', //will msg 91 0x00, 0x07, 'g', 'l', 'i', 'f', 't', 'e', 'l', //username 92 0x00, 0x02, 'p', 'w', //password 93 ]; 94 95 auto connection = TestMqttConnection(); 96 server.send(connection, bytes); 97 connection.connect.isBadClientId.shouldBeTrue; 98 connection.code.shouldEqual(MqttConnack.Code.BAD_ID); 99 connection.connected.shouldBeFalse; 100 } 101 102 103 void testConnectSmallId() { 104 auto server = MqttServer!TestMqttConnection(); 105 ubyte[] bytes = [ 0x10, 0x27, //fixed header 106 0x00, 0x06, 'M', 'Q', 'I', 's', 'd', 'p', //protocol name 107 0x03, //protocol version 108 0xcc, //connection flags 1100111x username, pw, !wr, w(01), w, !c, x 109 0x00, 0x0a, //keepalive of 10 110 0x00, 0x00, //no client id 111 0x00, 0x04, 'w', 'i', 'l', 'l', //will topic 112 0x00, 0x04, 'w', 'm', 's', 'g', //will msg 113 0x00, 0x07, 'g', 'l', 'i', 'f', 't', 'e', 'l', //username 114 0x00, 0x02, 'p', 'w', //password 115 ]; 116 117 auto connection = TestMqttConnection(); 118 server.send(connection, bytes); 119 connection.connect.isBadClientId.shouldBeTrue; 120 connection.code.shouldEqual(MqttConnack.Code.BAD_ID); 121 connection.connected.shouldBeFalse; 122 } 123 124 void publish(S)(ref MqttServer!S server, ref S connection, in string topic, in ubyte[] payload) if(isMqttSubscriber!S) { 125 MqttPublish(topic, payload).cerealise!(b => server.send(connection, b)); 126 } 127 128 void subscribe(S)(ref MqttServer!S server, ref S connection, in ushort msgId, in string[] topics) if(isMqttSubscriber!S) { 129 MqttSubscribe(msgId, topics.map!(a => MqttSubscribe.Topic(a, 0)).array).cerealise!(b => server.send(connection, b)); 130 } 131 132 void unsubscribe(S)(ref MqttServer!S server, ref S connection, in ushort msgId, in string[] topics) if(isMqttSubscriber!S) { 133 MqttUnsubscribe(msgId, topics).cerealise!(b => server.send(connection, b)); 134 } 135 136 137 void testSubscribeWithMessage() { 138 auto server = MqttServer!TestMqttConnection(); 139 auto connection = TestMqttConnection(); 140 141 server.send(connection, connectionMsgBytes); 142 connection.connected.shouldBeTrue; 143 144 server.publish(connection, "foo/bar/baz", [1, 2, 3, 4, 5, 6]); 145 shouldEqual(connection.payloads, []); 146 147 ubyte[] bytes = [ 0x8b, 0x13, //fixed header 148 0x00, 0x21, //message ID 149 0x00, 0x05, 'f', 'i', 'r', 's', 't', 150 0x01, //qos 151 0x00, 0x06, 's', 'e', 'c', 'o', 'n', 'd', 152 0x02, //qos 153 ]; 154 155 server.send(connection, bytes); 156 const suback = connection.lastMsg!MqttSuback; 157 shouldEqual(suback.msgId, 0x21); 158 shouldEqual(suback.qos, [1, 2]); 159 160 bytes = [ 0x3c, 0x0d, //fixed header 161 0x00, 0x05, 'f', 'i', 'r', 's', 't',//topic name 162 0x00, 0x21, //message ID 163 1, 2, 3, 4 //payload 164 ]; 165 server.send(connection, bytes); 166 167 bytes = [ 0x3c, 0x0d, //fixed header 168 0x00, 0x06, 's', 'e', 'c', 'o', 'n', 'd',//topic name 169 0x00, 0x21, //message ID 170 9, 8, 7//payload 171 ]; 172 server.send(connection, bytes); //publish 173 174 bytes = [ 0x3c, 0x0c, //fixed header 175 0x00, 0x05, 't', 'h', 'i', 'r', 'd',//topic name 176 0x00, 0x21, //message ID 177 2, 4, 6, //payload 178 ]; 179 server.send(connection, bytes); //publish 180 181 182 shouldEqual(connection.payloads, [[1, 2, 3, 4], [9, 8, 7]]); 183 } 184 185 186 void testPingWithMessage() { 187 auto server = MqttServer!TestMqttConnection(); 188 auto connection = TestMqttConnection(); 189 190 server.send(connection, connectionMsgBytes); 191 server.send(connection, cast(ubyte[])[0xc0, 0x00]); //ping request 192 const pingResp = connection.lastMsg!MqttPingResp; //shouldn't throw 193 } 194 195 196 void testUnsubscribe() { 197 auto server = MqttServer!TestMqttConnection(); 198 auto connection = TestMqttConnection(); 199 200 server.send(connection, connectionMsgBytes); 201 202 server.subscribe(connection, 42, ["foo/bar/+"]); 203 const suback = connection.lastMsg!MqttSuback; 204 205 server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]); 206 server.publish(connection, "foo/boogagoo", [9, 8, 7]); 207 shouldEqual(connection.payloads, [[1, 2, 3, 4]]); 208 209 server.unsubscribe(connection, 2, ["boo"]); //doesn't exist, so no effect 210 const unsuback1 = connection.lastMsg!MqttUnsuback; 211 shouldEqual(unsuback1.msgId, 2); 212 213 server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]); 214 server.publish(connection, "foo/boogagoo", [9, 8, 7]); 215 shouldEqual(connection.payloads, [[1, 2, 3, 4], [1, 2, 3, 4]]); 216 217 server.unsubscribe(connection, 3, ["foo/bar/+"]); 218 const unsuback2 = connection.lastMsg!MqttUnsuback; 219 shouldEqual(unsuback2.msgId, 3); 220 221 server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]); 222 server.publish(connection, "foo/boogagoo", [9, 8, 7]); 223 shouldEqual(connection.payloads, [[1, 2, 3, 4], [1, 2, 3, 4]]); //shouldn't have changed 224 } 225 226 void testUnsubscribeHandle() { 227 auto server = MqttServer!TestMqttConnection(); 228 auto connection = TestMqttConnection(); 229 server.send(connection, connectionMsgBytes); 230 server.subscribe(connection, 42, ["foo/bar/+"]); 231 232 server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]); 233 server.publish(connection, "foo/boogagoo", [9, 8, 7]); 234 shouldEqual(connection.payloads, [[1, 2, 3, 4]]); 235 236 ubyte[] bytes = [ 0xa2, 0x0d, //fixed header 237 0x00, 0x21, //message ID 238 0x00, 0x09, 'f', 'o', 'o', '/', 'b', 'a', 'r', '/', '+', 239 ]; 240 241 server.send(connection, bytes); 242 const unsuback = connection.lastMsg!MqttUnsuback; 243 shouldEqual(unsuback.msgId, 33); 244 245 server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]); 246 server.publish(connection, "foo/boogagoo", [9, 8, 7]); 247 shouldEqual(connection.payloads, [[1, 2, 3, 4]]); //shouldn't have changed 248 } 249 250 void testUnsubscribeAll() { 251 auto server = MqttServer!TestMqttConnection(); 252 auto connection = TestMqttConnection(); 253 server.send(connection, connectionMsgBytes); 254 server.subscribe(connection, 42, ["foo/bar/+"]); 255 256 server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]); 257 server.publish(connection, "foo/boogagoo", [9, 8, 7]); 258 shouldEqual(connection.payloads, [[1, 2, 3, 4]]); 259 260 ubyte[] bytes = [ 0xa2, 0x0d, //fixed header 261 0x00, 0x21, //message ID 262 0x00, 0x09, 'f', 'o', 'o', '/', 'b', 'a', 'r', '/', '+', 263 ]; 264 265 server.send(connection, cast(ubyte[])[0xe0, 0]); 266 267 server.publish(connection, "foo/bar/baz", [1, 2, 3, 4]); 268 server.publish(connection, "foo/boogagoo", [9, 8, 7]); 269 shouldEqual(connection.payloads, [[1, 2, 3, 4]]); //shouldn't have changed, disconnected 270 } 271 272 273 void testSubscribeWildCard() { 274 import std.conv; 275 auto server = MqttServer!TestMqttConnection(); 276 immutable numPairs = 2; 277 immutable numWilds = 2; 278 TestMqttConnection[numPairs] reqs; 279 TestMqttConnection[numPairs] reps; 280 TestMqttConnection[numWilds] wlds; 281 282 foreach(i, ref wld; wlds) 283 server.subscribe(wld, cast(ushort)(i * 20 + 1), ["pingtest/0/#"]); 284 285 foreach(i, ref req; reqs) 286 server.subscribe(req, cast(ushort)(i * 2), [text("pingtest/", i, "/request")]); 287 288 foreach(i, ref rep; reps) 289 server.subscribe(rep, cast(ushort)(i * 2 + 1), [text("pingtest/", i, "/reply")]); 290 291 foreach(ref c; reqs) c.payloads = []; 292 foreach(ref c; reps) c.payloads = []; 293 foreach(ref c; wlds) c.payloads = []; 294 295 immutable numMessages = 2; 296 foreach(i; 0..numPairs) { 297 foreach(j; 0..numMessages) { 298 server.publish(reqs[0], text("pingtest/", i, "/request"), [0, 1, 2, 3]); 299 server.publish(reqs[0], text("pingtest/", i, "/reply"), [9, 8, 7]); 300 } 301 } 302 303 foreach(ref req; reqs) { 304 writelnUt("checking payloads of ", &req); 305 req.payloads.shouldEqual([0, 1, 2, 3].repeat.take(numMessages)); 306 } 307 308 foreach(rep; reps) 309 rep.payloads.shouldEqual([9, 8, 7].repeat.take(numMessages)); 310 311 foreach(w; wlds) 312 shouldEqual(w.payloads.length, numMessages * 2); 313 } 314 315 316 void testDisconnect() { 317 auto server = MqttServer!TestMqttConnection(); 318 auto connection = TestMqttConnection(); 319 320 connection.connected.shouldBeFalse; 321 server.send(connection, connectionMsgBytes); 322 connection.connected.shouldBeTrue; 323 324 server.send(connection, cast(ubyte[])[0xe0, 0x00]); //disconnect 325 connection.connected.shouldBeFalse; 326 }