commit() $consumer->commit(); # commit current partition assignment (blocking call) $consumer->commit(1); # commit current partition assignment (non-blocking call) my $tp_list = Net::Kafka::TopicPartitionList->new(); $tp_list->add("my_topic", 0); $tp_list->set_offset("my_topic", 0, 12345); $consumer->commit(0, $tp_list); # commit $tp_list assignment (blocking call); Commit offsets on broker for the provided list of partitions. If no partitions provided current assignment is committed instead.
commit_message(); my $message = $consumer->poll(1000); $consumer->commit_message(0, $message); # commit message (blocking call); $consumer->commit_message(1, $message); # commit message (non-blocking call); Commit message's offset on broker for the message's partition.
В случае того модуля что я кидал, когда читаешь из партиции, то передвигаемый офсет без комита не запоминается. Комит нужно делать когда хочешь чтоб кафка в следующий раз те же самые данные из очереди не давала