Source code for ldap2jira.map

""" LDAP to JIRA User mapping

Package looks for given user names in LDAP.
Gets configurable fields for user name, email, name.
Tries to search those in JIRA and find a matching JIRA account.

Optionally consumes user map from file (json, csv).

Usage:
    * Configure instance
    * Use LDAP2JiraUserMap.find_jira_accounts method
"""

from concurrent.futures import ThreadPoolExecutor, as_completed
import csv
import json
import logging
import os

from jira import JIRA
from ldap2jira.ldap_lookup import LDAPLookup
from typing import List  # < python 3.9


log = logging.getLogger('ldap2jira.map')


NO_MATCH = 0
PARTIAL_MATCH = 1
MATCH = 2


[docs]class LDAP2JiraUserMap: """ Finds matching JIRA accounts for given user names Checks whether user name has LDAP record. Gets user names and email alternatives from LDAP. Looks for matching accounts in JIRA. Args: jira_url: JIRA server url ('https://issues.domain.org') jira_user: JIRA user to use for querying jira_password: JIRA user password to use for querying jira_auth_token: JIRA auth token to use for querying ldap_url: LDAP server host ('ldap://ldaphost') ldap_base: LDAP base for queries ('ou=users,dc=dep,dc=org') ldap_query_fields_username: Which LDAP fields to search given user name in ldap_fields_username: LDAP fields to match against JIRA account user name ldap_fields_mail: LDAP fields to match against JIRA account email ldap_fields_name: LDAP fields to match against JIRA account name ldap_fields_jira_search: Run JIRA search against those field values from LDAP email_domain: JIRA user email domain to match map_file: json or csv file with user mapping (username -> jira_username) csv example: us1csv,us1jira us2csv,us2jira json example: { "us1json": "us1jira", "us2json": "us2jira" } """ def __init__(self, jira_url: str, ldap_url: str, ldap_base: str, ldap_query_fields_username: List[str], ldap_fields_username: List[str], ldap_fields_mail: List[str], ldap_fields_name: List[str], ldap_fields_jira_search: List[str], email_domain: str, map_file: str = None, jira_user: str = None, jira_password: str = None, jira_auth_token: str = None, jira_extra_kwargs=None ): if jira_user or jira_password: if not all((jira_user, jira_password)): raise ValueError('JIRA user and password required for basic auth.') elif not jira_auth_token: raise ValueError('JIRA user/password or auth token required.') self.jira_url = jira_url self.jira_user = jira_user self.jira_password = jira_password self.jira_auth_token = jira_auth_token self.jira_extra_kwargs = jira_extra_kwargs or {} self.ldap_url = ldap_url self.ldap_base = ldap_base self.ldap_query_fields_username = ldap_query_fields_username self.ldap_fields_username = ldap_fields_username self.ldap_fields_mail = ldap_fields_mail self.ldap_fields_name = ldap_fields_name self.ldap_fields_jira_search = ldap_fields_jira_search self.email_domain = email_domain.lstrip('@') self._ldap = None self._jira = None self.map_file = map_file self.map = {} @property def ldap(self) -> LDAPLookup: if not self._ldap: self._ldap = LDAPLookup(self.ldap_url, self.ldap_base) return self._ldap @property def jira(self) -> JIRA: if not self._jira: # pragma: no cover jira_kwargs = { 'options': {'server': self.jira_url}, 'get_server_info': False, } if self.jira_password: jira_kwargs['basic_auth'] = (self.jira_user, self.jira_password) else: jira_kwargs['token_auth'] = self.jira_auth_token self._jira = JIRA(**jira_kwargs, **self.jira_extra_kwargs) return self._jira def load_map(self, filename: str = None): if not filename: return {} if not os.path.exists(filename): log.warning("Map file doesn't exist: %s", filename) return {} file_extension = os.path.splitext(filename)[1] fmap = {} with open(filename, 'r') as map_fp: if file_extension == '.json': fmap = json.load(map_fp) if file_extension == '.csv': fmap = {val_list[0]: val_list[1] for val_list in csv.reader(map_fp)} return fmap def ldap_query(self, query: str): return_fields = set( self.ldap_fields_username + self.ldap_fields_mail + self.ldap_fields_jira_search ) return self.ldap.query( query, query_fields=self.ldap_query_fields_username, return_fields=return_fields ) def jira_search_user(self, query: str): log.info('Jira search for: %s', query) return self.jira.search_users(query, maxResults=10) def _ldap_jira_match(self, ldap_account: dict, jira_account: object ) -> int: """ Compare LDAP result with JIRA account Args: ldap_account: LDAP result jira_account: JIRA account to compare LDAP with Returns: Either MATCH, PARTIAL_MATCH, NO_MATCH """ try: jira_username = jira_account.name jira_email = jira_account.emailAddress jira_names = {jira_account.name, jira_account.displayName} except AttributeError as e: log.warning('Unable to get JIRA account values\n%s', e) return NO_MATCH log.debug('Trying JIRA account: %s [%s] %s', jira_account.displayName, jira_username, jira_email) if jira_email.endswith(f'@{self.email_domain}'): ldap_emails = {ldap_account[f] for f in self.ldap_fields_mail if f in ldap_account} ldap_usernames = {ldap_account[f] for f in self.ldap_fields_username if f in ldap_account} email_match = jira_email in ldap_emails username_match = jira_username in ldap_usernames if email_match or username_match: log.debug('Match') return MATCH ldap_names = {ldap_account[f] for f in self.ldap_fields_name if f in ldap_account} if jira_names & ldap_names: log.debug('Partial Match') return PARTIAL_MATCH log.debug('No Match') return NO_MATCH def _update_user(self, user_dict: dict, username: str, status: str, log_extra: str = '', level=logging.WARNING ): """ Update common user dict and log result """ user_dict['status'] = status log_msg = ( "JIRA account - " f"{status.replace('_', ' ').capitalize()}: {username}\n") log_msg += log_extra + '\n' if log_extra else '' log.log(level, log_msg) def _process_username(self, username: str) -> dict: """ Process single username Multithreaded worker method """ user_dict = {'username': username} if not username: return user_dict log.info('Process username: %s', username) # Try file map if username in self.map: self._update_user(user_dict, self.map[username], 'found', log_extra='File Map', level=logging.INFO) user_dict['jira-account'] = self.map[username] return user_dict # No luck - continue ldap_results = self.ldap_query(username) if not ldap_results: self._update_user(user_dict, username, 'not_in_ldap') return user_dict elif len(ldap_results) > 1: # Shouldn't happen when searching unique ldap field for match self._update_user(user_dict, username, 'missing') log.error('Multiple LDAP records for uid %s', username) return user_dict ldap_account = ldap_results[0] # All the values to search JIRA for in order jira_queries = [] for field in self.ldap_fields_jira_search: if ( field not in ldap_account or not ldap_account[field] ): log.debug('Field %s not in LDAP results', field) continue if ldap_account[field] not in jira_queries: jira_queries.append(ldap_account[field]) # Look for jira account based on various ldap fields by preference jira_account_usernames = set() partial_single_matches = [] # Need order of preference for query in jira_queries: result_jira_accounts = self.jira_search_user(query) single_result = len(result_jira_accounts) == 1 for jira_account in result_jira_accounts: if jira_account.name in jira_account_usernames and not single_result: continue jira_account_usernames.add(jira_account.name) match = self._ldap_jira_match(ldap_account, jira_account) if match == MATCH: self._update_user(user_dict, jira_account.name, 'found', level=logging.INFO) user_dict['jira-account'] = jira_account.name break if match == PARTIAL_MATCH and single_result: if jira_account.name not in partial_single_matches: partial_single_matches.append(jira_account.name) # Don't search value from rest of ldap fields if 'jira-account' in user_dict: break if not jira_account_usernames: self._update_user(user_dict, username, 'missing') return user_dict if len(partial_single_matches) == 1: self._update_user(user_dict, partial_single_matches[0], 'found', log_extra='Single Partial', level=logging.INFO) user_dict['jira-account'] = partial_single_matches[0] if 'jira-account' not in user_dict: user_dict['jira-results'] = jira_account_usernames self._update_user( user_dict, username, 'ambiguous', 'Possible matches: ' + ', '.join(user_dict['jira-results'])) return user_dict
[docs] def find_jira_accounts(self, usernames: List[str]) -> dict: """ Finds matching JIRA account for given user names Args: usernames: List of user names Returns: A dict with user names as keys and match results dict as values Possible match result keys: status: found: Found good match in JIRA missing: No match found in JIRA ambiguous: No good match, possible matches in jira-results not_in_ldap: User name wasn't found in LDAP jira-account: JIRA user name jira-results: A list of JIRA user names that partially match Example: { 'us1': {'jira-results': ['us1'], 'status': 'ambiguous'}, 'us2': {'status': 'missing'}, 'us3': {'status': 'not_in_ldap'}, 'us4': {'jira-results': ['us1', 'us3'], 'status': 'ambiguous'}, 'us5': {'jira-account': 'us5', 'status': 'found'} } """ users = {} self.map.update(self.load_map(self.map_file)) with ThreadPoolExecutor(thread_name_prefix='W') as executor: f_users_d = {executor.submit(self._process_username, username) for username in usernames} for f_user_d in as_completed(f_users_d): user_d = f_user_d.result() username = user_d.pop('username') if username: users[username] = user_d return users