{"id":9847,"date":"2024-04-24T15:24:26","date_gmt":"2024-04-24T23:24:26","guid":{"rendered":"https:\/\/live-cometml.pantheonsite.io\/?p=9847"},"modified":"2025-04-29T12:25:27","modified_gmt":"2025-04-29T12:25:27","slug":"streaming-pipelines-for-fine-tuning-llms","status":"publish","type":"post","link":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/","title":{"rendered":"SOTA Python Streaming Pipelines for Fine-tuning LLMs and RAG &#8211; in Real-Time!"},"content":{"rendered":"\n<p><em>Welcome to Lesson 4 of 12 in our free course series, LLM Twin: Building Your Production-Ready AI Replica. You\u2019ll learn how to use LLMs, vector DBs, and LLMOps best practices to design, train, and deploy a production-ready \u201cLLM twin\u201d of yourself. This AI character will write like you, incorporating your style, personality, and voice into an LLM. For a full overview of course objectives and prerequisites, start with&nbsp;<a href=\"https:\/\/www.comet.com\/site\/blog\/an-end-to-end-framework-for-production-ready-llm-systems-by-building-your-llm-twin\/\">Lesson 1.<\/a><\/em><\/p>\n\n\n\n<p><strong>Lessons<\/strong><\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/an-end-to-end-framework-for-production-ready-llm-systems-by-building-your-llm-twin\/\">An End-to-End Framework for Production-Ready LLM Systems by Building Your LLM Twin<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/the-importance-of-data-pipelines-in-the-era-of-generative-ai\/\">Your Content is Gold: I Turned 3 Years of Blog Posts into an LLM Training<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/\">I Replaced 1000 Lines of Polling Code with 50 Lines of CDC Magic<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/\">SOTA Python Streaming Pipelines for Fine-tuning LLMs and RAG \u2014 in 
Real-Time!<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/advanced-rag-algorithms-optimize-retrieval\/\">The 4 Advanced RAG Algorithms You Must Know to Implement<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-fine-tuning-dataset\/\">Turning Raw Data Into Fine-Tuning Datasets<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/mistral-llm-fine-tuning\/\">8B Parameters, 1 GPU, No Problems: The Ultimate LLM Fine-tuning Pipeline<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-evaluation-best-practices\/\">The Engineer\u2019s Framework for LLM &amp; RAG Evaluation<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-rag-inference-pipelines\/\">Beyond Proof of Concept: Building RAG Systems That Scale<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/rag-evaluation-framework-ragas\/\">The Ultimate Prompt Monitoring Pipeline<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/refactoring-rag-retrieval\/\">[Bonus] Build a scalable RAG ingestion pipeline using 74.3% less code<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/multi-index-rag-apps\/\">[Bonus] Build Multi-Index Advanced RAG Apps<\/a><\/li>\n<\/ol>\n\n\n\n<p id=\"33d3\">In the&nbsp;<strong>4th lesson<\/strong>, we will focus on the&nbsp;<strong>feature pipeline.<\/strong><\/p>\n\n\n\n<p id=\"7fe5\">The&nbsp;<strong>feature pipeline<\/strong>&nbsp;is the&nbsp;<strong>first<\/strong>&nbsp;<strong>pipeline<\/strong>&nbsp;presented in the&nbsp;<strong>3 pipeline architecture<\/strong>: feature, training and inference pipelines.<\/p>\n\n\n\n<p id=\"1d57\">A&nbsp;<strong>feature pipeline<\/strong>&nbsp;takes raw data as input, processes it into features, and stores it in a feature store, from which the training &amp; inference pipelines will use it.<\/p>\n\n\n\n<p id=\"6c32\">The component is completely isolated from the training and inference code. 
All the communication is done through the feature store.<\/p>\n\n\n\n<p><em>To avoid repeating myself, if you are&nbsp;<strong>unfamiliar<\/strong>&nbsp;with the&nbsp;<strong>3 pipeline architecture<\/strong>, check out Lesson 1 for a refresher.<\/em><\/p>\n\n\n\n<p id=\"6a77\">By the&nbsp;<strong>end of this<\/strong>&nbsp;<strong>article<\/strong>, you will&nbsp;<strong>learn<\/strong>&nbsp;to&nbsp;<strong>design<\/strong>&nbsp;and&nbsp;<strong>build<\/strong>&nbsp;a&nbsp;<strong>production-ready feature pipeline<\/strong>&nbsp;that:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>uses\u00a0<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax<\/a>\u00a0as a stream engine to process data in real-time;<\/li>\n\n\n\n<li>ingests data from a\u00a0<a href=\"https:\/\/www.rabbitmq.com\/\" target=\"_blank\" rel=\"noreferrer noopener\">RabbitMQ queue<\/a>;<\/li>\n\n\n\n<li>uses SWE practices to process multiple data types: posts, articles, code;<\/li>\n\n\n\n<li>cleans, chunks, and embeds data for LLM fine-tuning and RAG;<\/li>\n\n\n\n<li>loads the features to a\u00a0<a href=\"https:\/\/qdrant.tech\/?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">Qdrant vector DB<\/a>.<\/li>\n<\/ul>\n\n\n\n<p><em>Note: In our use case, the&nbsp;<strong>feature pipeline<\/strong>&nbsp;is also a&nbsp;<strong>streaming pipeline<\/strong>, as we use a&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\">Bytewax<\/a>&nbsp;streaming engine. 
Thus, we will use these words&nbsp;<strong>interchangeably<\/strong>.<\/em><\/p>\n\n\n\n<p id=\"ca33\">We will&nbsp;<strong>wrap up Lesson 4<\/strong>&nbsp;by showing you how to&nbsp;<strong>deploy<\/strong>&nbsp;the feature pipeline to&nbsp;<strong>AWS&nbsp;<\/strong>and integrate it with the components from previous lessons: data collection pipeline, MongoDB, and CDC.<\/p>\n\n\n\n<p id=\"5285\">In the&nbsp;<strong>5th lesson<\/strong>, we will go through the vector DB retrieval client, where we will teach you how to query the vector DB and improve the accuracy of the results using advanced retrieval techniques.<\/p>\n\n\n\n<p id=\"eebb\"><em>Excited? Let\u2019s get started!<\/em><\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*EcFFO8975n3LL51aB7BcbQ.png\" alt=\"Diagram of Streaming Pipelines for Fine-tuning LLMs and RAG in Real-Time\"\/><figcaption class=\"wp-element-caption\">The architecture of feature\/streaming pipelines.<\/figcaption><\/figure>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"b518\">Table of Contents<\/h2>\n\n\n\n<ol class=\"wp-block-list\">\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#a987\">Why are we doing this?<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#d979\">System design of the feature pipeline<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#e769\">The Bytewax streaming flow<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#a51d\">Pydantic data models<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#2684\">Load data to Qdrant<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#f058\">The dispatcher 
layer<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#fefe\">Preprocessing steps: Clean, chunk, embed<\/a><\/li>\n\n\n\n<li><a href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#4a40\">Run the feature pipeline<\/a><\/li>\n<\/ol>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"a987\">1. Why are we creating a streaming pipeline?<\/h2>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"4ba4\">A quick reminder from previous lessons<\/h3>\n\n\n\n<p id=\"f0af\">To give you some context, in&nbsp;<a href=\"https:\/\/medium.com\/decodingml\/the-importance-of-data-pipelines-in-the-era-of-generative-ai-673e1505a861\">Lesson 2<\/a>, we crawl data from LinkedIn, Medium, and GitHub, normalize it, and load it to MongoDB.<\/p>\n\n\n\n<p id=\"ef45\">In&nbsp;<a href=\"https:\/\/medium.com\/decodingml\/the-3nd-out-of-11-lessons-of-the-llm-twin-free-course-ba82752dad5a\">Lesson 3<\/a>, we are using CDC to listen to changes to the MongoDB database and emit events in a RabbitMQ queue based on any CRUD operation done on MongoDB.<\/p>\n\n\n\n<p id=\"ac40\">\u2026and here we are in&nbsp;<strong>Lesson 4<\/strong>, where we are building the feature pipeline that listens 24\/7 to the&nbsp;<a href=\"https:\/\/www.rabbitmq.com\/\" target=\"_blank\" rel=\"noreferrer noopener\">RabbitMQ<\/a>&nbsp;queue for new events to process and load them to a&nbsp;<a href=\"https:\/\/qdrant.tech\/?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">Qdrant vector DB<\/a>.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"00ef\">The problem we are solving<\/h3>\n\n\n\n<p id=\"326a\">In our LLM Twin use case, the&nbsp;<strong>feature pipeline<\/strong>&nbsp;constantly syncs the MongoDB warehouse with the Qdrant vector DB while processing the raw data into features.<\/p>\n\n\n\n<p><em><strong>Important<\/strong>: In our use case, the Qdrant vector DB will be 
our feature store.<\/em><\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"716f\">Why we are solving it<\/h3>\n\n\n\n<p id=\"1236\">The&nbsp;<strong>feature store<\/strong>&nbsp;will be the&nbsp;<strong>central point of access<\/strong>&nbsp;for all the features used within the training and inference pipelines.<\/p>\n\n\n\n<p><em>For consistency and simplicity, we will refer to different formats of our text data as \u201cfeatures.\u201d<\/em><\/p>\n\n\n\n<p id=\"c17b\">\u2192 The&nbsp;<strong>training pipeline<\/strong>&nbsp;will use the feature store to create&nbsp;<strong>fine-tuning<\/strong>&nbsp;datasets for your&nbsp;<strong>LLM<\/strong>&nbsp;<strong>twin<\/strong>.<\/p>\n\n\n\n<p id=\"05dd\">\u2192 The&nbsp;<strong>inference pipeline<\/strong>&nbsp;will use the feature store for&nbsp;<strong>RAG<\/strong>.<\/p>\n\n\n\n<p id=\"56b1\">For reliable results (especially for RAG), the data from the vector DB must always be in sync with the data from the data warehouse.<\/p>\n\n\n\n<p id=\"06ca\">The question is, what is the best way to sync these 2?<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"821e\">Other potential solutions<\/h3>\n\n\n\n<p id=\"cc6b\">The&nbsp;<strong>most common solution<\/strong>&nbsp;is probably to use a batch pipeline that constantly polls from the warehouse, computes a difference between the 2 databases, and updates the target database.<\/p>\n\n\n\n<p id=\"f820\"><em>The issue with this technique<\/em>&nbsp;is that computing the difference between the 2 databases is extremely slow and costly.<\/p>\n\n\n\n<p id=\"d457\"><strong>Another solution<\/strong>&nbsp;is to use a push technique using a webhook. 
Thus, on any CRUD change in the warehouse, you also update the target DB.<\/p>\n\n\n\n<p id=\"e9c2\"><em>The biggest issue here<\/em>&nbsp;is that if the webhook fails, you have to implement complex recovery logic.<\/p>\n\n\n\n<p><em><a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/\">Lesson 3<\/a>&nbsp;on CDC covers more of this.<\/em><\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"d979\">2. System design of the feature pipeline: our solution<\/h2>\n\n\n\n<p id=\"0a3d\"><em>Our&nbsp;<\/em><strong><em>solution<\/em><\/strong><em>&nbsp;is based on&nbsp;<\/em><strong><em>CDC<\/em><\/strong><em>, a&nbsp;<\/em><strong><em>queue,&nbsp;<\/em><\/strong><em>a&nbsp;<\/em><strong><em>streaming engine,&nbsp;<\/em><\/strong><em>and a&nbsp;<\/em><strong><em>vector DB:<\/em><\/strong><\/p>\n\n\n\n<p id=\"02bc\">\u2192 CDC adds any change made to the MongoDB database to the queue (read more in&nbsp;<a href=\"https:\/\/www.comet.com\/site\/blog\/llm-twin-3-change-data-capture\/\">Lesson 3<\/a>).<\/p>\n\n\n\n<p id=\"ae4a\">\u2192 The&nbsp;<a href=\"https:\/\/www.rabbitmq.com\/\" target=\"_blank\" rel=\"noreferrer noopener\">RabbitMQ<\/a>&nbsp;queue stores all the events until they are processed.<\/p>\n\n\n\n<p id=\"773c\">\u2192 The&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax streaming engine<\/a>&nbsp;cleans, chunks, and embeds the data.<\/p>\n\n\n\n<p id=\"975f\">\u2192 A streaming engine works naturally with a queue-based system.<\/p>\n\n\n\n<p id=\"71a6\">\u2192 The data is uploaded to a&nbsp;<a href=\"https:\/\/qdrant.tech\/?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">Qdrant vector DB<\/a>&nbsp;on the fly.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"9ae5\"><strong>Why is this feature pipeline design powerful?<\/strong><\/h3>\n\n\n\n<p id=\"308b\">Here are 4 
core reasons:<\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li>The\u00a0<strong>data<\/strong>\u00a0is\u00a0<strong>processed<\/strong>\u00a0in\u00a0<strong>real-time<\/strong>.<\/li>\n\n\n\n<li><strong>Out-of-the-box recovery system:<\/strong>\u00a0If the streaming pipeline fails to process a message, the message is added back to the queue.<\/li>\n\n\n\n<li><strong>Lightweight:<\/strong>\u00a0No need for any diffs between databases or batching too many records.<\/li>\n\n\n\n<li><strong>No I\/O bottlenecks<\/strong>\u00a0on the source database.<\/li>\n<\/ol>\n\n\n\n<p id=\"a9bb\">\u2192&nbsp;<strong>It solves all our problems!<\/strong><\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*EcFFO8975n3LL51aB7BcbQ.png\" alt=\"Diagram of Streaming Pipelines for Fine-tuning LLMs and RAG in Real-Time\"\/><figcaption class=\"wp-element-caption\">The architecture of the feature\/streaming pipeline.<\/figcaption><\/figure>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"d3a7\">How is the data stored in our streaming pipeline?<\/h3>\n\n\n\n<p id=\"d3c4\"><em>We<\/em><strong><em>&nbsp;store 2 snapshots&nbsp;<\/em><\/strong><em>of our<\/em><strong><em>&nbsp;data&nbsp;<\/em><\/strong><em>in the<\/em><strong><em>&nbsp;feature store. 
Here is why \u2193<\/em><\/strong><\/p>\n\n\n\n<p id=\"156d\">Remember that we said that the training and inference pipeline will access the features only from the feature store, which, in our case, is the Qdrant vector DB?<\/p>\n\n\n\n<p id=\"d49f\">Well, if we had stored only the chunked &amp; embedded version of the data, that would have been useful only for RAG but not for fine-tuning.<\/p>\n\n\n\n<p id=\"3448\">Thus, we make an additional snapshot of the cleaned data, which will be used by the training pipeline.<\/p>\n\n\n\n<p id=\"7b11\">Afterward, we pass it down the streaming flow for chunking &amp; embedding.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"f274\">How do we process multiple data types for our feature pipeline?<\/h3>\n\n\n\n<p id=\"74d7\">How do you&nbsp;<strong>process multiple types<\/strong>&nbsp;<strong>of<\/strong>&nbsp;<strong>data<\/strong>&nbsp;in a&nbsp;<strong>single streaming pipeline<\/strong>&nbsp;<strong>without<\/strong>&nbsp;<strong>writing<\/strong>&nbsp;<strong>spaghetti code<\/strong>?<\/p>\n\n\n\n<p id=\"c840\">Yes, that is for you, data scientists!&nbsp;<strong>Joking\u2026<\/strong>am I<strong>?<\/strong><\/p>\n\n\n\n<p id=\"975c\">We have&nbsp;<strong>3 data types<\/strong>: posts, articles, and code.<\/p>\n\n\n\n<p id=\"4ebc\"><strong>Each data type<\/strong>&nbsp;(and its state) will be&nbsp;<strong>modeled<\/strong>&nbsp;using&nbsp;<strong>Pydantic<\/strong>&nbsp;<strong>models<\/strong>.<\/p>\n\n\n\n<p id=\"3332\">To&nbsp;<strong>process<\/strong>&nbsp;them we will write a&nbsp;<strong>dispatcher layer<\/strong>, which will use a&nbsp;<a href=\"https:\/\/refactoring.guru\/design-patterns\/abstract-factory\" target=\"_blank\" rel=\"noreferrer noopener\"><strong>creational<\/strong>&nbsp;<strong>factory<\/strong>&nbsp;<strong>pattern<\/strong><\/a><strong>&nbsp;<\/strong>[9]<strong>&nbsp;<\/strong>to&nbsp;<strong>instantiate<\/strong>&nbsp;a&nbsp;<strong>handler<\/strong>&nbsp;implemented for 
that&nbsp;<strong>specific data type<\/strong>&nbsp;(post, article, code) and&nbsp;<strong>operation<\/strong>&nbsp;(cleaning, chunking, embedding).<\/p>\n\n\n\n<p id=\"ecb0\">The&nbsp;<strong>handler<\/strong>&nbsp;follows the&nbsp;<a href=\"https:\/\/refactoring.guru\/design-patterns\/strategy\" target=\"_blank\" rel=\"noreferrer noopener\"><strong>strategy behavioral pattern<\/strong><\/a><strong>&nbsp;<\/strong>[10]<strong>.<\/strong><\/p>\n\n\n\n<p id=\"d6c0\"><strong>Intuitively<\/strong>, you can see the&nbsp;<strong>combination of<\/strong>&nbsp;the&nbsp;<strong>factory<\/strong>&nbsp;and&nbsp;<strong>strategy<\/strong>&nbsp;<strong>patterns<\/strong>&nbsp;as follows:<\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li>Initially, we know we want to clean the data, but as we don\u2019t know the data type, we can\u2019t know how to do so.<\/li>\n\n\n\n<li>What we can do is write the whole code around the cleaning code and abstract away the logic under a\u00a0<em>Handler()<\/em>\u00a0interface (aka the strategy).<\/li>\n\n\n\n<li>When we get a data point, the factory class creates the right cleaning handler based on its type.<\/li>\n\n\n\n<li>Ultimately, the handler is injected into the rest of the system and executed.<\/li>\n<\/ol>\n\n\n\n<p id=\"b397\">By doing so, we can easily isolate the logic for a given data type &amp; operation while leveraging polymorphism to avoid filling up the code with 1000x \u201cif else\u201d statements.<\/p>\n\n\n\n<p id=\"0591\">We will dig into the implementation in future sections.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"2f06\">Streaming over batch<\/h3>\n\n\n\n<p id=\"6791\">You may ask why we need a streaming engine instead of implementing a batch job that polls the messages at a given frequency.<\/p>\n\n\n\n<p id=\"5572\">That is a valid question.<\/p>\n\n\n\n<p id=\"01cf\">The thing is that\u2026<\/p>\n\n\n\n<p id=\"f78f\">Nowadays, using tools such as&nbsp;<a 
href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax<\/a>&nbsp;makes implementing streaming pipelines a lot more frictionless than using their JVM alternatives.<\/p>\n\n\n\n<p id=\"4ef6\">The key aspect of choosing a streaming vs. a batch design is real-time synchronization between your source and destination DBs.<\/p>\n\n\n\n<p id=\"4da8\">In our particular case, we will process social media data, which changes fast and irregularly.<\/p>\n\n\n\n<p id=\"bb83\">Also, for our digital twin, it is important to do RAG on up-to-date data. We don\u2019t want to have any delay between what happens in the real world and what your LLM twin sees.<\/p>\n\n\n\n<p id=\"5e6f\">That being said, choosing a streaming architecture seemed natural in our use case.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"e769\">3. The Bytewax streaming flow for a feature pipeline<\/h2>\n\n\n\n<p id=\"ae1a\">The&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\"><strong>Bytewax<\/strong><\/a><strong>&nbsp;flow<\/strong>&nbsp;is the&nbsp;<strong>central point<\/strong>&nbsp;of the&nbsp;<strong>streaming pipeline<\/strong>. 
It defines all the required steps, following this simplified pattern:&nbsp;<em>\u201cinput -&gt; processing -&gt; output\u201d.<\/em><\/p>\n\n\n\n<p id=\"97cb\">As I come from the AI world, I like to see it as the&nbsp;<strong>\u201cgraph\u201d<\/strong>&nbsp;of the&nbsp;<strong>streaming pipeline<\/strong>, where you use the&nbsp;<em>input()<\/em>,&nbsp;<em>map()<\/em>, and&nbsp;<em>output()<\/em>&nbsp;Bytewax functions to define your graph, which in the&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\"><strong>Bytewax<\/strong><\/a><strong>&nbsp;world<\/strong>&nbsp;is&nbsp;<strong>called<\/strong>&nbsp;a&nbsp;<strong><em>\u201cflow\u201d<\/em><\/strong>.<\/p>\n\n\n\n<p id=\"2bf3\">As you can see in the code snippet below, we ingest posts, articles, or code messages from a&nbsp;<a href=\"https:\/\/www.rabbitmq.com\/\" target=\"_blank\" rel=\"noreferrer noopener\">RabbitMQ queue<\/a>. Afterward, we clean, chunk, and embed them. 
Ultimately, we load the cleaned and embedded data to a&nbsp;<a href=\"https:\/\/qdrant.tech\/?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">Qdrant vector DB<\/a>, which in our LLM twin use case will represent the feature store of our system.<\/p>\n\n\n\n<p id=\"3f20\">To structure and validate the data, between each Bytewax step, we map and pass a different Pydantic model based on its current state: raw, cleaned, chunked, or embedded.<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*RpE4JpcCFtlkBM7U48Zi4w.png\" alt=\"Code used to create real-time streaming pipeline\/feature pipeline with CDC\"\/><figcaption class=\"wp-element-caption\">Bytewax flow \u2192&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/test-module-3\/course\/module-3\/data_flow\/bytewax_pipeline.py\">GitHub Code \u20ea\u2190<\/a><\/figcaption><\/figure>\n\n\n\n<p id=\"5005\">We have a single streaming pipeline that processes everything.<\/p>\n\n\n\n<p id=\"b73c\">As we ingest multiple data types (posts, articles, or code snapshots), we have to process them differently.<\/p>\n\n\n\n<p id=\"05af\">To do this the right way, we implemented a dispatcher layer that knows how to apply data-specific operations based on the type of message.<\/p>\n\n\n\n<p id=\"baa9\">More on this in the next sections \u2193<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"903b\">Why Bytewax?<\/h3>\n\n\n\n<p id=\"8520\"><a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\"><em>Bytewax<\/em><\/a><em>&nbsp;is an open-source streaming processing framework that:<\/em><br>\u2013 is built in&nbsp;<strong>Rust<\/strong>&nbsp;\u2699\ufe0f for&nbsp;<strong>performance<\/strong><br>\u2013 has&nbsp;<strong>Python<\/strong>&nbsp;\ud83d\udc0d bindings for leveraging its 
powerful ML ecosystem<\/p>\n\n\n\n<p id=\"03e3\">\u2026 so, for all the Python fanatics out there, no more JVM headaches for you.<\/p>\n\n\n\n<p id=\"b343\">Jokes aside, here is why&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax<\/a>&nbsp;is so powerful \u2193<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax<\/a>\u00a0local setup is plug-and-play<\/li>\n\n\n\n<li>Can quickly be integrated into any Python project (you can go wild \u2014 even use it in Notebooks)<\/li>\n\n\n\n<li>Can easily be integrated with other Python packages (NumPy, PyTorch, HuggingFace, OpenCV, SkLearn, you name it)<\/li>\n\n\n\n<li>Out-of-the-box connectors for Kafka and local files, or you can quickly implement your own<\/li>\n<\/ul>\n\n\n\n<p id=\"3130\">We used&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax<\/a>&nbsp;to build the streaming pipeline for the LLM Twin course and loved it.<\/p>\n\n\n\n<p><em>To&nbsp;<strong>learn more<\/strong>&nbsp;about&nbsp;<strong>Bytewax<\/strong>, go and&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\">check them out.<\/a>&nbsp;They are&nbsp;<strong>open source<\/strong>, so no strings attached \u2192&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\">Bytewax<\/a>&nbsp;[2] \u2190<\/em><\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"a51d\">4. 
Pydantic data models<\/h2>\n\n\n\n<p id=\"7f2d\">Let\u2019s take a look at what our Pydantic models look like.<\/p>\n\n\n\n<p id=\"b187\">First, we defined a set of base abstract models so that all our components share the same parent class.<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*JbZEKLu74rX-yGrR_APEdA.png\" alt=\"Code used to create real-time streaming pipeline\/feature pipeline with CDC\"\/><figcaption class=\"wp-element-caption\">Pydantic base model structure<\/figcaption><\/figure>\n\n\n\n<p id=\"6206\">Afterward, we defined a hierarchy of Pydantic models for:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>All our data types: posts, articles, or code<\/li>\n\n\n\n<li>All our states: raw, cleaned, chunked, and embedded<\/li>\n<\/ul>\n\n\n\n<p id=\"8079\">This is what the set of classes for the posts looks like \u2193<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*3MKCIK_5e0h-g9SF7sZk6g.png\" alt=\"Code used to create real-time streaming pipeline\/feature pipeline with CDC\"\/><figcaption class=\"wp-element-caption\">Pydantic posts model structure<\/figcaption><\/figure>\n\n\n\n<p id=\"e229\">We&nbsp;<strong>repeated<\/strong>&nbsp;the&nbsp;<strong>same process<\/strong>&nbsp;for the&nbsp;<strong>articles<\/strong>&nbsp;and&nbsp;<strong>code<\/strong>&nbsp;model&nbsp;<strong>hierarchies<\/strong>.<\/p>\n\n\n\n<p><em>Check out the other data classes on our GitHub at&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/tree\/main\/src\/feature_pipeline\/models\">feature_pipeline\/models.<\/a><\/em><\/p>\n\n\n\n<h3 class=\"wp-block-heading\">Why is keeping our data in Pydantic models so powerful?<\/h3>\n\n\n\n<p id=\"3ade\"><em>There are 4 main reasons:<\/em><\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Every field has an enforced type: you are guaranteed the data types are going to be 
correct<\/li>\n\n\n\n<li>The fields are automatically validated based on their type: for example, if the field is a string and you pass an int, it will throw an error<\/li>\n\n\n\n<li>The data structure is clear and verbose: no more clandestine\u00a0<em>dicts<\/em>\u00a0whose contents you never know<\/li>\n\n\n\n<li>You make your data the first-class citizen of your program<\/li>\n<\/ul>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"2684\">5. Load data to Qdrant<\/h2>\n\n\n\n<p id=\"a22a\">The first step is to implement our custom&nbsp;<a href=\"https:\/\/docs.bytewax.io\/stable\/api\/bytewax\/bytewax.outputs.html?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax&nbsp;<em>DynamicSink<\/em>&nbsp;class&nbsp;<\/a>\u2193<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*1wtF4XXiq7V87WVZ2H1hUg.png\" alt=\"Code used to create real-time streaming pipeline\/feature pipeline with CDC\"\/><figcaption class=\"wp-element-caption\">Qdrant DynamicSink<\/figcaption><\/figure>\n\n\n\n<p id=\"a76c\">Next, for every type of operation we need (output cleaned or embedded data), we have to subclass the&nbsp;<a href=\"https:\/\/docs.bytewax.io\/stable\/api\/bytewax\/bytewax.outputs.html?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\"><em>StatelessSinkPartition<\/em>&nbsp;Bytewax class<\/a>&nbsp;(they also provide a stateful option \u2192&nbsp;<a href=\"https:\/\/docs.bytewax.io\/stable\/guide\/index.html?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">more in their docs<\/a>).<\/p>\n\n\n\n<p id=\"3b61\">An instance of the class will run on every partition defined within the Bytewax deployment.<\/p>\n\n\n\n<p id=\"b811\">In the course, we are using a single partition per worker. 
But, by adding more partitions (and workers), you can quickly scale your Bytewax feature pipeline horizontally.<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*twK1xTc66IqMwUugoT7l6g.png\" alt=\"Code used to create real-time streaming pipeline\/feature pipeline with CDC\"\/><figcaption class=\"wp-element-caption\">Qdrant worker partitions<\/figcaption><\/figure>\n\n\n\n<p id=\"1154\">Note that we used<strong>&nbsp;Qdrant\u2019s<\/strong>&nbsp;<strong>Batch<\/strong>&nbsp;method to upload all the available points at once. By doing so, we&nbsp;<strong>reduce<\/strong>&nbsp;the&nbsp;<strong>latency<\/strong>&nbsp;on the&nbsp;<strong>network I\/O<\/strong>&nbsp;side:&nbsp;<a href=\"https:\/\/qdrant.tech\/documentation\/concepts\/points\/#upload-points?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">more on that here<\/a>&nbsp;[8] \u2190<\/p>\n\n\n\n<p><em>The RabbitMQ streaming input follows a similar pattern.&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/src\/feature_pipeline\/data_flow\/stream_input.py\">Check it out here<\/a>&nbsp;\u2190<\/em><\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"f058\">6. 
The dispatcher layer<\/h2>\n\n\n\n<p id=\"371d\">We now have the&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax<\/a>&nbsp;flow and all our data models.<\/p>\n\n\n\n<p id=\"b86c\">How do we map a raw data model to a cleaned data model?<\/p>\n\n\n\n<p id=\"84d7\">\u2192 All our domain logic is modeled by a set of&nbsp;<em>Handler()<\/em>&nbsp;classes.<\/p>\n\n\n\n<p id=\"9d92\">For example, this is what the&nbsp;<strong>handler<\/strong>&nbsp;used to&nbsp;<strong>map<\/strong>&nbsp;a&nbsp;<strong>PostsRawModel<\/strong>&nbsp;to a&nbsp;<strong>PostCleanedModel<\/strong>&nbsp;looks like \u2193<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*u_BY9FdUveouSMlx-64hUA.png\" alt=\"Code used to create real-time streaming pipeline\/feature pipeline with CDC\"\/><figcaption class=\"wp-element-caption\">Handler hierarchy of classes<\/figcaption><\/figure>\n\n\n\n<p>\u2192&nbsp;<em>Check out the other handlers on our GitHub at&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/tree\/main\/src\/feature_pipeline\/data_logic\">feature_pipeline\/data_logic.<\/a><\/em><\/p>\n\n\n\n<p id=\"33f2\">The following sections will explore the exact cleaning, chunking, and embedding logic.<\/p>\n\n\n\n<p id=\"3311\"><strong>Now, to build our dispatcher, we need 2 last components:<\/strong><\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><strong>A factory class:<\/strong>\u00a0instantiates the right handler based on the type of the event<\/li>\n\n\n\n<li><strong>A dispatcher class:<\/strong>\u00a0the glue code that calls the factory class and handler<\/li>\n<\/ul>\n\n\n\n<p id=\"f67d\"><strong>Here is what the cleaning dispatcher and factory look like<\/strong>&nbsp;\u2193<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" 
src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*jbpWplME5f5Amnig2t5E4w.png\" alt=\"Screenshot of code used to create Cleaning Dispatcher class and Cleaning Handler Factory class\"\/><figcaption class=\"wp-element-caption\">The dispatcher and factory classes<\/figcaption><\/figure>\n\n\n\n<p><em>Check out the other dispatchers on our GitHub at&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/src\/feature_pipeline\/data_logic\/dispatchers.py\">feature_pipeline\/data_logic\/dispatchers.py<\/a><\/em><\/p>\n\n\n\n<p id=\"7b73\"><strong>By repeating the same logic, we will end up with the following set of dispatchers:<\/strong><\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li><em>RawDispatcher<\/em>\u00a0(no factory class required as the data is not processed)<\/li>\n\n\n\n<li><em>CleaningDispatcher<\/em>\u00a0(with a\u00a0<em>CleaningHandlerFactory<\/em>\u00a0class)<\/li>\n\n\n\n<li><em>ChunkingDispatcher<\/em>\u00a0(with a\u00a0<em>ChunkingHandlerFactory<\/em>\u00a0class)<\/li>\n\n\n\n<li><em>EmbeddingDispatcher<\/em>\u00a0(with an\u00a0<em>EmbeddingHandlerFactory<\/em>\u00a0class)<\/li>\n<\/ul>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"fefe\">7. Preprocessing steps: Clean, chunk, embed<\/h2>\n\n\n\n<p id=\"3ccd\">Here we will focus on the concrete logic used to clean, chunk, and embed a data point.<\/p>\n\n\n\n<p id=\"077f\">Note that this logic is wrapped by our handler to be integrated into our dispatcher layer using the&nbsp;<a href=\"https:\/\/refactoring.guru\/design-patterns\/strategy\" target=\"_blank\" rel=\"noreferrer noopener\">Strategy behavioral pattern<\/a>&nbsp;[10].<\/p>\n\n\n\n<p id=\"7be4\">We already described that in the previous section. 
Thus, we will directly jump into the actual logic here, which can be found in the&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/tree\/module-3\/course\/module-3\/utils\" target=\"_blank\" rel=\"noreferrer noopener\"><strong>utils<\/strong>&nbsp;<strong>module<\/strong>&nbsp;of our&nbsp;<strong>GitHub repository<\/strong><\/a>.<\/p>\n\n\n\n<p><em><strong>Note:<\/strong>&nbsp;These steps are experimental. Thus, what we present here is just the first iteration of the system. In a real-world scenario, you would experiment with different cleaning, chunking or model versions to improve it on your data.<\/em><\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"3e9b\">Cleaning<\/h3>\n\n\n\n<p id=\"8834\">This is the main utility function used to clean the text for our posts, articles, and code.<\/p>\n\n\n\n<p id=\"d25e\">For simplicity, we used the same logic for all the data types, but after more investigation, you would probably need to adapt it to your specific needs.<\/p>\n\n\n\n<p id=\"7e25\">For example, your posts might start containing some weird characters, and you don\u2019t want to run the \u201cunbold_text()\u201d or \u201cunitalic_text()\u201d functions on your code data points, as that is completely redundant.<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*feln22SMrlwp0Z6cT92RaA.png\" alt=\"screenshot of code used to create clean text function\"\/><figcaption class=\"wp-element-caption\">Cleaning logic<\/figcaption><\/figure>\n\n\n\n<p id=\"3109\">Most of the functions above are from the&nbsp;<a href=\"https:\/\/unstructured-io.github.io\/unstructured\/core\/cleaning.html#clean\" target=\"_blank\" rel=\"noreferrer noopener\"><em>unstructured<\/em><\/a>&nbsp;[3] Python package. 
It is a great tool for quickly finding utilities to clean text data.<\/p>\n\n\n\n<p id=\"545d\">\ud83d\udd17 More examples of&nbsp;<a href=\"https:\/\/unstructured-io.github.io\/unstructured\/core\/cleaning.html#clean\" target=\"_blank\" rel=\"noreferrer noopener\">unstructured here<\/a>&nbsp;[3] \u2190<\/p>\n\n\n\n<p id=\"c235\">One key thing to notice is that at the cleaning step, we just want to remove all the weird, non-interpretable characters from the text.<\/p>\n\n\n\n<p id=\"cf0c\">Also, we want to remove redundant data, such as extra whitespace or URLs, as they do not provide much value.<\/p>\n\n\n\n<p id=\"c044\"><strong>These steps are critical for our tokenizer to understand and efficiently transform our string input into numbers that will be fed into the transformer models.<\/strong><\/p>\n\n\n\n<p id=\"aaf7\">Note that when using bigger models (transformers) + modern tokenization techniques, you don\u2019t need to standardize your dataset too much.<\/p>\n\n\n\n<p id=\"1aa6\">For example, it is redundant to apply lemmatization or stemming, as the tokenizer knows how to split your input into a commonly used sequence of characters efficiently, and the transformers can pick up the nuances of the words.<\/p>\n\n\n\n<p id=\"0276\">\ud83d\udca1 What is important at the cleaning step is to throw out the noise.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"f834\">Chunking<\/h3>\n\n\n\n<p id=\"e3e6\">We are using LangChain to chunk our text.<\/p>\n\n\n\n<p id=\"9f44\">We use a&nbsp;<strong>2-step strategy using LangChain\u2019s&nbsp;<\/strong><a href=\"https:\/\/python.langchain.com\/docs\/modules\/data_connection\/document_transformers\/recursive_text_splitter\/\" target=\"_blank\" rel=\"noreferrer noopener\"><em>RecursiveCharacterTextSplitter<\/em><\/a><em>&nbsp;[4]&nbsp;<\/em>and&nbsp;<a href=\"https:\/\/python.langchain.com\/docs\/modules\/data_connection\/document_transformers\/split_by_token\/\" target=\"_blank\" rel=\"noreferrer 
noopener\"><em>SentenceTransformersTokenTextSplitter<\/em><\/a><em>&nbsp;[5]<\/em>. As seen below \u2193<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*jbbc9N3Dv6lixRwG-CHjug.png\" alt=\"screenshot of code used to create chunk text function\"\/><figcaption class=\"wp-element-caption\">Chunking logic<\/figcaption><\/figure>\n\n\n\n<p id=\"6000\">Overlapping your chunks is a common pre-indexing RAG technique, which helps to cluster chunks from the same document semantically.<\/p>\n\n\n\n<p id=\"53a1\">Again, we are using the same chunking logic for all of our data types, but to get the most out of it, we would probably need to tweak the&nbsp;<em>separators<\/em>,&nbsp;<em>chunk_size<\/em>, and&nbsp;<em>chunk_overlap<\/em>&nbsp;parameters for our different use cases.<\/p>\n\n\n\n<p id=\"ca2c\">But our&nbsp;<em>dispatcher<\/em>&nbsp;+&nbsp;<em>handler<\/em>&nbsp;<em>architecture<\/em>&nbsp;would easily allow us to configure the chunking step in future iterations.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\" id=\"ee51\">Embedding<\/h3>\n\n\n\n<p id=\"08ec\">The data preprocessing, aka the hard part, is done.<\/p>\n\n\n\n<p id=\"761d\">Now we just have to call an embedding model to create our vectors.<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:1000\/1*7J9OcKDZLLxNGq462xKmYA.png\" alt=\"screenshot of code used to create embed text function and embed repositories function\"\/><figcaption class=\"wp-element-caption\">Embedding logic<\/figcaption><\/figure>\n\n\n\n<p id=\"e132\">We used the&nbsp;<em>all-MiniLM-L6-v2<\/em>&nbsp;[6] model from the sentence-transformers library to embed our articles and posts: a lightweight embedding model that can easily run in real-time on a 2 vCPU machine.<\/p>\n\n\n\n<p id=\"5fb9\">As the code data points contain more complex relationships and specific jargon to embed, we used a more powerful 
embedding model:&nbsp;<a href=\"https:\/\/huggingface.co\/hkunlp\/instructor-xl\" target=\"_blank\" rel=\"noreferrer noopener\"><em>hkunlp\/instructor-xl<\/em><\/a><em>&nbsp;[7]<\/em>.<\/p>\n\n\n\n<p id=\"ae62\">This embedding model is unique as it can be customized on the fly with instructions based on your particular data. This allows the embedding model to specialize on your data without fine-tuning, which is handy for embedding pieces of code.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\" id=\"4a40\">8. Run the feature pipeline<\/h2>\n\n\n\n<p id=\"5dac\">To quickly test things out, we wrote a&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/test-module-3\/course\/module-3\/docker-compose.yml\" target=\"_blank\" rel=\"noreferrer noopener\"><em>docker-compose.yaml&nbsp;<\/em><\/a>file that will start and run the following Docker containers.<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>MongoDB, RabbitMQ, Qdrant<\/li>\n\n\n\n<li>The CDC microservice<\/li>\n\n\n\n<li>The feature pipeline<\/li>\n<\/ul>\n\n\n\n<p>You can spin up the Docker containers using our&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/Makefile\">Makefile<\/a>&nbsp;by running the following, which will run the CDC service and streaming pipeline 24\/7:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code> make local-start <\/code><\/pre>\n\n\n\n<p>To test that everything works as expected, you can kick off the workflow by crawling a random link, for example by running:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code> make local-test-medium <\/code><\/pre>\n\n\n\n<p id=\"fa24\"><em>After running the crawling command, the following will happen:&nbsp;<\/em><\/p>\n\n\n\n<ol class=\"wp-block-list\">\n<li>It will crawl a Medium\/GitHub link;<\/li>\n\n\n\n<li>It will process and add the data to MongoDB;<\/li>\n\n\n\n<li>The CDC component will be triggered, which will populate RabbitMQ with the event;<\/li>\n\n\n\n<li>The RAG feature pipeline will read the event from RabbitMQ, 
process it for RAG, and add it to the Qdrant vector DB.<\/li>\n<\/ol>\n\n\n\n<p>To check the logs of the feature pipeline to see that it processed the events from RabbitMQ successfully, you can run the following:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code> docker logs llm-twin-feature-pipeline <\/code><\/pre>\n\n\n\n<p>\u2026and you should see something similar to:<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:2000\/1*RQf_Xc9WmNP4qo7Fqygwzw.png\" alt=\"\"\/><figcaption class=\"wp-element-caption\">Screenshot after running&nbsp;\u201cdocker logs llm-twin-feature-pipeline\u201d<\/figcaption><\/figure>\n\n\n\n<p>The last step is to navigate to the Qdrant dashboard at http:\/\/localhost:6333\/dashboard to validate that the vector DB has been successfully populated, as seen in the image below:<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/miro.medium.com\/v2\/resize:fit:2000\/1*nwxR0Lzx1HJ_UkHZlRwsYg.png\" alt=\"\"\/><figcaption class=\"wp-element-caption\">Screenshot from the Qdrant dashboard<\/figcaption><\/figure>\n\n\n\n<p><em>Find&nbsp;<strong>step-by-step instructions<\/strong>&nbsp;on installing and running<strong>&nbsp;the entire course<\/strong>&nbsp;in our&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\/blob\/main\/INSTALL_AND_USAGE.md\">INSTALL_AND_USAGE<\/a>&nbsp;document from the repository.<\/em><\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Conclusion<\/h2>\n\n\n\n<p id=\"065f\">Now you know how to write streaming pipelines like a PRO!<\/p>\n\n\n\n<p id=\"8f05\">In&nbsp;<strong>Lesson 4<\/strong>, you&nbsp;<strong>learned how<\/strong>&nbsp;to:<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>Design a feature pipeline using the 3-pipeline architecture<\/li>\n\n\n\n<li>Write a streaming pipeline using\u00a0<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" 
rel=\"noreferrer noopener\">Bytewax<\/a>\u00a0as a streaming engine<\/li>\n\n\n\n<li>Use a dispatcher layer to write a modular and flexible application to process multiple types of data (posts, articles, code)<\/li>\n\n\n\n<li>Load the cleaned and embedded data to\u00a0<a href=\"https:\/\/qdrant.tech\/?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">Qdrant<\/a><\/li>\n<\/ul>\n\n\n\n<p id=\"e59f\">\u2192 This is only the ingestion part used for fine-tuning LLMs and RAG.<\/p>\n\n\n\n<p id=\"bf94\">In&nbsp;<strong>Lesson 5<\/strong>, you will learn how to write a retrieval client for the 3 data types using good SWE practices and improve the retrieval accuracy using advanced retrieval &amp; post-retrieval techniques.&nbsp;<em>See you there!<\/em><\/p>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\">\n<p id=\"0c5f\">\ud83d\udd17&nbsp;<strong>Check out<\/strong>&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\" target=\"_blank\" rel=\"noreferrer noopener\">the code on GitHub<\/a>&nbsp;[1] and support us with a \u2b50\ufe0f<\/p>\n<\/blockquote>\n\n\n\n<h4 class=\"wp-block-heading\" id=\"cadd\">References<\/h4>\n\n\n\n<h5 class=\"wp-block-heading\">Literature<\/h5>\n\n\n\n<p id=\"a4cd\">[1]&nbsp;<a href=\"https:\/\/github.com\/decodingml\/llm-twin-course\" target=\"_blank\" rel=\"noreferrer noopener\">Your LLM Twin Course \u2014 GitHub Repository<\/a>&nbsp;(2024), Decoding ML GitHub Organization<\/p>\n\n\n\n<p id=\"f961\">[2]&nbsp;<a href=\"https:\/\/bytewax.io\/?utm_source=medium&amp;utm_medium=decodingml&amp;utm_campaign=2024_q1\" target=\"_blank\" rel=\"noreferrer noopener\">Bytewax<\/a>, Bytewax Landing Page<\/p>\n\n\n\n<p id=\"58f1\">[3]&nbsp;<a href=\"https:\/\/unstructured-io.github.io\/unstructured\/core\/cleaning.html#clean\" target=\"_blank\" rel=\"noreferrer noopener\">Unstructured Cleaning Examples<\/a>, Unstructured 
Documentation<\/p>\n\n\n\n<p id=\"4050\">[4]&nbsp;<a href=\"https:\/\/python.langchain.com\/docs\/modules\/data_connection\/document_transformers\/recursive_text_splitter\/\" target=\"_blank\" rel=\"noreferrer noopener\">Recursively split by character<\/a>, LangChain\u2019s Documentation<\/p>\n\n\n\n<p id=\"43d4\">[5]&nbsp;<a href=\"https:\/\/python.langchain.com\/docs\/modules\/data_connection\/document_transformers\/split_by_token\/\" target=\"_blank\" rel=\"noreferrer noopener\">Split by tokens<\/a>, LangChain\u2019s Documentation<\/p>\n\n\n\n<p id=\"7903\">[6]&nbsp;<a href=\"https:\/\/huggingface.co\/sentence-transformers\/all-MiniLM-L6-v2\" target=\"_blank\" rel=\"noreferrer noopener\">sentence-transformers\/all-MiniLM-L6-v2<\/a>, HuggingFace<\/p>\n\n\n\n<p id=\"900c\">[7]&nbsp;<a href=\"https:\/\/huggingface.co\/hkunlp\/instructor-xl\" target=\"_blank\" rel=\"noreferrer noopener\">hkunlp\/instructor-xl<\/a>, HuggingFace<\/p>\n\n\n\n<p id=\"0bae\">[8]&nbsp;<a href=\"https:\/\/qdrant.tech\/documentation\/?utm_source=decodingml&amp;utm_medium=referral&amp;utm_campaign=llm-course\" target=\"_blank\" rel=\"noreferrer noopener\">Qdrant<\/a>, Qdrant Documentation<\/p>\n\n\n\n<p id=\"997a\">[9]&nbsp;<a href=\"https:\/\/refactoring.guru\/design-patterns\/abstract-factory\" target=\"_blank\" rel=\"noreferrer noopener\">Abstract Factory Pattern<\/a>, Refactoring Guru<\/p>\n\n\n\n<p id=\"692c\">[10]&nbsp;<a href=\"https:\/\/refactoring.guru\/design-patterns\/strategy\" target=\"_blank\" rel=\"noreferrer noopener\">Strategy Pattern<\/a>, Refactoring Guru<\/p>\n\n\n\n<h5 class=\"wp-block-heading\" id=\"fe02\">Images<\/h5>\n\n\n\n<p id=\"b77d\">If not otherwise stated, all images are created by the author.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Welcome to Lesson 4 of 12 in our free course series, LLM Twin: Building Your Production-Ready AI Replica. You\u2019ll learn how to use LLMs, vector DBs, and LLMOps best practices to design, train, and deploy a production ready \u201cLLM twin\u201d of yourself. 
This AI character will write like you, incorporating your style, personality, and voice [&hellip;]<\/p>\n","protected":false},"author":128,"featured_media":9611,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"customer_name":"","customer_description":"","customer_industry":"","customer_technologies":"","customer_logo":"","_jetpack_memberships_contains_paid_content":false,"footnotes":""},"categories":[65,6,7],"tags":[14,64,85,86,15,89,90,52,31,16,91,92],"coauthors":[222,223],"class_list":["post-9847","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-llmops","category-machine-learning","category-tutorials","tag-comet-ml","tag-cometllm","tag-data-pipeline","tag-data-quality","tag-deep-learning-experiment-management","tag-feature-engineering","tag-feature-pipeline","tag-llm","tag-llmops","tag-ml-experiment-management","tag-rag","tag-streaming-pipeline"],"yoast_head":"<!-- This site is optimized with the Yoast SEO Premium plugin v25.9 (Yoast SEO v25.9) - https:\/\/yoast.com\/wordpress\/plugins\/seo\/ -->\n<title>How to Build a Scalable Real-Time RAG Feature Pipeline<\/title>\n<meta name=\"description\" content=\"A step-by-step guide to design and build a production-ready feature pipeline for fine-tuning LLMs &amp; RAG.\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"SOTA Python Streaming Pipelines for Fine-tuning LLMs and RAG - in Real-Time!\" \/>\n<meta property=\"og:description\" content=\"A step-by-step guide to design and build a production-ready feature pipeline for fine-tuning LLMs &amp; RAG.\" \/>\n<meta property=\"og:url\" 
content=\"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/\" \/>\n<meta property=\"og:site_name\" content=\"Comet\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/cometdotml\" \/>\n<meta property=\"article:published_time\" content=\"2024-04-24T23:24:26+00:00\" \/>\n<meta property=\"article:modified_time\" content=\"2025-04-29T12:25:27+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/Screenshot-2024-03-27-at-10.01.21\u202fAM.png\" \/>\n\t<meta property=\"og:image:width\" content=\"1396\" \/>\n\t<meta property=\"og:image:height\" content=\"796\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/png\" \/>\n<meta name=\"author\" content=\"Paul Iusztin, Decoding ML\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@Cometml\" \/>\n<meta name=\"twitter:site\" content=\"@Cometml\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"Paul Iusztin, Decoding ML\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"18 minutes\" \/>\n<!-- \/ Yoast SEO Premium plugin. 
-->","yoast_head_json":{"title":"How to Build a Scalable Real-Time RAG Feature Pipeline","description":"A step-by-step guide to design and build a production-ready feature pipeline for fine-tuning LLMs & RAG.","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/","og_locale":"en_US","og_type":"article","og_title":"SOTA Python Streaming Pipelines for Fine-tuning LLMs and RAG - in Real-Time!","og_description":"A step-by-step guide to design and build a production-ready feature pipeline for fine-tuning LLMs & RAG.","og_url":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/","og_site_name":"Comet","article_publisher":"https:\/\/www.facebook.com\/cometdotml","article_published_time":"2024-04-24T23:24:26+00:00","article_modified_time":"2025-04-29T12:25:27+00:00","og_image":[{"width":1396,"height":796,"url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/Screenshot-2024-03-27-at-10.01.21\u202fAM.png","type":"image\/png"}],"author":"Paul Iusztin, Decoding ML","twitter_card":"summary_large_image","twitter_creator":"@Cometml","twitter_site":"@Cometml","twitter_misc":{"Written by":"Paul Iusztin, Decoding ML","Est. 
reading time":"18 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#article","isPartOf":{"@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/"},"author":{"name":"Paul Iusztin","@id":"https:\/\/www.comet.com\/site\/#\/schema\/person\/87bf0cb600025605b68dcd2f0d597560"},"headline":"SOTA Python Streaming Pipelines for Fine-tuning LLMs and RAG &#8211; in Real-Time!","datePublished":"2024-04-24T23:24:26+00:00","dateModified":"2025-04-29T12:25:27+00:00","mainEntityOfPage":{"@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/"},"wordCount":3611,"publisher":{"@id":"https:\/\/www.comet.com\/site\/#organization"},"image":{"@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#primaryimage"},"thumbnailUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/Screenshot-2024-03-27-at-10.01.21\u202fAM.png","keywords":["Comet ML","CometLLM","Data Pipeline","Data Quality","Deep Learning Experiment Management","Feature Engineering","Feature pipeline","LLM","LLMOps","ML Experiment Management","RAG","Streaming pipeline"],"articleSection":["LLMOps","Machine Learning","Tutorials"],"inLanguage":"en-US"},{"@type":"WebPage","@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/","url":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/","name":"How to Build a Scalable Real-Time RAG Feature 
Pipeline","isPartOf":{"@id":"https:\/\/www.comet.com\/site\/#website"},"primaryImageOfPage":{"@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#primaryimage"},"image":{"@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#primaryimage"},"thumbnailUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/Screenshot-2024-03-27-at-10.01.21\u202fAM.png","datePublished":"2024-04-24T23:24:26+00:00","dateModified":"2025-04-29T12:25:27+00:00","description":"A step-by-step guide to design and build a production-ready feature pipeline for fine-tuning LLMs & RAG.","breadcrumb":{"@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#primaryimage","url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/Screenshot-2024-03-27-at-10.01.21\u202fAM.png","contentUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/Screenshot-2024-03-27-at-10.01.21\u202fAM.png","width":1396,"height":796,"caption":"An End-to-End Framework for Production-Ready LLM Systems by Building Your LLM Twin by Paul Iusztin of DecodingML"},{"@type":"BreadcrumbList","@id":"https:\/\/www.comet.com\/site\/blog\/streaming-pipelines-for-fine-tuning-llms\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Home","item":"https:\/\/www.comet.com\/site\/"},{"@type":"ListItem","position":2,"name":"SOTA Python Streaming Pipelines for Fine-tuning LLMs and RAG &#8211; in Real-Time!"}]},{"@type":"WebSite","@id":"https:\/\/www.comet.com\/site\/#website","url":"https:\/\/www.comet.com\/site\/","name":"Comet","description":"Build Better Models 
Faster","publisher":{"@id":"https:\/\/www.comet.com\/site\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/www.comet.com\/site\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/www.comet.com\/site\/#organization","name":"Comet ML, Inc.","alternateName":"Comet","url":"https:\/\/www.comet.com\/site\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.comet.com\/site\/#\/schema\/logo\/image\/","url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2025\/01\/logo_comet_square.png","contentUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2025\/01\/logo_comet_square.png","width":310,"height":310,"caption":"Comet ML, Inc."},"image":{"@id":"https:\/\/www.comet.com\/site\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/cometdotml","https:\/\/x.com\/Cometml","https:\/\/www.youtube.com\/channel\/UCmN63HKvfXSCS-UwVwmK8Hw"]},{"@type":"Person","@id":"https:\/\/www.comet.com\/site\/#\/schema\/person\/87bf0cb600025605b68dcd2f0d597560","name":"Paul Iusztin","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/www.comet.com\/site\/#\/schema\/person\/image\/82264b94fb97af87b79646edc7e4fd81","url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2026\/05\/cropped-paul-iusztin-96x96.webp","contentUrl":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2026\/05\/cropped-paul-iusztin-96x96.webp","caption":"Paul 
Iusztin"},"sameAs":["https:\/\/decodingml.substack.com\/"],"url":"https:\/\/www.comet.com\/site\/blog\/author\/paul-iusztin\/"}]}},"jetpack_featured_media_url":"https:\/\/www.comet.com\/site\/wp-content\/uploads\/2024\/03\/Screenshot-2024-03-27-at-10.01.21\u202fAM.png","jetpack_sharing_enabled":true,"_links":{"self":[{"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/posts\/9847","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/users\/128"}],"replies":[{"embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/comments?post=9847"}],"version-history":[{"count":3,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/posts\/9847\/revisions"}],"predecessor-version":[{"id":15787,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/posts\/9847\/revisions\/15787"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/media\/9611"}],"wp:attachment":[{"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/media?parent=9847"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/categories?post=9847"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/tags?post=9847"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.comet.com\/site\/wp-json\/wp\/v2\/coauthors?post=9847"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}