entorin

M6 primitives — memory, lease, policy, fanout

The substrate ships four protocol-first primitives that every harness reinvents. Backends are pluggable; the shipped reference adapters get out of the way as soon as you have a real backend.

from pathlib import Path

from entorin.memory import FsMemory, MemoryClient
from entorin.lease  import FsLease,  LeaseClient
from entorin.policy import PolicyTable, RetryPolicy
from entorin.search import fanout, best_of_n

Memory

Identity-scoped, audited storage in two tiers (session, cross_run). MemoryClient wraps any Memory backend (SessionMemory, FsMemory, or your own Postgres, Redis, or vector-DB adapter) and adds a capability check and audit events:

memory = MemoryClient(backend=FsMemory(Path("memory_root")), bus=bus)
prefs  = await memory.get(ctx, "cross_run", "prefs")          # memory.read
await memory.put(ctx, "cross_run", "prefs", {"theme": "dark"}) # memory.write

Capabilities: memory.session, memory.cross_run. Per-key gates? Wrap your own Memory impl.
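A per-key gate can be sketched as a thin wrapper around a backend. The example below is a minimal, standalone illustration: the Memory protocol shape (async get/put taking a tier and a key), DictMemory, and KeyGatedMemory are all hypothetical names invented here, and the real entorin Memory protocol may take different arguments (for example a ctx).

```python
import asyncio
from typing import Any, Protocol


class Memory(Protocol):
    """Hypothetical backend shape (an assumption, not the library's protocol)."""

    async def get(self, tier: str, key: str) -> Any: ...
    async def put(self, tier: str, key: str, value: Any) -> None: ...


class DictMemory:
    """Toy in-process backend used only to exercise the wrapper."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    async def get(self, tier: str, key: str) -> Any:
        return self._data.get((tier, key))

    async def put(self, tier: str, key: str, value: Any) -> None:
        self._data[(tier, key)] = value


class KeyGatedMemory:
    """Wraps a backend and rejects keys outside an allow-list of prefixes."""

    def __init__(self, inner: Memory, allowed_prefixes: tuple[str, ...]) -> None:
        self._inner = inner
        self._allowed = allowed_prefixes

    def _check(self, key: str) -> None:
        # str.startswith accepts a tuple and matches any of its members.
        if not key.startswith(self._allowed):
            raise PermissionError(f"key {key!r} is outside the allowed prefixes")

    async def get(self, tier: str, key: str) -> Any:
        self._check(key)
        return await self._inner.get(tier, key)

    async def put(self, tier: str, key: str, value: Any) -> None:
        self._check(key)
        await self._inner.put(tier, key, value)


async def demo() -> Any:
    gated = KeyGatedMemory(DictMemory(), allowed_prefixes=("prefs",))
    await gated.put("cross_run", "prefs", {"theme": "dark"})
    return await gated.get("cross_run", "prefs")
```

Because the wrapper exposes the same get/put surface as the backend it wraps, it could in principle be handed to a client in place of the raw backend; whether MemoryClient accepts it unchanged depends on the actual protocol.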

Lease

Named coordination so two parallel runs serialize on a shared resource. FsLease is flock-based, so it coordinates only between processes on one machine; the protocol is Redis-shaped so a future RedisLease is a drop-in.

lease = LeaseClient(backend=FsLease(Path("lease_root")), bus=bus)
handle = await lease.acquire(ctx, "user:42:prefs", ttl_seconds=30, wait_timeout=5)
try:
    ...
finally:
    await lease.release(ctx, handle)

Capability: lease. If the holder dies (even from kill -9), the kernel drops the flock when the process exits, so a dead holder cannot deadlock waiters. TTL is informational, not enforced; to force-eject a holder after its TTL even while it is still alive, swap in a Redis-backed adapter.
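The flock behaviour described above can be seen with the standard library alone. This is a POSIX-only sketch using fcntl.flock; try_lock and unlock are hypothetical helpers written for this illustration, and nothing here reflects how FsLease lays out or names its lock files.

```python
import fcntl
import os
import tempfile


def try_lock(path: str):
    """Return an fd holding an exclusive flock, or None if someone else holds it."""
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    try:
        # LOCK_NB makes the call fail immediately instead of blocking.
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        os.close(fd)
        return None
    return fd


def unlock(fd: int) -> None:
    """Closing the descriptor drops the lock; process exit does the same."""
    os.close(fd)


lock_path = os.path.join(tempfile.mkdtemp(), "user-42-prefs.lock")
first = try_lock(lock_path)    # acquires the lock
second = try_lock(lock_path)   # a separate open of the same file is refused
unlock(first)
third = try_lock(lock_path)    # succeeds once the first holder is gone
unlock(third)
```

The lock belongs to the open file description rather than to any record in the file, which is why there is nothing to clean up when a holder disappears, and also why a TTL cannot be enforced by the lock itself.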

Failure policy

Declarative dispatch from type[EntorinError] to a recovery PolicyOutcome (retry / replan / skip / fallback_human / terminate). The substrate names the recovery; the orchestrator acts on it.

from entorin.errors import TransientError, ParseError
from entorin.policy import PolicyOutcome  # assumed to be exported alongside PolicyTable

table = PolicyTable(bus=bus, default=RetryPolicy(max_attempts=1))
table.register(TransientError, RetryPolicy(max_attempts=3))
table.register(ParseError,     RetryPolicy(max_attempts=1))

outcome = await table.handle(error, ctx, attempt=current_attempt)
match outcome:
    case PolicyOutcome.retry:           ...
    case PolicyOutcome.replan:          bus.publish(replan_event); ...
    case PolicyOutcome.fallback_human:  await checkpoint.request(ctx, ...)
    case _: raise

Lookup walks the error type's MRO, so the most specific registered class wins, and registering EntorinError catches everything not otherwise handled.
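As a rough illustration of what an MRO-walking lookup does, the standalone sketch below resolves an error to the entry registered for its nearest ancestor. The error classes are local stand-ins, RateLimited and the lookup helper are hypothetical, and plain strings stand in for policies; PolicyTable's internals may differ.

```python
class EntorinError(Exception):
    """Stand-in base class for this sketch (not the library's definition)."""


class TransientError(EntorinError):
    pass


class RateLimited(TransientError):
    """Hypothetical subclass that has no registration of its own."""


class ParseError(EntorinError):
    pass


def lookup(table: dict, error: BaseException, default: str) -> str:
    """Return the entry for the most specific registered ancestor of the error's type."""
    # __mro__ lists the class itself first, then its bases from nearest to farthest.
    for cls in type(error).__mro__:
        if cls in table:
            return table[cls]
    return default


table = {EntorinError: "terminate", TransientError: "retry"}

rate_limited = lookup(table, RateLimited(), default="skip")   # nearest match: TransientError
parse_error = lookup(table, ParseError(), default="skip")     # falls through to EntorinError
unrelated = lookup(table, ValueError(), default="skip")       # no registered ancestor
```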

Search fanout

Schedule N parallel branches, merge the results, audit the structural fork/join. BudgetGate on the run-shared Ledger naturally caps cumulative cost across all branches; per-branch slicing is the caller’s closure.

async def branch(i):
    ...                              # call your Model with a per-branch budget hint
    return result

best = await fanout(
    [lambda i=i: branch(i) for i in range(5)],
    ctx=ctx, bus=bus,
    parent_node="search",
    merge=best_of_n(scorer=lambda r: r.confidence),
)
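One way to read "per-branch slicing is the caller's closure" is that each branch factory captures its own share of the budget while a shared ledger still guards the total. The sketch below is standalone and hypothetical throughout: the toy Ledger class, make_branch, and run are invented for illustration and are not the entorin Ledger or BudgetGate, and asyncio.gather stands in for fanout.

```python
import asyncio


class Ledger:
    """Toy shared ledger: tracks cumulative spend against a single cap."""

    def __init__(self, cap: float) -> None:
        self.cap = cap
        self.spent = 0.0

    def charge(self, amount: float) -> None:
        if self.spent + amount > self.cap:
            raise RuntimeError("run budget exhausted")
        self.spent += amount


def make_branch(i: int, ledger: Ledger, budget_slice: float):
    """Bind a per-branch slice into the closure; the ledger still caps the total."""

    async def branch() -> tuple[int, float]:
        cost = min(budget_slice, 0.5 + i)   # pretend cost, clipped to this branch's slice
        ledger.charge(cost)
        await asyncio.sleep(0)              # yield control, as a real model call would
        return i, cost

    return branch


async def run(n: int, cap: float) -> list:
    ledger = Ledger(cap)
    branches = [make_branch(i, ledger, cap / n) for i in range(n)]
    # gather returns results in the order the branches were passed in.
    return await asyncio.gather(*(b() for b in branches))


results = asyncio.run(run(4, 8.0))
```

An even split is only one choice; the same closure could hand larger slices to branches that are expected to be more promising.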

Other merges? Build your own — Merge is a plain Callable[[Sequence[T]], R] alias.
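For instance, a majority-vote merge fits that alias shape. The sketch below redefines the alias locally so it runs on its own; majority_vote is a hypothetical example and not something the library is known to ship.

```python
from collections import Counter
from typing import Callable, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")

# Mirrors the alias shape stated above; redefined so the sketch is standalone.
Merge = Callable[[Sequence[T]], R]


def majority_vote(results: Sequence[str]) -> str:
    """Return the most common answer; ties go to the answer seen first."""
    if not results:
        raise ValueError("cannot merge zero branch results")
    return Counter(results).most_common(1)[0][0]


merge: Merge[str, str] = majority_vote
winner = merge(["a", "b", "a"])
```

Since a merge is just a callable over the sequence of branch results, it can be unit-tested without running any branches.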