From e4e18a416f63cfe44c1db92e5e18c4dfe8e229c1 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Sun, 22 Jul 2012 00:16:35 -0700 Subject: Add validation for JIDs. --- setup.py | 1 + sleekxmpp/__init__.py | 1 + sleekxmpp/jid.py | 262 ++++++++++++++++++++++++++++++++++ sleekxmpp/util/__init__.py | 0 sleekxmpp/util/stringprep_profiles.py | 116 +++++++++++++++ sleekxmpp/xmlstream/__init__.py | 2 +- sleekxmpp/xmlstream/jid.py | 148 ------------------- tests/test_jid.py | 2 +- 8 files changed, 382 insertions(+), 150 deletions(-) create mode 100644 sleekxmpp/jid.py create mode 100644 sleekxmpp/util/__init__.py create mode 100644 sleekxmpp/util/stringprep_profiles.py delete mode 100644 sleekxmpp/xmlstream/jid.py diff --git a/setup.py b/setup.py index 6d0891a6..99b060f2 100755 --- a/setup.py +++ b/setup.py @@ -49,6 +49,7 @@ packages = [ 'sleekxmpp', 'sleekxmpp/stanza', 'sleekxmpp/test', 'sleekxmpp/roster', + 'sleekxmpp/util', 'sleekxmpp/xmlstream', 'sleekxmpp/xmlstream/matcher', 'sleekxmpp/xmlstream/handler', diff --git a/sleekxmpp/__init__.py b/sleekxmpp/__init__.py index a1f1c0f1..84b1114f 100644 --- a/sleekxmpp/__init__.py +++ b/sleekxmpp/__init__.py @@ -10,6 +10,7 @@ from sleekxmpp.basexmpp import BaseXMPP from sleekxmpp.clientxmpp import ClientXMPP from sleekxmpp.componentxmpp import ComponentXMPP from sleekxmpp.stanza import Message, Presence, Iq +from sleekxmpp.jid import JID from sleekxmpp.xmlstream.handler import * from sleekxmpp.xmlstream import XMLStream, RestartStream from sleekxmpp.xmlstream.matcher import * diff --git a/sleekxmpp/jid.py b/sleekxmpp/jid.py new file mode 100644 index 00000000..e6da5746 --- /dev/null +++ b/sleekxmpp/jid.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +""" + sleekxmpp.jid + ~~~~~~~~~~~~~~~~~~~~~~~ + + This module allows for working with Jabber IDs (JIDs) by + providing accessors for the various components of a JID. + + Part of SleekXMPP: The Sleek XMPP Library + + :copyright: (c) 2011 Nathanael C. Fritz + :license: MIT, see LICENSE for more details +""" + +from __future__ import unicode_literals + +import re +import socket +import stringprep +import encodings.idna + +from sleekxmpp.util import stringprep_profiles + + +ILLEGAL_CHARS = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r' + \ + '\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19' + \ + '\x1a\x1b\x1c\x1d\x1e\x1f' + \ + ' !"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~\x7f' + +JID_PATTERN = "^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$" + + +nodeprep = stringprep_profiles.create( + nfkc=True, + bidi=True, + mappings=[ + stringprep_profiles.b1_mapping, + stringprep_profiles.c12_mapping], + prohibited=[ + stringprep.in_table_c11, + stringprep.in_table_c12, + stringprep.in_table_c21, + stringprep.in_table_c22, + stringprep.in_table_c3, + stringprep.in_table_c4, + stringprep.in_table_c5, + stringprep.in_table_c6, + stringprep.in_table_c7, + stringprep.in_table_c8, + stringprep.in_table_c9, + lambda c: c in '\'"&/:<>@'], + unassigned=[stringprep.in_table_a1]) + + +resourceprep = stringprep_profiles.create( + nfkc=True, + bidi=True, + mappings=[stringprep_profiles.b1_mapping], + prohibited=[ + stringprep.in_table_c12, + stringprep.in_table_c21, + stringprep.in_table_c22, + stringprep.in_table_c3, + stringprep.in_table_c4, + stringprep.in_table_c5, + stringprep.in_table_c6, + stringprep.in_table_c7, + stringprep.in_table_c8, + stringprep.in_table_c9], + unassigned=[stringprep.in_table_a1]) + + +class InvalidJID(ValueError): + pass + + +def parse_jid(data): + """ + Parse string data into the node, domain, and resource + components of a JID. + """ + match = re.match(JID_PATTERN, data) + if not match: + raise InvalidJID + + (node, domain, resource) = match.groups() + + ip_addr = False + + try: + socket.inet_aton(domain) + ip_addr = True + except socket.error: + pass + + if not ip_addr and hasattr(socket, 'inet_pton'): + try: + socket.inet_pton(socket.AF_INET6, domain.strip('[]')) + ip_addr = True + except socket.error: + pass + + if not ip_addr: + domain_parts = [] + for label in domain.split('.'): + try: + label = encodings.idna.nameprep(label) + encodings.idna.ToASCII(label) + except UnicodeError: + raise InvalidJID + + for char in label: + if char in ILLEGAL_CHARS: + raise InvalidJID + + if '-' in (label[0], label[-1]): + raise InvalidJID + + domain_parts.append(label) + domain = '.'.join(domain_parts) + + try: + if node is not None: + node = nodeprep(node) + if resource is not None: + resource = resourceprep(resource) + except stringprep_profiles.StringPrepError: + raise InvalidJID + + return node, domain, resource + + +class JID(object): + + """ + A representation of a Jabber ID, or JID. + + Each JID may have three components: a user, a domain, and an optional + resource. For example: user@domain/resource + + When a resource is not used, the JID is called a bare JID. + The JID is a full JID otherwise. + + **JID Properties:** + :jid: Alias for ``full``. + :full: The string value of the full JID. + :bare: The string value of the bare JID. + :user: The username portion of the JID. + :username: Alias for ``user``. + :local: Alias for ``user``. + :node: Alias for ``user``. + :domain: The domain name portion of the JID. + :server: Alias for ``domain``. + :host: Alias for ``domain``. + :resource: The resource portion of the JID. + + :param string jid: A string of the form ``'[user@]domain[/resource]'``. + """ + + def __init__(self, jid=None, local=None, domain=None, resource=None): + """Initialize a new JID""" + self._jid = (None, None, None) + + if jid is None or jid == '': + jid = (None, None, None) + elif not isinstance(jid, JID): + jid = parse_jid(jid) + else: + jid = jid._jid + + orig_local, orig_domain, orig_resource = jid + self._jid = (local or orig_local or None, + domain or orig_domain or None, + resource or orig_resource or None) + + def regenerate(self): + """Deprecated""" + pass + + def reset(self, data): + """Start fresh from a new JID string. + + :param string data: A string of the form ``'[user@]domain[/resource]'``. + """ + self._jid = JID(data)._jid + + def __getattr__(self, name): + """handle getting the jid values, using cache if available. + + :param name: one of: user, server, domain, resource, + full, or bare. + """ + if name == 'resource': + return self._jid[2] or '' + elif name in ('user', 'username', 'local', 'node'): + return self._jid[0] or '' + elif name in ('server', 'domain', 'host'): + return self._jid[1] or '' + elif name in ('full', 'jid'): + return str(self) + elif name == 'bare': + return str(JID(local=self._jid[0], + domain=self._jid[1])) + else: + object.__getattr__(self, name) + + def __setattr__(self, name, value): + """handle getting the jid values, using cache if available. + + :param name: one of: ``user``, ``username``, ``local``, + ``node``, ``server``, ``domain``, ``host``, + ``resource``, ``full``, ``jid``, or ``bare``. + :param value: The new string value of the JID component. + """ + if name == 'resource': + self._jid = JID(self, resource=value)._jid + elif name in ('user', 'username', 'local', 'node'): + self._jid = JID(self, local=value)._jid + elif name in ('server', 'domain', 'host'): + self._jid = JID(self, domain=value)._jid + elif name in ('full', 'jid'): + self._jid = JID(value)._jid + elif name == 'bare': + parsed = JID(value)._jid + self._jid = (parsed[0], parsed[1], self._jid[2]) + else: + object.__setattr__(self, name, value) + + def __str__(self): + """Use the full JID as the string value.""" + result = [] + if self._jid[0]: + result.append(self._jid[0]) + result.append('@') + if self._jid[1]: + result.append(self._jid[1]) + if self._jid[2]: + result.append('/') + result.append(self._jid[2]) + return ''.join(result) + + def __repr__(self): + return self.__str__() + + def __eq__(self, other): + """ + Two JIDs are considered equal if they have the same full JID value. + """ + other = JID(other) + return self._jid == other._jid + + def __ne__(self, other): + """Two JIDs are considered unequal if they are not equal.""" + return not self._jid == other._jid + + def __hash__(self): + """Hash a JID based on the string version of its full JID.""" + return hash(self.__str__()) + + def __copy__(self): + """Generate a duplicate JID.""" + return JID(self) diff --git a/sleekxmpp/util/__init__.py b/sleekxmpp/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sleekxmpp/util/stringprep_profiles.py b/sleekxmpp/util/stringprep_profiles.py new file mode 100644 index 00000000..a75bb9dd --- /dev/null +++ b/sleekxmpp/util/stringprep_profiles.py @@ -0,0 +1,116 @@ +from __future__ import unicode_literals + +import sys +import stringprep +import unicodedata + + +class StringPrepError(UnicodeError): + pass + + +def to_unicode(data): + if sys.version_info < (3, 0): + return unicode(data) + else: + return str(data) + + +def b1_mapping(char): + return '' if stringprep.in_table_c12(char) else None + + +def c12_mapping(char): + return ' ' if stringprep.in_table_c12(char) else None + + +def map_input(data, tables=None): + """ + Each character in the input stream MUST be checked against + a mapping table. + """ + result = [] + for char in data: + replacement = None + + for mapping in tables: + replacement = mapping(char) + if replacement is not None: + break + + if replacement is None: + replacement = char + result.append(replacement) + return ''.join(result) + + +def normalize(data, nfkc=True): + """ + A profile can specify one of two options for Unicode normalization: + - no normalization + - Unicode normalization with form KC + """ + if nfkc: + data = unicodedata.normalize('NFKC', data) + return data + + +def prohibit_output(data, tables=None): + """ + Before the text can be emitted, it MUST be checked for prohibited + code points. + """ + for char in data: + for check in tables: + if check(char): + raise StringPrepError("Prohibited code point: %s" % char) + + +def check_bidi(data): + """ + 1) The characters in section 5.8 MUST be prohibited. + + 2) If a string contains any RandALCat character, the string MUST NOT + contain any LCat character. + + 3) If a string contains any RandALCat character, a RandALCat + character MUST be the first character of the string, and a + RandALCat character MUST be the last character of the string. + """ + has_lcat = False + has_randal = False + + for c in data: + if stringprep.in_table_c8(c): + raise StringPrepError("BIDI violation: seciton 6 (1)") + if stringprep.in_table_d1(c): + has_randal = True + elif stringprep.in_table_d2(c): + has_lcat = True + + if has_randal and has_lcat: + raise StringPrepError("BIDI violation: section 6 (2)") + + first_randal = stringprep.in_table_d1(data[0]) + last_randal = stringprep.in_table_d1(data[-1]) + if has_randal and not (first_randal and last_randal): + raise StringPrepError("BIDI violation: section 6 (3)") + + +def create(nfkc=True, bidi=True, mappings=None, + prohibited=None, unassigned=None): + def profile(data, query=False): + try: + data = to_unicode(data) + except UnicodeError: + raise StringPrepError + + data = map_input(data, mappings) + data = normalize(data, nfkc) + prohibit_output(data, prohibited) + if bidi: + check_bidi(data) + if query and unassigned: + check_unassigned(data, unassigned) + return data + return profile diff --git a/sleekxmpp/xmlstream/__init__.py b/sleekxmpp/xmlstream/__init__.py index 67b20c56..5a1ea1be 100644 --- a/sleekxmpp/xmlstream/__init__.py +++ b/sleekxmpp/xmlstream/__init__.py @@ -6,7 +6,7 @@ See the file LICENSE for copying permission. """ -from sleekxmpp.xmlstream.jid import JID +from sleekxmpp.jid import JID from sleekxmpp.xmlstream.scheduler import Scheduler from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET from sleekxmpp.xmlstream.stanzabase import register_stanza_plugin diff --git a/sleekxmpp/xmlstream/jid.py b/sleekxmpp/xmlstream/jid.py deleted file mode 100644 index 1582164a..00000000 --- a/sleekxmpp/xmlstream/jid.py +++ /dev/null @@ -1,148 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.xmlstream.jid - ~~~~~~~~~~~~~~~~~~~~~~~ - - This module allows for working with Jabber IDs (JIDs) by - providing accessors for the various components of a JID. - - Part of SleekXMPP: The Sleek XMPP Library - - :copyright: (c) 2011 Nathanael C. Fritz - :license: MIT, see LICENSE for more details -""" - -from __future__ import unicode_literals - - -class JID(object): - - """ - A representation of a Jabber ID, or JID. - - Each JID may have three components: a user, a domain, and an optional - resource. For example: user@domain/resource - - When a resource is not used, the JID is called a bare JID. - The JID is a full JID otherwise. - - **JID Properties:** - :jid: Alias for ``full``. - :full: The value of the full JID. - :bare: The value of the bare JID. - :user: The username portion of the JID. - :domain: The domain name portion of the JID. - :server: Alias for ``domain``. - :resource: The resource portion of the JID. - - :param string jid: A string of the form ``'[user@]domain[/resource]'``. - """ - - def __init__(self, jid): - """Initialize a new JID""" - self.reset(jid) - - def reset(self, jid): - """Start fresh from a new JID string. - - :param string jid: A string of the form ``'[user@]domain[/resource]'``. - """ - if isinstance(jid, JID): - jid = jid.full - self._full = self._jid = jid - self._domain = None - self._resource = None - self._user = None - self._bare = None - - def __getattr__(self, name): - """Handle getting the JID values, using cache if available. - - :param name: One of: user, server, domain, resource, - full, or bare. - """ - if name == 'resource': - if self._resource is None and '/' in self._jid: - self._resource = self._jid.split('/', 1)[-1] - return self._resource or "" - elif name == 'user': - if self._user is None: - if '@' in self._jid: - self._user = self._jid.split('@', 1)[0] - else: - self._user = self._user - return self._user or "" - elif name in ('server', 'domain', 'host'): - if self._domain is None: - self._domain = self._jid.split('@', 1)[-1].split('/', 1)[0] - return self._domain or "" - elif name in ('full', 'jid'): - return self._jid or "" - elif name == 'bare': - if self._bare is None: - self._bare = self._jid.split('/', 1)[0] - return self._bare or "" - - def __setattr__(self, name, value): - """Edit a JID by updating it's individual values, resetting the - generated JID in the end. - - Arguments: - name -- The name of the JID part. One of: user, domain, - server, resource, full, jid, or bare. - value -- The new value for the JID part. - """ - if name in ('resource', 'user', 'domain'): - object.__setattr__(self, "_%s" % name, value) - self.regenerate() - elif name in ('server', 'domain', 'host'): - self.domain = value - elif name in ('full', 'jid'): - self.reset(value) - self.regenerate() - elif name == 'bare': - if '@' in value: - u, d = value.split('@', 1) - object.__setattr__(self, "_user", u) - object.__setattr__(self, "_domain", d) - else: - object.__setattr__(self, "_user", '') - object.__setattr__(self, "_domain", value) - self.regenerate() - else: - object.__setattr__(self, name, value) - - def regenerate(self): - """Generate a new JID based on current values, useful after editing.""" - jid = "" - if self.user: - jid = "%s@" % self.user - jid += self.domain - if self.resource: - jid += "/%s" % self.resource - self.reset(jid) - - def __str__(self): - """Use the full JID as the string value.""" - return self.full - - def __repr__(self): - return self.full - - def __eq__(self, other): - """ - Two JIDs are considered equal if they have the same full JID value. - """ - other = JID(other) - return self.full == other.full - - def __ne__(self, other): - """Two JIDs are considered unequal if they are not equal.""" - return not self == other - - def __hash__(self): - """Hash a JID based on the string version of its full JID.""" - return hash(self.full) - - def __copy__(self): - return JID(self.jid) diff --git a/tests/test_jid.py b/tests/test_jid.py index ef1145d3..7b800520 100644 --- a/tests/test_jid.py +++ b/tests/test_jid.py @@ -1,5 +1,5 @@ from sleekxmpp.test import * -from sleekxmpp.xmlstream.jid import JID +from sleekxmpp import JID class TestJIDClass(SleekTest): -- cgit v1.2.3