Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * @a Niels Nes, Peter Boncz
15 : * @+ Threads
16 : * This file contains a wrapper layer for threading, hence the
17 : * underscore convention MT_x (Multi-Threading). As all platforms
18 : * that MonetDB runs on now support POSIX Threads (pthreads), this
19 : * wrapping layer has become rather thin.
20 : *
21 : * In the late 1990s when multi-threading support was introduced in
22 : * MonetDB, pthreads was just emerging as a standard API and not
23 : * widely adopted yet. The earliest MT implementation focused on SGI
24 : * Unix and provided multi- threading using multiple processes, and
25 : * shared memory.
26 : *
27 : * One of the relics of this model, namely the need to pre-allocate
28 : * locks and semaphores, and consequently a maximum number of them,
29 : * has been removed in the latest iteration of this layer.
30 : *
31 : */
32 : /*
33 : * @- Mthreads Routine implementations
34 : */
35 : #include "monetdb_config.h"
36 : #include "mstring.h"
37 : #include "gdk.h"
38 : #include "gdk_system_private.h"
39 :
40 : #include <time.h>
41 :
42 : #ifdef HAVE_FTIME
43 : #include <sys/timeb.h> /* ftime */
44 : #endif
45 : #ifdef HAVE_SYS_TIME_H
46 : #include <sys/time.h> /* gettimeofday */
47 : #endif
48 :
49 : #include <signal.h>
50 : #include <string.h> /* for strerror */
51 : #include <unistd.h> /* for sysconf symbols */
52 :
53 : #include "mutils.h"
54 :
55 : static ATOMIC_TYPE GDKthreadid = ATOMIC_VAR_INIT(1);
56 :
57 : #ifdef LOCK_STATS
58 :
59 : ATOMIC_TYPE GDKlockcnt = ATOMIC_VAR_INIT(0);
60 : ATOMIC_TYPE GDKlockcontentioncnt = ATOMIC_VAR_INIT(0);
61 : ATOMIC_TYPE GDKlocksleepcnt = ATOMIC_VAR_INIT(0);
62 : MT_Lock *volatile GDKlocklist = 0;
63 : ATOMIC_FLAG GDKlocklistlock = ATOMIC_FLAG_INIT;
64 :
65 : /* merge sort of linked list */
66 : static MT_Lock *
67 : sortlocklist(MT_Lock *l)
68 : {
69 : MT_Lock *r, *t, *ll = NULL;
70 :
71 : if (l == NULL || l->next == NULL) {
72 : /* list is trivially sorted (0 or 1 element) */
73 : return l;
74 : }
75 : /* break list into two (almost) equal pieces:
76 : * l is start of "left" list, r of "right" list, ll last
77 : * element of "left" list */
78 : for (t = r = l; t && t->next; t = t->next->next) {
79 : ll = r;
80 : r = r->next;
81 : }
82 : ll->next = NULL; /* break list into two */
83 : r->prev = NULL;
84 : /* recursively sort both sublists */
85 : l = sortlocklist(l);
86 : r = sortlocklist(r);
87 : /* merge
88 : * t is new list, ll is last element of new list, l and r are
89 : * start of unprocessed part of left and right lists */
90 : t = ll = NULL;
91 : while (l && r) {
92 : if (ATOMIC_GET(&l->sleep) < ATOMIC_GET(&r->sleep) ||
93 : (ATOMIC_GET(&l->sleep) == ATOMIC_GET(&r->sleep) &&
94 : (ATOMIC_GET(&l->contention) < ATOMIC_GET(&r->contention) ||
95 : (ATOMIC_GET(&l->contention) == ATOMIC_GET(&r->contention) &&
96 : l->count <= r->count)))) {
97 : /* l is smaller */
98 : if (ll == NULL) {
99 : assert(t == NULL);
100 : t = ll = l;
101 : } else {
102 : ll->next = l;
103 : l->prev = ll;
104 : ll = ll->next;
105 : }
106 : l = l->next;
107 : } else {
108 : /* r is smaller */
109 : if (ll == NULL) {
110 : assert(t == NULL);
111 : t = ll = r;
112 : } else {
113 : ll->next = r;
114 : r->prev = ll;
115 : ll = ll->next;
116 : }
117 : r = r->next;
118 : }
119 : }
120 : /* append rest of remaining list */
121 : if (l) {
122 : ll->next = l;
123 : l->prev = ll;
124 : } else {
125 : ll->next = r;
126 : r->prev = ll;
127 : }
128 : return t;
129 : }
130 :
131 : static inline bool
132 : lock_isset(MT_Lock *l)
133 : {
134 : if (MT_lock_try(l)) {
135 : MT_lock_unset(l);
136 : return false;
137 : }
138 : return true;
139 : }
140 :
141 : /* function used for debugging */
142 : void
143 : GDKlockstatistics(int what)
144 : {
145 : MT_Lock *l;
146 : int n = 0;
147 :
148 : printf("Locks:\n");
149 : if (ATOMIC_TAS(&GDKlocklistlock) != 0) {
150 : printf("GDKlocklistlock is set, so cannot access lock list\n");
151 : return;
152 : }
153 : if (what == -1) {
154 : for (l = GDKlocklist; l; l = l->next) {
155 : l->count = 0;
156 : ATOMIC_SET(&l->contention, 0);
157 : ATOMIC_SET(&l->sleep, 0);
158 : }
159 : ATOMIC_CLEAR(&GDKlocklistlock);
160 : return;
161 : }
162 : GDKlocklist = sortlocklist(GDKlocklist);
163 : printf("%-18s\t%s\t%s\t%s\t%s\t%s\t%s\n",
164 : "lock name", "count", "content", "sleep",
165 : "locked", "locker", "thread");
166 : for (l = GDKlocklist; l; l = l->next) {
167 : n++;
168 : if (what == 0 ||
169 : (what == 1 && l->count) ||
170 : (what == 2 && ATOMIC_GET(&l->contention)) ||
171 : (what == 3 && lock_isset(l)))
172 : printf("%-18s\t%zu\t%zu\t%zu\t%s\t%s\t%s\n",
173 : l->name, l->count,
174 : (size_t) ATOMIC_GET(&l->contention),
175 : (size_t) ATOMIC_GET(&l->sleep),
176 : lock_isset(l) ? "locked" : "",
177 : l->locker ? l->locker : "",
178 : l->thread ? l->thread : "");
179 : }
180 : printf("Number of locks: %d\n", n);
181 : printf("Total lock count: %zu\n", (size_t) ATOMIC_GET(&GDKlockcnt));
182 : printf("Lock contention: %zu\n", (size_t) ATOMIC_GET(&GDKlockcontentioncnt));
183 : printf("Lock sleep count: %zu\n", (size_t) ATOMIC_GET(&GDKlocksleepcnt));
184 : fflush(stdout);
185 : ATOMIC_CLEAR(&GDKlocklistlock);
186 : }
187 :
188 : #endif /* LOCK_STATS */
189 :
190 : struct thread_funcs {
191 : void (*init)(void *);
192 : void (*destroy)(void *);
193 : void *data;
194 : };
195 :
196 : struct mtthread {
197 : struct mtthread *next;
198 : void (*func) (void *); /* function to be called */
199 : void *data; /* and its data */
200 : struct thread_funcs *thread_funcs; /* callback funcs */
201 : int nthread_funcs;
202 : MT_Lock *lockwait; /* lock we're waiting for */
203 : MT_Sema *semawait; /* semaphore we're waiting for */
204 : MT_Cond *condwait; /* condition variable we're waiting for */
205 : #ifdef LOCK_OWNER
206 : MT_Lock *mylocks; /* locks we're holding */
207 : #endif
208 : struct mtthread *joinwait; /* process we are joining with */
209 : const char *working; /* what we're currently doing */
210 : char algorithm[512]; /* the algorithm used in the last operation */
211 : size_t algolen; /* length of string in .algorithm */
212 : ATOMIC_TYPE exited;
213 : bool detached:1, waiting:1;
214 : unsigned int refs:20;
215 : bool limit_override; /* not in bit field because of data races */
216 : char threadname[MT_NAME_LEN];
217 : QryCtx *qry_ctx;
218 : #ifdef HAVE_PTHREAD_H
219 : pthread_t hdl;
220 : #else
221 : HANDLE hdl;
222 : DWORD wtid;
223 : #endif
224 : #ifdef HAVE_GETTID
225 : pid_t lwptid;
226 : #endif
227 : MT_Id tid;
228 : uintptr_t sp;
229 : char *errbuf;
230 : struct freebats freebats;
231 : };
232 : static struct mtthread mainthread = {
233 : .threadname = "main thread",
234 : .exited = ATOMIC_VAR_INIT(0),
235 : .refs = 1,
236 : .tid = 1,
237 : };
238 : static struct mtthread *mtthreads = &mainthread;
239 :
240 : #ifdef HAVE_PTHREAD_H
241 : static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER;
242 : static pthread_key_t threadkey;
243 : #define thread_lock() pthread_mutex_lock(&posthread_lock)
244 : #define thread_lock_try() (pthread_mutex_trylock(&posthread_lock) == 0)
245 : #define thread_unlock() pthread_mutex_unlock(&posthread_lock)
246 : #define thread_self() pthread_getspecific(threadkey)
247 : #define thread_setself(self) pthread_setspecific(threadkey, self)
248 : #else
249 : static CRITICAL_SECTION winthread_cs;
250 : static DWORD threadkey = TLS_OUT_OF_INDEXES;
251 : #define thread_lock() EnterCriticalSection(&winthread_cs)
252 : #define thread_lock_try() (TryEnterCriticalSection(&winthread_cs) != 0)
253 : #define thread_unlock() LeaveCriticalSection(&winthread_cs)
254 : #define thread_self() TlsGetValue(threadkey)
255 : #define thread_setself(self) TlsSetValue(threadkey, self)
256 : #endif
257 : static bool thread_initialized = false;
258 :
259 : #if defined(_MSC_VER) && _MSC_VER >= 1900
260 : #pragma warning(disable : 4172)
261 : #endif
262 : static inline uintptr_t
263 95818863 : THRsp(void)
264 : {
265 : #ifdef __has_builtin
266 : #if __has_builtin(__builtin_frame_address)
267 191637732 : return (uintptr_t) __builtin_frame_address(0);
268 : #define BUILTIN_USED
269 : #endif
270 : #endif
271 : #ifndef BUILTIN_USED
272 : int l = 0;
273 : uintptr_t sp = (uintptr_t) (&l);
274 :
275 : return sp;
276 : #endif
277 : #undef BUILTIN_USED
278 : }
279 :
280 : bool
281 95771742 : THRhighwater(void)
282 : {
283 95771742 : struct mtthread *s = thread_self();
284 95771744 : if (s != NULL && s->sp != 0) {
285 95771750 : uintptr_t c = THRsp();
286 95771752 : size_t diff = c < s->sp ? s->sp - c : c - s->sp;
287 95771752 : if (diff > THREAD_STACK_SIZE - 80 * 1024)
288 : return true;
289 : }
290 : return false;
291 : }
292 :
293 : void
294 116 : dump_threads(void)
295 : {
296 116 : char buf[1024];
297 : #if defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) && defined(HAVE_CLOCK_GETTIME)
298 116 : struct timespec ts;
299 116 : clock_gettime(CLOCK_REALTIME, &ts);
300 116 : ts.tv_sec++; /* give it a second */
301 116 : if (pthread_mutex_timedlock(&posthread_lock, &ts) != 0) {
302 0 : printf("Threads are currently locked, so no thread information\n");
303 0 : return;
304 : }
305 : #else
306 : if (!thread_lock_try()) {
307 : MT_sleep_ms(1000);
308 : if (!thread_lock_try()) {
309 : printf("Threads are currently locked, so no thread information\n");
310 : return;
311 : }
312 : }
313 : #endif
314 116 : if (!GDK_TRACER_TEST(M_DEBUG, THRD))
315 116 : printf("Threads:\n");
316 1868 : for (struct mtthread *t = mtthreads; t; t = t->next) {
317 1752 : MT_Lock *lk = t->lockwait;
318 1752 : MT_Sema *sm = t->semawait;
319 1752 : MT_Cond *cn = t->condwait;
320 1752 : struct mtthread *jn = t->joinwait;
321 1752 : int pos = snprintf(buf, sizeof(buf),
322 : "%s, tid %zu, "
323 : #ifdef HAVE_PTHREAD_H
324 : "Thread 0x%lx, "
325 : #endif
326 : #ifdef HAVE_GETTID
327 : "LWP %ld, "
328 : #endif
329 : "%"PRIu32" free bats, waiting for %s%s, working on %.200s",
330 1752 : t->threadname,
331 : t->tid,
332 : #ifdef HAVE_PTHREAD_H
333 1752 : (long) t->hdl,
334 : #endif
335 : #ifdef HAVE_GETTID
336 1752 : (long) t->lwptid,
337 : #endif
338 : t->freebats.nfreebats,
339 1752 : lk ? "lock " : sm ? "semaphore " : cn ? "condvar " : jn ? "thread " : "",
340 1752 : lk ? lk->name : sm ? sm->name : cn ? cn->name : jn ? jn->threadname : "nothing",
341 1752 : ATOMIC_GET(&t->exited) ? "exiting" :
342 1636 : t->working ? t->working : "nothing");
343 : #ifdef LOCK_OWNER
344 1752 : const char *sep = ", locked: ";
345 1756 : for (MT_Lock *l = t->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) {
346 4 : pos += snprintf(buf + pos, sizeof(buf) - pos,
347 4 : "%s%s(%s)", sep, l->name, l->locker);
348 4 : sep = ", ";
349 : }
350 : #endif
351 1752 : TRC_DEBUG_IF(THRD)
352 0 : TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
353 : else
354 3504 : printf("%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
355 : }
356 116 : thread_unlock();
357 : }
358 :
359 : static void
360 47137 : rm_mtthread(struct mtthread *t)
361 : {
362 47137 : struct mtthread **pt;
363 :
364 47137 : assert(t != &mainthread);
365 47137 : thread_lock();
366 848791 : for (pt = &mtthreads; *pt && *pt != t; pt = &(*pt)->next)
367 : ;
368 47137 : if (*pt)
369 47137 : *pt = t->next;
370 47137 : free(t);
371 47137 : thread_unlock();
372 47137 : }
373 :
374 : bool
375 323 : MT_thread_init(void)
376 : {
377 323 : if (thread_initialized)
378 : return true;
379 : #ifdef HAVE_GETTID
380 323 : mainthread.lwptid = gettid();
381 : #endif
382 : #ifdef HAVE_PTHREAD_H
383 323 : int ret;
384 :
385 323 : if ((ret = pthread_key_create(&threadkey, NULL)) != 0) {
386 0 : GDKsyserr(ret, "Creating specific key for thread failed");
387 0 : return false;
388 : }
389 323 : mainthread.hdl = pthread_self();
390 323 : if ((ret = thread_setself(&mainthread)) != 0) {
391 0 : GDKsyserr(ret, "Setting specific value failed");
392 0 : return false;
393 : }
394 : #else
395 : threadkey = TlsAlloc();
396 : if (threadkey == TLS_OUT_OF_INDEXES) {
397 : GDKwinerror("Creating thread-local slot for thread failed");
398 : return false;
399 : }
400 : mainthread.wtid = GetCurrentThreadId();
401 : if (thread_setself(&mainthread) == 0) {
402 : GDKwinerror("Setting thread-local value failed");
403 : TlsFree(threadkey);
404 : threadkey = TLS_OUT_OF_INDEXES;
405 : return false;
406 : }
407 : InitializeCriticalSection(&winthread_cs);
408 : #endif
409 323 : thread_initialized = true;
410 323 : return true;
411 : }
412 : bool
413 0 : MT_thread_register(void)
414 : {
415 0 : assert(thread_initialized);
416 0 : if (!thread_initialized)
417 : return false;
418 :
419 0 : struct mtthread *self;
420 :
421 0 : if ((self = thread_self()) != NULL) {
422 0 : if (self->refs == 1000000) {
423 : /* there are limits... */
424 : return false;
425 : }
426 0 : self->refs++;
427 0 : return true;
428 : }
429 :
430 0 : self = malloc(sizeof(*self));
431 0 : if (self == NULL)
432 : return false;
433 :
434 0 : *self = (struct mtthread) {
435 : .detached = false,
436 : #ifdef HAVE_PTHREAD_H
437 0 : .hdl = pthread_self(),
438 : #else
439 : .wtid = GetCurrentThreadId(),
440 : #endif
441 : .refs = 1,
442 0 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
443 : .exited = ATOMIC_VAR_INIT(0),
444 : };
445 0 : snprintf(self->threadname, sizeof(self->threadname), "foreign %zu", self->tid);
446 0 : thread_setself(self);
447 0 : thread_lock();
448 0 : self->next = mtthreads;
449 0 : mtthreads = self;
450 0 : thread_unlock();
451 0 : return true;
452 : }
453 :
454 : void
455 0 : MT_thread_deregister(void)
456 : {
457 0 : struct mtthread *self;
458 :
459 0 : if ((self = thread_self()) == NULL)
460 : return;
461 :
462 0 : if (--self->refs == 0) {
463 0 : rm_mtthread(self);
464 0 : thread_setself(NULL);
465 : }
466 : }
467 :
468 : static struct mtthread *
469 7529 : find_mtthread(MT_Id tid)
470 : {
471 7529 : struct mtthread *t;
472 :
473 7529 : thread_lock();
474 30332 : for (t = mtthreads; t && t->tid != tid; t = t->next)
475 : ;
476 7529 : thread_unlock();
477 7529 : return t;
478 : }
479 :
480 : gdk_return
481 324 : MT_alloc_tls(MT_TLS_t *newkey)
482 : {
483 : #ifdef HAVE_PTHREAD_H
484 324 : int ret;
485 324 : if ((ret = pthread_key_create(newkey, NULL)) != 0) {
486 0 : GDKsyserr(ret, "Creating TLS key for thread failed");
487 0 : return GDK_FAIL;
488 : }
489 : #else
490 : if ((*newkey = TlsAlloc()) == TLS_OUT_OF_INDEXES) {
491 : GDKwinerror("Creating TLS key for thread failed");
492 : return GDK_FAIL;
493 : }
494 : #endif
495 : return GDK_SUCCEED;
496 : }
497 :
498 : void
499 46755 : MT_tls_set(MT_TLS_t key, void *val)
500 : {
501 : #ifdef HAVE_PTHREAD_H
502 46755 : pthread_setspecific(key, val);
503 : #else
504 : assert(key != TLS_OUT_OF_INDEXES);
505 : TlsSetValue(key, val);
506 : #endif
507 46740 : }
508 :
509 : void *
510 82311 : MT_tls_get(MT_TLS_t key)
511 : {
512 : #ifdef HAVE_PTHREAD_H
513 82311 : return pthread_getspecific(key);
514 : #else
515 : assert(key != TLS_OUT_OF_INDEXES);
516 : return TlsGetValue(key);
517 : #endif
518 : }
519 :
520 : const char *
521 1311363782 : MT_thread_getname(void)
522 : {
523 1311363782 : struct mtthread *self;
524 :
525 1311363782 : if (!thread_initialized)
526 : return mainthread.threadname;
527 1315108348 : self = thread_self();
528 1321025534 : return self ? self->threadname : UNKNOWN_THREAD;
529 : }
530 :
531 : void
532 89677 : GDKsetbuf(char *errbuf)
533 : {
534 89677 : struct mtthread *self;
535 :
536 89677 : self = thread_self();
537 89699 : if (self == NULL)
538 0 : self = &mainthread;
539 89699 : assert(errbuf == NULL || self->errbuf == NULL);
540 89699 : self->errbuf = errbuf;
541 44846 : if (errbuf)
542 44853 : *errbuf = 0; /* start clean */
543 89699 : }
544 :
545 : char *
546 22729724 : GDKgetbuf(void)
547 : {
548 22729724 : struct mtthread *self;
549 :
550 22729724 : self = thread_self();
551 22755075 : if (self == NULL)
552 0 : self = &mainthread;
553 22755075 : return self->errbuf;
554 : }
555 :
556 : struct freebats *
557 44430131 : MT_thread_getfreebats(void)
558 : {
559 44430131 : struct mtthread *self;
560 :
561 44430131 : self = thread_self();
562 44443962 : if (self == NULL)
563 0 : self = &mainthread;
564 44443962 : return &self->freebats;
565 : }
566 :
567 : void
568 0 : MT_thread_setdata(void *data)
569 : {
570 0 : if (!thread_initialized)
571 : return;
572 0 : struct mtthread *self = thread_self();
573 :
574 0 : if (self)
575 0 : self->data = data;
576 : }
577 :
578 : void *
579 0 : MT_thread_getdata(void)
580 : {
581 0 : if (!thread_initialized)
582 : return NULL;
583 0 : struct mtthread *self = thread_self();
584 :
585 0 : return self ? self->data : NULL;
586 : }
587 :
588 : void
589 25211084 : MT_thread_set_qry_ctx(QryCtx *ctx)
590 : {
591 25211084 : if (!thread_initialized)
592 : return;
593 25230661 : struct mtthread *self = thread_self();
594 :
595 25252357 : if (self)
596 25252357 : self->qry_ctx = ctx;
597 : }
598 :
599 : QryCtx *
600 28024119 : MT_thread_get_qry_ctx(void)
601 : {
602 28024119 : if (!thread_initialized)
603 : return NULL;
604 28024119 : struct mtthread *self = thread_self();
605 :
606 28046237 : return self ? self->qry_ctx : NULL;
607 : }
608 :
609 : void
610 34380205 : MT_thread_setlockwait(MT_Lock *lock)
611 : {
612 34380205 : if (!thread_initialized)
613 : return;
614 34454113 : struct mtthread *self = thread_self();
615 :
616 34538574 : if (self)
617 34538574 : self->lockwait = lock;
618 : }
619 :
620 : void
621 13315866 : MT_thread_setsemawait(MT_Sema *sema)
622 : {
623 13315866 : if (!thread_initialized)
624 : return;
625 13335186 : struct mtthread *self = thread_self();
626 :
627 13325469 : if (self)
628 13325469 : self->semawait = sema;
629 : }
630 :
631 : static void
632 18 : MT_thread_setcondwait(MT_Cond *cond)
633 : {
634 18 : if (!thread_initialized)
635 : return;
636 18 : struct mtthread *self = thread_self();
637 :
638 18 : if (self)
639 18 : self->condwait = cond;
640 : }
641 :
642 : #ifdef LOCK_OWNER
643 : void
644 1288939083 : MT_thread_add_mylock(MT_Lock *lock)
645 : {
646 1288939083 : struct mtthread *self;
647 1288939083 : if (!thread_initialized)
648 : self = &mainthread;
649 : else
650 1287091164 : self = thread_self();
651 :
652 1292849992 : if (self) {
653 1294697911 : lock->nxt = self->mylocks;
654 1294697911 : self->mylocks = lock;
655 : }
656 1294697911 : }
657 :
658 : void
659 1280473751 : MT_thread_del_mylock(MT_Lock *lock)
660 : {
661 1280473751 : struct mtthread *self;
662 1280473751 : if (!thread_initialized)
663 : self = &mainthread;
664 : else
665 1275097949 : self = thread_self();
666 :
667 1286534859 : if (self) {
668 1291910661 : if (self->mylocks == lock) {
669 1291902200 : self->mylocks = lock->nxt;
670 : } else {
671 74334 : for (MT_Lock *l = self->mylocks; l; l = l->nxt) {
672 74334 : if (l->nxt == lock) {
673 8461 : l->nxt = lock->nxt;
674 8461 : break;
675 : }
676 : }
677 : }
678 : }
679 1291910661 : }
680 : #endif
681 :
682 : void
683 17320757 : MT_thread_setworking(const char *work)
684 : {
685 17320757 : if (!thread_initialized)
686 : return;
687 17328052 : struct mtthread *self = thread_self();
688 :
689 17350688 : if (self) {
690 17350688 : if (work == NULL)
691 884475 : self->working = NULL;
692 16466213 : else if (strcmp(work, "store locked") == 0)
693 643026 : self->limit_override = true;
694 15823187 : else if (strcmp(work, "store unlocked") == 0)
695 643026 : self->limit_override = false;
696 : else
697 15180161 : self->working = work;
698 : }
699 : }
700 :
701 : void
702 58302393 : MT_thread_setalgorithm(const char *algo)
703 : {
704 58302393 : if (!thread_initialized)
705 : return;
706 58314464 : struct mtthread *self = thread_self();
707 :
708 58344640 : if (self) {
709 58344640 : if (algo) {
710 4420541 : if (self->algolen > 0) {
711 2350680 : if (self->algolen < sizeof(self->algorithm))
712 1397812 : self->algolen += strconcat_len(self->algorithm + self->algolen, sizeof(self->algorithm) - self->algolen, "; ", algo, NULL);
713 : } else
714 2069861 : self->algolen = strcpy_len(self->algorithm, algo, sizeof(self->algorithm));
715 : } else {
716 53924099 : self->algorithm[0] = 0;
717 53924099 : self->algolen = 0;
718 : }
719 : }
720 : }
721 :
722 : const char *
723 1837 : MT_thread_getalgorithm(void)
724 : {
725 1837 : if (!thread_initialized)
726 : return NULL;
727 1836 : struct mtthread *self = thread_self();
728 :
729 1837 : return self && self->algorithm[0] ? self->algorithm : NULL;
730 : }
731 :
732 : bool
733 0 : MT_thread_override_limits(void)
734 : {
735 0 : if (!thread_initialized)
736 : return false;
737 0 : struct mtthread *self = thread_self();
738 :
739 0 : return self && self->limit_override;
740 : }
741 :
742 : static struct thread_init_cb {
743 : struct thread_init_cb *next;
744 : void (*init)(void *);
745 : void (*destroy)(void *);
746 : void *data;
747 : } *init_cb;
748 : static MT_Lock thread_init_lock = MT_LOCK_INITIALIZER(thread_init_lock);
749 :
750 : gdk_return
751 318 : MT_thread_init_add_callback(void (*init)(void *), void (*destroy)(void *), void *data)
752 : {
753 318 : struct thread_init_cb *p = GDKmalloc(sizeof(struct thread_init_cb));
754 :
755 318 : if (p == NULL)
756 : return GDK_FAIL;
757 318 : *p = (struct thread_init_cb) {
758 : .init = init,
759 : .destroy = destroy,
760 : .next = NULL,
761 : .data = data,
762 : };
763 318 : MT_lock_set(&thread_init_lock);
764 318 : struct thread_init_cb **pp = &init_cb;
765 318 : while (*pp)
766 0 : pp = &(*pp)->next;
767 318 : *pp = p;
768 318 : MT_lock_unset(&thread_init_lock);
769 318 : return GDK_SUCCEED;
770 : }
771 :
772 : #ifdef HAVE_PTHREAD_H
773 : static void *
774 : #else
775 : static DWORD WINAPI
776 : #endif
777 47147 : thread_starter(void *arg)
778 : {
779 47147 : struct mtthread *self = (struct mtthread *) arg;
780 47147 : void *data = self->data;
781 :
782 : #ifdef HAVE_GETTID
783 47147 : self->lwptid = gettid();
784 : #endif
785 : #ifdef HAVE_PTHREAD_H
786 : #ifdef HAVE_PTHREAD_SETNAME_NP
787 : /* name can be at most 16 chars including \0 */
788 47130 : char name[16];
789 47130 : (void) strcpy_len(name, self->threadname, sizeof(name));
790 47143 : pthread_setname_np(
791 : #ifndef __APPLE__
792 : pthread_self(),
793 : #endif
794 : name);
795 : #endif
796 : #else
797 : #ifdef HAVE_SETTHREADDESCRIPTION
798 : wchar_t *wname = utf8towchar(self->threadname);
799 : if (wname != NULL) {
800 : SetThreadDescription(GetCurrentThread(), wname);
801 : free(wname);
802 : }
803 : #endif
804 : #endif
805 47113 : self->data = NULL;
806 47113 : self->sp = THRsp();
807 47117 : thread_setself(self);
808 93801 : for (int i = 0; i < self->nthread_funcs; i++) {
809 46686 : if (self->thread_funcs[i].init)
810 46686 : (*self->thread_funcs[i].init)(self->thread_funcs[i].data);
811 : }
812 47128 : (*self->func)(data);
813 93816 : for (int i = 0; i < self->nthread_funcs; i++) {
814 46690 : if (self->thread_funcs[i].destroy)
815 46690 : (*self->thread_funcs[i].destroy)(self->thread_funcs[i].data);
816 : }
817 47115 : free(self->thread_funcs);
818 47115 : BBPrelinquishbats();
819 47120 : ATOMIC_SET(&self->exited, 1);
820 47120 : TRC_DEBUG(THRD, "Exit thread \"%s\"\n", self->threadname);
821 47120 : return 0; /* NULL for pthreads, 0 for Windows */
822 : }
823 :
824 : static void
825 55006 : join_threads(void)
826 : {
827 55006 : bool waited;
828 :
829 55006 : struct mtthread *self = thread_self();
830 55006 : if (!self)
831 : return;
832 55006 : thread_lock();
833 93274 : do {
834 93274 : waited = false;
835 2781256 : for (struct mtthread *t = mtthreads; t; t = t->next) {
836 2726250 : if (ATOMIC_GET(&t->exited) && t->detached && !t->waiting) {
837 38268 : t->waiting = true;
838 38268 : thread_unlock();
839 38268 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
840 38268 : self->joinwait = t;
841 : #ifdef HAVE_PTHREAD_H
842 38268 : pthread_join(t->hdl, NULL);
843 : #else
844 : WaitForSingleObject(t->hdl, INFINITE);
845 : #endif
846 38268 : self->joinwait = NULL;
847 : #ifndef HAVE_PTHREAD_H
848 : CloseHandle(t->hdl);
849 : #endif
850 38268 : rm_mtthread(t);
851 38268 : waited = true;
852 38268 : thread_lock();
853 38268 : break;
854 : }
855 : }
856 93274 : } while (waited);
857 55006 : thread_unlock();
858 : }
859 :
860 : void
861 656 : join_detached_threads(void)
862 : {
863 656 : bool waited;
864 :
865 656 : struct mtthread *self = thread_self();
866 656 : thread_lock();
867 1996 : do {
868 1996 : waited = false;
869 9906 : for (struct mtthread *t = mtthreads; t; t = t->next) {
870 9250 : if (t->detached && !t->waiting) {
871 1340 : t->waiting = true;
872 1340 : thread_unlock();
873 1340 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
874 1340 : self->joinwait = t;
875 : #ifdef HAVE_PTHREAD_H
876 1340 : pthread_join(t->hdl, NULL);
877 : #else
878 : WaitForSingleObject(t->hdl, INFINITE);
879 : #endif
880 1340 : self->joinwait = NULL;
881 : #ifndef HAVE_PTHREAD_H
882 : CloseHandle(t->hdl);
883 : #endif
884 1340 : rm_mtthread(t);
885 1340 : waited = true;
886 1340 : thread_lock();
887 1340 : break;
888 : }
889 : }
890 1996 : } while (waited);
891 656 : thread_unlock();
892 656 : }
893 :
894 : int
895 47149 : MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach d, const char *threadname)
896 : {
897 47149 : struct mtthread *self;
898 :
899 47149 : assert(thread_initialized);
900 47149 : join_threads();
901 47149 : if (threadname == NULL) {
902 0 : TRC_CRITICAL(GDK, "Thread must have a name\n");
903 0 : return -1;
904 : }
905 47149 : if (strlen(threadname) >= sizeof(self->threadname)) {
906 0 : TRC_CRITICAL(GDK, "Thread's name is too large\n");
907 0 : return -1;
908 : }
909 :
910 : #ifdef HAVE_PTHREAD_H
911 47149 : pthread_attr_t attr;
912 47149 : int ret;
913 47149 : if ((ret = pthread_attr_init(&attr)) != 0) {
914 0 : GDKsyserr(ret, "Cannot init pthread attr");
915 0 : return -1;
916 : }
917 47149 : if ((ret = pthread_attr_setstacksize(&attr, THREAD_STACK_SIZE)) != 0) {
918 0 : GDKsyserr(ret, "Cannot set stack size");
919 0 : pthread_attr_destroy(&attr);
920 0 : return -1;
921 : }
922 : #endif
923 47149 : self = malloc(sizeof(*self));
924 47149 : if (self == NULL) {
925 0 : GDKsyserror("Cannot allocate memory\n");
926 : #ifdef HAVE_PTHREAD_H
927 0 : pthread_attr_destroy(&attr);
928 : #endif
929 0 : return -1;
930 : }
931 :
932 47149 : *self = (struct mtthread) {
933 : .func = f,
934 : .data = arg,
935 : .waiting = false,
936 47149 : .detached = (d == MT_THR_DETACHED),
937 : .refs = 1,
938 47149 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
939 : .exited = ATOMIC_VAR_INIT(0),
940 : };
941 47149 : MT_lock_set(&thread_init_lock);
942 : /* remember the list of callback functions we need to call for
943 : * this thread (i.e. anything registered so far) */
944 93845 : for (struct thread_init_cb *p = init_cb; p; p = p->next)
945 46696 : self->nthread_funcs++;
946 47149 : if (self->nthread_funcs > 0) {
947 46696 : self->thread_funcs = malloc(self->nthread_funcs * sizeof(*self->thread_funcs));
948 46696 : if (self->thread_funcs == NULL) {
949 0 : GDKsyserror("Cannot allocate memory\n");
950 0 : MT_lock_unset(&thread_init_lock);
951 0 : free(self);
952 : #ifdef HAVE_PTHREAD_H
953 0 : pthread_attr_destroy(&attr);
954 : #endif
955 0 : return -1;
956 : }
957 46696 : int n = 0;
958 93392 : for (struct thread_init_cb *p = init_cb; p; p = p->next) {
959 46696 : self->thread_funcs[n++] = (struct thread_funcs) {
960 46696 : .init = p->init,
961 46696 : .destroy = p->destroy,
962 46696 : .data = p->data,
963 : };
964 : }
965 : }
966 47149 : MT_lock_unset(&thread_init_lock);
967 :
968 47149 : strcpy_len(self->threadname, threadname, sizeof(self->threadname));
969 47149 : char *p;
970 47149 : if ((p = strstr(self->threadname, "XXXX")) != NULL) {
971 : /* overwrite XXXX with thread ID; bottom three bits are
972 : * likely 0, so skip those */
973 43245 : char buf[5];
974 43245 : snprintf(buf, 5, "%04zu", self->tid % 9999);
975 43245 : memcpy(p, buf, 4);
976 : }
977 47149 : TRC_DEBUG(THRD, "Create thread \"%s\"\n", self->threadname);
978 : #ifdef HAVE_PTHREAD_H
979 : #ifdef HAVE_PTHREAD_SIGMASK
980 47149 : sigset_t new_mask, orig_mask;
981 47149 : (void) sigfillset(&new_mask);
982 47149 : sigdelset(&new_mask, SIGQUIT);
983 47149 : sigdelset(&new_mask, SIGPROF);
984 47149 : pthread_sigmask(SIG_SETMASK, &new_mask, &orig_mask);
985 : #endif
986 47149 : ret = pthread_create(&self->hdl, &attr, thread_starter, self);
987 47149 : pthread_attr_destroy(&attr);
988 : #ifdef HAVE_PTHREAD_SIGMASK
989 47149 : pthread_sigmask(SIG_SETMASK, &orig_mask, NULL);
990 : #endif
991 47149 : if (ret != 0) {
992 0 : GDKsyserr(ret, "Cannot start thread");
993 0 : free(self->thread_funcs);
994 0 : free(self);
995 0 : return -1;
996 : }
997 : #else
998 : self->hdl = CreateThread(NULL, THREAD_STACK_SIZE, thread_starter, self,
999 : 0, &self->wtid);
1000 : if (self->hdl == NULL) {
1001 : GDKwinerror("Failed to create thread");
1002 : free(self->thread_funcs);
1003 : free(self);
1004 : return -1;
1005 : }
1006 : #endif
1007 : /* must not fail after this: the thread has been started */
1008 47149 : *t = self->tid;
1009 47149 : thread_lock();
1010 47149 : self->next = mtthreads;
1011 47149 : mtthreads = self;
1012 47149 : thread_unlock();
1013 47149 : return 0;
1014 : }
1015 :
1016 : MT_Id
1017 95853271 : MT_getpid(void)
1018 : {
1019 95853271 : struct mtthread *self;
1020 :
1021 95853271 : if (!thread_initialized)
1022 : self = &mainthread;
1023 : else
1024 95906860 : self = thread_self();
1025 95916544 : return self->tid;
1026 : }
1027 :
1028 : void
1029 38453 : MT_exiting_thread(void)
1030 : {
1031 38453 : struct mtthread *self;
1032 :
1033 38453 : if (!thread_initialized)
1034 : return;
1035 38454 : self = thread_self();
1036 38454 : if (self) {
1037 38454 : ATOMIC_SET(&self->exited, 1);
1038 38454 : self->working = NULL;
1039 : }
1040 : }
1041 :
1042 : int
1043 7529 : MT_join_thread(MT_Id tid)
1044 : {
1045 7529 : struct mtthread *t;
1046 :
1047 7529 : assert(tid != mainthread.tid);
1048 7529 : join_threads();
1049 7529 : t = find_mtthread(tid);
1050 7529 : if (t == NULL
1051 : #ifndef HAVE_PTHREAD_H
1052 : || t->hdl == NULL
1053 : #endif
1054 : )
1055 : return -1;
1056 7529 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
1057 7529 : struct mtthread *self = thread_self();
1058 7529 : self->joinwait = t;
1059 : #ifdef HAVE_PTHREAD_H
1060 7529 : int ret = pthread_join(t->hdl, NULL);
1061 : #else
1062 : DWORD ret = WaitForSingleObject(t->hdl, INFINITE);
1063 : #endif
1064 7529 : self->joinwait = NULL;
1065 7529 : if (
1066 : #ifdef HAVE_PTHREAD_H
1067 : ret == 0
1068 : #else
1069 : ret == WAIT_OBJECT_0 && CloseHandle(t->hdl)
1070 : #endif
1071 : ) {
1072 7529 : rm_mtthread(t);
1073 7529 : return 0;
1074 : }
1075 : return -1;
1076 : }
1077 :
1078 : static bool
1079 0 : MT_kill_thread(struct mtthread *t)
1080 : {
1081 0 : assert(t != thread_self());
1082 : #ifdef HAVE_PTHREAD_H
1083 : #ifdef HAVE_PTHREAD_KILL
1084 0 : if (pthread_kill(t->hdl, SIGHUP) == 0)
1085 0 : return true;
1086 : #endif
1087 : #else
1088 : if (t->hdl == NULL) {
1089 : /* detached thread */
1090 : HANDLE h;
1091 : bool ret = false;
1092 : h = OpenThread(THREAD_ALL_ACCESS, 0, t->wtid);
1093 : if (h == NULL)
1094 : return false;
1095 : if (TerminateThread(h, -1))
1096 : ret = true;
1097 : CloseHandle(h);
1098 : return ret;
1099 : }
1100 : if (TerminateThread(t->hdl, -1))
1101 : return true;
1102 : #endif
1103 : return false;
1104 : }
1105 :
1106 : bool
1107 328 : MT_kill_threads(void)
1108 : {
1109 328 : struct mtthread *self = thread_self();
1110 328 : bool killed = false;
1111 :
1112 328 : assert(self == &mainthread);
1113 328 : join_threads();
1114 328 : thread_lock();
1115 656 : for (struct mtthread *t = mtthreads; t; t = t->next) {
1116 328 : if (t == self)
1117 328 : continue;
1118 0 : TRC_INFO(GDK, "Killing thread %s\n", t->threadname);
1119 0 : killed |= MT_kill_thread(t);
1120 : }
1121 328 : thread_unlock();
1122 328 : return killed;
1123 : }
1124 :
1125 : int
1126 330 : MT_check_nr_cores(void)
1127 : {
1128 330 : int ncpus = -1;
1129 :
1130 : #if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_ONLN)
1131 : /* this works on Linux, Solaris and AIX */
1132 330 : ncpus = sysconf(_SC_NPROCESSORS_ONLN);
1133 : #elif defined(HW_NCPU) /* BSD */
1134 : size_t len = sizeof(int);
1135 : int mib[3];
1136 :
1137 : /* Everyone should have permission to make this call,
1138 : * if we get a failure something is really wrong. */
1139 : mib[0] = CTL_HW;
1140 : mib[1] = HW_NCPU;
1141 : mib[2] = -1;
1142 : sysctl(mib, 3, &ncpus, &len, NULL, 0);
1143 : #elif defined(WIN32)
1144 : SYSTEM_INFO sysinfo;
1145 :
1146 : GetSystemInfo(&sysinfo);
1147 : ncpus = sysinfo.dwNumberOfProcessors;
1148 : #endif
1149 :
1150 : /* if we ever need HPUX or OSF/1 (hope not), see
1151 : * http://ndevilla.free.fr/threads/ */
1152 :
1153 330 : if (ncpus <= 0)
1154 : ncpus = 1;
1155 : #if SIZEOF_SIZE_T == SIZEOF_INT
1156 : /* On 32-bits systems with large numbers of cpus/cores, we
1157 : * quickly run out of space due to the number of threads in
1158 : * use. Since it is questionable whether many cores on a
1159 : * 32-bits system are going to be beneficial due to this, we
1160 : * simply limit the auto-detected cores to 16 on 32-bits
1161 : * systems. The user can always override this via
1162 : * gdk_nr_threads. */
1163 : if (ncpus > 16)
1164 : ncpus = 16;
1165 : #endif
1166 :
1167 : #ifndef WIN32
1168 : /* get the number of allocated cpus from the cgroup settings */
1169 330 : FILE *f = fopen("/sys/fs/cgroup/cpuset/cpuset.cpus", "r");
1170 330 : if (f != NULL) {
1171 0 : char buf[512];
1172 0 : char *p = fgets(buf, 512, f);
1173 0 : fclose(f);
1174 0 : if (p != NULL) {
1175 : /* syntax is: ranges of CPU numbers separated
1176 : * by comma; a range is either a single CPU
1177 : * id, or two IDs separated by a minus; any
1178 : * deviation causes the file to be ignored */
1179 : int ncpu = 0;
1180 0 : for (;;) {
1181 0 : char *q;
1182 0 : unsigned fst = strtoul(p, &q, 10);
1183 0 : if (q == p)
1184 0 : return ncpus;
1185 0 : ncpu++;
1186 0 : if (*q == '-') {
1187 0 : p = q + 1;
1188 0 : unsigned lst = strtoul(p, &q, 10);
1189 0 : if (q == p || lst <= fst)
1190 : return ncpus;
1191 0 : ncpu += lst - fst;
1192 : }
1193 0 : if (*q == '\n')
1194 : break;
1195 0 : if (*q != ',')
1196 : return ncpus;
1197 0 : p = q + 1;
1198 : }
1199 0 : if (ncpu < ncpus)
1200 : return ncpu;
1201 : }
1202 : }
1203 : #endif
1204 :
1205 : return ncpus;
1206 : }
1207 :
1208 :
1209 : void
1210 330 : MT_cond_init(MT_Cond *cond)
1211 : {
1212 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1213 : InitializeConditionVariable(&cond->cv);
1214 : #else
1215 330 : pthread_cond_init(&cond->cv, NULL);
1216 : #endif
1217 330 : }
1218 :
1219 :
1220 : void
1221 0 : MT_cond_destroy(MT_Cond *cond)
1222 : {
1223 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1224 : /* no need */
1225 : #else
1226 0 : pthread_cond_destroy(&cond->cv);
1227 : #endif
1228 0 : }
1229 :
1230 : void
1231 9 : MT_cond_wait(MT_Cond *cond, MT_Lock *lock)
1232 : {
1233 9 : MT_thread_setcondwait(cond);
1234 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1235 : SleepConditionVariableCS(&cond->cv, &lock->lock, INFINITE);
1236 : #else
1237 9 : pthread_cond_wait(&cond->cv, &lock->lock);
1238 : #endif
1239 9 : MT_thread_setcondwait(NULL);
1240 9 : }
1241 :
1242 : void
1243 56532 : MT_cond_signal(MT_Cond *cond)
1244 : {
1245 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1246 : WakeConditionVariable(&cond->cv);
1247 : #else
1248 56532 : pthread_cond_signal(&cond->cv);
1249 : #endif
1250 56532 : }
1251 :
1252 : void
1253 2 : MT_cond_broadcast(MT_Cond *cond)
1254 : {
1255 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1256 : WakeAllConditionVariable(&cond->cv);
1257 : #else
1258 2 : pthread_cond_broadcast(&cond->cv);
1259 : #endif
1260 2 : }
|