Brpc学习(三)流式请求的多线程实现

 
Category: brpc

写在前面

好久没更新 brpc 的内容了, 主要是因为对 brpc 还没有深入的理解, 也沉淀不出什么东西…

代码:Server 端

#include "echo.pb.h"

#include <mutex>
#include <sstream>
#include <string>
#include <vector>

#include <brpc/channel.h>
#include <brpc/server.h>
#include <brpc/stream.h>
#include <butil/logging.h>
#include <gflags/gflags.h>

// Command-line flags of the server program.
//
// main() copies port, idle_timeout_s, max_concurrency and internal_port into
// brpc::ServerOptions / Server::Start(). The remaining flags and the two
// global strings below are not referenced by the server code shown in this
// listing.
DEFINE_bool(echo_attachment, true, "Echo attachment as well");
DEFINE_int32(port, 8002, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1,
             "Connection will be closed if there is no "
             "read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
DEFINE_int32(internal_port, -1, "Only allow builtin services at this port");

// NOTE(review): the flags below look like client-side options carried over
// into the server file; nothing in the visible server code reads them.
DEFINE_string(protocol, "baidu_std",
              "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "",
              "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_int32(attachment_size, 0,
             "Carry so many byte attachment along with requests");
DEFINE_int32(request_size, 16, "Bytes of each request");
// Declared but never read or written by the visible server code.
std::string g_request;
std::string g_attachment;


class StreamReceiver : public brpc::StreamInputHandler {
  // 仅流式请求需要这样接收,
  // 普通的echo 可以直接用 request->message() 获取消息,
  // 用 response->message() 发送消息
public:
  virtual int on_received_messages(brpc::StreamId id,
                                   butil::IOBuf *const messages[],
                                   size_t size) {
    std::ostringstream os;
    for (size_t i = 0; i < size; ++i) {
      os << "msg[" << i << "]=" << *messages[i];
    }
    LOG(INFO) << "从 client 收到的消息=" << id << ": " << os.str();
    //   //组装消息, 在从 client 收到消息之后开始发送回 client
     butil::IOBuf msg1;
     msg1.append(*messages[0]);
     msg1.append("$$$");
     CHECK_EQ(0, brpc::StreamWrite(id, msg1));

    return 0;
  }

  virtual void on_idle_timeout(brpc::StreamId id) {
    LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
  }

  virtual void on_closed(brpc::StreamId id) {
    LOG(INFO) << "Stream=" << id << " is closed";
  }
};

// Your implementation of example::EchoService
class StreamingEchoService : public example::EchoService {
public:
    StreamingEchoService() : _sd(brpc::INVALID_STREAM_ID) {}

    virtual ~StreamingEchoService() {
        brpc::StreamClose(_sd);
    };

    virtual void Echo(google::protobuf::RpcController *controller,
                      const example::EchoRequest * /*request*/,
                      example::EchoResponse *response,
                      google::protobuf::Closure *done) {
        // This object helps you to call done->Run() in RAII style. If you need
        // to process the request asynchronously, pass done_guard.release().
        brpc::ClosureGuard done_guard(done);

        brpc::Controller *cntl =
                static_cast<brpc::Controller *>(controller);
        brpc::StreamOptions stream_options;
        stream_options.handler = &_receiver;
        if (brpc::StreamAccept(&_sd, *cntl, &stream_options) != 0) {
            cntl->SetFailed("Fail to accept stream");
            return;
        }
        response->set_message("Accepted stream");
    }

private:
    StreamReceiver _receiver;
    brpc::StreamId _sd;
};

DEFINE_bool(h, false, "print help information");

int main(int argc, char *argv[]) {
  std::string help_str = "dummy help infomation";
  GFLAGS_NS::SetUsageMessage(help_str);

  // Parse gflags. We recommend you to use gflags as well.
  GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);

  if (FLAGS_h) {
    fprintf(stderr, "%s\n%s\n%s", help_str.c_str(), help_str.c_str(),
            help_str.c_str());
    return 0;
  }

  // Generally you only need one Server.
  brpc::Server server;

  // Instance of your service.
  // example::EchoServiceImpl echo_service_impl;
  StreamingEchoService echo_service_impl;

  // Add the service into server. Notice the second parameter, because the
  // service is put on stack, we don't want server to delete it, otherwise
  // use brpc::SERVER_OWNS_SERVICE.
  if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) !=
      0) {
    LOG(ERROR) << "Fail to add service";
    return -1;
  }

  // Start the server.
  brpc::ServerOptions options;
  options.mutable_ssl_options()->default_cert.certificate = "cert.pem";
  options.mutable_ssl_options()->default_cert.private_key = "key.pem";
  options.idle_timeout_sec = FLAGS_idle_timeout_s;
  options.max_concurrency = FLAGS_max_concurrency;
  options.internal_port = FLAGS_internal_port;
  if (server.Start(FLAGS_port, &options) != 0) {
    LOG(ERROR) << "Fail to start EchoServer";
    return -1;
  }

  // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
  server.RunUntilAskedToQuit();
  return 0;
}

client

#include "butil/iobuf.h"
#include "echo.pb.h"
#include <brpc/channel.h>
#include <brpc/server.h>
#include <brpc/stream.h>
#include <bthread/bthread.h>
#include <bthread/butex.h>
#include <butil/logging.h>
#include <bvar/bvar.h>
#include <gflags/gflags.h>

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// Size of the buffer read_file() hands to fgets(), i.e. the longest chunk
// read from the input file in one call.
#define MAX_LINE_LENGTH 1024
// Capacity of the shared `lines` array below.
#define MAX_LINES 1000000

// Command-line flags of the client program.
// thread_num / use_bthread choose how many sender threads main() starts and
// whether they are pthreads or bthreads.
DEFINE_int32(thread_num, 2, "Number of threads to send requests");
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_int32(attachment_size, 0,
             "Carry so many byte attachment along with requests");
DEFINE_int32(request_size, 16, "Bytes of each request");
// The flags below are copied into brpc::ChannelOptions / Channel::Init() by
// main().
DEFINE_string(protocol, "baidu_std",
              "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "",
              "Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
DEFINE_bool(enable_ssl, false, "Use SSL connection");
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");

// g_request is resized in main() and sent as the message of the initial Echo
// RPC in sender(). g_attachment is resized in main() but not read anywhere
// else in the code shown here.
std::string g_request;
std::string g_attachment;

// Metrics updated by sender() and reported by main().
bvar::LatencyRecorder g_latency_recorder("client");
bvar::Adder<int> g_error_count("client_error_count");

// Shared hand-off area between read_file() (producer) and the sender()
// threads (consumers). It is used as a stack: read_file() stores at
// lines[line_count++] and sender() takes lines[--line_count], so lines are
// not necessarily sent in file order.
butil::IOBuf lines[MAX_LINES];
int line_count = 0;          // number of valid entries in `lines`
bool done_reading = false;   // set by read_file() once the file is exhausted
// `mutex` guards lines, line_count and done_reading; `cond` is signalled by
// read_file() when a line is added or reading has finished.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *read_file(void *arg) {
    FILE *file = (FILE *)arg;
    char buffer[MAX_LINE_LENGTH];

    while (fgets(buffer, sizeof(buffer), file) != NULL) {
        pthread_mutex_lock(&mutex); // 加锁
        butil::IOBuf tmp;
        tmp.append(buffer);
        lines[line_count++] = std::move(tmp); // strdup??

        pthread_cond_signal(&cond);   // 唤醒一个等待的处理线程
        pthread_mutex_unlock(&mutex); // 解锁
    }

    fclose(file);

    // 通知所有线程读取完毕
    pthread_mutex_lock(&mutex); // 加锁
    done_reading = true;
    pthread_cond_broadcast(&cond); // 广播通知所有等待的处理线程
    pthread_mutex_unlock(&mutex);  // 解锁

    pthread_exit(NULL);
}

// 接收消息都要继承自这个类, 用于设置收到消息之后的动作
// Client-side stream handler. Every stream receiver derives from
// brpc::StreamInputHandler; the overrides below define what happens when
// messages arrive, when the stream idles and when it is closed. This
// implementation only logs.
class ClientStreamReceiver : public brpc::StreamInputHandler {
  public:
    // Logs the whole batch of `size` messages received on stream `id` in one
    // line. Always returns 0.
    virtual int on_received_messages(brpc::StreamId id,
                                     butil::IOBuf *const messages[],
                                     size_t size) override {
        std::ostringstream joined;
        size_t idx = 0;
        while (idx < size) {
            joined << "msg[" << idx << "]=" << *messages[idx];
            ++idx;
        }
        const std::string text = joined.str();
        LOG(INFO) << "从 Server 收到的消息=" << id << ": " << text;
        return 0;
    }

    // Logs that stream `id` has been idle.
    virtual void on_idle_timeout(brpc::StreamId id) override {
        LOG(INFO) << "Server Stream=" << id
                  << " has no data transmission for a while";
    }

    // Logs that stream `id` has been closed.
    virtual void on_closed(brpc::StreamId id) override {
        LOG(INFO) << "Server Stream=" << id << " is closed";
    }
};

static void *sender(void *arg) {
    example::EchoService_Stub stub(
        static_cast<google::protobuf::RpcChannel *>(arg));

    int log_id [[maybe_unused]] = 0;
    // streaming
    brpc::StreamId stream;
    brpc::Controller cntl;
    /// add this
    ClientStreamReceiver clientStreamReceiver;
    brpc::StreamOptions streamOptions;
    streamOptions.handler = &clientStreamReceiver;
    ///
    if (brpc::StreamCreate(&stream, cntl, &streamOptions) != 0) {
        LOG(ERROR) << "Fail to create stream";
        return NULL;
    }
    LOG(INFO) << "Created Stream=" << stream;
    example::EchoRequest request;
    example::EchoResponse response;

    request.set_message(g_request); // 此时这条语句 不发送数据
    stub.Echo(&cntl, &request, &response, NULL);
    if (cntl.Failed()) {
        LOG(ERROR) << "Fail to connect stream, " << cntl.ErrorText();
        // return -1;
    }

    while (1) {
        pthread_mutex_lock(&mutex); // 加锁
        while (line_count == 0 && !done_reading) {
            pthread_cond_wait(&cond, &mutex); // 等待条件变量信号并释放锁
        }

        if (line_count == 0 && done_reading) {
            pthread_mutex_unlock(&mutex); // 解锁
            break;
        }
        butil::IOBuf line = std::move(lines[--line_count]); // 访问共享数据
        pthread_mutex_unlock(&mutex);                       // 解锁

        if (line != NULL) {
            // 执行特定操作,例如打印行和线程ID
            // printf("Thread %d processing line: %s", thread_id, line);
            // printf("processing line: %s\n", line);
            // LOG(INFO)<< line<<" is processing ...";
            line.append("  client_msg=" + std::to_string(log_id++));
            CHECK_EQ(0, brpc::StreamWrite(stream, line));
            // free(line); // 释放内存
        }
        // usleep(500);
    }

    // for statistic
    while (!brpc::IsAskedToQuit()) {
        if (!cntl.Failed()) {
            g_latency_recorder << cntl.latency_us();
        } else {
            g_error_count << 1;
            CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
                << "error=" << cntl.ErrorText()
                << " latency=" << cntl.latency_us();
            bthread_usleep(50000);
        }
        sleep(1);
    }

    return NULL;
}

int main(int argc, char *argv[]) {
    // std::cout << my_list.front() << std::endl;
    // exit(0);
    // Parse gflags. We recommend you to use gflags as well.
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);

    brpc::Channel channel;

    // Initialize the channel, NULL means using default options.
    brpc::ChannelOptions options;
    if (FLAGS_enable_ssl) {
        options.mutable_ssl_options();
    }
    options.protocol = FLAGS_protocol;
    options.connection_type = FLAGS_connection_type;
    options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
    options.timeout_ms = FLAGS_timeout_ms;
    options.max_retry = FLAGS_max_retry;
    if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(),
                     &options) != 0) {
        LOG(ERROR) << "Fail to initialize channel";
        return -1;
    }

    if (FLAGS_attachment_size > 0) {
        g_attachment.resize(FLAGS_attachment_size, 'a');
    }
    if (FLAGS_request_size <= 0) {
        LOG(ERROR) << "Bad request_size=" << FLAGS_request_size;
        return -1;
    }
    g_request.resize(FLAGS_request_size, 'r');

    if (FLAGS_dummy_port >= 0) {
        brpc::StartDummyServerAt(FLAGS_dummy_port);
    }

    pthread_t reader_thread;
    FILE *file = fopen("data.txt", "r");
    if (file == NULL) {
        perror("Error opening file");
        return EXIT_FAILURE;
    }
    // 创建文件读取线程
    if (pthread_create(&reader_thread, NULL, read_file, (void *)file) != 0) {
        perror("Error creating reader thread");
        return EXIT_FAILURE;
    }

    // 创建处理/发送线程
    std::vector<bthread_t> bids;
    std::vector<pthread_t> pids;
    if (!FLAGS_use_bthread) {
        pids.resize(FLAGS_thread_num);
        for (int i = 0; i < FLAGS_thread_num; ++i) {
            if (pthread_create(&pids[i], NULL, sender, &channel) != 0) {
                LOG(ERROR) << "Fail to create pthread";
                return -1;
            }
        }
    } else {
        bids.resize(FLAGS_thread_num);
        for (int i = 0; i < FLAGS_thread_num; ++i) {
            if (bthread_start_background(&bids[i], NULL, sender, &channel) !=
                0) {
                LOG(ERROR) << "Fail to create bthread";
                return -1;
            }
        }
    }

    while (!brpc::IsAskedToQuit()) {
        sleep(1);
        LOG(INFO) << "Sending EchoRequest at qps=" << g_latency_recorder.qps(1)
                  << " latency=" << g_latency_recorder.latency(1);
    }

    LOG(INFO) << "EchoClient is going to quit";

    // 等待文件读取线程完成
    pthread_join(reader_thread, NULL);

    // 等待发送线程
    for (int i = 0; i < FLAGS_thread_num; ++i) {
        if (!FLAGS_use_bthread) {
            pthread_join(pids[i], NULL);
        } else {
            bthread_join(bids[i], NULL);
        }
    }

    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);

    printf("Processing complete.\n");

    return 0;
}

执行

./echo_client -thread_num=10

坑点

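  1. 多条流式请求之间需要通过 usleep() 的方式显式指定消息发送的时间间隔, 否则 Server 端收到消息之后直接发回会导致 segmentation fault, 或者导致 StreamWrite 返回值非零. (StreamWrite 返回非零通常是 EAGAIN, 即对端尚未消费的数据量超过了 max_buf_size; 此时更稳妥的做法是调用 StreamWait 等待流可写后再重试, 而不是依赖固定的 sleep 间隔.)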
  2. StreamAccept函数注册接收流信息的回调函数, 通过 option 传入.
  3. 线程数多了的话 server 会 core, 不知道是为什么, 猜测可能是 server 的问题.
    1. 尝试仅向 server发送数据, 而不回收数据, 看看会不会 core

常见错误

  1. segmentation fault:
    1. 是因为没有注册相应的 receiver handler, 导致使用 StreamWrite 写入 stream 时出现 segmentation fault
    2. 发送数据过快, 导致消息重叠, 也会 sf
    3. 线程数过多也会导致这个问题. 看看用性能更好的机器试试?