From e96bedf7477d392b8821f76ca85038c198c84375 Mon Sep 17 00:00:00 2001 From: Jon Bergli Heier Date: Wed, 9 Jun 2021 19:19:56 +0200 Subject: Fix linting errors Style, unused imports, unused variables, etc. as reported by flake8. Configuration for flake8 has been added to setup.cfg. --- fbin/fbin.py | 92 ++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 55 insertions(+), 37 deletions(-) (limited to 'fbin/fbin.py') diff --git a/fbin/fbin.py b/fbin/fbin.py index a195594..b062c9a 100755 --- a/fbin/fbin.py +++ b/fbin/fbin.py @@ -1,33 +1,28 @@ #!/usr/bin/env python import base64 -import cgi import datetime -import hashlib import importlib -import io -import json -import mimetypes import os import random import subprocess import tempfile -import urllib from urllib.parse import urlencode, urljoin -from flask import Blueprint, redirect, current_app, url_for, request, render_template, session, flash, send_file, abort, jsonify, Markup, Response +from flask import Blueprint, redirect, current_app, url_for, request, render_template, session, \ + flash, send_file, abort, jsonify, Response from flask_login import login_user, logout_user, current_user, login_required import jwt -from PIL import Image, ExifTags +from PIL import Image import requests -from werkzeug.utils import secure_filename from .db import db, User, UserSession, File, NoResultFound, IntegrityError from .monkey import patch as monkey_patch from .login import login_manager, load_user from .file_storage.exceptions import StorageError -storage = importlib.import_module(current_app.config.get('STORAGE_MODULE', '.file_storage.filesystem'), package='fbin').Storage(current_app) +storage = importlib.import_module(current_app.config.get('STORAGE_MODULE', '.file_storage.filesystem'), package='fbin') \ + .Storage(current_app) monkey_patch() @@ -36,6 +31,7 @@ base62_alphabet = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXY if not os.path.isdir(current_app.config['THUMB_DIRECTORY']): os.mkdir(current_app.config['THUMB_DIRECTORY']) + def get_or_create_user(username, jab_id): try: return db.session.query(User).filter(User.jab_id == jab_id).one() @@ -49,6 +45,7 @@ def get_or_create_user(username, jab_id): except IntegrityError: return None + def get_file(file_hash, user_id=None, update_accessed=False): try: f = db.session.query(File).filter(File.hash == file_hash) @@ -65,6 +62,7 @@ def get_file(file_hash, user_id=None, update_accessed=False): db.session.refresh(f) return f + def get_files(user): try: db.session.add(user) @@ -73,19 +71,23 @@ def get_files(user): return [] return files + def delete_file(file): db.session.delete(file) db.session.commit() storage.delete_file(file) + app = Blueprint('fbin', __name__) + @app.route('/') def index(): return redirect(url_for('.upload')) + @app.route('/u') -@app.route('/upload', methods = ['GET', 'POST']) +@app.route('/upload', methods=['GET', 'POST']) def upload(api=False, user=None): def error(message): if api: @@ -131,23 +133,24 @@ def upload(api=False, user=None): 'status': True, 'hash': new_file.hash, 'urls': { - 'base': url_for('fbin.file', hash = '', _external = True), - 'full': url_for('fbin.file', hash = new_file.hash, filename = new_file.filename, _external = True), - 'ext': url_for('fbin.file', hash = new_file.hash, ext = new_file.ext, _external = True), - 'hash': url_for('fbin.file', hash = new_file.hash, _external = True), + 'base': url_for('fbin.file', hash='', _external=True), + 'full': url_for('fbin.file', hash=new_file.hash, filename=new_file.filename, _external=True), + 'ext': url_for('fbin.file', hash=new_file.hash, ext=new_file.ext, _external=True), + 'hash': url_for('fbin.file', hash=new_file.hash, _external=True), }, }) elif old_api: - return 'OK {hash}'.format(hash = new_file.hash) + return 'OK {hash}'.format(hash=new_file.hash) else: context = { 'file': new_file, } - return redirect(url_for('.uploaded', hash = new_file.hash)) + return redirect(url_for('.uploaded', hash=new_file.hash)) + @app.route('/uploaded/') def uploaded(hash): - f = get_file(hash, update_accessed = False) + f = get_file(hash, update_accessed=False) if not f: abort(404) if f.user_id and (not current_user.is_authenticated or f.user_id != current_user.get_user_id()): @@ -159,17 +162,18 @@ def uploaded(hash): } return render_template('uploaded.html', **context) + @app.route('/f/') @app.route('/f/') @app.route('/f//') -@app.route('/file/', endpoint = 'file') -@app.route('/file/', endpoint = 'file') -@app.route('/file//', endpoint = 'file') +@app.route('/file/', endpoint='file') +@app.route('/file/', endpoint='file') +@app.route('/file//', endpoint='file') def _file(hash, ext=None, filename=None): f = get_file(hash) - if not f or (f.blocked_reason and (f.blocked_reason['positives'] >= current_app.config['VIRUSTOTAL_MINIMUM_POSITIVES'] \ - or any(scan['detected'] and scan['result'] in current_app.config['VIRUSTOTAL_SINGULAR_MATCHES'] - for scan in f.blocked_reason['scans'].values()))): + if not f or (f.blocked_reason and (f.blocked_reason['positives'] >= current_app.config['VIRUSTOTAL_MINIMUM_POSITIVES'] + or any(scan['detected'] and scan['result'] in current_app.config['VIRUSTOTAL_SINGULAR_MATCHES'] + for scan in f.blocked_reason['scans'].values()))): abort(404) path = storage.get_file(f) if isinstance(path, Response): @@ -186,6 +190,7 @@ def _file(hash, ext=None, filename=None): mimetype = 'application/octet-stream' return send_file(path, mimetype=mimetype, attachment_filename=f.filename) + @app.route('/l') @app.route('/login') def login(): @@ -197,23 +202,23 @@ def login(): 'state': session['oauth_state'], })) + @app.route('/account') def account(): return redirect(current_app.config['ACCOUNT_URL']) + @app.route('/o') @app.route('/logout') def logout(): if not current_user.is_authenticated: return redirect(url_for('.index')) session_id = int(current_user.get_id().split(':', 1)[-1]) - try: - db.session.query(UserSession).filter_by(id = session_id).delete() - except: - raise + db.session.query(UserSession).filter_by(id=session_id).delete() logout_user() return redirect(url_for('.index')) + @app.route('/auth') def auth(): if 'error' in request.args: @@ -234,7 +239,7 @@ def auth(): flash('Missing OAuth code', 'error') return redirect(url_for('.index')) rs = requests.Session() - response = rs.post(urljoin(current_app.config['OAUTH_URL'], 'token'), data = { + response = rs.post(urljoin(current_app.config['OAUTH_URL'], 'token'), data={ 'grant_type': 'authorization_code', 'code': code, 'client_id': current_app.config['OAUTH_CLIENT_ID'], @@ -247,12 +252,15 @@ def auth(): flash(msg, 'error') return redirect(url_for('.index')) try: - access_data = jwt.decode(token['access_token'], key = current_app.config['JWT_PUBLIC_KEY'], audience = current_app.config['OAUTH_CLIENT_ID']) - refresh_data = jwt.decode(token['refresh_token'], key = current_app.config['JWT_PUBLIC_KEY'], audience = current_app.config['OAUTH_CLIENT_ID']) + jwt.decode(token['access_token'], key=current_app.config['JWT_PUBLIC_KEY'], + audience=current_app.config['OAUTH_CLIENT_ID']) + jwt.decode(token['refresh_token'], key=current_app.config['JWT_PUBLIC_KEY'], + audience=current_app.config['OAUTH_CLIENT_ID']) except jwt.InvalidTokenError as e: flash('Failed to verify token: {!s}'.format(e), 'error') return redirect(url_for('.index')) - response = rs.get(urljoin(current_app.config['OAUTH_URL'], '/api/user'), headers = {'Authorization': 'Bearer {}'.format(token['access_token'])}) + response = rs.get(urljoin(current_app.config['OAUTH_URL'], '/api/user'), + headers={'Authorization': 'Bearer {}'.format(token['access_token'])}) user = response.json() user = get_or_create_user(user['username'], user['id']) us = UserSession(user.id, token['access_token'], token['refresh_token']) @@ -264,9 +272,10 @@ def auth(): if not user: flash('Failed to retrieve user instance.', 'error') else: - login_user(user, remember = True) + login_user(user, remember=True) return redirect(url_for('.index')) + @app.route('/m') @app.route('/files') @login_required @@ -279,11 +288,12 @@ def files(): } return render_template('files.html', **context) -@app.route('/files', methods = ['POST']) + +@app.route('/files', methods=['POST']) @login_required def file_edit(): user_id = int(current_user.get_id().split(':')[0]) - f = get_file(request.form.get('hash'), user_id = user_id, update_accessed = False) + f = get_file(request.form.get('hash'), user_id=user_id, update_accessed=False) if not f: flash('File not found.', 'error') return redirect(url_for('.files')) @@ -295,7 +305,7 @@ def file_edit(): elif 'delete' in request.form: try: delete_file(f) - except: + except Exception: flash('Failed to delete file.', 'error') else: flash('File deleted.', 'success') @@ -303,6 +313,7 @@ def file_edit(): flash('No action was performed.', 'warning') return redirect(url_for('.files')) + @app.route('/i') @app.route('/images') @login_required @@ -316,6 +327,7 @@ def images(): } return render_template('images.html', **context) + @app.route('/v') @app.route('/videos') @login_required @@ -329,13 +341,14 @@ def videos(): } return render_template('images.html', **context) + @app.route('/t/') @app.route('/thumb/') def thumb(hash): f = get_file(hash, update_accessed=False) response = storage.get_thumbnail(f) if not response: - with tempfile.NamedTemporaryFile(suffix='.jpg') as ttf: # temporary thumb file + with tempfile.NamedTemporaryFile(suffix='.jpg') as ttf: # temporary thumb file if f.is_image(): try: with storage.temp_file(f) as tf: @@ -373,11 +386,13 @@ def thumb(hash): return response return send_file(response, attachment_filename='thumb.jpg') + @app.route('/h') @app.route('/help') def help(): return redirect(url_for('.api')) + @app.route('/api') def api(): context = { @@ -386,6 +401,7 @@ def api(): } return render_template('api.html', **context) + @app.route('/generate-api-key') def generate_api_key(): if not current_user.is_authenticated: @@ -401,6 +417,7 @@ def generate_api_key(): token = jwt.encode(data, current_app.config['SECRET_KEY']) return token + @app.route('/invalidate-api-keys') @login_required def invalidate_api_keys(): @@ -411,4 +428,5 @@ def invalidate_api_keys(): flash('All API keys invalidated.', 'success') return redirect(request.referrer) + login_manager.login_view = '.login' -- cgit v1.2.3