From 09b10cc80146c1ac2a0d5c53c6c8469b934189f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?louiz=E2=80=99?= Date: Mon, 25 Jun 2018 22:54:32 +0200 Subject: Throttle all commands sent to IRC servers fix #3354 --- CHANGELOG.rst | 3 ++ doc/biboumi.1.rst | 6 ++++ src/database/database.hpp | 5 ++- src/irc/irc_client.cpp | 63 ++++++++++++++++++++++++------------- src/irc/irc_client.hpp | 12 +++++-- src/irc/irc_message.hpp | 4 +-- src/utils/tokens_bucket.hpp | 58 ++++++++++++++++++++++++++++++++++ src/xmpp/biboumi_adhoc_commands.cpp | 26 ++++++++++++++- tests/end_to_end/__main__.py | 29 ++++++++++++----- 9 files changed, 172 insertions(+), 34 deletions(-) create mode 100644 src/utils/tokens_bucket.hpp diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 8d00fb6..d668d57 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -5,6 +5,9 @@ Version 9.0 - Messages from unjoined resources are now rejected instead of being accepted. This helps clients understand that they are not in the room (because of some connection issue for example). +- All commands sent to IRC servers are now throttled to avoid being + disconnected for excess flood. The limit value can be customized using the + ad-hoc configuration form on a server JID. Version 8.3 - 2018-06-01 ======================== diff --git a/doc/biboumi.1.rst b/doc/biboumi.1.rst index 5d6facc..ebf45b6 100644 --- a/doc/biboumi.1.rst +++ b/doc/biboumi.1.rst @@ -682,6 +682,12 @@ On a server JID (e.g on the JID chat.freenode.org@biboumi.example.com) this is NOT a password that will be sent to NickServ (or some author authentication service), some server (notably Freenode) use it as if it was sent to NickServ to identify your nickname. + * Throttle limit: specifies a number of messages that can be sent + without a limit, before the throttling takes place. When messages + are throttled, only one command per second is sent to the server. + The default is 10. You can lower this value if you are ever kicked + for excess flood. If the value is 0, all messages are throttled. To + disable this feature, just set a high value, like 999. - get-irc-connection-info: Returns some information about the IRC server, for the executing user. It lets the user know if they are connected to diff --git a/src/database/database.hpp b/src/database/database.hpp index 3e25b30..5f637bd 100644 --- a/src/database/database.hpp +++ b/src/database/database.hpp @@ -86,13 +86,16 @@ class Database struct Address: Column { static constexpr auto name = "address_"; }; + struct ThrottleLimit: Column { static constexpr auto name = "throttlelimit_"; + ThrottleLimit(): Column(10) {} }; + using MucLogLineTable = Table; using MucLogLine = MucLogLineTable::RowType; using GlobalOptionsTable = Table; using GlobalOptions = GlobalOptionsTable::RowType; - using IrcServerOptionsTable = Table; + using IrcServerOptionsTable = Table; using IrcServerOptions = IrcServerOptionsTable::RowType; using IrcChannelOptionsTable = Table; diff --git a/src/irc/irc_client.cpp b/src/irc/irc_client.cpp index d5f872b..5a2f09b 100644 --- a/src/irc/irc_client.cpp +++ b/src/irc/irc_client.cpp @@ -135,7 +135,7 @@ IrcClient::IrcClient(std::shared_ptr& poller, std::string hostname, std::string realname, std::string user_hostname, Bridge& bridge): TCPClientSocketHandler(poller), - hostname(std::move(hostname)), + hostname(hostname), user_hostname(std::move(user_hostname)), username(std::move(username)), realname(std::move(realname)), @@ -143,7 +143,14 @@ IrcClient::IrcClient(std::shared_ptr& poller, std::string hostname, bridge(bridge), welcomed(false), chanmodes({"", "", "", ""}), - chantypes({'#', '&'}) + chantypes({'#', '&'}), + tokens_bucket(Database::get_irc_server_options(bridge.get_bare_jid(), hostname).col(), 1s, [this]() { + if (message_queue.empty()) + return true; + this->actual_send(std::move(this->message_queue.front())); + this->message_queue.pop_front(); + return false; + }, "TokensBucket" + this->hostname + this->bridge.get_jid()) { #ifdef USE_DATABASE auto options = Database::get_irc_server_options(this->bridge.get_bare_jid(), @@ -171,6 +178,7 @@ IrcClient::~IrcClient() // This event may or may not exist (if we never got connected, it // doesn't), but it's ok TimedEventsManager::instance().cancel("PING" + this->hostname + this->bridge.get_jid()); + TimedEventsManager::instance().cancel("TokensBucket" + this->hostname + this->bridge.get_jid()); } void IrcClient::start() @@ -390,25 +398,33 @@ void IrcClient::parse_in_buffer(const size_t) } } -void IrcClient::send_message(IrcMessage&& message) +void IrcClient::actual_send(const IrcMessage& message) { - log_debug("IRC SENDING: (", this->get_hostname(), ") ", message); - std::string res; - if (!message.prefix.empty()) - res += ":" + std::move(message.prefix) + " "; - res += message.command; - for (const std::string& arg: message.arguments) - { - if (arg.find(' ') != std::string::npos || - (!arg.empty() && arg[0] == ':')) - { - res += " :" + arg; - break; - } - res += " " + arg; - } - res += "\r\n"; - this->send_data(std::move(res)); + log_debug("IRC SENDING: (", this->get_hostname(), ") ", message); + std::string res; + if (!message.prefix.empty()) + res += ":" + message.prefix + " "; + res += message.command; + for (const std::string& arg: message.arguments) + { + if (arg.find(' ') != std::string::npos + || (!arg.empty() && arg[0] == ':')) + { + res += " :" + arg; + break; + } + res += " " + arg; + } + res += "\r\n"; + this->send_data(std::move(res)); + } + +void IrcClient::send_message(IrcMessage message, bool throttle) +{ + if (this->tokens_bucket.use_token() || !throttle) + this->actual_send(message); + else + message_queue.push_back(std::move(message)); } void IrcClient::send_raw(const std::string& txt) @@ -459,7 +475,7 @@ void IrcClient::send_topic_command(const std::string& chan_name, const std::stri void IrcClient::send_quit_command(const std::string& reason) { - this->send_message(IrcMessage("QUIT", {reason})); + this->send_message(IrcMessage("QUIT", {reason}), false); } void IrcClient::send_join_command(const std::string& chan_name, const std::string& password) @@ -1225,6 +1241,11 @@ void IrcClient::on_channel_mode(const IrcMessage& message) } } +void IrcClient::set_throttle_limit(std::size_t limit) +{ + this->tokens_bucket.set_limit(limit); +} + void IrcClient::on_user_mode(const IrcMessage& message) { this->bridge.send_xmpp_message(this->hostname, "", diff --git a/src/irc/irc_client.hpp b/src/irc/irc_client.hpp index 70046be..ac5ccb0 100644 --- a/src/irc/irc_client.hpp +++ b/src/irc/irc_client.hpp @@ -16,8 +16,10 @@ #include #include #include +#include #include #include +#include class Bridge; @@ -84,8 +86,9 @@ public: * (actually, into our out_buf and signal the poller that we want to wach * for send events to be ready) */ - void send_message(IrcMessage&& message); + void send_message(IrcMessage message, bool throttle=true); void send_raw(const std::string& txt); + void actual_send(const IrcMessage& message); /** * Send the PONG irc command */ @@ -293,7 +296,7 @@ public: const std::vector& get_sorted_user_modes() const { return this->sorted_user_modes; } std::set get_chantypes() const { return this->chantypes; } - + void set_throttle_limit(std::size_t limit); /** * Store the history limit that the client asked when joining this room. */ @@ -330,6 +333,10 @@ private: * To communicate back with the bridge */ Bridge& bridge; + /** + * Where messaged are stored when they are throttled. + */ + std::deque message_queue{}; /** * The list of joined channels, indexed by name */ @@ -389,6 +396,7 @@ private: * the WebIRC protocole. */ Resolver dns_resolver; + TokensBucket tokens_bucket; }; diff --git a/src/irc/irc_message.hpp b/src/irc/irc_message.hpp index fe954e4..269a12a 100644 --- a/src/irc/irc_message.hpp +++ b/src/irc/irc_message.hpp @@ -14,9 +14,9 @@ public: ~IrcMessage() = default; IrcMessage(const IrcMessage&) = delete; - IrcMessage(IrcMessage&&) = delete; + IrcMessage(IrcMessage&&) = default; IrcMessage& operator=(const IrcMessage&) = delete; - IrcMessage& operator=(IrcMessage&&) = delete; + IrcMessage& operator=(IrcMessage&&) = default; std::string prefix; std::string command; diff --git a/src/utils/tokens_bucket.hpp b/src/utils/tokens_bucket.hpp new file mode 100644 index 0000000..d44eb06 --- /dev/null +++ b/src/utils/tokens_bucket.hpp @@ -0,0 +1,58 @@ +/** + * Implementation of the token bucket algorithm. + * + * It uses a repetitive TimedEvent, started at construction, to fill the + * bucket. + * + * Every n seconds, it executes the given callback. If the callback + * returns true, we add a token (if the limit is not yet reached). + * + */ + +#pragma once + +#include +#include + +class TokensBucket +{ +public: + TokensBucket(std::size_t max_size, std::chrono::milliseconds fill_duration, std::function callback, std::string name): + limit(max_size), + tokens(limit), + fill_duration(fill_duration), + callback(std::move(callback)) + { + log_debug("creating TokensBucket with max size: ", max_size); + TimedEvent event(std::move(fill_duration), [this]() { this->add_token(); }, std::move(name)); + TimedEventsManager::instance().add_event(std::move(event)); + } + + bool use_token() + { + if (this->tokens > 0) + { + this->tokens--; + return true; + } + else + return false; + } + + void set_limit(std::size_t limit) + { + this->limit = limit; + } + +private: + std::size_t limit; + std::size_t tokens; + std::chrono::milliseconds fill_duration; + std::function callback; + + void add_token() + { + if (this->callback() && this->tokens != limit) + this->tokens++; + } +}; diff --git a/src/xmpp/biboumi_adhoc_commands.cpp b/src/xmpp/biboumi_adhoc_commands.cpp index 1b5fdcb..47de27e 100644 --- a/src/xmpp/biboumi_adhoc_commands.cpp +++ b/src/xmpp/biboumi_adhoc_commands.cpp @@ -365,6 +365,15 @@ void ConfigureIrcServerStep1(XmppComponent&, AdhocSession& session, XmlNode& com } } + { + XmlSubNode throttle_limit(x, "field"); + throttle_limit["var"] = "throttle_limit"; + throttle_limit["type"] = "text-single"; + throttle_limit["label"] = "Throttle limit"; + XmlSubNode value(throttle_limit, "value"); + value.set_inner(std::to_string(options.col())); + } + { XmlSubNode encoding_out(x, "field"); encoding_out["var"] = "encoding_out"; @@ -392,8 +401,10 @@ void ConfigureIrcServerStep1(XmppComponent&, AdhocSession& session, XmlNode& com } } -void ConfigureIrcServerStep2(XmppComponent&, AdhocSession& session, XmlNode& command_node) +void ConfigureIrcServerStep2(XmppComponent& xmpp_component, AdhocSession& session, XmlNode& command_node) { + auto& biboumi_component = dynamic_cast(xmpp_component); + const XmlNode* x = command_node.get_child("x", "jabber:x:data"); if (x) { @@ -474,6 +485,19 @@ void ConfigureIrcServerStep2(XmppComponent&, AdhocSession& session, XmlNode& com else if (field->get_tag("var") == "realname" && value) options.col() = value->get_inner(); + else if (field->get_tag("var") == "throttle_limit" && value) + { + options.col() = std::stoull(value->get_inner()); + Bridge* bridge = biboumi_component.find_user_bridge(session.get_owner_jid()); + if (bridge) + { + IrcClient* client = bridge->find_irc_client(server_domain); + if (client) + client->set_throttle_limit(options.col()); + } + + } + else if (field->get_tag("var") == "encoding_out" && value) options.col() = value->get_inner(); diff --git a/tests/end_to_end/__main__.py b/tests/end_to_end/__main__.py index 60e868b..d6dd9a7 100644 --- a/tests/end_to_end/__main__.py +++ b/tests/end_to_end/__main__.py @@ -1310,16 +1310,16 @@ if __name__ == '__main__': ]), # Send a multi-line channel message - partial(send_stanza, "un\ndeux\ntrois"), + partial(send_stanza, "a\nb\nc"), # Receive multiple messages, for each user partial(expect_unordered, [ - ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id='the-message-id'][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='un']",), - ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='deux']",), - ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='trois']",), + ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id='the-message-id'][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='a']",), + ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='b']",), + ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_one}/{resource_one}'][@type='groupchat']/body[text()='c']",), - ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='un']",), - ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='deux']",), - ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='trois']",), + ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='a']",), + ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='b']",), + ("/message[@from='#foo%{irc_server_one}/{nick_one}'][@id][@to='{jid_two}/{resource_one}'][@type='groupchat']/body[text()='c']",), ]) ]), Scenario("channel_messages", @@ -2073,6 +2073,21 @@ if __name__ == '__main__': Scenario("join_history_limits", [ handshake_sequence(), + + # Disable the throttling because the test is based on timings + partial(send_stanza, ""), + partial(expect_stanza, "/iq[@type='result']", + after = partial(save_value, "sessionid", partial(extract_attribute, "/iq[@type='result']/commands:command[@node='configure']", "sessionid"))), + partial(send_stanza, "" + "" + "" + "6667" + "66976670" + "9999" + ""), + partial(expect_stanza, "/iq[@type='result']/commands:command[@node='configure'][@status='completed']/commands:note[@type='info'][text()='Configuration successfully applied.']"), + + partial(send_stanza, ""), connection_sequence("irc.localhost", '{jid_one}/{resource_one}'), -- cgit v1.2.3