
Integrating Kafka with Spring

1. Add the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

No version is given because Spring Boot's dependency management supplies one; outside a Spring Boot project, add an explicit <version> element.

2. Configure Kafka

spring:
  kafka: 
    bootstrap-servers: localhost:9092
    consumer: 
      group-id: group1
    listener: 
      missing-topics-fatal: false
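
For orientation, the first two settings end up as plain Kafka client properties. The sketch below uses only the JDK and a hypothetical class name, ConsumerPropsSketch, to show roughly what the consumer receives. The String deserializers are what Spring Boot falls back to when none are configured. missing-topics-fatal is not a client property; it configures Spring's listener container instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the original project.
public class ConsumerPropsSketch {

    /** Builds the Kafka client properties that the YAML settings roughly translate to. */
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        // spring.kafka.bootstrap-servers
        props.put("bootstrap.servers", "localhost:9092");
        // spring.kafka.consumer.group-id
        props.put("group.id", "group1");
        // Spring Boot defaults when no deserializer is configured
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```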

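If topic1 does not yet exist on the broker, startup fails with the error below. Setting missing-topics-fatal to false, as in the configuration above, lets the listener container start anyway.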

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: Topic(s) [topic1] is/are not present and missingTopicsFatal is true
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
    at com.zhoulp.SyncMessageWebApplication.main(SyncMessageWebApplication.java:24)
Caused by: java.lang.IllegalStateException: Topic(s) [topic1] is/are not present and missingTopicsFatal is true
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:383)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:144)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 14 common frames omitted
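
An alternative to relaxing the check is to have the application create the topic at startup. This is a minimal sketch, assuming Spring Boot's auto-configured KafkaAdmin is active and the broker permits topic creation. The class name KafkaTopicConfig and the partition and replica counts are illustrative choices, not part of the original setup.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; not part of the original project.
@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin picks up NewTopic beans and creates any topic that does not exist yet.
    @Bean
    public NewTopic topic1() {
        // 1 partition, replication factor 1: suitable for a single local broker only.
        return new NewTopic("topic1", 1, (short) 1);
    }
}
```

With the topic created this way, the listener container should find topic1 at startup, so the check no longer needs to be switched off.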

3. Implement the producer

package com.zhoulp.producer;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

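/**
 * Sends string messages to a Kafka topic through KafkaTemplate.
 *
 * @author zhoulp
 * @date   2020-08-03
 */
@Component("kafkaProducer")
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);

    // @Inject requires the javax.inject dependency on the classpath;
    // Spring's @Autowired is a drop-in alternative.
    @Inject
    private KafkaTemplate<String, String> template;

    /** Sends data to the given topic. send() is asynchronous and returns before the broker acknowledges. */
    public void sendMessage(String topic, String data) {
        log.info("send: topic = {}, data = {}", topic, data);
        template.send(topic, data);
    }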

}

4. Implement the consumer

package com.zhoulp.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Logs every record received from topic1.
 *
 * @author zhoulp
 * @date   2020-08-03
 */
@Component("kafkaConsumer")
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    // The consumer group comes from spring.kafka.consumer.group-id (group1).
    @KafkaListener(topics = "topic1")
    public void listenTopic1(ConsumerRecord<String, String> consumerRecord) {
        log.info("listenTopic1");
        log.info(consumerRecord.toString());
        log.info(consumerRecord.topic());
        log.info(consumerRecord.value());
    }

}
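
To see the producer and consumer sides work together, one option is to send a message at startup and watch the listener log it. This is a minimal sketch, assuming a broker is reachable at the configured address. The class name KafkaDemoConfig, the bean method name, and the payload are hypothetical. It uses KafkaTemplate directly so that it compiles on its own; calling kafkaProducer.sendMessage would work the same way.

```java
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical demo configuration; not part of the original project.
@Configuration
public class KafkaDemoConfig {

    // Runs once after the application context has started.
    @Bean
    public ApplicationRunner sendDemoMessage(KafkaTemplate<String, String> template) {
        return args -> template.send("topic1", "hello from startup");
    }
}
```

A new consumer group starts at the latest offset by default, so the listener can miss a message that was sent before its partitions were assigned. Setting spring.kafka.consumer.auto-offset-reset to earliest avoids that on a first run.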
