From 8ee744416c4e4a3f2765bcd7b8d922893eb7129d Mon Sep 17 00:00:00 2001 From: Benjamin Bohard Date: Mon, 30 Aug 2021 15:31:37 +0200 Subject: [PATCH] pexpectation module --- .gitignore | 1 + __init__.py | 0 eole_module_expectations/__init__.py | 0 .../eolebase_instance_pexpectations.py | 30 ++++++ .../register_zephir_pexpectations.py | 80 +++++++++++++++ .../zephir_instance_pexpectations.py | 37 +++++++ pexpectation.py | 99 +++++++++++++++++++ 7 files changed, 247 insertions(+) create mode 100644 .gitignore create mode 100644 __init__.py create mode 100644 eole_module_expectations/__init__.py create mode 100644 eole_module_expectations/eolebase_instance_pexpectations.py create mode 100644 eole_module_expectations/register_zephir_pexpectations.py create mode 100644 eole_module_expectations/zephir_instance_pexpectations.py create mode 100644 pexpectation.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eole_module_expectations/__init__.py b/eole_module_expectations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eole_module_expectations/eolebase_instance_pexpectations.py b/eole_module_expectations/eolebase_instance_pexpectations.py new file mode 100644 index 0000000..a4d8552 --- /dev/null +++ b/eole_module_expectations/eolebase_instance_pexpectations.py @@ -0,0 +1,30 @@ +from ..pexpectation import Expectation, ExpectationCollection + +expectations = ExpectationCollection() +exp3 = Expectation("""###################################################### +# Changement du mot de passe pour l’utilisateur root # +###################################################### +Nouveau mot de passe:""", response='eole', name='root_password') +exp3_1 = Expectation("""Confirmation du mot de passe:""", response='eole', name='root_password') + +exp3.set_next_expectation(exp3_1) +expectations.add_expectation(exp3) + +exp4 = Expectation("""###################################################### +# Changement du mot de passe pour l’utilisateur eole # +###################################################### +Nouveau mot de passe:""", response='eole', name='eole_password') +exp4_1 = Expectation("""Confirmation du mot de passe:""", response='eole', name='eole_password') +exp4_2 = Expectation("""Créer un nouvel administrateur eole2 ? [oui/non] +[non] :""", response='') + +exp4.set_next_expectation(exp4_1) +exp4_1.set_next_expectation(exp4_2) +expectations.add_expectation(exp4) + +exp5 = Expectation("""Une mise à jour est recommandée +Voulez-vous effectuer une mise à jour via le réseau maintenant ? [oui/non] +[oui] :""", response='non', name='maj_fin_instance') + +expectations.add_expectation(exp5) + diff --git a/eole_module_expectations/register_zephir_pexpectations.py b/eole_module_expectations/register_zephir_pexpectations.py new file mode 100644 index 0000000..c85eb2e --- /dev/null +++ b/eole_module_expectations/register_zephir_pexpectations.py @@ -0,0 +1,80 @@ +from ..pexpectation import Expectation, ExpectationCollection +from .eolebase_instance_pexpectations import expectations as eolebase_expectations + +expectations = ExpectationCollection() + +already_registered = Expectation("""1 -> Désinscrire ce serveur du serveur Zéphir +2 -> Relancer l'enregistrement +3 -> Ne rien faire + + Entrez le numéro de votre choix :""", response='3', name='already_registered') + +network_configuration = Expectation(""" Procédure d'enregistrement sur le serveur Zéphir + + +Voulez-vous établir une configuration réseau minimale (O/N) :""", response='N', name='network_configuration') + +interface_name = Expectation("""interface connectée sur l'extérieur""", response='ens0', name='interface_name') +network_address = Expectation("""adresse_ip {interface} :""", response='192.168.1.2', name='') +network_netmask = Expectation("""masque de réseau pour {interface} :""", response='255.255.255.0', name='network_netmask') +gateway = Expectation("""adresse de la passerelle :""", response='192.168.1.1', name='gateway') + + +"""Entrez l'adresse (nom DNS) du serveur Zéphir :""" +"""Entrez votre login pour l'application Zéphir (rien pour sortir) :""" +"""Mot de passe pour l'application Zéphir pour admin :""" + +"""créer le serveur dans la base du serveur Zéphir (O/N) :""" + +"""entrez le RNE de l'établissement correspondant au serveur +(rien pour saisir directement un n° de serveur) :""" + +"""entrez le n° identifiant le serveur l'application Zéphir :""" + +"""Mise à jour des informations sur le matériel +matériel (Standard PC (Q35 + ICH9, 2009) par défaut) :""" +"""processeur ( Intel Core Processor (Skylake, IBRS) 3191 MHz par défaut) :""" +"""disque dur (19 Go par défaut) :""" + +"""(une procédure d'enregistrement à déjà eu lieu pour ce serveur) +continuer l'enregistrement (O/N) ?""" + +"""1 -> Ne rien faire +2 -> Utiliser la configuration définie sur le serveur Zéphir +3 -> Non disponible +4 -> Modifier la variante du serveur + + Entrez le numéro de votre choix :""" + +exp1 = Expectation("""############################################################################# +# Initialisation du mot de passe de l'administrateur de base (admin_zephir) # +############################################################################# +Mot de passe :""", response='eole', name='admin_zephir_password') + +exp1_1 = Expectation("""Confirmation du mot de passe :""", response='eole', name='admin_zephir_password') +exp1.set_next_expectation(exp1_1) +expectations.add_expectation(exp1) + +exp2 = Expectation("""Vous pouvez maintenant créer des utilisateurs si vous le souhaitez +Attribuez leur des droits sur l'application via l'interface web + +nom d'utilisateur a créer (rien pour terminer) : """, response='admin', name='other_zephir_account') + +exp2_1 = Expectation("""Mot de passe du nouvel utilisateur : """, response='eole', name='other_zephir_account_password') +exp2_2 = Expectation("""Saisissez à nouveau le mot de passe : """, response='eole', name='other_zephir_account_password') +exp2_3 = Expectation("""nom d'utilisateur a créer (rien pour terminer) : """, response='', name='empty_zephir_account') + +exp2_2.set_next_expectation(exp2_3) +exp2_1.set_next_expectation(exp2_2) +exp2.set_next_expectation(exp2_1) +expectations.add_expectation(exp2) + +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.6.1""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.6.2""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.0""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.1""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.2""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.8.0""") +expectations.add_expectation_from_descr("""Start Systemd services""") + +expectations.merge(eolebase_expectations) diff --git a/eole_module_expectations/zephir_instance_pexpectations.py b/eole_module_expectations/zephir_instance_pexpectations.py new file mode 100644 index 0000000..e7197ec --- /dev/null +++ b/eole_module_expectations/zephir_instance_pexpectations.py @@ -0,0 +1,37 @@ +from ..pexpectation import Expectation, ExpectationCollection +from .eolebase_instance_pexpectations import expectations as eolebase_expectations + +expectations = ExpectationCollection() + +exp1 = Expectation("""############################################################################# +# Initialisation du mot de passe de l'administrateur de base (admin_zephir) # +############################################################################# +Mot de passe :""", response='eole', name='admin_zephir_password') + +exp1_1 = Expectation("""Confirmation du mot de passe :""", response='eole', name='admin_zephir_password') +exp1.set_next_expectation(exp1_1) +expectations.add_expectation(exp1) + +exp2 = Expectation("""Vous pouvez maintenant créer des utilisateurs si vous le souhaitez +Attribuez leur des droits sur l'application via l'interface web + +nom d'utilisateur a créer (rien pour terminer) : """, response='admin', name='other_zephir_account') + +exp2_1 = Expectation("""Mot de passe du nouvel utilisateur : """, response='eole', name='other_zephir_account_password') +exp2_2 = Expectation("""Saisissez à nouveau le mot de passe : """, response='eole', name='other_zephir_account_password') +exp2_3 = Expectation("""nom d'utilisateur a créer (rien pour terminer) : """, response='', name='empty_zephir_account') + +exp2_2.set_next_expectation(exp2_3) +exp2_1.set_next_expectation(exp2_2) +exp2.set_next_expectation(exp2_1) +expectations.add_expectation(exp2) + +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.6.1""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.6.2""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.0""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.1""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.2""") +expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.8.0""") +expectations.add_expectation_from_descr("""Start Systemd services""") + +expectations.merge(eolebase_expectations) diff --git a/pexpectation.py b/pexpectation.py new file mode 100644 index 0000000..af00fe6 --- /dev/null +++ b/pexpectation.py @@ -0,0 +1,99 @@ +import re +import pexpect +ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + +class ExpectationCollection: + def __init__(self): + self.expectations_lookup = {} + + def add_expectation(self, expectation): + if expectation.get_pattern() in self.expectations_lookup: + if expectation.context in [exp.context for exp in self.expectations_lookup[expectation.get_pattern()]]: + print(f'Can not add {expectation} to collection') + return False + self.expectations_lookup[expectation.get_pattern()] = self.expectations_lookup.setdefault(expectation.get_pattern(), []) + [expectation] + return True + + def add_expectation_from_descr(self, expectation, response=None, name=None): + expectation = Expectation(expectation, response=response, name=name) + self.add_expectation(expectation) + + def get_patterns(self): + return [pexpect.EOF, pexpect.TIMEOUT] + list(self.expectations_lookup.keys()) + + def get_expectations_by_pattern(self, pattern): + return self.expectations_lookup.get(pattern, []) + + def get_expectation_by_name(self, name): + return [expectation.name for expectation in self.get_expectations(recursive=True)] + + def get_expectations(self, recursive=False): + expectations = [exp for exps in self.expectations_lookup.values() for exp in exps] + if recursive: + expectations.extend([exp for exps in expectations for exp in exps.get_following_expectations()]) + pass + return expectations + + def count_expectations(self): + return sum([exp.count_expectations() for exp in self.get_expectations()]) + + def merge(self, collection): + for expectation in collection.get_expectations(): + self.add_expectation(expectation) + + +class Expectation: + def __init__(self, multiline_pattern, response=None, name=None): + self.name = name + self.context = [p.strip() for p in multiline_pattern.strip().split('\n') if p.strip()] + self.pattern = self.context[-1] + self.response = response + self.next = None + + def get_pattern(self): + return re.compile(f'(.*){re.escape(self.pattern)}(.*)') + + def set_next_expectation(self, expectation): + self.next = expectation + + def set_response(self, response): + self.response = response + + def expect(self, spawned): + print(f'-> expecting next "{self.pattern}"') + p = spawned.expect([pexpect.EOF, pexpect.TIMEOUT, self.get_pattern()]) + if p not in [0, 1]: + self.answer(spawned) + else: + print(f'-> before: {spawned.before}\n-> after: {spawned.after}\n-> {self.pattern}') + + def answer(self, spawned): + print(f'-> answering "{self.response}" to "{spawned.after}"') + if self.response is not None: + spawned.sendline(self.response) + if self.next: + self.next.expect(spawned) + + def is_the_one(self, context): + #print(f'testing if it is the one {self.get_pattern()} for {context}') + context = [l.strip() for l in context.strip().split('\r\n') if l.strip()] + if not context: + print('No context provided') + return False + context.reverse() + for index, c in enumerate(self.context[len(self.context)-2::-1]): + if c != ansi_escape.sub('', context[index]): + return False + return True + + def count_expectations(self): + count = 1 + if self.next: + count += self.next.count_expectations() + return count + + def get_following_expectations(self): + if self.next: + return [self.next] + self.next.get_following_expectations() + return [] +