Kafka源码深度剖析——响应被处理了以后内存是如何处理的?

上一次我们分析到一个消息发送出去以后,然后针对其响应进行的处理,我们发现KafkaProducer最终会为每个消息都调用一次回调函数。这一小节的内容比较简单轻松,我们看一下一个消息成功发送了,并且也调用其回调函数了,那之前发送请求的时候申请的内存,KafkaProducer是如何处理的?

-     本次目标     -

上一次我们分析到一个消息发送出去以后,然后针对其响应进行的处理,我们发现KafkaProducer最终会为每个消息都调用一次回调函数。这一小节的内容比较简单轻松,我们看一下一个消息成功发送了,并且也调用其回调函数了,那之前发送请求的时候申请的内存,KafkaProducer是如何处理的?


-     源码剖析     -

我们再次看一下上一小节的代码:

  private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long timestamp, long correlationId, long now) {
        if (error != Errors.NONE && canRetry(batch, error)) {
            // retry
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                     correlationId,
                     batch.topicPartition,
                     this.retries - batch.attempts - 1,
                     error);
            this.accumulator.reenqueue(batch, now);
            this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
        } else {
            RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            else
                exception = error.exception();
            // tell the user the result of their request
            //TODO 执行最后的操作,为每个消息调用回调函数
            batch.done(baseOffset, timestamp, exception);
           //TODO 重点的代码来了,回调函数调用完了以后,执行的是这儿的代码
            //这儿看起来就是释放内存的意思。
            this.accumulator.deallocate(batch);
            if (error != Errors.NONE)
                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
        }
        if (error.exception() instanceof InvalidMetadataException) {
            if (error.exception() instanceof UnknownTopicOrPartitionException)
                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                        "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
            metadata.requestUpdate();
        }

        // Unmute the completed partition.
        if (guaranteeMessageOrder)
            this.accumulator.unmutePartition(batch.topicPartition);
    }
         ......

释放资源流程:

   public void deallocate(RecordBatch batch) {
        //TODO 从已完成提交的队列里移除
        incomplete.remove(batch);
        //TODO 释放资源
        free.deallocate(batch.records.buffer(), batch.records.initialCapacity());
    }

重点观察deallocate方法:

 public void deallocate(ByteBuffer buffer, int size{
        //加锁
        lock.lock();
        try {
            //如果当前分区使用的内存刚好就是16k,那么就直接把放入内存池
            if (size == this.poolableSize && size == buffer.capacity()) {
                //把数据清空
                buffer.clear();
                //把内存放回内存池
                this.free.add(buffer);
            } else {
                //如果 内存块的大小不是一个 标准的 批次的内存大小,那么
                //标记内存可用,等着垃圾回收把内存回收掉就可以了。
                this.availableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                //唤醒之前因为内存满了,而在wait的线程。
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

最后要么释放内存,要么把内存放回内存池了。


-     总结     -

这一节我们看了一个被正常处理的响应的流程。那么我们想一下如果响应返回来带有异常,那么针对有异常的请求的响应KafkaProducer又是如何处理的?我们下一讲继续剖析。


相关内容推荐:

  Kafka源码深度剖析系列(汇总)


更多技术资料及视频请关注微信公众号或添加 QQ交流群领取

attachments-2020-07-fDTAamMF5f1e8391948b7.png

0 条评论

请先 登录 后评论
李希沅 | 奈学教育
李希沅 | 奈学教育

奈学教育 | 金牌讲师

36 篇文章

作家榜 »

  1. NX小编 1116 文章
  2. 58沈剑 309 文章
  3. 奈学教育 137 文章
  4. 李希沅 | 奈学教育 36 文章
  5. 江帅帅 | 奈学教育 29 文章
  6. 林淮川 | 奈学教育 12 文章
  7. 科技热点 10 文章
  8. 邱鹏超 2 文章