编辑
2025-12-19
黑马头条
00

目录

1.发布文章自动添加/消费任务
2.Kafka
3.springboot集成kafka
3.自媒体文章上下架
4.elasticsearch实现App端搜索

1.发布文章自动添加/消费任务

发布任务

java
/** * 添加文章审核任务到延迟队列中 * * @param id * @param publicTime */ @Override @Async // 开启异步调用 public void addNewToTask(Integer id, Date publicTime) { log.info("添加任务到延迟队列中-----begin"); Task task = new Task(); task.setExecuteTime(publicTime.getTime()); task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType()); task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); WmNews wmNews = new WmNews(); wmNews.setId(id); task.setParameters(ProtostuffUtil.serialize(wmNews)); scheduleClient.addTask(task); log.info("添加任务到延迟队列中-----end"); }

消费任务

java
/** * 消费任务审核文章 */ @Scheduled(fixedRate = 1000) // 每秒一次 @Override public void scanNewsByTask() { log.info("消费任务-----审核文章"); ResponseResult responseResult = scheduleClient.pull(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); if (responseResult.getCode().equals(200) && responseResult.getData() != null){ String jsonString = JSON.toJSONString(responseResult.getData()); Task task = JSON.parseObject(jsonString, Task.class); WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class); wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); } }

2.Kafka

image.png

安装

image.png

image.png

shell
docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.42.10 \ --env KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.42.10:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1

Kafka快速入门

producer

java
package com.heima.kafka.sample; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * 生产者 */ public class ProducerQuickStart { public static void main(String[] args) { // kafka连接配置 Properties prop = new Properties(); // kafka连接地址 prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.42.10:9092"); // kafka key和value序列化 prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); // 创建kafka生产者对象 KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop); // 发送消息 /** * 第一个参数 topic * 第二 key * 第三 value */ ProducerRecord<String,String> message = new ProducerRecord<String,String>("topic-first","key-001","hello kafka"); producer.send(message); // 关闭消息通道 // 必须关闭否则发送不成功 producer.close(); } }

consumer

java
package com.heima.kafka.sample; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class ConsumerQuickStart { public static void main(String[] args) { // kafka连接配置 Properties prop = new Properties(); // kafka连接地址 prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.42.10:9092"); // 指定消费者组 prop.put(ConsumerConfig.GROUP_ID_CONFIG,"group2"); // 指定反序列化 prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); // 创建kafka消费者对象 KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(prop); // 订阅主题 consumer.subscribe(Collections.singletonList("topic-first")); // 拉取消息 while (true) { ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) { System.out.println(stringStringConsumerRecord.key()); System.out.println(stringStringConsumerRecord.value()); } } } }

kafka分区机制

image.png

image.png

image.png

kafka高可用机制

image.png

备份机制

image.png

生产者发送类型(同步发送

image.png

image.png

ack确认机制

image.png

消费者详情

消息有序性

image.png

3.springboot集成kafka

image.png

image.png

3.自媒体文章上下架

image.png

编写生产者向kafka发送消息

java
/** * 文章的上下架 * * @param dto * @return */ @Override public ResponseResult downOrUp(WmNewsDto dto) { // 1.检查参数 if (dto.getId() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } // 2.查询文章 WmNews wmNews = getById(dto.getId()); if (wmNews == null){ return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在"); } // 3.判断文章是否已发布 if (!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态不能上下架"); } // 4.修改文章 if (dto.getEnable()!=null&&dto.getEnable() > -1&&dto.getEnable()<2 ){ update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable()) .eq(WmNews::getId,dto.getId())); } // 通知article下架 kafka生产者发送消息 if (wmNews.getArticleId()!=null){ Map<String,Object> map = new HashMap<>(); map.put("articleId",wmNews.getArticleId()); map.put("enable",dto.getEnable()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }

app端消费kafka的消息上下架文章

java
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void onArticleDown(String message) { if (StringUtils.isNotBlank(message)) { Map map = JSON.parseObject(message, Map.class); apArticleConfigService.updateByMap(map); } }

4.elasticsearch实现App端搜索

image.png

搭建elasticsearch环境

image.png

docker run -id --name elasticsearch -d --restart=always -p 9200:9200 -p 9300:9300 -v /usr/share/elasticsearch/plugins:/usr/share/elasticsearch/plugins -e "discovery.type=single-node" elasticsearch:7.4.0 创建索引库

image.png

{ "mappings":{ "properties":{ "id":{ "type":"long" }, "publishTime":{ "type":"date" }, "layout":{ "type":"integer" }, "images":{ "type":"keyword", "index": false }, "staticUrl":{ "type":"keyword", "index": false }, "authorId": { "type": "long" }, "authorName": { "type": "text" }, "title":{ "type":"text", "analyzer":"ik_smart" }, "content":{ "type":"text", "analyzer":"ik_smart" } } } }

导入leadnews-search到项目中 nacos中添加配置

yaml
spring: autoconfigure: exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration elasticsearch: host: 192.168.42.10 port: 9200

es查询接口

java
/** * es文章分页检索 * * @param dto * @return */ @Override public ResponseResult search(UserSearchDto dto) throws IOException { // 1.检查参数 if (dto == null|| StringUtils.isBlank(dto.getSearchWords())) { return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } // 2.设置查询条件 // 关键字分词之后查询 SearchRequest searchRequest = new SearchRequest("app_info_article"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // 布尔查询 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(dto.getSearchWords()).field("title").field("content") .defaultOperator(Operator.OR); boolQueryBuilder.must(queryStringQueryBuilder); // 查询小于mindata之后的数据 RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("publishTime").lt(dto.getMinBehotTime().getTime()); boolQueryBuilder.filter(rangeQueryBuilder); // 按照发布时间 searchSourceBuilder.sort("publishTime", SortOrder.DESC); // 关键字高亮 HighlightBuilder highlightBuilder = new HighlightBuilder(); highlightBuilder.field("title"); highlightBuilder.preTags("<font style='color: red; font-size: inherit;'>"); highlightBuilder.postTags("</font>"); searchSourceBuilder.highlighter(highlightBuilder); // 设置分页 searchSourceBuilder.from(0) .size(dto.getPageSize()); searchSourceBuilder.query(boolQueryBuilder); searchRequest.source(searchSourceBuilder); SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); // 3.结果封装返回 SearchHit[] hits = searchResponse.getHits().getHits(); List<Map> resultList = new ArrayList<>(); for (SearchHit hit : hits) { String JSONString = hit.getSourceAsString(); Map map = JSON.parseObject(JSONString, Map.class); // 处理高亮 if (hit.getHighlightFields()!=null && hit.getHighlightFields().size()>0) { Map<String, HighlightField> highlightFields = hit.getHighlightFields(); Text[] titles = highlightFields.get("title").getFragments(); String title = StringUtils.join(titles); // 高亮标题 map.put("h_title", title); }else{ // 原始标题 map.put("h_title", map.get("title")); } resultList.add(map); } return ResponseResult.okResult(resultList); }

创建索引接口

本文作者:钱小杰

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!