import ldap as l from ldap3 import Server, Connection from ldap3.core.exceptions import LDAPBindError from flask import Flask, g, request, redirect, url_for, render_template, flash from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, login_manager, current_user, login_user, \ logout_user, login_required from flask_wtf import FlaskForm from flask_cache_buster import CacheBuster from wtforms import StringField, PasswordField, BooleanField, SubmitField from wtforms.validators import DataRequired from flask_bootstrap import Bootstrap import short_url import os import sqlite3 app = Flask(__name__) Bootstrap(app) app.secret_key = 'asdf' app.debug = True app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['WTF_CSRF_SECRET_KEY'] = 'asdf' # Base app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication' app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST') app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN') app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME') app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD') # OpenLDAP app.config['LDAP_OBJECTS_DN'] = 'dn' app.config['LDAP_OPENLDAP'] = True app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))' # Login cookies app.config['SESSION_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN') app.config['REMEMBER_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN') short_domain = os.environ.get('SHORT_DOMAIN') db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.init_app(app) login_manager.login_view = 'login' db.create_all() config = { 'extensions': ['.js', '.css', '.csv'], 'hash_size': 10 } cache_buster = CacheBuster(config=config) cache_buster.register_cache_buster(app) server = Server(app.config['LDAP_HOST']) conn = Connection(server, app.config['LDAP_USERNAME'], app.config['LDAP_PASSWORD'], auto_bind=True) class User(db.Model): __tablename__ = 'user' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(100)) authenticated = db.Column(db.Boolean, default=False) def __init__(self, username): self.username = username @staticmethod def try_login(username, password): conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % username, attributes=['*']) if len(conn.entries) > 0: Connection(app.config['LDAP_HOST'], conn.entries[0].entry_dn, password, auto_bind=True) return raise LDAPBindError def is_authenticated(self): return self.authenticated def is_active(self): return True def is_anonymous(self): return False def get_id(self): return self.id def get_user_dict(self): user = {'dn': '', 'firstName': '', 'lastName': '', 'email': '', 'userName': self.username, } conn.search(app.config['LDAP_BASE_DN'], app.config['LDAP_USER_OBJECT_FILTER'] % self.username, attributes=['*']) user['dn'] = conn.entries[0].entry_dn user['firstName'] = conn.entries[0].givenName.value user['lastName'] = conn.entries[0].sn.value user['email'] = conn.entries[0].mail.value return user class LoginForm(FlaskForm): username = StringField('Username', validators=[DataRequired()]) password = PasswordField('Password', validators=[DataRequired()]) remember_me = BooleanField('Remember Me') submit = SubmitField('Sign In') @login_manager.user_loader def load_user(id): return User.query.get(int(id)) @app.before_request def get_current_user(): g.user = current_user @app.route('/') @login_required def index(): return render_template('profile.j2', user = current_user.get_user_dict(), short_domain = short_domain) @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: flash('You are already logged in.') return redirect(url_for('index')) form = LoginForm(request.form) print(form) print(request.method) if request.method == 'POST' and form.validate(): username = request.form.get('username') password = request.form.get('password') print(username) print(password) try: User.try_login(username, password) except LDAPBindError: flash( 'Invalid username or password. Please try again.', 'danger') return render_template('login.j2', form=form) user = User.query.filter(User.username == username).first() print(user) if user is None: user = User(username, password) db.session.add(user) user.authenticated = True db.session.commit() login_user(user, remember=form.remember_me.data) print('You have successfully logged in.') return redirect(url_for('index')) if form.errors: flash(form.errors, 'danger') return render_template('login.j2', form=form) @app.route('/shorten', methods=['POST']) @login_required def shorten_url(): if request.method == 'POST': url = request.form['url'] conn = sqlite3.connect('links/links.db') c = conn.cursor() if url is not None and len(url) > 0: c.execute("INSERT INTO links (url) VALUES (?)", (url,)) c.execute("SELECT * FROM links WHERE url=?", (url,)) row = c.fetchone() print(row[0]) conn.commit() conn.close() url_fragment = short_url.encode_url(row[0]) return "Your shortened link is {}/{}".format(short_domain, url_fragment, short_domain, url_fragment) conn.commit() conn.close() return 'Error' @app.route('/l/') def expand_url(url): idx = short_url.decode_url(url) conn = sqlite3.connect('links/links.db') c = conn.cursor() c.execute("SELECT * FROM links WHERE id=?", (idx,)) out_link = c.fetchone()[1] return redirect(out_link) @app.route('/logout') @login_required def logout(): user = current_user user.authenticated = False db.session.add(user) db.session.commit() logout_user() return redirect(url_for('game')) if __name__ == '__main__': app.run()