1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
import datetime
import traceback
from urllib.parse import urljoin
from flask import current_app, flash
from flask_login import LoginManager
import jwt
import requests
from . import db
login_manager = LoginManager()
class User:
def __init__(self, user, user_session):
self.user = user
self.user_session = user_session
self.token = None
def refresh_access_token(self):
response = requests.post(urljoin(current_app.config['OAUTH_URL'], 'token'), data = {
'grant_type': 'refresh_token',
'client_id': current_app.config['OAUTH_CLIENT_ID'],
'client_secret': current_app.config['OAUTH_CLIENT_SECRET'],
'refresh_token': self.user_session.refresh_token,
})
if response.status_code != 200:
flash('Failed to refresh authentication token (API call returned {} {})'.format(response.status_code, response.reason), 'error')
return
token = response.json()
if 'error' in token:
flash('Failed to refresh authentication token ({})'.format(token['error']), 'error')
return
try:
access_data = jwt.decode(token['access_token'], key = current_app.config['JWT_PUBLIC_KEY'], audience = current_app.config['OAUTH_CLIENT_ID'])
refresh_data = jwt.decode(token['refresh_token'], key = current_app.config['JWT_PUBLIC_KEY'], audience = current_app.config['OAUTH_CLIENT_ID'])
except jwt.InvalidTokenError as e:
traceback.print_exc()
flash('Failed to refresh authentication token (verification failed)', 'error')
return
with db.session_scope() as sess:
self.user_session.access_token = token['access_token']
self.user_session.refresh_token = token['refresh_token']
self.user_session.updated = datetime.datetime.utcnow()
sess.add(self.user_session)
sess.commit()
return True
@property
def is_authenticated(self):
if self.user is None:
return False
if self.token:
return True
try:
self.token = jwt.decode(self.user_session.access_token, key = current_app.config['JWT_PUBLIC_KEY'], audience = current_app.config['OAUTH_CLIENT_ID'])
except jwt.ExpiredSignatureError:
try:
if not self.refresh_access_token():
return False
except:
traceback.print_exc()
flash('Failed to refresh authentication token (unhandled error; contact an admin)', 'error')
return False
except jwt.InvalidTokenError:
return False
return True
@property
def is_active(self):
return True
@property
def is_anonymous(self):
return False
def get_id(self):
return '{}:{}'.format(self.user.id, self.user_session.id)
def get_user_id(self):
return self.user.id if self.is_authenticated else None
@property
def username(self):
return self.user.username
@login_manager.user_loader
def load_user(user_id):
user_id, session_id = map(int, user_id.split(':', 1))
try:
with db.session_scope() as sess:
user, user_session = sess.query(db.User, db.UserSession).join(db.UserSession).filter(db.User.id == user_id, db.UserSession.id == session_id).one()
return User(user, user_session)
except:
traceback.print_exc()
return None
|