In this post, I will focus on the real-time pipeline that is powering the smart search, so that every time I publish a new blog post or video, the content is automatically “indexed”.
Let’s get started.
If you don’t have the time to read my previous post, I will try to summarize the overall architecture for you:
Blogs are stored as Markdown files inside my NextJS app
Blog metadata (URL, title, slug, category, etc) are stored in a Postgres DB
YouTube video data is refreshed using YouTube’s Data API
YouTube metadata (URL, title, category, thumbnail, etc) are stored in a Postgres DB
Vector embeddings are generated through OpenAI for both blogs and videos
Generated embeddings are stored with the metadata
The end product: all blog and video metadata are stored in a Postgres DB called content
, along with their vector embedding that is used to search for similar content.
When a user performs a search — say “productivity” — in real-time embeddings for the term “productivity” are generated, and then using some Postgres functions, a cosine similarity check is computed against all videos and blogs to return the top 10 most similar content.
Every time I publish a piece of content — whether a blog or video — I want something to automatically index my content, and make it discoverable by smart search.
I publish 2-3 times a week. With such a low cadence, I could have easily updated the indexes manually, but the engineer in me hates not automating repetitive tasks.
That’s why I had to figure out a way to automate the process.
So, this is what needed to happen every time I hit “Publish”.
For my blog posts:
The process starts when a new Markdown file is added to my NextJS app
The file (blog) needed to be read and parsed
OpenAI Embeddings had to be generated and stored in Postgres (note: this step only applied to the new blog post. Generating embeddings for historical content is expensive and unnecessary)
For my YouTube videos, a similar process:
The process starts whenever a new video is found using the YouTube API
The video title and description are parsed and concatenated into a single string
The string is passed to OpenAI for Embeddings to be generated
Embeddings for the new video are stored in Postgres
If you understand better with illustrations, here you go.
There were a few common pieces of logic that applied to both blogs and videos:
Only generate embeddings for content not available in Postgres already
For duplicate requests to OpenAI, serve from the cache to save money
Expose the ability to “force refresh” if I want to forcefully generate new embeddings for historical content
Creating a new private endpoint to encapsulate the above made the most sense.
The endpoint took 3 query parameters:
youtube
— if a new YouTube video needs to be processed
blog
— if a new blog post needs to be processed
force_refresh
— if an already processed blog or video needs to be re-processed
To guarantee not processing already “seen” content, I only look for unique URLs.
A new blog or video only gets processed, when the new URL does not already exist in stored_video_urls
.
The last important logic I want to focus on has to do with caching.
Requests to OpenAI can become expensive with increasing token sizes. Because of that, I don’t want to make API calls for input_text
that I had already seen.
Using Python’s built-in lru_cache
does the job for now. Later on, I plan on migrating to something like Redis.
Okay, so now that you know about the important parts of the endpoint, let’s talk about how the API is called in the real-time pipeline.
Whenever I want to publish a new blog post, I run a bash script on my production server.
The bash script does the following:
Pulls the latest changes on GitHub
Kicks off a new Node process for my NextJS app
Makes an API request to the endpoint above to kick off processing for the new blog post
So, whenever a new blog post is added to my website, automatically the API endpoint is invoked, which in turn updates the Postgres DB with embeddings for the new blog post.
Moving to my YouTube videos now, the process is slightly different.
I upload my YouTube videos through the YouTube Studio UI. So, my production server does not get notified immediately.
Instead, I have a cron job that runs every 10 minutes on my server. It does one thing only: Pulls the latest channel video using the YouTube API, and calls the above /create_embeddings
endpoint if there’s a new video to process.
At worst, there will be a 10-minute delay between uploading a new video and it being available through Smart Search.
Lastly, I have set up a weekly cron job that invokes the API endpoint as follows:
curl -X 'POST' 'http://127.0.0.1:8000/create_embeddings' -H 'accept: application/json' -H 'Content-Type: application/json' -d '{
"youtube": true,
"blog": true,
"force_refresh": false
}'
In other words, it invokes the internally running API to create embeddings for the latest blog and video, if for some reason, it wasn’t created already through step 2.
With this fallback, I know that even if my automated pipeline fails, there’s a backup that will make everything discoverable to smart search by the end of the week.
If you have made it this far, I hope you found this valuable.
Here are a few ways you can do so: follow me on Medium, subscribe to my website, or follow me on YouTube.