@ -0,0 +1,46 @@ | |||
import ldap as l | |||
from flask import Flask | |||
from flask_sqlalchemy import SQLAlchemy | |||
from flask_login import LoginManager, login_manager | |||
from flask_bootstrap import Bootstrap | |||
from flask_simpleldap import LDAP | |||
import os | |||
app = Flask(__name__) | |||
Bootstrap(app) | |||
app.secret_key = 'asdf' | |||
app.debug = True | |||
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///../users.db' | |||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False | |||
app.config['WTF_CSRF_SECRET_KEY'] = 'asdf' | |||
# Base | |||
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication' | |||
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST') | |||
app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN') | |||
app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME') | |||
app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD') | |||
# OpenLDAP | |||
app.config['LDAP_OBJECTS_DN'] = 'dn' | |||
app.config['LDAP_OPENLDAP'] = True | |||
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))' | |||
# Login cookies | |||
app.config['REMEMBER_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN') | |||
db = SQLAlchemy(app) | |||
ldap = LDAP(app) | |||
login_manager = LoginManager(app) | |||
login_manager.init_app(app) | |||
login_manager.login_view = 'auth.login' | |||
from accounts.auth.views import auth | |||
app.register_blueprint(auth) | |||
db.create_all() |
@ -0,0 +1,74 @@ | |||
import ldap | |||
from ldap3 import Server, Connection | |||
from flask_wtf import FlaskForm | |||
from flask_login import UserMixin | |||
from ldap3.core.exceptions import LDAPBindError | |||
from wtforms import StringField, PasswordField, BooleanField, SubmitField | |||
from wtforms.validators import DataRequired | |||
from accounts import app, db | |||
def get_ldap_connection(): | |||
server = Server(app.config['LDAP_HOST']) | |||
conn = Connection(server, app.config['LDAP_USERNAME'], app.config['LDAP_PASSWORD'], auto_bind=True) | |||
return conn | |||
class User(db.Model): | |||
__tablename__ = 'user' | |||
id = db.Column(db.Integer, primary_key=True) | |||
username = db.Column(db.String(100)) | |||
password = db.Column(db.String(128)) | |||
authenticated = db.Column(db.Boolean, default=False) | |||
def __init__(self, username, password): | |||
self.username = username | |||
self.password = password | |||
@staticmethod | |||
def try_login(username, password): | |||
conn = get_ldap_connection() | |||
conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % username, attributes=['*']) | |||
if len(conn.entries) > 0: | |||
Connection(app.config['LDAP_HOST'], conn.entries[0].entry_dn, password, auto_bind=True) | |||
return | |||
raise LDAPBindError | |||
def is_authenticated(self): | |||
return self.authenticated | |||
def is_active(self): | |||
return True | |||
def is_anonymous(self): | |||
return False | |||
def get_id(self): | |||
return self.id | |||
def get_user_dict(self): | |||
user = {'dn': '', | |||
'firstName': '', | |||
'lastName': '', | |||
'email': '', | |||
'userName': self.username, | |||
} | |||
conn = get_ldap_connection() | |||
conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % self.username, attributes=['*']) | |||
user['dn'] = conn.entries[0].entry_dn | |||
user['firstName'] = conn.entries[0].givenName.value | |||
user['lastName'] = conn.entries[0].sn.value | |||
user['email'] = conn.entries[0].mail.value | |||
return user | |||
class LoginForm(FlaskForm): | |||
username = StringField('Username', validators=[DataRequired()]) | |||
password = PasswordField('Password', validators=[DataRequired()]) | |||
remember_me = BooleanField('Remember Me') | |||
submit = SubmitField('Sign In') |
@ -0,0 +1,167 @@ | |||
from flask import request, render_template, flash, redirect, \ | |||
url_for, Blueprint, g | |||
from flask_login import current_user, login_user, \ | |||
logout_user, login_required | |||
from ldap3 import MODIFY_REPLACE | |||
from ldap3.core.exceptions import LDAPBindError | |||
from accounts import login_manager, db, ldap | |||
from accounts.auth.models import User, LoginForm, get_ldap_connection | |||
from email_validator import validate_email, EmailNotValidError | |||
auth = Blueprint('auth', __name__) | |||
@login_manager.user_loader | |||
def load_user(id): | |||
return User.query.get(int(id)) | |||
@auth.before_request | |||
def get_current_user(): | |||
g.user = current_user | |||
@auth.route('/') | |||
@login_required | |||
def home(): | |||
return render_template('profile.j2', user = current_user.get_user_dict()) | |||
@auth.route('/update/email', methods=['POST']) | |||
@login_required | |||
def update_email(): | |||
if request.method == 'POST': | |||
email = request.form['email'] | |||
dn = request.form['dn'] | |||
if email != None and len(email) > 0: | |||
try: | |||
# Validate. | |||
valid = validate_email(email) | |||
# Update with the normalized form. | |||
conn = get_ldap_connection() | |||
conn.modify(dn, {'mail': [(MODIFY_REPLACE, [valid.email])]}) | |||
return 'Success' | |||
except EmailNotValidError as e: | |||
# email is not valid, exception message is human-readable | |||
print(str(e)) | |||
return 'Invalid email address' | |||
return 'Email cannot be empty' | |||
@auth.route('/update/name', methods=['POST']) | |||
@login_required | |||
def update_name(): | |||
if request.method == 'POST': | |||
firstName = request.form['firstName'] | |||
lastName = request.form['lastName'] | |||
dn = request.form['dn'] | |||
if (firstName != None and len(firstName) > 0) and (lastName != None and len(lastName) > 0): | |||
conn = get_ldap_connection() | |||
conn.modify(dn, {'givenName': [(MODIFY_REPLACE, [firstName])], | |||
'sn': [(MODIFY_REPLACE, [lastName])]}) | |||
return 'Success' | |||
return 'Name cannot be empty' | |||
@auth.route('/update/username', methods=['POST']) | |||
@login_required | |||
def update_username(): | |||
if request.method == 'POST': | |||
userName = request.form['userName'] | |||
dn = request.form['dn'] | |||
if userName != None and len(userName) > 0: | |||
conn = get_ldap_connection() | |||
conn.modify(dn, {'uid': [(MODIFY_REPLACE, [userName])]}) | |||
return 'Success' | |||
return 'Username cannot be empty' | |||
@auth.route('/update/password', methods=['POST']) | |||
@login_required | |||
def update_password(): | |||
if request.method == 'POST': | |||
currentPassword = request.form['currentPassword'] | |||
newPassword = request.form['newPassword'] | |||
confirmPassword = request.form['confirmPassword'] | |||
dn = request.form['dn'] | |||
if currentPassword == '': | |||
return 'Please enter your current password' | |||
if newPassword == '': | |||
return 'Please enter a new password' | |||
if confirmPassword == '': | |||
return 'Please confirm your new password' | |||
if newPassword != confirmPassword: | |||
return 'Could not confirm new password, please make sure you typed it correctly' | |||
try: | |||
User.try_login(current_user.username, currentPassword) | |||
except LDAPBindError: | |||
return 'Current password is incorrect' | |||
conn = get_ldap_connection() | |||
conn.extend.standard.modify_password(user=dn, new_password=newPassword) | |||
return 'Success' | |||
return 'Error' | |||
@auth.route('/login', methods=['GET', 'POST']) | |||
def login(): | |||
if current_user.is_authenticated: | |||
flash('You are already logged in.') | |||
return redirect(url_for('auth.home')) | |||
form = LoginForm(request.form) | |||
print(form) | |||
print(request.method) | |||
if request.method == 'POST' and form.validate(): | |||
username = request.form.get('username') | |||
password = request.form.get('password') | |||
print(username) | |||
print(password) | |||
try: | |||
User.try_login(username, password) | |||
except LDAPBindError: | |||
flash( | |||
'Invalid username or password. Please try again.', | |||
'danger') | |||
return render_template('login.j2', form=form) | |||
user = User.query.filter(User.username == username).first() | |||
print(user) | |||
if user is None: | |||
user = User(username, password) | |||
db.session.add(user) | |||
user.authenticated = True | |||
db.session.commit() | |||
login_user(user, remember=form.remember_me.data) | |||
print('You have successfully logged in.') | |||
return redirect(url_for('auth.home')) | |||
if form.errors: | |||
flash(form.errors, 'danger') | |||
return render_template('login.j2', form=form) | |||
@auth.route('/logout') | |||
@login_required | |||
def logout(): | |||
user = current_user | |||
user.authenticated = False | |||
db.session.add(user) | |||
db.session.commit() | |||
logout_user() | |||
return redirect(url_for('auth.home')) |
@ -1,160 +1,4 @@ | |||
import ldap as l | |||
from ldap3 import Server, Connection, ALL, MODIFY_REPLACE | |||
from flask import Flask, g, request, session, redirect, url_for, render_template | |||
from flask_simpleldap import LDAP | |||
from flask_bootstrap import Bootstrap | |||
from email_validator import validate_email, EmailNotValidError | |||
import os | |||
from accounts import app | |||
app = Flask(__name__) | |||
Bootstrap(app) | |||
app.secret_key = 'asdf' | |||
app.debug = True | |||
# Base | |||
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication' | |||
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST') | |||
app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN') | |||
app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME') | |||
app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD') | |||
# OpenLDAP | |||
app.config['LDAP_OBJECTS_DN'] = 'dn' | |||
app.config['LDAP_OPENLDAP'] = True | |||
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))' | |||
ldap = LDAP(app) | |||
server = Server(app.config['LDAP_HOST']) | |||
conn = Connection(server, app.config['LDAP_USERNAME'], app.config['LDAP_PASSWORD'], auto_bind=True) | |||
@app.before_request | |||
def before_request(): | |||
g.user = None | |||
if 'user_id' in session: | |||
# This is where you'd query your database to get the user info. | |||
g.user = {} | |||
@app.route('/') | |||
@ldap.login_required | |||
def index(): | |||
user_dict = ldap.get_object_details(session['user_id']) | |||
if 'user_id' in session: | |||
user = {'dn': 'cn={},cn=usergroup,ou=users,dc=technicalincompetence,dc=club'.format(user_dict['cn'][0].decode('ascii')), | |||
'firstName': user_dict['givenName'][0].decode('ascii'), | |||
'lastName': user_dict['sn'][0].decode('ascii'), | |||
'email': user_dict['mail'][0].decode('ascii'), | |||
'userName': user_dict['uid'][0].decode('ascii'), | |||
} | |||
return render_template('profile.j2', user = user) | |||
@app.route('/login', methods=['GET', 'POST']) | |||
def login(): | |||
if g.user: | |||
return redirect(url_for('index')) | |||
if request.method == 'POST': | |||
user = request.form['user'] | |||
passwd = request.form['passwd'] | |||
test = ldap.bind_user(user, passwd) | |||
if test is None or passwd == '': | |||
return render_template('login.j2', error='Invalid credentials') | |||
else: | |||
session['user_id'] = request.form['user'] | |||
session['passwd'] = request.form['passwd'] | |||
return redirect('/') | |||
return render_template('login.j2') | |||
@ldap.login_required | |||
@app.route('/update/email', methods=['POST']) | |||
def update_email(): | |||
if request.method == 'POST': | |||
email = request.form['email'] | |||
dn = request.form['dn'] | |||
if email != None and len(email) > 0: | |||
try: | |||
# Validate. | |||
valid = validate_email(email) | |||
# Update with the normalized form. | |||
conn.modify(dn, {'mail': [(MODIFY_REPLACE, [valid.email])]}) | |||
return 'Success' | |||
except EmailNotValidError as e: | |||
# email is not valid, exception message is human-readable | |||
print(str(e)) | |||
return 'Invalid email address' | |||
return 'Email cannot be empty' | |||
@ldap.login_required | |||
@app.route('/update/name', methods=['POST']) | |||
def update_name(): | |||
if request.method == 'POST': | |||
firstName = request.form['firstName'] | |||
lastName = request.form['lastName'] | |||
dn = request.form['dn'] | |||
if (firstName != None and len(firstName) > 0) and (lastName != None and len(lastName) > 0): | |||
conn.modify(dn, {'givenName': [(MODIFY_REPLACE, [firstName])], | |||
'sn': [(MODIFY_REPLACE, [lastName])]}) | |||
return 'Success' | |||
return 'Name cannot be empty' | |||
@ldap.login_required | |||
@app.route('/update/username', methods=['POST']) | |||
def update_username(): | |||
if request.method == 'POST': | |||
userName = request.form['userName'] | |||
dn = request.form['dn'] | |||
if userName != None and len(userName) > 0: | |||
conn.modify(dn, {'uid': [(MODIFY_REPLACE, [userName])]}) | |||
return 'Success' | |||
return 'Username cannot be empty' | |||
@ldap.login_required | |||
@app.route('/update/password', methods=['POST']) | |||
def update_password(): | |||
if request.method == 'POST': | |||
currentPassword = request.form['currentPassword'] | |||
newPassword = request.form['newPassword'] | |||
confirmPassword = request.form['confirmPassword'] | |||
dn = request.form['dn'] | |||
if currentPassword == '': | |||
return 'Please enter your current password' | |||
if newPassword == '': | |||
return 'Please enter a new password' | |||
if confirmPassword == '': | |||
return 'Please confirm your new password' | |||
if newPassword != confirmPassword: | |||
return 'Could not confirm new password, please make sure you typed it correctly' | |||
test = ldap.bind_user(session['user_id'], currentPassword) | |||
if test is None: | |||
return 'Current password is incorrect' | |||
else: | |||
conn.extend.standard.modify_password(user=dn, new_password=newPassword) | |||
return 'Success' | |||
return 'Error' | |||
@app.route('/logout') | |||
def logout(): | |||
session.pop('user_id', None) | |||
return redirect(url_for('index')) | |||
if __name__ == '__main__': | |||
if __name__ == "main": | |||
app.run() |