计算机系统应用教程网站

网站首页 > 技术文章 正文

一文读懂C++ 异步编程 c++异步调用

btikc 2024-10-17 08:43:45 技术文章 13 ℃ 0 评论

一、异步编程简介

说到异步很多人可能会联想到线程编程,即那种直接操作线程的生命周期来管理一个任务。而本文要介绍的为不直接操作线程的生命周期而是管理线程上运行的任务来达到异步编程的目的。

在C++ 中涉及到异步的编程的主要是:

  1. future
  2. async
  3. promise
  4. packaged_task

下面本文将从简介、使用、源码角度、4者之间的关系逐一介绍。

二、future

2.1 简介

std::future提供了一种访问异步操作结果的机制。从字面意思来理解,它表示未来,这个名字非常贴切,因为一个异步操作我们是不可能马上就获取操作结果的,只能在未来某个时候获取,但是我们可以以同步等待的方式来获取结果,可以通过查询future的状态(future_status)来获取异步操作的结果。future_status有三种状态:

  1. deferred:异步操作还没开始
  2. ready:异步操作已经完成
  3. timeout:异步操作超时

获取future结果有三种方式:

  1. get:等待异步操作结束并返回结果
  2. wait:只是等待异步操作完成,没有返回值
  3. wait_for:超时等待返回结果。

2.1 使用

C++标准库使用std::future为一次性事件建模,如果一个事件需要等待特定的一次性事件,那么这线程可以获取一个future对象来代表这个事件。异步调用往往不知道何时返回,但是如果异步调用的过程需要同步,或者说后一个异步调用需要使用前一个异步调用的结果。这个时候就要用到future。线程可以周期性的在这个future上等待一小段时间,检查future是否已经ready,如果没有,该线程可以先去做另一个任务,一旦future就绪,该future就无法复位(无法再次使用这个future等待这个事件),所以future代表的是一次性事件。

std::future是一个类模板(class template),其对象存储未来的值,从一个异步调用的角度来说,future更像是执行函数的返回值,其模板参数就是期待返回的类型。模板源码如下:

template<typename ResultType>
class future
{
public:
  future() noexcept;
  future(future&&) noexcept;
  future& operator=(future&&) noexcept;
  ~future();
  
  future(future const&) = delete;
  future& operator=(future const&) = delete;
 
 
  bool valid() const noexcept;
  
  ResultType get();
  shared_future<ResultType> share();
 
  void wait();
 
  template<typename Rep,typename Period>
  future_status wait_for(
      std::chrono::duration<Rep,Period> const& relative_time);
 
  template<typename Clock,typename Duration>
  future_status wait_until(
      std::chrono::time_point<Clock,Duration> const& absolute_time);
};

成员函数

说明

构造函数:

  1. future() noexcept
  2. future(future const&) = delete;
  3. future(future&&) noexcept;
  1. 不带参数的默认构造函数,此对象没有共享状态,因此它是无效的,但是可以通过移动赋值的方式将一个有效的future值赋值给它;
  2. 禁用拷贝构造;
  3. 支持移动构造

析构函数


operator=

移动future对象 (公开成员函数) (1).禁用拷贝赋值。

(2).支持移动赋值:如果在调用之前,此对象是有效的(即它已经访问共享状态),则将其与先前已关联的共享状态解除关联。如果它是与先前共享状态关联的唯一对象,则先前的共享状态也会被销毁

share

从 *this 转移共享状态给 shared_future 并返回它 (公开成员函数)

get()

返回结果 (公开成员函数)

  1. 当共享状态就绪时,返回存储在共享状态中的值(或抛出异常)。
  2. 如果共享状态尚未就绪(即提供者尚未设置其值或异常),则该函数将阻塞调用的线程直到就绪。
  3. 当共享状态就绪后,则该函数将取消阻塞并返回(或抛出)释放其共享状态,这使得future对象不再有效,因此对于每一个future共享状态,该函数最多应被调用一次。(4).std::future<void>::get()不返回任何值,但仍等待共享状态就绪并释放它。
  4. 共享状态是作为原子操作(atomic operation)被访问

valid()

检查 future 是否拥有共享状态(公开成员函数)

wait()

等待结果变得可用

  1. 等待共享状态就绪。
  2. 如果共享状态尚未就绪(即提供者尚未设置其值或异常),则该函数将阻塞调用的线程直到就绪。
  3. 当共享状态就绪后,则该函数将取消阻塞并void返回

wait_for()

等待结果,如果在指定的超时间隔后仍然无法得到结果,则返回。

wait_until()

等待结果,如果在已经到达指定的时间点时仍然无法得到结果,则返回。

在C++中,std::future 通常与 std::async 、 std::packaged_task、std::promise 等异步机制一起使用,以便在后台执行任务。

示例一:

#include <iostream>
#include <future>
#include <thread>
 
int main()
{
    // 来自 packaged_task 的 future
    std::cout << "主线程 tid = " << std::this_thread::get_id() << std::endl;
    // 来自 packaged_task 的 future
    std::packaged_task<int()> task([](){
        std::cout << "\npackaged tid = " << std::this_thread::get_id() << std::endl;
        return 7;
    }); // 包装函数
    std::future<int> f1 = task.get_future();  // 获取 future
    std::thread(std::move(task)).detach(); // 在线程上运行
    
    
    // 来自 async() 的 future
    std::future<int> f2 = std::async(std::launch::async, [](){
        std::cout << "\nstd::async tid = " << std::this_thread::get_id() << std::endl;
        return 8;
    });
    
    // 来自 promise 的 future
    std::promise<int> p;
    std::future<int> f3 = p.get_future();
    std::thread( [&p]{
        std::cout << "std::promise tid = " << std::this_thread::get_id() << std::endl;
        p.set_value_at_thread_exit(9);
    }).detach();
    
    std::cout << "\nWaiting..." << std::flush;
    f1.wait();
    f2.wait();
    f3.wait();
    std::cout << "Done!\nResults are: "
              << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
    
    return 0;
}

示例一运行结果:

示例二:使用4个线程计算1+2+...+100 的结果,要求按如下顺序执行:

线程1 运行1+2+。。。+25

线程2 运行26+27+。。。+50;

线程3 运行50+51+。。。+75;

线程4 运行76+77+。。。+100。

最后将4个线程执行的结果累加输出。

std::future<int> f1 = std::async(std::launch::async, [](){
    std::cout << "\nstd::async f1 tid = " << std::this_thread::get_id() << std::endl;
    int result = 0;
    for(int i = 0; i <= 25; i ++) {
        result += i;
    }
    return result;
});

std::future<int> f2 = std::async(std::launch::async, [](){
    std::cout << "\nstd::async f2 tid = " << std::this_thread::get_id() << std::endl;
    int result = 0;
    for(int i = 26; i <= 50; i ++) {
        result += i;
    }
    return result;
});

std::future<int> f3 = std::async(std::launch::async, [](){
    std::cout << "\nstd::async f3 tid = " << std::this_thread::get_id() << std::endl;
    int result = 0;
    for(int i = 51; i <= 75; i ++) {
        result += i;
    }
    return result;
});

std::future<int> f4 = std::async(std::launch::async, [](){
    std::cout << "\nstd::async f4 tid = " << std::this_thread::get_id() << std::endl;
    int result = 0;
    for(int i = 76; i <= 100; i ++) {
        result += i;
    }
    return result;
});

int f1_result = f1.get();
int f2_result = f2.get();
int f3_result = f3.get();
int f4_result = f4.get();
int all_result = f1_result + f2_result + f3_result + f4_result;
std::cout<< " f1 result = "<<f1_result << " f2 result = "<<f2_result << " f3 result = " <<f3_result << " f4 result = " << f4_result <<
 " 总和 = "  <<all_result <<std::endl;

示例二结果:

2.3 异常处理

当异步操作抛出异常时,我们可以使用std::future::get来获取异常信息。例如:

try {  
        fut.get(); // 获取结果并处理异常 
     } 
catch (const std::exception& e) {  
    // 处理异常情况  
    }

通过调用fut.get()来获取异步操作的结果。如果异步操作抛出了异常,那么这个异常将被传递给调用get()的线程,我们可以通过捕获异常来处理这种情况。

三、async

3.1 简介

std::async 是 C++11 标准库中用于异步执行的函数,会返回一个 std::future 对象,以获取函数的执行结果。可用其在新线程中执行函数,也可以在当前线程中执行。std::async 的函数声明形式通常如下

template<class Function, class... Args>
std::future<std::invoke_result_t<std::decay_t<Function>,
                                 std::decay_t<Args>...>>
    async(std::launch policy, Function&& f, Args&&... args);

template<class Function, class... Args>
std::future<std::invoke_result_t<std::decay_t<Function>,
                                 std::decay_t<Args>...>>
    async(Function&& f, Args&&... args);
// 等同于以 std::launch::async | std::launch::deferred 策略调用上面的函数

函数说明:

  1. 函数参数:
  2. policy:以何种策略调用可调用对象 f,可以为以下三种:
  3. std::launch::async:以异步的方式调用 f,即必须另外开启专属的线程,在其上运行 f。
  4. std::launch::deferred:在返回的 std::future 上调用了非定时等待函数,即 wait 或者 get 时,才执行 f。
  5. std::launch::async | std::launch::deferred:可能异步运行 f 或者直到调用 wait 或者 get 时才运行,取决于系统的负载,无法人为控制。
  6. f:要调用的可调用对象。
  7. args:传递给 f 的参数。
  8. 函数返回值:

返回 std::future 对象,在其上调用 wait 可以等待 f 完成,调用 get 可以等待并获取 f 的返回值。 -

  1. 函数作用:

以异步或者同步的调用可调用对象 f,并可以通过返回的 std::future 对象获取 f 的返回值

注意:

如果没有任何对象接收 std::sync 的返回值,即使指定了 std::launch::async 策略,std::future 的析构函数也会阻塞直到整个调用完成

示例:

// 临时量即 std::future 的析构函数等待睡眠完成。
std::async(std::launch::async, []{ std::this_thread::sleep_for(10); });
// 在睡眠完成之前,本行代码不会得到运行
std::async(std::launch::async, []{ printf("xx"); });

3.2 使用

示例一:函数作为参数

int async_fuc1(int n, int& b)
{
    for (int i = 0; i < 3; ++i) {
        printf("async is running\n");
        ++n;
        ++b;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    printf("n is %d, b is %d\n", n, b);
    return n + b;
}

void test_async1() {
    int n = 0;
    int b = 0;
    // n 按值传递,b 按引用传递
    auto f = std::async(std::launch::async, async_fuc1, n, std::ref(b));
    int ret = f.get();
    printf("196----result is %d\n", ret);
}

int main() {
    test_async1()
    return 0;
}

示例一结果:

示例2 lamba 作为参数

/**
 * 测试async 使用lamba 作为参数
 */
void test_async2() {
    auto f = std::async([]() -> int {
        for (int i = 0; i < 3; ++i) {
            printf("async is running\n");
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        return 100;
    });
    int ret = f.get();
    printf("result is %d\n", ret);
}

示例二结果

std:async 使用的场景非常丰富其他参数就不一一举例了,如果遇到不能直接作为参数的对象可通过std:function 进行包装然后将std:function 作为std:async 的参数即可。如下:

示例3 std::function 作为参数
std::function<int(void)> func = []() -> int {
        for (int i = 0; i < 3; ++i) {
            printf("async is running\n");
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        return 100;
    };
    auto f = std::async(func);
    int ret = f.get();
    printf("result is %d\n", ret);

3.3 异常处理

// 抛出异常
void task1() {
    throw std::runtime_error("An error occurred in task1()");
}
 
int main() {
    try {
        // 使用 std::async 启动一个异步任务
        auto future1 = std::async(std::launch::async, task1);
 
        // 等待异步任务的完成并获取结果
        future1.get(); // 这里会抛出异常
    } catch (const std::exception& e) {
        std::cerr << "Caught exception: " << e.what() << std::endl;
    }
 
    return 0;
}
 
//输出内容:
Caught exception:An error occurred in task1()

四、promise

4.1 简介

std::promise 是 C++11 标准库中提供的异步编程工具之一,它通常与 std::future 一起使用,用来实现线程间的异步通信。promise 提供了一个承诺(promise),表示在某个时间点一定会有一个值或一个异常被设置。

promise 可以在一个线程中设置一个值,而另一个线程中可以通过 std::future 来访问这个值。通常的做法是,创建一个 promise 对象,然后通过 promise 对象获取一个 future 对象,将 future 对象传递到另一个线程中去,另一个线程将值或异常设置到 promise 对象中,随后原线程可以通过 future 对象来获取值或异常。

注意std::promise对象只能使用一次

4.2 使用

示例:在主函数(主线程)中声明 promise 及其对应的 future,在某个线程中传入 promise 并进行运算

// 异步求和
void Accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> accumulate_promise)
{
    int sum = std::accumulate(first, last, 0);
    accumulate_promise.set_value(sum);  // Notify future
}

// 线程1设置值
void Do_work1(std::promise<int> &barrier)
{
    //为了突出效果,可以使线程休眠5s
    std::this_thread::sleep_for(std::chrono::seconds(3));
    int iVal = 233;
    std::cout << "传入数据(int):" << iVal << std::endl;
    barrier.set_value(iVal); // 设置共享状态的值, 此处和线程new_work_thread保持同步.

}

// 线程2获取值
void Do_work2(std::future<int> &fun)
{
    //阻塞函数,直到收到相关联的std::promise对象传入的数据
    auto iVal = fun.get();    //iVal = 233
    std::cout << "收到数据(int):" << iVal << std::endl;
}

void test_promise()
{
    // 演示如何使用promise<int>在线程之间传输结果
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::promise<int> accumulate_promise; // 主线程生成一个 std::promise<int> 对象.
    std::future<int> accumulate_future = accumulate_promise.get_future(); // 和 future 关联.
    std::thread work_thread(Accumulate, numbers.begin(), numbers.end(),
                            std::move(accumulate_promise)); // 求和

    // future::get() will wait until the future has a valid result and retrieves it.
    // Calling wait() before get() is not needed
    // accumulate_future.wait();  // wait for result
    std::cout << "result=" << accumulate_future.get() << '\n'; // 异步获取最后的结果
    work_thread.join();  // wait for thread completion

    //演示如何使用promise<int>在线程之间表示状态。
    std::promise<int> barrier; // 生成一个 std::promise<int> 对象.
    std::future<int> barrier_future = barrier.get_future(); // 和 future 关联.
    std::thread new_work1_thread(Do_work1, std::ref(barrier));
    std::thread new_work2_thread(Do_work2, std::ref(barrier_future));

    new_work1_thread.join();
    new_work2_thread.join();
}

运行结果:

std:promise 与std:future 关系及源码分析见:https://zhuanlan.zhihu.com/p/672327290

4.3 异常处理

参考future 的异常处理

五、std:packge_task

参考:https://www.toutiao.com/article/7330567831136240140/?log_from=696a968e7e36a_1724656810109

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表