Skip to content

Architektura Modułu MarketerAI - Przepływ Informacji (Flow)

Spis Treści

  1. Przegląd
  2. Inicjalizacja Konwersacji
  3. Streaming Odpowiedzi AI
  4. Wywołanie Funkcji
  5. Mechanizm Subagentów
  6. System Potwierdzeń
  7. Diagram Sekwencji
  8. Przykładowe Scenariusze

Przegląd

Ten dokument opisuje szczegółowy przepływ informacji od momentu gdy użytkownik wysyła wiadomość, przez przetwarzanie przez AI, wywołania funkcji, aż do otrzymania odpowiedzi.


Inicjalizacja Konwersacji

Scenariusz 1: Nowa Konwersacja

Endpoint: POST /api/projects/{project}/agents/{agent}/chat

Przepływ Requestu

  1. Frontend wysyła:
    POST request z body zawierającym wiadomość użytkownika ("Show me top 10 products")

  2. Kontroler (SembotChatController.create()):

    • Rozpoczyna transakcję bazodanową
    • Tworzy nowy ChatThread z user_id, project_id, agent_id i tytułem (obcięty do 200 znaków)
    • Aktualizuje LastAgentUsage (dla statystyk ostatniego użycia agenta)
    • Commituje transakcję
    • Dispatchuje job ChatStream do kolejki asynchronicznej
    • Zwraca natychmiast 200 OK (pusta odpowiedź)
  3. Frontend:

    • Otrzymuje 200 OK
    • Nasłuchuje na prywatnym kanale WebSocket: App.Chat.{userId}.{projectId}
    • Czeka na eventy: ThreadStart, StreamMessageChunk, ToolCall, ToolResponse, MessageEnd, ThreadEnd

Kluczowy moment:
Odpowiedź HTTP jest natychmiastowa - nie czekamy na przetworzenie przez AI. Całą komunikację kontynuujemy przez WebSocket w czasie rzeczywistym.


Scenariusz 2: Kontynuacja Konwersacji

Endpoint: POST /api/projects/{project}/chat-threads/{thread}

Przepływ:
Identyczny jak Scenariusz 1, ale zamiast tworzyć nowy ChatThread, używamy istniejącego. Historia konwersacji jest zachowana - AI widzi wszystkie poprzednie wiadomości, tool calls i tool responses.

Przykład:
Jeśli w pierwszej wiadomości użytkownik pytał "Show top 10 products", a AI pokazał listę, to w drugiej wiadomości użytkownik może napisać "Change labels for first 5" - AI wie które produkty (z poprzedniej odpowiedzi).


Streaming Odpowiedzi AI

Krok 1: ChatStream Job Startuje

Co się dzieje:

  1. Job jest wyciągany z kolejki Redis przez queue worker
  2. Broadcastuje event ThreadStart (frontend pokazuje loading indicator)
  3. Inicjalizuje ChatService i ChatMessageRepository
  4. Sprawdza czy to wznowienie po confirmation (jeśli tak → inna logika)
  5. Jeśli to UserMessage object (predefiniowana wiadomość) → generuje jej treść przez AI
  6. Auto-reject pending confirmations: Jeśli istnieje stare pending confirmation w tym wątku lub subthreadach, automatycznie je odrzuca i informuje AI że user wysłał nową wiadomość
  7. Tworzy USER_MESSAGE w bazie danych
  8. Wywołuje ChatService.handleStreamedAgent()

Krok 2: ChatService Wybiera API Format

Decyzja:

  • Jeśli provider=OpenAI AND config('ai.default_api')='responses' → Responses API
  • W przeciwnym razie → Completions API

Dlaczego ma znaczenie:
Responses API jest nowszy (2024), ma lepszą strukturę eventów i wspiera structured outputs. Completions API jest starszy ale bardziej uniwersalny (działa też z Groq, OpenRouter).


Krok 3: OpenAiService Tworzy Stream

Co się dzieje:

  1. Pobiera historię wiadomości z ChatMessageRepository (ostatnie N wiadomości user/assistant/tool_call/tool_response)
  2. Dodaje system prompt agenta na początku
  3. Pobiera dostępne tools (funkcje z bazy + runtime functions jak searchKnowledgeBase)
  4. Moderuje ostatnią wiadomość użytkownika (sprawdza czy nie zawiera hate speech/violence)
  5. Wywołuje OpenAI API z stream=true
  6. Zwraca StreamResponse - iterator który będzie produkował chunk po chunku

Krok 4: ChatService Przetwarza Stream

Dla każdego chunka:

Jeśli to tekst (delta):

  • Akumuluje w zmiennej lokalnej
  • Broadcastuje StreamMessageChunk event (frontend dodaje do displayed message)

Jeśli to tool call delta:

  • Akumuluje ID, nazwę funkcji, argumenty JSON
  • Broadcastuje FunctionGenerationStart (na początku)
  • Broadcastuje FunctionArgumentsChunk dla każdego fragmentu argumentów

Jeśli finish_reason='stop':

  • Zapisuje ASSISTANT_MESSAGE w bazie
  • Broadcastuje MessageEnd
  • Koniec iteracji

Jeśli finish_reason='tool_calls':

  • Zapisuje ASSISTANT_TOOL_CALL w bazie
  • Wywołuje handleFunctionCalling()
  • Jeśli funkcja zwróciła że potrzebna kolejna iteracja → tworzy nowy stream z tool_response w kontekście

Wywołanie Funkcji

Przepływ Tool Call

Przykład: AI wywołuje getTopProducts(limit=10, sortBy="revenue")

  1. handleFunctionCalling otrzymuje:

    • toolCallId: "call_abc123"
    • functionName: "getTopProducts"
    • arguments:
  2. Broadcast ToolCall event:
    Frontend pokazuje "Calling getTopProducts..." z argumentami

  3. callChatFunction() decyduje:

    • Czy to runtime function? (isRuntimeFunction("getTopProducts") → false)
    • Pobiera ChatFunction z bazy dla tego agenta
    • Czy ma subagent_id? (nie)
    • Wywołuje executeChatFunctionClass()
  4. executeChatFunctionClass():

    • Laravel DI tworzy instancję App\Services\Ai\Chat\ChatFunctions\Products\GetTopProducts
    • Wstrzykuje User, Project, Agent przez konstruktor
    • Waliduje argumenty zgodnie z rules() metod (limit: integer|min:1|max:100, sortBy: in:revenue,units,clicks)
    • Jeśli validation fails → zwraca "Validation failed: limit must be between 1 and 100"
    • Jeśli OK → wywołuje handle($validated, $additionalData)
  5. GetTopProducts.handle():

    • Wykonuje query do bazy: pobiera produkty projektu, sortuje po revenue DESC, limit 10
    • Formatuje wynik jako tekst: "Product 1: iPhone 15 Pro, Revenue: $52,450, Units: 124\nProduct 2: ..."
    • Opcjonalnie dodaje $additionalData['table_data'] z strukturą tabeli dla frontendu
    • Zwraca tekstową odpowiedź
  6. Zapis i broadcast:

    • Tworzy TOOL_RESPONSE message w bazie z toolCallId="call_abc123" i response
    • Broadcastuje ToolResponse event
    • Frontend może pokazać tabelę (jeśli są additionalData)
  7. Kolejna iteracja:

    • handleFunctionCalling zwraca true (potrzebna kolejna iteracja)
    • handleCompletionsStream tworzy nowy stream
    • AI teraz widzi tool_response w kontekście i może odpowiedzieć użytkownikowi:
      "Oto top 10 produktów według przychodu: [formatowanie wyników]..."

Mechanizm Subagentów

Kiedy Używany

Gdy funkcja ma ustawione subagent_id - zamiast wykonywać kod PHP, delegujemy do specjalistycznego agenta AI.

Przykład użycia:
Agent główny (General Assistant) ma funkcję "manageProducts" która deleguje do subagenta "Product Manager". Product Manager ma własny system prompt, własne funkcje (getTopProducts, changeCustomLabel, duplicateProducts), i własny model AI.

Przepływ Subagenta

User → Main Agent: "Manage products: change labels for top 10"

  1. Main Agent wywołuje: manageProducts(user_request="change labels for top 10")

  2. handleSubagentFunction():

    • Pobiera subagent (Product Manager) z function.subagent
    • Przygotowuje custom system prompt (jeśli jest w function.subagent_system_prompt)
    • Przygotowuje user message z placeholderami (function.subagent_user_message → "User wants to: {user_request}")
    • Sprawdza preserve_previous_context parametr
  3. SubagentThreadService:

    • Jeśli preserve_previous_context=true → szuka istniejącego subthreadu
    • Jeśli znajdzie → zwraca go (kontynuacja z kontekstem)
    • Jeśli nie lub preserve=false → tworzy nowy subthread z parent_thread_id
  4. Tworzenie struktury:

    • Zapisuje AGENT_SUBTHREAD message w parent thread (link do subthreadu)
    • Broadcastuje AgentSubthreadStart (frontend tworzy zagnieżdżony kontener)
    • Tworzy USER_MESSAGE w subthread
    • Broadcastuje SubthreadUserMessage
  5. Rekursja:

    • ChatService.handleStreamedAgent() jest wywołany REKURSYWNIE dla subagenta
    • Subagent ma własny streaming, własne tool calls, własne eventy
    • Wszystkie eventy zawierają subthread_id żeby frontend wiedział gdzie je wyświetlać
  6. Subagent pracuje:

    • Wywołuje getTopProducts(limit=10) → dostaje listę
    • Wywołuje changeCustomLabel(products=[1,2,3,4,5,6,7,8,9,10], label="Sale")
    • STOP - funkcja wymaga confirmation!
  7. Pending w subthreadzie:

    • Tworzy PENDING_USER_CONFIRMATION w subthread
    • Broadcastuje ActionRequiresConfirmation ze szczegółami
    • Job kończy się (return)
  8. User zatwierdza:

    • Frontend wysyła POST /confirmations/{id} z approved=true
    • Dispatchuje nowy ChatStream job z pendingConfirmation + approved flag
  9. Wznowienie subthreadu:

    • handleConfirmationResume() wykonuje changeCustomLabel
    • Broadcastuje ToolResponse dla subthreadu
    • Kontynuuje streaming subagenta
    • Subagent odpowiada: "Changed custom_label_0 to 'Sale' for 10 products"
    • Kończy się (no more tool calls, finish_reason='stop')
  10. Powrót do parent:

    • continueParentThreads() jest wywołane
    • createToolResponseInParentThread() - wyciąga ostatnią odpowiedź subagenta i zapisuje jako tool_response w parent thread
    • Kontynuuje streaming main agenta z tool_response w kontekście
    • Main agent odpowiada: "Done! I've managed your products - changed labels for top 10."

Hierarchia:

ChatThread (id=1, main thread)
  ├─ USER_MESSAGE: "Manage products..."
  ├─ ASSISTANT_TOOL_CALL: manageProducts()
  ├─ AGENT_SUBTHREAD: → subthread_id=2

  └─ ChatThread (id=2, subthread, parent_thread_id=1)
       ├─ USER_MESSAGE: "User wants to: change labels for top 10"
       ├─ ASSISTANT_MESSAGE: "Let me get top products..."
       ├─ ASSISTANT_TOOL_CALL: getTopProducts()
       ├─ TOOL_RESPONSE: "Product 1: ..., Product 2: ..."
       ├─ ASSISTANT_TOOL_CALL: changeCustomLabel()
       ├─ PENDING_USER_CONFIRMATION: [PAUSE]
       ├─ TOOL_RESPONSE: "Changed labels for 10 products"
       └─ ASSISTANT_MESSAGE: "Changed custom_label_0 to 'Sale' for 10 products"
  
  ├─ TOOL_RESPONSE: "Changed custom_label_0 to 'Sale' for 10 products" (from subthread)
  └─ ASSISTANT_MESSAGE: "Done! I've managed your products..."

System Potwierdzeń

Trigger Confirmation

Warunek: ChatFunction.requires_user_confirmation = true

Przykład: changeCustomLabel, editCampaignStatus, addNegativeKeywords, createTask

Przepływ

  1. AI wywołuje funkcję wymagającą confirmation
  2. handleFunctionCalling() wykrywa: function.requires_user_confirmation === true
  3. Tworzy PENDING_USER_CONFIRMATION message:
    • Zawiera: tool_call_id, function_name, arguments, thread_context (thread_id, parent_thread_id)
  4. Broadcastuje ActionRequiresConfirmation:
    • Frontend pokazuje modal: "Change custom_label_0 to 'Sale' for 5 products?" [Cancel] [Confirm]
  5. Job kończy się - return (nie wykonuje funkcji, nie kontynuuje)

User Decision

Cancel:

  • POST /confirmations/{id} z approved=false
  • Nowy job z pendingConfirmation + approved=false
  • handleConfirmationResume() tworzy tool_response: "User declined this action."
  • Broadcastuje ActionRejected
  • Kontynuuje streaming - AI wie że user odrzucił i odpowiada np. "Okay, I won't change the labels."

Confirm:

  • POST /confirmations/{id} z approved=true
  • Nowy job z pendingConfirmation + approved=true
  • handleConfirmationResume() WYKONUJE funkcję
  • Broadcastuje ActionConfirmed
  • Broadcastuje ToolResponse z wynikiem
  • Kontynuuje streaming - AI widzi wynik i odpowiada np. "Done! Changed labels for 5 products."

Auto-Reject

Sytuacja: User ma pending confirmation ale wysyła nową wiadomość zamiast potwierdzić/odrzucić.

Rozwiązanie:

  • ChatStream.handle() na początku sprawdza getActivePendingConfirmationInThreadTree()
  • Jeśli znajdzie → automatycznie odrzuca (declinePendingAction)
  • Tworzy tool_response: "Previous action was automatically declined because user sent a new message."
  • Broadcastuje ActionRejected z automaticRejection=true
  • Kontynuuje z aktualną thread (tej ze starym pending) - AI dostaje info że user odrzucił
  • Następnie przetwarza nową wiadomość użytkownika normalnie

Dlaczego:
User zmienił zdanie - zamiast potwierdzić "change labels" wysłał "show me analytics". Nie zostawiamy wiszących confirmations.


Diagram Sekwencji

Pełna Konwersacja z Subagentem i Potwierdzeniem


Przykładowe Scenariusze

Scenariusz 1: Prosta Funkcja bez Potwierdzenia

User: "Show me top 5 products"

Timeline:

  • 0ms: ThreadStart
  • 50ms: StreamMessageChunk "Let"
  • 55ms: StreamMessageChunk " me"
  • 100ms: FunctionGenerationStart
  • 150ms: FunctionArgumentsChunk (JSON fragmenty)
  • 200ms: ToolCall getTopProducts
  • 1500ms: ToolResponse (z danymi produktów)
  • 1600ms: StreamMessageChunk "Here"
  • 1700ms: MessageEnd
  • 1750ms: ThreadEnd
  • 1800ms: ProposeActions

Wynik: User widzi listę 5 produktów + sugestie ("Analyze trends", "Change prices")


Scenariusz 2: Funkcja z Potwierdzeniem

User: "Change custom_label_0 to 'Sale' for products 1, 2, 3"

Timeline:

  • 0ms: ThreadStart
  • 100ms: FunctionGenerationStart
  • 200ms: ToolCall changeCustomLabel
  • 250ms: ActionRequiresConfirmation → PAUSE
  • [User interaction 5000ms]
  • 5000ms: User clicks Confirm
  • 5010ms: ThreadStart (resume)
  • 5200ms: ToolResponse "Changed labels for 3 products"
  • 5300ms: StreamMessageChunk "Done!"
  • 5400ms: ThreadEnd

Wynik: Labele zmienione po potwierdzeniu


Scenariusz 3: Subagent z Zagnieżdżonym Potwierdzeniem

User: "Manage products: change label for top 10"

Flow:

  1. Main thread → wywołuje manageProducts
  2. Subthread tworzony → Product Manager
  3. Subagent wywołuje getTopProducts → sukces
  4. Subagent wywołuje changeCustomLabel → wymaga confirmation
  5. PAUSE w subthreadzie
  6. User potwierdza
  7. Subthread kończy się
  8. Parent thread kontynuuje
  9. Main agent odpowiada

Thread Structure:

  • Main Thread (id=1)
    • Subthread (id=2, parent=1) ← tutaj było confirmation
  • Wynik zapisany w obu threadach

Data utworzenia: 2025-11-26
Wersja: 1.0