Linux的线程 | 骤雨落宿命敲
Cobb 729 1

创建线程

Linux的线程 | 骤雨落宿命敲

知识储备: 编译时链接库: g++ XXX.cc -lpthread
库的位置在: /usr/lib64/libpthread.so

在Linux的内核上看进程和线程,实际上没有区别,因为调用的都是clone函数。 进程和线程都有PCB,线程没有自己独立的用户空间,共享父进程的用户空间。 线程间共享数据是非常方便的 .data .heap

问题:Linux内核调度的是进程还是线程? 进程和线程都是通过clone创建的,都有PCB,都可以受内核调度,统称Task任务。

多线程相对多进程的优势:任务调度时,上下文切换做的事情少很多,例如切换地址空间,多线程调度时不一定要切换,而进程调度时,一定会切换的。


// getpid()
// 线程的入口函数
void* th1(void *arg)
{
    // pthread_self打印线程的tid
    cout << "child tid:" << pthread_self() << endl;
    sleep(3);
    cout << "child done..." << endl;
    return NULL;
}

// 当前进程 => 主线程(入口函数是main)
int main()
{
    cout << "main tid:" << pthread_self() << endl;

    pthread_t tid;
    // 创建新线程并直接启动线程
    pthread_create(&tid, NULL, th1, NULL);

    // 等待子线程tid执行完
    pthread_join(tid, NULL); // fork wait

    cout << "main done..." << endl;
}

pthread_detach 分离线程

小启发: 线程池中用到的:先创建对象,再启动线程。 先创建线程对象make_shared(this), 再启动线程(迭代器遍历,创建线程pthread_create) 这样就可以,有多少个对象就启动多少个线程了

面试问题:在线程里面调用exit(0),请问是退出线程了,还是整个进程全部退出了? 整个进程都结束了


// getpid()
// 线程的入口函数
void* th1(void *arg)
{
    // pthread_self打印线程的tid
    cout << "child tid:" << pthread_self() << endl;
    sleep(3);
    cout << "child done..." << endl;

    // 问题:在线程里面调用exit(0),请问是退出线程了,还是整个进程全部退出了?
    // 整个进程都结束了
    exit(0); 

    return NULL;
}

// 当前进程 => 主线程(入口函数是main)
int main()
{
    cout << "main tid:" << pthread_self() << endl;

    pthread_t tid;
    // 创建新线程并直接启动线程
    pthread_create(&tid, NULL, th1, NULL);

    // 设置分离线程,线程执行完,可以自动回收资源
    pthread_detach(tid);

    // 等待子线程tid执行完
    // pthread_join(tid, NULL); // fork wait
    sleep(20);

    cout << "main done..." << endl;
}

mutex 互斥锁

Linux的线程 | 骤雨落宿命敲

知识储备: void* th(void*) { for (int i = 0; i < 1000; i++) { count++; } }

for循环执行指令; 线程1: mov exa, count add eax, 1 mov count, eax

线程2: mov exa, count add eax, 1 mov count, eax

分析:这几行指令,完成++操作,不是原子操作 会产生竞态条件,临界区代码段无法保证原子操作。

竞态条件:程序在线程运行下执行,随着线程调度的顺序不同,而出现不同的结果。 临界区代码段:产生竞态条件这个for循环就是临界区了。

利用智能指针思想,出作用域自动析构,封装一下互斥锁,进行使用

lock.h

#ifndef _LOCK_H_
#define _LOCK_H_

#include <pthread.h>

// 封装了互斥锁的常用操作
class Mutex
{
public:
    Mutex();
    ~Mutex();
    void Lock();
    void UnLock();
private:
    pthread_mutex_t mutex_;
};

// 类似智能指针原理,通过构造和析构自动加解锁, 使用同一把锁
class MutexLock
{
public:
    MutexLock(Mutex &mutex);
    ~MutexLock();
private:
    Mutex &mutex_;
};

#endif

lock.cc


#include "lock.h"

Mutex::Mutex()
{
    pthread_mutex_init(&mutex_, NULL);
}

Mutex::~Mutex()
{
    pthread_mutex_destroy(&mutex_);
}

void Mutex::Lock()
{
    pthread_mutex_lock(&mutex_);
}

void Mutex::UnLock()
{
    pthread_mutex_unlock(&mutex_);
}

MutexLock::MutexLock(Mutex &mutex) :mutex_(mutex)
{
    mutex_.Lock();
}

MutexLock::~MutexLock()
{
    mutex_.UnLock();
}

main.cc


#include <pthread.h>
#include <iostream>
#include <vector>
#include "lock.h"

using namespace std;

// 全局的互斥锁
// pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
Mutex mutex;

// .data的数据是当前进程所有线程所共享的
int count = 0; 
// 线程的入口函数
void* th(void*)
{
    for (int i = 0; i < 1000; i++)
    {
        // 通过互斥锁保证了临界区代码段 - 原子操作
        // 加互斥锁
        //pthread_mutex_lock(&mutex);
        {
            MutexLock lock(mutex);
            count++;
        }
        // 释放互斥锁
        //pthread_mutex_unlock(&mutex);
    }
}
int main()
{
    // 创建10个线程,每个线程对count++ 1千次
    vector<pthread_t> tids;
    tids.resize(10);
    for (int i = 0; i < 10; i++)
    {
        pthread_create(&tids[i], NULL, th, NULL);
    }

    // 主线程要等待所有子线程执行完
    for (int i = 0; i < 10; i++)
    {
        pthread_join(tids[i], NULL);
    }

    // 释放互斥锁的资源
    //pthread_mutex_destroy(&mutex);

    cout << "count:" << count << endl;
}

窗口售票问题

了解一下函数sem_wait/sem_post的用法

亲爱的朋友,别灰心,下滑到main函数。


#include <iostream>
#include <pthread.h>
#include <string>
//#include "lock.h"
#include <unistd.h>

using namespace std;


int tickets = 100; // 100张车票
//Mutex mutex;
pthread_mutex_t mutex;

void* sell_ticket(void *arg)
{
    // 0x2 -> 2
    int no = (int)arg;

    while (tickets > 0)
    {
        {
            //MutexLock lock(mutex);
            //sem_wait(&sem); 二元信号量在这里也可以做线程互斥操作,信号量初始化资源计数为1
            pthread_mutex_lock(&mutex);
            if (tickets > 0)
            {
                cout << "no:" << no << " sell " << tickets << endl;
                tickets--;
            }
            pthread_mutex_unlock(&mutex);
            //sem_post(&sem);
        }
        usleep(10000);
    }
}

/*
模拟三个窗口同时卖票,要求:
1.同一张车票不能重复卖
2.车票不能跳着卖
3.不能卖0 -1张车票
4.每个窗口都要卖票(睡眠100毫秒)

第xxx个窗口卖 100票
第xxx个窗口卖 99票
第xxx个窗口卖 98票
第xxx个窗口卖 97票
*/
int main()
{
    pthread_t tids[3];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(&tids[i], NULL, sell_ticket, (void*)(i+1)); // 2 => 0x2
    }

    for (int i = 0; i < 3; i++)
    {
        pthread_join(tids[i], NULL);
    }
}

semaphore 信号量



#include <pthread.h>
#include <iostream>
#include <queue>
#include <semaphore.h> // 信号量
#include <unistd.h>

/*
semaphore:信号量  => 线程通信
看作引用计数不设上限的资源

mutex:互斥锁 => 线程互斥
引用计数为0或者1的资源
*/
sem_t sem;
pthread_mutex_t mutex;

using namespace std;

// 生产者往这里面生产数据,消费者从这里面消费数据
queue<int> que;

// 生产者线程
void* producer(void* arg)
{
    for (int i = 1; i <= 10; i++ )
    {
        pthread_mutex_lock(&mutex);
        que.push(i);
        cout << "produce: " << i << endl;
        pthread_mutex_unlock(&mutex);
        sem_post(&sem); // 给sem资源计数+1
    }
    return NULL;
}

// 消费者线程
void* consumer(void* arg)
{
    for (int i = 1; i <= 10; i++ )
    {
        sem_wait(&sem); // sem资源为0,阻塞线程;sem大于0,往下执行,sem资源计数-1
        pthread_mutex_lock(&mutex);
        int no = que.front();
        que.pop();
        cout << "consume: " << no << endl;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main()
{
    // 信号量初始化
    sem_init(&sem, false, 0);
    pthread_mutex_init(&mutex, NULL);

    pthread_t t1, t2;
    pthread_create(&t1, NULL, producer, NULL);
    pthread_create(&t2, NULL, consumer, NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    // 销毁信号量
    sem_destroy(&sem);
    pthread_mutex_destroy(&mutex);
}

cond 条件变量

经典并发同步模式:生产者-消费者设计模式


#include <pthread.h>
#include <iostream>
#include <queue>
#include <semaphore.h> // 信号量
#include <unistd.h>

/*
semaphore:信号量  => 线程通信
看作引用计数不设上限的资源

mutex:互斥锁 => 线程互斥
引用计数为0或者1的资源

线程通信 => 条件变量
*/
pthread_mutex_t mutex;
pthread_cond_t cond;

using namespace std;

// 生产者往这里面生产数据,消费者从这里面消费数据
queue<int> que;

// 生产者线程
void* producer(void* arg)
{
    for (int i = 1; i <= 10; i++ )
    {
        pthread_mutex_lock(&mutex);

        while (que.size() > 0)
        {
            // 条件变量不成立,线程阻塞,把mutex释放掉
            // 如果consumer使cond成立,该线程唤醒,抢到mutex锁,再继续往下执行
            pthread_cond_wait(&cond, &mutex);
        }

        que.push(i);
        cout << "produce: " << i << endl;
        // 通知消费者赶紧消费
        pthread_cond_signal(&cond);

        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

// 消费者线程
void* consumer(void* arg)
{
    for (int i = 1; i <= 10; i++ )
    {
        pthread_mutex_lock(&mutex);

        while (que.size() == 0)
        {
            // 条件变量不成立,线程阻塞,把mutex释放掉
            // 如果producer使cond成立,该线程唤醒,抢到mutex锁,再继续往下执行
            pthread_cond_wait(&cond, &mutex);
        }

        int no = que.front();
        que.pop();
        cout << "consume: " << no << endl;
        // 消费完了马上通知producer线程开始生产
        pthread_cond_signal(&cond);

        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main()
{
    // 信号量初始化
    pthread_cond_init(&cond, NULL);
    pthread_mutex_init(&mutex, NULL);

    pthread_t t1, t2;
    pthread_create(&t1, NULL, producer, NULL);
    pthread_create(&t2, NULL, consumer, NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    // 销毁信号量
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
}

同步生产和消费(生产一个立即消费)


#include <pthread.h>
#include <iostream>
#include <queue>
#include <semaphore.h> // 信号量
#include <unistd.h>

/*
semaphore:信号量  => 线程通信
看作引用计数不设上限的资源

mutex:互斥锁 => 线程互斥
引用计数为0或者1的资源

线程通信 => 条件变量
*/
pthread_mutex_t mutex;
pthread_cond_t cond;

using namespace std;

// 生产者往这里面生产数据,消费者从这里面消费数据
queue<int> que;

// 生产者线程
void* producer(void* arg)
{
    for (int i = 1; i <= 10; i++ )
    {
        pthread_mutex_lock(&mutex);

        while (que.size() > 0)
        {
            pthread_cond_wait(&cond, &mutex);
        }

        que.push(i);
        cout << "produce: " << i << endl;
        // 通知消费者赶紧消费
        pthread_cond_broadcast(&cond);

        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

// 消费者线程
void* consumer(void* arg)
{
    for (int i = 1; i <= 5; i++ )
    {
        pthread_mutex_lock(&mutex);

        while (que.size() == 0)
        {
            // 条件变量不成立,线程阻塞,把mutex释放掉
            // 如果producer使cond成立,该线程唤醒,抢到mutex锁,再继续往下执行
            pthread_cond_wait(&cond, &mutex);
        }

        int no = que.front();
        que.pop();
        cout << "consume: " << no << endl;
        // 消费完了马上通知producer线程开始生产
        pthread_cond_broadcast(&cond);

        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main()
{
    // 信号量初始化
    pthread_cond_init(&cond, NULL);
    pthread_mutex_init(&mutex, NULL);

    pthread_t t1, t2, t3;
    pthread_create(&t1, NULL, producer, NULL);
    pthread_create(&t2, NULL, consumer, NULL);
    pthread_create(&t3, NULL, consumer, NULL);

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);

    // 销毁信号量
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);
}
评论区

索引目录