-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
External login with SAML #452
Labels
enhancement
New feature or request
Comments
ChatGPT example: import asyncio
import base64
import logging
from urllib.parse import urlparse
from aiohttp import web
from saml2 import BINDING_HTTP_REDIRECT
from saml2.client import Saml2Client
from saml2.config import Config as Saml2Config
from saml2.response import StatusAuthnFailed
# Configure logging
logging.basicConfig(level=logging.INFO)
# SAML Service Provider configuration
SAML_CONFIG = {
"entityid": "https://sp.example.com/metadata/",
"service": {
"sp": {
"endpoints": {
"assertion_consumer_service": [
("https://sp.example.com/saml/acs", BINDING_HTTP_REDIRECT),
],
},
"allow_unsolicited": True,
}
},
"metadata": {
"local": ["idp_metadata.xml"], # Provide your IdP metadata file
},
"security": {
"authn_requests_signed": False,
"want_assertions_signed": True,
},
}
# Load SAML SP client
def get_saml_client():
config = Saml2Config()
config.load(SAML_CONFIG)
return Saml2Client(config)
# SAML login: Redirects user to IdP for authentication
async def saml_login(request):
client = get_saml_client()
reqid, authn_request = client.prepare_for_authenticate(
entityid="https://idp.example.com/sso",
binding=BINDING_HTTP_REDIRECT
)
redirect_url = dict(authn_request["headers"])["Location"]
logging.info(f"Redirecting user to IdP: {redirect_url}")
return web.HTTPFound(redirect_url)
# SAML Assertion Consumer Service (ACS): Handles response from IdP
async def saml_acs(request):
client = get_saml_client()
saml_response = await request.post() # Get SAMLResponse from POST body
if "SAMLResponse" not in saml_response:
return web.Response(text="Missing SAMLResponse", status=400)
# Decode and process the SAML response
saml_response_decoded = base64.b64decode(saml_response["SAMLResponse"])
logging.info(f"Received SAML Response: {saml_response_decoded.decode()}")
try:
authn_response = client.parse_authn_request_response(
saml_response["SAMLResponse"],
binding=BINDING_HTTP_REDIRECT
)
if authn_response.status != "urn:oasis:names:tc:SAML:2.0:status:Success":
raise StatusAuthnFailed("SAML authentication failed")
user_identity = authn_response.get_identity()
user_name = authn_response.get_subject().text
logging.info(f"User {user_name} authenticated successfully with attributes: {user_identity}")
return web.Response(text=f"Welcome, {user_name}! Attributes: {user_identity}")
except Exception as e:
logging.error(f"SAML Authentication failed: {e}")
return web.Response(text="Authentication failed", status=401)
# Define aiohttp web app
app = web.Application()
app.router.add_get("/saml/login", saml_login)
app.router.add_post("/saml/acs", saml_acs)
# Run the aiohttp server
if __name__ == "__main__":
web.run_app(app, port=8080) |
the example above, fixed and adopted to ASAB: #!/usr/bin/env python3
import asab.web.rest
import saml2
import saml2.config
import saml2.client
import saml2.response
import aiohttp.web
if "web" not in asab.Config:
asab.Config["web"] = {
"listen": "8080"
}
class SamlApplication(asab.Application):
def __init__(self):
super().__init__()
# Initialize web container
self.add_module(asab.web.Module)
self.WebService = self.get_service("asab.WebService")
self.WebContainer = asab.web.WebContainer(self.WebService, "web")
# Add routes
self.WebContainer.WebApp.router.add_get(f"/saml/login", self.saml_login)
self.WebContainer.WebApp.router.add_post(f"/saml/acs", self.saml_acs)
self.SamlClient = self.init_saml_client()
def init_saml_client(self):
config_dict = {
"entityid": "https://auth.local.loc", # Must be registered at IdP
"service": {
"sp": {
"endpoints": {
"assertion_consumer_service": [
("https://auth.local.loc/auth/api/saml/acs", saml2.BINDING_HTTP_POST), # Must be registered at IdP
],
},
"allow_unsolicited": True,
"authn_requests_signed": False,
"want_assertions_signed": False, # insecure, use cert and key in production
"want_response_signed": False, # insecure, use cert and key in production
}
},
"metadata": {
"local": ["examples/ssocircle-idp-meta.xml"], # Download from https://idp.ssocircle.com/meta-idp.xml
},
}
config = saml2.config.Config()
config.load(config_dict)
return saml2.client.Saml2Client(config)
async def saml_login(self, request):
reqid, authn_request = self.SamlClient.prepare_for_authenticate(
entityid="https://idp.ssocircle.com",
binding=saml2.BINDING_HTTP_REDIRECT
)
redirect_url = dict(authn_request["headers"])["Location"]
print(f"Redirecting user to IdP: {redirect_url}")
return aiohttp.web.HTTPFound(redirect_url)
async def saml_acs(self, request):
client = self.SamlClient
saml_response = await request.post() # Get SAMLResponse from POST body
if "SAMLResponse" not in saml_response:
return aiohttp.web.Response(text="Missing SAMLResponse", status=400)
authn_response = None
try:
authn_response = client.parse_authn_request_response(
saml_response["SAMLResponse"],
binding=saml2.BINDING_HTTP_POST
)
if not authn_response.status_ok():
print("no success")
return aiohttp.web.Response(text="Authentication failed", status=401)
user_identity = authn_response.get_identity()
try:
user_name = authn_response.get_subject()
except ValueError:
user_name = "!Unknown!"
print(f"User {user_name} authenticated successfully with attributes: {user_identity}")
return aiohttp.web.Response(text=f"Welcome, {user_name}! \nAttributes: {user_identity}")
except Exception as e:
print(f"SAML Authentication failed: {e}")
print(authn_response)
raise(e)
if __name__ == "__main__":
app = SamlApplication()
app.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Implement generic SAML provider for external login.
It will likely use Web browser SSO profile (https://en.wikipedia.org/wiki/SAML_2.0#Web_browser_SSO_profile), probably using one of the redirect flows.
Seacat Auth will act as SAML Service Provider and will make request to SAML Identity Provider.
Try
pysaml2
library.Public identity providers:
Links
The text was updated successfully, but these errors were encountered: