aarondb/sharded
Types
Describes how facts are rebalanced across the cluster based on the current shard map. Rebalancing uses a simplified implementation that moves data between shards.
pub type MigrationPlan {
  MigrationPlan(
    moves: List(#(Int, List(#(fact.Eid, String, fact.Value)))),
  )
}
Constructors
- MigrationPlan(moves: List(#(Int, List(#(fact.Eid, String, fact.Value)))))
Values
pub fn add_shard(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  adapter: option.Option(storage.StorageAdapter),
) -> Result(
  query_types.ShardedDb(process.Subject(transactor.Message)),
  String,
)
Dynamically add a new shard to the cluster. This will update the ShardMap and trigger a rebalance.
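Adding a shard is what makes a rebalance necessary: it changes where existing entities route to. As a rough illustration, assuming integer entity IDs and plain modulo routing (the hypothetical `route` below stands in for the real Eid hash and ShardMap, which this page does not describe), the sketch counts how many IDs land on a different shard after the cluster grows by one.

```gleam
import gleam/int
import gleam/list

/// Hypothetical router standing in for the module's Eid hash:
/// plain modulo over an integer entity ID.
pub fn route(eid: Int, shard_count: Int) -> Int {
  // Take the absolute value first: Gleam's `%` keeps the sign of the dividend.
  int.absolute_value(eid) % shard_count
}

/// Count the entity IDs whose shard changes when the cluster grows
/// from `old_count` shards to `old_count + 1` shards.
pub fn moved_on_grow(eids: List(Int), old_count: Int) -> Int {
  eids
  |> list.filter(fn(eid) { route(eid, old_count) != route(eid, old_count + 1) })
  |> list.length
}
```

For the IDs 0 through 11, growing from 3 to 4 shards relocates 9 of the 12 under this modulo rule; placement schemes such as consistent hashing exist to reduce that churn.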
pub fn bloom_query(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  join_var: String,
  probe_clauses: List(ast.BodyClause),
  build_clauses: List(ast.BodyClause),
) -> query_types.QueryResult
Perform a distributed join optimized with a Bloom filter. The join runs in two passes:
- Probe: executes the probe clauses to identify the join keys.
- Build: executes the build clauses on the shards, using a Bloom filter of the identified keys.
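The build pass depends on a Bloom filter's guarantee: it can report false positives but never false negatives, so a shard can safely skip any key the filter rejects. The sketch below is a toy filter, not the module's implementation; it assumes string join keys, stores set bit positions in a `gleam/set`, and uses two hypothetical polynomial hashes.

```gleam
import gleam/list
import gleam/set.{type Set}
import gleam/string

/// Toy Bloom filter (hypothetical): `bits` holds the indices of set bits.
pub type Bloom {
  Bloom(size: Int, bits: Set(Int))
}

pub fn new(size: Int) -> Bloom {
  Bloom(size: size, bits: set.new())
}

/// Polynomial string hash with the given multiplier, reduced modulo `size`
/// at each step so the value stays small on every Gleam target.
fn hash(key: String, multiplier: Int, size: Int) -> Int {
  key
  |> string.to_utf_codepoints
  |> list.fold(0, fn(acc, cp) {
    { acc * multiplier + string.utf_codepoint_to_int(cp) } % size
  })
}

/// The bit positions a key maps to (two hash functions).
fn positions(filter: Bloom, key: String) -> List(Int) {
  [hash(key, 31, filter.size), hash(key, 131, filter.size)]
}

/// Probe side: record a join key in the filter.
pub fn insert(filter: Bloom, key: String) -> Bloom {
  let bits =
    list.fold(positions(filter, key), filter.bits, fn(acc, i) {
      set.insert(acc, i)
    })
  Bloom(..filter, bits: bits)
}

/// False means the key was definitely never inserted; True means it probably was.
pub fn might_contain(filter: Bloom, key: String) -> Bool {
  list.all(positions(filter, key), fn(i) { set.contains(filter.bits, i) })
}

/// Build side: keep only candidate keys that may match the probe side.
pub fn prune(filter: Bloom, candidates: List(String)) -> List(String) {
  list.filter(candidates, fn(key) { might_contain(filter, key) })
}
```

A real filter would size the bit array and choose the hash count from the expected number of keys and the target false-positive rate.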
pub fn calculate_migration_plan(
  shards: List(#(Int, List(dict.Dict(String, fact.Value)))),
  shard_map: query_types.ShardMap,
) -> MigrationPlan
Calculate which facts need to move, given their current distribution across shards and the shard map. This is a pure function: f(ClusterState) -> MigrationPlan.
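Because `calculate_migration_plan` is documented as pure, its core can be shown without processes. This is a hypothetical sketch: it uses integer entity IDs, a simplified `Fact` record in place of `#(fact.Eid, String, fact.Value)`, a modulo rule in place of the `ShardMap`, and it reads each move's `Int` as the destination shard, which the signature alone does not confirm.

```gleam
import gleam/dict
import gleam/int
import gleam/list
import gleam/option.{None, Some}

/// Simplified stand-in for #(fact.Eid, String, fact.Value).
pub type Fact {
  Fact(eid: Int, attr: String, value: String)
}

/// Same shape as MigrationPlan, reading each move as
/// #(destination shard, facts to send there).
pub type Plan {
  Plan(moves: List(#(Int, List(Fact))))
}

/// Hypothetical placement rule standing in for the ShardMap lookup.
fn target_shard(eid: Int, shard_count: Int) -> Int {
  int.absolute_value(eid) % shard_count
}

/// Pure: collect every fact stored on the wrong shard, grouped by the
/// shard it should move to. Moves are sorted by destination shard.
pub fn plan(shards: List(#(Int, List(Fact))), shard_count: Int) -> Plan {
  let misplaced =
    list.flat_map(shards, fn(entry) {
      let #(current, facts) = entry
      list.filter(facts, fn(f: Fact) {
        target_shard(f.eid, shard_count) != current
      })
    })

  // Groups are built by prepending, then reversed to keep input order.
  let moves =
    misplaced
    |> list.fold(dict.new(), fn(acc, f: Fact) {
      dict.upsert(acc, target_shard(f.eid, shard_count), fn(existing) {
        case existing {
          Some(group) -> [f, ..group]
          None -> [f]
        }
      })
    })
    |> dict.map_values(fn(_shard, group) { list.reverse(group) })
    |> dict.to_list
    |> list.sort(fn(a, b) {
      let #(shard_a, _) = a
      let #(shard_b, _) = b
      int.compare(shard_a, shard_b)
    })

  Plan(moves: moves)
}
```

Keeping the plan computation pure lets it be tested directly, leaving the actual data movement to an impure step (the role `rebalance` appears to play).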
pub fn global_vector_search(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  query_vec: List(Float),
  threshold: Float,
  k: Int,
) -> List(vec_index.SearchResult)
Perform a global vector similarity search across all shards. Phase 50: Distributed V-Link.
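A global top-k search across shards is usually a scatter-gather: each shard returns its best local matches and the caller merges them. The merge step is sketched here assuming `threshold` is a minimum similarity score, higher scores are better, and each hit is a hypothetical `Hit` record standing in for `vec_index.SearchResult`.

```gleam
import gleam/float
import gleam/list

/// Hypothetical stand-in for vec_index.SearchResult.
pub type Hit {
  Hit(id: Int, score: Float)
}

/// Merge per-shard hit lists into one global top-k list:
/// drop hits below `threshold`, sort by descending score, keep `k`.
pub fn merge_top_k(
  per_shard: List(List(Hit)),
  threshold: Float,
  k: Int,
) -> List(Hit) {
  per_shard
  |> list.flatten
  |> list.filter(fn(h: Hit) { h.score >=. threshold })
  |> list.sort(fn(a: Hit, b: Hit) { float.compare(b.score, a.score) })
  |> list.take(k)
}
```

The merge yields the exact global top k as long as each shard contributes its own top k (or all of its hits above the threshold, if it has fewer).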
pub fn migrate_shard_data(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  from_shard: Int,
  to_shard: Int,
  filter: fn(#(fact.Eid, String, fact.Value)) -> Bool,
) -> Result(Int, String)
Manually migrate data from one shard to another.
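The `filter` argument decides which facts leave `from_shard`. A pure sketch of that split, assuming a simplified `#(Int, String, String)` fact tuple and that the `Int` returned on success is the number of facts moved (the signature does not say so):

```gleam
import gleam/list

/// Simplified fact tuple (hypothetical): #(entity id, attribute, value).
pub type Fact =
  #(Int, String, String)

/// Split a shard's facts with `filter`: matching facts move, the rest stay.
/// Returns #(remaining, moved, moved_count).
pub fn split_for_migration(
  facts: List(Fact),
  filter: fn(Fact) -> Bool,
) -> #(List(Fact), List(Fact), Int) {
  // list.partition puts the facts for which `filter` is True first.
  let #(moved, remaining) = list.partition(facts, filter)
  #(remaining, moved, list.length(moved))
}
```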
pub const mirror_shard_id: Int
pub fn pull(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  eid: fact.Eid,
  pattern: List(ast.PullItem),
) -> query_types.PullResult
Pull an entity in parallel across all shards.
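Because `pull` consults every shard, the per-shard answers have to be combined into one entity view. The sketch assumes each shard returns a plain `Dict(String, String)` of attribute values (a hypothetical simplification of `query_types.PullResult`) and that, on a conflicting attribute, the later shard's value wins.

```gleam
import gleam/dict.{type Dict}
import gleam/list

/// Merge per-shard partial views of one entity into a single map.
/// dict.merge lets the second dict win, so later shards override earlier ones.
pub fn merge_partials(
  partials: List(Dict(String, String)),
) -> Dict(String, String) {
  list.fold(partials, dict.new(), fn(acc, partial) { dict.merge(acc, partial) })
}
```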
pub fn query(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  query: ast.Query,
) -> query_types.QueryResult
Query the sharded database with a parallel scatter-gather across all shards. Warning: this performs a full scan of every shard.
pub fn query_at(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  query: ast.Query,
  as_of_tx: option.Option(Int),
  as_of_valid: option.Option(Int),
) -> query_types.QueryResult
Query the sharded database at a specific temporal basis.
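The two optional arguments suggest a bitemporal filter: `as_of_tx` bounds transaction time and `as_of_valid` bounds valid time. A sketch of the visibility test, assuming `None` means unbounded, bounds are inclusive, and each fact carries hypothetical `tx` and `valid_from` fields (a fuller model would also track when validity ends):

```gleam
import gleam/list
import gleam/option.{type Option, None, Some}

/// Hypothetical fact with both time coordinates recorded.
pub type TemporalFact {
  TemporalFact(eid: Int, attr: String, tx: Int, valid_from: Int)
}

/// True when `value` is within the optional inclusive upper bound.
fn within(value: Int, bound: Option(Int)) -> Bool {
  case bound {
    None -> True
    Some(limit) -> value <= limit
  }
}

/// Keep only the facts visible at the requested temporal basis.
pub fn visible_at(
  facts: List(TemporalFact),
  as_of_tx: Option(Int),
  as_of_valid: Option(Int),
) -> List(TemporalFact) {
  list.filter(facts, fn(f: TemporalFact) {
    within(f.tx, as_of_tx) && within(f.valid_from, as_of_valid)
  })
}
```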
pub fn rebalance(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
) -> Result(
  query_types.ShardedDb(process.Subject(transactor.Message)),
  String,
)
Impure wrapper to execute a rebalance.
pub fn start_local_sharded(
  cluster_id: String,
  shard_count: Int,
  adapter: option.Option(storage.StorageAdapter),
) -> Result(
  query_types.ShardedDb(process.Subject(transactor.Message)),
  String,
)
Start a sharded database cluster in local (named) mode.
pub fn start_sharded(
  cluster_id: String,
  shard_count: Int,
  adapter: option.Option(storage.StorageAdapter),
) -> Result(
  query_types.ShardedDb(process.Subject(transactor.Message)),
  String,
)
Start a sharded database cluster.
pub fn stop(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
) -> Nil
Stop the sharded database.
pub fn transact(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  facts: List(#(fact.Eid, String, fact.Value)),
) -> Result(List(state.DbState), String)
Ingest facts into the sharded database in parallel. Routing is determined by hashing the Entity ID (Eid).
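Hash routing means one `transact` call first groups its facts by destination shard, then sends each group to its shard. The grouping step is sketched below with integer entity IDs; `hash_eid` is a hypothetical stand-in, since the actual hash is not described on this page.

```gleam
import gleam/dict.{type Dict}
import gleam/int
import gleam/list
import gleam/option.{None, Some}

/// Simplified fact tuple (hypothetical): #(entity id, attribute, value).
pub type Fact =
  #(Int, String, String)

/// Hypothetical Eid hash. A production hash would mix the bits so that
/// sequential IDs spread evenly; this one only drops the sign.
fn hash_eid(eid: Int) -> Int {
  int.absolute_value(eid)
}

/// Group facts by destination shard, keeping their input order per shard.
/// Each group would then be sent to its shard's transactor concurrently.
pub fn route_facts(facts: List(Fact), shard_count: Int) -> Dict(Int, List(Fact)) {
  facts
  |> list.fold(dict.new(), fn(acc, fact: Fact) {
    let #(eid, _, _) = fact
    dict.upsert(acc, hash_eid(eid) % shard_count, fn(existing) {
      case existing {
        Some(group) -> [fact, ..group]
        None -> [fact]
      }
    })
  })
  |> dict.map_values(fn(_shard, group) { list.reverse(group) })
}
```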
pub fn transact_shard(
  db: query_types.ShardedDb(process.Subject(transactor.Message)),
  shard_id: Int,
  facts: List(#(fact.Eid, String, fact.Value)),
) -> Result(state.DbState, String)
Transact on a specific shard regardless of entity hashing.