Unicast discovery is configured with the discovery.zen.ping.unicast.hosts setting, which accepts either an array or a comma-delimited string. Each value should be in the form of host:port or host (where port defaults to the setting transport.profiles.default.port, falling back to transport.tcp.port if not set). Note that IPv6 hosts must be bracketed. Defaults to 127.0.0.1, [::1].
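For illustration, the two forms might look like this in elasticsearch.yml (the hosts and ports below are made-up examples):

```yaml
# Array form
discovery.zen.ping.unicast.hosts:
  - 10.0.0.1          # port defaults to transport.profiles.default.port / transport.tcp.port
  - 10.0.0.2:9301     # explicit port
  - "[::1]:9300"      # IPv6 literals must be bracketed

# Comma-delimited form
# discovery.zen.ping.unicast.hosts: 10.0.0.1,10.0.0.2:9301
```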
UnicastZenPing.java

```java
public class UnicastZenPing extends AbstractComponent implements ZenPing {

    public static final String ACTION_NAME = "internal:discovery/zen/unicast";
    public static final Setting<List<String>> DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING =
        Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(),
            Function.identity(), Property.NodeScope);
    ...
        if (DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) {
            configuredHosts = DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings);
            // we only limit to 1 addresses, makes no sense to ping 100 ports
            limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT;
        } else {
            // if unicast hosts are not specified, fill with simple defaults on the local machine
            configuredHosts = transportService.getLocalAddresses();
            limitPortCounts = LIMIT_LOCAL_PORTS_COUNT;
        }
    ...
    public static List<DiscoveryNode> resolveHostsLists(
            ...
            final List<String> hosts,
            ...) throws InterruptedException {
        ...
        // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
        final List<Callable<TransportAddress[]>> callables = hosts
            .stream()
            .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts))
            .collect(Collectors.toList());
        final List<Future<TransportAddress[]>> futures =
            executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
        final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
        // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
        // hostname with the corresponding task by iterating together
        final Iterator<String> it = hosts.iterator();
        for (final Future<TransportAddress[]> future : futures) {
            final String hostname = it.next();
            if (!future.isCancelled()) {
                assert future.isDone();
                try {
                    final TransportAddress[] addresses = future.get();
                    logger.trace("resolved host [{}] to {}", hostname, addresses);
                    for (int addressId = 0; addressId < addresses.length; addressId++) {
                        discoveryNodes.add(
                            new DiscoveryNode(
                                nodeId_prefix + hostname + "_" + addressId + "#",
                                addresses[addressId],
                                emptyMap(),
                                emptySet(),
                                Version.CURRENT.minimumCompatibilityVersion()));
                    }
                } catch (final ExecutionException e) {
                    assert e.getCause() != null;
                    final String message = "failed to resolve host [" + hostname + "]";
                    logger.warn(message, e.getCause());
                }
            } else {
                logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
            }
        }
        return discoveryNodes;
    }
    ...
```
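The interesting part of resolveHostsLists is how it copes with DNS lookups that hang: it leans on the JDK guarantee that ExecutorService#invokeAll returns futures in the same order as the submitted tasks, and marks tasks that miss the timeout as cancelled. A minimal standalone sketch of that mechanism (the class name, addresses, and sleep durations are made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Two fake "resolution" tasks: one returns quickly, one exceeds the timeout.
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "10.0.0.1");
        tasks.add(() -> { Thread.sleep(5_000); return "10.0.0.2"; });

        // invokeAll returns futures in task order, so futures.get(i) belongs to tasks.get(i);
        // tasks that do not finish within the timeout come back cancelled.
        List<Future<String>> futures = executor.invokeAll(tasks, 1, TimeUnit.SECONDS);

        for (Future<String> future : futures) {
            if (future.isCancelled()) {
                System.out.println("timed out resolving host");
            } else {
                try {
                    System.out.println("resolved to " + future.get());
                } catch (ExecutionException e) {
                    System.out.println("failed to resolve: " + e.getCause());
                }
            }
        }
        executor.shutdownNow();
    }
}
```

This is exactly why the loop above can walk the hosts iterator and the futures list in lockstep, logging a warning for each cancelled (timed-out) resolution instead of failing the whole ping round.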
TransportService.java

```java
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
    return transport.addressesFromString(address, perAddressLimit);
}
```
TcpTransport.java

```java
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
    return parse(address, settings.get("transport.profiles.default.port",
        TransportSettings.PORT.get(settings)), perAddressLimit);
}

private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$");

/** parse a hostname+port range spec into its equivalent addresses */
static TransportAddress[] parse(String hostPortString, String defaultPortRange, int perAddressLimit) throws UnknownHostException {
    Objects.requireNonNull(hostPortString);
    String host;
    String portString = null;

    if (hostPortString.startsWith("[")) {
        // Parse a bracketed host, typically an IPv6 literal.
        Matcher matcher = BRACKET_PATTERN.matcher(hostPortString);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid bracketed host/port range: " + hostPortString);
        }
        host = matcher.group(1);
        portString = matcher.group(2); // could be null
    } else {
        int colonPos = hostPortString.indexOf(':');
        if (colonPos >= 0 && hostPortString.indexOf(':', colonPos + 1) == -1) {
            // Exactly 1 colon. Split into host:port.
            host = hostPortString.substring(0, colonPos);
            portString = hostPortString.substring(colonPos + 1);
        } else {
            // 0 or 2+ colons. Bare hostname or IPv6 literal.
            host = hostPortString;
            // 2+ colons and not bracketed: exception
            if (colonPos >= 0) {
                throw new IllegalArgumentException("IPv6 addresses must be bracketed: " + hostPortString);
            }
        }
    }

    // if port isn't specified, fill with the default
    if (portString == null || portString.isEmpty()) {
        portString = defaultPortRange;
    }

    // generate address for each port in the range
    Set<InetAddress> addresses = new HashSet<>(Arrays.asList(InetAddress.getAllByName(host)));
    List<TransportAddress> transportAddresses = new ArrayList<>();
    int[] ports = new PortsRange(portString).ports();
    int limit = Math.min(ports.length, perAddressLimit);
    for (int i = 0; i < limit; i++) {
        for (InetAddress address : addresses) {
            transportAddresses.add(new InetSocketTransportAddress(address, ports[i]));
        }
    }
    return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
}
```
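To make the bracket/colon branching concrete, here is a standalone sketch (a simplified stand-in with an invented class name, not the Elasticsearch method itself) that applies the same rules to a few sample inputs and returns the {host, portString} pair that the rest of parse() would work with:

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HostPortSplitDemo {
    // Same regex as TcpTransport.BRACKET_PATTERN.
    private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$");

    // Simplified stand-in that applies the same bracket/colon rules and returns {host, portString}.
    static String[] split(String hostPortString) {
        String host;
        String portString = null;
        if (hostPortString.startsWith("[")) {
            Matcher matcher = BRACKET_PATTERN.matcher(hostPortString);
            if (!matcher.matches()) {
                throw new IllegalArgumentException("Invalid bracketed host/port range: " + hostPortString);
            }
            host = matcher.group(1);
            portString = matcher.group(2);
        } else {
            int colonPos = hostPortString.indexOf(':');
            if (colonPos >= 0 && hostPortString.indexOf(':', colonPos + 1) == -1) {
                host = hostPortString.substring(0, colonPos);
                portString = hostPortString.substring(colonPos + 1);
            } else if (colonPos >= 0) {
                throw new IllegalArgumentException("IPv6 addresses must be bracketed: " + hostPortString);
            } else {
                host = hostPortString;
            }
        }
        return new String[] { host, portString };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(split("es-node-1")));               // [es-node-1, null] -> default port range applies
        System.out.println(Arrays.toString(split("es-node-1:9301")));          // [es-node-1, 9301]
        System.out.println(Arrays.toString(split("[::1]")));                   // [::1, null]
        System.out.println(Arrays.toString(split("[2001:db8::1]:9300-9400"))); // [2001:db8::1, 9300-9400]
        // split("2001:db8::1") would throw: "IPv6 addresses must be bracketed"
    }
}
```

A bare host keeps portString null, so the default port range is filled in by the step above; an unbracketed IPv6 literal is rejected outright, which is why the documentation insists on brackets.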
TransportSettings.java

```java
public static final Setting<String> PORT =
    new Setting<>("transport.tcp.port", "9300-9400", Function.identity(), Property.NodeScope);
```
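The default value "9300-9400" is a port range spec. A simplified stand-in for what PortsRange#ports() expands it into (not the actual org.elasticsearch.common.transport.PortsRange implementation, and only handling the plain "from-to" and single-port forms):

```java
import java.util.stream.IntStream;

public class PortRangeDemo {
    // Simplified stand-in for PortsRange#ports(): expand "from-to" (or a single port) into individual ports.
    static int[] expand(String portRange) {
        int dash = portRange.indexOf('-');
        if (dash < 0) {
            return new int[] { Integer.parseInt(portRange) };
        }
        int from = Integer.parseInt(portRange.substring(0, dash));
        int to = Integer.parseInt(portRange.substring(dash + 1));
        return IntStream.rangeClosed(from, to).toArray();
    }

    public static void main(String[] args) {
        int[] ports = expand("9300-9400");
        // Prints: 101 candidate ports, starting at 9300
        System.out.println(ports.length + " candidate ports, starting at " + ports[0]);
        // With perAddressLimit = LIMIT_FOREIGN_PORTS_COUNT (1), only ports[0] is pinged for a
        // configured remote host -- as the comment in UnicastZenPing puts it, it "makes no sense
        // to ping 100 ports".
    }
}
```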
Translog settings

The data in the transaction log is only persisted to disk when the translog is fsynced and committed. In the event of hardware failure, any data written since the previous translog commit will be lost.
By default (index.translog.durability set to request), Elasticsearch fsyncs and commits the translog at the end of every index, delete, update, or bulk request; if index.translog.durability is set to async, it instead fsyncs and commits every 5 seconds. In the default request mode, Elasticsearch will only report success of an index, delete, update, or bulk request to the client after the transaction log has been successfully fsynced and committed on the primary and on every allocated replica.
The following dynamically updatable per-index settings control the behaviour of the transaction log:
**index.translog.sync_interval**
How often the translog is fsynced to disk and committed, regardless of write operations. Defaults to 5s. Values less than 100ms are not allowed.
**index.translog.durability**
Whether or not to fsync and commit the translog after every index, delete, update, or bulk request. This setting accepts the following parameters (a toy sketch contrasting the two modes follows the list):
**request** (default)
fsync and commit after every request. In the event of hardware failure, all acknowledged writes will already have been committed to disk.

**async**
fsync and commit in the background every sync_interval. In the event of hardware failure, all acknowledged writes since the last automatic commit will be discarded.
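The difference is easiest to see in a toy model. The sketch below is not Elasticsearch code (the class and method names are invented); it only mimics the two acknowledgement paths: with request durability the fsync happens on the request path before the acknowledgement, while with async durability acknowledged operations sit in memory until the next scheduled fsync.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DurabilityModes {
    private final AtomicInteger buffered = new AtomicInteger(); // ops written but not yet fsynced
    private final AtomicInteger durable = new AtomicInteger();  // ops fsynced to disk

    // "request" durability: fsync before acknowledging, so an acknowledged op is never lost.
    void indexWithRequestDurability() {
        buffered.incrementAndGet();
        fsync();                      // on the request path, before the ack
        // ...acknowledge to the client here...
    }

    // "async" durability: acknowledge immediately; a background task fsyncs every sync_interval.
    void indexWithAsyncDurability() {
        buffered.incrementAndGet();
        // ...acknowledge to the client here; buffered ops are lost if the node dies now...
    }

    void fsync() {
        durable.addAndGet(buffered.getAndSet(0));
    }

    public static void main(String[] args) throws InterruptedException {
        DurabilityModes translog = new DurabilityModes();
        // Background fsync every sync_interval (scaled down from 5s to 500 ms for the demo).
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(translog::fsync, 500, 500, TimeUnit.MILLISECONDS);

        translog.indexWithAsyncDurability();
        System.out.println("async: durable ops right after the ack = " + translog.durable.get());  // 0
        Thread.sleep(600);
        System.out.println("async: durable ops after sync_interval = " + translog.durable.get());  // 1

        translog.indexWithRequestDurability();
        System.out.println("request: durable ops right after the ack = " + translog.durable.get()); // 2

        scheduler.shutdownNow();
    }
}
```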