From Reddit Threads to Product Listings: OnlyBuyForLife's Ingestion Stack
Summary: We built a product discovery engine that pulls Reddit community wisdom through an LLM, enriches it with live eBay listings, and serves it to shoppers. This is a technical walkthrough of how the pipeline works—and the specific design decisions that keep it stable across five external APIs.
What We Built
OnlyBuyForLife is a curated product discovery site powered by r/BuyItForLife, a Reddit community dedicated to finding durable, long-lasting products worth the investment. The core idea is simple: Reddit's 1.5M subscribers collectively surface exceptional products every day. We extract those recommendations, enrich them with live pricing from eBay and Amazon, and present them in a clean, browseable storefront.
Simple idea. Complicated plumbing.
Our data pipeline touches five external systems: Reddit's API, OpenAI (GPT-4o-mini), eBay's Browse API, Amazon's product catalog, and a Supabase PostgreSQL database. Each one has its own schema, its own failure modes, and its own expectations about what "valid data" looks like. A central design goal was to ensure that quirks in any one of them do not cascade into the others.
The Architecture
Data flows through five sequential stages, each writing status updates to a pipeline_runs table so we have a full audit trail of every run:
Reddit API (asyncpraw)
│ top posts + comment threads
▼
GPT-4o-mini (pydantic-ai)
│ structured product recommendations
▼
Deduplication (rapidfuzz)
│ merged items + updated scores
▼
eBay Browse API (httpx)
│ live listings and prices
▼
Supabase (PostgreSQL)
│
▼
FastAPI + Jinja2 → user-facing storefront
A separate APScheduler job handles eBay sync on a configurable interval, running independently of the Reddit ingestion pipeline. Let's walk through how each stage works.
Stage 1: Extracting Recommendations from Reddit
We use asyncpraw to pull the top posts from r/BuyItForLife on a scheduled basis. Each post's comment thread is formatted and sent to GPT-4o-mini via pydantic-ai, with each comment prefixed by its upvote score so the model can weigh community signals:
[Score: 847] Cast iron is the answer. Lodge or Le Creuset, both last forever.
[Score: 312] +1 for Lodge. Been using mine for 15 years.
[Score: 204] All-Clad if you want stainless. D3 line specifically.
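The prefixing step itself is simple; a minimal sketch (the helper name and the `(score, body)` tuple shape are assumptions, not the project's actual code):

```python
def format_comments(comments: list[tuple[int, str]]) -> str:
    """Prefix each comment with its upvote score so the LLM can weigh it."""
    return "\n".join(f"[Score: {score}] {body}" for score, body in comments)

thread = [
    (847, "Cast iron is the answer. Lodge or Le Creuset, both last forever."),
    (312, "+1 for Lodge. Been using mine for 15 years."),
]
prompt_block = format_comments(thread)
```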
The system prompt encodes 15 extraction rules: exclude retailers-as-products, brand-only mentions without a model identifier, vague descriptions, and negative recommendations. The AI returns a structured list of product names, descriptions, categories, and scores.
Because LLM output can arrive in several shapes—a dict with an items key, a bare list, a raw JSON string, or occasionally JSON embedded in a markdown code block—we normalize the response before validation:
import json
import re

result = await agent.run(prompt)
data = result.data
if isinstance(data, str):
    # Pull the JSON object out of markdown fences or surrounding prose
    match = re.search(r"\{.*\}", data, re.DOTALL)
    if match:
        data = json.loads(match.group())
if isinstance(data, dict) and "items" in data:
    data = data["items"]
After normalization, we pass the data into Pydantic models for validation. This is the boundary where all external AI output becomes trusted internal data:
from pydantic import BaseModel, field_validator

class Item(BaseModel):
    name: str
    description: str
    category: str
    score: int = 0

    @field_validator("category")
    @classmethod
    def normalize_category(cls, v: str) -> str:
        return canonicalize_category(v)

    @field_validator("name")
    @classmethod
    def clean_name(cls, v: str) -> str:
        return " ".join(v.split()).strip()

class ItemList(BaseModel):
    items: list[Item]
Records that fail validation are logged with the exact field and error, then skipped. The pipeline continues with whatever valid records remain rather than halting on a single bad extraction.
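The log-and-skip loop might look like this sketch (abbreviated `Item` model; `validate_items` is a hypothetical helper, and it assumes Pydantic v2's `model_validate`):

```python
import logging

from pydantic import BaseModel, ValidationError

logger = logging.getLogger(__name__)


class Item(BaseModel):
    name: str
    score: int = 0


def validate_items(raw_items: list[dict]) -> list[Item]:
    """Keep every record that validates; log the exact error and skip the rest."""
    valid = []
    for raw in raw_items:
        try:
            valid.append(Item.model_validate(raw))
        except ValidationError as exc:
            # exc.errors() carries the offending field and reason per error
            logger.warning("skipping record %r: %s", raw, exc.errors())
    return valid
```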
Stage 2: Category Canonicalization
Category is a first-class filter dimension on the storefront—a product filed under the wrong category is effectively invisible. Because the LLM generates category labels from natural language, we cannot rely on it to produce perfectly consistent strings across thousands of extractions.
We canonicalize at every data boundary using an explicit lookup table:
CATEGORY_MAP = {
    "kitchen toolsl & utensils": "Kitchen Tools & Utensils",  # persistent LLM typo
    "kitchenware": "Kitchen Tools & Utensils",
    "outdoors & camping": "Outdoor & Recreation",
    # ... 40+ entries covering synonyms, typos, and plurality variants
}

def canonicalize_category(raw: str) -> str:
    normalized = raw.strip().lower()
    return CATEGORY_MAP.get(normalized, raw.title())
This runs when the AI returns a category, when items are imported from file, when pinned submissions are loaded, and when items are written to the database. Calling it at every boundary means no uncanonicalized string can reach the storefront regardless of which code path produced it.
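Running at every boundary only works because the function is idempotent: feeding an already-canonical label back in returns it unchanged. A quick self-check along these lines (abbreviated map; the assertion loop is a sketch, not project code):

```python
CATEGORY_MAP = {
    "kitchenware": "Kitchen Tools & Utensils",
    "outdoors & camping": "Outdoor & Recreation",
}


def canonicalize_category(raw: str) -> str:
    normalized = raw.strip().lower()
    return CATEGORY_MAP.get(normalized, raw.title())


# Canonical outputs must map back to themselves when re-canonicalized,
# otherwise a value could drift each time it crosses a boundary.
for canonical in set(CATEGORY_MAP.values()):
    assert canonicalize_category(canonical) == canonical
```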
Stage 3: Fuzzy Deduplication
Reddit users describe the same product in dozens of ways:
"All-Clad D3 Stainless 12-inch Skillet", "All-Clad D3", "All Clad skillet", "allclad d3 fry pan"
Inserting all four as separate items would fragment community signal—each variant would have a fraction of the upvotes and mentions that the unified product deserves. Before any new item touches the database, we compare it against every existing item using four strategies in priority order:
from rapidfuzz import fuzz

def compare_product_names(a: str, b: str) -> tuple[bool, str]:
    a_norm = normalize_product_name(a)
    b_norm = normalize_product_name(b)

    # 1. Brand-prefix containment
    brand_a = a_norm.split()[0]
    if len(brand_a) > 3 and brand_a in b_norm:
        return True, "brand_match"

    # 2. Token set ratio (handles word reordering)
    if fuzz.token_set_ratio(a_norm, b_norm) > 85:
        return True, "token_match"

    # 3. Partial ratio (handles one name being a substring of the other)
    if fuzz.partial_ratio(a_norm, b_norm) > 90:
        return True, "partial_match"

    # 4. Jaccard word overlap
    words_a, words_b = set(a_norm.split()), set(b_norm.split())
    overlap = len(words_a & words_b) / max(len(words_a | words_b), 1)
    if overlap > 0.8:
        return True, "word_match"

    return False, "no_match"
normalize_product_name() strips common suffixes (" set", " series", " collection") and normalizes brand variants (allclad → all-clad) before comparison. When a match is found, scores and mention counts are additive-merged onto the existing record and the description is updated only if the incoming item carries a higher community score.
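A minimal sketch of what normalize_product_name() might look like (the suffix list and brand-variant table here are illustrative, not the project's full tables):

```python
BRAND_VARIANTS = {"allclad": "all-clad", "lecreuset": "le creuset"}
STRIP_SUFFIXES = (" set", " series", " collection")


def normalize_product_name(name: str) -> str:
    """Lowercase, collapse whitespace, strip marketing suffixes, unify brands."""
    norm = " ".join(name.lower().split())
    for suffix in STRIP_SUFFIXES:
        if norm.endswith(suffix):
            norm = norm[: -len(suffix)]
    words = [BRAND_VARIANTS.get(w, w) for w in norm.split()]
    return " ".join(words)
```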
Stage 4: Live eBay Listings
A separate APScheduler job continuously syncs eBay listings for every product in the catalog. It runs in two phases:
- Discover: items with no eBay listings yet
- Refresh: items whose listings haven't been checked within EBAY_REFRESH_AGE_HOURS
Both phases use an asyncio.Semaphore-bounded pool of concurrent eBay API calls to stay within rate limits. Results are upserted on ebay_item_id, so a listing that returns across multiple syncs is updated in place rather than duplicated.
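The bounded-concurrency pattern looks roughly like this (the limit constant and the stand-in fetch function are placeholders for the real httpx calls):

```python
import asyncio

MAX_CONCURRENT_CALLS = 5  # illustrative; the real limit is configurable


async def fetch_listings(item_name: str) -> list[dict]:
    """Stand-in for the real httpx call to eBay's Browse API."""
    await asyncio.sleep(0)
    return [{"query": item_name}]


async def sync_all(item_names: list[str]) -> list[list[dict]]:
    sem = asyncio.Semaphore(MAX_CONCURRENT_CALLS)

    async def bounded(name: str):
        async with sem:  # at most MAX_CONCURRENT_CALLS requests in flight
            return await fetch_listings(name)

    return await asyncio.gather(*(bounded(n) for n in item_names))
```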
eBay's relevance ranking does not know our context. A search for "Toyota Land Cruiser" returns complete vehicles; a search for "KitchenAid Dishwasher" returns replacement parts before it returns the appliance. We encode domain-aware scoring to counteract this:
def _score_listing(self, item: dict, query: str) -> float:
    title = item.get("title", "")
    price = float(item.get("price", {}).get("value", 0))

    title_score = fuzz.token_set_ratio(query, title) / 100
    price_factor = 1 / (1 + price / 100)
    score = title_score * 0.7 + price_factor * 0.3

    # Whole-vehicle listings score near zero when we want parts
    if self._is_automotive_query(query) and self._is_whole_vehicle(title):
        score *= 0.1

    # Replacement parts score near zero for appliance queries
    if "dishwasher" in query.lower() and self._is_appliance_part(title):
        score *= 0.2

    return score
Listings not seen in a refresh get an incremented miss_count. Once they cross a configured threshold, they are soft-deleted with an ended_at timestamp. Items that repeatedly return no listings get an exponentially increasing cooldown, so the sync job stops querying searches that have never produced results.
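The cooldown could be computed along these lines (the base interval, cap, and names are assumptions, not the project's actual configuration):

```python
BASE_COOLDOWN_HOURS = 6       # illustrative default
MAX_COOLDOWN_HOURS = 24 * 14  # cap at two weeks


def next_cooldown_hours(consecutive_empty_syncs: int) -> int:
    """Double the wait after each empty sync, capped so items are retried eventually."""
    return min(BASE_COOLDOWN_HOURS * 2 ** consecutive_empty_syncs, MAX_COOLDOWN_HOURS)
```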
eBay's API schema also evolves—fields are added and removed between versions. We configure our listing model to silently drop unknown fields so a schema update upstream does not crash the sync:
from pydantic import BaseModel

class EbayListing(BaseModel, extra="ignore"):
    item_id: str
    title: str
    price: float
    image_url: str | None = None
    condition: str | None = None
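In practice that means an unexpected field is dropped at parse time rather than raising (abbreviated model; the extra field name here is hypothetical, assuming Pydantic v2):

```python
from pydantic import BaseModel


class EbayListing(BaseModel, extra="ignore"):
    item_id: str
    title: str
    price: float


# "shipping_speed_tier" stands in for a field a future schema might add
listing = EbayListing.model_validate({
    "item_id": "v1|123|0",
    "title": "Lodge 12in Cast Iron Skillet",
    "price": 29.99,
    "shipping_speed_tier": "fast",
})
```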
Stage 5: Pipeline Observability
The pipeline tracks its own state through a set of named statuses written to the pipeline_runs table at each transition:
started → extracting → deduplicating → updating_retailers → checking_urls → completed
We modeled these as a typed enum rather than raw strings, so an invalid status value raises a validation error immediately:
from datetime import datetime
from enum import Enum
from uuid import UUID

from pydantic import BaseModel

class PipelineStatus(str, Enum):
    STARTED = "started"
    EXTRACTING = "extracting"
    DEDUPLICATING = "deduplicating"
    UPDATING_RETAILERS = "updating_retailers"
    CHECKING_URLS = "checking_urls"
    COMPLETED = "completed"
    FAILED = "failed"

class PipelineRun(BaseModel):
    id: UUID
    status: PipelineStatus
    started_at: datetime
    completed_at: datetime | None = None
    items_added: int = 0
    items_updated: int = 0
    error_message: str | None = None
Every run is fully auditable: when it started, which stage it was in when it failed, how many items were added or updated, and what error was raised if it did not complete.
The Core Design Principle
Every component in this pipeline—the LLM, eBay's Browse API, Reddit's API, Amazon's URL structure—can and does change without notice. The consistent design decision across the stack is to treat all incoming data as untrusted and validate it against an internal contract at the point it enters our system.
Pydantic enforces that contract at every boundary. An unknown field from eBay is discarded. A malformed category from the LLM is canonicalized or rejected. A product name that fails cleaning raises a ValidationError with the exact field and reason, logged and skipped, before it ever reaches the database.
The result is a pipeline that degrades gracefully rather than failing completely when any one of its five upstream sources behaves unexpectedly.
Get In Touch
Have questions about the product catalog, affiliate partnerships, or the pipeline? Learn more about us on the About page, drop us a line on the Contact page, or open an issue on the project repository.