# Source code for ldap2jira.ldap_lookup

from typing import List, Optional  # < python 3.9

import ldap
from ldap.filter import escape_filter_chars
from ldap.ldapobject import ReconnectLDAPObject


class LDAPQueryNotFoundError(Exception):
    """Raised when an LDAP query yields no results and the caller asked
    for an exception instead of an empty list."""
class LDAPLookup:
    """Wraps ldap library query.

    The underlying connection object is created lazily, on first access to
    ``ldap_client``, and reused for subsequent queries.

    Args:
        ldap_url: LDAP server in the form of 'ldap://ldaphost'
        ldap_base: LDAP base for search ('ou=users,dc=department,dc=org')
        ldap_retry_max: LDAP number of reconnect attempts
        ldap_retry_delay: LDAP seconds between reconnect attempts
    """

    # Fields searched / returned when the caller does not specify any.
    DEFAULT_QUERY_FIELDS: List[str] = ['uid']
    DEFAULT_RETURN_FIELDS: List[str] = ['uid', 'cn', 'mail']

    def __init__(self,
                 ldap_url: str,
                 ldap_base: str,
                 ldap_retry_max: int = 3,
                 ldap_retry_delay: float = 5.0):
        self.ldap_url = ldap_url
        self.ldap_base = ldap_base
        self.ldap_retry_max = ldap_retry_max
        self.ldap_retry_delay = ldap_retry_delay

        self._ldap_client = None  # lazy client init

    @property
    def ldap_client(self):
        """Return the shared LDAP client, creating it on first use."""
        if self._ldap_client is None:
            client = ReconnectLDAPObject(
                self.ldap_url,
                retry_max=self.ldap_retry_max,
                retry_delay=self.ldap_retry_delay,
            )
            client.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
            client.set_option(ldap.OPT_REFERRALS, 0)
            # Cache only once fully configured, so a failing set_option
            # cannot leave a half-initialised client behind.
            self._ldap_client = client

        return self._ldap_client

    @staticmethod
    def _build_filter(query: str, query_fields: List[str]) -> str:
        """Build the LDAP search filter matching *query* in *query_fields*.

        A single field yields an exact match (``field=query``); several
        fields yield an OR of prefix matches.
        """
        # Escape filter metacharacters so the user-supplied value cannot
        # alter the filter structure; it is always matched literally.
        value = escape_filter_chars(query)

        if len(query_fields) == 1:
            return f'{query_fields[0]}={value}'

        # Example: (|(cn=query*)(sn=query*)(mail=query*))
        clauses = ''.join(
            f'({field}={value}*)' for field in query_fields if field)
        return f'(|{clauses})'

    @staticmethod
    def _extract_entries(results) -> List[dict]:
        """Convert raw ``search_s`` records to dicts of decoded first values.

        Records that are not real entries (DN is ``None`` or the payload is
        not an attribute dict, as python-ldap reports search-result
        references) are skipped, as are attributes with no values.
        """
        entries = []
        for record in results:
            dn, attrs = record[0], record[1]
            if dn is None or not isinstance(attrs, dict):
                continue
            # Extract first values, convert from bytes
            entries.append({
                name: values[0].decode('utf-8')
                for name, values in attrs.items()
                if values
            })
        return entries

    def query(self,
              query: str,
              query_fields: Optional[List[str]] = None,
              return_fields: Optional[List[str]] = None,
              raise_exception: bool = False,
              ) -> List[dict]:
        """Perform LDAP query.

        Trailing ``*`` characters are stripped from ``query``; the remaining
        value is escaped and therefore matched literally.

        Args:
            query: String to search
            query_fields: Which LDAP fields to search in
                (defaults to ``DEFAULT_QUERY_FIELDS``)
            return_fields: What LDAP fields to return
                (defaults to ``DEFAULT_RETURN_FIELDS``)
            raise_exception: If True - raise exception if no results

        Returns:
            List of dicts with LDAP results

            Example:
                [
                    {'uid': 'us1', 'cn': 'user 1', 'mail': 'us1@org.com'},
                    {'uid': 'us2', 'cn': 'user 2', 'mail': 'us2@org.com'}
                ]

        Raises:
            LDAPQueryNotFoundError: No result while raise_exception True
        """
        query = query.rstrip('*')

        if not query_fields:
            query_fields = self.DEFAULT_QUERY_FIELDS
        if not return_fields:
            return_fields = self.DEFAULT_RETURN_FIELDS

        query_string = self._build_filter(query, query_fields)

        res = self.ldap_client.search_s(
            self.ldap_base, ldap.SCOPE_SUBTREE, query_string, return_fields)

        # Check emptiness after dropping non-entry records, so a result
        # consisting solely of referrals counts as "not found".
        entries = self._extract_entries(res)
        if raise_exception and not entries:
            raise LDAPQueryNotFoundError(f'Query not found in LDAP: {query}')

        return entries