eole-galaxy/cadoles/eole/plugins/modules/instance.py

399 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python
# Copyright: (c) 2018, Terry Jones <terry.jones@example.org>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = r'''
---
module: instance
short_description: Automate the instantiation of an EOLE module
# If this is part of a collection, you need to use semantic versioning,
# i.e. the version is of the form "2.5.0" and not "2.4".
version_added: "1.0.0"
description: Runs /usr/bin/instance and answers its interactive prompts, using built-in default answers that can be overridden through the variables option.
options:
module:
description: EOLE module type
required: true
type: str
variables:
description: Dictionary of variables overriding the default instance values.
required: false
type: dict
# Specify this value according to your collection
# in format of namespace.collection.doc_fragment_name
extends_documentation_fragment:
- cadoles.eole.instance
author:
- Cadoles
'''
EXAMPLES = r'''
# Instantiate an eolebase module, overriding the default passwords
- name: Instantiate an eolebase module
cadoles.eole.instance:
module: eolebase
variables:
root_password: eole
eole_password: eole
'''
RETURN = r'''
# Values returned by the module.
module:
    description: Name of the requested EOLE module; empty when that module is not supported.
    type: str
    returned: always
    sample: 'eolebase'
msg:
    description: Human-readable report of what the module did, or of why it failed.
    type: str
    returned: always
    sample: 'Module eolebase instanciated'
debug:
    description: Additional diagnostic output; empty when there is none.
    type: str
    returned: always
    sample: ''
'''
from ansible.module_utils.basic import AnsibleModule
import pexpect
import time
import sys
import re
# Matches ANSI terminal escape sequences: ESC followed either by a single
# character in the ranges @-Z / \-_ or by a CSI sequence ("ESC [" ... final
# byte). Used to strip such sequences from program output before comparing it.
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
def yml_params_to_unicode(param):
    """Recursively decode every byte string found in ``param`` to text.

    Lists and dictionaries (keys and values) are walked recursively; any
    other value is returned unchanged.

    :param param: a scalar, list or dict, typically coming from YAML input.
    :returns: the same structure with byte strings decoded as UTF-8.
    :raises UnicodeDecodeError: if a byte string is not valid UTF-8.
    """
    def convert_param(value):
        # ``bytes`` is an alias of ``str`` on Python 2, so this single check
        # decodes native strings there while leaving Python 3 text strings
        # (which have no ``decode`` method) untouched.
        if isinstance(value, bytes):
            return value.decode('utf-8')
        if isinstance(value, list):
            return [convert_param(item) for item in value]
        if isinstance(value, dict):
            return {
                convert_param(key): convert_param(item)
                for key, item in value.items()
            }
        return value

    return convert_param(param)
class ExpectationCollection:
    """Registry of top-level expectations, indexed by their prompt pattern.

    Several expectations may share the same pattern as long as their
    ``context`` (the lines expected before the prompt) differs.
    """

    def __init__(self):
        # Maps the value returned by ``expectation.get_pattern()`` to the list
        # of expectations registered under that pattern.
        self.expectations_lookup = {}

    def add_expectation(self, expectation):
        """Register ``expectation``.

        :returns: True when added, False when an expectation with the same
            pattern and the same context is already registered.
        """
        pattern = expectation.get_pattern()
        registered = self.expectations_lookup.get(pattern, [])
        if any(expectation.context == known.context for known in registered):
            # Diagnostics go to stderr: an Ansible module must keep stdout
            # for its JSON result.
            print('Can not add {} to collection'.format(expectation), file=sys.stderr)
            return False
        self.expectations_lookup.setdefault(pattern, []).append(expectation)
        return True

    def add_expectation_from_descr(self, expectation, response=None, name=None):
        """Build an Expectation from its textual description and register it.

        :returns: the result of :meth:`add_expectation`.
        """
        expectation = Expectation(expectation, response=response, name=name)
        return self.add_expectation(expectation)

    def get_patterns(self):
        """Return the list to hand to ``expect()``: EOF, TIMEOUT, then every pattern."""
        return [pexpect.EOF, pexpect.TIMEOUT] + list(self.expectations_lookup.keys())

    def get_expectations_by_pattern(self, pattern):
        """Return the expectations registered under ``pattern`` (empty list if none)."""
        return self.expectations_lookup.get(pattern, [])

    def get_expectations_by_name(self, name):
        """Return every expectation, chained ones included, whose name is ``name``."""
        return [
            expectation
            for expectation in self.get_expectations(recursive=True)
            if expectation.name == name
        ]

    def get_exposed_expectation_names(self):
        """Return the names of all named expectations, chained ones included.

        A name shared by several expectations appears once per expectation.
        """
        return [
            expectation.name
            for expectation in self.get_expectations(recursive=True)
            if expectation.name
        ]

    def set_expectation_response_by_name(self, name, response):
        """Set ``response`` on every expectation named ``name``."""
        for expectation in self.get_expectations_by_name(name):
            expectation.set_response(response)

    def get_expectations(self, recursive=False):
        """Return the registered expectations without duplicates.

        :param recursive: also include the expectations chained after the
            registered ones.
        :returns: a list following the lookup's iteration order, top-level
            expectations first.
        """
        found = [exp for group in self.expectations_lookup.values() for exp in group]
        if recursive:
            found.extend([
                follower
                for exp in list(found)
                for follower in exp.get_following_expectations()
            ])
        # Deduplicate while keeping first-seen order so results are stable.
        seen = set()
        unique = []
        for exp in found:
            if exp not in seen:
                seen.add(exp)
                unique.append(exp)
        return unique

    def count_expectations(self):
        """Return the number of distinct expectations, chained ones included."""
        return len(self.get_expectations(recursive=True))

    def merge(self, collection):
        """Register every top-level expectation of ``collection`` in this one."""
        for expectation in collection.get_expectations():
            self.add_expectation(expectation)
class Expectation:
    """One prompt expected from the interactive program, with its answer.

    An expectation is described by a multi-line text: the last non-empty line
    is the prompt to wait for (``pattern``), the lines before it are the
    output that must immediately precede that prompt (used by
    :meth:`is_the_one` to tell apart prompts sharing the same text).
    Expectations can be chained with :meth:`set_next_expectation`.
    """

    def __init__(self, multiline_pattern, response=None, name=None):
        """
        :param multiline_pattern: description text, one expected line per line.
        :param response: text sent when the prompt shows up; None sends nothing.
        :param name: optional name under which the response can be overridden.
        """
        self.name = name
        # Stripped, non-empty lines of the description; the last is the prompt.
        self.context = [
            line.strip()
            for line in multiline_pattern.strip().split('\n')
            if line.strip()
        ]
        self.pattern = self.context[-1]
        self.response = response
        # Maps a trigger (a response value, or None for "any") to the
        # expectation to wait for after this one has been answered.
        self.next = {}

    def __repr__(self):
        return '{}(name={!r}, pattern={!r})'.format(
            type(self).__name__, self.name, self.pattern)

    def get_pattern(self, previous_answer=None):
        """Return the compiled regex matching a line that contains the prompt.

        ``{variable}`` in the prompt text is replaced by ``previous_answer``
        before the text is escaped.
        """
        prompt = self.pattern.format(variable=previous_answer)
        return re.compile(r'(.*){}(.*)'.format(re.escape(prompt)))

    def set_next_expectation(self, expectation, triggers=None):
        """Chain ``expectation`` after this one.

        :param triggers: a response value, or a list of them, selecting the
            chained expectation; None (the default) matches any response that
            has no dedicated entry.
        """
        if not isinstance(triggers, list):
            triggers = [triggers]
        for trigger in triggers:
            self.next[trigger] = expectation

    def set_response(self, response):
        """Replace the answer sent for this prompt."""
        # Diagnostics go to stderr: an Ansible module must keep stdout for
        # its JSON result.
        print('Setting {} for {}'.format(response, self.pattern), file=sys.stderr)
        self.response = response

    def expect(self, spawned, previous_answer=None):
        """Wait for this prompt on ``spawned`` and answer it when it shows up.

        On EOF or timeout nothing is sent and the surrounding output is
        reported on stderr.
        """
        print('-> expecting next "{}"'.format(self.pattern.format(variable=previous_answer)),
              file=sys.stderr)
        p = spawned.expect([pexpect.EOF, pexpect.TIMEOUT,
                            self.get_pattern(previous_answer=previous_answer)])
        if p not in [0, 1]:
            self.answer(spawned)
        else:
            print('-> before: {}\n-> after: {}\n-> {}'.format(
                spawned.before, spawned.after, self.pattern), file=sys.stderr)

    def answer(self, spawned):
        """Send the response (if any) and wait for the chained expectation."""
        print('-> answering "{}" to "{}"'.format(self.response, spawned.after),
              file=sys.stderr)
        if self.response is not None:
            spawned.sendline(self.response)
        # A follower registered for this exact response wins over the
        # catch-all one registered under None.
        if self.response in self.next:
            self.next[self.response].expect(spawned, previous_answer=self.response)
        elif None in self.next:
            self.next[None].expect(spawned, previous_answer=self.response)

    def is_the_one(self, context):
        """Tell whether ``context`` ends with the lines expected before the prompt.

        :param context: program output received before the prompt, with
            ``\\r\\n`` line endings.
        :returns: True when the last non-empty lines of ``context`` (ANSI
            escape sequences removed) equal the description lines preceding
            the prompt; an expectation with no preceding lines matches any
            non-empty context. False for an empty context or one shorter than
            the expected lines.
        """
        lines = [line.strip() for line in context.strip().split('\r\n') if line.strip()]
        if not lines:
            print('No context provided', file=sys.stderr)
            return False
        # Every description line except the prompt itself.
        required = self.context[:-1]
        if len(required) > len(lines):
            return False
        lines.reverse()
        # Compare from the line closest to the prompt going backwards.
        for index, expected in enumerate(reversed(required)):
            if sys.version_info < (3,):
                expected = expected.decode('utf-8')
            if expected != ansi_escape.sub('', lines[index]):
                return False
        return True

    def get_following_expectations(self):
        """Return every expectation chained after this one, depth first."""
        following = []
        visited = set()
        for next_expectation in self.next.values():
            # The same follower may be registered under several triggers.
            if next_expectation in visited:
                continue
            visited.add(next_expectation)
            following.append(next_expectation)
            following.extend(next_expectation.get_following_expectations())
        return following
# Collection holding the expectations of every module; it is filled further
# down by merging each per-module collection into it.
expectations = ExpectationCollection()
# Maps an EOLE module name to its collection; run_module() uses the keys to
# decide whether the requested module is supported.
supported_modules = {}
# --- eolebase -----------------------------------------------------------
# NOTE(review): banner lines are compared for equality with the program
# output (see Expectation.is_the_one), so they must match it exactly;
# "lutilisateur" looks like it may have lost an apostrophe - confirm against
# real output.
eolebase_expectations = ExpectationCollection()
# root password prompt, followed by its confirmation prompt. Both share the
# name 'root_password' so a single variable overrides the two answers.
exp3 = Expectation("""######################################################
# Changement du mot de passe pour lutilisateur root #
######################################################
Nouveau mot de passe:""", response='eole', name='root_password')
exp3_1 = Expectation("""Confirmation du mot de passe:""", response='eole', name='root_password')
exp3.set_next_expectation(exp3_1)
eolebase_expectations.add_expectation(exp3)
# eole user password prompt, its confirmation, then the "create another
# administrator" question, which is answered with an empty line.
exp4 = Expectation("""######################################################
# Changement du mot de passe pour lutilisateur eole #
######################################################
Nouveau mot de passe:""", response='eole', name='eole_password')
exp4_1 = Expectation("""Confirmation du mot de passe:""", response='eole', name='eole_password')
exp4_2 = Expectation("""Créer un nouvel administrateur eole2 ? [oui/non]
[non] :""", response='')
exp4.set_next_expectation(exp4_1)
exp4_1.set_next_expectation(exp4_2)
eolebase_expectations.add_expectation(exp4)
# Update proposal at the end of the run; declined ('non') by default and
# overridable through the 'maj_fin_instance' variable.
exp5 = Expectation("""Une mise à jour est recommandée
Voulez-vous effectuer une mise à jour via le réseau maintenant ? [oui/non]
[oui] :""", response='non', name='maj_fin_instance')
eolebase_expectations.add_expectation(exp5)
supported_modules['eolebase'] = eolebase_expectations
# --- zephir -------------------------------------------------------------
zephir_expectations = ExpectationCollection()
# admin_zephir password prompt, followed by its confirmation prompt. Both
# share the name 'admin_zephir_password'.
exp1 = Expectation("""#############################################################################
# Initialisation du mot de passe de l'administrateur de base (admin_zephir) #
#############################################################################
Mot de passe :""", response='eole', name='admin_zephir_password')
exp1_1 = Expectation("""Confirmation du mot de passe :""", response='eole', name='admin_zephir_password')
exp1.set_next_expectation(exp1_1)
zephir_expectations.add_expectation(exp1)
# Additional user creation: user name, password, password confirmation, then
# the user name prompt again, answered with an empty line.
exp2 = Expectation("""Vous pouvez maintenant créer des utilisateurs si vous le souhaitez
Attribuez leur des droits sur l'application via l'interface web
nom d'utilisateur a créer (rien pour terminer) : """, response='admin', name='other_zephir_account')
exp2_1 = Expectation("""Mot de passe du nouvel utilisateur : """, response='eole', name='other_zephir_account_password')
exp2_2 = Expectation("""Saisissez à nouveau le mot de passe : """, response='eole', name='other_zephir_account_password')
exp2_3 = Expectation("""nom d'utilisateur a créer (rien pour terminer) : """, response='', name='empty_zephir_account')
# The chain is wired from its end back to its head.
exp2_2.set_next_expectation(exp2_3)
exp2_1.set_next_expectation(exp2_2)
exp2.set_next_expectation(exp2_1)
zephir_expectations.add_expectation(exp2)
# Output lines registered without a response or a name: nothing is sent when
# they show up (Expectation.answer only sends a non-None response).
zephir_expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.6.1""")
zephir_expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.6.2""")
zephir_expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.0""")
zephir_expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.1""")
zephir_expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.7.2""")
zephir_expectations.add_expectation_from_descr("""* Vérification des données (md5) : Eole 2.8.0""")
zephir_expectations.add_expectation_from_descr("""Start Systemd services""")
supported_modules['zephir'] = zephir_expectations
# Command spawned by run_module() to instantiate the EOLE module.
cmd = ['/usr/bin/instance']

# amon, scribe and seth have no expectation yet: their collections are empty
# and they are not registered in supported_modules.
amon_expectations = ExpectationCollection()
scribe_expectations = ExpectationCollection()
seth_expectations = ExpectationCollection()

# Every per-module collection, in the order it is merged into the global one.
module_expectations = [
    eolebase_expectations,
    amon_expectations,
    zephir_expectations,
    scribe_expectations,
    seth_expectations,
]
for module_expectation in module_expectations:
    expectations.merge(module_expectation)

# Computed once, after the merge: the pattern list handed to expect() and the
# total number of expectations (chained ones included) bounding the main loop.
patterns = expectations.get_patterns()
nb_expectations = expectations.count_expectations()
def _unknown_variables(variables):
    """Return, sorted, the names in ``variables`` matching no named expectation."""
    known = set(expectations.get_exposed_expectation_names())
    return sorted(set(variables).difference(known))


def run_module():
    """Run the Ansible module: validate parameters, then drive the instance command.

    Parameters (read through AnsibleModule):
      * ``module``: name of the EOLE module; must be a key of
        ``supported_modules``.
      * ``variables``: optional dict mapping expectation names to the
        response to send instead of the default one.

    In check mode nothing is executed; the result only reports whether the
    module is supported and which variables would be overridden or are
    unknown. Otherwise the command in ``cmd`` is spawned and its prompts are
    answered until end of output, a timeout, or ``nb_expectations`` matched
    prompts.

    The function always ends through ``exit_json`` or ``fail_json``.
    """
    # Arguments a user can pass to the module.
    module_args = dict(
        module=dict(type='str', required=True),
        variables=dict(type='dict', required=False),
    )
    # Result skeleton returned to Ansible.
    result = dict(
        changed=False,
        module='',
        msg='',
        debug='',
    )
    module = AnsibleModule(
        argument_spec=module_args,
        supports_check_mode=True
    )

    eole_module = module.params['module']
    # An omitted optional parameter is present in params with value None.
    variables = module.params.get('variables') or {}

    if module.check_mode:
        # Report only; nothing is spawned and no response is changed.
        if eole_module in supported_modules:
            result['module'] = eole_module
            unknown_variables = _unknown_variables(variables)
            if unknown_variables:
                result['msg'] += "Variables {} not available\n".format(unknown_variables)
            else:
                for variable in variables:
                    result['msg'] += "Overloading variable {}\n".format(variable)
        else:
            result['msg'] += 'Module {} not supported\n'.format(eole_module)
        module.exit_json(**result)

    if eole_module not in supported_modules:
        result['msg'] += "Unsupported module {}\n".format(eole_module)
        module.fail_json(**result)
    result['module'] = eole_module

    unknown_variables = _unknown_variables(variables)
    if unknown_variables:
        result['msg'] += "Variables {} not available\n".format(unknown_variables)
        module.fail_json(**result)
    for expectation_name, response in variables.items():
        expectations.set_expectation_response_by_name(expectation_name, response)

    try:
        # spawn() takes the program and its arguments separately.
        instance_process = pexpect.spawn(cmd[0], cmd[1:], encoding='utf-8', timeout=60)
        matched = 0
        while matched < nb_expectations:
            p = instance_process.expect(patterns)
            if p == 0:
                # End of output.
                break
            if p == 1:
                # Timeout: keep the surrounding output in the result rather
                # than printing it, stdout being reserved for the JSON result.
                # NOTE(review): the run is still reported as successful after
                # a timeout, as before - confirm this is intended.
                result['debug'] += 'Some missing expectations for {}{}'.format(
                    instance_process.before, instance_process.after)
                break
            # Several expectations may share a pattern; the output preceding
            # the prompt selects the one to answer.
            for expectation in expectations.get_expectations_by_pattern(patterns[p]):
                if expectation.is_the_one(instance_process.before):
                    expectation.answer(instance_process)
                    break
            matched += 1
        result['changed'] = True
        result['msg'] += "Module {} instanciated".format(result['module'])
        module.exit_json(**result)
    except Exception as err:
        # The command may already have modified the system when it fails.
        result['msg'] += str(err)
        result['changed'] = True
        module.fail_json(**result)
def main():
    """Entry point: delegate to run_module()."""
    run_module()


# Only run when the file is executed directly, not when it is imported.
if __name__ == '__main__':
    main()