Appearance
Architektura Modułu MarketerAI - Przepływ Informacji (Flow)
Spis Treści
- Przegląd
- Inicjalizacja Konwersacji
- Streaming Odpowiedzi AI
- Wywołanie Funkcji
- Mechanizm Subagentów
- System Potwierdzeń
- Diagram Sekwencji
- Przykładowe Scenariusze
Przegląd
Ten dokument opisuje szczegółowy przepływ informacji od momentu gdy użytkownik wysyła wiadomość, przez przetwarzanie przez AI, wywołania funkcji, aż do otrzymania odpowiedzi.
Inicjalizacja Konwersacji
Scenariusz 1: Nowa Konwersacja
Endpoint: POST /api/projects/{project}/agents/{agent}/chat
Przepływ Requestu
Frontend wysyła:
POST request z body zawierającym wiadomość użytkownika ("Show me top 10 products")Kontroler (SembotChatController.create()):
- Rozpoczyna transakcję bazodanową
- Tworzy nowy ChatThread z user_id, project_id, agent_id i tytułem (obcięty do 200 znaków)
- Aktualizuje LastAgentUsage (dla statystyk ostatniego użycia agenta)
- Commituje transakcję
- Dispatchuje job ChatStream do kolejki asynchronicznej
- Zwraca natychmiast 200 OK (pusta odpowiedź)
Frontend:
- Otrzymuje 200 OK
- Nasłuchuje na prywatnym kanale WebSocket:
App.Chat.{userId}.{projectId} - Czeka na eventy: ThreadStart, StreamMessageChunk, ToolCall, ToolResponse, MessageEnd, ThreadEnd
Kluczowy moment:
Odpowiedź HTTP jest natychmiastowa - nie czekamy na przetworzenie przez AI. Całą komunikację kontynuujemy przez WebSocket w czasie rzeczywistym.
Scenariusz 2: Kontynuacja Konwersacji
Endpoint: POST /api/projects/{project}/chat-threads/{thread}
Przepływ:
Identyczny jak Scenariusz 1, ale zamiast tworzyć nowy ChatThread, używamy istniejącego. Historia konwersacji jest zachowana - AI widzi wszystkie poprzednie wiadomości, tool calls i tool responses.
Przykład:
Jeśli w pierwszej wiadomości użytkownik pytał "Show top 10 products", a AI pokazał listę, to w drugiej wiadomości użytkownik może napisać "Change labels for first 5" - AI wie które produkty (z poprzedniej odpowiedzi).
Streaming Odpowiedzi AI
Krok 1: ChatStream Job Startuje
Co się dzieje:
- Job jest wyciągany z kolejki Redis przez queue worker
- Broadcastuje event
ThreadStart(frontend pokazuje loading indicator) - Inicjalizuje ChatService i ChatMessageRepository
- Sprawdza czy to wznowienie po confirmation (jeśli tak → inna logika)
- Jeśli to UserMessage object (predefiniowana wiadomość) → generuje jej treść przez AI
- Auto-reject pending confirmations: Jeśli istnieje stare pending confirmation w tym wątku lub subthreadach, automatycznie je odrzuca i informuje AI że user wysłał nową wiadomość
- Tworzy USER_MESSAGE w bazie danych
- Wywołuje ChatService.handleStreamedAgent()
Krok 2: ChatService Wybiera API Format
Decyzja:
- Jeśli provider=OpenAI AND config('ai.default_api')='responses' → Responses API
- W przeciwnym razie → Completions API
Dlaczego ma znaczenie:
Responses API jest nowszy (2024), ma lepszą strukturę eventów i wspiera structured outputs. Completions API jest starszy ale bardziej uniwersalny (działa też z Groq, OpenRouter).
Krok 3: OpenAiService Tworzy Stream
Co się dzieje:
- Pobiera historię wiadomości z ChatMessageRepository (ostatnie N wiadomości user/assistant/tool_call/tool_response)
- Dodaje system prompt agenta na początku
- Pobiera dostępne tools (funkcje z bazy + runtime functions jak searchKnowledgeBase)
- Moderuje ostatnią wiadomość użytkownika (sprawdza czy nie zawiera hate speech/violence)
- Wywołuje OpenAI API z stream=true
- Zwraca StreamResponse - iterator który będzie produkował chunk po chunku
Krok 4: ChatService Przetwarza Stream
Dla każdego chunka:
Jeśli to tekst (delta):
- Akumuluje w zmiennej lokalnej
- Broadcastuje
StreamMessageChunkevent (frontend dodaje do displayed message)
Jeśli to tool call delta:
- Akumuluje ID, nazwę funkcji, argumenty JSON
- Broadcastuje
FunctionGenerationStart(na początku) - Broadcastuje
FunctionArgumentsChunkdla każdego fragmentu argumentów
Jeśli finish_reason='stop':
- Zapisuje ASSISTANT_MESSAGE w bazie
- Broadcastuje
MessageEnd - Koniec iteracji
Jeśli finish_reason='tool_calls':
- Zapisuje ASSISTANT_TOOL_CALL w bazie
- Wywołuje handleFunctionCalling()
- Jeśli funkcja zwróciła że potrzebna kolejna iteracja → tworzy nowy stream z tool_response w kontekście
Wywołanie Funkcji
Przepływ Tool Call
Przykład: AI wywołuje getTopProducts(limit=10, sortBy="revenue")
handleFunctionCalling otrzymuje:
- toolCallId: "call_abc123"
- functionName: "getTopProducts"
- arguments:
Broadcast ToolCall event:
Frontend pokazuje "Calling getTopProducts..." z argumentamicallChatFunction() decyduje:
- Czy to runtime function? (isRuntimeFunction("getTopProducts") → false)
- Pobiera ChatFunction z bazy dla tego agenta
- Czy ma subagent_id? (nie)
- Wywołuje executeChatFunctionClass()
executeChatFunctionClass():
- Laravel DI tworzy instancję
App\Services\Ai\Chat\ChatFunctions\Products\GetTopProducts - Wstrzykuje User, Project, Agent przez konstruktor
- Waliduje argumenty zgodnie z rules() metod (limit: integer|min:1|max:100, sortBy: in:revenue,units,clicks)
- Jeśli validation fails → zwraca "Validation failed: limit must be between 1 and 100"
- Jeśli OK → wywołuje handle($validated, $additionalData)
- Laravel DI tworzy instancję
GetTopProducts.handle():
- Wykonuje query do bazy: pobiera produkty projektu, sortuje po revenue DESC, limit 10
- Formatuje wynik jako tekst: "Product 1: iPhone 15 Pro, Revenue: $52,450, Units: 124\nProduct 2: ..."
- Opcjonalnie dodaje $additionalData['table_data'] z strukturą tabeli dla frontendu
- Zwraca tekstową odpowiedź
Zapis i broadcast:
- Tworzy TOOL_RESPONSE message w bazie z toolCallId="call_abc123" i response
- Broadcastuje
ToolResponseevent - Frontend może pokazać tabelę (jeśli są additionalData)
Kolejna iteracja:
- handleFunctionCalling zwraca true (potrzebna kolejna iteracja)
- handleCompletionsStream tworzy nowy stream
- AI teraz widzi tool_response w kontekście i może odpowiedzieć użytkownikowi:
"Oto top 10 produktów według przychodu: [formatowanie wyników]..."
Mechanizm Subagentów
Kiedy Używany
Gdy funkcja ma ustawione subagent_id - zamiast wykonywać kod PHP, delegujemy do specjalistycznego agenta AI.
Przykład użycia:
Agent główny (General Assistant) ma funkcję "manageProducts" która deleguje do subagenta "Product Manager". Product Manager ma własny system prompt, własne funkcje (getTopProducts, changeCustomLabel, duplicateProducts), i własny model AI.
Przepływ Subagenta
User → Main Agent: "Manage products: change labels for top 10"
Main Agent wywołuje:
manageProducts(user_request="change labels for top 10")handleSubagentFunction():
- Pobiera subagent (Product Manager) z function.subagent
- Przygotowuje custom system prompt (jeśli jest w function.subagent_system_prompt)
- Przygotowuje user message z placeholderami (function.subagent_user_message → "User wants to: {user_request}")
- Sprawdza preserve_previous_context parametr
SubagentThreadService:
- Jeśli preserve_previous_context=true → szuka istniejącego subthreadu
- Jeśli znajdzie → zwraca go (kontynuacja z kontekstem)
- Jeśli nie lub preserve=false → tworzy nowy subthread z parent_thread_id
Tworzenie struktury:
- Zapisuje AGENT_SUBTHREAD message w parent thread (link do subthreadu)
- Broadcastuje
AgentSubthreadStart(frontend tworzy zagnieżdżony kontener) - Tworzy USER_MESSAGE w subthread
- Broadcastuje
SubthreadUserMessage
Rekursja:
- ChatService.handleStreamedAgent() jest wywołany REKURSYWNIE dla subagenta
- Subagent ma własny streaming, własne tool calls, własne eventy
- Wszystkie eventy zawierają subthread_id żeby frontend wiedział gdzie je wyświetlać
Subagent pracuje:
- Wywołuje getTopProducts(limit=10) → dostaje listę
- Wywołuje changeCustomLabel(products=[1,2,3,4,5,6,7,8,9,10], label="Sale")
- STOP - funkcja wymaga confirmation!
Pending w subthreadzie:
- Tworzy PENDING_USER_CONFIRMATION w subthread
- Broadcastuje
ActionRequiresConfirmationze szczegółami - Job kończy się (return)
User zatwierdza:
- Frontend wysyła POST /confirmations/{id} z approved=true
- Dispatchuje nowy ChatStream job z pendingConfirmation + approved flag
Wznowienie subthreadu:
- handleConfirmationResume() wykonuje changeCustomLabel
- Broadcastuje
ToolResponsedla subthreadu - Kontynuuje streaming subagenta
- Subagent odpowiada: "Changed custom_label_0 to 'Sale' for 10 products"
- Kończy się (no more tool calls, finish_reason='stop')
Powrót do parent:
- continueParentThreads() jest wywołane
- createToolResponseInParentThread() - wyciąga ostatnią odpowiedź subagenta i zapisuje jako tool_response w parent thread
- Kontynuuje streaming main agenta z tool_response w kontekście
- Main agent odpowiada: "Done! I've managed your products - changed labels for top 10."
Hierarchia:
ChatThread (id=1, main thread)
├─ USER_MESSAGE: "Manage products..."
├─ ASSISTANT_TOOL_CALL: manageProducts()
├─ AGENT_SUBTHREAD: → subthread_id=2
│
└─ ChatThread (id=2, subthread, parent_thread_id=1)
├─ USER_MESSAGE: "User wants to: change labels for top 10"
├─ ASSISTANT_MESSAGE: "Let me get top products..."
├─ ASSISTANT_TOOL_CALL: getTopProducts()
├─ TOOL_RESPONSE: "Product 1: ..., Product 2: ..."
├─ ASSISTANT_TOOL_CALL: changeCustomLabel()
├─ PENDING_USER_CONFIRMATION: [PAUSE]
├─ TOOL_RESPONSE: "Changed labels for 10 products"
└─ ASSISTANT_MESSAGE: "Changed custom_label_0 to 'Sale' for 10 products"
├─ TOOL_RESPONSE: "Changed custom_label_0 to 'Sale' for 10 products" (from subthread)
└─ ASSISTANT_MESSAGE: "Done! I've managed your products..."System Potwierdzeń
Trigger Confirmation
Warunek: ChatFunction.requires_user_confirmation = true
Przykład: changeCustomLabel, editCampaignStatus, addNegativeKeywords, createTask
Przepływ
- AI wywołuje funkcję wymagającą confirmation
- handleFunctionCalling() wykrywa: function.requires_user_confirmation === true
- Tworzy PENDING_USER_CONFIRMATION message:
- Zawiera: tool_call_id, function_name, arguments, thread_context (thread_id, parent_thread_id)
- Broadcastuje ActionRequiresConfirmation:
- Frontend pokazuje modal: "Change custom_label_0 to 'Sale' for 5 products?" [Cancel] [Confirm]
- Job kończy się - return (nie wykonuje funkcji, nie kontynuuje)
User Decision
Cancel:
- POST /confirmations/{id} z approved=false
- Nowy job z pendingConfirmation + approved=false
- handleConfirmationResume() tworzy tool_response: "User declined this action."
- Broadcastuje
ActionRejected - Kontynuuje streaming - AI wie że user odrzucił i odpowiada np. "Okay, I won't change the labels."
Confirm:
- POST /confirmations/{id} z approved=true
- Nowy job z pendingConfirmation + approved=true
- handleConfirmationResume() WYKONUJE funkcję
- Broadcastuje
ActionConfirmed - Broadcastuje
ToolResponsez wynikiem - Kontynuuje streaming - AI widzi wynik i odpowiada np. "Done! Changed labels for 5 products."
Auto-Reject
Sytuacja: User ma pending confirmation ale wysyła nową wiadomość zamiast potwierdzić/odrzucić.
Rozwiązanie:
- ChatStream.handle() na początku sprawdza getActivePendingConfirmationInThreadTree()
- Jeśli znajdzie → automatycznie odrzuca (declinePendingAction)
- Tworzy tool_response: "Previous action was automatically declined because user sent a new message."
- Broadcastuje
ActionRejectedz automaticRejection=true - Kontynuuje z aktualną thread (tej ze starym pending) - AI dostaje info że user odrzucił
- Następnie przetwarza nową wiadomość użytkownika normalnie
Dlaczego:
User zmienił zdanie - zamiast potwierdzić "change labels" wysłał "show me analytics". Nie zostawiamy wiszących confirmations.
Diagram Sekwencji
Pełna Konwersacja z Subagentem i Potwierdzeniem
Przykładowe Scenariusze
Scenariusz 1: Prosta Funkcja bez Potwierdzenia
User: "Show me top 5 products"
Timeline:
- 0ms: ThreadStart
- 50ms: StreamMessageChunk "Let"
- 55ms: StreamMessageChunk " me"
- 100ms: FunctionGenerationStart
- 150ms: FunctionArgumentsChunk (JSON fragmenty)
- 200ms: ToolCall getTopProducts
- 1500ms: ToolResponse (z danymi produktów)
- 1600ms: StreamMessageChunk "Here"
- 1700ms: MessageEnd
- 1750ms: ThreadEnd
- 1800ms: ProposeActions
Wynik: User widzi listę 5 produktów + sugestie ("Analyze trends", "Change prices")
Scenariusz 2: Funkcja z Potwierdzeniem
User: "Change custom_label_0 to 'Sale' for products 1, 2, 3"
Timeline:
- 0ms: ThreadStart
- 100ms: FunctionGenerationStart
- 200ms: ToolCall changeCustomLabel
- 250ms: ActionRequiresConfirmation → PAUSE
- [User interaction 5000ms]
- 5000ms: User clicks Confirm
- 5010ms: ThreadStart (resume)
- 5200ms: ToolResponse "Changed labels for 3 products"
- 5300ms: StreamMessageChunk "Done!"
- 5400ms: ThreadEnd
Wynik: Labele zmienione po potwierdzeniu
Scenariusz 3: Subagent z Zagnieżdżonym Potwierdzeniem
User: "Manage products: change label for top 10"
Flow:
- Main thread → wywołuje manageProducts
- Subthread tworzony → Product Manager
- Subagent wywołuje getTopProducts → sukces
- Subagent wywołuje changeCustomLabel → wymaga confirmation
- PAUSE w subthreadzie
- User potwierdza
- Subthread kończy się
- Parent thread kontynuuje
- Main agent odpowiada
Thread Structure:
- Main Thread (id=1)
- Subthread (id=2, parent=1) ← tutaj było confirmation
- Wynik zapisany w obu threadach
Data utworzenia: 2025-11-26
Wersja: 1.0