Skip to content

Notifier优化 #256

@dyx2025

Description

@dyx2025

Notifier优化

ProposeBatch::Propose使用Notifier::WaitNotify接收返回值。目前Notifier的底层使用pipe。一个pipe使用两个文件描述符,默认使用较大的内核缓冲区(phxpaxos使用默认设置,笔者运行环境是65536字节),即使通过调节也只能把内核缓冲区降至一个页的大小(笔者运行环境是4096字节)。

业界为了解决pipe的上述问题,在 Linux 2.6.22 和 glibc 2.8 中引入了eventfd。eventfd只需要一个文件描述符,占用的内核资源也大大减少(其内核对象的核心只是一个uint64_t计数器)。eventfd的man page指出,在任何仅把pipe用作简单事件通知的场合,都可以用eventfd替代pipe。

原文如下:
https://www.man7.org/linux/man-pages/man2/eventfd.2.html

Applications can use an eventfd file descriptor instead of a pipe
       (see [pipe(2)](https://www.man7.org/linux/man-pages/man2/pipe.2.html)) in all cases where a pipe is used simply to signal
       events.

Facebook的生产级C++库folly中的通知队列NotificationQueue,在支持eventfd的环境中优先使用eventfd,不支持时才退回使用pipe。

本人在模拟phxpaxos::Notifier用法的测试中发现,无论是生产者还是消费者,使用eventfd的耗时均为使用pipe耗时的0.92–0.93倍。(注意:该测试中消费者调用WaitNotify时数据已经写入,read不会阻塞,因此测得的是非阻塞write/read系统调用的开销,不包含阻塞等待后被唤醒的延迟。)

在内核缓冲区占用和运行速度两方面,eventfd都比pipe有优势。

phxpaxos开启batch功能后,phxpaxos::Notifier处于热路径,有必要优化。

源代码路径:
src/utils/notifier_pool.h

// Notifier backed by a pipe: SendNotify() writes an int into the pipe and
// WaitNotify() reads it back, so the pipe acts as a small int mailbox.
class Notifier
{
public:
    Notifier();
    ~Notifier();

    // Owns both pipe descriptors, which the destructor closes; a copy would
    // close them a second time, so copying is disabled.
    Notifier(const Notifier &) = delete;
    Notifier & operator=(const Notifier &) = delete;

    int Init();

    void SendNotify(const int ret);

    void WaitNotify(int & ret);

private:
    int m_iPipeFD[2];
};

src/utils/notifier_pool.cpp

// Pipe-backed implementation: both pipe ends start out as -1 ("not opened")
// so the destructor can tell which descriptors Init() actually created.
Notifier :: Notifier()
{
    for (int & fd : m_iPipeFD)
    {
        fd = -1;
    }
}

// Closes every pipe end that was opened; -1 marks an end that never was.
Notifier :: ~Notifier()
{
    for (const int fd : m_iPipeFD)
    {
        if (fd == -1)
        {
            continue;
        }
        close(fd);
    }
}

// Creates the pipe. Returns 0 on success, otherwise the non-zero value
// returned by pipe(2).
int Notifier :: Init()
{
    const int iPipeRet = pipe(m_iPipeFD);
    return (iPipeRet != 0) ? iPipeRet : 0;
}

// Pushes the result value, as sizeof(int) raw bytes, into the pipe's write end.
void Notifier :: SendNotify(const int ret)
{
    const int iWritten = write(m_iPipeFD[1], reinterpret_cast<const char *>(&ret), sizeof ret);
    assert(iWritten == sizeof(int));
}

// Reads one int from the pipe's read end into ret. ret is preset to -1 so
// that, with asserts compiled out, a read() that transfers nothing leaves -1.
void Notifier :: WaitNotify(int & ret)
{
    ret = -1;
    const int iRead = read(m_iPipeFD[0], reinterpret_cast<char *>(&ret), sizeof ret);
    assert(iRead == sizeof(int));
}

修改后代码:
src/utils/notifier_pool.h

// Notifier backed by a single eventfd(2) descriptor. The eventfd carries only
// the wake-up; the int result travels through ret_, which SendNotify() stores
// before signalling and WaitNotify() reads after its read() returns.
// Intended for strictly alternating SendNotify()/WaitNotify() pairs: unlike a
// pipe, a second SendNotify() before the matching WaitNotify() overwrites ret_.
class Notifier
{
public:
    Notifier();
    ~Notifier();

    // Owns event_fd_, which the destructor closes; a copy would close it a
    // second time, so copying is disabled.
    Notifier(const Notifier &) = delete;
    Notifier & operator=(const Notifier &) = delete;

    int Init();

    void SendNotify(const int ret);

    void WaitNotify(int & ret);

private:
    // eventfd descriptor; -1 until Init() succeeds.
    int event_fd_{-1};
    // Result handed from SendNotify() to WaitNotify(). Plain int: visibility
    // relies on the write()/read() on event_fd_ ordering the two accesses.
    // NOTE(review): that is not a C++ synchronizes-with edge; consider
    // std::atomic<int> with release/acquire if this is to be formally race-free.
    int ret_{-1};
};

src/utils/notifier_pool.cpp

// eventfd-backed implementation: event_fd_ and ret_ already start at -1 via
// their default member initializers, so nothing else needs setting up here.
Notifier :: Notifier()
{
}

// Closes the eventfd if Init() created one. event_fd_ stays -1 when Init()
// was never called or failed; in that case there is nothing to close, and
// calling close(-1) would only fail with EBADF.
Notifier :: ~Notifier()
{
    if (event_fd_ != -1)
    {
        close(event_fd_);
    }
}

// Creates the eventfd with an initial counter of 0 and no flags (i.e. a
// blocking descriptor, like the pipe it replaces). Returns 0 on success and
// -1 on failure, the same convention as pipe(2).
int Notifier :: Init()
{
    event_fd_ = eventfd(0, 0);
    return (event_fd_ == -1) ? -1 : 0;
}

// Hands ret to the waiting thread. ret_ is stored first; then the eventfd
// counter is bumped by 1 to wake WaitNotify(), which reads ret_ only after
// its read() returns.
void Notifier :: SendNotify(const int ret)
{
    ret_ = ret;

    const uint64_t kWakeOne = 1;
    const ssize_t iWriteLen = write(event_fd_, &kWakeOne, sizeof(kWakeOne));
    assert(iWriteLen == static_cast<ssize_t>(sizeof(kWakeOne)));
    (void)iWriteLen;  // only inspected by assert(); avoids NDEBUG warnings
}

// Waits for SendNotify() and returns, in ret, the value it stored in ret_.
// With the flags Init() uses (blocking, no EFD_SEMAPHORE), read() blocks
// while the eventfd counter is 0, then returns the counter and resets it to 0.
// The counter is expected to be exactly 1: each SendNotify() must be consumed
// by one WaitNotify() before the next SendNotify().
void Notifier :: WaitNotify(int & ret)
{
    uint64_t count = 0;
    const ssize_t iReadLen = read(event_fd_, &count, sizeof(count));
    assert(iReadLen == static_cast<ssize_t>(sizeof(count)) && 1 == count);
    if (iReadLen != static_cast<ssize_t>(sizeof(count)))
    {
        // With asserts compiled out, a failed read() (e.g. EINTR) must not
        // hand back a stale ret_; report -1 like the pipe version did.
        ret = -1;
        return;
    }
    ret = ret_;
}

测试代码模拟phxpaxos::Notifier一来一回的使用方法(生产者和消费者分别交替调用SendNotify和WaitNotify)。对比使用以pipe为底层和以eventfd为底层Notifier的生产者和消费者的耗时。

测试代码:
pipe_notifier.h

#ifndef TEST_PIPE_NOTIFIER_H_
#define TEST_PIPE_NOTIFIER_H_

namespace test {

// Benchmark copy of the pipe-backed Notifier: SendNotify() writes an int into
// the pipe and WaitNotify() reads it back.
class PipeNotifier
{
public:
    PipeNotifier();
    ~PipeNotifier();

    // Owns both pipe descriptors, which the destructor closes; a copy would
    // close them a second time, so copying is disabled.
    PipeNotifier(const PipeNotifier &) = delete;
    PipeNotifier & operator=(const PipeNotifier &) = delete;

    int Init();

    void SendNotify(const int ret);

    void WaitNotify(int & ret);

private:
    int m_iPipeFD[2];
};

}  // namespace test

#endif  // TEST_PIPE_NOTIFIER_H_

pipe_notifier.cpp

#include <unistd.h>                                                                                                                                                                 
#include <assert.h>

#include "pipe_notifier.h"

namespace test {

// Both pipe ends start as -1 ("not opened") so the destructor knows what to close.
PipeNotifier :: PipeNotifier()
{
    for (int & fd : m_iPipeFD)
    {
        fd = -1;
    }
}

// Closes every pipe end that Init() managed to open.
PipeNotifier :: ~PipeNotifier()
{
    for (const int fd : m_iPipeFD)
    {
        if (fd == -1)
        {
            continue;
        }
        close(fd);
    }
}

// Creates the pipe; returns 0 on success or pipe(2)'s non-zero result.
int PipeNotifier :: Init()
{
    const int iPipeRet = pipe(m_iPipeFD);
    return (iPipeRet == 0) ? 0 : iPipeRet;
}

// Sends the result as sizeof(int) raw bytes through the write end.
void PipeNotifier :: SendNotify(const int ret)
{
    const int iWritten = write(m_iPipeFD[1], reinterpret_cast<const char *>(&ret), sizeof ret);
    assert(iWritten == sizeof(int));
}

// Reads one int from the read end into ret; ret is preset to -1 so a read()
// that transfers nothing (with asserts compiled out) leaves -1.
void PipeNotifier :: WaitNotify(int & ret)
{
    ret = -1;
    const int iRead = read(m_iPipeFD[0], reinterpret_cast<char *>(&ret), sizeof ret);
    assert(iRead == sizeof(int));
}

}  // namespace test

eventfd_notifier.h

#ifndef TEST_EVENTFD_NOTIFIER_H_
#define TEST_EVENTFD_NOTIFIER_H_

namespace test {

// Benchmark copy of the proposed eventfd-backed Notifier. The eventfd carries
// only the wake-up; the int result is handed over through ret_.
class EventFDNotifier
{
public:
    EventFDNotifier();
    ~EventFDNotifier();

    // Owns event_fd_, which the destructor closes; a copy would close it a
    // second time, so copying is disabled.
    EventFDNotifier(const EventFDNotifier &) = delete;
    EventFDNotifier & operator=(const EventFDNotifier &) = delete;

    int Init();

    void SendNotify(const int ret);

    void WaitNotify(int & ret);

private:
    int event_fd_{-1};  // -1 until Init() succeeds
    int ret_{-1};       // result passed from SendNotify() to WaitNotify()
};

}  // namespace test

#endif  // TEST_EVENTFD_NOTIFIER_H_

eventfd_notifier.cpp

#include <unistd.h>                                                                                                                                                                 
#include <sys/eventfd.h>
#include <assert.h>
#include <string.h>

#include "eventfd_notifier.h"

#include <iostream>

namespace test {

// event_fd_ and ret_ are already initialised to -1 by their member initializers.
EventFDNotifier :: EventFDNotifier()
{
}

// Closes the eventfd only if Init() created one (event_fd_ stays -1 otherwise,
// and close(-1) would just fail with EBADF).
EventFDNotifier :: ~EventFDNotifier()
{
    if (event_fd_ != -1)
    {
        close(event_fd_);
    }
}

// Creates a blocking eventfd with counter 0. Returns 0 on success, -1 on failure.
int EventFDNotifier :: Init()
{
    event_fd_ = eventfd(0, 0);
    if (-1 == event_fd_)
    {
        return -1;
    }

    return 0;
}

// Stores ret for the waiter, then wakes it by adding 1 to the eventfd counter.
// ret_ is written before write() so the waiter sees it once its read() returns.
void EventFDNotifier :: SendNotify(const int ret)
{
    ret_ = ret;
    const uint64_t one = 1;
    const ssize_t iWriteLen = write(event_fd_, &one, sizeof(one));
    assert(iWriteLen == static_cast<ssize_t>(sizeof(one)));
    (void)iWriteLen;  // only inspected by assert()
}

// Blocks until the counter is non-zero, consumes it (flags 0: no
// EFD_SEMAPHORE, so the counter resets to 0) and returns ret_ in ret.
// Expects exactly one SendNotify() per WaitNotify(). If read() fails with
// asserts compiled out, ret is set to -1 instead of returning a stale ret_.
void EventFDNotifier :: WaitNotify(int & ret)
{
    uint64_t count = 0;
    const ssize_t iReadLen = read(event_fd_, &count, sizeof(count));
    assert(iReadLen == static_cast<ssize_t>(sizeof(count)) && 1 == count);
    if (iReadLen != static_cast<ssize_t>(sizeof(count)))
    {
        ret = -1;
        return;
    }
    ret = ret_;
}

}  // namespace test

test_notifier.cpp

#include <unistd.h>                                                                                                                                                                 
#include <stdlib.h>
#include <fcntl.h>
#include <x86intrin.h>
#include <time.h>

#include <iostream>
#include <vector>
#include <deque>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <atomic>

#include "pipe_notifier.h"
#include "eventfd_notifier.h"

// One producer thread and one consumer thread per benchmark run.
constexpr int g_producer_thread_count{1};
constexpr int g_consumer_thread_count{1};

// Inclusive range of values sent through the notifier; each benchmark does
// g_ret_range.second - g_ret_range.first + 1 send/wait round trips.
constexpr std::pair<int, int> g_ret_range{-1000000, 1000000};

// State for the pipe-backed ("original") benchmark: a start flag the consumer
// sets after Init(), plus two turn counters that make SendNotify() and
// WaitNotify() strictly alternate between the two threads.
static std::atomic<bool> g_original_consumer_run(false);
static test::PipeNotifier g_original_notifier;
static std::atomic<int> g_original_producer_ret(g_ret_range.first);
// Starts at .second so it cannot match the first expected value (.first).
static std::atomic<int> g_original_consumer_ret(g_ret_range.second);

// Producer side of the pipe benchmark: waits until the consumer has run
// Init(), then for each value waits for its turn, sends the value and hands
// the turn to the consumer. Stores the elapsed TSC ticks in *elapsed.
static void OriginalProducer(uint64_t* elapsed) {
    while (!g_original_consumer_run.load(std::memory_order_acquire)) {
        // spin until g_original_notifier.Init() has completed
    }

    const uint64_t begin = __rdtsc();
    for (int value = g_ret_range.first; value <= g_ret_range.second; ++value) {
        while (g_original_producer_ret.load(std::memory_order_acquire) != value) {
        }
        g_original_notifier.SendNotify(value);
        g_original_consumer_ret.store(value, std::memory_order_release);
    }
    const uint64_t end = __rdtsc();
    *elapsed = end - begin;
}

// Consumer side of the pipe benchmark: initialises the notifier (exiting on
// failure), releases the producer, then for each value waits until it has
// been sent, receives it with WaitNotify() and verifies it.
// NOTE(review): the producer finishes SendNotify() before handing over the
// turn, so WaitNotify() never blocks here; this measures non-blocking read()
// cost rather than wake-up latency.
static void OriginalConsumer(uint64_t* elapsed) {
    int ret = g_original_notifier.Init();
    if (ret != 0) {
        std::cout << "g_original_notifier.Init fail ret = " << ret << std::endl;
        exit(-1);
    }

    g_original_consumer_run.store(true, std::memory_order_release);
    const uint64_t begin = __rdtsc();
    for (int value = g_ret_range.first; value <= g_ret_range.second; ++value) {
        while (g_original_consumer_ret.load(std::memory_order_acquire) != value) {
        }
        g_original_notifier.WaitNotify(ret);
        if (ret != value) {
            std::cout << "ret " << ret << " != i " << value << std::endl;
            exit(-1);
        }
        g_original_producer_ret.store(value + 1, std::memory_order_release);
    }
    *elapsed = __rdtsc() - begin;
}

// State for the eventfd-backed ("improved") benchmark: a start flag the
// consumer sets after Init(), plus two turn counters that make SendNotify()
// and WaitNotify() strictly alternate between the two threads.
static std::atomic<bool> g_improved_consumer_run(false);
static test::EventFDNotifier g_improved_notifier;
static std::atomic<int> g_improved_producer_ret(g_ret_range.first);
// Starts at .second so it cannot match the first expected value (.first).
static std::atomic<int> g_improved_consumer_ret(g_ret_range.second);

// Producer side of the eventfd benchmark: waits until the consumer has run
// Init(), then for each value waits for its turn, sends the value and hands
// the turn to the consumer. Stores the elapsed TSC ticks in *elapsed.
static void ImprovedProducer(uint64_t* elapsed) {
    while (!g_improved_consumer_run.load(std::memory_order_acquire)) {
        // spin until g_improved_notifier.Init() has completed
    }

    const uint64_t begin = __rdtsc();
    for (int value = g_ret_range.first; value <= g_ret_range.second; ++value) {
        while (g_improved_producer_ret.load(std::memory_order_acquire) != value) {
        }
        g_improved_notifier.SendNotify(value);
        g_improved_consumer_ret.store(value, std::memory_order_release);
    }
    const uint64_t end = __rdtsc();
    *elapsed = end - begin;
}

// Consumer side of the eventfd benchmark: initialises the notifier (exiting
// on failure), releases the producer, then for each value waits until it has
// been sent, receives it with WaitNotify() and verifies it.
// NOTE(review): the producer finishes SendNotify() before handing over the
// turn, so WaitNotify() never blocks here; this measures non-blocking read()
// cost rather than wake-up latency.
static void ImprovedConsumer(uint64_t* elapsed) {
    int ret = g_improved_notifier.Init();
    if (ret != 0) {
        std::cout << "g_improved_notifier.Init fail ret = " << ret << std::endl;
        exit(-1);
    }

    g_improved_consumer_run.store(true, std::memory_order_release);
    const uint64_t begin = __rdtsc();
    for (int value = g_ret_range.first; value <= g_ret_range.second; ++value) {
        while (g_improved_consumer_ret.load(std::memory_order_acquire) != value) {
        }
        g_improved_notifier.WaitNotify(ret);
        if (ret != value) {
            std::cout << "ret " << ret << " != i " << value << std::endl;
            exit(-1);
        }
        g_improved_producer_ret.store(value + 1, std::memory_order_release);
    }
    *elapsed = __rdtsc() - begin;
}

// Benchmark thread entry points; each stores its elapsed TSC ticks in *elapsed.
using producer_function = void (*)(uint64_t* elapsed);
using consumer_function = void (*)(uint64_t* elapsed);

// Runs one benchmark: starts g_producer_thread_count threads running pf and
// g_consumer_thread_count threads running cf, joins them all, and prints the
// average elapsed TSC ticks for each side, labelled with prefix.
static void Benchmarck(const std::string& prefix, producer_function pf, consumer_function cf) {
    std::vector<uint64_t> producer_elapseds(g_producer_thread_count, 0);
    std::vector<std::thread> producer_threads;
    producer_threads.reserve(g_producer_thread_count);
    for (int i = 0; i < g_producer_thread_count; ++i) {
        producer_threads.emplace_back(pf, &producer_elapseds[i]);
    }

    std::vector<uint64_t> consumer_elapseds(g_consumer_thread_count, 0);
    std::vector<std::thread> consumer_threads;
    consumer_threads.reserve(g_consumer_thread_count);
    for (int i = 0; i < g_consumer_thread_count; ++i) {
        consumer_threads.emplace_back(cf, &consumer_elapseds[i]);
    }

    // Each thread must be joined before its elapsed slot is read.
    uint64_t producer_elapseds_sum = 0;
    for (int i = 0; i < g_producer_thread_count; ++i) {
        if (producer_threads[i].joinable()) {
            producer_threads[i].join();
        }
        producer_elapseds_sum += producer_elapseds[i];
    }

    uint64_t consumer_elapseds_sum = 0;
    for (int i = 0; i < g_consumer_thread_count; ++i) {
        if (consumer_threads[i].joinable()) {
            consumer_threads[i].join();
        }
        consumer_elapseds_sum += consumer_elapseds[i];
    }

    std::cout << "prefix = " << prefix
              << " producer_elapseds_ave = " << producer_elapseds_sum / g_producer_thread_count
              << " consumer_elapsed_ave = " << consumer_elapseds_sum / g_consumer_thread_count
              << std::endl;
}

// Runs the pipe-backed benchmark first, then the eventfd-backed one.
int main(int argc, char** argv) {
    (void)argc;
    (void)argv;

    Benchmarck("original", OriginalProducer, OriginalConsumer);
    Benchmarck("improved", ImprovedProducer, ImprovedConsumer);

    return 0;
}

测试代码编译命令:

g++ -pthread pipe_notifier.cpp eventfd_notifier.cpp test_notifier.cpp

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions