A high-performance, concurrent event processing system built in Go that demonstrates advanced concurrency patterns, message queue integration, and real-time event aggregation.
This project implements a microservice that consumes events from a RabbitMQ queue and processes them concurrently with goroutines and channels. It shows practical patterns for distributed message processing, including automatic shutdown after a period of inactivity and per-event-type JSON output.
- Concurrent Processing: Multi-worker architecture using goroutines and channels
- Message Deduplication: Ensures each message is processed exactly once
- Type-based Routing: Events are routed to specific channels based on event type
- Automatic Shutdown: Graceful shutdown after 5 seconds of inactivity
- JSON Export: Generates structured output files per event type
- Configurable: Dynamic RabbitMQ URL and exchange configuration
The system follows a producer-consumer pattern with the following components:
RabbitMQ Queue → Message Receiver → Type-based Channels → Worker Pool → Event Counters → JSON Output
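
The middle stages of this pipeline (type-based channels, worker pool, event counters) could look roughly like the sketch below. It is an illustration under assumptions: the `Event` struct, the `countByType` function, and the use of a single mutex are hypothetical and may differ from the project's own design. RabbitMQ is replaced by an in-memory slice so the example is self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical in-memory form of a received message.
type Event struct {
	UserID string
	Type   string
}

// countByType sends each event to the channel for its type. Every channel is
// drained by its own small pool of workers, which increment per-user counters.
func countByType(events []Event, types []string, workersPerType int) map[string]map[string]int {
	var (
		mu     sync.Mutex // guards counts
		wg     sync.WaitGroup
		counts = make(map[string]map[string]int)
		chans  = make(map[string]chan Event)
	)

	for _, t := range types {
		counts[t] = make(map[string]int)
		ch := make(chan Event)
		chans[t] = ch
		for i := 0; i < workersPerType; i++ {
			wg.Add(1)
			go func(t string, ch <-chan Event) {
				defer wg.Done()
				for ev := range ch {
					mu.Lock()
					counts[t][ev.UserID]++
					mu.Unlock()
				}
			}(t, ch)
		}
	}

	// Receiver stage: route by event type, dropping unknown types.
	for _, ev := range events {
		if ch, ok := chans[ev.Type]; ok {
			ch <- ev
		}
	}

	// Closing the channels lets the workers finish their range loops.
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return counts
}

func main() {
	events := []Event{
		{UserID: "user-123", Type: "created"},
		{UserID: "user-123", Type: "created"},
		{UserID: "user-456", Type: "deleted"},
	}
	counts := countByType(events, []string{"created", "updated", "deleted"}, 2)
	fmt.Println(counts["created"]["user-123"], counts["deleted"]["user-456"]) // prints "2 1"
}
```

A single mutex keeps the sketch short; per-type locks or one counter goroutine per type would reduce contention if that became a bottleneck.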
type Consumer interface {
    Created(ctx context.Context, uid string) error
    Updated(ctx context.Context, uid string) error
    Deleted(ctx context.Context, uid string) error
}

{
    "id": "unique-message-identifier"
}

Routing Key Pattern: <user-id>.event.<event-type>
Queue Name: eventcountertest
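
A routing key in the `<user-id>.event.<event-type>` form can be split into its parts with the standard library. The sketch below is an assumption about how that parsing might be done: `parseRoutingKey` is a hypothetical helper, and it assumes the user ID itself contains no dots.

```go
package main

import (
	"fmt"
	"strings"
)

// parseRoutingKey (hypothetical helper) splits "<user-id>.event.<event-type>"
// into its user ID and event type. It assumes user IDs contain no dots.
func parseRoutingKey(key string) (userID, eventType string, err error) {
	parts := strings.Split(key, ".")
	if len(parts) != 3 || parts[0] == "" || parts[1] != "event" || parts[2] == "" {
		return "", "", fmt.Errorf("unexpected routing key %q", key)
	}
	return parts[0], parts[2], nil
}

func main() {
	userID, eventType, err := parseRoutingKey("user-123.event.created")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(userID, eventType) // prints "user-123 created"
}
```

The extracted event type would then select which `Consumer` method (`Created`, `Updated`, or `Deleted`) handles the message.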
- Goroutines & Channels: Efficient concurrent processing
- Context Management: Proper cancellation and timeout handling
- Sync Package: Thread-safe operations and synchronization
- Message Queue Integration: RabbitMQ with AMQP protocol
- Clean Architecture: Modular design with clear separation of concerns
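
As one example of the thread-safe operations listed above, message deduplication can be sketched as a mutex-guarded set of message IDs. This is an in-process illustration only: `seenSet` and `FirstTime` are hypothetical names, and the project may track processed messages differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// seenSet (hypothetical type) remembers which message IDs were already handled.
type seenSet struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newSeenSet() *seenSet {
	return &seenSet{seen: make(map[string]struct{})}
}

// FirstTime reports whether id has not been seen before and records it.
// The check and the insert happen under one lock, so concurrent callers
// passing the same id get true exactly once.
func (s *seenSet) FirstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.seen[id]; ok {
		return false
	}
	s.seen[id] = struct{}{}
	return true
}

func main() {
	set := newSeenSet()
	var accepted atomic.Int64
	var wg sync.WaitGroup

	// Eight workers all receive the same 100 message IDs.
	for w := 0; w < 8; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				if set.FirstTime(fmt.Sprintf("msg-%d", i)) {
					accepted.Add(1)
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println(accepted.Load()) // prints 100
}
```

An in-memory set like this only deduplicates within a single process run; it does not survive restarts.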
- Go 1.19+
- Docker & Docker Compose
- Make

- Clone the repository
    git clone https://github.com/reb-felipe/eventcounter.git
    cd eventcounter
- Start RabbitMQ environment
    make env-up
- Generate test data
    make generator-publish
- Run the event counter
    go run cmd/consumer/main.go
The consumer accepts the following flags:
- --amqp-url: RabbitMQ connection URL (default: amqp://guest:guest@localhost:5672/)
- --amqp-exchange: Exchange name (default: eventcountertest)
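
Flags like these could be declared with Go's standard `flag` package, which accepts both `-name` and `--name` spellings. The sketch below is illustrative: the `config` struct and `parseFlags` function are hypothetical, while the flag names and defaults are taken from the list above.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// config (hypothetical type) holds the two documented settings.
type config struct {
	AMQPURL  string
	Exchange string
}

// parseFlags parses args into a config, using the documented defaults.
// A dedicated FlagSet keeps the function easy to call from tests.
func parseFlags(args []string) (config, error) {
	var cfg config
	fs := flag.NewFlagSet("consumer", flag.ContinueOnError)
	fs.StringVar(&cfg.AMQPURL, "amqp-url", "amqp://guest:guest@localhost:5672/", "RabbitMQ connection URL")
	fs.StringVar(&cfg.Exchange, "amqp-exchange", "eventcountertest", "Exchange name")
	if err := fs.Parse(args); err != nil {
		return config{}, err
	}
	return cfg, nil
}

func main() {
	cfg, err := parseFlags(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Printf("url=%s exchange=%s\n", cfg.AMQPURL, cfg.Exchange)
}
```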
Example:
go run cmd/consumer/main.go --amqp-url="amqp://user:pass@localhost:5672/" --amqp-exchange="events"

The system generates JSON files in the ./output directory, organized by event type:
output/
├── created_events.json
├── updated_events.json
└── deleted_events.json
Each file contains user event counts:
{
"user-123": 15,
"user-456": 8,
"user-789": 23
}

- Start environment: make env-up
- Stop environment: make env-down
- Generate test messages: make generator-publish
Modify the first lines of the Makefile to customize RabbitMQ port and exchange settings.
├── cmd/
│ ├── consumer/ # Main consumer application
│ └── generator/ # Test message generator
├── pkg/ # Shared packages and interfaces
├── Makefile # Development commands
└── README.md