module mqttd.server;


import mqttd.message;
import mqttd.broker;
import cerealed;
import std.stdio;
import std.algorithm;
import std.array;
import std.conv;
import std.typecons;


/**
 * Compile-time predicate: true when `C` satisfies `isMqttSubscriber` and a
 * default-initialised `C` can have `disconnect()` called on it.
 */
enum isMqttConnection(C) = isMqttSubscriber!C && is(typeof(() {
    auto conn = C.init;
    conn.disconnect();
}));


/**
 * Dispatches raw MQTT messages arriving from connections of type `C`,
 * replying on the connection and/or forwarding to an internal `MqttBroker!C`.
 */
struct MqttServer(C) if(isMqttConnection!C) {

    /// Builds the internal broker, forwarding the `useCache` flag to it.
    this(Flag!"useCache" useCache = No.useCache) {
        _broker = MqttBroker!C(useCache);
    }

    /**
     * Handles a single message received from `connection`.
     *
     * The message type is obtained with `getType(bytes)` and the message is
     * then decoded and handled according to that type.
     *
     * Params:
     *     connection = the connection the bytes came from; replies are written
     *                  to it through `connection.send`
     *     bytes      = the message as an input range of `ubyte`
     *
     * Throws: `Exception` when the message type has no handler here.
     */
    void send(R)(ref C connection, R bytes) if(isInputRangeOf!(R, ubyte)) {

        immutable type = getType(bytes);

        switch(type) {

        case MqttType.CONNECT:
            auto connectMsg = decerealise!MqttConnect(bytes);
            // A bad client id is reported in the CONNACK return code;
            // otherwise the connection is accepted.
            immutable ackCode = connectMsg.isBadClientId
                ? MqttConnack.Code.BAD_ID
                : MqttConnack.Code.ACCEPTED;

            MqttConnack(ackCode).cerealise!(b => connection.send(b));
            break;

        case MqttType.SUBSCRIBE:
            auto subscribeMsg = decerealise!MqttSubscribe(bytes);
            _broker.subscribe(connection, subscribeMsg.topics);
            // The SUBACK carries the qos of every topic in the request, in order.
            const qosLevels = subscribeMsg.topics.map!(a => a.qos).array;
            MqttSuback(subscribeMsg.msgId, qosLevels).cerealise!(b => connection.send(b));
            break;

        case MqttType.UNSUBSCRIBE:
            auto unsubscribeMsg = decerealise!MqttUnsubscribe(bytes);
            _broker.unsubscribe(connection, unsubscribeMsg.topics);
            MqttUnsuback(unsubscribeMsg.msgId).cerealise!(b => connection.send(b));
            break;

        case MqttType.PUBLISH:
            // Only the topic is taken from the decoded message; the broker
            // is handed the bytes as received.
            auto publishMsg = decerealise!MqttPublish(bytes);
            _broker.publish(publishMsg.topic, bytes);
            break;

        case MqttType.PINGREQ:
            MqttFixedHeader(MqttType.PINGRESP).cerealise!(b => connection.send(b));
            break;

        case MqttType.DISCONNECT:
            // Remove the connection from the broker before disconnecting it.
            _broker.unsubscribe(connection);
            connection.disconnect;
            break;

        default:
            throw new Exception(text("Don't know how to handle message of type ", type));
        }
    }

    /// Setter that forwards the cache flag to the internal broker.
    @property useCache(Flag!"useCache" useIt) {
        _broker.useCache = useIt;
    }


private:

    /// Broker that receives the subscribe, unsubscribe and publish calls.
    MqttBroker!C _broker;
}