1 module mqttd.broker;
2 
3 
4 import mqttd.message;
5 import std.algorithm;
6 import std.array;
7 import std.typecons;
8 import std.range;
9 import std.traits;
10 
11 
12 enum isTopicRange(R) = isInputRange!R && is(Unqual!(ElementType!R) == string);
13 
14 enum isInputRangeOf(R, T) = isInputRange!R && is(Unqual!(ElementType!R) == T);
15 
16 enum isMqttSubscriber(T) = is(typeof((){
17     const(ubyte)[] bytes;
18     auto sub = T.init;
19     sub.send(bytes);
20 }));
21 
22 struct MqttBroker(S) if(isMqttSubscriber!S) {
23 
24     void subscribe(R)(ref S subscriber, R topics)
25         if(isInputRange!R && is(Unqual!(ElementType!R) == string))
26     {
27         subscribe(subscriber, topics.map!(a => MqttSubscribe.Topic(a.idup, 0)));
28     }
29 
30     void subscribe(R)(ref S subscriber, R topics)
31         if(isInputRange!R && is(ElementType!R == MqttSubscribe.Topic))
32     {
33         invalidateCache();
34         foreach(topic; topics) {
35             auto subParts = topic.topic.splitter("/");
36             auto node = addOrFindNode(&_tree, subParts);
37             node.leaves ~= Subscription!(S)(subscriber, topic);
38         }
39     }
40 
41     void unsubscribe(ref S subscriber) {
42         static string[] topics;
43         unsubscribe(subscriber, topics);
44     }
45 
46     void unsubscribe(R)(ref S subscriber, R topics)
47         if(isInputRange!R && is(ElementType!R == string))
48     {
49         invalidateCache();
50         unsubscribeImpl(&_tree, subscriber, topics.array);
51     }
52 
53     void publish(in string topic, in ubyte[] payload) {
54         if(_useCache && topic in _cache) {
55             foreach(subscriber; _cache[topic]) subscriber.send(payload);
56             return;
57         }
58         auto pubParts = topic.splitter("/");
59         publishImpl(&_tree, pubParts, topic, payload);
60     }
61 
62     @property useCache(Flag!"useCache" useIt) {
63         _useCache = useIt;
64     }
65 
66 private:
67 
68     static struct Node {
69         Node*[string] children;
70         Subscription!S[] leaves;
71     }
72 
73     Flag!"useCache" _useCache;
74     Node _tree;
75     S*[][string] _cache;
76 
77     void invalidateCache() {
78         if(_useCache) _cache = _cache.init;
79     }
80 
81     Node* addOrFindNode(R)(Node* tree, R parts) if(isInputRange!R && is(ElementType!R == string)) {
82         if(parts.empty) return tree;
83 
84         //create if not already here
85         const part = parts.front.idup;
86         if(part !in tree.children) tree.children[part] = new Node;
87 
88         parts.popFront;
89         return addOrFindNode(tree.children[part], parts);
90     }
91 
92     static void unsubscribeImpl(Node* tree, ref S subscriber, in string[] topics) {
93         tree.leaves = tree.leaves.filter!(a => !a.isSubscriber(subscriber, topics)).array;
94 
95         if(tree.children.length == 0) return;
96         foreach(k, v; tree.children) {
97             unsubscribeImpl(v, subscriber, topics);
98         }
99     }
100 
101     void publishImpl(R1, R2)(Node* tree, R1 pubParts, in string topic, R2 bytes)
102         if(isTopicRange!R1 && isInputRangeOf!(R2, ubyte))
103     {
104 
105         if(pubParts.empty) return;
106 
107         immutable front = pubParts.front;
108         pubParts.popFront;
109 
110         foreach(part; only(front, "#", "+")) {
111             if(part in tree.children) {
112                 auto node = tree.children[part];
113 
114                 if(pubParts.empty || part == "#") publishNode(node, topic, bytes);
115 
116                 if(pubParts.empty && "#" in node.children) {
117                     //So that "finance/#" matches "finance"
118                     publishNode(node.children["#"], topic, bytes);
119                 }
120 
121                 publishImpl(node, pubParts, topic, bytes);
122             }
123         }
124     }
125 
126     void publishNode(R)(Node* node, in string topic, R bytes) if(isInputRangeOf!(R, ubyte)) {
127         foreach(ref subscription; node.leaves) {
128             subscription.send(bytes);
129             if(_useCache) _cache[topic.idup] ~= subscription._subscriber;
130         }
131     }
132 }
133 
134 
135 private struct Subscription(S) if(isMqttSubscriber!S) {
136     this(ref S subscriber, in MqttSubscribe.Topic topic) {
137         _subscriber = &subscriber;
138         _topic = topic.topic.idup;
139         _qos = topic.qos;
140     }
141 
142     void send(in ubyte[] bytes) {
143         _subscriber.send(bytes);
144     }
145 
146     bool isSubscriber(ref S subscriber, in string[] topics) @trusted const {
147         immutable isSameTopic = topics.empty || topics.canFind(_topic);
148         return isSameTopic && &subscriber == _subscriber;
149     }
150 
151     S* _subscriber;
152     immutable(string) _topic;
153     ubyte _qos;
154 }