1 module mqttd.stream; 2 3 4 enum isMqttInput(T) = is(typeof(() { 5 ubyte[] bytes; 6 auto t = T.init; 7 t.read(bytes); 8 })); 9 10 @safe: 11 12 13 /** 14 Abstracts a stream protocol such as TCP so that we can get discrete 15 MQTT messages out of it. 16 */ 17 struct MqttStream { 18 19 import mqttd.server: MqttServer; 20 import mqttd.broker: isMqttSubscriber; 21 22 this(int bufferSize) pure nothrow { 23 _buffer = new ubyte[bufferSize]; 24 _bytes = _buffer[0..0]; 25 } 26 27 void opOpAssign(string op: "~")(ubyte[] bytes) { 28 struct Input { 29 void read(ubyte[] buf) { 30 import std.algorithm: copy; 31 copy(bytes, buf); 32 } 33 static assert(isMqttInput!Input); 34 } 35 read(new Input, bytes.length); 36 } 37 38 void read(T)(auto ref T input, size_t size) @trusted if(isMqttInput!T) { 39 resetBuffer; 40 41 immutable end = _bytesRead + size; 42 input.read(_buffer[_bytesRead .. end]); 43 44 _bytesRead += size; 45 _bytes = _buffer[0 .. _bytesRead]; 46 47 updateLastMessageSize; 48 } 49 50 51 bool hasMessages() pure nothrow { 52 import mqttd.message: MqttFixedHeader; 53 return 54 _lastMessageSize >= MqttFixedHeader.SIZE 55 && _bytes.length >= _lastMessageSize 56 ; 57 } 58 59 const(ubyte)[] popNextMessageBytes() { 60 if(!hasMessages) return []; 61 62 auto ret = nextMessageBytes; 63 _bytes = _bytes[ret.length .. $]; 64 65 updateLastMessageSize; 66 return ret; 67 } 68 69 void handleMessages(T)(ref MqttServer!T server, ref T connection) @trusted if(isMqttSubscriber!T) { 70 while(hasMessages) server.send(connection, popNextMessageBytes); 71 } 72 73 auto bufferSize() const pure nothrow @safe { 74 return _buffer.length; 75 } 76 77 private: 78 79 ubyte[] _buffer; //the underlying storage 80 ubyte[] _bytes; //the current bytes held 81 int _lastMessageSize; 82 int _bytesStart; //the starting position 83 ulong _bytesRead; //what it says 84 85 void updateLastMessageSize() { 86 _lastMessageSize = nextMessageSize; 87 } 88 89 const(ubyte)[] nextMessageBytes() const { 90 return _bytes[0 .. nextMessageSize]; 91 } 92 93 int nextMessageSize() const { 94 import mqttd.message: MqttFixedHeader; 95 import cerealed: Decerealiser; 96 97 if(_bytes.length < MqttFixedHeader.SIZE) return 0; 98 99 auto dec = Decerealiser(_bytes); 100 return dec.value!MqttFixedHeader.remaining + MqttFixedHeader.SIZE; 101 } 102 103 //@trusted because of copy 104 void resetBuffer() @trusted pure nothrow { 105 import std.algorithm: copy; 106 107 copy(_bytes, _buffer); 108 _bytesRead = _bytes.length; 109 _bytes = _buffer[0 .. _bytesRead]; 110 } 111 } 112 113 114 /** 115 Satisfies the `isMqttConnection` interface for streams (e.g. TCP) 116 Delegates actually reading or writing data to a templated channel. 117 */ 118 struct MqttStreamConnection(Channel) { 119 120 import mqttd.server: MqttServer, isMqttConnection; 121 import mqttd.stream: MqttStream, isMqttInput; 122 123 this(Channel channel) { 124 _channel = channel; 125 _connected = true; 126 enum bufferSize = 1024 * 512; 127 _stream = MqttStream(bufferSize); 128 } 129 130 void read(ubyte[] bytes) { 131 _channel.read(bytes); 132 } 133 134 void send(in ubyte[] bytes) { 135 if(connected) { 136 _channel.write(bytes); 137 } 138 } 139 140 void run(ref MqttServer!(typeof(this)) server) { 141 import mqttd.log: error; 142 import std.datetime: seconds; 143 144 while(connected) { 145 if(!_channel.waitForData(60.seconds) ) { 146 error("Persistent connection timeout!"); 147 _connected = false; 148 break; 149 } 150 151 read(server); 152 } 153 154 _connected = false; 155 } 156 157 @property bool connected() const { 158 return _channel.connected && _connected; 159 } 160 161 void disconnect() { 162 _connected = false; 163 _channel.close(); 164 } 165 166 private: 167 168 Channel _channel; 169 bool _connected; 170 MqttStream _stream; 171 172 void read(ref MqttServer!(typeof(this)) server) { 173 import std.conv: text; 174 175 while(connected && !_channel.empty) { 176 if(_channel.leastSize > _stream.bufferSize) { 177 throw new Exception( 178 text("Too many bytes (", _channel.leastSize, 179 " for puny stream buffer (", _stream.bufferSize, ")")); 180 } 181 _stream.read(this, _channel.leastSize); 182 _stream.handleMessages(server, this); 183 } 184 } 185 186 static assert(isMqttConnection!(typeof(this))); 187 static assert(isMqttInput!(typeof(this))); 188 }