Feature Deep Dive: Watermarking in Apache Spark Structured Streaming







Key Takeaways

  • Watermarks help Spark understand processing progress based on event time, when to produce windowed aggregates, and when to trim the aggregation state
  • When joining streams of data, Spark, by default, uses a single, global watermark that evicts state based on the minimum event time seen across the input streams
  • RocksDB can be leveraged to reduce pressure on cluster memory and GC pauses
  • StreamingQueryProgress and StateOperatorProgress objects contain key information about how watermarks affect your stream


When building real-time pipelines, one of the realities that teams have to work with is that distributed data ingestion is inherently unordered. Additionally, in the context of stateful streaming operations, teams need to be able to properly track event time progress in the stream of data they are ingesting for the proper calculation of time-window aggregations and other stateful operations. We can solve for all of this using Structured Streaming.

For example, let’s say we are a team building a pipeline to help our company do proactive maintenance on the mining machines that we lease to our customers. These machines always need to be running in top condition, so we monitor them in real time. We will need to perform stateful aggregations on the streaming data to understand and identify problems in the machines.

This is where we need to leverage Structured Streaming and watermarking to produce the necessary stateful aggregations that will help inform decisions around predictive maintenance and more for these machines.

What Is Watermarking?

Generally speaking, when working with real-time streaming data there will be delays between event time and processing time due to how data is ingested and whether the overall application experiences issues like downtime. Because of these potentially variable delays, the engine that you use to process this data needs some mechanism to decide when to close the aggregate windows and produce the aggregate result.

While the natural inclination to remedy these issues might be to use a fixed delay based on the wall clock time, we will show in this upcoming example why this is not the best solution.

To explain this visually, let’s take a scenario where we are receiving data at various times from around 10:50 AM → 11:20 AM. We are creating 10-minute tumbling windows that calculate the average of the temperature and pressure readings that came in during the windowed period.

In this first picture, we have the tumbling windows trigger at 11:00 AM, 11:10 AM and 11:20 AM, leading to the result tables shown at the respective times. When the second batch of data comes around 11:10 AM with data that has an event time of 10:53 AM, this gets incorporated into the temperature and pressure averages calculated for the 11:00 AM → 11:10 AM window that closes at 11:10 AM, which does not give the correct result.

Visual representation of a Structured Streaming pipeline ingesting batches of temperature and pressure data

To ensure we get the correct results for the aggregates we want to produce, we need to define a watermark that will allow Spark to understand when to close the aggregate window and produce the correct aggregate result.

In Structured Streaming applications, we can ensure that all relevant data for the aggregations we want to calculate is collected by using a feature called watermarking. In the most basic sense, by defining a watermark Spark Structured Streaming then knows when it has ingested all data up to some time, T (based on a set lateness expectation), so that it can close and produce windowed aggregates up to timestamp T.

This second visual shows the effect of implementing a watermark of 10 minutes and using Append mode in Spark Structured Streaming.

Visual representation of the effect a 10-minute watermark has when applied to the Structured Streaming pipeline.

Unlike the first scenario, where Spark emits the windowed aggregation for the previous ten minutes every ten minutes (i.e. emits the 11:00 AM → 11:10 AM window at 11:10 AM), Spark now waits to close and output the windowed aggregation until the max event time seen minus the specified watermark is greater than the upper bound of the window.

In other words, Spark needed to wait until it saw data points where the latest event time seen minus 10 minutes was greater than 11:00 AM to emit the 10:50 AM → 11:00 AM aggregate window. At 11:00 AM, it does not see this, so it only initializes the aggregate calculation in Spark’s internal state store. At 11:10 AM, this condition is still not met, but we have a new data point for 10:53 AM, so the internal state gets updated, just not emitted. Then finally by 11:20 AM Spark has seen a data point with an event time of 11:15 AM, and since 11:15 AM minus 10 minutes is 11:05 AM, which is later than 11:00 AM, the 10:50 AM → 11:00 AM window can be emitted to the result table.

This produces the correct result by properly incorporating the data based on the expected lateness defined by the watermark. Once the results are emitted, the corresponding state is removed from the state store.
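The walkthrough above can be reproduced with a small pure-Python sketch. It is not Spark code: the class WatermarkedAverage, the helper at(), and the temperature values are hypothetical, and the sketch emits a window at the end of the batch that moves the watermark past it, which simplifies how Spark schedules its triggers. It follows the rule as stated above: a window is emitted once the max event time seen minus the delay is greater than the window's upper bound.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)   # tumbling window length
DELAY = timedelta(minutes=10)    # watermark delay (expected lateness)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Floor an event time to the start of its 10-minute tumbling window."""
    return ts - ((ts - EPOCH) % WINDOW)


class WatermarkedAverage:
    """Toy append-mode aggregator: emits a window's average once the watermark passes it."""

    def __init__(self):
        self.state = {}             # window start -> [sum, count]
        self.max_event_time = None  # latest event time seen so far

    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - DELAY

    def process_batch(self, readings):
        """Ingest (event_time, temperature) pairs; return the windows closed by this batch."""
        watermark = self.watermark()  # watermark carried over from earlier batches
        for ts, temperature in readings:
            start = window_start(ts)
            if watermark is not None and start + WINDOW < watermark:
                continue  # window already emitted and its state removed: drop the late row
            acc = self.state.setdefault(start, [0.0, 0])
            acc[0] += temperature
            acc[1] += 1
            if self.max_event_time is None or ts > self.max_event_time:
                self.max_event_time = ts
        emitted = {}
        watermark = self.watermark()  # recomputed after seeing this batch
        if watermark is None:
            return emitted
        for start in sorted(self.state):
            if start + WINDOW < watermark:
                total, count = self.state.pop(start)  # emit once, then trim the state
                emitted[start] = total / count
        return emitted


def at(hour, minute):
    return datetime(2021, 2, 14, hour, minute)


agg = WatermarkedAverage()
first = agg.process_batch([(at(10, 50), 70.0), (at(10, 58), 72.0)])   # arrives ~11:00
second = agg.process_batch([(at(10, 53), 74.0), (at(11, 2), 75.0)])   # arrives ~11:10
third = agg.process_batch([(at(11, 15), 76.0)])                       # arrives ~11:20
```

Only the third batch returns a result: the 10:50 AM window, whose average includes the late 10:53 AM reading.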

Incorporating Watermarking into Your Pipelines

To understand how to incorporate these watermarks into our Structured Streaming pipelines, we will explore this scenario by walking through an actual code example based on the use case stated in the introduction of this blog.

Let’s say we are ingesting all our sensor data from a Kafka cluster in the cloud and we want to calculate temperature and pressure averages every ten minutes with an expected time skew of ten minutes. The Structured Streaming pipeline with watermarking would look like this:


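from pyspark.sql.functions import window

# Read the raw sensor readings from Kafka (parsing the payload into columns is omitted).
sensorStreamDF = (spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "tempAndPressureReadings")
  .load())

# 10-minute tumbling windows, tolerating up to 10 minutes of lateness.
sensorStreamDF = (sensorStreamDF
  .withWatermark("eventTimestamp", "10 minutes")
  .groupBy(window(sensorStreamDF.eventTimestamp, "10 minutes"))
  .avg("temperature", "pressure"))

# Append finalized windows to Delta; the output path below is an assumed placeholder.
(sensorStreamDF.writeStream.format("delta").outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/temp_pressure_job/")
  .start("/delta/temperatureAndPressureAverages"))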

Here we simply read from Kafka, apply our transformations and aggregations, then write out to Delta Lake tables which will be visualized and monitored in Databricks SQL. The output written to the table for a particular sample of data would look like this:

Output from the streaming query defined in PySpark code sample above

To incorporate watermarking we first needed to identify two items:

  1. The column that represents the event time of the sensor reading
  2. The estimated expected time skew of the data

Taken from the previous example, we can see the watermark defined by the .withWatermark() method, with the eventTimestamp column used as the event time column and 10 minutes to represent the time skew that we expect.


sensorStreamDF = (sensorStreamDF
  .withWatermark("eventTimestamp", "10 minutes")
  .groupBy(window(sensorStreamDF.eventTimestamp, "10 minutes")))

Now that we know how to implement watermarks in our Structured Streaming pipeline, it is important to understand how other items, like streaming join operations and state management, are affected by watermarks. Additionally, as we scale our pipelines there will be key metrics our data engineers need to be aware of and monitor to avoid performance issues. We will explore all of this as we dive deeper into watermarking.

Watermarks in Different Output Modes

Before we dive deeper, it is important to understand how your choice of output mode affects the behavior of the watermarks you set.

Watermarks can only be used when you are running your streaming application in append or update output modes. There is a third output mode, complete mode, in which the entire result table is written to storage. Watermarking cannot be used in this mode because it requires all aggregate data to be preserved, so no intermediate state can be dropped.

The implication of these output modes in the context of window aggregation and watermarks is that in ‘append’ mode an aggregate can be produced only once and cannot be updated. Therefore, once the aggregate is produced, the engine can delete the aggregate’s state and thus keep the overall aggregation state bounded. Late records, meaning those that arrive later than the watermark delay allows, have to be dropped by necessity: the aggregate has already been produced and its state deleted.

Inversely, for ‘update’ mode, the aggregate can be produced repeatedly, starting from the first record and on each received record, so a watermark is optional. The watermark is only useful for trimming the state once the engine heuristically knows that no more records for that aggregate can be received. Once the state is deleted, any late records again have to be dropped, as the aggregate value has been lost and can’t be updated.

It is important to understand how state, late-arriving records, and the different output modes can lead to different behaviors of your application running on Spark. The main takeaway here is that in both append and update modes, once the watermark indicates that all data has been received for an aggregate time window, the engine can trim the window state. In append mode the aggregate is produced only at the closing of the time window plus the watermark delay, while in update mode it is produced on every update to the window.
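To make the difference between the two modes concrete, here is a minimal pure-Python sketch rather than Spark code. The function simulate() and its sample data are hypothetical; event times are plain integers (minutes after 10:00), and a window counts as closed once the watermark is greater than its upper bound.

```python
def simulate(batches, mode, window=10, delay=10):
    """Return (batch number, window start, average) tuples in the order they are emitted."""
    state = {}        # window start -> (sum, count)
    max_event = None  # latest event time seen so far
    out = []
    for batch_no, batch in enumerate(batches, start=1):
        watermark = None if max_event is None else max_event - delay
        touched = set()
        for ts, value in batch:
            start = ts - ts % window
            if watermark is not None and start + window < watermark:
                continue  # too late: the state for this window is already gone
            total, count = state.get(start, (0.0, 0))
            state[start] = (total + value, count + 1)
            touched.add(start)
            max_event = ts if max_event is None else max(max_event, ts)
        if max_event is None:
            continue
        if mode == "update":
            # update mode: re-emit every window changed by this batch
            for start in sorted(touched):
                total, count = state[start]
                out.append((batch_no, start, total / count))
        watermark = max_event - delay
        for start in sorted(state):
            if start + window < watermark:
                total, count = state.pop(start)  # both modes trim closed windows
                if mode == "append":
                    # append mode: emit exactly once, when the window closes
                    out.append((batch_no, start, total / count))
    return out


batches = [[(50, 70.0), (58, 72.0)], [(53, 74.0), (62, 80.0)], [(75, 90.0)]]
append_rows = simulate(batches, mode="append")
update_rows = simulate(batches, mode="update")
```

With this data, append mode produces a single row for the 10:50 window in the third batch, while update mode produces a row each time a window changes, including two successive values for the 10:50 window.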

Lastly, by increasing your watermark delay window you will cause the pipeline to wait longer for data and potentially drop fewer records, which means higher precision but also higher latency to produce the aggregates. On the flip side, a smaller watermark delay leads to lower precision but also lower latency to produce the aggregates.

Window Delay Length    | Precision        | Latency
Longer Delay Window    | Higher Precision | Higher Latency
Shorter Delay Window   | Lower Precision  | Lower Latency
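The trade-off in the table can be seen in a small pure-Python sketch (not Spark code). The function run_append() and the sample batches are hypothetical; event times are integers in minutes after 10:00, and the reading for 10:53 arrives only after a reading for 11:15 has already been seen.

```python
def run_append(batches, delay, window=10):
    """Return (rows dropped as late, {window start: batch number in which it was emitted})."""
    state, emitted_at = {}, {}
    max_event, dropped = None, 0
    for batch_no, batch in enumerate(batches, start=1):
        watermark = None if max_event is None else max_event - delay
        for ts in batch:
            start = ts - ts % window
            if watermark is not None and start + window < watermark:
                dropped += 1  # the window was already closed by the watermark
                continue
            state[start] = state.get(start, 0) + 1
            max_event = ts if max_event is None else max(max_event, ts)
        if max_event is None:
            continue
        watermark = max_event - delay
        for start in sorted(state):
            if start + window < watermark:
                del state[start]
                emitted_at[start] = batch_no
    return dropped, emitted_at


batches = [[50, 58], [75], [53]]
short_delay = run_append(batches, delay=10)
long_delay = run_append(batches, delay=20)
```

With a 10-minute delay the 10:50 window is emitted in the second batch, but the late 10:53 reading is dropped. With a 20-minute delay nothing is dropped, but the window has still not been emitted after three batches.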

Deeper Dive into Watermarking

Joins and Watermarking

There are a couple of considerations to be aware of when doing join operations in your streaming applications, specifically when joining two streams. Let’s say for our use case, we want to join the streaming dataset of temperature and pressure readings with additional values captured by other sensors across the machines.

There are three overarching types of stream-stream joins that can be implemented in Structured Streaming: inner, outer, and semi joins. The main problem with doing joins in streaming applications is that you may have an incomplete picture of one side of the join. Giving Spark an understanding of when there are no future matches to expect is similar to the earlier problem with aggregations, where Spark needed to understand when there were no new rows to incorporate into the calculation before emitting the aggregate.

To allow Spark to handle this, we can leverage a combination of watermarks and event-time constraints within the join condition of the stream-stream join. This combination allows Spark to filter out late records and trim the state for the join operation through a time range condition on the join. We demonstrate this in the example below:


from pyspark.sql.functions import avg, col, expr, window

sensorStreamDF = spark.readStream.format("delta").table("sensorData")
tempAndPressStreamDF = spark.readStream.format("delta").table("tempPressData")

# Both streams tolerate up to 5 minutes of lateness.
sensorStreamDF_wtmrk = sensorStreamDF.withWatermark("timestamp", "5 minutes")
tempAndPressStreamDF_wtmrk = tempAndPressStreamDF.withWatermark("timestamp", "5 minutes")

# Join (inner by default) with an event-time range condition so Spark can trim the join state.
joinedDF = (tempAndPressStreamDF_wtmrk.alias("t").join(
    sensorStreamDF_wtmrk.alias("s"),
    expr("""
      s.sensor_id == t.sensor_id AND
      s.timestamp >= t.timestamp AND
      s.timestamp <= t.timestamp + interval 5 minutes
    """))
  .withColumn("sensorMeasure", col("Sensor1") + col("Sensor2"))
  .groupBy(window(col("t.timestamp"), "10 minutes"))
  .agg(avg(col("sensorMeasure")).alias("avg_sensor_measure"),
       avg(col("temperature")).alias("avg_temperature"),
       avg(col("pressure")).alias("avg_pressure"))
  .select("window", "avg_sensor_measure", "avg_temperature", "avg_pressure"))

# "output_table" is a placeholder name for the target Delta table.
(joinedDF.writeStream.format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/checkpoint/files/")
  .toTable("output_table"))
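The reasoning behind the time range condition can be sketched in plain Python. This is not how Spark implements join state internally: the functions rows_match() and t_row_is_expired() and the sample rows are hypothetical, and the sketch only shows why a bounded range lets buffered rows be discarded once the watermark has moved past them.

```python
from datetime import datetime, timedelta

JOIN_RANGE = timedelta(minutes=5)  # "t.timestamp + interval 5 minutes" in the join condition


def rows_match(t_row, s_row):
    """Mirror of the join condition: same sensor, and s within 5 minutes after t."""
    return (
        s_row["sensor_id"] == t_row["sensor_id"]
        and t_row["timestamp"] <= s_row["timestamp"] <= t_row["timestamp"] + JOIN_RANGE
    )


def t_row_is_expired(t_row, watermark):
    """A buffered t row can be discarded once no future s row can fall inside its range.

    Rows older than the watermark are dropped as late, so every future s row has a
    timestamp at or after the watermark.
    """
    return t_row["timestamp"] + JOIN_RANGE < watermark


t_row = {"sensor_id": 1, "timestamp": datetime(2021, 2, 14, 11, 0)}
s_in_range = {"sensor_id": 1, "timestamp": datetime(2021, 2, 14, 11, 4)}
s_out_of_range = {"sensor_id": 1, "timestamp": datetime(2021, 2, 14, 11, 6)}
```

Without the upper bound on s.timestamp, a t row could match an s row arriving arbitrarily far in the future, so its state could never be safely removed.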

However, unlike the join example, where both streams use the same 5-minute watermark, there will be times when each stream requires a different time skew for its watermark. In this scenario, Spark has a policy for handling multiple watermark definitions. Spark maintains one global watermark that is based on the slowest stream to ensure the highest amount of safety when it comes to not missing data.

Developers can change this behavior by setting spark.sql.streaming.multipleWatermarkPolicy to max; however, this means that data from the slower stream will be dropped.
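As a rough illustration of the two policies, the sketch below combines per-stream watermarks into one global value. It is plain Python, not Spark's implementation; global_watermark() is a hypothetical helper, the stream names are borrowed from the join example, and the event times (in minutes after 10:00) and delays are made up.

```python
def global_watermark(max_event_times, delays, policy="min"):
    """Combine per-stream watermarks (max event time minus delay) into one global value."""
    per_stream = {
        name: max_event_times[name] - delays[name] for name in max_event_times
    }
    if policy == "min":
        return min(per_stream.values())  # follow the slowest stream (the default)
    if policy == "max":
        return max(per_stream.values())  # follow the fastest stream
    raise ValueError(f"unknown policy: {policy}")


max_event_times = {"sensorData": 75, "tempPressData": 62}
delays = {"sensorData": 5, "tempPressData": 10}

safe = global_watermark(max_event_times, delays, policy="min")
aggressive = global_watermark(max_event_times, delays, policy="max")
```

Under min the global watermark sits at 10:52, so rows from the slower stream are still accepted. Under max it jumps to 11:10, and anything the slower stream delivers for windows before that point is treated as late.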

To see the full range of join operations that require or could leverage watermarks, check out the stream-stream joins section of Spark’s Structured Streaming documentation.

Monitoring and Managing Streams with Watermarks

When managing a streaming query where Spark may need to manage millions of keys and keep state for each of them, the default state store that comes with Databricks clusters may not be effective. You might start to see higher memory utilization, and then longer garbage collection pauses. Both will impede the performance and scalability of your Structured Streaming application.

This is where RocksDB comes in. You can leverage RocksDB natively in Databricks by enabling it as the state store provider in the Spark configuration.
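A minimal sketch of that setting, written as a cluster-level Spark configuration entry (key and value separated by a space). The provider class name is the one documented for Databricks Runtime and is an assumption here; verify it against your runtime version:

```
spark.sql.streaming.stateStore.providerClass com.databricks.sql.streaming.state.RocksDBStateStoreProvider
```

On open-source Apache Spark 3.2 and later, the corresponding provider class is org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.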


This will allow the cluster running the Structured Streaming application to leverage RocksDB, which can more efficiently manage state in native memory and take advantage of the local disk/SSD instead of keeping all state in memory.

Beyond tracking memory usage and garbage collection metrics, there are other key indicators and metrics that should be collected and tracked when dealing with watermarking and Structured Streaming. To access these metrics you can look at the StreamingQueryProgress and StateOperatorProgress objects. Check out our documentation for examples of how to use these.

In the StreamingQueryProgress object, there is a method called “eventTime” that returns the max, min, avg, and watermark timestamps. The first three are the max, min, and average event time seen in that trigger. The last one is the watermark used in the trigger.

Abbreviated Example of a StreamingQueryProgress object

{
  "id" : "f4311acb-15da-4dc3-80b2-acae4a0b6c11",
  . . . .
  "eventTime" : {
    "avg" : "2021-02-14T10:56:06.000Z",
    "max" : "2021-02-14T11:01:06.000Z",
    "min" : "2021-02-14T10:51:06.000Z",
    "watermark" : "2021-02-14T10:41:06.000Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 7,
    "numRowsUpdated" : 0,
    "allUpdatesTimeMs" : 205,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 233,
    "commitTimeMs" : 15182,
    "memoryUsedBytes" : 91504,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 200,
    "numStateStoreInstances" : 200,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 4800,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 25680
    }
  } ],
  . . . .
}

These pieces of information can be used to reconcile the data in the result tables that your streaming queries are outputting and also be used to verify that the watermark being used is the intended eventTime timestamp. This can become important when you are joining streams of data together.

Within the StateOperatorProgress object there is the numRowsDroppedByWatermark metric. This metric will show how many rows are being considered too late to be included in the stateful aggregation. Note that this metric is measuring rows dropped post-aggregation and not the raw input rows, so the number is not precise but can give an indication that there is late data being dropped. This, in conjunction with the information from the StreamingQueryProgress object, can help developers determine whether the watermarks are correctly configured.
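As a sketch of how these two pieces of information might be combined in a monitoring job, the helper below reads a progress record that has already been converted to a Python dictionary. The function summarize_watermark() is hypothetical, and the assumption that your progress records arrive in exactly this dictionary shape should be checked against your Spark version.

```python
from datetime import datetime

TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # timestamp layout used in the progress JSON


def summarize_watermark(progress):
    """Extract the watermark lag and the late-row count from a progress dictionary."""
    event_time = progress["eventTime"]
    max_seen = datetime.strptime(event_time["max"], TS_FORMAT)
    watermark = datetime.strptime(event_time["watermark"], TS_FORMAT)
    dropped = sum(
        op.get("numRowsDroppedByWatermark", 0)
        for op in progress.get("stateOperators", [])
    )
    return {
        "watermark_lag_seconds": (max_seen - watermark).total_seconds(),
        "rows_dropped_by_watermark": dropped,
    }


# Values copied from the abbreviated StreamingQueryProgress example
sample_progress = {
    "eventTime": {
        "avg": "2021-02-14T10:56:06.000Z",
        "max": "2021-02-14T11:01:06.000Z",
        "min": "2021-02-14T10:51:06.000Z",
        "watermark": "2021-02-14T10:41:06.000Z",
    },
    "stateOperators": [
        {"operatorName": "stateStoreSave", "numRowsDroppedByWatermark": 0}
    ],
}
summary = summarize_watermark(sample_progress)
```

A steadily growing rows_dropped_by_watermark value suggests the watermark delay is too short for the lateness actually present in the data, while an unexpectedly large lag can point to a slow input stream holding back the global watermark.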

Multiple Aggregations, Streaming, and Watermarks

One remaining limitation of Structured Streaming queries is chaining multiple stateful operators (e.g. aggregations, streaming joins) in a single streaming query. This limitation of a singular global watermark for stateful aggregations is something that we at Databricks are working on a solution for and will be releasing more information about in the coming months. Check out our blog on Project Lightspeed to learn more: Project Lightspeed: Faster and Simpler Stream Processing With Apache Spark (databricks.com).


With Structured Streaming and Watermarking on Databricks, organizations, like the one with the use case described above, can build resilient real-time applications that ensure metrics driven by real-time aggregations are being accurately calculated even if data is not properly ordered or on-time. To learn more about how you can build real-time applications with Databricks, contact your Databricks representative.

