lemur/lemur/logs/service.py

94 lines
2.5 KiB
Python
Raw Permalink Normal View History

"""
.. module: lemur.logs.service
:platform: Unix
:synopsis: This module contains all of the services level functions used to
administer logs in Lemur
:copyright: (c) 2018 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
2021-01-28 04:10:13 +01:00
from flask import current_app, g
from lemur import database
from lemur.logs.models import Log
from lemur.users.models import User
from lemur.certificates.models import Certificate
def create(user, type, certificate=None):
"""
Creates logs a given action.
:param user:
:param type:
:param certificate:
:return:
"""
2019-05-16 16:57:02 +02:00
current_app.logger.info(
"[lemur-audit] action: {0}, user: {1}, certificate: {2}.".format(
type, user.email, certificate.name
)
)
view = Log(user_id=user.id, log_type=type, certificate_id=certificate.id)
database.add(view)
database.commit()
2021-01-28 04:10:13 +01:00
def audit_log(action, entity, message):
"""
Logs given action
:param action: The action being logged e.g. assign_role, create_role etc
:param entity: The entity undergoing the action e.g. name of the role
:param message: Additional info e.g. Role being assigned to user X
:return:
"""
2021-01-29 02:56:12 +01:00
user = g.current_user.email if hasattr(g, 'current_user') else "LEMUR"
2021-01-28 04:10:13 +01:00
current_app.logger.info(
2021-01-29 02:59:35 +01:00
f"[lemur-audit] action: {action}, user: {user}, entity: {entity}, details: {message}"
2021-01-28 04:10:13 +01:00
)
def get_all():
"""
Retrieve all logs from the database.
:return:
"""
query = database.session_query(Log)
return database.find_all(query, Log, {}).all()
def render(args):
"""
Helper that paginates and filters data when requested
through the REST Api
:param args:
:return:
"""
query = database.session_query(Log)
2019-05-16 16:57:02 +02:00
filt = args.pop("filter")
if filt:
2019-05-16 16:57:02 +02:00
terms = filt.split(";")
2019-05-16 16:57:02 +02:00
if "certificate.name" in terms:
sub_query = database.session_query(Certificate.id).filter(
Certificate.name.ilike("%{0}%".format(terms[1]))
)
query = query.filter(Log.certificate_id.in_(sub_query))
2019-05-16 16:57:02 +02:00
elif "user.email" in terms:
sub_query = database.session_query(User.id).filter(
User.email.ilike("%{0}%".format(terms[1]))
)
query = query.filter(Log.user_id.in_(sub_query))
else:
query = database.filter(query, Log, terms)
return database.sort_and_page(query, Log, args)