Project Overview
This post uses big-data tooling to build a simple search feature over Chinese Wikipedia.
What the result looks like:
Goals
- Have a go at implementing a search feature
- Tinker with something to break up the boredom of junior year
- If you have read The Beauty of Mathematics (《数学之美》), you will probably enjoy playing with this
System Overview
The project and configuration files for this exercise are also available on GitHub:
Task Breakdown
- Set up the ELK stack with Docker
- Process the data
- Write the Kafka scripts to simulate data crawling
- Configure Logstash and check that Elasticsearch receives the data
- Write the front-end application and get to know Elasticsearch
Environment
Spark: 2.1.0
Kafka: 2.11-0.10.2.1
Elasticsearch: 5.6.0
Logstash: 5.6.0
Kibana: 5.6.0
Flask: 2.0.1
Flask-Bootstrap: 3.3.7.1
flask-paginate: 0.8.1
Flask-WTF: 0.15.1
Steps
1. Set up the ELK Docker environment
Replace the original docker-compose.yml with the following:
version: '2'
services:
  hbase-master:
    image: zhouxianghui/hbase-base:1.0.2
    container_name: hbase-master
    environment:
      - SET_CONTAINER_TIMEZONE=true
      - CONTAINER_TIMEZONE=Asia/Shanghai
    ports:
      - "50070:50070"
      - "8088:8088"
      - "8080:8080"
      - "8081:8081"
      - "8042:8042"
      - "16010:16010"
      - "16000:16000"
      - "9090:9090"
      - "2181:2181"
      - "16020:16020"
      - "9000:9000"
      - "9092:9092"
      - "7077:7077"
      - "4040:4040"
      - "5000:5000"
      - "7777:22"
    volumes:
      - "./volume/hadoop/work/master:/works"
      - "./volume/hadoop/logs/master:/root/hadoop/logs/"
      - "./volume/spark/logs/master:/root/spark/logs/"
      - "./volume/hbase/master:/hworks/"
      - "./volume/hbase/logs/master:/root/hbase/logs/"
      - "./volume/code:/code"
      - "./volume/kafka/hbase-master/server.properties/:/root/kafka/config/server.properties"
      - "/etc/localtime:/etc/localtime"
    hostname: hbase-master.hadoop-docker
    networks:
      hadoop-docker:
        aliases:
          - hbase-master
    tty: true
  hbase-slave1:
    image: zhouxianghui/hbase-base:1.0.2
    container_name: hbase-slave1
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - "./volume/hadoop/work/slave1:/works"
      - "./volume/hadoop/logs/slave1:/root/hadoop/logs/"
      - "./volume/spark/logs/slave1:/root/spark/logs/"
      - "./volume/hbase/slave1:/hworks/"
      - "./volume/hbase/logs/slave1:/root/hbase/logs/"
      - "./volume/kafka/hbase-slave1/server.properties/:/root/kafka/config/server.properties"
      - "/etc/localtime:/etc/localtime"
    hostname: hbase-slave1.hadoop-docker
    networks:
      hadoop-docker:
        aliases:
          - hbase-slave1
    tty: true
  hbase-slave2:
    image: zhouxianghui/hbase-base:1.0.2
    container_name: hbase-slave2
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - "./volume/hadoop/work/slave2:/works"
      - "./volume/hadoop/logs/slave2:/root/hadoop/logs/"
      - "./volume/spark/logs/slave2:/root/spark/logs/"
      - "./volume/hbase/slave2:/hworks/"
      - "./volume/hbase/logs/slave2:/root/hbase/logs/"
      - "./volume/kafka/hbase-slave2/server.properties/:/root/kafka/config/server.properties"
      - "/etc/localtime:/etc/localtime"
    hostname: hbase-slave2.hadoop-docker
    networks:
      hadoop-docker:
        aliases:
          - hbase-slave2
    tty: true
  hbase-slave3:
    image: zhouxianghui/hbase-base:1.0.2
    container_name: hbase-slave3
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - "./volume/hadoop/work/slave3:/works"
      - "./volume/hadoop/logs/slave3:/root/hadoop/logs/"
      - "./volume/spark/logs/slave3:/root/spark/logs/"
      - "./volume/hbase/slave3:/hworks/"
      - "./volume/hbase/logs/slave3:/root/hbase/logs/"
      - "./volume/kafka/hbase-slave3/server.properties/:/root/kafka/config/server.properties"
      - "/etc/localtime:/etc/localtime"
    hostname: hbase-slave3.hadoop-docker
    networks:
      hadoop-docker:
        aliases:
          - hbase-slave3
    tty: true
  mysql:
    image: mysql:5.7
    volumes:
      - "./volume/mysql:/var/lib/mysql"
      - "/etc/localtime:/etc/localtime"
    container_name: mysql
    hostname: mysql
    networks:
      - hadoop-docker
    environment:
      - TZ=Asia/Shanghai
      - MYSQL_ROOT_PASSWORD=hadoop
    tty: true
    ports:
      - "3306:3306"
  zoo1:
    image: twinsen/zookeeper:3.4.10
    volumes:
      - "./volume/zk/zoo1:/works"
      - "/etc/localtime:/etc/localtime"
    container_name: zoo1
    environment:
      - TZ=Asia/Shanghai
    hostname: zoo1
    networks:
      hadoop-docker:
        aliases:
          - zoo1
    tty: true
  zoo2:
    image: twinsen/zookeeper:3.4.10
    volumes:
      - "./volume/zk/zoo2:/works"
      - "/etc/localtime:/etc/localtime"
    container_name: zoo2
    environment:
      - TZ=Asia/Shanghai
    hostname: zoo2
    networks:
      hadoop-docker:
        aliases:
          - zoo2
    tty: true
  zoo3:
    image: twinsen/zookeeper:3.4.10
    volumes:
      - "./volume/zk/zoo3:/works"
      - "/etc/localtime:/etc/localtime"
    container_name: zoo3
    environment:
      - TZ=Asia/Shanghai
    hostname: zoo3
    networks:
      hadoop-docker:
        aliases:
          - zoo3
    tty: true
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.0
    container_name: es01
    environment:
      - TZ=Asia/Shanghai
      - node.name=es01
      - cluster.name=es-docker-cluster
      - "discovery.zen.ping.unicast.hosts=es01,es02,es03"
      - "discovery.zen.minimum_master_nodes=2"
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "./volume/elasticsearch/data01:/usr/share/elasticsearch/data1"
      - "/etc/localtime:/etc/localtime"
    ports:
      - 9200:9200
    networks:
      hadoop-docker:
        aliases:
          - es01
    tty: true
  es02:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.0
    container_name: es02
    environment:
      - TZ=Asia/Shanghai
      - node.name=es02
      - cluster.name=es-docker-cluster
      - "discovery.zen.ping.unicast.hosts=es01,es02,es03"
      - "discovery.zen.minimum_master_nodes=2"
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "./volume/elasticsearch/data02:/usr/share/elasticsearch/data2"
      - "/etc/localtime:/etc/localtime"
    networks:
      hadoop-docker:
        aliases:
          - es02
    tty: true
  es03:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.0
    container_name: es03
    environment:
      - TZ=Asia/Shanghai
      - node.name=es03
      - cluster.name=es-docker-cluster
      - "discovery.zen.ping.unicast.hosts=es01,es02,es03"
      - "discovery.zen.minimum_master_nodes=2"
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "./volume/elasticsearch/data03:/usr/share/elasticsearch/data3"
      - "/etc/localtime:/etc/localtime"
    networks:
      hadoop-docker:
        aliases:
          - es03
    tty: true
  kibana:
    image: docker.elastic.co/kibana/kibana:5.6.0
    container_name: kibana
    environment:
      - TZ=Asia/Shanghai
    ports:
      - "5601:5601"
    volumes:
      - "/etc/localtime:/etc/localtime"
      - "./configs/kibana.yml:/usr/share/kibana/config/kibana.yml:rw"
    depends_on:
      - es01
    networks:
      hadoop-docker:
        aliases:
          - kibana
    tty: true
  logstash:
    image: docker.elastic.co/logstash/logstash:5.6.0
    volumes:
      - "./configs/logstash.conf:/etc/logstash/conf.d/logstash.conf"
      - "./configs/logstash.conf:/usr/share/logstash/pipeline/logstash.conf"
      - "./configs/logstash.yml:/usr/share/logstash/config/logstash.yml"
      - "/etc/localtime:/etc/localtime"
    container_name: logstash
    environment:
      - TZ=Asia/Shanghai
    hostname: logstash
    restart: always
    depends_on:
      - es01
    ports:
      - "7001-7005:7001-7005"
      - "9600:9600"
    networks:
      hadoop-docker:
        aliases:
          - logstash
    tty: true
networks:
  hadoop-docker:
    external: true
Notes:
- Under the volume folder, create elasticsearch/data01, elasticsearch/data02 and elasticsearch/data03 to hold the Elasticsearch data
- Create a configs folder to hold the Kibana and Logstash configuration files (a small helper that creates all of these directories is sketched below)
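A minimal sketch that creates these directories, assuming it is run from the folder containing docker-compose.yml:

from pathlib import Path

# Data directories for the three Elasticsearch nodes, plus the configs folder
# that will hold kibana.yml, logstash.conf and logstash.yml.
for d in ["volume/elasticsearch/data01",
          "volume/elasticsearch/data02",
          "volume/elasticsearch/data03",
          "configs"]:
    Path(d).mkdir(parents=True, exist_ok=True)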
Kibana configuration file: kibana.yml

# Default Kibana configuration for docker target
server.name: kibana
server.host: "0.0.0.0"
# Use the address of your first Elasticsearch node here
elasticsearch.url: "http://es01:9200"
xpack.monitoring.ui.container.elasticsearch.enabled: true
#i18n.locale: "zh-CN"
Logstash pipeline configuration file: logstash.conf

input {
  kafka {
    bootstrap_servers => "hbase-master:9092"
    group_id => "es"
    topics => "doc"
    consumer_threads => 1
    decorate_events => true
    codec => json {
      charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    hosts => ["es01:9200"]
    user => "elastic"
    password => "changeme"
    index => "zhwiki-%{+YYYY.MM.dd}"
  }
}
logstash.yml:
http.host: "0.0.0.0"
path.config: /usr/share/logstash/pipeline
#path.logs: /var/log/logstash
xpack.monitoring.elasticsearch.url: http://es01:9200
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: changeme
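After docker-compose up -d, a quick way to confirm that Elasticsearch and Logstash are reachable is to hit the ports mapped above. A rough sketch (run on the Docker host; adjust the hostname if you query from elsewhere):

import requests

# Elasticsearch cluster health, using the default x-pack credentials (elastic / changeme)
r = requests.get("http://localhost:9200/_cluster/health", auth=("elastic", "changeme"))
print(r.json())   # a "green" or "yellow" status means the cluster is up

# Logstash answers on the monitoring port mapped in docker-compose.yml
print(requests.get("http://localhost:9600").json())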
References for this section
2. Data processing
You can also download and use the Chinese Wikipedia data I have already cleaned
- Obtaining the Chinese Wikipedia corpus
- First cleaning pass
Use Wikipedia Extractor to produce doc-format documents (Linux is recommended; it errors out on Windows)
For usage, see the official GitHub documentation or a search engine of your choice
- Second cleaning pass
Clean the doc-format output from the first pass into the layout described below
Data format: one article per line, with the fields id|url|title|text separated by the | character
Cleaning script:
# @Author : Difer
# @Datetime : 2021-06-18
# @File : wiki_process.py
# @Last Modify Time : 2021-06-18
import re
import codecs

from opencc import OpenCC
from tqdm import tqdm


def wiki_replace(d):
    openCC = OpenCC('t2s')
    if re.match('</doc>', d):
        return '\r'
    elif re.match('<doc', d):
        id = re.findall(r'id=["](.*?)["]', d)[0]
        url = re.findall(r'url=["](.*?)["]', d)[0]
        title = re.findall(r'title=["](.*)["]', d)[0]
        s = id + '|' + url + '|' + title + '|'
        return s
    else:
        return openCC.convert(d).replace('\n', '').replace('\r', '')


def mycallback(x):
    # print(x)
    with codecs.open(save_path, 'a+', encoding='utf-8') as f:
        f.write(x)


if __name__ == '__main__':
    input_file = "/home/aistudio/data/data96051/wiki_01"  # output of the first cleaning pass
    save_path = '/home/aistudio/work/wiki_01.txt'          # where the second-pass output is saved
    wiki = open(input_file, 'r', encoding='utf-8')
    f = codecs.open(save_path, 'a+', encoding='utf-8')
    w = tqdm(wiki, desc=u'已获取0篇文章')
    for d in w:
        s = wiki_replace(d)
        f.write(s)
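A quick spot check on the second-pass output (a sketch; the path is the save_path used in the script above):

import codecs

# Each line should split into at least the four fields the Kafka producer
# expects: id | url | title | text.
with codecs.open('/home/aistudio/work/wiki_01.txt', 'r', encoding='utf-8') as f:
    for i, line in enumerate(f):
        fields = line.split('|')
        if len(fields) >= 4:
            print(fields[0], fields[2])   # article id and title
        if i >= 4:
            break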
References for this section
3. Writing the Kafka scripts
Before using Kafka, it is worth writing cluster start and stop scripts first, to make it easy to bring the Kafka cluster up and down later.
Put the scripts in the Kafka installation directory before using them.
Kafka cluster start script:
#!/bin/bash
# start
# Cluster node hostnames -- replace with your own
BROKERS="hbase-master hbase-slave1 hbase-slave2 hbase-slave3"
# Kafka installation directory
KAFKA_HOME="/root/kafka"
for broker in $BROKERS
do
    echo "INFO: starting kafka server on ${broker}"
    ssh $broker "/root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server.properties"
    if [ $? != 0 ]; then echo "Could not start kafka server on host ${broker}"; exit 1; fi
done
Kafka cluster stop script:
#!/bin/bash
# stop
BROKERS="hbase-master hbase-slave1 hbase-slave2 hbase-slave3"
for host in $BROKERS
do
    ssh $host "source /etc/profile; jps | grep Kafka | cut -c 1-6 | xargs kill -s 9"
    echo "$host kafka is stopping"
done
Next, write the Kafka producer:

# coding: utf-8
import time
import json
import os

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

from kafka import KafkaProducer

# Create a KafkaProducer instance for publishing messages to Kafka
producer = KafkaProducer(bootstrap_servers='hbase-master:9092')

# Open the cleaned data file
file = open("/code/wikisearch/wikidata/zhwiki_01", "r")

count = 1
for line in file:
    print(count)
    line = line.split('|')
    res = {'id': line[0], 'url': line[1], 'title': line[2], 'text': line[3]}
    res = json.dumps(res)
    producer.send('doc', res.encode('utf8'))
    # result = future.get(timeout=10)
    count = count + 1

producer.flush()
Then write a simple consumer to check whether the producer's messages are actually received:

from kafka import KafkaConsumer
import logging

# Enable this if nothing arrives, to help track down the problem
# logging.basicConfig(level=logging.DEBUG)

# Subscribe to the same topic the producer writes to ('doc')
consumer = KafkaConsumer('doc', bootstrap_servers='hbase-master:9092')
print('start receive')
for msg in consumer:
    print((msg.value).decode('utf8'))
If Kafka is running properly, you should see output like the following.
If you configured logstash.conf as described earlier, you can also open the Kibana page to check whether the data has been received.
Alternatively, as in the stream-computing project from the lab handbook, producer.py could simply read and forward the raw data to another topic, with a Spark Streaming job consuming that topic, converting each record to JSON and sending it back to Kafka. In practice this turned out to be very slow, and I am not sure what this architecture gains; something to figure out another time. The code is included below anyway.
*Code that turned out not to be very useful
from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
import json
import sys
import os

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"


def getarticles(zkQuorum, group, topics, numThreads):
    spark_conf = SparkConf().setAppName("getArticles")
    sc = SparkContext(conf=spark_conf)
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)
    # The checkpoint is written to HDFS, so Hadoop must be running
    ssc.checkpoint(".")
    topicAry = topics.split(",")
    print(topicAry)
    # Turn the topics into a hashmap (a Python dict)
    topicMap = {}
    for topic in topicAry:
        topicMap[topic] = numThreads
    lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(lambda x: x[1])
    # lines = KafkaUtils.createDirectStream(ssc=ssc, topics=topicAry, kafkaParams=kafkaParams).map(lambda x: x[1])
    # lines.foreachRDD(lambda x: print(x.collect()))
    words = lines.map(lambda x: x.split("|"))
    words.foreachRDD(lambda x: x.foreach(lambda x: sendmsg(x)))
    ssc.start()
    ssc.awaitTermination()


# Convert [id, url, title, text] into a JSON string {'id': ..., 'url': ..., 'title': ..., 'text': ...}
def get_json(rdd_list):
    res = {'id': rdd_list[0], 'url': rdd_list[1], 'title': rdd_list[2], 'text': rdd_list[3]}
    print(rdd_list[0])
    return json.dumps(res)


def sendmsg(rdd):
    # Each element here is a list of fields, not an RDD
    if len(rdd) != 0:
        msg = get_json(rdd)
        # Create a KafkaProducer instance for publishing messages to Kafka
        producer = KafkaProducer(bootstrap_servers=['hbase-master:9092'])
        producer.send("doc", msg.encode('utf8'))
        # producer.flush()


if __name__ == '__main__':
    # The four arguments are:
    # 1. zkQuorum: the ZooKeeper address
    # 2. group: the consumer group
    # 3. topics: the topics this consumer reads
    # 4. numThreads: the number of threads consuming each topic
    if len(sys.argv) < 5:
        print("Usage: getArticles <zkQuorum> <group> <topics> <numThreads>")
        exit(1)
    zkQuorum = sys.argv[1]
    group = sys.argv[2]
    topics = sys.argv[3]
    numThreads = int(sys.argv[4])
    print(group, topics)
    getarticles(zkQuorum, group, topics, numThreads)
This is also a good point to read up on Kafka's delivery semantics and think about how to achieve "exactly once" delivery (each message delivered exactly once: never duplicated, never lost). One common way to get close is sketched below.
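As a rough illustration (not what this project actually does): combine an at-least-once producer with an idempotent write on the indexing side, so a re-delivered article simply overwrites the same Elasticsearch document instead of creating a duplicate. The index name zhwiki-manual and the helper function are hypothetical.

import json
from kafka import KafkaProducer
from elasticsearch import Elasticsearch

# Producer side: acks='all' plus retries gives at-least-once delivery.
producer = KafkaProducer(bootstrap_servers='hbase-master:9092', acks='all', retries=5)

def send_article(article: dict):
    producer.send('doc', json.dumps(article).encode('utf8'))

# Indexing side: using the article id as the document _id makes the write
# idempotent -- a duplicate message overwrites rather than duplicates.
es = Elasticsearch('http://es01:9200/', http_auth=('elastic', 'changeme'))

def index_article(article: dict):
    # 'zhwiki-manual' is a hypothetical index name for this illustration
    es.index(index='zhwiki-manual', doc_type='logs', id=article['id'], body=article)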
References for this section
4. Configuring Logstash
If logstash.conf and logstash.yml were set up in step 1 and the docker-compose configuration is correct, Logstash should already be running; and if the Kafka pieces from the previous step work, you can now open Kibana to check whether data has arrived.
Open the Kibana page:
http://192.168.217.128:5601/app/kibana (replace this with your own VM's IP, and mind the Docker port mapping)
The default login is elastic with the password changeme.
If data has arrived, Kibana will match the index pattern successfully, as shown in the figure below.
If not, use Dev Tools to inspect the index list and check whether your index was created; if it was not, consult the documentation on Logstash-Kafka integration and debug from there. The same check can also be scripted from Python, as sketched below.
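If you prefer a scripted check over Kibana's Dev Tools, the same index listing is available through the Python client (a sketch; swap in your own host address):

from elasticsearch import Elasticsearch

es = Elasticsearch('http://192.168.217.128:9200/', http_auth=('elastic', 'changeme'))

# The Logstash output should appear as daily indices named zhwiki-YYYY.MM.dd
print(es.cat.indices(v=True))

# Number of documents indexed so far under the zhwiki-* pattern
print(es.count(index='zhwiki-*'))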
References for this section
5. Writing the front-end application and getting to know Elasticsearch
The front end uses Flask. If you downgraded to Python 3.5 earlier in order to use PySpark, install Miniconda first and run Flask inside a conda environment.
Then install flask, flask_bootstrap, flask_paginate and elasticsearch inside the conda environment.
All of them can be installed with pip install.
Before writing the front end, it is worth getting to know Elasticsearch a little (abbreviated to ES below). Some recommended reading:
From Kala Search (卡拉搜索), a brief introduction to the basics of Elasticsearch
Ruan Yifeng's 全文搜索引擎 Elasticsearch 入门教程 (a beginner's tutorial on the Elasticsearch full-text search engine)
If you want to learn about inverted indexes (optional):
If you want to learn about the ranking algorithms behind result ordering (they made my head spin, honestly) (optional):
*Before writing any code, it is worth setting up remote development inside the Docker container, which makes later debugging much easier.
Since full-text search is used later, ES needs a Chinese word-segmentation (ik) plugin.
- Download the word-segmentation plugin
Make sure to download the build matching version 5.6.0
- Unpack it and copy it into the plugins directory of every ES node (es01 shown here; a quick way to verify it loaded is sketched after the command below)
docker cp analysis-ik/ es01:/usr/share/elasticsearch/plugins/
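Elasticsearch only picks up plugins at startup, so restart the ES containers after copying. A quick way to check that the ik analyzer is available is the _analyze API (a sketch; ik_max_word is the analyzer name shipped by the plugin):

from elasticsearch import Elasticsearch

es = Elasticsearch('http://es01:9200/', http_auth=('elastic', 'changeme'))

# With the plugin loaded this returns word-level tokens; an error or
# single-character tokens usually mean the plugin or version is wrong.
print(es.indices.analyze(body={'analyzer': 'ik_max_word', 'text': '中文维基百科全文搜索'}))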
Next, a few Flask topics you need to understand:
- Templates
Understand how the data your back end produces gets rendered into HTML
- Forms
Work out how to capture what the user types into the search box and how to pass it to ES as a query
Since I am not much of a front-end developer, the code below is essentially someone else's open-source work with minor modifications; the original article and its GitHub repository are linked here, and I recommend reading the code alongside the original post.
The final layout of the front-end files is as follows:
search.py wraps the Elasticsearch operations used by the app:
from elasticsearch import Elasticsearch


class elasticSearch():

    def __init__(self, index_type: str, index_name: str):
        self.es = Elasticsearch("http://es01:9200/", http_auth=('elastic', 'changeme'))
        self.index_type = index_type
        self.index_name = index_name

    def create_index(self):
        if self.es.indices.exists(index=self.index_name) is True:
            self.es.indices.delete(index=self.index_name)
        self.es.indices.create(index=self.index_name, ignore=400)

    def delete_index(self):
        try:
            self.es.indices.delete(index=self.index_name)
        except:
            pass

    def get_doc(self, uid):
        return self.es.get(index=self.index_name, id=uid)

    def insert_one(self, doc: dict):
        self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

    def insert_array(self, docs: list):
        for doc in docs:
            self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

    def search(self, query, count: int = 30):
        dsl = {
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["title", "text"]
                }
            },
            "highlight": {
                "fields": {
                    "text": {}
                }
            }
        }
        match_data = self.es.search(index=self.index_name, body=dsl, size=count)
        return match_data
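For reference, this is roughly how app.py below uses the wrapper; the query string is just an example:

from search import elasticSearch

# the same arguments app.py passes in
es = elasticSearch(index_type='logs', index_name='zhwiki-*')
result = es.search('数学之美', count=10)
for hit in result['hits']['hits']:
    print(hit['_score'], hit['_source']['title'])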
form.py simply defines the search form:

from flask import request
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField, TextAreaField
from wtforms.validators import DataRequired, Length, Email, AnyOf


class SearchForm(FlaskForm):
    search_key = StringField(u'Search', validators=[DataRequired()])
    submit = SubmitField()
app.py: the entry point and route configuration

import sys
sys.path.append(".")
import logging
# logging.basicConfig(level=logging.DEBUG)
from flask import Flask, jsonify, request, render_template, redirect
from form import SearchForm
from flask_bootstrap import Bootstrap
from flask_paginate import Pagination, get_page_parameter
from search import elasticSearch

app = Flask(__name__)
app.config['SECRET_KEY'] = 'DontTellAnyone'
bootstrap = Bootstrap(app)


@app.route('/')
@app.route('/index/')
def index():
    searchForm = SearchForm()
    return render_template('index.html', searchForm=searchForm)


@app.route('/search/', methods=['GET', 'POST'])
def search():
    search_key = request.args.get("search_key", default=None)
    print(search_key)
    if search_key:
        searchForm = SearchForm()
        match_data = es.search(search_key, count=30)
        print(match_data['hits']['hits'][3])
        print(match_data['hits']['hits'][5])
        # Pagination
        PER_PAGE = 10
        page = request.args.get(get_page_parameter(), type=int, default=1)
        start = (page - 1) * PER_PAGE
        end = start + PER_PAGE
        total = 30
        pagination = Pagination(page=page, start=start, end=end, total=total)
        context = {
            'match_data': match_data["hits"]["hits"][start:end],
            'pagination': pagination,
            'uid_link': "/wiki/"
        }
        return render_template('data.html', q=search_key, searchForm=searchForm, **context)
    return redirect('/')


if __name__ == "__main__":
    es = elasticSearch(index_type='logs', index_name='zhwiki-*')
    app.run(host='0.0.0.0', port=5000, debug=True)
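Once app.py is running, the /search/ route can be exercised directly over HTTP (a sketch; host and port follow the app.run call above, and the query string is arbitrary):

import requests

# search_key is read from the query string, so a plain GET is enough
r = requests.get('http://localhost:5000/search/', params={'search_key': '数学'})
print(r.status_code)   # expect 200 with the rendered results page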
index.html: the home page

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Just for Simple Search</title>
    <link rel="stylesheet" href="static/css/theme.css">
</head>
<body>
<div class="container">
    <div class="main-search-box pt-3 d-block mx-auto">
        <div style="text-align: center">
        </div>
        <form class="search-form w-100" action="/search">
            <input type="text" placeholder="随便搜点" name="search_key" class="form-control search-input">
            <button type="submit" class="btn search-btn" value="Search">GO</button>
        </form>
    </div>
</div>
</body>
</html>
data.html: the search-results page
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/4.0.0-alpha.2/css/bootstrap.min.css" integrity="sha384-y3tfxAZXuh4HwSYylfB+J125MxIs6mR5FOHamPBG064zB+AFeWH94NdvaCBm8qnd" crossorigin="anonymous">
    <link href="https://maxcdn.bootstrapcdn.com/font-awesome/4.6.3/css/font-awesome.min.css" rel="stylesheet" type="text/css">
    <link href="https://cdnjs.cloudflare.com/ajax/libs/featherlight/1.3.5/featherlight.min.css" type="text/css" rel="stylesheet">
    <link rel="stylesheet" href="/static/css/detail.css">
    <script src="https://code.jquery.com/jquery-2.1.4.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/gsap/1.18.5/TweenMax.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/featherlight/1.3.5/featherlight.min.js" type="text/javascript" charset="utf-8"></script>
    <script src="/static/js/detail.js"></script>
    <link href='http://fonts.googleapis.com/css?family=Open+Sans:300italic,400italic,600italic,700italic,800italic,400,300,600,700,800' rel='stylesheet' type='text/css'>
    <link rel="stylesheet" href="/static/css/bootstrap.min.css">
    <link rel="stylesheet" href="/static/css/github-markdown.min.css">
    <link rel="stylesheet" href="/static/css/github-v2.min.css">
    <link href="{{ url_for('static', filename='css/styles.css') }}" rel="stylesheet"/>
    <script src="static/js/jquery.min.js"></script>
    <script src="static/js/bootstrap.min.js"></script>
    <script src="static/js/main.js"></script>
</head>
<body>
<div class="container nopadding-md">
    <form action="/search" method="GET">
        {{ searchForm.search_key(size=45, class_='form-control', type='text', id="local-search-input", placeholder=search_key) }}
    </form>
    <div class="py-5 z-depth-3" id="board">
        <div class="container">
            <div class="row">
                <div class="col-12 col-md-10 m-auto">
                    {% if match_data == [] %}
                    <div class="row mb-4 mx-auto">
                        <div class="col-12 col-md-8 m-auto">
                            <h4 style="color:#c4e6f5">Sorry, {{ q }} Not Found</h4>
                            <h4 style="color:#c4e6f5">数据库无法得出结论,或许折木奉太郎可以</h4>
                            <br>
                            <br>
                        </div>
                    </div>
                    {% else %}
                    {% for data in match_data %}
                    <div class="row mb-4 mx-auto">
                        <div class="col-12 col-md-8 m-auto">
                            <a href="{{ 'https://zh.wikipedia.org/wiki/' + data._source.url }}">
                                <div class="index-excerpt">
                                    <div class="index-text mb-1">
                                        {{ data._source.title }}
                                        <a href="javascript:;"> </a>
                                        <i class="iconfont icon-riqi2"></i> 词条得分:{{ data._score }}
                                    </div>
                                </div>
                            </a>
                            <div class="search-results">
                                {% if data.highlight %}
                                {{ data.highlight.text[0] | safe }} . . . .
                                {% else %}
                                不知原因没有资料 。。。
                                {% endif %}
                                <br><br>
                            </div>
                        </div>
                    </div>
                    {% endfor %}
                    <ul class="fpageul">
                        {{ pagination.links }}
                    </ul>
                    {% endif %}
                </div>
            </div>
        </div>
    </div>
</div>
</body>
</html>
References for this section
A very good ES full-text search practice project; the blog walks through the build in detail and includes an online demo: https://github.com/triestpa/Guttenberg-Search
A detailed Flask tutorial whose chapter 16 covers how to use it together with ES
That more or less wraps up this practice project.
A welcome change of pace from the daily routine~
- Author: Difer
- Link: https://difer.cn/article/kafka-spark-elasticsearch
- License: This article is licensed under CC BY-NC-SA 4.0. Please credit the source when reposting.