一、背景

1、流式计算的概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

2、应用场景

  • 日志分析

    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策

  • 大屏看板统计

    可以实时的查看网站注册数量,订单数量,购买数量,金额等。

  • 公交实时数据

    可以随时更新公交车方位,计算多久到达站牌等

  • 实时文章分值计算

    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

3、技术选型

  • Hadoop

    Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。Hadoop实现了一个分布式文件系统( Distributed File System),其中一个组件是HDFS(Hadoop Distributed File System)。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求,可以以流的形式访问(streaming access)文件系统中的数据。Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,而MapReduce则为海量的数据提供了计算 [1] 。

  • Apche Storm

    Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

  • Kafka Stream

    可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

二、Kafka Stream

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

1-KafkaStream.png

三、Kafka Stream案例

  • 需求分析,求单词个数(word count)

2-KafkaStream.png

1、添加配置文件信息

  • 找到kafka-demo的application.yml文件,在最下方添加自定义配置
kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

2、新增四个配置类

2.1 自定配置参数

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;

    /**
     * 重新定义默认的KafkaStreams配置属性,包括:
     * 1、服务器地址
     * 2、应用ID
     * 3、流消息的副本数等配置
     * @return
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // 消息副本数量
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
        props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
        return new KafkaStreamsConfiguration(props);
    }
}

2.2 定义监听接口

/**
 * 流数据的监听消费者实现的接口类,系统自动会通过
 * KafkaStreamListenerFactory类扫描项目中实现该接口的类,
 * 并注册为流数据的消费端。
 *
 * 其中泛型可是KStream或KTable
 * @param <T>
 */
public interface KafkaStreamListener<T> {

    // 监听的类型
    String listenerTopic();
    // 处理结果发送的类
    String sendTopic();
    // 对象处理逻辑
    T getService(T stream);

}

2.3 KafkaStream自动处理包装类

/**
 * KafkaStream自动处理包装类
 */
public class KafkaStreamProcessor {

    // 流构建器
    StreamsBuilder streamsBuilder;
    private String type;
    KafkaStreamListener listener;

    public KafkaStreamProcessor(StreamsBuilder streamsBuilder,KafkaStreamListener kafkaStreamListener){
        this.streamsBuilder = streamsBuilder;
        this.listener = kafkaStreamListener;
        this.parseType();
        Assert.notNull(this.type,"Kafka Stream 监听器只支持kstream、ktable,当前类型是"+this.type);
    }

    /**
     * 通过泛型类型自动注册对应类型的流处理器对象
     * 支持KStream、KTable
     * @return
     */
    public Object doAction(){
        if("kstream".equals(this.type)) {
            KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            stream=(KStream)listener.getService(stream);
            stream.to(listener.sendTopic());
            return stream;
        }else{
            KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
            table = (KTable)listener.getService(table);
            table.toStream().to(listener.sendTopic());
            return table;
        }
    }

    /**
     * 解析传入listener类的泛型类
     */
    private void parseType(){
        Type[] types = listener.getClass().getGenericInterfaces();
        if(types!=null){
            for (int i = 0; i < types.length; i++) {
                if( types[i] instanceof ParameterizedType){
                    ParameterizedType t = (ParameterizedType)types[i];
                    String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
                    if(name.contains("org.apache.kafka.streams.kstream.kstream")||name.contains("org.apache.kafka.streams.kstream.ktable")){
                        this.type = name.substring(0,name.indexOf('<')).replace("org.apache.kafka.streams.kstream.","").trim();
                        break;
                    }
                }
            }
        }
    }
}

2.4 完成监听器的实际注册

  • KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类
@Component
public class KafkaStreamListenerFactory implements InitializingBean {

    Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);

    @Autowired
    DefaultListableBeanFactory defaultListableBeanFactory;

    /**
     * 初始化完成后自动调用
     */
    @Override
    public void afterPropertiesSet() {
        Map<String, KafkaStreamListener> map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);
        for (String key : map.keySet()) {
            KafkaStreamListener k = map.get(key);
            KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k);
            String beanName = k.getClass().getSimpleName()+"AutoProcessor" ;
            //注册baen,并且执行doAction方法
            defaultListableBeanFactory.registerSingleton(beanName,processor.doAction());
            logger.info("add kafka stream auto listener [{}]",beanName);
        }
    }
}

3、手动创建监听器

1.该类需要实现KafkaStreamListener接口
2.listenerTopic方法返回需要监听的topic(入口topic)
3.sendTopic方法返回需要处理完后发送的topic(出口topic)
4.getService方法,主要处理流数据


@Component
public class MyStreamListener implements KafkaStreamListener<KStream<String,String>> {


    @Override
    public String listenerTopic() {
        return "input_topic_001";
    }

    @Override
    public String sendTopic() {
        return "output_topic_001";
    }

    @Override
    public KStream<String, String> getService(KStream<String, String> kStream) {

        KStream<String, String> map = stream.flatMapValues(value->{
                System.out.println("第一阶段:value=>" +value);
                return Arrays.asList(value.split(" "));
        }).map((key,value)->{
                System.out.println("第二阶段:key==>"+key+",value=>"+value);
                return new KeyValue<>(value, null);
        }).groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count(Materialized.as("app" + UUID.randomUUID().toString()))
                .toStream()
                .map((key,value)->{
                        System.out.println("第三阶段:key===>"+key.key()+",value===>"+value.toString());
                        return new KeyValue<>(key.key(), value.toString());
                });
        return map;
    }
}

4、修改生产者代码


/**
 * Kafka主题生产者
 */
@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate kafkaTemplate;


    @GetMapping("/send/{key}/{value}")
    public String sendMsg(@PathVariable("key")String key, @PathVariable("value") String value){
        kafkaTemplate.send("in001",0, key, value).addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }

            @Override
            public void onSuccess(Object o) {
                System.out.println(o);
            }
        });
        System.out.println("生产消息完毕了。。。");
        return "ok";
    }

}

5、修改消费者代码


/**
 * Kafka主题消费者监听器
 */
@Component
public class ConsumerListener {


    @KafkaListener(topics = "out001")
    public void pollMsg(ConsumerRecord<String,String> consumerRecord){
        Optional<ConsumerRecord<String, String>> optional = Optional.ofNullable(consumerRecord);
        optional.ifPresent(x-> System.out.println(x.key() + "===" + x.value()));
    }

}

6、测试

启动服务前,注意要先创建入口topic,否则流式计算程序启动会报错提示topic不存在!!!!

  • 登录kafka容器,创建topic

    /opt/kafka_2.12-2.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic inTopic  --partitions 1 --replication-factor 1
    
    /opt/kafka_2.12-2.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic outTopic  --partitions 1 --replication-factor 1
    
  • 查看topic

    /opt/kafka_2.12-2.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
    

启动微服务,正常发送消息,可以正常接收到消息

注意:如果出现Cannot determine JNI library name for ARCH='x86' OS='windows 10' name='rocksdb'这样的报错,查看jdk是否是32位的,如果是需要改成64位的,然后JAVA_HOME路径换成64位的JDK路径且IDEA中选择新的JDK路径。