module mqttd.message;


import cerealed;
import std.stdio;
import std.algorithm;
import std.stdio;
import std.range;
import std.exception;
import std.conv;

/// MQTT control packet types, as carried in the 4-bit type field of the fixed header.
enum MqttType {
    RESERVED1   = 0,
    CONNECT     = 1,
    CONNACK     = 2,
    PUBLISH     = 3,
    PUBACK      = 4,
    PUBREC      = 5,
    PUBREL      = 6,
    PUBCOMP     = 7,
    SUBSCRIBE   = 8,
    SUBACK      = 9,
    UNSUBSCRIBE = 10,
    UNSUBACK    = 11,
    PINGREQ     = 12,
    PINGRESP    = 13,
    DISCONNECT  = 14,
    RESERVED2   = 15
}


/**
 * The fixed header that starts every MQTT message: one byte of bit fields
 * (type, dup, qos, retain) followed by the variable-length "remaining length".
 *
 * The bit fields are (de)serialised through their cerealed attributes;
 * `remaining` is marked @NoCereal and is read/written by the postBlit hooks.
 */
struct MqttFixedHeader {
public:
    /// Header size in bytes when the remaining length fits in a single byte.
    enum SIZE = 2;

    @Bits!4 MqttType type;
    @Bits!1 bool dup;
    @Bits!2 ubyte qos;
    @Bits!1 bool retain;
    @NoCereal uint remaining;

    /**
     * Sanity-checks this header against the raw packet it came from.
     *
     * Params:
     *   bytes = the raw packet bytes
     *   cerealBytesLength = number of bytes the caller still has available
     *
     * Returns: false if `bytes.length` differs from `remaining + SIZE`,
     *          true otherwise. Diagnostics are written to stderr.
     */
    bool check(in ubyte[] bytes, in ulong cerealBytesLength) const {
        // Only a warning: a too-small advertised size does not fail the check by itself.
        if(remaining < cerealBytesLength) {
            stderr.writeln("Wrong MQTT remaining size ", cast(int)remaining,
                           ". Real remaining size: ", cerealBytesLength);
        }

        // NOTE(review): SIZE assumes a one-byte remaining-length field, but
        // setRemainingSizeMultiByte emits more bytes when remaining > 127.
        // Confirm what callers pass as `bytes` before changing this comparison.
        const mqttSize = remaining + SIZE;
        if(mqttSize != bytes.length) {
            stderr.writeln("Malformed packet. Actual size: ", bytes.length,
                           ". Advertised size: ", mqttSize, " (r ", remaining ,")");
            // writefln, not writeln: the range format spec must actually be applied
            // to the bytes instead of being printed literally.
            stderr.writefln("Packet:\n%(0x%x %)", bytes);

            return false;
        }

        return true;
    }

    /// Serialisation hook: appends the encoded remaining length.
    void postBlit(Cereal)(ref Cereal cereal) if(isCerealiser!Cereal) {
        setRemainingSize(cereal);
    }

    /// Deserialisation hook: decodes the remaining length into `remaining`.
    void postBlit(Cereal)(ref Cereal cereal) if(isDecerealiser!Cereal) {
        remaining = getRemainingSize(cereal);
    }

    mixin assertHasPostBlit!MqttFixedHeader;

private:

    /// Decodes the variable-length remaining size: 7 bits per byte, high bit means "more follows".
    uint getRemainingSize(Cereal)(ref Cereal cereal) if(isDecerealiser!Cereal) {
        //algorithm straight from the MQTT spec
        // NOTE(review): the loop is not bounded to the spec's 4-byte maximum, so a
        // malformed packet keeps consuming bytes for as long as the high bit is set.
        int multiplier = 1;
        uint value = 0;
        ubyte digit;
        do {
            cereal.grain(digit);
            value += (digit & 127) * multiplier;
            multiplier *= 128;
        } while((digit & 128) != 0);

        return value;
    }

    /// Writes `remaining` using the one-byte fast path when it fits in 7 bits.
    void setRemainingSize(Cereal)(ref Cereal cereal) const if(isCerealiser!Cereal) {
        if(remaining <= 127) {
            setRemainingSizeOneByte(cereal);
        } else {
            setRemainingSizeMultiByte(cereal);
        }
    }

    /// Writes `remaining` as a single byte; only valid when remaining <= 127.
    void setRemainingSizeOneByte(Cereal)(ref Cereal cereal) const if(isCerealiser!Cereal) {
        cereal ~= cast(ubyte)remaining;
    }

    /// Writes `remaining` in the variable-length encoding, least significant 7 bits first.
    void setRemainingSizeMultiByte(Cereal)(ref Cereal cereal) const if(isCerealiser!Cereal) {
        //algorithm straight from the MQTT spec
        // Each digit is written as soon as it is computed. The previous version
        // appended to a zero-length slice of a static array, which reallocates on
        // the GC heap on the first `~=` and so defeated its own optimisation.
        uint x = remaining;
        do {
            ubyte digit = x % 128;
            x /= 128;
            if(x > 0) {
                digit |= 0x80; //continuation bit: more digits follow
            }
            cereal.grain(digit);
        } while(x > 0);
    }

}


/**
 * CONNECT message. The fields up to and including `clientId` are serialised
 * in declaration order; the optional will/user/password strings are marked
 * @NoCereal and handled in postBlit according to the flag bits.
 */
struct MqttConnect {
public:

    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// (De)serialises the optional trailing strings selected by the flag bits.
    void postBlit(Cereal)(ref Cereal cereal) {
        if(hasWill) {
            cereal.grain(willTopic);
            cereal.grain(willMessage);
        }
        if(hasUserName) cereal.grain(userName);
        if(hasPassword) cereal.grain(password);
    }

    mixin assertHasPostBlit!MqttConnect;

    /// True when the client id is empty or longer than 23 characters.
    @property bool isBadClientId() const { return clientId.length < 1 || clientId.length > 23; }

    MqttFixedHeader header;
    string protoName;
    ubyte protoVersion;
    @Bits!1 bool hasUserName;
    @Bits!1 bool hasPassword;
    @Bits!1 bool hasWillRetain;
    @Bits!2 ubyte willQos;
    @Bits!1 bool hasWill;
    @Bits!1 bool hasClear;
    @Bits!1 bool reserved;
    ushort keepAlive;
    string clientId;
    @NoCereal string willTopic;
    @NoCereal string willMessage;
    @NoCereal string userName;
    @NoCereal string password;
}

/// CONNACK message: a reserved byte followed by the connection return code.
struct MqttConnack {

    /// Connection return codes.
    enum Code: byte {
        ACCEPTED = 0,
        BAD_VERSION = 1,
        BAD_ID = 2,
        SERVER_UNAVAILABLE = 3,
        BAD_USER_OR_PWD = 4,
        NO_AUTH = 5,
    }

    this(MqttFixedHeader header = MqttFixedHeader(MqttType.CONNACK, false, 0, false, 2)) {
        this.header = header;
    }

    /// Builds a CONNACK carrying `code`, with the standard two-byte-body header.
    this(Code code) {
        // Initialise the header directly rather than assigning `code` and then
        // delegating to the other constructor; the resulting state is the same.
        this.header = MqttFixedHeader(MqttType.CONNACK, false, 0, false, 2);
        this.code = code;
    }

    MqttFixedHeader header;
    ubyte reserved;
    Code code;
}


/**
 * PUBLISH message. `topic` is serialised automatically; the message id
 * (present only when qos != 0) and the payload are handled in postBlit.
 */
struct MqttPublish {
public:
    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// Convenience constructor: dup = false, qos = 0, retain = false.
    this(in string topic, in ubyte[] payload, ushort msgId = 0) {
        this(false, 0, false, topic, payload, msgId);
    }

    /**
     * Builds a PUBLISH ready to be serialised. The payload is referenced, not
     * copied, so the resulting object must not be used for deserialisation.
     */
    this(in bool dup, in ubyte qos, in bool retain, in string topic, in ubyte[] payload, in ushort msgId = 0) {
        const topicLen = cast(uint)topic.length + 2; //2 for length
        auto remaining = qos ? topicLen + 2 /*msgId*/ : topicLen;
        remaining += payload.length;

        this.header = MqttFixedHeader(MqttType.PUBLISH, dup, qos, retain, remaining);
        this.topic = topic;
        //only safe if we never change it
        _payload = cast(ubyte[])payload;
        this.msgId = msgId;
        this.cantDecerealise = true; //because of the cast
    }

    /// (De)serialises the optional message id and then the payload.
    void postBlit(Cereal)(ref Cereal cereal) {
        static if(isDecerealiser!Cereal) {
            assert(!cantDecerealise, "Cannot decerealise if constructed from payload");
        }

        if(header.qos) {
            static if(Cereal.type == CerealType.ReadBytes) {
                // Below 7 remaining bytes the message id is reported as missing
                // and is not read.
                if(header.remaining < 7) {
                    stderr.writeln("Error: PUBLISH message with QOS but no message ID");
                } else {
                    cereal.grain(msgId);
                }
            } else {
                cereal.grain(msgId);
            }
        }

        static if(isDecerealiser!Cereal) {
            _payload = cast(ubyte[])cereal.bytes; //dirty but fast
        } else {
            cereal.grainRaw(_payload);
        }
    }

    mixin assertHasPostBlit!MqttPublish;

    /// Read-only view of the payload bytes.
    const(ubyte)[] payload() @safe pure const nothrow {
        return _payload;
    }

    MqttFixedHeader header;
    string topic;
    private @NoCereal ubyte[] _payload;
    @NoCereal ushort msgId;
    @NoCereal bool cantDecerealise;
}


/// SUBSCRIBE message: a message id followed by (topic, qos) pairs.
struct MqttSubscribe {
public:
    this(MqttFixedHeader header) {
        if(header.qos != 1) {
            stderr.writeln("SUBSCRIBE message with qos ", header.qos, ", should be 1");
        }
        this.header = header;
    }

    /// Builds a SUBSCRIBE for `topics`, computing the remaining length from them.
    this(ushort msgId, Topic[] topics, in bool dup = false, in ubyte qos = 1, in bool retain = false) {
        //per topic: qos byte + topic bytes + 2 for the topic length
        immutable remaining = cast(uint)(msgId.sizeof + topics.map!(a => a.qos.sizeof + a.topic.length + 2).sum);
        this.header = MqttFixedHeader(MqttType.SUBSCRIBE, dup, qos, retain, remaining);
        // Previously omitted, which left msgId at 0 whatever the caller passed.
        this.msgId = msgId;
        this.topics = topics;
    }

    static struct Topic {
        string topic;
        ubyte qos;
    }

    MqttFixedHeader header;
    ushort msgId;
    @RawArray Topic[] topics;
}

/// SUBACK message: a message id followed by one granted qos byte per topic.
struct MqttSuback {
public:

    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// Builds a SUBACK for `msgId`; `qos` is copied.
    this(in ushort msgId, in ubyte[] qos) {
        this.header = MqttFixedHeader(MqttType.SUBACK, false, 0, false, cast(uint)qos.length + 2);
        this.msgId = msgId;
        this.qos = qos.dup;
    }

    MqttFixedHeader header;
    ushort msgId;
    @RawArray ubyte[] qos;
}

/// UNSUBSCRIBE message: a message id followed by topic strings.
struct MqttUnsubscribe {
    this(MqttFixedHeader header) {
        this.header = header;
    }

    /// Builds an UNSUBSCRIBE for `topics`; the array is copied.
    this(ushort msgId, in string[] topics, in bool dup = false, in ubyte qos = 1, in bool retain = false) {
        //per topic: 2 for the topic length + topic bytes
        immutable remaining = cast(uint)(msgId.sizeof + topics.map!(a => 2 + a.length).sum);
        this.header = MqttFixedHeader(MqttType.UNSUBSCRIBE, dup, qos, retain, remaining);
        this.msgId = msgId;
        this.topics = topics.dup;
    }

    MqttFixedHeader header;
    ushort msgId;
    @RawArray string[] topics;
}

/// UNSUBACK message: just a message id.
struct MqttUnsuback {
    this(in ushort msgId) {
        this.header = MqttFixedHeader(MqttType.UNSUBACK, false, 0, false, 2);
        this.msgId = msgId;
    }

    this(MqttFixedHeader header) {
        this.header = header;
    }

    MqttFixedHeader header;
    ushort msgId;
}


/// PINGRESP message; it has no fields, so its encoding is a constant.
struct MqttPingResp {
    this(MqttFixedHeader = MqttFixedHeader()) {}

    /// The two bytes of a PINGRESP: type nibble 0xd (PINGRESP = 13), remaining length 0.
    @property const(ubyte[]) encode() const {
        return [0xd0, 0x00];
    }
}


/**
 * Extracts the topic from the raw bytes of a PUBLISH packet without
 * deserialising the whole message.
 *
 * The topic length and topic follow the fixed header directly, so the offset
 * depends on how many bytes the remaining-length field occupies (1 to 4).
 *
 * Params:
 *   bytes = the complete raw packet, starting at the fixed header
 *
 * Returns: the topic, as a slice of `bytes` (no copy is made).
 *
 * Throws: Exception if the packet is too short or its remaining-length field
 *         does not terminate within 4 bytes.
 */
string getTopic(in ubyte[] bytes) {
    enum maxRemainingLengthBytes = 4; //maximum allowed by the MQTT spec

    // Skip byte 0 (type/flags), then the remaining-length bytes: every byte
    // with the high bit set is followed by another one.
    size_t index = 1;
    bool terminated = false;
    foreach(_; 0 .. maxRemainingLengthBytes) {
        enforce(index < bytes.length, "Truncated MQTT packet: incomplete fixed header");
        immutable isLast = (bytes[index] & 0x80) == 0;
        ++index;
        if(isLast) {
            terminated = true;
            break;
        }
    }
    enforce(terminated, "Malformed MQTT packet: remaining length longer than 4 bytes");

    enforce(bytes.length >= index + 2, "Truncated MQTT packet: missing topic length");
    immutable len = (bytes[index] << 8) + bytes[index + 1]; //big-endian 16-bit length
    immutable start = index + 2;
    enforce(bytes.length >= start + len, "Truncated MQTT packet: topic shorter than advertised");

    return cast(string)bytes[start .. start + len];
}

/// Returns the message type stored in the top four bits of the first byte; `bytes` must not be empty.
MqttType getType(in ubyte[] bytes) {
    return cast(MqttType)(bytes[0] >> 4);
}