序
本文主要研究一下storm-kafka-client的ProcessingGuarantee
ProcessingGuarantee
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
/**
* This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
* i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
* The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
* NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
*/
@InterfaceStability.Unstable
public enum ProcessingGuarantee {
/**
* An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
* times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
* interval.
*/
AT_LEAST_ONCE,
/**
* Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
* components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
* ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
*/
AT_MOST_ONCE,
/**
* The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
* be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
* spout to control when commits occur. Commits asynchronously on the defined interval.
*/
NO_GUARANTEE,
}
- storm-kafka-client与旧版的storm-kafka不同之一就是引入了ProcessingGuarantee,是的整个代码更为清晰
- ProcessingGuarantee.AT_LEAST_ONCE就是开启ack的版本,它类似kafka client的auto commit,在指定interval定期commit
- ProcessingGuarantee.AT_MOST_ONCE,它就不管ack了,在polled out消息的时候同步commit(
忽略interval配置
),因而该消息最多被处理一次 - ProcessingGuarantee.NO_GUARANTEE,这个也是不管ack的,不过它跟ProcessingGuarantee.AT_LEAST_ONCE类似,是在指定interval定期commit,不同的是它是异步提交
KafkaSpout.open
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
public class KafkaSpout<K, V> extends BaseRichSpout {
//Initial delay for the commit and subscription refresh timers
public static final long TIMER_DELAY_MS = 500;
// timer == null only if the processing guarantee is at-most-once
private transient Timer commitTimer;
// Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
// or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
private transient Map<TopicPartition, OffsetManager> offsetManagers;
// Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;
//......
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.context = context;
// Spout internals
this.collector = collector;
// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
// Retries management
retryService = kafkaSpoutConfig.getRetryService();
tupleListener = kafkaSpoutConfig.getTupleListener();
if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
// In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
}
refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = new HashMap<>();
commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
tupleListener.open(conf, context);
if (canRegisterMetrics()) {
registerMetric();
}
LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}
//......
}
- open的时候判断,只要不是ProcessingGuarantee.AT_MOST_ONCE,那么就初始化commitTimer,period值为kafkaSpoutConfig.getPartitionRefreshPeriodMs(),如果没有设置,默认是2000ms
Timer.isExpiredResetOnTrue
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/internal/Timer.java
public class Timer {
private final long delay;
private final long period;
private final TimeUnit timeUnit;
private final long periodNanos;
private long start;
//......
/**
* Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
* case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
* (re-initiated) and a new cycle will start.
*
* @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
* otherwise.
*/
public boolean isExpiredResetOnTrue() {
final boolean expired = Time.nanoTime() - start >= periodNanos;
if (expired) {
start = Time.nanoTime();
}
return expired;
}
}
- Timer有一个重要的方法是isExpiredResetOnTrue,用于判断“调度时间”是否到了,这个在nextTuple里头有调用到
KafkaSpout.nextTuple
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
// ======== Next Tuple =======
@Override
public void nextTuple() {
try {
if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
kafkaSpoutConfig.getSubscription().refreshAssignment();
}
if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples(kafkaConsumer.assignment());
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(kafkaConsumer.assignment());
kafkaConsumer.commitAsync(offsetsToCommit, null);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
}
PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
if (pollablePartitionsInfo.shouldPoll()) {
try {
setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
} catch (RetriableException e) {
LOG.error("Failed to poll from kafka.", e);
}
}
emitIfWaitingNotEmitted();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
}
- nextTuple先判断要不要刷新subscription,然后就判断commitTimer,判断是否应该提交commit,这里是调用commitTimer.isExpiredResetOnTrue()
- ProcessingGuarantee类型如果是NO_GUARANTEE,则调用createFetchedOffsetsMetadata创建待提交的offset及partition信息,然后调用kafkaConsumer.commitAsync进行异步提交;
- ProcessingGuarantee类型如果是AT_LEAST_ONCE,则调用commitOffsetsForAckedTuples进行提交
- 处理完offset提交之后,通过getPollablePartitionsInfo获取PollablePartitionsInfo,如果shouldPoll则调用pollKafkaBroker拉数据,然后通过setWaitingToEmit方法将拉取的数据放入waitingToEmit
- 最后调用emitIfWaitingNotEmitted方法,当有数据的时候就进行emit或者retry,没有数据时通过while循环进行waiting
createFetchedOffsetsMetadata
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : assignedPartitions) {
offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
}
return offsetsToCommit;
}
- 这里根据kafkaConsumer.assignment()的信息,通过kafkaConsumer.position(tp)提取下一步将要fetch的offset位置,通过commitMetadataManager.getCommitMetadata()提取CommitMetadata的json串作为元信息
commitOffsetsForAckedTuples
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
// Find offsets that are ready to be committed for every assigned topic partition
final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
for (Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
if (assignedPartitions.contains(entry.getKey())) {
assignedOffsetManagers.put(entry.getKey(), entry.getValue());
}
}
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
}
}
// Commit offsets that are ready to be committed for every topic partition
if (!nextCommitOffsets.isEmpty()) {
kafkaConsumer.commitSync(nextCommitOffsets);
LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
// Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
// in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
//Update the OffsetManager for each committed partition, and update numUncommittedOffsets
final TopicPartition tp = tpOffset.getKey();
long position = kafkaConsumer.position(tp);
long committedOffset = tpOffset.getValue().offset();
if (position < committedOffset) {
/*
* The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
* than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
* part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
* to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
*/
LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
position, committedOffset);
kafkaConsumer.seek(tp, committedOffset);
List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
if (waitingToEmitForTp != null) {
//Discard the pending records that are already committed
List<ConsumerRecord<K, V>> filteredRecords = new ArrayList<>();
for (ConsumerRecord<K, V> record : waitingToEmitForTp) {
if (record.offset() >= committedOffset) {
filteredRecords.add(record);
}
}
waitingToEmit.put(tp, filteredRecords);
}
}
final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
offsetManager.commit(tpOffset.getValue());
LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
}
} else {
LOG.trace("No offsets to commit. {}", this);
}
}
- 这里首先通过offsetManagers,获取已经ack的等待commit的partition以及msgId信息,如果是ProcessingGuarantee.AT_MOST_ONCE则该集合为空
- 之后根据CommitMetadata通过OffsetManager.findNextCommitOffset获取这一批待commit的消息的offset
- 然后调用kafkaConsumer.commitSync同步提交offset,之后更新本地的OffsetManager的committed相关信息
getPollablePartitionsInfo
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
private PollablePartitionsInfo getPollablePartitionsInfo() {
if (isWaitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted.");
return new PollablePartitionsInfo(Collections.<TopicPartition>emptySet(), Collections.<TopicPartition, Long>emptyMap());
}
Set<TopicPartition> assignment = kafkaConsumer.assignment();
if (!isAtLeastOnceProcessing()) {
return new PollablePartitionsInfo(assignment, Collections.<TopicPartition, Long>emptyMap());
}
Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
Set<TopicPartition> pollablePartitions = new HashSet<>();
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
for (TopicPartition tp : assignment) {
OffsetManager offsetManager = offsetManagers.get(tp);
int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
if (numUncommittedOffsets < maxUncommittedOffsets) {
//Allow poll if the partition is not at the maxUncommittedOffsets limit
pollablePartitions.add(tp);
} else {
long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
//Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
pollablePartitions.add(tp);
} else {
LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp,
numUncommittedOffsets, maxUncommittedOffsets);
}
}
}
return new PollablePartitionsInfo(pollablePartitions, earliestRetriableOffsets);
}
- 这里对于不是ProcessingGuarantee.AT_LEAST_ONCE类型的,则直接根据kafkaConsumer.assignment()信息返回
- 如果是ProcessingGuarantee.AT_LEAST_ONCE类型类型的,这里会获取retryService.earliestRetriableOffsets(),把fail相关的offset信息整合进去
- 这里有一个maxUncommittedOffsets参数,在numUncommittedOffsets<maxUncommittedOffsets时会进行重试,如果大于等于maxUncommittedOffsets,则会进一步判断,如果是earliestRetriableOffset小于等于offsetAtLimit,那么也加入重试
pollKafkaBroker
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
// ======== poll =========
private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
Iterator<TopicPartition> pausedIter = pausedPartitions.iterator();
while (pausedIter.hasNext()) {
if (pollablePartitionsInfo.pollablePartitions.contains(pausedIter.next())) {
pausedIter.remove();
}
}
try {
kafkaConsumer.pause(pausedPartitions);
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(kafkaConsumer.assignment());
kafkaConsumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
return consumerRecords;
} finally {
kafkaConsumer.resume(pausedPartitions);
}
}
private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
//Seek directly to the earliest retriable message for each retriable topic partition
kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
}
}
private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
ConsumerRecords<K, V> consumerRecords) {
for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
TopicPartition tp = entry.getKey();
List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
if (!records.isEmpty()) {
ConsumerRecord<K, V> record = records.get(0);
long seekOffset = entry.getValue();
long earliestReceivedOffset = record.offset();
if (seekOffset < earliestReceivedOffset) {
//Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
//Ack up to the first offset received if the record is not already acked or currently in the topology
for (long i = seekOffset; i < earliestReceivedOffset; i++) {
KafkaSpoutMessageId msgId = retryService.getMessageId(new ConsumerRecord<>(tp.topic(), tp.partition(), i, null, null));
if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
retryService.remove(msgId);
emitted.add(msgId);
ack(msgId);
}
}
}
}
}
}
- 如果PollablePartitionsInfo的pollablePartitions不为空,则会调用pollKafkaBroker拉取消息
- 首先调用了doSeekRetriableTopicPartitions,根据要重试的partition及offset信息,进行seek操作,对每个parition移动到要重试的最早的offset位置
- 拉取消息的时候,先pause不符合maxUncommitted等条件的paritions,然后进行poll消息,poll拉取消息之后判断如果是ProcessingGuarantee.AT_MOST_ONCE类型的,则调用kafkaConsumer.commitSync同步提交,然后返回拉取的记录(
最后设置到waitingToEmit
),最后再resume之前pause的partitions(通过这样避免拉取不符合提交条件的partitions的消息
); - 注意这里的pollablePartitionsInfo是根据getPollablePartitionsInfo()获取的,它是遍历kafkaConsumer.assignment()根据offsetManager及maxUncommittedOffsets等相关参数进行过滤,因此可以认为pollablePartitionsInfo.pollablePartitions是kafkaConsumer.assignment()的子集,而pausedPartitions是根据kafkaConsumer.assignment()过滤掉pollablePartitionsInfo.pollablePartitions得来的,因而pausedPartitions就是getPollablePartitionsInfo()中不满足条件被剔除的partitions,针对这些partitions,先pause再调用poll,最后再resume,也就是此次poll不会从pausedPartitions拉取消息
- 在poll消息之后还有一个动作就是调用ackRetriableOffsetsIfCompactedAway,针对已经compacted的消息进行ack处理
emitIfWaitingNotEmitted
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
private void emitIfWaitingNotEmitted() {
Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator();
outerLoop:
while (waitingToEmitIter.hasNext()) {
List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
while (!waitingToEmitForTp.isEmpty()) {
final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0));
if (emittedTuple) {
break outerLoop;
}
}
waitingToEmitIter.remove();
}
}
- emitIfWaitingNotEmitted主要是判断waitingToEmit有无数据,有则取出来触发emitOrRetryTuple,没有则不断循环进行waiting
emitOrRetryTuple
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
/**
* Creates a tuple from the kafka record and emits it if it was never emitted or it is ready to be retried.
*
* @param record to be emitted
* @return true if tuple was emitted. False if tuple has been acked or has been emitted and is pending ack or fail
*/
private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it is pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
} else {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
if (isAtLeastOnceProcessing()
&& committedOffset != null
&& committedOffset.offset() > record.offset()
&& commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
// Ensures that after a topology with this id is started, the consumer fetch
// position never falls behind the committed offset (STORM-2844)
throw new IllegalStateException("Attempting to emit a message that has already been committed."
+ " This should never occur when using the at-least-once processing guarantee.");
}
final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
if (isEmitTuple(tuple)) {
final boolean isScheduled = retryService.isScheduled(msgId);
// not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
if (!isScheduled || retryService.isReady(msgId)) {
final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
if (!isAtLeastOnceProcessing()) {
if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
collector.emit(stream, tuple, msgId);
LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
} else {
collector.emit(stream, tuple);
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
} else {
emitted.add(msgId);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
retryService.remove(msgId);
}
collector.emit(stream, tuple, msgId);
tupleListener.onEmit(tuple, msgId);
LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
}
return true;
}
} else {
/*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
* to allow its offset to be commited to Kafka*/
LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
if (isAtLeastOnceProcessing()) {
msgId.setNullTuple(true);
offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
ack(msgId);
}
}
}
return false;
}
- emitOrRetryTuple是整个nextTuple的核心,这里包含了emit操作以及retry操作
- 由于针对fail的消息,是使用seek方法进行重新拉取的,因而这里要使用offsetManagers(
已经acked等待commit
)以及emitted(已经emit等待ack
)进行去重判断,如果这两者都不包含,才进行emit或者retry - 进行emit处理时,先通过retryService.isScheduled(msgId)判断是否是失败重试的,如果不是失败重试的,或者是失败重试的且已经到期了,那么就是进行下面的emit处理
- 针对ProcessingGuarantee.AT_LEAST_ONCE类型的,这里要维护emitted以及offsetManagers,然后进行emit操作,回调tupleListener.onEmit(tuple, msgId)方法;如果不是ProcessingGuarantee.AT_LEAST_ONCE类型的,则仅仅是进行collector.emit操作
KafkaSpout.ack
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
// ======== Ack =======
@Override
public void ack(Object messageId) {
if (!isAtLeastOnceProcessing()) {
return;
}
// Only need to keep track of acked tuples if commits to Kafka are controlled by
// tuple acks, which happens only for at-least-once processing semantics
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (msgId.isNullTuple()) {
//a null tuple should be added to the ack list since by definition is a direct ack
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
tupleListener.onAck(msgId);
return;
}
if (!emitted.contains(msgId)) {
LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
+ "came from a topic-partition that this consumer group instance is no longer tracking "
+ "due to rebalance/partition reassignment. No action taken.", msgId);
} else {
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
emitted.remove(msgId);
}
tupleListener.onAck(msgId);
}
- ack的时候,如果不是ProcessingGuarantee.AT_LEAST_ONCE类型,就立马返回
- 之后将已经acked的msgId放入到offsetManagers这个map中,等待在nextTuple中进行commit,然后将其从emitted中移除
- 这里有一个emitted的去重判断,如果不是之前emit过的就不处理,这种通常是rebalance/partition reassignment引起的
KafkaSpout.fail
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpout.java
// ======== Fail =======
@Override
public void fail(Object messageId) {
if (!isAtLeastOnceProcessing()) {
return;
}
// Only need to keep track of failed tuples if commits to Kafka are controlled by
// tuple acks, which happens only for at-least-once processing semantics
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
LOG.debug("Received fail for tuple this spout is no longer tracking."
+ " Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
+ " This should never occur barring errors in the RetryService implementation or the spout code.");
msgId.incrementNumFails();
if (!retryService.schedule(msgId)) {
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
// this tuple should be removed from emitted only inside the ack() method. This is to ensure
// that the OffsetManager for that TopicPartition is updated and allows commit progression
tupleListener.onMaxRetryReached(msgId);
ack(msgId);
} else {
tupleListener.onRetry(msgId);
emitted.remove(msgId);
}
}
- fail的时候也先判断,如果不是ProcessingGuarantee.AT_LEAST_ONCE类型,就立马返回
- 然后判断emitted中是否存在,如果不存在,则立刻返回,这通常是partition reassigned引起的
- fail的时候,调用retryService.schedule(msgId),如果不成功,则触发tupleListener.onMaxRetryReached,然后进行ack;如果成功则调用tupleListener.onRetry回调,然后从emitted中删除
KafkaSpoutRetryExponentialBackoff.schedule
storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
//This class assumes that there is at most one retry schedule per message id in this set at a time.
private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
/**
* Comparator ordering by timestamp
*/
private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
@Override
public int compare(RetrySchedule entry1, RetrySchedule entry2) {
int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
if(result == 0) {
//TreeSet uses compareTo instead of equals() for the Set contract
//Ensure that we can save two retry schedules with the same timestamp
result = entry1.hashCode() - entry2.hashCode();
}
return result;
}
}
@Override
public boolean schedule(KafkaSpoutMessageId msgId) {
if (msgId.numFails() > maxRetries) {
LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
return false;
} else {
//Remove existing schedule for the message id
remove(msgId);
final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
retrySchedules.add(retrySchedule);
toRetryMsgs.add(msgId);
LOG.debug("Scheduled. {}", retrySchedule);
LOG.trace("Current state {}", retrySchedules);
return true;
}
}
@Override
public Map<TopicPartition, Long> earliestRetriableOffsets() {
final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
final KafkaSpoutMessageId msgId = retrySchedule.msgId;
final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
if(currentLowestOffset != null) {
tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
} else {
tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
}
} else {
break; // Stop searching as soon as passed current time
}
}
LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
return tpToEarliestRetriableOffset;
}
@Override
public boolean isReady(KafkaSpoutMessageId msgId) {
boolean retry = false;
if (isScheduled(msgId)) {
final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
if (retrySchedule.msgId.equals(msgId)) {
retry = true;
LOG.debug("Found entry to retry {}", retrySchedule);
break; //Stop searching if the message is known to be ready for retry
}
} else {
LOG.debug("Entry to retry not found {}", retrySchedule);
break; // Stop searching as soon as passed current time
}
}
}
return retry;
}
- schedule首先判断失败次数是否超过maxRetries,如果超过了则返回false,表示不再调度了,之后KafkaSpout在fail方法回调tupleListener.onMaxRetryReached方法,然后进行ack,表示不再处理了
- 没有超过maxRetries的话,则创建retrySchedule信息,然后添加到retrySchedules中;retrySchedules是一个TreeSet,默认使用RetryEntryTimeStampComparator,根据nextRetryTimeNanos进行排序,如果相等则按hashCode进行排序
- earliestRetriableOffsets以及isReady都会用到retrySchedules的信息
小结
-
storm-kafka-client主要针对kafka0.10及以上版本,它引入了ProcessingGuarantee枚举,该枚举有三个值,分别是
- ProcessingGuarantee.AT_LEAST_ONCE就是开启ack的版本,它类似kafka client的auto commit,在指定interval定期commit;它会维护已经emitted(
已经emitted但尚未ack
),offsetManagers(已经ack但尚未commit
)以及fail需要重试的retrySchedules - ProcessingGuarantee.AT_MOST_ONCE,它就不管ack了,在polled out消息的时候同步commit(
忽略interval配置
),因而该消息最多被处理一次 - ProcessingGuarantee.NO_GUARANTEE,这个也是不管ack的,不过它跟ProcessingGuarantee.AT_LEAST_ONCE类似,是在指定interval定期commit(
都依赖commitTimer
),不同的是它是异步
- ProcessingGuarantee.AT_LEAST_ONCE就是开启ack的版本,它类似kafka client的auto commit,在指定interval定期commit;它会维护已经emitted(
- ProcessingGuarantee.AT_LEAST_ONCE它结合了storm的ack机制,在spout的ack方法维护emitted(
已经emitted但尚未ack
);在fail方法将msgId放入到retryService进行重试(这个是ProcessingGuarantee.NO_GUARANTEE所没有的
);它跟ProcessingGuarantee.NO_GUARANTEE一样是依赖commitTimer,在initerval期间提交offset信息,不同的是它是commitSync,即同步提交,而且提交的是已经acked的消息;而ProcessingGuarantee.NO_GUARANTEE是异步提交,而且提交的是offset是不管是否在storm spout已经ack,而是以consumer的poll为准的 - ProcessingGuarantee.AT_MOST_ONCE是在pollKafkaBroker方法里头,在调用完kafkaConsumer.poll之后,调用kafkaConsumer.commitSync进行同步提交commit;它是同步提交,而且不依赖commitTimer,即不是interval提交offset
- ProcessingGuarantee.NO_GUARANTEE在nextTuple中判断需要commit的时候,调用kafkaConsumer.commitAsync进行异步提交,它跟ProcessingGuarantee.AT_LEAST_ONCE一样,都依赖commitTimer,在initerval期间提交offset,但是它是异步提交,而ProcessingGuarantee.AT_LEAST_ONCE是同步提交
- nextTuple()方法会pollKafkaBroker会调用kafkaConsumer.poll方法拉取消息,然后将拉取到的消息放入waitingToEmit,之后调用emitIfWaitingNotEmitted方法进行emit或者waiting,如果emit则是调用emitOrRetryTuple方法;由于pollKafkaBroker会执行seek操作将offset移动到每个parition中失败的offset中最小的位置,从那个位置开始重新拉取消息,拉取消息调用了kafkaConsumer.poll方法,KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE是在这里进行kafkaConsumer.commitSync同步提交offset的;由于包含了要重试的消息,emitOrRetryTuple这里要根据offsetManagers(
已经ack等待commit
)以及emitted(已经emit等待ack
)进行去重判断是否需要调用collector.emit;对于ProcessingGuarantee.AT_LEAST_ONCE类型,这里不仅调用emit方法,还需要维护offsetManagers、emitted及重试信息相关状态,然后回调tupleListener.onEmit方法;对于非ProcessingGuarantee.AT_LEAST_ONCE类型这里仅仅是emit。
doc
- Storm Apache Kafka integration using the kafka-client jar