AI Content Pipeline
Manual content creation doesn't scale. This guide builds a complete end-to-end pipeline: ingest YouTube transcripts and research sources, extract insights, draft articles, run automated quality gates, route to human review, and publish — with one person doing the work of ten.
01 Pipeline Overview
An AI content pipeline replaces a sequence of manual decisions — what to write about, what to include, how to structure it, whether it's good enough — with a sequence of automated steps where AI handles the heavy lifting and humans intervene only at high-stakes decision points.
This guide builds the pipeline that powers Aether Intel: raw YouTube transcripts and research sources go in, polished articles ready for publishing come out.
Two human checkpoints in a seven-stage pipeline. Everything else runs automatically. The human's time is spent on judgment calls — angle selection after extraction, and final approval before publish — not on mechanical production tasks.
02 Stage 1: Source Ingestion
The pipeline starts with raw source material. The most valuable sources for an AI-focused publication are YouTube transcripts from expert creators, research papers, technical blog posts, and podcast transcripts. All need to be normalised into clean text before processing.
YouTube transcript extraction
YouTube's auto-generated and manual captions are accessible via youtube-transcript-api — no browser automation required.
```python
from youtube_transcript_api import YouTubeTranscriptApi
import re

def ingest_youtube(video_id: str) -> dict:
    """Fetch and normalise a YouTube transcript."""
    try:
        # youtube-transcript-api 1.x interface; older releases exposed the static
        # YouTubeTranscriptApi.get_transcript(video_id, languages=[...]) instead.
        fetched = YouTubeTranscriptApi().fetch(
            video_id,
            languages=["en", "en-US", "en-GB"],
        )
    except Exception as e:
        raise ValueError(f"Transcript unavailable for {video_id}: {e}") from e

    # Merge all caption segments into a single text block
    full_text = " ".join(snippet.text for snippet in fetched)

    # Basic normalisation
    full_text = re.sub(r"\[.*?\]", "", full_text)  # Remove [Music], [Applause] etc.
    full_text = re.sub(r"\s+", " ", full_text).strip()

    return {
        "source_type": "youtube",
        "source_id": video_id,
        "url": f"https://youtube.com/watch?v={video_id}",
        "raw_text": full_text,
        "word_count": len(full_text.split()),
        "char_count": len(full_text),
    }
```
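ingest_youtube takes a bare video ID, while the URLs people paste come in several shapes. A small parser can bridge that gap. This is a sketch: extract_video_id is a hypothetical helper, and it only recognises the URL forms listed in its docstring.

```python
from urllib.parse import parse_qs, urlparse

def extract_video_id(url: str) -> str:
    """Return the video ID from common YouTube URL shapes (hypothetical helper).

    Handles youtube.com/watch?v=ID, youtu.be/ID and youtube.com/{shorts,embed,live}/ID.
    """
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]

    video_id = ""
    if host == "youtu.be":
        # Short links carry the ID as the first path segment
        video_id = parsed.path.lstrip("/").split("/")[0]
    elif host in ("youtube.com", "m.youtube.com"):
        if parsed.path == "/watch":
            # Watch pages carry the ID in the ?v= query parameter
            video_id = parse_qs(parsed.query).get("v", [""])[0]
        else:
            parts = [p for p in parsed.path.split("/") if p]
            if len(parts) >= 2 and parts[0] in ("shorts", "embed", "live"):
                video_id = parts[1]

    if not video_id:
        raise ValueError(f"Could not find a video ID in {url!r}")
    return video_id
```

With it, a pasted link such as https://youtu.be/dQw4w9WgXcQ can be passed through extract_video_id before calling ingest_youtube.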
Web article ingestion
```python
import requests
from bs4 import BeautifulSoup
import re

def ingest_url(url: str) -> dict:
    """Scrape and clean a web article."""
    resp = requests.get(url, timeout=15, headers={"User-Agent": "Mozilla/5.0"})
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    # Remove boilerplate
    for tag in soup.find_all(["script", "style", "nav", "footer", "aside", "header"]):
        tag.decompose()

    # Extract main content (prefer article/main elements; fall back to the whole document)
    main = soup.find("article") or soup.find("main") or soup.body or soup
    raw_text = main.get_text(separator=" ", strip=True)
    raw_text = re.sub(r"\s+", " ", raw_text).strip()

    return {
        "source_type": "web",
        "source_id": url,
        "url": url,
        "raw_text": raw_text,
        "word_count": len(raw_text.split()),
        "char_count": len(raw_text),
    }
```
Source metadata schema
Every ingested source gets the same metadata envelope — this normalises disparate source types into a uniform format for downstream processing:
```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class Source:
    source_type: str  # "youtube" | "web" | "pdf" | "rss"
    source_id: str    # URL or video ID
    url: str
    raw_text: str
    word_count: int
    char_count: int
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated as of Python 3.12)
    ingested_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    title: str = ""
    author: str = ""
    published_at: str = ""
    tags: list[str] = field(default_factory=list)
```
03 Stage 2: Transcript Processing
Raw transcripts are messy: auto-generated YouTube captions usually lack punctuation, and speech brings repeated phrases from false starts plus filler words. Processing makes the text usable for extraction.
Cleaning prompt
```text
You are a transcript editor. Clean the raw transcript below.

Tasks:
1. Add punctuation and capitalisation where missing
2. Remove filler words (um, uh, you know, like, sort of, basically)
3. Remove false starts and repeated phrases (e.g. "I think — I think what we need...")
4. Correct obvious transcription errors (e.g. "ai" when context means "AI")
5. Preserve all factual content, specific claims, and technical terms exactly

Do NOT:
- Add information not in the original
- Change the meaning of any statement
- Add your own commentary

Return the cleaned transcript as plain text. No preamble or metadata.

RAW TRANSCRIPT:
{raw_text}
```
Chunking long transcripts
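A 90-minute video produces roughly 15,000 words. That usually fits in a current model's input context, but the cleaning step has to return the whole transcript, which can exceed the model's output token limit; chunking also lets the pieces be cleaned in parallel. Chunk the transcript before cleaning, then clean each chunk independently: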
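```python
import asyncio

def chunk_text(text: str, max_words: int = 2000, overlap_words: int = 100) -> list[str]:
    """Split text into chunks of at most max_words words; neighbours share overlap_words words."""
    if not 0 <= overlap_words < max_words:
        raise ValueError("overlap_words must be >= 0 and smaller than max_words")
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + max_words, len(words))
        chunks.append(" ".join(words[start:end]))
        if end == len(words):
            break  # Last chunk: stop before emitting a tail that only repeats the overlap
        start = end - overlap_words
    return chunks

async def process_transcript(source: Source) -> str:
    """Clean a transcript, chunking if needed.

    llm_async(system, user, ...) is the async LLM helper used throughout this guide;
    CLEAN_SYSTEM holds the cleaning prompt, and the transcript goes in the user message.
    """
    if source.word_count <= 2500:
        # Short enough to clean in one shot
        return await llm_async(CLEAN_SYSTEM, source.raw_text)

    # Long transcript: chunk, clean in parallel, reassemble.
    # No overlap here: overlapping chunks would repeat text once the
    # cleaned chunks are joined back together.
    chunks = chunk_text(source.raw_text, max_words=2000, overlap_words=0)
    cleaned_chunks = await asyncio.gather(*[
        llm_async(CLEAN_SYSTEM, chunk) for chunk in chunks
    ])
    return " ".join(cleaned_chunks)
```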
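process_transcript launches every chunk's cleaning call at once through asyncio.gather. On a long transcript that can run into provider rate limits. A semaphore caps the number of calls in flight while keeping results in chunk order. gather_limited is a hypothetical helper, and the default limit of 4 is an arbitrary starting point rather than a provider-specific figure.

```python
import asyncio
from typing import Awaitable, Iterable, TypeVar

T = TypeVar("T")

async def gather_limited(coros: Iterable[Awaitable[T]], limit: int = 4) -> list[T]:
    """Await the given coroutines with at most `limit` running at once; results keep input order."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(coro: Awaitable[T]) -> T:
        async with semaphore:  # Wait for a free slot before the call starts executing
            return await coro

    return list(await asyncio.gather(*(run_one(c) for c in coros)))
```

Inside process_transcript, the gather call would become await gather_limited([llm_async(CLEAN_SYSTEM, c) for c in chunks], limit=4). The coroutine objects are created up front, but none of them starts running until it holds the semaphore.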
04 Stage 3: Insight Extraction
This is the most important step in the pipeline. A good extraction pass transforms a 10,000-word transcript into 20–40 structured insights, claims, and quotes that the drafting step can build a focused article from.
Extraction prompt
```text
You are a research analyst specialising in AI and technology.
Read the source material and extract every distinct insight, claim, statistic, or notable quote.

For each extracted item, assess:
- type: "insight" | "claim" | "statistic" | "quote" | "prediction"
- novelty: 1-5 (5 = something I haven't seen discussed elsewhere)
- importance: 1-5 (5 = genuinely changes how people should think about this)
- source_quote: the verbatim phrase or sentence from the source

Focus on:
✓ Specific claims with concrete numbers or evidence
✓ Counterintuitive or surprising findings
✓ Practical implications for AI practitioners
✓ Predictions with reasoning behind them
✓ Direct quotes from named experts

Ignore:
✗ Vague generalisations ("AI is changing everything")
✗ Common knowledge anyone in the field already knows
✗ Marketing claims without evidence

Return ONLY JSON:
{
  "source_summary": "2-3 sentence summary of what this source is about",
  "speaker": "Name of main speaker/author if identifiable, else null",
  "insights": [
    {
      "id": 1,
      "type": "insight",
      "text": "The insight in one clear sentence",
      "source_quote": "Exact phrase from source",
      "novelty": 4,
      "importance": 5,
      "topic_tags": ["llm-training", "cost"]
    }
  ]
}

SOURCE MATERIAL:
{cleaned_text}
```
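Every stage in this guide hands LLM output straight to json.loads. Models sometimes wrap JSON in a Markdown code fence or add a sentence around it, and json.loads rejects both. A tolerant parser can strip those wrappers first. This is a sketch; parse_llm_json is a hypothetical helper, and it only handles the two wrapper patterns described in its comments.

```python
import json
import re

# A reply that is entirely one fenced block, optionally tagged "json"
_FENCE = re.compile(r"^\s*`{3}(?:json)?\s*\n(.*?)\n?\s*`{3}\s*$", re.DOTALL | re.IGNORECASE)

def parse_llm_json(raw: str):
    """Parse JSON from an LLM reply, tolerating a code fence or surrounding prose."""
    text = raw.strip()
    fenced = _FENCE.match(text)
    if fenced:
        text = fenced.group(1)
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span, e.g. "Here is the JSON: {...}"
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end <= start:
            raise
        return json.loads(text[start:end + 1])
```

It can replace json.loads wherever an LLM reply is parsed; anything still malformed raises json.JSONDecodeError as before.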
Scoring and filtering
```python
def score_insight(insight: dict) -> float:
    """Composite score: novelty weighted 60%, importance 40%."""
    return (insight["novelty"] * 0.6) + (insight["importance"] * 0.4)

def filter_insights(extraction: dict, min_score: float = 3.0) -> list[dict]:
    """Return the insights scoring at least min_score, highest score first."""
    insights = extraction.get("insights", [])
    scored = [(i, score_insight(i)) for i in insights]
    filtered = [(i, s) for i, s in scored if s >= min_score]
    filtered.sort(key=lambda x: x[1], reverse=True)
    return [i for i, _ in filtered]
```
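A quick worked example shows how the 60/40 weighting behaves at the default threshold of 3.0, writing n for novelty and i for importance:

```latex
s = 0.6\,n + 0.4\,i, \qquad \text{keep if } s \ge 3.0

\begin{aligned}
n = 4,\ i = 5: &\quad s = 2.4 + 2.0 = 4.4 \ (\text{kept})\\
n = 5,\ i = 1: &\quad s = 3.0 + 0.4 = 3.4 \ (\text{kept})\\
n = 2,\ i = 5: &\quad s = 1.2 + 2.0 = 3.2 \ (\text{kept})\\
n = 1,\ i = 5: &\quad s = 0.6 + 2.0 = 2.6 \ (\text{dropped})
\end{aligned}
```

Scores run from 1.0 to 5.0. Because novelty carries more weight, a very novel but minor item (n=5, i=1) survives the filter while an important but familiar one (n=1, i=5) does not; adjust the weights if that trade-off doesn't suit your publication.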
Angle Selection
- Review the source_summary and top 10–15 scored insights
- Select the article angle: which 5–8 insights best support a focused, publishable piece?
- Flag any insights that need external verification before use
- Set the target audience and word count for the draft
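Checkpoint 1 is where a bad input can quietly derail drafting: resume_after_angle keeps only the selected IDs it finds in filtered_insights and silently ignores the rest. A small check before resuming catches that. This is a sketch; validate_angle_selection is a hypothetical helper, and its 5–8 range simply mirrors the guidance above.

```python
def validate_angle_selection(
    filtered_insights: list[dict],
    selected_ids: list[int],
    min_count: int = 5,
    max_count: int = 8,
) -> list[str]:
    """Return problems with the editor's selection; an empty list means it is usable."""
    problems = []
    available = {i["id"] for i in filtered_insights}

    unknown = sorted(set(selected_ids) - available)
    if unknown:
        problems.append(f"Unknown or filtered-out insight IDs: {unknown}")
    if len(set(selected_ids)) != len(selected_ids):
        problems.append("Duplicate insight IDs in selection")

    count = len(set(selected_ids))
    if not min_count <= count <= max_count:
        problems.append(f"Selected {count} insights; expected {min_count}-{max_count}")
    return problems
```

A review UI can show these messages and refuse to resume the pipeline until the list is empty.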
05 Stage 4: Drafting
With a curated set of insights and a chosen angle, the drafting stage produces a complete article. The key constraint: the draft must be grounded exclusively in the extracted insights — no hallucinated facts, no outside knowledge added without explicit sourcing.
Two-pass drafting: outline then prose
For longer articles (1500+ words), splitting drafting into an outline pass and a prose pass produces more structured results:
Outline prompt
```text
You are a content strategist for an AI-focused publication.
Create an article outline using the insights below.
Target: {word_count} words, audience: {audience}
Angle: {angle}

The outline must:
1. Have a specific, compelling headline (not generic — make a clear argument)
2. Open with the most surprising or counterintuitive insight
3. Have 4–6 H2 sections, each anchored to 1–3 specific insights
4. End with a practical takeaway the reader can act on

Return JSON:
{
  "headline": "...",
  "subheadline": "One sentence hook under the headline",
  "sections": [
    {
      "heading": "Section heading",
      "angle": "What this section argues",
      "insights": [2, 5, 8],
      "word_target": 300
    }
  ]
}

Insights (use insight IDs to reference them):
{insights_json}
```
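The outline prompt states concrete rules (4–6 sections, 1–3 insights per section), so the returned outline can be checked mechanically before the prose pass spends tokens on it. A minimal sketch; validate_outline is hypothetical, and the 20% word-budget tolerance is borrowed from the length gate in Stage 5.

```python
def validate_outline(outline: dict, insight_ids: set[int], target_words: int) -> list[str]:
    """Check outline JSON against the outline prompt's rules; returns a list of problems."""
    problems = []
    if not outline.get("headline"):
        problems.append("Missing headline")

    sections = outline.get("sections", [])
    if not 4 <= len(sections) <= 6:
        problems.append(f"{len(sections)} sections; expected 4-6")

    for n, section in enumerate(sections, start=1):
        refs = section.get("insights", [])
        if not 1 <= len(refs) <= 3:
            problems.append(f"Section {n} anchors {len(refs)} insights; expected 1-3")
        unknown = sorted(set(refs) - insight_ids)
        if unknown:
            problems.append(f"Section {n} references unknown insights {unknown}")

    # Section word targets should add up to roughly the article target
    planned = sum(s.get("word_target", 0) for s in sections)
    if sections and abs(planned - target_words) > 0.2 * target_words:
        problems.append(f"Section word targets sum to {planned}; target is {target_words}")
    return problems
```

If the list is non-empty, re-running the outline pass is cheaper than discovering the problem in the finished draft.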
Prose prompt
```text
You are a technology journalist writing for practitioners who read to learn, not to feel good about AI.
Write the full article following the outline below.

Voice:
- Direct and specific — no filler ("In today's rapidly evolving landscape...")
- Active voice throughout
- Technical depth without jargon for its own sake
- Let the data and insights speak; your job is to frame them clearly

Rules:
- Use ONLY the provided insights as your factual foundation
- When you use an insight, add the insight ID inline: (insight_3)
- Do not add outside knowledge, statistics, or examples not in the insights
- Quotes from the source should be presented as quotes with the speaker's name

Outline: {outline_json}
Insights: {insights_json}
```
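Both drafting prompts carry {placeholders}, and the outline prompt also contains literal JSON braces, so calling str.format() on it raises a KeyError. One workaround is to substitute only the named placeholders you pass in and leave every other brace alone. A sketch; fill_prompt is a hypothetical helper:

```python
import re

def fill_prompt(template: str, **values: object) -> str:
    """Replace {name} placeholders whose name is in `values`; leave all other braces untouched."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        return str(values[name]) if name in values else match.group(0)

    # Only {identifier} patterns match, so JSON such as {"headline": ...} is never touched
    return re.sub(r"\{([A-Za-z_]\w*)\}", substitute, template)
```

For example, fill_prompt(OUTLINE_PROMPT, word_count=1500, audience="ML engineers", angle=angle) fills the three header placeholders and keeps the JSON example intact. Using a function as the replacement also means backslashes in the substituted values are inserted literally.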
Use temperature=0.65–0.75 for the prose pass: low enough for coherence and factual discipline, high enough for natural sentence variety. The outline pass can run at temperature=0.3, since you want a predictable structure.
06 Stage 5: Quality Gates
Automated quality gates catch problems before a human ever sees the draft. The checks are independent of each other, so they can run in any order or in parallel; only the groundedness check needs an LLM call.
| Gate | Type | What it checks | Failure action |
|---|---|---|---|
| Length check | Automated | Word count within 20% of target | Regenerate with explicit word count instruction |
| Citation check | Automated | Every (insight_N) ref maps to a real insight ID | Flag orphaned citations for human review |
| Groundedness check | Automated | LLM verifies claims match source insights | Flag ungrounded claims; request revision |
| Filler detection | Automated | Scan of the opening 300 characters for banned phrases | Regenerate opening / affected paragraphs |
| Readability score | Automated | Flesch-Kincaid grade 10–14 | Flag for human review if outside range |
| SEO metadata | Automated | Title <70 chars, meta description 150–160 chars | Regenerate metadata |
```python
import json
import re
from dataclasses import dataclass

BANNED_OPENERS = [
    "in today's rapidly", "in the ever-evolving", "artificial intelligence is",
    "as we stand on the cusp", "the landscape of", "it's no secret that",
    "in recent years,", "needless to say,",
]

@dataclass
class GateResult:
    gate: str
    passed: bool
    message: str

def check_length(draft: str, target_words: int) -> GateResult:
    """Pass if the word count is within 20% of the target."""
    count = len(draft.split())
    within_range = abs(count - target_words) / target_words < 0.20
    return GateResult("length", within_range,
                      f"{count} words (target: {target_words})")

def check_citations(draft: str, insight_ids: set[int]) -> GateResult:
    """Every (insight_N) reference must map to a selected insight ID."""
    refs = {int(m) for m in re.findall(r"\(insight_(\d+)\)", draft)}
    orphans = refs - insight_ids
    return GateResult("citations", not orphans,
                      f"Orphaned refs: {sorted(orphans)}" if orphans else "All citations valid")

def check_filler(draft: str) -> GateResult:
    """Look for banned stock phrases in the first 300 characters."""
    opening = draft[:300].lower()
    hits = [p for p in BANNED_OPENERS if p in opening]
    return GateResult("filler", not hits,
                      f"Found: {hits}" if hits else "No filler detected")

async def check_groundedness(draft: str, insights_json: str) -> GateResult:
    """Ask an LLM fact-checker to list draft claims the insights don't support."""
    verdict_raw = await llm_async(
        system=(
            "You are a fact-checker. Review the article draft against the provided insights. "
            "Identify any factual claims in the draft NOT supported by the insights. "
            "Return JSON: {\"ungrounded\": [\"claim 1\", \"claim 2\"], \"passed\": true|false}"
        ),
        user=f"DRAFT:\n{draft}\n\nINSIGHTS:\n{insights_json}",
    )
    verdict = json.loads(verdict_raw)
    ungrounded = verdict.get("ungrounded", [])
    # Any listed ungrounded claim counts as a failure, whatever "passed" says
    return GateResult("groundedness", bool(verdict.get("passed")) and not ungrounded,
                      str(ungrounded))

async def run_quality_gates(draft: str, target_words: int,
                            insight_ids: set[int], insights_json: str) -> list[GateResult]:
    # The three local checks are cheap and synchronous; only groundedness calls the LLM
    results = [
        check_length(draft, target_words),
        check_citations(draft, insight_ids),
        check_filler(draft),
    ]
    results.append(await check_groundedness(draft, insights_json))
    return results
```
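run_quality_gates covers four of the six gates in the table; readability is one of the missing ones. Below is a stdlib-only sketch of the Flesch-Kincaid grade formula (0.39 × words per sentence + 11.8 × syllables per word − 15.59). The syllable counter is a crude vowel-group heuristic, so expect scores to drift from dedicated libraries such as textstat. count_syllables, flesch_kincaid_grade and check_readability are hypothetical helpers; check_readability returns a (passed, message) pair that maps onto GateResult("readability", passed, message).

```python
import re

def count_syllables(word: str) -> int:
    """Rough English syllable count: vowel groups, minus a silent final 'e'."""
    word = word.lower()
    count = len(re.findall(r"[aeiouy]+", word))
    if word.endswith("e") and not word.endswith("le") and count > 1:
        count -= 1  # "make" -> 1, but "readable" keeps its final "-ble"
    return max(count, 1)

def flesch_kincaid_grade(text: str) -> float:
    """Flesch-Kincaid grade level, using naive sentence and word splitting."""
    sentences = [s for s in re.split(r"[.!?]+", text) if s.strip()]
    words = re.findall(r"[A-Za-z']+", text)
    if not sentences or not words:
        return 0.0
    syllables = sum(count_syllables(w) for w in words)
    return 0.39 * len(words) / len(sentences) + 11.8 * syllables / len(words) - 15.59

def check_readability(draft: str, low: float = 10, high: float = 14) -> tuple[bool, str]:
    """Pass if the grade falls inside the table's 10-14 band."""
    grade = flesch_kincaid_grade(draft)
    return low <= grade <= high, f"Flesch-Kincaid grade {grade:.1f} (target {low}-{high})"
```

The table's failure action for this gate is human review rather than regeneration, so a failing result is better surfaced in the review queue than fed back into a revision pass.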
Handling gate failures
```python
async def handle_gate_failures(
    draft: str,
    gates: list[GateResult],
    insights_json: str,
    target_words: int,
) -> str:
    failed = [g for g in gates if not g.passed]
    if not failed:
        return draft  # All gates passed

    # Build revision instructions from failed gates
    issues = "\n".join(f"- {g.gate}: {g.message}" for g in failed)
    revised = await llm_async(
        system=(
            "You are an editor. Revise the article to fix the listed issues. "
            "Preserve all content that doesn't need to change. "
            "Do not add outside knowledge."
        ),
        user=(
            f"ISSUES TO FIX:\n{issues}\n\n"
            f"TARGET WORD COUNT: {target_words}\n\n"
            f"INSIGHTS (for reference):\n{insights_json}\n\n"
            f"ARTICLE TO REVISE:\n{draft}"
        ),
        temperature=0.4,
    )
    return revised
```
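handle_gate_failures makes a single revision pass and does not re-check the result, so a revision can introduce new problems (padding to hit the word count, for instance) that go straight to review. A bounded loop re-runs the gates after each revision. In this sketch, run_gates and revise are parameters you would bind to run_quality_gates and handle_gate_failures; revise_until_pass and its two-round default are assumptions, not part of the pipeline above.

```python
from typing import Awaitable, Callable

async def revise_until_pass(
    draft: str,
    run_gates: Callable[[str], Awaitable[list]],
    revise: Callable[[str, list], Awaitable[str]],
    max_rounds: int = 2,
) -> tuple[str, list]:
    """Revise and re-check until every gate passes or max_rounds revisions are spent.

    run_gates(draft) returns gate results that expose a .passed attribute;
    revise(draft, failed_gates) returns a revised draft.
    """
    gates = await run_gates(draft)
    for _ in range(max_rounds):
        failed = [g for g in gates if not g.passed]
        if not failed:
            break
        draft = await revise(draft, failed)
        gates = await run_gates(draft)  # Re-check the revision, not the old draft
    return draft, gates
```

Binding it with lambdas over run_quality_gates and handle_gate_failures keeps the loop free of pipeline state. Whatever still fails after the last round goes to the human reviewer with the final gate results attached.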
07 Stage 6: Human-in-the-Loop Review
Automation handles production. Humans handle judgment. The review checkpoint is where the editor decides if the article is worth publishing and makes the changes only a human should make.
What the editor does here
- Headline: Is it specific, interesting, and true to the content? Rewrite if needed.
- Opening: Does the first paragraph earn the reader's attention? This is the highest-leverage edit a human can make.
- Factual spot-check: Verify 2–3 of the most surprising claims against the source transcript or external sources.
- Tone alignment: Does this sound like a human expert, or does it feel AI-generated? Flag flat, generic paragraphs for rewriting.
- Kill decision: Is this article worth publishing? Not everything extracted is worth writing about. Kill it here rather than after publishing.
Async review pattern
Don't block the pipeline waiting for human review. Store the draft and notify the reviewer asynchronously — the pipeline resumes when the reviewer signals approval:
```python
import json, time
from pathlib import Path

REVIEW_QUEUE = Path("./review_queue")
REVIEW_QUEUE.mkdir(exist_ok=True)

def submit_for_review(state: dict) -> str:
    """Save draft to review queue. Returns the review ID."""
    review_id = f"review_{int(time.time())}_{state['run_id'][:8]}"
    review_path = REVIEW_QUEUE / f"{review_id}.json"
    review_path.write_text(json.dumps({
        "review_id": review_id,
        "status": "pending",
        "submitted_at": time.time(),
        "headline": state["outline"]["headline"],
        "draft": state["draft"],
        # Gate results may arrive as GateResult objects or as already-converted dicts
        "gate_results": [g if isinstance(g, dict) else g.__dict__
                         for g in state["gate_results"]],
        "insights": state["filtered_insights"],
        "source_url": state["source"]["url"],
    }, indent=2))
    return review_id

def get_review_status(review_id: str) -> dict:
    """Check if a review has been completed."""
    review_path = REVIEW_QUEUE / f"{review_id}.json"
    if not review_path.exists():
        raise FileNotFoundError(f"Review {review_id} not found")
    return json.loads(review_path.read_text())

def approve_review(review_id: str, editor_notes: str = "", edited_draft: str = "") -> dict:
    """Mark a review as approved. Call this from the review UI or CLI.

    Pass edited_draft if the editor changed the text; resume_after_approval
    publishes it in place of the original draft.
    """
    data = get_review_status(review_id)
    data["status"] = "approved"
    data["approved_at"] = time.time()
    data["editor_notes"] = editor_notes
    if edited_draft:
        data["edited_draft"] = edited_draft
    path = REVIEW_QUEUE / f"{review_id}.json"
    path.write_text(json.dumps(data, indent=2))
    return data
```
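The queue above supports approval, but the editor's checklist also includes a kill decision. A minimal sketch of the two other operations an editor needs, listing what is waiting and killing a draft, follows. list_pending and kill_review are hypothetical helpers that take the queue directory as a parameter (pass REVIEW_QUEUE), and the "killed" status is an assumed convention.

```python
import json
import time
from pathlib import Path

def list_pending(queue_dir: Path) -> list[dict]:
    """Return pending reviews, oldest submission first."""
    items = [json.loads(p.read_text()) for p in queue_dir.glob("review_*.json")]
    pending = [r for r in items if r.get("status") == "pending"]
    return sorted(pending, key=lambda r: r["submitted_at"])

def kill_review(queue_dir: Path, review_id: str, reason: str) -> dict:
    """Mark a review as killed; resume code should refuse anything not 'approved'."""
    path = queue_dir / f"{review_id}.json"
    data = json.loads(path.read_text())
    data.update(status="killed", killed_at=time.time(), kill_reason=reason)
    path.write_text(json.dumps(data, indent=2))
    return data
```

Recording a kill_reason is cheap and, over time, shows which kinds of sources or angles rarely survive review.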
08 Stage 7: Publish & Distribute
After approval, the pipeline prepares the article for publication: generates SEO metadata, converts to CMS format, creates social distribution copy, and posts via API or webhook.
SEO metadata generation
```text
You are an SEO specialist. Generate metadata for the article below.

Return ONLY JSON:
{
  "title": "Title tag (max 60 chars, include primary keyword)",
  "meta_description": "Meta description (150-160 chars, include keyword, ends with action hook)",
  "primary_keyword": "Main keyword phrase to target",
  "secondary_keywords": ["keyword 2", "keyword 3", "keyword 4"],
  "slug": "url-friendly-slug",
  "og_title": "Open Graph title (can be more engaging than title tag)",
  "og_description": "OG description (1-2 sentences, compelling for social sharing)",
  "schema_type": "Article",
  "featured_image_prompt": "Prompt for DALL-E or Midjourney to generate a relevant header image"
}

ARTICLE:
{approved_draft}
```
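Stage 5's table lists an SEO metadata gate, and the prompt above states explicit limits, so the generated JSON can be checked before it reaches the CMS. A sketch under those limits; check_seo is hypothetical, and its keyword-in-title rule mirrors the prompt's "include primary keyword" instruction.

```python
import re

# Lowercase words joined by single hyphens, e.g. "llm-cost-curves"
SLUG_RE = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")

def check_seo(seo: dict) -> list[str]:
    """Check generated SEO metadata against the prompt's limits; returns a list of problems."""
    problems = []
    title = seo.get("title", "")
    if not title or len(title) > 60:
        problems.append(f"title is {len(title)} chars (max 60)")

    desc_len = len(seo.get("meta_description", ""))
    if not 150 <= desc_len <= 160:
        problems.append(f"meta_description is {desc_len} chars (want 150-160)")

    if not SLUG_RE.match(seo.get("slug", "")):
        problems.append(f"slug {seo.get('slug')!r} is not lowercase-hyphenated")

    keyword = seo.get("primary_keyword", "").lower()
    if keyword and keyword not in title.lower():
        problems.append("primary_keyword missing from title")
    return problems
```

Following the table, a non-empty result would trigger a metadata regeneration rather than a human review.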
Social distribution copy
```text
You are a social media writer. Write distribution copy for the article below.

Return JSON:
{
  "twitter_thread": [
    "Tweet 1 — hook (max 240 chars)",
    "Tweet 2 — key insight 1",
    "Tweet 3 — key insight 2",
    "Tweet 4 — CTA with article link placeholder: {URL}"
  ],
  "linkedin_post": "3-4 paragraph LinkedIn post. Professional tone. End with question to spark comments.",
  "newsletter_subject": "Email subject line (A/B test option A)",
  "newsletter_subject_b": "Email subject line (A/B test option B)",
  "newsletter_preview": "Preview text for email clients (90-140 chars)"
}

ARTICLE:
{approved_draft}
```
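The thread template ends with a {URL} placeholder, which can only be filled once the publish call has returned the post's URL. A small post-processing step can substitute it and catch over-long tweets before anything is scheduled. This is a sketch; finalize_thread is a hypothetical helper, and len() is only a conservative stand-in for the platform's own character counting, which treats links and some characters differently.

```python
def finalize_thread(tweets: list[str], url: str, limit: int = 280) -> list[str]:
    """Substitute the article URL for {URL} and reject tweets over `limit` characters."""
    filled = [t.replace("{URL}", url) for t in tweets]

    too_long = [n for n, t in enumerate(filled, start=1) if len(t) > limit]
    if too_long:
        raise ValueError(f"Tweets over {limit} chars: {too_long}")
    if not any(url in t for t in filled):
        raise ValueError("No tweet contains the article URL")
    return filled
```

In resume_after_approval this would run after publish_to_ghost, using the returned post URL.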
CMS publishing via API
```python
import time

import jwt  # PyJWT 2.x: jwt.encode() returns a str
import requests

def publish_to_ghost(
    draft: str,  # HTML body
    seo: dict,
    ghost_url: str,
    admin_api_key: str,
    status: str = "draft",  # "draft" | "published"
) -> dict:
    # Ghost Admin API uses JWT auth: split the "id:secret" key, sign with HS256, 5-minute expiry
    key_id, secret = admin_api_key.split(":")
    payload = {
        "iat": int(time.time()),
        "exp": int(time.time()) + 300,
        "aud": "/admin/",
    }
    token = jwt.encode(payload, bytes.fromhex(secret), algorithm="HS256",
                       headers={"kid": key_id})

    post_data = {
        "posts": [{
            "title": seo["og_title"],
            "slug": seo["slug"],
            "html": draft,
            "status": status,
            "meta_title": seo["title"],
            "meta_description": seo["meta_description"],
            "og_title": seo["og_title"],
            "og_description": seo["og_description"],
            "custom_excerpt": seo["meta_description"],
        }]
    }
    # ?source=html asks Ghost to convert the "html" field into post content;
    # without it, Ghost does not use the HTML as the post body
    resp = requests.post(
        f"{ghost_url}/ghost/api/admin/posts/?source=html",
        json=post_data,
        headers={"Authorization": f"Ghost {token}"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["posts"][0]
```
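publish_to_ghost sends draft as the post's HTML, but the approved draft is prose that may still carry the internal (insight_N) citation markers. A minimal stdlib-only conversion is sketched below; draft_to_html is hypothetical, and it treats the draft as plain text, so for Markdown drafts a real Markdown renderer is the better choice.

```python
import html
import re

def draft_to_html(draft: str) -> str:
    """Strip (insight_N) markers and wrap blank-line-separated paragraphs in <p> tags.

    Plain text only: Markdown syntax (headings, lists, links) is escaped, not rendered.
    """
    text = re.sub(r"\s*\(insight_\d+\)", "", draft)
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
    # html.escape also escapes quotes, so apostrophes become &#x27;
    return "\n".join(f"<p>{html.escape(p)}</p>" for p in paragraphs)
```

The result can be passed as the draft argument to publish_to_ghost.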
09 Orchestration: Putting It Together
The full pipeline runs as three async functions, one for each segment between the human checkpoints, that carry a PipelineState object through all stages. Each stage updates the state and persists it to disk, enabling resume-from-checkpoint if any stage fails.
```python
import asyncio, json
from dataclasses import dataclass, field
from pathlib import Path

@dataclass
class PipelineState:
    run_id: str
    source_url: str
    # pending | ingesting | processing | extracting | awaiting_angle | drafting |
    # quality_check | awaiting_review | publishing | done | failed
    status: str = "pending"
    source: dict = field(default_factory=dict)
    cleaned_text: str = ""
    extraction: dict = field(default_factory=dict)
    filtered_insights: list = field(default_factory=list)
    outline: dict = field(default_factory=dict)
    draft: str = ""
    gate_results: list = field(default_factory=list)
    seo: dict = field(default_factory=dict)
    social_copy: dict = field(default_factory=dict)
    review_id: str = ""
    published_url: str = ""
    errors: list = field(default_factory=list)

    def save(self, state_dir: Path = Path("./pipeline_states")):
        """Persist the full state as JSON so a run can resume from its last stage."""
        state_dir.mkdir(exist_ok=True)
        path = state_dir / f"{self.run_id}.json"
        path.write_text(json.dumps(self.__dict__, indent=2, default=str))
```
```python
import uuid
from urllib.parse import parse_qs, urlparse

async def run_pipeline(source_url: str, config: dict) -> PipelineState:
    state = PipelineState(run_id=str(uuid.uuid4()), source_url=source_url)
    state.save()

    try:
        # Stage 1: Ingest
        state.status = "ingesting"
        if "youtube.com" in source_url:
            # ingest_youtube expects a video ID, so pull ?v=... out of the watch URL
            video_id = parse_qs(urlparse(source_url).query)["v"][0]
            state.source = ingest_youtube(video_id)
        else:
            state.source = ingest_url(source_url)
        state.save()

        # Stage 2: Process (process_transcript expects a Source, not a plain dict)
        state.status = "processing"
        state.cleaned_text = await process_transcript(Source(**state.source))
        state.save()

        # Stage 3: Extract (uses smarter model)
        state.status = "extracting"
        extraction_raw = await llm_async(
            EXTRACT_SYSTEM, state.cleaned_text, model="claude-sonnet-4"
        )
        state.extraction = json.loads(extraction_raw)
        state.filtered_insights = filter_insights(state.extraction, min_score=3.0)
        state.status = "awaiting_angle"  # Human checkpoint 1
        state.save()
        return state  # Pipeline pauses here; resume after human selects angle

    except Exception as e:
        state.status = "failed"
        state.errors.append(str(e))
        state.save()
        raise
```
```python
async def resume_after_angle(state: PipelineState, selected_insight_ids: list[int],
                             angle: str, target_words: int,
                             audience: str = "AI practitioners",  # assumed default
                             ) -> PipelineState:
    """Resume pipeline after human selects angle at checkpoint 1."""
    try:
        selected = [i for i in state.filtered_insights if i["id"] in selected_insight_ids]
        insights_json = json.dumps(selected)

        # Stage 4: Draft
        state.status = "drafting"
        # OUTLINE_SYSTEM contains literal JSON braces, so str.format() would raise;
        # fill its {angle}, {word_count} and {audience} placeholders with replace()
        outline_system = (OUTLINE_SYSTEM
                          .replace("{angle}", angle)
                          .replace("{word_count}", str(target_words))
                          .replace("{audience}", audience))
        outline_raw = await llm_async(outline_system, insights_json,
                                      model="claude-sonnet-4", temperature=0.3)
        state.outline = json.loads(outline_raw)
        state.draft = await llm_async(
            DRAFT_SYSTEM, f"OUTLINE:\n{outline_raw}\n\nINSIGHTS:\n{insights_json}",
            model="claude-sonnet-4", temperature=0.7
        )
        state.save()

        # Stage 5: Quality gates
        state.status = "quality_check"
        insight_ids = {i["id"] for i in selected}
        gate_results = await run_quality_gates(
            state.draft, target_words, insight_ids, insights_json
        )
        state.gate_results = [g.__dict__ for g in gate_results]
        failed = [g for g in gate_results if not g.passed]
        if failed:
            state.draft = await handle_gate_failures(
                state.draft, failed, insights_json, target_words
            )
        state.save()

        # Stage 6: Submit for human review (checkpoint 2)
        state.status = "awaiting_review"
        state.review_id = submit_for_review(state.__dict__)
        state.save()
        return state  # Pauses again; resume after editor approves

    except Exception as e:
        state.status = "failed"
        state.errors.append(str(e))
        state.save()
        raise
```
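```python
import re

async def resume_after_approval(state: PipelineState) -> PipelineState:
    """Resume pipeline after editor approves the draft."""
    review = get_review_status(state.review_id)
    if review["status"] != "approved":
        raise RuntimeError(f"Review {state.review_id} is {review['status']!r}, not approved")
    approved_draft = review.get("edited_draft") or state.draft
    # Strip the internal (insight_N) citation markers before anything is published
    approved_draft = re.sub(r"\s*\(insight_\d+\)", "", approved_draft)

    # Stage 7: Publish
    state.status = "publishing"
    seo_raw = await llm_async(SEO_SYSTEM, approved_draft)
    state.seo = json.loads(seo_raw)
    social_raw = await llm_async(SOCIAL_SYSTEM, approved_draft)
    state.social_copy = json.loads(social_raw)
    # GHOST_URL / GHOST_KEY come from your config; publish_to_ghost expects an HTML body
    result = publish_to_ghost(approved_draft, state.seo,
                              GHOST_URL, GHOST_KEY, status="draft")
    state.published_url = result["url"]
    state.status = "done"
    state.save()
    return state
```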
Because PipelineState is serialised after every stage, a crash or API timeout doesn't restart the whole run. Load the saved JSON, inspect status, and resume from the appropriate function. This is essential for pipelines that include human-wait steps.
10 Common Failure Modes
Save PipelineState to disk (or a database) after every stage completes, and check for an existing state file at the start of each run to enable resumption.
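Finding a state file tells you a run exists, not where to pick it up: the except blocks in run_pipeline and resume_after_angle overwrite status with "failed", which hides the stage that broke. Because each stage writes its output to its own PipelineState field, the resume point can be inferred from which fields are filled. A sketch, assuming the JSON layout written by PipelineState.save(); resume_point, scan_runs and the stage labels are hypothetical.

```python
import json
from pathlib import Path

# Ordered (field, stage) pairs: the first empty field marks where a run stopped
STAGE_OUTPUTS = [
    ("source", "ingest"),
    ("cleaned_text", "process"),
    ("extraction", "extract"),
    ("draft", "draft"),              # Needs checkpoint 1 input, then resume_after_angle
    ("review_id", "quality_gates"),  # resume_after_angle redoes drafting as well
    ("published_url", "publish"),    # resume_after_approval, once the review is approved
]

def resume_point(state: dict) -> str:
    """Infer the first unfinished stage from which outputs a saved state already has."""
    for field_name, stage in STAGE_OUTPUTS:
        if not state.get(field_name):
            return stage
    return "done"

def scan_runs(state_dir: Path) -> dict[str, str]:
    """Map each saved run_id to its inferred resume point."""
    states = (json.loads(p.read_text()) for p in sorted(state_dir.glob("*.json")))
    return {s["run_id"]: resume_point(s) for s in states}
```

Runs whose resume point is "draft" may simply be waiting for an editor's angle selection rather than broken, so pair this with the saved status before retrying anything automatically.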