对于ElasticSearch学习总结感兴趣的读者,本文将提供您所需要的所有信息,我们将详细讲解基础篇,可学习,可复习,并且为您提供关于ElasticSearchJAVAAPI基础学习------El
对于ElasticSearch学习总结感兴趣的读者,本文将提供您所需要的所有信息,我们将详细讲解基础篇,可学习,可复习,并且为您提供关于ElasticSearch JAVA API基础学习------Elasticsearch学习(二)、Elasticsearch 学习一(基础入门).、elasticsearch 学习总结、Elasticsearch 学习总结六 使用 Observer 实现 HBase 到 Elasticsearch 的数据同步的宝贵知识。
本文目录一览:- ElasticSearch学习总结(基础篇,可学习,可复习)(elasticsearch6教程)
- ElasticSearch JAVA API基础学习------Elasticsearch学习(二)
- Elasticsearch 学习一(基础入门).
- elasticsearch 学习总结
- Elasticsearch 学习总结六 使用 Observer 实现 HBase 到 Elasticsearch 的数据同步
ElasticSearch学习总结(基础篇,可学习,可复习)(elasticsearch6教程)
推荐:前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。。 点击跳转到网站
最近一周都在学习ElasticSearch,之前也零零散散的学过一点,这次下定决心花一周的时间将之前学的知识总结一下,顺便接着再往下学习,所以写篇博客总结一下最近一周的成果,本篇属于ElasticSearch的基础篇,后面会继续深入学习。也希望这篇拙作可以帮助到诸位大佬,如有不足之处,还望诸佬不吝赐教,倾囊相授。
ElasticSearch学习总结
- ElasticSearch概述
- ES和Solr的差别
- ElasticSearch简介
- Solr简介
- ElasticSearch安装
- ES核心概念
- IK分词器
- Rest风格说明
- 关于文档的基本操作(重点)
- 集成SpringBoot
- 实战案例
ElasticSearch概述
ElasticSearch、简称ES,ES是一个开源的高扩展的分布式全文检索引擎,它可以近乎实时的存储,检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB级别(大数据时代)的数据,ES也使用Java开发使用Lucene作为其核心来实现所有索引和搜索的功能,但是他的目的是通过简单的RestFul API来隐藏Lucene的复杂性,从而让全文搜索变得简单。
ES和Solr的差别
ElasticSearch简介
ElasticSearch是一个实时分布式搜索和分析引擎,它让你以前所未有的速度处理大数据成为可能
它用于全文搜索、结构化、分析以及将这三者混合使用
维基百科使用ElasticSearch提供全文搜索并高亮关键字,以及输入实时搜索(search-asyou-type)和搜索纠错等搜索建议功能……
ElasticSearch是一个基于Apache Lucene的开源搜索引擎,无论在开源还是专有领域,Lucene可以被认为是迄今为止最先进,性能最好的,功能最全的搜索引擎库
但是,Lucene只是一个库,想要使用它,你必须使用java来作为开发语言并将其直接集成到你的应用中,更糟糕的是,Lucene非常复杂,你需要深入了解检索的相关知识来理解它是如何工作的
ElasticSearch也使用java开发并使用Lucene作为其核心来实现所有索引和搜索功能,但是它的目的是通过简单的RESTful API来隐藏Lucene的复杂性,从而让全文搜索变得简单!
Solr简介
Solr是Apache下的一个顶级开源项目,采用Java开发,它是基于Lucene的全文搜索服务器,Solr提供了比Lucene更为丰富的查询语言,同时实现了可配置、可扩展、并对索引、搜索性能进行了优化!
Solr可以独立运行,运行在Jetty、Tomcat等这些Servlet容器中,Solr索引的实现方法很简单,==用POST方法向Solr服务器发送一个描述Field及其内容的XML文档,Solr根据xml文档添加,删除,更新索引,==Solr搜索只需要发送HTTP GET请求,然后对Solr返回xml、json等格式的查询结果进行解析,组织页面布局,Solr不提供构建UI的功能,Solr提供了一个管理界面,通过管理界面可以查询Solr的配置和运行情况。
Solr是基于Lucene开发企业级搜索服务器,实际上就是封装Lucene
Solr是一个独立的企业级搜索应用服务器,它对外提供类似与web-service的API接口,用户可以通过http请求,向搜索引擎服务器提交一定格式的文件,生成索引,也可以通过提交查找请求,并得到返回结果!
ElasticSearch 和 Solr
- 当单纯的对已有数据进行搜索时,Solr更快!
- 当实时建立索引时,Solr会产生IO阻塞,查询性能较差,ElasticSearch具有明显的优势
- 随着数据量的增加,Solr的搜索效率会变得更低,而ElasticSearch却没有明显的的变化
ElasticSearch vs Solr
- ES基本是开箱即用(解压就可以用),非常简单,Solr安装稍微复杂一点
- Solr利用Zookeeper进行分布式管理。而ElasticSearch自身带有分布式协调管理功能
- Solr支持更多格式的数据,比如JSON、XML、CSV,而ElasticSearch仅支持json文件格式
- Solr官方提供的功能更多,而ElasticSearch本身更注重核心功能,高级功能多有第三方插件提供,例如,图形化界面,Kibana友好支撑
- Solr查询快,但更新索引时慢(即插入删除慢),用于电商等查询多的应用;
- ES建立索引快(即查询慢),即实时性查询快,用于facebook新浪等搜索
- Solr是传统搜索应用的有利解决方案,但ElasticSearch更适用于新兴的实时搜索应用
6.solr比较成熟,有一个更大,更成熟的用户,开发和贡献者社区,而ElasticSearch相对开发维护者少,更新太快,学习成本较高
ElasticSearch安装
Java开发,ElasticSearch的版本和我们之后对应的java的核心jar包,版本对应,java环境正常!
2、熟悉目录
bin 启动文件
config 配置文件
log4j2 日志配置文件
jvm.options java虚拟机相关的配置
ElasticSearch ElasticSearch配置文件 默认端口9200 !跨域
lib 相关jar包
modules 功能模块
plugins 插件
3、启动 ,访问9200 elasticsearch.bat
4、访问测试
安装可视化界面 es head的插件
1、下载地址https://github.com/mobz/elasticsearch-head
2、启动
npm install 安装依赖
npm run start 启动
3、连接测试发现,存在跨域问题:配置es
http.cors.enabled: true
http.cors.allow-origin: "*"
4、重启es ,再次连接
初学,可以把es当做一个数据库!(可以建立索引(库),文档(库中的数据))
这个head我们就把它当做数据展示工具!我们后面所有的查询,Kibana
安装Kibana
Kibana是一个针对ElasticSearch的开源分析及可视化平台,用来搜索,查看交互存储在ElasticSearch索引中的数据,使用Kibana,可以通过各种图表进行高级数据分析及展示,Kibana让海量数据更容易理解,基于浏览器的用户界面可以快速创建仪表板实时显示ElasticSearch查询动态,设置Kibana非常简单,无序编码或者额外的基础架构,几分钟内就可以完成Kibana安装并启动ElasticSearch索引检测。
官网:https://www.elastic.co/cn/kibana
Kibana版本要和ES版本一致
启动测试
1、目录结构
2、启动
3、开发工具!(POST、curl、head、谷歌浏览器插件)
之后的所有的操作都在这里编写
4、汉化!自己修改Kibana.yml
! zh-CN
ES核心概念
概述
集群、节点、索引、类型、文档、分片、映射是什么?
@H_748_301@Relational DBelasticSearch是面向文档,关系型数据库 和 ElasticSearch 客观的对比!一切都是JSON
ElasticSearch | |
---|---|
数据库(database) | 索引(indices) |
表(tables) | types |
行(rows) | documents |
字段(columns) | fields |
elasticsearch(集群)中可以包含多个索引(数据库),每个索引中可以包含多个类型(表),每个类型下又包含多个文档(行),每个文档重女又包含多个字段(列)
物理设计:
ElasticSearch在后台把每个索引划分为多个分片,每分分片可以在集群中的不同服务器间迁移
逻辑设计:
一个索引类型中,包含多个文档,比如说文档1,文档2,当我们索引一篇文档时,可以通过这样的一个顺序找到它:索引–>类型–>文档ID。通过这个组合我们就能索引到某个具体的文档,注意:ID不必是整数,实际上它是个字符串
文档
之前说ElasticSearch是面向文档的,那么就意味着索引和搜索数据的最小单位是文档,ElasticSearch,文档有几个重要属性:
- 自我包含,一篇文档同时包含字段和对应的值,也就是同时包含key:value
- 可以是层次型的,一个文档中包含自文档,复杂的逻辑实体就是这么来的
- 灵活的结构,文档不依赖预先定义的模式,我们知道关系型数据库中,要提前定义字段才能使用,在ElasticSearch中,对于字段是非常灵活的,有时候,我们可以忽略改字段,或者动态的添加一个新的字段
- 尽管我们可以随意的新增或者忽略某个字段,但是,每个字段的类型非常重要,比如一个年龄字段类型,可以是字符串也可以是整型,因为ElasticSearch会保存字段和类型之间的映射及其他的设置,这种映射具体到每个映射的每种类型,这也是为什么在ElasticSearch中,类型有时候也称为映射类型。
类型
索引(就是一个数据库)
倒排索引
IK分词器
什么是IK分词器
分词:即把一段中文或者别的划分成一个个的关键字,我们把搜索时会把自己的信息进行分词,会把数据库中或索引库中的数据进行分词,然后进行一个匹配操作,默认的中文分词是将每个字看成一个词。比如:“我爱狂神”会被分为:”我“,”爱“,”狂“,”神“,这显然是不符合要求的,所以我们需要安装中文分词器IK来解决这个问题
IK提供了两个分词算法:ik_smart和ik_max_word,其中ik_smart为最少切分,ik_max_word为最细粒度划分
下载安装
1、https://github.com/medcl/elasticsearch-analysis-ik
2、下载完毕之后,直接放在ElasticSearch插件中即可!
3、重启ElasticSearch
测试分词器效果
ik__smart
ik_max_word
ik分词器增加自己的配置!
保存后重启ES!
Rest风格说明
一种软件架构风格,而不是标准,只是提供了一组设计原则和约束条件,它主要用于客户端和服务端交互类的软件,基于这个风格设计的软件可以更简洁,更有层次,更易于实现缓存等机制。
基本的Rest命令说明:
method | URL地址 | 描述 |
---|---|---|
PUT | localhost:9200/索引名称/类型名称/文档id | 创建文档(指定文档id) |
POST | localhost:9200/索引名称/类型名称 | 创建文档(随机文档id) |
POST | localhost:9200/索引名称/类型名称/文档id/_update | 修改文档 |
DELETE | localhost:9200/索引名称/类型名称/文档id | 删除文档 |
GET | localhost:9200/索引名称/类型名称/文档id | 查询文档通过文档id |
POST | localhost:9200/索引名称/类型名称/_search | 查询所有数据 |
基础测试
1、创建一个索引
PUT /索引名/类型名/文档id
{请求体}
2、向索引中PUT值
3、name这个字段用不用指定类型呢,毕竟我们关系型数据库,是需要指定类型的
-
字符串类型
text 、keyword
-
数值类型
long、integer、short、byte、double、float、half、float、scaled
-
日期类型
date
-
te布尔值类型
boolean
-
二进制类型
binary
-
等等……
4、指定字段的类型(创建规则)
获取规则,可以通过get请求获取具体的信息
GET test2
测试
如果自己的文档字段没有自定,那么es会给我们配置默认字段类型!
扩展:通过命令ElasticSearch索引情况! 通过get _cat/ 可以获得ElasticSearch的很多信息
修改索引 提交还是使用PUT
曾经的方法:
最新办法
删除索引 通过DELETE命令实现删除,根据你的请求来判断是删除索引还是删除文档记录!
关于文档的基本操作(重点)
基本操作
1、添加一条数据
PUT /wumao/user/1
{
"name":"wumao",
"age":21,
"desc":"一顿操作猛如虎,一看工资2500",
"tags":["技术宅","无聊者"]
}
2、获取数据 GET
3、更新操作 POST _update
推荐使用这种更新方式
简单的搜索
GET wumao/user/1
简单的条件查询 ,可以根据默认的映射规则,产生基本的查询!
复杂操作 搜索 select(排序、分页、高亮、精准查询!)
输出结果,不想要那么多结果!select name,desc . . . .
之后使用Java操作es,所有的方法和对象就是这里面的key!
排序
分页
数据下标还是从0开始的,和学的所有数据结构是一样的!
/search/{current}/{pageSize}
布尔值查询
must (and),所有的条件都要符合 where id = 1 and name =xxx
should( or ),所有的条件都要符合 where id = 1 orname =xxx
must_not(not)
过滤器filter
gt > 大于
gte >= 大于等于
lt < 小于
lte <= 小于等于
匹配多个条件
精确查询!
trem 查询是直接通过倒排索引指定的词条进行精确的查找的!
关于分词:
-
term,直接查询精确地
-
match,会使用分词器解析(先分析文档,通过分析的文档进行查询)
两个类型 text keyword
多个值精确匹配
高亮查询!
- 匹配
- 按条件匹配
- 精确匹配
- 区间范围匹配
- 匹配字段过滤
- 多条件查询
- 高亮查询
集成SpringBoot
找官方文档!
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.12/index.html
1、找到原生的依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.13.2</version>
</dependency>
2、找对象
3、分析这个类中的方法
配置基本的项目
问题:创建项目默认的elasticsearch的默认版本是7.12.1,版本和本地不一致!
需要自己定义版本的依赖
分析源码
虽然导入了三个类,静态内部类,核心类只有一个
/**
* Elasticsearch rest client configurations.
*
* @author Stephane Nicoll
*/
class ElasticsearchRestClientConfigurations {
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(RestClientBuilder.class)
static class RestClientBuilderConfiguration {
@Bean
RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
return new DefaultRestClientBuilderCustomizer(properties);
}
//第一个bean RestClientBuilder
@Bean
RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties,
ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) {
HttpHost[] hosts = properties.getUris().stream().map(this::createHttpHost).toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(hosts);
builder.setHttpClientConfigCallback((httpClientBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(httpClientBuilder));
return httpClientBuilder;
});
builder.setRequestConfigCallback((requestConfigBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(requestConfigBuilder));
return requestConfigBuilder;
});
builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
return builder;
}
private HttpHost createHttpHost(String uri) {
try {
return createHttpHost(URI.create(uri));
}
catch (IllegalArgumentException ex) {
return HttpHost.create(uri);
}
}
private HttpHost createHttpHost(URI uri) {
if (!StringUtils.hasLength(uri.getUserInfo())) {
return HttpHost.create(uri.toString());
}
try {
return HttpHost.create(new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(),
uri.getQuery(), uri.getFragment()).toString());
}
catch (URISyntaxException ex) {
throw new IllegalStateException(ex);
}
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(RestHighLevelClient.class)
static class RestHighLevelClientConfiguration {
//第二个bean RestHighLevelClient 高级客户端,后面项目会用到!
@Bean
RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
return new RestHighLevelClient(restClientBuilder);
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Sniffer.class)
@ConditionalOnSingleCandidate(RestHighLevelClient.class)
static class RestClientSnifferConfiguration {
@Bean
@ConditionalOnMissingBean
Sniffer elasticsearchSniffer(RestHighLevelClient client, ElasticsearchRestClientProperties properties) {
SnifferBuilder builder = Sniffer.builder(client.getLowLevelClient());
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties.getSniffer().getInterval()).asInt(Duration::toMillis)
.to(builder::setSniffIntervalMillis);
map.from(properties.getSniffer().getDelayAfterFailure()).asInt(Duration::toMillis)
.to(builder::setSniffAfterFailureDelayMillis);
return builder.build();
}
}
static class DefaultRestClientBuilderCustomizer implements RestClientBuilderCustomizer {
private static final PropertyMapper map = PropertyMapper.get();
private final ElasticsearchRestClientProperties properties;
DefaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
this.properties = properties;
}
@Override
public void customize(RestClientBuilder builder) {
}
@Override
public void customize(HttpAsyncclientBuilder builder) {
builder.setDefaultCredentialsProvider(new PropertiesCredentialsProvider(this.properties));
}
@Override
public void customize(RequestConfig.Builder builder) {
map.from(this.properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis)
.to(builder::setConnectTimeout);
map.from(this.properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis)
.to(builder::setSocketTimeout);
}
}
private static class PropertiesCredentialsProvider extends BasicCredentialsProvider {
PropertiesCredentialsProvider(ElasticsearchRestClientProperties properties) {
if (StringUtils.hasText(properties.getUsername())) {
Credentials credentials = new UsernamePasswordCredentials(properties.getUsername(),
properties.getpassword());
setCredentials(AuthScope.ANY, credentials);
}
properties.getUris().stream().map(this::toUri).filter(this::hasUserInfo)
.forEach(this::addUserInfoCredentials);
}
private URI toUri(String uri) {
try {
return URI.create(uri);
}
catch (IllegalArgumentException ex) {
return null;
}
}
private boolean hasUserInfo(URI uri) {
return uri != null && StringUtils.hasLength(uri.getUserInfo());
}
private void addUserInfoCredentials(URI uri) {
AuthScope authScope = new AuthScope(uri.getHost(), uri.getPort());
Credentials credentials = createuserInfoCredentials(uri.getUserInfo());
setCredentials(authScope, credentials);
}
private Credentials createuserInfoCredentials(String userInfo) {
int delimiter = userInfo.indexOf(":");
if (delimiter == -1) {
return new UsernamePasswordCredentials(userInfo, null);
}
String username = userInfo.substring(0, delimiter);
String password = userInfo.substring(delimiter + 1);
return new UsernamePasswordCredentials(username, password);
}
}
}
具体的API测试
1、创建索引
2、判断文档是否存在
3、删除索引
4、创建文档
5、CRUD文档
@SpringBoottest
class WumaoEsApiApplicationTests {
@Autowired
@Qualifier("restHighLevelClient")
private RestHighLevelClient client;
//测试创建索引 在java中所有的请求都是用Request PUT wumao_index
@Test
public void testCreateIndex() throws IOException {
//1、创建索引请求
CreateIndexRequest request = new CreateIndexRequest("wumao_index");
//2、客户端执行请求 IndicesClient,请求后获取响应
CreateIndexResponse createIndexResponse = client.indices()
.create(request, RequestOptions.DEFAULT);
System.out.println(createIndexResponse);
}
//测试获取索引
@Test
void testExistsIndex() throws IOException {
GetIndexRequest re = new GetIndexRequest("wumao_index");
boolean exists = client.indices().exists(re,RequestOptions.DEFAULT);
System.out.println(exists);
}
//测试删除索引
@Test
void testDeleteIndex() throws IOException {
DeleteIndexRequest request = new DeleteIndexRequest("wumao_index");
AckNowledgedResponse delete = client.indices().delete(request, RequestOptions.DEFAULT);
System.out.println(delete);
}
//添加文档
@Test
void testAddDocument() throws IOException {
//创建对象
User user = new User("五毛",3);
//创建请求
IndexRequest request = new IndexRequest("wumao_index");
//设值一些规则 put /wumao_index/_doc/1
request.id("1");
request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");
//将我们的数据放入请求 json
String string = JSON.toJSONString(user);
request.source(string, XContentType.JSON);
//客户端发送请求,获取响应的结果
IndexResponse index = client.index(request, RequestOptions.DEFAULT);
System.out.println(index.toString());
System.out.println(index.status());//对应我们命令的返回状态
}
//获取文档
@Test
void testIsExists() throws IOException {
//获取文档,判断是否存在 get/index/doc/1
GetRequest index = new GetRequest("wumao_index", "1");
//不获取返回的_source的上下文了
index.fetchSourceContext(new FetchSourceContext(false));
index.storedFields("_none_");
boolean exists = client.exists(index, RequestOptions.DEFAULT);
System.out.println(exists);
}
//获取文档的信息
@Test
void testGetDocument() throws IOException {
GetRequest index = new GetRequest("wumao_index", "1");
GetResponse fields = client.get(index, RequestOptions.DEFAULT);
System.out.println(fields.getSourceAsstring());
System.out.println(fields);
}
//更新文档记录
@Test
void testUpdateDocument() throws IOException {
UpdateRequest index = new UpdateRequest("wumao_index", "1");
//设置超时时间
index.timeout(TimeValue.timeValueSeconds(1));
User user = new User("法外狂徒张三", 12);
UpdateRequest doc = index.doc(JSON.toJSONString(user),XContentType.JSON);
UpdateResponse update = client.update(index, RequestOptions.DEFAULT);
System.out.println(update);
}
//删除文档记录
@Test
void testDeleteDocument() throws IOException {
DeleteRequest request = new DeleteRequest("wumao_index", "1");
//设置请求过期时间
request.timeout("1s");
DeleteResponse delete = client.delete(request, RequestOptions.DEFAULT);
System.out.println(delete.status());
}
//特殊的,真的项目一般都是批量插入数据
@Test
void testBulkRequest() throws IOException {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout("10s");
ArrayList<User> userList = new ArrayList<>();
userList.add(new User("wumao",3));
userList.add(new User("wumao1",23));
userList.add(new User("wumao2",21));
userList.add(new User("wumao3",12));
userList.add(new User("wumao4",13));
userList.add(new User("wumao5",23));
userList.add(new User("wumao6",33));
for (int i = 0; i < userList.size(); i++) {
bulkRequest.add(new IndexRequest("wumao_index")
.source(JSON.toJSONString(userList.get(i)),XContentType.JSON)
);
}
BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println(bulk.hasFailures());//是否失败!
}
//查询
// SearchRequest 搜索请求
// SearchSourceBuilder 条件构造
// HighlightBuilder 构建高亮
// TermQueryBuilder 精确查询
// MatchAllQueryBuilder 查询全部
// xxxxQueryBuilder 对应之前的那些命令
@Test
void testSearch() throws IOException {
SearchRequest request = new SearchRequest("wumao_index");
//构建搜索条件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//查询条件,我么可以使用QueryBuilders 工具类来实现
//QueryBuilders.termQuery 精确查询
//QueryBuilders.matchAllQuery(); 匹配所有
MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
TermQueryBuilder termQuery = QueryBuilders.termQuery("name", "wumao");
//查询过期时间
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
request.source(sourceBuilder);
SearchResponse search = client.search(request, RequestOptions.DEFAULT);
System.out.println(JSON.toJSONString(search.getHits()));
for (SearchHit hit : search.getHits().getHits()) {
System.out.println(hit.getSourceAsMap());
}
}
//批量创建文档
@Test
void testBulkDocument() throws IOException {
BulkRequest bulkRequest = new BulkRequest();
//设置过期时间
bulkRequest.timeout("60s");
ArrayList<User> userList = new ArrayList<>();
userList.add(new User("qinfeng",3));
userList.add(new User("qinfeng1",3));
userList.add(new User("qinfeng2",3));
userList.add(new User("qinfeng3",3));
userList.add(new User("qinfeng4",3));
userList.add(new User("qinfeng5",3));
for (int i = 0; i < userList.size(); i++) {
bulkRequest.add(new IndexRequest("wumao_index")
.id(""+(i+1))
.source(JSON.toJSONString(userList.get(i)),XContentType.JSON));
}
BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println(bulk.hasFailures());//判断是否失败
}
}
实战案例
- 爬虫
数据问题 数据库中获取,消息队列获取
爬取数据(获取请求返回的页面信息,筛选出我们所需要的)
<!--解析网页-->
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.14.2</version>
</dependency>
public List<Content> parseJD(String keyWords) throws Exception {
//获取请求 https://search.jd.com/Search?keyword=java
String url = "https://search.jd.com/Search?keyword="+keyWords;
//解析网页 (Jsoup返回的Document就是Document对象)
Document document = Jsoup.parse(new URL(url), 30000);
//所有在js中使用的方法,在这里都可以使用
Element element = document.getElementById("J_goodsList");
ArrayList<Content> list = new ArrayList<>();
//获取所有的li元素
Elements elements = element.getElementsByTag("li");
//这里的el就是每一个li标签
for (Element el : elements) {
//关于这种图片特别多的网站,都是延时加载的
String price = el.getElementsByClass("p-price").eq(0).text();
String title = el.getElementsByClass("p-name").eq(0).text();
Content content = new Content();
content.setTitle(title);
content.setPrice(price);
list.add(content);
}
return list;
}
-
前后端分离
-
搜索高亮
//解析数据放入到es中
public Boolean parseContent(String keyWords) throws Exception {
List<Content> contents = new HtmlParseUtil().parseJD(keyWords);
//把查询到的数据放入到es中
BulkRequest request = new BulkRequest();
request.timeout("2m");
for (int i = 0; i < contents.size(); i++) {
System.out.println(JSON.toJSONString(contents.get(i)));
request.add(
new IndexRequest("jd_goods")
.source(JSON.toJSONString(contents.get(i)), XContentType.JSON));
}
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
return !bulkResponse.hasFailures();
}
//3.实现搜索高亮功能
public List<Map<String,Object>> searchHighlightPage(String keyword,int pageNO,int pageSize) throws IOException {
if (pageNO <=1){
pageNO =1;
}
//条件搜索
SearchRequest searchRequest = new SearchRequest("jd_goods");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//分页
sourceBuilder.from(pageNO);
sourceBuilder.size(pageSize);
//精准匹配
QueryBuilders.termQuery("title",keyword);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("title");
highlightBuilder.requireFieldMatch(false); //高亮显示一个title只显示一个高亮就可以
highlightBuilder.preTags("<span>");
highlightBuilder.postTags("</span>");
sourceBuilder.Highlighter(highlightBuilder);
//执行搜索
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//解析结果
ArrayList<Map<String,Object>> list = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits()) {
//解析高亮字段
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField title = highlightFields.get("title");
Map<String, Object> sourceAsMap = hit.getSourceAsMap();//原来的结果
//解析高亮字段,将原来的字段换为我们高亮的字段即可!
if (title!=null){
Text[] fragments = title.fragments();
String n_title="";
for (Text text : fragments) {
n_title += text;
}
sourceAsMap.put("title",n_title); //高亮字段替换掉原来的内容即可!
}
list.add(sourceAsMap);
}
return list;
}
本篇到这里就结束了!后续还会继续更新ElasticSearch调优、ElasticSearch集群以及面试题相关的内容,
感谢诸佬的点赞和支持。
如有不足之处,还希望诸佬指出不足之处,加以修正。
再见了各位小伙伴!
ElasticSearch JAVA API基础学习------Elasticsearch学习(二)
在上一篇博客中(Elasticsearch安装)已经完成了es的安装,那么接下来,将介绍下如在java代码中完成对某个索引的类型的文档的增删改查。这个java api的介绍在官网上也有很好的例子,大家可以参考下。
完整演示demo下载:
github:https://github.com/wesley5201314/Elasticsearch-demo
gitosc:https://git.oschina.net/zhengweishan/Elasticsearch-demo
es中的索引就对应数据库,类型就对应着数据库中的表,文档就对应着数据库表中的记录,因此,我们首先得创建一个索引,然后,再创建一个类型,这个类型会包含字段类型信息,然后就可以在这个索引上对此类型的文档进行增删改查了。
下面,将分步介绍:
创建一个EsClient.java类,直接贴出代码如下:
package com.es.demo;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
public class EsClient {
/** 配置文件参数 */
private static Properties properties = null;
private static String es_ip = null; //ip
private static int es_port = 0; //端口
private static String es_cluster_name = null; //集群名字
private static TransportClient client = null;
public TransportClient getClient() throws IOException{
properties = new Properties();
properties.load(TransportClient.class.getClassLoader().getResourceAsStream("env.properties"));
es_ip = properties.getProperty("target.es.ip");
es_port = Integer.valueOf(properties.getProperty("target.es.port"));
es_cluster_name = properties.getProperty("target.es.cluster.name");
System.out.println("原始地址,es_ip = "+es_ip);
String [] hosts = es_ip.split(",");
System.out.println("切割之后的esIp数组 = "+hosts);//针对于以后的集群部署我这里只有 一台服务器
Settings settings = Settings.settingsBuilder()
.put("cluster.name",es_cluster_name)
.put("client.transport.sniff", true).build();
client = TransportClient.builder().settings(settings)
.build();
for(int i=0;i<hosts.length;i++){
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hosts[i]), es_port));
}
return client;
}
}
然后创建一个实体类Test.java对应es中的数据
package com.es.demo;
public class Test {
private String author;
private String content;
private String title;
private String category;
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
@Override
public String toString() {
return "Test [author=" + author + ", content=" + content + ", title=" + title + ", category=" + category + "]";
}
}
然后再提供一个转换工具类(此类不是多好,有好的大家可以提供下):
package com.es.demo;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import java.util.Map;
import org.joda.time.DateTime;
import org.joda.time.format.ISODateTimeFormat;
/**
*
* <br>
* 标题: map转成bean<br>
* 描述: <br>
* 公司: www.tydic.com<br>
* @autho wesley
* [@time](https://my.oschina.net/u/126678) 2016年10月31日 下午5:03:08
*/
public class ConverterUtil {
/** 转换为驼峰(大写)
*
* [@param](https://my.oschina.net/u/2303379) underscoreName
* [@return](https://my.oschina.net/u/556800) */
public static String camelCaseName(String underscoreName) {
StringBuilder result = new StringBuilder();
if (underscoreName != null && underscoreName.length() > 0) {
boolean flag = false;
for (int i = 0; i < underscoreName.length(); i++) {
char ch = underscoreName.charAt(i);
if ("_".charAt(0) == ch) {
flag = true;
}
else {
if (flag) {
result.append(Character.toUpperCase(ch));
flag = false;
}
else {
result.append(Character.toLowerCase(ch));
}
}
}
}
return result.toString();
}
/** 转换为下划线(大写)
*
* @param camelCaseName
* @return */
public static String underscoreName(String camelCaseName) {
StringBuilder result = new StringBuilder();
int len = camelCaseName.length();
if (camelCaseName != null && len > 0) {
result.append(camelCaseName.substring(0, 1).toUpperCase());
for (int i = 1; i < len; i++) {
char ch = camelCaseName.charAt(i);
if (Character.isUpperCase(ch)) {
result.append("_");
result.append(ch);
}
else {
result.append(Character.toUpperCase(ch));
}
}
}
return result.toString();
}
/** 把Map<String,Object>处理成实体类
*
* @param clazz
*想要的实体类
* @param map
*包含信息的Map对象
* @return */
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Object mapToObject(Class clazz, Map<String, Object> map) {
if (null == map) {
return null;
}
Field[] fields = clazz.getDeclaredFields(); // 取到类下所有的属性,也就是变量名
Field field;
Object o = null;
try {
o = clazz.newInstance();
}
catch (InstantiationException e1) {
e1.printStackTrace();
}
catch (IllegalAccessException e1) {
e1.printStackTrace();
}
for (int i = 0; i < fields.length; i++) {
field = fields[i];
String fieldName = field.getName();
// 把属性的第一个字母处理成大写
String stringLetter = fieldName.substring(0, 1).toUpperCase();
// 取得set方法名,比如setBbzt
String setterName = "set" + stringLetter + fieldName.substring(1);
// 真正取得set方法。
Method setMethod = null;
Class fieldClass = field.getType();
try {
if (isHaveSuchMethod(clazz, setterName)) {
if (fieldClass == String.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, String.valueOf(map.get(fieldName)));// 为其赋值
}
}
else if (fieldClass == Integer.class || fieldClass == int.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, Integer.parseInt(String.valueOf(map.get(fieldName))));// 为其赋值
}
}
else if (fieldClass == Boolean.class || fieldClass == boolean.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, Boolean.getBoolean(String.valueOf(map.get(fieldName))));// 为其赋值
}
}
else if (fieldClass == Short.class || fieldClass == short.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, Short.parseShort(String.valueOf(map.get(fieldName))));// 为其赋值
}
}
else if (fieldClass == Long.class || fieldClass == long.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, Long.parseLong(String.valueOf(map.get(fieldName))));// 为其赋值
}
}
else if (fieldClass == Double.class || fieldClass == double.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, Double.parseDouble(String.valueOf(map.get(fieldName))));// 为其赋值
}
}
else if (fieldClass == Float.class || fieldClass == float.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, Float.parseFloat(String.valueOf(map.get(fieldName))));// 为其赋值
}
}
else if (fieldClass == BigInteger.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, BigInteger.valueOf(Long.parseLong(String.valueOf(map.get(fieldName)))));// 为其赋值
}
}
else if (fieldClass == BigDecimal.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
setMethod.invoke(o, BigDecimal.valueOf(Long.parseLong(String.valueOf(map.get(fieldName)))));// 为其赋值
}
}
else if (fieldClass == Date.class) {
setMethod = clazz.getMethod(setterName, fieldClass);
if (null != map.get(fieldName) && !("").equals(map.get(fieldName))) {
DateTime date = ISODateTimeFormat.dateTimeParser().parseDateTime((String) map.get(fieldName));
setMethod.invoke(o, new Date(date.getMillis()));// 为期赋值
}
/*
* if (map.get(fieldName).getClass() == java.sql.Date.class) { setMethod.invoke(o, new Date(((java.sql.Date)
* map.get(fieldName)).getTime()));// 为其赋值 } else if (map.get(fieldName).getClass() ==
* java.sql.Time.class) { setMethod.invoke(o, new Date(((java.sql.Time) map.get(fieldName)).getTime()));// 为其赋值 } else
* if (map.get(fieldName).getClass() == java.sql.Timestamp.class) { setMethod.invoke(o, new Date(((java.sql.Timestamp)
* map.get(fieldName)).getTime()));// 为其赋值 }else if(map.get(fieldName).getClass() ==
* org.joda.time.format.ISODateTimeFormat.class){ DateTime date =
* ISODateTimeFormat.dateTimeParser().parseDateTime((String)map.get(fieldName)); setMethod.invoke(o, new
* Date(date.getMillis())); }
*/
}
}
}
catch (SecurityException e) {
e.printStackTrace();
}
catch (NoSuchMethodException e) {
e.printStackTrace();
}
catch (IllegalArgumentException e) {
e.printStackTrace();
}
catch (IllegalAccessException e) {
e.printStackTrace();
}
catch (InvocationTargetException e) {
e.printStackTrace();
}
}
return o;
}
/** 判断某个类里是否有某个方法
*
* @param clazz
* @param methodName
* @return */
public static boolean isHaveSuchMethod(Class<?> clazz, String methodName) {
Method[] methodArray = clazz.getMethods();
boolean result = false;
if (null != methodArray) {
for (int i = 0; i < methodArray.length; i++) {
if (methodArray[i].getName().equals(methodName)) {
result = true;
break;
}
}
}
return result;
}
public static void main(String[] args) {
System.err.println(underscoreName("name_full"));
System.err.println(camelCaseName("NAME_FILL"));
System.err.println(camelCaseName("nameFill"));
System.err.println(underscoreName("nameFill"));
}
}
暂时没有封装,直接用main学习演示基本操作的的,代码如下:
package com.es.demo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args ) throws IOException
{
EsClient esClient = new EsClient();
TransportClient client = esClient.getClient();
//---------------演示查询
/* SearchRequestBuilder request = client.prepareSearch("wesley")
.setTypes("test").setSearchType(SearchType.QUERY_THEN_FETCH);
//初始化查询参数
BoolQueryBuilder query = QueryBuilders.boolQuery();
query.must(QueryBuilders.termQuery("author","author"));
request.setQuery(query);
//查询操作
SearchResponse response = request.execute().actionGet();
List<Test> returnList = new ArrayList<Test>();
SearchHits hints = response.getHits();
for(SearchHit theHit : hints){// 每条纪录
Map<String,Object> testInfo = new HashMap<String,Object>();
for(Map.Entry<String, Object> entity : theHit.getSource().entrySet()){
testInfo.put(entity.getKey(), entity.getValue()==null ? null : entity.getValue().toString());// 根据数值大小,value 可能为Integer/Long 不依赖ES映射类型
}
returnList.add((Test)ConverterUtil.mapToObject(Test.class, testInfo));
}
for(Test test:returnList){
System.out.println("-----------------"+test);
}*/
//---------------演示查询------------end
//-----------演示创建索引,类型,文档-----------
/*ObjectMapper objectMapper = new ObjectMapper();
Test test = new Test();
test.setAuthor("author6");
test.setCategory("category6");
test.setTitle("title6");
test.setContent("content6");
String source = objectMapper.writeValueAsString(test);
System.out.println("--------"+source);
IndexRequestBuilder indexBuilder = client.prepareIndex().setIndex("wesley6").setType("test6").setSource(source);
IndexResponse indexResponse = indexBuilder.execute().actionGet();
System.out.println("--------"+indexResponse.isCreated()+"----------");*/
//-----------演示创建索引,类型,文档-----------end
//------------演示删除文档-------
/*DeleteRequestBuilder deleteRequestBuilder = client.prepareDelete("wesley6", "test6", "AVmV5176AmysSwOEBaDt");
DeleteResponse deleteResponse = deleteRequestBuilder.execute().actionGet();
System.out.println("------isDelete-----"+deleteResponse.isFound());*/
//------------演示删除文档-------end
//------------演示删除指定索引-------
/*DeleteIndexRequestBuilder deleteRequestBuilder = client.admin().indices().prepareDelete("wesley6");
DeleteIndexResponse deleteResponse = deleteRequestBuilder.execute().actionGet();
System.out.println("------isDelete-----"+deleteResponse.isAcknowledged());*/
//------------演示删除指定索引-------end
//----------演示创建索引,类型
// 使用XContentBuilder创建Mapping
/*XContentBuilder builder =
XContentFactory.jsonBuilder()
.startObject()
.field("properties")
.startObject()
.field("name")
.startObject()
.field("index", "not_analyzed")
.field("type", "string")
.endObject()
.field("age")
.startObject()
.field("index", "not_analyzed")
.field("type", "integer")
.endObject()
.endObject()
.endObject();
System.out.println(builder.string());
//创建索引
CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate("wesley00");
CreateIndexResponse createIndexResponse = createIndexRequestBuilder.execute().actionGet();
System.out.println("------isCreated----"+createIndexResponse.isAcknowledged());
//创建type
PutMappingRequest mappingRequest = Requests.putMappingRequest("wesley00").source(builder).type("user");
PutMappingResponse putMappingResponse = client.admin().indices().putMapping(mappingRequest).actionGet();
System.out.println("------isCreated----"+putMappingResponse.isAcknowledged());*/
//----------演示创建索引,类型---end
//---------演示更新文档----
XContentBuilder builder =
XContentFactory.jsonBuilder()
.startObject()
.field("author", "lisi")
.field("title", "update111")
.field("content","111111")
.field("category","category-update")
.endObject();
UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate().setIndex("wesley").setType("test").setDoc(builder).setId("1111");
System.out.println("----------"+updateRequestBuilder);
UpdateResponse updateResponse = updateRequestBuilder.execute().actionGet();
System.out.println("---------"+updateResponse);
//说明isCreated这个方法如果返回的是true说明原来没有数据重新创建了,如果返回false说明更新成功。不知道他们为什么这么设计。
//Returns true if document was created due to an UPSERT operation 源码中的注释说明。
System.out.println("---------"+updateResponse.isCreated()+"---version-"+updateResponse.getVersion()+"-------------"); //
////---------演示更新文档----end--
}
}
最后,稍作总结,上面都是只贴出了代码,也没给出比较详细的解释,这个大家可以参考着官网的介绍,然后结合贴出的代码,估计就能够看懂了。另外,还有其它java api比如聚合查询,暂时也没去研究,大家可以去学习下,我也是刚学,有啥问题的地方欢迎指出,谢谢!
ElasticSearch2.x API变化官网链接 https://www.elastic.co/guide/en/elasticsearch/reference/2.3/breaking-changes-2.0.html
Elasticsearch 学习一(基础入门).
一、Elasticsearch 简介
你可以这么形容 Elasticsearch :
- 一个分布式的实时文档存储,每个字段都可以被索引与搜索
- 一个分布式实时分析搜索引擎
- 能胜任上百个服务节点的扩展,并支持 PB 级别的结构化或者非结构化数据
Elasticsearch 是一个实时分布式搜索和分析引擎,建立在一个全文搜索引擎库 Apache Lucene 基础之上,而 Lucene 是当下最先进、高性能、全功能的搜索引擎库。
但是 Lucene 仅仅只是一个库。为了充分发挥其功能,你需要使用 Java 并将 Lucene 直接集成到应用程序中。 更糟糕的是,您可能需要获得信息检索学位才能了解其工作原理。因为 Lucene 非常复杂。
Elasticsearch 也是使用 Java 编写的,它的内部使用 Lucene 做索引与搜索,但是它的目的是使全文检索变得简单,通过隐藏 Lucene 的复杂性,取而代之的提供一套简单一致的 RESTful API。
- 维基百科使用 Elasticsearch 提供全文搜索并高亮关键字,以及输入实时搜索(search-asyou-type)和搜索纠错(did-you-mean)等搜索建议功能。
- 英国卫报使用 Elasticsearch 结合用户日志和社交网络数据提供给他们的编辑以实时的反馈,以便及时了解公众对新发表的文章的回应。
- StackOverflow 结合全文搜索与地理位置查询,以及 more-like-this 功能来找到相关的问题和答案。
- Github 使用 Elasticsearch 检索 1300 亿行的代码。
- ...
Elasticsearch 不仅用于大型企业,它还让像 DataDog 以及 Klout 这样的创业公司将最初的想法变成可扩展的解决方案,Elasticsearch 可以在你的笔记本上运行,也可以在数以百计的服务器上处理 PB 级别的数据。
二、Elasticsearch 安装
- Installing Elasticsearch
- Docker 下安装 ElasticSearch 和 Kibana
- kibana 7.* 设置中文汉化
- Docker-compose 部署 ELK
五、Elasticsearch 基本概念
文档(Document)
JSON 格式,Elasticsearch 存储的最小单位,可以理解为是关系型数据库中的一条记录,每个文档都有自己的一个 unique id。
文档元数据,用于标注文档的相关信息
- _index:文档所属的索引名
- _type:文档所属的类型名
- _source:文档的原始Json数据
- _id:文档唯一id
- _version:文档版本信息
- _score:文档相关度打分
索引(Index)
索引是文档的一个容器,类比于关系型数据库的数据库概念,索引中的 setting 里定义有多少个 shards 来存储索引数据,.........
elasticsearch 学习总结
可以把 es 看成是个数据库,带索引的易扩展的高效查询数据库
Elasticsearch 学习总结六 使用 Observer 实现 HBase 到 Elasticsearch 的数据同步
-
最近在公司做统一日志收集处理平台,技术选型肯定要选择 elasticsearch,因为可以快速检索系统日志,日志问题排查及功业务链调用可以被快速检索,公司各个应用的日志有些字段比如说 content 是不需要在 es 中作为存储的,当时考虑使用一种 keyValue 形式的数据库作存储,然后使用 hbase 的 Rowkey 作为 es 的 docId,实现数据检索在 es 中,存储在 hbase 中,这样可以大大减轻 es 的存储压力。
-
什么是 Observer
HBase 0.92 版本引入了协处理器(Coprocessor),可以使开发者将自己的代码嵌入到 HBase 中,其中协处理器分为两大块,一个是终端(Endpoint),另一个是本文将要介绍的观察者(Observer)。
Observer 有些类似于 MySQL 中的触发器(Trigger),它可以为 HBase 中的操作添加钩子,并在事件发生后实现自己的的业务逻辑。
- Observer 主要分为三种:
RegionObserver:增删改查相关,例如 Get、Put、Delete、Scan 等 WALObserver:WAL 操作相关 MasterObserver:DDL - 类型相关,例如创建、删除、修改数据表等
数据同步将会使用 RegionObserver 监听 Put 和 Delete 事件。
- 如何实现自定义的的 Observer
每一个 Observer 都是一个 Jar 包。首先需要引入 hbase-server 包,并实现如 BaseRegionObserver 等 HBase 提供的相关接口,重写需要监听对应事件的方法。
实现数据同步功能可以重写 postPut 和 putDelete 方法监听 Put 和 Delete 事件。
下面就是一个最简单的例子,在这两个方法中分别得到 hbsae 表名和 RowKey 分别对应着 es 中的 indexName 和 docId
public class HbaseToEsObserver extends BaseRegionObserver {
private static Client client = null;
private static final Log LOG = LogFactory.getLog(HbaseToEsObserver.class);
public static final String SEARCH_INDICE_PATTERN = "idx_%s_%s";
/**
* 读取HBase Shell的指令参数
* @param env
*/
private void readConfiguration(CoprocessorEnvironment env) {
Configuration conf = env.getConfiguration();
EsConfig.clusterName = conf.get("es_cluster");
EsConfig.nodeHost = conf.get("es_host");
EsConfig.nodePort = conf.getInt("es_port", 9300);
EsConfig.indexName = conf.get("es_index");
EsConfig.typeName = conf.get("es_type");
LOG.info("observer -- started with config: " + EsConfig.getInfo());
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
readConfiguration(env);
client = EsSearchManager.getInstance().getClient();
}
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) {
try {
LOG.debug("es 索引开始 begin");
String indexId = new String(put.getRow());
Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
Map<String, Object> json = new HashMap<String, Object>();
for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
for (Cell cell : entry.getValue()) {
String key = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
json.put(key, value);
LOG.info("key="+key+"value="+value);
}
}
//es中索引表的名称是idx_xxx_xxx
String tableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
String indexName = String.format(SEARCH_INDICE_PATTERN, EsConfig.indexName,tableName).toLowerCase();
ElasticSearchUtil.addUpdateBuilderToBulk(client.prepareUpdate(indexName, EsConfig.typeName, indexId).setUpsert(json));
} catch (Exception ex) {
LOG.error(ex);
}
}
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
try {
String indexId = new String(delete.getRow());
ElasticSearchUtil.addDeleteBuilderToBulk(client.prepareDelete(EsConfig.indexName, EsConfig.typeName, indexId));
LOG.info("observer -- delete a doc: " + indexId);
} catch (Exception ex) {
LOG.error(ex);
}
}
- 当日志 hbase 中一条条插入到 hbase 中的时候就会触发协处理器动作,为了减轻 es 服务器操作的压力我们批量操作 es 中的数据,先将索引数据存储到 BulkRequestBuilder,当缓冲池中的索引数据为 10 条或者当提交间隔达到最大提交间隔的时候批量将索引数据发送到 es 服务器中。下面看下 ElasticSearchUtil 中的代码
public class ElasticSearchUtil {
private static final Log LOG = LogFactory.getLog(ElasticSearchUtil.class);
// 缓冲池容量
private static final int MAX_BULK_COUNT = 10;
// 最大提交间隔(秒)
private static final int MAX_COMMIT_INTERVAL = 60 * 2;
private static Client client = null;
private static BulkRequestBuilder bulkRequestBuilder = null;
private static Lock commitIndexLock= new ReentrantLock();
static {
try {
client = EsSearchManager.getInstance().getClient();
bulkRequestBuilder = client.prepareBulk();
bulkRequestBuilder.setRefresh(true);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(
new CommitIndexTimer(),
30 * 1000,
MAX_COMMIT_INTERVAL * 1000,
TimeUnit.MILLISECONDS);
}catch(Exception e){
LOG.error(e.getMessage());
}
}
/**
* 判断缓存池是否已满,批量提交
*
* @param threshold
*/
private static void bulkRequest(int threshold) {
if (bulkRequestBuilder.numberOfActions() > threshold) {
LOG.info("执行索引程序,当前池中待索引数量="+bulkRequestBuilder.numberOfActions());
BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
if (!bulkResponse.hasFailures()) {
LOG.info("es索引程序成功!");
bulkRequestBuilder = client.prepareBulk();
}
if (bulkResponse.hasFailures()) {
LOG.error("es索引异常:"+bulkResponse.buildFailureMessage());
}
}
}
/**
* 定时任务,避免RegionServer迟迟无数据更新,导致ElasticSearch没有与HBase同步
* 定时执行
*/
static class CommitIndexTimer implements Runnable {
@Override
public void run() {
commitIndexLock.lock();
try {
bulkRequest(0);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
commitIndexLock.unlock();
}
}
}
}
然后将项目打成 jar 包,提交到 hdfs 中,然后使用 HBase Shell 创建一个表,将这个 Observer 挂到该表中:
create ''businessslog'',''info''
disable ''businessslog''
alter ''businessslog'',METHOD =>''table_att'',''coprocessor'' => ''hdfs://hadoop26:9000/observer.jar|com.github.hbase.observer.HbaseToEsObserver|1001|es_cluster=myes,es_type=loginfo,es_index=test,es_port=9300,es_host=114.55.253.15''
enable ''businessslog''
describe ''businessslog''
最后使用 describe ''businessslog'' 命令就可以查看协处理器是否挂载成功,使用命令挂载协处理器还是有点麻烦,为此 封装了 hbase 创建表的时候自动建立协处理器的代码如下,不用在使用麻烦的命令建立协处理器了,直接调用 Java 方法创建,方便了许多
public void createTableWithCoprocessor(String tableName,String oberverName,String path,Map<String,String> map, String...familyColumn) throws Exception {
TableName table = TableName.valueOf(tableName);
Admin admin = getConn().getAdmin();
boolean isExists = admin.tableExists(table);
if(isExists){
return ;
}else{
try {
HTableDescriptor htd = new HTableDescriptor(table);
for (String fc : familyColumn) {
HColumnDescriptor hcd = new HColumnDescriptor(fc);
htd.addFamily(hcd);
}
admin.createTable(htd);
admin.disableTable(table);
HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
for (String fc : familyColumn) {
HColumnDescriptor hcd = new HColumnDescriptor(fc);
hTableDescriptor.addFamily(hcd);
}
hTableDescriptor.addCoprocessor(oberverName, new Path(path), Coprocessor.PRIORITY_USER, map);
admin.modifyTable(table, hTableDescriptor);
admin.enableTable(table);
admin.close();
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}
总结: es:可以实现复杂快速查询,但是不适合存储海量数据 (针对一些大字段,不存储) hbase:可以实现海量数据存储,但是不适合进行复杂查询 es+hbase 可以实现海量数据的复杂快速查询,在这里 es 可以认为是 hbase 的二级索引
es 中还需要将 mapping 映射配置正确,确保某些大字段建立索引 不存储,这里就在赘述,如上就可以实现当检索的时候还是在 es 中查询,当查询具体能容的时候再去 hbase 根据 rowkey 也就是 es 中的 docId 定位具体日志内容。
以上总结了部分代码,详细的代码请查看 github 地址 https://github.com/winstonelei/BigDataTools ,包括了一些大数据组件的基本操作,包含了 hbase,hadoop,es,hive 等
今天关于ElasticSearch学习总结和基础篇,可学习,可复习的介绍到此结束,谢谢您的阅读,有关ElasticSearch JAVA API基础学习------Elasticsearch学习(二)、Elasticsearch 学习一(基础入门).、elasticsearch 学习总结、Elasticsearch 学习总结六 使用 Observer 实现 HBase 到 Elasticsearch 的数据同步等更多相关知识的信息可以在本站进行查询。
本文标签: