发布任务
java /**
* 添加文章审核任务到延迟队列中
*
* @param id
* @param publicTime
*/
@Override
@Async // 开启异步调用
public void addNewToTask(Integer id, Date publicTime) {
log.info("添加任务到延迟队列中-----begin");
Task task = new Task();
task.setExecuteTime(publicTime.getTime());
task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
WmNews wmNews = new WmNews();
wmNews.setId(id);
task.setParameters(ProtostuffUtil.serialize(wmNews));
scheduleClient.addTask(task);
log.info("添加任务到延迟队列中-----end");
}
消费任务
java /**
* 消费任务审核文章
*/
@Scheduled(fixedRate = 1000) // 每秒一次
@Override
public void scanNewsByTask() {
log.info("消费任务-----审核文章");
ResponseResult responseResult = scheduleClient.pull(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
if (responseResult.getCode().equals(200) && responseResult.getData() != null){
String jsonString = JSON.toJSONString(responseResult.getData());
Task task = JSON.parseObject(jsonString, Task.class);
WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);
wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
}
}

安装


# Start a Kafka broker container on the host network.
# The listener/advertised addresses and the ZooKeeper address below are taken
# as-is from the original command; adjust them to your own host.
docker run -d --name kafka \
  --env KAFKA_ADVERTISED_HOST_NAME=192.168.42.10 \
  --env KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \
  --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.42.10:9092 \
  --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
  --net=host wurstmeister/kafka:2.12-2.3.1
Kafka快速入门
producer
package com.heima.kafka.sample;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Producer quick-start sample: sends a single string message to Kafka.
 */
public class ProducerQuickStart {

    /**
     * Configures a producer, sends one record to {@code topic-first} and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Kafka connection settings
        Properties prop = new Properties();
        // Broker address
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.42.10:9092");
        // Key and value serializers
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // The producer must be closed, otherwise the message is not delivered.
        // try-with-resources guarantees the close even if send() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            // ProducerRecord arguments: topic, key, value
            ProducerRecord<String, String> message =
                    new ProducerRecord<>("topic-first", "key-001", "hello kafka");
            producer.send(message);
        }
    }
}
consumer
package com.heima.kafka.sample;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Consumer quick-start sample: subscribes to {@code topic-first} and prints every
 * record's key and value.
 */
public class ConsumerQuickStart {

    /**
     * Configures a consumer in group {@code group2} and polls in an endless loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Kafka connection settings
        Properties prop = new Properties();
        // Broker address
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.42.10:9092");
        // Consumer group
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        // Key and value deserializers
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // The loop never ends normally; try-with-resources closes the consumer
        // when poll() or the processing code throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            // Subscribe to the topic
            consumer.subscribe(Collections.singletonList("topic-first"));
            // Pull messages, waiting up to one second per poll
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> message : records) {
                    System.out.println(message.key());
                    System.out.println(message.value());
                }
            }
        }
    }
}
kafka分区机制



kafka高可用机制

备份机制

生产者发送类型(同步发送)


ack确认机制

消费者详情
消息有序性




编写生产者向kafka发送消息
java /**
* 文章的上下架
*
* @param dto
* @return
*/
@Override
public ResponseResult downOrUp(WmNewsDto dto) {
// 1.检查参数
if (dto.getId() == null){
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
// 2.查询文章
WmNews wmNews = getById(dto.getId());
if (wmNews == null){
return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在");
}
// 3.判断文章是否已发布
if (!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态不能上下架");
}
// 4.修改文章
if (dto.getEnable()!=null&&dto.getEnable() > -1&&dto.getEnable()<2 ){
update(Wrappers.<WmNews>lambdaUpdate().set(WmNews::getEnable,dto.getEnable())
.eq(WmNews::getId,dto.getId()));
}
// 通知article下架 kafka生产者发送消息
if (wmNews.getArticleId()!=null){
Map<String,Object> map = new HashMap<>();
map.put("articleId",wmNews.getArticleId());
map.put("enable",dto.getEnable());
kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));
}
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
app端消费kafka的消息上下架文章
java @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onArticleDown(String message) {
if (StringUtils.isNotBlank(message)) {
Map map = JSON.parseObject(message, Map.class);
apArticleConfigService.updateByMap(map);
}
}

搭建elasticsearch环境

# Run Elasticsearch 7.4.0 as a single-node container, publishing ports 9200 and 9300
# and mounting the host plugins directory into the container.
docker run -id --name elasticsearch -d --restart=always \
  -p 9200:9200 -p 9300:9300 \
  -v /usr/share/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
  -e "discovery.type=single-node" \
  elasticsearch:7.4.0
创建索引库

{ "mappings":{ "properties":{ "id":{ "type":"long" }, "publishTime":{ "type":"date" }, "layout":{ "type":"integer" }, "images":{ "type":"keyword", "index": false }, "staticUrl":{ "type":"keyword", "index": false }, "authorId": { "type": "long" }, "authorName": { "type": "text" }, "title":{ "type":"text", "analyzer":"ik_smart" }, "content":{ "type":"text", "analyzer":"ik_smart" } } } }
导入leadnews-search到项目中 nacos中添加配置
# Nacos configuration for the search service.
spring:
  autoconfigure:
    # Excludes Spring Boot's DataSource auto-configuration
    # (presumably because this service has no database — confirm).
    exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
# NOTE(review): assumed to be a top-level, project-defined prefix (host/port are not
# standard Spring Boot Elasticsearch properties) — confirm against the config class.
elasticsearch:
  host: 192.168.42.10
  port: 9200
es查询接口
java /**
* es文章分页检索
*
* @param dto
* @return
*/
@Override
public ResponseResult search(UserSearchDto dto) throws IOException {
// 1.检查参数
if (dto == null|| StringUtils.isBlank(dto.getSearchWords())) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
// 2.设置查询条件
// 关键字分词之后查询
SearchRequest searchRequest = new SearchRequest("app_info_article");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 布尔查询
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(dto.getSearchWords()).field("title").field("content")
.defaultOperator(Operator.OR);
boolQueryBuilder.must(queryStringQueryBuilder);
// 查询小于mindata之后的数据
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("publishTime").lt(dto.getMinBehotTime().getTime());
boolQueryBuilder.filter(rangeQueryBuilder);
// 按照发布时间
searchSourceBuilder.sort("publishTime", SortOrder.DESC);
// 关键字高亮
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("title");
highlightBuilder.preTags("<font style='color: red; font-size: inherit;'>");
highlightBuilder.postTags("</font>");
searchSourceBuilder.highlighter(highlightBuilder);
// 设置分页
searchSourceBuilder.from(0)
.size(dto.getPageSize());
searchSourceBuilder.query(boolQueryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 3.结果封装返回
SearchHit[] hits = searchResponse.getHits().getHits();
List<Map> resultList = new ArrayList<>();
for (SearchHit hit : hits) {
String JSONString = hit.getSourceAsString();
Map map = JSON.parseObject(JSONString, Map.class);
// 处理高亮
if (hit.getHighlightFields()!=null && hit.getHighlightFields().size()>0) {
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
Text[] titles = highlightFields.get("title").getFragments();
String title = StringUtils.join(titles);
// 高亮标题
map.put("h_title", title);
}else{
// 原始标题
map.put("h_title", map.get("title"));
}
resultList.add(map);
}
return ResponseResult.okResult(resultList);
}
创建索引接口
本文作者:钱小杰
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!