Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External login with SAML #452

Open
byewokko opened this issue Mar 6, 2025 · 2 comments
Open

External login with SAML #452

byewokko opened this issue Mar 6, 2025 · 2 comments
Labels
enhancement New feature or request

Comments

@byewokko
Copy link
Collaborator

byewokko commented Mar 6, 2025

Implement generic SAML provider for external login.

It will likely use Web browser SSO profile (https://en.wikipedia.org/wiki/SAML_2.0#Web_browser_SSO_profile), probably using one of the redirect flows.

Seacat Auth will act as the SAML Service Provider and will send authentication requests to the SAML Identity Provider.

Try pysaml2 library.

Public identity providers:

  • SSOCircle.com - free and open
  • Azure Entra ID - I don't have the required permissions in TeskaLabs
  • Free developer account at Okta - too complicated

Links

@byewokko byewokko added the enhancement New feature or request label Mar 6, 2025
@byewokko
Copy link
Collaborator Author

byewokko commented Mar 6, 2025

ChatGPT example:

import asyncio
import base64
import logging
from urllib.parse import urlparse

from aiohttp import web
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
from saml2.client import Saml2Client
from saml2.config import Config as Saml2Config
from saml2.response import StatusAuthnFailed

# Configure logging
logging.basicConfig(level=logging.INFO)

# SAML Service Provider configuration
SAML_CONFIG = {
    "entityid": "https://sp.example.com/metadata/",
    "service": {
        "sp": {
            "endpoints": {
                "assertion_consumer_service": [
                    # The ACS route below is registered with add_post and reads
                    # the SAMLResponse from the form body, so the endpoint has
                    # to advertise the HTTP-POST binding, not HTTP-Redirect.
                    ("https://sp.example.com/saml/acs", BINDING_HTTP_POST),
                ],
            },
            "allow_unsolicited": True,
            # Signing preferences are options of the SP service section;
            # a separate top-level "security" section is not where pysaml2
            # looks for them.
            "authn_requests_signed": False,
            "want_assertions_signed": True,
        }
    },
    "metadata": {
        "local": ["idp_metadata.xml"],  # Provide your IdP metadata file
    },
}

# Load SAML SP client
_saml_client = None


def get_saml_client():
    """Return the shared SAML Service Provider client.

    The client is built from ``SAML_CONFIG`` on the first call and reused
    afterwards, so the configuration (including the local IdP metadata file
    it points to) is not loaded again for every incoming request.

    Returns:
        The ``Saml2Client`` instance shared by all request handlers.
    """
    global _saml_client
    if _saml_client is None:
        config = Saml2Config()
        config.load(SAML_CONFIG)
        _saml_client = Saml2Client(config)
    return _saml_client

# SAML login: Redirects user to IdP for authentication
async def saml_login(request):
    """Start the SAML login flow by redirecting the browser to the IdP.

    Builds an authentication request with the HTTP-Redirect binding and
    sends the user to the ``Location`` URL produced by pysaml2.

    Args:
        request: The incoming aiohttp request (not inspected).

    Raises:
        web.HTTPFound: Always; carries the redirect to the Identity Provider.
    """
    client = get_saml_client()
    # The request ID is not tracked here; SAML_CONFIG allows unsolicited
    # responses, so the ACS handler does not need it.
    _request_id, authn_request = client.prepare_for_authenticate(
        entityid="https://idp.example.com/sso",
        binding=BINDING_HTTP_REDIRECT,
    )

    redirect_url = dict(authn_request["headers"])["Location"]
    logging.info("Redirecting user to IdP: %s", redirect_url)
    # aiohttp HTTP exceptions are meant to be raised; returning one from a
    # handler is deprecated.
    raise web.HTTPFound(redirect_url)

# SAML Assertion Consumer Service (ACS): Handles response from IdP
async def saml_acs(request):
    """Consume the SAML response that the IdP posts back to the SP.

    Args:
        request: The incoming aiohttp request; its form body is expected to
            contain a ``SAMLResponse`` field.

    Returns:
        200 with a greeting when the response is accepted,
        400 when the ``SAMLResponse`` field is missing,
        401 when the response cannot be parsed or verified.
    """
    client = get_saml_client()
    form = await request.post()  # Get SAMLResponse from POST body

    if "SAMLResponse" not in form:
        return web.Response(text="Missing SAMLResponse", status=400)

    # The raw response is intentionally neither decoded nor logged here: it
    # carries the user's identity attributes, and a malformed payload would
    # otherwise fail outside the error handling below.
    try:
        # The response arrives as a form POST, so it must be parsed with the
        # HTTP-POST binding.
        authn_response = client.parse_authn_request_response(
            form["SAMLResponse"],
            binding=BINDING_HTTP_POST,
        )
        if authn_response is None:
            raise StatusAuthnFailed("SAML response could not be parsed")

        # Ask the response object for its status instead of comparing it
        # against a status URN string.
        if not authn_response.status_ok():
            raise StatusAuthnFailed("SAML authentication failed")

        user_identity = authn_response.get_identity()
        user_name = authn_response.get_subject().text

        logging.info(
            "User %s authenticated successfully with attributes: %s",
            user_name,
            user_identity,
        )
        return web.Response(text=f"Welcome, {user_name}! Attributes: {user_identity}")

    except Exception as e:
        # Request boundary: any parsing or verification failure is reported
        # to the client as an authentication failure.
        logging.error("SAML Authentication failed: %s", e)
        return web.Response(text="Authentication failed", status=401)

# Build the aiohttp application and declare its route table in one place
app = web.Application()
app.add_routes(
    [
        web.get("/saml/login", saml_login),
        web.post("/saml/acs", saml_acs),
    ]
)

# Serve the application when the file is executed as a script
if __name__ == "__main__":
    web.run_app(app, port=8080)

@byewokko
Copy link
Collaborator Author

byewokko commented Mar 7, 2025

the example above, fixed and adapted to ASAB:

#!/usr/bin/env python3
import asab.web.rest
import saml2
import saml2.config
import saml2.client
import saml2.response
import aiohttp.web


# Provide a default [web] section (listening on port 8080) when the loaded
# configuration does not define one.
if not asab.Config.has_section("web"):
	asab.Config["web"] = {"listen": "8080"}


class SamlApplication(asab.Application):
	"""
	Example ASAB application acting as a SAML Service Provider.

	It registers two routes on its web container:
	``GET /saml/login`` redirects the browser to the Identity Provider and
	``POST /saml/acs`` consumes the SAML response posted back by it.
	"""

	def __init__(self):
		super().__init__()

		# Initialize web container
		self.add_module(asab.web.Module)
		self.WebService = self.get_service("asab.WebService")
		self.WebContainer = asab.web.WebContainer(self.WebService, "web")

		# Add routes
		self.WebContainer.WebApp.router.add_get("/saml/login", self.saml_login)
		self.WebContainer.WebApp.router.add_post("/saml/acs", self.saml_acs)

		# One client is created at startup and shared by both handlers
		self.SamlClient = self.init_saml_client()


	def init_saml_client(self):
		"""
		Build the pysaml2 client from the hard-coded Service Provider configuration.

		:return: A ``saml2.client.Saml2Client`` configured for the SSOCircle IdP metadata.
		"""
		config_dict = {
			"entityid": "https://auth.local.loc",  # Must be registered at IdP
			"service": {
				"sp": {
					"endpoints": {
						"assertion_consumer_service": [
							# NOTE(review): this public path differs from the local "/saml/acs" route;
							# presumably a reverse proxy maps "/auth/api" onto this app - confirm.
							("https://auth.local.loc/auth/api/saml/acs", saml2.BINDING_HTTP_POST),  # Must be registered at IdP
						],
					},
					"allow_unsolicited": True,
					"authn_requests_signed": False,
					"want_assertions_signed": False,  # insecure, use cert and key in production
					"want_response_signed": False,  # insecure, use cert and key in production
				}
			},
			"metadata": {
				"local": ["examples/ssocircle-idp-meta.xml"],  # Download from https://idp.ssocircle.com/meta-idp.xml
			},
		}
		config = saml2.config.Config()
		config.load(config_dict)
		return saml2.client.Saml2Client(config)


	async def saml_login(self, request):
		"""
		Redirect the browser to the Identity Provider to start the login.

		:param request: Incoming aiohttp request (not inspected).
		:raise aiohttp.web.HTTPFound: Always; carries the redirect to the IdP.
		"""
		# The request ID is not tracked; the SP config allows unsolicited responses
		_reqid, authn_request = self.SamlClient.prepare_for_authenticate(
			entityid="https://idp.ssocircle.com",
			binding=saml2.BINDING_HTTP_REDIRECT
		)

		redirect_url = dict(authn_request["headers"])["Location"]
		print(f"Redirecting user to IdP: {redirect_url}")
		# aiohttp HTTP exceptions are meant to be raised; returning them is deprecated
		raise aiohttp.web.HTTPFound(redirect_url)


	async def saml_acs(self, request):
		"""
		Consume the SAML response posted by the Identity Provider.

		:param request: Incoming aiohttp request whose form body should contain ``SAMLResponse``.
		:return: 200 with a greeting on success, 400 when ``SAMLResponse`` is missing,
			401 when the response is absent after parsing or its status is not OK.
		:raise Exception: Any error raised while parsing or verifying the response is
			printed and re-raised unchanged.
		"""
		form = await request.post()  # Get SAMLResponse from POST body

		if "SAMLResponse" not in form:
			return aiohttp.web.Response(text="Missing SAMLResponse", status=400)

		authn_response = None
		try:
			authn_response = self.SamlClient.parse_authn_request_response(
				form["SAMLResponse"],
				binding=saml2.BINDING_HTTP_POST
			)

			# A missing parse result is treated like a failed status instead of
			# crashing on the attribute access below
			if authn_response is None or not authn_response.status_ok():
				print("no success")
				return aiohttp.web.Response(text="Authentication failed", status=401)

			user_identity = authn_response.get_identity()
			try:
				subject = authn_response.get_subject()
			except ValueError:
				subject = None
			# Show the NameID value rather than the NameID object itself
			user_name = subject.text if subject is not None else "!Unknown!"

			print(f"User {user_name} authenticated successfully with attributes: {user_identity}")
			return aiohttp.web.Response(text=f"Welcome, {user_name}! \nAttributes: {user_identity}")

		except Exception as e:
			print(f"SAML Authentication failed: {e}")
			print(authn_response)
			# Bare raise keeps the original traceback intact
			raise


if __name__ == "__main__":
	# Instantiate the example application and start it.
	SamlApplication().run()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

1 participant