From Reddit Threads to Product Listings: OnlyBuyForLife's Ingestion Stack

Data Engineering

Summary: We built a product discovery engine that pulls Reddit community wisdom through an LLM, enriches it with live eBay listings, and serves it to shoppers. This is a technical walkthrough of how the pipeline works—and the specific design decisions that keep it stable across five external APIs.


What We Built

OnlyBuyForLife is a curated product discovery site powered by r/BuyItForLife, a Reddit community dedicated to finding durable, long-lasting products worth the investment. The core idea is simple: Reddit's 1.5M subscribers collectively surface exceptional products every day. We extract those recommendations, enrich them with live pricing from eBay and Amazon, and present them in a clean, browseable storefront.

Simple idea. Complicated plumbing.

Our data pipeline touches five external systems: Reddit's API, OpenAI (GPT-4o-mini), eBay's Browse API, Amazon's product catalog, and a Supabase PostgreSQL database. Each one has its own schema, its own failure modes, and its own expectations about what "valid data" looks like. A central design goal was to ensure that quirks in any one of them do not cascade into the others.


The Architecture

Data flows through five sequential stages, each writing status updates to a pipeline_runs table so we have a full audit trail of every run:

Reddit API (asyncpraw)
       │  top posts + comment threads
       ▼
GPT-4o-mini  (pydantic-ai)
       │  structured product recommendations
       ▼
Deduplication  (rapidfuzz)
       │  merged items + updated scores
       ▼
eBay Browse API  (httpx)
       │  live listings and prices
       ▼
Supabase  (PostgreSQL)
       │
       ▼
FastAPI + Jinja2  →  user-facing storefront

A separate APScheduler job handles eBay sync on a configurable interval, running independently of the Reddit ingestion pipeline. Let's walk through how each stage works.


Stage 1: Extracting Recommendations from Reddit

We use asyncpraw to pull the top posts from r/BuyItForLife on a scheduled basis. Each post's comment thread is formatted and sent to GPT-4o-mini via pydantic-ai, with each comment prefixed by its upvote score so the model can weigh community signals:

[Score: 847] Cast iron is the answer. Lodge or Le Creuset, both last forever.
[Score: 312] +1 for Lodge. Been using mine for 15 years.
[Score: 204] All-Clad if you want stainless. D3 line specifically.
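That score-prefixed layout is straightforward to produce; here is a minimal sketch (the helper name format_thread is ours for illustration, not the pipeline's actual function):

```python
def format_thread(comments: list[tuple[int, str]]) -> str:
    """Render (score, body) pairs as score-prefixed lines for the LLM prompt."""
    return "\n".join(f"[Score: {score}] {body}" for score, body in comments)
```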

The system prompt encodes 15 extraction rules, including: exclude retailers named as products, brand-only mentions without a model identifier, vague descriptions, and negative recommendations. The model returns a structured list of product names, descriptions, categories, and scores.

Because LLM output can arrive in several shapes—a dict with an items key, a bare list, a raw JSON string, or occasionally JSON embedded in a markdown code block—we normalize the response before validation:

import json
import re

result = await agent.run(prompt)
data = result.data

if isinstance(data, str):
    # Pull the outermost JSON object or array out of the string; re.DOTALL
    # lets the match span newlines and any surrounding markdown fence.
    match = re.search(r"\{.*\}|\[.*\]", data, re.DOTALL)
    if match:
        data = json.loads(match.group())

if isinstance(data, dict) and "items" in data:
    data = data["items"]

After normalization, we pass the data into Pydantic models for validation. This is the boundary where all external AI output becomes trusted internal data:

from pydantic import BaseModel, field_validator

class Item(BaseModel):
    name: str
    description: str
    category: str
    score: int = 0

    @field_validator("category")
    def normalize_category(cls, v: str) -> str:
        return canonicalize_category(v)

    @field_validator("name")
    def clean_name(cls, v: str) -> str:
        return " ".join(v.split()).strip()

class ItemList(BaseModel):
    items: list[Item]

Records that fail validation are logged with the exact field and error, then skipped. The pipeline continues with whatever valid records remain rather than halting on a single bad extraction.
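A sketch of that log-and-skip loop, with the Item model redeclared so the snippet stands alone (validate_items is an illustrative name, not the pipeline's actual function):

```python
import logging

from pydantic import BaseModel, ValidationError

log = logging.getLogger(__name__)

class Item(BaseModel):
    name: str
    description: str
    category: str
    score: int = 0

def validate_items(raw_items: list[dict]) -> list[Item]:
    """Validate each extracted record; log and skip failures instead of halting."""
    valid = []
    for record in raw_items:
        try:
            valid.append(Item(**record))
        except ValidationError as exc:
            log.warning("Skipping invalid item %r: %s", record.get("name"), exc)
    return valid
```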


Stage 2: Category Canonicalization

Category is a first-class filter dimension on the storefront—a product filed under the wrong category is effectively invisible. Because the LLM generates category labels from natural language, we cannot rely on it to produce perfectly consistent strings across thousands of extractions.

We canonicalize at every data boundary using an explicit lookup table:

CATEGORY_MAP = {
    "kitchen toolsl & utensils": "Kitchen Tools & Utensils",   # persistent LLM typo
    "kitchenware": "Kitchen Tools & Utensils",
    "outdoors & camping": "Outdoor & Recreation",
    # ... 40+ entries covering synonyms, typos, and plurality variants
}

def canonicalize_category(raw: str) -> str:
    normalized = raw.strip().lower()
    return CATEGORY_MAP.get(normalized, raw.title())

This runs when the AI returns a category, when items are imported from file, when pinned submissions are loaded, and when items are written to the database. Calling it at every boundary means no uncanonicalized string can reach the storefront regardless of which code path produced it.


Stage 3: Fuzzy Deduplication

Reddit users describe the same product in dozens of ways:

  • "All-Clad D3 Stainless 12-inch Skillet"
  • "All-Clad D3"
  • "All Clad skillet"
  • "allclad d3 fry pan"

Inserting all four as separate items would fragment community signal—each variant would have a fraction of the upvotes and mentions that the unified product deserves. Before any new item touches the database, we compare it against every existing item using four strategies in priority order:

from rapidfuzz import fuzz

def compare_product_names(a: str, b: str) -> tuple[bool, str]:
    a_norm = normalize_product_name(a)
    b_norm = normalize_product_name(b)

    # 1. Brand-prefix containment (guard against empty or very short names)
    tokens_a = a_norm.split()
    if tokens_a and len(tokens_a[0]) > 3 and tokens_a[0] in b_norm:
        return True, "brand_match"

    # 2. Token set ratio (handles word reordering)
    if fuzz.token_set_ratio(a_norm, b_norm) > 85:
        return True, "token_match"

    # 3. Partial ratio (handles one name being a substring of another)
    if fuzz.partial_ratio(a_norm, b_norm) > 90:
        return True, "partial_match"

    # 4. Jaccard word overlap
    words_a, words_b = set(a_norm.split()), set(b_norm.split())
    overlap = len(words_a & words_b) / max(len(words_a | words_b), 1)
    if overlap > 0.8:
        return True, "word_match"

    return False, "no_match"

normalize_product_name() strips common suffixes (" set", " series", " collection") and normalizes brand variants (allclad → all-clad) before comparison. When a match is found, scores and mention counts are additive-merged onto the existing record, and the description is updated only if the incoming item carries a higher community score.
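For reference, a normalizer with the behavior described above might look like this (the suffix list and brand table here are small illustrative excerpts, not the production tables):

```python
SUFFIXES = (" set", " series", " collection")
BRAND_VARIANTS = {"allclad": "all-clad", "lecreuset": "le creuset"}  # illustrative

def normalize_product_name(name: str) -> str:
    """Lowercase, collapse whitespace, strip marketing suffixes, unify brands."""
    n = " ".join(name.lower().split())
    for suffix in SUFFIXES:
        if n.endswith(suffix):
            n = n[: -len(suffix)]
    first, _, rest = n.partition(" ")
    if first in BRAND_VARIANTS:
        n = (BRAND_VARIANTS[first] + " " + rest).strip()
    return n
```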


Stage 4: Live eBay Listings

A separate APScheduler job continuously syncs eBay listings for every product in the catalog. It runs in two phases:

  • Discover: items with no eBay listings yet
  • Refresh: items whose listings haven't been checked within EBAY_REFRESH_AGE_HOURS

Both phases use an asyncio.Semaphore-bounded pool of concurrent eBay API calls to stay within rate limits. Results are upserted on ebay_item_id, so a listing that returns across multiple syncs is updated in place rather than duplicated.
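The bounded pool amounts to a semaphore wrapped around asyncio.gather; a generic sketch (bounded_gather is our name for the pattern, not the pipeline's function):

```python
import asyncio

async def bounded_gather(factories, limit: int = 5):
    """Run coroutine factories concurrently, never more than `limit` in flight."""
    sem = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with sem:  # waits here while `limit` calls are already running
            return await factory()

    return await asyncio.gather(*(run_one(f) for f in factories))
```

Each eBay lookup would be passed in as a zero-argument coroutine factory, so the request does not even start until a semaphore slot is free.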

eBay's relevance ranking does not know our context. A search for "Toyota Land Cruiser" returns complete vehicles; a search for "KitchenAid Dishwasher" returns replacement parts before it returns the appliance. We encode domain-aware scoring to counteract this:

def _score_listing(self, item: dict, query: str) -> float:
    title = item.get("title", "")
    price = float(item.get("price", {}).get("value", 0))

    title_score = fuzz.token_set_ratio(query, title) / 100
    price_factor = 1 / (1 + price / 100)
    score = title_score * 0.7 + price_factor * 0.3

    # Whole-vehicle listings score near zero when we want parts
    if self._is_automotive_query(query) and self._is_whole_vehicle(title):
        score *= 0.1

    # Replacement parts score near zero for appliance queries
    if "dishwasher" in query.lower() and self._is_appliance_part(title):
        score *= 0.2

    return score

Listings not seen in a refresh get an incremented miss_count. Once they cross a configured threshold, they are soft-deleted with an ended_at timestamp. Items that repeatedly return no listings get an exponentially increasing cooldown, so the sync job stops querying searches that have never produced results.
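The cooldown schedule is a capped doubling; the base and ceiling values below are illustrative placeholders, not our real configuration:

```python
from datetime import timedelta

BASE_COOLDOWN_HOURS = 6    # hypothetical starting interval
MAX_COOLDOWN_HOURS = 168   # hypothetical one-week ceiling

def next_check_delay(consecutive_empty_syncs: int) -> timedelta:
    """Double the wait after each consecutive empty search, up to the ceiling."""
    hours = min(BASE_COOLDOWN_HOURS * (2 ** consecutive_empty_syncs), MAX_COOLDOWN_HOURS)
    return timedelta(hours=hours)
```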

eBay's API schema also evolves—fields are added and removed between versions. We configure our listing model to silently drop unknown fields so a schema update upstream does not crash the sync:

class EbayListing(BaseModel, extra="ignore"):
    item_id: str
    title: str
    price: float
    image_url: str | None = None
    condition: str | None = None


Stage 5: Pipeline Observability

The pipeline tracks its own state through a set of named statuses written to the pipeline_runs table at each transition:

started → extracting → deduplicating → updating_retailers → checking_urls → completed

We modeled these as a typed enum rather than raw strings, so an invalid status value raises a validation error immediately:

from datetime import datetime
from enum import Enum
from uuid import UUID

from pydantic import BaseModel

class PipelineStatus(str, Enum):
    STARTED = "started"
    EXTRACTING = "extracting"
    DEDUPLICATING = "deduplicating"
    UPDATING_RETAILERS = "updating_retailers"
    CHECKING_URLS = "checking_urls"
    COMPLETED = "completed"
    FAILED = "failed"

class PipelineRun(BaseModel):
    id: UUID
    status: PipelineStatus
    started_at: datetime
    completed_at: datetime | None = None
    items_added: int = 0
    items_updated: int = 0
    error_message: str | None = None

Every run is fully auditable: when it started, which stage it was in when it failed, how many items were added or updated, and what error was raised if it did not complete.
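One way to keep that audit trail honest is to reject out-of-order statuses before writing them. This guard is an illustrative sketch, not code from the pipeline:

```python
STAGE_ORDER = [
    "started", "extracting", "deduplicating",
    "updating_retailers", "checking_urls", "completed",
]

def is_valid_transition(current: str, new: str) -> bool:
    """A run may fail from any stage, but otherwise only moves one stage forward."""
    if new == "failed":
        return True
    return STAGE_ORDER.index(new) == STAGE_ORDER.index(current) + 1
```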


The Core Design Principle

Every component in this pipeline—the LLM, eBay's Browse API, Reddit's API, Amazon's URL structure—can and does change without notice. The consistent design decision across the stack is to treat all incoming data as untrusted and validate it against an internal contract at the point it enters our system.

Pydantic enforces that contract at every boundary. An unknown field from eBay is discarded. A malformed category from the LLM is canonicalized or rejected. A product name that fails cleaning raises a ValidationError with the exact field and reason, logged and skipped, before it ever reaches the database.

The result is a pipeline that degrades gracefully rather than failing completely when any one of its five upstream sources behaves unexpectedly.


Get In Touch

Have questions about the product catalog, affiliate partnerships, or the pipeline? Learn more about us on the About page, drop us a line on the Contact page, or open an issue on the project repository.