{"id":9785,"date":"2024-04-15T11:13:17","date_gmt":"2024-04-15T19:13:17","guid":{"rendered":"https:\/\/live-cometml.pantheonsite.io\/?p=9785"},"modified":"2025-04-29T12:33:03","modified_gmt":"2025-04-29T12:33:03","slug":"llm-twin-3-change-data-capture","status":"publish","type":"post","link":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/","title":{"rendered":"I Replaced 1000 Lines of Polling Code with 50 Lines of CDC Magic"},"content":{"rendered":"\n<p><em>Welcome to Lesson 3 of 12 in our free course series, LLM Twin: Building Your Production-Ready AI Replica. You\u2019ll learn how to use LLMs, vector DVs, and LLMOps best practices to design, train, and deploy a production ready \u201cLLM twin\u201d of yourself. This AI character will write like you, incorporating your style, personality, and voice into an LLM. For a full overview of course objectives and prerequisites, start with&nbsp;<a href=\"https:\/\/www.comet.com\/site\/blog\/an-end-to-end-framework-for-production-ready-llm-systems-by-building-your-llm-twin\/\">Lesson 1.&nbsp;<\/a><\/em><\/p>\n\n\n\n<p><strong>Lessons<\/strong><\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/an-end-to-end-framework-for-production-ready-llm-systems-by-building-your-llm-twin\/\">An End-to-End Framework for Production-Ready LLM Systems by Building Your LLM Twin<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/the-importance-of-data-pipelines-in-the-era-of-generative-ai\/\">Your Content is Gold: I Turned 3 Years of Blog Posts into an LLM Training<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/\">I Replaced 1000 Lines of Polling Code with 50 Lines of CDC Magic<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/\">SOTA Python Streaming Pipelines for Fine-tuning LLMs and RAG \u2014 in Real-Time!<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/advanced-rag-algorithms-optimize-retrieval\/\">The 4 Advanced RAG Algorithms You Must Know to Implement<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-fine-tuning-dataset\/\">Turning Raw Data Into Fine-Tuning Datasets<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/mistral-llm-fine-tuning\/\">8B Parameters, 1 GPU, No Problems: The Ultimate LLM Fine-tuning Pipeline<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-evaluation-best-practices\/\">The Engineer\u2019s Framework for LLM &amp; RAG Evaluation<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-rag-inference-pipelines\/\">Beyond Proof of Concept: Building RAG Systems That Scale<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/rag-evaluation-framework-ragas\/\">The Ultimate Prompt Monitoring Pipeline<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/refactoring-rag-retrieval\/\">[Bonus] Build a scalable RAG ingestion pipeline using 74.3% less code<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/multi-index-rag-apps\/\">[Bonus] Build Multi-Index Advanced RAG Apps<\/a><\/li>\n<\/ol>\n\n\n\n<p id=\"b2ff\">We have changes everywhere. 
LinkedIn, Medium, GitHub, and Substack can be updated every day.<\/p>\n\n\n\n<p id=\"8fcc\">To keep our Digital Twin up to date, we need&nbsp;<strong>synchronized<\/strong>&nbsp;data.<\/p>\n\n\n\n<p id=\"2c17\">What is synchronized data?<\/p>\n\n\n\n<p id=\"5c09\"><strong>Synchronized data<\/strong>&nbsp;is data that is consistent and up-to-date across all systems and platforms it resides on or interacts with. It is the result of making sure that any change made in one dataset is immediately reflected in all other datasets that need to share that information.<\/p>\n\n\n\n<p id=\"00b0\"><strong>Change Data Capture (CDC)\u2019s primary purpose&nbsp;<\/strong>is to identify and capture changes made to database data, such as insertions, updates, and deletions.<\/p>\n\n\n\n<p id=\"5da6\">It then logs these events and sends them to a message queue, like RabbitMQ. This allows other system parts to react to the data changes in real-time by reading from the queue, ensuring that all application parts are up-to-date.<\/p>\n\n\n\n<p id=\"0041\">Today, we will learn how to synchronize a data pipeline and a feature pipeline by using the CDC pattern.<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1400\/1*SOwb1WF2RcRKi45SSq7rbw.png\" alt=\"\"\/><figcaption class=\"wp-element-caption\">Integrating CDC for Enhanced Data Consistency in LLM System Architecture<\/figcaption><\/figure>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"cae2\">Table of Contents<\/h2>\n\n\n\n<ol class=\"wp-block-list\">\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#c3b6\">CDC pattern \u2014 Overview<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#255a\">CDC pattern \u2014 Digital Twin Architecture Use Case<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#a3b6\">CDC with MongoDB<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#8ed7\">RabbitMQ Message Broker<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#c041\">Hands-on CDC: Mongo+RabbitMQ<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#7589\">Running the CDC microservice<\/a><\/li>\n<\/ol>\n\n\n\n<h2 class=\"wp-block-heading\">1. 
CDC pattern \u2013 Overview<\/h2>\n\n\n\n<p id=\"c3b6\"><strong>Change Data Capture<\/strong>, commonly known as CDC, is an efficient way to track changes in a database.<\/p>\n\n\n\n<p id=\"8ba9\">The purpose of CDC is to capture insertions, updates, and deletions applied to a database and to make this change data available in a format easily consumable by downstream applications.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"11aa\"><strong>Why do we need the CDC pattern?<\/strong><\/h3>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Real-time Data Syncing<\/strong>: CDC facilitates near-real-time data integration and syncing.<\/li>\n\n\n\n<li><strong>Efficient Data Pipelines<\/strong>: It allows incremental data loading, which is more efficient than bulk load operations.<\/li>\n\n\n\n<li><strong>Minimized System Impact<\/strong>: CDC minimizes the impact on the source system by reducing the need for performance-intensive queries.<\/li>\n\n\n\n<li><strong>Event-Driven Architectures<\/strong>: It enables event-driven architectures by streaming database events.<\/li>\n<\/ul>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"fd10\"><strong>What problem does CDC solve?<\/strong><\/h3>\n\n\n\n<p id=\"36fa\">Change Data Capture (CDC) is particularly adept at solving consistency issues in distributed systems.<\/p>\n\n\n\n<p id=\"baeb\">Consider a common scenario where an application is required to perform a sequence of actions in response to a trigger \u2014 such as a REST call or an event receipt.<\/p>\n\n\n\n<p id=\"87d9\">These actions usually involve making a change to the database and sending a message through a messaging service like Kafka.<\/p>\n\n\n\n<p id=\"d35d\">However, there\u2019s an inherent risk: if the application encounters a failure or loses its connection to the messaging service after the database transaction but before the message dispatch, the database will reflect the change, but the corresponding message will never be sent. This discrepancy leads to an inconsistent state within the system.<\/p>\n\n\n\n<p id=\"e1a2\">CDC solves this challenge by decoupling the database update from the messaging.<\/p>\n\n\n\n<p id=\"b651\">It works by treating the database as a reliable source of events. Any committed change in the database is automatically captured by the CDC mechanism, which then ensures the corresponding message is sent to the messaging queue.<\/p>\n\n\n\n<p id=\"efb5\">This&nbsp;<strong>separation of concerns<\/strong>&nbsp;provided by CDC means that the database update and the message dispatch are no longer directly dependent on the application\u2019s stability or network reliability.<\/p>\n\n\n\n<p id=\"62ef\">By employing CDC, we can maintain consistency across distributed components of a system, even in the face of application failures or network issues, thereby solving a critical problem in maintaining the integrity of distributed systems.<\/p>
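\n\n\n\n<p>To make the failure mode concrete, here is a minimal sketch of the naive \u201cdual write\u201d that CDC replaces. The function and queue names are illustrative, not the course\u2019s actual code:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>import json\n\n\n# Naive \"dual write\": two independent side effects with no shared transaction.\n# If the process dies after step 1 but before step 2, the database and the\n# queue permanently disagree. CDC removes step 2 from the application code\n# and derives the message from the committed change instead.\ndef save_post(db, publish_to_rabbitmq, post: dict) -&gt; None:\n    db.posts.insert_one(post)  # step 1: committed\n    publish_to_rabbitmq(\"test_queue\", json.dumps(post))  # step 2: may never run<\/code><\/pre>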
\n\n\n\n<p id=\"ba6b\">Another advantage of using change streams is that they read from MongoDB\u2019s&nbsp;<strong>Oplog<\/strong>&nbsp;(operation log), not directly from the database.<\/p>\n\n\n\n<p id=\"0147\">This method significantly reduces the load on the database, avoiding the common pitfall of throttling database performance with frequent direct queries.<\/p>\n\n\n\n<p id=\"b913\">By tapping into the Oplog, CDC can efficiently identify and capture change events (such as insertions, updates, or deletions) without adding undue stress on the database itself. You can learn more about it here&nbsp;<a href=\"https:\/\/www.mongodb.com\/docs\/manual\/changeStreams\/\" target=\"_blank\" rel=\"noreferrer noopener\">[2]<\/a>,&nbsp;<a href=\"https:\/\/shantanubansal.medium.com\/demystifying-mongodb-oplog-a-comprehensive-guide-with-oplog-entry-examples-3bd716789f78\">[3]<\/a>&nbsp;and&nbsp;<a href=\"https:\/\/www.mongodb.com\/basics\/change-streams\" target=\"_blank\" rel=\"noreferrer noopener\">[4]<\/a>.<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:2000\/1*YycR_jRhh-tDbCdAGZGqUw.png\" alt=\"Change Data Capture graphic, Decoding ML for Comet ML\"\/><figcaption class=\"wp-element-caption\">The problem that CDC solves in distributed systems [Generated by ChatGPT]<\/figcaption><\/figure>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"0a36\">Summary of the diagram:<\/h3>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>Application Triggered<\/strong>: The diagram begins with an application that is triggered by a REST call or an event.<\/li>\n\n\n\n<li><strong>Update Database<\/strong>: The application first updates the database. This is shown as a communication from the \u2018Application\u2019 to the \u2018Database\u2019.<\/li>\n\n\n\n<li><strong>Database Acknowledges<\/strong>: The database acknowledges the update back to the application.<\/li>\n\n\n\n<li><strong>Send Message Attempt<\/strong>: Next, the application tries to send a message through the messaging service (like Kafka). This is where the risk of failure is highlighted \u2014 if the application fails after updating the database but before successfully sending the message, it results in inconsistency.<\/li>\n\n\n\n<li><strong>CDC Mechanism<\/strong>: To resolve this, the CDC mechanism comes into play. It decouples the database update from the messaging.<\/li>\n\n\n\n<li><strong>Database Commit Triggering CDC<\/strong>: Any committed change in the database is automatically captured by the CDC mechanism.<\/li>\n\n\n\n<li><strong>CDC Dispatches Message<\/strong>: Finally, the CDC mechanism ensures that the corresponding message is sent to the messaging service. This maintains consistency across the system, even if the application encounters issues after updating the database.<\/li>\n<\/ol>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"255a\">2. CDC pattern \u2014 Digital Twin Architecture Use Case<\/h2>\n\n\n\n<p id=\"3ab7\">The Digital Twin Architecture respects \u2018the 3-pipeline architecture\u2019 pattern:<\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li>The feature pipeline<\/li>\n\n\n\n<li>The training pipeline<\/li>\n\n\n\n<li>The inference pipeline<\/li>\n<\/ol>\n\n\n\n<p id=\"cd02\">But one of the most important components in our architecture is the&nbsp;entry point&nbsp;of the system: the data pipeline.<\/p>\n\n\n\n<p id=\"8004\">To ensure our&nbsp;feature store&nbsp;stays up-to-date with the&nbsp;data pipeline, we need a mechanism that detects changes at the pipeline\u2019s entry point. 
This way, we can avoid discrepancies like having 100 entries deleted from our RAW Database while the Vector Database lags behind without these updates.<\/p>\n\n\n\n<p id=\"9a53\">In the&nbsp;<strong>Data Collection Pipeline,<\/strong>&nbsp;data from various digital platforms like Medium, Substack, LinkedIn, and GitHub is extracted, transformed, and loaded (ETL) into a NoSQL database.<\/p>\n\n\n\n<p id=\"457c\">Once this raw data is stored,&nbsp;<strong>the CDC pattern comes into play<\/strong>, meticulously monitoring and capturing any changes \u2014 insertions, updates, or deletions \u2014 within the NoSQL database.<\/p>\n\n\n\n<p id=\"58d2\">These changes then trigger events that the CDC system captures and pushes onto a queue, managed by&nbsp;<strong>RabbitMQ&nbsp;<\/strong>(message broker).<\/p>\n\n\n\n<p id=\"e2a2\">On the other side of the CDC pattern is the&nbsp;<strong>Feature Pipeline<\/strong>, where the data continues to flow.<\/p>\n\n\n\n<p id=\"10c1\">A streaming ingestion pipeline, implemented in&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax,&nbsp;<\/a>takes the queue\u2019s data and processes it in real-time. The processed data includes articles, posts, and code, which are then transformed into features, such as actionable insights or inputs for machine learning models.<\/p>\n\n\n\n<p id=\"a2f2\">The processed data is then loaded into a Vector DB (<a href=\"https:\/\/qdrant.tech\/?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">Qdrant<\/a>), where it\u2019s organized and indexed for efficient retrieval.<\/p>\n\n\n\n<p id=\"655c\">The Vector DB Retrieval Clients serve as the access points for querying and extracting these processed data features, now ready to be used in various applications, including training machine learning models or powering search algorithms.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"a3b6\">3. CDC with MongoDB<\/h2>\n\n\n\n<p id=\"2a42\">In the world of data-driven applications, timing is everything.<\/p>\n\n\n\n<p id=\"39e9\">The swifter a system can respond to data changes, the more agile and user-friendly it becomes. Let\u2019s dive into this concept, especially in the context of&nbsp;<strong>MongoDB\u2019s change streams<\/strong>, a feature that fundamentally transforms how applications interact with data.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"387f\">Immediate Response to Data Changes<\/h3>\n\n\n\n<p id=\"15d7\">Consider a scenario where LinkedIn posts are regularly updated in our MongoDB database. Each post might undergo changes \u2014 perhaps an edit to the content, a new comment, or an update in user engagement metrics.<\/p>\n\n\n\n<p id=\"8f8c\">In a traditional setup, reflecting these updates into our feature store, specifically&nbsp;<a href=\"https:\/\/qdrant.tech\/?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">Qdrant<\/a>, could involve significant delays and manual intervention.<\/p>\n\n\n\n<p id=\"332c\">However, with&nbsp;<strong>MongoDB\u2019s change streams<\/strong>, we implement an observer within our database that detects changes in real time. 
When a LinkedIn post is edited, MongoDB instantly captures this event and relays it to our data pipeline.<\/p>\n\n\n\n<p id=\"2947\">Our&nbsp;<strong>data pipeline<\/strong>, upon receiving a notification of the change, springs into action. The updated LinkedIn post is then processed \u2014 perhaps analyzed for new keywords, sentiments, or user interactions \u2014 and updated in Qdrant.<\/p>\n\n\n\n<p id=\"447a\">The sweet spot of MongoDB\u2019s change streams lies in their ability to streamline this process. They provide a direct line from the occurrence of a change in MongoDB to its reflection in Qdrant, ensuring our feature store is always in sync with the latest data.<\/p>\n\n\n\n<p id=\"d445\">This capability is crucial for maintaining an up-to-date and accurate data landscape, which, in turn, powers more relevant and dynamic analytics for the LLM twin.<\/p>\n\n\n\n<p id=\"0256\">Before&nbsp;<strong>change streams<\/strong>, applications that needed to know about the addition of new data in real-time had to continuously poll data or rely on other update mechanisms.<\/p>\n\n\n\n<p id=\"53d5\">One common, if complex, technique for monitoring changes was tailing&nbsp;<strong>MongoDB\u2019s Operation Log (Oplog).<\/strong>&nbsp;The Oplog is part of the replication system of MongoDB and, as such, already tracks modifications to the database, but it is not easy to use for business logic.<\/p>\n\n\n\n<p id=\"c0ac\"><strong>Note that you cannot open a change stream against a collection in a standalone MongoDB server because the feature relies on the Oplog, which is only used on replica sets.<\/strong><\/p>\n\n\n\n<p id=\"93d0\">When registering a change stream, you need to specify&nbsp;<strong>the collection<\/strong>&nbsp;and what&nbsp;<strong>types of changes<\/strong>&nbsp;you want to listen to. You can do this by using the&nbsp;<code>$match<\/code>&nbsp;stage and a few other aggregation pipeline stages, which limit the amount of data you will receive.<\/p>
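\n\n\n\n<p>Putting the pieces together, registering a change stream looks roughly like this. This is a minimal sketch that assumes the replica set defined later in our docker-compose file; the database and collection names are illustrative:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from pymongo import MongoClient\n\n# Connect to the replica set (change streams do not work on standalone servers).\nclient = MongoClient(\n    \"mongodb:\/\/mongo1:30001,mongo2:30002,mongo3:30003\/?replicaSet=my-replica-set\"\n)\ncollection = client&#91;\"scrabble\"]&#91;\"posts\"]\n\n# The $match stage filters events server-side, so we only receive inserts.\npipeline = &#91;{\"$match\": {\"operationType\": {\"$in\": &#91;\"insert\"]}}}]\nwith collection.watch(pipeline) as stream:\n    for event in stream:\n        print(event&#91;\"operationType\"], event&#91;\"fullDocument\"])<\/code><\/pre>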
\n\n\n\n<h2 class=\"wp-block-heading\" id=\"8ed7\">4. RabbitMQ Message Broker<\/h2>\n\n\n\n<p id=\"44e1\"><a href=\"https:\/\/www.rabbitmq.com\/\" target=\"_blank\" rel=\"noreferrer noopener\">RabbitMQ<\/a>&nbsp;is a reliable and mature messaging and streaming broker, which is easy to deploy on cloud environments, on-premises, and on your local machine. It is currently used by millions worldwide.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"c7f8\"><strong>Why do we need a message broker?<\/strong><\/h3>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>Reliability<\/strong>: RabbitMQ guarantees reliable message delivery, ensuring that change events are conveyed to the Feature Pipeline, even in the face of temporary outages.<\/li>\n\n\n\n<li><strong>Decoupling<\/strong>: It enables loose coupling between services, promoting autonomous operation and mitigating the propagation of failures across the system.<\/li>\n\n\n\n<li><strong>Load Management<\/strong>: It evenly distributes the data load across multiple consumers, enhancing system efficiency.<\/li>\n\n\n\n<li><strong>Asynchronous Processing<\/strong>: The system benefits from asynchronous processing, with RabbitMQ queuing change events for processing without delay.<\/li>\n\n\n\n<li><strong>Scalability<\/strong>: RabbitMQ\u2019s scalability features accommodate growing data volumes by facilitating easy addition of consumers and horizontal scaling.<\/li>\n\n\n\n<li><strong>Data Integrity<\/strong>: It ensures messages are processed in the order they\u2019re received, which is critical for data integrity.<\/li>\n\n\n\n<li><strong>Recovery Mechanisms<\/strong>: RabbitMQ offers message acknowledgment and redelivery features, vital for recovery from failures without data loss.<\/li>\n<\/ul>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"c041\">5. Hands-on CDC: Mongo+RabbitMQ<\/h2>\n\n\n\n<p id=\"ad14\">We are building the&nbsp;<code>RabbitMQConnection<\/code>&nbsp;class, a singleton structure for establishing and managing connections to the RabbitMQ server. This class is robustly designed to handle connection parameters like username, password, queue name, host, port, and virtual_host, which can be customized or defaulted from settings.<\/p>\n\n\n\n<p id=\"f7b4\">Utilizing the&nbsp;<code>pika<\/code>&nbsp;Python library,&nbsp;<code>RabbitMQConnection<\/code>&nbsp;provides methods to connect, check connection status, retrieve channels, and close the connection. 
This approach encapsulates connection management within a singleton instance, ensuring efficient handling of RabbitMQ connections throughout the system lifecycle, from initialization to closure.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from typing import Self\n\nimport pika\n\nfrom config import settings\nfrom core.logger_utils import get_logger\n\nlogger = get_logger(__file__)\n\n\nclass RabbitMQConnection:\n    \"\"\"Singleton class to manage RabbitMQ connection.\"\"\"\n\n    _instance = None\n\n    def __new__(cls, *args, **kwargs) -&gt; Self:\n        if not cls._instance:\n            cls._instance = super().__new__(cls, *args, **kwargs)\n\n        return cls._instance\n\n    def __init__(\n        self,\n        host: str | None = None,\n        port: int | None = None,\n        username: str | None = None,\n        password: str | None = None,\n        virtual_host: str = \"\/\",\n        fail_silently: bool = False,\n        **kwargs,\n    ) -&gt; None:\n        self.host = host or settings.RABBITMQ_HOST\n        self.port = port or settings.RABBITMQ_PORT\n        self.username = username or settings.RABBITMQ_DEFAULT_USERNAME\n        self.password = password or settings.RABBITMQ_DEFAULT_PASSWORD\n        self.virtual_host = virtual_host\n        self.fail_silently = fail_silently\n        self._connection = None\n\n    def __enter__(self):\n        self.connect()\n        return self\n\n    def __exit__(self, exc_type, exc_val, exc_tb):\n        self.close()\n\n    def connect(self):\n        try:\n            credentials = pika.PlainCredentials(self.username, self.password)\n            self._connection = pika.BlockingConnection(\n                pika.ConnectionParameters(\n                    host=self.host,\n                    port=self.port,\n                    virtual_host=self.virtual_host,\n                    credentials=credentials,\n                )\n            )\n        except pika.exceptions.AMQPConnectionError as e:\n            logger.exception(\"Failed to connect to RabbitMQ:\")\n            if not self.fail_silently:\n                raise e\n\n    def is_connected(self) -&gt; bool:\n        return self._connection is not None and self._connection.is_open\n\n    def get_channel(self):\n        if self.is_connected():\n            return self._connection.channel()\n\n    def close(self):\n        if self.is_connected():\n            self._connection.close()\n            self._connection = None\n            print(\"Closed RabbitMQ connection\")<\/code><\/pre>
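\n\n\n\n<p>Because the class implements&nbsp;<code>__enter__<\/code>&nbsp;and&nbsp;<code>__exit__<\/code>, it is meant to be used as a context manager. A small, hypothetical usage sketch:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code># Hypothetical usage: the context manager opens the connection and\n# guarantees it is closed, even if publishing raises an exception.\nwith RabbitMQConnection(fail_silently=False) as conn:\n    channel = conn.get_channel()\n    channel.queue_declare(queue=\"test_queue\", durable=True)<\/code><\/pre>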
\n\n\n\n<p id=\"da52\"><strong>Publishing to RabbitMQ<\/strong>: The&nbsp;<code>publish_to_rabbitmq<\/code>&nbsp;function is where the magic happens. It connects to RabbitMQ, ensures that the message delivery is confirmed for reliability, and then publishes the data.<\/p>\n\n\n\n<p>The&nbsp;<code>data<\/code>&nbsp;variable, which is expected to be a JSON string, represents the changes captured by MongoDB\u2019s CDC mechanism.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>def publish_to_rabbitmq(queue_name: str, data: str):\n    \"\"\"Publish data to a RabbitMQ queue.\"\"\"\n    try:\n        # Create an instance of RabbitMQConnection\n        rabbitmq_conn = RabbitMQConnection()\n\n        # Establish connection\n        with rabbitmq_conn:\n            channel = rabbitmq_conn.get_channel()\n\n            # Ensure the queue exists\n            channel.queue_declare(queue=queue_name, durable=True)\n\n            # Delivery confirmation\n            channel.confirm_delivery()\n\n            # Send data to the queue\n            channel.basic_publish(\n                exchange=\"\",\n                routing_key=queue_name,\n                body=data,\n                properties=pika.BasicProperties(\n                    delivery_mode=2,  # make message persistent\n                ),\n            )\n            print(\"Sent data to RabbitMQ:\", data)\n    except pika.exceptions.UnroutableError:\n        print(\"Message could not be routed\")\n    except Exception as e:\n        print(f\"Error publishing to RabbitMQ: {e}\")<\/code><\/pre>\n\n\n\n<p>For example, you can call it as:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>publish_to_rabbitmq(\"test_queue\", \"Hello, World!\")<\/code><\/pre>\n\n\n\n<p>\u2192 Full RabbitMQ code at&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/src\/core\/mq.py\" target=\"_blank\" rel=\"noreferrer noopener\"><strong>core\/mq.py<\/strong><\/a><strong>.<\/strong><\/p>
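\n\n\n\n<p>For completeness, here is a rough sketch of what the consuming side of the queue could look like. In our architecture, consumption is handled by the Bytewax streaming pipeline from Lesson 4; this standalone&nbsp;<code>pika<\/code>&nbsp;consumer is only illustrative:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>import pika\n\n\ndef consume_from_rabbitmq(queue_name: str) -&gt; None:\n    \"\"\"Illustrative consumer: print each CDC event and acknowledge it.\"\"\"\n    # Port 5673 is the host port mapped to RabbitMQ in the docker-compose file.\n    connection = pika.BlockingConnection(\n        pika.ConnectionParameters(host=\"localhost\", port=5673)\n    )\n    channel = connection.channel()\n    channel.queue_declare(queue=queue_name, durable=True)\n\n    def handle(ch, method, properties, body) -&gt; None:\n        print(\"Received CDC event:\", body.decode())\n        ch.basic_ack(delivery_tag=method.delivery_tag)  # confirm processing\n\n    channel.basic_consume(queue=queue_name, on_message_callback=handle)\n    channel.start_consuming()<\/code><\/pre>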
\n\n\n\n<h3 class=\"wp-block-heading\" id=\"2275\">CDC Pattern in MongoDB<\/h3>\n\n\n\n<p id=\"1091\"><strong>Setting Up MongoDB Connection:<\/strong>&nbsp;The script connects to a MongoDB database using the&nbsp;<strong>MongoDatabaseConnector<\/strong>&nbsp;class. We instantiate the connection instance, which we will use to communicate with MongoDB.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from pymongo import MongoClient\nfrom pymongo.errors import ConnectionFailure\n\nfrom core.config import settings\nfrom core.logger_utils import get_logger\n\nlogger = get_logger(__file__)\n\n\nclass MongoDatabaseConnector:\n    \"\"\"Singleton class to connect to MongoDB database.\"\"\"\n\n    _instance: MongoClient | None = None\n\n    def __new__(cls, *args, **kwargs):\n        if cls._instance is None:\n            try:\n                cls._instance = MongoClient(settings.MONGO_DATABASE_HOST)\n                logger.info(\n                    f\"Connection to database with uri: {settings.MONGO_DATABASE_HOST} successful\"\n                )\n            except ConnectionFailure:\n                logger.error(\"Couldn't connect to the database.\")\n                raise\n\n        return cls._instance\n\n    def get_database(self):\n        assert self._instance, \"Database connection not initialized\"\n\n        return self._instance&#91;settings.MONGO_DATABASE_NAME]\n\n    def close(self):\n        if self._instance:\n            self._instance.close()\n            logger.info(\"Connection to the database has been closed.\")\n\n\nconnection = MongoDatabaseConnector()<\/code><\/pre>\n\n\n\n<p id=\"9848\"><strong>Monitoring Changes with&nbsp;<\/strong><code><strong>watch<\/strong><\/code>: The core of the CDC pattern in MongoDB is realized through the&nbsp;<code>watch<\/code>&nbsp;method. Here, the script sets up a change stream to monitor for specific types of changes in the database.<\/p>\n\n\n\n<p>In this case, it\u2019s configured to listen for&nbsp;<code>insert<\/code>&nbsp;operations in any collection within the&nbsp;<code>scrabble<\/code>&nbsp;database.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>changes = db.watch(&#91;{'$match': {'operationType': {'$in': &#91;'insert']}}}])<\/code><\/pre>
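\n\n\n\n<p>Each event yielded by the change stream is a plain Python dictionary. For an insert, it looks roughly like the following (the payload values are invented for illustration):<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from bson.objectid import ObjectId\n\n# Approximate shape of one insert event yielded by the change stream.\n# The fullDocument payload below is invented for illustration.\nexample_event = {\n    \"operationType\": \"insert\",\n    \"ns\": {\"db\": \"scrabble\", \"coll\": \"posts\"},  # namespace: database + collection\n    \"documentKey\": {\"_id\": ObjectId(\"65f9a0c2e4b0d1a2b3c4d5e6\")},\n    \"fullDocument\": {\n        \"_id\": ObjectId(\"65f9a0c2e4b0d1a2b3c4d5e6\"),\n        \"platform\": \"linkedin\",\n        \"content\": \"Raw crawled post text ...\",\n    },\n}<\/code><\/pre>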
\n\n\n\n<p id=\"660f\"><strong>Processing Each Change<\/strong>: As changes occur in the database, the script iterates through each change event.<\/p>\n\n\n\n<p>The script extracts essential metadata for each event, like the data type (collection name) and the entry ID. It also reformats the document by removing the MongoDB-specific&nbsp;<code>_id<\/code>&nbsp;and appending the data type and entry ID. This formatting makes the data compatible with the feature pipeline.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>for change in changes:\n    data_type = change&#91;\"ns\"]&#91;\"coll\"]\n    entry_id = str(change&#91;\"fullDocument\"]&#91;\"_id\"])  # Convert ObjectId to string\n\n    change&#91;\"fullDocument\"].pop(\"_id\")\n    change&#91;\"fullDocument\"]&#91;\"type\"] = data_type\n    change&#91;\"fullDocument\"]&#91;\"entry_id\"] = entry_id\n\n    if data_type not in &#91;\"articles\", \"posts\", \"repositories\"]:\n        logger.info(f\"Unsupported data type: '{data_type}'\")\n        continue<\/code><\/pre>\n\n\n\n<p><strong>Conversion to JSON and Publishing to RabbitMQ:<\/strong>&nbsp;The transformed document is converted to a JSON string (serialized) and sent to the RabbitMQ queue:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>data = json.dumps(change&#91;\"fullDocument\"], default=json_util.default)\nlogger.info(\n    f\"Change detected and serialized for a data sample of type {data_type}.\"\n)\n\npublish_to_rabbitmq(queue_name=settings.RABBITMQ_QUEUE_NAME, data=data)\nlogger.info(f\"Data of type '{data_type}' published to RabbitMQ.\")<\/code><\/pre>\n\n\n\n<p>\u2192 Full code available at&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/src\/data_cdc\/cdc.py\">data_cdc\/cdc.py<\/a><\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"3a00\">The full system docker-compose<\/h3>\n\n\n\n<p id=\"6c3d\">This&nbsp;<code>docker-compose<\/code>&nbsp;configuration outlines the setup for a system comprising a MongoDB database and a RabbitMQ message broker. The setup is designed to facilitate a development or testing environment using Docker containers.<\/p>\n\n\n\n<p>Let\u2019s walk through the critical components of this configuration:<\/p>\n\n\n\n<p id=\"f081\"><strong>MongoDB Service Setup<\/strong><\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>Image<\/strong>: Each MongoDB instance uses the\u00a0<code>mongo:5<\/code>\u00a0image, which is the official v5 MongoDB Docker image.<\/li>\n\n\n\n<li><strong>Container Names<\/strong>: Individually named (<code>mongo1<\/code>,\u00a0<code>mongo2<\/code>,\u00a0<code>mongo3<\/code>) for easy identification.<\/li>\n\n\n\n<li><strong>Commands<\/strong>: Each instance is started with specific commands:<\/li>\n<\/ol>\n\n\n\n<ul class=\"wp-block-list\">\n<li><code>--replSet \"my-replica-set\"<\/code>\u00a0to set up a replica set named \u2018my-replica-set\u2019.<\/li>\n\n\n\n<li><code>--bind_ip_all<\/code>\u00a0to bind MongoDB to all IP addresses.<\/li>\n\n\n\n<li><code>--port 3000X<\/code>\u00a0(where X is 1, 2, or 3) to define distinct ports for each instance.<\/li>\n<\/ul>\n\n\n\n<p id=\"41e9\">Using<strong>&nbsp;three replicas in a MongoDB replica set<\/strong>&nbsp;is a common practice primarily for achieving high availability, data redundancy, and fault tolerance. Here\u2019s why having three replicas is beneficial:<\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>High Availability<\/strong>: In a replica set, one node is the primary node that handles all write operations, while the others are secondary nodes that replicate the data from the primary. If the primary node fails, one of the secondary nodes is automatically elected as the new primary. With three nodes, you ensure that there\u2019s always a secondary node available to take over if the primary fails, minimizing downtime.<\/li>\n\n\n\n<li><strong>Data Redundancy<\/strong>: Multiple copies of the data are maintained across different nodes. This redundancy safeguards against data loss in case of a hardware failure or corruption on one of the nodes.<\/li>\n\n\n\n<li><strong>Fault Tolerance<\/strong>: Elections require a majority vote. With three nodes, the set can still form a majority (two of three) and elect a primary after losing a single node.<\/li>\n<\/ol>\n\n\n\n<p>Back to the compose configuration, the remaining options of the MongoDB services are:<\/p>\n\n\n\n<ol class=\"wp-block-list\" start=\"4\">\n<li><strong>Volumes<\/strong>: Maps to the\u00a0<em>mongo-replica-1-data, mongo-replica-2-data,<\/em>\u00a0and\u00a0<em>mongo-replica-3-data<\/em>\u00a0volumes managed by Docker. 
This ensures data persistence across container restarts.<\/li>\n\n\n\n<li><strong>Ports<\/strong>: Exposes each MongoDB instance on a unique port on the host machine (30001, 30002, 30003).<\/li>\n\n\n\n<li><strong>Healthcheck (only for\u00a0<\/strong><code><strong>mongo1<\/strong><\/code><strong>)<\/strong>: Regularly checks the health of the first MongoDB instance, ensuring the replica set is correctly initiated and operational.<\/li>\n<\/ol>\n\n\n\n<p id=\"2b30\"><strong>RabbitMQ Service Setup<\/strong><\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li><strong>Image and Container<\/strong>: Uses RabbitMQ 3 with the management plugin, based on Alpine Linux.<\/li>\n\n\n\n<li><strong>Ports<\/strong>: Exposes RabbitMQ on port\u00a0<code>5673<\/code>\u00a0for message queue communication and\u00a0<code>15673<\/code>\u00a0for management console access.<\/li>\n\n\n\n<li><strong>Volumes<\/strong>: Maps local directories for RabbitMQ data and log storage, ensuring persistence and easy access to logs.<\/li>\n\n\n\n<li><strong>Restart Policy<\/strong>: Like MongoDB, it\u2019s configured to\u00a0<em>always<\/em>\u00a0restart if it stops.<\/li>\n<\/ol>\n\n\n\n<pre class=\"wp-block-code\"><code>services:\n  mongo1:\n    image: mongo:5\n    container_name: llm-twin-mongo1\n    command: &#91;\"--replSet\", \"my-replica-set\", \"--bind_ip_all\", \"--port\", \"30001\"]\n    volumes:\n      - mongo-replica-1-data:\/data\/db\n    ports:\n      - \"30001:30001\"\n    healthcheck:\n      test: test $$(echo \"rs.initiate({_id:'my-replica-set',members:&#91;{_id:0,host:\\\"mongo1:30001\\\"},{_id:1,host:\\\"mongo2:30002\\\"},{_id:2,host:\\\"mongo3:30003\\\"}]}).ok || rs.status().ok\" | mongo --port 30001 --quiet) -eq 1\n      interval: 10s\n      start_period: 30s\n    restart: always\n  mongo2:\n    image: mongo:5\n    container_name: llm-twin-mongo2\n    command: &#91;\"--replSet\", \"my-replica-set\", \"--bind_ip_all\", \"--port\", \"30002\"]\n    volumes:\n      - mongo-replica-2-data:\/data\/db\n    ports:\n      - \"30002:30002\"\n    restart: always\n  mongo3:\n    ... # Another read-only replica similar to mongo2\n  mq:\n    image: rabbitmq:3-management-alpine\n    container_name: llm-twin-mq\n    ports:\n      - \"5673:5672\"\n      - \"15673:15672\"\n    volumes:\n      - .\/rabbitmq\/data\/:\/var\/lib\/rabbitmq\/\n      - .\/rabbitmq\/log\/:\/var\/log\/rabbitmq\n    restart: always\n  qdrant:\n    ...\n  data-crawlers:\n    ...\n  data_cdc:\n    image: \"llm-twin-data-cdc\"\n    container_name: llm-twin-data-cdc\n    build:\n      context: .\n      dockerfile: .docker\/Dockerfile.data_cdc\n    env_file:\n      - .env\n    depends_on:\n      - mongo1\n      - mongo2\n      - mongo3\n      - mq\n  feature_pipeline:\n    ...\nvolumes:\n  mongo-replica-1-data:\n  mongo-replica-2-data:\n  mongo-replica-3-data:\n  qdrant-data:<\/code><\/pre>\n\n\n\n<p>\u2192 Full Docker compose file available at&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/docker-compose.yml\">docker-compose.yml<\/a><\/p>
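\n\n\n\n<p>Once the containers defined above are running, you can sanity-check the whole CDC path with a few lines of Python. This is a hypothetical smoke test with assumed database and collection names, not part of the repository:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from pymongo import MongoClient\n\n# Hypothetical smoke test (assumed database\/collection names): insert one\n# fake post and let the running CDC container pick it up from the Oplog\n# and forward it to RabbitMQ.\nclient = MongoClient(\n    \"mongodb:\/\/mongo1:30001,mongo2:30002,mongo3:30003\/?replicaSet=my-replica-set\"\n)\nclient&#91;\"scrabble\"]&#91;\"posts\"].insert_one(\n    {\"platform\": \"linkedin\", \"content\": \"Hello, CDC!\"}\n)\n# Then inspect `docker logs llm-twin-data-cdc` and the RabbitMQ management\n# UI (mapped to http:\/\/localhost:15673) for the forwarded event.<\/code><\/pre>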
\n\n\n\n<h2 class=\"wp-block-heading\" id=\"7589\">6. Running the CDC microservice<\/h2>\n\n\n\n<p>The CDC microservice runs automatically when starting the Docker containers defined in the Docker Compose file from above.<\/p>\n\n\n\n<p>To build and run the Docker images, run the following:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>make local-start<\/code><\/pre>\n\n\n\n<p>By default, this will start the CDC microservice, which listens for changes made to MongoDB and sends them to the RabbitMQ queue.<\/p>\n\n\n\n<p>For&nbsp;<strong>macOS\/Linux<\/strong>&nbsp;users, for the multi-replica set-up to work correctly, you have to add the following lines to your \/etc\/hosts file:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>127.0.0.1 mongo1\n127.0.0.1 mongo2\n127.0.0.1 mongo3<\/code><\/pre>\n\n\n\n<p>\u2192 More details in our&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/INSTALL_AND_USAGE.md\">INSTALL_AND_USAGE<\/a>&nbsp;docs.<\/p>\n\n\n\n<p>To test it out, trigger the crawlers to send some data to MongoDB as follows:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>make local-test-medium  # or make local-test-github<\/code><\/pre>\n\n\n\n<p>This will crawl a Medium (or GitHub) article, which will be saved to MongoDB, trigger the CDC service, and send the event to the RabbitMQ queue.<\/p>\n\n\n\n<p>You can check the logs of the Docker containers by running:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>docker logs llm-twin-data-crawlers  # Crawler service\ndocker logs llm-twin-data-cdc       # CDC service\ndocker logs llm-twin-mq             # RabbitMQ<\/code><\/pre>\n\n\n\n<p id=\"50b5\">If everything runs as expected, you should see in the logs of the CDC service something similar to the image below:<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:2000\/1*Abqh7XZ_-j4hN8r7J-4ffA.png\" alt=\"\"\/><figcaption class=\"wp-element-caption\">Screenshot after running&nbsp;`docker logs llm-twin-data-cdc`&nbsp;in the CLI.<\/figcaption><\/figure>\n\n\n\n<p>Find&nbsp;<strong>step-by-step instructions<\/strong>&nbsp;on installing and running&nbsp;<strong>the entire course<\/strong>&nbsp;in our&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/INSTALL_AND_USAGE.md\">INSTALL_AND_USAGE<\/a>&nbsp;document from the repository.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"cdb5\">Conclusion<\/h2>\n\n\n\n<p>This lesson presented the Change Data Capture (CDC) pattern, a powerful strategy for synchronizing data across multiple databases, which is crucial for maintaining real-time data consistency in event-driven systems.<\/p>\n\n\n\n<p>We showed how to implement the CDC pattern using a MongoDB data warehouse and a RabbitMQ queue.<\/p>\n\n\n\n<p>As this lesson is part of the LLM Twin course, we also presented how to integrate the CDC microservice, through Docker containers, into a larger system that contains data and feature engineering pipelines.<\/p>\n\n\n\n<p>In Lesson 4, we will explore the feature pipeline, which will be implemented as a streaming pipeline using Bytewax. 
It will consume real-time events from the RabbitMQ queue and process them to fine-tune LLMs and RAG, ultimately loading the processed data into a vector DB.<\/p>\n\n\n\n<p><em>\ud83d\udd17&nbsp;<\/em><strong><em>Check out&nbsp;<\/em><\/strong><a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\" target=\"_blank\" rel=\"noreferrer noopener\"><em>the code on GitHub<\/em><\/a><em>&nbsp;[1] and support us with a&nbsp;<\/em>\u2b50\ufe0f<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">References<\/h2>\n\n\n\n<p id=\"ba69\">[1]&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\" target=\"_blank\" rel=\"noreferrer noopener\">Your LLM Twin Course \u2014 GitHub Repository<\/a>&nbsp;(2024), Decoding ML GitHub Organization<br>[2]&nbsp;<a href=\"https:\/\/www.mongodb.com\/docs\/manual\/changeStreams\/\" target=\"_blank\" rel=\"noreferrer noopener\">Change Streams<\/a>, MongoDB Documentation<br>[3]&nbsp;Shantanu Bansal,&nbsp;<a href=\"https:\/\/shantanubansal.medium.com\/demystifying-mongodb-oplog-a-comprehensive-guide-with-oplog-entry-examples-3bd716789f78\">Demystifying MongoDB Oplog: A Comprehensive Guide with Oplog Entry Examples<\/a>&nbsp;(2023), Medium<br>[4]&nbsp;<a href=\"https:\/\/www.mongodb.com\/basics\/change-streams\" target=\"_blank\" rel=\"noreferrer noopener\">How Do Change Streams Work in MongoDB?<\/a>, MongoDB Documentation<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Welcome to Lesson 3 of 12 in our free course series, LLM Twin: Building Your Production-Ready AI Replica. You\u2019ll learn how to use LLMs, vector DBs, and LLMOps best practices to design, train, and deploy a production-ready \u201cLLM twin\u201d of yourself. This AI character will write like you, incorporating your style, personality, and voice [&hellip;]<\/p>\n","protected":false},"author":128,"featured_media":9720,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"customer_name":"","customer_description":"","customer_industry":"","customer_technologies":"","customer_logo":"","footnotes":""},"categories":[65,6,7],"tags":[88,72,30,15,71,52,31,16,53],"coauthors":[222,223],"class_list":["post-9785","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-llmops","category-machine-learning","category-tutorials","tag-change-data-capture","tag-chatbots","tag-deep-learning","tag-deep-learning-experiment-management","tag-language-models","tag-llm","tag-llmops","tag-ml-experiment-management","tag-mlops"],"yoast_head":"<!-- This site is optimized with the Yoast SEO Premium plugin v25.9 (Yoast SEO v25.9) - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>Change Data Capture for LLM-Powered Applications<\/title>\n<meta name=\"description\" content=\"Follow these steps to synchronize data for an LLM system using change data capture.\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"I Replaced 1000 Lines of Polling Code with 50 Lines of CDC Magic\" \/>\n<meta property=\"og:description\" content=\"Follow these steps to synchronize data for an LLM system using change data capture.\" \/>\n<meta property=\"og:url\" content=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/\" \/>\n<meta 
property=\"og:site_name\" content=\"Comet\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/cometdotml\" \/>\n<meta property=\"article:published_time\" content=\"2024-04-15T19:13:17+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2025-04-29T12:33:03+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/04\/Screenshot-2024-04-03-at-4.40.11\u202fPM-1024x584.png\" \/>\n\t<meta property=\"og:image:width\" content=\"1024\" \/>\n\t<meta property=\"og:image:height\" content=\"584\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/png\" \/>\n<meta name=\"author\" content=\"Paul Iusztin, Decoding ML\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@Cometml\" \/>\n<meta name=\"twitter:site\" content=\"@Cometml\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"Paul Iusztin, Decoding ML\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"14 minutes\" \/>\n<!-- \/ Yoast SEO Premium plugin. -->","yoast_head_json":{"title":"Change Data Capture for LLM-Powered Applications","description":"Follow these steps to synchronize data for an LLM system using change data capture.","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/","og_locale":"en_US","og_type":"article","og_title":"I Replaced 1000 Lines of Polling Code with 50 Lines of CDC Magic","og_description":"Follow these steps to synchronize data for an LLM system using change data capture.","og_url":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/","og_site_name":"Comet","article_publisher":"https:\/\/www.facebook.com\/cometdotml","article_published_time":"2024-04-15T19:13:17+00:00","article_modified_time":"2025-04-29T12:33:03+00:00","og_image":[{"width":1024,"height":584,"url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/04\/Screenshot-2024-04-03-at-4.40.11\u202fPM-1024x584.png","type":"image\/png"}],"author":"Paul Iusztin, Decoding ML","twitter_card":"summary_large_image","twitter_creator":"@Cometml","twitter_site":"@Cometml","twitter_misc":{"Written by":"Paul Iusztin, Decoding ML","Est. 
reading time":"14 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#article","isPartOf":{"@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/"},"author":{"name":"Paul Iusztin","@id":"https:\/\/www.comet.com\/site\/#\/schema\/person\/87bf0cb600025605b68dcd2f0d597560"},"headline":"I Replaced 1000 Lines of Polling Code with 50 Lines of CDC Magic","datePublished":"2024-04-15T19:13:17+00:00","dateModified":"2025-04-29T12:33:03+00:00","mainEntityOfPage":{"@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/"},"wordCount":3012,"publisher":{"@id":"https:\/\/www.comet.com\/site\/#organization"},"image":{"@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#primaryimage"},"thumbnailUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/04\/Screenshot-2024-04-03-at-4.40.11\u202fPM.png","keywords":["Change Data Capture","Chatbots","Deep Learning","Deep Learning Experiment Management","Language Models","LLM","LLMOps","ML Experiment Management","MLOps"],"articleSection":["LLMOps","Machine Learning","Tutorials"],"inLanguage":"en-US"},{"@type":"WebPage","@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/","url":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/","name":"Change Data Capture for LLM-Powered Applications","isPartOf":{"@id":"https:\/\/www.comet.com\/site\/#website"},"primaryImageOfPage":{"@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#primaryimage"},"image":{"@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#primaryimage"},"thumbnailUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/04\/Screenshot-2024-04-03-at-4.40.11\u202fPM.png","datePublished":"2024-04-15T19:13:17+00:00","dateModified":"2025-04-29T12:33:03+00:00","description":"Follow these steps to synchronize data for an LLM system using change data capture.","breadcrumb":{"@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#primaryimage","url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/04\/Screenshot-2024-04-03-at-4.40.11\u202fPM.png","contentUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/04\/Screenshot-2024-04-03-at-4.40.11\u202fPM.png","width":1626,"height":928},{"@type":"BreadcrumbList","@id":"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/www.comet.com\/site\/"},{"@type":"ListItem","position":2,"name":"I Replaced 1000 Lines of Polling Code with 50 Lines of CDC Magic"}]},{"@type":"WebSite","@id":"https:\/\/www.comet.com\/site\/#website","url":"https:\/\/www.comet.com\/site\/","name":"Comet","description":"Build Better Models 
Faster","publisher":{"@id":"https:\/\/www.comet.com\/site\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/www.comet.com\/site\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/www.comet.com\/site\/#organization","name":"Comet ML, Inc.","alternateName":"Comet","url":"https:\/\/www.comet.com\/site\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.comet.com\/site\/#\/schema\/logo\/image\/","url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2025\/01\/logo_comet_square.png","contentUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2025\/01\/logo_comet_square.png","width":310,"height":310,"caption":"Comet ML, Inc."},"image":{"@id":"https:\/\/www.comet.com\/site\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/cometdotml","https:\/\/x.com\/Cometml","https:\/\/www.youtube.com\/channel\/UCmN63HKvfXSCS-UwVwmK8Hw"]},{"@type":"Person","@id":"https:\/\/www.comet.com\/site\/#\/schema\/person\/87bf0cb600025605b68dcd2f0d597560","name":"Paul Iusztin","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.comet.com\/site\/#\/schema\/person\/image\/0bb2983de08cbe4fe43fad876af41aee","url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/cropped-1664517339716-96x96.jpg","contentUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/cropped-1664517339716-96x96.jpg","caption":"Paul Iusztin"},"sameAs":["https:\/\/decodingml.substack.com\/"],"url":"https:\/\/www.comet.com\/site\/blog\/author\/paul-iusztin\/"}]}},"_links":{"self":[{"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/posts\/9785","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/users\/128"}],"replies":[{"embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/comments?post=9785"}],"version-history":[{"count":2,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/posts\/9785\/revisions"}],"predecessor-version":[{"id":15792,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/posts\/9785\/revisions\/15792"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/media\/9720"}],"wp:attachment":[{"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/media?parent=9785"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/categories?post=9785"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/tags?post=9785"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/coauthors?post=9785"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}