安装的话下载压缩包解压安装就行, 在服务器上后台运行后就可以正常使用了.
简易生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| #include <librdkafka/rdkafka.h> #include <stdio.h> #include <signal.h> #include <string.h>
static volatile sig_atomic_t run = 1;
static void stop(int sig) { run = 0; }
int main() { rd_kafka_t *producer; rd_kafka_conf_t *conf; char errstr[512]; conf = rd_kafka_conf_new(); if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "Config error: %s\n", errstr); return 1; } producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!producer) { fprintf(stderr, "Failed to create producer: %s\n", errstr); return 1; } signal(SIGINT, stop); signal(SIGTERM, stop); printf("Kafka Producer started. Press Ctrl+C to exit.\n"); printf("librdkafka version: %s\n", rd_kafka_version_str()); int message_count = 0; while (run && message_count < 10) { char msg[256]; snprintf(msg, sizeof(msg), "Test message %d from C client", message_count); rd_kafka_resp_err_t err = rd_kafka_producev( producer, RD_KAFKA_V_TOPIC("test"), RD_KAFKA_V_VALUE(msg, strlen(msg)), RD_KAFKA_V_END ); if (err) { fprintf(stderr, "Failed to produce message %d: %s\n", message_count, rd_kafka_err2str(err)); } else { printf("Produced message %d: %s\n", message_count, msg); } rd_kafka_poll(producer, 1000); message_count++; sleep(1); } rd_kafka_flush(producer, 5000); rd_kafka_destroy(producer); printf("Producer exited\n"); return 0; }
|
rd_kafka_conf_t : 存放kafka的标准配置, 如bootstrap.servers(集群入口点)等.
rd_kafka_new : 接管rd_kafka_conf_t的所有权, 并且传入RD_KAFKA_PRODUCER表示生产者.
rd_kafka_producev : 消息发送函数.
1 2 3 4 5 6 7 8 9 10
| rd_kafka_producev( producer, RD_KAFKA_V_TOPIC("test"), RD_KAFKA_V_VALUE(msg, strlen(msg)), RD_KAFKA_V_KEY("key1", 4), RD_KAFKA_V_HEADER("hdr1", "val1"), RD_KAFKA_V_PARTITION(0), RD_KAFKA_V_TIMESTAMP(1234567890), RD_KAFKA_V_END )
|
- 消息键: 相同的消息键一定会分到统一分区, 如果数据之间有关联或经常被一块取出, 设置消息键可以让读取更加快速.
- 消息头 : 存放元数据, 本质就是自定义的数据键值, 可以用于帮助业务处理和理解.
简易消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| #include <librdkafka/rdkafka.h> #include <stdio.h> #include <signal.h> #include <string.h>
static volatile sig_atomic_t run = 1;
static void stop(int sig) { run = 0; }
int main() { rd_kafka_t *consumer; rd_kafka_conf_t *conf; rd_kafka_topic_partition_list_t *subscription; char errstr[512]; conf = rd_kafka_conf_new(); rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "group.id", "c-test-group", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "enable.auto.commit", "true", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "session.timeout.ms", "45000", ...); rd_kafka_conf_set(conf, "heartbeat.interval.ms", "3000", ...); consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); if (!consumer) { fprintf(stderr, "Failed to create consumer: %s\n", errstr); return 1; } subscription = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(subscription, "test", -1); if (rd_kafka_subscribe(consumer, subscription) != RD_KAFKA_RESP_ERR_NO_ERROR) { fprintf(stderr, "Failed to subscribe\n"); rd_kafka_destroy(consumer); return 1; } rd_kafka_topic_partition_list_destroy(subscription); signal(SIGINT, stop); printf("Consumer started. Waiting for messages...\n"); while (run) { rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 1000); if (msg) { if (msg->err) { if (msg->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { printf("Reached end of partition\n"); } else { fprintf(stderr, "Consumer error: %s\n", rd_kafka_message_errstr(msg)); } } else { printf("Received message (%zd bytes): %.*s\n", msg->len, (int)msg->len, (char *)msg->payload); } rd_kafka_message_destroy(msg); } } rd_kafka_consumer_close(consumer); rd_kafka_destroy(consumer); printf("Consumer exited\n"); return 0; }
|
核心配置 :
group.id : 设置消费者组ID.
auto.offset.reset : 位移读取策略.
heartbeat.interval.ms : 心跳.
session.timeout.ms : 会话超时时间.
- earliest:从最早可用的消息开始(从头消费)
- latest:从最新消息开始(跳过历史消息), 用于实时应用
传入RD_KAFKA_CONSUMER构建消费者句柄.
rd_kafka_topic_partition_list_t : 存储当前消费者订阅的主题和分区.
rd_kafka_topic_partition_list_add : 填入想要订阅的topic和partition, partition为-1时代表订阅所有分区.
rd_kafka_consumer_poll : 用来轮询拉取订阅的消息, 拉取一条消息返回.
rd_kafka_message_t :
1 2 3 4 5 6 7 8 9 10 11 12
| typedef struct rd_kafka_message_s { rd_kafka_resp_err_t err; rd_kafka_topic_t *rkt; int32_t partition; void *payload; size_t len; void *key; size_t key_len; int64_t offset; rd_kafka_headers_t *headers; } rd_kafka_message_t;
|
这里其实传递的消息结构就非常灵活了, 可以是任何的结构体形式.

生产环境级别的消费者
一般来说, 生产者对于kafka的使用比较简单, 使用api就可以满足大部分情况, 在客户端上使用.
而消费者一般都是搭载在服务器上, 需要接收的任务和处理的任务数量是极高的, 因此需要有更合理的设计, 比如引入线程池, 事务回滚, 状态检查之类的设计, 其实本质就是围绕kafka的消费者接口做性能优化, 这就非常自由了, 下面只是ai出来的可能的生产环境的消费者结构 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
| #include <librdkafka/rdkafka.h> #include <pthread.h> #include <semaphore.h> #include <time.h> #include <sqlite3.h> #include <curl/curl.h> #include <jansson.h>
typedef struct { int consumer_id; char group_id[256]; pthread_t poll_thread; pthread_t worker_threads[8]; pthread_t monitor_thread; MessageQueue *input_queue; MessageQueue *output_queue; MessageQueue *dead_letter_queue; DatabasePool *db_pool; RedisPool *redis_pool; HttpClientPool *http_pool; PartitionState *partition_states[100]; CheckpointManager *checkpoints; StateStore *state_store; MetricsCollector *metrics; AlertManager *alerts; AuditLogger *audit_log; volatile sig_atomic_t running; volatile sig_atomic_t paused; int in_rebalance; time_t start_time; long long messages_processed; long long messages_failed; } ProductionConsumer;
void* consumer_main_thread(void *arg) { ProductionConsumer *consumer = (ProductionConsumer *)arg; consumer->start_time = time(NULL); init_connection_pools(consumer); load_saved_state(consumer); rd_kafka_conf_t *conf = create_production_config(consumer); rd_kafka_t *kafka_consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); rd_kafka_conf_set_rebalance_cb(conf, production_rebalance_cb); rd_kafka_conf_set_offset_commit_cb(conf, offset_commit_cb); rd_kafka_conf_set_error_cb(conf, error_cb); rd_kafka_subscribe(kafka_consumer, NULL); start_worker_threads(consumer); while (consumer->running) { rd_kafka_message_t *msg = rd_kafka_consumer_poll(kafka_consumer, 100); if (msg) { if (!msg->err) { enqueue_message(consumer->input_queue, msg); consumer->metrics->increment("messages.received"); } else { handle_kafka_error(msg->err, msg); } } check_consumer_health(consumer); } graceful_shutdown(consumer, kafka_consumer); return NULL; }
void* worker_thread_func(void *arg) { WorkerContext *ctx = (WorkerContext *)arg; ProductionConsumer *consumer = ctx->consumer; while (consumer->running) { KafkaMessage *msg = dequeue_message(consumer->input_queue, 1000); if (!msg) continue; DatabaseConnection *db = acquire_db_connection(consumer->db_pool); begin_transaction(db); try { BusinessData *data = parse_business_message(msg); process_business_logic(data, db, consumer); commit_transaction(db); consumer->metrics->increment("messages.processed"); consumer->messages_processed++; } catch (Exception *e) { rollback_transaction(db); consumer->metrics->increment("messages.failed"); consumer->messages_failed++; send_to_dead_letter_queue(msg, e); if (should_alert(e)) { consumer->alerts->send("PROCESSING_ERROR", e); } } release_db_connection(db); free_kafka_message(msg); } return NULL; }
|
自定义分区策略
所谓分区策略就是新消息要放到哪个分区中去处理, 默认方式是轮询分区, 也就是每个分区轮流接收. 这种策略可以很好地做到负载均衡, 但是这种方式并没有指定性, 也就是说, 如果一些消息处理是有顺序性的话, 那么是有问题的.
举个例子, 如果一个用户发布了两个消息A和B, 需求上A要比B先处理, 那么如果是轮询负载, 就会发布到分区0和分区1, 但是分区1的消费者可能比分区0的消费提前消费, 那么就会产生问题.
重平衡机制
重平衡上一章已经有涉及, 本质就是kafka服务器为了应对消费者数量变化而实现的分区分配机制.
虽然分配机制我们无法影响, 但是我们可以通过设置重平衡回调来对于分区变化进行响应.
1
| rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
|
那么为什么消费者要对于分区变化进行响应呢?
我一开始觉得分区变化不影响消费者消费, 毕竟一个topic中的消息都是类型一致的, 但是这其实有违kafka的设计理念.
在kafka的设计中partition(分区)是要被并行处理的, 这样其实是需要消费者基于分区进行消费的. 因此资源就应该以分区为单位分配, 如果所有分区共享资源, 那么相互竞争资源就没办法体现分区并行处理的优势了!
在分区的消费过程中, 我们会有很多额外的需求, 比如 :
- 状态维护 : 记录当前分区处理的消费数目, 处理时间, 处理详情等等.
- 连接绑定 : 每个分区都可以绑定自己的数据库连接, reids连接, socket连接等等.
这些都是我们需要维护针对分区维护的”资源”.
在知道为什么需要响应后, 我们来了解如何响应, 下面是重平衡回调的固定参数 :
1 2 3 4
| void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque);
|
- rd_kafka_t : 消费者实例, 用来接收新分配, 释放当前分配和提交偏移量, 这些都是要消费者手动调用的.
- rd_kafka_resp_err_t : 事件类型, 用来判断是获得新分区还是失去旧分区.
- rd_kafka_topic_partition_list_t : 可以获取当前具体的分区信息, 可以利用分区信息的唯一性 写日志 / 申请资源 / 释放资源.
- opaque : 自定义结构, 可以存放连接资源等等.
下面是一个非常简易的重平衡回调例子 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) { SimpleContext *ctx = (SimpleContext *)opaque; if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { for (int i = 0; i < partitions->cnt; i++) { rd_kafka_topic_partition_t *p = &partitions->elems[i]; p->offset = ...; } rd_kafka_assign(rk, partitions); } else if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { rd_kafka_commit(rk, partitions, 1); rd_kafka_assign(rk, NULL); } }
|
需要注意的是在自定义的重平衡回调中是必须要调用rd_kafka_assign来接受和释放分配的, 并且也要使用rd_kafka_commit提交当前读取的偏移量到kafka服务器上, 不然是无法正常运行的!
当然如果不设置回调, 也会有一个最简易的默认回调, 该回调中就只有对于rd_kafka_assign和rd_kafka_commit的调用了.