Langsung ke konten
KamusNgoding
Mahir Ai-agent 6 menit baca

Optimasi Interaksi Antar Agen: Tips dan Trik Pro

#ai agent #multi-agent #optimasi #advanced

Optimasi Interaksi Antar Agen: Tips dan Trik Pro

Pendahuluan

Membangun sistem multi-agen yang bekerja itu mudah. Tapi membangun sistem multi-agen yang bekerja dengan efisien — itu tantangan tersendiri.

Bayangkan kamu sedang membangun platform layanan seperti Gojek, di mana ada agen yang menangani pemesanan, agen yang memproses pembayaran, agen yang mengelola kurir, dan agen yang menangani notifikasi. Setiap agen punya tugasnya masing-masing, tapi mereka harus berkoordinasi dengan cepat dan tepat. Satu bottleneck kecil bisa membuat seluruh sistem melambat.

Di artikel ini, kita akan membahas teknik-teknik lanjutan untuk mengoptimalkan interaksi antar agen — mulai dari desain protokol komunikasi, pengelolaan context window, parallelisasi tugas, hingga strategi retry dan error propagation. Artikel ini ditujukan untuk developer yang sudah familiar dengan konsep dasar AI agent dan ingin membawa sistem mereka ke level berikutnya.

Sebelum masuk ke sini, pastikan kamu sudah memahami konsep Tutorial Menulis Prompt Efektif untuk Pemula karena kualitas prompt yang dikirim antar agen sangat mempengaruhi performa keseluruhan sistem.


Konsep Dasar

1. Protokol Komunikasi Antar Agen

Interaksi antar agen terjadi melalui message passing — setiap agen mengirim dan menerima pesan terstruktur. Masalah yang sering muncul adalah pesan yang terlalu verbose atau tidak terstruktur, sehingga agen penerima membuang token hanya untuk mem-parsing input.

Prinsip komunikasi yang efisien:

  • Minimal context, maksimal informasi — Kirim hanya data yang dibutuhkan agen penerima
  • Typed messages — Gunakan schema yang konsisten (JSON Schema, Pydantic, dll.)
  • Stateless by default — Setiap pesan harus self-contained kecuali memang perlu state

2. Context Window Management

Setiap agen memiliki batasan context window. Dalam sistem multi-agen, context yang tidak dikelola dengan baik adalah penyebab nomor satu degradasi performa. Mirip dengan bagaimana Memahami Tokenisasi: Kunci Utama Cara Kerja LLM menjelaskan bahwa setiap token memiliki biaya, dalam sistem multi-agen biaya itu berlipat ganda.

Strategi pengelolaan context:

  • Summarization — Kompres riwayat percakapan sebelum diteruskan ke agen berikutnya
  • Selective passing — Hanya teruskan output relevan, bukan seluruh chain of thought
  • Context pruning — Hapus intermediate steps yang tidak diperlukan downstream

3. Parallelisasi dan Dependency Graph

Banyak developer menjalankan agen secara sekuensial padahal bisa diparalel. Identifikasi dependency antar agen dan jalankan agen yang independen secara bersamaan.

Sequential (lambat):
A → B → C → D

Parallel (optimal):
A → B ──┐
         ├→ D
A → C ──┘

Contoh Kode

Implementasi Dasar: Structured Message Protocol

# Pastikan Python 3.8+ dan install: pip install pydantic
from pydantic import BaseModel
from typing import Any, Dict
from enum import Enum
import json

class MessageType(str, Enum):
    TASK = "task"
    RESULT = "result"
    ERROR = "error"
    SUMMARY = "summary"

class AgentMessage(BaseModel):
    sender: str
    receiver: str
    message_type: MessageType
    payload: Dict[str, Any]
    context_id: str  # untuk tracing
    priority: int = 5  # 1 (tinggi) - 10 (rendah)

# Contoh penggunaan
msg = AgentMessage(
    sender="planner_agent",
    receiver="executor_agent",
    message_type=MessageType.TASK,
    payload={
        "action": "search_web",
        "query": "harga saham BBCA hari ini",
        "max_results": 3
    },
    context_id="session-abc123",
    priority=2
)

print(msg.model_dump_json(indent=2))

Output yang dihasilkan:

{
  "sender": "planner_agent",
  "receiver": "executor_agent",
  "message_type": "task",
  "payload": {
    "action": "search_web",
    "query": "harga saham BBCA hari ini",
    "max_results": 3
  },
  "context_id": "session-abc123",
  "priority": 2
}

Pesan terstruktur ini memudahkan agen penerima mem-parsing instruksi tanpa ambiguitas. Field context_id memungkinkan kamu melacak seluruh alur eksekusi lintas agen untuk keperluan debugging.


Optimasi Context: Summarization Sebelum Handoff

# Install: pip install anthropic
import json
from typing import Dict, List
from anthropic import Anthropic

client = Anthropic()

def compress_agent_history(
    history: List[Dict],
    max_tokens: int = 500
) -> str:
    """
    Kompres riwayat percakapan agen menjadi summary ringkas
    sebelum diteruskan ke agen berikutnya.
    """
    if not history:
        return ""

    # Estimasi kasar: 1 token ≈ 4 karakter
    total_chars = sum(len(str(h)) for h in history)
    if total_chars < max_tokens * 4:
        # Sudah cukup pendek, tidak perlu kompres
        return json.dumps(history)

    history_text = "\n".join([
        f"[{h.get('role', 'unknown')}]: {h.get('content', '')[:200]}"
        for h in history[-10:]  # Ambil 10 pesan terakhir
    ])

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        messages=[{
            "role": "user",
            "content": f"""Buat ringkasan singkat dari riwayat interaksi agen berikut.
Fokus pada: keputusan yang diambil, hasil penting, dan konteks yang diperlukan agen berikutnya.
Maksimal 3 kalimat.

Riwayat:
{history_text}

Ringkasan:"""
        }]
    )

    return response.content[0].text


# Contoh penggunaan
history = [
    {"role": "user", "content": "Cari informasi tentang harga emas"},
    {"role": "assistant", "content": "Saya akan mencari harga emas terkini..."},
    {"role": "tool", "content": "Harga emas: Rp 1.200.000/gram"},
    {"role": "assistant", "content": "Harga emas saat ini adalah Rp 1.200.000 per gram"},
]

summary = compress_agent_history(history)
print(f"Summary: {summary}")
# Contoh output:
# Summary: Pengguna meminta pencarian harga emas. Hasil pencarian menunjukkan
# harga emas saat ini Rp 1.200.000/gram. Informasi ini telah dikonfirmasi
# dan siap diteruskan ke agen berikutnya.

Fungsi ini sangat berguna sebelum meneruskan context ke agen downstream. Daripada mengirim seluruh riwayat percakapan (yang bisa ribuan token), kita kirim ringkasan 3 kalimat yang berisi informasi esensial saja.


Parallelisasi dengan asyncio

# Install: pip install anthropic
import asyncio
from typing import Dict, List, Tuple
from dataclasses import dataclass
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

@dataclass
class AgentTask:
    name: str
    prompt: str

async def run_agent(task: AgentTask) -> Tuple[str, str]:
    """Jalankan satu agen secara async"""
    response = await async_client.messages.create(
        model="claude-haiku-4-5-20251001",  # Gunakan model cepat untuk subtask
        max_tokens=1024,
        messages=[{"role": "user", "content": task.prompt}]
    )
    return task.name, response.content[0].text


async def run_parallel_agents(tasks: List[AgentTask]) -> Dict[str, str]:
    """
    Jalankan multiple agen secara paralel.
    Jauh lebih cepat dibanding sequential untuk task yang independen.
    """
    results = await asyncio.gather(
        *[run_agent(task) for task in tasks],
        return_exceptions=True
    )

    output = {}
    for result in results:
        if isinstance(result, Exception):
            print(f"Agent error: {result}")
            continue
        name, content = result
        output[name] = content

    return output


async def orchestrator_agent(user_query: str) -> str:
    """
    Agen orkestrasi yang membagi tugas dan menggabungkan hasil.
    """
    # Step 1: Bagi tugas ke agen paralel
    subtasks = [
        AgentTask(
            name="researcher",
            prompt=f"Riset fakta-fakta kunci tentang: {user_query}. Jawab singkat dalam 3 poin."
        ),
        AgentTask(
            name="analyst",
            prompt=f"Analisis implikasi dan risiko dari: {user_query}. Jawab singkat dalam 3 poin."
        ),
        AgentTask(
            name="recommender",
            prompt=f"Berikan 3 rekomendasi praktis terkait: {user_query}"
        )
    ]

    print(f"Menjalankan {len(subtasks)} agen secara paralel...")
    parallel_results = await run_parallel_agents(subtasks)

    # Step 2: Agregasi hasil di agen utama
    aggregated_context = "\n\n".join([
        f"=== {name.upper()} ===\n{content}"
        for name, content in parallel_results.items()
    ])

    final_response = await async_client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"""Berdasarkan analisis dari beberapa agen spesialis berikut,
buat laporan komprehensif untuk pertanyaan: "{user_query}"

{aggregated_context}

Tulis laporan yang kohesif dan actionable:"""
        }]
    )

    return final_response.content[0].text


# Jalankan
if __name__ == "__main__":
    result = asyncio.run(
        orchestrator_agent("Strategi adopsi AI untuk startup fintech di Indonesia")
    )
    print(result)
    # Output: Laporan komprehensif yang menggabungkan riset, analisis risiko,
    # dan rekomendasi praktis dari 3 agen spesialis dalam satu teks kohesif.

Perhatikan bahwa 3 agen berjalan secara bersamaan — bukan satu per satu. Dalam praktiknya, ini bisa memangkas waktu eksekusi hingga 60-70% dibanding pendekatan sekuensial.


Retry dengan Exponential Backoff dan Circuit Breaker

import asyncio
import time
from typing import Any, Callable
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

class CircuitBreaker:
    """
    Mencegah cascade failure di sistem multi-agen.
    Jika satu agen terus gagal, circuit breaker akan memblokir
    request untuk sementara dan mencoba recovery.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF-OPEN

    def can_proceed(self) -> bool:
        if self.state == "CLOSED":
            return True

        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF-OPEN"
                return True
            return False

        # HALF-OPEN: izinkan satu request test
        return True

    def record_success(self):
        self.failure_count = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            print(f"Circuit OPEN! Agen diblokir selama {self.recovery_timeout}s")


async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    circuit_breaker: CircuitBreaker = None
) -> Any:
    """
    Retry dengan exponential backoff + optional circuit breaker.
    """
    for attempt in range(max_retries):
        # Cek circuit breaker
        if circuit_breaker and not circuit_breaker.can_proceed():
            raise RuntimeError("Circuit breaker OPEN — agen tidak tersedia sementara")

        try:
            result = await func()
            if circuit_breaker:
                circuit_breaker.record_success()
            return result

        except Exception as e:
            if circuit_breaker:
                circuit_breaker.record_failure()

            if attempt == max_retries - 1:
                raise  # Sudah habis retry

            delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s
            print(f"Attempt {attempt + 1} gagal: {e}. Retry dalam {delay:.1f}s...")
            await asyncio.sleep(delay)


# Contoh: agen dengan retry dan circuit breaker
payment_circuit = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

async def payment_agent_with_resilience(amount: float) -> dict:
    async def _call():
        response = await async_client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=256,
            messages=[{
                "role": "user",
                "content": f"Proses pembayaran sebesar Rp {amount:,.0f}. Berikan konfirmasi JSON."
            }]
        )
        return {"status": "success", "response": response.content[0].text}

    return await retry_with_backoff(
        _call,
        max_retries=3,
        base_delay=1.0,
        circuit_breaker=payment_circuit
    )


# Jalankan contoh
async def main():
    result = await payment_agent_with_resilience(500000)
    print(result)
    # Output sukses:
    # {'status': 'success', 'response': '{"konfirmasi": "OK", "jumlah": "Rp 500.000"}'}
    #
    # Output jika 3x gagal berturut-turut:
    # Circuit OPEN! Agen diblokir selama 30s
    # RuntimeError: Circuit breaker OPEN — agen tidak tersedia sementara

if __name__ == "__main__":
    asyncio.run(main())

Priority Queue untuk Orkestrasi Multi-Agen

import heapq
import asyncio
from typing import Any, Callable, Dict
from dataclasses import dataclass, field
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

@dataclass
class AgentTask:
    name: str
    prompt: str

@dataclass(order=True)
class PrioritizedTask:
    priority: int
    task_id: str = field(compare=False)
    agent_func: Callable = field(compare=False)
    kwargs: Dict = field(compare=False, default_factory=dict)

class AgentOrchestrator:
    """
    Orkestrasi multi-agen dengan priority queue.
    Cocok untuk sistem yang punya task dengan urgensi berbeda-beda,
    misalnya jika ingin membangun marketplace dengan priority handling
    untuk transaksi vs. notifikasi.
    """

    def __init__(self, max_concurrent: int = 3):
        self.queue = []
        self.max_concurrent = max_concurrent
        self.results: Dict[str, Any] = {}

    def add_task(self, task_id: str, priority: int, agent_func: Callable, **kwargs):
        task = PrioritizedTask(
            priority=priority,
            task_id=task_id,
            agent_func=agent_func,
            kwargs=kwargs
        )
        heapq.heappush(self.queue, task)
        print(f"Task '{task_id}' ditambahkan dengan priority {priority}")

    async def run_all(self) -> Dict[str, Any]:
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def run_task(task: PrioritizedTask):
            async with semaphore:
                print(f"Menjalankan: {task.task_id}")
                try:
                    result = await task.agent_func(**task.kwargs)
                    self.results[task.task_id] = result
                    print(f"Selesai: {task.task_id}")
                except Exception as e:
                    self.results[task.task_id] = {"error": str(e)}
                    print(f"Error: {task.task_id}{e}")

        # Ambil task dari queue berdasarkan priority
        sorted_tasks = []
        while self.queue:
            sorted_tasks.append(heapq.heappop(self.queue))

        await asyncio.gather(*[run_task(t) for t in sorted_tasks])
        return self.results


async def simple_agent(prompt: str) -> str:
    """Agen sederhana untuk demonstrasi."""
    response = await async_client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=256,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text


async def main():
    orchestrator = AgentOrchestrator(max_concurrent=2)

    # Priority 1 = paling urgent, 9 = paling rendah
    orchestrator.add_task(
        "critical-alert",
        priority=1,
        agent_func=simple_agent,
        prompt="Buat pesan alert singkat: sistem pembayaran down"
    )
    orchestrator.add_task(
        "user-recommendation",
        priority=5,
        agent_func=simple_agent,
        prompt="Beri 3 rekomendasi produk untuk pengguna aktif"
    )
    orchestrator.add_task(
        "analytics-report",
        priority=9,
        agent_func=simple_agent,
        prompt="Buat ringkasan laporan harian dalam 2 kalimat"
    )

    results = await orchestrator.run_all()
    for task_id, result in results.items():
        print(f"\n[{task_id}]:\n{result}")
    # Output menunjukkan critical-alert dieksekusi pertama kali,
    # diikuti user-recommendation, lalu analytics-report.

if __name__ == "__main__":
    asyncio.run(main())

Troubleshooting: Error yang Sering Muncul

Rate Limit Error saat Agen Paralel Terlalu Banyak

Penyebab: Menjalankan terlalu banyak agen secara bersamaan menyebabkan API throttling, terutama saat menggunakan model yang sama untuk semua agen.

Solusi:

import asyncio
from typing import Dict, List, Tuple
from dataclasses import dataclass
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

@dataclass
class AgentTask:
    name: str
    prompt: str

async def run_agent(task: AgentTask) -> Tuple[str, str]:
    response = await async_client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        messages=[{"role": "user", "content": task.prompt}]
    )
    return task.name, response.content[0].text

# Batasi concurrency dengan Semaphore
MAX_CONCURRENT_AGENTS = 5  # Sesuaikan dengan tier API kamu

semaphore = asyncio.Semaphore(MAX_CONCURRENT_AGENTS)

async def rate_limited_agent(task: AgentTask) -> Tuple[str, str]:
    async with semaphore:
        await asyncio.sleep(0.1)  # Jitter kecil untuk menghindari burst
        return await run_agent(task)

async def run_agents_safely(tasks: List[AgentTask]) -> Dict[str, str]:
    results = await asyncio.gather(
        *[rate_limited_agent(t) for t in tasks],
        return_exceptions=True
    )
    output = {}
    for result in results:
        if isinstance(result, Exception):
            print(f"Agen gagal: {result}")
            continue
        name, content = result
        output[name] = content
    return output

# Contoh penggunaan
async def main():
    tasks = [AgentTask(f"agen-{i}", f"Jawab singkat: apa itu AI?") for i in range(10)]
    results = await run_agents_safely(tasks)
    print(f"Berhasil: {len(results)} dari {len(tasks)} agen")
    # Output: Berhasil: 10 dari 10 agen

if __name__ == "__main__":
    asyncio.run(main())

Context Overflow: “max_tokens exceeded” pada Agen Downstream

Penyebab: Agen upstream mengirimkan hasil yang terlalu panjang tanpa kompresi, sehingga agen penerima kehabisan context window saat menggabungkan input dari banyak agen.

Solusi:

from typing import Dict

def truncate_agent_output(output: str, max_chars: int = 2000) -> str:
    """
    Potong output agen jika terlalu panjang.
    Selalu pertahankan bagian awal (biasanya paling penting).
    """
    if len(output) <= max_chars:
        return output

    keep_chars = int(max_chars * 0.8)
    return output[:keep_chars] + f"\n\n[... output dipotong, total {len(output)} karakter ...]"


def prepare_handoff_context(results: Dict[str, str], max_per_agent: int = 1500) -> str:
    sections = []
    for agent_name, content in results.items():
        truncated = truncate_agent_output(content, max_per_agent)
        sections.append(f"### {agent_name}\n{truncated}")
    return "\n\n".join(sections)


# Contoh penggunaan
results = {
    "researcher": "A" * 3000,   # output panjang 3000 karakter
    "analyst": "B" * 800,       # output pendek, tidak dipotong
}

context = prepare_handoff_context(results)
print(context)
# Output:
# ### researcher
# AAAA...AAAA
# [... output dipotong, total 3000 karakter ...]
#
# ### analyst
# BBBB...BBBB  (tidak dipotong karena di bawah 1500 karakter)

Infinite Loop antar Agen

Penyebab: Dua agen saling memanggil satu sama lain tanpa kondisi terminasi yang jelas, biasanya karena state management yang buruk atau kondisi stop tidak terdefinisi.

Solusi:

from typing import Dict, Optional
from anthropic import AsyncAnthropic
import asyncio

async_client = AsyncAnthropic()

class LoopGuard:
    """Mencegah agen memanggil satu sama lain secara infinite."""

    def __init__(self, max_iterations: int = 10):
        self.max_iterations = max_iterations
        self.call_counts: Dict[str, int] = {}

    def check_and_increment(self, agent_id: str) -> bool:
        """Returns True jika masih boleh jalan, False jika sudah limit."""
        count = self.call_counts.get(agent_id, 0)
        if count >= self.max_iterations:
            print(f"Loop guard: '{agent_id}' sudah dipanggil {count}x, dihentikan!")
            return False
        self.call_counts[agent_id] = count + 1
        return True

    def reset(self, agent_id: Optional[str] = None):
        if agent_id:
            self.call_counts.pop(agent_id, None)
        else:
            self.call_counts.clear()


guard = LoopGuard(max_iterations=5)

async def safe_agent_call(agent_id: str, prompt: str) -> str:
    if not guard.check_and_increment(agent_id):
        return "MAX_ITERATIONS_REACHED"

    response = await async_client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=256,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text


async def main():
    # Simulasi pemanggilan berulang
    for i in range(7):
        result = await safe_agent_call("planning-agent", "Buat rencana selanjutnya")
        print(f"Iterasi {i+1}: {result[:50] if result != 'MAX_ITERATIONS_REACHED' else result}")
    # Output:
    # Iterasi 1-5: [respons normal dari agen]
    # Loop guard: 'planning-agent' sudah dipanggil 5x, dihentikan!
    # Iterasi 6: MAX_ITERATIONS_REACHED
    # Iterasi 7: MAX_ITERATIONS_REACHED

if __name__ == "__main__":
    asyncio.run(main())

Hasil Agen Tidak Konsisten (Non-Deterministic Output)

Penyebab: Menggunakan temperature tinggi untuk agen yang seharusnya menghasilkan output terstruktur (JSON, kode), sehingga format output berubah-ubah antar run.

Solusi:

import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

async def main():
    # Untuk agen yang butuh output terstruktur: gunakan temperature rendah
    structured_response = await async_client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        temperature=0.0,  # Deterministik untuk parsing JSON
        messages=[{
            "role": "user",
            "content": 'Ekstrak ke JSON: "Nama: Budi, Usia: 25, Kota: Jakarta"'
        }]
    )
    print("Structured output:")
    print(structured_response.content[0].text)
    # Output konsisten setiap run:
    # {"nama": "Budi", "usia": 25, "kota": "Jakarta"}

    # Untuk agen kreatif/generatif: boleh temperature lebih tinggi
    creative_response = await async_client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=512,
        temperature=0.7,  # Lebih kreatif untuk konten naratif
        messages=[{
            "role": "user",
            "content": "Tulis tagline marketing untuk aplikasi keuangan dalam 1 kalimat"
        }]
    )
    print("\nCreative output:")
    print(creative_response.content[0].text)
    # Output bervariasi setiap run (sesuai ekspektasi untuk konten kreatif)

if __name__ == "__main__":
    asyncio.run(main())

Pertanyaan yang Sering Diajukan

Apa perbedaan antara orchestrator agent dan coordinator agent?

Orchestrator bertugas memecah tugas besar menjadi subtask dan mendistribusikannya ke agen-agen spesialis — ia tahu apa yang harus dilakukan. Coordinator bertugas mengatur urutan eksekusi, mengelola dependency, dan memastikan hasil dari satu agen diteruskan dengan benar ke agen lain — ia tahu kapan dan bagaimana mengkoordinasikan. Dalam implementasi sederhana keduanya bisa digabung, tapi memisahkannya meningkatkan maintainability untuk sistem yang kompleks.

Bagaimana cara memilih model yang tepat untuk setiap agen dalam sistem?

Gunakan model berdasarkan kompleksitas tugasnya: model ringan dan cepat (seperti claude-haiku-4-5-20251001) untuk agen yang melakukan tugas sederhana seperti klasifikasi, ekstraksi, atau routing; model lebih kuat (seperti claude-sonnet-4-6) untuk agen yang butuh reasoning mendalam. Strategi ini mengurangi latency dan biaya secara signifikan — bayangkan jika ingin membangun sistem dengan ratusan agen bersamaan, penghematan biaya per-request sangat terasa di tagihan akhir bulan.

Mengapa sistem multi-agen saya semakin lambat setelah percakapan panjang?

Ini hampir selalu karena context accumulation — setiap agen membawa seluruh riwayat percakapan ke dalam promptnya. Solusinya adalah menerapkan summarization secara berkala (setiap N iterasi), menggunakan selective context passing (hanya kirim bagian yang relevan), dan mempertimbangkan arsitektur stateless di mana setiap agen hanya menerima state yang dia butuhkan saja.

Apakah perlu database eksternal untuk state management di sistem multi-agen?

Untuk sistem kecil dengan satu sesi, dictionary in-memory sudah cukup. Tapi untuk sistem produksi yang harus handle banyak sesi concurrent, atau yang perlu persistence antar restart, gunakan Redis untuk state yang sering diakses atau SQLite/PostgreSQL untuk state yang perlu disimpan permanen. Pilihan arsitektur ini juga berdampak pada bagaimana kamu merancang Optimasi Pipeline CI/CD di GitLab: Tips dan Trik Pro untuk deploy sistem agenmu.

Bagaimana cara debug ketika hasil agen tidak sesuai ekspektasi?

Implementasikan tracing dengan context_id yang unik per sesi, log input dan output setiap agen, dan gunakan temperature=0 saat debugging untuk mendapatkan output deterministik. Tools seperti LangSmith atau implementasi custom logging middleware sangat membantu untuk memvisualisasikan alur eksekusi antar agen dan mengidentifikasi di mana chain of thought mulai melenceng.


Kesimpulan

Mengoptimalkan interaksi antar agen bukan sekadar soal kecepatan — ini tentang membangun sistem yang reliabel, efisien, dan mudah di-maintain. Key takeaways dari artikel ini:

  1. Gunakan typed messages dengan schema yang konsisten untuk komunikasi antar agen
  2. Kompres context sebelum handoff untuk menghindari context overflow
  3. Parallelkan task independen dengan asyncio dan Semaphore untuk rate limiting
  4. Implement circuit breaker dan retry dengan exponential backoff untuk resilience
  5. Gunakan priority queue untuk orkestrasi yang lebih cerdas
  6. Sesuaikan model dengan kompleksitas tugas setiap agen

Dengan menerapkan teknik-teknik ini, sistem multi-agen kamu akan mampu menangani beban produksi yang sesungguhnya — tidak hanya berjalan di localhost, tapi juga scale di lingkungan real-world.

Selamat bereksperimen dan terus tingkatkan sistemmu! Jika ada pertanyaan atau ingin berbagi pengalaman membangun sistem multi-agen, jangan ragu untuk eksplorasi artikel-artikel lainnya di KamusNgoding — komunitas developer Indonesia selalu siap belajar dan berkembang bersama.

Artikel Terkait