Skip to content

A toy real-time Pub-Sub messaging system with durable queues, dynamic routing, and WebSocket support for seamless communication. πŸŒπŸš€

Notifications You must be signed in to change notification settings

VarthanV/pub-sub

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

31 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

πŸš€ Pub-Sub : A toy Messaging System

A toy real-time Pub-Sub messaging system with durable queues, dynamic routing, and WebSocket support for seamless communication. πŸŒπŸš€

Architecture

Pipeline

  • Minimal frontend (WIP)

  • Client and server libraries in Go to interact with this stack

  • Lock-free datastructures to reduce contention and increase throughput.

⚑ Project Layout

β”œβ”€β”€ README.md
β”œβ”€β”€ assets
β”‚Β Β  └── working.png
β”œβ”€β”€ binding
β”‚Β Β  └── binding.go
β”œβ”€β”€ broker
β”‚Β Β  β”œβ”€β”€ helpers.go
β”‚Β Β  └── main.go
β”œβ”€β”€ errors
β”‚Β Β  └── main.go
β”œβ”€β”€ exchange
β”‚Β Β  └── main.go
β”œβ”€β”€ go.mod
β”œβ”€β”€ go.sum
β”œβ”€β”€ image.png
β”œβ”€β”€ main.go
β”œβ”€β”€ messages
β”‚Β Β  └── message.go
β”œβ”€β”€ models
β”‚Β Β  β”œβ”€β”€ base.go
β”‚Β Β  β”œβ”€β”€ binding.go
β”‚Β Β  β”œβ”€β”€ exchange.go
β”‚Β Β  β”œβ”€β”€ message.go
β”‚Β Β  └── queue.go
β”œβ”€β”€ pkg
β”‚Β Β  β”œβ”€β”€ config
β”‚Β Β  β”‚Β Β  └── config.go
β”‚Β Β  └── database
β”‚Β Β      └── database.go
β”œβ”€β”€ pub-sub.db
β”œβ”€β”€ queue
β”‚Β Β  β”œβ”€β”€ main.go
β”‚Β Β  └── main_test.go
└── server
    β”œβ”€β”€ binding.go
    β”œβ”€β”€ controller.go
    β”œβ”€β”€ exchange.go
    β”œβ”€β”€ main.go
    β”œβ”€β”€ publish.go
    β”œβ”€β”€ queue.go
    β”œβ”€β”€ routes.go
    └── subscription.go

13 directories, 30 files

🌟 Features

πŸ› οΈ Core Functionalities

  • Exchange Management: Supports multiple exchange types, including FanOut and Direct, to route messages effectively.
  • Queue Management: Durable and non-durable queues for persistent or transient message storage.
  • Dynamic Bindings: Bind queues to exchanges with flexible routing keys.
  • WebSocket Subscriptions: Real-time updates with WebSocket-based subscription mechanisms.
  • Message Persistence: Persist messages for durable queues to ensure reliability and fault tolerance.

⚑ Built for Performance

  • Lightweight and highly concurrent architecture using Go's goroutines and sync primitives.
  • Semaphore-based worker control for concurrent operations like real-time updates and database syncing.
  • Optimized database interactions with GORM, including bulk inserts and conflict handling.

πŸ’Ύ Database-Driven

  • Persistent state management with a relational database for exchanges, bindings, and queues.
  • Preloaded relationships (Preload("Bindings.Queues")) to minimize query overhead.
  • Many-to-many relationships with GORM's advanced association handling.

πŸ”„ Real-Time Data Flow

  • Publish-Subscribe pattern implemented with efficient routing algorithms.
  • FanOut messages to all bound queues or Direct messages to specific routing keys.
  • Seamless WebSocket integration to notify subscribers in real time.

πŸ“š Tech Stack

  • Go: A blazing-fast, statically typed language perfect for concurrent systems.
  • GORM: ORM for seamless database interactions and relationship management.
  • WebSocket: Enables real-time communication between broker and clients.
  • PostgreSQL/MySQL: Supported as the database backend for reliable persistence.
  • Logrus: Enhanced logging for debugging and system monitoring.

πŸ”§ How It Works

🎯 Core Models

  1. Exchange: Acts as a router for messages. Each exchange can have:

    • A FanOut type for broadcasting messages to all bound queues.
    • A Direct type for routing messages based on a key.
  2. Queue: Stores messages for consumers, with options for:

    • Durability to survive broker restarts.
    • Non-durability for lightweight, temporary storage.
  3. Binding: Connects an exchange to one or more queues with a routing key.

πŸŒ€ Broker Lifecycle

  1. Startup:

    • Loads exchanges, bindings, and queues from the database.
    • Rebuilds in-memory mappings for fast runtime operations.
  2. Publish:

    • Routes messages based on exchange type and routing key.
    • Supports concurrent publishing with Go routines.
  3. Subscribe:

    • Real-time updates via WebSocket connections for bound queues.
  4. Checkpointing:

    • Periodic database synchronization to persist messages for durable queues.

πŸŽ‰ Why You'll Love This

  • 🌐 Scalable: Handles high throughput and concurrency with ease.
  • πŸ”’ Reliable: Durable message storage ensures no data loss.
  • 🧩 Extensible: Add new exchange types or integrate with other services effortlessly.
  • πŸ› οΈ Developer-Friendly: Clean architecture and logs make debugging a breeze.

🏁 Getting Started

πŸ”¨ Prerequisites

  • Go (1.18+)

Environment Variables

Server Configuration

  • HTTP_PORT
    The port on which the application listens for incoming HTTP requests.

    • Default: 8060
    • Example: http://localhost:8060
  • ALLOWED_HOSTS
    Specifies the hosts allowed to access the server. Use * to allow all hosts (for development purposes).

    • Default: *
    • Note: Restrict this in production for security.

Database Configuration

  • DATABASE_NAME
    The name of the database used by the application.
    • Default: pub-sub

Migration Configuration

  • DO_MIGRATIONS
    Indicates whether to run database migrations automatically at application startup.
    • Default: true
    • Values:
      • true - Run migrations
      • false - Skip migrations

Real-Time Updates Configuration

  • CHECKPOINT_IN_SECONDS
    The interval, in seconds, for checkpointing or saving the application state.

    • Default: 10
  • MAX_WORKERS_ALLOWED_CONCURRENTLY_FOR_REAL_TIME_UPDATES
    The maximum number of worker threads/processes allocated for handling real-time updates.

    • Default: 10

Synchronization Configuration

  • WORKERS_ALLOWED_FOR_SYNC
    The maximum number of worker threads/processes allocated for synchronization tasks.
    • Default: 20

Notes

  • Adjust these variables as per your environment and performance requirements.

  • Ensure ALLOWED_HOSTS is set appropriately in production to avoid unauthorized access.

  • Check .env.example for same env

πŸš€ Run the Broker

git clone https://github.com/VarthanV/pub-sub.git
cd pub-sub
go run main.go

Screenshots

alt text

πŸ“„ License

This project is licensed under the MIT License. Feel free to use, modify, and distribute as needed.

About

A toy real-time Pub-Sub messaging system with durable queues, dynamic routing, and WebSocket support for seamless communication. πŸŒπŸš€

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages