Problema Produttore-Consumatore in C++11

di il
4 risposte

Problema Produttore-Consumatore in C++11

Salve a tutti,sperò qualcuno sia disposto ad aiutarmi,sto sbattendo la testa con i magnifici puntatori.

Per l' università(Uninettuno) sto seguendo un corso in cui ,tra le molte cose,c'è il multithreading in C++11.

Mi sto esercitando sul problema del produttore/consumatore ma ho incontrato delle difficoltà quando il buffer invece di contenere una coda di interi contiene una coda di msg_t, dove rappresenta una classe apposita presente nell' esercizio.

#ifndef HOMEWORK_1_MSG_T_H
#define HOMEWORK_1_MSG_T_H
class msg_t{
public:
    msg_t();
    msg_t *msg_init(void *content_);
    void msg_destroy(msg_t *msg);
    void *content;
};
#endif //HOMEWORK_1_MSG_T_H
#include "msg_t.h"
msg_t::msg_t() : content(nullptr) {}
msg_t* msg_t::msg_init(void *content_) {
    content=content_;
    return static_cast<msg_t *>(content);
}
void msg_t::msg_destroy(msg_t *msg){
    content=msg;
    delete content;
    content= nullptr;
}
#ifndef HOMEWORK_1_BUFFER_T_H
#define HOMEWORK_1_BUFFER_T_H

#include <queue>
#include <mutex>
#include <condition_variable>
#include "msg_t.h"

#define BUFFER_ERROR (msg_t *) NULL
class buffer_t{
public:
    buffer_t* buffer_init(unsigned int maxsize);
    void put_bloccante(buffer_t* buffer, msg_t *msg);
    msg_t* get_bloccante(buffer_t* buffer);
private:
    unsigned int maxSize;
    std::queue<msg_t> messages;   //coda di messaggi condivisa
    std::mutex mtx;              //Mutex per garantire l'accesso esclusivo alla coda
    std::condition_variable bufferNotEmpty;
    std::condition_variable bufferNotFull;

};
buffer_t* buffer_t::buffer_init(unsigned int maxsize){
         (this)->maxSize=maxsize;
         return this;
}
void buffer_t::put_bloccante(buffer_t *buffer, msg_t *msg) {
    try {
        std::unique_lock<std::mutex> lock(mtx);
        bufferNotFull.wait(lock,[&buffer]{return buffer->messages.size() < (buffer)->maxSize;});
        (buffer)->messages.push(*msg);
        bufferNotEmpty.notify_all();  // Notifica i consumatori che il buffer non è vuoto
        lock.unlock();
        //msg->msg_destroy(msg);
    }
    catch(const std::exception& e){
        std::cerr << "An exception occurred: " << e.what() << std::endl;

    }
}
msg_t*  buffer_t::get_bloccante(buffer_t* buffer){
    try {
        std::unique_lock<std::mutex> lock(mtx);
        bufferNotEmpty.wait(lock, [&buffer] { return !buffer->messages.empty(); });  // Attende finché il buffer non è vuoto
        msg_t *message=new msg_t();
        *message = (buffer)->messages.front();
        (buffer)->messages.pop();
        bufferNotFull.notify_all();
        lock.unlock();
        //message.msg_destroy(&message);
        return message;
    }
    catch(const std::exception& e){
        std::cerr << "An exception occurred: " << e.what() << std::endl;
        return nullptr;
    }
}
#include <iostream>
#include <thread>
#include "buffer_t.h"

void producer(buffer_t& buffer, int producer_id);
void consumer(buffer_t& buffer, int consumer_id);
int main() {
    buffer_t *buffer=new buffer_t();
    buffer->buffer_init(10);
    std::thread producers(producer,std::ref(*buffer),1);
    std::thread consumers(consumer,std::ref(*buffer),1);
    producers.join();
    consumers.join();
    return 0;
}
void producer(buffer_t& buffer, int producer_id) {
    try {

        for (int i = 0; i < 10; i++) {
            msg_t* message=new msg_t();
            message->msg_init(&i);
            buffer.put_bloccante(&buffer, message);
            int* contentPtr = static_cast<int*>(message->content);
            int contentValue = *contentPtr;
            std::cout << "\nProducer " << producer_id << " pushed message -->" <<contentValue<< std::endl;
            delete message;
        }
    }
    catch (const std::exception& e) {
        std::cerr << "producer " << producer_id << " encountered an exception: " << e.what() << std::endl;
    }
}

void consumer(buffer_t& buffer, int consumer_id) {

    for (int i = 0; i < 10; i++) {
        try {
            msg_t* message=new msg_t();
            message = buffer.get_bloccante(&buffer);
            int* contentPtr = static_cast<int*>(message->content);
            int contentValue = *contentPtr;
            std::cout << "\nConsumer " << consumer_id << " popped message -->" <<contentValue<< std::endl;
            delete message;
        } catch (const std::exception& e) {
            std::cerr << "Consumer " << consumer_id << " encountered an exception: " << e.what() << std::endl;
        }
    }
}

Al momento per semplicità ho creato solo un produttore e un consumatore,con un biffer che contiene una oda di messaggi di tipo msg_t,ma succedono cose strane quando mangia e ? termina con errore.

4 Risposte

  • Re: Problema Produttore-Consumatore in C++11

    Succedono cose strane” non è un buon incentivo ad occuparsi del tuo codice. Al massimo ti risponde un esorcista…

    Cerca di spiegare meglio…

  • Re: Problema Produttore-Consumatore in C++11

    Non ho ben capito cosa intendi quando converti content al suo contenitore msg_t.

    Comunque lo std::unique_lock non andrebbe sbloccato manualmente, ma lo fa in automatico quando esce dal suo scopo. 

    https://en.cppreference.com/w/cpp/thread/unique_lock/%7Eunique_lock

    Per verificare che l'errore stia lì, prova a fare un blocco try, magari nel main, in cui catturi l'eccezione qui riportata:

    https://en.cppreference.com/w/cpp/thread/unique_lock/unlock

  • Re: Problema Produttore-Consumatore in C++11

    03/07/2023 - oregon ha scritto:


    Succedono cose strane” non è un buon incentivo ad occuparsi del tuo codice. Al massimo ti risponde un esorcista…

    Cerca di spiegare meglio…

    nel senso che l' esito cambia ad ogni esecuzione,a volte termina con Exit Code 0,mentre due volte su tre cosi:

    C:\Users\vitto\CLionProjects\HomeWork_1\cmake-build-debug\HomeWork_1.exe

    Producer 1 pushed message -->5

    Producer 1 pushed message -->6

    Producer 1 pushed message -->7

    Producer 1 pushed message -->8

    Producer 1 pushed message -->9

    Consumer 1 popped message -->1

    Process finished with exit code -1073741819 (0xC0000005)

  • Re: Problema Produttore-Consumatore in C++11

    Raga Ho risolto!per prima cosa ho inserito i try vatch con l' eccezione suggerita sopra per catturare eventuale problema dell' unique-lock,ho tolto l' unlock,e ho sistemato il metodo msg_init della classe msg_t.

    posto i cambiamenti per chi ha il mio stesso problema:

    class msg_t{
    public:
        msg_t();
        msg_t(void* content_);
        void *msg_init(void *content_);
        void msg_destroy(msg_t *msg);
    
    private:
        void *content;
    public:
        void *getContent() const;
    
        void setContent(void *content);
    };
    #include "msg_t.h"
    msg_t::msg_t() : content(nullptr) {}
    msg_t::msg_t(void* content_) : content(content_) {}
    void* msg_t::msg_init(void *content_) {
        content=content_;
        return (content);
    }
    void msg_t::msg_destroy(msg_t *msg){
        content=msg;
        delete content;
        content= nullptr;
    }
    
    void *msg_t::getContent() const {
        return content;
    }
    
    void msg_t::setContent(void *content) {
        msg_t::content = content;
    }
    #include <iostream>
    #include "buffer_t.h"
    
    //
    // Created by vitto on 02/07/2023.
    //
    buffer_t* buffer_t::buffer_init(unsigned int maxsize){
             (this)->maxSize=maxsize;
             return this;
    }
    void buffer_t::put_bloccante(buffer_t *buffer, msg_t *msg) {
        try {
            std::unique_lock<std::mutex> lock(mtx);
            bufferNotFull.wait(lock,[&buffer](){return buffer->messages.size() < (buffer)->maxSize;});
            (buffer)->messages.push(*msg);
            bufferNotEmpty.notify_one();  // Notifica i consumatori che il buffer non è vuoto
        }
        catch(const std::system_error& e){
            std::cout << "Caught a system_error\n";
            if (e.code() == std::errc::invalid_argument)
                std::cout << "The error condition is 'std::errc::invalid_argument'\n";
            std::cout << "The error description is '" << e.what() << "'\n";
        }
    
    
    }
    msg_t*  buffer_t::get_bloccante(buffer_t* buffer){
        try {
            std::unique_lock<std::mutex> lock(mtx);
            bufferNotEmpty.wait(lock, [&buffer]() { return !buffer->messages.empty(); });  // Attende finché il buffer non è vuoto
            msg_t* message = new msg_t ((buffer)->messages.front());
            (buffer)->messages.pop();
            bufferNotFull.notify_one();
            return message;
        }
        catch(const std::system_error& e){
            std::cout << "Caught a system_error\n";
            if (e.code() == std::errc::invalid_argument)
                std::cout << "The error condition is 'std::errc::invalid_argument'\n";
            std::cout << "The error description is '" << e.what() << "'\n";
            return nullptr;
        }
    }
    
    
    #include <iostream>
    #include <thread>
    #include "buffer_t.h"
    
    void producer(buffer_t *buffer, int producer_id,int numMessage);
    void consumer(buffer_t *buffer, int consumer_id,int numMessage);
    int main() {
        try {
            buffer_t *buffer = new buffer_t();
            buffer->buffer_init(10);
            std::thread producers(producer, buffer, 1,5);
            std::thread consumers(consumer, buffer, 1,5);
            producers.join();
            consumers.join();
            return 0;
        }
        catch(const std::system_error& e){
            std::cout << "Caught a system_error\n";
            if (e.code() == std::errc::invalid_argument)
                std::cout << "The error condition is 'std::errc::invalid_argument'\n";
            std::cout << "The error description is '" << e.what() << "'\n";
            return -1;
        }
    }
    void producer(buffer_t * buffer, int producer_id,int numMessage) {
        try {
            for (int i = 0; i < numMessage; i++) {
                msg_t* message=new msg_t();
                message->msg_init(new int(i));
                //message->setContent(new int(i));
                buffer->put_bloccante(buffer, message);
                int* contentPtr = static_cast<int*>(message->getContent());
                int contentValue = *contentPtr;
                std::cout << "\nProducer " << producer_id << " pushed message -->" <<contentValue<< std::endl;
            }
        }
        catch (const std::exception& e) {
            std::cerr << "producer " << producer_id << " encountered an exception: " << e.what() << std::endl;
        }
    }
    
    void consumer(buffer_t *buffer, int consumer_id,int numMessage) {
        try {
            for (int i = 0; i < numMessage; i++) {
                msg_t *message = buffer->get_bloccante(buffer);
                int *contentPtr = static_cast<int *>(message->getContent());
                int contentValue = *contentPtr;
                std::cout << "\nConsumer " << consumer_id << " popped message -->" << contentValue << std::endl;
                delete message;
            }
        }catch (const std::exception& e) {
                std::cerr << "Consumer " << consumer_id << " encountered an exception: " << e.what() << std::endl;
            }
        }
    

    Grazie a tutti per i consigli, purtroppo sono stato mal abituato da java e il passaggio a c++ è tosto.

Devi accedere o registrarti per scrivere nel forum
4 risposte