Rohith-S636/Distributed-Job-Queue-System

⚡ Distributed Job Queue System

A modular, distributed task orchestration system built from scratch using Python Sockets. Features a multi-threaded TLS-secured TCP broker, priority-based job scheduling, fault-tolerant worker nodes, a REST API, and a live web dashboard.

Team

  • Kurumeti Mitesh (PES2UG24AM080)
  • Mohammed Tahir Siddique (PES2UG24AM092)
  • Rohith G S (PES2UG24AM138)

📖 1. The Importance of This Project

In modern software engineering, heavy tasks (such as video encoding, AI processing, or bulk emailing) are rarely run directly on a web server, because they would block request handling and make the user interface unresponsive.

This project implements a Distributed Task Queue, a pattern at the core of large-scale backends at companies like Netflix, Amazon, and Google. Building it from scratch with sockets demonstrates:

  • System Architecture: Understanding the Producer-Consumer pattern.
  • Networking Fundamentals: Designing a custom application-layer protocol over TLS.
  • Security: Mandatory SSL/TLS encryption for all control and data exchanges.
  • Concurrency: Managing multiple simultaneous connections without data corruption.
  • Reliability: Solving the problem of "What happens if a worker crashes mid-task?"
  • API Design: Exposing backend state via a REST API.
  • Frontend Integration: Visualizing a live backend system in a web dashboard.
  • Performance Evaluation: Measuring throughput and latency under load.

📂 2. Detailed File Structure

Distributed-Job-Queue-System/
├── certs/
│   ├── gen_certs.py         # Script to generate self-signed TLS certificate
│   ├── server.crt           # TLS certificate (generated, not committed to git)
│   └── server.key           # TLS private key (generated, not committed to git)
├── docs/
│   └── protocol_v1.md       # Full protocol spec: TLS, message types, lifecycle
├── server/
│   ├── main.py              # Entry point: TLS socket listener + starts REST API
│   ├── queue_manager.py     # Priority Queue with Thread Locking & tie-breaker
│   ├── worker_registry.py   # Tracks health and status of connected workers
│   └── api.py               # REST API (port 8080): dashboard + JSON endpoints
├── worker/
│   ├── worker.py            # Execution node: TLS connect, poll, execute, return
│   └── tasks.py             # Task library with dynamic registration support
├── client/
│   └── submit_job.py        # Producer script: submits jobs over TLS
├── dashboard/
│   └── index.html           # Live web dashboard (auto-refreshes every 2 seconds)
├── tests/
│   └── stress_test.py       # 100 concurrent TLS jobs + latency/throughput report
├── README.md                # You are here
└── requirements.txt         # No external dependencies (Python 3 built-ins only)

🔐 3. SSL/TLS Security

All TCP communication on port 65432 must be TLS-encrypted. A plain (non-TLS) connection fails the handshake, which raises ssl.SSLError on the server, and the connection is rejected.

Generate Certificates (run once)

Requires OpenSSL installed on your system.

python certs/gen_certs.py

This generates:

  • certs/server.crt: self-signed X.509 certificate (trusted by workers and clients)
  • certs/server.key: RSA 2048-bit private key (server only)

How TLS is Applied

| Component | Role | Context Type |
| --- | --- | --- |
| server/main.py | Wraps server socket | PROTOCOL_TLS_SERVER + cert + key |
| worker/worker.py | Connects securely | PROTOCOL_TLS_CLIENT + loads server.crt |
| client/submit_job.py | Connects securely | PROTOCOL_TLS_CLIENT + loads server.crt |
| tests/stress_test.py | 100 TLS connections | PROTOCOL_TLS_CLIENT + loads server.crt |
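As a rough illustration of the context types in the table, the sketch below builds one server-side and one client-side context with the standard ssl module. The helper names make_server_context and make_client_context are hypothetical and are not taken from the project's code. The file paths are passed in by the caller rather than assumed.

```python
import ssl
from typing import Optional


def make_server_context(certfile: str, keyfile: str) -> ssl.SSLContext:
    """Hypothetical helper: context for the listening side (server/main.py role)."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The server presents this certificate and proves ownership with the key.
    ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return ctx


def make_client_context(cafile: Optional[str] = None) -> ssl.SSLContext:
    """Hypothetical helper: context for workers, clients, and the stress test."""
    # PROTOCOL_TLS_CLIENT turns on certificate verification and hostname
    # checking by default, so no extra flags are needed here.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    if cafile is not None:
        # Trust the self-signed server certificate explicitly.
        ctx.load_verify_locations(cafile=cafile)
    return ctx
```

Because hostname checking is on by default for client contexts, the name passed as server_hostname when wrapping the socket has to match the certificate. How gen_certs.py fills in the certificate subject is not shown here, so that detail is left open.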

Non-TLS Connection Handling

If a plain TCP client connects without TLS, the TLS handshake fails and the server prints:

[Server] SSLError — rejected non-TLS connection: ...

The server does not crash: it catches the ssl.SSLError, drops that connection, and continues accepting new ones.
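The behaviour described above can be sketched as an accept loop that isolates the handshake in a try/except block. This is a minimal sketch, not the project's server/main.py: the names try_tls_handshake, accept_loop, and handle_client are hypothetical, and the log wording is illustrative.

```python
import ssl
import threading


def try_tls_handshake(context, raw_conn, addr):
    """Return a TLS-wrapped socket, or None if the peer did not speak TLS."""
    try:
        # The handshake happens here; a plain-text peer makes it fail.
        return context.wrap_socket(raw_conn, server_side=True)
    except ssl.SSLError as exc:
        print(f"[Server] rejected non-TLS connection from {addr}: {exc}")
        raw_conn.close()
        return None


def accept_loop(listener, context, handle_client):
    """Accept connections forever; one failed handshake never stops the loop."""
    while True:
        raw_conn, addr = listener.accept()
        tls_conn = try_tls_handshake(context, raw_conn, addr)
        if tls_conn is None:
            continue  # rejected; go back to accepting
        # One thread per connection, matching the multi-threaded broker design.
        threading.Thread(
            target=handle_client, args=(tls_conn, addr), daemon=True
        ).start()
```

Keeping the handshake in its own function means a failure affects only the offending connection, and the function can be exercised with stand-in objects without opening a real socket.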


🛠 4. Setup

Python 3 only. No pip installs are required; the project uses only the built-in modules socket, ssl, threading, json, heapq, uuid, and http.server.

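  1. Clone the repo and ensure all files are in their respective folders.
  2. Generate TLS certificates: python certs/gen_certs.py

Implementation note: the QueueManager stores each entry as a (priority, counter, job) heap tuple. The counter acts as a tie-breaker, so heapq never has to compare two job objects directly, which would otherwise raise a TypeError when two jobs share the same priority.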
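To see why the counter matters, here is a minimal sketch of a locked priority queue built on heapq. It assumes jobs are plain dicts, and the class name TinyPriorityQueue is hypothetical. The real queue_manager.py may differ in its details.

```python
import heapq
import itertools
import threading


class TinyPriorityQueue:
    """Illustrative stand-in for a (priority, counter, job) heap."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # ever-increasing tie-breaker
        self._lock = threading.Lock()      # guards the heap across threads

    def push(self, job: dict, priority: int) -> None:
        with self._lock:
            # Tuples compare element by element. The counter is unique, so
            # the comparison never reaches the job dict (dicts are unorderable).
            heapq.heappush(self._heap, (priority, next(self._counter), job))

    def pop(self):
        """Return the lowest-numbered priority job, or None if empty."""
        with self._lock:
            if not self._heap:
                return None
            _priority, _seq, job = heapq.heappop(self._heap)
            return job
```

Without the counter, pushing two dict jobs with equal priority would make heapq compare the dicts themselves and raise TypeError. With it, jobs of equal priority also come out in submission order.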

⚙️ 5. How It Works

Communication Flow:

  1. TLS Handshake: Every connection starts with a TCP three-way handshake followed by a TLS handshake. The server presents server.crt; clients verify it.
  2. Ingestion: A Client connects over TLS, sends a JSON-encoded job with a priority level, receives an ACK, and disconnects.
  3. Scheduling: The Server places the job in a heapq Priority Queue. Lower numbers mean higher priority, so priority 1 jobs always move to the front.
  4. Distribution: Workers poll with GET_JOB. The Server pops the highest-priority job and dispatches it.
  5. Execution: The Worker runs the task from tasks.py and returns a RESULT packet.
  6. Fault Tolerance: If a worker disconnects mid-task, the server detects it, sets the job status to REQUEUED, and pushes it back to the priority queue.
  7. Dashboard: The REST API (port 8080) exposes live stats, worker status, security info, and the job log.
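The scheduling, distribution, and fault-tolerance steps above can be sketched as a small in-memory tracker. Only the REQUEUED status comes from this README. The class name JobTracker, its methods, the job dict layout, and the QUEUED, RUNNING, and COMPLETED status strings are assumptions made for illustration.

```python
import heapq
import itertools
import threading


class JobTracker:
    """Illustrative sketch of dispatch plus requeue-on-disconnect."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()
        self._lock = threading.Lock()
        self.in_flight = {}  # worker_id -> job currently being executed

    def _push(self, job):
        heapq.heappush(self._heap, (job["priority"], next(self._seq), job))

    def submit(self, job):
        with self._lock:
            job["status"] = "QUEUED"
            self._push(job)

    def dispatch(self, worker_id):
        """Called when a worker polls for work; returns a job or None."""
        with self._lock:
            if not self._heap:
                return None
            job = heapq.heappop(self._heap)[2]
            job["status"] = "RUNNING"
            self.in_flight[worker_id] = job
            return job

    def complete(self, worker_id, result):
        with self._lock:
            job = self.in_flight.pop(worker_id)
            job["status"] = "COMPLETED"
            job["result"] = result
            return job

    def worker_lost(self, worker_id):
        """Called when a worker's connection drops; requeues its job, if any."""
        with self._lock:
            job = self.in_flight.pop(worker_id, None)
            if job is not None:
                job["status"] = "REQUEUED"
                self._push(job)  # keeps its original priority
            return job
```

Tracking the in-flight job per worker is what makes recovery possible: when a connection drops, the server knows exactly which job to put back.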

🗂 6. Available Tasks

| Task | Payload | Description |
| --- | --- | --- |
| reverse_string | {"data": "hello"} | Reverses a string |
| to_uppercase | {"data": "hello"} | Converts string to uppercase |
| fibonacci | {"n": 10} | Returns the nth Fibonacci number |
| is_prime | {"n": 97} | Checks if a number is prime |
| slow_task | {"seconds": 3} | Sleeps N seconds (simulates heavy work) |

Dynamic Task Registration

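# Assumes worker/tasks.py is on the import path
from tasks import register_task, unregister_task, list_tasks

# Add a new task: the callable receives the job payload
register_task("double_number", lambda p: int(p["n"]) * 2)
# Remove a built-in task from this worker
unregister_task("slow_task")
# Show the tasks this worker can currently run
print(list_tasks())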

Tasks registered on one worker are not automatically available on others, because each worker keeps its own TASK_MAP in memory.
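For readers wondering what such a registry could look like internally, here is one plausible shape: a module-level dict mapping names to callables. This is a sketch under assumptions and not the project's actual tasks.py. The function bodies, the return type of list_tasks, and the run_task helper are hypothetical.

```python
# Module-level registry: lives in the memory of one worker process only.
TASK_MAP = {
    "reverse_string": lambda p: p["data"][::-1],
    "to_uppercase": lambda p: p["data"].upper(),
}


def register_task(name, func):
    """Add or replace a task; func takes the payload dict."""
    if not callable(func):
        raise TypeError("task must be callable")
    TASK_MAP[name] = func


def unregister_task(name):
    """Remove a task if present; unknown names are ignored."""
    TASK_MAP.pop(name, None)


def list_tasks():
    """Return the registered task names in sorted order."""
    return sorted(TASK_MAP)


def run_task(name, payload):
    """Hypothetical dispatcher: look up the task by name and execute it."""
    if name not in TASK_MAP:
        raise KeyError(f"unknown task: {name}")
    return TASK_MAP[name](payload)
```

Because the dict belongs to a single process, a second worker started separately begins with only the built-in entries, which is the per-worker behaviour described above.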


🌐 7. REST API Endpoints

The API runs on port 8080 (plain HTTP, local only).

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | / | Serves the live dashboard |
| GET | /api/stats | Queue size, worker counts, job totals, requeued count |
| GET | /api/workers | Connected workers, status, current job |
| GET | /api/jobs | Full job log (newest first) |
| GET | /api/security | TLS status, protocol, cert file, port |
| POST | /api/submit | Submit a job via HTTP |

POST /api/submit body:

{ "task": "fibonacci", "payload": {"n": 10}, "priority": 2 }
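A job can also be submitted from a script using only the standard library. The sketch below builds the request shown above with urllib.request. The helper names and the Content-Type header are assumptions. The response format is not documented here, so the body is returned as raw text.

```python
import json
import urllib.request


def build_submit_request(task, payload, priority,
                         base_url="http://localhost:8080"):
    """Hypothetical helper: build the POST /api/submit request object."""
    body = json.dumps(
        {"task": task, "payload": payload, "priority": priority}
    ).encode("utf-8")
    return urllib.request.Request(
        base_url + "/api/submit",
        data=body,
        headers={"Content-Type": "application/json"},  # assumed header
        method="POST",
    )


def submit_job(task, payload, priority, base_url="http://localhost:8080"):
    """Send the job and return the raw response body as text."""
    req = build_submit_request(task, payload, priority, base_url)
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.read().decode("utf-8")
```

With the server running, submit_job("fibonacci", {"n": 10}, 2) sends the same body as the example above.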

📊 8. Dashboard Features

Open http://localhost:8080 in your browser.

  • 8 stat cards: queue size, workers, idle, busy, completed, failed, requeued, total
  • TLS badge in header (shows 🔒 TLS: ON / OFF based on /api/security)
  • Security panel: protocol, port, cert status, non-TLS rejection policy
  • Worker panel: live worker list with idle/busy status and current job
  • Job submit form: submit any task directly from the browser
  • Status breakdown chart: bar chart of queued/completed/failed/requeued
  • Task distribution chart: bar chart of jobs by task type
  • Job log table: full history with job ID, task, priority badge, status badge, worker, result, timestamp

🚦 9. Running the Demo

Step 1 - Generate certs (once only):
  python certs/gen_certs.py

Step 2 - Start server:
  python server/main.py

Step 3 - Start workers (run in two separate terminals):
  python worker/worker.py

Step 4 - Submit jobs:
  python client/submit_job.py

Dashboard:
  http://localhost:8080

What to watch for:

  • Dashboard header shows 🔒 TLS: ON
  • Workers appear live as green (idle) and turn yellow (busy) while executing
  • submit_job.py submits slow_task (priority 5) first, then reverse_string (priority 1); the high-priority job completes first
  • Try submitting jobs from the dashboard form and watch the job log and charts update in real time

🧪 10. Stress Testing & Performance

python tests/stress_test.py

Submits 100 concurrent jobs over TLS, spread across all task types with random priorities. Sample output:

=============================================
  Total Jobs   : 100
  Success      : 100
  Failed       : 0
  Total Time   : 1.83s
  Avg Latency  : 18.45 ms/job
  Throughput   : 54.64 jobs/sec
=============================================
  • Avg Latency: measured per job using time.perf_counter() (a high-resolution timer)
  • Throughput: successful jobs per second over total wall-clock time
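The two metrics can be computed along the following lines. This is a sketch and not the project's stress_test.py: run_load and timed_call are hypothetical names, the job function is supplied by the caller, and averaging latency over successful jobs only is an assumption.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def timed_call(fn, *args):
    """Run fn once; return (succeeded, latency in milliseconds)."""
    start = time.perf_counter()
    try:
        fn(*args)
        ok = True
    except Exception:
        ok = False
    return ok, (time.perf_counter() - start) * 1000.0


def run_load(fn, n_jobs=100, max_workers=100):
    """Fire n_jobs calls concurrently and summarise latency and throughput."""
    wall_start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(lambda i: timed_call(fn, i), range(n_jobs)))
    total_s = time.perf_counter() - wall_start

    latencies = [ms for ok, ms in results if ok]
    success = len(latencies)
    return {
        "total": n_jobs,
        "success": success,
        "failed": n_jobs - success,
        "total_time_s": total_s,
        # Assumption: average over successful jobs only.
        "avg_latency_ms": sum(latencies) / success if success else 0.0,
        # Successful jobs per second of wall-clock time.
        "throughput": success / total_s if total_s > 0 else 0.0,
    }
```

With the sample figures above, 100 successful jobs in 1.83 s gives 100 / 1.83 ≈ 54.64 jobs/sec, which matches the reported throughput.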
