Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024, 2025 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * @a Niels Nes, Peter Boncz
15 : * @+ Threads
16 : * This file contains a wrapper layer for threading, hence the
17 : * underscore convention MT_x (Multi-Threading). As all platforms
18 : * that MonetDB runs on now support POSIX Threads (pthreads), this
19 : * wrapping layer has become rather thin.
20 : *
21 : * In the late 1990s when multi-threading support was introduced in
22 : * MonetDB, pthreads was just emerging as a standard API and not
23 : * widely adopted yet. The earliest MT implementation focused on SGI
24 : * Unix and provided multi-threading using multiple processes, and
25 : * shared memory.
26 : *
27 : * One of the relics of this model, namely the need to pre-allocate
28 : * locks and semaphores, and consequently a maximum number of them,
29 : * has been removed in the latest iteration of this layer.
30 : *
31 : */
32 : /*
33 : * @- Mthreads Routine implementations
34 : */
35 : #include "monetdb_config.h"
36 : #include "mstring.h"
37 : #include "gdk.h"
38 : #include "gdk_system_private.h"
39 :
40 : #include <time.h>
41 :
42 : #ifdef HAVE_FTIME
43 : #include <sys/timeb.h> /* ftime */
44 : #endif
45 : #ifdef HAVE_SYS_TIME_H
46 : #include <sys/time.h> /* gettimeofday */
47 : #endif
48 :
49 : #include <signal.h>
50 : #include <string.h> /* for strerror */
51 : #include <unistd.h> /* for sysconf symbols */
52 :
53 : #include "mutils.h"
54 :
/* Counter from which thread ids are drawn (with ATOMIC_INC) when a new
 * thread descriptor is created.  It is initialized to 1, which is also
 * the tid stored in the descriptor of the main thread. */
55 : static ATOMIC_TYPE GDKthreadid = ATOMIC_VAR_INIT(1);
56 :
57 : #ifdef LOCK_STATS
58 :
/* Global lock statistics; these only exist when compiled with
 * LOCK_STATS and are reported by GDKlockstatistics(). */
59 : ATOMIC_TYPE GDKlockcnt = ATOMIC_VAR_INIT(0); /* reported as "Total lock count" */
60 : ATOMIC_TYPE GDKlockcontentioncnt = ATOMIC_VAR_INIT(0); /* reported as "Lock contention" */
61 : ATOMIC_TYPE GDKlocksleepcnt = ATOMIC_VAR_INIT(0); /* reported as "Lock sleep count" */
62 : MT_Lock *volatile GDKlocklist = 0; /* head of the list of locks (linked through ->next/->prev) */
63 : ATOMIC_FLAG GDKlocklistlock = ATOMIC_FLAG_INIT; /* test-and-set flag taken while GDKlocklist is used */
64 :
/* Sort the doubly linked list of locks starting at l and return the new
 * head of the list.
 *
 * The order is ascending on the sleep counter, then on the contention
 * counter, then on the count field; on a complete tie the element from
 * the left half is taken first.  The list is relinked in place (both the
 * next and prev pointers are updated); nothing is allocated. */
65 : /* merge sort of linked list */
66 : static MT_Lock *
67 : sortlocklist(MT_Lock *l)
68 : {
69 : MT_Lock *r, *t, *ll = NULL;
70 :
71 : if (l == NULL || l->next == NULL) {
72 : /* list is trivially sorted (0 or 1 element) */
73 : return l;
74 : }
75 : /* break list into two (almost) equal pieces:
76 : * l is start of "left" list, r of "right" list, ll last
77 : * element of "left" list */
/* t advances two elements per step and r one, so r ends up at the
 * middle when t runs off the end; the list has at least two elements
 * here, so the loop body runs at least once and ll is not NULL */
78 : for (t = r = l; t && t->next; t = t->next->next) {
79 : ll = r;
80 : r = r->next;
81 : }
82 : ll->next = NULL; /* break list into two */
83 : r->prev = NULL;
84 : /* recursively sort both sublists */
85 : l = sortlocklist(l);
86 : r = sortlocklist(r);
87 : /* merge
88 : * t is new list, ll is last element of new list, l and r are
89 : * start of unprocessed part of left and right lists */
90 : t = ll = NULL;
91 : while (l && r) {
92 : if (ATOMIC_GET(&l->sleep) < ATOMIC_GET(&r->sleep) ||
93 : (ATOMIC_GET(&l->sleep) == ATOMIC_GET(&r->sleep) &&
94 : (ATOMIC_GET(&l->contention) < ATOMIC_GET(&r->contention) ||
95 : (ATOMIC_GET(&l->contention) == ATOMIC_GET(&r->contention) &&
96 : l->count <= r->count)))) {
97 : /* l is smaller */
98 : if (ll == NULL) {
99 : assert(t == NULL);
100 : t = ll = l;
101 : } else {
102 : ll->next = l;
103 : l->prev = ll;
104 : ll = ll->next;
105 : }
106 : l = l->next;
107 : } else {
108 : /* r is smaller */
109 : if (ll == NULL) {
110 : assert(t == NULL);
111 : t = ll = r;
112 : } else {
113 : ll->next = r;
114 : r->prev = ll;
115 : ll = ll->next;
116 : }
117 : r = r->next;
118 : }
119 : }
120 : /* append rest of remaining list */
/* exactly one of l and r is NULL here: only one of them advances per
 * iteration of the merge loop, and both were non-empty at its start */
121 : if (l) {
122 : ll->next = l;
123 : l->prev = ll;
124 : } else {
125 : ll->next = r;
126 : r->prev = ll;
127 : }
128 : return t;
129 : }
130 :
131 : static inline bool
132 : lock_isset(MT_Lock *l)
133 : {
134 : if (MT_lock_try(l)) {
135 : MT_lock_unset(l);
136 : return false;
137 : }
138 : return true;
139 : }
140 :
141 : /* function used for debugging */
142 : void
143 : GDKlockstatistics(int what)
144 : {
145 : MT_Lock *l;
146 : int n = 0;
147 :
148 : printf("Locks:\n");
149 : if (ATOMIC_TAS(&GDKlocklistlock) != 0) {
150 : printf("GDKlocklistlock is set, so cannot access lock list\n");
151 : return;
152 : }
153 : if (what == -1) {
154 : for (l = GDKlocklist; l; l = l->next) {
155 : l->count = 0;
156 : ATOMIC_SET(&l->contention, 0);
157 : ATOMIC_SET(&l->sleep, 0);
158 : }
159 : ATOMIC_CLEAR(&GDKlocklistlock);
160 : return;
161 : }
162 : GDKlocklist = sortlocklist(GDKlocklist);
163 : printf("%-18s\t%s\t%s\t%s\t%s\t%s\t%s\n",
164 : "lock name", "count", "content", "sleep",
165 : "locked", "locker", "thread");
166 : for (l = GDKlocklist; l; l = l->next) {
167 : n++;
168 : if (what == 0 ||
169 : (what == 1 && l->count) ||
170 : (what == 2 && ATOMIC_GET(&l->contention)) ||
171 : (what == 3 && lock_isset(l)))
172 : printf("%-18s\t%zu\t%zu\t%zu\t%s\t%s\t%s\n",
173 : l->name, l->count,
174 : (size_t) ATOMIC_GET(&l->contention),
175 : (size_t) ATOMIC_GET(&l->sleep),
176 : lock_isset(l) ? "locked" : "",
177 : l->locker ? l->locker : "",
178 : l->thread ? l->thread : "");
179 : }
180 : printf("Number of locks: %d\n", n);
181 : printf("Total lock count: %zu\n", (size_t) ATOMIC_GET(&GDKlockcnt));
182 : printf("Lock contention: %zu\n", (size_t) ATOMIC_GET(&GDKlockcontentioncnt));
183 : printf("Lock sleep count: %zu\n", (size_t) ATOMIC_GET(&GDKlocksleepcnt));
184 : fflush(stdout);
185 : ATOMIC_CLEAR(&GDKlocklistlock);
186 : }
187 :
188 : #endif /* LOCK_STATS */
189 :
/* One init/destroy callback pair as copied into a thread descriptor by
 * MT_create_thread; thread_starter calls init before and destroy after
 * the thread's main function, passing data to both. */
190 : struct thread_funcs {
191 : void (*init)(void *); /* may be NULL, in which case it is skipped */
192 : void (*destroy)(void *); /* may be NULL, in which case it is skipped */
193 : void *data; /* argument for both init and destroy */
194 : };
195 :
/* Descriptor of a thread known to this layer.  All descriptors are
 * linked through "next" in the list headed by mtthreads; the calling
 * thread's own descriptor is found through thread_self(). */
196 : struct mtthread {
197 : struct mtthread *next; /* next descriptor in the mtthreads list */
198 : void (*func) (void *); /* function to be called */
199 : void *data; /* and its data */
200 : struct thread_funcs *thread_funcs; /* callback funcs */
201 : int nthread_funcs; /* number of entries in thread_funcs */
202 : MT_Lock *lockwait; /* lock we're waiting for */
203 : ATOMIC_PTR_TYPE semawait;/* semaphore we're waiting for */
204 : MT_Cond *condwait; /* condition variable we're waiting for */
205 : #ifdef LOCK_OWNER
206 : MT_Lock *mylocks; /* locks we're holding */
207 : #endif
208 : struct mtthread *joinwait; /* process we are joining with */
209 : ATOMIC_PTR_TYPE working;/* what we're currently doing */
210 : char algorithm[512]; /* the algorithm used in the last operation */
211 : size_t algolen; /* length of string in .algorithm */
212 : ATOMIC_TYPE exited; /* set to 1 when the thread is done (or announced its exit) */
213 : bool detached:1, waiting:1; /* detached: created detached; waiting: some thread is joining this one */
214 : unsigned int refs:20; /* reference count, see MT_thread_register/MT_thread_deregister */
215 : bool limit_override; /* not in bit field because of data races */
216 : char threadname[MT_NAME_LEN];
217 : QryCtx *qry_ctx; /* see MT_thread_set_qry_ctx/MT_thread_get_qry_ctx */
218 : #ifdef HAVE_PTHREAD_H
219 : pthread_t hdl; /* pthread handle, used for joining and killing */
220 : #else
221 : HANDLE hdl; /* Windows thread handle */
222 : DWORD wtid; /* Windows thread id */
223 : #endif
224 : #ifdef HAVE_GETTID
225 : pid_t lwptid; /* result of gettid() in the thread */
226 : #endif
227 : MT_Id tid; /* our own thread id, drawn from GDKthreadid */
228 : uintptr_t sp; /* stack position at thread start; 0 if not recorded */
229 : char *errbuf; /* see GDKsetbuf/GDKgetbuf */
230 : struct freebats freebats; /* see MT_thread_getfreebats */
231 : };
/* Statically allocated descriptor for the main thread; it has tid 1 and
 * is the initial (and only) element of the list of threads. */
232 : static struct mtthread mainthread = {
233 : .threadname = "main-thread",
234 : .exited = ATOMIC_VAR_INIT(0),
235 : .working = ATOMIC_PTR_VAR_INIT(NULL),
236 : .semawait = ATOMIC_PTR_VAR_INIT(NULL),
237 : .refs = 1,
238 : .tid = 1,
239 : };
/* head of the list of all thread descriptors; modified under thread_lock() */
240 : static struct mtthread *mtthreads = &mainthread;
241 :
/* Platform abstraction used in the rest of this file:
 *   thread_lock()/thread_unlock()  take/release the lock used around
 *                                  accesses to the mtthreads list;
 *   thread_lock_try()              non-blocking variant, true on success;
 *   thread_self()                  thread-local pointer to the calling
 *                                  thread's descriptor (as stored with
 *                                  thread_setself());
 *   thread_setself(self)           store that pointer.
 * Note that the return value convention of thread_setself() differs:
 * with pthreads 0 means success, on Windows 0 means failure. */
242 : #ifdef HAVE_PTHREAD_H
243 : static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER;
244 : static pthread_key_t threadkey;
245 : #define thread_lock() pthread_mutex_lock(&posthread_lock)
246 : #define thread_lock_try() (pthread_mutex_trylock(&posthread_lock) == 0)
247 : #define thread_unlock() pthread_mutex_unlock(&posthread_lock)
248 : #define thread_self() pthread_getspecific(threadkey)
249 : #define thread_setself(self) pthread_setspecific(threadkey, self)
250 : #else
251 : static CRITICAL_SECTION winthread_cs;
252 : static DWORD threadkey = TLS_OUT_OF_INDEXES;
253 : #define thread_lock() EnterCriticalSection(&winthread_cs)
254 : #define thread_lock_try() (TryEnterCriticalSection(&winthread_cs) != 0)
255 : #define thread_unlock() LeaveCriticalSection(&winthread_cs)
256 : #define thread_self() TlsGetValue(threadkey)
257 : #define thread_setself(self) TlsSetValue(threadkey, self)
258 : #endif
/* set by MT_thread_init() once the key/slot above has been set up */
259 : static bool thread_initialized = false;
260 :
#if defined(_MSC_VER) && _MSC_VER >= 1900
#pragma warning(disable : 4172)
#endif
/* Return an address inside the current stack frame as an integer.  When
 * the compiler provides __builtin_frame_address it is used; otherwise
 * the address of a local variable serves as an approximation. */
static inline uintptr_t
THRsp(void)
{
#ifdef __has_builtin
#if __has_builtin(__builtin_frame_address)
#define BUILTIN_USED
#endif
#endif
#ifdef BUILTIN_USED
	return (uintptr_t) __builtin_frame_address(0);
#else
	int marker = 0;

	return (uintptr_t) &marker;
#endif
#undef BUILTIN_USED
}
281 :
282 : bool
283 95776706 : THRhighwater(void)
284 : {
285 95776706 : struct mtthread *s = thread_self();
286 95776713 : if (s != NULL && s->sp != 0) {
287 95776720 : uintptr_t c = THRsp();
288 95776723 : size_t diff = c < s->sp ? s->sp - c : c - s->sp;
289 95776723 : if (diff > THREAD_STACK_SIZE - 80 * 1024)
290 : return true;
291 : }
292 : return false;
293 : }
294 :
/* Debugging aid: report one line per known thread with its name, ids,
 * number of free bats, what it is waiting for (lock, semaphore,
 * condition variable or thread) and what it is working on; with
 * LOCK_OWNER also the locks it holds.
 *
 * Output goes through the tracer when THRD debug tracing is enabled and
 * to stdout otherwise.  The thread list lock is needed to walk the
 * list; if it cannot be obtained within about a second, a message is
 * printed and nothing else is done. */
295 : void
296 119 : dump_threads(void)
297 : {
298 119 : char buf[1024];
299 : #if defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) && defined(HAVE_CLOCK_GETTIME)
300 119 : struct timespec ts;
301 119 : clock_gettime(CLOCK_REALTIME, &ts);
302 119 : ts.tv_sec++; /* give it a second */
303 119 : if (pthread_mutex_timedlock(&posthread_lock, &ts) != 0) {
304 0 : printf("Threads are currently locked, so no thread information\n");
305 0 : return;
306 : }
307 : #else
/* no timed lock available: try, sleep a second, and try once more */
308 : if (!thread_lock_try()) {
309 : MT_sleep_ms(1000);
310 : if (!thread_lock_try()) {
311 : printf("Threads are currently locked, so no thread information\n");
312 : return;
313 : }
314 : }
315 : #endif
316 119 : if (!GDK_TRACER_TEST(M_DEBUG, THRD))
317 119 : printf("Threads:\n");
318 1944 : for (struct mtthread *t = mtthreads; t; t = t->next) {
/* take a snapshot of what the thread is waiting for; only the
 * first non-NULL one (in this order) is reported */
319 1825 : MT_Lock *lk = t->lockwait;
320 1825 : MT_Sema *sm = ATOMIC_PTR_GET(&t->semawait);
321 1825 : MT_Cond *cn = t->condwait;
322 1825 : struct mtthread *jn = t->joinwait;
323 1825 : const char *working = ATOMIC_PTR_GET(&t->working);
324 1825 : int pos = snprintf(buf, sizeof(buf),
325 : "%s, tid %zu, "
326 : #ifdef HAVE_PTHREAD_H
327 : "Thread 0x%lx, "
328 : #endif
329 : #ifdef HAVE_GETTID
330 : "LWP %ld, "
331 : #endif
332 : "%"PRIu32" free bats, waiting for %s%s, working on %.200s",
333 1825 : t->threadname,
334 : t->tid,
335 : #ifdef HAVE_PTHREAD_H
336 1825 : (long) t->hdl,
337 : #endif
338 : #ifdef HAVE_GETTID
339 1825 : (long) t->lwptid,
340 : #endif
341 : t->freebats.nfreebats,
342 1825 : lk ? "lock " : sm ? "semaphore " : cn ? "condvar " : jn ? "thread " : "",
343 1825 : lk ? lk->name : sm ? sm->name : cn ? cn->name : jn ? jn->threadname : "nothing",
344 1825 : ATOMIC_GET(&t->exited) ? "exiting" :
345 1678 : working ? working : "nothing");
346 : #ifdef LOCK_OWNER
/* append the locks held by the thread for as long as there is
 * room; pos keeps counting what snprintf would have written, so
 * pos >= sizeof(buf) afterwards means the line was truncated */
347 1825 : const char *sep = ", locked: ";
348 1831 : for (MT_Lock *l = t->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) {
349 6 : pos += snprintf(buf + pos, sizeof(buf) - pos,
350 6 : "%s%s(%s)", sep, l->name, l->locker);
351 6 : sep = ", ";
352 : }
353 : #endif
/* a truncated line is marked with a trailing "..." */
354 1825 : TRC_DEBUG_IF(THRD)
355 0 : TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
356 : else
357 3650 : printf("%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
358 : }
359 119 : thread_unlock();
360 : }
361 :
362 : static void
363 46534 : rm_mtthread(struct mtthread *t)
364 : {
365 46534 : struct mtthread **pt;
366 :
367 46534 : assert(t != &mainthread);
368 46534 : thread_lock();
369 885366 : for (pt = &mtthreads; *pt && *pt != t; pt = &(*pt)->next)
370 : ;
371 46534 : if (*pt)
372 46534 : *pt = t->next;
373 46534 : free(t);
374 46534 : thread_unlock();
375 46534 : }
376 :
377 : bool
378 351 : MT_thread_init(void)
379 : {
380 351 : if (thread_initialized)
381 : return true;
382 : #ifdef HAVE_GETTID
383 351 : mainthread.lwptid = gettid();
384 : #endif
385 : #ifdef HAVE_PTHREAD_H
386 351 : int ret;
387 :
388 351 : if ((ret = pthread_key_create(&threadkey, NULL)) != 0) {
389 0 : GDKsyserr(ret, "Creating specific key for thread failed");
390 0 : return false;
391 : }
392 351 : mainthread.hdl = pthread_self();
393 351 : if ((ret = thread_setself(&mainthread)) != 0) {
394 0 : GDKsyserr(ret, "Setting specific value failed");
395 0 : return false;
396 : }
397 : #else
398 : threadkey = TlsAlloc();
399 : if (threadkey == TLS_OUT_OF_INDEXES) {
400 : GDKwinerror("Creating thread-local slot for thread failed");
401 : return false;
402 : }
403 : mainthread.wtid = GetCurrentThreadId();
404 : if (thread_setself(&mainthread) == 0) {
405 : GDKwinerror("Setting thread-local value failed");
406 : TlsFree(threadkey);
407 : threadkey = TLS_OUT_OF_INDEXES;
408 : return false;
409 : }
410 : InitializeCriticalSection(&winthread_cs);
411 : #endif
412 351 : thread_initialized = true;
413 351 : return true;
414 : }
415 : bool
416 0 : MT_thread_register(void)
417 : {
418 0 : assert(thread_initialized);
419 0 : if (!thread_initialized)
420 : return false;
421 :
422 0 : struct mtthread *self;
423 :
424 0 : if ((self = thread_self()) != NULL) {
425 0 : if (self->refs == 1000000) {
426 : /* there are limits... */
427 : return false;
428 : }
429 0 : self->refs++;
430 0 : return true;
431 : }
432 :
433 0 : self = malloc(sizeof(*self));
434 0 : if (self == NULL)
435 : return false;
436 :
437 0 : *self = (struct mtthread) {
438 : .detached = false,
439 : #ifdef HAVE_PTHREAD_H
440 0 : .hdl = pthread_self(),
441 : #else
442 : .wtid = GetCurrentThreadId(),
443 : #endif
444 : .refs = 1,
445 0 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
446 : .exited = ATOMIC_VAR_INIT(0),
447 : .working = ATOMIC_PTR_VAR_INIT(NULL),
448 : .semawait = ATOMIC_PTR_VAR_INIT(NULL),
449 : };
450 0 : snprintf(self->threadname, sizeof(self->threadname), "foreign %zu", self->tid);
451 0 : thread_setself(self);
452 0 : thread_lock();
453 0 : self->next = mtthreads;
454 0 : mtthreads = self;
455 0 : thread_unlock();
456 0 : return true;
457 : }
458 :
459 : void
460 0 : MT_thread_deregister(void)
461 : {
462 0 : struct mtthread *self;
463 :
464 0 : if ((self = thread_self()) == NULL)
465 : return;
466 :
467 0 : if (--self->refs == 0) {
468 0 : rm_mtthread(self);
469 0 : thread_setself(NULL);
470 : }
471 : }
472 :
473 : static struct mtthread *
474 7720 : find_mtthread(MT_Id tid)
475 : {
476 7720 : struct mtthread *t;
477 :
478 7720 : thread_lock();
479 30276 : for (t = mtthreads; t && t->tid != tid; t = t->next)
480 : ;
481 7720 : thread_unlock();
482 7720 : return t;
483 : }
484 :
485 : gdk_return
486 352 : MT_alloc_tls(MT_TLS_t *newkey)
487 : {
488 : #ifdef HAVE_PTHREAD_H
489 352 : int ret;
490 352 : if ((ret = pthread_key_create(newkey, NULL)) != 0) {
491 0 : GDKsyserr(ret, "Creating TLS key for thread failed");
492 0 : return GDK_FAIL;
493 : }
494 : #else
495 : if ((*newkey = TlsAlloc()) == TLS_OUT_OF_INDEXES) {
496 : GDKwinerror("Creating TLS key for thread failed");
497 : return GDK_FAIL;
498 : }
499 : #endif
500 : return GDK_SUCCEED;
501 : }
502 :
503 : void
504 46130 : MT_tls_set(MT_TLS_t key, void *val)
505 : {
506 : #ifdef HAVE_PTHREAD_H
507 46130 : pthread_setspecific(key, val);
508 : #else
509 : assert(key != TLS_OUT_OF_INDEXES);
510 : TlsSetValue(key, val);
511 : #endif
512 46105 : }
513 :
514 : void *
515 81256 : MT_tls_get(MT_TLS_t key)
516 : {
517 : #ifdef HAVE_PTHREAD_H
518 81256 : return pthread_getspecific(key);
519 : #else
520 : assert(key != TLS_OUT_OF_INDEXES);
521 : return TlsGetValue(key);
522 : #endif
523 : }
524 :
525 : const char *
526 1394749285 : MT_thread_getname(void)
527 : {
528 1394749285 : struct mtthread *self;
529 :
530 1394749285 : if (!thread_initialized)
531 : return mainthread.threadname;
532 1398797257 : self = thread_self();
533 1406042252 : return self ? self->threadname : UNKNOWN_THREAD;
534 : }
535 :
536 : void
537 88226 : GDKsetbuf(char *errbuf)
538 : {
539 88226 : struct mtthread *self;
540 :
541 88226 : self = thread_self();
542 88251 : if (self == NULL)
543 0 : self = &mainthread;
544 88251 : assert(errbuf == NULL || self->errbuf == NULL);
545 88251 : self->errbuf = errbuf;
546 44122 : if (errbuf)
547 44129 : *errbuf = 0; /* start clean */
548 88251 : }
549 :
550 : char *
551 25657058 : GDKgetbuf(void)
552 : {
553 25657058 : struct mtthread *self;
554 :
555 25657058 : self = thread_self();
556 25687397 : if (self == NULL)
557 0 : self = &mainthread;
558 25687397 : return self->errbuf;
559 : }
560 :
561 : struct freebats *
562 49019324 : MT_thread_getfreebats(void)
563 : {
564 49019324 : struct mtthread *self;
565 :
566 49019324 : self = thread_self();
567 49032370 : if (self == NULL)
568 0 : self = &mainthread;
569 49032370 : return &self->freebats;
570 : }
571 :
572 : void
573 0 : MT_thread_setdata(void *data)
574 : {
575 0 : if (!thread_initialized)
576 : return;
577 0 : struct mtthread *self = thread_self();
578 :
579 0 : if (self)
580 0 : self->data = data;
581 : }
582 :
583 : void *
584 0 : MT_thread_getdata(void)
585 : {
586 0 : if (!thread_initialized)
587 : return NULL;
588 0 : struct mtthread *self = thread_self();
589 :
590 0 : return self ? self->data : NULL;
591 : }
592 :
593 : void
594 28410208 : MT_thread_set_qry_ctx(QryCtx *ctx)
595 : {
596 28410208 : if (!thread_initialized)
597 : return;
598 28420562 : struct mtthread *self = thread_self();
599 :
600 28445427 : if (self)
601 28445427 : self->qry_ctx = ctx;
602 : }
603 :
604 : QryCtx *
605 35483436 : MT_thread_get_qry_ctx(void)
606 : {
607 35483436 : if (!thread_initialized)
608 : return NULL;
609 35483436 : struct mtthread *self = thread_self();
610 :
611 35512576 : return self ? self->qry_ctx : NULL;
612 : }
613 :
614 : void
615 39759855 : MT_thread_setlockwait(MT_Lock *lock)
616 : {
617 39759855 : if (!thread_initialized)
618 : return;
619 39838826 : struct mtthread *self = thread_self();
620 :
621 39946054 : if (self)
622 39946054 : self->lockwait = lock;
623 : }
624 :
625 : void
626 13995547 : MT_thread_setsemawait(MT_Sema *sema)
627 : {
628 13995547 : if (!thread_initialized)
629 : return;
630 14007501 : struct mtthread *self = thread_self();
631 :
632 14106597 : if (self)
633 14106597 : ATOMIC_PTR_SET(&self->semawait, sema);
634 : }
635 :
636 : static void
637 18 : MT_thread_setcondwait(MT_Cond *cond)
638 : {
639 18 : if (!thread_initialized)
640 : return;
641 18 : struct mtthread *self = thread_self();
642 :
643 18 : if (self)
644 18 : self->condwait = cond;
645 : }
646 :
647 : #ifdef LOCK_OWNER
648 : void
649 1368446635 : MT_thread_add_mylock(MT_Lock *lock)
650 : {
651 1368446635 : struct mtthread *self;
652 1368446635 : if (!thread_initialized)
653 : self = &mainthread;
654 : else
655 1366443294 : self = thread_self();
656 :
657 1373149380 : if (self) {
658 1375152721 : lock->nxt = self->mylocks;
659 1375152721 : self->mylocks = lock;
660 : }
661 1375152721 : }
662 :
663 : void
664 1360376539 : MT_thread_del_mylock(MT_Lock *lock)
665 : {
666 1360376539 : struct mtthread *self;
667 1360376539 : if (!thread_initialized)
668 : self = &mainthread;
669 : else
670 1354144891 : self = thread_self();
671 :
672 1367177524 : if (self) {
673 1373409172 : if (self->mylocks == lock) {
674 1373399224 : self->mylocks = lock->nxt;
675 : } else {
676 75849 : for (MT_Lock *l = self->mylocks; l; l = l->nxt) {
677 75849 : if (l->nxt == lock) {
678 9948 : l->nxt = lock->nxt;
679 9948 : break;
680 : }
681 : }
682 : }
683 : }
684 1373409172 : }
685 : #endif
686 :
687 : void
688 19649116 : MT_thread_setworking(const char *work)
689 : {
690 19649116 : if (!thread_initialized)
691 : return;
692 19653774 : struct mtthread *self = thread_self();
693 :
694 19672401 : if (self) {
695 19672401 : if (work == NULL)
696 941752 : ATOMIC_PTR_SET(&self->working, NULL);
697 18730649 : else if (strcmp(work, "store locked") == 0)
698 721321 : self->limit_override = true;
699 18009328 : else if (strcmp(work, "store unlocked") == 0)
700 721321 : self->limit_override = false;
701 : else
702 17288007 : ATOMIC_PTR_SET(&self->working, work);
703 : }
704 : }
705 :
706 : void
707 72335421 : MT_thread_setalgorithm(const char *algo)
708 : {
709 72335421 : if (!thread_initialized)
710 : return;
711 72348825 : struct mtthread *self = thread_self();
712 :
713 72389778 : if (self) {
714 72389778 : if (algo) {
715 4768960 : if (self->algolen > 0) {
716 2457080 : if (self->algolen < sizeof(self->algorithm))
717 1458302 : self->algolen += strconcat_len(self->algorithm + self->algolen, sizeof(self->algorithm) - self->algolen, "; ", algo, NULL);
718 : } else
719 2311880 : self->algolen = strcpy_len(self->algorithm, algo, sizeof(self->algorithm));
720 : } else {
721 67620818 : self->algorithm[0] = 0;
722 67620818 : self->algolen = 0;
723 : }
724 : }
725 : }
726 :
727 : const char *
728 1839 : MT_thread_getalgorithm(void)
729 : {
730 1839 : if (!thread_initialized)
731 : return NULL;
732 1839 : struct mtthread *self = thread_self();
733 :
734 1841 : return self && self->algorithm[0] ? self->algorithm : NULL;
735 : }
736 :
737 : bool
738 0 : MT_thread_override_limits(void)
739 : {
740 0 : if (!thread_initialized)
741 : return false;
742 0 : struct mtthread *self = thread_self();
743 :
744 0 : return self && self->limit_override;
745 : }
746 :
/* Singly linked list (in registration order) of callbacks registered
 * with MT_thread_init_add_callback.  MT_create_thread copies the list
 * as it is at that moment into each new thread's descriptor. */
747 : static struct thread_init_cb {
748 : struct thread_init_cb *next;
749 : void (*init)(void *);
750 : void (*destroy)(void *);
751 : void *data; /* argument for both init and destroy */
752 : } *init_cb;
/* lock held while the init_cb list is read or extended */
753 : static MT_Lock thread_init_lock = MT_LOCK_INITIALIZER(thread_init_lock);
754 :
755 : gdk_return
756 346 : MT_thread_init_add_callback(void (*init)(void *), void (*destroy)(void *), void *data)
757 : {
758 346 : struct thread_init_cb *p = GDKmalloc(sizeof(struct thread_init_cb));
759 :
760 346 : if (p == NULL)
761 : return GDK_FAIL;
762 346 : *p = (struct thread_init_cb) {
763 : .init = init,
764 : .destroy = destroy,
765 : .next = NULL,
766 : .data = data,
767 : };
768 346 : MT_lock_set(&thread_init_lock);
769 346 : struct thread_init_cb **pp = &init_cb;
770 346 : while (*pp)
771 0 : pp = &(*pp)->next;
772 346 : *pp = p;
773 346 : MT_lock_unset(&thread_init_lock);
774 346 : return GDK_SUCCEED;
775 : }
776 :
777 : #ifdef HAVE_PTHREAD_H
778 : static void *
779 : #else
780 : static DWORD WINAPI
781 : #endif
782 46540 : thread_starter(void *arg)
783 : {
784 46540 : struct mtthread *self = (struct mtthread *) arg;
785 46540 : void *data = self->data;
786 :
787 : #ifdef HAVE_GETTID
788 46540 : self->lwptid = gettid();
789 : #endif
790 : #ifdef HAVE_PTHREAD_H
791 : #ifdef HAVE_PTHREAD_SETNAME_NP
792 : /* name can be at most 16 chars including \0 */
793 46534 : char name[16];
794 46534 : (void) strcpy_len(name, self->threadname, sizeof(name));
795 46543 : pthread_setname_np(
796 : #ifndef __APPLE__
797 : pthread_self(),
798 : #endif
799 : name);
800 : #endif
801 : #else
802 : #ifdef HAVE_SETTHREADDESCRIPTION
803 : wchar_t *wname = utf8toutf16(self->threadname);
804 : static_assert(SIZEOF_WCHAR_T == 2, "wchar_t on Windows expected to be 2 bytes");
805 : if (wname != NULL) {
806 : SetThreadDescription(GetCurrentThread(), wname);
807 : free(wname);
808 : }
809 : #endif
810 : #endif
811 46517 : self->data = NULL;
812 46517 : self->sp = THRsp();
813 46520 : thread_setself(self);
814 92586 : for (int i = 0; i < self->nthread_funcs; i++) {
815 46060 : if (self->thread_funcs[i].init)
816 46060 : (*self->thread_funcs[i].init)(self->thread_funcs[i].data);
817 : }
818 46525 : (*self->func)(data);
819 92583 : for (int i = 0; i < self->nthread_funcs; i++) {
820 46057 : if (self->thread_funcs[i].destroy)
821 46057 : (*self->thread_funcs[i].destroy)(self->thread_funcs[i].data);
822 : }
823 46509 : free(self->thread_funcs);
824 46509 : BBPrelinquishbats();
825 46524 : ATOMIC_SET(&self->exited, 1);
826 46524 : TRC_DEBUG(THRD, "Exit thread \"%s\"\n", self->threadname);
827 46524 : return 0; /* NULL for pthreads, 0 for Windows */
828 : }
829 :
830 : static void
831 54622 : join_threads(void)
832 : {
833 54622 : bool waited;
834 :
835 54622 : struct mtthread *self = thread_self();
836 54622 : if (!self)
837 : return;
838 54622 : thread_lock();
839 91981 : do {
840 91981 : waited = false;
841 2795080 : for (struct mtthread *t = mtthreads; t; t = t->next) {
842 2740458 : if (ATOMIC_GET(&t->exited) && t->detached && !t->waiting) {
843 37359 : t->waiting = true;
844 37359 : thread_unlock();
845 37359 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
846 37359 : self->joinwait = t;
847 : #ifdef HAVE_PTHREAD_H
848 37359 : pthread_join(t->hdl, NULL);
849 : #else
850 : WaitForSingleObject(t->hdl, INFINITE);
851 : #endif
852 37359 : self->joinwait = NULL;
853 : #ifndef HAVE_PTHREAD_H
854 : CloseHandle(t->hdl);
855 : #endif
856 37359 : rm_mtthread(t);
857 37359 : waited = true;
858 37359 : thread_lock();
859 37359 : break;
860 : }
861 : }
862 91981 : } while (waited);
863 54622 : thread_unlock();
864 : }
865 :
866 : void
867 712 : join_detached_threads(void)
868 : {
869 712 : bool waited;
870 :
871 712 : struct mtthread *self = thread_self();
872 712 : thread_lock();
873 2167 : do {
874 2167 : waited = false;
875 10738 : for (struct mtthread *t = mtthreads; t; t = t->next) {
876 10026 : if (t->detached && !t->waiting) {
877 1455 : t->waiting = true;
878 1455 : thread_unlock();
879 1455 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
880 1455 : self->joinwait = t;
881 : #ifdef HAVE_PTHREAD_H
882 1455 : pthread_join(t->hdl, NULL);
883 : #else
884 : WaitForSingleObject(t->hdl, INFINITE);
885 : #endif
886 1455 : self->joinwait = NULL;
887 : #ifndef HAVE_PTHREAD_H
888 : CloseHandle(t->hdl);
889 : #endif
890 1455 : rm_mtthread(t);
891 1455 : waited = true;
892 1455 : thread_lock();
893 1455 : break;
894 : }
895 : }
896 2167 : } while (waited);
897 712 : thread_unlock();
898 712 : }
899 :
/* Create and start a new thread that executes f(arg).
 *
 * t          receives the id of the new thread on success;
 * f, arg     function to run in the new thread and its argument;
 * d          MT_THR_DETACHED if the thread is detached (it is then
 *            joined and cleaned up by this layer once it has exited),
 *            otherwise it is to be joined with MT_join_thread;
 * threadname name for the thread; must not be NULL and must fit in the
 *            threadname field of the descriptor; the first occurrence
 *            of "XXXX" in it is replaced by four digits derived from
 *            the thread id.
 *
 * Returns 0 on success and -1 on failure (after reporting an error).
 * The layer must have been initialized (checked with an assertion). */
900 : int
901 46546 : MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach d, const char *threadname)
902 : {
903 46546 : struct mtthread *self;
904 :
905 46546 : assert(thread_initialized);
/* first clean up detached threads that have exited already */
906 46546 : join_threads();
907 46546 : if (threadname == NULL) {
908 0 : TRC_CRITICAL(GDK, "Thread must have a name\n");
909 0 : return -1;
910 : }
911 46546 : if (strlen(threadname) >= sizeof(self->threadname)) {
912 0 : TRC_CRITICAL(GDK, "Thread's name is too large\n");
913 0 : return -1;
914 : }
915 :
916 : #ifdef HAVE_PTHREAD_H
917 46546 : pthread_attr_t attr;
918 46546 : int ret;
919 46546 : if ((ret = pthread_attr_init(&attr)) != 0) {
920 0 : GDKsyserr(ret, "Cannot init pthread attr");
921 0 : return -1;
922 : }
923 46546 : if ((ret = pthread_attr_setstacksize(&attr, THREAD_STACK_SIZE)) != 0) {
924 0 : GDKsyserr(ret, "Cannot set stack size");
925 0 : pthread_attr_destroy(&attr);
926 0 : return -1;
927 : }
928 : #endif
929 46546 : self = malloc(sizeof(*self));
930 46546 : if (self == NULL) {
931 0 : GDKsyserror("Cannot allocate memory\n");
932 : #ifdef HAVE_PTHREAD_H
933 0 : pthread_attr_destroy(&attr);
934 : #endif
935 0 : return -1;
936 : }
937 :
/* all fields not mentioned here (e.g. nthread_funcs, thread_funcs,
 * sp) start out as zero/NULL */
938 46546 : *self = (struct mtthread) {
939 : .func = f,
940 : .data = arg,
941 : .waiting = false,
942 46546 : .detached = (d == MT_THR_DETACHED),
943 : .refs = 1,
944 46546 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
945 : .exited = ATOMIC_VAR_INIT(0),
946 : .working = ATOMIC_PTR_VAR_INIT(NULL),
947 : .semawait = ATOMIC_PTR_VAR_INIT(NULL),
948 : };
949 46546 : MT_lock_set(&thread_init_lock);
950 : /* remember the list of callback functions we need to call for
951 : * this thread (i.e. anything registered so far) */
952 92611 : for (struct thread_init_cb *p = init_cb; p; p = p->next)
953 46065 : self->nthread_funcs++;
954 46546 : if (self->nthread_funcs > 0) {
955 46065 : self->thread_funcs = malloc(self->nthread_funcs * sizeof(*self->thread_funcs));
956 46065 : if (self->thread_funcs == NULL) {
957 0 : GDKsyserror("Cannot allocate memory\n");
958 0 : MT_lock_unset(&thread_init_lock);
959 0 : free(self);
960 : #ifdef HAVE_PTHREAD_H
961 0 : pthread_attr_destroy(&attr);
962 : #endif
963 0 : return -1;
964 : }
965 46065 : int n = 0;
966 92130 : for (struct thread_init_cb *p = init_cb; p; p = p->next) {
967 46065 : self->thread_funcs[n++] = (struct thread_funcs) {
968 46065 : .init = p->init,
969 46065 : .destroy = p->destroy,
970 46065 : .data = p->data,
971 : };
972 : }
973 : }
974 46546 : MT_lock_unset(&thread_init_lock);
975 :
976 46546 : strcpy_len(self->threadname, threadname, sizeof(self->threadname));
977 46546 : char *p;
978 46546 : if ((p = strstr(self->threadname, "XXXX")) != NULL) {
979 : /* overwrite XXXX with the thread ID modulo 9999, printed as
980 : * four zero-padded decimal digits */
981 42500 : char buf[5];
982 42500 : snprintf(buf, 5, "%04zu", self->tid % 9999);
983 42500 : memcpy(p, buf, 4);
984 : }
985 46546 : TRC_DEBUG(THRD, "Create thread \"%s\"\n", self->threadname);
986 : #ifdef HAVE_PTHREAD_H
987 : #ifdef HAVE_PTHREAD_SIGMASK
/* block all signals except SIGQUIT and SIGPROF while the thread is
 * created, and restore the original mask of the calling thread
 * afterwards */
988 46546 : sigset_t new_mask, orig_mask;
989 46546 : (void) sigfillset(&new_mask);
990 46546 : sigdelset(&new_mask, SIGQUIT);
991 46546 : sigdelset(&new_mask, SIGPROF);
992 46546 : pthread_sigmask(SIG_SETMASK, &new_mask, &orig_mask);
993 : #endif
994 46546 : ret = pthread_create(&self->hdl, &attr, thread_starter, self);
995 46546 : pthread_attr_destroy(&attr);
996 : #ifdef HAVE_PTHREAD_SIGMASK
997 46546 : pthread_sigmask(SIG_SETMASK, &orig_mask, NULL);
998 : #endif
999 46546 : if (ret != 0) {
1000 0 : GDKsyserr(ret, "Cannot start thread");
1001 0 : free(self->thread_funcs);
1002 0 : free(self);
1003 0 : return -1;
1004 : }
1005 : #else
1006 : self->hdl = CreateThread(NULL, THREAD_STACK_SIZE, thread_starter, self,
1007 : 0, &self->wtid);
1008 : if (self->hdl == NULL) {
1009 : GDKwinerror("Failed to create thread");
1010 : free(self->thread_funcs);
1011 : free(self);
1012 : return -1;
1013 : }
1014 : #endif
1015 : /* must not fail after this: the thread has been started */
1016 46546 : *t = self->tid;
/* only now is the descriptor added to the list of threads; the new
 * thread may well be running already */
1017 46546 : thread_lock();
1018 46546 : self->next = mtthreads;
1019 46546 : mtthreads = self;
1020 46546 : thread_unlock();
1021 46546 : return 0;
1022 : }
1023 :
1024 : MT_Id
1025 104983663 : MT_getpid(void)
1026 : {
1027 104983663 : struct mtthread *self;
1028 :
1029 104983663 : if (!thread_initialized)
1030 : self = &mainthread;
1031 : else
1032 105052928 : self = thread_self();
1033 105049364 : return self->tid;
1034 : }
1035 :
1036 : void
1037 37548 : MT_exiting_thread(void)
1038 : {
1039 37548 : struct mtthread *self;
1040 :
1041 37548 : if (!thread_initialized)
1042 : return;
1043 37549 : self = thread_self();
1044 37549 : if (self) {
1045 37549 : ATOMIC_SET(&self->exited, 1);
1046 37549 : ATOMIC_PTR_SET(&self->working, NULL);
1047 : }
1048 : }
1049 :
1050 : int
1051 7720 : MT_join_thread(MT_Id tid)
1052 : {
1053 7720 : struct mtthread *t;
1054 :
1055 7720 : assert(tid != mainthread.tid);
1056 7720 : join_threads();
1057 7720 : t = find_mtthread(tid);
1058 7720 : if (t == NULL
1059 : #ifndef HAVE_PTHREAD_H
1060 : || t->hdl == NULL
1061 : #endif
1062 : )
1063 : return -1;
1064 7720 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
1065 7720 : struct mtthread *self = thread_self();
1066 7720 : self->joinwait = t;
1067 : #ifdef HAVE_PTHREAD_H
1068 7720 : int ret = pthread_join(t->hdl, NULL);
1069 : #else
1070 : DWORD ret = WaitForSingleObject(t->hdl, INFINITE);
1071 : #endif
1072 7720 : self->joinwait = NULL;
1073 7720 : if (
1074 : #ifdef HAVE_PTHREAD_H
1075 : ret == 0
1076 : #else
1077 : ret == WAIT_OBJECT_0 && CloseHandle(t->hdl)
1078 : #endif
1079 : ) {
1080 7720 : rm_mtthread(t);
1081 7720 : return 0;
1082 : }
1083 : return -1;
1084 : }
1085 :
1086 : static bool
1087 0 : MT_kill_thread(struct mtthread *t)
1088 : {
1089 0 : assert(t != thread_self());
1090 : #ifdef HAVE_PTHREAD_H
1091 : #ifdef HAVE_PTHREAD_KILL
1092 0 : if (pthread_kill(t->hdl, SIGHUP) == 0)
1093 0 : return true;
1094 : #endif
1095 : #else
1096 : if (t->hdl == NULL) {
1097 : /* detached thread */
1098 : HANDLE h;
1099 : bool ret = false;
1100 : h = OpenThread(THREAD_ALL_ACCESS, 0, t->wtid);
1101 : if (h == NULL)
1102 : return false;
1103 : if (TerminateThread(h, -1))
1104 : ret = true;
1105 : CloseHandle(h);
1106 : return ret;
1107 : }
1108 : if (TerminateThread(t->hdl, -1))
1109 : return true;
1110 : #endif
1111 : return false;
1112 : }
1113 :
1114 : bool
1115 356 : MT_kill_threads(void)
1116 : {
1117 356 : struct mtthread *self = thread_self();
1118 356 : bool killed = false;
1119 :
1120 356 : assert(self == &mainthread);
1121 356 : join_threads();
1122 356 : thread_lock();
1123 712 : for (struct mtthread *t = mtthreads; t; t = t->next) {
1124 356 : if (t == self)
1125 356 : continue;
1126 0 : GDKwarning("Killing thread %s\n", t->threadname);
1127 0 : killed |= MT_kill_thread(t);
1128 : }
1129 356 : thread_unlock();
1130 356 : return killed;
1131 : }
1132 :
#ifndef WIN32
/* Count the CPUs in a cpuset list such as "0-3,8,10-11\n".
 *
 * Syntax: ranges of CPU numbers separated by commas and terminated by
 * a newline; a range is either a single CPU id, or two ids separated
 * by a minus where the second id is larger than the first.
 *
 * Returns the number of CPUs listed if that is smaller than limit;
 * returns limit if the list names at least limit CPUs or if it
 * deviates from the syntax in any way (i.e. the list is then
 * ignored).  limit must be at least 1, so the result is always in the
 * range 1..limit and the running count can never overflow. */
static int
cpuset_count_cpus(const char *p, int limit)
{
	int ncpu = 0;

	for (;;) {
		char *q;

		/* insist on a digit: strtoul on its own would skip
		 * white space and accept a sign */
		if (*p < '0' || *p > '9')
			return limit;
		unsigned long fst = strtoul(p, &q, 10);
		if (++ncpu >= limit)
			return limit;
		if (*q == '-') {
			p = q + 1;
			if (*p < '0' || *p > '9')
				return limit;
			unsigned long lst = strtoul(p, &q, 10);
			if (lst <= fst)
				return limit;
			/* ncpu < limit here, so limit - ncpu is
			 * positive; comparing before adding keeps
			 * the count below limit */
			if (lst - fst >= (unsigned long) (limit - ncpu))
				return limit;
			ncpu += (int) (lst - fst);
		}
		if (*q == '\n')
			return ncpu;
		if (*q != ',')
			return limit;
		p = q + 1;
	}
}
#endif

/* Determine the number of CPU cores available to this process.
 *
 * The count is obtained from sysconf(), sysctl() or GetSystemInfo(),
 * depending on what the platform offers; if none is available or the
 * result is not positive, 1 is assumed.  When size_t and int have the
 * same size, the count is capped at 16.  On non-Windows systems the
 * count is further reduced to the number of CPUs listed in
 * /sys/fs/cgroup/cpuset/cpuset.cpus if that file can be read, is well
 * formed, and lists fewer CPUs.
 *
 * Returns a value of at least 1. */
int
MT_check_nr_cores(void)
{
	int ncpus = -1;

#if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_ONLN)
	/* this works on Linux, Solaris and AIX */
	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(HW_NCPU)		/* BSD */
	size_t len = sizeof(int);
	int mib[3];

	/* Everyone should have permission to make this call,
	 * if we get a failure something is really wrong. */
	mib[0] = CTL_HW;
	mib[1] = HW_NCPU;
	mib[2] = -1;
	sysctl(mib, 3, &ncpus, &len, NULL, 0);
#elif defined(WIN32)
	SYSTEM_INFO sysinfo;

	GetSystemInfo(&sysinfo);
	ncpus = sysinfo.dwNumberOfProcessors;
#endif

	/* if we ever need HPUX or OSF/1 (hope not), see
	 * http://ndevilla.free.fr/threads/ */

	if (ncpus <= 0)
		ncpus = 1;
#if SIZEOF_SIZE_T == SIZEOF_INT
	/* On 32-bits systems with large numbers of cpus/cores, we
	 * quickly run out of space due to the number of threads in
	 * use.  Since it is questionable whether many cores on a
	 * 32-bits system are going to be beneficial due to this, we
	 * simply limit the auto-detected cores to 16 on 32-bits
	 * systems.  The user can always override this via
	 * gdk_nr_threads. */
	if (ncpus > 16)
		ncpus = 16;
#endif

#ifndef WIN32
	/* get the number of allocated cpus from the cgroup settings */
	FILE *f = fopen("/sys/fs/cgroup/cpuset/cpuset.cpus", "r");
	if (f != NULL) {
		char buf[512];
		char *p = fgets(buf, sizeof(buf), f);
		fclose(f);
		/* a line that was truncated by fgets does not end in
		 * a newline and is therefore ignored by the parser */
		if (p != NULL)
			ncpus = cpuset_count_cpus(p, ncpus);
	}
#endif

	return ncpus;
}
1215 :
1216 :
1217 : void
1218 2867558 : MT_cond_init(MT_Cond *cond, const char *name)
1219 : {
1220 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1221 : InitializeConditionVariable(&cond->cv);
1222 : #else
1223 2867558 : pthread_cond_init(&cond->cv, NULL);
1224 : #endif
1225 2867558 : strcpy_len(cond->name, name, sizeof(cond->name));
1226 2867558 : }
1227 :
1228 :
1229 : void
1230 0 : MT_cond_destroy(MT_Cond *cond)
1231 : {
1232 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1233 : /* no need */
1234 : #else
1235 0 : pthread_cond_destroy(&cond->cv);
1236 : #endif
1237 0 : }
1238 :
1239 : void
1240 9 : MT_cond_wait(MT_Cond *cond, MT_Lock *lock)
1241 : {
1242 9 : MT_thread_setcondwait(cond);
1243 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1244 : SleepConditionVariableCS(&cond->cv, &lock->lock, INFINITE);
1245 : #else
1246 9 : pthread_cond_wait(&cond->cv, &lock->lock);
1247 : #endif
1248 9 : MT_thread_setcondwait(NULL);
1249 9 : }
1250 :
1251 : void
1252 56873 : MT_cond_signal(MT_Cond *cond)
1253 : {
1254 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1255 : WakeConditionVariable(&cond->cv);
1256 : #else
1257 56873 : pthread_cond_signal(&cond->cv);
1258 : #endif
1259 56873 : }
1260 :
1261 : void
1262 3224698 : MT_cond_broadcast(MT_Cond *cond)
1263 : {
1264 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1265 : WakeAllConditionVariable(&cond->cv);
1266 : #else
1267 3224698 : pthread_cond_broadcast(&cond->cv);
1268 : #endif
1269 3224698 : }
|