网站首页 > 技术文章 正文
前言
在实际开发过程中,我们可能需要接入各种流数据源,比如在线业务用户点击流数据、监控系实时收集到的事件流数据、从传感器采集到的实时数据,等等,为了处理方便他们可能会写入Kafka消息中间件集群中某个/某些topic中,或者选择其它的缓冲/存储系统。这些数据源中数据元素具有固定的时间属性,是在流数据处理系统之外的其它系统生成的。比如,上亿用户通过手机终端操作触发生成的事件数据,都具有对应的事件时间;再特殊一点,可能我们希望回放(Replay)上一年手机终端用户的历史行为数据,与当前某个流数据集交叉分析才能够得到支持某类业务的特定结果,这种情况下,基于数据所具有的事件时间进行处理,就具有很重要的意义了。下面,我们先从Flink支持的3个与流数据处理相关的时间概念(Time Notion):ProcessTime、EventTime、IngestionTime。有些系统对时间概念的抽象有其它叫法,比如,Google Cloud Dataflow中称为时间域(Time Domain)。在Flink中,基于不同的Time Notion来处理流数据,具有不同的意义和结果,所以了解这3个Time Notion非常关键。
Time Notion
我们先看下,Apache Flink官网文档给出的一张概念图,非常形象地展示了Process Time、Event Time、Ingestion Time这三个时间分别所处的位置,如下图所示:
下面,分别对这3个Time Notion进行说明如下:
- ProcessTime
Flink中有对数据处理的操作进行抽象,称为Transformation Operator,而对于整个Dataflow的开始和结束分别对应着Source Operator和Sink Operator,这些Operator都是在Flink集群系统所在的主机节点上,所以在基于ProcessTime的Notion进行与时间相关的数据处理时,数据处理依赖于Flink程序运行所在的主机节点系统时钟(System Clock)。因为我们关心的是数据处理时间(Process Time),比如进行Time Window操作,对Window的指派就是基于当前Operator所在主机节点的系统时钟。也就是说,每次创建一个Window,计算Window对应的起始时间和结束时间都使用Process Time,它与外部进入的数据元素的事件时间无关。那么,后续作用于Window的操作(Function)都是基于具有Process Time特性的Window进行的。使用ProcessTime的场景,比如,我们需要对某个App应用的用户行为进行实时统计分析与监控,由于用户可能使用不同的终端设备,这样可能会造成数据并非是实时的(如用户手机没电,导致2小时以后才会将操作行为记录批量上传上来)。而此时,如果我们按照每分钟的时间粒度做实时统计监控,那么这些数据记录延迟的太严重,如果为了等到这些记录上传上来(无法预测,具体什么时间能获取到这些数据)再做统计分析,对每分钟之内的数据进行统计分析的结果恐怕要到几个小时甚至几天后才能计算并输出结果,这不是我们所希望的。而且,数据处理系统可能也没有这么大的容量来处理海量数据的情况。结合业务需求,其实我们只需要每分钟时间内进入的数据记录,依赖当前数据处理系统的处理时间(Process Time)生成每分钟的Window,指派数据记录到指定Window并计算结果,这样就不用考虑数据元素本身自带的事件时间了。
- EventTime
流数据中的数据元素可能会具有不变的事件时间(Event Time)属性,该事件时间是数据元素所代表的行为发生时就不会改变。最简单的情况下,这也最容易理解:所有进入到Flink处理系统的流数据,都是在外部的其它系统中产生的,它们产生后具有了事件时间,经过传输后,进入到Flink处理系统,理论上(如果所有系统都具有相同系统时钟)该事件时间对应的时间戳要早于进入到Flink处理系统中进行处理的时间戳,但实际应用中会出现数据记录乱序、延迟到达等问题,这也是非常普遍的。基于EventTime的Notion,处理数据的进度(Progress)依赖于数据本身,而不是当前Flink处理系统中Operator所在主机节点的系统时钟。所以,需要有一种机制能够控制数据处理的进度,比如一个基于事件时间的Time Window创建后,具体怎么确定属于该Window的数据元素都已经到达?如果确定都到达了,然后就可以对属于这个Window的所有数据元素做满足需要的处理(如汇总、分组等)。这就要用到WaterMark机制,它能够衡量数据处理进度(表达数据到达的完整性)。WaterMark带有一个时间戳,假设为X,进入到数据处理系统中的数据元素具有事件时间,记为Y,如果Y<X,则所有的数据元素均已到达,可以计算并输出结果。反过来说,可能更容易理解一些:要想触发对当前Window中的数据元素进行计算,必须保证对所有进入到系统的数据元素,其事件时间Y>=X。如果数据元素的事件时间是有序的,那么当出现一个数据元素的事件时间Y<X,则触发对当前Window计算,并创建另一个新的Window来指派事件时间Y<X的数据元素到该新的Window中。可以看到,有了WaterMark机制,对基于事件时间的流数据处理会变得特别灵活,可以根据实际业务需要选择各种组件和处理策略。比如,上面我们说到,当Y<X则触发当前Window计算,记为t1时刻,如果流数据元素是乱序的,经过一段时间,假设t2时刻有一个数据元素的事件时间Y>=X,这时该怎么办呢?如果t1时刻的Window已经不存在了,但我们还是希望新出现的乱序数据元素加入到t1时刻Window的计算中,这时可以实现自定义的Trigger来满足各种业务场景的需要。
- IngestionTime
IngestionTime是数据进入到Flink流数据处理系统的时间,该时间依赖于Source Operator所在主机节点的系统时钟,会为到达的数据记录指派Ingestion Time。基于IngestionTime的Notion,存在多个Source Operator的情况下,每个Source Operator会使用自己本地系统时钟指派Ingestion Time。后续基于时间相关的各种操作,都会使用数据记录中的Ingestion Time。与EventTime相比,IngestionTime不能处理乱序、延迟到达事件的应用场景,它也就不用必须指定如何生成WaterMark。
使用EventTime与WaterMark
下面,我们通过实际编程实践,来说明一些需要遵守的基本原则,以便在开发中进行合理设置。在开发Flink流数据处理程序时,需要指定Time Notion,Flink API提供了TimeCharacteristic枚举类,内部定义了3种Time Notion(参考上面说明)。设置Time Notion的示例代码,如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
上面,我们指定了基于TimeCharacteristic.EventTime来进行数据处理。如果我们没有显式指定TimeCharacteristic,默认使用TimeCharacteristic.ProcessTime。基于EventTime的数据处理,需要对进入的数据元素指派时间戳,并且指定如何生成WaterMark,这样才能通过WaterMark来机制控制输入数据的完整性(事件到达),以便触发对指定Window进行计算。有两种方式实现时间戳指派和生成WaterMark:
- 在Flink程序一开始调用assignTimestampsAndWatermarks()进行指派
- 在Source Operator中直接指派
下面,我们会基于这两种方式进行编码实现:
调用assignTimestampsAndWatermarks()进行指派
TimeWindow的大小设置为1分钟(60000ms),允许延迟到达时间设置为50秒(50000ms),并且为了模拟流数据元素事件时间早于当前处理系统的系统时间,设置延迟时间为2分钟(120000ms)。我们自定义实现了一个用来模拟的Source Operator,代码如下所示:
class StringLineEventSource(val latenessMillis: Long) extends RichParallelSourceFunction[String] {
val LOG = LoggerFactory.getLogger(classOf[StringLineEventSource])
@volatile private var running = true
val channelSet = Seq("a", "b", "c", "d")
val behaviorTypes = Seq(
"INSTALL", "OPEN", "BROWSE", "CLICK",
"PURCHASE", "CLOSE", "UNINSTALL")
val rand = Random
override def run(ctx: SourceContext[String]): Unit = {
val numElements = Long.MaxValue
var count = 0L
while (running && count < numElements) {
val channel = channelSet(rand.nextInt(channelSet.size))
val event = generateEvent()
LOG.debug("Event: " + event)
val ts = event(0)
val id = event(1)
val behaviorType = event(2)
ctx.collect(Seq(ts, channel, id, behaviorType).mkString("\t"))
count += 1
TimeUnit.MILLISECONDS.sleep(5L)
}
}
private def generateEvent(): Seq[String] = {
// simulate 10 seconds lateness
val ts = Instant.ofEpochMilli(System.currentTimeMillis)
.minusMillis(latenessMillis)
.toEpochMilli
val id = UUID.randomUUID().toString
val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
// (ts, id, behaviorType)
Seq(ts.toString, id, behaviorType)
}
override def cancel(): Unit = running = false
}
流数据中的数据元素为字符串记录行的格式,包含字段:事件时间、渠道、用户编号、用户行为类型。这里,我们直接调用SourceContext.collect()方法,将数据元素发送到下游进行处理。在Flink程序中,通过调用stream: DataStream[T]的assignTimestampsAndWatermarks()进行时间戳的指派,并生成WaterMark。然后,基于Keyed Window生成Tumbling Window(不存在Window重叠)来操作数据记录。最后,将计算结果输出到Kafka中去。对应的实现代码,如下所示:
def main(args: Array[String]): Unit = {
val params = ParameterTool.fromArgs(args)
checkParams(params)
val sourceLatenessMillis = params.getRequired("source-lateness-millis").toLong
maxLaggedTimeMillis = params.getLong("window-lagged-millis", DEFAULT_MAX_LAGGED_TIME)
val windowSizeMillis = params.getRequired("window-size-millis").toLong
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 设置为TimeCharacteristic.EventTime
val stream: DataStream[String] = env.addSource(new StringLineEventSource(sourceLatenessMillis))
// create a Kafka producer for Kafka 0.9.x
val kafkaProducer = new FlinkKafkaProducer09(
params.getRequired("window-result-topic"),
new SimpleStringSchema, params.getProperties
)
stream
.setParallelism(1)
.assignTimestampsAndWatermarks( // 指派时间戳,并生成WaterMark
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(maxLaggedTimeMillis)) {
override def extractTimestamp(element: String): Long = {
element.split("\t")(0).toLong
}
})
.setParallelism(2)
.map(line => {
// ts, channel, id, behaviorType
val a = line.split("\t")
val channel = a(1)
((channel, a(3)), 1L)
})
.setParallelism(3)
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis))) // 使用Keyed Window
.process(new EventTimeWindowReduceFunction())
.setParallelism(4)
.map(t => {
val windowStart = t._1
val windowEnd = t._2
val channel = t._3
val behaviorType = t._4
val count = t._5
Seq(windowStart, windowEnd, channel, behaviorType, count).mkString("\t")
})
.setParallelism(3)
.addSink(kafkaProducer)
.setParallelism(3)
env.execute(getClass.getSimpleName)
}
上面,我们使用了Flink内建实现的BoundedOutOfOrdernessTimestampExtractor来指派时间戳和生成WaterMark。这里,我们实现了从事件记录中提取时间戳的逻辑,实际生成WaterMark的逻辑使用BoundedOutOfOrdernessTimestampExtractor提供的默认逻辑,在getCurrentWatermark()方法中。我们来看下BoundedOutOfOrdernessTimestampExtractor的实现,代码如下所示:
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private long lastEmittedWatermark = Long.MIN_VALUE;
private final long maxOutOfOrderness;
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; // 初始设置当前最大事件时间戳
}
public long getMaxOutOfOrdernessInMillis() {
return maxOutOfOrderness;
}
public abstract long extractTimestamp(T element);
@Override
public final Watermark getCurrentWatermark() {
long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // 当前最大事件时间戳,减去允许最大延迟到达时间
if (potentialWM >= lastEmittedWatermark) { // 检查上一次emit的WaterMark时间戳,如果比lastEmittedWatermark大则更新其值
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) { // 检查新到达的数据元素的事件时间,用currentMaxTimestamp记录下当前最大的
currentMaxTimestamp = timestamp;
}
return timestamp;
}
}
可以看到,在getCurrentWatermark()和extractTimestamp()方法中,lastEmittedWatermark是WaterMark中的时间戳,计算它时,总是根据当前进入Flink处理系统的数据元素的最大的事件时间currentMaxTimestamp,然后再减去一个maxOutOfOrderness(外部配置的支持最大延迟到达的时间),也就说,这里面实现的WaterMark中的时间戳序列是非严格单调递增的。我们实现的Flink程序为EventTimeTumblingWindowAnalytics,提交到Flink集群运行,执行如下命令:
bin/flink run --class org.shirdrn.flink.windowing.EventTimeTumblingWindowAnalytics flink-demo-assembly-0.0.1-SNAPSHOT.jar \
--window-result-topic windowed-result-topic \
--zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
--bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
--source-lateness-millis 120000 \
--window-lagged-millis 50000 \
--window-size-millis 60000
可以查看输出到Kafka中的数据,示例如下所示:
20180108154000 20180108154100 a CLOSE 421
20180108154000 20180108154100 c PURCHASE 434
20180108154000 20180108154100 b BROWSE 443
20180108154000 20180108154100 d OPEN 406
20180108154000 20180108154100 d CLICK 398
20180108154000 20180108154100 b PURCHASE 405
20180108154100 20180108154200 c CLOSE 421
20180108154100 20180108154200 a INSTALL 449
20180108154100 20180108154200 a CLICK 407
20180108154100 20180108154200 b CLOSE 416
20180108154100 20180108154200 a OPEN 388
20180108154200 20180108154300 b CLICK 447
20180108154200 20180108154300 b CLOSE 435
20180108154200 20180108154300 a OPEN 405
20180108154200 20180108154300 a CLICK 405
20180108154200 20180108154300 a PURCHASE 400
20180108154200 20180108154300 c UNINSTALL 407
20180108154200 20180108154300 c CLOSE 438
在Source Operator中直接指派
和上面我们最终期望的逻辑基本保持一致,我们把指派时间戳和生成WaterMark的逻辑,提取出来放到Source Operator实现中,对应的关键代码片段,如下所示:
var lastEmittedWatermark = Long.MinValue
var currentMaxTimestamp = Long.MinValue + maxLaggedTimeMillis
... ...
ctx.collectWithTimestamp(Seq(ts, channel, id, behaviorType).mkString("\t"), ts.toLong)
ctx.emitWatermark(getCurrentWatermark(ts.toLong))
... ...
private def getCurrentWatermark(ts: Long): Watermark = {
if (ts > currentMaxTimestamp) {
currentMaxTimestamp = ts
}
val watermarkTs = currentMaxTimestamp - maxLaggedTimeMillis
if (watermarkTs >= lastEmittedWatermark) {
lastEmittedWatermark = watermarkTs
}
new Watermark(lastEmittedWatermark)
}
需要在Flink程序的main方法中,将外部配置的与WaterMark生成相关的参数值,传到Source Operator实现类中,如下所示:
1
2
val stream: DataStream[String] = env.addSource(
new StringLineEventSourceWithTsAndWaterMark(sourceLatenessMillis, maxLaggedTimeMillis))
同时,把前面调用assignTimestampsAndWatermarks()的方法去掉即可。编译后,提交到Flink集群运行,可以查看输出结果,和前面类似,输出结果正是我们所期望的。(原创时延军(包含链接:http://shiyanjun.cn))
可以转发关注小编,每天更新干货文章
乘风破浪会有时,直挂云帆济沧海。
猜你喜欢
- 2024-10-29 你还在用 Date?快使用 LocalDateTime 了!
- 2024-10-29 Java修炼终极指南:79,80,81 签到终极修炼天赋
- 2024-10-29 硬核!最全的延迟任务实现方式汇总!附代码(强烈推荐)
- 2024-10-29 还在实体类中用Date?JDK8新的日期类型不香么?
- 2024-10-29 LocalDateTime 说:2020,是时候换个更好的日期时间类了
- 2024-10-29 程序员,你还在使用Date嘛?建议你使用LocalDateTime哦
- 2024-10-29 深度思考:在JDK8中,日期类型该如何使用?
- 2024-10-29 为什么建议使用你 LocalDateTime,而不是 Date?
- 2024-10-29 百度开源的分布式唯一ID生成器UidGenerator,解决了时钟回拨问题
- 2024-10-29 DeepLearning4j 实战:手写体数字识别的 GPU 实现与性能对比
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)