"""Database models and session helper.

Declares the ``User``, ``UserSession`` and ``File`` ORM models, creates their
tables at import time and exposes ``session_scope`` for transactional work.
"""
from contextlib import contextmanager
import datetime
import mimetypes
import os

from flask import current_app
from sqlalchemy import (
    create_engine,
    Column,
    Integer,
    String,
    DateTime,
    Text,
    Index,
    ForeignKey,
    Boolean,
    JSON,
    BigInteger,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relation, backref
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import and_

# NOTE(review): ``current_app`` is read while this module is being imported,
# so the import presumably has to happen inside a Flask application
# context -- confirm against the application's start-up code.
engine = create_engine(current_app.config['DB_URI'])
Base = declarative_base(bind=engine)


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True, index=True)
    jab_id = Column(String(24), unique=True, index=True)
    # Defaults to the UTC time at which the row is inserted.
    api_key_date = Column(DateTime, default=datetime.datetime.utcnow)

    # Files owned by this user, newest first; each File gets a ``user``
    # back-reference.
    files = relation('File', backref='user', order_by='File.date.desc()')

    def __init__(self, username, jab_id):
        """Create a user with the given ``username`` and ``jab_id``."""
        self.username = username
        self.jab_id = jab_id


class UserSession(Base):
    """ORM model mapped to the ``sessions`` table (token pair per user)."""

    __tablename__ = 'sessions'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), index=True)
    access_token = Column(String)
    refresh_token = Column(String)
    # No default is declared here; callers have to set this themselves.
    updated = Column(DateTime)

    def __init__(self, user_id, access_token, refresh_token):
        """Store the token pair for the user identified by ``user_id``."""
        self.user_id = user_id
        self.access_token = access_token
        self.refresh_token = refresh_token


class File(Base):
    """ORM model mapped to the ``files`` table, plus on-disk helpers."""

    __tablename__ = 'files'

    id = Column(Integer, primary_key=True)
    hash = Column(String, unique=True, index=True)
    filename = Column(String)
    size = Column(BigInteger)
    date = Column(DateTime)
    # Nullable: a file row may exist without an owning user.
    user_id = Column(Integer, ForeignKey('users.id'), nullable=True)
    ip = Column(String)
    accessed = Column(DateTime)
    scanned = Column(Boolean, nullable=False, default=False)
    blocked_reason = Column(JSON)

    def __init__(self, hash, filename, size, date, user_id=None, ip=None):
        """Populate the file record; ``user_id`` and ``ip`` are optional."""
        self.hash = hash
        self.filename = filename
        self.size = size
        self.date = date
        self.user_id = user_id
        self.ip = ip

    @staticmethod
    def pretty_size(size):
        """Format a byte count using binary units.

        Returns ``'N/A'`` when ``size`` is ``None``, a two-decimal value in
        the largest unit (TiB, GiB, MiB, KiB) that ``size`` reaches, and a
        plain ``'<size> B'`` string for anything below 1 KiB.
        """
        if size is None:
            return 'N/A'

        # Ordered from largest to smallest so the first match wins.
        units = (
            ('TiB', 2 ** 40),
            ('GiB', 2 ** 30),
            ('MiB', 2 ** 20),
            ('KiB', 2 ** 10),
        )
        for label, floor in units:
            if size >= floor:
                return '{:.2f} {}'.format(size / floor, label)
        return '{} B'.format(size)

    def get_path(self):
        """Return the storage path: FILE_DIRECTORY/<hash><original ext>."""
        _, suffix = os.path.splitext(self.filename)
        storage_name = self.hash + suffix
        return os.path.join(current_app.config['FILE_DIRECTORY'], storage_name)

    def get_thumb_path(self):
        """Return the thumbnail path: THUMB_DIRECTORY/<hash>.jpg."""
        thumb_name = self.hash + '.jpg'
        return os.path.join(current_app.config['THUMB_DIRECTORY'], thumb_name)

    def get_size(self):
        """Return the file size in bytes, or ``None`` if it is unavailable.

        A truthy stored ``size`` is returned as is; otherwise the size is
        read from disk, and an ``OSError`` from that lookup yields ``None``.
        """
        try:
            return self.size or os.path.getsize(self.get_path())
        except OSError:
            return None

    @property
    def formatted_size(self):
        """Human-readable size, as produced by ``pretty_size``."""
        byte_count = self.get_size()
        return self.pretty_size(byte_count)

    @property
    def formatted_date(self):
        """``date`` rendered as ``YYYY-MM-DD HH:MM:SS UTC``.

        The ``UTC`` suffix is a literal; no timezone conversion happens here.
        NOTE(review): presumably ``date`` is stored in UTC -- confirm.
        """
        return self.date.strftime('%Y-%m-%d %H:%M:%S UTC')

    def get_mime_type(self):
        """Guess the MIME type from ``filename`` (non-strict lookup).

        Falls back to ``'application/octet-stream'`` when nothing is guessed.
        """
        guessed, _encoding = mimetypes.guess_type(self.filename, strict=False)
        return guessed or 'application/octet-stream'

    def is_image(self):
        """Return True when the guessed MIME type starts with ``image``."""
        mime = self.get_mime_type()
        return mime.startswith('image')

    def is_video(self):
        """Return True when the guessed MIME type starts with ``video``."""
        mime = self.get_mime_type()
        return mime.startswith('video')

    @property
    def ext(self):
        """Extension of ``filename`` as returned by ``os.path.splitext``."""
        _, suffix = os.path.splitext(self.filename)
        return suffix

    @property
    def exists(self):
        """Whether the path returned by ``get_path`` exists on disk."""
        stored_at = self.get_path()
        return os.path.exists(stored_at)


# Create the tables for the models above using the engine bound to ``Base``.
Base.metadata.create_all()

Session = sessionmaker(bind=engine, autoflush=True, autocommit=False)


@contextmanager
def session_scope():
    """Yield a session that is committed on success and always closed.

    Any exception raised inside the ``with`` body (or by the commit itself)
    triggers a rollback and is then re-raised to the caller.
    """
    db_session = Session()
    try:
        # Set per session, so objects stay readable after the commit.
        db_session.expire_on_commit = False
        yield db_session
        db_session.commit()
    except:  # noqa: E722 - roll back on anything, then re-raise unchanged
        db_session.rollback()
        raise
    finally:
        db_session.close()