"""LDAP-authenticated Flask app for short links, pastes and image uploads.

Users sign in against an LDAP directory; a row per user is kept in a local
SQLAlchemy database so Flask-Login can restore sessions.  Links, pastes and
image records live in a separate SQLite file (``links/links.db``) and are
exposed publicly under ``/l/<code>``, ``/p/<code>`` and ``/i/<code>``, where
``<code>`` is the row id encoded with ``short_url``.
"""
import os
import sqlite3
from contextlib import closing

import ldap as l
import short_url
from flask import Flask, g, request, redirect, url_for, render_template, flash
from flask.helpers import send_from_directory
from flask_bootstrap import Bootstrap
from flask_cache_buster import CacheBuster
from flask_login import LoginManager, login_manager, current_user, login_user, \
    logout_user, login_required
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from ldap3 import Server, Connection
from ldap3.core.exceptions import LDAPBindError
from ldap3.utils.conv import escape_filter_chars
from werkzeug.utils import secure_filename
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired

app = Flask(__name__)
Bootstrap(app)

# NOTE(review): 'asdf' is kept only as a backward-compatible fallback; set
# SECRET_KEY / WTF_CSRF_SECRET_KEY in the environment for any real deployment.
app.secret_key = os.environ.get('SECRET_KEY', 'asdf')
# NOTE(review): debug mode is hard-coded on; confirm this is never deployed as-is.
app.debug = True

app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['WTF_CSRF_SECRET_KEY'] = os.environ.get('WTF_CSRF_SECRET_KEY', 'asdf')

# Base
app.config['LDAP_REALM_NAME'] = 'OpenLDAP Authentication'
app.config['LDAP_HOST'] = os.environ.get('LDAP_HOST')
app.config['LDAP_BASE_DN'] = os.environ.get('LDAP_BASE_DN')
app.config['LDAP_USERNAME'] = os.environ.get('LDAP_USERNAME')
app.config['LDAP_PASSWORD'] = os.environ.get('LDAP_PASSWORD')

# Uploads
app.config['UPLOAD_FOLDER'] = 'links/images'

# OpenLDAP
app.config['LDAP_OBJECTS_DN'] = 'dn'
app.config['LDAP_OPENLDAP'] = True
app.config['LDAP_USER_OBJECT_FILTER'] = '(&(objectclass=posixAccount)(uid=%s))'

# Login cookies
#app.config['SESSION_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')
#app.config['REMEMBER_COOKIE_DOMAIN'] = os.environ.get('COOKIE_DOMAIN')

short_domain = os.environ.get('SHORT_DOMAIN')

ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}

# SQLite file holding the links / pastes / images tables.
LINKS_DB = 'links/links.db'
# Only these table names are ever interpolated into SQL text.
_TABLES = ('links', 'pastes', 'images')

db = SQLAlchemy(app)

# LoginManager(app) already performs init_app(app); calling it a second time
# would only register the same hooks twice.
login_manager = LoginManager(app)
login_manager.login_view = 'login'

config = {
    'extensions': ['.js', '.css', '.csv'],
    'hash_size': 10,
}
cache_buster = CacheBuster(config=config)
cache_buster.register_cache_buster(app)

# Service-account connection used for directory searches; bound at import time.
server = Server(app.config['LDAP_HOST'])
conn = Connection(server, app.config['LDAP_USERNAME'],
                  app.config['LDAP_PASSWORD'], auto_bind=True)


def _user_filter(username):
    """Return the LDAP search filter for *username* with special chars escaped."""
    return app.config['LDAP_USER_OBJECT_FILTER'] % escape_filter_chars(username)


class User(db.Model):
    """Local record of a user who has authenticated through LDAP."""

    __tablename__ = 'user'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100))
    authenticated = db.Column(db.Boolean, default=False)

    def __init__(self, username):
        self.username = username

    @staticmethod
    def try_login(username, password):
        """Verify *username* / *password* by binding to the LDAP server.

        The user's DN is looked up with the service connection, then a
        throw-away connection is bound as that DN with the given password.

        Raises:
            LDAPBindError: if no directory entry matches or the bind fails.
        """
        conn.search(app.config['LDAP_BASE_DN'], _user_filter(username),
                    attributes=['*'])
        if not conn.entries:
            raise LDAPBindError
        user_dn = conn.entries[0].entry_dn
        # auto_bind=True raises LDAPBindError when the credentials are rejected.
        bound = Connection(server, user_dn, password, auto_bind=True)
        # The connection exists only to verify the password; release it.
        bound.unbind()

    # Flask-Login reads these as attributes, so they must be properties:
    # a bound method would always be truthy.
    @property
    def is_authenticated(self):
        return self.authenticated

    @property
    def is_active(self):
        return True

    @property
    def is_anonymous(self):
        return False

    def get_id(self):
        """Return the primary key as a string, the form Flask-Login stores."""
        return str(self.id)

    def get_user_dict(self):
        """Return this user's LDAP details as a dict.

        Performs a directory search on every call, so callers should fetch
        the dict once per request and reuse it.

        Returns:
            dict with keys ``dn``, ``firstName``, ``lastName``, ``email``
            and ``userName``.

        Raises:
            IndexError: if the user no longer has a directory entry.
        """
        conn.search(app.config['LDAP_BASE_DN'], _user_filter(self.username),
                    attributes=['*'])
        if not conn.entries:
            raise IndexError(
                'no LDAP entry found for user {!r}'.format(self.username))
        entry = conn.entries[0]
        return {
            'dn': entry.entry_dn,
            'firstName': entry.givenName.value,
            'lastName': entry.sn.value,
            'email': entry.mail.value,
            'userName': self.username,
        }


# Create tables only after the models are declared, otherwise nothing is
# created.  The app context is required by newer Flask-SQLAlchemy releases.
with app.app_context():
    db.create_all()


class LoginForm(FlaskForm):
    """Sign-in form: username, password and a remember-me checkbox."""

    username = StringField('Username', validators=[DataRequired()])
    password = PasswordField('Password', validators=[DataRequired()])
    remember_me = BooleanField('Remember Me')
    submit = SubmitField('Sign In')


@login_manager.user_loader
def load_user(id):
    """Reload a user from the id stored in the session, or None if invalid."""
    try:
        return User.query.get(int(id))
    except (TypeError, ValueError):
        # A malformed session id is treated as "no such user".
        return None


@app.before_request
def get_current_user():
    """Expose the current user on ``g`` for the duration of the request."""
    g.user = current_user


def _store_item(table, column, value, owner_dn):
    """Insert *value* into *table* for *owner_dn*; return its short-url code.

    *table* and *column* must be trusted constants, never request data.
    The inserted row is read back by rowid, so the code always refers to the
    row just created rather than an older row holding the same value.
    """
    with closing(sqlite3.connect(LINKS_DB)) as db_conn:
        cur = db_conn.cursor()
        cur.execute(
            "INSERT INTO {} ({}, user_id) VALUES (?, ?)".format(table, column),
            (value, owner_dn))
        new_rowid = cur.lastrowid
        cur.execute("SELECT * FROM {} WHERE rowid=?".format(table),
                    (new_rowid,))
        row = cur.fetchone()
        db_conn.commit()
    return short_url.encode_url(row[0])


def _find_row(table, fragment):
    """Return the row of *table* addressed by short-url *fragment*, or None."""
    try:
        # decode_url is expected to raise ValueError for characters outside
        # its alphabet; sqlite raises OverflowError for ids beyond 64 bits.
        idx = short_url.decode_url(fragment)
        with closing(sqlite3.connect(LINKS_DB)) as db_conn:
            return db_conn.execute(
                "SELECT * FROM {} WHERE id=?".format(table), (idx,)).fetchone()
    except (ValueError, OverflowError):
        return None


def _rows_for_user(table, user_dn=None):
    """Return all rows of *table* owned by *user_dn* (default: current user)."""
    if user_dn is None:
        user_dn = current_user.get_user_dict()['dn']
    with closing(sqlite3.connect(LINKS_DB)) as db_conn:
        return db_conn.execute(
            "SELECT * FROM {} WHERE user_id=?".format(table),
            (user_dn,)).fetchall()


def _not_found():
    """Render the 404 page with a real 404 status code."""
    return render_template('404.j2'), 404


@app.route('/')
@login_required
def index():
    """Show the profile page listing the user's pastes, links and images."""
    user = current_user.get_user_dict()  # single LDAP lookup for the request
    owner_dn = user['dn']
    return render_template('profile.j2',
                           user=user,
                           short_domain=short_domain,
                           links=get_links_for_user(owner_dn),
                           pastes=get_pastes_for_user(owner_dn),
                           images=get_images_for_user(owner_dn))


@app.route('/link')
@login_required
def link():
    """Show the link-shortening page."""
    return render_template('link.j2', user=current_user.get_user_dict(),
                           short_domain=short_domain)


@app.route('/paste')
@login_required
def text():
    """Show the paste-creation page."""
    return render_template('paste.j2', user=current_user.get_user_dict())


@app.route('/image', methods=['GET', 'POST'])
@login_required
def image():
    """Show the upload page and, on POST, store an uploaded image."""
    user = current_user.get_user_dict()

    def render(**extra):
        return render_template('image.j2', user=user,
                               short_domain=short_domain, **extra)

    if request.method != 'POST':
        return render()

    # The POST must carry a file part at all.
    if 'file' not in request.files:
        return render(error_msg="No file part.")
    file = request.files['file']
    # Browsers submit an empty part without a filename when nothing is selected.
    if file.filename == '':
        return render(error_msg="No selected file.")
    if not (file and allowed_file(file.filename)):
        return render(error_msg="File type not allowed.")

    filename = secure_filename(file.filename)
    # NOTE(review): files are stored under the client-supplied name, so two
    # uploads with the same name overwrite each other — confirm this is intended.
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    url_fragment = _store_item('images', 'filename', filename, user['dn'])
    return render(success_msg="Your image link is {}/i/{}".format(
        short_domain, url_fragment))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Authenticate against LDAP and start a Flask-Login session."""
    if current_user.is_authenticated:
        flash('You are already logged in.')
        return redirect(url_for('index'))

    form = LoginForm(request.form)

    if request.method == 'POST' and form.validate():
        username = request.form.get('username')
        password = request.form.get('password')

        try:
            User.try_login(username, password)
        except LDAPBindError:
            flash('Invalid username or password. Please try again.', 'danger')
            return render_template('login.j2', form=form)

        user = User.query.filter(User.username == username).first()
        if user is None:
            # First successful login: create the local record.
            user = User(username)
            db.session.add(user)
        user.authenticated = True
        db.session.commit()
        login_user(user, remember=form.remember_me.data)
        return redirect(url_for('index'))

    if form.errors:
        flash(form.errors, 'danger')

    return render_template('login.j2', form=form)


@app.route('/shorten', methods=['POST'])
@login_required
def shorten_url():
    """Store the posted ``url`` form field and return its short link text."""
    url = request.form.get('url')
    if not url:
        return 'Error'
    url_fragment = _store_item('links', 'url', url,
                               current_user.get_user_dict()['dn'])
    return "Your shortened link is {}/l/{}".format(short_domain, url_fragment)


@app.route('/save', methods=['POST'])
@login_required
def save_paste():
    """Store the raw request body as a paste and return its link as JSON."""
    paste = request.data
    if not paste:
        return {'success': False}
    url_fragment = _store_item('pastes', 'paste', paste,
                               current_user.get_user_dict()['dn'])
    return {"success": True,
            "msg": "Your paste link is {}/p/{}".format(short_domain,
                                                       url_fragment)}


@app.route('/delete', methods=['POST'])
@login_required
def delete():
    """Delete one of the current user's rows.

    Expects a JSON object with ``table`` (``links``, ``pastes`` or
    ``images``) and ``id``.  Only rows owned by the current user are removed.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'id' not in data:
        return {'success': False, 'msg': 'An error occurred.'}

    table = data.get('table')
    if table not in _TABLES:
        return {'success': False, 'msg': 'This table doesn\'t exist!'}

    owner_dn = current_user.get_user_dict()['dn']
    with closing(sqlite3.connect(LINKS_DB)) as db_conn:
        # `table` was validated against _TABLES above, so formatting is safe.
        db_conn.execute(
            "DELETE FROM {} WHERE id=? AND user_id=?".format(table),
            (data['id'], owner_dn))
        db_conn.commit()
    return {'success': True, 'msg': 'Deleted successfully!'}


@app.route('/l/<url>')
def expand_url(url):
    """Redirect a short link code to its stored target URL."""
    row = _find_row('links', url)
    if row is None:
        return _not_found()
    return redirect(row[1])


@app.route('/p/<url>')
def show_paste(url):
    """Render the public view of a paste addressed by its short code."""
    row = _find_row('pastes', url)
    if row is None:
        return _not_found()
    return render_template('public_paste.j2', paste=str(row[1], 'utf-8'))


@app.route('/i/<url>')
def show_image(url):
    """Serve the uploaded image addressed by its short code."""
    row = _find_row('images', url)
    if row is None:
        return _not_found()
    # Passed positionally: the keyword is `filename` on older Flask releases
    # and `path` on Flask 2.x.
    return send_from_directory(app.config['UPLOAD_FOLDER'], row[1],
                               as_attachment=False)


def get_pastes_for_user(user_dn=None):
    """Return ``(id, label, link)`` tuples for a user's pastes.

    *user_dn* defaults to the current user's DN; pass it explicitly to avoid
    a repeated LDAP lookup.  The label is the link plus the first 80
    characters of the paste.
    """
    out = []
    for row in _rows_for_user('pastes', user_dn):
        paste_link = "{}/p/{}".format(short_domain,
                                      short_url.encode_url(row[0]))
        label = "{} - {}".format(paste_link, str(row[1], 'utf-8')[:80])
        out.append((row[0], label, paste_link))
    return out


def get_links_for_user(user_dn=None):
    """Return ``(id, label, link)`` tuples for a user's shortened links.

    *user_dn* defaults to the current user's DN.
    """
    out = []
    for row in _rows_for_user('links', user_dn):
        short_link = "{}/l/{}".format(short_domain,
                                      short_url.encode_url(row[0]))
        out.append((row[0], "{} - {}".format(short_link, row[1]), short_link))
    return out


def get_images_for_user(user_dn=None):
    """Return ``(id, link, link)`` tuples for a user's uploaded images.

    *user_dn* defaults to the current user's DN.
    """
    out = []
    for row in _rows_for_user('images', user_dn):
        image_link = "{}/i/{}".format(short_domain,
                                      short_url.encode_url(row[0]))
        out.append((row[0], image_link, image_link))
    return out


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


@app.route('/logout')
@login_required
def logout():
    """Mark the user as signed out, end the session and go to the login page."""
    # Unwrap the LocalProxy so the session receives the mapped User instance.
    user = current_user._get_current_object()
    user.authenticated = False
    db.session.add(user)
    db.session.commit()
    logout_user()
    # The original target endpoint 'game' is not defined anywhere in this app.
    return redirect(url_for('login'))


if __name__ == '__main__':
    app.run()