1 module mqttd.message;
2 
3 
4 import cerealed;
5 import std.stdio;
6 import std.algorithm;
7 import std.stdio;
8 import std.range;
9 import std.exception;
10 import std.conv;
11 
/// MQTT packet type codes. Values run consecutively from 0 to 15 so that a
/// code fits in four bits; the trailing comments give each numeric value.
enum MqttType {
    RESERVED1 = 0,
    CONNECT,     // 1
    CONNACK,     // 2
    PUBLISH,     // 3
    PUBACK,      // 4
    PUBREC,      // 5
    PUBREL,      // 6
    PUBCOMP,     // 7
    SUBSCRIBE,   // 8
    SUBACK,      // 9
    UNSUBSCRIBE, // 10
    UNSUBACK,    // 11
    PINGREQ,     // 12
    PINGRESP,    // 13
    DISCONNECT,  // 14
    RESERVED2,   // 15
}
30 
31 
/**
 * The header that starts every MQTT packet: one byte holding the packet type
 * and flags, followed by the variable-length "remaining length" field.
 *
 * The four bit-fields are (de)serialised automatically in declaration order;
 * `remaining` is excluded from that and is read/written by `postBlit`.
 */
struct MqttFixedHeader {
public:
    /// Header size in bytes used by `check`; this matches a header whose
    /// remaining-length field occupies a single byte.
    enum SIZE = 2;

    @Bits!4 MqttType type;
    @Bits!1 bool dup;
    @Bits!2 ubyte qos;
    @Bits!1 bool retain;
    @NoCereal uint remaining;

    /**
     * Sanity-checks the advertised remaining length against a raw packet.
     *
     * Params:
     *   bytes = the raw bytes of the packet being examined
     *   cerealBytesLength = number of bytes the caller reports as actually
     *                       remaining; a mismatch is only logged
     *
     * Returns: false (after logging to stderr) when `remaining + SIZE` differs
     *          from `bytes.length`, true otherwise.
     */
    bool check(in ubyte[] bytes, in ulong cerealBytesLength) const {
        if(remaining < cerealBytesLength) {
            stderr.writeln("Wrong MQTT remaining size ", cast(int)remaining,
                           ". Real remaining size: ", cerealBytesLength);
        }

        // NOTE(review): SIZE is a constant 2, so this comparison presumably only
        // holds for packets whose remaining length fits in one byte (<= 127) -
        // confirm against the callers before relying on it for larger packets.
        const mqttSize = remaining + SIZE;
        if(mqttSize != bytes.length) {
            stderr.writeln("Malformed packet. Actual size: ", bytes.length,
                    ". Advertised size: ", mqttSize, " (r ", remaining ,")");
            // writefln (not writeln) so that the %( %) range spec is actually
            // interpreted and the bytes are dumped in hex
            stderr.writefln("Packet:\n%(0x%x %)", bytes);

            return false;
        }

        return true;
    }

    /// Serialisation hook: appends the encoded remaining length.
    void postBlit(Cereal)(ref Cereal cereal) if(isCerealiser!Cereal) {
        setRemainingSize(cereal);
    }

    /// Deserialisation hook: decodes the remaining length from the input.
    void postBlit(Cereal)(ref Cereal cereal) if(isDecerealiser!Cereal) {
        remaining = getRemainingSize(cereal);
    }

    mixin assertHasPostBlit!MqttFixedHeader;

private:

    /// Decodes the remaining length: base-128 digits, least significant first,
    /// with the top bit of each byte acting as a "more digits follow" flag.
    uint getRemainingSize(Cereal)(ref Cereal cereal) if(isDecerealiser!Cereal) {
        //algorithm straight from the MQTT spec
        // NOTE(review): the number of digits read is not bounded here; the loop
        // runs until a byte without the continuation bit is seen.
        int multiplier = 1;
        uint value = 0;
        ubyte digit;
        do {
            cereal.grain(digit);
            value += (digit & 127) * multiplier;
            multiplier *= 128;
        } while((digit & 128) != 0);

        return value;
    }

    /// Encodes `remaining`, taking the cheap path when it fits in one byte.
    void setRemainingSize(Cereal)(ref Cereal cereal) const if(isCerealiser!Cereal) {
        if(remaining <= 127) {
            setRemainingSizeOneByte(cereal);
        } else {
            setRemainingSizeMultiByte(cereal);
        }
    }

    /// Single-byte encoding: the value itself, continuation bit clear.
    void setRemainingSizeOneByte(Cereal)(ref Cereal cereal) const if(isCerealiser!Cereal) {
        cereal ~= cast(ubyte)remaining;
    }

    /// Multi-byte encoding: base-128 digits, least significant first, with the
    /// top bit set on every digit except the last.
    void setRemainingSizeMultiByte(Cereal)(ref Cereal cereal) const if(isCerealiser!Cereal) {
        //algorithm straight from the MQTT spec
        // Each digit is written as soon as it is computed. The previous version
        // collected digits by appending to a slice of a static array, which
        // reallocates on the GC heap - the opposite of the intended optimisation.
        uint x = remaining;
        do {
            ubyte digit = x % 128;
            x /= 128;
            if(x > 0) {
                digit |= 0x80;
            }
            cereal.grain(digit);
        } while(x > 0);
    }

}
113 
114 
/**
 * CONNECT packet.
 *
 * The fields not marked `@NoCereal` are (de)serialised automatically in
 * declaration order. The optional trailing strings are handled by `postBlit`,
 * guarded by the corresponding flag bits.
 */
struct MqttConnect {
public:

    /// Builds a packet around an already available fixed header.
    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// (De)serialisation hook for the optional fields: will topic and will
    /// message when `hasWill` is set, then user name, then password.
    void postBlit(Cereal)(ref Cereal cereal) {
        if(hasWill) {
            cereal.grain(willTopic);
            cereal.grain(willMessage);
        }
        if(hasUserName) {
            cereal.grain(userName);
        }
        if(hasPassword) {
            cereal.grain(password);
        }
    }

    mixin assertHasPostBlit!MqttConnect;

    /// True when the client id is empty or longer than 23 code units.
    @property bool isBadClientId() const {
        immutable idLength = clientId.length;
        return idLength == 0 || idLength > 23;
    }

    // Field order below is the serialisation order - do not rearrange.
    MqttFixedHeader header;
    string protoName;
    ubyte protoVersion;
    @Bits!1 bool hasUserName;
    @Bits!1 bool hasPassword;
    @Bits!1 bool hasWillRetain;
    @Bits!2 ubyte willQos;
    @Bits!1 bool hasWill;
    @Bits!1 bool hasClear;
    @Bits!1 bool reserved;
    ushort keepAlive;
    string clientId;
    @NoCereal string willTopic;
    @NoCereal string willMessage;
    @NoCereal string userName;
    @NoCereal string password;
}
150 
/// CONNACK packet: fixed header, one reserved byte and a return code.
struct MqttConnack {

    /// Return code carried by the packet.
    enum Code: byte {
        ACCEPTED = 0,
        BAD_VERSION = 1,
        BAD_ID = 2,
        SERVER_UNAVAILABLE = 3,
        BAD_USER_OR_PWD = 4,
        NO_AUTH = 5,
    }

    /// Builds the packet from a fixed header; the default is a CONNACK header
    /// with all flags clear and a remaining length of 2.
    this(MqttFixedHeader header = MqttFixedHeader(MqttType.CONNACK, false, 0, false, 2)) {
        this.header = header;
    }

    /// Builds a packet with the default CONNACK header and the given code.
    this(Code code) {
        // set up the default header first, then record the return code
        this();
        this.code = code;
    }

    MqttFixedHeader header;
    ubyte reserved;
    Code code;
}
175 
176 
/**
 * PUBLISH packet: fixed header, topic, an optional message id (only present
 * when the header's qos is non-zero) and the payload.
 *
 * `header` and `topic` are (de)serialised automatically; the message id and
 * the payload are handled by `postBlit`.
 */
struct MqttPublish {
public:
    /// Builds a packet around an already available fixed header.
    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// Convenience constructor: no dup flag, qos 0, no retain flag.
    this(in string topic, in ubyte[] payload, ushort msgId = 0) {
        this(false, 0, false, topic, payload, msgId);
    }

    /**
     * Builds a packet ready to be serialised and computes the header's
     * remaining length from the arguments.
     *
     * The payload is not copied: the const slice is stored with its
     * constness cast away, so a packet built this way is flagged as
     * unusable for deserialisation.
     */
    this(in bool dup, in ubyte qos, in bool retain, in string topic, in ubyte[] payload, in ushort msgId = 0) {
        const topicLen = cast(uint)topic.length + 2; //2 for length
        auto remaining = qos ? topicLen + 2 /*msgId*/ : topicLen;
        remaining += payload.length;

        this.header = MqttFixedHeader(MqttType.PUBLISH, dup, qos, retain, remaining);
        this.topic = topic;
        //only safe if we never change it
        _payload = cast(ubyte[])payload;
        this.msgId = msgId;
        this.cantDecerealise = true; //because of the cast
    }

    /// (De)serialisation hook: handles the message id (when qos is non-zero)
    /// and then the payload.
    void postBlit(Cereal)(ref Cereal cereal) {
        static if(isDecerealiser!Cereal) {
            assert(!cantDecerealise, "Cannot decerealise if constructed from payload");
        }

        if(header.qos) {
            static if(Cereal.type == CerealType.ReadBytes) {
                // when reading, only consume a message id if the advertised
                // remaining length is large enough; otherwise just report it
                if(header.remaining < 7) {
                    stderr.writeln("Error: PUBLISH message with QOS but no message ID");
                } else {
                    cereal.grain(msgId);
                }
            } else {
                cereal.grain(msgId);
            }
        }

        static if(isDecerealiser!Cereal) {
            // presumably aliases the unread part of the input buffer rather
            // than copying it - confirm against cerealed's `bytes`
            _payload = cast(ubyte[])cereal.bytes; //dirty but fast
        } else {
            cereal.grainRaw(_payload);
        }
    }

    mixin assertHasPostBlit!MqttPublish;

    /// Read-only view of the payload bytes.
    const(ubyte)[] payload() @safe pure const nothrow {
        return _payload;
    }

    MqttFixedHeader header;
    string topic;
    private @NoCereal ubyte[] _payload;
    @NoCereal ushort msgId;
    @NoCereal bool cantDecerealise;
}
239 
240 
/// SUBSCRIBE packet: fixed header, message id and a list of topic/qos pairs.
struct MqttSubscribe {
public:
    /// Builds a packet around an already available fixed header, logging a
    /// warning to stderr when the header's qos is not 1.
    this(MqttFixedHeader header) {
        if(header.qos != 1) {
            stderr.writeln("SUBSCRIBE message with qos ", header.qos, ", should be 1");
        }
        this.header = header;
    }

    /**
     * Builds a packet ready to be serialised.
     *
     * The remaining length is the message id (2 bytes) plus, for every topic,
     * its 2-byte length prefix, its text and its one-byte qos.
     */
    this(ushort msgId, Topic[] topics, in bool dup = false, in ubyte qos = 1, in bool retain = false) {
        immutable remaining = cast(uint)(msgId.sizeof + topics.map!(a => a.qos.sizeof + a.topic.length + 2).sum);
        this.header = MqttFixedHeader(MqttType.SUBSCRIBE, dup, qos, retain, remaining);
        // the id used to be dropped here, so every packet went out with msgId 0
        this.msgId = msgId;
        this.topics = topics;
    }

    /// One subscription request: a topic and its requested qos.
    static struct Topic {
        string topic;
        ubyte qos;
    }

    MqttFixedHeader header;
    ushort msgId;
    @RawArray Topic[] topics;
}
265 
/// SUBACK packet: fixed header, message id and one qos byte per entry.
struct MqttSuback {
public:

    /// Builds a packet around an already available fixed header.
    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// Builds a packet ready to be serialised; the qos list is copied.
    this(in ushort msgId, in ubyte[] qos) {
        // one byte per qos entry plus two for the message id
        immutable remainingLength = cast(uint)qos.length + 2;
        header = MqttFixedHeader(MqttType.SUBACK, false, 0, false, remainingLength);
        this.msgId = msgId;
        this.qos = qos.dup;
    }

    MqttFixedHeader header;
    ushort msgId;
    @RawArray ubyte[] qos;
}
283 
/// UNSUBSCRIBE packet: fixed header, message id and a list of topics.
struct MqttUnsubscribe {
    /// Builds a packet around an already available fixed header.
    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// Builds a packet ready to be serialised; the topic list is copied.
    this(ushort msgId, in string[] topics,  in bool dup = false, in ubyte qos = 1, in bool retain = false) {
        // message id, then a 2-byte length prefix plus the text of every topic
        size_t total = msgId.sizeof;
        foreach(name; topics) {
            total += 2 + name.length;
        }

        header = MqttFixedHeader(MqttType.UNSUBSCRIBE, dup, qos, retain, cast(uint)total);
        this.msgId = msgId;
        this.topics = topics.dup;
    }

    MqttFixedHeader header;
    ushort msgId;
    @RawArray string[] topics;
}
300 
/// UNSUBACK packet: fixed header followed by a message id.
struct MqttUnsuback {
    /// Builds a packet around an already available fixed header.
    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// Builds a packet ready to be serialised for the given message id.
    this(in ushort msgId) {
        enum uint remainingLength = 2; // just the message id
        header = MqttFixedHeader(MqttType.UNSUBACK, false, 0, false, remainingLength);
        this.msgId = msgId;
    }

    MqttFixedHeader header;
    ushort msgId;
}
314 
315 
/// PINGRESP packet. It has no fields; its encoding is a constant two bytes.
struct MqttPingResp {
    /// Accepts (and ignores) a fixed header.
    this(MqttFixedHeader = MqttFixedHeader()) {}

    /// The encoded packet: 0xd0 (type code 13 in the top four bits, flags
    /// clear) followed by a zero byte.
    @property const(ubyte[]) encode() const {
        enum ubyte firstByte = 0xd0;
        enum ubyte secondByte = 0x00;
        return [firstByte, secondByte];
    }
}
322 
323 
/**
 * Extracts the topic string from raw packet bytes without deserialising the
 * whole packet.
 *
 * The topic length is read as a big-endian 16-bit value from bytes 2 and 3,
 * and the topic text is taken from the bytes that follow. This layout assumes
 * the header in front of the topic occupies exactly two bytes.
 *
 * Params:
 *   bytes = the raw packet; the returned string is a view of this buffer,
 *           not a copy
 *
 * Returns: the topic as a string aliasing `bytes`.
 *
 * Throws: Exception if `bytes` is too short to hold the length prefix or the
 *         advertised topic.
 */
string getTopic(in ubyte[] bytes) {
    //only works if there's no msg id
    enum offset = 4; //fixed header of 2 bytes + topic len

    // validate before indexing: this runs on bytes that come off the wire
    enforce(bytes.length >= offset, "Packet too short to contain a topic length");
    immutable len = (bytes[2] << 8) + bytes[3];
    enforce(bytes.length - offset >= len, "Packet too short to contain the advertised topic");

    return cast(string)bytes[offset .. offset + len];
}
345 
/// Returns the packet type encoded in the top four bits of the first byte.
/// `bytes` must contain at least one byte.
MqttType getType(in ubyte[] bytes) {
    immutable typeCode = bytes[0] >> 4;
    return cast(MqttType)typeCode;
}