博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spring整合kafka项目生产和消费测试结果记录(一)
阅读量:6350 次
发布时间:2019-06-22

本文共 4928 字,大约阅读时间需要 16 分钟。

使用spring+springMVC+mybatis+kafka做了两个web项目,一个是生产者,一个是消费者。

通过JMeter测试工具模拟100个用户并发访问生产者项目,发送json数据给生产者的接口,生产者将json数据发送到kafka集群,

消费者监听到kafka集群中的消息就开始消费,并将json解析成对象存到MySQL数据库。

下面是使用JMeter测试工具模拟100个并发的线程设置截图:

请求所发送的数据:

 

下面是100个用户10000个请求的聚合报告:

 

下面是生产者截图生产完10000条消息的时间截图:

 

下面是消费者项目消费入库的结束时间截图:

 

可见,10000条消息从生产完成到入库(消费完10000条消息的时间只是比生产完成的时间落后了几十秒,但是消费端真正入库完成所需要的时间很长)完成时间相差了10几分钟。

 

下面是MySQL数据库截图,数据全部入库成功:

下面是消息对应的POJO:

1 package com.xuebusi.pojo; 2  3 public class TbPerson { 4     private Long id; 5  6     private String name; 7  8     private Integer age; 9 10     public Long getId() {11         return id;12     }13 14     public void setId(Long id) {15         this.id = id;16     }17 18     public String getName() {19         return name;20     }21 22     public void setName(String name) {23         this.name = name == null ? null : name.trim();24     }25 26     public Integer getAge() {27         return age;28     }29 30     public void setAge(Integer age) {31         this.age = age;32     }33 34     @Override35     public String toString() {36         return "TbPerson [id=" + id + ", name=" + name + ", age=" + age + "]";37     }38 }

 

下面是生产端的逻辑:

1 package com.xuebusi.controller; 2  3 import com.alibaba.fastjson.JSON; 4 import com.xuebusi.pojo.TbPerson; 5 import com.xuebusi.service.KafkaService; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.stereotype.Controller; 9 import org.springframework.web.bind.annotation.RequestBody;10 import org.springframework.web.bind.annotation.RequestMapping;11 import org.springframework.web.bind.annotation.RequestMethod;12 import org.springframework.web.bind.annotation.ResponseBody;13 14 import javax.annotation.Resource;15 16 @Controller17 @RequestMapping("/producer")18 public class KafkaController {19 20     private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);21 22     @Resource23     private KafkaService kafkaService;24 25     /**26      * 发消息到ssmk这个topic27      * @param person28      * @return29      */30     @RequestMapping(value = "/person", method = RequestMethod.POST)31     @ResponseBody32     public String createPerson(@RequestBody TbPerson person) {33         if (person == null){34             return "fail, data can not be null.";35         }36         String json = JSON.toJSONString(person);37         boolean result = kafkaService.sendInfo("ssmk", json);38         logger.info("生产者发送消息[" + result + "]:" + json);39         return "success";40     }41 }

 

下面是消费端的逻辑:

1 package com.xuebusi.consumer; 2  3 import com.alibaba.fastjson.JSON; 4 import com.xuebusi.pojo.TbPerson; 5 import com.xuebusi.service.PersonService; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.stereotype.Service;10 11 import java.util.List;12 import java.util.Map;13 14 @Service15 public class KafkaConsumerService {16     private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);17 18     @Autowired19     private PersonService personService;20 21     public void processMessage(Map
> msgs) {22 /*for (Map.Entry
> entry : msgs.entrySet()) {23 String topic = entry.getKey();24 Map
value = entry.getValue();25 for (Map.Entry
entrySet : value.entrySet()) {26 Integer partiton = entrySet.getKey();27 String msg = entrySet.getValue();28 logger.info("消费的主题:" + topic + ",消费的分区:" + partiton + ",消费的消息:" + msg);29 logger.info("=======使用JSON解析对象=========");30 TbPerson person = JSON.parseObject(msg, TbPerson.class);31 logger.info("=======对象开始入库=========");32 personService.insert(person);33 logger.info("=======对象入库成功=========");34 }35 }*/36 37 for (Map.Entry
> entry : msgs.entrySet()) {38 String topic = entry.getKey();39 Map
value = entry.getValue();40 for (Map.Entry
entrySet : value.entrySet()) {41 Integer partiton = entrySet.getKey();42 String msg = entrySet.getValue();43 logger.info("消费的主题:" + topic + ",消费的分区:" + partiton + ",消费的消息:" + msg);44 msg = "[" + msg + "]";//注意这里要在前后都加上中括号,否则下面在解析json成对象的时候会报json格式不对的异常(spring会对多条json数据用逗号分隔)45 logger.info("=======使用JSON解析对象=========");46 List
personList = JSON.parseArray(msg, TbPerson.class);47 //TbPerson person = JSON.parseObject(msg, TbPerson.class);48 if (personList != null && personList.size() > 0) {49 logger.info("消息中包含[" + personList.size() + "]个对象");50 for (TbPerson person : personList) {51 logger.info("=======对象开始入库=========");52 personService.insert(person);53 logger.info("=======对象入库成功=========");54 }55 }56 57 }58 }59 }60 }

 

如果觉得本文对您有帮助,不妨扫描下方微信二维码打赏点,您的鼓励是我前进最大的动力:

 

转载地址:http://ayvla.baihongyu.com/

你可能感兴趣的文章
ngrok
查看>>
ThinkPHP 模板变量输出
查看>>
android系统信息(内存、cpu、sd卡、电量、版本)获取
查看>>
HTML5、WebKit与移动应用开发
查看>>
面google的试题,对google面试题的衍生推导
查看>>
Eclipse Debug Android Native Application
查看>>
java动态代理
查看>>
node.js原型继承
查看>>
揭露让Linux与Windows隔阂消失的奥秘(1)
查看>>
我的友情链接
查看>>
Mysql备份和恢复策略
查看>>
linux17-邮件服务器
查看>>
AS开发JNI步骤
查看>>
Android NDK开发:JNI基础篇
查看>>
使用Maven命令快速建立项目结构
查看>>
类的生命周期
查看>>
android中在代码中设置margin属性
查看>>
SQL开发利器SQL Prompt
查看>>
二分查找,php
查看>>
必看的 jQuery性能优化的38个建议 (转)
查看>>