I Built a Realtime Smart Search Pipeline

In my previous blog post, I went over how I added a smart search feature to my website that lets my audience search across all my blog posts and YouTube videos.

In this post, I will focus on the real-time pipeline that is powering the smart search, so that every time I publish a new blog post or video, the content is automatically “indexed”.

Let’s get started.

A Quick Overview

If you don’t have the time to read my previous post, I will try to summarize the overall architecture for you:

    Blogs are stored as Markdown files inside my NextJS app

    Blog metadata (URL, title, slug, category, etc.) is stored in a Postgres DB

    YouTube video data is refreshed using YouTube’s Data API

    YouTube metadata (URL, title, category, thumbnail, etc) are stored in a Postgres DB

    Vector embeddings are generated through OpenAI for both blogs and videos

    Generated embeddings are stored with the metadata

The end product: all blog and video metadata is stored in a Postgres DB called content, along with the vector embeddings that are used to search for similar content.

When a user performs a search, say “productivity”, an embedding for the search term is generated in real time. Postgres functions then compute the cosine similarity between that embedding and the embedding of every video and blog post, and the 10 most similar pieces of content are returned.
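To make the ranking step concrete, here is a minimal pure-Python sketch of a cosine-similarity search. It is only an illustration: the real comparison runs inside Postgres, and the function names, the toy 3-dimensional vectors, and the URLs below are all made up for the example.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # a zero vector has no direction; treat it as dissimilar
    return dot / (norm_a * norm_b)


def top_matches(query_embedding, content, k=10):
    """Rank (url, embedding) pairs by similarity to the query, best first."""
    scored = [(cosine_similarity(query_embedding, emb), url) for url, emb in content]
    scored.sort(reverse=True)
    return [url for _, url in scored[:k]]


# Toy data (hypothetical): real embeddings have far more dimensions.
content = [
    ("/blog/deep-work", [0.9, 0.1, 0.0]),
    ("/blog/sourdough", [0.0, 0.2, 0.9]),
    ("/video/time-blocking", [0.8, 0.3, 0.1]),
]
query = [1.0, 0.2, 0.0]  # stand-in for the embedding of "productivity"
print(top_matches(query, content, k=2))
```

With this toy data, the two productivity-flavored entries rank ahead of the unrelated one.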

What Needs to be Realtime?

Every time I publish a piece of content, whether a blog post or a video, I want it to be indexed automatically and made discoverable by smart search.

I publish 2-3 times a week. With such a low cadence, I could have easily updated the indexes manually, but the engineer in me hates not automating repetitive tasks.

That’s why I had to figure out a way to automate the process.

So, this is what needed to happen every time I hit “Publish”.

For my blog posts:

    The process starts when a new Markdown file is added to my NextJS app

    The file (blog) is read and parsed

    OpenAI embeddings are generated and stored in Postgres (note: this step only applies to the new blog post; generating embeddings for historical content is expensive and unnecessary)

For my YouTube videos, a similar process:

    The process starts whenever a new video is found using the YouTube API

    The video title and description are parsed and concatenated into a single string

    The string is passed to OpenAI for Embeddings to be generated

    Embeddings for the new video are stored in Postgres
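The concatenation step for videos could look roughly like the sketch below. The function name and the newline separator are assumptions made for illustration; the only thing taken from the steps above is that the title and description end up in a single string.

```python
def build_embedding_input(title, description):
    """Join a video's title and description into one string to embed."""
    parts = [title.strip(), description.strip()]
    # Skip empty parts so a video without a description still yields clean text.
    return "\n".join(part for part in parts if part)


print(build_embedding_input("My Morning Routine", "How I plan my day."))
```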

If you understand better with illustrations, here you go.

First Step: FastAPI Endpoint

There were a few common pieces of logic that applied to both blogs and videos:

    Only generate embeddings for content not available in Postgres already

    For duplicate requests to OpenAI, serve from the cache to save money

    Expose the ability to “force refresh” if I want to forcefully generate new embeddings for historical content

Creating a new private endpoint to encapsulate the above made the most sense.

The endpoint takes three boolean parameters:

    youtube — if a new YouTube video needs to be processed

    blog — if a new blog post needs to be processed

    force_refresh — if an already processed blog or video needs to be re-processed
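As a rough sketch of how these three flags might be modelled, the snippet below uses a plain dataclass so that it runs with the standard library alone. In a FastAPI app this would more likely be a Pydantic model on the route. The class name, the helper function, and the default values are assumptions, not the actual implementation.

```python
from dataclasses import dataclass


@dataclass
class EmbeddingRequest:
    """The three flags described above; defaults are an assumption."""
    youtube: bool = False
    blog: bool = False
    force_refresh: bool = False


def content_types_to_process(request):
    """Translate the flags into the list of content types to handle."""
    types = []
    if request.youtube:
        types.append("youtube")
    if request.blog:
        types.append("blog")
    return types


print(content_types_to_process(EmbeddingRequest(youtube=True, blog=True)))
```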

To avoid re-processing content I have already “seen”, I key everything on unique URLs.

A new blog post or video only gets processed when its URL does not already exist in stored_video_urls.
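The URL check can be sketched as a small filter. This is a minimal illustration under assumptions: the function name is hypothetical, and stored_video_urls is treated here as a simple set of strings rather than a database query.

```python
def urls_to_process(candidate_urls, stored_urls, force_refresh=False):
    """Return the candidate URLs that still need embeddings, in order."""
    if force_refresh:
        # Re-process everything, but drop duplicates within the request itself.
        return list(dict.fromkeys(candidate_urls))
    seen = set(stored_urls)
    fresh = []
    for url in candidate_urls:
        if url not in seen:
            fresh.append(url)
            seen.add(url)  # also guards against duplicates within the batch
    return fresh


stored_video_urls = {"/blog/old-post"}
print(urls_to_process(["/blog/old-post", "/blog/new-post"], stored_video_urls))
print(urls_to_process(["/blog/old-post", "/blog/new-post"], stored_video_urls, force_refresh=True))
```

The first call keeps only the unseen URL, while the second, with force_refresh set, keeps both.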

The last important piece of logic I want to focus on has to do with caching.

Requests to OpenAI get more expensive as the number of tokens grows. Because of that, I don’t want to make API calls for input_text that I have already seen.

Using Python’s built-in lru_cache does the job for now. Later on, I plan on migrating to something like Redis.
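Here is a minimal sketch of the caching idea with functools.lru_cache. The OpenAI call is replaced by a hypothetical stub (fake_embedding_api) that returns a made-up vector, so the snippet runs offline; the function names and the maxsize value are assumptions.

```python
from functools import lru_cache

api_calls = 0  # counts how often the "API" is actually hit


def fake_embedding_api(input_text):
    """Stand-in for the OpenAI request; returns a tiny made-up vector."""
    global api_calls
    api_calls += 1
    return (float(len(input_text)), float(input_text.count(" ")))


@lru_cache(maxsize=1024)
def get_embedding(input_text):
    # lru_cache keys on the arguments, so identical text is served from memory.
    # The result is a tuple so cached values cannot be mutated by callers.
    return fake_embedding_api(input_text)


get_embedding("deep work tips")
get_embedding("deep work tips")  # cache hit, no second API call
get_embedding("time blocking")
print(api_calls)
print(get_embedding.cache_info())
```

One caveat with this approach: the cache lives in the memory of a single process and is lost on restart, which is part of why an external store like Redis is the natural next step.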

Okay, so now that you know about the important parts of the endpoint, let’s talk about how the API is called in the real-time pipeline.

Second Step: Deployment Pipelines

Whenever I want to publish a new blog post, I run a bash script on my production server.

The bash script does the following:

    Pulls the latest changes on GitHub

    Kicks off a new Node process for my NextJS app

    Makes an API request to the endpoint above to kick off processing for the new blog post

So, whenever a new blog post is added to my website, the API endpoint is invoked automatically, which in turn updates the Postgres DB with embeddings for the new blog post.

Moving to my YouTube videos now, the process is slightly different.

I upload my YouTube videos through the YouTube Studio UI. So, my production server does not get notified immediately.

Instead, I have a cron job that runs every 10 minutes on my server. It does one thing only: it pulls the latest channel video using the YouTube API and calls the /create_embeddings endpoint if there’s a new video to process.

At worst, there will be a 10-minute delay between uploading a new video and it being available through Smart Search.
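The logic of a single cron run might look like the sketch below. The YouTube lookup and the HTTP call are passed in as callables (both hypothetical stand-ins), so the example runs without any network access. The function name, the URLs, and the exact payload sent for a video are assumptions.

```python
def check_for_new_video(fetch_latest_video_url, stored_video_urls, trigger_embeddings):
    """One cron run: trigger processing only if the latest video is unseen.

    fetch_latest_video_url and trigger_embeddings are injected callables
    (hypothetical stand-ins for the YouTube API call and the POST request).
    """
    latest_url = fetch_latest_video_url()
    if latest_url is None or latest_url in stored_video_urls:
        return False  # nothing new; skip the API call entirely
    trigger_embeddings({"youtube": True, "blog": False, "force_refresh": False})
    return True


# Dry run with stubs instead of real network calls.
sent = []
stored = {"https://youtube.com/watch?v=old"}
print(check_for_new_video(lambda: "https://youtube.com/watch?v=old", stored, sent.append))
print(check_for_new_video(lambda: "https://youtube.com/watch?v=new", stored, sent.append))
print(sent)
```

A crontab schedule of `*/10 * * * *` would run a script like this every 10 minutes.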

Third Step: Periodic Refreshes

Lastly, I have set up a weekly cron job that invokes the API endpoint as follows:


curl -X 'POST' \
  'http://127.0.0.1:8000/create_embeddings' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "youtube": true,
  "blog": true,
  "force_refresh": false
}'
  

In other words, it invokes the internally running API to create embeddings for the latest blog post and video if, for some reason, they weren’t already created through step 2.

With this fallback, I know that even if my automated pipeline fails, there’s a backup that will make everything discoverable to smart search by the end of the week.
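If you would rather trigger the weekly refresh from Python than from curl, the same request can be built with the standard library. This is only a sketch: the helper name is hypothetical, and the snippet builds the request without sending it, so it runs even when the API is not up.

```python
import json
import urllib.request


def build_refresh_request(base_url="http://127.0.0.1:8000"):
    """Build (but do not send) the POST request for the weekly refresh."""
    payload = {"youtube": True, "blog": True, "force_refresh": False}
    return urllib.request.Request(
        url=f"{base_url}/create_embeddings",
        data=json.dumps(payload).encode("utf-8"),
        headers={"accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


request = build_refresh_request()
print(request.get_method(), request.full_url)
# Sending it would be: urllib.request.urlopen(request)
```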

Closing Thoughts

If you have made it this far, I hope you found this valuable.

If you would like to support my work, here are a few ways you can do so: follow me on Medium, subscribe to my website, or follow me on YouTube.

Irtiza Hafiz
