1、kafka概述

  • 消息中间件对比
特性ActiveMQRabbitMQRocketMQKafka
开发语言javaerlangjavascala
单机吞吐量万级万级10万级100万级
时效性msusmsms级以内
可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
功能特性成熟的产品、较全的文档、各种协议支持好并发能力强、性能好、延迟低MQ功能比较完善,扩展性佳只支持主要的MQ功能,主要应用于大数据领域

分区

不是随机写 (追加)

分段设计

高可用设计

  • 消息中间件对比-选择建议
消息中间件建议
Kafka追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
RocketMQ可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验
RabbitMQ性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ

kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apache.org/

1-Kafka入门.png

名词解释

2-名称解释.png

  • producer: 发布消息的对象称之为主题生产者(Kafka topic producer)

  • topic: Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)

  • consumer: 订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker: 已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2、Kafka入门使用

  • 引入依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!--kafka客户端依赖-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
</dependencies>
  • 修改application.yml配置文件
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 101.42.152.102:9092
    # 生产者配置
    producer:
      # 配置消息的序列化类型为String类型
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • 生产者代码
/**
 * @Author :leaflei
 * @Version: 1.0
 * @Description :kafka生产者
 */
@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 生产消息:key+value
     */
    @GetMapping("/send/{key}/{value}")
    public String send1(@PathVariable("key") String key, @PathVariable("value") String value){
        // 发送消息,key+value
        kafkaTemplate.send("testTopic1",key, value);
        return "ok";
    }

    /**
     * 生产消息:value
     */
    @GetMapping("/send/{value}")
    public String send2( @PathVariable("value") String value){
        // 发送消息,value
        kafkaTemplate.send("testTopic2", value);
        return "ok";
    }

}
  • 消费者代码
/**
 * @Author :leaflei
 * @Version: 1.0
 * @Description :kafka消费者
 */
@Component
public class ConsumerListener {


    /**
     * 消费者监听 key+value
     *
     * @param consumerRecord 监听到的消息
     */
    @KafkaListener(topics = "testTopic1")
    public void receiveMsg1(ConsumerRecord<String, String> consumerRecord) {
        // 使用java.util包下的Optional.ofNullable()方法,判断是否为空,并包装为一个Optional对象
        Optional<ConsumerRecord<String, String>> optional = Optional.ofNullable(consumerRecord);

        // ifPresent()方法:取出一个非空的值(consumerRecord对象)
        optional.ifPresent(x -> System.out.println(x.key() + "==>" + x.value()));
    }

    /**
     * 消费者监听 value
     *
     * @param consumerRecord 监听到的消息
     */
    @KafkaListener(topics = "testTopic2")
    public void receiveMsg2(ConsumerRecord<String, String> consumerRecord) {
        Optional<ConsumerRecord<String, String>> optional = Optional.ofNullable(consumerRecord);
        optional.ifPresent(x -> System.out.println(x.value()));
    }
}

3、消费分组(多消费者)

  • 同一个消费组下的消费者,只能有一个消费者收到消息(一对一)
# 配置多个消费者在相同的消费组内
    # 消费者1
    consumer:
      #在消费组test1
      group-id: ${spring.application.name}-test1
    ...
    # 消费者2
    consumer:
   	  #在消费组test1
      group-id: ${spring.application.name}-test1
    ...
    # 消费者3
    consumer:
      #在消费组test1
      group-id: ${spring.application.name}-test1
    ...
  • 不同消费组下消费者,每个组内起码一个消费者能收到消息(1对多,广播效果)
# 配置多个消费者在不同的消费组
    consumer:
      # 配置消费者1所在群组 tes1
      group-id: ${spring.application.name}-test1
    ...
    
    consumer:
      # 配置消费者2所在群组 test2
      group-id: ${spring.application.name}-test2
    ...