|
|
@ -1,7 +1,13 @@ |
|
|
|
import ldap as l |
|
|
|
from ldap3 import Server, Connection, ALL, MODIFY_REPLACE |
|
|
|
from flask import Flask, g, request, session, redirect, url_for, render_template |
|
|
|
from flask_simpleldap import LDAP |
|
|
|
from ldap3 import Server, Connection |
|
|
|
from ldap3.core.exceptions import LDAPBindError |
|
|
|
from flask import Flask, g, request, redirect, url_for, render_template, flash |
|
|
|
from flask_sqlalchemy import SQLAlchemy |
|
|
|
from flask_login import LoginManager, login_manager, current_user, login_user, \ |
|
|
|
logout_user, login_required |
|
|
|
from flask_wtf import FlaskForm |
|
|
|
from wtforms import StringField, PasswordField, BooleanField, SubmitField |
|
|
|
from wtforms.validators import DataRequired |
|
|
|
from flask_bootstrap import Bootstrap |
|
|
|
import short_url |
|
|
|
import os |
|
|
@ -12,6 +18,10 @@ Bootstrap(app) |
|
|
|
app.secret_key = 'asdf' |
|
|
|
app.debug = True |
|
|
|
|
|
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db' |
|
|
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False |
|
|
|
app.config['WTF_CSRF_SECRET_KEY'] = 'asdf' |
|
|
|
|
|
|
|
# Base |
|
|
|
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication' |
|
|
|
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST') |
|
|
@ -24,57 +34,142 @@ app.config['LDAP_OBJECTS_DN'] = 'dn' |
|
|
|
app.config['LDAP_OPENLDAP'] = True |
|
|
|
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))' |
|
|
|
|
|
|
|
# Login cookies |
|
|
|
app.config['SESSION_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN') |
|
|
|
app.config['REMEMBER_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN') |
|
|
|
|
|
|
|
short_domain = os.environ.get('SHORT_DOMAIN') |
|
|
|
|
|
|
|
ldap = LDAP(app) |
|
|
|
db = SQLAlchemy(app) |
|
|
|
|
|
|
|
login_manager = LoginManager(app) |
|
|
|
login_manager.init_app(app) |
|
|
|
login_manager.login_view = 'login' |
|
|
|
|
|
|
|
db.create_all() |
|
|
|
|
|
|
|
server = Server(app.config['LDAP_HOST']) |
|
|
|
conn = Connection(server, app.config['LDAP_USERNAME'], app.config['LDAP_PASSWORD'], auto_bind=True) |
|
|
|
|
|
|
|
class User(db.Model): |
|
|
|
|
|
|
|
__tablename__ = 'user' |
|
|
|
|
|
|
|
id = db.Column(db.Integer, primary_key=True) |
|
|
|
username = db.Column(db.String(100)) |
|
|
|
password = db.Column(db.String(128)) |
|
|
|
authenticated = db.Column(db.Boolean, default=False) |
|
|
|
|
|
|
|
def __init__(self, username, password): |
|
|
|
self.username = username |
|
|
|
self.password = password |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def try_login(username, password): |
|
|
|
conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % username, attributes=['*']) |
|
|
|
if len(conn.entries) > 0: |
|
|
|
Connection(app.config['LDAP_HOST'], conn.entries[0].entry_dn, password, auto_bind=True) |
|
|
|
return |
|
|
|
raise LDAPBindError |
|
|
|
|
|
|
|
def is_authenticated(self): |
|
|
|
return self.authenticated |
|
|
|
|
|
|
|
def is_active(self): |
|
|
|
return True |
|
|
|
|
|
|
|
def is_anonymous(self): |
|
|
|
return False |
|
|
|
|
|
|
|
def get_id(self): |
|
|
|
return self.id |
|
|
|
|
|
|
|
def get_user_dict(self): |
|
|
|
user = {'dn': '', |
|
|
|
'firstName': '', |
|
|
|
'lastName': '', |
|
|
|
'email': '', |
|
|
|
'userName': self.username, |
|
|
|
} |
|
|
|
|
|
|
|
conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % self.username, attributes=['*']) |
|
|
|
|
|
|
|
user['dn'] = conn.entries[0].entry_dn |
|
|
|
user['firstName'] = conn.entries[0].givenName.value |
|
|
|
user['lastName'] = conn.entries[0].sn.value |
|
|
|
user['email'] = conn.entries[0].mail.value |
|
|
|
|
|
|
|
return user |
|
|
|
|
|
|
|
|
|
|
|
class LoginForm(FlaskForm): |
|
|
|
username = StringField('Username', validators=[DataRequired()]) |
|
|
|
password = PasswordField('Password', validators=[DataRequired()]) |
|
|
|
remember_me = BooleanField('Remember Me') |
|
|
|
submit = SubmitField('Sign In') |
|
|
|
|
|
|
|
|
|
|
|
@login_manager.user_loader |
|
|
|
def load_user(id): |
|
|
|
return User.query.get(int(id)) |
|
|
|
|
|
|
|
|
|
|
|
@app.before_request |
|
|
|
def before_request(): |
|
|
|
g.user = None |
|
|
|
if 'user_id' in session: |
|
|
|
# This is where you'd query your database to get the user info. |
|
|
|
g.user = {} |
|
|
|
def get_current_user(): |
|
|
|
g.user = current_user |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/') |
|
|
|
@ldap.login_required |
|
|
|
@login_required |
|
|
|
def index(): |
|
|
|
user_dict = ldap.get_object_details(session['user_id']) |
|
|
|
|
|
|
|
if 'user_id' in session: |
|
|
|
user = {'dn': 'cn={},cn=usergroup,ou=users,dc=technicalincompetence,dc=club'.format(user_dict['cn'][0].decode('ascii')), |
|
|
|
'firstName': user_dict['givenName'][0].decode('ascii'), |
|
|
|
'lastName': user_dict['sn'][0].decode('ascii'), |
|
|
|
'email': user_dict['mail'][0].decode('ascii'), |
|
|
|
'userName': user_dict['uid'][0].decode('ascii'), |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return render_template('profile.j2', user = user, short_domain = short_domain) |
|
|
|
return render_template('profile.j2', user = current_user.get_user_dict(), short_domain = short_domain) |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST']) |
|
|
|
def login(): |
|
|
|
if g.user: |
|
|
|
if current_user.is_authenticated: |
|
|
|
flash('You are already logged in.') |
|
|
|
return redirect(url_for('index')) |
|
|
|
if request.method == 'POST': |
|
|
|
user = request.form['user'] |
|
|
|
passwd = request.form['passwd'] |
|
|
|
test = ldap.bind_user(user, passwd) |
|
|
|
if test is None or passwd == '': |
|
|
|
return render_template('login.j2', error='Invalid credentials') |
|
|
|
else: |
|
|
|
session['user_id'] = request.form['user'] |
|
|
|
session['passwd'] = request.form['passwd'] |
|
|
|
return redirect('/') |
|
|
|
return render_template('login.j2') |
|
|
|
|
|
|
|
|
|
|
|
@ldap.login_required |
|
|
|
|
|
|
|
form = LoginForm(request.form) |
|
|
|
print(form) |
|
|
|
print(request.method) |
|
|
|
|
|
|
|
if request.method == 'POST' and form.validate(): |
|
|
|
username = request.form.get('username') |
|
|
|
password = request.form.get('password') |
|
|
|
print(username) |
|
|
|
print(password) |
|
|
|
|
|
|
|
try: |
|
|
|
User.try_login(username, password) |
|
|
|
except LDAPBindError: |
|
|
|
flash( |
|
|
|
'Invalid username or password. Please try again.', |
|
|
|
'danger') |
|
|
|
return render_template('login.j2', form=form) |
|
|
|
|
|
|
|
user = User.query.filter(User.username == username).first() |
|
|
|
|
|
|
|
print(user) |
|
|
|
if user is None: |
|
|
|
user = User(username, password) |
|
|
|
db.session.add(user) |
|
|
|
user.authenticated = True |
|
|
|
db.session.commit() |
|
|
|
login_user(user, remember=form.remember_me.data) |
|
|
|
|
|
|
|
print('You have successfully logged in.') |
|
|
|
return redirect(url_for('index')) |
|
|
|
|
|
|
|
if form.errors: |
|
|
|
flash(form.errors, 'danger') |
|
|
|
|
|
|
|
return render_template('login.j2', form=form) |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/shorten', methods=['POST']) |
|
|
|
@login_required |
|
|
|
def shorten_url(): |
|
|
|
if request.method == 'POST': |
|
|
|
url = request.form['url'] |
|
|
@ -108,9 +203,14 @@ def expand_url(url): |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/logout') |
|
|
|
@login_required |
|
|
|
def logout(): |
|
|
|
session.pop('user_id', None) |
|
|
|
return redirect(url_for('index')) |
|
|
|
user = current_user |
|
|
|
user.authenticated = False |
|
|
|
db.session.add(user) |
|
|
|
db.session.commit() |
|
|
|
logout_user() |
|
|
|
return redirect(url_for('game')) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |