Linux 多线程编程问题 1重入问题 传统的UNIX没有太多考虑线程问题,库函数里过多使用了全局和静态数据,导致严重的线程重入问题。 1.1–D_REENTRANT /-pthread和errno的重入问题。 所先UNIX的系统调用被设计为出错返回-1,把错误码放在errno中(更简单而直 接的方法应该是程序直接返回错误码,或者通过几个参数指针来返回)。由于线程 共享所有的数据区,而errno是一个全局的变量,这里产生了最糟糕的线程重入问 题。比如: do { bytes = recv(netfd, recvbuf, buflen, 0); } while (bytes != -1 && errno != EINTR); 在上面的处理recv被信号打断的程序里。如果这时连接被关闭,此时errno应该不 等于EINTR,如果别的线程正好设置errno为EINTR,这时程序就可能进入死循环。 其它的错误码处理也可能进入不可预测的分支。 在线程需求刚开始时,很多方面技术和标准(TLS)还不够成熟,所以在 为 了 解决这个重入问题引入了一个解决方案,把errno定义为一个宏: extern int *__errno_location (void); #define errno (*__errno_location()) 在上面的方案里,访问errno之前先调用__errno_location()函数,线程库提供这个 函数,不同线程返回各自errno的地址,从而解决这个重入问题。在编译时加 -D_REENTRANT就是启用上面的宏,避免errno重入。另外 -D_REENTRANT 还影响一些stdio的函数。在较高版本的gcc里,有很多嵌入函数的优化,比如把 printf(“Hello/n”); 优化为 puts(“hello/n”); 之类的,有些优化在多线程下有问题。所以gcc引入了 –pthread 参数,这个 参数出了-D_REENTRANT外,还校正一些针对多线程的优化。 因为宏是编译时确定的,所以没有加-D_REENTRANT编译的程序和库都有errno 重入问题,原则上都不能在线程环境下使用。不过在一般实现上主线程是直接使用 全局errno变量的,也就是 __errno_location()返回值为全局&errno,所以那些没加 -D_REENTRANT编译的库可以在主线程里使用。这里仅限于主线程,有其它且只 有一个固定子线程使用也不行,因为子线程使用的errno地址不是全局errno变量 地址。 对于一个纯算法的库,不涉及到errno和stdio等等,有时不加_REENTRANT也是 安全的,比如一个纯粹的加密/解谜函数库。比较简单的判断一个库是否有errno问 题是看看这个库是使用了errno还是__errno_location(): readelf -s libxxx.so | grep errno 另外一个和errno类似的变量是DNS解析里用到的h_errno变量,这个变量的重入 和处理与errno一样。这个h_errno用于gethostbyXX这个系列的函数。 1.2库函数重入 早期很多unix函数设计成返回静态buffer。这些函数都是不能重入的。识别这 些函数有几个简单的规则: 1.2.1stdio函数是可以重入的。这是因为stdio函数入口都会调用flockfile()锁定FILE。另外stdio也提供不锁定(非重入)的函数,这些函数以_unlock结尾,具体参见man unlocked_stdio。利用这些特性可以做到多个stdio的互斥操作。如: flockfile(fp); fwrite_unlocked(rec1, reclen1, 1, fp); fwrite_unlocked(rec2, reclen2, 1, fp); funlockfile(fp); 1.2.2返回动态分配数据的函数,这些一般是可以重入的。这些函数的特点是返回的指针需要显式释放,用free或者配对的释放函数。如: getaddrinfo /freeaddrinfo malloc/strdup/calloc/free fopen/fdopen/popen/fclose get_current_dir_name/free asprintf/vasprintf/free getline/getdelim/free regcomp/regfree 1.2.3函数返回一个和输入参数无关的数据,而且不需要free的大部分情况下是不可重入的。如gmtime, ntoa, gethostbyname… 1.2.4函数依赖一个全局数据,在多次或者多个函数间维护状态的函数是不可重入的。如getpwent, rand… 1.2.5带有_r变体的函数都是不可重入的。这些函数大部分是上面两类的。这些变体函数是可重入的代替版本。可以用下面命令查看glibc有多少这种函数: readelf -s /lib/libc.so.6 | grep _r@ 这些函数名有很大一部分是 getXXbyYY, getXXid, getXXent, getXXnam 1.2.6rand,lrand48系列随机数不可重入的原因在于这些函数使用一个全局的状态,并且都有相应的_r变体。重入这些非线程安全的函数不会有稳定性问题,不过会导致随机数不随机(可预测)。在要求比较严格的随机数应用里,建议用/dev/random和/dev/urandom,这两个设备的不同在于前者读出的数据理论上是绝对随机的,在系统无法提供足够随机数据时读会阻塞。后者只是提供尽量随即的数据,随机度不够时用算法生成伪随机数来代替,所以不会阻塞。 1.2.7不可重入函数处理。对大部分不可重入函数可以使用对应的_r变体。有些函数可能没有对应_r变体,可以选用类似功能的函数替换。如: inet_ntoa inet_ntop ctime strftime, asctime, localtime_r+sprintf gethostbyname, getservbyname getaddrinfo 1.2.8用其它代码/逻辑替换不可重入代码 1.2.9有些库有两个版本,带和不带_r/_mt/th等后缀的,多线程一般用带后缀的版本的库。 1.3应用程序的线程安全 1.3.1全局量/共享资源互斥访问 1.3.2相关数据原子操作 1.3.3操作顺序 2互斥逻辑 同步逻辑不仅仅是多线程程序的问题,在多进程环境里也经常使用。同步逻辑有很多种,其中最常用的就是互斥逻辑,也就是锁。由于历史原因,LINUX下产生了好多锁定API,下面列个简单的表格: Fcntl文件锁 Flock文件锁 SYSV semaphore Mutex rwlock 类别 读写锁 读写锁 信号量 互斥锁 读写锁 对象 进程/文件/范围 句柄 信号值 内存 内存 非竞争开销 高 高 中 很低 低 竞争开销 高 高 中 很高/linuxthread 较低/NPTL 很高/linuxthread 低/NPTL 资源消耗 句柄 句柄 信号ID 24 byte内存 32 byte内存 进程 支持 支持 支持 不支持/linuxthread 支持/NPTL 不支持/linuxthread 支持/NPTL 线程 支持(2.4) 不支持(2.6) 支持 支持 支持 支持 Crash解锁 是 是 需要UNDO 否 否 2.1Fcntl文件锁 2.1.1支持偏移量。也就是可以用一个文件模拟许多互斥锁。 2.1.2进程锁非线程锁。也就是线程之间无法互斥。老的2.4 kernel没有支持这个POSIX标准,所以可以跨线程使用。 2.1.3相关句柄关闭导致文件解锁。这个锁是按进程+文件定位的,也就是同一进程打开多次文件使用相同的锁定关系。即使只关闭其中一个句柄导致解锁。在2.4 kernel下也有这个问题,任何线程关闭对应文件句柄,不是导致该线程解锁,而是导致所有线程解锁。 2.1.4逻辑死锁检测。 2.2Flock文件锁 2.2.1按句柄锁定 2.2.2进程的句柄继承 2.3SYSV semaphore 2.3.1信号量。 2.3.2性能比文件锁要好。 2.3.3可以同时对多个信号量进行复合操作 2.3.4/proc/sys/kernel/sem: SEMMSL SEMMNS SEMOPM SEMMNI 2.3.4.1SEMMSL, 每个信号量ID里的最大信号量数 2.3.4.2SEMMNS, 系统总信量灯数,小于SEMMSL x SEMMNI 2.3.4.3SEMOPM, 每次semop最大操作个数 2.3.4.4SEMMNI, 信号量ID数 2.3.5高版kernel有等待超时机制 #include <unistd.h> #include <asm/unistd.h> #ifndef SEMTIMEDOP #define SEMTIMEDOP 4 #endif static inline int semtimedop(int semid, struct sembuf *sops, unsigned nsops, struct timespec *timeout) { return syscall(__NR_ipc, SEMTIMEDOP, semid, nsops, 0, sops, timeout); } 2.4Mutex和rwlock 2.4.1在非竞争下性能最好 2.4.2NPTL使用futex实现,竞争条件下性能也不错。Linuxthread在竞争时由管理线程仲裁,开销较大。 2.4.3无crash自动解锁机制 2.4.4有等待超时机制 2.5内存原子操作 2.5.1内存原子操作是多CPU系统里最基本的互斥操作。所有的其它逻辑都是建立这之上的。 2.5.2整数操作,操作书为一个int类型。有些非x86的CPU只支持到24位值。 #include <asm/atomic.h> atomic_t value; int v; v=atomic_read(&value); atomic_set(&value, v); atomic_add(v, &value); atomic_sub(v, &value); atomic_sub_and_test(v, &value); /*返回结果是否为0*/ atomic_inc(&value); atomic_dec(&value); atomic_dec_and_test(&value); /*返回结果是否为0*/ atomic_inc_and_test(&value); /*返回结果是否为0*/ atomic_set_mask(mask, &value); atomic_clear_mask(mask, &value); 2.5.3bitmap位图操作 #include <asm/bitops.h> void * bitmap; int nr; set_bit(nr, bitmap); /* OR */ clear_bit(nr, bitmap); /* AND ~ */ change_bit(nr, bitmap); /* XOR */ test_bit(nr, bitmap); test_and_set_bit(nr, bitmap); /* 返回旧值 */ test_and_clear_bit(nr, bitmap); /* 返回旧值 */ test_and_change_bit(nr, bitmap); /* 返回新值 */ 这些函数前面可以加两个下划线__表示非原子操作。非原子位图操作最好用 FD_SET, FD_CLR, FD_ISSET。 2.6Tag文件,用tag文件可以实现简单的trylock/unlock重入逻辑。 char *tagfile; open(tagfile, O_RDWR|O_CREAT|O_EXCL, 0666); /* trylock */ unlink(tagfile); /* unlock */ daemon里经常使用这种tag文件,并且在文件里记录自己的pid。在该pid文件存在时,还可以进一步检查里面的pid是否还在运行。 2.7Stdio。 2.8Futex 直接使用futex需要2.6 kernel支持。在有些特殊情况下可以考虑用futex代替mutex。 2.8.1内存考虑,mutex需要占24字节,而futex只需4个字节。 2.8.2简单的等待唤醒逻辑 2.8.3Futex可以转为文件句柄,并在上面poll唤醒事件 2.9海量稀疏锁 也就是需要上万个锁的情形,而且大部分事件只有少数上锁。以上各种锁中: 2.9.1Flock锁需要大量文件,显然不合适。 2.9.2fcntl文件锁可以用大量偏移量模拟大量锁,使用上非常方便。问题是在2.6 的kernel + 线程应用上无法使用。Fcntl文件锁在核心是在核心维护一张被锁定的区域表,锁定关系通过修改该表来实现。这导致 2.9.2.1修改该表的所有操作互斥的,也就是不同偏移量的lock和unlock操作无法并行处理。 2.9.2.2修改锁定范围表的操作也较复杂,CPU开销也大些。 2.9.2.3内存等等开销在于锁定表。也就是取决于同时锁定的数目,和总的锁数目无关。 2.9.3SYSV信号量 SYS的信号量缺省设置比较小。要作为海量锁要求需要修改系统配置。 2.9.3.1内存需求与锁数目成正比,大量锁的情况下核心大约需要8字节/锁。 2.9.3.2性能和速度都比文件锁要好。 2.9.3.3不能使用UNDO,没有crash自动解锁功能。 2.9.4Mutex 2.9.4.1内存消耗巨大 2.9.4.2非竞争条件小性能最好 2.9.4.3非NPTL下无法跨进程使用 2.9.4.4无crash自动解锁 2.9.5Futex 和mutex比内存消耗小写,只有4字节/锁。只是要求2.6的kernel。 2.9.6应用级黑名单锁 这种方法有点像在应用级模拟文件锁。也就是在共享内存中保存一份当前被锁定的黑名单。 class lockarray { private: pthread_mutex_t mutex; int lockid[NUM]; int waitnum[NUM]; pthread_cond_t wqueue[NUM]; public: lockarray() { /* 初始化 */ } ~lockarray(){ /* 释放资源 */ } void lock(int id) { pthread_mutex_lock(&mutex); if( id 在 lockid [i] 中) { while(1) { waitnum[i]++; pthread_cond_wait( &wqueue[i], &mutex); waitnum[i]--; if(waitnum[i]==0) { waitnum[i]=1; pthread_mutex_unlock(&mutex); return; } } } /* 查找waitnum[i] 为 0 */ lockid[i]=id; waitnum[i]=1 pthread_mutex_unlock(&mutex); } void unlock(int id) { pthread_mutex_lock(&mutex); if(id 在 lockid[i] 中) { waitnum[i]--; if(waitnum[i] > 0) pthread_cond_signal(&wqueue[i]); } pthread_mutex_unlock(&mutex); } }