SIPping from a firehose

Vedant Puri
Klaviyo Engineering
11 min read · Dec 18, 2023


I never knew updating the favorite color for a mock profile could be this captivating. With just a few keystrokes, we watched the update traverse different components in the system and cause the profile to land in the expected segment. After months of design work, this moment marked the first end-to-end demonstration of the new segmentation system. We previously wrote about our segmentation improvement project. This post drills into our profile change data pipeline that handles roughly a billion changes per day.

First, some terminology:

  • A profile represents a human, usually a customer or a potential customer of a company using Klaviyo.
  • A trait is a property of a profile. For example, favorite color, postal code, or SMS consent status.

Profiles can be segmented into groups based on any combination of their traits (as well as event data — outside the scope of this post). Different types of traits belong to different product areas (domains) owned by different engineering teams using datastores suitable for their requirements. For example, the subscriptions team manages profile consent data, whereas the data science team manages predictive analytics (e.g. customer lifetime value). These teams maintain the source of truth of these traits.

In the previous segmentation engine, group memberships were computed by issuing queries against these multiple source of truth systems via their service interfaces (i.e., their APIs). As mentioned in the previous post, this design held up well for many years, but it was approaching a point where we had limited options to scale further. The databases underlying these other product areas were typically optimized for transactional (OLTP) workloads and not the high throughput analytics queries from the segmentation engine.

Our new segmentation engine uses a centralized database optimized for analytical workloads (OLAP). We chose ClickHouse as our OLAP database as explained in the post referenced above. We needed a mechanism to keep profile trait data in sync between ClickHouse and its source of truth in various product areas. Synchronizing data sources in a distributed system is a notoriously hard problem by itself. It gets even more challenging when the change volume peaks at a million changes per minute and there are multiple data sources involved. To top it all off, this was one of the first major implementations of this technique at Klaviyo.

There were two pieces to making this work: loading the preexisting data and replicating ongoing changes. We will be focused on the latter in this post.

Given the scale we were dealing with, we decided to replicate these changes asynchronously, accepting eventual consistency between the two copies of data. That meant we needed a message bus to ship the changes from the source database to our service, where they would be transformed and persisted in our ClickHouse cluster for segment processing. The question was how to capture these changes in the source database and put them on this message bus.

A whimsical representation of the missing piece required to connect the source of truth with ClickHouse.
Our beloved project mascot, the SIP cat, eyeing the change data pipeline

We considered four approaches:

  • Log-based Change Data Capture — Consume database transaction logs from the source.
  • Event Carried State Transfer — Consume change events with the latest state of the record at the application level.
  • Event Notification — Similar to previous, but instead of the latest state, consume only identifiers for the changed records, and then pull the relevant change information using these identifiers.
  • Event Sourcing — Generate and consume a log of every change to every record at the application level and build the latest state using these changes.

We chose Event Notification. I’ll first give an overview of how our system works with this. Then I’ll go back and explain why we decided against the other approaches.

Profile trait data is constantly updated in various product areas. Whenever a record changes, a change notification containing the profile identifier is published to a topic designated for that trait using a simple API.

A consumer specific to the trait fetches the latest version of this record from the source of truth by calling the corresponding service interface, and extracts the necessary fields. In the same process, the fetched data is transformed into the payload expected by the segmentation service. The read time is also recorded in the payload as the timestamp of the record’s current state. Finally, the transformed payload is funneled into a unified traits topic.

A consumer in the segmentation service consumes the mixed trait change payloads in batches and inserts them into a ClickHouse table. The ClickHouse table schema and engine are configured to keep, for each profile trait, only the record with the latest read timestamp, no matter what order the records are inserted in. Here is a visual representation:

A flow diagram visualizing the journey of a profile update from the domain boundary to the Segmentation ClickHouse
The change data pipeline architecture
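To make the flow concrete, here is a minimal Python sketch of the two application-level steps. It is an illustration, not our production code: the in-memory `topics` dict stands in for the message bus, and names such as `notify_change`, `run_trait_consumer`, `TraitPayload`, and the topic names are hypothetical. Taking the timestamp just before the read is also an assumption of this sketch.

```python
import time
from dataclasses import dataclass

# Hypothetical stand-in for the message bus: topic name -> list of messages.
topics = {}
UNIFIED_TOPIC = "traits.unified"  # assumed name for the unified traits topic


def publish(topic, message):
    topics.setdefault(topic, []).append(message)


@dataclass(frozen=True)
class TraitPayload:
    profile_id: str
    trait: str
    value: object
    read_at: float  # when the source of truth was read


def notify_change(trait, profile_id):
    """Product area side: publish only the identifier of the changed profile."""
    publish(f"changes.{trait}", profile_id)


def run_trait_consumer(trait, fetch_latest, clock=time.time):
    """Per-trait consumer: read the latest state, stamp it, forward it."""
    for profile_id in topics.pop(f"changes.{trait}", []):
        # Stamp before reading (an assumption of this sketch) so the payload
        # is never labeled newer than the state it carries.
        read_at = clock()
        value = fetch_latest(profile_id)  # stands in for the service interface call
        publish(UNIFIED_TOPIC, TraitPayload(profile_id, trait, value, read_at))


# Usage: a dict plays the role of the owning team's datastore.
favorite_colors = {"p1": "teal"}
notify_change("favorite_color", "p1")
run_trait_consumer("favorite_color", favorite_colors.get, clock=lambda: 100.0)
```

Note that the notification carries no trait data at all; everything the segmentation service receives comes from the read performed by the consumer.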

A separate process observes the recently made changes and re-evaluates memberships for the changed profiles for relevant segments. This separate process is how real-time segmentation works.

Before we get into why we chose Event Notification, let’s talk about how we store traits in our centralized database, since that decision came first and drove us to our design.

Storing Profile Data in ClickHouse

Due to the critical nature of ClickHouse in this project, we decided to nail down the schema and query design first and then implement the rest of the system around these decisions.

Unlike with event data (e.g. a profile looked at a product details page or made a purchase), for traits, segmentation cares only about the current state. This detail was interesting for data ingestion. In a traditional OLTP database, we would simply update the existing record when new data comes in. But updates are not supposed to be a frequent operation in ClickHouse. In fact, they are performed using an ALTER TABLE operation to discourage frequent usage (docs).

We had a few ideas for how to construct the latest state of a profile trait:

  • Ingest change deltas to profile traits, look up the current state of rows in ClickHouse, apply the change and write the latest version back. Beyond the need for some locking mechanism for concurrency control, this would be an anti-pattern in ClickHouse due to the high volume of point reads involved (docs).
  • Similar to above, but instead of resolving the final state in memory, we could persist the change delta that came in and build the state asynchronously or at query time. This is essentially how the event sourcing pattern works. Since there was no requirement for point-in-time queries, this seemed unnecessarily complex for our use case, and also inefficient from a processing / query point of view.
  • Finally, we could design ingestion to always transmit the final state of the row and insert (append) it directly into the table. At query time, we would look at the state as of the last insert. This was by far the simplest and cleanest approach and is what we went with.

Based on this decision, we chose the ReplicatedReplacingMergeTree engine, which removes records with older timestamps at some point in the future, during background merges. A consequence of this is idempotence: if a record with a matching key is inserted with the same timestamp, it has no meaningful effect. Similarly, if a record with an older timestamp is inserted after a record with a newer timestamp, it has no meaningful effect.

This means, as long as we have correct timestamps associated with the traits as they come in, we can relax any ordering constraints in the ingestion pipeline. Not only was this convenient when loading pre-existing data while simultaneously ingesting live data, it has also proven its worth during incidents, planned maintenance windows, or any failure that requires data replay. Having worked with OLTP databases with no such in-built mechanisms, this is one of my favorite behavior patterns of this system.
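The order-independence property is easy to check with a toy model. The sketch below is plain Python, not ClickHouse: `merge_latest` is a hypothetical function that loosely mimics the state a replacing-style table converges to, assuming the read timestamp acts as the version and that rows tying on the timestamp are identical replays.

```python
from itertools import permutations


def merge_latest(rows):
    """Collapse (profile_id, trait, value, read_at) rows to the newest per key."""
    latest = {}
    for profile_id, trait, value, read_at in rows:
        key = (profile_id, trait)
        # Keep the row with the highest read timestamp for each key.
        if key not in latest or read_at >= latest[key][1]:
            latest[key] = (value, read_at)
    return latest


rows = [
    ("p1", "favorite_color", "red", 10),
    ("p1", "favorite_color", "teal", 20),
    ("p1", "favorite_color", "teal", 20),  # replayed duplicate
    ("p2", "postal_code", "02111", 15),
]

expected = merge_latest(rows)
# Every insertion order, replays included, converges to the same final state.
order_independent = all(merge_latest(p) == expected for p in permutations(rows))
```

This is why replaying a window of messages after an incident is safe: the replayed rows either match what is already there or lose to newer rows.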

Exploring log-based change data capture solutions

One idea we explored was getting trait changes from the database level in the various product areas. For example, there are tools like Debezium that sit behind a database and leverage change logs from the database to produce a change data stream. This is essentially what our goal was here and using an off-the-shelf solution was tempting, but we chose to avoid using it for a few reasons.

  • We didn’t want the operational overhead of a new technology, especially since we were already introducing a few other technologies that were new to the team and more critical to the project.
  • We didn’t want to risk impairing the ability of product areas to reliably and efficiently process incoming requests by placing additional load or constraints on their databases.
  • We worried about blocking teams from confidently running common operations such as database migrations. Migration tools like pt-online-schema-change (PTOSC) work by placing triggers on the tables, which conflicts with how these change capture tools observe data changes. This conflict can result in failures to capture changes.
  • We worried that plugging a tool behind the databases of other product areas would add friction to changes such as sharding, running custom maintenance, swapping the DB technology, or even simply renaming databases and tables. Plus, this would be yet another component to think about when planning version upgrades and handling routine or urgent failovers.

To move the project along efficiently and avoid creating a burden on other teams, we planned for our team to implement the sync for all profile traits and then transfer ownership of the code to the respective teams. In addition to the concerns above, implementing log-based CDC would require bespoke configuration per database technology and throughput requirements. This would have been cumbersome to implement for several existing trait types and also not a sustainable pattern for the future.

We concluded that managing these concerns and behavior was much easier (or irrelevant) if we worked at the application level rather than the database level. So, we moved to exploring application-level, decoupled interfaces for publishing change notifications.

Choosing Event Notification over Event Carried State Transfer

This section discusses the final two patterns we considered: Event Notification (the one we picked) and Event Carried State Transfer.

The Event Notification pattern works by publishing an event containing only an identifier of the changed record. Then, if needed, the consumer can call back to the publisher system to fetch details about the change. In our context this means publishing an identifier of the profile for which trait data was modified.

The Event Carried State Transfer (ECST) pattern works by publishing an event that contains the latest state of data after the change. In our context this would mean publishing the state of profile trait data after the change was made.

With ECST, there were two options for the event payload schema. Either design the payload to be specific for segmentation so that it could be directly consumed by our service, or create a generic schema capable of supporting multiple use cases. The latter would need an additional transformation step before being consumed by our service. Let’s talk about both possibilities.

Implementing a subscriber-specific schema would result in tight coupling. This would also lock us in to whatever information was available within what was published and make it less natural to join data from other product areas, at least not without an extra transform step (which questions the need for this tight coupling from the outset). This wasn’t just a theoretical concern because changes in some traits do need to be joined with other profile fields to provide the segmentation engine with the complete picture.

Sending events with a broader, multi-purpose schema, although possibly better in the grand scheme of things, wasn’t an ideal choice either. Besides the lack of other use cases that would benefit from a broader schema (YAGNI) and the time it would have taken to agree on a generic enough payload, we would have ended up ignoring most of the fields on this broader schema. Plus, generating and sending the larger payload would increase compute and latency.

The event notification pattern was appealing. It could serve as a loosely coupled interface that gave us control over what data to pull and how to transform it for our needs. Moreover, since it was similar to how legacy segmentation sent notifications for real-time updates, we were confident that we could iterate on this pattern quickly.

Another consideration was transactions. With ECST, extremely careful code would need to be written in each product area to ensure that emitted change events matched committed transactions. If an event is published successfully inside a transaction that is later rolled back, consumers end up holding state that was never committed. The problem becomes simpler with Event Notification: even if a change is rolled back, the consumer always reads the latest version of the trait from the source of truth, so the notification is merely redundant. Since our ingestion logic is idempotent, duplicates are not a problem.
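A toy example shows the difference. The `TraitStore` class below is a hypothetical, heavily simplified source of truth with explicit commit and rollback; it only exists to contrast what each pattern ends up ingesting when a publish happens inside a transaction that is later rolled back.

```python
class TraitStore:
    """Toy source of truth: writes are pending until committed."""

    def __init__(self):
        self.committed = {}
        self.pending = {}

    def write(self, profile_id, value):
        self.pending[profile_id] = value

    def commit(self):
        self.committed.update(self.pending)
        self.pending.clear()

    def rollback(self):
        self.pending.clear()

    def read(self, profile_id):
        # Readers only ever see committed state.
        return self.committed.get(profile_id)


store = TraitStore()
store.write("p1", "red")
store.commit()

# A transaction changes the value, publishes, and is then rolled back.
store.write("p1", "blue")
ecst_event = {"profile_id": "p1", "value": "blue"}  # ECST: state captured pre-commit
notification = {"profile_id": "p1"}                 # Event Notification: id only
store.rollback()

# The ECST consumer trusts the payload and ingests a value that never committed.
ecst_ingested = ecst_event["value"]
# The notification consumer re-reads the source of truth and gets the real value.
notified_ingested = store.read(notification["profile_id"])
```

Here `ecst_ingested` is the rolled-back value while `notified_ingested` is the committed one; the stray notification costs one extra read and an insert that changes nothing.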

Overall, the event notification pattern offers a leaner, loosely coupled interface for different product areas to notify of profile changes. A simple payload with profile identifiers, published to a per-trait topic, is consumed by a process that can query the product area to fetch as few or as many fields as wanted, and join against whatever other data is necessary, leaving room for trait schema flexibility and evolution. As a bonus, we didn’t need to dive into the product area object specifics to implement this; we only needed to know the interfaces. But, like any design, this has blemishes too.

Recent Changes

This project was officially marked done in June this year. Since then, we have learned a few things about our design. As part of our Black Friday, Cyber Monday preparations, we made two important adjustments.

Change hints

One of our current trait types is list memberships. In the trait change pipeline this is represented as an array of list references a profile is currently a part of. Just like other traits, we transmit the latest version of list memberships to the segmentation service.

Based only on this latest state, it is impossible to tell if a profile was removed from a particular list. This meant that for a profile, the system had to re-evaluate memberships for all segments that had a list membership criterion. If a large number of changes were made to a particular list (e.g. manually importing profiles into a list) and that company had many segments with a list membership criterion, the number of segment-profile pairs to re-evaluate was essentially the size of the cross product of changed profiles and those segments. This heavily increased the work our queries had to process and often resulted in backups in real-time segmentation.

This problem was an outcome of how the notification pattern was implemented. For this specific case, we realized that changes to list memberships happened for a single list at a time. We now pass the identifier of the changed list through our pipeline as a hint, alongside the current list memberships for the profile. With this augmented information, we know which list reference was involved in this change and re-evaluate memberships for this profile only for segments that have a list membership criterion for this list. This reduces unnecessary load.
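As a rough illustration of how the hint narrows the work, consider the sketch below. The segment and list identifiers and the `segments_to_reevaluate` function are made up for this example; the real criteria matching is more involved.

```python
from typing import Optional

# Hypothetical data: segment id -> list ids referenced by its list
# membership criteria.
SEGMENT_LIST_CRITERIA = {
    "seg_vip": {"list_a"},
    "seg_newsletter": {"list_b"},
    "seg_either": {"list_a", "list_c"},
}


def segments_to_reevaluate(changed_list_id: Optional[str] = None):
    """Segments to re-evaluate for a profile whose list memberships changed."""
    if changed_list_id is None:
        # No hint: any segment with a list membership criterion may be affected.
        return sorted(SEGMENT_LIST_CRITERIA)
    # With a hint: only segments whose criteria reference the changed list.
    return sorted(
        segment
        for segment, list_ids in SEGMENT_LIST_CRITERIA.items()
        if changed_list_id in list_ids
    )


without_hint = segments_to_reevaluate()
with_hint = segments_to_reevaluate("list_b")
```

In this toy setup a change to `list_b` triggers one re-evaluation per profile instead of three. For a bulk import into a single list, that saving is multiplied by the number of imported profiles.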

Buffering inserts

Our ClickHouse cluster is sharded. In the original design, a single ingestion process consumed large batches of trait changes and fanned out inserts to different shards in memory. We assumed that at such a high volume, keeping our consumer configured to buffer messages for a few seconds would result in large batches for all shards. This was not what we observed, though. In a single insert statement, the number of records inserted per shard was notably lower than the ideal of 10K-100K records that ClickHouse recommends.

To solve this problem, we now dispatch trait change messages from the unified topic into a topic dedicated to each shard. When inserting, a consumer pulls large batches for one specific shard only. Now, our insert batch sizes are rarely below the ideal range.
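The routing idea can be sketched as follows. This is an assumption-laden toy: the shard count, the CRC32-based `shard_for` rule, and the in-memory `shard_topics` are all hypothetical stand-ins, since the actual sharding key and topic layout are not described here.

```python
import zlib
from collections import defaultdict

NUM_SHARDS = 4  # hypothetical shard count

# Hypothetical stand-in for per-shard topics: shard index -> queued payloads.
shard_topics = defaultdict(list)


def shard_for(profile_id):
    # Assumed routing rule: a stable hash of the profile identifier.
    return zlib.crc32(profile_id.encode()) % NUM_SHARDS


def dispatch(payload):
    """Dispatcher: move a message from the unified topic to its shard's topic."""
    shard_topics[shard_for(payload["profile_id"])].append(payload)


def pull_batch(shard, max_records):
    """Per-shard consumer: take up to max_records queued for this shard only."""
    queued = shard_topics[shard]
    batch, shard_topics[shard] = queued[:max_records], queued[max_records:]
    return batch


for i in range(1000):
    dispatch({"profile_id": f"p{i}", "trait": "favorite_color",
              "value": "teal", "read_at": i})

batch = pull_batch(0, max_records=100)  # every row targets shard 0
```

Because each consumer accumulates messages for a single shard, the batch size per insert is governed by how long that consumer buffers rather than by how a mixed batch happens to split across shards.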

Key Takeaways

We’ve been in production for half a year and are very positive about the design. Observations:

  • Building the system to be idempotent and free of ordering constraints has proven extremely convenient.
  • We’re happy that we didn’t bring in a new technology. Keeping the design simple and largely free of unfamiliar third-party tools has made it convenient to maintain and make changes to this system.
  • Having a standard ingestion pattern for all traits has made it easy to add new trait types and the decoupled nature of the system has simplified evolution of existing trait types. This was validated very recently for a project which required adding a new trait type and expanding the schema of two existing trait types.

The change data pipeline was one of the important building blocks of the new segmentation engine. We’re glad we landed on a practical architecture that meets our scaling requirements and is also straightforward to operate and evolve.
