An Implementation Guide to Build a Modular Conversational AI Agent with Pipecat and HuggingFace
В этой статье мы рассмотрим, как создать функционального разговорного ИИ-агента с использованием фреймворка Pipecat и моделей HuggingFace. Вы узнаете, как настроить конвейер, который объединяет пользовательские классы FrameProcessor для обработки ввода и генерации ответов, а также форматирует и отображает поток разговора.
Установка и настройка
Для начала установим необходимые библиотеки:
!pip install -q pipecat-ai transformers torch accelerate numpy
Затем импортируем нужные компоненты:
import asyncio
import logging
from typing import AsyncGenerator
import numpy as np
Проверка доступных кадров Pipecat
Проверьте, что основные классы Pipecat загружены:
try:
from pipecat.frames.frames import Frame, TextFrame
print("Базовые кадры успешно импортированы")
except ImportError as e:
print(f"Ошибка импорта: {e}")
from pipecat.frames.frames import Frame, TextFrame
Создание разговорного ИИ-агента
Теперь мы реализуем SimpleChatProcessor, который загружает модель DialoGPT-small от HuggingFace для генерации текста и ведет историю разговоров для контекста. Вот как обрабатывается ввод пользователя и генерируются ответы модели:
class SimpleChatProcessor(FrameProcessor):
def __init__(self):
super().__init__()
print("Загрузка модели генерации текста HuggingFace...")
self.chatbot = hf_pipeline(
"text-generation",
model="microsoft/DialoGPT-small",
pad_token_id=50256,
do_sample=True,
temperature=0.8,
max_length=100
)
self.conversation_history = ""
print("Модель чата успешно загружена!")
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
user_text = getattr(frame, "text", "").strip()
if user_text and not user_text.startswith("AI:"):
print(f"ПОЛЬЗОВАТЕЛЬ: {user_text}")
try:
input_text = f"{self.conversation_history} User: {user_text} Bot:" if self.conversation_history else f"User: {user_text} Bot:"
response = self.chatbot(input_text, max_new_tokens=50, num_return_sequences=1, temperature=0.7, do_sample=True, pad_token_id=self.chatbot.tokenizer.eos_token_id)
generated_text = response[0]["generated_text"]
ai_response = generated_text.split("Bot:")[-1].strip().split("User:")[0].strip() or "Это интересно! Расскажи подробнее."
self.conversation_history = f"{input_text} {ai_response}"
await self.push_frame(TextFrame(text=f"AI: {ai_response}"), direction)
except Exception as e:
print(f"Ошибка чата: {e}")
await self.push_frame(TextFrame(text="AI: У меня проблемы с обработкой этого. Можешь попробовать переформулировать?"), direction)
else:
await self.push_frame(frame, direction)
Форматирование и отображение ответов
Далее реализуем TextDisplayProcessor для форматирования и отображения ответов ИИ:
class TextDisplayProcessor(FrameProcessor):
def __init__(self):
super().__init__()
self.conversation_count = 0
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, TextFrame):
text = getattr(frame, "text", "")
if text.startswith("AI:"):
print(f"{text}")
self.conversation_count += 1
print(f"Обмен {self.conversation_count} завершен\n")
await self.push_frame(frame, direction)
Симуляция разговора
Класс ConversationInputGenerator имитирует сообщения пользователей:
class ConversationInputGenerator:
def __init__(self):
self.demo_conversations = ["Привет! Как дела сегодня?", "Что тебе больше всего нравится обсуждать?", "Расскажи что-нибудь интересное об ИИ?", "Что делает общение увлекательным для тебя?", "Спасибо за отличный разговор!"]
async def generate_conversation(self) -> AsyncGenerator[TextFrame, None]:
print("Запуск симуляции разговора...\n")
for i, user_input in enumerate(self.demo_conversations):
yield TextFrame(text=user_input)
if i < len(self.demo_conversations) - 1:
await asyncio.sleep(2)
Интеграция компонентов
В классе SimpleAIAgent мы объединим обработчик чата, дисплей и генератор ввода в единый конвейер Pipecat:
class SimpleAIAgent:
def __init__(self):
self.chat_processor = SimpleChatProcessor()
self.display_processor = TextDisplayProcessor()
self.input_generator = ConversationInputGenerator()
def create_pipeline(self) -> Pipeline:
return Pipeline([self.chat_processor, self.display_processor])
async def run_demo(self):
print("Демонстрация простого AI-агента на Pipecat")
print("Разговорный AI с HuggingFace")
print("=" * 50)
pipeline = self.create_pipeline()
runner = PipelineRunner()
task = PipelineTask(pipeline)
async def produce_frames():
async for frame in self.input_generator.generate_conversation():
await task.queue_frame(frame)
await task.stop_when_done()
try:
print("Запуск демонстрации разговора...\n")
await asyncio.gather(runner.run(task), produce_frames())
except Exception as e:
print(f"Ошибка демонстрации: {e}")
logging.error(f"Ошибка конвейера: {e}")
print("Демонстрация завершена успешно!")
Заключение
В заключение, мы разработали работающий разговорный ИИ-агент, в котором пользовательские вводы (или сымитированные текстовые кадры) обрабатываются через конвейер, модель HuggingFace DialoGPT генерирует ответы, а результаты отображаются в структурированном формате разговора. Эта реализация демонстрирует, как архитектура Pipecat поддерживает асинхронную обработку, управление состоянием разговора и четкое разделение обязанностей между различными этапами обработки.
С этой основой мы можем интегрировать более продвинутые функции, такие как синхронизация речи в реальном времени, синтез речи, сохранение контекста или использование более мощных модельных бэкендов, при этом сохраняя модульную и расширяемую структуру кода.