aarondb/sharded

Types

Rebalance facts across the cluster based on the current shard map. This is a simplified implementation that moves data between shards.

pub type MigrationPlan {
  MigrationPlan(
    moves: List(#(Int, List(#(fact.Eid, String, fact.Value)))),
  )
}

Constructors

Values

pub fn add_shard(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  adapter: option.Option(storage.StorageAdapter),
) -> Result(
  query_types.ShardedDb(process.Subject(transactor.Message)),
  String,
)

Dynamically add a new shard to the cluster. This will update the ShardMap and trigger a rebalance.
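Adding a shard can change which shard owns an existing entity, which is why the ShardMap update is followed by a rebalance. Here is a minimal sketch that assumes a plain modulo routing rule. The owner and moved_after_add functions are hypothetical, and the real ShardMap may route differently. The sketch counts how many entity IDs change owner when the cluster grows by one shard:

```gleam
import gleam/int
import gleam/list

// Hypothetical routing rule: entity ID modulo the shard count.
// The real ShardMap may route differently.
pub fn owner(eid: Int, shard_count: Int) -> Int {
  int.absolute_value(eid) % shard_count
}

// Count how many of `eids` would be routed to a different shard
// after growing the cluster from `old_count` to `old_count + 1` shards.
pub fn moved_after_add(eids: List(Int), old_count: Int) -> Int {
  eids
  |> list.filter(fn(eid) { owner(eid, old_count) != owner(eid, old_count + 1) })
  |> list.length
}
```

Under this assumed rule, going from 3 to 4 shards reassigns 9 of the 12 IDs 0 through 11.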

pub fn bloom_query(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  join_var: String,
  probe_clauses: List(ast.BodyClause),
  build_clauses: List(ast.BodyClause),
) -> query_types.QueryResult

Perform a distributed join optimized with a Bloom filter. It runs in two passes:

  1. Probe: Executes the probe clauses to identify the join keys.
  2. Build: Executes the build clauses on the shards, using a Bloom filter of the identified keys.

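The Bloom filter step can be sketched as follows. The Bloom type, its sizing, the string hash, and the prefilter function are hypothetical illustrations of the technique, not aarondb's internals. Join keys are assumed to be strings, and candidate rows are simplified to #(key, payload) pairs.

```gleam
import gleam/list
import gleam/set.{type Set}
import gleam/string

// Hypothetical Bloom filter over string join keys (not aarondb's type).
pub type Bloom {
  Bloom(bits: Set(Int), size: Int, hashes: Int)
}

pub fn new_bloom(size: Int, hashes: Int) -> Bloom {
  Bloom(bits: set.new(), size: size, hashes: hashes)
}

// Polynomial string hash, reduced modulo a prime so it stays small
// on both the Erlang and JavaScript targets.
fn hash(key: String, base: Int) -> Int {
  key
  |> string.to_utf_codepoints
  |> list.fold(7, fn(acc, cp) {
    { acc * base + string.utf_codepoint_to_int(cp) } % 1_000_000_007
  })
}

// Double hashing: the i-th bit position is (h1 + i * h2) mod size.
fn positions(key: String, size: Int, hashes: Int) -> List(Int) {
  let h1 = hash(key, 31)
  let h2 = hash(key, 37) + 1
  collect(h1, h2, size, 0, hashes, [])
}

fn collect(h1: Int, h2: Int, size: Int, i: Int, n: Int, acc: List(Int)) -> List(Int) {
  case i >= n {
    True -> acc
    False -> collect(h1, h2, size, i + 1, n, [{ h1 + i * h2 } % size, ..acc])
  }
}

pub fn bloom_insert(bloom: Bloom, key: String) -> Bloom {
  let bits =
    list.fold(positions(key, bloom.size, bloom.hashes), bloom.bits, set.insert)
  Bloom(..bloom, bits: bits)
}

// True if every bit for `key` is set: "possibly present".
pub fn might_contain(bloom: Bloom, key: String) -> Bool {
  positions(key, bloom.size, bloom.hashes)
  |> list.all(fn(p) { set.contains(bloom.bits, p) })
}

// Pass 1 (probe): build a filter from the join keys found by the probe side.
// Pass 2 (build): keep only candidate rows whose key may be in the filter.
pub fn prefilter(
  probe_keys: List(String),
  rows: List(#(String, Int)),
) -> List(#(String, Int)) {
  let bloom = list.fold(probe_keys, new_bloom(1024, 3), bloom_insert)
  list.filter(rows, fn(row: #(String, Int)) { might_contain(bloom, row.0) })
}
```

A Bloom filter can report false positives but never false negatives. It can therefore only discard rows that certainly have no partner, and the exact join still has to run on the rows that pass.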
pub fn calculate_migration_plan(
  shards: List(#(Int, List(dict.Dict(String, fact.Value)))),
  shard_map: query_types.ShardMap,
) -> MigrationPlan

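Calculate which facts need to move, based on the current distribution of facts across shards and the given shard map. This is a pure function, f(ClusterState) -> MigrationPlan: it only computes the plan and does not move any data.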
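Here is a sketch of what a pure migration-plan calculation can look like, under the following assumptions. Facts are simplified to #(Int, String, String). Routing uses a hypothetical modulo rule that stands in for the ShardMap. The Move record, with explicit source and target shards, is illustrative rather than the MigrationPlan type above.

```gleam
import gleam/dict
import gleam/int
import gleam/list
import gleam/order
import gleam/result

// Simplified stand-in for #(fact.Eid, String, fact.Value).
pub type Fact =
  #(Int, String, String)

// Hypothetical plan entry: move `facts` from shard `from` to shard `to`.
pub type Move {
  Move(from: Int, to: Int, facts: List(Fact))
}

// Hypothetical routing rule; the real ShardMap may differ.
fn owner(eid: Int, shard_count: Int) -> Int {
  int.absolute_value(eid) % shard_count
}

// Pure: inspects the current placement and returns moves, performs no I/O.
pub fn plan(shards: List(#(Int, List(Fact))), shard_count: Int) -> List(Move) {
  shards
  |> list.flat_map(fn(shard: #(Int, List(Fact))) {
    let #(from, facts) = shard
    // Keep misplaced facts, then group them by their target shard.
    facts
    |> list.filter(fn(f: Fact) { owner(f.0, shard_count) != from })
    |> list.fold(dict.new(), fn(groups, f: Fact) {
      let to = owner(f.0, shard_count)
      let existing = result.unwrap(dict.get(groups, to), [])
      dict.insert(groups, to, [f, ..existing])
    })
    |> dict.to_list
    |> list.map(fn(entry: #(Int, List(Fact))) {
      Move(from: from, to: entry.0, facts: list.reverse(entry.1))
    })
  })
  |> list.sort(fn(a: Move, b: Move) {
    // Order by (from, to) so the plan is deterministic.
    case int.compare(a.from, b.from) {
      order.Eq -> int.compare(a.to, b.to)
      other -> other
    }
  })
}
```

Keeping the calculation pure makes it easy to test against a fixed cluster state before any data is moved.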

pub fn global_vector_search(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  query_vec: List(Float),
  threshold: Float,
  k: Int,
) -> List(vec_index.SearchResult)

Perform a global vector similarity search across all shards. Phase 50: Distributed V-Link.
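Here is a sketch of the gather side of a distributed top-k search. It assumes that threshold is a minimum similarity score, that k is the maximum number of results, and that higher scores mean closer vectors. The Hit type stands in for vec_index.SearchResult. Cosine similarity is an assumed metric that each shard might use to score its local vectors.

```gleam
import gleam/float
import gleam/list

// Hypothetical result type; the real vec_index.SearchResult may differ.
pub type Hit {
  Hit(id: Int, score: Float)
}

// Cosine similarity a shard could use to score its local vectors.
// Returns 0.0 when either vector has zero length.
pub fn cosine(a: List(Float), b: List(Float)) -> Float {
  let dot = float.sum(list.map2(a, b, fn(x, y) { x *. y }))
  let sq_norm = fn(v: List(Float)) { float.sum(list.map(v, fn(x) { x *. x })) }
  case float.square_root(sq_norm(a) *. sq_norm(b)) {
    Ok(denom) if denom >. 0.0 -> dot /. denom
    _ -> 0.0
  }
}

// Gather step: combine per-shard hits, drop those below `threshold`,
// and keep the `k` best overall.
pub fn merge_top_k(per_shard: List(List(Hit)), threshold: Float, k: Int) -> List(Hit) {
  per_shard
  |> list.flatten
  |> list.filter(fn(h: Hit) { h.score >=. threshold })
  |> list.sort(fn(a: Hit, b: Hit) { float.compare(b.score, a.score) })
  |> list.take(k)
}
```

Merging only requires each shard to return its own best k hits above the threshold, because the global top k is always contained in the union of the per-shard top k.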

pub fn migrate_shard_data(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  from_shard: Int,
  to_shard: Int,
  filter: fn(#(fact.Eid, String, fact.Value)) -> Bool,
) -> Result(Int, String)

Manually migrate data from one shard to another.
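Here is a minimal in-memory sketch of how the filter predicate selects which facts leave the source shard. The hypothetical migrate function works on plain lists rather than live shards. Reading the returned Int as the number of moved facts is an assumption.

```gleam
import gleam/list

// Simplified fact: #(entity ID, attribute, value).
pub type Fact =
  #(Int, String, String)

// Move every fact in `source` that satisfies `filter` onto `target`.
// Returns the updated source, the updated target, and how many facts moved.
pub fn migrate(
  source: List(Fact),
  target: List(Fact),
  filter: fn(Fact) -> Bool,
) -> #(List(Fact), List(Fact), Int) {
  let #(moving, staying) = list.partition(source, filter)
  #(staying, list.append(target, moving), list.length(moving))
}
```

For example, a filter such as fn(f) { f.1 == "user/name" } would move all facts for a single attribute.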

pub const mirror_shard_id: Int

pub fn pull(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  eid: fact.Eid,
  pattern: List(ast.PullItem),
) -> query_types.PullResult

Pull an entity in parallel across all shards.
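One way to combine the per-shard responses of a parallel pull is sketched below, under two assumptions. Each shard's partial view of the entity is modelled as a Dict from attribute to value. When attributes conflict, the later shard in the list wins. The merge_pulls function is hypothetical.

```gleam
import gleam/dict.{type Dict}
import gleam/list

// Fold the partial views into one entity map; dict.merge lets entries
// from later shards overwrite earlier ones with the same attribute.
pub fn merge_pulls(partials: List(Dict(String, String))) -> Dict(String, String) {
  list.fold(partials, dict.new(), dict.merge)
}
```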

pub fn query(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  query: ast.Query,
) -> query_types.QueryResult

Query the sharded database using a parallel scatter-gather: the query runs on every shard and the partial results are combined. Warning: this performs a full scan across all shards.
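The scatter-gather shape can be sketched sequentially. The scatter_gather function is hypothetical. It visits every shard, which is why the query amounts to a full scan. It then concatenates the partial results and removes duplicates, on the assumption that results have set semantics. The real function runs the shard queries in parallel.

```gleam
import gleam/list

// Scatter: run the same query function against every shard's data.
// Gather: concatenate the partial results and drop duplicates.
// Sequential here for clarity only.
pub fn scatter_gather(shards: List(List(a)), run: fn(List(a)) -> List(b)) -> List(b) {
  shards
  |> list.map(run)
  |> list.flatten
  |> list.unique
}
```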

pub fn query_at(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  query: ast.Query,
  as_of_tx: option.Option(Int),
  as_of_valid: option.Option(Int),
) -> query_types.QueryResult

Query the sharded database at a specific temporal basis, given as an optional transaction time (as_of_tx) and an optional valid time (as_of_valid).
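Here is a sketch of filtering facts against a temporal basis. It assumes a hypothetical TemporalFact record with a transaction time and a valid-from time, and it assumes that None means that time axis is unconstrained. It ignores retractions and valid-time end bounds, which a real bitemporal store also has to handle.

```gleam
import gleam/list
import gleam/option.{type Option, None, Some}

// Hypothetical bitemporal fact: recorded in transaction `tx`,
// true in the modelled world from `valid_from` onwards.
pub type TemporalFact {
  TemporalFact(eid: Int, attr: String, tx: Int, valid_from: Int)
}

// None leaves that time axis unconstrained.
fn within(limit: Option(Int), t: Int) -> Bool {
  case limit {
    Some(max) -> t <= max
    None -> True
  }
}

pub fn as_of(
  facts: List(TemporalFact),
  as_of_tx: Option(Int),
  as_of_valid: Option(Int),
) -> List(TemporalFact) {
  list.filter(facts, fn(f: TemporalFact) {
    within(as_of_tx, f.tx) && within(as_of_valid, f.valid_from)
  })
}
```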

pub fn rebalance(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
) -> Result(
  query_types.ShardedDb(process.Subject(transactor.Message)),
  String,
)

Impure wrapper that executes a rebalance, in contrast to the pure calculate_migration_plan, which only computes the moves.

pub fn start_local_sharded(
  cluster_id: String,
  shard_count: Int,
  adapter: option.Option(storage.StorageAdapter),
) -> Result(
  query_types.ShardedDb(process.Subject(transactor.Message)),
  String,
)

Start a sharded database cluster in local (named) mode.

pub fn start_sharded(
  cluster_id: String,
  shard_count: Int,
  adapter: option.Option(storage.StorageAdapter),
) -> Result(
  query_types.ShardedDb(process.Subject(transactor.Message)),
  String,
)

Start a sharded database cluster.

pub fn stop(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
) -> Nil

Stop the sharded database.

pub fn transact(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  facts: List(#(fact.Eid, String, fact.Value)),
) -> Result(List(state.DbState), String)

Ingest facts into the sharded database in parallel. Routing is determined by hashing the Entity ID (Eid).
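Hash routing lets a batch be split into per-shard batches that can be sent to the shards concurrently. The sketch below assumes integer entity IDs and uses a modulo rule as a stand-in for the real Eid hash. The route and shard_for functions are hypothetical:

```gleam
import gleam/dict.{type Dict}
import gleam/int
import gleam/list
import gleam/result

pub type Fact =
  #(Int, String, String)

// Hypothetical routing: entity ID modulo shard count.
fn shard_for(eid: Int, shard_count: Int) -> Int {
  int.absolute_value(eid) % shard_count
}

// Split one batch into per-shard batches, preserving input order within
// each batch. Facts for the same entity always land on the same shard.
pub fn route(facts: List(Fact), shard_count: Int) -> Dict(Int, List(Fact)) {
  facts
  |> list.fold(dict.new(), fn(acc, f: Fact) {
    let shard = shard_for(f.0, shard_count)
    dict.insert(acc, shard, [f, ..result.unwrap(dict.get(acc, shard), [])])
  })
  |> dict.map_values(fn(_shard, batch) { list.reverse(batch) })
}
```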

pub fn transact_shard(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  shard_id: Int,
  facts: List(#(fact.Eid, String, fact.Value)),
) -> Result(state.DbState, String)

Transact on a specific shard regardless of entity hashing.
