API Reference
Key entry points for the engrava Python library. For the full auto-generated reference, see the source docstrings in the package.
EngravaManager
Multi-service store manager. Use when your agent needs isolated memory
spaces per service (e.g. per-user, per-conversation). Each service gets its
own SQLite database under data_dir.
from pathlib import Path
from engrava import EngravaManager
async with EngravaManager(data_dir=Path("./data")) as mgr:
store = await mgr.get_store("main")
# store is a fully initialised SqliteEngravaCore
await mgr.list_services() # -> ["main"]
await mgr.delete_service("old")
Methods
| Method | Description |
|---|---|
get_store(name) | Return (or lazily open) the named store for the service. |
list_services() | List the service names currently backed by a database. |
delete_service(name) | Remove a service and its database. |
service_exists(name) | Return whether a database already exists for the service. |
close_all() | Close all open stores. Called automatically when used as an async context manager. |
SqliteEngravaCore
The primary store. Wraps an already-open aiosqlite.Connection and
provides the full thought + edge + search API. All operations are async.
import aiosqlite
from engrava import SqliteEngravaCore
async with aiosqlite.connect("engrava.db") as conn:
conn.row_factory = aiosqlite.Row
store = SqliteEngravaCore(conn)
await store.ensure_schema()
For a config-driven, one-call setup, use the from_config factory — it opens
and owns the connection and applies the schema for you. Use it as an async
context manager:
async with await SqliteEngravaCore.from_config("engrava.yaml") as store:
... # schema already applied by from_config
SqliteEngravaCore(db_path=...)does not exist — pass an open connection, or useawait SqliteEngravaCore.from_config(path).
Thought Methods
create_thought
create_thought takes a single frozen ThoughtRecord object (build it, then
pass it) and returns the persisted record. It does not take field keyword
arguments and does not return a UUID string.
import uuid
from engrava import ThoughtRecord, ThoughtType, Priority, LifecycleStatus
record = ThoughtRecord(
thought_id=str(uuid.uuid4()),
thought_type=ThoughtType.OBSERVATION,
essence="Short summary (1-200 chars)",
content="Full content text.",
priority=Priority.P2,
lifecycle_status=LifecycleStatus.ACTIVE,
created_cycle=0,
updated_cycle=0,
source="human",
metadata={"role": "user", "lang": "en"}, # optional structured attributes
)
stored = await store.create_thought(record)
# stored is the persisted ThoughtRecord
thought_type values: TASK, OBSERVATION, BELIEF, REFLECTION,
OUTPUT_DRAFT, NOTE.
Keyword-only options: expires_after_seconds (relative TTL) and
deduplicate (collapse identical content into the existing thought, bumping
its confirmation_count).
remember
remember is the one-call shorthand over create_thought for the common case
of storing a bare string — it generates the UUID and fills the fields for you.
stored = await store.remember("User prefers concise answers")
# stored is the persisted ThoughtRecord (thought_type=NOTE, priority=P3,
# essence=text[:200], content=text, created_cycle=updated_cycle=0)
Signature: remember(text, *, metadata=None, deduplicate=False) -> ThoughtRecord.
For a write that needs an explicit cognitive cycle, build a ThoughtRecord and
call create_thought instead — remember always stamps cycle 0.
recall
recall is the one-call shorthand over search_hybrid for retrieval.
result = await store.recall("what does the user prefer?")
for thought_id, score in result.results:
...
Signature: recall(query, *, top_k=10, current_cycle=None) -> HybridSearchResult.
When current_cycle is None the recency signal is inactive; pass a cycle to
blend recency into ranking.
get_thought
thought = await store.get_thought(thought_id)
# Returns ThoughtRecord, or None if not found
update_thought
await store.update_thought(
thought_id,
essence="Updated summary",
priority="P1",
)
# Optimistic-concurrency update; raises ThoughtNotFoundError / StaleDataError
list_thoughts
thoughts = await store.list_thoughts(
limit=50,
lifecycle_status="ACTIVE", # optional filter
thought_type="OBSERVATION", # optional filter
)
Filters are keyword-only: thought_type, lifecycle_status, priority,
include_expired, limit, offset. Use count_thoughts(...) for the count
under the same filters.
Edge Methods
create_edge
create_edge takes a single EdgeRecord object and returns the persisted
record. It raises ReferentialIntegrityError when an endpoint thought does
not exist.
import uuid
from engrava import EdgeRecord, EdgeType
edge = await store.create_edge(
EdgeRecord(
edge_id=str(uuid.uuid4()),
from_thought_id=thought_id,
to_thought_id=other_id,
edge_type=EdgeType.ASSOCIATED,
weight=0.8, # required; float in [0.0, 1.0]
created_cycle=0,
)
)
edge_type values: ASSOCIATED, DEPENDS_ON, DERIVED_FROM, MESSAGE_OF,
BRIDGE, CONSOLIDATED_FROM, CONTESTED_BY.
get_edges
edges = await store.get_edges(
thought_id,
direction="BOTH", # "IN", "OUT", or "BOTH" (keyword-only)
)
Valid-time Methods
invalidate_thought / invalidate_edge
Close a record’s valid-time interval at a given instant — mark a fact as no longer true without deleting it. Deterministic, idempotent, non-cascading (invalidating a thought leaves its edges untouched). The row and its history remain on file; a point-in-time query for an instant before the cut-off still returns it.
# Reality changed on 2026-06-01 — close the window, keep the history.
updated = await store.invalidate_thought(
thought_id,
valid_until="2026-06-01T00:00:00+00:00",
)
# Returns the updated ThoughtRecord (valid_until now set).
await store.invalidate_edge(
edge_id,
valid_until="2026-06-01T00:00:00+00:00",
)
# Returns the updated EdgeRecord.
Use these when a fact stopped being true; use delete_thought only for a fact
that should never have existed. See Bi-temporal Model.
Search Methods
search_hybrid
The primary search method — fuses FTS5, vector, recency, priority, and optionally graph signals.
result = await store.search_hybrid(
"python async agents", # query_text (positional)
query_vector=embedding, # optional; enables vector signal
top_k=10,
current_cycle=42, # optional; enables recency signal
fts_weight=0.3, # override per-call
vector_weight=0.55,
recency_weight=0.1,
priority_weight=0.05,
graph_weight=0.0, # opt-in
include_reflections=True,
reflection_boost=1.0,
)
for thought_id, score in result.results:
thought = await store.get_thought(thought_id)
print(f"{score:.3f} {thought.essence}")
search_hybrid returns a single HybridSearchResult with two fields:
result.results (a list[tuple[str, float]] of (thought_id, combined_score),
highest first) and result.backends_used (a frozenset[str] of the signals
that contributed, e.g. {"fts5", "vector", "graph_expansion"}).
search_fts
results = await store.search_fts("keyword query", top_k=10)
# Returns list[tuple[str, float]] — (thought_id, bm25_score)
FTS5-only search, ordered by BM25 score. Fetch each record with
get_thought when you need its fields.
search_similar
results = await store.search_similar(query_vector, top_k=10)
# Returns list[tuple[str, float]] — (thought_id, score)
Vector-only cosine similarity search.
search_reflections_only
result = await store.search_reflections_only(
"recurring themes",
query_vector=embedding,
top_k=5,
current_cycle=42,
)
Hybrid search restricted to REFLECTION thoughts. Returns a
HybridSearchResult (same shape as search_hybrid), so reflections do not
compete against regular thoughts for result slots.
Schema and Lifecycle
await store.ensure_schema() # idempotent — call once on startup
await store.close() # closes the connection only if the store owns it
# (from_config); for a manually-supplied connection
# this is a no-op — close the connection yourself.
# Does not checkpoint the WAL.
ThoughtRecord
The core domain model. All fields are immutable (frozen=True).
from engrava import ThoughtRecord
thought: ThoughtRecord = await store.get_thought(thought_id)
thought.thought_id # UUID str
thought.thought_type # "TASK" | "OBSERVATION" | "BELIEF" | "REFLECTION" | "OUTPUT_DRAFT" | "NOTE"
thought.essence # short summary (1-200 chars, indexed for FTS)
thought.content # full content text
thought.priority # "P1" | "P2" | "P3" | "P4"
thought.lifecycle_status # "CREATED" | "ACTIVE" | "DONE" | "ARCHIVED"
thought.confidence # float | None
thought.source # origin identifier
thought.metadata # dict (structured attributes; defaults to {})
thought.created_cycle # int (creation cycle number)
thought.updated_cycle # int (last update cycle)
thought.confirmation_count # int
thought.access_count # int
thought.created_at # ISO-8601 str | None (when persisted)
thought.updated_at # ISO-8601 str | None (last mutation)
thought.valid_from # ISO-8601 str | None (valid-time lower bound; None = −∞)
thought.valid_until # ISO-8601 str | None (valid-time upper bound, exclusive; None = +∞)
valid_from / valid_until are the optional valid-time bounds — the
real-world window during which the fact is true, independent of when it was
stored. Both default to None (valid for all time). EdgeRecord carries the
same two fields. See Bi-temporal Model for the
semantics and query predicates.
To produce a modified copy:
updated = thought.model_copy(update={"priority": "P1", "confidence": 0.9})
MindQL
MindQL is engrava’s read-only query language. MindQLExecutor runs against an
open aiosqlite.Connection (the same connection the store wraps), and
execute() takes a parsed MindQLQuery — parse the string first with
parse().
from engrava import MindQLExecutor, MindQLResult, parse
executor = MindQLExecutor(conn) # an aiosqlite.Connection, not a store
result: MindQLResult = await executor.execute(
parse("FIND thoughts WHERE thought_type = 'OBSERVATION' LIMIT 10")
)
print(result.rows) # list[dict]
print(result.count) # int | None (set for COUNT queries)
If your connection is owned by the store, you can run a parsed query directly via the store instead of constructing an executor:
from engrava import parse
result = await store.execute_mindql(
parse("FIND thoughts WHERE valid_now")
)
execute_mindql is the store-level entry point for the same execution
contract — it accepts an already-parsed MindQLQuery and returns a
MindQLResult.
Grammar
The grammar requires a table name and an optional WHERE clause:
FIND <table> [WHERE <field> <op> '<value>' [AND ...]] [LIMIT n]
Tables: thoughts, edges, embeddings, actions (singular forms accepted
too). Operators: =, !=, >, <, >=, <=. String values are
single-quoted.
FIND
Retrieve rows matching a filter.
await executor.execute(parse("FIND thoughts WHERE lifecycle_status = 'ACTIVE' LIMIT 10"))
await executor.execute(parse("FIND thoughts WHERE priority = 'P1' AND thought_type = 'OBSERVATION'"))
await executor.execute(parse("FIND edges WHERE from_thought_id = 'abc'"))
COUNT
Count rows matching a filter.
result = await executor.execute(parse("COUNT thoughts WHERE thought_type = 'OBSERVATION'"))
print(result.count)
SELECT
Read-only SQL passthrough.
result = await executor.execute(
parse("SELECT thought_id, essence FROM thought WHERE lifecycle_status = 'ACTIVE'")
)
for row in result.rows:
print(row["thought_id"], row["essence"])
parse
parse() returns a MindQLQuery plan with .command, .table,
.conditions, and .limit:
from engrava import parse, MindQLQuery
query: MindQLQuery = parse("FIND thoughts WHERE thought_type = 'OBSERVATION' LIMIT 5")
query.command # MindQLCommand.FIND
query.table # "thought"
query.conditions # list of parsed conditions (field / operator / value)
query.limit # 5
Result Structure
result.columns # list[str] — column names in result order
result.rows # list[dict]
result.count # int | None — set for COUNT queries
result.command # str — the command that was executed
Embedding Providers
Install one of the extras and pass a provider to the store:
pip install 'engrava[embeddings-local]' # sentence-transformers
pip install 'engrava[embeddings-openai]' # OpenAI-compatible text-embedding-*
pip install 'engrava[embeddings-ollama]' # Ollama local models
pip install 'engrava[embeddings-hf]' # Hugging Face Inference API
from engrava import SentenceTransformerProvider
provider = SentenceTransformerProvider(model_name="all-MiniLM-L12-v2")
vector = await provider.embed("text to embed")
await store.store_embedding(thought_id, vector, model_name=provider.model_name)
store_embedding is keyword-only for model_name (and optional
embedding_id); the vector dimension is derived from len(vector).
The CallbackProvider accepts any Python callable as an embedding function —
useful for testing or for providers not yet wrapped:
from engrava import CallbackProvider
provider = CallbackProvider(
callback=lambda text: [0.0] * 384,
dimension=384,
model_name="stub",
)