"""Airflow DAG driving the daily Bareos backup and post-backup update."""
from datetime import datetime
from subprocess import run, PIPE
from time import sleep
import re

from airflow import DAG
from airflow.contrib.sensors.python_sensor import PythonSensor
from airflow.exceptions import AirflowSkipException
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# bconsole configuration passed to every Bareos console invocation.
BCONSOLE_CONF = '/etc/bareos/bconsole.conf'
# Director log file where backup progress can be followed.
FILE_LOG_BACKUP = '/var/log/rsyslog/local/bareos-dir/bareos-dir.info.log'


def print_hello():
    """Demo callable kept from the tutorial: wait a minute, return a greeting."""
    sleep(60)
    return 'Hello world!'


dag = DAG('backup_dag',
          description='My DAG backup',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20),
          catchup=False)


def get_queries():
    """Return a dict mapping each EOLE query name to its 1-based position
    in /etc/bareos/query.sql (queries are the lines starting with ':').
    """
    query_header = re.compile(r'^:')
    # BUGFIX: the named group was written '(?P\w+)', which is a regex
    # syntax error (re.error at compile time); group('query') is read below,
    # so the group must be named 'query'.
    eole_query = re.compile(r'\(EOLE:(?P<query>\w+)\)')
    with open('/etc/bareos/query.sql', 'r') as query_file:
        queries_list = [line for line in query_file.readlines()
                        if query_header.match(line)]
    queries = {}
    # Bareos numbers queries from 1, in order of appearance.
    for position, line in enumerate(queries_list, start=1):
        match = eole_query.search(line)
        if match:
            queries[match.group('query')] = position
    return queries


def run_bareos_cmd(stdin):
    """Feed *stdin* (str) to bconsole and return the CompletedProcess.

    Raises CalledProcessError on a non-zero exit (check=True).
    """
    cmd = ['bconsole', '-c', BCONSOLE_CONF]
    return run(cmd, input=stdin.encode(), check=True, stdout=PIPE, stderr=PIPE)


def bareos_query(query, *args):
    """Run a named Bareos query and parse its table output.

    :param query: query id as specified in /etc/bareos/query.sql
    :type query: str
    :param args: parameters required by the query, one per line
    :type args: list
    :return: list of dicts, one per result row, keyed by column header;
             empty list when bconsole printed no table.
    """
    queries = get_queries()
    try:
        query_num = queries[query]
    except KeyError:
        raise Exception('La liste des fonctions disponibles est la suivante : \n{0}'.format('\n'.join(queries.keys())))
    results_list = []
    params = '\n'.join(args) + '\nquit'
    ret = run_bareos_cmd('query\n{0}\n{1}\n'.format(query_num, params))
    res = ret.stdout.decode()
    # Matches the '+----+----+' separator lines framing the ASCII table.
    bareos_query_re = re.compile(r'\+(-+\+)+')
    try:
        resu = bareos_query_re.search(res)
        if resu:
            # Split on the separator: [1] is the header row, [2] the data rows.
            res = res.split(resu.group())
        else:
            return []
        header = [title.strip() for title in res[1].split('|') if title != '\n']
        results = [[cell.strip() for cell in row.split('|') if cell != '']
                   for row in res[2].split('\n') if row != '']
        for result in results:
            # zip pairs each cell with its column header.
            result_dict = {}
            for value, key in zip(result, header):
                result_dict[key] = value
            results_list.append(result_dict)
    except AttributeError:
        raise Exception('Résultat de la commande bconsole imprévu')
    return results_list


def run_backup(level='Full', when='', bareos_fd=None):
    """Launch a Bareos backup job.

    :param level: backup level ('Full', 'Incremental' or 'Differential')
    :type level: str
    :param when: optional start time, format 'YYYY-MM-DD HH:MM:SS'
    :type when: str
    :param bareos_fd: optional suffix appended to the job name 'JobSauvegarde'
    :raises Exception: on unknown level, malformed *when*, or failed launch
    """
    if level not in ['Full', 'Incremental', 'Differential']:
        raise Exception('Level {0} inconnu'.format(level))
    if when != '':
        # `is None` instead of `== None` (PEP 8); same behavior for re.match.
        if re.match(r'[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}', when) is None:
            raise Exception('Horaire mal renseigné')
        else:
            when = ' When=\"{0}\"'.format(when)
    if bareos_fd is None:
        bareos_fd = ''
    ret = run_bareos_cmd("run JobSauvegarde{2} Level={0}{1}\n\n".format(level, when, bareos_fd))
    # check=True already raises on failure; kept as a defensive second check.
    if ret.returncode != 0:
        raise Exception(u'ERREUR : de sauvegarde {0} : {1}, {2}'.format(level, ret.stdout, ret.stderr))
    return "Sauvegarde {0} lancée\nVous pouvez suivre son évolution dans le fichier {1}".format(level, FILE_LOG_BACKUP)


def not_running_jobs():
    """Sensor callable: True when Bareos reports zero running jobs.

    NOTE(review): assumes the 'running_jobs' query always returns at least
    one row with an 'NB' column — confirm against query.sql.
    """
    return bareos_query('running_jobs')[0]['NB'] == '0'


def update_available():
    """Return True when Query-Auto reports packages left to install."""
    query_auto = run(['Query-Auto'], check=True, stdout=PIPE)
    return query_auto.stdout.decode().strip().split('\n')[-1] != 'Aucun paquet à installer.'


def maj_auto(**kwargs):
    """Run Maj-Auto then reconfigure when the update sensor found updates;
    otherwise skip this task instance.
    """
    status_update_available = kwargs['task_instance'].xcom_pull('sensor_update_available', key='return_value')
    print(status_update_available, type(status_update_available))
    if not status_update_available:
        raise AirflowSkipException()
    run(['Maj-Auto'], check=True)
    run(['reconfigure'], check=True)
    return "Maj-Auto OK"


sensor_backup_not_running = PythonSensor(task_id='sensor_backup_not_running',
                                         python_callable=not_running_jobs,
                                         poke_interval=30,
                                         dag=dag)

pre_mysql = BashOperator(
    task_id='pre_mysql',
    bash_command='/usr/share/eole/schedule/scripts/mysql',
    dag=dag,
)

pre_queryauto = BashOperator(
    task_id='pre_queryauto',
    bash_command='/usr/share/eole/schedule/scripts/queryauto',
    dag=dag,
)

launch_backup = PythonOperator(task_id='launch_backup',
                               python_callable=run_backup,
                               dag=dag)

sensor_backup_ended = PythonSensor(task_id='sensor_backup_ended',
                                   python_callable=not_running_jobs,
                                   poke_interval=30,
                                   dag=dag)

sensor_update_available = PythonOperator(task_id='sensor_update_available',
                                         python_callable=update_available,
                                         dag=dag)

# NOTE: rebinding 'maj_auto' shadows the callable above; the PythonOperator
# captures the function before the rebind, so behavior is unchanged.
maj_auto = PythonOperator(task_id='maj_auto',
                          python_callable=maj_auto,
                          dag=dag,
                          provide_context=True)

dummy_task = DummyOperator(task_id='end', dag=dag)

sensor_backup_not_running >> [pre_mysql, pre_queryauto] >> launch_backup >> sensor_backup_ended >> sensor_update_available >> maj_auto >> dummy_task