summaryrefslogtreecommitdiff
path: root/fbin/fbin.py
diff options
context:
space:
mode:
authorJon Bergli Heier <snakebite@jvnv.net>2021-06-09 19:19:56 +0200
committerJon Bergli Heier <snakebite@jvnv.net>2021-06-09 19:21:34 +0200
commite96bedf7477d392b8821f76ca85038c198c84375 (patch)
treec56deba384f45da8958d848113b4da9fa73fe3f9 /fbin/fbin.py
parent7c13b1038482d68c2ab581dad16fa81c0a09034e (diff)
Fix linting errors
Style, unused imports, unused variables, etc. as reported by flake8. Configuration for flake8 has been added to setup.cfg.
Diffstat (limited to 'fbin/fbin.py')
-rwxr-xr-xfbin/fbin.py92
1 files changed, 55 insertions, 37 deletions
diff --git a/fbin/fbin.py b/fbin/fbin.py
index a195594..b062c9a 100755
--- a/fbin/fbin.py
+++ b/fbin/fbin.py
@@ -1,33 +1,28 @@
#!/usr/bin/env python
import base64
-import cgi
import datetime
-import hashlib
import importlib
-import io
-import json
-import mimetypes
import os
import random
import subprocess
import tempfile
-import urllib
from urllib.parse import urlencode, urljoin
-from flask import Blueprint, redirect, current_app, url_for, request, render_template, session, flash, send_file, abort, jsonify, Markup, Response
+from flask import Blueprint, redirect, current_app, url_for, request, render_template, session, \
+ flash, send_file, abort, jsonify, Response
from flask_login import login_user, logout_user, current_user, login_required
import jwt
-from PIL import Image, ExifTags
+from PIL import Image
import requests
-from werkzeug.utils import secure_filename
from .db import db, User, UserSession, File, NoResultFound, IntegrityError
from .monkey import patch as monkey_patch
from .login import login_manager, load_user
from .file_storage.exceptions import StorageError
-storage = importlib.import_module(current_app.config.get('STORAGE_MODULE', '.file_storage.filesystem'), package='fbin').Storage(current_app)
+storage = importlib.import_module(current_app.config.get('STORAGE_MODULE', '.file_storage.filesystem'), package='fbin') \
+ .Storage(current_app)
monkey_patch()
@@ -36,6 +31,7 @@ base62_alphabet = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXY
if not os.path.isdir(current_app.config['THUMB_DIRECTORY']):
os.mkdir(current_app.config['THUMB_DIRECTORY'])
+
def get_or_create_user(username, jab_id):
try:
return db.session.query(User).filter(User.jab_id == jab_id).one()
@@ -49,6 +45,7 @@ def get_or_create_user(username, jab_id):
except IntegrityError:
return None
+
def get_file(file_hash, user_id=None, update_accessed=False):
try:
f = db.session.query(File).filter(File.hash == file_hash)
@@ -65,6 +62,7 @@ def get_file(file_hash, user_id=None, update_accessed=False):
db.session.refresh(f)
return f
+
def get_files(user):
try:
db.session.add(user)
@@ -73,19 +71,23 @@ def get_files(user):
return []
return files
+
def delete_file(file):
db.session.delete(file)
db.session.commit()
storage.delete_file(file)
+
app = Blueprint('fbin', __name__)
+
@app.route('/')
def index():
return redirect(url_for('.upload'))
+
@app.route('/u')
-@app.route('/upload', methods = ['GET', 'POST'])
+@app.route('/upload', methods=['GET', 'POST'])
def upload(api=False, user=None):
def error(message):
if api:
@@ -131,23 +133,24 @@ def upload(api=False, user=None):
'status': True,
'hash': new_file.hash,
'urls': {
- 'base': url_for('fbin.file', hash = '', _external = True),
- 'full': url_for('fbin.file', hash = new_file.hash, filename = new_file.filename, _external = True),
- 'ext': url_for('fbin.file', hash = new_file.hash, ext = new_file.ext, _external = True),
- 'hash': url_for('fbin.file', hash = new_file.hash, _external = True),
+ 'base': url_for('fbin.file', hash='', _external=True),
+ 'full': url_for('fbin.file', hash=new_file.hash, filename=new_file.filename, _external=True),
+ 'ext': url_for('fbin.file', hash=new_file.hash, ext=new_file.ext, _external=True),
+ 'hash': url_for('fbin.file', hash=new_file.hash, _external=True),
},
})
elif old_api:
- return 'OK {hash}'.format(hash = new_file.hash)
+ return 'OK {hash}'.format(hash=new_file.hash)
else:
context = {
'file': new_file,
}
- return redirect(url_for('.uploaded', hash = new_file.hash))
+ return redirect(url_for('.uploaded', hash=new_file.hash))
+
@app.route('/uploaded/<hash>')
def uploaded(hash):
- f = get_file(hash, update_accessed = False)
+ f = get_file(hash, update_accessed=False)
if not f:
abort(404)
if f.user_id and (not current_user.is_authenticated or f.user_id != current_user.get_user_id()):
@@ -159,17 +162,18 @@ def uploaded(hash):
}
return render_template('uploaded.html', **context)
+
@app.route('/f/<hash:hash>')
@app.route('/f/<hash:hash><ext:ext>')
@app.route('/f/<hash:hash>/<path:filename>')
-@app.route('/file/<hash:hash>', endpoint = 'file')
-@app.route('/file/<hash:hash><ext:ext>', endpoint = 'file')
-@app.route('/file/<hash:hash>/<path:filename>', endpoint = 'file')
+@app.route('/file/<hash:hash>', endpoint='file')
+@app.route('/file/<hash:hash><ext:ext>', endpoint='file')
+@app.route('/file/<hash:hash>/<path:filename>', endpoint='file')
def _file(hash, ext=None, filename=None):
f = get_file(hash)
- if not f or (f.blocked_reason and (f.blocked_reason['positives'] >= current_app.config['VIRUSTOTAL_MINIMUM_POSITIVES'] \
- or any(scan['detected'] and scan['result'] in current_app.config['VIRUSTOTAL_SINGULAR_MATCHES']
- for scan in f.blocked_reason['scans'].values()))):
+ if not f or (f.blocked_reason and (f.blocked_reason['positives'] >= current_app.config['VIRUSTOTAL_MINIMUM_POSITIVES']
+ or any(scan['detected'] and scan['result'] in current_app.config['VIRUSTOTAL_SINGULAR_MATCHES']
+ for scan in f.blocked_reason['scans'].values()))):
abort(404)
path = storage.get_file(f)
if isinstance(path, Response):
@@ -186,6 +190,7 @@ def _file(hash, ext=None, filename=None):
mimetype = 'application/octet-stream'
return send_file(path, mimetype=mimetype, attachment_filename=f.filename)
+
@app.route('/l')
@app.route('/login')
def login():
@@ -197,23 +202,23 @@ def login():
'state': session['oauth_state'],
}))
+
@app.route('/account')
def account():
return redirect(current_app.config['ACCOUNT_URL'])
+
@app.route('/o')
@app.route('/logout')
def logout():
if not current_user.is_authenticated:
return redirect(url_for('.index'))
session_id = int(current_user.get_id().split(':', 1)[-1])
- try:
- db.session.query(UserSession).filter_by(id = session_id).delete()
- except:
- raise
+ db.session.query(UserSession).filter_by(id=session_id).delete()
logout_user()
return redirect(url_for('.index'))
+
@app.route('/auth')
def auth():
if 'error' in request.args:
@@ -234,7 +239,7 @@ def auth():
flash('Missing OAuth code', 'error')
return redirect(url_for('.index'))
rs = requests.Session()
- response = rs.post(urljoin(current_app.config['OAUTH_URL'], 'token'), data = {
+ response = rs.post(urljoin(current_app.config['OAUTH_URL'], 'token'), data={
'grant_type': 'authorization_code',
'code': code,
'client_id': current_app.config['OAUTH_CLIENT_ID'],
@@ -247,12 +252,15 @@ def auth():
flash(msg, 'error')
return redirect(url_for('.index'))
try:
- access_data = jwt.decode(token['access_token'], key = current_app.config['JWT_PUBLIC_KEY'], audience = current_app.config['OAUTH_CLIENT_ID'])
- refresh_data = jwt.decode(token['refresh_token'], key = current_app.config['JWT_PUBLIC_KEY'], audience = current_app.config['OAUTH_CLIENT_ID'])
+ jwt.decode(token['access_token'], key=current_app.config['JWT_PUBLIC_KEY'],
+ audience=current_app.config['OAUTH_CLIENT_ID'])
+ jwt.decode(token['refresh_token'], key=current_app.config['JWT_PUBLIC_KEY'],
+ audience=current_app.config['OAUTH_CLIENT_ID'])
except jwt.InvalidTokenError as e:
flash('Failed to verify token: {!s}'.format(e), 'error')
return redirect(url_for('.index'))
- response = rs.get(urljoin(current_app.config['OAUTH_URL'], '/api/user'), headers = {'Authorization': 'Bearer {}'.format(token['access_token'])})
+ response = rs.get(urljoin(current_app.config['OAUTH_URL'], '/api/user'),
+ headers={'Authorization': 'Bearer {}'.format(token['access_token'])})
user = response.json()
user = get_or_create_user(user['username'], user['id'])
us = UserSession(user.id, token['access_token'], token['refresh_token'])
@@ -264,9 +272,10 @@ def auth():
if not user:
flash('Failed to retrieve user instance.', 'error')
else:
- login_user(user, remember = True)
+ login_user(user, remember=True)
return redirect(url_for('.index'))
+
@app.route('/m')
@app.route('/files')
@login_required
@@ -279,11 +288,12 @@ def files():
}
return render_template('files.html', **context)
-@app.route('/files', methods = ['POST'])
+
+@app.route('/files', methods=['POST'])
@login_required
def file_edit():
user_id = int(current_user.get_id().split(':')[0])
- f = get_file(request.form.get('hash'), user_id = user_id, update_accessed = False)
+ f = get_file(request.form.get('hash'), user_id=user_id, update_accessed=False)
if not f:
flash('File not found.', 'error')
return redirect(url_for('.files'))
@@ -295,7 +305,7 @@ def file_edit():
elif 'delete' in request.form:
try:
delete_file(f)
- except:
+ except Exception:
flash('Failed to delete file.', 'error')
else:
flash('File deleted.', 'success')
@@ -303,6 +313,7 @@ def file_edit():
flash('No action was performed.', 'warning')
return redirect(url_for('.files'))
+
@app.route('/i')
@app.route('/images')
@login_required
@@ -316,6 +327,7 @@ def images():
}
return render_template('images.html', **context)
+
@app.route('/v')
@app.route('/videos')
@login_required
@@ -329,13 +341,14 @@ def videos():
}
return render_template('images.html', **context)
+
@app.route('/t/<hash:hash>')
@app.route('/thumb/<hash:hash>')
def thumb(hash):
f = get_file(hash, update_accessed=False)
response = storage.get_thumbnail(f)
if not response:
- with tempfile.NamedTemporaryFile(suffix='.jpg') as ttf: # temporary thumb file
+ with tempfile.NamedTemporaryFile(suffix='.jpg') as ttf: # temporary thumb file
if f.is_image():
try:
with storage.temp_file(f) as tf:
@@ -373,11 +386,13 @@ def thumb(hash):
return response
return send_file(response, attachment_filename='thumb.jpg')
+
@app.route('/h')
@app.route('/help')
def help():
return redirect(url_for('.api'))
+
@app.route('/api')
def api():
context = {
@@ -386,6 +401,7 @@ def api():
}
return render_template('api.html', **context)
+
@app.route('/generate-api-key')
def generate_api_key():
if not current_user.is_authenticated:
@@ -401,6 +417,7 @@ def generate_api_key():
token = jwt.encode(data, current_app.config['SECRET_KEY'])
return token
+
@app.route('/invalidate-api-keys')
@login_required
def invalidate_api_keys():
@@ -411,4 +428,5 @@ def invalidate_api_keys():
flash('All API keys invalidated.', 'success')
return redirect(request.referrer)
+
login_manager.login_view = '.login'