"""Live smoke test: write one memory through ``VectorMemory`` and search it back.

The test talks to a real Qdrant server and is skipped when the server cannot
be reached.
"""

import os
from uuid import uuid4

import httpx
import pytest

from duck_core.memory.vector_memory import VectorMemory


async def qdrant_available(url: str) -> bool:
    """Report whether a GET request to *url* succeeds.

    Args:
        url: Base URL of the Qdrant server to probe.

    Returns:
        True when the request completes with a non-error status; False when
        httpx raises any ``httpx.HTTPError`` (connection failure, timeout,
        or an error status surfaced by ``raise_for_status``).
    """
    try:
        # trust_env=False keeps proxy-related environment settings from
        # interfering with what is normally a loopback request.
        async with httpx.AsyncClient(timeout=2.0, trust_env=False) as client:
            response = await client.get(url)
            response.raise_for_status()
    except httpx.HTTPError:
        return False
    return True


async def _drop_collection(qdrant_url: str, collection: str) -> None:
    """Best-effort removal of *collection* via Qdrant's REST API.

    Failures are deliberately ignored: this runs during test teardown and
    must never replace the test's own pass/fail outcome with a cleanup error.
    """
    endpoint = f"{qdrant_url.rstrip('/')}/collections/{collection}"
    try:
        async with httpx.AsyncClient(timeout=5.0, trust_env=False) as client:
            await client.delete(endpoint)
    except httpx.HTTPError:
        # Nothing useful can be done here; the throwaway collection simply
        # stays on the server.
        pass


@pytest.mark.asyncio
async def test_vector_memory_live_qdrant_write_and_search():
    """Store a unique marker text and expect a search for it to return it.

    Configuration comes from the environment:
      * ``DUCK_QDRANT_URL`` (default ``http://127.0.0.1:6333``)
      * ``DUCK_LOCAL_EMBEDDING_MODEL`` (default ``./models/all-MiniLM-L6-v2``)

    The test uses a uniquely named collection so that concurrent or repeated
    runs do not see each other's data, and removes that collection afterwards.
    """
    qdrant_url = os.environ.get("DUCK_QDRANT_URL", "http://127.0.0.1:6333")
    if not await qdrant_available(qdrant_url):
        pytest.skip(f"Qdrant is not running at {qdrant_url}")

    embedding_model = os.environ.get(
        "DUCK_LOCAL_EMBEDDING_MODEL", "./models/all-MiniLM-L6-v2"
    )
    collection = f"duck_memory_smoke_{uuid4().hex[:12]}"

    try:
        # Constructed inside the try so the collection is still cleaned up if
        # construction touches the server before failing.
        # NOTE(review): whether the constructor or the first write creates the
        # collection is not visible from this file.
        memory = VectorMemory(
            qdrant_url=qdrant_url,
            collection_name=collection,
            local_embedding_model=embedding_model,
        )

        # A random suffix makes the marker text unique to this run.
        marker = f"DuckLM vector memory live smoke {uuid4().hex}"
        point_id = await memory.add_memory(marker, {"kind": "live_smoke"})
        results = await memory.search_memory(marker, limit=3)

        assert point_id
        assert any(
            item.get("payload", {}).get("text") == marker for item in results
        )
    finally:
        # Without this, every run leaves a new collection on the live server.
        await _drop_collection(qdrant_url, collection)