Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * @a Niels Nes, Peter Boncz
15 : * @+ Threads
16 : * This file contains a wrapper layer for threading, hence the
17 : * underscore convention MT_x (Multi-Threading). As all platforms
18 : * that MonetDB runs on now support POSIX Threads (pthreads), this
19 : * wrapping layer has become rather thin.
20 : *
21 : * In the late 1990s when multi-threading support was introduced in
22 : * MonetDB, pthreads was just emerging as a standard API and not
23 : * widely adopted yet. The earliest MT implementation focused on SGI
24 : * Unix and provided multi- threading using multiple processes, and
25 : * shared memory.
26 : *
27 : * One of the relics of this model, namely the need to pre-allocate
28 : * locks and semaphores, and consequently a maximum number of them,
29 : * has been removed in the latest iteration of this layer.
30 : *
31 : */
32 : /*
33 : * @- Mthreads Routine implementations
34 : */
35 : #include "monetdb_config.h"
36 : #include "mstring.h"
37 : #include "gdk.h"
38 : #include "gdk_system_private.h"
39 :
40 : #include <time.h>
41 :
42 : #ifdef HAVE_FTIME
43 : #include <sys/timeb.h> /* ftime */
44 : #endif
45 : #ifdef HAVE_SYS_TIME_H
46 : #include <sys/time.h> /* gettimeofday */
47 : #endif
48 :
49 : #include <signal.h>
50 : #include <string.h> /* for strerror */
51 : #include <unistd.h> /* for sysconf symbols */
52 :
53 : #include "mutils.h"
54 :
55 : static ATOMIC_TYPE GDKthreadid = ATOMIC_VAR_INIT(1);
56 :
57 : #ifdef LOCK_STATS
58 :
59 : ATOMIC_TYPE GDKlockcnt = ATOMIC_VAR_INIT(0);
60 : ATOMIC_TYPE GDKlockcontentioncnt = ATOMIC_VAR_INIT(0);
61 : ATOMIC_TYPE GDKlocksleepcnt = ATOMIC_VAR_INIT(0);
62 : MT_Lock *volatile GDKlocklist = 0;
63 : ATOMIC_FLAG GDKlocklistlock = ATOMIC_FLAG_INIT;
64 :
65 : /* merge sort of linked list */
66 : static MT_Lock *
67 : sortlocklist(MT_Lock *l)
68 : {
69 : MT_Lock *r, *t, *ll = NULL;
70 :
71 : if (l == NULL || l->next == NULL) {
72 : /* list is trivially sorted (0 or 1 element) */
73 : return l;
74 : }
75 : /* break list into two (almost) equal pieces:
76 : * l is start of "left" list, r of "right" list, ll last
77 : * element of "left" list */
78 : for (t = r = l; t && t->next; t = t->next->next) {
79 : ll = r;
80 : r = r->next;
81 : }
82 : ll->next = NULL; /* break list into two */
83 : r->prev = NULL;
84 : /* recursively sort both sublists */
85 : l = sortlocklist(l);
86 : r = sortlocklist(r);
87 : /* merge
88 : * t is new list, ll is last element of new list, l and r are
89 : * start of unprocessed part of left and right lists */
90 : t = ll = NULL;
91 : while (l && r) {
92 : if (ATOMIC_GET(&l->sleep) < ATOMIC_GET(&r->sleep) ||
93 : (ATOMIC_GET(&l->sleep) == ATOMIC_GET(&r->sleep) &&
94 : (ATOMIC_GET(&l->contention) < ATOMIC_GET(&r->contention) ||
95 : (ATOMIC_GET(&l->contention) == ATOMIC_GET(&r->contention) &&
96 : l->count <= r->count)))) {
97 : /* l is smaller */
98 : if (ll == NULL) {
99 : assert(t == NULL);
100 : t = ll = l;
101 : } else {
102 : ll->next = l;
103 : l->prev = ll;
104 : ll = ll->next;
105 : }
106 : l = l->next;
107 : } else {
108 : /* r is smaller */
109 : if (ll == NULL) {
110 : assert(t == NULL);
111 : t = ll = r;
112 : } else {
113 : ll->next = r;
114 : r->prev = ll;
115 : ll = ll->next;
116 : }
117 : r = r->next;
118 : }
119 : }
120 : /* append rest of remaining list */
121 : if (l) {
122 : ll->next = l;
123 : l->prev = ll;
124 : } else {
125 : ll->next = r;
126 : r->prev = ll;
127 : }
128 : return t;
129 : }
130 :
131 : static inline bool
132 : lock_isset(MT_Lock *l)
133 : {
134 : if (MT_lock_try(l)) {
135 : MT_lock_unset(l);
136 : return false;
137 : }
138 : return true;
139 : }
140 :
141 : /* function used for debugging */
142 : void
143 : GDKlockstatistics(int what)
144 : {
145 : MT_Lock *l;
146 : int n = 0;
147 :
148 : printf("Locks:\n");
149 : if (ATOMIC_TAS(&GDKlocklistlock) != 0) {
150 : printf("GDKlocklistlock is set, so cannot access lock list\n");
151 : return;
152 : }
153 : if (what == -1) {
154 : for (l = GDKlocklist; l; l = l->next) {
155 : l->count = 0;
156 : ATOMIC_SET(&l->contention, 0);
157 : ATOMIC_SET(&l->sleep, 0);
158 : }
159 : ATOMIC_CLEAR(&GDKlocklistlock);
160 : return;
161 : }
162 : GDKlocklist = sortlocklist(GDKlocklist);
163 : printf("%-18s\t%s\t%s\t%s\t%s\t%s\t%s\n",
164 : "lock name", "count", "content", "sleep",
165 : "locked", "locker", "thread");
166 : for (l = GDKlocklist; l; l = l->next) {
167 : n++;
168 : if (what == 0 ||
169 : (what == 1 && l->count) ||
170 : (what == 2 && ATOMIC_GET(&l->contention)) ||
171 : (what == 3 && lock_isset(l)))
172 : printf("%-18s\t%zu\t%zu\t%zu\t%s\t%s\t%s\n",
173 : l->name, l->count,
174 : (size_t) ATOMIC_GET(&l->contention),
175 : (size_t) ATOMIC_GET(&l->sleep),
176 : lock_isset(l) ? "locked" : "",
177 : l->locker ? l->locker : "",
178 : l->thread ? l->thread : "");
179 : }
180 : printf("Number of locks: %d\n", n);
181 : printf("Total lock count: %zu\n", (size_t) ATOMIC_GET(&GDKlockcnt));
182 : printf("Lock contention: %zu\n", (size_t) ATOMIC_GET(&GDKlockcontentioncnt));
183 : printf("Lock sleep count: %zu\n", (size_t) ATOMIC_GET(&GDKlocksleepcnt));
184 : fflush(stdout);
185 : ATOMIC_CLEAR(&GDKlocklistlock);
186 : }
187 :
188 : #endif /* LOCK_STATS */
189 :
190 : struct thread_funcs {
191 : void (*init)(void *);
192 : void (*destroy)(void *);
193 : void *data;
194 : };
195 :
196 : struct mtthread {
197 : struct mtthread *next;
198 : void (*func) (void *); /* function to be called */
199 : void *data; /* and its data */
200 : struct thread_funcs *thread_funcs; /* callback funcs */
201 : int nthread_funcs;
202 : MT_Lock *lockwait; /* lock we're waiting for */
203 : ATOMIC_PTR_TYPE semawait;/* semaphore we're waiting for */
204 : MT_Cond *condwait; /* condition variable we're waiting for */
205 : #ifdef LOCK_OWNER
206 : MT_Lock *mylocks; /* locks we're holding */
207 : #endif
208 : struct mtthread *joinwait; /* process we are joining with */
209 : ATOMIC_PTR_TYPE working;/* what we're currently doing */
210 : char algorithm[512]; /* the algorithm used in the last operation */
211 : size_t algolen; /* length of string in .algorithm */
212 : ATOMIC_TYPE exited;
213 : bool detached:1, waiting:1;
214 : unsigned int refs:20;
215 : bool limit_override; /* not in bit field because of data races */
216 : char threadname[MT_NAME_LEN];
217 : QryCtx *qry_ctx;
218 : #ifdef HAVE_PTHREAD_H
219 : pthread_t hdl;
220 : #else
221 : HANDLE hdl;
222 : DWORD wtid;
223 : #endif
224 : #ifdef HAVE_GETTID
225 : pid_t lwptid;
226 : #endif
227 : MT_Id tid;
228 : uintptr_t sp;
229 : char *errbuf;
230 : struct freebats freebats;
231 : };
232 : static struct mtthread mainthread = {
233 : .threadname = "main-thread",
234 : .exited = ATOMIC_VAR_INIT(0),
235 : .working = ATOMIC_PTR_VAR_INIT(NULL),
236 : .semawait = ATOMIC_PTR_VAR_INIT(NULL),
237 : .refs = 1,
238 : .tid = 1,
239 : };
240 : static struct mtthread *mtthreads = &mainthread;
241 :
242 : #ifdef HAVE_PTHREAD_H
243 : static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER;
244 : static pthread_key_t threadkey;
245 : #define thread_lock() pthread_mutex_lock(&posthread_lock)
246 : #define thread_lock_try() (pthread_mutex_trylock(&posthread_lock) == 0)
247 : #define thread_unlock() pthread_mutex_unlock(&posthread_lock)
248 : #define thread_self() pthread_getspecific(threadkey)
249 : #define thread_setself(self) pthread_setspecific(threadkey, self)
250 : #else
251 : static CRITICAL_SECTION winthread_cs;
252 : static DWORD threadkey = TLS_OUT_OF_INDEXES;
253 : #define thread_lock() EnterCriticalSection(&winthread_cs)
254 : #define thread_lock_try() (TryEnterCriticalSection(&winthread_cs) != 0)
255 : #define thread_unlock() LeaveCriticalSection(&winthread_cs)
256 : #define thread_self() TlsGetValue(threadkey)
257 : #define thread_setself(self) TlsSetValue(threadkey, self)
258 : #endif
259 : static bool thread_initialized = false;
260 :
261 : #if defined(_MSC_VER) && _MSC_VER >= 1900
262 : #pragma warning(disable : 4172)
263 : #endif
264 : static inline uintptr_t
265 95821226 : THRsp(void)
266 : {
267 : #ifdef __has_builtin
268 : #if __has_builtin(__builtin_frame_address)
269 191642449 : return (uintptr_t) __builtin_frame_address(0);
270 : #define BUILTIN_USED
271 : #endif
272 : #endif
273 : #ifndef BUILTIN_USED
274 : int l = 0;
275 : uintptr_t sp = (uintptr_t) (&l);
276 :
277 : return sp;
278 : #endif
279 : #undef BUILTIN_USED
280 : }
281 :
282 : bool
283 95771939 : THRhighwater(void)
284 : {
285 95771939 : struct mtthread *s = thread_self();
286 95771941 : if (s != NULL && s->sp != 0) {
287 95771942 : uintptr_t c = THRsp();
288 95771939 : size_t diff = c < s->sp ? s->sp - c : c - s->sp;
289 95771939 : if (diff > THREAD_STACK_SIZE - 80 * 1024)
290 : return true;
291 : }
292 : return false;
293 : }
294 :
295 : void
296 118 : dump_threads(void)
297 : {
298 118 : char buf[1024];
299 : #if defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) && defined(HAVE_CLOCK_GETTIME)
300 118 : struct timespec ts;
301 118 : clock_gettime(CLOCK_REALTIME, &ts);
302 118 : ts.tv_sec++; /* give it a second */
303 118 : if (pthread_mutex_timedlock(&posthread_lock, &ts) != 0) {
304 0 : printf("Threads are currently locked, so no thread information\n");
305 0 : return;
306 : }
307 : #else
308 : if (!thread_lock_try()) {
309 : MT_sleep_ms(1000);
310 : if (!thread_lock_try()) {
311 : printf("Threads are currently locked, so no thread information\n");
312 : return;
313 : }
314 : }
315 : #endif
316 118 : if (!GDK_TRACER_TEST(M_DEBUG, THRD))
317 118 : printf("Threads:\n");
318 1453 : for (struct mtthread *t = mtthreads; t; t = t->next) {
319 1335 : MT_Lock *lk = t->lockwait;
320 1335 : MT_Sema *sm = ATOMIC_PTR_GET(&t->semawait);
321 1335 : MT_Cond *cn = t->condwait;
322 1335 : struct mtthread *jn = t->joinwait;
323 1335 : const char *working = ATOMIC_PTR_GET(&t->working);
324 1335 : int pos = snprintf(buf, sizeof(buf),
325 : "%s, tid %zu, "
326 : #ifdef HAVE_PTHREAD_H
327 : "Thread 0x%lx, "
328 : #endif
329 : #ifdef HAVE_GETTID
330 : "LWP %ld, "
331 : #endif
332 : "%"PRIu32" free bats, waiting for %s%s, working on %.200s",
333 1335 : t->threadname,
334 : t->tid,
335 : #ifdef HAVE_PTHREAD_H
336 1335 : (long) t->hdl,
337 : #endif
338 : #ifdef HAVE_GETTID
339 1335 : (long) t->lwptid,
340 : #endif
341 : t->freebats.nfreebats,
342 1335 : lk ? "lock " : sm ? "semaphore " : cn ? "condvar " : jn ? "thread " : "",
343 1335 : lk ? lk->name : sm ? sm->name : cn ? cn->name : jn ? jn->threadname : "nothing",
344 1335 : ATOMIC_GET(&t->exited) ? "exiting" :
345 1188 : working ? working : "nothing");
346 : #ifdef LOCK_OWNER
347 1335 : const char *sep = ", locked: ";
348 1337 : for (MT_Lock *l = t->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) {
349 2 : pos += snprintf(buf + pos, sizeof(buf) - pos,
350 2 : "%s%s(%s)", sep, l->name, l->locker);
351 2 : sep = ", ";
352 : }
353 : #endif
354 1335 : TRC_DEBUG_IF(THRD)
355 0 : TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
356 : else
357 2670 : printf("%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
358 : }
359 118 : thread_unlock();
360 : }
361 :
362 : static void
363 49280 : rm_mtthread(struct mtthread *t)
364 : {
365 49280 : struct mtthread **pt;
366 :
367 49280 : assert(t != &mainthread);
368 49280 : thread_lock();
369 859578 : for (pt = &mtthreads; *pt && *pt != t; pt = &(*pt)->next)
370 : ;
371 49280 : if (*pt)
372 49280 : *pt = t->next;
373 49280 : free(t);
374 49280 : thread_unlock();
375 49280 : }
376 :
377 : bool
378 350 : MT_thread_init(void)
379 : {
380 350 : if (thread_initialized)
381 : return true;
382 : #ifdef HAVE_GETTID
383 350 : mainthread.lwptid = gettid();
384 : #endif
385 : #ifdef HAVE_PTHREAD_H
386 350 : int ret;
387 :
388 350 : if ((ret = pthread_key_create(&threadkey, NULL)) != 0) {
389 0 : GDKsyserr(ret, "Creating specific key for thread failed");
390 0 : return false;
391 : }
392 350 : mainthread.hdl = pthread_self();
393 350 : if ((ret = thread_setself(&mainthread)) != 0) {
394 0 : GDKsyserr(ret, "Setting specific value failed");
395 0 : return false;
396 : }
397 : #else
398 : threadkey = TlsAlloc();
399 : if (threadkey == TLS_OUT_OF_INDEXES) {
400 : GDKwinerror("Creating thread-local slot for thread failed");
401 : return false;
402 : }
403 : mainthread.wtid = GetCurrentThreadId();
404 : if (thread_setself(&mainthread) == 0) {
405 : GDKwinerror("Setting thread-local value failed");
406 : TlsFree(threadkey);
407 : threadkey = TLS_OUT_OF_INDEXES;
408 : return false;
409 : }
410 : InitializeCriticalSection(&winthread_cs);
411 : #endif
412 350 : thread_initialized = true;
413 350 : return true;
414 : }
415 : bool
416 0 : MT_thread_register(void)
417 : {
418 0 : assert(thread_initialized);
419 0 : if (!thread_initialized)
420 : return false;
421 :
422 0 : struct mtthread *self;
423 :
424 0 : if ((self = thread_self()) != NULL) {
425 0 : if (self->refs == 1000000) {
426 : /* there are limits... */
427 : return false;
428 : }
429 0 : self->refs++;
430 0 : return true;
431 : }
432 :
433 0 : self = malloc(sizeof(*self));
434 0 : if (self == NULL)
435 : return false;
436 :
437 0 : *self = (struct mtthread) {
438 : .detached = false,
439 : #ifdef HAVE_PTHREAD_H
440 0 : .hdl = pthread_self(),
441 : #else
442 : .wtid = GetCurrentThreadId(),
443 : #endif
444 : .refs = 1,
445 0 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
446 : .exited = ATOMIC_VAR_INIT(0),
447 : .working = ATOMIC_PTR_VAR_INIT(NULL),
448 : .semawait = ATOMIC_PTR_VAR_INIT(NULL),
449 : };
450 0 : snprintf(self->threadname, sizeof(self->threadname), "foreign %zu", self->tid);
451 0 : thread_setself(self);
452 0 : thread_lock();
453 0 : self->next = mtthreads;
454 0 : mtthreads = self;
455 0 : thread_unlock();
456 0 : return true;
457 : }
458 :
459 : void
460 0 : MT_thread_deregister(void)
461 : {
462 0 : struct mtthread *self;
463 :
464 0 : if ((self = thread_self()) == NULL)
465 : return;
466 :
467 0 : if (--self->refs == 0) {
468 0 : rm_mtthread(self);
469 0 : thread_setself(NULL);
470 : }
471 : }
472 :
473 : static struct mtthread *
474 10468 : find_mtthread(MT_Id tid)
475 : {
476 10468 : struct mtthread *t;
477 :
478 10468 : thread_lock();
479 48076 : for (t = mtthreads; t && t->tid != tid; t = t->next)
480 : ;
481 10468 : thread_unlock();
482 10468 : return t;
483 : }
484 :
485 : gdk_return
486 351 : MT_alloc_tls(MT_TLS_t *newkey)
487 : {
488 : #ifdef HAVE_PTHREAD_H
489 351 : int ret;
490 351 : if ((ret = pthread_key_create(newkey, NULL)) != 0) {
491 0 : GDKsyserr(ret, "Creating TLS key for thread failed");
492 0 : return GDK_FAIL;
493 : }
494 : #else
495 : if ((*newkey = TlsAlloc()) == TLS_OUT_OF_INDEXES) {
496 : GDKwinerror("Creating TLS key for thread failed");
497 : return GDK_FAIL;
498 : }
499 : #endif
500 : return GDK_SUCCEED;
501 : }
502 :
503 : void
504 48947 : MT_tls_set(MT_TLS_t key, void *val)
505 : {
506 : #ifdef HAVE_PTHREAD_H
507 48947 : pthread_setspecific(key, val);
508 : #else
509 : assert(key != TLS_OUT_OF_INDEXES);
510 : TlsSetValue(key, val);
511 : #endif
512 48934 : }
513 :
514 : void *
515 85417 : MT_tls_get(MT_TLS_t key)
516 : {
517 : #ifdef HAVE_PTHREAD_H
518 85417 : return pthread_getspecific(key);
519 : #else
520 : assert(key != TLS_OUT_OF_INDEXES);
521 : return TlsGetValue(key);
522 : #endif
523 : }
524 :
525 : const char *
526 1278255005 : MT_thread_getname(void)
527 : {
528 1278255005 : struct mtthread *self;
529 :
530 1278255005 : if (!thread_initialized)
531 : return mainthread.threadname;
532 1278299291 : self = thread_self();
533 1277739042 : return self ? self->threadname : UNKNOWN_THREAD;
534 : }
535 :
536 : void
537 93781 : GDKsetbuf(char *errbuf)
538 : {
539 93781 : struct mtthread *self;
540 :
541 93781 : self = thread_self();
542 93783 : if (self == NULL)
543 0 : self = &mainthread;
544 93783 : assert(errbuf == NULL || self->errbuf == NULL);
545 93783 : self->errbuf = errbuf;
546 46889 : if (errbuf)
547 46894 : *errbuf = 0; /* start clean */
548 93783 : }
549 :
550 : char *
551 17122363 : GDKgetbuf(void)
552 : {
553 17122363 : struct mtthread *self;
554 :
555 17122363 : self = thread_self();
556 17128371 : if (self == NULL)
557 0 : self = &mainthread;
558 17128371 : return self->errbuf;
559 : }
560 :
561 : struct freebats *
562 40467731 : MT_thread_getfreebats(void)
563 : {
564 40467731 : struct mtthread *self;
565 :
566 40467731 : self = thread_self();
567 40469561 : if (self == NULL)
568 0 : self = &mainthread;
569 40469561 : return &self->freebats;
570 : }
571 :
572 : void
573 0 : MT_thread_setdata(void *data)
574 : {
575 0 : if (!thread_initialized)
576 : return;
577 0 : struct mtthread *self = thread_self();
578 :
579 0 : if (self)
580 0 : self->data = data;
581 : }
582 :
583 : void *
584 0 : MT_thread_getdata(void)
585 : {
586 0 : if (!thread_initialized)
587 : return NULL;
588 0 : struct mtthread *self = thread_self();
589 :
590 0 : return self ? self->data : NULL;
591 : }
592 :
593 : void
594 18542056 : MT_thread_set_qry_ctx(QryCtx *ctx)
595 : {
596 18542056 : if (!thread_initialized)
597 : return;
598 18542861 : struct mtthread *self = thread_self();
599 :
600 18537478 : if (self)
601 18537478 : self->qry_ctx = ctx;
602 : }
603 :
604 : QryCtx *
605 30130391 : MT_thread_get_qry_ctx(void)
606 : {
607 30130391 : if (!thread_initialized)
608 : return NULL;
609 30130391 : struct mtthread *self = thread_self();
610 :
611 30130003 : return self ? self->qry_ctx : NULL;
612 : }
613 :
614 : void
615 7578553 : MT_thread_setlockwait(MT_Lock *lock)
616 : {
617 7578553 : if (!thread_initialized)
618 : return;
619 7581836 : struct mtthread *self = thread_self();
620 :
621 7592015 : if (self)
622 7592015 : self->lockwait = lock;
623 : }
624 :
625 : void
626 12380572 : MT_thread_setsemawait(MT_Sema *sema)
627 : {
628 12380572 : if (!thread_initialized)
629 : return;
630 12380767 : struct mtthread *self = thread_self();
631 :
632 12379580 : if (self)
633 12379580 : ATOMIC_PTR_SET(&self->semawait, sema);
634 : }
635 :
636 : static void
637 18 : MT_thread_setcondwait(MT_Cond *cond)
638 : {
639 18 : if (!thread_initialized)
640 : return;
641 18 : struct mtthread *self = thread_self();
642 :
643 18 : if (self)
644 18 : self->condwait = cond;
645 : }
646 :
647 : #ifdef LOCK_OWNER
648 : void
649 1273941505 : MT_thread_add_mylock(MT_Lock *lock)
650 : {
651 1273941505 : struct mtthread *self;
652 1273941505 : if (!thread_initialized)
653 : self = &mainthread;
654 : else
655 1274086345 : self = thread_self();
656 :
657 1274199964 : if (self) {
658 1274055124 : lock->nxt = self->mylocks;
659 1274055124 : self->mylocks = lock;
660 : }
661 1274055124 : }
662 :
663 : void
664 1268513447 : MT_thread_del_mylock(MT_Lock *lock)
665 : {
666 1268513447 : struct mtthread *self;
667 1268513447 : if (!thread_initialized)
668 : self = &mainthread;
669 : else
670 1268101244 : self = thread_self();
671 :
672 1269326779 : if (self) {
673 1269738982 : if (self->mylocks == lock) {
674 1269721184 : self->mylocks = lock->nxt;
675 : } else {
676 83698 : for (MT_Lock *l = self->mylocks; l; l = l->nxt) {
677 83698 : if (l->nxt == lock) {
678 17798 : l->nxt = lock->nxt;
679 17798 : break;
680 : }
681 : }
682 : }
683 : }
684 1269738982 : }
685 : #endif
686 :
687 : void
688 14621619 : MT_thread_setworking(const char *work)
689 : {
690 14621619 : if (!thread_initialized)
691 : return;
692 14622235 : struct mtthread *self = thread_self();
693 :
694 14627616 : if (self) {
695 14627616 : if (work == NULL)
696 935752 : ATOMIC_PTR_SET(&self->working, NULL);
697 13691864 : else if (strcmp(work, "store locked") == 0)
698 721911 : self->limit_override = true;
699 12969953 : else if (strcmp(work, "store unlocked") == 0)
700 721911 : self->limit_override = false;
701 : else
702 12248042 : ATOMIC_PTR_SET(&self->working, work);
703 : }
704 : }
705 :
706 : void
707 66117850 : MT_thread_setalgorithm(const char *algo)
708 : {
709 66117850 : if (!thread_initialized)
710 : return;
711 66118461 : struct mtthread *self = thread_self();
712 :
713 66124491 : if (self) {
714 66124491 : if (algo) {
715 3869532 : if (self->algolen > 0) {
716 2330439 : if (self->algolen < sizeof(self->algorithm))
717 1340441 : self->algolen += strconcat_len(self->algorithm + self->algolen, sizeof(self->algorithm) - self->algolen, "; ", algo, NULL);
718 : } else
719 1539093 : self->algolen = strcpy_len(self->algorithm, algo, sizeof(self->algorithm));
720 : } else {
721 62254959 : self->algorithm[0] = 0;
722 62254959 : self->algolen = 0;
723 : }
724 : }
725 : }
726 :
727 : const char *
728 1650 : MT_thread_getalgorithm(void)
729 : {
730 1650 : if (!thread_initialized)
731 : return NULL;
732 1650 : struct mtthread *self = thread_self();
733 :
734 1650 : return self && self->algorithm[0] ? self->algorithm : NULL;
735 : }
736 :
737 : bool
738 0 : MT_thread_override_limits(void)
739 : {
740 0 : if (!thread_initialized)
741 : return false;
742 0 : struct mtthread *self = thread_self();
743 :
744 0 : return self && self->limit_override;
745 : }
746 :
747 : static struct thread_init_cb {
748 : struct thread_init_cb *next;
749 : void (*init)(void *);
750 : void (*destroy)(void *);
751 : void *data;
752 : } *init_cb;
753 : static MT_Lock thread_init_lock = MT_LOCK_INITIALIZER(thread_init_lock);
754 :
755 : gdk_return
756 345 : MT_thread_init_add_callback(void (*init)(void *), void (*destroy)(void *), void *data)
757 : {
758 345 : struct thread_init_cb *p = GDKmalloc(sizeof(struct thread_init_cb));
759 :
760 345 : if (p == NULL)
761 : return GDK_FAIL;
762 345 : *p = (struct thread_init_cb) {
763 : .init = init,
764 : .destroy = destroy,
765 : .next = NULL,
766 : .data = data,
767 : };
768 345 : MT_lock_set(&thread_init_lock);
769 345 : struct thread_init_cb **pp = &init_cb;
770 345 : while (*pp)
771 0 : pp = &(*pp)->next;
772 345 : *pp = p;
773 345 : MT_lock_unset(&thread_init_lock);
774 345 : return GDK_SUCCEED;
775 : }
776 :
777 : #ifdef HAVE_PTHREAD_H
778 : static void *
779 : #else
780 : static DWORD WINAPI
781 : #endif
782 49288 : thread_starter(void *arg)
783 : {
784 49288 : struct mtthread *self = (struct mtthread *) arg;
785 49288 : void *data = self->data;
786 :
787 : #ifdef HAVE_GETTID
788 49288 : self->lwptid = gettid();
789 : #endif
790 : #ifdef HAVE_PTHREAD_H
791 : #ifdef HAVE_PTHREAD_SETNAME_NP
792 : /* name can be at most 16 chars including \0 */
793 49287 : char name[16];
794 49287 : (void) strcpy_len(name, self->threadname, sizeof(name));
795 49287 : pthread_setname_np(
796 : #ifndef __APPLE__
797 : pthread_self(),
798 : #endif
799 : name);
800 : #endif
801 : #else
802 : #ifdef HAVE_SETTHREADDESCRIPTION
803 : wchar_t *wname = utf8toutf16(self->threadname);
804 : static_assert(SIZEOF_WCHAR_T == 2, "wchar_t on Windows expected to be 2 bytes");
805 : if (wname != NULL) {
806 : SetThreadDescription(GetCurrentThread(), wname);
807 : free(wname);
808 : }
809 : #endif
810 : #endif
811 49284 : self->data = NULL;
812 49284 : self->sp = THRsp();
813 49284 : thread_setself(self);
814 98125 : for (int i = 0; i < self->nthread_funcs; i++) {
815 48855 : if (self->thread_funcs[i].init)
816 48855 : (*self->thread_funcs[i].init)(self->thread_funcs[i].data);
817 : }
818 49271 : (*self->func)(data);
819 98126 : for (int i = 0; i < self->nthread_funcs; i++) {
820 48856 : if (self->thread_funcs[i].destroy)
821 48856 : (*self->thread_funcs[i].destroy)(self->thread_funcs[i].data);
822 : }
823 49279 : free(self->thread_funcs);
824 49279 : BBPrelinquishbats();
825 49280 : ATOMIC_SET(&self->exited, 1);
826 49280 : TRC_DEBUG(THRD, "Exit thread \"%s\"\n", self->threadname);
827 49280 : return 0; /* NULL for pthreads, 0 for Windows */
828 : }
829 :
830 : static void
831 60111 : join_threads(void)
832 : {
833 60111 : bool waited;
834 :
835 60111 : struct mtthread *self = thread_self();
836 60111 : if (!self)
837 : return;
838 60111 : thread_lock();
839 97471 : do {
840 97471 : waited = false;
841 2670305 : for (struct mtthread *t = mtthreads; t; t = t->next) {
842 2610194 : if (ATOMIC_GET(&t->exited) && t->detached && !t->waiting) {
843 37360 : t->waiting = true;
844 37360 : thread_unlock();
845 37360 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
846 37360 : self->joinwait = t;
847 : #ifdef HAVE_PTHREAD_H
848 37360 : pthread_join(t->hdl, NULL);
849 : #else
850 : WaitForSingleObject(t->hdl, INFINITE);
851 : #endif
852 37360 : self->joinwait = NULL;
853 : #ifndef HAVE_PTHREAD_H
854 : CloseHandle(t->hdl);
855 : #endif
856 37360 : rm_mtthread(t);
857 37360 : waited = true;
858 37360 : thread_lock();
859 37360 : break;
860 : }
861 : }
862 97471 : } while (waited);
863 60111 : thread_unlock();
864 : }
865 :
866 : void
867 710 : join_detached_threads(void)
868 : {
869 710 : bool waited;
870 :
871 710 : struct mtthread *self = thread_self();
872 710 : thread_lock();
873 2162 : do {
874 2162 : waited = false;
875 7901 : for (struct mtthread *t = mtthreads; t; t = t->next) {
876 7191 : if (t->detached && !t->waiting) {
877 1452 : t->waiting = true;
878 1452 : thread_unlock();
879 1452 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
880 1452 : self->joinwait = t;
881 : #ifdef HAVE_PTHREAD_H
882 1452 : pthread_join(t->hdl, NULL);
883 : #else
884 : WaitForSingleObject(t->hdl, INFINITE);
885 : #endif
886 1452 : self->joinwait = NULL;
887 : #ifndef HAVE_PTHREAD_H
888 : CloseHandle(t->hdl);
889 : #endif
890 1452 : rm_mtthread(t);
891 1452 : waited = true;
892 1452 : thread_lock();
893 1452 : break;
894 : }
895 : }
896 2162 : } while (waited);
897 710 : thread_unlock();
898 710 : }
899 :
900 : int
901 49288 : MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach d, const char *threadname)
902 : {
903 49288 : struct mtthread *self;
904 :
905 49288 : assert(thread_initialized);
906 49288 : join_threads();
907 49288 : if (threadname == NULL) {
908 0 : TRC_CRITICAL(GDK, "Thread must have a name\n");
909 0 : return -1;
910 : }
911 49288 : if (strlen(threadname) >= sizeof(self->threadname)) {
912 0 : TRC_CRITICAL(GDK, "Thread's name is too large\n");
913 0 : return -1;
914 : }
915 :
916 : #ifdef HAVE_PTHREAD_H
917 49288 : pthread_attr_t attr;
918 49288 : int ret;
919 49288 : if ((ret = pthread_attr_init(&attr)) != 0) {
920 0 : GDKsyserr(ret, "Cannot init pthread attr");
921 0 : return -1;
922 : }
923 49288 : if ((ret = pthread_attr_setstacksize(&attr, THREAD_STACK_SIZE)) != 0) {
924 0 : GDKsyserr(ret, "Cannot set stack size");
925 0 : pthread_attr_destroy(&attr);
926 0 : return -1;
927 : }
928 : #endif
929 49288 : self = malloc(sizeof(*self));
930 49288 : if (self == NULL) {
931 0 : GDKsyserror("Cannot allocate memory\n");
932 : #ifdef HAVE_PTHREAD_H
933 0 : pthread_attr_destroy(&attr);
934 : #endif
935 0 : return -1;
936 : }
937 :
938 49288 : *self = (struct mtthread) {
939 : .func = f,
940 : .data = arg,
941 : .waiting = false,
942 49288 : .detached = (d == MT_THR_DETACHED),
943 : .refs = 1,
944 49288 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
945 : .exited = ATOMIC_VAR_INIT(0),
946 : .working = ATOMIC_PTR_VAR_INIT(NULL),
947 : .semawait = ATOMIC_PTR_VAR_INIT(NULL),
948 : };
949 49288 : MT_lock_set(&thread_init_lock);
950 : /* remember the list of callback functions we need to call for
951 : * this thread (i.e. anything registered so far) */
952 98144 : for (struct thread_init_cb *p = init_cb; p; p = p->next)
953 48856 : self->nthread_funcs++;
954 49288 : if (self->nthread_funcs > 0) {
955 48856 : self->thread_funcs = malloc(self->nthread_funcs * sizeof(*self->thread_funcs));
956 48856 : if (self->thread_funcs == NULL) {
957 0 : GDKsyserror("Cannot allocate memory\n");
958 0 : MT_lock_unset(&thread_init_lock);
959 0 : free(self);
960 : #ifdef HAVE_PTHREAD_H
961 0 : pthread_attr_destroy(&attr);
962 : #endif
963 0 : return -1;
964 : }
965 48856 : int n = 0;
966 97712 : for (struct thread_init_cb *p = init_cb; p; p = p->next) {
967 48856 : self->thread_funcs[n++] = (struct thread_funcs) {
968 48856 : .init = p->init,
969 48856 : .destroy = p->destroy,
970 48856 : .data = p->data,
971 : };
972 : }
973 : }
974 49288 : MT_lock_unset(&thread_init_lock);
975 :
976 49288 : strcpy_len(self->threadname, threadname, sizeof(self->threadname));
977 49288 : char *p;
978 49288 : if ((p = strstr(self->threadname, "XXXX")) != NULL) {
979 : /* overwrite XXXX with thread ID; bottom three bits are
980 : * likely 0, so skip those */
981 45438 : char buf[5];
982 45438 : snprintf(buf, 5, "%04zu", self->tid % 9999);
983 45438 : memcpy(p, buf, 4);
984 : }
985 49288 : TRC_DEBUG(THRD, "Create thread \"%s\"\n", self->threadname);
986 : #ifdef HAVE_PTHREAD_H
987 : #ifdef HAVE_PTHREAD_SIGMASK
988 49288 : sigset_t new_mask, orig_mask;
989 49288 : (void) sigfillset(&new_mask);
990 49288 : sigdelset(&new_mask, SIGQUIT);
991 49288 : sigdelset(&new_mask, SIGPROF);
992 49288 : pthread_sigmask(SIG_SETMASK, &new_mask, &orig_mask);
993 : #endif
994 49288 : ret = pthread_create(&self->hdl, &attr, thread_starter, self);
995 49288 : pthread_attr_destroy(&attr);
996 : #ifdef HAVE_PTHREAD_SIGMASK
997 49288 : pthread_sigmask(SIG_SETMASK, &orig_mask, NULL);
998 : #endif
999 49288 : if (ret != 0) {
1000 0 : GDKsyserr(ret, "Cannot start thread");
1001 0 : free(self->thread_funcs);
1002 0 : free(self);
1003 0 : return -1;
1004 : }
1005 : #else
1006 : self->hdl = CreateThread(NULL, THREAD_STACK_SIZE, thread_starter, self,
1007 : 0, &self->wtid);
1008 : if (self->hdl == NULL) {
1009 : GDKwinerror("Failed to create thread");
1010 : free(self->thread_funcs);
1011 : free(self);
1012 : return -1;
1013 : }
1014 : #endif
1015 : /* must not fail after this: the thread has been started */
1016 49288 : *t = self->tid;
1017 49288 : thread_lock();
1018 49288 : self->next = mtthreads;
1019 49288 : mtthreads = self;
1020 49288 : thread_unlock();
1021 49288 : return 0;
1022 : }
1023 :
1024 : MT_Id
1025 90627304 : MT_getpid(void)
1026 : {
1027 90627304 : struct mtthread *self;
1028 :
1029 90627304 : if (!thread_initialized)
1030 : self = &mainthread;
1031 : else
1032 90627216 : self = thread_self();
1033 90627397 : return self->tid;
1034 : }
1035 :
1036 : void
1037 37551 : MT_exiting_thread(void)
1038 : {
1039 37551 : struct mtthread *self;
1040 :
1041 37551 : if (!thread_initialized)
1042 : return;
1043 37551 : self = thread_self();
1044 37551 : if (self) {
1045 37551 : ATOMIC_SET(&self->exited, 1);
1046 37551 : ATOMIC_PTR_SET(&self->working, NULL);
1047 : }
1048 : }
1049 :
1050 : int
1051 10468 : MT_join_thread(MT_Id tid)
1052 : {
1053 10468 : struct mtthread *t;
1054 :
1055 10468 : assert(tid != mainthread.tid);
1056 10468 : join_threads();
1057 10468 : t = find_mtthread(tid);
1058 10468 : if (t == NULL
1059 : #ifndef HAVE_PTHREAD_H
1060 : || t->hdl == NULL
1061 : #endif
1062 : )
1063 : return -1;
1064 10468 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
1065 10468 : struct mtthread *self = thread_self();
1066 10468 : self->joinwait = t;
1067 : #ifdef HAVE_PTHREAD_H
1068 10468 : int ret = pthread_join(t->hdl, NULL);
1069 : #else
1070 : DWORD ret = WaitForSingleObject(t->hdl, INFINITE);
1071 : #endif
1072 10468 : self->joinwait = NULL;
1073 10468 : if (
1074 : #ifdef HAVE_PTHREAD_H
1075 : ret == 0
1076 : #else
1077 : ret == WAIT_OBJECT_0 && CloseHandle(t->hdl)
1078 : #endif
1079 : ) {
1080 10468 : rm_mtthread(t);
1081 10468 : return 0;
1082 : }
1083 : return -1;
1084 : }
1085 :
1086 : static bool
1087 0 : MT_kill_thread(struct mtthread *t)
1088 : {
1089 0 : assert(t != thread_self());
1090 : #ifdef HAVE_PTHREAD_H
1091 : #ifdef HAVE_PTHREAD_KILL
1092 0 : if (pthread_kill(t->hdl, SIGHUP) == 0)
1093 0 : return true;
1094 : #endif
1095 : #else
1096 : if (t->hdl == NULL) {
1097 : /* detached thread */
1098 : HANDLE h;
1099 : bool ret = false;
1100 : h = OpenThread(THREAD_ALL_ACCESS, 0, t->wtid);
1101 : if (h == NULL)
1102 : return false;
1103 : if (TerminateThread(h, -1))
1104 : ret = true;
1105 : CloseHandle(h);
1106 : return ret;
1107 : }
1108 : if (TerminateThread(t->hdl, -1))
1109 : return true;
1110 : #endif
1111 : return false;
1112 : }
1113 :
1114 : bool
1115 355 : MT_kill_threads(void)
1116 : {
1117 355 : struct mtthread *self = thread_self();
1118 355 : bool killed = false;
1119 :
1120 355 : assert(self == &mainthread);
1121 355 : join_threads();
1122 355 : thread_lock();
1123 710 : for (struct mtthread *t = mtthreads; t; t = t->next) {
1124 355 : if (t == self)
1125 355 : continue;
1126 0 : GDKwarning("Killing thread %s\n", t->threadname);
1127 0 : killed |= MT_kill_thread(t);
1128 : }
1129 355 : thread_unlock();
1130 355 : return killed;
1131 : }
1132 :
1133 : int
1134 357 : MT_check_nr_cores(void)
1135 : {
1136 357 : int ncpus = -1;
1137 :
1138 : #if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_ONLN)
1139 : /* this works on Linux, Solaris and AIX */
1140 357 : ncpus = sysconf(_SC_NPROCESSORS_ONLN);
1141 : #elif defined(HW_NCPU) /* BSD */
1142 : size_t len = sizeof(int);
1143 : int mib[3];
1144 :
1145 : /* Everyone should have permission to make this call,
1146 : * if we get a failure something is really wrong. */
1147 : mib[0] = CTL_HW;
1148 : mib[1] = HW_NCPU;
1149 : mib[2] = -1;
1150 : sysctl(mib, 3, &ncpus, &len, NULL, 0);
1151 : #elif defined(WIN32)
1152 : SYSTEM_INFO sysinfo;
1153 :
1154 : GetSystemInfo(&sysinfo);
1155 : ncpus = sysinfo.dwNumberOfProcessors;
1156 : #endif
1157 :
1158 : /* if we ever need HPUX or OSF/1 (hope not), see
1159 : * http://ndevilla.free.fr/threads/ */
1160 :
1161 357 : if (ncpus <= 0)
1162 : ncpus = 1;
1163 : #if SIZEOF_SIZE_T == SIZEOF_INT
1164 : /* On 32-bits systems with large numbers of cpus/cores, we
1165 : * quickly run out of space due to the number of threads in
1166 : * use. Since it is questionable whether many cores on a
1167 : * 32-bits system are going to be beneficial due to this, we
1168 : * simply limit the auto-detected cores to 16 on 32-bits
1169 : * systems. The user can always override this via
1170 : * gdk_nr_threads. */
1171 : if (ncpus > 16)
1172 : ncpus = 16;
1173 : #endif
1174 :
1175 : #ifndef WIN32
1176 : /* get the number of allocated cpus from the cgroup settings */
1177 357 : FILE *f = fopen("/sys/fs/cgroup/cpuset/cpuset.cpus", "r");
1178 357 : if (f != NULL) {
1179 0 : char buf[512];
1180 0 : char *p = fgets(buf, 512, f);
1181 0 : fclose(f);
1182 0 : if (p != NULL) {
1183 : /* syntax is: ranges of CPU numbers separated
1184 : * by comma; a range is either a single CPU
1185 : * id, or two IDs separated by a minus; any
1186 : * deviation causes the file to be ignored */
1187 : int ncpu = 0;
1188 0 : for (;;) {
1189 0 : char *q;
1190 0 : unsigned fst = strtoul(p, &q, 10);
1191 0 : if (q == p)
1192 0 : return ncpus;
1193 0 : ncpu++;
1194 0 : if (*q == '-') {
1195 0 : p = q + 1;
1196 0 : unsigned lst = strtoul(p, &q, 10);
1197 0 : if (q == p || lst <= fst)
1198 : return ncpus;
1199 0 : ncpu += lst - fst;
1200 : }
1201 0 : if (*q == '\n')
1202 : break;
1203 0 : if (*q != ',')
1204 : return ncpus;
1205 0 : p = q + 1;
1206 : }
1207 0 : if (ncpu < ncpus)
1208 : return ncpu;
1209 : }
1210 : }
1211 : #endif
1212 :
1213 : return ncpus;
1214 : }
1215 :
1216 :
1217 : void
1218 357 : MT_cond_init(MT_Cond *cond, const char *name)
1219 : {
1220 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1221 : InitializeConditionVariable(&cond->cv);
1222 : #else
1223 357 : pthread_cond_init(&cond->cv, NULL);
1224 : #endif
1225 357 : strcpy_len(cond->name, name, sizeof(cond->name));
1226 357 : }
1227 :
1228 :
1229 : void
1230 0 : MT_cond_destroy(MT_Cond *cond)
1231 : {
1232 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1233 : /* no need */
1234 : #else
1235 0 : pthread_cond_destroy(&cond->cv);
1236 : #endif
1237 0 : }
1238 :
1239 : void
1240 9 : MT_cond_wait(MT_Cond *cond, MT_Lock *lock)
1241 : {
1242 9 : MT_thread_setcondwait(cond);
1243 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1244 : SleepConditionVariableCS(&cond->cv, &lock->lock, INFINITE);
1245 : #else
1246 9 : pthread_cond_wait(&cond->cv, &lock->lock);
1247 : #endif
1248 9 : MT_thread_setcondwait(NULL);
1249 9 : }
1250 :
1251 : void
1252 55353 : MT_cond_signal(MT_Cond *cond)
1253 : {
1254 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1255 : WakeConditionVariable(&cond->cv);
1256 : #else
1257 55353 : pthread_cond_signal(&cond->cv);
1258 : #endif
1259 55353 : }
1260 :
1261 : void
1262 2 : MT_cond_broadcast(MT_Cond *cond)
1263 : {
1264 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1265 : WakeAllConditionVariable(&cond->cv);
1266 : #else
1267 2 : pthread_cond_broadcast(&cond->cv);
1268 : #endif
1269 2 : }
|