-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.c
More file actions
269 lines (212 loc) · 8.44 KB
/
worker.c
File metadata and controls
269 lines (212 loc) · 8.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
/*
* SMT-Aware Multi-Process Job Scheduling System
* Worker Process with Thread Pool
*
* Workers receive jobs from the scheduler and execute them using
* an internal thread pool. Threads are pinned to logical CPU cores
* to demonstrate SMT effects.
*/
#define _GNU_SOURCE
#include "common.h"

#include <errno.h>
#include <string.h>
static volatile sig_atomic_t running = 1;
static thread_pool_t pool;
/* Async-signal-safe termination handler: flip the run flag so the
 * main receive loop exits on its next pass. */
void signal_handler(int sig) {
    (void)sig; /* SIGINT and SIGTERM are treated identically */
    running = 0;
}
/* Pin thread to specific logical CPU core */
/*
 * Pin the calling thread to one logical CPU core.
 *
 * core_id: index of the logical (SMT) core to bind this thread to.
 * Returns 0 on success, or the pthread error number on failure.
 *
 * Note: pthread_setaffinity_np reports errors via its return value,
 * not errno, so that value is what we log and propagate.
 */
int pin_thread_to_core(int core_id) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);

    int result = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (result != 0) {
        /* Previously the reason was discarded; include it in the log. */
        LOG_ERROR("Failed to pin thread to core %d: %s", core_id, strerror(result));
    }
    return result;
}
/* Execute a job (simulate work) */
/*
 * Run one job on this worker thread: stamp start/end times around a
 * CPU-bound busy loop whose length scales with job->complexity, then
 * log the execution time and the end-to-end latency since submit.
 */
void execute_job(job_t *job, int thread_id) {
    clock_gettime(CLOCK_REALTIME, &job->start_time);

    LOG_THREAD(pool.worker_id, thread_id,
               "Executing %s (complexity: %dms)",
               job->job_name, job->complexity);

    /* CPU-intensive busy work; `volatile` keeps the compiler from
     * eliding the loop. Tight integer loops like this contend for
     * execution units with the SMT sibling on the same physical core. */
    volatile long acc = 0;
    const long total = job->complexity * 100000L;
    for (long n = 0; n < total; n++) {
        acc += n * n;
        /* Periodically yield so the sibling hyperthread gets a turn. */
        if (n % 10000000 == 0) {
            sched_yield();
        }
    }

    clock_gettime(CLOCK_REALTIME, &job->end_time);

    const double total_latency = time_diff_ms(&job->submit_time, &job->end_time);
    const double run_ms = time_diff_ms(&job->start_time, &job->end_time);
    LOG_THREAD(pool.worker_id, thread_id,
               "Completed %s - Exec: %.1fms, Total Latency: %.1fms",
               job->job_name, run_ms, total_latency);
}
/* Worker thread function */
/*
 * Worker thread main loop.
 *
 * Pins itself to a logical core derived from (worker_id, thread_id),
 * then repeatedly: waits on the pool's condition variable for a queued
 * job, marks itself active in the shared-memory worker table, executes
 * the job, updates the completion counters, and sends a
 * MSG_TYPE_COMPLETE message back to the scheduler.
 *
 * arg: this thread's index within the pool, passed as a pointer-sized
 *      integer and cast back to long.
 * Returns NULL once pool.shutdown is observed.
 */
void *worker_thread(void *arg) {
    long thread_id = (long)arg;

    /* Calculate which logical core to pin to based on worker and thread ID
     * This demonstrates SMT behavior:
     * - Cores 0,1 share physical core 0
     * - Cores 2,3 share physical core 1
     * etc. */
    int logical_core = (pool.worker_id * THREADS_PER_WORKER + thread_id) % sysconf(_SC_NPROCESSORS_ONLN);
    pin_thread_to_core(logical_core);

    /* NOTE(review): "physical = logical / 2" assumes the kernel numbers
     * SMT siblings adjacently (0/1, 2/3, ...). Many systems instead
     * enumerate all physical cores first — confirm on the target host. */
    LOG_THREAD(pool.worker_id, thread_id,
               "Started on logical core %d (physical: %d)",
               logical_core, logical_core / 2);

    /* NOTE(review): pool.shutdown is read here without queue_lock; the
     * locked re-check inside the wait loop below is what makes shutdown
     * reliable. An atomic flag would make this cleaner. */
    while (!pool.shutdown) {
        job_queue_entry_t *entry = NULL;

        /* Block until a job is queued or shutdown is signalled. */
        pthread_mutex_lock(&pool.queue_lock);
        while (pool.queue_head == NULL && !pool.shutdown) {
            pthread_cond_wait(&pool.queue_cond, &pool.queue_lock);
        }
        if (pool.shutdown) {
            pthread_mutex_unlock(&pool.queue_lock);
            break;
        }

        /* Dequeue the head entry; keep tail consistent when emptied. */
        entry = pool.queue_head;
        pool.queue_head = entry->next;
        if (pool.queue_head == NULL) {
            pool.queue_tail = NULL;
        }
        pool.queue_size--;
        pthread_mutex_unlock(&pool.queue_lock);

        /* Mark this thread busy in the shared-memory status table. */
        pthread_mutex_lock(&pool.shared_mem->workers[pool.worker_id].lock);
        pool.shared_mem->workers[pool.worker_id].active_threads++;
        pthread_mutex_unlock(&pool.shared_mem->workers[pool.worker_id].lock);

        /* Execute job (blocks for the simulated work duration). */
        execute_job(&entry->job, thread_id);

        /* Mark idle again and credit the completion. current_load is
         * presumably incremented by the scheduler at dispatch time —
         * only the decrement lives here. */
        pthread_mutex_lock(&pool.shared_mem->workers[pool.worker_id].lock);
        pool.shared_mem->workers[pool.worker_id].active_threads--;
        pool.shared_mem->workers[pool.worker_id].current_load--;
        pool.shared_mem->workers[pool.worker_id].jobs_completed++;
        pthread_mutex_unlock(&pool.shared_mem->workers[pool.worker_id].lock);

        /* Notify the scheduler that this job finished. */
        job_message_t complete_msg;
        complete_msg.msg_type = MSG_TYPE_COMPLETE;
        complete_msg.target_worker = pool.worker_id;
        complete_msg.job = entry->job;
        /* NOTE(review): IPC_NOWAIT + ignored return means a completion
         * message is silently dropped if the queue is full — verify the
         * scheduler tolerates missing completions. */
        msgsnd(pool.msg_queue_id, &complete_msg,
               sizeof(job_message_t) - sizeof(long), IPC_NOWAIT);

        free(entry);
    }

    LOG_THREAD(pool.worker_id, thread_id, "Thread shutting down");
    return NULL;
}
/* Add job to thread pool queue */
/*
 * Enqueue a copy of `job` on the thread pool's work queue and wake one
 * waiting worker thread.
 *
 * The queue entry is heap-allocated here and freed by the worker thread
 * that dequeues it. On allocation failure the job is dropped with an
 * error log — preferable to the guaranteed NULL dereference the
 * unchecked malloc produced before.
 */
void pool_add_job(job_t job) {
    job_queue_entry_t *entry = malloc(sizeof *entry);
    if (entry == NULL) {
        LOG_ERROR("Worker %d: out of memory, dropping job %s",
                  pool.worker_id, job.job_name);
        return;
    }
    entry->job = job;
    entry->next = NULL;

    pthread_mutex_lock(&pool.queue_lock);
    if (pool.queue_tail == NULL) {
        /* Empty queue: entry becomes both head and tail. */
        pool.queue_head = pool.queue_tail = entry;
    } else {
        pool.queue_tail->next = entry;
        pool.queue_tail = entry;
    }
    pool.queue_size++;
    /* One job added -> one waiter woken. */
    pthread_cond_signal(&pool.queue_cond);
    pthread_mutex_unlock(&pool.queue_lock);
}
/* Initialize thread pool */
/*
 * Initialize the global thread pool and spawn THREADS_PER_WORKER
 * worker threads.
 *
 * worker_id:    this worker process's index (used for logging, core
 *               pinning, and shared-memory bookkeeping).
 * msg_queue_id: System V message queue for completion notifications.
 * shm:          attached shared state (worker status table).
 *
 * Exits the process if a thread cannot be created: a partially built
 * pool would hang or lose jobs, and the unchecked pthread_create in
 * the original made that failure silent.
 */
void pool_init(int worker_id, int msg_queue_id, shared_state_t *shm) {
    pool.worker_id = worker_id;
    pool.msg_queue_id = msg_queue_id;
    pool.shared_mem = shm;
    pool.shutdown = 0;
    pool.queue_head = NULL;
    pool.queue_tail = NULL;
    pool.queue_size = 0;
    pthread_mutex_init(&pool.queue_lock, NULL);
    pthread_cond_init(&pool.queue_cond, NULL);

    /* Spawn worker threads; the loop index doubles as the thread id. */
    for (long i = 0; i < THREADS_PER_WORKER; i++) {
        int rc = pthread_create(&pool.threads[i], NULL, worker_thread, (void *)i);
        if (rc != 0) {
            /* pthread_create returns the error number (not via errno). */
            LOG_ERROR("Worker %d: failed to create thread %ld: %s",
                      worker_id, i, strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    LOG_WORKER(worker_id, "Thread pool initialized with %d threads",
               THREADS_PER_WORKER);
}
/* Shutdown thread pool */
/*
 * Stop the thread pool: signal shutdown, wake every worker thread,
 * join them all, then destroy the synchronization primitives.
 *
 * Fix: the shutdown flag is now raised while holding queue_lock.
 * Setting it outside the lock (as before) allowed a worker that had
 * just evaluated `pool.queue_head == NULL && !pool.shutdown` to miss
 * the broadcast and block in pthread_cond_wait forever.
 */
void pool_shutdown(void) {
    pthread_mutex_lock(&pool.queue_lock);
    pool.shutdown = 1;
    pthread_cond_broadcast(&pool.queue_cond);
    pthread_mutex_unlock(&pool.queue_lock);

    /* Wait for every worker thread to observe shutdown and exit. */
    for (int i = 0; i < THREADS_PER_WORKER; i++) {
        pthread_join(pool.threads[i], NULL);
    }

    /* Safe to destroy: no thread can touch the pool after join. */
    pthread_mutex_destroy(&pool.queue_lock);
    pthread_cond_destroy(&pool.queue_cond);
    LOG_WORKER(pool.worker_id, "Thread pool shut down");
}
/*
 * Worker process entry point.
 *
 * argv[1] (optional): non-negative worker id, default 0.
 *
 * Connects to the scheduler's System V message queue and shared-memory
 * segment (both must already exist), starts the thread pool, then
 * receives MSG_TYPE_DISPATCH+worker_id messages until a shutdown signal
 * (SIGINT/SIGTERM), a sentinel job with job_id == -1, the shared
 * shutdown flag, or a hard msgrcv failure.
 */
int main(int argc, char *argv[]) {
    int worker_id = 0;
    int msg_queue_id;
    int shm_id;
    shared_state_t *shared_mem;

    /* Parse and validate the worker ID. */
    if (argc >= 2) {
        worker_id = atoi(argv[1]);
        if (worker_id < 0) {
            LOG_ERROR("Invalid worker id '%s' (must be >= 0)", argv[1]);
            exit(EXIT_FAILURE);
        }
    }

    /* Install termination handlers; they just clear `running`. */
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    /* Connect to the scheduler's message queue (created elsewhere). */
    msg_queue_id = msgget(MSG_QUEUE_KEY, 0666);
    if (msg_queue_id == -1) {
        LOG_ERROR("Worker %d: Failed to connect to message queue", worker_id);
        exit(EXIT_FAILURE);
    }

    /* Attach to the scheduler's shared state segment. */
    shm_id = shmget(SHM_KEY, sizeof(shared_state_t), 0666);
    if (shm_id == -1) {
        LOG_ERROR("Worker %d: Failed to get shared memory", worker_id);
        exit(EXIT_FAILURE);
    }
    shared_mem = shmat(shm_id, NULL, 0);
    if (shared_mem == (void *)-1) {
        LOG_ERROR("Worker %d: Failed to attach shared memory", worker_id);
        exit(EXIT_FAILURE);
    }

    LOG_WORKER(worker_id, "Started - Connected to IPC");

    pool_init(worker_id, msg_queue_id, shared_mem);

    /* Main receive loop: block on jobs dispatched to this worker. */
    while (running && !shared_mem->shutdown_flag) {
        job_message_t msg;
        ssize_t ret = msgrcv(msg_queue_id, &msg, sizeof(job_message_t) - sizeof(long),
                             MSG_TYPE_DISPATCH + worker_id, 0);
        if (ret == -1) {
            if (errno == EINTR) {
                continue; /* signal arrived; loop condition re-checks `running` */
            }
            /* Queue removed (EIDRM) or other hard failure. The original
             * only handled ret > 0, so this case busy-spun forever. */
            LOG_ERROR("Worker %d: msgrcv failed: %s", worker_id, strerror(errno));
            break;
        }
        if (msg.job.job_id == -1) {
            /* Sentinel job: orderly shutdown requested by the scheduler. */
            LOG_WORKER(worker_id, "Received shutdown signal");
            break;
        }
        LOG_WORKER(worker_id, "Received job: %s", msg.job.job_name);
        pool_add_job(msg.job);
    }

    LOG_WORKER(worker_id, "Shutting down...");
    pool_shutdown();

    /* Save stats before detaching shared memory. */
    int jobs_completed = shared_mem->workers[worker_id].jobs_completed;
    shmdt(shared_mem);

    LOG_WORKER(worker_id, "Worker terminated - Completed %d jobs", jobs_completed);
    return EXIT_SUCCESS;
}