Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * @a Niels Nes, Peter Boncz
15 : * @+ Threads
16 : * This file contains a wrapper layer for threading, hence the
17 : * underscore convention MT_x (Multi-Threading). As all platforms
18 : * that MonetDB runs on now support POSIX Threads (pthreads), this
19 : * wrapping layer has become rather thin.
20 : *
21 : * In the late 1990s when multi-threading support was introduced in
22 : * MonetDB, pthreads was just emerging as a standard API and not
23 : * widely adopted yet. The earliest MT implementation focused on SGI
24 : * Unix and provided multi-threading using multiple processes, and
25 : * shared memory.
26 : *
27 : * One of the relics of this model, namely the need to pre-allocate
28 : * locks and semaphores, and consequently a maximum number of them,
29 : * has been removed in the latest iteration of this layer.
30 : *
31 : */
32 : /*
33 : * @- Mthreads Routine implementations
34 : */
35 : #include "monetdb_config.h"
36 : #include "mstring.h"
37 : #include "gdk.h"
38 : #include "gdk_system_private.h"
39 :
40 : #include <time.h>
41 :
42 : #ifdef HAVE_FTIME
43 : #include <sys/timeb.h> /* ftime */
44 : #endif
45 : #ifdef HAVE_SYS_TIME_H
46 : #include <sys/time.h> /* gettimeofday */
47 : #endif
48 :
49 : #include <signal.h>
50 : #include <string.h> /* for strerror */
51 : #include <unistd.h> /* for sysconf symbols */
52 :
53 : #include "mutils.h"
54 :
55 : static ATOMIC_TYPE GDKthreadid = ATOMIC_VAR_INIT(1);
56 :
57 : #ifdef LOCK_STATS
58 :
59 : ATOMIC_TYPE GDKlockcnt = ATOMIC_VAR_INIT(0);
60 : ATOMIC_TYPE GDKlockcontentioncnt = ATOMIC_VAR_INIT(0);
61 : ATOMIC_TYPE GDKlocksleepcnt = ATOMIC_VAR_INIT(0);
62 : MT_Lock *volatile GDKlocklist = 0;
63 : ATOMIC_FLAG GDKlocklistlock = ATOMIC_FLAG_INIT;
64 :
65 : /* merge sort of linked list */
66 : static MT_Lock *
67 : sortlocklist(MT_Lock *l)
68 : {
69 : MT_Lock *r, *t, *ll = NULL;
70 :
71 : if (l == NULL || l->next == NULL) {
72 : /* list is trivially sorted (0 or 1 element) */
73 : return l;
74 : }
75 : /* break list into two (almost) equal pieces:
76 : * l is start of "left" list, r of "right" list, ll last
77 : * element of "left" list */
78 : for (t = r = l; t && t->next; t = t->next->next) {
79 : ll = r;
80 : r = r->next;
81 : }
82 : ll->next = NULL; /* break list into two */
83 : r->prev = NULL;
84 : /* recursively sort both sublists */
85 : l = sortlocklist(l);
86 : r = sortlocklist(r);
87 : /* merge
88 : * t is new list, ll is last element of new list, l and r are
89 : * start of unprocessed part of left and right lists */
90 : t = ll = NULL;
91 : while (l && r) {
92 : if (ATOMIC_GET(&l->sleep) < ATOMIC_GET(&r->sleep) ||
93 : (ATOMIC_GET(&l->sleep) == ATOMIC_GET(&r->sleep) &&
94 : (ATOMIC_GET(&l->contention) < ATOMIC_GET(&r->contention) ||
95 : (ATOMIC_GET(&l->contention) == ATOMIC_GET(&r->contention) &&
96 : l->count <= r->count)))) {
97 : /* l is smaller */
98 : if (ll == NULL) {
99 : assert(t == NULL);
100 : t = ll = l;
101 : } else {
102 : ll->next = l;
103 : l->prev = ll;
104 : ll = ll->next;
105 : }
106 : l = l->next;
107 : } else {
108 : /* r is smaller */
109 : if (ll == NULL) {
110 : assert(t == NULL);
111 : t = ll = r;
112 : } else {
113 : ll->next = r;
114 : r->prev = ll;
115 : ll = ll->next;
116 : }
117 : r = r->next;
118 : }
119 : }
120 : /* append rest of remaining list */
121 : if (l) {
122 : ll->next = l;
123 : l->prev = ll;
124 : } else {
125 : ll->next = r;
126 : r->prev = ll;
127 : }
128 : return t;
129 : }
130 :
131 : static inline bool
132 : lock_isset(MT_Lock *l)
133 : {
134 : if (MT_lock_try(l)) {
135 : MT_lock_unset(l);
136 : return false;
137 : }
138 : return true;
139 : }
140 :
141 : /* function used for debugging */
142 : void
143 : GDKlockstatistics(int what)
144 : {
145 : MT_Lock *l;
146 : int n = 0;
147 :
148 : if (ATOMIC_TAS(&GDKlocklistlock) != 0) {
149 : printf("GDKlocklistlock is set, so cannot access lock list\n");
150 : return;
151 : }
152 : if (what == -1) {
153 : for (l = GDKlocklist; l; l = l->next) {
154 : l->count = 0;
155 : ATOMIC_SET(&l->contention, 0);
156 : ATOMIC_SET(&l->sleep, 0);
157 : }
158 : ATOMIC_CLEAR(&GDKlocklistlock);
159 : return;
160 : }
161 : GDKlocklist = sortlocklist(GDKlocklist);
162 : printf("%-18s\t%s\t%s\t%s\t%s\t%s\t%s\n",
163 : "lock name", "count", "content", "sleep",
164 : "locked", "locker", "thread");
165 : for (l = GDKlocklist; l; l = l->next) {
166 : n++;
167 : if (what == 0 ||
168 : (what == 1 && l->count) ||
169 : (what == 2 && ATOMIC_GET(&l->contention)) ||
170 : (what == 3 && lock_isset(l)))
171 : printf("%-18s\t%zu\t%zu\t%zu\t%s\t%s\t%s\n",
172 : l->name, l->count,
173 : (size_t) ATOMIC_GET(&l->contention),
174 : (size_t) ATOMIC_GET(&l->sleep),
175 : lock_isset(l) ? "locked" : "",
176 : l->locker ? l->locker : "",
177 : l->thread ? l->thread : "");
178 : }
179 : printf("Number of locks: %d\n", n);
180 : printf("Total lock count: %zu\n", (size_t) ATOMIC_GET(&GDKlockcnt));
181 : printf("Lock contention: %zu\n", (size_t) ATOMIC_GET(&GDKlockcontentioncnt));
182 : printf("Lock sleep count: %zu\n", (size_t) ATOMIC_GET(&GDKlocksleepcnt));
183 : fflush(stdout);
184 : ATOMIC_CLEAR(&GDKlocklistlock);
185 : }
186 :
187 : #endif /* LOCK_STATS */
188 :
/* One init/destroy callback pair together with the argument that is
 * passed to both of them; a thread carries an array of these. */
struct thread_funcs {
	void (*init)(void *);	/* called before the thread's function, may be NULL */
	void (*destroy)(void *); /* called after the thread's function, may be NULL */
	void *data;		/* argument for init and destroy */
};
194 :
195 : static struct mtthread {
196 : struct mtthread *next;
197 : void (*func) (void *); /* function to be called */
198 : void *data; /* and its data */
199 : struct thread_funcs *thread_funcs; /* callback funcs */
200 : int nthread_funcs;
201 : MT_Lock *lockwait; /* lock we're waiting for */
202 : MT_Sema *semawait; /* semaphore we're waiting for */
203 : MT_Cond *condwait; /* condition variable we're waiting for */
204 : #ifdef LOCK_OWNER
205 : MT_Lock *mylocks; /* locks we're holding */
206 : #endif
207 : struct mtthread *joinwait; /* process we are joining with */
208 : const char *working; /* what we're currently doing */
209 : char algorithm[512]; /* the algorithm used in the last operation */
210 : size_t algolen; /* length of string in .algorithm */
211 : ATOMIC_TYPE exited;
212 : bool detached:1, waiting:1;
213 : unsigned int refs:20;
214 : bool limit_override; /* not in bit field because of data races */
215 : char threadname[MT_NAME_LEN];
216 : QryCtx *qry_ctx;
217 : #ifdef HAVE_PTHREAD_H
218 : pthread_t hdl;
219 : #else
220 : HANDLE hdl;
221 : DWORD wtid;
222 : #endif
223 : #ifdef HAVE_GETTID
224 : pid_t lwptid;
225 : #endif
226 : MT_Id tid;
227 : uintptr_t sp;
228 : char *errbuf;
229 : struct freebats freebats;
230 : } *mtthreads = NULL;
231 : struct mtthread mainthread = {
232 : .threadname = "main thread",
233 : .exited = ATOMIC_VAR_INIT(0),
234 : .refs = 1,
235 : .tid = 1,
236 : };
237 : #ifdef HAVE_PTHREAD_H
238 : static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER;
239 : static pthread_key_t threadkey;
240 : #define thread_lock() pthread_mutex_lock(&posthread_lock)
241 : #define thread_unlock() pthread_mutex_unlock(&posthread_lock)
242 : #define thread_self() pthread_getspecific(threadkey)
243 : #define thread_setself(self) pthread_setspecific(threadkey, self)
244 : #else
245 : static CRITICAL_SECTION winthread_cs;
246 : static DWORD threadkey = TLS_OUT_OF_INDEXES;
247 : #define thread_lock() EnterCriticalSection(&winthread_cs)
248 : #define thread_unlock() LeaveCriticalSection(&winthread_cs)
249 : #define thread_self() TlsGetValue(threadkey)
250 : #define thread_setself(self) TlsSetValue(threadkey, self)
251 : #endif
252 : static bool thread_initialized = false;
253 :
#if defined(_MSC_VER) && _MSC_VER >= 1900
#pragma warning(disable : 4172)
#endif
/* Return an address inside the current stack frame, as an integer.
 * With GCC and clang this is the frame address of this function;
 * elsewhere the address of a local variable is used instead. */
static inline uintptr_t
THRsp(void)
{
#if defined(__GNUC__) || defined(__clang__)
	void *frame = __builtin_frame_address(0);

	return (uintptr_t) frame;
#else
	int marker = 0;

	return (uintptr_t) (&marker);
#endif
}
269 :
270 : bool
271 95771271 : THRhighwater(void)
272 : {
273 95771271 : struct mtthread *s = thread_self();
274 95771277 : if (s != NULL && s->sp != 0) {
275 95771240 : uintptr_t c = THRsp();
276 95771238 : size_t diff = c < s->sp ? s->sp - c : c - s->sp;
277 95771238 : if (diff > THREAD_STACK_SIZE - 80 * 1024)
278 : return true;
279 : }
280 : return false;
281 : }
282 :
283 : void
284 114 : dump_threads(void)
285 : {
286 114 : char buf[1024];
287 114 : thread_lock();
288 1835 : for (struct mtthread *t = mtthreads; t; t = t->next) {
289 1721 : MT_Lock *lk = t->lockwait;
290 1721 : MT_Sema *sm = t->semawait;
291 1721 : MT_Cond *cn = t->condwait;
292 1721 : struct mtthread *jn = t->joinwait;
293 1721 : int pos = snprintf(buf, sizeof(buf),
294 : "%s, tid %zu, "
295 : #ifdef HAVE_PTHREAD_H
296 : "Thread 0x%lx, "
297 : #endif
298 : #ifdef HAVE_GETTID
299 : "LWP %ld, "
300 : #endif
301 : "%"PRIu32" free bats, waiting for %s%s, working on %.200s",
302 1721 : t->threadname,
303 : t->tid,
304 : #ifdef HAVE_PTHREAD_H
305 1721 : (long) t->hdl,
306 : #endif
307 : #ifdef HAVE_GETTID
308 1721 : (long) t->lwptid,
309 : #endif
310 : t->freebats.nfreebats,
311 1721 : lk ? "lock " : sm ? "semaphore " : cn ? "condvar " : jn ? "thread " : "",
312 1721 : lk ? lk->name : sm ? sm->name : cn ? cn->name : jn ? jn->threadname : "nothing",
313 1721 : ATOMIC_GET(&t->exited) ? "exiting" :
314 1607 : t->working ? t->working : "nothing");
315 : #ifdef LOCK_OWNER
316 1721 : const char *sep = ", locked: ";
317 1723 : for (MT_Lock *l = t->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) {
318 2 : pos += snprintf(buf + pos, sizeof(buf) - pos,
319 2 : "%s%s(%s)", sep, l->name, l->locker);
320 2 : sep = ", ";
321 : }
322 : #endif
323 1721 : TRC_DEBUG_IF(THRD)
324 0 : TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
325 : else
326 3442 : printf("%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
327 : }
328 114 : thread_unlock();
329 114 : }
330 :
331 : static void
332 46646 : rm_mtthread(struct mtthread *t)
333 : {
334 46646 : struct mtthread **pt;
335 :
336 46646 : assert(t != &mainthread);
337 46646 : thread_lock();
338 838879 : for (pt = &mtthreads; *pt && *pt != t; pt = &(*pt)->next)
339 : ;
340 46646 : if (*pt)
341 46646 : *pt = t->next;
342 46646 : free(t);
343 46646 : thread_unlock();
344 46646 : }
345 :
346 : bool
347 334 : MT_thread_init(void)
348 : {
349 334 : if (thread_initialized)
350 : return true;
351 : #ifdef HAVE_GETTID
352 334 : mainthread.lwptid = gettid();
353 : #endif
354 : #ifdef HAVE_PTHREAD_H
355 334 : int ret;
356 :
357 334 : if ((ret = pthread_key_create(&threadkey, NULL)) != 0) {
358 0 : GDKsyserr(ret, "Creating specific key for thread failed");
359 0 : return false;
360 : }
361 334 : mainthread.hdl = pthread_self();
362 334 : if ((ret = thread_setself(&mainthread)) != 0) {
363 0 : GDKsyserr(ret, "Setting specific value failed");
364 0 : return false;
365 : }
366 : #else
367 : threadkey = TlsAlloc();
368 : if (threadkey == TLS_OUT_OF_INDEXES) {
369 : GDKwinerror("Creating thread-local slot for thread failed");
370 : return false;
371 : }
372 : mainthread.wtid = GetCurrentThreadId();
373 : if (thread_setself(&mainthread) == 0) {
374 : GDKwinerror("Setting thread-local value failed");
375 : TlsFree(threadkey);
376 : threadkey = TLS_OUT_OF_INDEXES;
377 : return false;
378 : }
379 : InitializeCriticalSection(&winthread_cs);
380 : #endif
381 334 : mainthread.next = NULL;
382 334 : mtthreads = &mainthread;
383 334 : thread_initialized = true;
384 334 : return true;
385 : }
386 : bool
387 0 : MT_thread_register(void)
388 : {
389 0 : assert(thread_initialized);
390 0 : if (!thread_initialized)
391 : return false;
392 :
393 0 : struct mtthread *self;
394 :
395 0 : if ((self = thread_self()) != NULL) {
396 0 : if (self->refs == 1000000) {
397 : /* there are limits... */
398 : return false;
399 : }
400 0 : self->refs++;
401 0 : return true;
402 : }
403 :
404 0 : self = malloc(sizeof(*self));
405 0 : if (self == NULL)
406 : return false;
407 :
408 0 : *self = (struct mtthread) {
409 : .detached = false,
410 : #ifdef HAVE_PTHREAD_H
411 0 : .hdl = pthread_self(),
412 : #else
413 : .wtid = GetCurrentThreadId(),
414 : #endif
415 : .refs = 1,
416 0 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
417 : .exited = ATOMIC_VAR_INIT(0),
418 : };
419 0 : snprintf(self->threadname, sizeof(self->threadname), "foreign %zu", self->tid);
420 0 : thread_setself(self);
421 0 : thread_lock();
422 0 : self->next = mtthreads;
423 0 : mtthreads = self;
424 0 : thread_unlock();
425 0 : return true;
426 : }
427 :
428 : void
429 0 : MT_thread_deregister(void)
430 : {
431 0 : struct mtthread *self;
432 :
433 0 : if ((self = thread_self()) == NULL)
434 : return;
435 :
436 0 : if (--self->refs == 0) {
437 0 : rm_mtthread(self);
438 0 : thread_setself(NULL);
439 : }
440 : }
441 :
442 : static struct mtthread *
443 7443 : find_mtthread(MT_Id tid)
444 : {
445 7443 : struct mtthread *t;
446 :
447 7443 : thread_lock();
448 29839 : for (t = mtthreads; t && t->tid != tid; t = t->next)
449 : ;
450 7443 : thread_unlock();
451 7443 : return t;
452 : }
453 :
454 : gdk_return
455 334 : MT_alloc_tls(MT_TLS_t *newkey)
456 : {
457 : #ifdef HAVE_PTHREAD_H
458 334 : int ret;
459 334 : if ((ret = pthread_key_create(newkey, NULL)) != 0) {
460 0 : GDKsyserr(ret, "Creating TLS key for thread failed");
461 0 : return GDK_FAIL;
462 : }
463 : #else
464 : if ((*newkey = TlsAlloc()) == TLS_OUT_OF_INDEXES) {
465 : GDKwinerror("Creating TLS key for thread failed");
466 : return GDK_FAIL;
467 : }
468 : #endif
469 : return GDK_SUCCEED;
470 : }
471 :
472 : void
473 46254 : MT_tls_set(MT_TLS_t key, void *val)
474 : {
475 : #ifdef HAVE_PTHREAD_H
476 46254 : pthread_setspecific(key, val);
477 : #else
478 : assert(key != TLS_OUT_OF_INDEXES);
479 : TlsSetValue(key, val);
480 : #endif
481 46231 : }
482 :
483 : void *
484 81270 : MT_tls_get(MT_TLS_t key)
485 : {
486 : #ifdef HAVE_PTHREAD_H
487 81270 : return pthread_getspecific(key);
488 : #else
489 : assert(key != TLS_OUT_OF_INDEXES);
490 : return TlsGetValue(key);
491 : #endif
492 : }
493 :
494 : const char *
495 1208432539 : MT_thread_getname(void)
496 : {
497 1208432539 : struct mtthread *self;
498 :
499 1208432539 : if (!thread_initialized)
500 : return mainthread.threadname;
501 1212005086 : self = thread_self();
502 1217636487 : return self ? self->threadname : UNKNOWN_THREAD;
503 : }
504 :
505 : void
506 88643 : GDKsetbuf(char *errbuf)
507 : {
508 88643 : struct mtthread *self;
509 :
510 88643 : self = thread_self();
511 88648 : if (self == NULL)
512 0 : self = &mainthread;
513 88648 : assert(errbuf == NULL || self->errbuf == NULL);
514 88648 : self->errbuf = errbuf;
515 44324 : if (errbuf)
516 44324 : *errbuf = 0; /* start clean */
517 88648 : }
518 :
519 : char *
520 21574189 : GDKgetbuf(void)
521 : {
522 21574189 : struct mtthread *self;
523 :
524 21574189 : self = thread_self();
525 21590520 : if (self == NULL)
526 0 : self = &mainthread;
527 21590520 : return self->errbuf;
528 : }
529 :
530 : struct freebats *
531 43121153 : MT_thread_getfreebats(void)
532 : {
533 43121153 : struct mtthread *self;
534 :
535 43121153 : self = thread_self();
536 43140122 : if (self == NULL)
537 0 : self = &mainthread;
538 43140122 : return &self->freebats;
539 : }
540 :
541 : void
542 0 : MT_thread_setdata(void *data)
543 : {
544 0 : if (!thread_initialized)
545 : return;
546 0 : struct mtthread *self = thread_self();
547 :
548 0 : if (self)
549 0 : self->data = data;
550 : }
551 :
552 : void *
553 0 : MT_thread_getdata(void)
554 : {
555 0 : if (!thread_initialized)
556 : return NULL;
557 0 : struct mtthread *self = thread_self();
558 :
559 0 : return self ? self->data : NULL;
560 : }
561 :
562 : void
563 23377240 : MT_thread_set_qry_ctx(QryCtx *ctx)
564 : {
565 23377240 : if (!thread_initialized)
566 : return;
567 23387715 : struct mtthread *self = thread_self();
568 :
569 23407412 : if (self)
570 23407412 : self->qry_ctx = ctx;
571 : }
572 :
573 : QryCtx *
574 27628735 : MT_thread_get_qry_ctx(void)
575 : {
576 27628735 : if (!thread_initialized)
577 : return NULL;
578 27628735 : struct mtthread *self = thread_self();
579 :
580 27649877 : return self ? self->qry_ctx : NULL;
581 : }
582 :
583 : void
584 32174519 : MT_thread_setlockwait(MT_Lock *lock)
585 : {
586 32174519 : if (!thread_initialized)
587 : return;
588 32245581 : struct mtthread *self = thread_self();
589 :
590 32327399 : if (self)
591 32327399 : self->lockwait = lock;
592 : }
593 :
594 : void
595 12661344 : MT_thread_setsemawait(MT_Sema *sema)
596 : {
597 12661344 : if (!thread_initialized)
598 : return;
599 12684305 : struct mtthread *self = thread_self();
600 :
601 12675058 : if (self)
602 12675058 : self->semawait = sema;
603 : }
604 :
605 : static void
606 18 : MT_thread_setcondwait(MT_Cond *cond)
607 : {
608 18 : if (!thread_initialized)
609 : return;
610 18 : struct mtthread *self = thread_self();
611 :
612 18 : if (self)
613 18 : self->condwait = cond;
614 : }
615 :
616 : #ifdef LOCK_OWNER
617 : void
618 1198432266 : MT_thread_add_mylock(MT_Lock *lock)
619 : {
620 1198432266 : struct mtthread *self;
621 1198432266 : if (!thread_initialized)
622 : self = &mainthread;
623 : else
624 1196815789 : self = thread_self();
625 :
626 1199846707 : if (self) {
627 1201463184 : lock->nxt = self->mylocks;
628 1201463184 : self->mylocks = lock;
629 : }
630 1201463184 : }
631 :
632 : void
633 1191799574 : MT_thread_del_mylock(MT_Lock *lock)
634 : {
635 1191799574 : struct mtthread *self;
636 1191799574 : if (!thread_initialized)
637 : self = &mainthread;
638 : else
639 1187861634 : self = thread_self();
640 :
641 1193707900 : if (self) {
642 1197645840 : if (self->mylocks == lock) {
643 1197637795 : self->mylocks = lock->nxt;
644 : } else {
645 205017 : for (MT_Lock *l = self->mylocks; l; l = l->nxt) {
646 205017 : if (l->nxt == lock) {
647 8045 : l->nxt = lock->nxt;
648 8045 : break;
649 : }
650 : }
651 : }
652 : }
653 1197645840 : }
654 : #endif
655 :
656 : void
657 16914549 : MT_thread_setworking(const char *work)
658 : {
659 16914549 : if (!thread_initialized)
660 : return;
661 16919012 : struct mtthread *self = thread_self();
662 :
663 16977409 : if (self) {
664 16977409 : if (work == NULL)
665 859265 : self->working = NULL;
666 16118144 : else if (strcmp(work, "store locked") == 0)
667 952957 : self->limit_override = true;
668 15165187 : else if (strcmp(work, "store unlocked") == 0)
669 952957 : self->limit_override = false;
670 : else
671 14212230 : self->working = work;
672 : }
673 : }
674 :
675 : void
676 60936237 : MT_thread_setalgorithm(const char *algo)
677 : {
678 60936237 : if (!thread_initialized)
679 : return;
680 60953554 : struct mtthread *self = thread_self();
681 :
682 60986553 : if (self) {
683 60986553 : if (algo) {
684 4187859 : if (self->algolen > 0) {
685 2272192 : if (self->algolen < sizeof(self->algorithm))
686 1324495 : self->algolen += strconcat_len(self->algorithm + self->algolen, sizeof(self->algorithm) - self->algolen, "; ", algo, NULL);
687 : } else
688 1915667 : self->algolen = strcpy_len(self->algorithm, algo, sizeof(self->algorithm));
689 : } else {
690 56798694 : self->algorithm[0] = 0;
691 56798694 : self->algolen = 0;
692 : }
693 : }
694 : }
695 :
696 : const char *
697 1882 : MT_thread_getalgorithm(void)
698 : {
699 1882 : if (!thread_initialized)
700 : return NULL;
701 1882 : struct mtthread *self = thread_self();
702 :
703 1882 : return self && self->algorithm[0] ? self->algorithm : NULL;
704 : }
705 :
706 : bool
707 0 : MT_thread_override_limits(void)
708 : {
709 0 : if (!thread_initialized)
710 : return false;
711 0 : struct mtthread *self = thread_self();
712 :
713 0 : return self && self->limit_override;
714 : }
715 :
716 : static struct thread_init_cb {
717 : struct thread_init_cb *next;
718 : void (*init)(void *);
719 : void (*destroy)(void *);
720 : void *data;
721 : } *init_cb;
722 : static MT_Lock thread_init_lock = MT_LOCK_INITIALIZER(thread_init_lock);
723 :
724 : gdk_return
725 329 : MT_thread_init_add_callback(void (*init)(void *), void (*destroy)(void *), void *data)
726 : {
727 329 : struct thread_init_cb *p = GDKmalloc(sizeof(struct thread_init_cb));
728 :
729 329 : if (p == NULL)
730 : return GDK_FAIL;
731 329 : *p = (struct thread_init_cb) {
732 : .init = init,
733 : .destroy = destroy,
734 : .next = NULL,
735 : .data = data,
736 : };
737 329 : MT_lock_set(&thread_init_lock);
738 329 : struct thread_init_cb **pp = &init_cb;
739 329 : while (*pp)
740 0 : pp = &(*pp)->next;
741 329 : *pp = p;
742 329 : MT_lock_unset(&thread_init_lock);
743 329 : return GDK_SUCCEED;
744 : }
745 :
746 : #ifdef HAVE_PTHREAD_H
747 : static void *
748 : #else
749 : static DWORD WINAPI
750 : #endif
751 46653 : thread_starter(void *arg)
752 : {
753 46653 : struct mtthread *self = (struct mtthread *) arg;
754 46653 : void *data = self->data;
755 :
756 : #ifdef HAVE_GETTID
757 46653 : self->lwptid = gettid();
758 : #endif
759 : #ifdef HAVE_PTHREAD_H
760 : #ifdef HAVE_PTHREAD_SETNAME_NP
761 : /* name can be at most 16 chars including \0 */
762 46643 : char name[16];
763 46643 : (void) strcpy_len(name, self->threadname, sizeof(name));
764 46650 : pthread_setname_np(
765 : #ifndef __APPLE__
766 : pthread_self(),
767 : #endif
768 : name);
769 : #endif
770 : #else
771 : #ifdef HAVE_SETTHREADDESCRIPTION
772 : wchar_t *wname = utf8towchar(self->threadname);
773 : if (wname != NULL) {
774 : SetThreadDescription(GetCurrentThread(), wname);
775 : free(wname);
776 : }
777 : #endif
778 : #endif
779 46626 : self->data = NULL;
780 46626 : self->sp = THRsp();
781 46629 : thread_setself(self);
782 92815 : for (int i = 0; i < self->nthread_funcs; i++) {
783 46183 : if (self->thread_funcs[i].init)
784 46183 : (*self->thread_funcs[i].init)(self->thread_funcs[i].data);
785 : }
786 46639 : (*self->func)(data);
787 92834 : for (int i = 0; i < self->nthread_funcs; i++) {
788 46187 : if (self->thread_funcs[i].destroy)
789 46187 : (*self->thread_funcs[i].destroy)(self->thread_funcs[i].data);
790 : }
791 46627 : free(self->thread_funcs);
792 46627 : BBPrelinquishbats();
793 46635 : ATOMIC_SET(&self->exited, 1);
794 46635 : TRC_DEBUG(THRD, "Exit thread \"%s\"\n", self->threadname);
795 46635 : return 0; /* NULL for pthreads, 0 for Windows */
796 : }
797 :
798 : static void
799 54440 : join_threads(void)
800 : {
801 54440 : bool waited;
802 :
803 54440 : struct mtthread *self = thread_self();
804 54440 : if (!self)
805 : return;
806 54440 : thread_lock();
807 92262 : do {
808 92262 : waited = false;
809 2754751 : for (struct mtthread *t = mtthreads; t; t = t->next) {
810 2700311 : if (ATOMIC_GET(&t->exited) && t->detached && !t->waiting) {
811 37822 : t->waiting = true;
812 37822 : thread_unlock();
813 37822 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
814 37822 : self->joinwait = t;
815 : #ifdef HAVE_PTHREAD_H
816 37822 : pthread_join(t->hdl, NULL);
817 : #else
818 : WaitForSingleObject(t->hdl, INFINITE);
819 : #endif
820 37822 : self->joinwait = NULL;
821 : #ifndef HAVE_PTHREAD_H
822 : CloseHandle(t->hdl);
823 : #endif
824 37822 : rm_mtthread(t);
825 37822 : waited = true;
826 37822 : thread_lock();
827 37822 : break;
828 : }
829 : }
830 92262 : } while (waited);
831 54440 : thread_unlock();
832 : }
833 :
834 : void
835 678 : join_detached_threads(void)
836 : {
837 678 : bool waited;
838 :
839 678 : struct mtthread *self = thread_self();
840 678 : thread_lock();
841 2059 : do {
842 2059 : waited = false;
843 10203 : for (struct mtthread *t = mtthreads; t; t = t->next) {
844 9525 : if (t->detached && !t->waiting) {
845 1381 : t->waiting = true;
846 1381 : thread_unlock();
847 1381 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
848 1381 : self->joinwait = t;
849 : #ifdef HAVE_PTHREAD_H
850 1381 : pthread_join(t->hdl, NULL);
851 : #else
852 : WaitForSingleObject(t->hdl, INFINITE);
853 : #endif
854 1381 : self->joinwait = NULL;
855 : #ifndef HAVE_PTHREAD_H
856 : CloseHandle(t->hdl);
857 : #endif
858 1381 : rm_mtthread(t);
859 1381 : waited = true;
860 1381 : thread_lock();
861 1381 : break;
862 : }
863 : }
864 2059 : } while (waited);
865 678 : thread_unlock();
866 678 : }
867 :
868 : int
869 46658 : MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach d, const char *threadname)
870 : {
871 46658 : struct mtthread *self;
872 :
873 46658 : assert(thread_initialized);
874 46658 : join_threads();
875 46658 : if (threadname == NULL) {
876 0 : TRC_CRITICAL(GDK, "Thread must have a name\n");
877 0 : return -1;
878 : }
879 46658 : if (strlen(threadname) >= sizeof(self->threadname)) {
880 0 : TRC_CRITICAL(GDK, "Thread's name is too large\n");
881 0 : return -1;
882 : }
883 :
884 : #ifdef HAVE_PTHREAD_H
885 46658 : pthread_attr_t attr;
886 46658 : int ret;
887 46658 : if ((ret = pthread_attr_init(&attr)) != 0) {
888 0 : GDKsyserr(ret, "Cannot init pthread attr");
889 0 : return -1;
890 : }
891 46658 : if ((ret = pthread_attr_setstacksize(&attr, THREAD_STACK_SIZE)) != 0) {
892 0 : GDKsyserr(ret, "Cannot set stack size");
893 0 : pthread_attr_destroy(&attr);
894 0 : return -1;
895 : }
896 : #endif
897 46658 : self = malloc(sizeof(*self));
898 46658 : if (self == NULL) {
899 0 : GDKsyserror("Cannot allocate memory\n");
900 : #ifdef HAVE_PTHREAD_H
901 0 : pthread_attr_destroy(&attr);
902 : #endif
903 0 : return -1;
904 : }
905 :
906 46658 : *self = (struct mtthread) {
907 : .func = f,
908 : .data = arg,
909 : .waiting = false,
910 46658 : .detached = (d == MT_THR_DETACHED),
911 : .refs = 1,
912 46658 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
913 : .exited = ATOMIC_VAR_INIT(0),
914 : };
915 46658 : MT_lock_set(&thread_init_lock);
916 : /* remember the list of callback functions we need to call for
917 : * this thread (i.e. anything registered so far) */
918 92851 : for (struct thread_init_cb *p = init_cb; p; p = p->next)
919 46193 : self->nthread_funcs++;
920 46658 : if (self->nthread_funcs > 0) {
921 46193 : self->thread_funcs = malloc(self->nthread_funcs * sizeof(*self->thread_funcs));
922 46193 : if (self->thread_funcs == NULL) {
923 0 : GDKsyserror("Cannot allocate memory\n");
924 0 : MT_lock_unset(&thread_init_lock);
925 0 : free(self);
926 : #ifdef HAVE_PTHREAD_H
927 0 : pthread_attr_destroy(&attr);
928 : #endif
929 0 : return -1;
930 : }
931 46193 : int n = 0;
932 92386 : for (struct thread_init_cb *p = init_cb; p; p = p->next) {
933 46193 : self->thread_funcs[n++] = (struct thread_funcs) {
934 46193 : .init = p->init,
935 46193 : .destroy = p->destroy,
936 46193 : .data = p->data,
937 : };
938 : }
939 : }
940 46658 : MT_lock_unset(&thread_init_lock);
941 :
942 46658 : strcpy_len(self->threadname, threadname, sizeof(self->threadname));
943 46658 : char *p;
944 46658 : if ((p = strstr(self->threadname, "XXXX")) != NULL) {
945 : /* overwrite XXXX with thread ID; bottom three bits are
946 : * likely 0, so skip those */
947 42782 : char buf[5];
948 42782 : snprintf(buf, 5, "%04zu", self->tid % 9999);
949 42782 : memcpy(p, buf, 4);
950 : }
951 46658 : TRC_DEBUG(THRD, "Create thread \"%s\"\n", self->threadname);
952 : #ifdef HAVE_PTHREAD_H
953 : #ifdef HAVE_PTHREAD_SIGMASK
954 46658 : sigset_t new_mask, orig_mask;
955 46658 : (void) sigfillset(&new_mask);
956 46658 : sigdelset(&new_mask, SIGQUIT);
957 46658 : sigdelset(&new_mask, SIGPROF);
958 46658 : pthread_sigmask(SIG_SETMASK, &new_mask, &orig_mask);
959 : #endif
960 46658 : ret = pthread_create(&self->hdl, &attr, thread_starter, self);
961 46658 : pthread_attr_destroy(&attr);
962 : #ifdef HAVE_PTHREAD_SIGMASK
963 46658 : pthread_sigmask(SIG_SETMASK, &orig_mask, NULL);
964 : #endif
965 46658 : if (ret != 0) {
966 0 : GDKsyserr(ret, "Cannot start thread");
967 0 : free(self->thread_funcs);
968 0 : free(self);
969 0 : return -1;
970 : }
971 : #else
972 : self->hdl = CreateThread(NULL, THREAD_STACK_SIZE, thread_starter, self,
973 : 0, &self->wtid);
974 : if (self->hdl == NULL) {
975 : GDKwinerror("Failed to create thread");
976 : free(self->thread_funcs);
977 : free(self);
978 : return -1;
979 : }
980 : #endif
981 : /* must not fail after this: the thread has been started */
982 46658 : *t = self->tid;
983 46658 : thread_lock();
984 46658 : self->next = mtthreads;
985 46658 : mtthreads = self;
986 46658 : thread_unlock();
987 46658 : return 0;
988 : }
989 :
990 : MT_Id
991 93617760 : MT_getpid(void)
992 : {
993 93617760 : struct mtthread *self;
994 :
995 93617760 : if (!thread_initialized)
996 : self = &mainthread;
997 : else
998 93705171 : self = thread_self();
999 93699408 : return self->tid;
1000 : }
1001 :
1002 : void
1003 37965 : MT_exiting_thread(void)
1004 : {
1005 37965 : struct mtthread *self;
1006 :
1007 37965 : if (!thread_initialized)
1008 : return;
1009 37965 : self = thread_self();
1010 37966 : if (self) {
1011 37966 : ATOMIC_SET(&self->exited, 1);
1012 37966 : self->working = NULL;
1013 : }
1014 : }
1015 :
1016 : int
1017 7443 : MT_join_thread(MT_Id tid)
1018 : {
1019 7443 : struct mtthread *t;
1020 :
1021 7443 : assert(tid != mainthread.tid);
1022 7443 : join_threads();
1023 7443 : t = find_mtthread(tid);
1024 7443 : if (t == NULL
1025 : #ifndef HAVE_PTHREAD_H
1026 : || t->hdl == NULL
1027 : #endif
1028 : )
1029 : return -1;
1030 7443 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
1031 7443 : struct mtthread *self = thread_self();
1032 7443 : self->joinwait = t;
1033 : #ifdef HAVE_PTHREAD_H
1034 7443 : int ret = pthread_join(t->hdl, NULL);
1035 : #else
1036 : DWORD ret = WaitForSingleObject(t->hdl, INFINITE);
1037 : #endif
1038 7443 : self->joinwait = NULL;
1039 7443 : if (
1040 : #ifdef HAVE_PTHREAD_H
1041 : ret == 0
1042 : #else
1043 : ret == WAIT_OBJECT_0 && CloseHandle(t->hdl)
1044 : #endif
1045 : ) {
1046 7443 : rm_mtthread(t);
1047 7443 : return 0;
1048 : }
1049 : return -1;
1050 : }
1051 :
1052 : static bool
1053 0 : MT_kill_thread(struct mtthread *t)
1054 : {
1055 0 : assert(t != thread_self());
1056 : #ifdef HAVE_PTHREAD_H
1057 : #ifdef HAVE_PTHREAD_KILL
1058 0 : if (pthread_kill(t->hdl, SIGHUP) == 0)
1059 0 : return true;
1060 : #endif
1061 : #else
1062 : if (t->hdl == NULL) {
1063 : /* detached thread */
1064 : HANDLE h;
1065 : bool ret = false;
1066 : h = OpenThread(THREAD_ALL_ACCESS, 0, t->wtid);
1067 : if (h == NULL)
1068 : return false;
1069 : if (TerminateThread(h, -1))
1070 : ret = true;
1071 : CloseHandle(h);
1072 : return ret;
1073 : }
1074 : if (TerminateThread(t->hdl, -1))
1075 : return true;
1076 : #endif
1077 : return false;
1078 : }
1079 :
1080 : bool
1081 339 : MT_kill_threads(void)
1082 : {
1083 339 : struct mtthread *self = thread_self();
1084 339 : bool killed = false;
1085 :
1086 339 : assert(self == &mainthread);
1087 339 : join_threads();
1088 339 : thread_lock();
1089 678 : for (struct mtthread *t = mtthreads; t; t = t->next) {
1090 339 : if (t == self)
1091 339 : continue;
1092 0 : TRC_INFO(GDK, "Killing thread %s\n", t->threadname);
1093 0 : killed |= MT_kill_thread(t);
1094 : }
1095 339 : thread_unlock();
1096 339 : return killed;
1097 : }
1098 :
#ifndef WIN32
/* Count the CPUs listed in a cpuset line.  The syntax is: ranges of
 * CPU numbers separated by comma, terminated by a newline; a range is
 * either a single CPU id, or two IDs separated by a minus where the
 * second must be larger than the first.
 * Returns true and stores the count in *count when the whole line
 * matches this syntax, false (leaving *count alone) otherwise. */
static bool
cpuset_count_cpus(const char *list, int *count)
{
	int n = 0;

	for (;;) {
		char *end;
		unsigned first = strtoul(list, &end, 10);

		if (end == list)
			return false;	/* no number here */
		n++;
		if (*end == '-') {
			list = end + 1;
			unsigned last = strtoul(list, &end, 10);
			if (end == list || last <= first)
				return false;
			n += last - first;
		}
		if (*end == '\n')
			break;
		if (*end != ',')
			return false;
		list = end + 1;
	}
	*count = n;
	return true;
}
#endif

/* Return the number of CPUs/cores to use: the number reported by the
 * platform (at least 1; at most 16 when size_t and int have the same
 * size), lowered to the number of CPUs listed in the cgroup cpuset
 * file when that file can be read and parsed and lists fewer. */
int
MT_check_nr_cores(void)
{
	int ncpus = -1;

#if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_ONLN)
	/* this works on Linux, Solaris and AIX */
	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
#elif defined(HW_NCPU)		/* BSD */
	size_t len = sizeof(int);
	/* Everyone should have permission to make this call,
	 * if we get a failure something is really wrong. */
	int mib[3] = { CTL_HW, HW_NCPU, -1 };

	sysctl(mib, 3, &ncpus, &len, NULL, 0);
#elif defined(WIN32)
	SYSTEM_INFO sysinfo;

	GetSystemInfo(&sysinfo);
	ncpus = sysinfo.dwNumberOfProcessors;
#endif

	/* if we ever need HPUX or OSF/1 (hope not), see
	 * http://ndevilla.free.fr/threads/ */

	if (ncpus <= 0)
		ncpus = 1;
#if SIZEOF_SIZE_T == SIZEOF_INT
	/* On 32-bits systems with large numbers of cpus/cores, we
	 * quickly run out of space due to the number of threads in
	 * use.  Since it is questionable whether many cores on a
	 * 32-bits system are going to be beneficial due to this, we
	 * simply limit the auto-detected cores to 16 on 32-bits
	 * systems.  The user can always override this via
	 * gdk_nr_threads. */
	if (ncpus > 16)
		ncpus = 16;
#endif

#ifndef WIN32
	/* get the number of allocated cpus from the cgroup settings;
	 * any deviation from the expected syntax causes the file to
	 * be ignored */
	FILE *f = fopen("/sys/fs/cgroup/cpuset/cpuset.cpus", "r");
	if (f != NULL) {
		char buf[512];
		char *line = fgets(buf, 512, f);
		int cgcpus = 0;

		fclose(f);
		if (line != NULL &&
		    cpuset_count_cpus(line, &cgcpus) &&
		    cgcpus < ncpus)
			return cgcpus;
	}
#endif

	return ncpus;
}
1181 :
1182 :
1183 : void
1184 341 : MT_cond_init(MT_Cond *cond)
1185 : {
1186 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1187 : InitializeConditionVariable(&cond->cv);
1188 : #else
1189 341 : pthread_cond_init(&cond->cv, NULL);
1190 : #endif
1191 341 : }
1192 :
1193 :
1194 : void
1195 0 : MT_cond_destroy(MT_Cond *cond)
1196 : {
1197 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1198 : /* no need */
1199 : #else
1200 0 : pthread_cond_destroy(&cond->cv);
1201 : #endif
1202 0 : }
1203 :
1204 : void
1205 9 : MT_cond_wait(MT_Cond *cond, MT_Lock *lock)
1206 : {
1207 9 : MT_thread_setcondwait(cond);
1208 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1209 : SleepConditionVariableCS(&cond->cv, &lock->lock, INFINITE);
1210 : #else
1211 9 : pthread_cond_wait(&cond->cv, &lock->lock);
1212 : #endif
1213 9 : MT_thread_setcondwait(NULL);
1214 9 : }
1215 :
1216 : void
1217 55964 : MT_cond_signal(MT_Cond *cond)
1218 : {
1219 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1220 : WakeConditionVariable(&cond->cv);
1221 : #else
1222 55964 : pthread_cond_signal(&cond->cv);
1223 : #endif
1224 55964 : }
1225 :
1226 : void
1227 2 : MT_cond_broadcast(MT_Cond *cond)
1228 : {
1229 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1230 : WakeAllConditionVariable(&cond->cv);
1231 : #else
1232 2 : pthread_cond_broadcast(&cond->cv);
1233 : #endif
1234 2 : }
|