Files
CD73/patches.d/scribe-backend.patch
2025-08-18 16:35:48 +02:00

578 lines
23 KiB
Diff

diff --git usr/bin/importation_scribe usr/bin/importation_scribe
index 0db4982..290d66e 100755
--- usr/bin/importation_scribe
+++ usr/bin/importation_scribe
@@ -42,21 +42,21 @@ choix de la source de données et imports
- personnels administratifs
- comptes invités
"""
import sys
from os import environ, getcwd, chdir
from os.path import isfile, dirname
from pyeole.process import system_out, system_code
from scribe.storage import init_store
from scribe.eoleldap import Ldap
-from scribe.ldapconf import SUPPORT_ETAB
+from scribe.ldapconf import SUPPORT_ETAB, BRANCHE_ETAB, PROF_FILTER
from scribe.eoletools import nscd_start, nscd_stop
from scribe.parsing import sconet, aaf, be1d, scribecsv2
from scribe.importation import preferences, writer, config
from scribe.importation import log
#______________________________________________________________________________
# utilitaires de manipulation de la console
class OutOfRange(Exception):
""" Exception OutOfRange """
@@ -458,33 +458,44 @@ class Console:
"""
log.add_lock()
log.debuglog("Arrêt de LSC...", title=True)
nscd_stop()
connexion = Ldap()
connexion.connect()
if SUPPORT_ETAB:
prefs = preferences.get_enseignants_prefs()
etab = prefs.get_default('etab')
etab_prefix = prefs.get_default('etab_prefix')
+ branche_etab = BRANCHE_ETAB % {'etab': etab}
+ purge_option = 'keep'
+ old_logins = connexion._search('(&{})'.format(PROF_FILTER), 'uid', suffix=branche_etab)
else:
etab = None
etab_prefix = ''
if self.import_type != 'maj':
writer.purge_equipes(connexion=connexion, etab=etab)
writer.verify_classe(store=self.store, connexion=connexion,
etab_prefix=etab_prefix)
writer.write_matiere(store=self.store, connexion=connexion,
etab=etab, etab_prefix=etab_prefix)
writer.verify_option(store=self.store, connexion=connexion,
etab_prefix=etab_prefix)
- writer.write_enseignant(store=self.store, connexion=connexion,
+ logins = writer.write_enseignant(store=self.store, connexion=connexion,
etab=etab)
+ if SUPPORT_ETAB:
+ old_dns = [login[0] for login in old_logins if login[1]['uid'] not in logins]
+ for old_dn in old_dns:
+ connexion._delete(old_dn)
+ if prefs.get_default('backup') == 'oui':
+ target = f'/home/backup/{etab}/'
+ print(f'copie dans {target}')
+
if self.data_type in ['sconet', 'aaf']:
writer.write_service(store=self.store, connexion=connexion,
etab=etab, etab_prefix=etab_prefix)
writer.write_administratif(store=self.store, connexion=connexion,
etab=etab)
writer.write_samba(connexion)
connexion.close()
log.debuglog("Démarrage de LSC...", title=True)
nscd_start()
log.del_lock()
diff --git usr/lib/python3/dist-packages/scribe/eoleldap.py usr/lib/python3/dist-packages/scribe/eoleldap.py
index 45ec338..45c82a4 100644
--- usr/lib/python3/dist-packages/scribe/eoleldap.py
+++ usr/lib/python3/dist-packages/scribe/eoleldap.py
@@ -8,21 +8,21 @@
# eoleldap.py
#
# librairie pour la connexion à un serveur ldap
#
###########################################################################
"""
Librairie Ldap pour Scribe
"""
import sys
from .ldapconf import SUFFIX, ROOT_DN, USER_FILTER, GROUP_FILTER, SHARE_FILTER, \
- SUPPORT_ETAB, ldap_server, ldap_passwd, num_etab, BRANCHE_GROUP_ETAB, LDAP_MODE, acad
+ SUPPORT_ETAB, ldap_server, ldap_passwd, num_etab, BRANCHE_GROUP_ETAB, BRANCHE_ETAB, LDAP_MODE, acad
from scribe.errors import LdapExistingGroup, LdapExistingUser, \
SystemExistingUser, NiveauNotFound
from .eoletools import to_list
import ldap
from ldap import SCOPE_ONELEVEL
def is_system_user(user):
"""
indique si le login proposé est déjà un utilisateur système
@@ -120,73 +120,81 @@ class _Ldap(object):
class _LdapEntry(object):
"""
classe de base pour gérer les entrées ldap
"""
def __init__(self, serveur=None, passwd=None):
self.serveur = serveur
self.passwd = passwd
self.ldap_admin = Ldap(serveur, passwd)
self.cache_etab = {'login': {}, 'group': {}}
- def _is_group(self, name):
+ def _is_group(self, name, etab=None):
"""
test si le groupe existe dans l'annuaire
"""
cnfilter = "(&%s(cn=%s))" % (GROUP_FILTER, name)
- if self.ldap_admin._search_one(cnfilter):
+ if etab:
+ branche_etab = BRANCHE_ETAB % {'etab': etab}
+ else:
+ branche_etab = None
+ if self.ldap_admin._search_one(cnfilter, suffix=branche_etab):
return True
return False
- def _is_user(self, name):
+ def _is_user(self, name, etab=None):
"""
test si l'utilisateur existe dans l'annuaire
"""
uidfilter = "(&%s(uid=%s))" % (USER_FILTER, name)
- if self.ldap_admin._search_one(uidfilter):
+ if etab:
+ branche_etab = BRANCHE_ETAB % {'etab': etab}
+ else:
+ branche_etab = None
+ if self.ldap_admin._search_one(uidfilter, suffix=branche_etab):
return True
return False
def _is_share(self, name):
"""
test si le partage existe dans l'annuaire
"""
shfilter = "(&%s(sambaShareName=%s))" % (SHARE_FILTER, name)
if self.ldap_admin._search_one(shfilter):
return True
return False
def is_available_name(self, name):
self.ldap_admin.connect()
res = self._is_available_name(name)
self.ldap_admin.close()
return res
- def _is_available_name(self, name):
+ def _is_available_name(self, name, etab=None):
"""
teste la disponibilité d'un uid ou un cn
"""
- if self._is_group(name):
+ if self._is_group(name, etab=etab):
return False
- elif self._is_user(name):
+ elif self._is_user(name, etab=etab):
return False
elif is_system_user(name):
return False
return True
- def _test_available_name(self, name):
+ def _test_available_name(self, name, etab=None):
"""
Test la disponibilité d'un nom
raise une exception si pas disponible
"""
- if self._is_group(name):
+ if self._is_group(name, etab=etab):
raise LdapExistingGroup
- elif self._is_user(name):
+ elif self._is_user(name, etab=etab):
raise LdapExistingUser
elif is_system_user(name):
raise SystemExistingUser
return True
def get_niveau(self, classe):
"""
Retourne le niveau associé à la classe
"""
self.ldap_admin.connect()
diff --git usr/lib/python3/dist-packages/scribe/eoleuser.py usr/lib/python3/dist-packages/scribe/eoleuser.py
index 05569fd..b4a4fb8 100644
--- usr/lib/python3/dist-packages/scribe/eoleuser.py
+++ usr/lib/python3/dist-packages/scribe/eoleuser.py
@@ -259,26 +259,26 @@ class User(LdapEntry):
Ajoute un utilisateur
**args
"""
self.filter_must_args(args)
self.filter_may_args(args)
#force login in lower case #33178
login = args['login'].lower()
args['login'] = login
if match("^[a-zA-Z0-9.\-_]*$", login) is None:
raise BadLogin("Login \"%s\" invalide !" % login)
- self._test_available_name(login)
args.setdefault('etab', None)
if args['etab'] is None and 'classe' in args:
args['etab'] = self.get_etab_from_group(args['classe'])
if args['etab'] == num_etab:
args['etab'] = None
+ self._test_available_name(login, etab=args['etab'])
self.cache_etab['login'][login] = args['etab']
# FIXME : fixes #327 mais est-ce le bon endroit ?
if tool.not_empty(args, 'date'):
args['date'] = tool.deformate_date(args['date'])
if 'exp_date' in args: # Can be jj/mm/aaaa or empty string
if LDAP_MODE == 'openldap':
args['exp_unix_days'] = str(tool.get_days_from_epoch(args['exp_date'])) if args['exp_date'] else '-1'
else:
args['exp_nt_time'] = str(tool.get_nt_time_from_date(args['exp_date'])) if args['exp_date'] else '0'
args.pop('exp_date')
@@ -536,31 +536,32 @@ class User(LdapEntry):
data = [((MOD_REPLACE, attribut, value))]
self.ldap_admin._modify(user_dn, data)
def delete(self, login, remove_data=False, delete_resp=False):
"""
supprime un utilisateur
"""
self._delete(login, remove_data=remove_data, need_connect=True,
delete_resp=delete_resp)
- def _delete(self, login, remove_data=False, need_connect=False, delete_resp=False):
+ def _delete(self, login, remove_data=False, need_connect=False, delete_resp=False, etab=None):
"""
supprime un utilisateur
"""
if self.has_samba:
quota.set_quota(login, '0')
if remove_data:
cmd = ['/usr/sbin/smbldap-userdel', '-r', login]
else:
cmd = ['/usr/sbin/smbldap-userdel', login]
- etab = self.get_etab(login)
+ if etab is None:
+ etab = self.get_etab(login)
force_dn = {'groupsdn="ou=local,ou=Groupes,${etab},${suffix}"':
'groupsdn="${suffix}"'}
tool.launch_smbldap_tool(cmd, num_etab, etab, force_dn=force_dn)
# gestion des données résiduelles
if remove_data:
ad_user_dir = join(AD_HOME_PATH, login)
rmtree(ad_user_dir, ignore_errors=True)
perso = join(HOME_PATH, login[0], login)
# code uniquement pour AmonEcole (#33013)
if islink(perso):
diff --git usr/lib/python3/dist-packages/scribe/importation/preferences.py usr/lib/python3/dist-packages/scribe/importation/preferences.py
index fdcb142..fe11e3b 100644
--- usr/lib/python3/dist-packages/scribe/importation/preferences.py
+++ usr/lib/python3/dist-packages/scribe/importation/preferences.py
@@ -141,20 +141,24 @@ PREF_MAIL = ['mail', 'liste',
('perso_internet', 'adresse fournie ou domaine Internet'),
('perso_aucune', 'adresse fournie ou aucune'),
('restreint', 'adresse locale, domaine restreint'),
('internet', 'adresse locale, domaine Internet'),
('aucune', 'aucune adresse'),
)]
PREF_PREFIX_ETAB = ['etab_prefix', 'texte',
"Préfixe des groupes de l'établissement", ""]
+PREF_PURGE = ['backup', 'liste',
+ "Sauvegarde des données des utilisateurs supprimés",
+ (('oui', 'oui'), ('non', 'non'))]
+
if dico.get('activer_nfs', 'non') == 'oui' or 'interface_client_ltsp' in dico:
DEFAULT_CHANGE_PWD = 'non'
DEFAULT_SHELL = 'oui'
else:
DEFAULT_CHANGE_PWD = 'oui'
DEFAULT_SHELL = 'non'
def get_etab():
"""
charge les établissements à chaque fois qu'on le demande
@@ -211,20 +215,21 @@ def get_responsables_prefs():
return Preferences(prefs_resp, PREF_FILES['responsable'])
def get_enseignants_prefs():
""" préférences pour les enseignants """
prefs_ens = [ Preference(default='0', *PREF_QUOTA),
Preference(*PREF_LOGIN),
Preference(default=DEFAULT_SHELL, *PREF_SHELL),
Preference(default='1', *PREF_PROFIL),
Preference(default='perso_aucune', *PREF_MAIL),
+ Preference(default='oui', *PREF_PURGE),
]
if FORCED_PASSWORD_MODIFICATION_ALLOWED:
prefs_ens.insert(2, Preference(default=DEFAULT_CHANGE_PWD, *PREF_CHANGE_PWD))
if SUPPORT_ETAB:
prefs_ens.insert(0, Preference(*get_etab()))
prefs_ens.insert(1, Preference(*PREF_PREFIX_ETAB))
prefs_ens.append(Preference(*get_etabprefix()))
return Preferences(prefs_ens, PREF_FILES['enseignant'])
def get_administratifs_prefs():
diff --git usr/lib/python3/dist-packages/scribe/importation/writer.py usr/lib/python3/dist-packages/scribe/importation/writer.py
index 34ce0fb..08716e0 100644
--- usr/lib/python3/dist-packages/scribe/importation/writer.py
+++ usr/lib/python3/dist-packages/scribe/importation/writer.py
@@ -58,37 +58,37 @@ def _gen_mail(pref, mail):
"""
if pref.startswith('perso') and '@' in mail:
return mail
elif 'internet' in pref:
return 'internet'
elif 'restreint' in pref:
return 'restreint'
else:
return ''
-def _gen_new_login(user, pref, prenom='', nom='', force_login=''):
+def _gen_new_login(user, pref, prenom='', nom='', force_login='', etab=None):
"""
génération d'un login unique
@user : objet compatible LdapUser()
@pref : standard/pnom/nomp/p.nnn/prenom.n
@prenom : prénom de l'utilisateur
@nom : nom de l'utilisateur
@force_login : login forcé
"""
if force_login != '':
login = base_login = force_login
else:
login = base_login = gen_login(pref, prenom, nom)
num = 1
# vérification de la disponibilité de l'identifiant
# si non disponible, il est suffixé d'un numéro
- while not user._is_available_name(login):
+ while not user._is_available_name(login, etab=etab):
login = "%s%d" % (base_login, num)
num += 1
if login == '' or login.isdigit():
raise BadLogin("""Login "%s" invalide""" % login)
return login
def _sync_passwords(user, new_passwords, change_pwd=False):
"""
synchronisation des mots de passe
@user : objet compatible LdapUser()
@@ -769,25 +769,25 @@ def write_responsable(store, connexion, current_ead_user=config.DEFAULT_USER):
# -------------------- enseignants -------------------- #
def _new_enseignant(enseignant, user, prefs, etab=None, new_passwords=[]):
"""
traitement d'un nouveau enseignant (création)
enseignant : store.Enseignant()
user : eoleuser.Enseignant()
"""
if enseignant.force_login:
login = _gen_new_login(user, prefs.get_default('login'),
- force_login=str(enseignant.force_login))
+ force_login=str(enseignant.force_login), etab=etab)
else:
login = _gen_new_login(user, prefs.get_default('login'),
str(enseignant.prenom),
- str(enseignant.nom))
+ str(enseignant.nom), etab=etab)
if enseignant.force_password:
password = str(enseignant.force_password)
else:
password = _gen_password(prefs.get_default('gen_pwd'), str(enseignant.date))
log.log.debug("nouvel enseignant : %s" % login)
classe = []
groups = []
for joint in enseignant.get_classes():
groups.append('profs-%s' % str(joint.classe.nom))
if joint.profprincipal:
@@ -834,23 +834,20 @@ def _maj_enseignant(enseignant, user, login, etab, administratif=False):
enseignant : store.Enseignant()
user : eoleuser.Enseignant()
login : uid de l'utilisateur dans ldap
administratif : personnel administratif avec un compte enseignant
"""
log.log.debug("maj de %s" % login)
classe = []
groups = []
# attention : des administratifs peuvent avoir un compte enseignant
if isinstance(enseignant, Enseignant):
- old_etab = user.get_etab(login)
- if old_etab != etab:
- user._change_etab(login, old_etab, etab)
for joint in enseignant.get_classes():
groups.append('profs-%s' % str(joint.classe.nom))
if joint.profprincipal:
classe.append(str(joint.classe.nom))
for matiere in enseignant.get_matieres():
groups.append(str(matiere.nom))
for option in enseignant.get_groupes():
groups.append('profs-%s' % str(option.nom))
disciplines = eval(enseignant.disciplines)
else:
@@ -874,64 +871,67 @@ def _maj_enseignant(enseignant, user, login, etab, administratif=False):
else:
info = "%s;%s;%s;%s" % (str(enseignant.nom), str(enseignant.prenom),
login, ATTRIB)
log.write_info(info, config.ENS_INFO)
def write_enseignant(store, connexion, etab=None, current_ead_user=config.DEFAULT_USER):
"""
insertion des enseignants
"""
num = 0
+ logins = []
log.infolog("Intégration des enseignants...", title=True)
log.write_header(config.ENS_HEADER, config.ENS_INFO)
user = LdapEnseignant()
user.ldap_admin = connexion
prefs = preferences.get_enseignants_prefs()
quota = prefs.get_default('quota')
if FORCED_PASSWORD_MODIFICATION_ALLOWED:
change_pwd = prefs.get_default('change_pwd') == 'oui'
else:
change_pwd = False
new_passwords = []
for enseignant in store.query(Enseignant):
if enseignant.force_login:
# login forcé
if user._is_enseignant(str(enseignant.force_login)):
login = str(enseignant.force_login)
else:
login = ''
else:
- login = _enseignant_exists(enseignant, user)
+ login = _enseignant_exists(enseignant, user, etab=etab)
if login != '':
# enseignant existant
- _maj_enseignant(enseignant, user, login, etab)
+ _maj_enseignant(enseignant, user, login, etab=etab)
else:
# nouvel enseignant
if str(enseignant.nom) == '' or str(enseignant.prenom) == '':
log.infolog("Enseignant n°%s invalide" % str(enseignant.int_id))
continue
try:
login = _new_enseignant(enseignant, user, prefs, etab=etab, new_passwords=new_passwords)
except BadLogin as message:
log.infolog(str(message))
continue
# enregistrement du login attribué
enseignant.login = str(login)
+ logins.append(enseignant.login)
num += 1
if num % config.DEBUG_NUM == 0:
log.debuglog("%d enseignants traités..." % num)
if EOLE_AD:
_sync_passwords(user, new_passwords, change_pwd=change_pwd)
_create_dirs(user, quota, new_passwords)
log.infolog("TOTAL : %d enseignants" % num)
if num != 0:
log.copy_info(config.ENS_INFO, user=current_ead_user)
+ return logins
# -------------------- administratifs -------------------- #
def _new_administratif(administratif, user, prefs, etab=None, new_passwords=[]):
"""
traitement d'un nouvel administratif (création)
administratif : store.Administratif()
user : eoleuser.Administratif()
"""
@@ -1020,21 +1020,21 @@ def write_administratif(store, connexion, etab=None, current_ead_user=config.DEF
if user._is_administratif(str(administratif.force_login)):
login = str(administratif.force_login)
else:
login = ''
else:
login = _administratif_exists(administratif, user)
if login != '':
# personnel existe
_maj_administratif(administratif, user, login)
else:
- login = _enseignant_exists(administratif, user)
+ login = _enseignant_exists(administratif, user, etab)
if login != '':
# le personnel a été crée comme un professeur...
log.infolog("(%s a un compte enseignant)" % login)
ldap_ens = LdapEnseignant()
ldap_ens.ldap_admin = connexion
_maj_enseignant(administratif, ldap_ens, login, etab, administratif=True)
else:
# nouveau personnel
if str(administratif.nom) == '' or str(administratif.prenom) == '':
log.infolog("Administratif n°%s invalide" % str(administratif.int_id))
diff --git usr/lib/python3/dist-packages/scribe/linker.py usr/lib/python3/dist-packages/scribe/linker.py
index 4cc6082..94099e9 100644
--- usr/lib/python3/dist-packages/scribe/linker.py
+++ usr/lib/python3/dist-packages/scribe/linker.py
@@ -3,21 +3,21 @@
# Eole NG - 2009
# Copyright Pole de Competence Eole (Ministere Education - Academie Dijon)
# Licence CeCill cf /root/LicenceEole.txt
# eole@ac-dijon.fr
###########################################################################
"""
recherche les liens entre les utilisateurs importes
et l'annuaire ldap
"""
from scribe.ldapconf import ELEVE_FILTER, PROF_FILTER, ADMINISTRATIF_FILTER, \
-RESPONSABLE_FILTER, USER_FILTER, AUTRE_FILTER, LDAP_MODE
+RESPONSABLE_FILTER, USER_FILTER, AUTRE_FILTER, LDAP_MODE, BRANCHE_ETAB
from scribe.eoletools import deformate_date, strip_adresse
from scribe.eoleldap import Ldap
def _eleve_exists(eleve, user):
"""
recherche si un élève existe déjà dans l'annuaire
eleve : storage.Eleve()
user : eoleuser.Eleve()
"""
filtres = []
@@ -96,21 +96,21 @@ def _responsable_exists(responsable, user):
if responsable.mail and res.get('mailPerso', [''])[0] == str(responsable.mail):
# mail + nom + prenom (#6061) mais mail + nom => mauvaise idée (#4191)
return res['uid'][0]
if responsable.adresse.adresse and \
strip_adresse(res.get('ENTPersonAdresse', [''])[0]) == \
strip_adresse(str(responsable.adresse.adresse)):
# adresse + nom + prenom (#6934)
return res['uid'][0]
return ''
-def _enseignant_exists(enseignant, user):
+def _enseignant_exists(enseignant, user, etab=None):
"""
recherche si un enseignant existe déjà dans l'annuaire
enseignant : storage.Enseignant()
user : eoleuser.Enseignant()
"""
filtres = []
if enseignant.date and enseignant.date != '01/01/0001':
date = deformate_date(str(enseignant.date))
if enseignant.int_id:
# date + id interne
@@ -126,22 +126,26 @@ def _enseignant_exists(enseignant, user):
# mail (federation) + nom
filtres.append("(&%s(FederationKey=%s)(sn=%s))" % (PROF_FILTER,
str(enseignant.mail), str(enseignant.nom) ))
if enseignant.int_id:
# nom + id interne
filtres.append("(&%s(sn=%s)(intid=%s))" % (PROF_FILTER,
str(enseignant.nom), str(enseignant.int_id) ))
# beurk (homonymes) -> à améliorer
#filtres.append("(&%s(cn=%s %s)(objectClass=enseignant))" % (USER_FILTER,
# str(enseignant.prenom), str(enseignant.nom)))
+ if etab:
+ branche_etab = BRANCHE_ETAB % {'etab': etab}
+ else:
+ branche_etab = None
for filtre in filtres:
- res = user.ldap_admin._search_one(filtre, 'uid')
+ res = user.ldap_admin._search_one(filtre, 'uid', suffix=branche_etab)
if res != {}:
return res['uid'][0]
return ''
def _administratif_exists(administratif, user):
"""
recherche si un administratif existe déjà dans l'annuaire
administratif : storage.Administratif()
user : eoleuser.Administratif()
"""