API Reference

Key entry points for the engrava Python library. For the full auto-generated reference, see the source docstrings in the package.

EngravaManager

Multi-service store manager. Use when your agent needs isolated memory spaces per service (e.g. per-user, per-conversation). Each service gets its own SQLite database under data_dir.

from pathlib import Path

from engrava import EngravaManager

async with EngravaManager(data_dir=Path("./data")) as mgr:
    store = await mgr.get_store("main")
    # store is a fully initialised SqliteEngravaCore
    await mgr.list_services()        # -> ["main"]
    await mgr.delete_service("old")

Methods

| Method | Description |
| --- | --- |
| get_store(name) | Return (or lazily open) the named store for the service. |
| list_services() | List the service names currently backed by a database. |
| delete_service(name) | Remove a service and its database. |
| service_exists(name) | Return whether a database already exists for the service. |
| close_all() | Close all open stores. Called automatically when used as an async context manager. |
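
To make the "one database per service" layout concrete, the following is a minimal synchronous sketch built on the standard-library sqlite3 module. It is not engrava's implementation: the class name TinyServiceManager, the "<name>.db" file naming, and the one-column placeholder table are all assumptions made for illustration.

```python
import sqlite3
from pathlib import Path


class TinyServiceManager:
    """Illustrative stand-in, NOT engrava's EngravaManager."""

    def __init__(self, data_dir: Path) -> None:
        self.data_dir = data_dir
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self._open: dict[str, sqlite3.Connection] = {}

    def _path(self, name: str) -> Path:
        # Hypothetical naming scheme: one "<name>.db" file per service.
        return self.data_dir / f"{name}.db"

    def get_store(self, name: str) -> sqlite3.Connection:
        # Open lazily on first request, then reuse the same connection.
        if name not in self._open:
            conn = sqlite3.connect(self._path(name))
            # Placeholder schema so the database file is materialised.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS thought (thought_id TEXT PRIMARY KEY)"
            )
            conn.commit()
            self._open[name] = conn
        return self._open[name]

    def service_exists(self, name: str) -> bool:
        return self._path(name).exists()

    def list_services(self) -> list[str]:
        return sorted(p.stem for p in self.data_dir.glob("*.db"))

    def delete_service(self, name: str) -> None:
        # Close the connection first, then remove the file.
        conn = self._open.pop(name, None)
        if conn is not None:
            conn.close()
        self._path(name).unlink(missing_ok=True)

    def close_all(self) -> None:
        for conn in self._open.values():
            conn.close()
        self._open.clear()
```

Because each service lives in its own file, deleting one service cannot touch another service's rows, which is the isolation property the manager is meant to provide.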

SqliteEngravaCore

The primary store. Wraps an already-open aiosqlite.Connection and provides the full thought + edge + search API. All operations are async.

import aiosqlite
from engrava import SqliteEngravaCore

async with aiosqlite.connect("engrava.db") as conn:
    conn.row_factory = aiosqlite.Row
    store = SqliteEngravaCore(conn)
    await store.ensure_schema()

For a config-driven, one-call setup, use the from_config factory — it opens and owns the connection and applies the schema for you. Use it as an async context manager:

async with await SqliteEngravaCore.from_config("engrava.yaml") as store:
    ...  # schema already applied by from_config

SqliteEngravaCore(db_path=...) does not exist — pass an open connection, or use await SqliteEngravaCore.from_config(path).

Thought Methods

create_thought

create_thought takes a single frozen ThoughtRecord object (build it, then pass it) and returns the persisted record. It does not take field keyword arguments and does not return a UUID string.

import uuid
from engrava import ThoughtRecord, ThoughtType, Priority, LifecycleStatus

record = ThoughtRecord(
    thought_id=str(uuid.uuid4()),
    thought_type=ThoughtType.OBSERVATION,
    essence="Short summary (1-200 chars)",
    content="Full content text.",
    priority=Priority.P2,
    lifecycle_status=LifecycleStatus.ACTIVE,
    created_cycle=0,
    updated_cycle=0,
    source="human",
    metadata={"role": "user", "lang": "en"},  # optional structured attributes
)
stored = await store.create_thought(record)
# stored is the persisted ThoughtRecord

thought_type values: TASK, OBSERVATION, BELIEF, REFLECTION, OUTPUT_DRAFT, NOTE.

create_thought also accepts two keyword-only options: expires_after_seconds (a relative TTL) and deduplicate (collapse identical content into the existing thought, bumping its confirmation_count).
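
The deduplicate behaviour described above can be pictured with a small in-memory sketch. This is an illustration only: DedupStore and StoredThought are hypothetical names, matching by exact content equality is an assumption, and so is the starting value of confirmation_count.

```python
from dataclasses import dataclass


@dataclass
class StoredThought:
    thought_id: str
    content: str
    confirmation_count: int = 0  # starting value is an assumption


class DedupStore:
    """Toy in-memory store, not engrava's implementation."""

    def __init__(self) -> None:
        self.thoughts: list[StoredThought] = []

    def create(
        self, thought_id: str, content: str, *, deduplicate: bool = False
    ) -> StoredThought:
        if deduplicate:
            # Assumed matching rule: exact equality of the content text.
            for existing in self.thoughts:
                if existing.content == content:
                    existing.confirmation_count += 1
                    return existing
        stored = StoredThought(thought_id=thought_id, content=content)
        self.thoughts.append(stored)
        return stored


store = DedupStore()
first = store.create("t-1", "User prefers concise answers")
again = store.create("t-2", "User prefers concise answers", deduplicate=True)
```

In this sketch the second call returns the first thought with its counter bumped, and no second row is stored.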

remember

remember is the one-call shorthand over create_thought for the common case of storing a bare string — it generates the UUID and fills the fields for you.

stored = await store.remember("User prefers concise answers")
# stored is the persisted ThoughtRecord (thought_type=NOTE, priority=P3,
# essence=text[:200], content=text, created_cycle=updated_cycle=0)

Signature: remember(text, *, metadata=None, deduplicate=False) -> ThoughtRecord. For a write that needs an explicit cognitive cycle, build a ThoughtRecord and call create_thought instead — remember always stamps cycle 0.

recall

recall is the one-call shorthand over search_hybrid for retrieval.

result = await store.recall("what does the user prefer?")
for thought_id, score in result.results:
    ...

Signature: recall(query, *, top_k=10, current_cycle=None) -> HybridSearchResult. When current_cycle is None the recency signal is inactive; pass a cycle to blend recency into ranking.

get_thought

thought = await store.get_thought(thought_id)
# Returns ThoughtRecord, or None if not found

update_thought

await store.update_thought(
    thought_id,
    essence="Updated summary",
    priority="P1",
)
# Optimistic-concurrency update; raises ThoughtNotFoundError / StaleDataError
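
The comment above names optimistic concurrency without describing the mechanism. One common pattern, sketched here with the standard-library sqlite3 module, is a compare-and-swap on a version column. The version column, the table layout, and the helper name update_essence are hypothetical, and the two exception classes are local stand-ins for the ones engrava raises.

```python
import sqlite3


class ThoughtNotFoundError(Exception):
    """Local stand-in for the error of the same name."""


class StaleDataError(Exception):
    """Local stand-in for the error of the same name."""


def update_essence(
    conn: sqlite3.Connection,
    thought_id: str,
    new_essence: str,
    expected_version: int,
) -> None:
    # The UPDATE only matches when nobody else has bumped the version
    # since the caller read the row.
    cur = conn.execute(
        "UPDATE thought SET essence = ?, version = version + 1 "
        "WHERE thought_id = ? AND version = ?",
        (new_essence, thought_id, expected_version),
    )
    if cur.rowcount == 1:
        conn.commit()
        return
    # Nothing was updated: tell a missing row apart from a stale read.
    row = conn.execute(
        "SELECT 1 FROM thought WHERE thought_id = ?", (thought_id,)
    ).fetchone()
    if row is None:
        raise ThoughtNotFoundError(thought_id)
    raise StaleDataError(thought_id)
```

A caller that hits StaleDataError would typically re-read the record and retry with the fresh state.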

list_thoughts

thoughts = await store.list_thoughts(
    limit=50,
    lifecycle_status="ACTIVE",     # optional filter
    thought_type="OBSERVATION",    # optional filter
)

All arguments are keyword-only: the filters thought_type, lifecycle_status, priority, and include_expired, plus limit and offset for paging. Use count_thoughts(...) for the count under the same filters.

Edge Methods

create_edge

create_edge takes a single EdgeRecord object and returns the persisted record. It raises ReferentialIntegrityError when an endpoint thought does not exist.

import uuid
from engrava import EdgeRecord, EdgeType

edge = await store.create_edge(
    EdgeRecord(
        edge_id=str(uuid.uuid4()),
        from_thought_id=thought_id,
        to_thought_id=other_id,
        edge_type=EdgeType.ASSOCIATED,
        weight=0.8,               # required; float in [0.0, 1.0]
        created_cycle=0,
    )
)

edge_type values: ASSOCIATED, DEPENDS_ON, DERIVED_FROM, MESSAGE_OF, BRIDGE, CONSOLIDATED_FROM, CONTESTED_BY.

get_edges

edges = await store.get_edges(
    thought_id,
    direction="BOTH",  # "IN", "OUT", or "BOTH" (keyword-only)
)

Valid-time Methods

invalidate_thought / invalidate_edge

Close a record's valid-time interval at a given instant, marking a fact as no longer true without deleting it. The operation is deterministic, idempotent, and non-cascading (invalidating a thought leaves its edges untouched). The row and its history remain on file; a point-in-time query for an instant before the cut-off still returns it.

# Reality changed on 2026-06-01 — close the window, keep the history.
updated = await store.invalidate_thought(
    thought_id,
    valid_until="2026-06-01T00:00:00+00:00",
)
# Returns the updated ThoughtRecord (valid_until now set).

await store.invalidate_edge(
    edge_id,
    valid_until="2026-06-01T00:00:00+00:00",
)
# Returns the updated EdgeRecord.

Use these when a fact stopped being true; use delete_thought only for a fact that should never have existed. See Bi-temporal Model.
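
The point-in-time behaviour can be expressed as a half-open interval check. The sketch below is a plain-Python illustration, not engrava code: is_valid_at is a hypothetical helper, and treating the lower bound as inclusive is an assumption (this reference only states that valid_until is exclusive and that None means unbounded).

```python
from datetime import datetime
from typing import Optional


def is_valid_at(
    valid_from: Optional[str], valid_until: Optional[str], instant: str
) -> bool:
    """Return True when `instant` falls inside [valid_from, valid_until)."""
    t = datetime.fromisoformat(instant)
    # None on the lower bound means "valid since forever".
    if valid_from is not None and t < datetime.fromisoformat(valid_from):
        return False
    # The upper bound is exclusive; None means "still valid".
    if valid_until is not None and t >= datetime.fromisoformat(valid_until):
        return False
    return True


cutoff = "2026-06-01T00:00:00+00:00"
before = is_valid_at(None, cutoff, "2026-05-31T23:59:59+00:00")
at_cutoff = is_valid_at(None, cutoff, cutoff)
```

Here before is True and at_cutoff is False: the invalidated fact is still visible to queries about the past, but not at or after the cut-off.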

Search Methods

search_hybrid

The primary search method — fuses FTS5, vector, recency, priority, and optionally graph signals.

result = await store.search_hybrid(
    "python async agents",        # query_text (positional)
    query_vector=embedding,       # optional; enables vector signal
    top_k=10,
    current_cycle=42,             # optional; enables recency signal
    fts_weight=0.3,               # override per-call
    vector_weight=0.55,
    recency_weight=0.1,
    priority_weight=0.05,
    graph_weight=0.0,             # opt-in
    include_reflections=True,
    reflection_boost=1.0,
)

for thought_id, score in result.results:
    thought = await store.get_thought(thought_id)
    print(f"{score:.3f}  {thought.essence}")

search_hybrid returns a single HybridSearchResult with two fields: result.results (a list[tuple[str, float]] of (thought_id, combined_score), highest first) and result.backends_used (a frozenset[str] of the signals that contributed, e.g. {"fts5", "vector", "graph_expansion"}).
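
The exact fusion formula is not spelled out in this reference. A weighted linear combination of per-signal scores is one common choice, and the sketch below assumes it. fuse_scores is a hypothetical helper, and the per-signal scores are assumed to be already normalised to a comparable range.

```python
def fuse_scores(
    signals: dict[str, dict[str, float]],
    weights: dict[str, float],
    top_k: int = 10,
) -> tuple[list[tuple[str, float]], frozenset[str]]:
    """Combine per-signal scores into one ranking (assumed weighted sum)."""
    combined: dict[str, float] = {}
    used: set[str] = set()
    for name, scores in signals.items():
        weight = weights.get(name, 0.0)
        # A signal with zero weight or no hits does not contribute.
        if weight <= 0.0 or not scores:
            continue
        used.add(name)
        for thought_id, score in scores.items():
            combined[thought_id] = combined.get(thought_id, 0.0) + weight * score
    ranked = sorted(combined.items(), key=lambda item: item[1], reverse=True)
    return ranked[:top_k], frozenset(used)


results, backends_used = fuse_scores(
    signals={
        "fts5": {"a": 1.0, "b": 0.5},
        "vector": {"b": 1.0, "c": 0.2},
    },
    weights={"fts5": 0.3, "vector": 0.55, "graph_expansion": 0.0},
)
```

In this toy input, thought "b" ranks first because it scores on both signals, which is the practical effect of fusing several backends rather than relying on one.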

search_fts

results = await store.search_fts("keyword query", top_k=10)
# Returns list[tuple[str, float]] — (thought_id, bm25_score)

FTS5-only search, ordered by BM25 score. Fetch each record with get_thought when you need its fields.

search_similar

results = await store.search_similar(query_vector, top_k=10)
# Returns list[tuple[str, float]] — (thought_id, score)

Vector-only cosine similarity search.
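
For reference, cosine similarity and a brute-force top-k ranking can be written in a few lines of plain Python. This is a sketch of the metric only; how engrava indexes or scans vectors is not described here, and both function names are hypothetical.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    # Define similarity with a zero vector as 0.0 to avoid dividing by zero.
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_similar(
    query: list[float], vectors: dict[str, list[float]], top_k: int = 10
) -> list[tuple[str, float]]:
    scored = [(tid, cosine_similarity(query, vec)) for tid, vec in vectors.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


ranked = top_k_similar(
    [1.0, 0.0],
    {"same": [2.0, 0.0], "orthogonal": [0.0, 3.0], "opposite": [-1.0, 0.0]},
    top_k=2,
)
```

Cosine similarity ignores vector length, so "same" scores 1.0 even though its magnitude differs from the query.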

search_reflections_only

result = await store.search_reflections_only(
    "recurring themes",
    query_vector=embedding,
    top_k=5,
    current_cycle=42,
)

Hybrid search restricted to REFLECTION thoughts, so reflections do not compete against regular thoughts for result slots. Returns a HybridSearchResult (same shape as search_hybrid).

Schema and Lifecycle

await store.ensure_schema()   # idempotent — call once on startup
await store.close()           # closes the connection only if the store owns it
                              # (from_config); for a manually-supplied connection
                              # this is a no-op — close the connection yourself.
                              # Does not checkpoint the WAL.
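
Since close() does not checkpoint the WAL, a caller that wants the write-ahead log folded back into the main database file can issue SQLite's own checkpoint PRAGMA. The sketch below uses the standard-library sqlite3 module and a hypothetical helper name, checkpoint_wal. It assumes the database is in WAL mode and that you know its file path.

```python
import sqlite3
from pathlib import Path


def checkpoint_wal(db_path: Path) -> tuple[int, int, int]:
    """Run a TRUNCATE checkpoint and return SQLite's three-column result.

    The columns are (busy, wal_frames, checkpointed_frames); busy == 0
    means the checkpoint was not blocked by another connection.
    """
    conn = sqlite3.connect(db_path)
    try:
        row = conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").fetchone()
        return (row[0], row[1], row[2])
    finally:
        conn.close()
```

The same PRAGMA statement can be executed through an aiosqlite connection that you own, before you close it.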

ThoughtRecord

The core domain model. All fields are immutable (frozen=True).

from engrava import ThoughtRecord

thought: ThoughtRecord = await store.get_thought(thought_id)

thought.thought_id          # UUID str
thought.thought_type        # "TASK" | "OBSERVATION" | "BELIEF" | "REFLECTION" | "OUTPUT_DRAFT" | "NOTE"
thought.essence             # short summary (1-200 chars, indexed for FTS)
thought.content             # full content text
thought.priority            # "P1" | "P2" | "P3" | "P4"
thought.lifecycle_status    # "CREATED" | "ACTIVE" | "DONE" | "ARCHIVED"
thought.confidence          # float | None
thought.source              # origin identifier
thought.metadata            # dict (structured attributes; defaults to {})
thought.created_cycle       # int (creation cycle number)
thought.updated_cycle       # int (last update cycle)
thought.confirmation_count  # int
thought.access_count        # int
thought.created_at          # ISO-8601 str | None (when persisted)
thought.updated_at          # ISO-8601 str | None (last mutation)
thought.valid_from          # ISO-8601 str | None (valid-time lower bound; None = −∞)
thought.valid_until         # ISO-8601 str | None (valid-time upper bound, exclusive; None = +∞)

valid_from / valid_until are the optional valid-time bounds — the real-world window during which the fact is true, independent of when it was stored. Both default to None (valid for all time). EdgeRecord carries the same two fields. See Bi-temporal Model for the semantics and query predicates.

To produce a modified copy:

updated = thought.model_copy(update={"priority": "P1", "confidence": 0.9})
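
The copy-instead-of-mutate pattern can be tried without engrava installed. The sketch below uses a frozen standard-library dataclass as an analogy: MiniThought is a hypothetical stand-in for ThoughtRecord, and dataclasses.replace plays the role that model_copy(update=...) plays above.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class MiniThought:
    """Hypothetical stand-in with three of the fields listed above."""

    thought_id: str
    priority: str
    confidence: Optional[float] = None


original = MiniThought(thought_id="t-1", priority="P2")

# Build a modified copy; the original object is left untouched.
updated = replace(original, priority="P1", confidence=0.9)

# Direct assignment on a frozen instance is rejected.
try:
    original.priority = "P1"  # type: ignore[misc]
    assignment_rejected = False
except FrozenInstanceError:
    assignment_rejected = True
```

In either form the copy exists only in memory; persisting the change still goes through a store call such as update_thought.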

MindQL

MindQL is engrava’s read-only query language. MindQLExecutor runs against an open aiosqlite.Connection (the same connection the store wraps), and execute() takes a parsed MindQLQuery — parse the string first with parse().

from engrava import MindQLExecutor, MindQLResult, parse

executor = MindQLExecutor(conn)  # an aiosqlite.Connection, not a store
result: MindQLResult = await executor.execute(
    parse("FIND thoughts WHERE thought_type = 'OBSERVATION' LIMIT 10")
)
print(result.rows)    # list[dict]
print(result.count)   # int | None (set for COUNT queries)

If your connection is owned by the store, you can run a parsed query directly via the store instead of constructing an executor:

from engrava import parse

result = await store.execute_mindql(
    parse("FIND thoughts WHERE valid_now")
)

execute_mindql is the store-level entry point for the same execution contract — it accepts an already-parsed MindQLQuery and returns a MindQLResult.

Grammar

A query names a table; the WHERE clause and LIMIT are optional:

FIND <table> [WHERE <field> <op> '<value>' [AND ...]] [LIMIT n]

Tables: thoughts, edges, embeddings, actions (singular forms accepted too). Operators: =, !=, >, <, >=, <=. String values are single-quoted.
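
To show how the FIND form decomposes, here is a small regex-based parser for that one production. It is an illustration, not engrava's parse(): the names parse_find, ParsedQuery, and Condition are hypothetical, the plural-to-singular mapping is an assumption, and bare predicates such as valid_now or values containing " AND " are not handled.

```python
import re
from dataclasses import dataclass
from typing import Optional, Union

# Assumed normalisation: plural and singular names map to one table name.
_TABLES = {
    "thought": "thought", "thoughts": "thought",
    "edge": "edge", "edges": "edge",
    "embedding": "embedding", "embeddings": "embedding",
    "action": "action", "actions": "action",
}

_QUERY = re.compile(
    r"^\s*FIND\s+(?P<table>\w+)"
    r"(?:\s+WHERE\s+(?P<where>.+?))?"
    r"(?:\s+LIMIT\s+(?P<limit>\d+))?\s*$",
    re.IGNORECASE,
)

# Two-character operators come first so ">=" is not read as ">".
_CONDITION = re.compile(
    r"^\s*(?P<field>\w+)\s*(?P<op>!=|>=|<=|=|>|<)\s*"
    r"(?:'(?P<str>[^']*)'|(?P<num>-?\d+))\s*$"
)


@dataclass(frozen=True)
class Condition:
    field: str
    op: str
    value: Union[str, int]


@dataclass(frozen=True)
class ParsedQuery:
    table: str
    conditions: tuple
    limit: Optional[int]


def parse_find(text: str) -> ParsedQuery:
    match = _QUERY.match(text)
    if match is None:
        raise ValueError(f"not a FIND query: {text!r}")
    table = _TABLES.get(match.group("table").lower())
    if table is None:
        raise ValueError(f"unknown table: {match.group('table')!r}")
    conditions = []
    if match.group("where"):
        for part in re.split(r"\s+AND\s+", match.group("where"), flags=re.IGNORECASE):
            cond = _CONDITION.match(part)
            if cond is None:
                raise ValueError(f"bad condition: {part!r}")
            raw = cond.group("str")
            value = raw if raw is not None else int(cond.group("num"))
            conditions.append(Condition(cond.group("field"), cond.group("op"), value))
    limit = int(match.group("limit")) if match.group("limit") else None
    return ParsedQuery(table=table, conditions=tuple(conditions), limit=limit)


query = parse_find(
    "FIND thoughts WHERE priority = 'P1' AND thought_type = 'OBSERVATION' LIMIT 10"
)
```

The resulting object carries the table, a tuple of (field, operator, value) conditions, and the limit, which mirrors the shape of the plan that parse() is described as returning.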

FIND

Retrieve rows matching a filter.

await executor.execute(parse("FIND thoughts WHERE lifecycle_status = 'ACTIVE' LIMIT 10"))
await executor.execute(parse("FIND thoughts WHERE priority = 'P1' AND thought_type = 'OBSERVATION'"))
await executor.execute(parse("FIND edges WHERE from_thought_id = 'abc'"))

COUNT

Count rows matching a filter.

result = await executor.execute(parse("COUNT thoughts WHERE thought_type = 'OBSERVATION'"))
print(result.count)

SELECT

Read-only SQL passthrough.

result = await executor.execute(
    parse("SELECT thought_id, essence FROM thought WHERE lifecycle_status = 'ACTIVE'")
)
for row in result.rows:
    print(row["thought_id"], row["essence"])
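
How engrava enforces the read-only guarantee is not described here. If you need a similar guarantee for your own ad hoc SQL against the same database, one option is SQLite's query_only pragma, sketched below with the standard-library sqlite3 module. run_read_only is a hypothetical helper, and the thought table in the usage lines is a minimal stand-in.

```python
import sqlite3
from typing import Any, Sequence


def run_read_only(
    conn: sqlite3.Connection, sql: str, params: Sequence[Any] = ()
) -> list[dict]:
    """Execute `sql` with writes disabled and return rows as dicts."""
    conn.execute("PRAGMA query_only = ON")
    try:
        cur = conn.execute(sql, params)
        columns = [col[0] for col in cur.description] if cur.description else []
        return [dict(zip(columns, row)) for row in cur.fetchall()]
    finally:
        # Restore normal behaviour for later callers of this connection.
        conn.execute("PRAGMA query_only = OFF")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE thought (thought_id TEXT, essence TEXT)")
conn.execute("INSERT INTO thought VALUES ('t-1', 'hello')")
conn.commit()

rows = run_read_only(conn, "SELECT thought_id, essence FROM thought")

try:
    run_read_only(conn, "DELETE FROM thought")
    write_blocked = False
except sqlite3.Error:
    write_blocked = True
```

With the pragma on, SQLite itself refuses the DELETE, so the check does not depend on inspecting the SQL text.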

parse

parse() returns a MindQLQuery plan with .command, .table, .conditions, and .limit:

from engrava import parse, MindQLQuery

query: MindQLQuery = parse("FIND thoughts WHERE thought_type = 'OBSERVATION' LIMIT 5")
query.command     # MindQLCommand.FIND
query.table       # "thought"
query.conditions  # list of parsed conditions (field / operator / value)
query.limit       # 5

Result Structure

result.columns   # list[str] — column names in result order
result.rows      # list[dict]
result.count     # int | None — set for COUNT queries
result.command   # str — the command that was executed

Embedding Providers

Install one of the extras and pass a provider to the store:

pip install 'engrava[embeddings-local]'     # sentence-transformers
pip install 'engrava[embeddings-openai]'    # OpenAI-compatible text-embedding-*
pip install 'engrava[embeddings-ollama]'    # Ollama local models
pip install 'engrava[embeddings-hf]'        # Hugging Face Inference API

from engrava import SentenceTransformerProvider

provider = SentenceTransformerProvider(model_name="all-MiniLM-L12-v2")
vector = await provider.embed("text to embed")
await store.store_embedding(thought_id, vector, model_name=provider.model_name)

store_embedding takes model_name (and the optional embedding_id) as keyword-only arguments; the vector dimension is derived from len(vector).

The CallbackProvider accepts any Python callable as an embedding function — useful for testing or for providers not yet wrapped:

from engrava import CallbackProvider

provider = CallbackProvider(
    callback=lambda text: [0.0] * 384,
    dimension=384,
    model_name="stub",
)
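
For tests, a constant vector like the one above makes every text look identical to vector search. A deterministic, text-dependent stub is often more useful. The sketch below builds one from hashlib; hash_embedding is a hypothetical function, and it assumes the callback receives a single string and returns a list of floats, as the lambda above suggests. The vectors carry no semantic meaning.

```python
import hashlib
import math


def hash_embedding(text: str, dimension: int = 384) -> list[float]:
    """Deterministic pseudo-embedding: same text in, same unit vector out."""
    values: list[float] = []
    counter = 0
    while len(values) < dimension:
        # Re-hash with a counter to produce as many bytes as needed.
        digest = hashlib.sha256(f"{counter}:{text}".encode("utf-8")).digest()
        # Map each byte from [0, 255] to [-1.0, 1.0].
        values.extend((byte / 127.5) - 1.0 for byte in digest)
        counter += 1
    values = values[:dimension]
    norm = math.sqrt(sum(v * v for v in values))
    return [v / norm for v in values] if norm else values


vector = hash_embedding("text to embed")
```

Under the assumption stated above, this function could be supplied as callback=hash_embedding with dimension=384.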