1 module tests.broker; 2 3 import unit_threaded; 4 import mqttd.broker; 5 import mqttd.message; 6 import std.algorithm; 7 import std.typecons; 8 9 struct TestMqttSubscriber { 10 alias Payload = ubyte[]; 11 12 static int sIndex; 13 void send(in ubyte[] bytes) { 14 if(!index) index = ++sIndex; 15 import std.stdio; 16 writelnUt("New message: ", bytes, ", index: ", index); 17 messages ~= bytes; 18 } 19 20 const(Payload)[] messages; 21 int index; 22 23 static assert(isMqttSubscriber!TestMqttSubscriber); 24 } 25 26 27 @(Yes.useCache, No.useCache) 28 void testSubscribe(Flag!"useCache" useCache) { 29 auto broker = MqttBroker!TestMqttSubscriber(useCache); 30 31 auto subscriber = TestMqttSubscriber(); 32 broker.publish("topics/foo", [2, 4, 6]); 33 shouldEqual(subscriber.messages, []); 34 35 broker.subscribe(subscriber, ["topics/foo"]); 36 broker.publish("topics/foo", [2, 4, 6]); 37 broker.publish("topics/bar", [1, 3, 5, 7]); 38 shouldEqual(subscriber.messages, [[2, 4, 6]]); 39 40 broker.subscribe(subscriber, ["topics/bar"]); 41 broker.publish("topics/foo", [2, 4, 6]); 42 broker.publish("topics/bar", [1, 3, 5, 7]); 43 shouldEqual(subscriber.messages, [[2, 4, 6], [2, 4, 6], [1, 3, 5, 7]]); 44 } 45 46 @(Yes.useCache, No.useCache) 47 void testUnsubscribeAll(Flag!"useCache" useCache) { 48 auto broker = MqttBroker!TestMqttSubscriber(useCache); 49 auto subscriber = TestMqttSubscriber(); 50 51 broker.subscribe(subscriber, ["topics/foo"]); 52 broker.publish("topics/foo", [2, 4, 6]); 53 broker.publish("topics/bar", [1, 3, 5, 7]); 54 shouldEqual(subscriber.messages, [[2, 4, 6]]); 55 56 broker.unsubscribe(subscriber); 57 broker.publish("topics/foo", [2, 4, 6]); 58 broker.publish("topics/bar", [1, 3, 5, 7]); 59 shouldEqual(subscriber.messages, [[2, 4, 6]]); //shouldn't have changed 60 } 61 62 @(Yes.useCache, No.useCache) 63 void testUnsubscribeOne(Flag!"useCache" useCache) { 64 auto broker = MqttBroker!TestMqttSubscriber(useCache); 65 auto subscriber = TestMqttSubscriber(); 66 67 broker.subscribe(subscriber, ["topics/foo", "topics/bar"]); 68 broker.publish("topics/foo", [2, 4, 6]); 69 broker.publish("topics/bar", [1, 3, 5, 7]); 70 broker.publish("topics/baz", [9, 8, 7, 6, 5]); 71 shouldEqual(subscriber.messages, [[2, 4, 6], [1, 3, 5, 7]]); 72 73 broker.unsubscribe(subscriber, ["topics/foo"]); 74 broker.publish("topics/foo", [2, 4, 6]); 75 broker.publish("topics/bar", [1, 3, 5, 7]); 76 broker.publish("topics/baz", [9, 8, 7, 6, 5]); 77 shouldEqual(subscriber.messages, [[2, 4, 6], [1, 3, 5, 7], [1, 3, 5, 7]]); 78 } 79 80 81 private void checkMatches(in string pubTopic, in string subTopic, bool matches) { 82 foreach(useCache; [Yes.useCache, No.useCache]) { 83 auto broker = MqttBroker!TestMqttSubscriber(useCache); 84 auto subscriber = TestMqttSubscriber(); 85 86 broker.subscribe(subscriber, [subTopic]); 87 broker.publish(pubTopic, [1, 2, 3, 4]); 88 const expected = matches ? [[1, 2, 3, 4]] : []; 89 writelnUt("checkMatches, subTopic is ", subTopic, " pubTopic is ", pubTopic, 90 ", matches is ", matches); 91 shouldEqual(subscriber.messages, expected); 92 } 93 } 94 95 void testWildCards() { 96 checkMatches("foo/bar/baz", "foo/bar/baz", true); 97 checkMatches("foo/bar", "foo/+", true); 98 checkMatches("foo/baz", "foo/+", true); 99 checkMatches("foo/bar/baz", "foo/+", false); 100 checkMatches("foo/bar", "foo/#", true); 101 checkMatches("foo/bar/baz", "foo/#", true); 102 checkMatches("foo/bar/baz/boo", "foo/#", true); 103 checkMatches("foo/bla/bar/baz/boo/bogadog", "foo/+/bar/baz/#", true); 104 checkMatches("finance", "finance/#", true); 105 checkMatches("finance", "finance#", false); 106 checkMatches("finance", "#", true); 107 checkMatches("finance/stock", "#", true); 108 checkMatches("finance/stock", "finance/stock/ibm", false); 109 checkMatches("topics/foo/bar", "topics/foo/#", true); 110 checkMatches("topics/bar/baz/boo", "topics/foo/#", false); 111 } 112 113 @(Yes.useCache, No.useCache) 114 void testSubscribeWithWildCards(Flag!"useCache" useCache) { 115 116 auto broker = MqttBroker!TestMqttSubscriber(useCache); 117 auto subscriber1 = TestMqttSubscriber(); 118 119 broker.subscribe(subscriber1, ["topics/foo/+"]); 120 broker.publish("topics/foo/bar", [3]); 121 broker.publish("topics/bar/baz/boo", [4]); //shouldn't get this one 122 shouldEqual(subscriber1.messages, [[3]]); 123 124 auto subscriber2 = TestMqttSubscriber(); 125 broker.subscribe(subscriber2, ["topics/foo/#"]); 126 broker.publish("topics/foo/bar", [3]); 127 broker.publish("topics/bar/baz/boo", [4]); 128 129 shouldEqual(subscriber1.messages, [[3], [3]]); 130 shouldEqual(subscriber2.messages, [[3]]); 131 132 auto subscriber3 = TestMqttSubscriber(); 133 broker.subscribe(subscriber3, ["topics/+/bar"]); 134 auto subscriber4 = TestMqttSubscriber(); 135 broker.subscribe(subscriber4, ["topics/#"]); 136 137 broker.publish("topics/foo/bar", [3]); 138 broker.publish("topics/bar/baz/boo", [4]); 139 broker.publish("topics/boo/bar/zoo", [5]); 140 broker.publish("topics/foo/bar/zoo", [6]); 141 broker.publish("topics/bbobobobo/bar", [7]); 142 143 shouldEqual(subscriber1.messages, [[3], [3], [3]]); 144 shouldEqual(subscriber2.messages, [[3], [3], [6]]); 145 shouldEqual(subscriber3.messages, [[3], [7]]); 146 shouldEqual(subscriber4.messages, [[3], [4], [5], [6], [7]]); 147 } 148 149 @(Yes.useCache, No.useCache) 150 void testPlus(Flag!"useCache" useCache) { 151 auto broker = MqttBroker!TestMqttSubscriber(useCache); 152 auto subscriber = TestMqttSubscriber(); 153 154 broker.publish("foo/bar/baz", [1, 2, 3, 4]); 155 subscriber.messages.shouldBeEmpty; 156 157 broker.subscribe(subscriber, [MqttSubscribe.Topic("foo/bar/+", 0)]); 158 broker.publish("foo/bar/baz", [1, 2, 3, 4]); 159 broker.publish("foo/boogagoo", [9, 8, 7]); 160 subscriber.messages.shouldEqual([[1, 2, 3, 4]]); 161 }