Following is a case study on a fun evening project in F# for Kafka and EventStore:
Problem
The product matching and taxonomy engine at Jet.com is a large event driven distributed system consisting of nearly 400 microservices that process a half million messages per minute on a quiet Sunday. Clustered linear data stores, such as Kafka and EventStore, are used to retain and distribute many of these messages.
For auditing and diagnostic purposes, finding an individual message in these datastores could entail scanning a huge number of
events in linear time, algorithmically on the order of O(n)
. In practice, this could take days.
Although recent versions of these technologies maintain both timestamps and sequence numbers, we may want to search based on the value of monotonically increasing fields inside the event’s Json document. Any field containing a date, incremental id or value other than the message commit date and sequence number. This is often possible because of ordering guarantees among upstream microservices.
The goal is to locate the precise offset (and partition in the case of Kafka) of messages at the lower bound of the desired date time, or id. And this needs to be done quickly without drawing down and processing the vast number of messages in the data store.
Requirements
Any solution should support technical PMs who need to quickly locate such a message. There may be an urgent need to trace down the contents of an event that could have been transmitted during a large window two or three days ago. It needs to be able to accept different path specifiers to identify the search field for different kinds of documents, and be able to query arbitrary topics and streams. It’s possible that not all messages in a stream even contain the desired search field, so it must operate within an environment of partial information.
Thus, the solution should be be friendly, fast, and flexible.
Strategy
We assume that the contents of the target search field is monotonically increasing. Imagine the following sample from a stream of data:
Partition | Offset | Contents |
---|---|---|
7 |
31415 |
{ "target": 296962622 } |
7 |
31416 |
{ "target": 296963894 } |
7 |
31417 |
{ "target": 296963894 } |
7 |
31418 |
{ "target": 296967168 } |
7 |
31419 |
{ "decoy": 1024513301 } |
7 |
31420 |
{ "target": 296970554 } |
7 |
31421 |
{ "target": 296971532 } |
Note that the target
field value is trending upward, but not necessarily one-to-one with the offset value. It may
be missing [31419]
, and it may appear several times [31416;31417]
.
Binary Search Algorithm
Fortunately, this problem looks like a slight variation of the childhood number guessing game.
In that game, you ask a child to guess what number you’re thinking of between 1
and 100
. When they say “4”, you respond,
“Nope, it’s bigger than 4!” An astute child will then guess some number between 5
and 100
to earn a cookie.
In introductory computer science, the binary search algorithm solves that game in logarithmic
time, or O(log n)
.
If you’re looking for a message with "target": 296971532
, you might start by probing offset 31418
. Seeing that 296967168
is too low, you might
then look at offset 31420
and then 31421
to find the lower bound of 296971532
.
Partial Information
However, if you probe partition 7
, offset 31419
above for the target
field, you will be unable to extract the target
value.
Thankfully, most clients for Kafka and EventStore are designed to efficiently stream subsequent events.
In this case, the algorithm needs to be modified to also consider target
values later in the stream including offsets 31420
and above.
It should also be able to handle situations where the tail of the event stream entirely excludes this target
field.
Optimization
To expand on this, consider also the rate of elimination and the time it takes to probe an offset:
For your first guess, you may be able to eliminate half of the entire data store, say a billion events. On your second guess, you may even eliminate an additional quarter: five hundred million. If it takes a second to complete each of those probes, then your information gain definitely exceeds your cost.
Within the next 20 probes, you could potentially narrow the range of possibilities to 2000 events at a cost of, say, 20 seconds.
To precisely locate the lower bound could cost another 11 seconds. However, depending on the batch size of your driver, you may already be streaming 500 events at a time. Simply iterating all of the remaining 2000 events could be completed in far less time.
As a result, you want an algorithm with both seek and scanning capability, transitioning from seek to scanning at some point in the search. In practice, this threshold is around 3-5000 messages depending on the size of the cluster and the availability of the data.
Implementation
Since Jet.com uses F# as its predominant language, there are a wide variety of options available to query both EventStore and Kafka, as well as to operate on Json formatted messages. Designing sophisticated user interfaces is also quite easy with F# and Xaml.
JsonPath
Querying a Json document using JsonPath makes it easy for an end user to specify different target
fields depending on their needs. I’ve
written extensively about this, and designed a quick JsonPath library for use from F#.
Streaming
A selection of clients exist for streaming data:
- EventStore’s official .NET Client
- Kafka support from a number of clients, including:
- Kafunk for F#, under development.
- RdKafka wrappers:
- by Andreas Heider is stable, but no longer maintained.
- by Confluent replaces it, but is pre-release.
- Microsoft C# Client is an older client in the same family as Jet’s original Marvel client.
- kafka-net a client by James Roland.
In this application, I used the Nata.IO library, which provides a common abstraction over stable versions of the above clients. It includes a consistent mechanism to query the range of available offsets, as well as to stream from arbitrary positions in the target event log.
User Interface
To build a modern WPF user interface in F#, I also used the FsXaml library. Although the documentation is sparse, it is a powerful tool with nearly the same flexibility and capability as the well-known C# tooling in Visual Studio. In fact, it supports the same interactive visual designer for Xaml.
A video screencast of the final interface:
Feature Overview
Many of these features work together to solve the problem outlined above. These include:
- Selecting either Kafka or Eventstore with custom host name, and topic or stream.
- Targetting a
Timestamp
,Text
orNumeric
value from a field that increases over time:- Custom JsonPath queries such as
$.envelope.timestamp
to extract this field. - Custom transition threshold from
Seek
toScan
.
- Custom JsonPath queries such as
- Full controls to Start, Stop and Resume multiple searches simultaneously.
- Visibility into the complete search state and progress.
- Copy/Paste support into Excel for subsequent analysis.
Source Code
-
Algorithm
The search algorithm is initialized with connection information to query the
indexOf
bounds within the topic or stream, andreadFrom
arbitrary positions within those bounds. When asked to execute it performs the search and emits periodic updates to the search state. Extraction of thetarget
is performed using a codec that transforms a raw byte stream into a stream of the desired Json field.This algorithm started its life as a simple F#
.fsx
script. -
User Interface
Originally created using C#, the FsXaml UI retains all of the original design except for some unneeded code-behind (e.g.
*.xaml.cs
).Models use dependency properties to expose search state and query connection settings for binding. Views are expressed using the exact same XAML as the original C# UI, including a composition of user controls for entering connection settings and displaying the grid of search data. Additional view-models provide the binding and commands for the application to manage the searches or connect to data stores.
The primary difference in F# is the use of type providers to generate type information from the XAML to use in subsequent F# code.
Once assembled, the application is wired up and compiled as an F# Windows Application.
-
Life-cycle
To better manage the end-user experience of the application over time, I also created a WiX Toolset setup file to create installers.
This is paired with a user control to query the latest version and provide an unobtrusive notification in the event of a new release.
Summary
The resulting application demonstrates how a simple and clean F# stream-processing algorithm can be exposed with a complete and modern user inteface.
It pulls together technology for EventStore and Kafka, query capability with JsonPath, view layout with FsXaml
type providers, and a host of other
elements designed to make this kind of application easy.
It performs extremely well. In scenarios such as the screencast above, it can locate the lowerbound of a timestamp among four million messages in less than three seconds. Moreover, the logarithmic nature of the algorithm lets us scale to many many times that amount of data with a few extra seek operations. For example, a billion events requires only 8 extra seeks (typically on the order of a few milliseconds each).
Resources
To play with the source code or download a copy, visit me on GitHub:
https://github.com/j-alexander/binary-log-search
Note that while I’ve used this tool at Jet, the company makes no warranty with respect to its use in your own environment. It’s open source. Review and edit to meet your own unique needs! And have fun!
If you’d like to learn more about technology careers at Jet.com, visit https://tech.jet.com/. ☺