Overall linting #3

Merged
alexandre merged 13 commits from linting into master 2021-12-06 21:29:02 +01:00
8 changed files with 261 additions and 92 deletions

View File

@@ -8,7 +8,42 @@ platform:
arch: amd64 arch: amd64
steps: steps:
- name: docker - name: Install packages
image: python
commands:
- pip install -r requirements.txt
volumes:
- name: pip_cache
path: /root/.cache/pip
- name: Lint - pylint
image: python
commands:
- pip install -r requirements.txt
- pylint --ignore-paths app/config.py wsgi.py app/*.py
volumes:
- name: pip_cache
path: /root/.cache/pip
- name: Lint - flake8
image: python
commands:
- pip install -r requirements.txt
- flake8 wsgi.py app/*.py
volumes:
- name: pip_cache
path: /root/.cache/pip
- name: Security analysis with bandit
image: python
commands:
- pip install -r requirements.txt
- bandit -r app/
volumes:
- name: pip_cache
path: /root/.cache/pip
- name: Publish the image
image: plugins/docker image: plugins/docker
settings: settings:
repo: registry.alxczl.fr/ldap-interface repo: registry.alxczl.fr/ldap-interface
@@ -21,3 +56,8 @@ steps:
when: when:
branch: branch:
- master - master
volumes:
- name: pip_cache
host:
path: /tmp/drone/cache/pip_cache

1
.tag Normal file
View File

@@ -0,0 +1 @@
1.0.0

View File

@@ -1,8 +1,21 @@
"""
Module that initializes the app
"""
from flask import Flask from flask import Flask
from . import password, config from . import password, config
def create_app():
app = Flask(__name__, template_folder="ui/templates", static_folder="ui/static") def create_app() -> Flask:
"""
Creates the Flask app object
Returns:
Flask: App's object
"""
app = Flask(
__name__,
template_folder="ui/templates",
static_folder="ui/static")
app.config.from_object(config.ProductionConfig()) app.config.from_object(config.ProductionConfig())
app.register_blueprint(password.bp) app.register_blueprint(password.bp)

View File

@@ -1,6 +1,10 @@
"""
A module that is used to store the current app's config
"""
import os import os
class Config(object):
class Config():
SECRET_KEY = os.environ.get("SECRET_KEY") SECRET_KEY = os.environ.get("SECRET_KEY")
if SECRET_KEY is None: if SECRET_KEY is None:
raise Exception("[!!] No secret key was given") raise Exception("[!!] No secret key was given")
@@ -21,11 +25,13 @@ class Config(object):
SESSION_COOKIE_HTTPONLY = True SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'None' SESSION_COOKIE_SAMESITE = 'None'
class DevelopmentConfig(Config): class DevelopmentConfig(Config):
TESTING = True TESTING = True
DEBUG = True DEBUG = True
DEVELOPMENT = True DEVELOPMENT = True
class ProductionConfig(Config): class ProductionConfig(Config):
TESTING = False TESTING = False
DEBUG = False DEBUG = False

View File

@@ -1,60 +1,110 @@
import ldap3 """
Module used to connect to the target LDAP server
"""
from typing import Tuple from typing import Tuple
from dataclasses import dataclass
import ldap3
@dataclass
class ClientParams:
"""
Parameters for the Client class
"""
address: str
base_dn: str
port: int = 389
tls: bool = False
primary_attribute: str = "uid"
class Client(): class Client():
def __init__(self, address: str, port: int, base_dn: str, primary_attribute: str = "uid", tls: bool = False): """
self.server = ldap3.Server(host=address, port=port, use_ssl=tls) Class that represents a connection to an LDAP server.
self.base_dn = base_dn
self.address = address Attributes:
self.port = port server (ldap3.Connection): An ldap3.Server object
self.tls = tls base_dn (str): The target base DN
self.primary_attribute = primary_attribute address (str): The target server's address
port (int): The target server's port
tls (bool): A boolean to indicate TLS usage
primary_attribute (str): The clients' LDAP primary attributes
link (ldap3.Connection): Represents the current link status
"""
def __init__(self, params: ClientParams):
self.server = ldap3.Server(
host=params.address,
port=params.port,
use_ssl=params.tls)
self.base_dn = params.base_dn
self.address = params.address
self.port = params.port
self.tls = params.tls
self.primary_attribute = params.primary_attribute
self.link = None
def bind(self, user: str, bind_passwd: str) -> Tuple[bool, str]: def bind(self, user: str, bind_passwd: str) -> Tuple[bool, str]:
"""
Binds to the target server.
Args:
user (str): The target username
bind_passwd (str): The target user's password
Returns:
(bool, str): (True, "<user-dn>") if the bind was successful
(False, "") if it wasn't.
"""
user_dn = f"{self.primary_attribute}={user},{self.base_dn}" user_dn = f"{self.primary_attribute}={user},{self.base_dn}"
self.link = ldap3.Connection(self.server, user=user_dn, password=bind_passwd) self.link = ldap3.Connection(self.server,
user=user_dn,
password=bind_passwd)
try: status = self.link.bind()
status = self.link.bind() if status is False:
except Exception as _: print(f"[!!] Could not bind {user_dn} to the LDAP directory: "
status = False f"{self.link.last_error}")
return (False, "")
if status == False:
print(f"[!!] Could not bind {user_dn} to the LDAP directory: {self.link.last_error}")
return (status, "")
return (status, user_dn) return (status, user_dn)
def unbind(self) -> bool: def unbind(self) -> bool:
if self.link.bound != True: """
Unbinds and disconnect from the server.
Returns:
bool: True if the operation was successful, False if it wasn't.
"""
if self.link.bound is False:
return False return False
try: return self.link.unbind()
self.link.unbind()
except Exception as e:
pass
return True
def change_pwd(self, user_dn: str, new_password: str) -> bool: def change_pwd(self, user_dn: str, new_password: str) -> bool:
if self.link.bound == False: """
Changes a specific user's DN.
Args:
user_dn (str): The target user's DN
new_password (str): The wanted password
Returns:
bool: True if the operation was successful, False if it wasn't.
"""
if self.link.bound is False:
print("[!!] Can't change the password: not bound to the server") print("[!!] Can't change the password: not bound to the server")
return False return False
status = self.link.modify(user_dn, {'userPassword': [(ldap3.MODIFY_REPLACE, [new_password])]}) status = self.link.modify(
if status == True: user_dn, {
'userPassword': [
(ldap3.MODIFY_REPLACE, [new_password])]})
if status:
print(f"[++] Changed password of user {user_dn}") print(f"[++] Changed password of user {user_dn}")
else: else:
print(f"[!!] Could not change password of user {user_dn}: {self.link.last_error}") print(
f"[!!] Could not change password of user {user_dn}: "
f"{self.link.last_error}")
return status return status
if __name__ == "__main__":
client = Client("dc01.lan.alxczl.fr", 636, "cn=users,cn=accounts,dc=lan,dc=alxczl,dc=fr", True)
client_dn = "uid=alexandre,cn=users,cn=accounts,dc=lan,dc=alxczl,dc=fr"
res = client.bind(client_dn, "Getshrektm8")
if res[0] == False:
print(client.link.result["description"])
#client.link.unbind()

View File

@@ -1,79 +1,126 @@
from . import ldap_client """
Module that represents the /password route
"""
from flask import ( from flask import (
Blueprint, render_template, flash, Blueprint,
render_template,
flash,
current_app current_app
) )
from flask_wtf import FlaskForm from flask_wtf import (
FlaskForm
)
from wtforms import ( from wtforms import (
StringField, PasswordField, StringField,
SubmitField, EmailField PasswordField,
SubmitField,
EmailField
) )
from wtforms.validators import ( from wtforms.validators import (
ValidationError, DataRequired, ValidationError,
EqualTo, Length, Regexp, Email DataRequired,
EqualTo,
Length,
Regexp,
Email
) )
from . import (
ldap_client
)
bp = Blueprint('password', __name__, url_prefix='/password') bp = Blueprint('password', __name__, url_prefix='/password')
class ChangePasswordForm(FlaskForm): class ChangePasswordForm(FlaskForm):
"""
A Flask form that asks users about various informations needed
to change their password.
"""
# Minimal password length # Minimal password length
minlength = 9 minlength = 9
# Form # Form
username = StringField(label=('Login'), username = StringField(label=('Login'),
validators=[DataRequired(), validators=[DataRequired(),
Length(max=64)], Length(max=64)],
render_kw={"onkeyup": "validate_username()"}) render_kw={"onkeyup": "validate_username()"})
currentpassword = PasswordField(label=('Current password'), currentpassword = PasswordField(label=('Current password'),
validators=[DataRequired()]) validators=[DataRequired()])
newpassword = PasswordField(label=('New password'), newpassword = PasswordField(label=('New password'),
validators=[DataRequired(), validators=[DataRequired(),
Length(min=minlength), Length(min=minlength),
Regexp("^(?=.*[a-z])"), Regexp("^(?=.*[a-z])"),
Regexp("^(?=.*[A-Z])"), Regexp("^(?=.*[A-Z])"),
Regexp("^(?=.*\\d)"), Regexp("^(?=.*\\d)"),
#Regexp( ],
# "(?=.*[@$!%*#?&])", message="Password must contain a special character" render_kw={"onkeyup": "validate_username_form"
#),], f"({minlength})"})
],
render_kw={"onkeyup": f"validate_username_form({minlength})"})
confirm_password = PasswordField( confirm_password = PasswordField(
label=('Confirm Password'), label=('Confirm Password'),
validators=[DataRequired(message='* Required'), validators=[DataRequired(message='* Required'),
EqualTo('newpassword')], EqualTo('newpassword')],
render_kw={"onkeyup": f"validate_username_form({minlength})"}) render_kw={"onkeyup": f"validate_username_form({minlength})"})
submit = SubmitField(label=('Change my password'), render_kw={"disabled": "true", submit = SubmitField(
"onclick": f"validate_username_form({minlength})"}) label=('Change my password'),
render_kw={
"disabled": "true",
"onclick": f"validate_username_form({minlength})"})
# Validators # Validators
def validate_username(self, username): def validate_username(self):
"""
A validation function for the username input field
"""
excluded_chars = " *?!'^+%&/()=}][{$#;\\\"" excluded_chars = " *?!'^+%&/()=}][{$#;\\\""
for char in self.username.data: for char in self.username.data:
if char in excluded_chars: if char in excluded_chars:
raise ValidationError( raise ValidationError(
f"Character {char} is not allowed in an username.") f"Character {char} is not allowed in an username.")
class ResetPasswordForm(FlaskForm): class ResetPasswordForm(FlaskForm):
"""
A Flask form that asks users about their email, used to
reset their passwords.
"""
email = EmailField(label=('Email address'), email = EmailField(label=('Email address'),
validators=[DataRequired(), Email()], validators=[DataRequired(), Email()],
render_kw={"onkeyup": f"validate_email()"}) render_kw={"onkeyup": "validate_email()"})
submit = SubmitField(
label=('Change my password'),
render_kw={
"disabled": "true",
"onclick": "validate_email()"})
submit = SubmitField(label=('Change my password'), render_kw={"disabled": "true",
"onclick": f"validate_email()"})
@bp.route('/change', methods=["GET", "POST"]) @bp.route('/change', methods=["GET", "POST"])
def change(): def change():
"""
The /password/change route method
"""
form = ChangePasswordForm() form = ChangePasswordForm()
if form.validate_on_submit(): if form.validate_on_submit():
client = ldap_client.Client(address=current_app.config["LDAP_ADDR"], port=current_app.config["LDAP_PORT"], base_dn=current_app.config["BASE_DN"], tls=current_app.config["LDAP_TLS"]) ldap_params = ldap_client.ClientParams(
address=current_app.config["LDAP_ADDR"],
port=current_app.config["LDAP_PORT"],
base_dn=current_app.config["BASE_DN"],
tls=current_app.config["LDAP_TLS"])
client = ldap_client.Client(ldap_params)
bind_status = client.bind(form.username._value(), form.currentpassword._value()) bind_status = client.bind(
if bind_status[0] == False: form.username.data, form.currentpassword.data)
flash(f"Connection failed, are you sure that your login and password are correct ? ({client.link.last_error})")
elif client.change_pwd(bind_status[1], form.newpassword._value()) == False: if bind_status[0] is False:
flash(f"An error occured and your password was not changed, sorry. ({client.link.last_error})") flash("Connection failed, are you sure that your login and"
f" password are correct ? ({client.link.last_error})")
elif client.change_pwd(bind_status[1],
form.newpassword.data) is False:
flash("An error occured and your password was not changed, sorry."
f"({client.link.last_error})")
client.unbind() client.unbind()
else: else:
flash('Your password has been changed !') flash('Your password has been changed !')
@@ -81,6 +128,10 @@ def change():
return render_template('change.html', form=form) return render_template('change.html', form=form)
@bp.route('/reset', methods=["GET"]) @bp.route('/reset', methods=["GET"])
def reset(): def reset():
"""
The /password/reset route method
"""
return render_template('reset.html') return render_template('reset.html')

View File

@@ -11,3 +11,6 @@ zipp==3.6.0
ldap3 ldap3
Flask-WTF==1.0.0 Flask-WTF==1.0.0
email-validator email-validator
flake8
pylint
bandit

View File

@@ -1,6 +1,11 @@
import os, sys """
WSGI entrypont
"""
import os
import sys
from app import create_app
app_path = os.path.dirname(os.path.abspath(__file__)) app_path = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, app_path) sys.path.insert(0, app_path)
from app import create_app
application = create_app() application = create_app()