1 module mqttd.server;
2 
3 
4 import mqttd.message;
5 import mqttd.broker;
6 import cerealed;
7 import std.stdio;
8 import std.algorithm;
9 import std.array;
10 import std.conv;
11 import std.typecons;
12 
13 
/**
 * Compile-time predicate: true when `C` satisfies `isMqttSubscriber` and a
 * variable of type `C` can additionally be told to `disconnect()`.
 *
 * The function literal is never executed; it only has to type-check.
 */
template isMqttConnection(C) {
    enum isMqttConnection = isMqttSubscriber!C && is(typeof({
        auto conn = C.init;
        conn.disconnect();
    }));
}
18 
19 
/**
 * Decodes raw MQTT packets arriving on a connection and reacts to each one:
 * replies are serialised back to the originating connection, while
 * subscriptions and publications are forwarded to the wrapped `MqttBroker`.
 *
 * Params:
 *     C = connection type; must satisfy `isMqttConnection`
 */
struct MqttServer(C) if(isMqttConnection!C) {

    /**
     * Creates the underlying broker.
     *
     * Params:
     *     useCache = passed straight through to the `MqttBroker!C` constructor
     *
     * NOTE(review): every parameter here has a default. D does not run struct
     * constructors for `MqttServer!C()`-style default construction, so that
     * form presumably leaves `_broker` at its `.init` value -- confirm that
     * callers pass the flag explicitly where it matters.
     */
    this(Flag!"useCache" useCache = No.useCache) {
        _broker = MqttBroker!C(useCache);
    }

    /**
     * Handles one incoming packet.
     *
     * Params:
     *     connection = the connection the packet arrived on; replies are
     *                  written to it via `connection.send`
     *     bytes      = the raw packet
     *
     * Throws: `Exception` when the packet type is not one of the handled ones.
     */
    void send(R)(ref C connection, R bytes) if(isInputRangeOf!(R, ubyte)) {

        immutable type = getType(bytes);

        switch(type) {

            case MqttType.CONNECT:
                auto request = decerealise!MqttConnect(bytes);
                // A bad client id is reported in the CONNACK return code.
                const verdict = request.isBadClientId
                    ? MqttConnack.Code.BAD_ID
                    : MqttConnack.Code.ACCEPTED;
                reply(connection, MqttConnack(verdict));
                break;

            case MqttType.SUBSCRIBE:
                auto subscription = decerealise!MqttSubscribe(bytes);
                _broker.subscribe(connection, subscription.topics);
                // The SUBACK carries one QoS entry per requested topic, in order.
                const granted = subscription.topics.map!(t => t.qos).array;
                reply(connection, MqttSuback(subscription.msgId, granted));
                break;

            case MqttType.UNSUBSCRIBE:
                auto removal = decerealise!MqttUnsubscribe(bytes);
                _broker.unsubscribe(connection, removal.topics);
                reply(connection, MqttUnsuback(removal.msgId));
                break;

            case MqttType.PUBLISH:
                // Only the topic is taken from the decoded packet; the broker
                // is handed the original bytes.
                auto publication = decerealise!MqttPublish(bytes);
                _broker.publish(publication.topic, bytes);
                break;

            case MqttType.PINGREQ:
                reply(connection, MqttFixedHeader(MqttType.PINGRESP));
                break;

            case MqttType.DISCONNECT:
                // Drop all of this connection's subscriptions before closing it.
                _broker.unsubscribe(connection);
                connection.disconnect();
                break;

            default:
                throw new Exception(text("Don't know how to handle message of type ", type));
        }
    }

    /// Forwards the cache setting to the broker.
    @property void useCache(Flag!"useCache" useIt) {
        _broker.useCache = useIt;
    }


private:

    /// Serialises `message` and passes the resulting bytes to `connection.send`.
    void reply(M)(ref C connection, M message) {
        message.cerealise!(chunk => connection.send(chunk));
    }

    MqttBroker!C _broker;
}