#!/usr/bin/python # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import (absolute_import, division, print_function) __metaclass__ = type DOCUMENTATION = r''' --- module: zephir_register short_description: This is a module to automate EOLE module registering against zephir # If this is part of a collection, you need to use semantic versioning, # i.e. the version is of the form "2.5.0" and not "2.4". version_added: "1.0.0" description: This is my longer description explaining my test module. options: variables: description: dictionnary of variables used to answer interactive questions required: false type: dict # Specify this value according to your collection # in format of namespace.collection.doc_fragment_name extends_documentation_fragment: - cadoles.eole.zephir_register author: - Cadoles ''' EXAMPLES = r''' # Pass in a message - name: Test with a message cadoles.eole.zephir_register: variables: zephir_user: admin zephir_user_password: eole zephir_address: zephir.infra.lan action: download ''' RETURN = r''' # These are examples of possible return values, and in general should use other names for return values. id: description: id the server is associated with. type: int returned: always sample: 1 module: description: module the server is associated with. type: str returned: always sample: eolebase-2.7.2 module_id: description: module id the server is associated with. type: int returned: always sample: 42 variant: description: variant the server is associated with type: str returned: always sample: myeolebase variant_id: description: variant id the server is associated with type: int returned: always sample: 58 ''' from ansible.module_utils.basic import AnsibleModule import re import pexpect import subprocess import os import sys ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') class ExpectationCollection: def __init__(self): self.expectations_lookup = {} def add_expectation(self, expectation): if expectation.get_pattern() in self.expectations_lookup: if expectation.context in [exp.context for exp in self.expectations_lookup[expectation.get_pattern()]]: print(f'Can not add {expectation} to collection') return False self.expectations_lookup[expectation.get_pattern()] = self.expectations_lookup.setdefault(expectation.get_pattern(), []) + [expectation] return True def add_expectation_from_descr(self, expectation, response=None, name=None): expectation = Expectation(expectation, response=response, name=name) self.add_expectation(expectation) def get_patterns(self): return [pexpect.EOF, pexpect.TIMEOUT] + list(self.expectations_lookup.keys()) def get_expectations_by_pattern(self, pattern): return self.expectations_lookup.get(pattern, []) def get_expectations_by_name(self, name): return [expectation for expectation in self.get_expectations(recursive=True) if expectation.name == name] def get_exposed_expectation_names(self): return [expectation.name for expectation in self.get_expectations(recursive=True) if expectation.name] def set_expectation_response_by_name(self, name, response): for expectation in self.get_expectations_by_name(name): expectation.set_response(response) def get_expectations(self, recursive=False): expectations = [exp for exps in self.expectations_lookup.values() for exp in exps] if recursive: expectations.extend([exp for exps in expectations for exp in exps.get_following_expectations()]) return list(set(expectations)) def count_expectations(self): return len(self.get_expectations(recursive=True)) def merge(self, collection): for expectation in collection.get_expectations(): self.add_expectation(expectation) class Expectation: def __init__(self, multiline_pattern, response=None, name=None): self.name = name self.context = [p.strip() for p in multiline_pattern.strip().split('\n') if p.strip()] self.pattern = self.context[-1] self.response = response self.next = {} def get_pattern(self, previous_answer=None): return re.compile(f'(.*){re.escape(self.pattern.format(variable=previous_answer))}(.*)') def set_next_expectation(self, expectation, triggers=None): if not isinstance(triggers, list): triggers = [triggers] for trigger in triggers: self.next[trigger] = expectation def set_response(self, response): print(f'Setting {response} for {self.pattern}') self.response = response def expect(self, spawned, previous_answer=None): print(f'-> expecting next "{self.pattern.format(variable=previous_answer)}"') p = spawned.expect([pexpect.EOF, pexpect.TIMEOUT, self.get_pattern(previous_answer=previous_answer)]) if p not in [0, 1]: self.answer(spawned) else: print(f'-> before: {spawned.before}\n-> after: {spawned.after}\n-> {self.pattern}') def answer(self, spawned): print(f'-> answering "{self.response}" to "{spawned.after}"') if self.response is not None: spawned.sendline(self.response) if self.response in self.next: self.next[self.response].expect(spawned, previous_answer=self.response) elif None in self.next: self.next[None].expect(spawned, previous_answer=self.response) def is_the_one(self, context): #print(f'testing if it is the one {self.get_pattern()} for {context}') context = [l.strip() for l in context.strip().split('\r\n') if l.strip()] if not context: print('No context provided') return False context.reverse() for index, c in enumerate(self.context[len(self.context)-2::-1]): if c != ansi_escape.sub('', context[index]): return False return True def get_following_expectations(self): following = [] for next_expectation in set(self.next.values()): following.append(next_expectation) following.extend(next_expectation.get_following_expectations()) return following def scan_hardware(): # recherche d'informations par défaut # type de machine (si dispo) data_dict = {} process = subprocess.run(['/usr/bin/lshw', '-C', 'system', '-short'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = process.returncode hw_info = process.stdout.decode('utf-8') hw_err = process.stderr.decode('utf-8') # matériel (première ligne du premier élément de classe 'system') sys_name_lines = hw_info.split('system')[1] sys_name = sys_name_lines.split('\n')[0].strip() if 'materiel' not in data_dict: if sys_name.upper() != 'COMPUTER': data_dict['materiel'] = sys_name # modèle cpu if 'processeur' not in data_dict: with open('/proc/cpuinfo', 'r') as cpuinfo: for line in cpuinfo.read().split('\n'): if line.startswith('model name'): processeur = line.split(':')[1] processeur.strip() data_dict['processeur'] = processeur if line.startswith('cpu MHz'): speed = line.split(':')[1] speed.strip() data_dict['processeur'] += " %d MHz" % int(float(speed)) # disque dur (pas de /proc/partitions sur serveur virtualisé) if 'disque_dur' not in data_dict and os.path.isfile('/proc/partitions'): blocks = float(0) with open('/proc/partitions', 'r') as part_info: for line in part_info.read().split('\n')[2:]: data = line.split() if data != []: blocks += float(data[2]) # taille en Mo = (blocks * 512) / (1024 * 1024 * 1024) size = int(blocks / (2 * 1024 * 1024)) data_dict['disque_dur'] = "%s Go" % size return data_dict hardware_scan = scan_hardware() expectations = ExpectationCollection() already_registered = Expectation("""1 -> Désinscrire ce serveur du serveur Zéphir 2 -> Relancer l'enregistrement 3 -> Ne rien faire Entrez le numéro de votre choix :""", response='3', name='already_registered') network_configuration = Expectation(""" Procédure d'enregistrement sur le serveur Zéphir Voulez-vous établir une configuration réseau minimale (O/N) :""", response='N', name='network_configuration') interface_name = Expectation("""interface connectée sur l'extérieur""", response='ens0', name='interface_name') network_address = Expectation("""adresse_ip {variable} :""", response='192.168.1.2', name='') network_netmask = Expectation("""masque de réseau pour {variable} :""", response='255.255.255.0', name='network_netmask') gateway = Expectation("""adresse de la passerelle :""", response='192.168.1.1', name='gateway') zephir_address = Expectation("""Entrez l'adresse (nom DNS) du serveur Zéphir :""", response='zephir', name='zephir_address') zephir_admin = Expectation("""Entrez votre login pour l'application Zéphir (rien pour sortir) :""", response='admin_zephir', name='zephir_admin') zephir_admin_password = Expectation("""Mot de passe pour l'application Zéphir pour {variable} :""", response='eole', name='zephir_admin_password') new_server = Expectation("""créer le serveur dans la base du serveur Zéphir (O/N) :""", response='N', name='new_server') rne = Expectation("""entrez le RNE de l'établissement correspondant au serveur, (rien pour saisir directement un n° de serveur) :""", response='', name='rne') server_id = Expectation("""entrez le n° identifiant le serveur l'application Zéphir :""", response='1', name='server_id') hardware = Expectation(f"""Mise à jour des informations sur le matériel matériel ({hardware_scan['materiel']} par défaut) :""", response='', name='hardware') processor = Expectation(f"""processeur ({hardware_scan['processeur']} par défaut) :""", response='', name='processor') harddrive = Expectation(f"""disque dur ({hardware_scan['disque_dur']} par défaut) :""", response='', name='harddrive') key_available = Expectation("""(une procédure d'enregistrement à déjà eu lieu pour ce serveur) continuer l'enregistrement (O/N) ?""", response='O', name='key_available') final = Expectation("""1 -> Ne rien faire 2 -> Utiliser la configuration définie sur le serveur Zéphir 3 -> Non disponible 4 -> Modifier la variante du serveur Entrez le numéro de votre choix :""", response='2', name='action') expectations.add_expectation(already_registered) expectations.add_expectation(network_configuration) #expectations.add_expectation(new_server) expectations.add_expectation(hardware) expectations.add_expectation(key_available) expectations.add_expectation(final) network_configuration.set_next_expectation(interface_name, triggers=['O', 'Oui', 'OUI']) interface_name.set_next_expectation(network_address) network_address.set_next_expectation(network_netmask) network_netmask.set_next_expectation(gateway) network_configuration.set_next_expectation(zephir_address, triggers=['N', 'Non', 'NON']) new_server.set_next_expectation(rne, triggers=['N', 'Non', 'NON']) rne.set_next_expectation(server_id, triggers='') zephir_address.set_next_expectation(zephir_admin) zephir_admin.set_next_expectation(zephir_admin_password) zephir_admin_password.set_next_expectation(new_server) hardware.set_next_expectation(processor) processor.set_next_expectation(harddrive) patterns = expectations.get_patterns() nb_expectations = expectations.count_expectations() def run_module(): # define available arguments/parameters a user can pass to the module module_args = dict( variables=dict(type='dict', required=False), ) # seed the result dict in the object # we primarily care about changed and state # changed is if this module effectively modified the target # state will include any data that you want your module to pass back # for consumption, for example, in a subsequent task result = dict( changed=False, msg='', ) # the AnsibleModule object will be our abstraction working with Ansible # this includes instantiation, a couple of common attr would be the # args/params passed to the execution, as well as if the module # supports check mode module = AnsibleModule( argument_spec=module_args, supports_check_mode=True ) if module.check_mode: module.exit_json(**result) ACTIONS = {'none': '1', 'download': '2', 'upload': '3', 'modify': '4'} # if the user is working with this module in only check mode we do not # want to make any changes to the environment, just return the current # state with no modifications use_pexpect = True if module.params.get('variables', None): if not set(module.params['variables'].keys()).issubset(set(expectations.get_exposed_expectation_names())): unknown_variables = list(set(module.params['variables'].keys()).difference(set(expectations.get_exposed_expectation_names()))) result['msg'] += f"Variables {unknown_variables} not available\n" module.fail_json(**result) else: if 'action' in module.params['variables']: module.params['variables']['action'] = ACTIONS[module.params['variables']['action']] for expectation_name, response in module.params['variables'].items(): expectations.set_expectation_response_by_name(expectation_name, response) try: import importlib.machinery import importlib.util loader = importlib.machinery.SourceFileLoader( 'enregistrement_zephir', '/usr/bin/enregistrement_zephir' ) spec = importlib.util.spec_from_loader( 'enregistrement_zephir', loader ) enregistrement_zephir = importlib.util.module_from_spec( spec ) loader.exec_module( enregistrement_zephir ) if hasattr(enregistrement_zephir, 'argparse'): use_pexpect = False except: pass if use_pexpect: try: instance_process = pexpect.spawn('/usr/bin/enregistrement_zephir', encoding='utf-8', timeout=60, logfile=sys.stdout) some_index = 0 while some_index < nb_expectations: p = instance_process.expect(patterns) if p == 0: break if p == 1: print(f'Some missing expectations for {instance_process.before}{instance_process.after}') break pattern = patterns[p] for expectation in expectations.get_expectations_by_pattern(patterns[p]): if expectation.is_the_one(instance_process.before): expectation.answer(instance_process) break some_index += 1 result['changed'] = True module.exit_json(**result) except Exception as err: result['changed'] = True module.fail_json(**result) else: result = run(['/usr/bin/enregistrement_zephir', '--adresse_zephir', module.params['zephir_address'], '--user', module.params['zephir_user'], '--password', module.params['zephir_user_password'], '--id_serveur', module.params['server_id'], '--choix', ACTIONS.get(module.params['action'], '2')], capture_output=True, check=True) # manipulate or modify the state as needed (this is going to be the # part where your module will do what it needs to do) # use whatever logic you need to determine whether or not this module # made any modifications to your target try: from zephir.zephir_conf.zephir_conf import id_serveur result['changed'] = True except Exception as err: result['msg'] += f'Server not registered {err}' module.fail_json(**result) # during the execution of the module, if there is an exception or a # conditional state that effectively causes a failure, run # AnsibleModule.fail_json() to pass in the message and the result # in the event of a successful module execution, you will want to # simple AnsibleModule.exit_json(), passing the key/value results module.exit_json(**result) def main(): run_module() if __name__ == '__main__': main()