网站首页 > 技术文章 正文
使用 Flink CDC(Change Data Capture) 实现数据同步被越来越多的人接受。本文介绍了在数据同步过程中,如何将 Schema 的变化实时地从 MySQL 中同步到 Flink 程序中去。
背景
MySQL 存储的数据量大了之后往往会出现查询性能下降的问题,这时候通过 Flink SQL 里的 MySQL CDC Connector 将数据同步到其他数据存储是常见的一种处理方式。
例如 CDC 到 ES 实现数据检索,CDC 到 ClikHouse 进行 OLAP 分析,CDC 到 Kafka 实现数据同步等,然而目前官方 MySQL CDC Connector 还无法实现动态同步表结构,如果新增字段,则下游无法收到新增字段的数据,如果删除字段,那 Flink 任务将会报错退出,需要修改 SQL 后才能正常启动。
对于某些业务来说,数据库 Schema 变动是非常频繁的操作,如果只是变动就需要修改 SQL 并重启 Flink 任务,那么会带来很多不必要的维护成本。
适用版本
flink 1.11
flink-cdc-connector 1.x
无法同步表结构的原因
那么为什么 Flink SQL 无法通过 binlog 来同步表结构呢?查阅下源码可以发现,Flink 进行 binlog 数据转换时主要是通过 Flink SQL 种类似 Create Table 的语法预先定义的 Schema 来进行转换的,具体代码如下:
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(
rowType,
typeInfo,
((rowData, rowKind) -> {}),
serverTimeZone);
...
}
DebeziumDeserializationSchema 是用于转换 binlog 数据到 RowData 的核心接口,创建这个类时传入了 Flink SQL 定义的物理 Schema(封装为 RowType)。
public RowDataDebeziumDeserializeSchema(RowType rowType, TypeInformation<RowData> resultTypeInfo, ValueValidator validator, ZoneId serverTimeZone) {
this.runtimeConverter = createConverter(rowType);
this.resultTypeInfo = resultTypeInfo;
this.validator = validator;
this.serverTimeZone = serverTimeZone;
}
RowDataDebeziumDeserializeSchema 是 DebeziumDeserializationSchema 核心实现类,可以看到 createConverter 方法创建了用于转换 binlog 数据的转换器。
private DeserializationRuntimeConverter createRowConverter(RowType rowType) {
final DeserializationRuntimeConverter[] fieldConverters = rowType.getFields().stream()
.map(RowType.RowField::getType)
.map(this::createConverter)
.toArray(DeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
return (dbzObj, schema) -> {
Struct struct = (Struct) dbzObj;
int arity = fieldNames.length;
GenericRowData row = new GenericRowData(arity);
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
Object fieldValue = struct.get(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
...
}
在最核心的转换方法中,Flink 通过 rowType.getFieldNames 获取到了 SQL 定义好的 fieldNames,并在后续的转换函数中通过 fieldName 来读取 binlog 的 schema 和 value,因此当数据库的表结构发生变更时,binlog 数据中即使已经有了新增的 schema 结构与数据,但因为 fieldNames 依然还是旧的,因此无法获取到新的变更。
解决方案
既然 Flink SQL 无法实现需求,那么很容易想到,使用 JAR 作业进行一些自定义扩展是非常适合这个场景的。
- 首先我们需要实现自己的 DebeziumDeserializationSchema,这里实现了一个名为 JsonStringDebeziumDeserializationSchema 的简单示例,实现将 binlog 数据转换为 JSON,在实际业务中可以根据业务需求实现更个性化的操作,例如向下游发送自定义的 Schema 变更通知等等。
public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
@Override
public void deserialize(SourceRecord record, Collector out) throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
String insert = extractAfterRow(value, valueSchema);
out.collect(new Tuple2<>(true, insert));
} else if (op == Envelope.Operation.DELETE) {
String delete = extractBeforeRow(value, valueSchema);
out.collect(new Tuple2<>(false, delete));
}else {
String after = extractAfterRow(value, valueSchema);
out.collect(new Tuple2<>(true, after));
}
}
private Map<String,Object> getRowMap(Struct after){
return after.schema().fields().stream().collect(Collectors.toMap(Field::name,f->after.get(f)));
}
private String extractAfterRow(Struct value, Schema valueSchema) throws Exception {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Map<String,Object> rowMap = getRowMap(after);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(rowMap);
}
private String extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
Struct after = value.getStruct(Envelope.FieldName.BEFORE);
Map<String,Object> rowMap = getRowMap(after);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsString(rowMap);
}
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(new TypeHint<Tuple2<Boolean,String>>(){});
}
}
实现 DebeziumDeserializationSchema 需要实现 deserialize、getProducedType 两个函数。 deserialize 实现转换数据的逻辑,getProducedType 定义返回的类型,这里返回两个参数,第一个Boolean 类型的参数表示数据是 upsert 或是 delete,第二个参数返回转换后的 JSON string,这里的 JSON 将会包含 Schema 变更后的 Column 与对应的 Value。
- 编写启动 Main 函数,将我们自定义的 DebeziumDeserializationSchema 实现设置到 SourceFunction 中
public class MySQLCDC{
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 关闭 Operator Chaining, 令运行图更容易初学者理解
env.disableOperatorChaining();
env.setParallelism(1);
//checkpoint的一些配置
env.enableCheckpointing(params.getInt("checkpointInterval",60000));
env.getCheckpointConfig().setCheckpointTimeout(5000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
SourceFunction source = MySQLSource.builder()
.hostname(params.get("hostname","127.0.0.1"))
.port(params.getInt("port",3306))
.username(params.get("username","root"))
.password(params.get("password",""))
.serverTimeZone("Asia/Shanghai")
//设置我们自己的实现
.deserializer(new JsonStringDebeziumDeserializationSchema())
.databaseList(params.get("databaseList","test"))
.tableList(params.get("tableList","test.my_test"))
.build();
// 定义数据源
DataStream<Tuple2<Boolean, String>> streamSource =
env.addSource(source).name("MySQLSource");
...
env.execute(MySQLCDC.class.getSimpleName());
}
}
建立测试数据库,并插入几条数据
CREATE TABLE `my_test` (
`f_sequence` int(11) DEFAULT NULL,
`f_random` int(11) DEFAULT NULL,
`f_random_str` varchar(255) NOT NULL DEFAULT '',
`Name` varchar(255) DEFAULT '',
`f_date` date DEFAULT NULL,
`f_datetime` datetime DEFAULT NULL,
`f_timestamp` bigint(20) DEFAULT NULL,
PRIMARY KEY (`f_random_str`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
这个时候运行程序,已经可以看到一些输出了。
Schema 变更前输出:
(true,{"f_date":18545,"f_random_str":"1","f_sequence":1,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":1,"Name":"1"})
(true,{"f_date":18545,"f_random_str":"2","f_sequence":2,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":2,"Name":"2"})
(true,{"f_date":18545,"f_random_str":"3","f_sequence":3,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":3,"Name":"3"})
(true,{"f_date":18545,"f_random_str":"4","f_sequence":33333,"f_timestamp":1630486762,"f_datetime":1602328271000,"f_random":4,"Name":"3"})
但是与数据库对比可以发现,这里的时间戳与数据库时间刚好相差了 8 个小时
f_sequence|f_random|f_random_str|Name|f_date |f_datetime |f_timestamp|
----------+--------+------------+----+----------+-------------------+-----------+
1| 1|1 |1 |2020-10-10|2020-10-10 11:11:11| 1630486762|
2| 2|2 |2 |2020-10-10|2020-10-10 11:11:11| 1630486762|
3| 3|3 |3 |2020-10-10|2020-10-10 11:11:11| 1630486762|
33333| 4|4 |3 |2020-10-10|2020-10-10 11:11:11| 1630486762|
说明我们启动时设置的 .serverTimeZone("Asia/Shanghai") 并没有生效,查源码可以发现,底层的 Debezium 并没有实现 serverTimeZone 的配置,相应的转换是在 RowDataDebeziumDeserializeSchema 内实现的,源码如下:
private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
}
}
//这里的serverTimeZone来自于Bean构造函数传入的配置项
LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
因此如果要实现完整的功能,那么我们自己实现的 JsonStringDebeziumDeserializationSchema 也需要包含对应的 Converter,最终代码如下:
public class JsonStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
public JsonStringDebeziumDeserializationSchema(int zoneOffset) {
//实现一个用于转换时间的Converter
this.runtimeConverter = (dbzObj,schema) -> {
if(schema.name() != null){
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long) dbzObj).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000)).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000)).toLocalDateTime().atOffset(ZoneOffset.ofHours(zoneOffset)).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
case Date.SCHEMA_NAME:
return TemporalConversions.toLocalDate(dbzObj).format(DateTimeFormatter.ISO_LOCAL_DATE);
}
}
return dbzObj;
};
}
//定义接口
private interface DeserializationRuntimeConverter extends Serializable {
Object convert(Object dbzObj, Schema schema);
}
private final JsonStringDebeziumDeserializationSchema.DeserializationRuntimeConverter runtimeConverter;
private Map<String,Object> getRowMap(Struct after){
//转换时使用对应的转换器
return after.schema().fields().stream()
.collect(Collectors.toMap(Field::name,f->runtimeConverter.convert(after.get(f),f.schema())));
}
...
}
同时修改 Main 函数,在构造 JsonStringDebeziumDeserializationSchema 时传入对应的时区,再次运行时就可以看到符合我们预期的输出了。
修改时区后输出:
(true,{"f_date":"2020-10-10","f_random_str":"1","f_sequence":1,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":1,"Name":"1"})
(true,{"f_date":"2020-10-10","f_random_str":"2","f_sequence":2,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":2,"Name":"2"})
(true,{"f_date":"2020-10-10","f_random_str":"3","f_sequence":3,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":3,"Name":"3"})
(true,{"f_date":"2020-10-10","f_random_str":"4","f_sequence":33333,"f_timestamp":1630486762,"f_datetime":"2020-10-10T11:11:11+08:00","f_random":4,"Name":"3"})
最后我们可以验证一下 Schema 变更是不是可以及时同步到输出的 JSON 中,通过语句在数据库中新增一个字段,并插入一条新数据:
ALTER TABLE my_test ADD f_added_string varchar(255) NOT NULL DEFAULT '' COMMENT '新增字段';
INSERT INTO my_test VALUES(1,1,'new','new','2020-10-10 10:10:10',1630486762,'new');
可以看到输出中已经出现了新增的字段
Schema 变更后输出:
猜你喜欢
- 2024-10-29 你还在用 Date?快使用 LocalDateTime 了!
- 2024-10-29 Java修炼终极指南:79,80,81 签到终极修炼天赋
- 2024-10-29 硬核!最全的延迟任务实现方式汇总!附代码(强烈推荐)
- 2024-10-29 还在实体类中用Date?JDK8新的日期类型不香么?
- 2024-10-29 LocalDateTime 说:2020,是时候换个更好的日期时间类了
- 2024-10-29 程序员,你还在使用Date嘛?建议你使用LocalDateTime哦
- 2024-10-29 深度思考:在JDK8中,日期类型该如何使用?
- 2024-10-29 为什么建议使用你 LocalDateTime,而不是 Date?
- 2024-10-29 百度开源的分布式唯一ID生成器UidGenerator,解决了时钟回拨问题
- 2024-10-29 DeepLearning4j 实战:手写体数字识别的 GPU 实现与性能对比
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)