cppkafka
最后更新于
这有帮助吗?
最后更新于
这有帮助吗?
reset offset
class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
void rebalance_cb (RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
RdKafka::TopicPartition *part;
// find the partition, through std::find() or other means
...
if (part)
part->set_offset(1234);
consumer->assign(partitions);
} else {
consumer->unassign();
}
}
};