1 module tests.broker;
2 
3 import unit_threaded;
4 import mqttd.broker;
5 import mqttd.message;
6 import std.algorithm;
7 import std.typecons;
8 
/**
 * Test double for an MQTT subscriber: stores every payload handed to
 * `send` so that tests can assert on what the broker delivered.
 */
struct TestMqttSubscriber {
    alias Payload = ubyte[];

    /// Counter shared by all instances; the source of each instance's `index`.
    static int sIndex;

    /**
     * Records `bytes` in `messages` and emits a trace line through `writelnUt`.
     *
     * The first call on an instance bumps `sIndex` and stores the new value in
     * `index`; later calls leave `index` untouched.
     */
    void send(in ubyte[] bytes) {
        if(!index) index = ++sIndex;
        writelnUt("New message: ", bytes, ", index: ", index);
        messages ~= bytes;
    }

    /// Payloads received so far, in arrival order.
    const(Payload)[] messages;
    /// Zero until the first `send`; afterwards the value taken from `sIndex`.
    int index;

    // Compile-time check that this type satisfies the broker's subscriber constraint.
    static assert(isMqttSubscriber!TestMqttSubscriber);
}
25 
26 
/// Messages reach a subscriber only for topics it has subscribed to,
/// and only when published after the subscription was made.
@(Yes.useCache, No.useCache)
void testSubscribe(Flag!"useCache" useCache) {
    auto broker = MqttBroker!TestMqttSubscriber(useCache);
    auto subscriber = TestMqttSubscriber();

    // Publishes one message on each of the two topics, foo first.
    void publishFooThenBar() {
        broker.publish("topics/foo", [2, 4, 6]);
        broker.publish("topics/bar", [1, 3, 5, 7]);
    }

    // Nothing subscribed yet: the publish must not be delivered.
    broker.publish("topics/foo", [2, 4, 6]);
    subscriber.messages.shouldEqual([]);

    // Subscribed to foo only: bar is ignored.
    broker.subscribe(subscriber, ["topics/foo"]);
    publishFooThenBar();
    subscriber.messages.shouldEqual([[2, 4, 6]]);

    // Subscribed to both: each publish is appended in order.
    broker.subscribe(subscriber, ["topics/bar"]);
    publishFooThenBar();
    subscriber.messages.shouldEqual([[2, 4, 6], [2, 4, 6], [1, 3, 5, 7]]);
}
45 
/// After a blanket unsubscribe the subscriber receives nothing further.
@(Yes.useCache, No.useCache)
void testUnsubscribeAll(Flag!"useCache" useCache) {
    auto broker = MqttBroker!TestMqttSubscriber(useCache);
    auto subscriber = TestMqttSubscriber();

    // Publishes one message on each of the two topics, foo first.
    void publishFooThenBar() {
        broker.publish("topics/foo", [2, 4, 6]);
        broker.publish("topics/bar", [1, 3, 5, 7]);
    }

    // Only the foo message is delivered while subscribed to foo.
    broker.subscribe(subscriber, ["topics/foo"]);
    publishFooThenBar();
    subscriber.messages.shouldEqual([[2, 4, 6]]);

    // Same publishes after unsubscribing: the recorded list must not grow.
    broker.unsubscribe(subscriber);
    publishFooThenBar();
    subscriber.messages.shouldEqual([[2, 4, 6]]);
}
61 
/// Unsubscribing from one topic leaves the other subscriptions active.
@(Yes.useCache, No.useCache)
void testUnsubscribeOne(Flag!"useCache" useCache) {
    auto broker = MqttBroker!TestMqttSubscriber(useCache);
    auto subscriber = TestMqttSubscriber();

    // Publishes one message on foo, bar and baz, in that order.
    void publishFooBarBaz() {
        broker.publish("topics/foo", [2, 4, 6]);
        broker.publish("topics/bar", [1, 3, 5, 7]);
        broker.publish("topics/baz", [9, 8, 7, 6, 5]);
    }

    // Subscribed to foo and bar: baz is never delivered.
    broker.subscribe(subscriber, ["topics/foo", "topics/bar"]);
    publishFooBarBaz();
    subscriber.messages.shouldEqual([[2, 4, 6], [1, 3, 5, 7]]);

    // With foo dropped, only the bar message is appended.
    broker.unsubscribe(subscriber, ["topics/foo"]);
    publishFooBarBaz();
    subscriber.messages.shouldEqual([[2, 4, 6], [1, 3, 5, 7], [1, 3, 5, 7]]);
}
79 
80 
/**
 * Subscribes a fresh subscriber to `subTopic`, publishes a single message to
 * `pubTopic`, and asserts that the message was recorded exactly when
 * `matches` is true. The check runs once per cache setting (Yes, then No).
 */
private void checkMatches(in string pubTopic, in string subTopic, bool matches) {
    foreach(cacheFlag; [Yes.useCache, No.useCache]) {
        auto broker = MqttBroker!TestMqttSubscriber(cacheFlag);
        auto subscriber = TestMqttSubscriber();

        broker.subscribe(subscriber, [subTopic]);
        broker.publish(pubTopic, [1, 2, 3, 4]);

        // A match means the one published payload; otherwise nothing at all.
        const expected = matches ? [[1, 2, 3, 4]] : [];
        writelnUt("checkMatches, subTopic is ", subTopic, " pubTopic is ", pubTopic,
                  ", matches is ", matches);
        subscriber.messages.shouldEqual(expected);
    }
}
94 
/// Runs `checkMatches` over a table of published-topic / subscription-pattern
/// pairs covering exact matches and the `+` and `#` wildcards.
void testWildCards() {
    // One row per scenario: the published topic, the subscription pattern,
    // and whether the subscriber is expected to receive the message.
    static struct Case {
        string pubTopic;
        string subTopic;
        bool matches;
    }

    static immutable Case[] cases = [
        Case("foo/bar/baz", "foo/bar/baz", true),
        Case("foo/bar", "foo/+", true),
        Case("foo/baz", "foo/+", true),
        Case("foo/bar/baz", "foo/+", false),
        Case("foo/bar", "foo/#", true),
        Case("foo/bar/baz", "foo/#", true),
        Case("foo/bar/baz/boo", "foo/#", true),
        Case("foo/bla/bar/baz/boo/bogadog", "foo/+/bar/baz/#", true),
        Case("finance", "finance/#", true),
        Case("finance", "finance#", false),
        Case("finance", "#", true),
        Case("finance/stock", "#", true),
        Case("finance/stock", "finance/stock/ibm", false),
        Case("topics/foo/bar", "topics/foo/#", true),
        Case("topics/bar/baz/boo", "topics/foo/#", false),
    ];

    foreach(c; cases) {
        checkMatches(c.pubTopic, c.subTopic, c.matches);
    }
}
112 
/// Several subscribers with different wildcard patterns each receive exactly
/// the messages whose topics match their own pattern.
@(Yes.useCache, No.useCache)
void testSubscribeWithWildCards(Flag!"useCache" useCache) {
    auto broker = MqttBroker!TestMqttSubscriber(useCache);

    // The pair of publishes repeated in every phase of this test.
    void publishBarAndBoo() {
        broker.publish("topics/foo/bar", [3]);
        broker.publish("topics/bar/baz/boo", [4]);
    }

    // Phase 1: a single-level wildcard under topics/foo.
    auto subscriber1 = TestMqttSubscriber();
    broker.subscribe(subscriber1, ["topics/foo/+"]);
    publishBarAndBoo(); // the [4] message shouldn't be delivered
    subscriber1.messages.shouldEqual([[3]]);

    // Phase 2: add a multi-level wildcard under topics/foo.
    auto subscriber2 = TestMqttSubscriber();
    broker.subscribe(subscriber2, ["topics/foo/#"]);
    publishBarAndBoo();

    subscriber1.messages.shouldEqual([[3], [3]]);
    subscriber2.messages.shouldEqual([[3]]);

    // Phase 3: add a mid-path wildcard and a catch-all under topics.
    auto subscriber3 = TestMqttSubscriber();
    broker.subscribe(subscriber3, ["topics/+/bar"]);
    auto subscriber4 = TestMqttSubscriber();
    broker.subscribe(subscriber4, ["topics/#"]);

    publishBarAndBoo();
    broker.publish("topics/boo/bar/zoo", [5]);
    broker.publish("topics/foo/bar/zoo", [6]);
    broker.publish("topics/bbobobobo/bar", [7]);

    subscriber1.messages.shouldEqual([[3], [3], [3]]);
    subscriber2.messages.shouldEqual([[3], [3], [6]]);
    subscriber3.messages.shouldEqual([[3], [7]]);
    subscriber4.messages.shouldEqual([[3], [4], [5], [6], [7]]);
}
148 
/// A `+` subscription given as an explicit `MqttSubscribe.Topic` receives the
/// matching publish and ignores the non-matching one.
@(Yes.useCache, No.useCache)
void testPlus(Flag!"useCache" useCache) {
    auto broker = MqttBroker!TestMqttSubscriber(useCache);
    auto subscriber = TestMqttSubscriber();

    // Published before any subscription exists: nothing is recorded.
    broker.publish("foo/bar/baz", [1, 2, 3, 4]);
    shouldBeEmpty(subscriber.messages);

    auto topics = [MqttSubscribe.Topic("foo/bar/+", 0)];
    broker.subscribe(subscriber, topics);

    broker.publish("foo/bar/baz", [1, 2, 3, 4]);
    broker.publish("foo/boogagoo", [9, 8, 7]);
    shouldEqual(subscriber.messages, [[1, 2, 3, 4]]);
}