Как создать полезный сайт за четыре шага
Описание
В этой статье мы создадим веб-сайт, на котором будут собраны последние новости из Hacker News. Для этого мы будем использовать API HackerNews для получения главных новостей сегодняшнего дня. Кроме того, мы сделаем запрос API OpenAI для группировки новостных статей по темам, сохраняя результаты в формате JSON. Веб-сайт будет обслуживаться с использованием FastAPI и Jinja Template Engine.
Шаг 1. Получайте главные новости от Hacker News
Чтобы увидеть полный список кода, проверьте файл
worker.py
в репозитории GitHub
Во-первых, давайте получим идентификаторы историй в виде списка целых чисел.
def get_topstories(max_stories=30): # Get top stories topstories = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json") if (code := topstories.status_code) != 200: raise ValueError(f"topstories status code: {code}") topstories_ids = topstories.json() # Filter stores return topstories_ids[:max_stories] # i.e. [3000, 3004, 3051]
Как можно заметить, мы ограничим количество анализируемых историй параметром max_stories=30
Сложность заключается в том, как выполнить все 30 запросов асинхронно. Мы будем использовать aiohttp
и создадим файл helpers.py
для добавления следующих функций:
import aiohttp import asyncio BATCH_SIZE = 15 async def fetch_url(session, url): async with session.get(url) as response: return await response.json() async def process_batch(session, urls): tasks = [] for url in urls: task = asyncio.ensure_future(fetch_url(session, url)) tasks.append(task) return await asyncio.gather(*tasks) async def process_urls(urls, batch_size=BATCH_SIZE): async with aiohttp.ClientSession() as session: batches = [urls[i : i + batch_size] for i in range(0, len(urls), batch_size)] results = [] for batch in batches: batch_results = await process_batch(session, batch) results.extend(batch_results) return results
Теперь мы можем передать список URL-адресов в process_urls
для обработки всех запросов с использованием асинхронного подхода.
Давайте подготовим URL-адреса, используя list_of_items
:
def get_items(list_of_items: List[int], batch_size=12): # Prepare API requests to get all items URL_ITEM = "https://hacker-news.firebaseio.com/v0/item/{}.json" urls = [URL_ITEM.format(t_s) for t_s in list_of_items] loop = asyncio.get_event_loop() results = loop.run_until_complete(process_urls(urls, batch_size)) return results list_of_items = get_topstories() results = get_items(list_of_items) # Now we have a list of urls: # ["https://hacker-news.firebaseio.com/v0/item/3001.json", # "https://hacker-news.firebaseio.com/v0/item/4001.json", # ...]
Далее мы преобразуем полученные результаты в формат, который легко анализировать для запросов ChatGPT. Мы сохраним поля «название» и «URL», поскольку URL-адрес может предоставить ценную информацию для классификации элементов.
results_parsed = [ f"{el['title']} URL: {el['url']}" for el in results if el.get("url", None) is not None ] # The result will be: # ["The Password Game URL: https://neal.fun/password-game/", # "FreeBSD Jails Containers URL: https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/" # ...]
Шаг 2. Делайте запросы к API OpenAI и обрабатывайте результаты
Во-первых, давайте создадим функцию с именем get_openai_promt
. Он принимает List[strings]
в качестве входных данных и возвращает system_message
и user_message
(мы будем использовать модели, оптимизированные для чата).
from typing import List, Tuple def get_openai_prompt(topics: List[str]) -> Tuple[dict, dict]: system_message = { "role": "system", "content": ( "You are an assistant that can group news articles from hackernews (news.ycombinator.com) into topics" ), } user_message = { "role": "user", "content": ( "Group the following news articles into topics\n\n" + topics + "\n\nUse the following format:\n" + "topic_name_1\n- title\turl\n- title\turl\ntopic_name_2\n\ttitle\turl" ), } return system_message, user_message
Следующий шаг — запросить OpenAI через API, проанализировать ответ и сохранить его как json
file.
import openai topics = "\n\n".join(results_parsed) s_m, u_m = get_openai_prompt(topics=topics) # system & user messages # Get an API-key here: https://platform.openai.com/account/api-keys openai.api_key = "sk-74xTNuflpF3CtQAdOeD3T3BlXkFJhYw70q1XYJKxqq0XdBZS" # Get response from the model response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=[s_m, u_m], max_tokens=2200, # You can increase this number if needed ) # Get a body of the response res = response["choices"][0]["message"]["content"].split("\n") # Parse results # Sometimes response may be structured in different current_topic = None dict_ = {} titles_returned = {} for l in res: if l == "\n": # We will ignore empty strings continue if not ("http://" in l.lower() or "https://" in l.lower()): # If there is no link in the string it means that the string is a "topic" current_topic = l continue # Otherwise current string is a title that contains a link as well if current_topic not in dict_: dict_[current_topic] = {} pattern = r"- (.+?)\s*URL:" pattern2 = r"- (.+?)\s*http" match = re.search(pattern, l) match2 = re.search(pattern2, l) if match: substring = str(match.group(1)) titles_returned[substring] = current_topic elif match2: substring = str(match2.group(1)) titles_returned[substring] = current_topic else: print(l) data = {} for r in results: if "url" not in r or "score" not in r: print("Skip") continue data[r["title"]] = {"url": r["url"], "score": r["score"]} for k in data: if k in titles_returned: data[k]["topic"] = titles_returned[k] continue data[k]["topic"] = "Other" prefix = datetime.datetime.now().strftime("%Y-%m-%d") fname = f"data/{prefix}_articles.json" json.dump(data, open(fname, "w"))
Скрипт сгенерирует JSON, подобный приведенному ниже:
Теперь мы готовы использовать этот JSON для использования на нашем веб-сайте.
Шаг 3. Веб-сайт (шаблоны FastAPI + Jinja)
Чтобы просмотреть полный список кода, проверьте файл
app/app.py
в репозитории GitHub.
Создадим файл app.py
в папке app
.
import json from collections import defaultdict import uvicorn from fastapi import FastAPI, Request from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates import glob app = FastAPI() app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") @app.get("/") def get_articles(request: Request): fname = sorted(glob.glob("data/*_articles.json"), reverse=True)[0] with open(fname, "r") as json_file: articles = json.load(json_file) grouped_articles = {} for title, article in articles.items(): topic = article["topic"] if topic in grouped_articles: grouped_articles[topic][title] = article else: grouped_articles[topic] = {title: article} # Calculate total score for each topic/group topic_scores = defaultdict(lambda: 0) for topic, data in articles.items(): topic_scores[data["topic"]] += data["score"] return templates.TemplateResponse( "index.html", { "request": request, "articles": grouped_articles, "topic_scores": topic_scores, }, ) if __name__ == "__main__": uvicorn.run("app:app", host="127.0.0.1", port=5556, reload=True)
Было бы идеально не читать файл .json каждый раз из файловой системы, а вместо этого хранить его в памяти, изредка обновляя. Однако для простоты кода в нашей конкретной ситуации мы выбрали самый простой код, который выполняет свою задачу. Мы ожидаем, что нагрузка на сайт будет минимальной, менее одного запроса в секунду (RPS).
Теперь подготовим файлы index.html
и styles.css
index.html
стили.css
Шаг 4. Запустите и посмотрите результаты
Чтобы одновременно запустить оба скрипта, app.py
для веб-сервера и worker.py
для взаимодействия с внешним API, мы можем использовать tmux.
Чтобы запустить сервер, используйте следующую команду
uvicorn app.app:app --port 5556
Чтобы запустить воркера, используйте команду ниже
while true; do python3 worker.py; ls data/*; sleep 12h; done
Теперь вы можете открыть свой любимый браузер и проверить результат: http://localhost:5556
или протестировать производственную версию по адресу https://betterhacker.news
.
Спасибо, что нашли время прочитать это. Вы можете найти полный код, доступный на GitHub.