Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : /*
14 : * @a Niels Nes, Peter Boncz
15 : * @+ Threads
16 : * This file contains a wrapper layer for threading, hence the
17 : * underscore convention MT_x (Multi-Threading). As all platforms
18 : * that MonetDB runs on now support POSIX Threads (pthreads), this
19 : * wrapping layer has become rather thin.
20 : *
21 : * In the late 1990s when multi-threading support was introduced in
22 : * MonetDB, pthreads was just emerging as a standard API and not
23 : * widely adopted yet. The earliest MT implementation focused on SGI
24 : * Unix and provided multi- threading using multiple processses, and
25 : * shared memory.
26 : *
27 : * One of the relics of this model, namely the need to pre-allocate
28 : * locks and semaphores, and consequently a maximum number of them,
29 : * has been removed in the latest iteration of this layer.
30 : *
31 : */
32 : /*
33 : * @- Mthreads Routine implementations
34 : */
35 : #include "monetdb_config.h"
36 : #include "mstring.h"
37 : #include "gdk.h"
38 : #include "gdk_system_private.h"
39 :
40 : #include <time.h>
41 :
42 : #ifdef HAVE_FTIME
43 : #include <sys/timeb.h> /* ftime */
44 : #endif
45 : #ifdef HAVE_SYS_TIME_H
46 : #include <sys/time.h> /* gettimeofday */
47 : #endif
48 :
49 : #include <signal.h>
50 : #include <string.h> /* for strerror */
51 : #include <unistd.h> /* for sysconf symbols */
52 :
53 : #include "mutils.h"
54 :
55 : static ATOMIC_TYPE GDKthreadid = ATOMIC_VAR_INIT(1);
56 :
57 : #ifdef LOCK_STATS
58 :
59 : ATOMIC_TYPE GDKlockcnt = ATOMIC_VAR_INIT(0);
60 : ATOMIC_TYPE GDKlockcontentioncnt = ATOMIC_VAR_INIT(0);
61 : ATOMIC_TYPE GDKlocksleepcnt = ATOMIC_VAR_INIT(0);
62 : MT_Lock *volatile GDKlocklist = 0;
63 : ATOMIC_FLAG GDKlocklistlock = ATOMIC_FLAG_INIT;
64 :
65 : /* merge sort of linked list */
66 : static MT_Lock *
67 : sortlocklist(MT_Lock *l)
68 : {
69 : MT_Lock *r, *t, *ll = NULL;
70 :
71 : if (l == NULL || l->next == NULL) {
72 : /* list is trivially sorted (0 or 1 element) */
73 : return l;
74 : }
75 : /* break list into two (almost) equal pieces:
76 : * l is start of "left" list, r of "right" list, ll last
77 : * element of "left" list */
78 : for (t = r = l; t && t->next; t = t->next->next) {
79 : ll = r;
80 : r = r->next;
81 : }
82 : ll->next = NULL; /* break list into two */
83 : r->prev = NULL;
84 : /* recursively sort both sublists */
85 : l = sortlocklist(l);
86 : r = sortlocklist(r);
87 : /* merge
88 : * t is new list, ll is last element of new list, l and r are
89 : * start of unprocessed part of left and right lists */
90 : t = ll = NULL;
91 : while (l && r) {
92 : if (ATOMIC_GET(&l->sleep) < ATOMIC_GET(&r->sleep) ||
93 : (ATOMIC_GET(&l->sleep) == ATOMIC_GET(&r->sleep) &&
94 : (ATOMIC_GET(&l->contention) < ATOMIC_GET(&r->contention) ||
95 : (ATOMIC_GET(&l->contention) == ATOMIC_GET(&r->contention) &&
96 : l->count <= r->count)))) {
97 : /* l is smaller */
98 : if (ll == NULL) {
99 : assert(t == NULL);
100 : t = ll = l;
101 : } else {
102 : ll->next = l;
103 : l->prev = ll;
104 : ll = ll->next;
105 : }
106 : l = l->next;
107 : } else {
108 : /* r is smaller */
109 : if (ll == NULL) {
110 : assert(t == NULL);
111 : t = ll = r;
112 : } else {
113 : ll->next = r;
114 : r->prev = ll;
115 : ll = ll->next;
116 : }
117 : r = r->next;
118 : }
119 : }
120 : /* append rest of remaining list */
121 : if (l) {
122 : ll->next = l;
123 : l->prev = ll;
124 : } else {
125 : ll->next = r;
126 : r->prev = ll;
127 : }
128 : return t;
129 : }
130 :
131 : static inline bool
132 : lock_isset(MT_Lock *l)
133 : {
134 : if (MT_lock_try(l)) {
135 : MT_lock_unset(l);
136 : return false;
137 : }
138 : return true;
139 : }
140 :
141 : /* function used for debugging */
142 : void
143 : GDKlockstatistics(int what)
144 : {
145 : MT_Lock *l;
146 : int n = 0;
147 :
148 : if (ATOMIC_TAS(&GDKlocklistlock) != 0) {
149 : printf("GDKlocklistlock is set, so cannot access lock list\n");
150 : return;
151 : }
152 : if (what == -1) {
153 : for (l = GDKlocklist; l; l = l->next) {
154 : l->count = 0;
155 : ATOMIC_SET(&l->contention, 0);
156 : ATOMIC_SET(&l->sleep, 0);
157 : }
158 : ATOMIC_CLEAR(&GDKlocklistlock);
159 : return;
160 : }
161 : GDKlocklist = sortlocklist(GDKlocklist);
162 : printf("%-18s\t%s\t%s\t%s\t%s\t%s\t%s\n",
163 : "lock name", "count", "content", "sleep",
164 : "locked", "locker", "thread");
165 : for (l = GDKlocklist; l; l = l->next) {
166 : n++;
167 : if (what == 0 ||
168 : (what == 1 && l->count) ||
169 : (what == 2 && ATOMIC_GET(&l->contention)) ||
170 : (what == 3 && lock_isset(l)))
171 : printf("%-18s\t%zu\t%zu\t%zu\t%s\t%s\t%s\n",
172 : l->name, l->count,
173 : (size_t) ATOMIC_GET(&l->contention),
174 : (size_t) ATOMIC_GET(&l->sleep),
175 : lock_isset(l) ? "locked" : "",
176 : l->locker ? l->locker : "",
177 : l->thread ? l->thread : "");
178 : }
179 : printf("Number of locks: %d\n", n);
180 : printf("Total lock count: %zu\n", (size_t) ATOMIC_GET(&GDKlockcnt));
181 : printf("Lock contention: %zu\n", (size_t) ATOMIC_GET(&GDKlockcontentioncnt));
182 : printf("Lock sleep count: %zu\n", (size_t) ATOMIC_GET(&GDKlocksleepcnt));
183 : fflush(stdout);
184 : ATOMIC_CLEAR(&GDKlocklistlock);
185 : }
186 :
187 : #endif /* LOCK_STATS */
188 :
189 : struct thread_funcs {
190 : void (*init)(void *);
191 : void (*destroy)(void *);
192 : void *data;
193 : };
194 :
195 : static struct mtthread {
196 : struct mtthread *next;
197 : void (*func) (void *); /* function to be called */
198 : void *data; /* and its data */
199 : struct thread_funcs *thread_funcs; /* callback funcs */
200 : int nthread_funcs;
201 : MT_Lock *lockwait; /* lock we're waiting for */
202 : MT_Sema *semawait; /* semaphore we're waiting for */
203 : MT_Cond *condwait; /* condition variable we're waiting for */
204 : #ifdef LOCK_OWNER
205 : MT_Lock *mylocks; /* locks we're holding */
206 : #endif
207 : struct mtthread *joinwait; /* process we are joining with */
208 : const char *working; /* what we're currently doing */
209 : char algorithm[512]; /* the algorithm used in the last operation */
210 : size_t algolen; /* length of string in .algorithm */
211 : ATOMIC_TYPE exited;
212 : bool detached:1, waiting:1;
213 : unsigned int refs:20;
214 : bool limit_override; /* not in bit field because of data races */
215 : char threadname[MT_NAME_LEN];
216 : QryCtx *qry_ctx;
217 : #ifdef HAVE_PTHREAD_H
218 : pthread_t hdl;
219 : #else
220 : HANDLE hdl;
221 : DWORD wtid;
222 : #endif
223 : #ifdef HAVE_GETTID
224 : pid_t lwptid;
225 : #endif
226 : MT_Id tid;
227 : uintptr_t sp;
228 : char *errbuf;
229 : struct freebats freebats;
230 : } *mtthreads = NULL;
231 : struct mtthread mainthread = {
232 : .threadname = "main thread",
233 : .exited = ATOMIC_VAR_INIT(0),
234 : .refs = 1,
235 : .tid = 1,
236 : };
237 : #ifdef HAVE_PTHREAD_H
238 : static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER;
239 : static pthread_key_t threadkey;
240 : #define thread_lock() pthread_mutex_lock(&posthread_lock)
241 : #define thread_unlock() pthread_mutex_unlock(&posthread_lock)
242 : #define thread_self() pthread_getspecific(threadkey)
243 : #define thread_setself(self) pthread_setspecific(threadkey, self)
244 : #else
245 : static CRITICAL_SECTION winthread_cs;
246 : static DWORD threadkey = TLS_OUT_OF_INDEXES;
247 : #define thread_lock() EnterCriticalSection(&winthread_cs)
248 : #define thread_unlock() LeaveCriticalSection(&winthread_cs)
249 : #define thread_self() TlsGetValue(threadkey)
250 : #define thread_setself(self) TlsSetValue(threadkey, self)
251 : #endif
252 : static bool thread_initialized = false;
253 :
254 : #if defined(_MSC_VER) && _MSC_VER >= 1900
255 : #pragma warning(disable : 4172)
256 : #endif
257 : static inline uintptr_t
258 95800105 : THRsp(void)
259 : {
260 : #if defined(__GNUC__) || defined(__clang__)
261 191600209 : return (uintptr_t) __builtin_frame_address(0);
262 : #else
263 : int l = 0;
264 : uintptr_t sp = (uintptr_t) (&l);
265 :
266 : return sp;
267 : #endif
268 : }
269 :
270 : bool
271 95750350 : THRhighwater(void)
272 : {
273 95750350 : struct mtthread *s = thread_self();
274 95750350 : if (s != NULL && s->sp != 0) {
275 95750309 : uintptr_t c = THRsp();
276 95750307 : size_t diff = c < s->sp ? s->sp - c : c - s->sp;
277 95750307 : if (diff > THREAD_STACK_SIZE - 80 * 1024)
278 : return true;
279 : }
280 : return false;
281 : }
282 :
283 : void
284 114 : dump_threads(void)
285 : {
286 114 : char buf[1024];
287 114 : thread_lock();
288 1375 : for (struct mtthread *t = mtthreads; t; t = t->next) {
289 1261 : MT_Lock *lk = t->lockwait;
290 1261 : MT_Sema *sm = t->semawait;
291 1261 : MT_Cond *cn = t->condwait;
292 1261 : struct mtthread *jn = t->joinwait;
293 1261 : int pos = snprintf(buf, sizeof(buf),
294 : "%s, tid %zu, "
295 : #ifdef HAVE_PTHREAD_H
296 : "Thread 0x%lx, "
297 : #endif
298 : #ifdef HAVE_GETTID
299 : "LWP %ld, "
300 : #endif
301 : "%"PRIu32" free bats, waiting for %s%s, working on %.200s",
302 1261 : t->threadname,
303 : t->tid,
304 : #ifdef HAVE_PTHREAD_H
305 1261 : (long) t->hdl,
306 : #endif
307 : #ifdef HAVE_GETTID
308 1261 : (long) t->lwptid,
309 : #endif
310 : t->freebats.nfreebats,
311 1261 : lk ? "lock " : sm ? "semaphore " : cn ? "condvar " : jn ? "thread " : "",
312 1261 : lk ? lk->name : sm ? sm->name : cn ? cn->name : jn ? jn->threadname : "nothing",
313 1261 : ATOMIC_GET(&t->exited) ? "exiting" :
314 1147 : t->working ? t->working : "nothing");
315 : #ifdef LOCK_OWNER
316 1261 : const char *sep = ", locked: ";
317 1262 : for (MT_Lock *l = t->mylocks; l && pos < (int) sizeof(buf); l = l->nxt) {
318 1 : pos += snprintf(buf + pos, sizeof(buf) - pos,
319 1 : "%s%s(%s)", sep, l->name, l->locker);
320 1 : sep = ", ";
321 : }
322 : #endif
323 1261 : TRC_DEBUG_IF(THRD)
324 0 : TRC_DEBUG_ENDIF(THRD, "%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
325 : else
326 2522 : printf("%s%s\n", buf, pos >= (int) sizeof(buf) ? "..." : "");
327 : }
328 114 : thread_unlock();
329 114 : }
330 :
331 : static void
332 49789 : rm_mtthread(struct mtthread *t)
333 : {
334 49789 : struct mtthread **pt;
335 :
336 49789 : assert(t != &mainthread);
337 49789 : thread_lock();
338 741667 : for (pt = &mtthreads; *pt && *pt != t; pt = &(*pt)->next)
339 : ;
340 49789 : if (*pt)
341 49789 : *pt = t->next;
342 49789 : ATOMIC_DESTROY(&t->exited);
343 49789 : free(t);
344 49789 : thread_unlock();
345 49789 : }
346 :
347 : bool
348 329 : MT_thread_init(void)
349 : {
350 329 : if (thread_initialized)
351 : return true;
352 : #ifdef HAVE_GETTID
353 329 : mainthread.lwptid = gettid();
354 : #endif
355 : #ifdef HAVE_PTHREAD_H
356 329 : int ret;
357 :
358 329 : if ((ret = pthread_key_create(&threadkey, NULL)) != 0) {
359 0 : GDKsyserr(ret, "Creating specific key for thread failed");
360 0 : return false;
361 : }
362 329 : mainthread.hdl = pthread_self();
363 329 : if ((ret = thread_setself(&mainthread)) != 0) {
364 0 : GDKsyserr(ret, "Setting specific value failed");
365 0 : return false;
366 : }
367 : #else
368 : threadkey = TlsAlloc();
369 : if (threadkey == TLS_OUT_OF_INDEXES) {
370 : GDKwinerror("Creating thread-local slot for thread failed");
371 : return false;
372 : }
373 : mainthread.wtid = GetCurrentThreadId();
374 : if (thread_setself(&mainthread) == 0) {
375 : GDKwinerror("Setting thread-local value failed");
376 : TlsFree(threadkey);
377 : threadkey = TLS_OUT_OF_INDEXES;
378 : return false;
379 : }
380 : InitializeCriticalSection(&winthread_cs);
381 : #endif
382 329 : mainthread.next = NULL;
383 329 : mtthreads = &mainthread;
384 329 : thread_initialized = true;
385 329 : return true;
386 : }
387 : bool
388 0 : MT_thread_register(void)
389 : {
390 0 : assert(thread_initialized);
391 0 : if (!thread_initialized)
392 : return false;
393 :
394 0 : struct mtthread *self;
395 :
396 0 : if ((self = thread_self()) != NULL) {
397 0 : if (self->refs == 1000000) {
398 : /* there are limits... */
399 : return false;
400 : }
401 0 : self->refs++;
402 0 : return true;
403 : }
404 :
405 0 : self = malloc(sizeof(*self));
406 0 : if (self == NULL)
407 : return false;
408 :
409 0 : *self = (struct mtthread) {
410 : .detached = false,
411 : #ifdef HAVE_PTHREAD_H
412 0 : .hdl = pthread_self(),
413 : #else
414 : .wtid = GetCurrentThreadId(),
415 : #endif
416 : .refs = 1,
417 0 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
418 : };
419 0 : snprintf(self->threadname, sizeof(self->threadname), "foreign %zu", self->tid);
420 0 : ATOMIC_INIT(&self->exited, 0);
421 0 : thread_setself(self);
422 0 : thread_lock();
423 0 : self->next = mtthreads;
424 0 : mtthreads = self;
425 0 : thread_unlock();
426 0 : return true;
427 : }
428 :
429 : void
430 0 : MT_thread_deregister(void)
431 : {
432 0 : struct mtthread *self;
433 :
434 0 : if ((self = thread_self()) == NULL)
435 : return;
436 :
437 0 : if (--self->refs == 0) {
438 0 : rm_mtthread(self);
439 0 : thread_setself(NULL);
440 : }
441 : }
442 :
443 : static struct mtthread *
444 10376 : find_mtthread(MT_Id tid)
445 : {
446 10376 : struct mtthread *t;
447 :
448 10376 : thread_lock();
449 50900 : for (t = mtthreads; t && t->tid != tid; t = t->next)
450 : ;
451 10376 : thread_unlock();
452 10376 : return t;
453 : }
454 :
455 : gdk_return
456 329 : MT_alloc_tls(MT_TLS_t *newkey)
457 : {
458 : #ifdef HAVE_PTHREAD_H
459 329 : int ret;
460 329 : if ((ret = pthread_key_create(newkey, NULL)) != 0) {
461 0 : GDKsyserr(ret, "Creating TLS key for thread failed");
462 0 : return GDK_FAIL;
463 : }
464 : #else
465 : if ((*newkey = TlsAlloc()) == TLS_OUT_OF_INDEXES) {
466 : GDKwinerror("Creating TLS key for thread failed");
467 : return GDK_FAIL;
468 : }
469 : #endif
470 : return GDK_SUCCEED;
471 : }
472 :
473 : void
474 49475 : MT_tls_set(MT_TLS_t key, void *val)
475 : {
476 : #ifdef HAVE_PTHREAD_H
477 49475 : pthread_setspecific(key, val);
478 : #else
479 : assert(key != TLS_OUT_OF_INDEXES);
480 : TlsSetValue(key, val);
481 : #endif
482 49466 : }
483 :
484 : void *
485 85911 : MT_tls_get(MT_TLS_t key)
486 : {
487 : #ifdef HAVE_PTHREAD_H
488 85911 : return pthread_getspecific(key);
489 : #else
490 : assert(key != TLS_OUT_OF_INDEXES);
491 : return TlsGetValue(key);
492 : #endif
493 : }
494 :
495 : const char *
496 1103727856 : MT_thread_getname(void)
497 : {
498 1103727856 : struct mtthread *self;
499 :
500 1103727856 : if (!thread_initialized)
501 : return mainthread.threadname;
502 1103768595 : self = thread_self();
503 1103621449 : return self ? self->threadname : UNKNOWN_THREAD;
504 : }
505 :
506 : void
507 94995 : GDKsetbuf(char *errbuf)
508 : {
509 94995 : struct mtthread *self;
510 :
511 94995 : self = thread_self();
512 94996 : if (self == NULL)
513 0 : self = &mainthread;
514 94996 : assert(errbuf == NULL || self->errbuf == NULL);
515 94996 : self->errbuf = errbuf;
516 47496 : if (errbuf)
517 47500 : *errbuf = 0; /* start clean */
518 94996 : }
519 :
520 : char *
521 15792746 : GDKgetbuf(void)
522 : {
523 15792746 : struct mtthread *self;
524 :
525 15792746 : self = thread_self();
526 15796214 : if (self == NULL)
527 0 : self = &mainthread;
528 15796214 : return self->errbuf;
529 : }
530 :
531 : struct freebats *
532 31014116 : MT_thread_getfreebats(void)
533 : {
534 31014116 : struct mtthread *self;
535 :
536 31014116 : self = thread_self();
537 31016892 : if (self == NULL)
538 0 : self = &mainthread;
539 31016892 : return &self->freebats;
540 : }
541 :
542 : void
543 0 : MT_thread_setdata(void *data)
544 : {
545 0 : if (!thread_initialized)
546 : return;
547 0 : struct mtthread *self = thread_self();
548 :
549 0 : if (self)
550 0 : self->data = data;
551 : }
552 :
553 : void *
554 0 : MT_thread_getdata(void)
555 : {
556 0 : if (!thread_initialized)
557 : return NULL;
558 0 : struct mtthread *self = thread_self();
559 :
560 0 : return self ? self->data : NULL;
561 : }
562 :
563 : void
564 16055595 : MT_thread_set_qry_ctx(QryCtx *ctx)
565 : {
566 16055595 : if (!thread_initialized)
567 : return;
568 16056682 : struct mtthread *self = thread_self();
569 :
570 16054483 : if (self)
571 16054483 : self->qry_ctx = ctx;
572 : }
573 :
574 : QryCtx *
575 25288815 : MT_thread_get_qry_ctx(void)
576 : {
577 25288815 : if (!thread_initialized)
578 : return NULL;
579 25288815 : struct mtthread *self = thread_self();
580 :
581 25289362 : return self ? self->qry_ctx : NULL;
582 : }
583 :
584 : void
585 5982129 : MT_thread_setlockwait(MT_Lock *lock)
586 : {
587 5982129 : if (!thread_initialized)
588 : return;
589 5982265 : struct mtthread *self = thread_self();
590 :
591 5984011 : if (self)
592 5984011 : self->lockwait = lock;
593 : }
594 :
595 : void
596 11452563 : MT_thread_setsemawait(MT_Sema *sema)
597 : {
598 11452563 : if (!thread_initialized)
599 : return;
600 11452603 : struct mtthread *self = thread_self();
601 :
602 11454210 : if (self)
603 11454210 : self->semawait = sema;
604 : }
605 :
606 : static void
607 18 : MT_thread_setcondwait(MT_Cond *cond)
608 : {
609 18 : if (!thread_initialized)
610 : return;
611 18 : struct mtthread *self = thread_self();
612 :
613 18 : if (self)
614 18 : self->condwait = cond;
615 : }
616 :
617 : #ifdef LOCK_OWNER
618 : void
619 1100032473 : MT_thread_add_mylock(MT_Lock *lock)
620 : {
621 1100032473 : struct mtthread *self;
622 1100032473 : if (!thread_initialized)
623 : self = &mainthread;
624 : else
625 1100143728 : self = thread_self();
626 :
627 1100206547 : if (self) {
628 1100095292 : lock->nxt = self->mylocks;
629 1100095292 : self->mylocks = lock;
630 : }
631 1100095292 : }
632 :
633 : void
634 1097373774 : MT_thread_del_mylock(MT_Lock *lock)
635 : {
636 1097373774 : struct mtthread *self;
637 1097373774 : if (!thread_initialized)
638 : self = &mainthread;
639 : else
640 1097514887 : self = thread_self();
641 :
642 1098223111 : if (self) {
643 1098081998 : if (self->mylocks == lock) {
644 1098066089 : self->mylocks = lock->nxt;
645 : } else {
646 147332 : for (MT_Lock *l = self->mylocks; l; l = l->nxt) {
647 147332 : if (l->nxt == lock) {
648 15909 : l->nxt = lock->nxt;
649 15909 : break;
650 : }
651 : }
652 : }
653 : }
654 1098081998 : }
655 : #endif
656 :
657 : void
658 13096732 : MT_thread_setworking(const char *work)
659 : {
660 13096732 : if (!thread_initialized)
661 : return;
662 13097456 : struct mtthread *self = thread_self();
663 :
664 13104223 : if (self) {
665 13104223 : if (work == NULL)
666 837236 : self->working = NULL;
667 12266987 : else if (strcmp(work, "store locked") == 0)
668 959965 : self->limit_override = true;
669 11307022 : else if (strcmp(work, "store unlocked") == 0)
670 959965 : self->limit_override = false;
671 : else
672 10347057 : self->working = work;
673 : }
674 : }
675 :
676 : void
677 56499805 : MT_thread_setalgorithm(const char *algo)
678 : {
679 56499805 : if (!thread_initialized)
680 : return;
681 56499886 : struct mtthread *self = thread_self();
682 :
683 56502210 : if (self) {
684 56502210 : if (algo) {
685 3538951 : if (self->algolen > 0) {
686 2187070 : if (self->algolen < sizeof(self->algorithm))
687 1252267 : self->algolen += strconcat_len(self->algorithm + self->algolen, sizeof(self->algorithm) - self->algolen, "; ", algo, NULL);
688 : } else
689 1351881 : self->algolen = strcpy_len(self->algorithm, algo, sizeof(self->algorithm));
690 : } else {
691 52963259 : self->algorithm[0] = 0;
692 52963259 : self->algolen = 0;
693 : }
694 : }
695 : }
696 :
697 : const char *
698 1678 : MT_thread_getalgorithm(void)
699 : {
700 1678 : if (!thread_initialized)
701 : return NULL;
702 1678 : struct mtthread *self = thread_self();
703 :
704 1679 : return self && self->algorithm[0] ? self->algorithm : NULL;
705 : }
706 :
707 : bool
708 0 : MT_thread_override_limits(void)
709 : {
710 0 : if (!thread_initialized)
711 : return false;
712 0 : struct mtthread *self = thread_self();
713 :
714 0 : return self && self->limit_override;
715 : }
716 :
717 : static struct thread_init_cb {
718 : struct thread_init_cb *next;
719 : void (*init)(void *);
720 : void (*destroy)(void *);
721 : void *data;
722 : } *init_cb;
723 : static MT_Lock thread_init_lock = MT_LOCK_INITIALIZER(thread_init_lock);
724 :
725 : gdk_return
726 324 : MT_thread_init_add_callback(void (*init)(void *), void (*destroy)(void *), void *data)
727 : {
728 324 : struct thread_init_cb *p = GDKmalloc(sizeof(struct thread_init_cb));
729 :
730 324 : if (p == NULL)
731 : return GDK_FAIL;
732 324 : *p = (struct thread_init_cb) {
733 : .init = init,
734 : .destroy = destroy,
735 : .next = NULL,
736 : .data = data,
737 : };
738 324 : MT_lock_set(&thread_init_lock);
739 324 : struct thread_init_cb **pp = &init_cb;
740 324 : while (*pp)
741 0 : pp = &(*pp)->next;
742 324 : *pp = p;
743 324 : MT_lock_unset(&thread_init_lock);
744 324 : return GDK_SUCCEED;
745 : }
746 :
747 : #ifdef HAVE_PTHREAD_H
748 : static void *
749 : #else
750 : static DWORD WINAPI
751 : #endif
752 49797 : thread_starter(void *arg)
753 : {
754 49797 : struct mtthread *self = (struct mtthread *) arg;
755 49797 : void *data = self->data;
756 :
757 : #ifdef HAVE_GETTID
758 49797 : self->lwptid = gettid();
759 : #endif
760 : #ifdef HAVE_PTHREAD_H
761 : #ifdef HAVE_PTHREAD_SETNAME_NP
762 : /* name can be at most 16 chars including \0 */
763 49794 : char name[16];
764 49794 : (void) strcpy_len(name, self->threadname, sizeof(name));
765 49796 : pthread_setname_np(
766 : #ifndef __APPLE__
767 : pthread_self(),
768 : #endif
769 : name);
770 : #endif
771 : #else
772 : #ifdef HAVE_SETTHREADDESCRIPTION
773 : wchar_t *wname = utf8towchar(self->threadname);
774 : if (wname != NULL) {
775 : SetThreadDescription(GetCurrentThread(), wname);
776 : free(wname);
777 : }
778 : #endif
779 : #endif
780 49796 : self->data = NULL;
781 49796 : self->sp = THRsp();
782 49797 : thread_setself(self);
783 99173 : for (int i = 0; i < self->nthread_funcs; i++) {
784 49382 : if (self->thread_funcs[i].init)
785 49382 : (*self->thread_funcs[i].init)(self->thread_funcs[i].data);
786 : }
787 49788 : (*self->func)(data);
788 99172 : for (int i = 0; i < self->nthread_funcs; i++) {
789 49384 : if (self->thread_funcs[i].destroy)
790 49384 : (*self->thread_funcs[i].destroy)(self->thread_funcs[i].data);
791 : }
792 49789 : free(self->thread_funcs);
793 49789 : BBPrelinquishbats();
794 49789 : ATOMIC_SET(&self->exited, 1);
795 49789 : TRC_DEBUG(THRD, "Exit thread \"%s\"\n", self->threadname);
796 49789 : return 0; /* NULL for pthreads, 0 for Windows */
797 : }
798 :
799 : static void
800 60507 : join_threads(void)
801 : {
802 60507 : bool waited;
803 :
804 60507 : struct mtthread *self = thread_self();
805 60507 : if (!self)
806 : return;
807 60507 : thread_lock();
808 98559 : do {
809 98559 : waited = false;
810 2519859 : for (struct mtthread *t = mtthreads; t; t = t->next) {
811 2459352 : if (ATOMIC_GET(&t->exited) && t->detached && !t->waiting) {
812 38052 : t->waiting = true;
813 38052 : thread_unlock();
814 38052 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
815 38052 : self->joinwait = t;
816 : #ifdef HAVE_PTHREAD_H
817 38052 : pthread_join(t->hdl, NULL);
818 : #else
819 : WaitForSingleObject(t->hdl, INFINITE);
820 : #endif
821 38052 : self->joinwait = NULL;
822 : #ifndef HAVE_PTHREAD_H
823 : CloseHandle(t->hdl);
824 : #endif
825 38052 : rm_mtthread(t);
826 38052 : waited = true;
827 38052 : thread_lock();
828 38052 : break;
829 : }
830 : }
831 98559 : } while (waited);
832 60507 : thread_unlock();
833 : }
834 :
835 : void
836 668 : join_detached_threads(void)
837 : {
838 668 : bool waited;
839 :
840 668 : struct mtthread *self = thread_self();
841 668 : thread_lock();
842 2029 : do {
843 2029 : waited = false;
844 7339 : for (struct mtthread *t = mtthreads; t; t = t->next) {
845 6671 : if (t->detached && !t->waiting) {
846 1361 : t->waiting = true;
847 1361 : thread_unlock();
848 1361 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
849 1361 : self->joinwait = t;
850 : #ifdef HAVE_PTHREAD_H
851 1361 : pthread_join(t->hdl, NULL);
852 : #else
853 : WaitForSingleObject(t->hdl, INFINITE);
854 : #endif
855 1361 : self->joinwait = NULL;
856 : #ifndef HAVE_PTHREAD_H
857 : CloseHandle(t->hdl);
858 : #endif
859 1361 : rm_mtthread(t);
860 1361 : waited = true;
861 1361 : thread_lock();
862 1361 : break;
863 : }
864 : }
865 2029 : } while (waited);
866 668 : thread_unlock();
867 668 : }
868 :
869 : int
870 49797 : MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach d, const char *threadname)
871 : {
872 49797 : struct mtthread *self;
873 :
874 49797 : assert(thread_initialized);
875 49797 : join_threads();
876 49797 : if (threadname == NULL) {
877 0 : TRC_CRITICAL(GDK, "Thread must have a name\n");
878 0 : return -1;
879 : }
880 49797 : if (strlen(threadname) >= sizeof(self->threadname)) {
881 0 : TRC_CRITICAL(GDK, "Thread's name is too large\n");
882 0 : return -1;
883 : }
884 :
885 : #ifdef HAVE_PTHREAD_H
886 49797 : pthread_attr_t attr;
887 49797 : int ret;
888 49797 : if ((ret = pthread_attr_init(&attr)) != 0) {
889 0 : GDKsyserr(ret, "Cannot init pthread attr");
890 0 : return -1;
891 : }
892 49797 : if ((ret = pthread_attr_setstacksize(&attr, THREAD_STACK_SIZE)) != 0) {
893 0 : GDKsyserr(ret, "Cannot set stack size");
894 0 : pthread_attr_destroy(&attr);
895 0 : return -1;
896 : }
897 : #endif
898 49797 : self = malloc(sizeof(*self));
899 49797 : if (self == NULL) {
900 0 : GDKsyserror("Cannot allocate memory\n");
901 : #ifdef HAVE_PTHREAD_H
902 0 : pthread_attr_destroy(&attr);
903 : #endif
904 0 : return -1;
905 : }
906 :
907 49797 : *self = (struct mtthread) {
908 : .func = f,
909 : .data = arg,
910 : .waiting = false,
911 49797 : .detached = (d == MT_THR_DETACHED),
912 : .refs = 1,
913 49797 : .tid = (MT_Id) ATOMIC_INC(&GDKthreadid),
914 : };
915 49797 : MT_lock_set(&thread_init_lock);
916 : /* remember the list of callback functions we need to call for
917 : * this thread (i.e. anything registered so far) */
918 99182 : for (struct thread_init_cb *p = init_cb; p; p = p->next)
919 49385 : self->nthread_funcs++;
920 49797 : if (self->nthread_funcs > 0) {
921 49385 : self->thread_funcs = malloc(self->nthread_funcs * sizeof(*self->thread_funcs));
922 49385 : if (self->thread_funcs == NULL) {
923 0 : GDKsyserror("Cannot allocate memory\n");
924 0 : MT_lock_unset(&thread_init_lock);
925 0 : free(self);
926 : #ifdef HAVE_PTHREAD_H
927 0 : pthread_attr_destroy(&attr);
928 : #endif
929 0 : return -1;
930 : }
931 49385 : int n = 0;
932 98770 : for (struct thread_init_cb *p = init_cb; p; p = p->next) {
933 49385 : self->thread_funcs[n++] = (struct thread_funcs) {
934 49385 : .init = p->init,
935 49385 : .destroy = p->destroy,
936 49385 : .data = p->data,
937 : };
938 : }
939 : }
940 49797 : MT_lock_unset(&thread_init_lock);
941 :
942 49797 : ATOMIC_INIT(&self->exited, 0);
943 49797 : strcpy_len(self->threadname, threadname, sizeof(self->threadname));
944 49797 : char *p;
945 49797 : if ((p = strstr(self->threadname, "XXXX")) != NULL) {
946 : /* overwrite XXXX with thread ID; bottom three bits are
947 : * likely 0, so skip those */
948 46121 : char buf[5];
949 46121 : snprintf(buf, 5, "%04zu", self->tid % 9999);
950 46121 : memcpy(p, buf, 4);
951 : }
952 49797 : TRC_DEBUG(THRD, "Create thread \"%s\"\n", self->threadname);
953 : #ifdef HAVE_PTHREAD_H
954 : #ifdef HAVE_PTHREAD_SIGMASK
955 49797 : sigset_t new_mask, orig_mask;
956 49797 : (void) sigfillset(&new_mask);
957 49797 : sigdelset(&new_mask, SIGQUIT);
958 49797 : sigdelset(&new_mask, SIGPROF);
959 49797 : pthread_sigmask(SIG_SETMASK, &new_mask, &orig_mask);
960 : #endif
961 49797 : ret = pthread_create(&self->hdl, &attr, thread_starter, self);
962 49797 : pthread_attr_destroy(&attr);
963 : #ifdef HAVE_PTHREAD_SIGMASK
964 49797 : pthread_sigmask(SIG_SETMASK, &orig_mask, NULL);
965 : #endif
966 49797 : if (ret != 0) {
967 0 : GDKsyserr(ret, "Cannot start thread");
968 0 : free(self->thread_funcs);
969 0 : free(self);
970 0 : return -1;
971 : }
972 : #else
973 : self->hdl = CreateThread(NULL, THREAD_STACK_SIZE, thread_starter, self,
974 : 0, &self->wtid);
975 : if (self->hdl == NULL) {
976 : GDKwinerror("Failed to create thread");
977 : free(self->thread_funcs);
978 : free(self);
979 : return -1;
980 : }
981 : #endif
982 : /* must not fail after this: the thread has been started */
983 49797 : *t = self->tid;
984 49797 : thread_lock();
985 49797 : self->next = mtthreads;
986 49797 : mtthreads = self;
987 49797 : thread_unlock();
988 49797 : return 0;
989 : }
990 :
991 : MT_Id
992 53199197 : MT_getpid(void)
993 : {
994 53199197 : struct mtthread *self;
995 :
996 53199197 : if (!thread_initialized)
997 : self = &mainthread;
998 : else
999 53199322 : self = thread_self();
1000 53197844 : return self->tid;
1001 : }
1002 :
1003 : void
1004 38194 : MT_exiting_thread(void)
1005 : {
1006 38194 : struct mtthread *self;
1007 :
1008 38194 : if (!thread_initialized)
1009 : return;
1010 38194 : self = thread_self();
1011 38194 : if (self) {
1012 38194 : ATOMIC_SET(&self->exited, 1);
1013 38194 : self->working = NULL;
1014 : }
1015 : }
1016 :
1017 : int
1018 10376 : MT_join_thread(MT_Id tid)
1019 : {
1020 10376 : struct mtthread *t;
1021 :
1022 10376 : assert(tid != mainthread.tid);
1023 10376 : join_threads();
1024 10376 : t = find_mtthread(tid);
1025 10376 : if (t == NULL
1026 : #ifndef HAVE_PTHREAD_H
1027 : || t->hdl == NULL
1028 : #endif
1029 : )
1030 : return -1;
1031 10376 : TRC_DEBUG(THRD, "Join thread \"%s\"\n", t->threadname);
1032 10376 : struct mtthread *self = thread_self();
1033 10376 : self->joinwait = t;
1034 : #ifdef HAVE_PTHREAD_H
1035 10376 : int ret = pthread_join(t->hdl, NULL);
1036 : #else
1037 : DWORD ret = WaitForSingleObject(t->hdl, INFINITE);
1038 : #endif
1039 10376 : self->joinwait = NULL;
1040 10376 : if (
1041 : #ifdef HAVE_PTHREAD_H
1042 : ret == 0
1043 : #else
1044 : ret == WAIT_OBJECT_0 && CloseHandle(t->hdl)
1045 : #endif
1046 : ) {
1047 10376 : rm_mtthread(t);
1048 10376 : return 0;
1049 : }
1050 : return -1;
1051 : }
1052 :
1053 : static bool
1054 0 : MT_kill_thread(struct mtthread *t)
1055 : {
1056 0 : assert(t != thread_self());
1057 : #ifdef HAVE_PTHREAD_H
1058 : #ifdef HAVE_PTHREAD_KILL
1059 0 : if (pthread_kill(t->hdl, SIGHUP) == 0)
1060 0 : return true;
1061 : #endif
1062 : #else
1063 : if (t->hdl == NULL) {
1064 : /* detached thread */
1065 : HANDLE h;
1066 : bool ret = false;
1067 : h = OpenThread(THREAD_ALL_ACCESS, 0, t->wtid);
1068 : if (h == NULL)
1069 : return false;
1070 : if (TerminateThread(h, -1))
1071 : ret = true;
1072 : CloseHandle(h);
1073 : return ret;
1074 : }
1075 : if (TerminateThread(t->hdl, -1))
1076 : return true;
1077 : #endif
1078 : return false;
1079 : }
1080 :
1081 : bool
1082 334 : MT_kill_threads(void)
1083 : {
1084 334 : struct mtthread *self = thread_self();
1085 334 : bool killed = false;
1086 :
1087 334 : assert(self == &mainthread);
1088 334 : join_threads();
1089 334 : thread_lock();
1090 668 : for (struct mtthread *t = mtthreads; t; t = t->next) {
1091 334 : if (t == self)
1092 334 : continue;
1093 0 : TRC_INFO(GDK, "Killing thread %s\n", t->threadname);
1094 0 : killed |= MT_kill_thread(t);
1095 : }
1096 334 : thread_unlock();
1097 334 : return killed;
1098 : }
1099 :
1100 : int
1101 336 : MT_check_nr_cores(void)
1102 : {
1103 336 : int ncpus = -1;
1104 :
1105 : #if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_ONLN)
1106 : /* this works on Linux, Solaris and AIX */
1107 336 : ncpus = sysconf(_SC_NPROCESSORS_ONLN);
1108 : #elif defined(HW_NCPU) /* BSD */
1109 : size_t len = sizeof(int);
1110 : int mib[3];
1111 :
1112 : /* Everyone should have permission to make this call,
1113 : * if we get a failure something is really wrong. */
1114 : mib[0] = CTL_HW;
1115 : mib[1] = HW_NCPU;
1116 : mib[2] = -1;
1117 : sysctl(mib, 3, &ncpus, &len, NULL, 0);
1118 : #elif defined(WIN32)
1119 : SYSTEM_INFO sysinfo;
1120 :
1121 : GetSystemInfo(&sysinfo);
1122 : ncpus = sysinfo.dwNumberOfProcessors;
1123 : #endif
1124 :
1125 : /* if we ever need HPUX or OSF/1 (hope not), see
1126 : * http://ndevilla.free.fr/threads/ */
1127 :
1128 336 : if (ncpus <= 0)
1129 : ncpus = 1;
1130 : #if SIZEOF_SIZE_T == SIZEOF_INT
1131 : /* On 32-bits systems with large numbers of cpus/cores, we
1132 : * quickly run out of space due to the number of threads in
1133 : * use. Since it is questionable whether many cores on a
1134 : * 32-bits system are going to be beneficial due to this, we
1135 : * simply limit the auto-detected cores to 16 on 32-bits
1136 : * systems. The user can always override this via
1137 : * gdk_nr_threads. */
1138 : if (ncpus > 16)
1139 : ncpus = 16;
1140 : #endif
1141 :
1142 : #ifndef WIN32
1143 : /* get the number of allocated cpus from the cgroup settings */
1144 336 : FILE *f = fopen("/sys/fs/cgroup/cpuset/cpuset.cpus", "r");
1145 336 : if (f != NULL) {
1146 0 : char buf[512];
1147 0 : char *p = fgets(buf, 512, f);
1148 0 : fclose(f);
1149 0 : if (p != NULL) {
1150 : /* syntax is: ranges of CPU numbers separated
1151 : * by comma; a range is either a single CPU
1152 : * id, or two IDs separated by a minus; any
1153 : * deviation causes the file to be ignored */
1154 : int ncpu = 0;
1155 0 : for (;;) {
1156 0 : char *q;
1157 0 : unsigned fst = strtoul(p, &q, 10);
1158 0 : if (q == p)
1159 0 : return ncpus;
1160 0 : ncpu++;
1161 0 : if (*q == '-') {
1162 0 : p = q + 1;
1163 0 : unsigned lst = strtoul(p, &q, 10);
1164 0 : if (q == p || lst <= fst)
1165 : return ncpus;
1166 0 : ncpu += lst - fst;
1167 : }
1168 0 : if (*q == '\n')
1169 : break;
1170 0 : if (*q != ',')
1171 : return ncpus;
1172 0 : p = q + 1;
1173 : }
1174 0 : if (ncpu < ncpus)
1175 : return ncpu;
1176 : }
1177 : }
1178 : #endif
1179 :
1180 : return ncpus;
1181 : }
1182 :
1183 :
1184 : void
1185 336 : MT_cond_init(MT_Cond *cond)
1186 : {
1187 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1188 : InitializeConditionVariable(&cond->cv);
1189 : #else
1190 336 : pthread_cond_init(&cond->cv, NULL);
1191 : #endif
1192 336 : }
1193 :
1194 :
1195 : void
1196 0 : MT_cond_destroy(MT_Cond *cond)
1197 : {
1198 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1199 : /* no need */
1200 : #else
1201 0 : pthread_cond_destroy(&cond->cv);
1202 : #endif
1203 0 : }
1204 :
1205 : void
1206 9 : MT_cond_wait(MT_Cond *cond, MT_Lock *lock)
1207 : {
1208 9 : MT_thread_setcondwait(cond);
1209 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1210 : SleepConditionVariableCS(&cond->cv, &lock->lock, INFINITE);
1211 : #else
1212 9 : pthread_cond_wait(&cond->cv, &lock->lock);
1213 : #endif
1214 9 : MT_thread_setcondwait(NULL);
1215 9 : }
1216 :
1217 : void
1218 54445 : MT_cond_signal(MT_Cond *cond)
1219 : {
1220 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1221 : WakeConditionVariable(&cond->cv);
1222 : #else
1223 54445 : pthread_cond_signal(&cond->cv);
1224 : #endif
1225 54445 : }
1226 :
1227 : void
1228 2 : MT_cond_broadcast(MT_Cond *cond)
1229 : {
1230 : #if !defined(HAVE_PTHREAD_H) && defined(WIN32)
1231 : WakeAllConditionVariable(&cond->cv);
1232 : #else
1233 2 : pthread_cond_broadcast(&cond->cv);
1234 : #endif
1235 2 : }
|