You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

160 lines
5.1 KiB

import ldap as l
from ldap3 import Server, Connection, ALL, MODIFY_REPLACE
from flask import Flask, g, request, session, redirect, url_for, render_template
from flask_simpleldap import LDAP
from flask_bootstrap import Bootstrap
from email_validator import validate_email, EmailNotValidError
import os
# Application object and the Bootstrap extension bound to it.
app = Flask(__name__)
Bootstrap(app)

# The session-signing key and the debug switch can be overridden from the
# environment. The fallbacks are the values that used to be hard-coded, so an
# unconfigured deployment behaves exactly as before.
# NOTE(review): both fallbacks are unsafe outside local development -- set
# SECRET_KEY to a long random value and FLASK_DEBUG=0 in production.
app.secret_key = os.environ.get('SECRET_KEY', 'asdf')
app.debug = os.environ.get('FLASK_DEBUG', '1').strip().lower() not in ('', '0', 'false', 'no')

# Base LDAP settings; host, base DN and service credentials come from the
# environment.
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication'
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST')
app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN')
app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME')
app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD')

# OpenLDAP-specific settings.
app.config['LDAP_OBJECTS_DN'] = 'dn'
app.config['LDAP_OPENLDAP'] = True
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))'

# flask_simpleldap handles login checks and user lookups.
ldap = LDAP(app)

# A separate ldap3 connection, bound with the service credentials at import
# time, is used by the update views to write changes.
server = Server(app.config['LDAP_HOST'])
conn = Connection(
    server,
    app.config['LDAP_USERNAME'],
    app.config['LDAP_PASSWORD'],
    auto_bind=True,
)
@app.before_request
def before_request():
    """Populate ``g.user`` before each request.

    ``g.user`` becomes an empty dict when the session carries a ``user_id``
    and ``None`` otherwise.
    """
    # A real user record could be loaded here in place of the placeholder dict.
    g.user = {} if 'user_id' in session else None
def _first_text(entry, attribute):
    """Return the first value of ``attribute`` in ``entry`` decoded as UTF-8.

    An absent or empty attribute yields an empty string.
    """
    values = entry.get(attribute) or [b'']
    return values[0].decode('utf-8')


@app.route('/')
@ldap.login_required
def index():
    """Render the profile page for the user named in the session.

    Returns:
        The rendered ``profile.j2`` template, or a redirect to the login view
        when the directory lookup for the session's ``user_id`` finds nothing.
    """
    user_dict = ldap.get_object_details(session['user_id'])
    if not user_dict:
        # The stored uid no longer resolves to an entry; drop it so the user
        # can log in again instead of hitting a server error.
        session.pop('user_id', None)
        return redirect(url_for('login'))

    # Attribute values are decoded as UTF-8 (the LDAP string encoding) rather
    # than ASCII, so names with non-ASCII characters no longer raise
    # UnicodeDecodeError.
    user = {
        'dn': 'cn={},cn=usergroup,ou=users,dc=technicalincompetence,dc=club'.format(_first_text(user_dict, 'cn')),
        'firstName': _first_text(user_dict, 'givenName'),
        'lastName': _first_text(user_dict, 'sn'),
        'email': _first_text(user_dict, 'mail'),
        'userName': _first_text(user_dict, 'uid'),
    }
    return render_template('profile.j2', user=user)
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate submitted credentials.

    Returns:
        A redirect to the index view for an already logged-in user, a
        redirect to ``/`` after a successful login, or the rendered
        ``login.j2`` template (with an error message after a failed attempt).
    """
    # before_request marks a logged-in request with an empty dict, which is
    # falsy, so the test has to be against None rather than truthiness.
    if g.user is not None:
        return redirect(url_for('index'))

    if request.method != 'POST':
        return render_template('login.j2')

    user = request.form.get('user', '')
    passwd = request.form.get('passwd', '')

    # Blank credentials are rejected before contacting the directory: an LDAP
    # simple bind with an empty password is an unauthenticated bind that a
    # server may accept.
    if not user or not passwd or ldap.bind_user(user, passwd) is None:
        return render_template('login.j2', error='Invalid credentials')

    # Only the user id is kept. The password is deliberately not stored:
    # Flask's default session is a signed but unencrypted client-side cookie.
    session['user_id'] = user
    return redirect('/')
# route() must be the outermost decorator; otherwise Flask registers the
# undecorated function and login_required is never applied to the view.
@app.route('/update/email', methods=['POST'])
@ldap.login_required
def update_email():
    """Replace the logged-in user's ``mail`` attribute.

    Reads ``email`` from the posted form. The entry to modify is resolved
    from the session's ``user_id``; a ``dn`` field sent by the client is
    ignored so that one user cannot edit another user's entry.

    Returns:
        ``'Success'`` on success, otherwise a short error message.
    """
    email = request.form.get('email', '')
    if not email:
        return 'Email cannot be empty'

    try:
        valid = validate_email(email)
    except EmailNotValidError as e:
        # The exception message is human-readable.
        app.logger.info('Rejected email address: %s', e)
        return 'Invalid email address'

    dn = ldap.get_object_details(user=session['user_id'], dn_only=True)
    if not dn:
        return 'Could not update email'

    # Store the normalized form; modify() reports failure via its return value.
    if not conn.modify(dn, {'mail': [(MODIFY_REPLACE, [valid.email])]}):
        return 'Could not update email'
    return 'Success'
# route() must be the outermost decorator; otherwise Flask registers the
# undecorated function and login_required is never applied to the view.
@app.route('/update/name', methods=['POST'])
@ldap.login_required
def update_name():
    """Replace the logged-in user's ``givenName`` and ``sn`` attributes.

    Reads ``firstName`` and ``lastName`` from the posted form. The entry to
    modify is resolved from the session's ``user_id``; a ``dn`` field sent by
    the client is ignored so that one user cannot edit another user's entry.

    Returns:
        ``'Success'`` on success, otherwise a short error message.
    """
    first_name = request.form.get('firstName', '')
    last_name = request.form.get('lastName', '')
    if not first_name or not last_name:
        return 'Name cannot be empty'

    dn = ldap.get_object_details(user=session['user_id'], dn_only=True)
    if not dn:
        return 'Could not update name'

    changes = {
        'givenName': [(MODIFY_REPLACE, [first_name])],
        'sn': [(MODIFY_REPLACE, [last_name])],
    }
    # modify() reports failure via its return value.
    if not conn.modify(dn, changes):
        return 'Could not update name'
    return 'Success'
# route() must be the outermost decorator; otherwise Flask registers the
# undecorated function and login_required is never applied to the view.
@app.route('/update/username', methods=['POST'])
@ldap.login_required
def update_username():
    """Replace the logged-in user's ``uid`` attribute.

    Reads ``userName`` from the posted form. The entry to modify is resolved
    from the session's ``user_id``; a ``dn`` field sent by the client is
    ignored so that one user cannot edit another user's entry.

    Returns:
        ``'Success'`` on success, otherwise a short error message.
    """
    user_name = request.form.get('userName', '')
    if not user_name:
        return 'Username cannot be empty'

    dn = ldap.get_object_details(user=session['user_id'], dn_only=True)
    if not dn:
        return 'Could not update username'

    # NOTE(review): nothing here checks that the new uid is unused -- confirm
    # whether the directory enforces uniqueness.
    if not conn.modify(dn, {'uid': [(MODIFY_REPLACE, [user_name])]}):
        return 'Could not update username'

    # The session identifies the user by uid, so it must follow the rename or
    # later lookups for this session would no longer find the entry.
    session['user_id'] = user_name
    return 'Success'
# route() must be the outermost decorator; otherwise Flask registers the
# undecorated function and login_required is never applied to the view.
@app.route('/update/password', methods=['POST'])
@ldap.login_required
def update_password():
    """Change the logged-in user's password.

    Reads ``currentPassword``, ``newPassword`` and ``confirmPassword`` from
    the posted form. The current password is verified by binding as the
    session user, and the entry whose password is changed is resolved from
    that same session user. A ``dn`` field sent by the client is ignored;
    otherwise a user could prove their own password and then reset someone
    else's.

    Returns:
        ``'Success'`` on success, otherwise a short error message.
    """
    current_password = request.form.get('currentPassword', '')
    new_password = request.form.get('newPassword', '')
    confirm_password = request.form.get('confirmPassword', '')

    if not current_password:
        return 'Please enter your current password'
    if not new_password:
        return 'Please enter a new password'
    if not confirm_password:
        return 'Please confirm your new password'
    if new_password != confirm_password:
        return 'Could not confirm new password, please make sure you typed it correctly'

    if ldap.bind_user(session['user_id'], current_password) is None:
        return 'Current password is incorrect'

    dn = ldap.get_object_details(user=session['user_id'], dn_only=True)
    if not dn:
        return 'Error'

    # The extended operation reports failure via a falsy return value.
    if not conn.extend.standard.modify_password(user=dn, new_password=new_password):
        return 'Error'
    return 'Success'
@app.route('/logout')
def logout():
    """Forget the logged-in user and redirect to the index view."""
    session.pop('user_id', None)
    # Also discard any password kept in the session so it does not outlive
    # the login.
    session.pop('passwd', None)
    return redirect(url_for('index'))
if __name__ == '__main__':
    # Executed as a script: start the application with Flask's own server.
    app.run()