commit df5acdc2efb3920ae6438e40f8467a6323be72be Author: Emmanuel Garette Date: Fri Nov 8 16:47:09 2019 +0100 first commit diff --git a/dag/backup_maj_auto.py b/dag/backup_maj_auto.py new file mode 100644 index 0000000..ca9af5e --- /dev/null +++ b/dag/backup_maj_auto.py @@ -0,0 +1,164 @@ +from datetime import datetime +from time import sleep +from subprocess import run, PIPE +import re +from airflow import DAG +from airflow.contrib.sensors.python_sensor import PythonSensor +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import PythonOperator +from airflow.operators.bash_operator import BashOperator +from airflow.exceptions import AirflowSkipException + +def print_hello(): + sleep(60) + return 'Hello world!' + +dag = DAG('backup_dag', description='My DAG backup', + schedule_interval='0 12 * * *', + start_date=datetime(2017, 3, 20), catchup=False) + +#dummy_task = DummyOperator(task_id='dummy_task', dag=dag) +# +#sensor_task = MyFirstSensor(task_id='my_sensor_task', poke_interval=30, dag=dag) +# +#hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag) +# +#operator_task = MyFirstOperator(my_operator_param='This is a test.', +# task_id='my_first_operator_task', dag=dag) +# +#dummy_task >> sensor_task >> hello_operator >> operator_task +BCONSOLE_CONF = '/etc/bareos/bconsole.conf' +FILE_LOG_BACKUP = '/var/log/rsyslog/local/bareos-dir/bareos-dir.info.log' +def get_queries(): + """ + Return dictionnary of queries (conventionnal name + and order of appearance in /etc/bareos/query.sql) + """ + query_header = re.compile(r'^:') + eole_query = re.compile(r'\(EOLE:(?P\w+)\)') + with open('/etc/bareos/query.sql', 'r') as query_file: + queries_list = [query for query in query_file.readlines() + if query_header.match(query)] + queries = {} + for i in range(len(queries_list)): + query = eole_query.search(queries_list[i]) + if query: + queries[query.group('query')] = i + 1 + return queries + + +def run_bareos_cmd(stdin): + cmd = ['bconsole', '-c', BCONSOLE_CONF] + return run(cmd, input=stdin.encode(), check=True, stdout=PIPE, stderr=PIPE) + + +def bareos_query(query, *args): + """ + Return bareos query command result + :param query: query id as specified in /etc/bareos/query.sql + :type query: str + :param args: list of parameters needed for query + :type args: list + """ + queries = get_queries() + try: + query_num = queries[query] + except KeyError: + raise Exception('La liste des fonctions disponibles est la suivante : \n{0}'.format('\n'.join(queries.keys()))) + results_list = [] + params = '\n'.join(args) + '\nquit' + ret = run_bareos_cmd('query\n{0}\n{1}\n'.format(query_num, params)) + res = ret.stdout.decode() + bareos_query_re = re.compile(r'\+(-+\+)+') + try: + resu = bareos_query_re.search(res) + if resu: + res = res.split(resu.group()) + else: + return [] + header = [title.strip() for title in res[1].split('|') if title != '\n'] + results = [[j.strip() for j in i.split('|') if j != ''] for i in res[2].split('\n') if i != ''] + for result in results: + result_dict = {} + for value, key in zip(result, header): + result_dict[key] = value + results_list.append(result_dict) + except AttributeError: + raise Exception('Résultat de la commande bconsole imprévu') + return results_list + + +def run_backup(level='Full', when='', bareos_fd=None): + """ + Lancement de la sauvegarde + :param level: type de la sauvegarde + :type level: str + :param when: heure de la sauvegarde + :type when: str + """ + if level not in ['Full', 'Incremental', 'Differential']: + raise Exception('Level {0} inconnu'.format(level)) + if when != '': + if re.match(r'[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}', when) == None: + raise Exception('Horaire mal renseigné') + else: + when = ' When=\"{0}\"'.format(when) + if bareos_fd is None: + bareos_fd = '' + #ret, stdout, stderr = run_bareos_cmd("run JobSchedule{1}Pre Level=Full{0}\n\n".format(when, bareos_fd)) + #if ret != 0: + # raise Exception('ERREUR : de sauvegarde {0} : {1}, {2}'.format(level, stdout, stderr)) + ret = run_bareos_cmd("run JobSauvegarde{2} Level={0}{1}\n\n".format(level, when, bareos_fd)) + if ret.returncode != 0: + raise Exception(u'ERREUR : de sauvegarde {0} : {1}, {2}'.format(level, ret.stdout, ret.stderr)) + #run catalog backup every time + #ret, stdout, stderr = run_bareos_cmd("run BackupCatalog level=Full{0}\n\n".format(when)) + #if ret != 0: + # raise Exception(u'ERREUR : de sauvegarde du catalogue {0} : {1}, {2}'.format(level, stdout, stderr)) + return "Sauvegarde {0} lancée\nVous pouvez suivre son évolution dans le fichier {1}".format(level, FILE_LOG_BACKUP) + + +def not_running_jobs(): + return bareos_query('running_jobs')[0]['NB'] == '0' + + +def update_available(): + query_auto = run(['Query-Auto'], check=True, stdout=PIPE) + return query_auto.stdout.decode().strip().split('\n')[-1] != 'Aucun paquet à installer.' +# +# kwargs['task_instance'].xcom_push('update_available', status) +# return True + + +def maj_auto(**kwargs): + status_update_available = kwargs['task_instance'].xcom_pull('sensor_update_available', key='return_value') + print(status_update_available, type(status_update_available)) + if not status_update_available: + raise AirflowSkipException() + run(['Maj-Auto'], check=True) + run(['reconfigure'], check=True) + return "Maj-Auto OK" + + +sensor_backup_not_running = PythonSensor(task_id='sensor_backup_not_running', python_callable=not_running_jobs, poke_interval=30, dag=dag) +pre_mysql = BashOperator( + task_id='pre_mysql', + bash_command='/usr/share/eole/schedule/scripts/mysql', + dag=dag, +) +pre_queryauto = BashOperator( + task_id='pre_queryauto', + bash_command='/usr/share/eole/schedule/scripts/queryauto', + dag=dag, +) + +launch_backup = PythonOperator(task_id='launch_backup', python_callable=run_backup, dag=dag) + +sensor_backup_ended = PythonSensor(task_id='sensor_backup_ended', python_callable=not_running_jobs, poke_interval=30, dag=dag) + +sensor_update_available = PythonOperator(task_id='sensor_update_available', python_callable=update_available, dag=dag) + +maj_auto = PythonOperator(task_id='maj_auto', python_callable=maj_auto, dag=dag, provide_context=True) +dummy_task = DummyOperator(task_id='end', dag=dag) + +sensor_backup_not_running >> [pre_mysql, pre_queryauto] >> launch_backup >> sensor_backup_ended >> sensor_update_available >> maj_auto >> dummy_task