Streaming Postgres to Kafka and Maintaining Consistency and Transaction Isolation

I've been thinking more and more about the pattern of streaming database transactions into Kafka (e.g. Debezium or bottledwater). This could theoretically enable decoupled, low-latency, real-time applications built on the data produced by, say, a Postgres database.

But the more I think about it, the more it seems this architecture requires any downstream consumer to shoulder the effort of avoiding race conditions and data inconsistencies caused by processing only part of a database transaction.

For instance, Debezium (and bottledwater) creates a separate Kafka topic for each Postgres table. So when a committed transaction affects 5 tables, the 5 corresponding Kafka topics will each get at least 1 event appended to them (the exact number depends on how many records in each table were inserted, updated, or removed).
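
To make the fan-out concrete, here is a rough, abridged sketch of two change events from the same committed transaction landing on two different per-table topics. The shape loosely follows Debezium's change-event envelope (op, before, after, and a source block that carries the Postgres transaction ID as txId); the topic names, keys, and values are hypothetical.

// on topic dbserver1.public.orders (abridged, hypothetical)
{
  op: 'c',                                   // insert
  before: null,
  after: { id: 'id_0001', status: 'new' },
  source: { schema: 'public', table: 'orders', txId: 9001 }
}

// on topic dbserver1.public.order_items (same transaction)
{
  op: 'u',                                   // update
  before: { id: 'id_9994', qty: 1 },
  after:  { id: 'id_9994', qty: 2 },
  source: { schema: 'public', table: 'order_items', txId: 9001 }
}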

Any stream processor consuming these Postgres-driven Kafka topics that wants to maintain the same data consistency and transaction isolation as the source database will need to be very careful not to produce data derived from only part of the original database transaction.

This is particularly true if you're doing a "live join" between tables: your stream processor will need to be smart enough to wait for ALL of the Kafka topics it is "joining" to produce an event tagged with the most recent transaction ID before it processes the data. If it naively processed the data the moment the first event arrived on one of the joined topics, it might produce incorrect results for downstream consumers, because it would essentially be joining data from a partially applied Postgres transaction. Instead, when that first topic produces an event, the stream processor must wait for all of the other topics involved in the "join" to produce their values before joining the data.

The problem is, without some separate bookkeeping, there's no way to know whether you've received all of the events for a given transaction ID. Let's say you're joining topics a, b, and c, which correspond to source Postgres tables, and that you always want to produce values based on the latest values of a, b, and c. Since a Postgres transaction might include zero or more row changes for each table, how would your stream processor know whether it has received ALL of the corresponding events for each of the joined tables? And how would it handle transactions where one or more of the tables was unchanged (and its corresponding topic produced 0 events)?

I believe the solution to this problem is a separate Kafka topic that contains information about which tables were changed in a given transaction. The key of each event on this topic would be the transaction ID, and the value payload could look something like:

[
  { topic: 'a', keys: ['id_0001','id_0002','id_0003'] },
  { topic: 'b', keys: ['id_9994'] },
]

This payload describes which topics are part of the transaction and which keys are impacted (the keys correspond to primary keys in the underlying tables). It should be just enough information for a stream processor to know when all of the topics involved in a transaction have reported in.
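
To sketch how a stream processor might use this meta topic, here is some hypothetical TypeScript (not tied to any particular Kafka client; all names are illustrative) that buffers change events by transaction ID and only hands a transaction off for joining once every topic/key pair listed in its manifest has arrived:

type TxManifest = { topic: string; keys: string[] }[];
type ChangeEvent = { topic: string; key: string; txId: string; value: unknown };

class TransactionAssembler {
  private manifests = new Map<string, TxManifest>();
  private buffered = new Map<string, ChangeEvent[]>();

  constructor(private onComplete: (txId: string, events: ChangeEvent[]) => void) {}

  // Fed by the consumer of the meta transactions topic.
  onManifest(txId: string, manifest: TxManifest): void {
    this.manifests.set(txId, manifest);
    this.maybeRelease(txId);
  }

  // Fed by the consumers of the per-table topics (a, b, c, ...).
  onChangeEvent(event: ChangeEvent): void {
    const events = this.buffered.get(event.txId) ?? [];
    events.push(event);
    this.buffered.set(event.txId, events);
    this.maybeRelease(event.txId);
  }

  // Release a transaction only when every topic/key pair named in its
  // manifest has been observed; until then, keep buffering.
  private maybeRelease(txId: string): void {
    const manifest = this.manifests.get(txId);
    if (!manifest) return; // manifest hasn't arrived yet
    const events = this.buffered.get(txId) ?? [];
    const seen = new Set(events.map((e) => `${e.topic}/${e.key}`));
    const complete = manifest.every((entry) =>
      entry.keys.every((key) => seen.has(`${entry.topic}/${key}`))
    );
    if (complete) {
      this.manifests.delete(txId);
      this.buffered.delete(txId);
      this.onComplete(txId, events);
    }
  }
}

// Usage: perform the "live join" only once a transaction is complete.
const assembler = new TransactionAssembler((txId, events) => {
  // all events for txId have arrived; safe to join and emit downstream
});

A real implementation would also need to evict buffers for transactions it never expects to complete (e.g. after some retention window), or it will leak memory.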

Alternatively, the payload could use Kafka topic offset numbers instead of keys, but I'm not sure whether this would introduce performance issues, since the meta event couldn't be appended to the log until all of the other appends had reported back with their offset numbers.
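
For comparison, an offset-based payload for the same hypothetical transaction might look like this (and, as noted, it can only be written once every per-table append has been acknowledged and its offset is known):

[
  { topic: 'a', partition: 0, offsets: [1042, 1043, 1044] },
  { topic: 'b', partition: 0, offsets: [587] },
]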

Availability Issues When Joining Multiple Topics

One downside to the approach of mapping each Postgres table to a separate topic is that if one of the topics in your "live join" goes down, it blocks the entire stream processor: if you're joining a, b, and c and c goes down, you'll stop producing values for downstream consumers in the name of consistency, and you'll also start accumulating large buffers for the still-functioning a and b topics.

You could entertain the idea of overriding the default one-topic-per-table mapping and instead combining all table changes into a single Kafka topic, but then you'd wind up with a topic containing a heterogeneous collection of events, and you'd miss out on all of the benefits of using a Schema Registry to safely evolve event schemas over time.

If we stick with multiple topics and we wanted to give up a bit of consistency for high availability, we could loosen the constraint that we only operate on complete transactions. For instance, we could simply ignore the constraint entirely and treat each event consumed from a joined topic as an opportunity to produce a new value for downstream consumers. This naive approach would likely benefit from a small timed buffer/debounce, 1) so that we don't overwhelm downstream consumers (or the stream processor's own resources) with a bursty stream, and 2) so that the processor has a chance to collect all of the events belonging to a transaction from all of the topics.
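
As a rough illustration of that timed buffer, here is a hypothetical, client-agnostic TypeScript sketch that collects incoming events and flushes them downstream after a short quiet period, with a cap on how long a flush can be delayed:

class DebouncedBuffer<E> {
  private pending: E[] = [];
  private quietTimer?: ReturnType<typeof setTimeout>;
  private maxTimer?: ReturnType<typeof setTimeout>;

  constructor(
    private flush: (events: E[]) => void,
    private quietMs = 50,     // flush after 50ms with no new events...
    private maxWaitMs = 500   // ...but never hold events longer than 500ms
  ) {}

  add(event: E): void {
    this.pending.push(event);
    if (this.quietTimer) clearTimeout(this.quietTimer);
    this.quietTimer = setTimeout(() => this.doFlush(), this.quietMs);
    if (!this.maxTimer) {
      this.maxTimer = setTimeout(() => this.doFlush(), this.maxWaitMs);
    }
  }

  private doFlush(): void {
    if (this.quietTimer) clearTimeout(this.quietTimer);
    if (this.maxTimer) clearTimeout(this.maxTimer);
    this.quietTimer = undefined;
    this.maxTimer = undefined;
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.flush(batch);   // hand the (possibly multi-transaction) batch downstream
  }
}

The quiet period trades a little latency for the chance that all of a transaction's events arrive before the flush; the max wait keeps a steady stream of events from postponing flushes indefinitely.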

We could also use the approach described earlier in this post (a separate meta transactions topic describing which topics/events are part of a transaction), but if for whatever reason one of the joined topics takes too long to produce data, we stop waiting for it and perform the join on possibly stale data.
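
A minimal sketch of that fallback, assuming the processor can expose a promise that resolves once a transaction's manifest has been fully satisfied (both function parameters below are hypothetical):

// Wait for the transaction to complete, but give up after `deadlineMs` and
// proceed with whatever has been buffered so far (trading consistency for
// availability).
async function joinWithDeadline<E>(
  completeTx: Promise<E[]>,        // resolves with all events once the manifest is satisfied
  bufferedSoFar: () => E[],        // whatever events have arrived so far
  deadlineMs: number
): Promise<E[]> {
  const timeout = new Promise<'timeout'>((resolve) =>
    setTimeout(() => resolve('timeout'), deadlineMs)
  );
  const result = await Promise.race([completeTx, timeout]);
  return result === 'timeout' ? bufferedSoFar() : result;
}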

Thanks to Debezium maintainer rhauch for cluing me in to many of these issues. If you're interested in discussing them further, you should join the Confluent Community Slack (and the #connect channel in particular).