【ElasticSearch】全量或增量构建索引
本篇文章给大家分享《【ElasticSearch】全量或增量构建索引》,覆盖了数据库的常见基础知识,其实一个语言的全部知识点一篇文章是不可能说完的,但希望通过这些问题,让读者对自己的掌握程度有一定的认识(B 数),从而弥补自己的不足,更好的掌握它。
一、全量构建索引
ElasticSearch集群环境搭建好以后,首次需要全量地从关系型数据库中将目标待索引数据写入到ElasticSearch搜索引擎中,以下我们将用到logstash的插件logstash-input-jdbc来全量同步数据。
1、Logstash管道构建全量索引
Logstash好比一个数据管道,通常一端连接着关系型数据库来往管道中输入数据,一端连接着ElasticSearch将管道中的数据输出。
下载Logstash
- 前往官网下载Logstash,需要与ElasticSearch版本一致,兼容性更好。
- 将下载的Logstash压缩包上传到服务器上,示例将Logstash安装在了与ElasticSearch同一台服务器上。

安装Logstash
#进入logstash压缩包所在目录 cd /usr/local #加压logstash压缩包 unzip logstash-7.8.0.zip

- 安装logstash的插件logstash-input-jdbc
#进入logstash安装目录的bin目录下 cd /usr/local/logstash-7.8.0/bin #安装插件,注意logstash依赖jdk,安装插件前请确保安装了jdk #当前logstash-7.8.0版本已经默认安装了logstash-integration-jdbc,而此插件已经包含logstash-input-jdbc ./logstash-plugin install logstash-input-jdbc #查看logstash所有已安装的插件 ./logstash-plugin list
配置logstash
logstash需要与mysql通信并获取数据,于是需要准备与mysql通信的驱动程序,需要准备拉取数据所需要执行的sql语句,还需要logstash的输入、输出端点。
- 创建相关目录及文件
cd /usr/local/logstash-7.8.0 mkdir mysql cd mysql touch jdbc.conf touch jdbc.sql

- mysql驱动程序,前往maven中央仓库获取
- 获取数据的sql脚本(jdbc.sql)
SELECT
a.id,
a.NAME,
a.tags,
concat( a.latitude, ',', a.longitude ) AS location,
a.remark_score,
a.price_per_man,
a.category_id,
b.NAME AS category_name,
a.seller_id,
c.remark_score AS seller_remark_score,
c.disabled_flag AS seller_disabled_flag
FROM
shop a
INNER JOIN category b ON a.category_id = b.id
INNER JOIN seller c ON c.id = a.seller_id
- logstash输入、输出配置
input {
jdbc {
# mysql 数据库链接,dianping为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "123456"
# 驱动
jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件路径+名称
statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.15.151:9200"]
# 索引名称
index => "shop"
document_type => "_doc"
# 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
启动logstash
#进入logstash安装目录 cd /usr/local/logstash-7.8.0 #后台进程方式启动logstash nohup ./bin/logstash -f mysql/jdbc.conf & #查看logstash日志 tail -f -n 100 ./nohup.out

说明:从以上日志中看出logstash正常启动,并且每分钟会进行全量同步数据,日志中也打印出了执行的sql,首先会查个总数,为了后续的分页获取数据。
二、增量构建索引
1、Canal管道准实时构建增量索引
Canal简介
canal将自己伪装成mysql主备的slaver,通过模拟mysql主备之间的通信协议,利用mysql主备同步原理,获取binary log并解析,准实时的获取数据的变更。canal管道中数据可以流出到关系型数据库、消息队列、es等等的其它中间件或数据库存储。

- mysql主备复制原理

Canal下载

说明:当前最新的稳定版是1.1.4,我们需要下载如上图中标红的三个压缩包,之所以需要下载源码包是因为这个版本的canal.adapter仅支持6.x的es,而我用的是7.8.0的es,需要稍微修改下源码,重新编译打包canal.adapter才能使用。
Canal.deployer部署
Canal.deployer的作用是伪装成mysql主备的一个slaver,发送dump指令获取binary log。依赖于jdk,需要先安装配置jdk。
开启mysql的二进制日志
默认mysql没有开启binary log,我们需要确认mysql是否开启了binary log,如果没有开启,那么Canal的能力将无法发挥。

说明:以上标红的三行是需要配置的,配置后重启mysql服务,使配置生效。
- 验证mysql开启了binary log
mysql> show variables like 'log_bin';

为canal创建mysql用户
canal作为一个伪slaver应该单独创建一个专用用户,用root用户不合适。权限太大,过于危险。
- 创建canal用户并授权
#仅仅授予查询权限及其复制所需权限,注意密码过短会违反mysql默认密码策略 #%表示远端所有主机都能访问 grant select,replication slave,replication client on *.* to 'canal'@'%' identified by '123456'; #localhost表示仅本地当前主机能访问 grant select,replication slave,replication client on *.* to 'canal'@'localhost' identified by '123456'; #刷新权限,持久化 flush privileges;
Canal.deployer配置启动
- 修改instance.properties文件
#进入canal.deployer解压目录 cd /usr/local/canal.deployer-1.1.4 #修改配置文件 vim ./conf/example/instance.properties ########修改如下四个配置项即可,其它默认######## #设置伪slaver id,与mysql master 设置的要不同 canal.instance.mysql.slaveId=3 #mysql master地址及端口 canal.instance.master.address=192.168.15.150:3306 #mysql连接用户及密码 canal.instance.dbUsername=canal canal.instance.dbPassword=910625 #############################################

- 修改后的instance.properties文件
################################################# ## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=3 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=192.168.15.150:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=910625 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
- Canal.deployer启动
#进入canal.deployer解压目录 cd /usr/local/canal.deployer-1.1.4 #启动canal.deployer ./bin/startup.sh #canal.deployer默认会暴露11111端口,与其消费者通信 netstat -an | grep 11111
Canal.adapter部署
Canal.adapter扮演着Canal.deployer的消费端的角色,作为一个适配者的身份存在,它从Canal.deployer管道中获取到数据后,通过自身的适配转存到其它数据库或者中间件做进一步处理。
canal.adapter源码小调整
由于canal.adapter在目前的1.1.4版本中不支持es7,所以需要对其源码做个小调整,让其支持es7。
下载canal源码并解压到本地

用idea打开client-adapter子模块

修改elasticsearch子模块下pom.xml
<!--仅仅需要将如下几个es的依赖版本号修改成es服务的版本一致--> <dependency><groupid>org.elasticsearch</groupid><artifactid>elasticsearch</artifactid><version>7.8.0</version></dependency><dependency><groupid>org.elasticsearch.client</groupid><artifactid>transport</artifactid><version>7.8.0</version></dependency><dependency><groupid>org.elasticsearch.client</groupid><artifactid>elasticsearch-rest-client</artifactid><version>7.8.0</version></dependency><dependency><groupid>org.elasticsearch.client</groupid><artifactid>elasticsearch-rest-high-level-client</artifactid><version>7.8.0</version></dependency>
修改elasticsearch子模块下几个编译失败的点
es新版客户端api做了细微调整,基本仅需要修改如下几个文件,ESConnection.java、ESAdapter.java、ESTemplate.java就能被正确编译了。
- MappingMetaData修改成了MappingMetadata
- com.alibaba.otter.canal.client.adapter.es.support.ESConnection.ESBulkRequest#bulk修改

- com.alibaba.otter.canal.client.adapter.es.ESAdapter#count修改

说明:修改如下几个文件里的几个编译通不过的点后,可以使用maven重新编译打包。
maven重新编译打包
#回到源码根目录下用maven编译打包,跳过测试 D:\DeveloperPackage\canal\canal-canal-1.1.4>mvn clean package -Dmaven.test.skip=true
- 找到修改后重新被编译打包好的canal-adapter
#此目录即是我们可以正常使用的canal-adapter #我们仅需要将canal-adapter此目录文件打包上传到服务器后,修改些许配置文件后就能正常运行了 D:\DeveloperPackage\canal\canal-canal-1.1.4\client-adapter\launcher\target\canal-adapter
canal.adapter配置启动
将我们上一步重新编译打包好的canal-adapter上传到服务器后,解压目录如下图所示。

修改application.yml
进入canal-adapter解压目录 cd /usr/local/canal-adapter vim conf/application.yml
- 修改application.yml中数据源配置

说明:告诉canal-deployer需要关注的具体的数据库。
- 修改application.yml中适配器配置

说明:配置数据输出目的地,此处配置为es,采用transport模式与es服务通信。
- 修改后的application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp # kafka rocketMQ
canalServerHost: 127.0.0.1:11111
# zookeeperHosts: slave1:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
srcDataSources:
defaultDS:
url: jdbc:mysql://192.168.15.150:3306/dianping?useUnicode=true
username: canal
password: 123456
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
# - name: rdb
# key: mysql1
# properties:
# jdbc.driverClassName: com.mysql.jdbc.Driver
# jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
# jdbc.username: root
# jdbc.password: 121212
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
- name: es
hosts: 192.168.15.151:9300
properties:
#mode: rest
# security.auth: test:123456 # only used for rest mode
cluster.name: test-app
创建与es映射的配置文件
cd /usr/local/canal-adapter/conf/es touch shop.yml
- shop.yml内容如下
#配置在client-adapter的application.yml配置文件中,表示数据源 dataSourceKey: defaultDS #配置在client-adapter的application.yml配置文件中,表示管道中数据的目的地,如es destination: example groupId: esMapping: #es索引名称 _index: shop _type: _doc _id: id #表示文档不存在就插入,存在则更新 upsert: true sql: "select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag from shop a inner join category b on a.category_id = b.id inner join seller c on c.id = a.seller_id" commitBatch: 3000
启动canal.adapter
cd /usr/local/canal-adapter #执行启动脚本 ./bin/startup.sh #关闭脚本 ./bin/stop.sh
- 查看canal.adapter启动日志
tail -f -n 100 logs/adapter/adapter.log

说明:此时canal.adapter成功启动,可以尝试修改mysql库中的记录,验证是否准实时地同步到了es中。
Canal自定义客户端
canal.adapter不够灵活,处理多表连接还可能会有问题,如字段冲突等。我们可以自定义canal客户端接入canal服务端,从canal服务端批量获取变更记录,然后利用es客户端更新索引。
相关依赖
<!-- es客户端依赖 --> <dependency><groupid>org.elasticsearch.client</groupid><artifactid>elasticsearch-rest-client</artifactid><version>7.8.0</version></dependency><dependency><groupid>org.elasticsearch</groupid><artifactid>elasticsearch</artifactid><version>7.8.0</version></dependency><dependency><groupid>org.elasticsearch.client</groupid><artifactid>elasticsearch-rest-high-level-client</artifactid><version>7.8.0</version></dependency><!-- canal客户端依赖 --><dependency><groupid>com.alibaba.otter</groupid><artifactid>canal.client</artifactid><version>1.1.4</version></dependency><dependency><groupid>com.alibaba.otter</groupid><artifactid>canal.common</artifactid><version>1.1.4</version></dependency><dependency><groupid>com.alibaba.otter</groupid><artifactid>canal.protocol</artifactid><version>1.1.4</version></dependency>
canal客户端连接器
package com.imooc.dianping.canal;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
@Component
public class CanalClient implements DisposableBean{
private CanalConnector canalConnector;
@Bean
public CanalConnector getCanalConnector(){
canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(
new InetSocketAddress("192.168.15.157", 11111)),
"example","canal","910625"
);
canalConnector.connect();
//订阅,可指定filter,格式{database}.{table}
canalConnector.subscribe();
//回滚寻找上次中断的为止
canalConnector.rollback();
return canalConnector;
}
@Override
public void destroy() throws Exception {
if(canalConnector != null){
canalConnector.disconnect();
}
}
}
es客户端
package com.imooc.dianping.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchRestClient {
@Value("${elasticsearch.ip}")
String ipAddress;
/**
* 高级别的ES客户端,与es server通信
* @return
*/
@Bean(name="highLevelClient")
public RestHighLevelClient highLevelClient(){
String[] address = ipAddress.split(":");
String ip = address[0];
int port = Integer.valueOf(address[1]);
HttpHost httpHost = new HttpHost(ip,port,"http");
return new RestHighLevelClient(RestClient.builder(new HttpHost[]{httpHost}));
}
}
通过定时任务定时从canal服务拉取变更记录
package com.imooc.dianping.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.imooc.dianping.dal.ShopModelMapper;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class CanalScheduling implements Runnable,ApplicationContextAware {
private ApplicationContext applicationContext;
@Autowired
private ShopModelMapper shopModelMapper;
@Resource
private CanalConnector canalConnector;
@Autowired
private RestHighLevelClient restHighLevelClient;
@Override
@Scheduled(fixedDelay = 100)
public void run() {
long batchId = -1;
try{
int batchSize = 1000;
//从canal服务端批量拉取数据(未确认)
Message message = canalConnector.getWithoutAck(batchSize);
batchId = message.getId();
List<canalentry.entry> entries = message.getEntries();
//判断正确返回了数据,并处理
if(batchId != -1 && entries.size() > 0){
entries.forEach(entry -> {
if(entry.getEntryType() == CanalEntry.EntryType.ROWDATA){
//解析处理
publishCanalEvent(entry);
}
});
}
//向canal服务端确认,客户端已成功消费
canalConnector.ack(batchId);
}catch(Exception e){
e.printStackTrace();
//出现异常,根据消息的编号进行回滚,下次继续消费
canalConnector.rollback(batchId);
}
}
/**
* 处理canal服务端返回的数据
* @param entry
*/
private void publishCanalEvent(CanalEntry.Entry entry){
CanalEntry.EventType eventType = entry.getHeader().getEventType();
//当前数据库名称
String database = entry.getHeader().getSchemaName();
//当前表的名称
String table = entry.getHeader().getTableName();
//改变的行数据
CanalEntry.RowChange change = null;
try {
change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
return;
}
change.getRowDatasList().forEach(rowData -> {
List<canalentry.column> columns = rowData.getAfterColumnsList();
String primaryKey = "id";
CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()
&& primaryKey.equals(column.getName())).findFirst().orElse(null);
Map<string> dataMap = parseColumnsToMap(columns);
try{
indexES(dataMap,database,table);
} catch (IOException e) {
e.printStackTrace();
}
});
}
/**
* 解析行数据
* @param columns
* @return
*/
Map<string> parseColumnsToMap(List<canalentry.column> columns){
Map<string> jsonMap = new HashMap();
columns.forEach(column -> {
if(column == null){
return;
}
jsonMap.put(column.getName(),column.getValue());
});
return jsonMap;
}
/**
* 更新索引
* @param dataMap 行数数据
* @param database 数据库索引
* @param table 表
* @throws IOException
*/
private void indexES(Map<string> dataMap,String database,String table) throws IOException {
if(!StringUtils.equals("dianping",database)){
return;
}
List<map>> result = new ArrayList();
//根据变更表的行数据,刷新影响的相关索引文档
if(StringUtils.equals("seller",table)){
result = shopModelMapper.buildESQuery(new Integer((String)dataMap.get("id")),null,null);
}else if(StringUtils.equals("category",table)){
result = shopModelMapper.buildESQuery(null,new Integer((String)dataMap.get("id")),null);
}else if(StringUtils.equals("shop",table)){
result = shopModelMapper.buildESQuery(null,null,new Integer((String)dataMap.get("id")));
}else{
return;
}
for(Map<string>map : result){
IndexRequest indexRequest = new IndexRequest("shop");
indexRequest.id(String.valueOf(map.get("id")));
indexRequest.source(map);
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
</string></map></string></string></canalentry.column></string></string></canalentry.column></canalentry.entry>
说明:通过以上的定时任务定时从canal服务端拉取数据,然后通过es客户端将变更更新到对应的索引中。
2、Logstash管道构建增量索引
其实在以上Logstash管道构建全量索引的基础上,略微修改即可实现基于时间轴的增量构建索引。仅仅需要修改jdbc.conf、新增一个记录时间的文件,修改jdbc.sql文件加入时间条件即可。
获取数据的sql脚本(jdbc.sql)
SELECT
a.id,
a.NAME,
a.tags,
concat( a.latitude, ',', a.longitude ) AS location,
a.remark_score,
a.price_per_man,
a.category_id,
b.NAME AS category_name,
a.seller_id,
c.remark_score AS seller_remark_score,
c.disabled_flag AS seller_disabled_flag
FROM
shop a
INNER JOIN category b ON a.category_id = b.id
INNER JOIN seller c ON c.id = a.seller_id
WHERE
a.update_at > : sql_last_value
OR b.update_at > : sql_last_value
OR c.update_at > : sql_last_value
说明:加入了时间条件,控制抓取时间窗口内的数据,增量更新。
logstash输入、输出配置
input {
jdbc {
#设置时区timezone
jdbc_default_timezone => "Asia/Shanghai"
# mysql 数据库链接,dianping为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "910625"
# 驱动
jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#记录上次运行元数据文件路径,如记录增量同步的时间窗口
last_run_metadata_path => "/usr/local/logstash-7.8.0/mysql/last_value_meta"
# 执行的sql 文件路径+名称
statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.15.151:9200"]
# 索引名称
index => "shop"
document_type => "_doc"
# 自增ID 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
说明:增加了时区的配置,以及指定了记录上次运行元数据文件路径,如记录增量同步的时间窗口。
新增last_value_meta文件,并写入一个初始时间
2020-07-12 00:00:00
说明:记录上次执行同步任务时的时间,下次任务会继续更新这个时间。

Logstash管道构建增量索引的缺点
这种以时间窗口的方式进行数据同步,始终避免不了同步延迟的问题,但是有时候业务上也是能够允许的,增量同步的时间窗口需要更具业务来定。时间窗口太短,数据更新太多,会出现一次增量同步任务还没完成,下一次增量任务又开始了。
好了,本文到此结束,带大家了解了《【ElasticSearch】全量或增量构建索引》,希望本文对你有所帮助!关注golang学习网公众号,给大家分享更多数据库知识!
精选2020年最新97道Java面试题:spring+Redis+JVM+mysql全在这里了
- 上一篇
- 精选2020年最新97道Java面试题:spring+Redis+JVM+mysql全在这里了
- 下一篇
- Liquibase 数据库版本管理工具:3. changeSet 变更集详解
-
- 数据库 · MySQL | 2天前 |
- MySQL数值函数大全及使用技巧
- 117浏览 收藏
-
- 数据库 · MySQL | 3天前 |
- 三种登录MySQL方法详解
- 411浏览 收藏
-
- 数据库 · MySQL | 4天前 |
- MySQL数据备份方法与工具推荐
- 420浏览 收藏
-
- 数据库 · MySQL | 4天前 |
- MySQL数据备份方法与工具推荐
- 264浏览 收藏
-
- 数据库 · MySQL | 5天前 |
- MySQL索引的作用是什么?
- 266浏览 收藏
-
- 数据库 · MySQL | 6天前 |
- MySQL排序原理与实战应用
- 392浏览 收藏
-
- 数据库 · MySQL | 1星期前 |
- MySQLwhere条件查询技巧
- 333浏览 收藏
-
- 数据库 · MySQL | 1星期前 |
- MySQL常用数据类型有哪些?怎么选更合适?
- 234浏览 收藏
-
- 数据库 · MySQL | 1星期前 |
- MySQL常用命令大全管理员必学30条
- 448浏览 收藏
-
- 数据库 · MySQL | 1星期前 |
- MySQL高效批量插入数据方法大全
- 416浏览 收藏
-
- 数据库 · MySQL | 1星期前 |
- MySQL性能优化技巧大全
- 225浏览 收藏
-
- 数据库 · MySQL | 1星期前 |
- MySQL数据备份4种方法保障安全
- 145浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3182次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3393次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3424次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4528次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3802次使用
-
- golang MySQL实现对数据库表存储获取操作示例
- 2022-12-22 499浏览
-
- 搞一个自娱自乐的博客(二) 架构搭建
- 2023-02-16 244浏览
-
- B-Tree、B+Tree以及B-link Tree
- 2023-01-19 235浏览
-
- mysql面试题
- 2023-01-17 157浏览
-
- MySQL数据表简单查询
- 2023-01-10 101浏览

