写在前面
好久没更新 brpc 的内容了, 主要是因为还是对 brpc 没有深入的理解, 也沉淀不出来什么东西…
代码:Server 端
#include "echo.pb.h"
#include <mutex>
#include <sstream>
#include <string>
#include <vector>
#include <brpc/channel.h>
#include <brpc/server.h>
#include <brpc/stream.h>
#include <butil/logging.h>
#include <gflags/gflags.h>
// Command-line flags of the server program (gflags). Each DEFINE_* below is
// read elsewhere in the program through the matching FLAGS_<name> variable.

// NOTE(review): not referenced by the server code shown here.
DEFINE_bool(echo_attachment, true, "Echo attachment as well");
// Port passed to brpc::Server::Start() in main().
DEFINE_int32(port, 8002, "TCP Port of this server");
// Copied into ServerOptions::idle_timeout_sec in main().
DEFINE_int32(idle_timeout_s, -1,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
// Copied into ServerOptions::max_concurrency in main().
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
// Copied into ServerOptions::internal_port in main().
DEFINE_int32(internal_port, -1, "Only allow builtin services at this port");
// NOTE(review): the remaining flags (protocol .. request_size) mirror the
// client's flags and are not referenced by the server code shown here.
DEFINE_string(protocol, "baidu_std",
"Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "",
"Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_int32(attachment_size, 0,
"Carry so many byte attachment along with requests");
DEFINE_int32(request_size, 16, "Bytes of each request");
// NOTE(review): both globals are declared but never used by the server code
// shown here.
std::string g_request;
std::string g_attachment;
class StreamReceiver : public brpc::StreamInputHandler {
// 仅流式请求需要这样接收,
// 普通的echo 可以直接用 request->message() 获取消息,
// 用 response->message() 发送消息
public:
virtual int on_received_messages(brpc::StreamId id,
butil::IOBuf *const messages[],
size_t size) {
std::ostringstream os;
for (size_t i = 0; i < size; ++i) {
os << "msg[" << i << "]=" << *messages[i];
}
LOG(INFO) << "从 client 收到的消息=" << id << ": " << os.str();
// //组装消息, 在从 client 收到消息之后开始发送回 client
butil::IOBuf msg1;
msg1.append(*messages[0]);
msg1.append("$$$");
CHECK_EQ(0, brpc::StreamWrite(id, msg1));
return 0;
}
virtual void on_idle_timeout(brpc::StreamId id) {
LOG(INFO) << "Stream=" << id << " has no data transmission for a while";
}
virtual void on_closed(brpc::StreamId id) {
LOG(INFO) << "Stream=" << id << " is closed";
}
};
// Your implementation of example::EchoService
class StreamingEchoService : public example::EchoService {
public:
StreamingEchoService() : _sd(brpc::INVALID_STREAM_ID) {}
virtual ~StreamingEchoService() {
brpc::StreamClose(_sd);
};
virtual void Echo(google::protobuf::RpcController *controller,
const example::EchoRequest * /*request*/,
example::EchoResponse *response,
google::protobuf::Closure *done) {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
brpc::Controller *cntl =
static_cast<brpc::Controller *>(controller);
brpc::StreamOptions stream_options;
stream_options.handler = &_receiver;
if (brpc::StreamAccept(&_sd, *cntl, &stream_options) != 0) {
cntl->SetFailed("Fail to accept stream");
return;
}
response->set_message("Accepted stream");
}
private:
StreamReceiver _receiver;
brpc::StreamId _sd;
};
DEFINE_bool(h, false, "print help information");
int main(int argc, char *argv[]) {
std::string help_str = "dummy help infomation";
GFLAGS_NS::SetUsageMessage(help_str);
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_h) {
fprintf(stderr, "%s\n%s\n%s", help_str.c_str(), help_str.c_str(),
help_str.c_str());
return 0;
}
// Generally you only need one Server.
brpc::Server server;
// Instance of your service.
// example::EchoServiceImpl echo_service_impl;
StreamingEchoService echo_service_impl;
// Add the service into server. Notice the second parameter, because the
// service is put on stack, we don't want server to delete it, otherwise
// use brpc::SERVER_OWNS_SERVICE.
if (server.AddService(&echo_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) !=
0) {
LOG(ERROR) << "Fail to add service";
return -1;
}
// Start the server.
brpc::ServerOptions options;
options.mutable_ssl_options()->default_cert.certificate = "cert.pem";
options.mutable_ssl_options()->default_cert.private_key = "key.pem";
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;
options.internal_port = FLAGS_internal_port;
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit();
return 0;
}
client
#include "butil/iobuf.h"
#include "echo.pb.h"
#include <brpc/channel.h>
#include <brpc/server.h>
#include <brpc/stream.h>
#include <bthread/bthread.h>
#include <bthread/butex.h>
#include <butil/logging.h>
#include <bvar/bvar.h>
#include <gflags/gflags.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// Size of the buffer read_file() hands to fgets(); longer lines are read in
// several pieces.
#define MAX_LINE_LENGTH 1024
// Capacity of the global `lines` array below.
#define MAX_LINES 1000000
// Number of sender threads started by main().
DEFINE_int32(thread_num, 2, "Number of threads to send requests");
// Selects bthread_start_background() instead of pthread_create() in main().
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
// When > 0, main() sizes g_attachment to this many bytes.
DEFINE_int32(attachment_size, 0,
"Carry so many byte attachment along with requests");
// Size of g_request; main() rejects values <= 0.
DEFINE_int32(request_size, 16, "Bytes of each request");
// protocol, connection_type, timeout_ms and max_retry are copied into
// brpc::ChannelOptions in main(); server and load_balancer are passed to
// Channel::Init().
DEFINE_string(protocol, "baidu_std",
"Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "",
"Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
// Checked by the CHECK() in sender() when the connecting RPC has failed.
DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
// When set, main() calls ChannelOptions::mutable_ssl_options().
DEFINE_bool(enable_ssl, false, "Use SSL connection");
// When >= 0, main() starts a dummy server at this port.
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
// Payload of the Echo request that opens the stream; filled with 'r' in main().
std::string g_request;
// Filled with 'a' in main(); NOTE(review): not used anywhere else in the code
// shown here.
std::string g_attachment;
// Fed by sender(); main() logs its qps and latency once per second.
bvar::LatencyRecorder g_latency_recorder("client");
// Incremented by sender() when its RPC has failed.
bvar::Adder<int> g_error_count("client_error_count");
// Lines handed from read_file() to the sender threads. The array is used as a
// stack: read_file() stores at lines[line_count++], sender() takes
// lines[--line_count].
butil::IOBuf lines[MAX_LINES];
// Number of entries of `lines` currently in use.
int line_count = 0;
// Set by read_file() once the whole file has been read.
bool done_reading = false;
// `mutex` is locked around every access to lines, line_count and done_reading;
// `cond` is signalled by read_file() after each stored line and broadcast when
// reading is finished.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void *read_file(void *arg) {
FILE *file = (FILE *)arg;
char buffer[MAX_LINE_LENGTH];
while (fgets(buffer, sizeof(buffer), file) != NULL) {
pthread_mutex_lock(&mutex); // 加锁
butil::IOBuf tmp;
tmp.append(buffer);
lines[line_count++] = std::move(tmp); // strdup??
pthread_cond_signal(&cond); // 唤醒一个等待的处理线程
pthread_mutex_unlock(&mutex); // 解锁
}
fclose(file);
// 通知所有线程读取完毕
pthread_mutex_lock(&mutex); // 加锁
done_reading = true;
pthread_cond_broadcast(&cond); // 广播通知所有等待的处理线程
pthread_mutex_unlock(&mutex); // 解锁
pthread_exit(NULL);
}
// Every message receiver has to derive from brpc::StreamInputHandler; the
// overrides define what happens when messages arrive. This client-side
// handler only logs what the server sends back.
class ClientStreamReceiver : public brpc::StreamInputHandler {
public:
    // Logs all |size| messages of a batch received on stream |id| in a single
    // log line. Always returns 0.
    int on_received_messages(brpc::StreamId id,
                             butil::IOBuf *const messages[],
                             size_t size) override {
        std::ostringstream batch;
        size_t idx = 0;
        while (idx < size) {
            batch << "msg[" << idx << "]=";
            batch << *messages[idx];
            ++idx;
        }
        LOG(INFO) << "从 Server 收到的消息=" << id << ": " << batch.str();
        return 0;
    }

    // Logs that stream |id| has been idle.
    void on_idle_timeout(brpc::StreamId id) override {
        LOG(INFO) << "Server Stream=" << id
                  << " has no data transmission for a while";
    }

    // Logs that stream |id| has been closed.
    void on_closed(brpc::StreamId id) override {
        LOG(INFO) << "Server Stream=" << id << " is closed";
    }
};
static void *sender(void *arg) {
example::EchoService_Stub stub(
static_cast<google::protobuf::RpcChannel *>(arg));
int log_id [[maybe_unused]] = 0;
// streaming
brpc::StreamId stream;
brpc::Controller cntl;
/// add this
ClientStreamReceiver clientStreamReceiver;
brpc::StreamOptions streamOptions;
streamOptions.handler = &clientStreamReceiver;
///
if (brpc::StreamCreate(&stream, cntl, &streamOptions) != 0) {
LOG(ERROR) << "Fail to create stream";
return NULL;
}
LOG(INFO) << "Created Stream=" << stream;
example::EchoRequest request;
example::EchoResponse response;
request.set_message(g_request); // 此时这条语句 不发送数据
stub.Echo(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to connect stream, " << cntl.ErrorText();
// return -1;
}
while (1) {
pthread_mutex_lock(&mutex); // 加锁
while (line_count == 0 && !done_reading) {
pthread_cond_wait(&cond, &mutex); // 等待条件变量信号并释放锁
}
if (line_count == 0 && done_reading) {
pthread_mutex_unlock(&mutex); // 解锁
break;
}
butil::IOBuf line = std::move(lines[--line_count]); // 访问共享数据
pthread_mutex_unlock(&mutex); // 解锁
if (line != NULL) {
// 执行特定操作,例如打印行和线程ID
// printf("Thread %d processing line: %s", thread_id, line);
// printf("processing line: %s\n", line);
// LOG(INFO)<< line<<" is processing ...";
line.append(" client_msg=" + std::to_string(log_id++));
CHECK_EQ(0, brpc::StreamWrite(stream, line));
// free(line); // 释放内存
}
// usleep(500);
}
// for statistic
while (!brpc::IsAskedToQuit()) {
if (!cntl.Failed()) {
g_latency_recorder << cntl.latency_us();
} else {
g_error_count << 1;
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
<< "error=" << cntl.ErrorText()
<< " latency=" << cntl.latency_us();
bthread_usleep(50000);
}
sleep(1);
}
return NULL;
}
int main(int argc, char *argv[]) {
// std::cout << my_list.front() << std::endl;
// exit(0);
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
if (FLAGS_enable_ssl) {
options.mutable_ssl_options();
}
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(),
&options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
if (FLAGS_attachment_size > 0) {
g_attachment.resize(FLAGS_attachment_size, 'a');
}
if (FLAGS_request_size <= 0) {
LOG(ERROR) << "Bad request_size=" << FLAGS_request_size;
return -1;
}
g_request.resize(FLAGS_request_size, 'r');
if (FLAGS_dummy_port >= 0) {
brpc::StartDummyServerAt(FLAGS_dummy_port);
}
pthread_t reader_thread;
FILE *file = fopen("data.txt", "r");
if (file == NULL) {
perror("Error opening file");
return EXIT_FAILURE;
}
// 创建文件读取线程
if (pthread_create(&reader_thread, NULL, read_file, (void *)file) != 0) {
perror("Error creating reader thread");
return EXIT_FAILURE;
}
// 创建处理/发送线程
std::vector<bthread_t> bids;
std::vector<pthread_t> pids;
if (!FLAGS_use_bthread) {
pids.resize(FLAGS_thread_num);
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (pthread_create(&pids[i], NULL, sender, &channel) != 0) {
LOG(ERROR) << "Fail to create pthread";
return -1;
}
}
} else {
bids.resize(FLAGS_thread_num);
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (bthread_start_background(&bids[i], NULL, sender, &channel) !=
0) {
LOG(ERROR) << "Fail to create bthread";
return -1;
}
}
}
while (!brpc::IsAskedToQuit()) {
sleep(1);
LOG(INFO) << "Sending EchoRequest at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}
LOG(INFO) << "EchoClient is going to quit";
// 等待文件读取线程完成
pthread_join(reader_thread, NULL);
// 等待发送线程
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (!FLAGS_use_bthread) {
pthread_join(pids[i], NULL);
} else {
bthread_join(bids[i], NULL);
}
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
printf("Processing complete.\n");
return 0;
}
执行
./echo_client -thread_num=10
坑点
- 多条流式请求之间需要通过 `usleep()` 方式显式指定消息发送的时间间隔, 否则 Server 端收到消息之后直接发回会导致 segmentation fault, 或者导致 StreamWrite 返回值非零.
- StreamAccept 函数注册接收流信息的回调函数, 通过 option 传入.
- 线程数多了的话 server 会 core, 不知道是为什么, 猜测可能是 server 的问题.
- 尝试仅向 server发送数据, 而不回收数据, 看看会不会 core
常见错误
- segmentation fault:
  - 是因为没有注册相应的 receiver handler, 导致使用 StreamWrite 写入 stream 时出现 segmentation fault
  - 发送数据过快, 导致消息重叠, 也会出现 segmentation fault
  - 线程数过多也会导致这个问题. 看看用性能更好的机器试试?