跳至主要内容

基于最终一致性的数据同步工具的设计与实现1

基于最终一致性的数据同步工具的设计与实现1

这个系列的文章将基于本人的硕士毕业论文,介绍数据同步工具Syncer


数据同步,顾名思义,就是在跨系统之间进行同步的数据。取决于不同的具体业务目的和应用场景,不同的数据同步工具及框架的功能侧重点往往并不相同,因而不同的人也会使用不同的名称来称呼这类工具,比如数据传输工具,数据采集工具等等。

在系统逐渐发展,复杂度增加之后会产生数据同步的需求。系统需要将数据同步到同构数据库做冗余备份,或者是同步到异构数据库提供其他服务。数据同步工具有许多应用场景:数据库管理员进行数据库实时备份以及数据恢复;微服务开发者为了进行数据自治进行数据冗余,或者为了提高查询效率从而将两张表进行Join;大数据开发平台为了进行大规模在线、离线计算而同步获取业务数据。

常见实现

为了解决数据同步的问题,有许多公司进行了尝试。在国内外许多数据同步工具中,有两种主要的实现方式:

  • 批量查询数据,然后批量导入。常见例子有:为了将 MySQL 数据同步 到 Hive 中,可以直连 MySQL 数据库,然后 Select 表中数据,然后使 用特定格式保存到本地文件作为中间存储,最后把文件 Load 到 Hive 表中。这种方式实现简单,但是也有明显的缺陷:耗时随着数据量的变 大而逐渐变大;大量的 Select 语句对数据源的影响很大,从而影响正 常业务;实时性相对较差。所以这种实现方式逐渐被多数公司放弃,转而开发下面一种方式;

  • 基于数据库的操作日志的数据变更分发。例如,Binary Log 是 MySQL 以二进制形式存储的日志,记录了 MySQL 中的数据变更,MySQL 集 群自身的主从同步也是基于 Binary Log 实现。同步工具通过实时获取 Binary Log 的变更事件,然后合并数据变更,从而还原数据源的数据。 通过这种异步变更通知的方式,可以有效降低对原有数据源的影响,并且具有相当好的实时性和性能。接下来将着重介绍这种数据同步工具的 实现。

目前在国内外基于 MySQL 数据库 Binary Log 的数据同步工具有许多,比如阿里巴巴的 Canal [Canal, 2019],Github 社区维护的 Debezium [Debezium, 2019]。Canal 支持从 MySQL5.7 及以下的版本监听数据变更,然后通过消息队 列 Kafka 进行变更的推送。Debezium 则是一个支持多种数据库的数据变更监听平台,可以监听包括 MongoDB,MySQL,PostgreSQL,SqlServer 等多种主流数据库,然后将数据变更发送至 Kafka 消息队列。

此种依赖主从同步协议的数据同步工具,大多采用监听数据库操作日志再加上消息队列的架构进行实现:同步工具伪装为数据源的从节点,数据源在数据发生变更之后,通知从节点获取数据变更并做简单处理。工具然后发送数据变更至持久化的消息队列(如 Kafka)。最后,真正的数据变更消费方监听 Kafka 消息进行消费,从而实现数据同步的目的。这类工具通过这种架构,利用主从同步协议的确认机制和消息队列的重试和确认机制,从而确保数据变更可以被消费者接收和处理。

此种类型的数据同步工具架构相对简单,支持同步变更到消息队列,需要数据消费方自行编程进行变更的获取及变更和发送操作。对于延时要求极低的场景,例如某些微服务数据自治的情况,无法满足。对于无需对数据变更进行处理的场景,需要消费方进行额外的消息消费开发,并且需要增加对消息队列的依赖,较为繁琐。

所以,我们开发了自己的数据同步工具-- Syncer

数据数据同步工具 Syncer 的目标

数据同步的目的是让数据可以适合业务需求的形式,在不同的系统中用各自擅长的方式运转起来,有多种应用场景。数据同步工具Syncer的核心应用场景有如下几种:

  • 同步微服务数据进行去范式化:将相关联的业务数据进行join同步,减少微服务之间依赖与请求;
  • 同步数据到异构系统
    • 搜索引擎:将业务数据从MySQL同步至Elasticsearch中,进行索引和快速检索;
    • 消息队列:协助进行异构的系统之间的数据交换,进行系统之间的解耦,如大数据计算平台进行消息消费;
  • 同步相关联的拆分表:将范式化的数据表进行join同步,增加数据库查询效率。

数据同步工具Syncer的需要有如下质量属性:

  • 异步更新:业务数据库应几乎不受影响,延迟无可见变化;
  • 最终一致性:同步可以有短暂延时,但是要确保最终结果的一致性;
  • 增量更新:对于数据变更进行同步,而非全量更新;

数据同步工具Syncer的主要模块和功能

数据同步工具 Syncer 的主要模块包含数据的生产者模块以及数据的消费者模块。

生产者模块

生产者模块对外提供消费者注册接口,提供连接远端接口,数据变更分发接口。注册模块负责处理消费者的注册,连接模块负责连接远端获取变更,分发模块负责将数据变更分发。

  • 注册模块将多个消费者注册的用于同步的元信息(包括但不限于:同步来源数据库,表,字段;消费者 id;消费者同步位置)统一处理、合并,供连接模块及分发模块使用。

  • 连接模块:根据注册模块提供的来源数据库信息,连接远端数据源:首先获取用于同步的数据库元信息,然后建立长连接监听数据变更。

  • 分发模块:根据注册模块提供的消费者需要同步的信息,将对应的数据变更封装成统一的数据变更事件,并分发到相应的消费者。

消费者模块

消费者模块又细分为消费者输入模块,消费者处理模块,消费者输出模块和消费者 Ack 模块。

  • 输入模块 输入模块负责接收从生产者生产的数据变更事件,并且在保证一致性的 前提下,分配给多线程处理模块。

  • 处理模块 处理模块根据用户配置,利用多线程,并行处理数据变更事件:如变更字段名称,对数据增删。

  • 输出模块

    • 通道模块:根据用户配置,采用不同的输出通道,将数据变更事件转换为相对 应的数据格式(如 MySQL 的 SQL 语句,Elasticsearch 的查询请 求,Kafka 的记录),并尝试发送。

    • 批量模块: 接收通道模块的异构数据结构,并短暂保存在内存中。根据用户配置,当数据量超过限制,或等待时间超过限制,统一批量发送。

    • 失败处理模块: 根据用户配置,在尝试发送失败之后,进行判断,选择性的进行数 次重试。若数次尝试之后,仍然失败,则记入失败日志,供之后人 工查询。

    • Ack模块: Ack 模块在消费者收到生产者发送的数据变更时间之后,利用预写式日志(Write Ahead Log),记录该数据变更所处的同步位置。并且,Ack 模块会定期将最小的同步位置写入本地存储。当输出模块尝试发送成功之后,从预写式日志删除该记录。在Syncer重启之后,Ack模块会从本地文件读取上次同步的位置,使得Syncer继续从对应位置开始同步。

Ref

评论

此博客中的热门博文

Spring Boot: Customize Environment

Spring Boot: Customize Environment Environment variable is a very commonly used feature in daily programming: used in init script used in startup configuration used by logging etc In Spring Boot, all environment variables are a part of properties in Spring context and managed by Environment abstraction. Because Spring Boot can handle the parse of configuration files, when we want to implement a project which uses yml file as a separate config file, we choose the Spring Boot. The following is the problems we met when we implementing the parse of yml file and it is recorded for future reader. Bind to Class Property values can be injected directly into your beans using the @Value annotation, accessed via Spring’s Environment abstraction or bound to structured objects via @ConfigurationProperties. As the document says, there exists three ways to access properties in *.properties or *.yml : @Value : access single value Environment : can access multi

Elasticsearch: Join and SubQuery

Elasticsearch: Join and SubQuery Tony was bothered by the recent change of search engine requirement: they want the functionality of SQL-like join in Elasticsearch! “They are crazy! How can they think like that. Didn’t they understand that Elasticsearch is kind-of NoSQL 1 in which every index should be independent and self-contained? In this way, every index can work independently and scale as they like without considering other indexes, so the performance can boost. Following this design principle, Elasticsearch has little related supports.” Tony thought, after listening their requirements. Leader notice tony’s unwillingness and said, “Maybe it is hard to do, but the requirement is reasonable. We need to search person by his friends, didn’t we? What’s more, the harder to implement, the more you can learn from it, right?” Tony thought leader’s word does make sense so he set out to do the related implementations Application-Side Join “The first implementation

Implement isdigit

It is seems very easy to implement c library function isdigit , but for a library code, performance is very important. So we will try to implement it and make it faster. Function So, first we make it right. int isdigit ( char c) { return c >= '0' && c <= '9' ; } Improvements One – Macro When it comes to performance for c code, macro can always be tried. #define isdigit (c) c >= '0' && c <= '9' Two – Table Upper version use two comparison and one logical operation, but we can do better with more space: # define isdigit(c) table[c] This works and faster, but somewhat wasteful. We need only one bit to represent true or false, but we use a int. So what to do? There are many similar functions like isalpha(), isupper ... in c header file, so we can combine them into one int and get result by table[c]&SOME_BIT , which is what source do. Source code of ctype.h : # define _ISbit(bit) (1 << (