summaryrefslogtreecommitdiff
path: root/slixmpp/jid.py
diff options
context:
space:
mode:
Diffstat (limited to 'slixmpp/jid.py')
-rw-r--r--slixmpp/jid.py638
1 files changed, 638 insertions, 0 deletions
diff --git a/slixmpp/jid.py b/slixmpp/jid.py
new file mode 100644
index 00000000..b7ebfbe9
--- /dev/null
+++ b/slixmpp/jid.py
@@ -0,0 +1,638 @@
+# -*- coding: utf-8 -*-
+"""
+ slixmpp.jid
+ ~~~~~~~~~~~~~~~~~~~~~~~
+
+ This module allows for working with Jabber IDs (JIDs).
+
+ Part of Slixmpp: The Slick XMPP Library
+
+ :copyright: (c) 2011 Nathanael C. Fritz
+ :license: MIT, see LICENSE for more details
+"""
+
+from __future__ import unicode_literals
+
+import re
+import socket
+import stringprep
+import threading
+import encodings.idna
+
+from copy import deepcopy
+
+from slixmpp.util import stringprep_profiles
+from slixmpp.thirdparty import OrderedDict
+
+#: These characters are not allowed to appear in a JID.
+ILLEGAL_CHARS = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r' + \
+ '\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19' + \
+ '\x1a\x1b\x1c\x1d\x1e\x1f' + \
+ ' !"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~\x7f'
+
+#: The basic regex pattern that a JID must match in order to determine
+#: the local, domain, and resource parts. This regex does NOT do any
+#: validation, which requires application of nodeprep, resourceprep, etc.
+JID_PATTERN = re.compile(
+ "^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$"
+)
+
+#: The set of escape sequences for the characters not allowed by nodeprep.
+JID_ESCAPE_SEQUENCES = set(['\\20', '\\22', '\\26', '\\27', '\\2f',
+ '\\3a', '\\3c', '\\3e', '\\40', '\\5c'])
+
+#: A mapping of unallowed characters to their escape sequences. An escape
+#: sequence for '\' is also included since it must also be escaped in
+#: certain situations.
+JID_ESCAPE_TRANSFORMATIONS = {' ': '\\20',
+ '"': '\\22',
+ '&': '\\26',
+ "'": '\\27',
+ '/': '\\2f',
+ ':': '\\3a',
+ '<': '\\3c',
+ '>': '\\3e',
+ '@': '\\40',
+ '\\': '\\5c'}
+
+#: The reverse mapping of escape sequences to their original forms.
+JID_UNESCAPE_TRANSFORMATIONS = {'\\20': ' ',
+ '\\22': '"',
+ '\\26': '&',
+ '\\27': "'",
+ '\\2f': '/',
+ '\\3a': ':',
+ '\\3c': '<',
+ '\\3e': '>',
+ '\\40': '@',
+ '\\5c': '\\'}
+
+JID_CACHE = OrderedDict()
+JID_CACHE_LOCK = threading.Lock()
+JID_CACHE_MAX_SIZE = 1024
+
+def _cache(key, parts, locked):
+ JID_CACHE[key] = (parts, locked)
+ if len(JID_CACHE) > JID_CACHE_MAX_SIZE:
+ with JID_CACHE_LOCK:
+ while len(JID_CACHE) > JID_CACHE_MAX_SIZE:
+ found = None
+ for key, item in JID_CACHE.items():
+ if not item[1]: # if not locked
+ found = key
+ break
+ if not found: # more than MAX_SIZE locked
+ # warn?
+ break
+ del JID_CACHE[found]
+
+# pylint: disable=c0103
+#: The nodeprep profile of stringprep used to validate the local,
+#: or username, portion of a JID.
+nodeprep = stringprep_profiles.create(
+ nfkc=True,
+ bidi=True,
+ mappings=[
+ stringprep_profiles.b1_mapping,
+ stringprep.map_table_b2],
+ prohibited=[
+ stringprep.in_table_c11,
+ stringprep.in_table_c12,
+ stringprep.in_table_c21,
+ stringprep.in_table_c22,
+ stringprep.in_table_c3,
+ stringprep.in_table_c4,
+ stringprep.in_table_c5,
+ stringprep.in_table_c6,
+ stringprep.in_table_c7,
+ stringprep.in_table_c8,
+ stringprep.in_table_c9,
+ lambda c: c in ' \'"&/:<>@'],
+ unassigned=[stringprep.in_table_a1])
+
+# pylint: disable=c0103
+#: The resourceprep profile of stringprep, which is used to validate
+#: the resource portion of a JID.
+resourceprep = stringprep_profiles.create(
+ nfkc=True,
+ bidi=True,
+ mappings=[stringprep_profiles.b1_mapping],
+ prohibited=[
+ stringprep.in_table_c12,
+ stringprep.in_table_c21,
+ stringprep.in_table_c22,
+ stringprep.in_table_c3,
+ stringprep.in_table_c4,
+ stringprep.in_table_c5,
+ stringprep.in_table_c6,
+ stringprep.in_table_c7,
+ stringprep.in_table_c8,
+ stringprep.in_table_c9],
+ unassigned=[stringprep.in_table_a1])
+
+
+def _parse_jid(data):
+ """
+ Parse string data into the node, domain, and resource
+ components of a JID, if possible.
+
+ :param string data: A string that is potentially a JID.
+
+ :raises InvalidJID:
+
+ :returns: tuple of the validated local, domain, and resource strings
+ """
+ match = JID_PATTERN.match(data)
+ if not match:
+ raise InvalidJID('JID could not be parsed')
+
+ (node, domain, resource) = match.groups()
+
+ node = _validate_node(node)
+ domain = _validate_domain(domain)
+ resource = _validate_resource(resource)
+
+ return node, domain, resource
+
+
+def _validate_node(node):
+ """Validate the local, or username, portion of a JID.
+
+ :raises InvalidJID:
+
+ :returns: The local portion of a JID, as validated by nodeprep.
+ """
+ try:
+ if node is not None:
+ node = nodeprep(node)
+
+ if not node:
+ raise InvalidJID('Localpart must not be 0 bytes')
+ if len(node) > 1023:
+ raise InvalidJID('Localpart must be less than 1024 bytes')
+ return node
+ except stringprep_profiles.StringPrepError:
+ raise InvalidJID('Invalid local part')
+
+
+def _validate_domain(domain):
+ """Validate the domain portion of a JID.
+
+ IP literal addresses are left as-is, if valid. Domain names
+ are stripped of any trailing label separators (`.`), and are
+ checked with the nameprep profile of stringprep. If the given
+ domain is actually a punyencoded version of a domain name, it
+ is converted back into its original Unicode form. Domains must
+ also not start or end with a dash (`-`).
+
+ :raises InvalidJID:
+
+ :returns: The validated domain name
+ """
+ ip_addr = False
+
+ # First, check if this is an IPv4 address
+ try:
+ socket.inet_aton(domain)
+ ip_addr = True
+ except socket.error:
+ pass
+
+ # Check if this is an IPv6 address
+ if not ip_addr and hasattr(socket, 'inet_pton'):
+ try:
+ socket.inet_pton(socket.AF_INET6, domain.strip('[]'))
+ domain = '[%s]' % domain.strip('[]')
+ ip_addr = True
+ except (socket.error, ValueError):
+ pass
+
+ if not ip_addr:
+ # This is a domain name, which must be checked further
+
+ if domain and domain[-1] == '.':
+ domain = domain[:-1]
+
+ domain_parts = []
+ for label in domain.split('.'):
+ try:
+ label = encodings.idna.nameprep(label)
+ encodings.idna.ToASCII(label)
+ pass_nameprep = True
+ except UnicodeError:
+ pass_nameprep = False
+
+ if not pass_nameprep:
+ raise InvalidJID('Could not encode domain as ASCII')
+
+ if label.startswith('xn--'):
+ label = encodings.idna.ToUnicode(label)
+
+ for char in label:
+ if char in ILLEGAL_CHARS:
+ raise InvalidJID('Domain contains illegal characters')
+
+ if '-' in (label[0], label[-1]):
+ raise InvalidJID('Domain started or ended with -')
+
+ domain_parts.append(label)
+ domain = '.'.join(domain_parts)
+
+ if not domain:
+ raise InvalidJID('Domain must not be 0 bytes')
+ if len(domain) > 1023:
+ raise InvalidJID('Domain must be less than 1024 bytes')
+
+ return domain
+
+
+def _validate_resource(resource):
+ """Validate the resource portion of a JID.
+
+ :raises InvalidJID:
+
+ :returns: The local portion of a JID, as validated by resourceprep.
+ """
+ try:
+ if resource is not None:
+ resource = resourceprep(resource)
+
+ if not resource:
+ raise InvalidJID('Resource must not be 0 bytes')
+ if len(resource) > 1023:
+ raise InvalidJID('Resource must be less than 1024 bytes')
+ return resource
+ except stringprep_profiles.StringPrepError:
+ raise InvalidJID('Invalid resource')
+
+
+def _escape_node(node):
+ """Escape the local portion of a JID."""
+ result = []
+
+ for i, char in enumerate(node):
+ if char == '\\':
+ if ''.join((node[i:i+3])) in JID_ESCAPE_SEQUENCES:
+ result.append('\\5c')
+ continue
+ result.append(char)
+
+ for i, char in enumerate(result):
+ if char != '\\':
+ result[i] = JID_ESCAPE_TRANSFORMATIONS.get(char, char)
+
+ escaped = ''.join(result)
+
+ if escaped.startswith('\\20') or escaped.endswith('\\20'):
+ raise InvalidJID('Escaped local part starts or ends with "\\20"')
+
+ _validate_node(escaped)
+
+ return escaped
+
+
+def _unescape_node(node):
+ """Unescape a local portion of a JID.
+
+ .. note::
+ The unescaped local portion is meant ONLY for presentation,
+ and should not be used for other purposes.
+ """
+ unescaped = []
+ seq = ''
+ for i, char in enumerate(node):
+ if char == '\\':
+ seq = node[i:i+3]
+ if seq not in JID_ESCAPE_SEQUENCES:
+ seq = ''
+ if seq:
+ if len(seq) == 3:
+ unescaped.append(JID_UNESCAPE_TRANSFORMATIONS.get(seq, char))
+
+ # Pop character off the escape sequence, and ignore it
+ seq = seq[1:]
+ else:
+ unescaped.append(char)
+ unescaped = ''.join(unescaped)
+
+ return unescaped
+
+
+def _format_jid(local=None, domain=None, resource=None):
+ """Format the given JID components into a full or bare JID.
+
+ :param string local: Optional. The local portion of the JID.
+ :param string domain: Required. The domain name portion of the JID.
+ :param strin resource: Optional. The resource portion of the JID.
+
+ :return: A full or bare JID string.
+ """
+ result = []
+ if local:
+ result.append(local)
+ result.append('@')
+ if domain:
+ result.append(domain)
+ if resource:
+ result.append('/')
+ result.append(resource)
+ return ''.join(result)
+
+
+class InvalidJID(ValueError):
+ """
+ Raised when attempting to create a JID that does not pass validation.
+
+ It can also be raised if modifying an existing JID in such a way as
+ to make it invalid, such trying to remove the domain from an existing
+ full JID while the local and resource portions still exist.
+ """
+
+# pylint: disable=R0903
+class UnescapedJID(object):
+
+ """
+ .. versionadded:: 1.1.10
+ """
+
+ def __init__(self, local, domain, resource):
+ self._jid = (local, domain, resource)
+
+ # pylint: disable=R0911
+ def __getattr__(self, name):
+ """Retrieve the given JID component.
+
+ :param name: one of: user, server, domain, resource,
+ full, or bare.
+ """
+ if name == 'resource':
+ return self._jid[2] or ''
+ elif name in ('user', 'username', 'local', 'node'):
+ return self._jid[0] or ''
+ elif name in ('server', 'domain', 'host'):
+ return self._jid[1] or ''
+ elif name in ('full', 'jid'):
+ return _format_jid(*self._jid)
+ elif name == 'bare':
+ return _format_jid(self._jid[0], self._jid[1])
+ elif name == '_jid':
+ return getattr(super(JID, self), '_jid')
+ else:
+ return None
+
+ def __str__(self):
+ """Use the full JID as the string value."""
+ return _format_jid(*self._jid)
+
+ def __repr__(self):
+ """Use the full JID as the representation."""
+ return self.__str__()
+
+
+class JID(object):
+
+ """
+ A representation of a Jabber ID, or JID.
+
+ Each JID may have three components: a user, a domain, and an optional
+ resource. For example: user@domain/resource
+
+ When a resource is not used, the JID is called a bare JID.
+ The JID is a full JID otherwise.
+
+ **JID Properties:**
+ :jid: Alias for ``full``.
+ :full: The string value of the full JID.
+ :bare: The string value of the bare JID.
+ :user: The username portion of the JID.
+ :username: Alias for ``user``.
+ :local: Alias for ``user``.
+ :node: Alias for ``user``.
+ :domain: The domain name portion of the JID.
+ :server: Alias for ``domain``.
+ :host: Alias for ``domain``.
+ :resource: The resource portion of the JID.
+
+ :param string jid:
+ A string of the form ``'[user@]domain[/resource]'``.
+ :param string local:
+ Optional. Specify the local, or username, portion
+ of the JID. If provided, it will override the local
+ value provided by the `jid` parameter. The given
+ local value will also be escaped if necessary.
+ :param string domain:
+ Optional. Specify the domain of the JID. If
+ provided, it will override the domain given by
+ the `jid` parameter.
+ :param string resource:
+ Optional. Specify the resource value of the JID.
+ If provided, it will override the domain given
+ by the `jid` parameter.
+
+ :raises InvalidJID:
+ """
+
+ # pylint: disable=W0212
+ def __init__(self, jid=None, **kwargs):
+ locked = kwargs.get('cache_lock', False)
+ in_local = kwargs.get('local', None)
+ in_domain = kwargs.get('domain', None)
+ in_resource = kwargs.get('resource', None)
+ parts = None
+ if in_local or in_domain or in_resource:
+ parts = (in_local, in_domain, in_resource)
+
+ # only check cache if there is a jid string, or parts, not if there
+ # are both
+ self._jid = None
+ key = None
+ if (jid is not None) and (parts is None):
+ if isinstance(jid, JID):
+ # it's already good to go, and there are no additions
+ self._jid = jid._jid
+ return
+ key = jid
+ self._jid, locked = JID_CACHE.get(jid, (None, locked))
+ elif jid is None and parts is not None:
+ key = parts
+ self._jid, locked = JID_CACHE.get(parts, (None, locked))
+ if not self._jid:
+ if not jid:
+ parsed_jid = (None, None, None)
+ elif not isinstance(jid, JID):
+ parsed_jid = _parse_jid(jid)
+ else:
+ parsed_jid = jid._jid
+
+ local, domain, resource = parsed_jid
+
+ if 'local' in kwargs:
+ local = _escape_node(in_local)
+ if 'domain' in kwargs:
+ domain = _validate_domain(in_domain)
+ if 'resource' in kwargs:
+ resource = _validate_resource(in_resource)
+
+ self._jid = (local, domain, resource)
+ if key:
+ _cache(key, self._jid, locked)
+
+ def unescape(self):
+ """Return an unescaped JID object.
+
+ Using an unescaped JID is preferred for displaying JIDs
+ to humans, and they should NOT be used for any other
+ purposes than for presentation.
+
+ :return: :class:`UnescapedJID`
+
+ .. versionadded:: 1.1.10
+ """
+ return UnescapedJID(_unescape_node(self._jid[0]),
+ self._jid[1],
+ self._jid[2])
+
+ def regenerate(self):
+ """No-op
+
+ .. deprecated:: 1.1.10
+ """
+ pass
+
+ def reset(self, data):
+ """Start fresh from a new JID string.
+
+ :param string data: A string of the form ``'[user@]domain[/resource]'``.
+
+ .. deprecated:: 1.1.10
+ """
+ self._jid = JID(data)._jid
+
+ @property
+ def resource(self):
+ return self._jid[2] or ''
+
+ @property
+ def user(self):
+ return self._jid[0] or ''
+
+ @property
+ def local(self):
+ return self._jid[0] or ''
+
+ @property
+ def node(self):
+ return self._jid[0] or ''
+
+ @property
+ def username(self):
+ return self._jid[0] or ''
+
+ @property
+ def bare(self):
+ return _format_jid(self._jid[0], self._jid[1])
+
+ @property
+ def server(self):
+ return self._jid[1] or ''
+
+ @property
+ def domain(self):
+ return self._jid[1] or ''
+
+ @property
+ def host(self):
+ return self._jid[1] or ''
+
+ @property
+ def full(self):
+ return _format_jid(*self._jid)
+
+ @property
+ def jid(self):
+ return _format_jid(*self._jid)
+
+ @property
+ def bare(self):
+ return _format_jid(self._jid[0], self._jid[1])
+
+
+ @resource.setter
+ def resource(self, value):
+ self._jid = JID(self, resource=value)._jid
+
+ @user.setter
+ def user(self, value):
+ self._jid = JID(self, local=value)._jid
+
+ @username.setter
+ def username(self, value):
+ self._jid = JID(self, local=value)._jid
+
+ @local.setter
+ def local(self, value):
+ self._jid = JID(self, local=value)._jid
+
+ @node.setter
+ def node(self, value):
+ self._jid = JID(self, local=value)._jid
+
+ @server.setter
+ def server(self, value):
+ self._jid = JID(self, domain=value)._jid
+
+ @domain.setter
+ def domain(self, value):
+ self._jid = JID(self, domain=value)._jid
+
+ @host.setter
+ def host(self, value):
+ self._jid = JID(self, domain=value)._jid
+
+ @full.setter
+ def full(self, value):
+ self._jid = JID(value)._jid
+
+ @jid.setter
+ def jid(self, value):
+ self._jid = JID(value)._jid
+
+ @bare.setter
+ def bare(self, value):
+ parsed = JID(value)._jid
+ self._jid = (parsed[0], parsed[1], self._jid[2])
+
+
+ def __str__(self):
+ """Use the full JID as the string value."""
+ return _format_jid(*self._jid)
+
+ def __repr__(self):
+ """Use the full JID as the representation."""
+ return self.__str__()
+
+ # pylint: disable=W0212
+ def __eq__(self, other):
+ """Two JIDs are equal if they have the same full JID value."""
+ if isinstance(other, UnescapedJID):
+ return False
+
+ other = JID(other)
+ return self._jid == other._jid
+
+ # pylint: disable=W0212
+ def __ne__(self, other):
+ """Two JIDs are considered unequal if they are not equal."""
+ return not self == other
+
+ def __hash__(self):
+ """Hash a JID based on the string version of its full JID."""
+ return hash(self.__str__())
+
+ def __copy__(self):
+ """Generate a duplicate JID."""
+ return JID(self)
+
+ def __deepcopy__(self, memo):
+ """Generate a duplicate JID."""
+ return JID(deepcopy(str(self), memo))