Yazılım Geliştirme: Coroutine Tabanlı Tüketici-Üretici İş Akışı

Eşyordamlar, eşzamansız kod yazmanın sezgisel ve yapılandırılmış bir yolunu sağlar. Eşzamansız işlemleri prosedür tarzında yazmanıza olanak tanır. Bunlar, eşzamansız programlamayı basitleştirmek için C++20’de sunulan bir özelliktir.

Duyuru

Rainer Grimm uzun yıllardır yazılım mimarı, ekip ve eğitim yöneticisi olarak çalışmaktadır. C++, Python ve Haskell programlama dilleri üzerine makaleler yazmaktan hoşlanıyor, aynı zamanda özel konferanslarda sık sık konuşmaktan da hoşlanıyor. Modern C++ adlı blogunda C++ tutkusunu yoğun bir şekilde ele alıyor.

Bu tek üretici-tek tüketici iş akışını anlamak kolay olmasa da rutin deneyler için iyi bir başlangıç ​​noktasıdır.

Mevcut mekanizmalar şu şekilde: std::async, std::packaged_task veya olaylar (std::condition_variable & std::mutex) bir iletişim kanalı kurarak görevin sonucu üzerinde iki veya daha fazla iş parçacığını senkronize edin. Bu iletişim kanalının iki ucu vardır:

  • std::promisesonucu veya istisnayı paylaşılan duruma yazan e
  • std::future (std::shared_future) – görevin (veya istisnanın) sonucunu bekleyen alıcı uç.

Bu önceden var olan mekanizmanın aksine, eşyordamlar doğrudan iş parçacıklarına veya diğer işletim sistemi senkronizasyon ilkellerine bağlı değildir. Daha ziyade, eşyordam kontrol nesnesine ve bunun etrafında oluşturulan durum makinesi mantığına dayanan saf bir yazılım soyutlamasıdırlar.

Eşyordamlar yığınsızdır: bu, kontrol nesnesinin yığın üzerinde oluşturulması gerektiği anlamına gelir. Tesadüfen, etrafta bir kütüphane ambalajı var promise_type (std::coroutine_handle<promise_type>), aslında bununla hiçbir ilgisi yok std::promise ortak noktaya sahiptir.

THE promise_type bir eşyordamın durum makinesindeki önceden tanımlanmış geçiş durumlarını tanımlayan bir arayüzdür (bir adaptasyon noktası).

Eşyordamlar çok yönlüdür ve eşzamansız bir mesaj akışını yönetmeniz gereken çeşitli senaryolarda kullanılabilir. Yaygın bir örnek, soket tabanlı iletişimdir.

Bugün Coroutine’leri başka bir örnekle anlatmaya çalışacağım: Tek Üretici-Tek Tüketici iş akışı.

Öncelikle eşyordam için sonuç tipini tanımlamamız gerekiyor:

class[[nodiscard]] AudioDataResult final
{
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
            
        class promise_type
        {
            ...
        };
};

Bu da iç kısmın etrafını saran bir şey: promise_type Adam. Çevreleyen sınıfı bu nitelikle süslüyoruz [[nodiscard]]sonuç türü eşyordam kontrol nesnesi olduğundan, askıya alma/devam ettirme işlemlerini yönetmek için istemci koduna geri döneriz.

@Note Sınıf yıkıcı, kaynakları (dinamik bellek) RAII yöntemiyle temizler, böylece eşyordam durumunu yönetmeniz gerekmiyorsa dönüş türü kesinlikle göz ardı edilebilir.

~AudioDataResult() { if(handle_) { handle_.destroy(); } }

Sonuç türü yalnızca taşımadır: kontrol nesnesinin kopyalanmasını önlemek için kopyalama işlemleri yasaktır.

// Make the result type move-only, 
//due to exclusive ownership over the handle

AudioDataResult(const AudioDataResult& ) = delete;
AudioDataResult& operator= (constAudioDataResult& ) = delete;

AudioDataResult(AudioDataResult&& other) noexcept:
handle_(std::exchange(other.handle_, nullptr))
{}

AudioDataResult& operator = (AudioDataResult&& other) noexcept
{
    using namespace std;
    AudioDataResult tmp =std::move(other);
    swap(*this, tmp);
    return *this;
}

Şimdi Promise_type arayüzünün kendisini tanımlayalım:

// Predefined interface that has to be specify 
//in order to implement
// coroutine's state-machine transitions
class promise_type
{
    
    public:
        using value_type = std::vector<int>;
        AudioDataResult get_return_object()
        {
            return AudioDataResult{ handle_type::from_promise(*this) };
        }
        std::suspend_never initial_suspend() noexcept { return{}; }
        std::suspend_always final_suspend() noexcept { return{}; }

        void return_void() {}
        void unhandled_exception()
        {
            std::rethrow_exception(std::current_exception());
        }

        // Generates the value and suspend the "producer"
        template <typename Data>
        requires std::convertible_to<std::decay_t<Data>, value_type>
        std::suspend_always yield_value(Data&& value)
        {
            data_ = std::forward<Data>(value);
            data_ready_.store(true, std::memory_order::relaxed);
            return {};
        }

    private:
        value_type data_;
        std::atomic<bool> data_ready_;
};//promise_type interface

THE promise_type Gerekli eşyordam altyapısını tanımlar. Aynı zamanda zorunda promise_type değerlerin çıktısını almak için bir jeneratör – “üretici” olarak hareket etmek isteyen tüm eşyordamlar için yield_valueyöntem genişletilebilir (co_yield ≡ co_await promise_.yield_value). Veriler tüketildiğinde uygun sarmalama yöntemine ihtiyacımız var resume() Yapımcıyı geri almak için düzenleme yapın.

void resume() { if( not handle_.done()) { handle_.resume();} }

Şimdi tüketicinin ihtiyaçlarını karşılamak için eşyordamı genişletmemiz gerekiyor: veri kullanılabilirliği ile senkronize edilmesi gerekiyor. Yani üretici tarafından verinin mevcut olduğu rapor edilene kadar tüketici beklemeye alınır. Bunu yapmak için arayüze ihtiyacımız var Awaiter uygulamak:

class promise_type
{
    // Awaiter interface: for consumer waiting on data being ready
    struct AudioDataAwaiter
    {

        explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}

        bool await_ready() const
        {
            return promise_.data_ready_.load(std::memory_order::relaxed);
        }

        void await_suspend(handle_type) const
        {
            while( not promise_.data_ready_.exchange(false))
            {
                std::this_thread::yield();
            }
        }

        // move assignment at client invocation side:
        //        const auto data = co_await audioDataResult;
        // This requires that coroutine's result type provides
        // the co_await unary operator

        value_type&& await_resume() const
        {
            return std::move(promise_.data_);
        }

    private:
            promise_type& promise_;

    };//Awaiter interface

};//promise_type

Durum makinesinde var await_ready() ilk geçiş durumu: veri kullanılabilirliği kontrol edilir. Veriler hazır değilse devam edin await_suspend() isminde. Burada aslında ilgili bayrak ayarlanana kadar bekleyeceğiz. Sonunda olacak await_resume() denir: değeri “taşırız” promise_typekoşulsuz olarak rvalue referansına dönüştürmek. İstemci çağrısı tarafında bu, dönüş değeri için atama operatörünün şu şekilde olmasına neden olur: data – çağrıldı.

const auto data = co_await audioDataResult;

Bunun çalışması için sonuç türünün tekli operatöre sahip olması gerekir co_await bizimkini sağlayın Awaiterarayüz geri döner.

class AudioDataResult
{
    auto operator co_await() noexcept
    {
        return promise_type::AudioDataAwaiter{handle_.promise()};
    }
};

: https://godbolt.org/z/MvYfbEP8r

Aşağıdaki program producerConsumer.cpp örnek 1’in basitleştirilmiş bir versiyonunu gösterir:

// producerConsumer.cpp

#include <algorithm>
#include <atomic>
#include <chrono>
#include <coroutine>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <source_location>
#include <thread>
#include <utility>
#include <vector>


void funcName(const std::source_location location = std::source_location::current()) {
    std::cout << location.function_name() << 'n';
}


template <typename Container>
void printContainer(const Container& container)
{
    typedef typename Container::value_type value_type;
    auto first = std::cbegin(container);
    auto last = std::cend(container);

    std::cout << " [";
    std::copy(first, std::prev(last), std::ostream_iterator<value_type>(std::cout, ", "));
    std::cout << *std::prev(last) << "]n";
}




class [[nodiscard]] AudioDataResult final
{
    public:
        class promise_type;
        using handle_type = std::coroutine_handle<promise_type>;
        
        // Predefined interface that has to be specify in order to implement
        // coroutine's state-machine transitions
        class promise_type 
        {
            
            public:
                
                using value_type = std::vector<int>;

                AudioDataResult get_return_object() 
                {
                    return AudioDataResult{handle_type::from_promise(*this)};
                }
                std::suspend_never initial_suspend() noexcept { return {}; }
                std::suspend_always final_suspend() noexcept { return {}; }
                void return_void() {}
                void unhandled_exception() 
                {
                    std::rethrow_exception(std::current_exception());
                }

                // Generates the value and suspend the "producer"
                template <typename Data>
                requires std::convertible_to<std::decay_t<Data>, value_type>
                std::suspend_always yield_value(Data&& value) 
                {
                    data_ = std::forward<Data>(value);
                    data_ready_.store(true);
                    return {};
                }

                // Awaiter interface: for consumer waiting on data being ready
                struct AudioDataAwaiter 
                {
                    explicit AudioDataAwaiter(promise_type& promise) noexcept: promise_(promise) {}

                    bool await_ready() const { return promise_.data_ready_.load();}
                    
                    void await_suspend(handle_type) const
                    {
                        while(not promise_.data_ready_.exchange(false)) {
                             std::this_thread::yield(); 
                        }
                    }
                    // move assignment at client invocation side: const auto data = co_await audioDataResult;
                    // This requires that coroutine's result type provides the co_await unary operator
                    value_type&& await_resume() const 
                    {
                        return std::move(promise_.data_);
                    }

                    private: 
                        promise_type& promise_;
                };//Awaiter interface

        
            private:
                value_type data_;
                std::atomic<bool> data_ready_;
        }; //promise_type interface

        
        auto operator co_await() noexcept   
        {
            return promise_type::AudioDataAwaiter{handle_.promise()};
        }

        // Make the result type move-only, due to ownership over the handle
        AudioDataResult(const AudioDataResult&) = delete;
        AudioDataResult& operator=(const AudioDataResult&) = delete;

        AudioDataResult(AudioDataResult&& other) noexcept: handle_(std::exchange(other.handle_, nullptr)){}
        AudioDataResult& operator=(AudioDataResult&& other) noexcept 
        {
            using namespace std;
            AudioDataResult tmp = std::move(other);
            swap(*this, tmp);
            return *this;
        }

        // d-tor: RAII
        ~AudioDataResult() { if (handle_) {funcName(); handle_.destroy();}}

        // For resuming the producer - at the point when the data are consumed
        void resume() {if (not handle_.done()) { funcName(); handle_.resume();}}
    
    private:
        AudioDataResult(handle_type handle) noexcept : handle_(handle) {}

    private:
    handle_type handle_;
};


using data_type = std::vector<int>;
AudioDataResult producer(const data_type& data) 
{
    for (std::size_t i = 0; i < 5; ++i) {
        funcName();
        co_yield data;
    }
    co_yield data_type{}; // exit criteria
}

AudioDataResult consumer(AudioDataResult& audioDataResult) 
{
    while(true)
    {
        funcName();
        const auto data = co_await audioDataResult;
        if (data.empty()) {std::cout << "No data - exit!n"; break;}
        std::cout << "Data received:";
        printContainer(data);
        audioDataResult.resume(); // resume producer
    }
}

int main() 
{
    {
        const data_type data = {1, 2, 3, 4};
        auto audioDataProducer = producer(data);
        std::thread t ([&]{auto audioRecorded = consumer(audioDataProducer);});
        t.join();
    }

    std::cout << "bye-bye!n";
}

Son olarak programın çıktısı şu şekildedir:

Diğer seçenek kullanmaktır promise_type::await_transform()dosyada belirtilen değeri beklemek için promise_typeüretici tarafından kullanılan örnek saklanır.

class promise_type
{
    auto await_transform(handle_type other)
    {
        // Awaiter interface: remained the same
        struct AudioDataAwaiter
        {
            explicit AudioDataAwaiter(promise_type& promise)noexcept: promise_(promise) {}
            ...
        };

        return AudioDataAwaiter{other.promise()};
    }
};

Bu şekilde artık tekli operatöre ihtiyacımız yok co_await sonuç türünün yerine (açık) bir dönüştürme operatörü,

explicit operator handle_type() const {return handle_;}

tüketicinin istediği noktaya ulaştırabilmemiz için co_await çağrının içinde olanı arar await_transform() tercüme edilmiştir.

const auto data = co_await static_cast<AudioDataResult::handle_type>(audioDataResult);

Bunu şu şekilde örneklendirebiliriz: me.handle_.promise().await_transform(other.handle_)

: https://godbolt.org/z/57zsK9rEn

Bu basit örnekte, üretici herhangi bir ceza ödemeden duraklatılır çünkü yeniden başlatıldığında, önceden bilinen tam olarak aynı veri dizisini döndürür. Gerçek dünya senaryosunda durum muhtemelen böyle değildir: Üreticinin kendisi muhtemelen bir tür aracı, eşzamansız olarak gönderilen ve tüketiciye geri gönderilen verilerin alıcısı olacaktır. Bu nedenle, veri duraklatıldığında ve tüketicinin devam etmesini beklediğinde veri kaybını önlemek, üreticinin veri varış hızı ile tüketici tüketim hızı arasındaki farkları telafi etmek için üretici tarafında kuyruklama mantığı uygulanmalıdır.

C++20’de üç yol tanımlanabilir veya default rica etmek. Bu, altı karşılaştırma operatörünün tamamının mevcut olduğu anlamına gelir: ==, !=, <, <=, > VE >=. Eşitlik operatörünü de kullanabilirsiniz (==) tanımlayın veya ile default rica etmek.

Blogum kısa bir Noel tatili veriyor. Bir sonraki makaleyi 8 Ocak’ta yayınlayacağım. Tüm okuyuculara bol eğlenceler diliyorum.


(kendim)

Haberin Sonu

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir