Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: only allow login of onboarded users #1184

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 30 additions & 19 deletions fence/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,37 @@ def set_flask_session_values(user):

user = query_for_user(session=current_app.scoped_session(), username=username)
if user:
_update_users_email(user, email)
_update_users_id_from_idp(user, id_from_idp)
_update_users_last_auth(user)

# This expression is relevant to those users who already have user and
# idp info persisted to the database. We return early to avoid
# unnecessarily re-saving that user and idp info.
if user.identity_provider and user.identity_provider.name == provider:
set_flask_session_values(user)
return
if user.active is False:
# Abort login if user.active is False (user.active is None or True are both
# considered active in this case):
raise Unauthorized(
"User is known but not authorized/activated in the system"
)
else:
_update_users_email(user, email)
_update_users_id_from_idp(user, id_from_idp)
_update_users_last_auth(user)

# This expression is relevant to those users who already have user and
# idp info persisted to the database. We return early to avoid
# unnecessarily re-saving that user and idp info.
if user.identity_provider and user.identity_provider.name == provider:
set_flask_session_values(user)
return
else:
# we need a new user
user = User(username=username)

if email:
user.email = email

if id_from_idp:
user.id_from_idp = id_from_idp
# TODO: update iss_sub mapping table?
if not config["ALLOW_NEW_USER_ON_LOGIN"]:
# do not create new active users automatically
raise Unauthorized("New user is not yet authorized/activated in the system")
else:
# add the new user
user = User(username=username)

if email:
user.email = email

if id_from_idp:
user.id_from_idp = id_from_idp
# TODO: update iss_sub mapping table?
pieterlukasse marked this conversation as resolved.
Show resolved Hide resolved

# setup idp connection for new user (or existing user w/o it setup)
idp = (
Expand Down
10 changes: 10 additions & 0 deletions fence/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,16 @@ DEFAULT_BACKOFF_SETTINGS_MAX_TRIES: 3
# here. Something like: support@example.com
SUPPORT_EMAIL_FOR_ERRORS: null

# //////////////////////////////////////////////////////////////////////////////////////
# USER ACTIVATION
# //////////////////////////////////////////////////////////////////////////////////////
# If you want new users (read: users that login for the first time) to automatically be
# allowed through and added to the Fence DB, set this to true. Otherwise, set this to false.
# Setting it to false will ensure the user will only be able to login after the user
# is added to the Fence DB via a separate process. This two-step process allows for
# a separate onboarding and user "approval" process, instead of the default automatic approval.
ALLOW_NEW_USER_ON_LOGIN: true

# //////////////////////////////////////////////////////////////////////////////////////
# SHIBBOLETH
# - Support using `shibboleth` in LOGIN_OPTIONS
Expand Down
1 change: 1 addition & 0 deletions fence/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def post_process(self):
"WHITE_LISTED_GOOGLE_PARENT_ORGS",
"CLIENT_CREDENTIALS_ON_DOWNLOAD_ENABLED",
"DATA_UPLOAD_BUCKET",
"ALLOW_NEW_USER_ON_LOGIN",
pieterlukasse marked this conversation as resolved.
Show resolved Hide resolved
]
for default in defaults:
self.force_default_if_none(default, default_cfg=default_config)
Expand Down
33 changes: 33 additions & 0 deletions tests/login/test_login_user.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import flask
import pytest
from fence.auth import login_user, logout
from fence.models import User, IdentityProvider
import time
from datetime import datetime
from fence.config import config
from fence.errors import Unauthorized


def test_login_user_already_in_db(db_session):
Expand Down Expand Up @@ -33,6 +36,22 @@ def test_login_user_already_in_db(db_session):
assert flask.g.user == test_user


def test_login_failure_for_user_already_in_db_but_inactive(db_session):
"""
Test that if a user is already in the database, but is set to user.active == False,
and logs in, the login returns an Unauthorized error.
"""
email = "testuser@gmail.com"
provider = "Test Provider"
id_from_idp = "Provider_ID_0001"

test_user = User(username=email, is_admin=False, active=False)
db_session.add(test_user)
db_session.commit()
with pytest.raises(Unauthorized):
pieterlukasse marked this conversation as resolved.
Show resolved Hide resolved
login_user(email, provider, email=email, id_from_idp=id_from_idp)


def test_login_user_with_idp_already_in_db(db_session):
"""
Test that if a user is already in the database, has identity_provider
Expand Down Expand Up @@ -85,6 +104,20 @@ def test_login_new_user(db_session):
assert flask.g.user == test_user


def test_login_new_user_not_allowed(db_session, monkeypatch):
"""
Test that when ALLOW_NEW_USER_ON_LOGIN config is False,
and a user that is not in the database logs in, an
Unauthorized error is returned.
"""
monkeypatch.setitem(config, "ALLOW_NEW_USER_ON_LOGIN", False)
email = "testuser@gmail.com"
provider = "Test Provider"
id_from_idp = "Provider_ID_0001"
with pytest.raises(Unauthorized):
pieterlukasse marked this conversation as resolved.
Show resolved Hide resolved
login_user(email, provider, email=email, id_from_idp=id_from_idp)


def test_last_auth_update_in_db(db_session):
"""
Test that the _last_auth field in the DB is updated when the user logs in.
Expand Down
Loading