Unlocking Real-Time Insights: The Power of Streaming Databases. Part 1

The data space has been changing rapidly in recent years, and data streaming plays a vital role in that change. In this blog post, we will explore the concepts and definitions behind streaming databases, compare them with familiar technologies, and finally focus on one implementation of a streaming database: Materialize. The second part of the article will dive into more complex use cases and another popular implementation: RisingWave.

Intro. Context and Basics 

Our world changes at a pace that is hard to comprehend, and businesses need to make data-driven decisions to stay competitive. Batch processing can help with that, but in many areas it is not enough: it has high latency and needs substantial resources to recompute large volumes of data, which is not the case for streaming applications. Streaming systems can help make faster decisions and provide near-real-time information with fewer resources. But that does not come without a cost.

One of the issues with stream processing is that it is much harder to use than batch processing. We need to take into account data ordering, late data, data duplication, proper scalability during data peaks, and much more. As a result, most teams have opted to keep using databases to transform data and to run the processing in batches, at the expense of not meeting performance requirements.

As a middle ground, we can consider streaming databases, which expose SQL and are easier to use than stream processing engines, yet deliver results in real time, unlike batch processing.

To understand what streaming databases are and where they came from, we first need to go back to a few major topics:

  • Streaming platforms and engines
  • Materialized views
  • Streaming SQL

Streaming platforms and engines

Streaming platforms emerged as a response to the need to deliver real-time analytics in a fast-paced world. Here we need to mention Apache Kafka Streams, Apache Flink, Apache Spark and Apache Beam (there are plenty more; this article is too short to name them all). These engines separate compute and storage, but issues arise when you need to expose your data to the outside world, for example for analytics purposes. In these cases, you may need to manage connectors to other systems yourself, unless you are fortunate enough to find an already implemented, fully functional connector.

For the serving step, we can consider real-time OLAP systems like Apache Pinot or ClickHouse. Apache Paimon has also appeared recently and might be an alternative to the systems mentioned above.

Materialized views

The idea of streaming databases came from materialized views, which represent a precomputed portion of data. Unlike a regular view, the result is cached once computed; unfortunately, it has to be refreshed by something outside the view, and a full recomputation might take time. This becomes slow and hard to scale for vast amounts of data, but it can be handy for smaller datasets as a way to simulate real-time results.
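
To make the trade-off concrete, here is a minimal Python sketch of a classic materialized view: the result is cached after the first computation and only changes when something outside the view triggers a refresh. The names `CachedView`, `refresh` and `read` are hypothetical and exist only for this illustration; no particular database is implemented this way.

```python
from collections import Counter


class CachedView:
    """Toy model of a classic materialized view over a list of rows."""

    def __init__(self, base_table):
        self.base_table = base_table  # shared reference to the "table"
        self.cached = None
        self.refresh()

    def refresh(self):
        # Full recomputation: the cost grows with the size of the base table.
        self.cached = Counter(row["region"] for row in self.base_table)

    def read(self):
        # Reads are cheap, but they return whatever was cached last.
        return dict(self.cached)


clicks = [{"region": "eu"}, {"region": "us"}]
view = CachedView(clicks)

clicks.append({"region": "eu"})  # new data arrives after the last refresh
stale = view.read()              # still reflects the old state
view.refresh()                   # the refresh has to be triggered from outside
fresh = view.read()
print(stale, fresh)
```

The gap between `stale` and `fresh` is exactly what a streaming database tries to close by keeping the result updated as data arrives.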

Streaming SQL

Streaming SQL is a relatively new concept. Apache Spark and Apache Flink started as non-SQL data processing engines; the need for streaming SQL came later.

Streaming SQL originates from the same familiar language, Structured Query Language (SQL), but extends it to handle streams of rapidly changing data. SQL simplifies processing, allowing you to focus on what's essential for delivering business value. However, it can also become a bottleneck, as it may limit your flexibility when dealing with unique problems that differ from the industry's typical use cases.

The key difference between SQL and streaming SQL is that a classic SQL query answers a point-in-time question, while a streaming SQL query is continuous: its result keeps updating as new data arrives.

A query run on an RDBMS needs time to be processed, and by the time you get the result, the data might no longer be up to date. The main goal of streaming SQL and streaming databases is to give quick results on fast-changing event streams while the data is still relevant.
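
As a rough illustration of what "continuous" means here, the Python sketch below emits an updated result after every incoming event instead of answering once. The function name `continuous_count` and the sample stream are made up for this example; it only models the idea and is not how any streaming SQL engine is implemented.

```python
from collections import Counter


def continuous_count(events):
    """Yield the up-to-date count per key after every incoming event."""
    counts = Counter()
    for key in events:
        counts[key] += 1    # incremental update, no rescan of old events
        yield dict(counts)  # the "query result" as of this event


stream = ["ad-1", "ad-2", "ad-1"]
results = list(continuous_count(stream))
print(results[-1])
```

Each element of `results` is a valid answer at the moment it was produced, which is the point-in-time versus continuous distinction in miniature.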

Streaming Databases

Streaming databases are designed to process, analyze, and store real-time data. They provide SQL interfaces for data manipulation, with the core abstraction being a materialized view, which plays a vital role in the system. These databases support streaming concepts such as windowing, exactly-once semantics, and watermarks.

Now, let's compare streaming databases with the technologies we are already familiar with.

RDBMS: they share a few concepts with streaming databases, like materialized views and SQL as the language to talk to your data. The main difference is that streaming databases optimize for streaming use cases, so their materialized views are constantly updated.

Analytical databases like Redshift, BigQuery, and Snowflake focus primarily on batch processing.

Real-time OLAP databases like Apache Pinot and ClickHouse focus on running analytical queries quickly, rather than on recent data or incremental computation.

Streaming processing engines like Apache Spark and Apache Flink expose SQL as a way of wrangling the data but need external storage.

Streaming database implementations

  • ksqlDB
  • RisingWave
  • Materialize
  • Timeplus

In this section, let’s dive into the details of Materialize.

Materialize

Materialize is a cloud-native data warehouse purpose-built for operational workloads where an analytical data warehouse would be too slow and a stream processor would be too complicated.

This is the first phrase we find in the official Materialize GitHub repository: the project aims at use cases where solutions like BigQuery or Redshift are too slow, but Apache Flink and similar engines are too complex.

Materialize helps you perform analytics on live streaming data; it focuses on providing precise answers to business questions based on fresh data at a given time with low latency. 

The fully managed cloud solution is horizontally scalable, and because Materialize uses S3 as its storage layer, storage can scale almost infinitely.

It is important to note that Materialize supports the PostgreSQL protocol for reads, so you can easily connect it to a dashboard or play with the tool interactively in your favorite IDE.

Important concepts

  • clusters - isolated compute resources for sources, sinks, indexes, materialized views, and ad hoc queries
  • sources - external systems from which data is read
  • views - named queries for calculated data that needs to be queried repeatedly. Materialize splits them into views and materialized views.
  • indexes - query results kept in memory
  • sinks - external systems to which data is written

Specific instances where indexes can be useful to improve performance include:

  • When used in ad-hoc queries.
  • When used by multiple queries within the same cluster.
  • When used to enable delta joins.

Materialize supports a variety of data sources out of the box:

  • MySQL (CDC), including cloud versions like Aurora and CloudSQL
  • PostgreSQL (CDC), including cloud versions like Aurora and CloudSQL
  • SQL Server, only via Debezium and Kafka
  • Kafka, including AWS MSK, Confluent Cloud, or self-hosted
  • Redpanda
  • Webhooks

To add a connection, the SQL code is as simple as this:

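-- Assumes a secret named mysql_password (a placeholder name) was created
-- beforehand with CREATE SECRET.
CREATE CONNECTION mysql_connection TO MYSQL (
    HOST '<host>',
    PORT 3306,
    USER 'materialize',
    PASSWORD SECRET mysql_password,
    SSL MODE REQUIRED
);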

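or for Kafka:

-- Assumes a secret named kafka_password (a placeholder name) was created
-- beforehand with CREATE SECRET.
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKERS ('b-1.hostname-1:9096'),
    SASL MECHANISMS = 'SCRAM-SHA-512',
    SASL USERNAME = 'foo',
    SASL PASSWORD = SECRET kafka_password
);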

Great, we have quite a rich list of connections, but what can we choose as our sink? At the time of writing this blog post, Materialize supports 4 data sinks.

It’s understandable, as the product aims to keep the data within it, and you can relatively easily integrate it with the external world via Confluent Cloud.

Let’s explore what tools we can use in Materialize.

We can use primitive types like text, integer, datetime, byte, float, and double. We can also use the PostgreSQL jsonb format, and it is worth mentioning that record types are at our disposal; they can consist of any subtypes, including nested types, which are handy for analytical workloads.

According to the SQL functions section of the documentation, Materialize supports all (or almost all) of the functions we know from PostgreSQL, which is an excellent starting point for using the product in production.

Common patterns

Materialized views

Maintaining a materialized view in durable storage comes with resource and latency costs that should be carefully weighed based on the view's primary purpose. It is advisable to create a materialized view if:

  • the results need to be accessible across multiple clusters;
  • independent scaling of view maintenance and query serving is beneficial;
  • the view's final consumer is a sink or a SUBSCRIBE operation.

However, if the view is only needed within a single cluster, creating a regular view and building an index on it might be more efficient. The index will keep the view's results updated incrementally in memory within that cluster, thereby avoiding the resource and latency costs associated with materialization.

View + index

While it's possible to query a materialized view directly, doing so interacts with the Materialize storage layer. Although this is generally fast, it's slower than reading from memory. It's recommended to create indexes tailored to common query patterns to enhance query performance on materialized views.

It's important to note that indexes are specific to each cluster and are maintained in memory. For instance, if you create a materialized view and an index on it in the quickstart cluster, queries from a different cluster won't benefit from that index. To optimize performance, you should create the necessary indexes in every cluster where you plan to reference the materialized view.
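
The per-cluster nature of indexes can be pictured with a tiny Python model. This is only a sketch of the rule described above, not of Materialize internals; the function `read_path` and the `reporting` cluster name are invented for the example.

```python
def read_path(cluster, view, indexes):
    """Return where a query on `view` issued from `cluster` is served from."""
    # An index lives in the memory of one cluster, so the lookup key is the pair.
    return "memory" if (cluster, view) in indexes else "storage"


# Hypothetical setup: a single index on the view, created in the quickstart cluster.
indexes = {("quickstart", "ad_clicks_by_region_agg")}

same_cluster = read_path("quickstart", "ad_clicks_by_region_agg", indexes)
other_cluster = read_path("reporting", "ad_clicks_by_region_agg", indexes)
print(same_cluster, other_cluster)
```

Adding `("reporting", "ad_clicks_by_region_agg")` to `indexes` is the model's equivalent of creating the index in the second cluster as well.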

Temporal filters (time windows)

A temporal filter is a condition applied in a WHERE or HAVING clause that leverages the mz_now() function. This function provides the current virtual timestamp in Materialize, which updates in sync with real-time data processing. Using a temporal filter, you can narrow down the dataset to focus on recent records, thereby conserving memory and emphasizing recent data.

An example of a temporal filter might involve selecting records with timestamps from the last five minutes.

WHERE mz_now() <= event_ts + INTERVAL '5min'
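
To see what this predicate keeps, the following Python sketch evaluates the same comparison against a fixed "current" timestamp. The function name `visible` and the sample events are assumptions made for the illustration, and a plain `datetime` stands in for `mz_now()`.

```python
from datetime import datetime, timedelta, timezone


def visible(event_ts, now, retention=timedelta(minutes=5)):
    """Mirror of: WHERE mz_now() <= event_ts + INTERVAL '5min'."""
    return now <= event_ts + retention


now = datetime(2024, 8, 25, 13, 35, tzinfo=timezone.utc)  # stand-in for mz_now()
events = {
    "recent": now - timedelta(minutes=2),
    "boundary": now - timedelta(minutes=5),
    "old": now - timedelta(minutes=6),
}
kept = [name for name, ts in events.items() if visible(ts, now)]
print(kept)
```

As `now` moves forward, a record falls out of the result without anyone deleting it, which is how the filter keeps the working set small.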

Periodically emit results

Imagine we need to calculate the number of records in a 1-minute window, grouped by some column. As an additional requirement, we don't want an update every time a record arrives; instead, we want to emit the result once per window (a trigger). We can use date functions such as date_bin for that use case. A great example can be found in the official product documentation.
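
For readers who have not used date_bin before, here is a small Python approximation of what it computes: the timestamp is floored to the start of its bucket, counted from a fixed origin. This is a sketch of the semantics only, written for timestamps at or after the origin; the sample timestamp is made up.

```python
from datetime import datetime, timedelta, timezone

ORIGIN = datetime(2000, 1, 1, tzinfo=timezone.utc)


def date_bin(stride, ts, origin=ORIGIN):
    """Floor `ts` to the start of its `stride`-wide bucket, counted from `origin`."""
    return origin + ((ts - origin) // stride) * stride


ts = datetime(2024, 8, 25, 13, 31, 34, 14000, tzinfo=timezone.utc)
window_start = date_bin(timedelta(minutes=1), ts)
window_end = window_start + timedelta(minutes=1)
print(window_start.isoformat(), window_end.isoformat())
```

Every record that falls into the same minute gets the same `window_end`, so grouping by it yields one row per window.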

Late arriving events

In many streaming use cases, we have to deal with late data; it might be caused by network issues, massive data peaks, or users sending data with a delay. How can Materialize help you with that? We can use a temporal filter to keep the most recent hour's worth of records:

WHERE mz_now() <= event_ts + INTERVAL '1hr'

Window functions (OVER clause)

The platform offers a variety of window functions to handle complex business rules, such as LAG, LEAD, ROW_NUMBER, FIRST_VALUE, and many more. At the time of writing this article, Materialize recomputes the window function for the entire window partition whenever an input record is added, removed, or changed in that partition. So, imagine a materialized view where, within one second, 20 records change across 10 partitions; if the average partition holds 200 records, the result has to be recalculated over 10 × 200 = 2,000 records. It is therefore essential to write the query so that it reduces the number of records that need to be considered when refreshing the result. According to the documentation, Materialize should be able to keep up with about 1,000,000 recomputed records per second. If your requirements exceed that, consider rewriting the query to reduce the load.
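
The estimate is simple enough to capture in a few lines of Python. This is back-of-the-envelope arithmetic under the assumption that every touched partition is recomputed in full; the helper name `recomputed_records` is made up, and the budget is the rough figure quoted from the documentation.

```python
def recomputed_records(changed_partitions, avg_partition_size):
    """Records re-evaluated when every changed partition is recomputed in full."""
    return changed_partitions * avg_partition_size


# Numbers from the example above: changes land in 10 partitions within one
# second, and a partition holds 200 records on average.
per_second = recomputed_records(changed_partitions=10, avg_partition_size=200)
BUDGET_PER_SECOND = 1_000_000  # rough figure quoted from the documentation
print(per_second, per_second <= BUDGET_PER_SECOND)
```

Note that the number of changed records (20) does not appear in the formula: what matters is how many partitions they touch and how large those partitions are.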

Now that we understand Materialize better, let's create an application.

Dataset

We have data about ad clicks with the following fields:

ad_id, region_id, event_time

An example record might look like this:

{
    "ad_id": "7d9d3361-4990-4da2-b489-9c8ce1a08919",
    "region_id": "5d206029-a4f6-4137-8d85-5a515b8ac1f7",
    "event_time": 1724592694014
}
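
If such records arrive as JSON, the time field has to be converted before it can land in a timestamp column. The Python sketch below assumes that the 13-digit value is epoch milliseconds in UTC; the helper name `parse_click` is hypothetical.

```python
import json
from datetime import datetime, timedelta, timezone

raw = """
{
    "ad_id": "7d9d3361-4990-4da2-b489-9c8ce1a08919",
    "region_id": "5d206029-a4f6-4137-8d85-5a515b8ac1f7",
    "event_time": 1724592694014
}
"""

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_click(payload):
    """Decode one click and turn the epoch-millisecond field into a datetime."""
    record = json.loads(payload)
    # Assumption: event_time is milliseconds since the Unix epoch, in UTC.
    record["event_time"] = EPOCH + timedelta(milliseconds=record["event_time"])
    return record


click = parse_click(raw)
print(click["event_time"].isoformat())
```

Adding a `timedelta` to the epoch avoids the float rounding that dividing the milliseconds by 1000 could introduce.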

Our goal is to count the clicks for each ad in every 1-minute window.

In the demo, let’s assume that we already have the table with the data; the DDL looks like this:

CREATE TABLE ad_clicks_by_region (
    ad_id UUID,
    region_id UUID,
    event_time TIMESTAMP
);

Then we need to bucket our results into 1-minute windows.

CREATE VIEW bucketed_ad_clicks_by_region
    AS
        SELECT
            ad_id,
            date_bin(
                    '1 minute',
                    event_time,
                    '2000-01-01 00:00:00+00'
                )
                + INTERVAL '1 minute'
                AS window_end
        FROM ad_clicks_by_region
        WHERE mz_now() <= event_time + INTERVAL '30 days';

And now, finally, let's create a materialized view aggregated by ad_id:

CREATE MATERIALIZED VIEW ad_clicks_by_region_agg
    AS
        SELECT
            ad_id,
            count(ad_id) AS count,
            window_end
        FROM bucketed_ad_clicks_by_region
        WHERE
            mz_now() >= window_end
                AND
            mz_now() < window_end + INTERVAL '7 days'
        GROUP BY window_end, ad_id;
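
The effect of the two mz_now() conditions is easier to see in a small simulation. The Python sketch below takes rows shaped like the output of the bucketed view, (ad_id, window_end), and counts only the windows that have already ended and have not yet expired. The function name and the sample rows are made up, and a plain `datetime` stands in for `mz_now()`.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def closed_window_counts(rows, now, retention=timedelta(days=7)):
    """Count clicks per (window_end, ad_id), keeping only windows that have ended."""
    counts = Counter()
    for ad_id, window_end in rows:
        # Same predicate as the WHERE clause: window finished, not yet expired.
        if window_end <= now < window_end + retention:
            counts[(window_end, ad_id)] += 1
    return dict(counts)


w1 = datetime(2024, 8, 25, 13, 32, tzinfo=timezone.utc)
w2 = w1 + timedelta(minutes=1)
rows = [("ad-a", w1), ("ad-a", w1), ("ad-b", w1), ("ad-a", w2)]  # made-up view rows

now = w1 + timedelta(seconds=10)  # first window closed, second still open
result = closed_window_counts(rows, now)
print(result)
```

The click in the still-open window `w2` is held back until `now` reaches `w2`, which gives the "emit once per window" behaviour.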

We can subscribe to the result and see the changes using local Python code.

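import psycopg2

# Placeholders: replace with the credentials of your Materialize region.
user = "<user>"
password = "<password>"
host = "<host>"

dsn = f"user={user} password={password} host={host} port=6875 dbname=materialize sslmode=require"
conn = psycopg2.connect(dsn)

with conn.cursor() as cur:
    # SUBSCRIBE streams every change to the view; the cursor lets us poll it.
    cur.execute("DECLARE c CURSOR FOR SUBSCRIBE ad_clicks_by_region_agg")
    while True:
        cur.execute("FETCH ALL c")
        for row in cur:
            print(row)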
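
The rows printed by such a loop are changes rather than a final table. Assuming the row layout described in the Materialize SUBSCRIBE documentation (mz_timestamp, mz_diff, then the view's columns), a consumer can fold them into the current state as in the sketch below. The helper name `apply_updates` and the sample batch are made up for the illustration.

```python
from collections import Counter


def apply_updates(state, rows):
    """Fold rows of the form (mz_timestamp, mz_diff, *columns) into state."""
    for _ts, diff, *columns in rows:
        key = tuple(columns)
        state[key] += diff  # a positive diff adds copies of a row, a negative one retracts them
        if state[key] == 0:
            del state[key]
    return state


# Made-up batch: the count for one ad and window changes from 1 to 2.
batch = [
    (1724592720000, 1, "ad-a", 1, "2024-08-25 13:32:00"),
    (1724592721000, -1, "ad-a", 1, "2024-08-25 13:32:00"),
    (1724592721000, 1, "ad-a", 2, "2024-08-25 13:32:00"),
]
state = apply_updates(Counter(), batch)
print(dict(state))
```

An update therefore shows up as a retraction of the old row followed by an insertion of the new one, and only the new row survives in `state`.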

Let’s insert some data:

INSERT INTO ad_clicks_by_region VALUES ('b8d75e37-3408-4f2f-a003-5f8d19d7066b', 'fea33889-7dda-4019-afd5-5834157aa054', now());

INSERT INTO ad_clicks_by_region VALUES ('b8d75e37-3408-4f2f-a003-5f8d19d7066b', 'fea33889-7dda-4019-afd5-5834157aa054', now());

INSERT INTO ad_clicks_by_region VALUES ('4cc0a135-7566-4087-955f-0a32b0786728', '6d3a6842-dffa-4b0f-8958-ec72c6d8903b', now());

When the window ends, the result shows up in the subscription output.

Let’s run the same query multiple times.

Summary

Streaming databases are designed to handle continuous streams of data in real time. Unlike traditional databases, which store and process data in batches, streaming databases process data as it arrives, enabling immediate insights and actions based on the latest information. They use SQL as the language of communication, which helps deliver business logic faster, and ready-to-use solutions such as Materialize are already available on the market.

In conclusion, streaming databases are essential for applications requiring real-time data processing and insights. They give businesses the ability to act on the most current information available, driving more responsive, data-driven decision-making. They also come with challenges related to complexity, consistency, and scalability. Still, compared to stream processing engines like Apache Flink, streaming databases like Materialize and RisingWave are much easier to get started with, although that comes at the cost of flexibility when building a solution for the specific problem at hand. It is also worth keeping in mind that analytical databases like BigQuery and Snowflake have recently started initiatives around streaming tables, which can be a more convenient option for users already using those products.

If you need help defining the best solution for your organization or implementing it, sign up for a free consultation with one of our experts.

27 August 2024
