1 module tests.stream;
2 
3 import unit_threaded;
4 import mqttd.stream;
5 import mqttd.message;
6 import mqttd.server;
7 import mqttd.broker;
8 import std.stdio;
9 import std.algorithm;
10 import std.array;
11 import cerealed;
12 
13 struct TestMqttConnection {
14     void send(in ubyte[] payload) {
15         writelnUt(&this, "  message: ", payload);
16         auto dec = Decerealiser(payload);
17         immutable fixedHeader = dec.value!MqttFixedHeader;
18         dec.reset;
19         switch(fixedHeader.type) with(MqttType) {
20             case PUBLISH:
21                 auto msg = dec.value!MqttPublish;
22                 payloads ~= msg.payload.dup;
23                 break;
24 
25             default:
26                 messages ~= payload;
27         }
28     }
29 
30     void disconnect() {
31         connected = false;
32     }
33 
34     alias Payload = ubyte[];
35     const(Payload)[] payloads;
36     const(Payload)[] messages;
37     bool connected = true;
38 
39     static assert(isMqttSubscriber!TestMqttConnection);
40 }
41 
42 void subscribe(S)(ref MqttServer!S server, ref S connection, in ushort msgId, in string[] topics) if(isMqttSubscriber!S) {
43     MqttSubscribe(msgId, topics.map!(a => MqttSubscribe.Topic(a, 0)).array).cerealise!(b => server.send(connection, b));
44 }
45 
46 
47 void testMqttInTwoPackets() {
48     auto server = MqttServer!TestMqttConnection();
49     auto connection = TestMqttConnection();
50     auto stream = MqttStream(128);
51 
52     server.subscribe(connection, 33, ["top"]);
53 
54     ubyte[] bytes1 = [ 0x3c, 0x0f, //fixed header
55                        0x00, 0x03, 't', 'o', 'p', //topic name
56                        0x00, 0x21, //message ID
57                        1, 2, 3 ]; //1st part of payload
58 
59     stream ~= bytes1;
60     stream.handleMessages(server, connection);
61     connection.payloads.shouldBeEmpty;
62 
63     ubyte[] bytes2 = [ 4, 5, 6, 7, 8]; //2nd part of payload
64     stream ~= bytes2;
65     stream.handleMessages(server, connection);
66     connection.payloads.shouldEqual([[1, 2, 3, 4, 5, 6, 7, 8]]);
67 }
68 
69 
70 void testTwoMqttInThreePackets() {
71     auto server = MqttServer!TestMqttConnection();
72     auto connection = TestMqttConnection();
73     auto stream = MqttStream(128);
74 
75     server.subscribe(connection, 33, ["top"]);
76 
77     ubyte[] bytes1 = [ 0x3c, 0x0f, //fixed header
78                        0x00, 0x03, 't', 'o', 'p', //topic name
79                        0x00, 0x21, //message ID
80                        1, 2, 3, ]; //1st part of payload
81 
82     stream ~= bytes1;
83     stream.handleMessages(server, connection);
84     connection.payloads.shouldBeEmpty;
85 
86     ubyte[] bytes2 = [ 4, 5, 6, 7, 8]; //2nd part of payload
87     stream ~= bytes2;
88     stream.handleMessages(server, connection);
89     connection.payloads.shouldEqual([[1, 2, 3, 4, 5, 6, 7, 8]]);
90 
91     ubyte[] bytes3 = [0xe0, 0x00]; //disconnect
92     stream ~= bytes3;
93     stream.handleMessages(server, connection);
94     connection.payloads.shouldEqual([[1, 2, 3, 4, 5, 6, 7, 8]]);
95     connection.connected.shouldBeFalse;
96 }
97 
98 
99 void testTwoMqttInOnePacket() {
100    auto stream = MqttStream(128);
101    auto server = MqttServer!TestMqttConnection();
102    auto connection = TestMqttConnection();
103 
104    server.subscribe(connection, 33, ["top"]);
105 
106    ubyte[] bytes1 = [ 0x3c ]; // half of header
107    ubyte[] bytes2 = [ 0x0f, //2nd half fixed header
108                      0x00, 0x03, 't', 'o', 'p', //topic name
109                      0x00, 0x21, //message ID
110                       1, 2, 3, 4, 5, 6, 7, 8, //payload
111                      0xe0, 0x00, //header for disconnect
112        ];
113 
114    stream ~= bytes1;
115    stream.handleMessages(server, connection);
116    connection.payloads.shouldBeEmpty;
117 
118    stream ~= bytes2;
119    stream.handleMessages(server, connection);
120    connection.payloads.shouldEqual([[1, 2, 3, 4, 5, 6, 7, 8]]);
121 }
122 
123 void testBug1() {
124     auto stream = MqttStream(128);
125 
126     ubyte[] msg = [48, 20, 0, 16, 112, 105, 110, 103, 116, 101, 115, 116, 47, 48, 47, 114, 101, 112, 108, 121, 111, 107];
127     ubyte[] bytes1 = msg ~ msg[0..$-4];
128     stream ~= bytes1;
129     stream.popNextMessageBytes.shouldEqual(msg);
130 }
131 
132 
133 void testBug2() {
134     auto stream = MqttStream(128);
135 
136     ubyte[] bytes1 = [48, 26, 0, 18, 112, 105, 110, 103, 116, 101, 115, 116, 47, 48, 47, 114, 101, 113, 117, 101, 115, 116];
137     stream ~= bytes1;
138     stream.hasMessages.shouldBeFalse;
139 
140     ubyte[] bytes2 = [112, 105, 110, 103, 32, 48];
141     stream ~= bytes2;
142     stream.hasMessages.shouldBeTrue;
143     stream.popNextMessageBytes.shouldEqual(bytes1 ~ bytes2);
144 }
145 
146 
147 void testPublishInTwoMessages() {
148     auto server = MqttServer!TestMqttConnection();
149     auto connection = TestMqttConnection();
150     auto stream = MqttStream(128);
151 
152     ubyte[] subBytes = [
153         0x8b, 0x13, //fixed header
154         0x00, 0x21, //message ID
155         0x00, 0x05, 'f', 'i', 'r', 's', 't',
156         0x01, //qos
157         0x00, 0x06, 's', 'e', 'c', 'o', 'n', 'd',
158         0x02, //qos
159         ];
160 
161     stream ~= subBytes;
162     stream.handleMessages(server, connection);
163 
164     ubyte[] firstPart = [
165         0x3c, 0x0d, //fixed header
166         0x00, 0x05, 'f', 'i', 'r', 's', 't',//topic name
167         ];
168 
169     stream ~= firstPart;
170     stream.handleMessages(server, connection);
171     connection.payloads.shouldBeEmpty;
172 
173     ubyte[] sndPart = [
174         0x00, 0x21, //message ID
175         1, 2, 3, 4, //payload
176         ];
177 
178     stream ~= sndPart;
179     stream.handleMessages(server, connection);
180     connection.payloads.shouldEqual([[1, 2, 3, 4]]);
181 }