irpas技术客

Spring Cloud Netflix-Eureka(六)、集群数据同步_雨后是冰雹

网络 6135

Spring Cloud Netflix之Eureka源码系列文章一共分为六个片段

Spring Cloud Netflix-Eureka(一)、服务注册与发现 Spring Cloud Netflix-Eureka(二)、信息存储原理 Spring Cloud Netflix-Eureka(三)、自我保护机制 Spring Cloud Netflix-Eureka(四)、心跳续约机制 Spring Cloud Netflix-Eureka(五)、多级缓存机制 Spring Cloud Netflix-Eureka(六)、集群数据同步

Spring Cloud Netflix-Eureka、集群数据同步 一、Eureka Server是如何保证集群之间的数据一致性二、Eurke集群数据同步具体实现2.1 PeerAwareInstanceRegistryImpl.replicateToPeers()

一、Eureka Server是如何保证集群之间的数据一致性

Eureka Server 集群不区分主从节点,所有节点相同角色(也就是没有角色 ),完全对等,是 peer to peer 模式。没有一致性算法,全靠复制,是最终一致性。

Eureka Client 可以向任意 Eureka Server 发起任意读写操作,Eureka Server 将操作复制到另外的 Eureka Server 以达到最终一致性,基本原理如下图所示。

二、Eurke集群数据同步具体实现

在 Spring Cloud Netflix-Eureka(一)、服务注册与发现 中,我们分析了服务的注册过程,其中在 PeerAwareInstanceRegistryImpl.register() 这个方法中,当完成服务信息保存后,会调用 replicateToPeers(); 同步信息到集群。实际上,服务的注册、下线、心跳等都会调用该方法进行数据同步,保证数据的最终一致性。具体实现如下。

@Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { // 服务注册 public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); // 数据同步 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } // 服务下线 public boolean cancel(final String appName, final String id, final boolean isReplication) { if (super.cancel(appName, id, isReplication)) { // 数据同步 replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); return true; } return false; } // 心跳 public boolean renew(final String appName, final String id, final boolean isReplication) { if (super.renew(appName, id, isReplication)) { // 数据同步 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; } // 状态更新 public boolean statusUpdate(final String appName, final String id, final InstanceStatus newStatus, String lastDirtyTimestamp, final boolean isReplication) { if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) { // 数据同步 replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication); return true; } return false; } // 删除状态并进行覆盖 public boolean deleteStatusOverride(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) { if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) { // 数据同步 replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication); return true; } return false; } } 2.1 PeerAwareInstanceRegistryImpl.replicateToPeers() @Singleton public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { // 数据同步 private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) {// 是复制请求 numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication // 集群为空,并且为复制请求 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } // 遍历集群节点,进行数据同步 for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } // 发送数据同步请求 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } // 发送数据同步请求 private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: // 下线 node.cancel(appName, id); break; case Heartbeat: // 心跳 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: // 注册 node.register(info); break; case StatusUpdate: // 状态更新 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: // 状态删除并覆盖 infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } finally { CurrentRequestVersion.remove(); } } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Spring #Cloud