A toy real-time Pub-Sub messaging system with durable queues, dynamic routing, and WebSocket support for seamless communication.
- Minimal frontend (WIP)
- Client and server libraries in Go to interact with this stack
- Lock-free data structures to reduce contention and increase throughput
├── README.md
├── assets
│   └── working.png
├── binding
│   └── binding.go
├── broker
│   ├── helpers.go
│   └── main.go
├── errors
│   └── main.go
├── exchange
│   └── main.go
├── go.mod
├── go.sum
├── image.png
├── main.go
├── messages
│   └── message.go
├── models
│   ├── base.go
│   ├── binding.go
│   ├── exchange.go
│   ├── message.go
│   └── queue.go
├── pkg
│   ├── config
│   │   └── config.go
│   └── database
│       └── database.go
├── pub-sub.db
├── queue
│   ├── main.go
│   └── main_test.go
└── server
    ├── binding.go
    ├── controller.go
    ├── exchange.go
    ├── main.go
    ├── publish.go
    ├── queue.go
    ├── routes.go
    └── subscription.go

13 directories, 30 files
- Exchange Management: Supports multiple exchange types, including `FanOut` and `Direct`, to route messages effectively.
- Queue Management: Durable and non-durable queues for persistent or transient message storage.
- Dynamic Bindings: Bind queues to exchanges with flexible routing keys.
- WebSocket Subscriptions: Real-time updates with WebSocket-based subscription mechanisms.
- Message Persistence: Persist messages for durable queues to ensure reliability and fault tolerance.
- Lightweight and highly concurrent architecture using Go's goroutines and sync primitives.
- Semaphore-based worker control for concurrent operations like real-time updates and database syncing.
- Optimized database interactions with GORM, including bulk inserts and conflict handling.
- Persistent state management with a relational database for exchanges, bindings, and queues.
- Preloaded relationships (`Preload("Bindings.Queues")`) to minimize query overhead.
- Many-to-many relationships with GORM's advanced association handling.
- Publish-Subscribe pattern implemented with efficient routing algorithms.
- FanOut messages to all bound queues or Direct messages to specific routing keys.
- Seamless WebSocket integration to notify subscribers in real time.
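The semaphore-based worker control listed above can be illustrated with a buffered channel used as a counting semaphore. This is a minimal sketch, not the broker's actual code: `runBounded` and `Stats` are hypothetical names introduced here, and the limit of 10 simply echoes the documented default for real-time update workers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Stats summarises one run of runBounded (hypothetical type for this sketch).
type Stats struct {
	Completed int // tasks that ran to completion
	Peak      int // highest number of tasks observed running at once
}

// runBounded starts one goroutine per task but lets at most maxWorkers of
// them run at the same time. maxWorkers must be at least 1.
func runBounded(maxWorkers int, tasks []func()) Stats {
	sem := make(chan struct{}, maxWorkers) // buffered channel as a counting semaphore
	var wg sync.WaitGroup
	var running, peak, completed int32

	for _, task := range tasks {
		sem <- struct{}{} // acquire a slot; blocks while maxWorkers tasks are in flight
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the task ends

			// Record the highest concurrency level seen so far.
			n := atomic.AddInt32(&running, 1)
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}

			task()
			atomic.AddInt32(&completed, 1)
			atomic.AddInt32(&running, -1)
		}(task)
	}

	wg.Wait()
	return Stats{
		Completed: int(atomic.LoadInt32(&completed)),
		Peak:      int(atomic.LoadInt32(&peak)),
	}
}

func main() {
	tasks := make([]func(), 50)
	for i := range tasks {
		tasks[i] = func() {} // stand-in for "notify one subscriber"
	}
	stats := runBounded(10, tasks)
	fmt.Println("completed:", stats.Completed)
	fmt.Println("peak within limit:", stats.Peak <= 10)
}
```

Acquiring the slot before starting the goroutine keeps the number of live goroutines bounded as well, which matters when the task list is large.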
- Go: A blazing-fast, statically typed language perfect for concurrent systems.
- GORM: ORM for seamless database interactions and relationship management.
- WebSocket: Enables real-time communication between broker and clients.
- PostgreSQL/MySQL: Supported as the database backend for reliable persistence.
- Logrus: Enhanced logging for debugging and system monitoring.
- Exchange: Acts as a router for messages. Each exchange can have:
  - A `FanOut` type for broadcasting messages to all bound queues.
  - A `Direct` type for routing messages based on a key.
- Queue: Stores messages for consumers, with options for:
  - Durability to survive broker restarts.
  - Non-durability for lightweight, temporary storage.
- Binding: Connects an exchange to one or more queues with a routing key.
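To make the relationship between exchanges, bindings, and queues concrete, here is a small routing sketch. It assumes bindings can be represented as a map from routing key to queue names; the `Exchange` struct, the `Route` method, and the string values of the exchange types are illustrative assumptions, not the project's actual types.

```go
package main

import (
	"fmt"
	"sort"
)

// ExchangeType mirrors the two exchange types named in this README.
// The underlying string values are assumptions made for this sketch.
type ExchangeType string

const (
	FanOut ExchangeType = "fanout"
	Direct ExchangeType = "direct"
)

// Exchange stores its bindings as routing key -> bound queue names
// (hypothetical layout).
type Exchange struct {
	Type     ExchangeType
	Bindings map[string][]string
}

// Route returns the sorted, de-duplicated names of the queues that should
// receive a message published with the given routing key.
func (e *Exchange) Route(key string) []string {
	seen := map[string]bool{}
	add := func(queues []string) {
		for _, q := range queues {
			seen[q] = true
		}
	}

	switch e.Type {
	case FanOut:
		// FanOut ignores the key and delivers to every bound queue.
		for _, queues := range e.Bindings {
			add(queues)
		}
	case Direct:
		// Direct delivers only to queues bound with exactly this key.
		add(e.Bindings[key])
	}

	out := make([]string, 0, len(seen))
	for q := range seen {
		out = append(out, q)
	}
	sort.Strings(out)
	return out
}

func main() {
	bindings := map[string][]string{
		"orders":   {"billing", "audit"},
		"payments": {"audit"},
	}
	fan := &Exchange{Type: FanOut, Bindings: bindings}
	dir := &Exchange{Type: Direct, Bindings: bindings}

	fmt.Println(fan.Route("ignored"))  // [audit billing]
	fmt.Println(dir.Route("payments")) // [audit]
	fmt.Println(dir.Route("unknown"))  // []
}
```

Sorting the result is only there to make the output deterministic; a broker would more likely iterate over the bound queues directly.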
- Startup:
  - Loads exchanges, bindings, and queues from the database.
  - Rebuilds in-memory mappings for fast runtime operations.
- Publish:
  - Routes messages based on exchange type and routing key.
  - Supports concurrent publishing with goroutines.
- Subscribe:
  - Real-time updates via WebSocket connections for bound queues.
- Checkpointing:
  - Periodic database synchronization to persist messages for durable queues.
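The checkpointing step above can be sketched as a ticker loop that drains a write-behind buffer for each durable queue. This is an illustration under assumptions: the `Queue` type, its `pending` buffer, `Checkpoint`, `Run`, and the `persist` callback are hypothetical, and the callback stands in for whatever database write the broker performs.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Queue is a hypothetical in-memory queue. Durable marks it for persistence.
type Queue struct {
	Name    string
	Durable bool

	mu      sync.Mutex
	pending []string // messages received since the last checkpoint
}

// Push records a message that has not been persisted yet.
func (q *Queue) Push(msg string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending = append(q.pending, msg)
}

// drain returns and clears the messages accumulated since the last call.
func (q *Queue) drain() []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	batch := q.pending
	q.pending = nil
	return batch
}

// Checkpoint passes the pending messages of every durable queue to persist
// and returns how many messages were handed over. Non-durable queues are skipped.
func Checkpoint(queues []*Queue, persist func(queue string, batch []string)) int {
	written := 0
	for _, q := range queues {
		if !q.Durable {
			continue
		}
		if batch := q.drain(); len(batch) > 0 {
			persist(q.Name, batch)
			written += len(batch)
		}
	}
	return written
}

// Run calls Checkpoint on every tick until ctx is cancelled.
func Run(ctx context.Context, every time.Duration, queues []*Queue, persist func(string, []string)) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			Checkpoint(queues, persist)
		}
	}
}

func main() {
	orders := &Queue{Name: "orders", Durable: true}
	scratch := &Queue{Name: "scratch"}
	orders.Push("o-1")
	orders.Push("o-2")
	scratch.Push("tmp")

	persist := func(queue string, batch []string) {
		fmt.Printf("persisted %d message(s) from %s\n", len(batch), queue)
	}

	// A short interval keeps the demo quick; the documented default is 10 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	Run(ctx, 100*time.Millisecond, []*Queue{orders, scratch}, persist)
}
```

With a periodic design like this, messages that arrive after the last tick exist only in memory until the next one, so the interval is a trade-off between database load and how much could be lost on a crash.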
- Scalable: Handles high throughput and concurrency with ease.
- Reliable: Durable queues persist their messages to the database, so they survive broker restarts.
- Extensible: Add new exchange types or integrate with other services effortlessly.
- Developer-Friendly: Clean architecture and logs make debugging a breeze.
- Go (1.18+)
- `HTTP_PORT`: The port on which the application listens for incoming HTTP requests.
  - Default: `8060`
  - Example: `http://localhost:8060`
- `ALLOWED_HOSTS`: Specifies the hosts allowed to access the server. Use `*` to allow all hosts (for development purposes).
  - Default: `*`
  - Note: Restrict this in production for security.
- `DATABASE_NAME`: The name of the database used by the application.
  - Default: `pub-sub`
- `DO_MIGRATIONS`: Indicates whether to run database migrations automatically at application startup.
  - Default: `true`
  - Values:
    - `true` - Run migrations
    - `false` - Skip migrations
- `CHECKPOINT_IN_SECONDS`: The interval, in seconds, for checkpointing or saving the application state.
  - Default: `10`
- `MAX_WORKERS_ALLOWED_CONCURRENTLY_FOR_REAL_TIME_UPDATES`: The maximum number of concurrent workers (goroutines) allocated for handling real-time updates.
  - Default: `10`
- `WORKERS_ALLOWED_FOR_SYNC`: The maximum number of concurrent workers (goroutines) allocated for synchronization tasks.
  - Default: `20`
- Adjust these variables to suit your environment and performance requirements.
- Ensure `ALLOWED_HOSTS` is set appropriately in production to avoid unauthorized access.
- See `.env.example` for a sample environment file.
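As a rough illustration of how these variables and their defaults could be read, the sketch below loads them with the standard library only. The `Config` struct, `Load`, and the helper names are assumptions made for this example and may differ from what `pkg/config` does; `ALLOWED_HOSTS` is kept as a raw string because its exact format is not documented here.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Config mirrors the documented environment variables (hypothetical struct).
type Config struct {
	HTTPPort            string
	AllowedHosts        string
	DatabaseName        string
	DoMigrations        bool
	CheckpointInSeconds int
	RealTimeWorkers     int
	SyncWorkers         int
}

// lookup abstracts os.Getenv so the loader can be exercised without touching
// the real environment.
type lookup func(key string) string

func stringOr(get lookup, key, def string) string {
	if v := get(key); v != "" {
		return v
	}
	return def
}

func intOr(get lookup, key string, def int) (int, error) {
	v := get(key)
	if v == "" {
		return def, nil
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, fmt.Errorf("%s: %w", key, err)
	}
	return n, nil
}

func boolOr(get lookup, key string, def bool) (bool, error) {
	v := get(key)
	if v == "" {
		return def, nil
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return false, fmt.Errorf("%s: %w", key, err)
	}
	return b, nil
}

// Load reads every documented variable, falling back to its documented default
// when the variable is unset or empty.
func Load(get lookup) (Config, error) {
	var err error
	cfg := Config{
		HTTPPort:     stringOr(get, "HTTP_PORT", "8060"),
		AllowedHosts: stringOr(get, "ALLOWED_HOSTS", "*"),
		DatabaseName: stringOr(get, "DATABASE_NAME", "pub-sub"),
	}
	if cfg.DoMigrations, err = boolOr(get, "DO_MIGRATIONS", true); err != nil {
		return Config{}, err
	}
	if cfg.CheckpointInSeconds, err = intOr(get, "CHECKPOINT_IN_SECONDS", 10); err != nil {
		return Config{}, err
	}
	if cfg.RealTimeWorkers, err = intOr(get, "MAX_WORKERS_ALLOWED_CONCURRENTLY_FOR_REAL_TIME_UPDATES", 10); err != nil {
		return Config{}, err
	}
	if cfg.SyncWorkers, err = intOr(get, "WORKERS_ALLOWED_FOR_SYNC", 20); err != nil {
		return Config{}, err
	}
	return cfg, nil
}

func main() {
	cfg, err := Load(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, "invalid configuration:", err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Failing fast on a malformed value, rather than silently using the default, makes a typo in the environment visible at startup.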
git clone https://github.com/VarthanV/pub-sub.git
cd pub-sub
go run main.go
This project is licensed under the MIT License. Feel free to use, modify, and distribute as needed.