Utilisation du module maison pexpectation

This commit is contained in:
Benjamin Bohard 2021-09-02 10:31:37 +02:00
parent b553f77ead
commit d3ebf05027
1 changed files with 265 additions and 61 deletions

View File

@ -17,26 +17,10 @@ version_added: "1.0.0"
description: This is my longer description explaining my test module.
options:
zephir_user:
description: zephir user name.
required: true
type: str
zephir_user_password:
description: zephir user password.
required: true
type: str
zephir_address:
description: address used to contact zephir server.
required: required
type: str
server_id:
description: id in zephir database the server will be registered with.
required: true
type: int
action:
description: action to perform regarding server configuration state.
required: true
type: str
variables:
description: dictionnary of variables used to answer interactive questions
required: false
type: dict
# Specify this value according to your collection
# in format of namespace.collection.doc_fragment_name
extends_documentation_fragment:
@ -50,6 +34,7 @@ EXAMPLES = r'''
# Pass in a message
- name: Test with a message
cadoles.eole.zephir_register:
variables:
zephir_user: admin
zephir_user_password: eole
zephir_address: zephir.infra.lan
@ -87,15 +72,224 @@ variant_id:
from ansible.module_utils.basic import AnsibleModule
import re
import pexpect
import subprocess
import os
import sys
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
class ExpectationCollection:
def __init__(self):
self.expectations_lookup = {}
def add_expectation(self, expectation):
if expectation.get_pattern() in self.expectations_lookup:
if expectation.context in [exp.context for exp in self.expectations_lookup[expectation.get_pattern()]]:
print(f'Can not add {expectation} to collection')
return False
self.expectations_lookup[expectation.get_pattern()] = self.expectations_lookup.setdefault(expectation.get_pattern(), []) + [expectation]
return True
def add_expectation_from_descr(self, expectation, response=None, name=None):
expectation = Expectation(expectation, response=response, name=name)
self.add_expectation(expectation)
def get_patterns(self):
return [pexpect.EOF, pexpect.TIMEOUT] + list(self.expectations_lookup.keys())
def get_expectations_by_pattern(self, pattern):
return self.expectations_lookup.get(pattern, [])
def get_expectations_by_name(self, name):
return [expectation for expectation in self.get_expectations(recursive=True) if expectation.name == name]
def get_exposed_expectation_names(self):
return [expectation.name for expectation in self.get_expectations(recursive=True) if expectation.name]
def set_expectation_response_by_name(self, name, response):
for expectation in self.get_expectations_by_name(name):
expectation.set_response(response)
def get_expectations(self, recursive=False):
expectations = [exp for exps in self.expectations_lookup.values() for exp in exps]
if recursive:
expectations.extend([exp for exps in expectations for exp in exps.get_following_expectations()])
return list(set(expectations))
def count_expectations(self):
return len(self.get_expectations(recursive=True))
def merge(self, collection):
for expectation in collection.get_expectations():
self.add_expectation(expectation)
class Expectation:
def __init__(self, multiline_pattern, response=None, name=None):
self.name = name
self.context = [p.strip() for p in multiline_pattern.strip().split('\n') if p.strip()]
self.pattern = self.context[-1]
self.response = response
self.next = {}
def get_pattern(self, previous_answer=None):
return re.compile(f'(.*){re.escape(self.pattern.format(variable=previous_answer))}(.*)')
def set_next_expectation(self, expectation, triggers=None):
if not isinstance(triggers, list):
triggers = [triggers]
for trigger in triggers:
self.next[trigger] = expectation
def set_response(self, response):
print(f'Setting {response} for {self.pattern}')
self.response = response
def expect(self, spawned, previous_answer=None):
print(f'-> expecting next "{self.pattern.format(variable=previous_answer)}"')
p = spawned.expect([pexpect.EOF, pexpect.TIMEOUT, self.get_pattern(previous_answer=previous_answer)])
if p not in [0, 1]:
self.answer(spawned)
else:
print(f'-> before: {spawned.before}\n-> after: {spawned.after}\n-> {self.pattern}')
def answer(self, spawned):
print(f'-> answering "{self.response}" to "{spawned.after}"')
if self.response is not None:
spawned.sendline(self.response)
if self.response in self.next:
self.next[self.response].expect(spawned, previous_answer=self.response)
elif None in self.next:
self.next[None].expect(spawned, previous_answer=self.response)
def is_the_one(self, context):
#print(f'testing if it is the one {self.get_pattern()} for {context}')
context = [l.strip() for l in context.strip().split('\r\n') if l.strip()]
if not context:
print('No context provided')
return False
context.reverse()
for index, c in enumerate(self.context[len(self.context)-2::-1]):
if c != ansi_escape.sub('', context[index]):
return False
return True
def get_following_expectations(self):
following = []
for next_expectation in set(self.next.values()):
following.append(next_expectation)
following.extend(next_expectation.get_following_expectations())
return following
def scan_hardware():
# recherche d'informations par défaut
# type de machine (si dispo)
data_dict = {}
process = subprocess.run(['/usr/bin/lshw', '-C', 'system', '-short'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result = process.returncode
hw_info = process.stdout.decode('utf-8')
hw_err = process.stderr.decode('utf-8')
# matériel (première ligne du premier élément de classe 'system')
sys_name_lines = hw_info.split('system')[1]
sys_name = sys_name_lines.split('\n')[0].strip()
if 'materiel' not in data_dict:
if sys_name.upper() != 'COMPUTER':
data_dict['materiel'] = sys_name
# modèle cpu
if 'processeur' not in data_dict:
with open('/proc/cpuinfo', 'r') as cpuinfo:
for line in cpuinfo.read().split('\n'):
if line.startswith('model name'):
processeur = line.split(':')[1]
processeur.strip()
data_dict['processeur'] = processeur
if line.startswith('cpu MHz'):
speed = line.split(':')[1]
speed.strip()
data_dict['processeur'] += " %d MHz" % int(float(speed))
# disque dur (pas de /proc/partitions sur serveur virtualisé)
if 'disque_dur' not in data_dict and os.path.isfile('/proc/partitions'):
blocks = float(0)
with open('/proc/partitions', 'r') as part_info:
for line in part_info.read().split('\n')[2:]:
data = line.split()
if data != []:
blocks += float(data[2])
# taille en Mo = (blocks * 512) / (1024 * 1024 * 1024)
size = int(blocks / (2 * 1024 * 1024))
data_dict['disque_dur'] = "%s Go" % size
return data_dict
hardware_scan = scan_hardware()
expectations = ExpectationCollection()
already_registered = Expectation("""1 -> Désinscrire ce serveur du serveur Zéphir
2 -> Relancer l'enregistrement
3 -> Ne rien faire
Entrez le numéro de votre choix :""", response='3', name='already_registered')
network_configuration = Expectation(""" Procédure d'enregistrement sur le serveur Zéphir
Voulez-vous établir une configuration réseau minimale (O/N) :""", response='N', name='network_configuration')
interface_name = Expectation("""interface connectée sur l'extérieur""", response='ens0', name='interface_name')
network_address = Expectation("""adresse_ip {variable} :""", response='192.168.1.2', name='')
network_netmask = Expectation("""masque de réseau pour {variable} :""", response='255.255.255.0', name='network_netmask')
gateway = Expectation("""adresse de la passerelle :""", response='192.168.1.1', name='gateway')
zephir_address = Expectation("""Entrez l'adresse (nom DNS) du serveur Zéphir :""", response='zephir', name='zephir_address')
zephir_admin = Expectation("""Entrez votre login pour l'application Zéphir (rien pour sortir) :""", response='admin_zephir', name='zephir_admin')
zephir_admin_password = Expectation("""Mot de passe pour l'application Zéphir pour {variable} :""", response='eole', name='zephir_admin_password')
new_server = Expectation("""créer le serveur dans la base du serveur Zéphir (O/N) :""", response='N', name='new_server')
rne = Expectation("""entrez le RNE de l'établissement correspondant au serveur,
(rien pour saisir directement un n° de serveur) :""", response='', name='rne')
server_id = Expectation("""entrez le n° identifiant le serveur l'application Zéphir :""", response='1', name='server_id')
hardware = Expectation(f"""Mise à jour des informations sur le matériel
matériel ({hardware_scan['materiel']} par défaut) :""", response='', name='hardware')
processor = Expectation(f"""processeur ({hardware_scan['processeur']} par défaut) :""", response='', name='processor')
harddrive = Expectation(f"""disque dur ({hardware_scan['disque_dur']} par défaut) :""", response='', name='harddrive')
key_available = Expectation("""(une procédure d'enregistrement à déjà eu lieu pour ce serveur)
continuer l'enregistrement (O/N) ?""", response='O', name='key_available')
final = Expectation("""1 -> Ne rien faire
2 -> Utiliser la configuration définie sur le serveur Zéphir
3 -> Non disponible
4 -> Modifier la variante du serveur
Entrez le numéro de votre choix :""", response='2', name='action')
expectations.add_expectation(already_registered)
expectations.add_expectation(network_configuration)
#expectations.add_expectation(new_server)
expectations.add_expectation(hardware)
expectations.add_expectation(key_available)
expectations.add_expectation(final)
network_configuration.set_next_expectation(interface_name, triggers=['O', 'Oui', 'OUI'])
interface_name.set_next_expectation(network_address)
network_address.set_next_expectation(network_netmask)
network_netmask.set_next_expectation(gateway)
network_configuration.set_next_expectation(zephir_address, triggers=['N', 'Non', 'NON'])
new_server.set_next_expectation(rne, triggers=['N', 'Non', 'NON'])
rne.set_next_expectation(server_id, triggers='')
zephir_address.set_next_expectation(zephir_admin)
zephir_admin.set_next_expectation(zephir_admin_password)
zephir_admin_password.set_next_expectation(new_server)
hardware.set_next_expectation(processor)
processor.set_next_expectation(harddrive)
patterns = expectations.get_patterns()
nb_expectations = expectations.count_expectations()
def run_module():
# define available arguments/parameters a user can pass to the module
module_args = dict(
zephir_user=dict(type='str', required=True),
zephir_user_password=dict(type='str', required=True),
zephir_address=dict(type='str', required=True),
server_id=dict(type='int', required=True),
action=dict(type='str', required=True),
variables=dict(type='dict', required=False),
)
# seed the result dict in the object
@ -105,6 +299,7 @@ def run_module():
# for consumption, for example, in a subsequent task
result = dict(
changed=False,
msg='',
)
# the AnsibleModule object will be our abstraction working with Ansible
@ -119,54 +314,62 @@ def run_module():
if module.check_mode:
module.exit_json(**result)
ACTIONS = {'Ne rien faire': '1',
'Utiliser la configuration définie sur le serveur Zéphir': '2',
'Sauver la configuration actuelle sur le serveur Zéphir': '3',
'Modifier la variante de serveur': '4'}
ACTIONS = {'none': '1',
'download': '2',
'upload': '3',
'modify': '4'}
# if the user is working with this module in only check mode we do not
# want to make any changes to the environment, just return the current
# state with no modifications
try:
from zephir.zephir_conf.zephir_conf import id_serveur
module.exit_json(**result)
except:
pass
use_pexpect = True
if module.params.get('variables', None):
if not set(module.params['variables'].keys()).issubset(set(expectations.get_exposed_expectation_names())):
unknown_variables = list(set(module.params['variables'].keys()).difference(set(expectations.get_exposed_expectation_names())))
result['msg'] += f"Variables {unknown_variables} not available\n"
module.fail_json(**result)
else:
if 'action' in module.params['variables']:
module.params['variables']['action'] = ACTIONS[module.params['variables']['action']]
for expectation_name, response in module.params['variables'].items():
expectations.set_expectation_response_by_name(expectation_name, response)
try:
import importlib.machinery
import importlib.util
# Import mymodule
loader = importlib.machinery.SourceFileLoader( 'enregistrement_zephir', '/usr/bin/enregistrement_zephir' )
spec = importlib.util.spec_from_loader( 'enregistrement_zephir', loader )
enregistrement_zephir = importlib.util.module_from_spec( spec )
loader.exec_module( enregistrement_zephir )
if hasattr(enregistrement_zephir, 'argparse'):
use_pexpect = False
from subprocess import run
except:
import pexpect
pass
if use_pexpect:
responses = {"(.*)Voulez-vous établir une configuration réseau minimale(.*)": "N",
"(.*)Entrez l'adresse(.*)": module.params['zephir_address'],
"(.*)Entrez votre login pour l'application Zéphir(.*)": module.params['zephir_user'],
"(.*)Mot de passe pour l'application Zéphir pour(.*)": module.params['zephir_user_password'],
"(.*)créer le serveur dans la base du serveur Zéphir(.*)": "N",
"(.*)rien pour saisir directement un n° de serveur(.*)": "",
"(.*)entrez le n° identifiant le serveur l'application Zéphir(.*)": module.params['server_id'],
"(.*)matériel(.*)": "",
"(.*)processeur(.*)": "",
"(.*)disque dur(.*)": "",
"(.*)continuer(.*)": "O",
"(.*)Entrez le numéro de votre choix(.*)": ACTIONS.get(module.params['action'], '2'),
}
registering_process = pexpect.spawn('/usr/bin/enregistrement_zephir')
for key, value in responses.items():
registering_process.expect(key)
registering_process.sendline(value)
try:
instance_process = pexpect.spawn('/usr/bin/enregistrement_zephir', encoding='utf-8', timeout=60, logfile=sys.stdout)
some_index = 0
while some_index < nb_expectations:
p = instance_process.expect(patterns)
if p == 0:
break
if p == 1:
print(f'Some missing expectations for {instance_process.before}{instance_process.after}')
break
pattern = patterns[p]
for expectation in expectations.get_expectations_by_pattern(patterns[p]):
if expectation.is_the_one(instance_process.before):
expectation.answer(instance_process)
break
some_index += 1
result['changed'] = True
module.exit_json(**result)
except Exception as err:
result['changed'] = True
module.fail_json(**result)
else:
run(['/usr/bin/enregistrement_zephir', '--adresse_zephir', module.params['zephir_address'], '--user', module.params['zephir_user'], '--password', module.params['zephir_user_password'], '--id_serveur', module.params['server_id'], '--choix', ACTIONS.get(module.params['action'], '2')], capture_output=True, check=True)
result = run(['/usr/bin/enregistrement_zephir', '--adresse_zephir', module.params['zephir_address'], '--user', module.params['zephir_user'], '--password', module.params['zephir_user_password'], '--id_serveur', module.params['server_id'], '--choix', ACTIONS.get(module.params['action'], '2')], capture_output=True, check=True)
# manipulate or modify the state as needed (this is going to be the
# part where your module will do what it needs to do)
@ -177,8 +380,9 @@ def run_module():
try:
from zephir.zephir_conf.zephir_conf import id_serveur
result['changed'] = True
except:
module.fail_json(msg='Server not registered', **result)
except Exception as err:
result['msg'] += f'Server not registered {err}'
module.fail_json(**result)
# during the execution of the module, if there is an exception or a
# conditional state that effectively causes a failure, run