kafka+spring cloud stream 发送接收消息

news/2025/2/23 11:03:08

方案 1:使用旧版 @StreamListener(适用于 Spring Cloud Stream <= 2.x)

1. 添加依赖(pom.xml

<!-- Spring Cloud Stream + Kafka Binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2. 定义消息通道接口

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MyChannels {
    String INPUT = "myInput";

    @Input(INPUT)
    SubscribableChannel input();
}

3. 使用 @StreamListener 监听

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
@EnableBinding(MyChannels.class) // 绑定消息通道
public class KafkaStreamListener {

    @StreamListener(MyChannels.INPUT)
    public void handleMessage(String message) {
        System.out.println("Received via @StreamListener: " + message);
    }
}

4. 配置 application.properties

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定输入通道到 Kafka Topic
spring.cloud.stream.bindings.myInput.destination=my-topic
spring.cloud.stream.bindings.myInput.group=my-group
spring.cloud.stream.bindings.myInput.content-type=text/plain

方案 2:新版函数式编程模型(推荐,Spring Cloud Stream >= 3.x)

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class KafkaStreamListener {

    @Bean
    public Consumer<String> myInput() {
        return message -> {
            System.out.println("Received via Function: " + message);
        };
    }
}

# Kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092

# 绑定函数到 Kafka Topic
spring.cloud.stream.bindings.myInput-in-0.destination=my-topic
spring.cloud.stream.bindings.myInput-in-0.group=my-group
spring.cloud.stream.bindings.myInput-in-0.content-type=text/plain

生产者代码示例(发送消息)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    @Autowired
    private StreamBridge streamBridge;

    public void sendMessage(String topic, String message) {
        streamBridge.send(topic, message);
    }
}

测试步骤

  1. 启动 Kafka:确保 Kafka 和 Zookeeper 服务运行。

  2. 创建 Topic

    kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

  3. 发送消息

    kafkaProducer.sendMessage("my-topic", "Hello Kafka Stream!");

  4. 查看消费者日志

    Received via @StreamListener: Hello Kafka Stream! // 或 Received via Function: Hello Kafka Stream!


常见问题

  1. 版本兼容性

    • Spring Cloud Stream 3.x 后需使用函数式编程。

    • 检查 Spring Boot 版本与 Spring Cloud Stream 的匹配关系(如 Spring Boot 2.6.x + Spring Cloud 2021.x)。

  2. 绑定配置

    • 函数式模型中,绑定名称格式为 <functionName>-in-<index>(如 myInput-in-0)。

  3. 序列化配置

    • 若传递 JSON 对象,需配置 content-type=application/json 并添加 Jackson 依赖。


总结

  • 旧版:使用 @StreamListener + 通道接口(适合遗留代码升级)。

  • 新版:推荐函数式编程模型(更简洁,符合现代 Spring 设计)。

  • 根据实际 Spring Cloud Stream 版本选择方案!


http://www.niftyadmin.cn/n/5863354.html

相关文章

leetcode 题目解析 第3题 无重复字符的最长子串

给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长 子串的长度。 示例 1: 输入: s “abcabcbb” 输出: 3 解释: 因为无重复字符的最长子串是 “abc”&#xff0c;所以其长度为 3。 示例 2: 输入: s “bbbbb” 输出: 1 解释: 因为无重复字符的最长子串是 “b”…

C# 从基础神经元到实现在0~9数字识别

训练图片:mnist160 测试结果:1000次训练学习率为0.1时,准确率在60%以上 学习的图片越多&#xff0c;训练的时候越长(比如把 epochs*10 10000或更高时)效果越好 using System; using System.Collections.Generic; using System.Drawing; using System.IO; using System.Windo…

SAP S4HANA Administration (Mark Mergaerts Bert Vanstechelman)

SAP S4HANA Administration (Mark Mergaerts Bert Vanstechelman)

spring中事务为什么会回滚?什么原理?

事务回滚是保证数据一致性的关键机制&#xff0c;但如果事务回滚失效&#xff0c;可能会导致数据不一致的问题。我会用简单易懂的方式来讲解&#xff0c;帮助你理解事务回滚失效的常见原因及解决方法。 1. 什么是Spring事务回滚&#xff1f; 在Spring中&#xff0c;事务管理是…

中兴B863AV3.2-T/B863AV3.1-T2/B863AV3.1-T2K_电信高安_S905L3A-B_安卓9.0_线刷固件包

中兴B863AV3.2-T&#xff0f;B863AV3.1-T2&#xff0f;B863AV3.1-T2K_电信高安_S905L3A-B_安卓9.0_线刷固件包 B863AV3.2-T B863AV3.1-T2 已知可通刷贵州、江苏、贵州、北京、河南、陕西等省份。 线刷方法&#xff1a;&#xff08;新手参考借鉴一下&#xff09; 1、准备好一…

区块链相关方法-波士顿矩阵 (BCG Matrix)

波士顿矩阵&#xff08;BCG Matrix&#xff09;&#xff0c;又称市场增长率 - 相对市场份额矩阵、波士顿咨询集团法、四象限分析法、产品系列结构管理法等&#xff0c;由美国著名的管理学家、波士顿咨询公司创始人布鲁斯・亨德森于 1970 年首创1。以下是关于波士顿矩阵的详细介…

DeepSeek-R1之三_基于RAGFlowAI托管平台在Docker中部署搭建本地AI知识库

DeepSeekR1之三_基于RAGFlowAI托管平台在Docker中部署搭建本地AI知识库 文章目录 DeepSeekR1之三_基于RAGFlowAI托管平台在Docker中部署搭建本地AI知识库1. RAGFlow是什么1. 主要功能1. **"Quality in, quality out"**2. &#x1f371; **基于模板的文本切片**3. &am…

千峰React:函数组件使用(2)

前面写了三千字没保存&#xff0c;恨&#xff01; 批量渲染 function App() {const list [{id:0,text:aaaa},{id:1,text:bbbb},{id:2,text:cccc}]// for (let i 0; i < list.length; i) {// list[i] <li>{list[i]}</li>// }return (<div><…