module tests.stream;

import unit_threaded;
import mqttd.stream;
import mqttd.message;
import mqttd.server;
import mqttd.broker;
import std.stdio;
import std.algorithm;
import std.array;
import cerealed;

/**
 * Test double for an MQTT subscriber connection.
 *
 * Records everything passed to `send`: the payload of each PUBLISH message
 * goes into `payloads`, the raw bytes of any other message go into
 * `messages`. `disconnect` simply flips `connected` to false so tests can
 * observe it.
 */
struct TestMqttConnection {
    /**
     * Decodes the fixed header of `payload` and records the message.
     *
     * Params:
     *   payload = raw bytes of one complete MQTT message
     */
    void send(in ubyte[] payload) {
        writelnUt(&this, " message: ", payload);
        auto dec = Decerealiser(payload);
        immutable fixedHeader = dec.value!MqttFixedHeader;
        // The header was only peeked at to find the message type; reset the
        // decoder before decoding the message as a whole (presumably this
        // rewinds to the start of the buffer -- confirm against cerealed).
        dec.reset;
        switch(fixedHeader.type) with(MqttType) {
        case PUBLISH:
            auto msg = dec.value!MqttPublish;
            payloads ~= msg.payload.dup;
            break;

        default:
            // Copy instead of storing the caller's slice: `payload` is an
            // `in` parameter and the sender may reuse its buffer after this
            // call returns. Matches the `.dup` in the PUBLISH branch.
            messages ~= payload.dup;
        }
    }

    /// Marks the connection as closed.
    void disconnect() {
        connected = false;
    }

    alias Payload = ubyte[];
    const(Payload)[] payloads;  /// payloads of the PUBLISH messages received, in order
    const(Payload)[] messages;  /// raw bytes of every non-PUBLISH message received, in order
    bool connected = true;      /// set to false by `disconnect`

    static assert(isMqttSubscriber!TestMqttConnection);
}

/**
 * Test helper: builds an MqttSubscribe message for `topics` (each with the
 * second `MqttSubscribe.Topic` field set to 0), serialises it and hands the
 * bytes to `server.send` on behalf of `connection`.
 *
 * Params:
 *   server     = server that receives the serialised SUBSCRIBE bytes
 *   connection = connection the bytes are sent on behalf of
 *   msgId      = message id to put in the SUBSCRIBE message
 *   topics     = topic names to subscribe to
 */
void subscribe(S)(ref MqttServer!S server, ref S connection, in ushort msgId, in string[] topics)
    if(isMqttSubscriber!S)
{
    MqttSubscribe(msgId, topics.map!(a => MqttSubscribe.Topic(a, 0)).array)
        .cerealise!(b => server.send(connection, b));
}


/// One PUBLISH message split across two chunks: nothing must be delivered
/// until the second chunk completes the payload.
void testMqttInTwoPackets() {
    auto server = MqttServer!TestMqttConnection();
    auto connection = TestMqttConnection();
    auto stream = MqttStream(128);

    server.subscribe(connection, 33, ["top"]);

    ubyte[] bytes1 = [ 0x3c, 0x0f, //fixed header
                       0x00, 0x03, 't', 'o', 'p', //topic name
                       0x00, 0x21, //message ID
                       1, 2, 3 ]; //1st part of payload

    stream ~= bytes1;
    stream.handleMessages(server, connection);
    connection.payloads.shouldBeEmpty;

    ubyte[] bytes2 = [ 4, 5, 6, 7, 8]; //2nd part of payload
    stream ~= bytes2;
    stream.handleMessages(server, connection);
    connection.payloads.shouldEqual([[1, 2, 3, 4, 5, 6, 7, 8]]);
}


/// A PUBLISH split across two chunks followed by a DISCONNECT in a third:
/// the publish is delivered once and the connection ends up disconnected.
void testTwoMqttInThreePackets() {
    auto server = MqttServer!TestMqttConnection();
    auto connection = TestMqttConnection();
    auto stream = MqttStream(128);

    server.subscribe(connection, 33, ["top"]);

    ubyte[] bytes1 = [ 0x3c, 0x0f, //fixed header
                       0x00, 0x03, 't', 'o', 'p', //topic name
                       0x00, 0x21, //message ID
                       1, 2, 3, ]; //1st part of payload

    stream ~= bytes1;
    stream.handleMessages(server, connection);
    connection.payloads.shouldBeEmpty;

    ubyte[] bytes2 = [ 4, 5, 6, 7, 8]; //2nd part of payload
    stream ~= bytes2;
    stream.handleMessages(server, connection);
    connection.payloads.shouldEqual([[1, 2, 3, 4, 5, 6, 7, 8]]);

    ubyte[] bytes3 = [0xe0, 0x00]; //disconnect
    stream ~= bytes3;
    stream.handleMessages(server, connection);
    // The disconnect must not deliver anything extra.
    connection.payloads.shouldEqual([[1, 2, 3, 4, 5, 6, 7, 8]]);
    connection.connected.shouldBeFalse;
}


/// The fixed header itself is split after its first byte; the second chunk
/// carries the rest of the PUBLISH plus a complete DISCONNECT.
void testTwoMqttInOnePacket() {
    auto stream = MqttStream(128);
    auto server = MqttServer!TestMqttConnection();
    auto connection = TestMqttConnection();

    server.subscribe(connection, 33, ["top"]);

    ubyte[] bytes1 = [ 0x3c ]; // half of header
    ubyte[] bytes2 = [ 0x0f, //2nd half fixed header
                       0x00, 0x03, 't', 'o', 'p', //topic name
                       0x00, 0x21, //message ID
                       1, 2, 3, 4, 5, 6, 7, 8, //payload
                       0xe0, 0x00, //header for disconnect
        ];

    stream ~= bytes1;
    stream.handleMessages(server, connection);
    connection.payloads.shouldBeEmpty;

    stream ~= bytes2;
    stream.handleMessages(server, connection);
    connection.payloads.shouldEqual([[1, 2, 3, 4, 5, 6, 7, 8]]);
    // NOTE(review): the trailing DISCONNECT is never asserted on here;
    // consider also checking `connection.connected.shouldBeFalse` once it is
    // confirmed that handleMessages drains every complete message.
}

/// Regression test: a complete message followed by an incomplete copy of
/// itself must yield exactly the complete message.
void testBug1() {
    auto stream = MqttStream(128);

    ubyte[] msg = [48, 20, 0, 16, 112, 105, 110, 103, 116, 101, 115, 116, 47, 48, 47, 114, 101, 112, 108, 121, 111, 107];
    ubyte[] bytes1 = msg ~ msg[0..$-4];
    stream ~= bytes1;
    stream.popNextMessageBytes.shouldEqual(msg);
}


/// Regression test: a message whose last 6 bytes arrive in a second chunk is
/// reported only once complete, and is popped as the concatenation of both
/// chunks.
void testBug2() {
    auto stream = MqttStream(128);

    ubyte[] bytes1 = [48, 26, 0, 18, 112, 105, 110, 103, 116, 101, 115, 116, 47, 48, 47, 114, 101, 113, 117, 101, 115, 116];
    stream ~= bytes1;
    stream.hasMessages.shouldBeFalse;

    ubyte[] bytes2 = [112, 105, 110, 103, 32, 48];
    stream ~= bytes2;
    stream.hasMessages.shouldBeTrue;
    stream.popNextMessageBytes.shouldEqual(bytes1 ~ bytes2);
}


/// Subscribes via raw SUBSCRIBE bytes, then feeds a PUBLISH whose split
/// point falls right after the topic name (before the message ID).
void testPublishInTwoMessages() {
    auto server = MqttServer!TestMqttConnection();
    auto connection = TestMqttConnection();
    auto stream = MqttStream(128);

    ubyte[] subBytes = [
        0x8b, 0x13, //fixed header
        0x00, 0x21, //message ID
        0x00, 0x05, 'f', 'i', 'r', 's', 't',
        0x01, //qos
        0x00, 0x06, 's', 'e', 'c', 'o', 'n', 'd',
        0x02, //qos
        ];

    stream ~= subBytes;
    stream.handleMessages(server, connection);

    ubyte[] firstPart = [
        0x3c, 0x0d, //fixed header
        0x00, 0x05, 'f', 'i', 'r', 's', 't',//topic name
        ];

    stream ~= firstPart;
    stream.handleMessages(server, connection);
    connection.payloads.shouldBeEmpty;

    ubyte[] sndPart = [
        0x00, 0x21, //message ID
        1, 2, 3, 4, //payload
        ];

    stream ~= sndPart;
    stream.handleMessages(server, connection);
    connection.payloads.shouldEqual([[1, 2, 3, 4]]);
}