The data space has been changing rapidly in recent years, and data streaming plays a vital role in it. In this blog post, we will explore the concepts and definitions behind streaming databases, compare them with familiar technologies, and finally focus on one implementation of streaming databases: Materialize. The second part of the article will dive into more complex use cases and another popular implementation of streaming databases: RisingWave.
Our world changes rapidly, at a pace that is hard to comprehend. Businesses need to make data-driven decisions to stay competitive. Batch processing can help with that, but there are many areas where batch processing is not enough: it has high latency and requires quite heavy resources to crunch massive amounts of data, which is not the case with streaming applications. Streaming systems can help make faster decisions and provide near real-time information with fewer resources. But this does not come without a cost.
One of the issues with stream processing is that it is much harder to use than batch processing. We need to take into account data ordering, late data, data duplication, proper scalability during data peaks, and much more. As a result, many teams have opted to keep using databases to transform data and run their processing in batches, at the expense of not meeting performance requirements.
As a middle-ground solution, we can consider streaming databases, which expose SQL and are easier to use than stream processing engines, yet give you real-time results compared to batch processing.
To understand what streaming databases are and where they came from, we need to go over a few major topics first.
Streaming platforms emerged as a response to the need to deliver real-time analytics in a fast-paced world; here we should mention Apache Kafka Streams, Apache Flink, Apache Spark and Apache Beam (there are plenty of them; this article is too short to name them all). The engines mentioned above separate compute and storage, but issues arise when you need to expose your data to the outside world, for example for analytics purposes. In these cases, you may need to manage connectors to other systems yourself, unless you are fortunate enough to use an already implemented, fully functional connector.
As a serving layer, we can consider real-time OLAP systems like Apache Pinot or ClickHouse. Recently, Apache Paimon also popped up, which might be an alternative to the systems mentioned above.
The idea of streaming databases came from materialized views, which represent a precomputed portion of data. Compared to regular views, the result is cached once computed; unfortunately, it needs to be refreshed from the outside, which takes time. This becomes slow and hard to scale for vast amounts of data, but it can be handy for smaller datasets to simulate a real-time result.
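As a quick illustration, this is what the classic pattern looks like in PostgreSQL (a minimal sketch, assuming a hypothetical ad_clicks table): the view is computed once, and to get fresh data you have to trigger the refresh yourself.

CREATE MATERIALIZED VIEW clicks_per_ad AS
SELECT ad_id, count(*) AS clicks
FROM ad_clicks
GROUP BY ad_id;

-- the result is a snapshot; refreshing it recomputes the whole view
REFRESH MATERIALIZED VIEW clicks_per_ad;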
Streaming SQL is a relatively new concept. Apache Spark and Apache Flink started as non-SQL data processing engines; the need for streaming SQL came later.
Streaming SQL originates from the same familiar language, Structured Query Language (SQL), but extends it to handle streams of rapidly changing data. SQL simplifies processing, allowing you to focus on what's essential for delivering business value. However, it can also become a bottleneck, as it may limit your flexibility when dealing with unique problems that differ from the industry's typical use cases.
The key difference between SQL and Streaming SQL is that a regular SQL query gives you a point-in-time answer, while a Streaming SQL query is continuous.
A query run on an RDBMS needs time to be processed, and once you get the result, the data might already be out of date. The main goal of streaming SQL and streaming databases is to give quick results on fast-changing event streams while the data is still relevant.
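To make the contrast concrete, here is a minimal sketch (assuming a hypothetical ad_clicks relation): the first query is answered once and starts going stale immediately, while the second is maintained continuously by a streaming database such as Materialize.

-- point-in-time: computed when you run it, stale a moment later
SELECT region_id, count(*) AS clicks FROM ad_clicks GROUP BY region_id;

-- continuous: the result is kept up to date as new events arrive
CREATE MATERIALIZED VIEW clicks_per_region AS
SELECT region_id, count(*) AS clicks FROM ad_clicks GROUP BY region_id;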
Streaming databases are designed to process, analyze, and store real-time data. They provide SQL interfaces for data manipulation, with the core abstraction being a materialized view, which plays a vital role in the system. These databases support streaming concepts such as windowing, exactly-once semantics, and watermarks.
Now, let's compare streaming databases with the technologies we are already familiar with.
RDBMS - they share a few concepts, like materialized views and SQL as the language to talk to your data; the main difference is that streaming databases are optimized for streaming use cases, so materialized views are constantly updated.
Analytics databases like Redshift, BigQuery, and Snowflake were designed primarily for batch processing.
Real-time OLAP databases like Apache Pinot and ClickHouse focus on running analytical queries quickly, not on incremental computation over the most recent data.
Stream processing engines like Apache Spark and Apache Flink expose SQL as a way of wrangling the data, but they need external storage.
In this section, let's dive into the details of Materialize.
Materialize is a cloud-native data warehouse purpose-built for operational workloads where an analytical data warehouse would be too slow and a stream processor would be too complicated.
That is the first phrase we can find in the official GitHub repository of Materialize: it aims at use cases where solutions like BigQuery or Redshift are too slow, but Apache Flink and similar engines are too complex.
Materialize helps you perform analytics on live streaming data; it focuses on providing precise answers to business questions based on fresh data at a given time with low latency.
The fully managed cloud solution is horizontally scalable; in addition, since Materialize uses S3 as its storage layer, storage can scale almost infinitely.
What is really important to note is that Materialize supports the PostgreSQL wire protocol for reads, so you can easily connect Materialize to a dashboard or play with the tool interactively in your favorite IDE.
Materialize supports a variety of data sources out of the box.
To add a connection, the SQL code is as simple as this:
CREATE CONNECTION mysql_connection TO MYSQL (
HOST <host>,
PORT 3306,
USER 'materialize',
PASSWORD SECRET password,
SSL MODE REQUIRED
);
or, for Kafka:
CREATE CONNECTION kafka_connection TO KAFKA (
BROKERS ('b-1.hostname-1:9096'),
SASL MECHANISMS = 'SCRAM-SHA-512',
SASL USERNAME = 'foo',
SASL PASSWORD = SECRET password
);
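Once a connection exists, ingesting data is just another SQL statement. The snippet below is only a sketch: the ad-clicks topic name and the JSON format are assumptions for illustration, so check the documentation for the options your setup needs.

CREATE SOURCE ad_clicks_source
FROM KAFKA CONNECTION kafka_connection (TOPIC 'ad-clicks')
FORMAT JSON;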
Great, we have quite a rich list of sources, but what can we choose as a sink? At the time of writing this blog post, Materialize supports four data sinks.
It's understandable, as the product aims to keep the data within it, and you can relatively easily integrate it with the external world via, for example, Confluent Cloud.
Let's explore what tools we can use in Materialize.
We can use primitive types like text, integer, datetime, byte, float, and double. We can also use the PostgreSQL jsonb format, and it is worth mentioning that we have record types at our disposal, which can consist of any subtype (nested types, which are handy for analytical workloads).
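For example, with a hypothetical jsonb column named payload, the familiar PostgreSQL operators let you pull nested fields out into regular columns (a sketch, not tied to the demo below):

SELECT
payload->>'ad_id' AS ad_id,
payload->>'region_id' AS region_id,
(payload->>'event_time')::numeric AS event_time_millis
FROM raw_ad_clicks;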
As the SQL and functions section of the documentation shows, Materialize supports all (or almost all) the functions we know from PostgreSQL, which is an excellent starting point for using the product in production.
Materialized views
Maintaining a materialized view in durable storage comes with resource and latency costs that should be carefully weighed based on the view's primary purpose. It is advisable to create a materialized view if:
The results need to be accessible across multiple clusters;
Independent scaling of view maintenance and query serving is beneficial;
The view's final consumer is a sink or a SUBSCRIBE operation.
However, if the view is only needed within a single cluster, creating a regular view and building an index on it might be more efficient. The index will keep the view's results updated incrementally in memory within that cluster, thereby avoiding the resource and latency costs associated with materialization.
View + index
While it's possible to query a materialized view directly, doing so interacts with the Materialize storage layer. Although this is generally fast, it's slower than reading from memory. It's recommended to create indexes tailored to common query patterns to enhance query performance on materialized views.
It's important to note that indexes are specific to each cluster and are maintained in memory. For instance, if you create a materialized view and an index on it in the quickstart cluster, queries from a different cluster won't benefit from that index. To optimize performance, you should create the necessary indexes in every cluster where you plan to reference the materialized view.
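A minimal sketch of that pattern, using hypothetical object names: the index keeps the view's results incrementally updated in memory within the cluster it was created in.

CREATE VIEW clicks_by_region AS
SELECT region_id, count(*) AS clicks
FROM ad_clicks
GROUP BY region_id;

CREATE INDEX clicks_by_region_idx ON clicks_by_region (region_id);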
Temporal filters (time windows)
A temporal filter is a condition applied in a WHERE or HAVING clause that leverages the mz_now() function. This function provides the current virtual timestamp in Materialize, which updates in sync with real-time data processing. Using a temporal filter, you can narrow down the dataset to focus on recent records, thereby conserving memory and emphasizing recent data.
An example of a temporal filter might involve selecting records with timestamps from the last five minutes.
WHERE mz_now() <= event_ts + INTERVAL '5min'
Periodically emit results
Imagine that, as an additional requirement, we need to calculate the number of records in a 1-minute window grouped by some column. We don't need the result to update every time a record arrives; instead, we want to emit data once per window (a trigger). We can use date functions for that use case (date_bin). A great example can be found in the official product documentation.
Late arriving events
In many streaming use cases, we have to deal with late data; it might come from network issues, massive data peaks, or users sending data with a delay. How can Materialize help you with that? We can use a temporal filter to keep only the most recent hour's worth of records.
WHERE mz_now() <= event_ts + INTERVAL '1hr'
Window functions (OVER clause)
The platform offers a variety of window functions to handle complex business rules, such as LAG, LEAD, ROW_NUMBER, FIRST_VALUE and many more. At the time of writing this article, Materialize recomputes all the records for the entire window partition whenever an input record is added, removed, or changed in that partition. So, imagine a materialized view with a 1-second recalculation interval: if your data changes include 20 records spread over 10 partitions, and the average partition has 200 records, the result has to be recalculated based on 2,000 records. It's therefore essential to write the query so that it reduces the number of records that need to be considered when refreshing the result. As the documentation states, Materialize should be able to keep up with about 1,000,000 input records per second; if your requirements exceed that, consider rewriting the query.
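As an illustration, here is a sketch of such a query: it first restricts the input with a temporal filter to keep partitions small, and only then applies ROW_NUMBER to pick the three most recent clicks per region (it borrows the ad_clicks_by_region table from the demo below; the alias and limit are our choice).

SELECT region_id, ad_id, rn
FROM (
  SELECT
  region_id,
  ad_id,
  ROW_NUMBER() OVER (PARTITION BY region_id ORDER BY event_time DESC) AS rn
  FROM ad_clicks_by_region
  WHERE mz_now() <= event_time + INTERVAL '1hr'
) AS recent_clicks
WHERE rn <= 3;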
Now, after understanding Materialize better, let's create an application.
Dataset
We have data about ad clicks with the following fields:
ad_id, region_id, event_time (epoch milliseconds)
The example record might look like this:
{
"ad_id": "7d9d3361-4990-4da2-b489-9c8ce1a08919",
"region_id": "5d206029-a4f6-4137-8d85-5a515b8ac1f7",
"event_time": 1724592694014
}
Our goal is to count ad clicks per ad in 1-minute windows.
In the demo, let’s assume that we already have the table with the data; the DDL looks like this:
CREATE TABLE ad_clicks_by_region (
ad_id UUID,
region_id UUID,
event_time TIMESTAMP
);
Then we need to bucket our results into 1-minute windows.
CREATE VIEW bucketed_ad_clicks_by_region
AS
SELECT
ad_id,
date_bin(
'1 minute',
event_time,
'2000-01-01 00:00:00+00'
)
+ INTERVAL '1 minute'
AS window_end
FROM ad_clicks_by_region
WHERE mz_now() <= event_time + INTERVAL '30 days';
And now, finally, let's create a materialized view aggregated by ad_id:
CREATE MATERIALIZED VIEW ad_clicks_by_region_agg
AS
SELECT
ad_id,
count(ad_id) AS count,
window_end
FROM bucketed_ad_clicks_by_region
WHERE
mz_now() >= window_end
AND
mz_now() < window_end + INTERVAL '7 days'
GROUP BY window_end, ad_id;
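Optionally, following the earlier advice about indexes, we could also index the materialized view in the cluster we query it from, so that lookups by window are served from memory (the index name and key below are our choice):

CREATE INDEX ad_clicks_by_region_agg_idx ON ad_clicks_by_region_agg (window_end);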
We can subscribe to the result and see the changes using local Python code.
import psycopg2

# Materialize connection details (placeholders)
user, password, host = "<USER>", "<PASSWORD>", "<HOST>"

dsn = f"user={user} password={password} host={host} port=6875 dbname=materialize sslmode=require"
conn = psycopg2.connect(dsn)
with conn.cursor() as cur:
    # SUBSCRIBE streams every change to the materialized view through the cursor
    cur.execute("DECLARE c CURSOR FOR SUBSCRIBE ad_clicks_by_region_agg")
    while True:
        cur.execute("FETCH ALL c")
        for row in cur:
            print(row)
Let's insert some data:
INSERT INTO ad_clicks_by_region VALUES ('b8d75e37-3408-4f2f-a003-5f8d19d7066b', 'fea33889-7dda-4019-afd5-5834157aa054', now());
INSERT INTO ad_clicks_by_region VALUES ('b8d75e37-3408-4f2f-a003-5f8d19d7066b', 'fea33889-7dda-4019-afd5-5834157aa054', now());
INSERT INTO ad_clicks_by_region VALUES ('4cc0a135-7566-4087-955f-0a32b0786728', '6d3a6842-dffa-4b0f-8958-ec72c6d8903b', now());
When the window ends, we can see the result.
Let's run the same query multiple times and observe how the results change.
Streaming databases are designed to handle continuous streams of data in real time. Unlike traditional databases, which store and process data in batches, streaming databases process data as it arrives, enabling immediate insights and actions based on the latest information. They use SQL as the language of communication, which helps deliver business logic faster, and solutions like Materialize are already available on the market as managed services.
In conclusion, streaming databases are essential for applications requiring real-time data processing and insights. They give businesses the ability to act on the most current information available, driving more responsive and data-driven decision-making. They also come with challenges related to complexity, consistency, and scalability. However, compared to stream processing engines like Apache Flink, streaming databases like Materialize and RisingWave are much more accessible to start with, although that comes at the cost of flexibility when building a solution for the specific problem at hand. It is worth keeping in mind that analytical databases like BigQuery or Snowflake have recently started initiatives around streaming tables, which can be more convenient for users already working with those products.
If you need help defining the best solution for your organization or implementing it, sign up for a free consultation with one of our experts.