Spring Boot整合Kafka+SSE实现实时数据展示

2024年3月10日

知识积累

为什么使用Kafka?

不使用Rabbitmq或者Rocketmq是因为Kafka是Hadoop集群下的组成部分,对于大数据的相关开发适应性好,且当前业务场景下不需要使用死信队列,不过要注意Kafka对于更新时间慢的数据拉取也较慢,因此对与实时性要求高可以选择其他MQ。
使用消息队列是因为该中间件具有实时性,且可以作为广播进行消息分发。

为什么使用SSE?

使用Websocket传输信息的时候,会转成二进制数据,产生一定的时间损耗,SSE直接传输文本,不存在这个问题
由于Websocket是双向的,读取日志的时候,如果有人连接ws日志,会发送大量异常信息,会给使用段和日志段造成问题;SSE是单向的,不需要考虑这个问题,提高了安全性
另外就是SSE支持断线重连;Websocket协议本身并没有提供心跳机制,所以长时间没有数据发送时,会将这个连接断掉,因此需要手写心跳机制进行实现。
此外,由于是长连接的一个实现方式,所以SSE也可以替代Websocket实现扫码登陆(比如通过SSE的超时组件在实现二维码的超时功能,具体实现我可以整理一下)
另外,如果是普通项目,不需要过高的实时性,则不需要使用Websocket,使用SSE即可

代码实现

Java代码

pom.xml引入SSE和Kafka

<!-- SSE,一般springboot开发web应用的都有 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
<!-- kafka,最主要的是第一个,剩下两个是测试用的 --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency<groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>

application.properties增加Kafka配置信息

# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

配置Kafka信息

@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}}
配置controller,通过web方式开启效果
@RestController
@RequestMapping(path = "sse")
public class KafkaSSEController {private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;/*** @param message* @apiNote 发送信息到Kafka主题中*/@PostMapping("/send")public void sendMessage(@RequestBody String message) {kafkaTemplate.send("my-topic", message);}/*** 监听Kafka数据** @param message*/@KafkaListener(topics = "my-topic", groupId = "my-group-id")public void consume(String message) {System.out.println("Received message: " + message);}/*** 连接sse服务** @param id* @return* @throws IOException*/@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})public SseEmitter push(@RequestParam("id") String id) throws IOException {// 超时时间设置为5分钟,用于演示客户端自动重连SseEmitter sseEmitter = new SseEmitter(5_60_000L);// 设置前端的重试时间为1s// send(): 发送数据,如果传入的是一个非SseEventBuilder对象,那么传递参数会被封装到 data 中sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));sseCache.put(id, sseEmitter);System.out.println("add " + id);sseEmitter.send("你好", MediaType.APPLICATION_JSON);SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈");sseEmitter.send(data);// onTimeout(): 超时回调触发sseEmitter.onTimeout(() -> {System.out.println(id + "超时");sseCache.remove(id);});// onCompletion(): 结束之后的回调触发sseEmitter.onCompletion(() -> System.out.println("完成!!!"));return sseEmitter;}/*** http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa* @param id* @param content* @return* @throws IOException*/@ResponseBody@GetMapping(path = "push")public String push(String id, String content) throws IOException {SseEmitter sseEmitter = sseCache.get(id);if (sseEmitter != null) {sseEmitter.send(content);}return "over";}@ResponseBody@GetMapping(path = "over")public String over(String id) {SseEmitter sseEmitter = sseCache.get(id);if (sseEmitter != null) {// complete(): 表示执行完毕,会断开连接sseEmitter.complete();sseCache.remove(id);}return "over";}}

前端方式

<html><head><script>console.log('start')const clientId = "your_client_id_x"; // 设置客户端IDconst eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 订阅服务器端的SSEeventSource.onmessage = event => {console.log(event.data)const message = JSON.parse(event.data);console.log(`Received message from server: ${message}`);};// 发送消息给服务器端 可通过 postman 调用,所以下面 sendMessage() 调用被注释掉了function sendMessage() {const message = "hello sse";fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, {method: "POST",headers: { "Content-Type": "application/json" },body: JSON.stringify(message)});console.log('dddd'+JSON.stringify(message))}// sendMessage()</script></head>
</html>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/590061.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【WebSocket】快速入门 springboot中使用

WebSocket 介绍 WebSocket缺点&#xff1a; 服务器长期维护长连接需要一定的成本 各个浏览器支持程度不一 WebSocket 是长连接&#xff0c;受网络限制比较大&#xff0c;需要处理好重连 结论&#xff1a; WebSocket并不能完全取代HTTP&#xff0c;它只适合在特定的场景下使用…

C++ 2024-4-2 作业

1.模板类实现顺序栈 #include <iostream> #define MAX 8 using namespace std; template<typename T> class stack {T data[MAX];int top; public:stack():top(-1){}bool empty_stack();bool full_stack();void push_stack(T data);void pop_stack();void show();…

Phpstorm配置Xdebug

步骤 1、先去官网找到对应的php xdebug的版本 2、配置phpstorm断点调试 网址&#xff1a;https://xdebug.org/ 查看php对应的xdebug版本&#xff1a;Xdebug: Support — Tailored Installation Instructions 1.1查看对应php xdebug版本 全选&#xff0c;复制到目标网址 我…

Flutter中setState函数的使用注意事项

文章目录 Flutter中setState函数的使用注意事项只能在具有State对象的类中使用不要在build方法中使用将状态更新逻辑放在setState方法内部避免频繁调用使用回调函数更新状态 Flutter中setState函数的使用注意事项 setState()函数是Flutter中非常重要的一个函数&#xff0c;它用…

antd/x6-graph——实现流程图绘制功能——技能提升

效果图&#xff1a; 解决步骤1&#xff1a;安装"antv/x6": "^1.35.0" npm install antv/x61.35.0安装指定版本的antv/x6插件 解决步骤2&#xff1a;配置tools文件 在assets/js中新增一个graphTools.js文件 内容如下&#xff1a; /* antv x6图谱相关…

基于深度学习的吸烟检测系统(网页版+YOLOv8/v7/v6/v5代码+训练数据集)

摘要&#xff1a;本文深入研究了基于YOLOv8/v7/v6/v5等深度学习模型的吸烟行为检测系统&#xff0c;核心采用YOLOv8并整合了YOLOv7、YOLOv6、YOLOv5算法&#xff0c;进行性能指标对比&#xff1b;详述了国内外研究现状、数据集处理、算法原理、模型构建与训练代码&#xff0c;及…

目标跟踪——行人车辆数据集

一、重要性及意义 首先&#xff0c;目标跟踪对于个人和组织的目标实现至关重要。无论是个人职业发展、企业业务增长还是政府的社会发展&#xff0c;目标跟踪都能够帮助我们明确目标&#xff0c;并将其分解为可行的步骤和时间表。这有助于我们保持动力和专注&#xff0c;提高效…

WPF文本框TextEdit不以科学计数法显示

WPF文本框TextEdit不以科学计数法显示 一个float或者double类型的数值&#xff0c;如果小数点后0的个数≥4&#xff0c;在界面上就会自动以科学计数法显示&#xff0c; 比如&#xff1a;0.00003会显示成这样 但是很多时候我并不希望它这样显示&#xff0c;因为这样不方便编辑…

js手持小风扇

文章目录 1. 演示效果2. 分析思路3. 代码实现 1. 演示效果 2. 分析思路 先编写动画&#xff0c;让风扇先转起来。使用 js 控制动画的持续时间。监听按钮的点击事件&#xff0c;在事件中修改元素的animation-duration属性。 3. 代码实现 <!DOCTYPE html> <html lang…

[计算机效率] 格式转换工具:格式工厂

3.14 格式转换工具&#xff1a;格式工厂 格式工厂是一款功能强大的多媒体格式转换软件&#xff0c;可以实现音频、视频、图片等多种格式的转换。它支持几乎所有类型的多媒体格式&#xff0c;包括视频、音频、图片、字幕等&#xff0c;可以轻松实现格式之间的转换&#xff0c;并…

Python基础之pandas:字符串操作与透视表

文章目录 一、字符串操作备注&#xff1a;如果想要全部行都能输出&#xff0c;可输入如下代码 1、字符检索2、字符转换3、字符类型判断4、字符调整5、字符对齐与填充6、字符检索7、字符切割8、字符整理 二、透视表1、pd.pivot_table2、多级透视表 一、字符串操作 备注&#xf…

Flask Python:数据库多条件查询,flask中模型关联

前言 在上一篇Flask Python:模糊查询filter和filter_by&#xff0c;数据库多条件查询中&#xff0c;已经分享了几种常用的数据库操作&#xff0c;这次就来看看模型的关联关系是怎么定义的&#xff0c;先说基础的关联哈。在分享之前&#xff0c;先分享官方文档,点击查看 从文档…

2024阿里云老用户服务器优惠价格99元和199元

阿里云服务器租用价格表2024年最新&#xff0c;云服务器ECS经济型e实例2核2G、3M固定带宽99元一年&#xff0c;轻量应用服务器2核2G3M带宽轻量服务器一年61元&#xff0c;ECS u1服务器2核4G5M固定带宽199元一年&#xff0c;2核4G4M带宽轻量服务器一年165元12个月&#xff0c;2核…

JAX深度学习库入门

JAX简介 https://www.bilibili.com/video/BV1Sb4y1b7rK/?spm_id_from333.999.0.0&vd_sourceb2549fdee562c700f2b1f3f49065201b JAX is NumPy wiht Autograd , XLA and Composable (function) transformations, brought together for high-performance machine learning …

HarmonyOS NEXT应用开发之状态管理优秀实践

为了帮助应用程序开发人员提高其应用程序质量&#xff0c;特别是在高效的状态管理方面。本章节面向开发者提供了多个在开发ArkUI应用中常见的低效开发的场景&#xff0c;并给出了对应的解决方案。此外&#xff0c;还提供了同一场景下&#xff0c;推荐用法和不推荐用法的对比和解…

八、从0开始卷出一个新项目之瑞萨RZN2L 3.1.7 debug调试和下载

目录 3.1.7 debug调试和下载 3.1.7.1 官方介绍 3.1.7.2 e2studio debug变量实时监控 3.1.7.3 Iar debug变量实时监控 3.1.7.4 debug经验总结 八、从0开始卷出一个新项目之瑞萨RZN2L 3.1.7 debug调试和下载 3.1.7 debug调试和下载 3.1.7.1 官方介绍 官网&#xff1a; d…

MySQL执行流程

MySQL执行流程 在使用MySQL时&#xff0c;你是否有疑惑&#xff0c;当我们提交一条SQL给MySQL时它到底是如何执行的&#xff1f; 通过了解MySQL的执行流程一定能解开你的疑惑&#x1f914; 总体流程 客户端通过连接器连接MySQL查询执行缓存解析器解析SQL执行器执行SQL调用存…

Transformer模型-用jupyter演示逐步计算attention

学习transformer模型-用jupyter演示如何计算attention&#xff0c;不含multi-head attention&#xff0c;但包括权重矩阵W。 input embedding&#xff1a;文本嵌入 每个字符用长度为5的向量表示&#xff1a; 注意力公式&#xff1a; 1&#xff0c;准备Q K V&#xff1a; 先 生…

车载通信与DDS标准解读系列(4):DDSI-RTPS协议

▎什么是RTPS 在DDS协议中&#xff0c;主要描述了实现数据分发服务的DCPS模型和QoS策略&#xff0c;但是我们还不清楚数据怎样在网络中传输&#xff0c;想要了解这些内容&#xff0c;就需要请出咱们的数据搬运工——RTPS。 RTPS全称是Real-Time Publish-Subscribe Protocol&a…

item_get_desc-获得淘宝商品描述api接口:如何通过接口获取商品信息、订单信息、物流信息可以用于数据分析、商品推荐、行业研究等领域

在当今电商繁荣的时代&#xff0c;淘宝作为中国最大的电商平台之一&#xff0c;拥有海量的商品信息。然而&#xff0c;如何高效地获取并利用这些信息&#xff0c;对于商家和开发者来说都至关重要。幸运的是&#xff0c;淘宝开放平台提供了丰富的API接口&#xff0c;其中包括用于…