M6 primitives — memory, lease, policy, fanout
The substrate ships four protocol-first primitives that every harness reinvents. Backends are pluggable; the shipped reference adapters get out of the way as soon as you have a real backend.
from entorin.memory import FsMemory, MemoryClient
from entorin.lease import FsLease, LeaseClient
from entorin.policy import PolicyTable, RetryPolicy
from entorin.search import fanout, best_of_n
Memory
Identity-scoped, audited storage in two tiers (session, cross_run). MemoryClient wraps any Memory backend (SessionMemory, FsMemory, your own Postgres / Redis / vector DB) with a capability check plus audit events:
memory = MemoryClient(backend=FsMemory(Path("memory_root")), bus=bus)
prefs = await memory.get(ctx, "cross_run", "prefs") # memory.read
await memory.put(ctx, "cross_run", "prefs", {"theme": "dark"}) # memory.write
Capabilities: memory.session, memory.cross_run. Per-key gates? Wrap your own Memory impl.
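The tier-gating idea above can be sketched without any entorin imports. This is not the shipped MemoryClient — it is a dict-backed, synchronous toy (names like Ctx and DictMemory are illustrative) that shows how a per-tier capability check maps the tier name onto a memory.* capability before touching storage:

```python
# Illustrative sketch only: a dict-backed memory with per-tier capability
# gating, mirroring the memory.session / memory.cross_run split.
from dataclasses import dataclass, field

TIERS = ("session", "cross_run")

@dataclass(frozen=True)
class Ctx:
    identity: str
    capabilities: frozenset  # e.g. frozenset({"memory.cross_run"})

@dataclass
class DictMemory:
    store: dict = field(default_factory=dict)  # (identity, tier, key) -> value

    def _check(self, ctx: Ctx, tier: str) -> None:
        if tier not in TIERS:
            raise ValueError(f"unknown tier: {tier}")
        if f"memory.{tier}" not in ctx.capabilities:
            raise PermissionError(f"missing capability memory.{tier}")

    def get(self, ctx: Ctx, tier: str, key: str, default=None):
        self._check(ctx, tier)
        return self.store.get((ctx.identity, tier, key), default)

    def put(self, ctx: Ctx, tier: str, key: str, value) -> None:
        self._check(ctx, tier)
        self.store[(ctx.identity, tier, key)] = value
```

Keying the store on (identity, tier, key) is what makes the storage identity-scoped: two identities can never collide, even on the same key name.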
Lease
Named coordination so two parallel runs serialize on a shared resource. FsLease is flock-based per-machine; the protocol is Redis-shaped so a future RedisLease is a drop-in.
lease = LeaseClient(backend=FsLease(Path("lease_root")), bus=bus)
handle = await lease.acquire(ctx, "user:42:prefs", ttl_seconds=30, wait_timeout=5)
try:
    ...
finally:
    await lease.release(ctx, handle)
Capability: lease. kill -9 on the holder releases the flock automatically (no zombie deadlocks). TTL is informational, not enforced — for “force-eject after TTL even if alive”, swap in a Redis-backed adapter.
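The "kill -9 can't deadlock" property falls out of flock itself, not entorin code. A minimal sketch of the mechanics FsLease relies on (function names here are illustrative, and this assumes a POSIX system with fcntl): the kernel drops the lock when the holding fd closes, and closes every fd on process exit, however the process dies.

```python
# Illustrative flock sketch, not entorin's FsLease API.
import fcntl
import os
from pathlib import Path

def try_acquire(lock_path: Path):
    """Return an fd holding an exclusive flock, or None if held elsewhere."""
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    try:
        # LOCK_NB: fail immediately instead of blocking when contended.
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return fd
    except BlockingIOError:
        os.close(fd)
        return None

def release(fd: int) -> None:
    # Closing the fd drops the flock; the kernel does the same automatically
    # when the holder exits -- including via kill -9.
    os.close(fd)
```

Because flock locks live on the open file description, two separate opens of the same lock file conflict even within one process, which is exactly the serialization the lease protocol wants.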
Failure policy
Declarative dispatch from type[EntorinError] to a recovery PolicyOutcome (retry / replan / skip / fallback_human / terminate). The substrate names the recovery; the orchestrator acts on it.
from entorin.errors import TransientError, ParseError
table = PolicyTable(bus=bus, default=RetryPolicy(max_attempts=1))
table.register(TransientError, RetryPolicy(max_attempts=3))
table.register(ParseError, RetryPolicy(max_attempts=1))
outcome = await table.handle(error, ctx, attempt=current_attempt)
match outcome:
    case PolicyOutcome.retry: ...
    case PolicyOutcome.replan: bus.publish(replan_event); ...
    case PolicyOutcome.fallback_human: await checkpoint.request(ctx, ...)
    case _: raise
MRO-walking lookup: registering EntorinError catches everything not otherwise handled.
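The MRO walk is a one-loop trick. A minimal sketch (MiniPolicyTable and the string outcomes are illustrative stand-ins, not entorin's PolicyTable): walk type(error).__mro__ from most-derived to base and return the first registered policy, so a base-class registration catches every subclass not handled more specifically.

```python
# Illustrative sketch of MRO-walking policy dispatch.
class EntorinError(Exception): ...
class TransientError(EntorinError): ...
class RateLimitError(TransientError): ...

class MiniPolicyTable:
    def __init__(self, default):
        self._policies = {}
        self._default = default

    def register(self, err_type, policy):
        self._policies[err_type] = policy

    def lookup(self, error):
        # Walk subclass -> base: the most specific registration wins.
        for cls in type(error).__mro__:
            if cls in self._policies:
                return self._policies[cls]
        return self._default
```

RateLimitError has no registration of its own, but its MRO passes through TransientError, so it inherits that policy; anything outside the EntorinError hierarchy falls through to the default.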
Search fanout
Schedule N parallel branches, merge the results, audit the structural fork/join. A BudgetGate on the run-shared Ledger naturally caps cumulative cost across all branches; slicing the budget per branch is left to the caller's closure.
async def branch(i):
    ...  # call your Model with a per-branch budget hint
    return result
best = await fanout(
    [lambda i=i: branch(i) for i in range(5)],
    ctx=ctx, bus=bus,
    parent_node="search",
    merge=best_of_n(scorer=lambda r: r.confidence),
)
Other merges? Build your own — Merge is a plain Callable[[Sequence[T]], R] alias.
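Stripped of the bus and audit events, the fanout/merge shape is small enough to show whole. This is an illustrative sketch, not entorin's implementation: run N coroutine factories concurrently with asyncio.gather, then reduce with a plain merge callable — best_of_n is just "max by score".

```python
# Illustrative sketch of the fanout + best_of_n shape (no ctx/bus/audit).
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")

def best_of_n(scorer: Callable[[T], float]) -> Callable[[Sequence[T]], T]:
    """Merge that keeps the highest-scoring branch result."""
    def merge(results: Sequence[T]) -> T:
        return max(results, key=scorer)
    return merge

async def fanout(
    branches: Sequence[Callable[[], Awaitable[T]]],
    merge: Callable[[Sequence[T]], R],
) -> R:
    # Structural fork: all branches run concurrently; join on gather.
    results = await asyncio.gather(*(b() for b in branches))
    return merge(results)
```

Passing branch factories (zero-arg callables) rather than coroutine objects matters: it lets the scheduler decide when each coroutine is actually created, and it is why the docs example uses `lambda i=i: branch(i)` to pin the loop variable.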