Introduction to Parallel Programming with Pthreads

周旨

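Common parallel programming models

  • Manager/Worker: a manager thread assigns tasks to the worker threads and handles input and output. There are two variants: a static worker pool and a dynamic worker pool.
  • Pipeline: like an assembly line, each thread handles a different task, and the tasks are carried out in a fixed order.
  • Peer: similar to manager/worker, except that after creating the other threads the manager also takes part in the work.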

Definition of thread safety

Thread safety refers to an application's ability to execute multiple threads at the same time without "clobbering" shared data or creating "race" conditions.

Thread limits

The Pthreads API is an ANSI/IEEE standard, but wherever the standard leaves something unspecified, implementations may differ.

Pthread API

The API falls into four groups:

Group | Purpose
thread management | Create, detach, and join threads; get and set thread attributes.
mutex | Routines that deal with synchronization ("mutex" is an abbreviation for "mutual exclusion"): create, destroy, lock, and unlock mutexes.
condition variable | Routines that address communication between threads that share a mutex.
synchronization | Routines that manage read/write locks and barriers.

Every source file that uses the Pthreads library should include pthread.h.

Thread Management

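Creating and terminating threads

pthread_create (thread,attr,start_routine,arg);// thread: address of the thread object; attr: thread attributes object; start_routine: the routine the thread runs once it starts; arg: the argument passed to that routine
pthread_exit (status);// returns a status code; this routine does not close files, so any file opened inside the thread stays open after the thread terminates
pthread_cancel (thread);
pthread_attr_init (attr);
pthread_attr_destroy (attr);

pthread_attr_init initializes a pthread_attr_t object, and pthread_attr_destroy destroys one.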

There are several ways in which a thread may be terminated:

  1. The thread returns normally from its starting routine. Its work is done.
  2. The thread makes a call to the pthread_exit subroutine - whether its work is done or not.
  3. The thread is canceled by another thread via the pthread_cancel routine.
  4. The entire process is terminated by a call to either exec() or exit().
  5. main() finishes first without explicitly calling pthread_exit itself.

There is a definite problem if main() finishes before the threads it spawned and it has not called pthread_exit() explicitly: all of the threads it created will terminate, because main() is done and no longer exists to support them.

By having main() explicitly call pthread_exit() as the last thing it does, main() will block and be kept alive to support the threads it created until they are done.

example 1

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define NUM_THREADS    5

void *PrintHello(void *threadid)
{
   long tid;
   tid = (long)threadid;
   printf("Hello World! It's me, thread #%ld!\n", tid);
   pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
   pthread_t threads[NUM_THREADS];
   int rc;
   long t;
   for(t=0;t<NUM_THREADS;t++){
     printf("In main: creating thread %ld\n", t);
     rc = pthread_create(&threads[t], NULL, PrintHello, (void *)t);
     if (rc){
       printf("ERROR; return code from pthread_create() is %d\n", rc);
       exit(-1);
       }
     }

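   /* Last thing that main() should do. No other synchronization is used above,
      so without this call main() could finish before the threads it created,
      which causes problems; pthread_exit() keeps main() alive until they finish. */
   pthread_exit(NULL);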
}
Questions
  1. What's the difference between exit and pthread_exit?

    According to Stack Overflow, exit performs normal program termination for the entire process, while pthread_exit terminates only the calling thread, whether its work is done or not.

example 2

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>   /* for sleep() */
#define NUM_THREADS    8

char *messages[NUM_THREADS];

void *PrintHello(void *threadid)
{
   long taskid;

   sleep(1);
   taskid = (long) threadid;
   printf("Thread %ld: %s\n", taskid, messages[taskid]);
   pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
pthread_t threads[NUM_THREADS];
long taskids[NUM_THREADS];
int rc, t;

messages[0] = "English: Hello World!";
messages[1] = "French: Bonjour, le monde!";
messages[2] = "Spanish: Hola al mundo";
messages[3] = "Klingon: Nuq neH!";
messages[4] = "German: Guten Tag, Welt!"; 
messages[5] = "Russian: Zdravstvuyte, mir!";
messages[6] = "Japan: Sekai e konnichiwa!";
messages[7] = "Latin: Orbis, te saluto!";

for(t=0;t<NUM_THREADS;t++) {
  taskids[t] = t;
  printf("Creating thread %d\n", t);
  rc = pthread_create(&threads[t], NULL, PrintHello, (void *) taskids[t]);
  if (rc) {
    printf("ERROR; return code from pthread_create() is %d\n", rc);
    exit(-1);
    }
  }

pthread_exit(NULL);
}

Because PrintHello calls sleep(1), in most runs the "Creating thread" lines are printed first.

example 3

How to pass multiple arguments:

/******************************************************************************
* FILE: hello_arg2.c
* DESCRIPTION:
*   A "hello world" Pthreads program which demonstrates another safe way
*   to pass arguments to threads during thread creation.  In this case,
*   a structure is used to pass multiple arguments.
* AUTHOR: Blaise Barney
* LAST REVISED: 01/29/09
******************************************************************************/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>   /* for sleep() */
#define NUM_THREADS    8

char *messages[NUM_THREADS];

struct thread_data
{
   int    thread_id;
   int  sum;
   char *message;
};

struct thread_data thread_data_array[NUM_THREADS];

void *PrintHello(void *threadarg)
{
   int taskid, sum;
   char *hello_msg;
   struct thread_data *my_data;

   sleep(1);
   my_data = (struct thread_data *) threadarg;
   taskid = my_data->thread_id;
   sum = my_data->sum;
   hello_msg = my_data->message;
   printf("Thread %d: %s  Sum=%d\n", taskid, hello_msg, sum);
   pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
pthread_t threads[NUM_THREADS];
int *taskids[NUM_THREADS];
int rc, t, sum;

sum=0;
messages[0] = "English: Hello World!";
messages[1] = "French: Bonjour, le monde!";
messages[2] = "Spanish: Hola al mundo";
messages[3] = "Klingon: Nuq neH!";
messages[4] = "German: Guten Tag, Welt!"; 
messages[5] = "Russian: Zdravstvytye, mir!";
messages[6] = "Japan: Sekai e konnichiwa!";
messages[7] = "Latin: Orbis, te saluto!";

for(t=0;t<NUM_THREADS;t++) {
  sum = sum + t;
  thread_data_array[t].thread_id = t;
  thread_data_array[t].sum = sum;
  thread_data_array[t].message = messages[t];
  printf("Creating thread %d\n", t);
  rc = pthread_create(&threads[t], NULL, PrintHello, (void *) 
       &thread_data_array[t]);
  if (rc) {
    printf("ERROR; return code from pthread_create() is %d\n", rc);
    exit(-1);
    }
  }
pthread_exit(NULL);
}

example 4:

An incorrect way to pass arguments: main changes t while the threads are still reading it!

/*****************************************************************************
* FILE: hello_arg3.c
* DESCRIPTION:
*   This "hello world" Pthreads program demonstrates an unsafe (incorrect)
*   way to pass thread arguments at thread creation.  In this case, the
*   argument variable is changed by the main thread as it creates new threads.
* AUTHOR: Blaise Barney
* LAST REVISED: 07/16/14
******************************************************************************/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>   /* for sleep() */
#define NUM_THREADS     8

void *PrintHello(void *threadid)
{
   long taskid;
   sleep(1);
   taskid = *(long *)threadid;
   printf("Hello from thread %ld\n", taskid);
   pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
pthread_t threads[NUM_THREADS];
int rc;
long t;

for(t=0;t<NUM_THREADS;t++) {
  printf("Creating thread %ld\n", t);
  rc = pthread_create(&threads[t], NULL, PrintHello, (void *) &t);
  if (rc) {
    printf("ERROR; return code from pthread_create() is %d\n", rc);
    exit(-1);
    }
   }

pthread_exit(NULL);
}

Joining and Detaching Threads

pthread_join (threadid,status)
pthread_detach (threadid)
pthread_attr_setdetachstate (attr,detachstate)
pthread_attr_getdetachstate (attr,detachstate)

Joining is one way to synchronize threads. pthread_join (threadid,status) blocks the calling thread until the thread identified by threadid terminates.


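A joining thread can match one pthread_join() call. It is a logical error to attempt multiple joins on the same thread.

There are two other synchronization methods, mutexes and condition variables, which are discussed later.

Can every thread be joined?

No. When a thread is created, one of its attributes determines whether it is joinable or detached. Only joinable threads can be joined; a detached thread cannot. The POSIX standard specifies that threads should be created as joinable.

To create a thread explicitly as joinable, pass an attr argument to pthread_create:

  1. Declare a pthread attribute variable of the pthread_attr_t data type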
  2. Initialize the attribute variable with pthread_attr_init()
  3. Set the attribute detached status with pthread_attr_setdetachstate()
  4. When done, free library resources used by the attribute with pthread_attr_destroy()

Detaching

pthread_detach() can turn a thread that was created joinable into a detached one, but the change is irreversible: there is no way to make a detached thread joinable again.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#define NUM_THREADS    4

void *BusyWork(void *t)
{
    int i;
    long tid;
    double result=0.0;
    tid = (long)t;
    printf("Thread %ld starting...\n",tid);
    for (i=0; i<1000000; i++)
    {
        result = result + sin(i) * tan(i);
    }
    printf("Thread %ld done. Result = %e\n",tid, result);
    pthread_exit((void*) t);
}

int main (int argc, char *argv[])
{
    pthread_t thread[NUM_THREADS];
    pthread_attr_t attr;
    int rc;
    long t;
    void *status;

    /* Initialize and set thread detached attribute */
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    for(t=0; t<NUM_THREADS; t++) {
        printf("Main: creating thread %ld\n", t);
        rc = pthread_create(&thread[t], &attr, BusyWork, (void *)t);
        if (rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    /* Free attribute and wait for the other threads */
    pthread_attr_destroy(&attr);
    for(t=0; t<NUM_THREADS; t++) {
        rc = pthread_join(thread[t], &status);
        if (rc) {
            printf("ERROR; return code from pthread_join() is %d\n", rc);
            exit(-1);
        }
        printf("Main: completed join with thread %ld having a status of %ld\n",t,(long)status);
    }

    printf("Main: program completed. Exiting.\n");
    pthread_exit(NULL);
}

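The "Main: completed join with thread ..." lines are always printed in the order thread 0, 1, 2, 3, because main joins the threads in that order. As I understand it, join simply makes the caller wait for one specific thread to finish.

Note that the second parameter of pthread_join is a void **, so when passing a void * variable you must take its address with &.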

Stack Management

pthread_attr_getstacksize (attr, stacksize)
pthread_attr_setstacksize (attr, stacksize)
pthread_attr_getstackaddr (attr, stackaddr)
pthread_attr_setstackaddr (attr, stackaddr)

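The POSIX standard does not dictate the size of a thread's stack, so it varies from one implementation to another.

It is easy to exceed the default stack size, at which point the program terminates. A safe program should not depend on the default stack size; it should use pthread_attr_setstacksize to explicitly allocate enough stack for each thread.

pthread_attr_getstackaddr (attr, stackaddr) and pthread_attr_setstackaddr (attr, stackaddr) are useful when a thread's stack must be placed in a particular region of memory. (Both were removed in POSIX.1-2008 in favor of pthread_attr_getstack and pthread_attr_setstack.)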

Miscellaneous Routines

pthread_self ()//pthread_self returns the unique, system assigned thread ID of the calling thread. 
pthread_equal (thread1,thread2) //pthread_equal compares two thread IDs. If the two IDs are different 0 is returned, otherwise a non-zero value is returned.
pthread_once (once_control, init_routine) 

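pthread_self() returns the thread ID of the calling thread.

A thread is represented by a pthread_t object, an opaque type that holds the thread ID, so two pthread_t objects should not be compared with ==. Call pthread_equal(thread1, thread2) to compare two threads instead.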

pthread_once executes the init_routine exactly once in a process. The first call to this routine by any thread in the process executes the given init_routine, without parameters. Any subsequent call will have no effect.

The once_control parameter is a synchronization control structure that requires initialization prior to calling pthread_once. For example: pthread_once_t once_control = PTHREAD_ONCE_INIT;

So what does once_control actually mean?

Mutex Variables

pthread_mutex_lock (mutex)
pthread_mutex_trylock (mutex)
pthread_mutex_unlock (mutex) 

The pthread_mutex_lock() routine is used by a thread to acquire a lock on the specified mutex variable. If the mutex is already locked by another thread, this call will block the calling thread until the mutex is unlocked.

pthread_mutex_trylock() will attempt to lock a mutex. However, if the mutex is already locked, the routine will return immediately with a "busy" error code. This routine may be useful in preventing deadlock conditions, as in a priority-inversion situation.

pthread_mutex_unlock() will unlock a mutex if called by the owning thread. Calling this routine is required after a thread has completed its use of protected data if other threads are to acquire the mutex for their work with the protected data. An error will be returned if:

  1. The mutex was already unlocked
  2. The mutex is owned by another thread

Questions

  1. What is a race condition?

    A race condition is any case where the results can be different depending on the order that processes arrive or are scheduled or depending on the order that specific competing instructions are executed.

    In other words, the program's correctness depends on the scheduling order: a different schedule, or different arrival times, may produce a different result.

Steps for using a mutex

  1. Create and initialize a mutex variable
  2. Several threads attempt to lock the mutex
  3. Only one succeeds and that thread owns the mutex
  4. The owner thread performs some set of actions
  5. The owner unlocks the mutex
  6. Another thread acquires the mutex and repeats the process
  7. Finally the mutex is destroyed

Create and Destroy Mutexes

pthread_mutex_init (mutex,attr)// initializes a pthread_mutex_t object dynamically. A mutex can also be initialized statically: pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER

pthread_mutex_destroy (mutex)

pthread_mutexattr_init (attr)

pthread_mutexattr_destroy (attr) 

The attr object describes properties of the mutex. The Pthreads standard defines three optional mutex attributes:

  • Protocol: specifies the protocol used to prevent priority inversions for a mutex.
  • Prioceiling: specifies the priority ceiling of a mutex.
  • Process-shared: specifies the process sharing of a mutex.

Not every implementation provides all three attributes.

Condition Variables

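Condition variables are another way for threads to synchronize. Mutexes implement synchronization by controlling thread access to data, whereas condition variables let threads synchronize based on the actual value of data.

Condition variables avoid polling.

A condition variable is always used together with a mutex lock.

Further reading

Pthreads tutorial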
