cppkafka
reset offset
class ExampleRebalanceCb : public RdKafka::RebalanceCb { public: void rebalance_cb (RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) { if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { RdKafka::TopicPartition *part; // find the partition, through std::find() or other means ... if (part) part->set_offset(1234); consumer->assign(partitions); } else { consumer->unassign(); } } };
最后更新于
这有帮助吗?