学习future.then这个优雅的粘合剂与协程搭配形成框架的过程

future-state

通常情况下,一个promise仅返回一次future,promise对象会在未来异步操作时set值或者异常,future可以在恰当时候进行get操作。 能达成这样的协作,是因为这对promise/future一起维护了一个数据结构future-statetemplate <typename... T>struct future_state {};,看起来是可以同时存放多个值。 还有个无值的特化 template<> struct future_state<>{},内部进行优化删掉了保存数据的成员。

future-state被设计保存泛型值和异常。对值类型要求符合is_nothrow_copy_constructible, is_nothrow_move_constructible, is_nothrow_destructible ,我们在定义类时应该养成将这big three 都实现为noexcept的习惯,标准库的很多函数会根据类的对应函数是否是noexcept,为实现强异常安全而选择更耗时的操作。
future-state内部采用union any{}封装了值和异常。这里就要了解下c++11后增强的unrestricted union。c++11前union的成员只能是POD类型。现在允许成员变量拥有non-trivial default/copy/move ctor, copy/move assignment, desctor,不过union将禁用对应自己的ctor、assignment、desctor,需要实现者明确实现。

union test_unrestricted_union_a {  
    int i;
    string s; 
    test_unrestricted_union_a() { new(&s)string("hello"); }
    ~test_unrestricted_union_a() { s.~string(); }
    test_unrestricted_union_a(string str) {
        new(&s)string(std::move(str));
    }
};

int main(int argc, const char * argv[]) {  
    test_unrestricted_union_a t, t2(" world");
    cout << t.s << t2.s << endl; // hello world
    return 0;
}

相比上面的代码,future-state内部增加了enum class来表示any内部的数据类型。

template <typename... T>  
struct future_state {  
    enum class state {
         invalid,
         future,
         result,
         exception,
    } _state = state::future;
    union any {
        any() {}
        ~any() {}
        std::tuple<T...> value;
        std::exception_ptr ex;
    } _u;
    future_state() noexcept {}
    future_state(future_state&& x) noexcept
            : _state(x._state) {
        switch (_state) {
        case state::future:
            break;
        case state::result:
            new (&_u.value) std::tuple<T...>(std::move(x._u.value));
            x._u.value.~tuple();
            break;
        case state::exception:
            new (&_u.ex) std::exception_ptr(std::move(x._u.ex));
            x._u.ex.~exception_ptr();
            break;
        case state::invalid:
            break;
        default:
            abort();
        }
        x._state = state::invalid;
    }

future对象生命周期内可能处于四种状态:无效、正常/待定、已决、异常。future由于特性不能复制,只能move,被move后的对象是无效状态。

有点值得留意的地方,因为future与then的结合,流程上充满了右值和move,future-state 利用c++11后增加的成员函数引用修饰符对相应函数进行重载overload优化。

std::tuple<T...> get() &&; // *this为右值时调用,进行优化、move操作  
std::tuple<T...> get() const&; // *this 为左值时调用  

在future.then框架下的代码大量如some_func().then([]{}).then(...).then(...)..,上一个代码段返回的future是下一个then的输入,一直不断通过右值移动传递到末端。

class task {  
public:  
    virtual ~task() noexcept {}
    virtual void run() noexcept = 0;
};
template <typename Func>  
class lambda_task final : public task {  
    Func _func;
public:  
    lambda_task(const Func& func) : _func(func) {}
    lambda_task(Func&& func) : _func(std::move(func)) {}
    virtual void run() noexcept override { _func(); }
};
template <typename Func, typename... T>  
struct continuation final : task {  
    continuation(Func&& func, future_state<T...>&& state) : _state(std::move(state)), _func(std::move(func)) {}
    continuation(Func&& func) : _func(std::move(func)) {}
    virtual void run() noexcept override {
        _func(std::move(_state));
    }
    future_state<T...> _state;
    Func _func;
};

task是个抽象基类,promise会携带一个task,当promise被set值或异常后被安排执行。

future-state通常只保存在这下面3个位置中的一个:

  • promise生存期间,并且是在.then()调用之前。
  • 在.then被调用后的task内
  • 在一个没通过promise得到的future内,比如直接用makereadyfuture();promise析构后,转移到future内。

future-state类主要的操作就是对union any进行操作,set/get值/异常、获取状态、判断是否完成、转移内部数据到promise.

promise

promise保存值或异常以便它产生的future可以在将来异步获取到。与future,future-state一样,promise不支持值拷贝,只能移动。

template <typename... T>  
class promise {  
    enum class urgent { no, yes };
    future<T...>* _future = nullptr;
    future_state<T...> _local_state;
    future_state<T...>* _state;
    std::unique_ptr<task> _task;
}

promise关键成员变量有4个。枚举类urgent用来区分处理不同future-state时的处理方式:正常值时添加到协程消息队列的尾端、异常时添加到协程消息队列的队首。
_future为此promise关联future. 通过get_future()获取,该函数仅能调用一次。 _state默认为_local_state的地址。如果前者与后者地址不同,则有效的是_state。 而2者不等时发生在promise的schedule方法,它可以接收外部函数对象,生成一个continuation,并绑定该task的future-state. promise析构或主动放弃与future的关联时,state所指future-state将被move到future结构内的_local_state
当promise被执行set
value/set_exception后内部future-state被修改时,如果_task有效,将安排task进入协程消息队列。

futurize

futurize工具将一个类型转化为future类型。用法分3类:生成对应类型的异常future;将函数、参数执行的返回值转化为对应类型的future,如果执行出现异常,则返回异常future;直接将值转为对应类型的future.如果类型本身就是future,则直接move。

future

代表一个现在可能还未计算出的值。future内的方法可以查询future-state,更重要的是有各种方法在当前future完成后安排一个continuation继续执行下去(schedule方法)。

template<typename... T>  
class future {  
    promise<T...>* _promise;
    future_state<T...> _local_state; 
}

future成员只有关联的_promise或当没有关联的promise时才有效的_local_state

template <typename Func>  
    void schedule(Func&& func) {
        if (state()->available()) {
            ::seastar::schedule(std::make_unique<continuation<Func, T...>>(std::move(func), std::move(*state())));
        } else {
            assert(_promise);
            _promise->schedule(std::move(func));
            _promise->_future = nullptr;
            _promise = nullptr;
        }
    }

schedule 方法根据当前future是否已经完成,如已完成则在协程消息队列里添加一个参数func和当前future-state构成的continuation;未完成则断开与promise的关联,通过promise的schedule方法转移future-state到continuation内。
get,wait涉及协程调度,后面章节讲述。

    template <typename Func, typename Result = futurize_t<std::result_of_t<Func(T&&...)>>>
    GCC6_CONCEPT( requires ::seastar::CanApply<Func, T...> )
    Result
    then(Func&& func) noexcept {
        using futurator = futurize<std::result_of_t<Func(T&&...)>>;
        if (available() && !need_preempt()) {
            if (failed()) {
                return futurator::make_exception_future(get_available_state().get_exception());
            } else {
                return futurator::apply(std::forward<Func>(func), get_available_state().get_value());
            }
        }
        typename futurator::promise_type pr;
        auto fut = pr.get_future();
        try {
            schedule([pr = std::move(pr), func = std::forward<Func>(func)] (auto&& state) mutable {
                if (state.failed()) {
                    pr.set_exception(std::move(state).get_exception());
                } else {
                    futurator::apply(std::forward<Func>(func), std::move(state).get_value()).forward_to(std::move(pr));
                }
            });
        } catch (...) {
            // catch possible std::bad_alloc in schedule() above
            // nothing can be done about it, we cannot break future chain by returning
            // ready future while 'this' future is not ready
            abort();
        }
        return fut;
    }

GCC6_CONCEPT利用还不是标准的concept检验传入的func和当前类型T...是否可以调用。 then安排func在当前future完成后执行。当前future完成后的值将会是func的参数。func函数的返回值

void forward_to(promise<T...>&& pr) noexcept {  
        if (state()->available()) {
            state()->forward_to(pr);
        } else {
            _promise->_future = nullptr;
            *_promise = std::move(pr);
            _promise = nullptr;
        }
    }

将当前future内的future-state转移到promise内,当状态是已完成时,直接将state内容转移到pr,随着pr析构state转回pr对应的future;当状态未完成时,比如用户传入then内的函数仍然返回一个未决的future,就将当前future与上一个promise解除绑定,并与pr进行绑定。当前future就直接扔弃了。