1 module mqttd.broker; 2 3 4 import mqttd.message; 5 import std.algorithm; 6 import std.array; 7 import std.typecons; 8 import std.range; 9 import std.traits; 10 11 12 enum isTopicRange(R) = isInputRange!R && is(Unqual!(ElementType!R) == string); 13 14 enum isInputRangeOf(R, T) = isInputRange!R && is(Unqual!(ElementType!R) == T); 15 16 enum isMqttSubscriber(T) = is(typeof((){ 17 const(ubyte)[] bytes; 18 auto sub = T.init; 19 sub.send(bytes); 20 })); 21 22 struct MqttBroker(S) if(isMqttSubscriber!S) { 23 24 void subscribe(R)(ref S subscriber, R topics) 25 if(isInputRange!R && is(Unqual!(ElementType!R) == string)) 26 { 27 subscribe(subscriber, topics.map!(a => MqttSubscribe.Topic(a.idup, 0))); 28 } 29 30 void subscribe(R)(ref S subscriber, R topics) 31 if(isInputRange!R && is(ElementType!R == MqttSubscribe.Topic)) 32 { 33 invalidateCache(); 34 foreach(topic; topics) { 35 auto subParts = topic.topic.splitter("/"); 36 auto node = addOrFindNode(&_tree, subParts); 37 node.leaves ~= Subscription!(S)(subscriber, topic); 38 } 39 } 40 41 void unsubscribe(ref S subscriber) { 42 static string[] topics; 43 unsubscribe(subscriber, topics); 44 } 45 46 void unsubscribe(R)(ref S subscriber, R topics) 47 if(isInputRange!R && is(ElementType!R == string)) 48 { 49 invalidateCache(); 50 unsubscribeImpl(&_tree, subscriber, topics.array); 51 } 52 53 void publish(in string topic, in ubyte[] payload) { 54 if(_useCache && topic in _cache) { 55 foreach(subscriber; _cache[topic]) subscriber.send(payload); 56 return; 57 } 58 auto pubParts = topic.splitter("/"); 59 publishImpl(&_tree, pubParts, topic, payload); 60 } 61 62 @property useCache(Flag!"useCache" useIt) { 63 _useCache = useIt; 64 } 65 66 private: 67 68 static struct Node { 69 Node*[string] children; 70 Subscription!S[] leaves; 71 } 72 73 Flag!"useCache" _useCache; 74 Node _tree; 75 S*[][string] _cache; 76 77 void invalidateCache() { 78 if(_useCache) _cache = _cache.init; 79 } 80 81 Node* addOrFindNode(R)(Node* tree, R parts) if(isInputRange!R && is(ElementType!R == string)) { 82 if(parts.empty) return tree; 83 84 //create if not already here 85 const part = parts.front.idup; 86 if(part !in tree.children) tree.children[part] = new Node; 87 88 parts.popFront; 89 return addOrFindNode(tree.children[part], parts); 90 } 91 92 static void unsubscribeImpl(Node* tree, ref S subscriber, in string[] topics) { 93 tree.leaves = tree.leaves.filter!(a => !a.isSubscriber(subscriber, topics)).array; 94 95 if(tree.children.length == 0) return; 96 foreach(k, v; tree.children) { 97 unsubscribeImpl(v, subscriber, topics); 98 } 99 } 100 101 void publishImpl(R1, R2)(Node* tree, R1 pubParts, in string topic, R2 bytes) 102 if(isTopicRange!R1 && isInputRangeOf!(R2, ubyte)) 103 { 104 105 if(pubParts.empty) return; 106 107 immutable front = pubParts.front; 108 pubParts.popFront; 109 110 foreach(part; only(front, "#", "+")) { 111 if(part in tree.children) { 112 auto node = tree.children[part]; 113 114 if(pubParts.empty || part == "#") publishNode(node, topic, bytes); 115 116 if(pubParts.empty && "#" in node.children) { 117 //So that "finance/#" matches "finance" 118 publishNode(node.children["#"], topic, bytes); 119 } 120 121 publishImpl(node, pubParts, topic, bytes); 122 } 123 } 124 } 125 126 void publishNode(R)(Node* node, in string topic, R bytes) if(isInputRangeOf!(R, ubyte)) { 127 foreach(ref subscription; node.leaves) { 128 subscription.send(bytes); 129 if(_useCache) _cache[topic.idup] ~= subscription._subscriber; 130 } 131 } 132 } 133 134 135 private struct Subscription(S) if(isMqttSubscriber!S) { 136 this(ref S subscriber, in MqttSubscribe.Topic topic) { 137 _subscriber = &subscriber; 138 _topic = topic.topic.idup; 139 _qos = topic.qos; 140 } 141 142 void send(in ubyte[] bytes) { 143 _subscriber.send(bytes); 144 } 145 146 bool isSubscriber(ref S subscriber, in string[] topics) @trusted const { 147 immutable isSameTopic = topics.empty || topics.canFind(_topic); 148 return isSameTopic && &subscriber == _subscriber; 149 } 150 151 S* _subscriber; 152 immutable(string) _topic; 153 ubyte _qos; 154 }