Elasticsearch几点体会

很久没有写博客了,感觉快要生疏,今天简单写一点,记录发现的几个问题。

1,
在集群增加一个节点后,不要只看是否启动成功,一定要验证下是否加入集群
考虑到32G内存的官方推荐,很多人会选择同一物理机部署两个以上节点(>128G内存),分配两个端口。比如9300/19300.
比如集群在 10.135.30.12:9200/9300 是一个master节点,之后拷贝配置新增如下一个节点:

1
2
3
4
5
cluster.name: elasts
node.master: false
node.data: true
transport.tcp.port: 19300
discovery.zen.ping.unicast.hosts: ["10.135.30.12"]

会发现该节点启动成功,但是没有加入到elasts这个cluster里。 设置为debug级别再启动,不仔细看是发现不了问题的。

官方是这么解释:

1
2
3
4
Unicast discovery provides the following settings with the discovery.zen.ping.unicast.hosts:
Either an array setting or a comma delimited setting. Each value should be in the form of host:port or host (
where port defaults to the setting transport.profiles.default.port falling back to transport.tcp.port if not set).
Note that IPv6 hosts must be bracketed. Defaults to 127.0.0.1, [::1]

简单来讲就是上述节点默认使用配置里的 transport.tcp.port 这个端口做discover,而不是 9300.
所以建议配置 discovery.zen.ping.unicast.hosts 的时候一定配置端口(使用2.x之后的单播即可)

那么原文 “ort defaults to the setting transport.profiles.default.port falling back to transport.tcp.port if not set”是什么意思?
5.2.2代码为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
UnicastZenPing.java
public class UnicastZenPing extends AbstractComponent implements ZenPing {
public static final String ACTION_NAME = "internal:discovery/zen/unicast";
public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(),
Property.NodeScope);
...
if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
// we only limit to 1 addresses, makes no sense to ping 100 ports
limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
} else {
// if unicast hosts are not specified, fill with simple defaults on the local machine
configuredHosts = transportService.getLocalAddresses();
limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
}
...
public static List<DiscoveryNode> resolveHostsLists(
...
final List<String> hosts,
...) throws InterruptedException {
...
// create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
final List<Callable<TransportAddress[]>> callables =
hosts.stream()
.map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
.collect(Collectors.toList());
final List<Future<TransportAddress[]>> futures =
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
// ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
// hostname with the corresponding task by iterating together
final Iterator<String> it = hosts.iterator();
for (final Future<TransportAddress[]> future : futures) {
final String hostname = it.next();
if (!future.isCancelled()) {
assert future.isDone();
try {
final TransportAddress[] addresses = future.get();
logger.trace("resolved host [{}] to {}", hostname, addresses);
for (int addressId = 0; addressId < addresses.length; addressId++) {
discoveryNodes.add(
new DiscoveryNode(
nodeId_prefix + hostname + "_" + addressId + "#",
addresses[addressId],
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
}
} catch (final ExecutionException e) {
assert e.getCause() != null;
final String message = "failed to resolve host [" + hostname + "]";
logger.warn(message, e.getCause());
}
} else {
logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
}
}
return discoveryNodes;
}
...
TransportService.java
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return transport.addressesFromString(address, perAddressLimit);
}
TcpTransport.java
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return parse(address, settings.get("transport.profiles.default.port", TransportSettings.PORT.get(settings)), perAddressLimit);
}
private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$");
/** parse a hostname+port range spec into its equivalent addresses */
static TransportAddress[] parse(String hostPortString, String defaultPortRange, int perAddressLimit) throws UnknownHostException {
Objects.requireNonNull(hostPortString);
String host;
String portString = null;
if (hostPortString.startsWith("[")) {
// Parse a bracketed host, typically an IPv6 literal.
Matcher matcher = BRACKET_PATTERN.matcher(hostPortString);
if (!matcher.matches()) {
throw new IllegalArgumentException("Invalid bracketed host/port range: " + hostPortString);
}
host = matcher.group(1);
portString = matcher.group(2); // could be null
} else {
int colonPos = hostPortString.indexOf(':');
if (colonPos >= 0 && hostPortString.indexOf(':', colonPos + 1) == -1) {
// Exactly 1 colon. Split into host:port.
host = hostPortString.substring(0, colonPos);
portString = hostPortString.substring(colonPos + 1);
} else {
// 0 or 2+ colons. Bare hostname or IPv6 literal.
host = hostPortString;
// 2+ colons and not bracketed: exception
if (colonPos >= 0) {
throw new IllegalArgumentException("IPv6 addresses must be bracketed: " + hostPortString);
}
}
}
// if port isn't specified, fill with the default
if (portString == null || portString.isEmpty()) {
portString = defaultPortRange;
}
// generate address for each port in the range
Set<InetAddress> addresses = new HashSet<>(Arrays.asList(InetAddress.getAllByName(host)));
List<TransportAddress> transportAddresses = new ArrayList<>();
int[] ports = new PortsRange(portString).ports();
int limit = Math.min(ports.length, perAddressLimit);
for (int i = 0; i < limit; i++) {
for (InetAddress address : addresses) {
transportAddresses.add(new InetSocketTransportAddress(address, ports[i]));
}
}
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
}
TransportSettings.java
public static final Setting<String> PORT =
new Setting<>("transport.tcp.port", "9300-9400", Function.identity(), Property.NodeScope);

也可看到,es这块debug的日志有所欠缺,如果把UnicastZenPing这个操作时候的实际的端口log下来,方便快速定位问题。

2,
ES索引性能的优化
之前文章已推荐Ebay一篇文章总结过将Elasticsearch优化到极致的技巧,这里就不再重复,来看点不一样的:
不过,这里也不会讨论,诸如 优化索引的 index.refresh_interval,优化segment的index.merge,甚至磁盘的索引均衡(扩分片/reroute)/replia数减少。更不会讨论诸如索引字段的分词/exclude/_all字段禁用/优化_source字段等。

来看下面三个参数:

1
2
3
index.translog.durability: async
index.translog.sync_interval: 90s
index.translog.flush_threshold_size: 1024mb

主要是 index.translog.durability,看文档 Translog

Translog settingsedit
The data in the transaction log is only persisted to disk when the translog is fsynced and committed. In the event of hardware failure, any data written since the previous translog commit will be lost.

By default, Elasticsearch fsyncs and commits the translog every 5 seconds if index.translog.durability is set to async or if set to request (default) at the end of every index, delete, update, or bulk request. In fact, Elasticsearch will only report success of an index, delete, update, or bulk request to the client after the transaction log has been successfully fsynced and committed on the primary and on every allocated replica.

The following dynamically updatable per-index settings control the behaviour of the transaction log:

  • index.translog.sync_interval
    How often the translog is fsynced to disk and committed, regardless of write operations. Defaults to 5s. Values less than 100ms are not allowed.
  • index.translog.durability
    Whether or not to fsync and commit the translog after every index, delete, update, or bulk request. This setting accepts the following parameters:
    • request
      (default) fsync and commit after every request. In the event of hardware failure, all acknowledged writes will already have been committed to disk.
    • async
      fsync and commit in the background every sync_interval. In the event of hardware failure, all acknowledged writes since the last automatic commit will be discarded.

即,为了保证数据不丢失,es translog默认的额持久化策略是 每次请求都会flush。这里暂不代码展开分析,如果应用允许少量的几率丢失数据,那么这里可以设置为异步,并且增加translog大小周期性的flush。

需要注意的是,index.translog.durability 并不是一个dynamic property,即,如果修改索引的该配置,可以删除重建,不过也可以先close该索引,更新setting后再open打开索引。

对于大量请求(每天索引数据量10+亿条,每日索引数据500+GB,1备份,保留5天数据,32CPU,128g内存,机械硬盘,四台实体机),上面的配置优化还是很明显的。

3,
这里简单列个5.x相比2.x的改变 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
"commands" : [
{ "allocate_empty_primary" :
{ "index" : "INDEX", "shard" : 0, "node": "<NODE_NAME>"}
}
]
}'
curl -XPOST localhost:9200/_cluster/reroute -d '{
"commands" : [
{
"move" :{
"index" : "za-2018.03.23", "shard" : 0, "from_node" : "node-20.50", "to_node" : "node-20.39"
}
}
]
}'
  • 上述node均可用名字代替,而不必查询id
  • reroute的commands,相比细化了下,如 allocate_empty_primary,这在集群状态为red,分片数据确实并且不可恢复的时候还是有用的。