Skip to content

Commit

Permalink
feat(schema): fetch schema information (#33)
Browse files Browse the repository at this point in the history
Signed-off-by: Felipi Lima Matozinho <[email protected]>
  • Loading branch information
Matozinho authored Jul 31, 2024
1 parent 9831aa1 commit 6e7930d
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 7 deletions.
4 changes: 2 additions & 2 deletions examples/basic.mts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cluster, } from "../index.js"
import { Cluster } from "../index.js"

const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];

Expand Down Expand Up @@ -34,4 +34,4 @@ console.log(`Iter queries requested: ${metrics.getQueriesIterNum()}`);
console.log(`Errors occurred: ${metrics.getErrorsNum()}`);
console.log(`Iter errors occurred: ${metrics.getErrorsIterNum()}`);
console.log(`Average latency: ${metrics.getLatencyAvgMs()}`);
console.log(`99.9 latency percentile: ${metrics.getLatencyPercentileMs(99.9)}`);
console.log(`99.9 latency percentile: ${metrics.getLatencyPercentileMs(99.9)}`);
53 changes: 53 additions & 0 deletions examples/fetch-schema.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { Cluster } from "../index.js";

const nodes = process.env.CLUSTER_NODES?.split(",") ?? ["127.0.0.1:9042"];

console.log(`Connecting to ${nodes}`);

const cluster = new Cluster({ nodes });
const session = await cluster.connect();

const clusterData = await session.getClusterData();
const keyspaceInfo = clusterData.getKeyspaceInfo();

if (!keyspaceInfo) throw new Error("No data found");

console.log("ALL KEYSPACES");
for (const keyspaceName in keyspaceInfo) {
console.log("========================================================");
const keyspaceData = keyspaceInfo[keyspaceName];
console.log("Keyspace: ", keyspaceName);
console.log(
"replication strategy: ",
keyspaceData.strategy.kind,
keyspaceData.strategy.data,
);
for (const tableName in keyspaceData.tables) {
console.log("-----------------------");
const tableData = keyspaceData.tables[tableName];
console.log("Table: ", tableName);
console.log("partitionKey: ", tableData.partitionKey);
console.log("clusteringKey: ", tableData.clusteringKey);
console.log("columns: ", tableData.columns);
console.log("-----------------------");
}
console.log("========================================================");
}

console.log("================== SPECIFIC KEYSPACES ==================");
console.log(
"keyspace: system_auth | strategy: ",
keyspaceInfo.system_auth.strategy,
);
console.log(
"keyspace: system_traces | strategy: ",
keyspaceInfo.system_traces.strategy,
);
console.log(
"keyspace: system_distributed_everywhere | strategy: ",
keyspaceInfo.system_distributed_everywhere.strategy,
);
console.log(
"keyspace: system_distributed | strategy: ",
keyspaceInfo.system_distributed.strategy,
);
43 changes: 40 additions & 3 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,35 @@ export const enum VerifyMode {
None = 0,
Peer = 1
}
export interface ScyllaKeyspace {
strategy: ScyllaStrategy
tables: Record<string, ScyllaTable>
views: Record<string, ScyllaMaterializedView>
}
export interface ScyllaStrategy {
kind: string
data?: SimpleStrategy | NetworkTopologyStrategy | Other
}
export interface SimpleStrategy {
replicationFactor: number
}
export interface NetworkTopologyStrategy {
datacenterRepfactors: Record<string, number>
}
export interface Other {
name: string
data: Record<string, string>
}
export interface ScyllaTable {
columns: Array<string>
partitionKey: Array<string>
clusteringKey: Array<string>
partitioner?: string
}
export interface ScyllaMaterializedView {
viewMetadata: ScyllaTable
baseTableName: string
}
export type ScyllaCluster = Cluster
export class Cluster {
/**
Expand Down Expand Up @@ -121,8 +150,9 @@ export class Metrics {
}
export class ScyllaSession {
metrics(): Metrics
execute<T = unknown>(query: string | Query | PreparedStatement, parameters?: Array<number | string | Uuid> | undefined | null): Promise<T>
query<T = unknown>(scyllaQuery: Query, parameters?: Array<number | string | Uuid> | undefined | null): Promise<T>
getClusterData(): Promise<ScyllaClusterData>
execute(query: string | Query | PreparedStatement, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
query(scyllaQuery: Query, parameters?: Array<number | string | Uuid> | undefined | null): Promise<any>
prepare(query: string): Promise<PreparedStatement>
/**
* Perform a batch query\
Expand Down Expand Up @@ -161,7 +191,7 @@ export class ScyllaSession {
* console.log(await session.execute("SELECT * FROM users"));
* ```
*/
batch<T = unknown>(batch: BatchStatement, parameters: Array<Array<number | string | Uuid> | undefined | null>): Promise<T>
batch(batch: BatchStatement, parameters: Array<Array<number | string | Uuid> | undefined | null>): Promise<any>
/**
* Sends `USE <keyspace_name>` request on all connections\
* This allows to write `SELECT * FROM table` instead of `SELECT * FROM keyspace.table`\
Expand Down Expand Up @@ -231,6 +261,13 @@ export class ScyllaSession {
awaitSchemaAgreement(): Promise<Uuid>
checkSchemaAgreement(): Promise<boolean>
}
export class ScyllaClusterData {
/**
* Access keyspaces details collected by the driver Driver collects various schema details like
* tables, partitioners, columns, types. They can be read using this method
*/
getKeyspaceInfo(): Record<string, ScyllaKeyspace> | null
}
export class Uuid {
/** Generates a random UUID v4. */
static randomV4(): Uuid
Expand Down
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}

const { Compression, Consistency, SerialConsistency, Cluster, VerifyMode, BatchStatement, PreparedStatement, Query, Metrics, ScyllaSession, Uuid } = nativeBinding
const { Compression, Consistency, SerialConsistency, Cluster, VerifyMode, BatchStatement, PreparedStatement, Query, Metrics, ScyllaSession, ScyllaClusterData, Uuid } = nativeBinding

module.exports.Compression = Compression
module.exports.Consistency = Consistency
Expand All @@ -322,6 +322,7 @@ module.exports.PreparedStatement = PreparedStatement
module.exports.Query = Query
module.exports.Metrics = Metrics
module.exports.ScyllaSession = ScyllaSession
module.exports.ScyllaClusterData = ScyllaClusterData
module.exports.Uuid = Uuid

const customInspectSymbol = Symbol.for('nodejs.util.inspect.custom')
Expand Down
1 change: 0 additions & 1 deletion src/cluster/execution_profile/consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,3 @@ impl From<scylla::statement::Consistency> for Consistency {
}
}
}

1 change: 1 addition & 0 deletions src/session/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod metrics;
pub mod scylla_session;
pub mod topology;
13 changes: 13 additions & 0 deletions src/session/scylla_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::types::uuid::Uuid;
use napi::bindgen_prelude::Either3;

use super::metrics;
use super::topology::ScyllaClusterData;

#[napi]
pub struct ScyllaSession {
Expand All @@ -24,6 +25,18 @@ impl ScyllaSession {
metrics::Metrics::new(self.session.get_metrics())
}

#[napi]
pub async fn get_cluster_data(&self) -> ScyllaClusterData {
self
.session
.refresh_metadata()
.await
.expect("Failed to refresh metadata");

let cluster_data = self.session.get_cluster_data();
cluster_data.into()
}

#[napi]
pub async fn execute(
&self,
Expand Down
176 changes: 176 additions & 0 deletions src/session/topology.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use std::collections::HashMap;
use std::sync::Arc;

use napi::bindgen_prelude::Either3;
use scylla::transport::topology::{Keyspace, MaterializedView, Strategy, Table};
use scylla::transport::ClusterData;

// ============= ClusterData ============= //
#[napi]
pub struct ScyllaClusterData {
inner: Arc<ClusterData>,
}

impl From<Arc<ClusterData>> for ScyllaClusterData {
fn from(cluster_data: Arc<ClusterData>) -> Self {
ScyllaClusterData {
inner: cluster_data,
}
}
}

#[napi]
impl ScyllaClusterData {
#[napi]
/// Access keyspaces details collected by the driver Driver collects various schema details like
/// tables, partitioners, columns, types. They can be read using this method
pub fn get_keyspace_info(&self) -> Option<HashMap<String, ScyllaKeyspace>> {
let keyspaces_info = self.inner.get_keyspace_info();

if keyspaces_info.is_empty() {
None
} else {
Some(
keyspaces_info
.iter()
.map(|(k, v)| (k.clone(), ScyllaKeyspace::from((*v).clone())))
.collect(),
)
}
}
}
// ======================================= //

// ============= Keyspace ============= //
#[napi(object)]
#[derive(Clone)]
pub struct ScyllaKeyspace {
pub strategy: ScyllaStrategy,
pub tables: HashMap<String, ScyllaTable>,
pub views: HashMap<String, ScyllaMaterializedView>,
// pub user_defined_types: HashMap<String, ScyllaUserDefinedType>,
}

impl From<Keyspace> for ScyllaKeyspace {
fn from(keyspace: Keyspace) -> Self {
ScyllaKeyspace {
tables: keyspace
.tables
.into_iter()
.map(|(k, v)| (k, ScyllaTable::from(v)))
.collect(),
views: keyspace
.views
.into_iter()
.map(|(k, v)| (k, ScyllaMaterializedView::from(v)))
.collect(),
strategy: keyspace.strategy.into(),
// TODO: Implement ScyllaUserDefinedType
// user_defined_types: keyspace.user_defined_types.into_iter().map(|(k, v)| (k, ScyllaUserDefinedType::from(v))).collect(),
}
}
}
// ======================================= //

// ============= Strategy ============= //
#[napi(object)]
#[derive(Clone)]
pub struct ScyllaStrategy {
pub kind: String,
pub data: Option<Either3<SimpleStrategy, NetworkTopologyStrategy, Other>>,
}

#[napi(object)]
#[derive(Clone)]
pub struct SimpleStrategy {
pub replication_factor: u32,
}

#[napi(object)]
#[derive(Clone)]
pub struct NetworkTopologyStrategy {
pub datacenter_repfactors: HashMap<String, i32>,
}

#[napi(object)]
#[derive(Clone)]
pub struct Other {
pub name: String,
pub data: HashMap<String, String>,
}

impl From<Strategy> for ScyllaStrategy {
fn from(strategy: Strategy) -> Self {
match strategy {
Strategy::SimpleStrategy { replication_factor } => ScyllaStrategy {
kind: "SimpleStrategy".to_string(),
data: Some(Either3::A(SimpleStrategy {
replication_factor: replication_factor as u32,
})),
},
Strategy::NetworkTopologyStrategy {
datacenter_repfactors,
} => ScyllaStrategy {
kind: "NetworkTopologyStrategy".to_string(),
data: Some(Either3::B(NetworkTopologyStrategy {
datacenter_repfactors: datacenter_repfactors
.into_iter()
.map(|(k, v)| (k, v as i32))
.collect(),
})),
},
Strategy::Other { name, data } => ScyllaStrategy {
kind: name.clone(),
data: Some(Either3::C(Other {
name: name.clone(),
data,
})),
},
Strategy::LocalStrategy => ScyllaStrategy {
kind: "LocalStrategy".to_string(),
data: None,
},
}
}
}
// ======================================= //

// ============= Table ============= //
#[napi(object)]
#[derive(Clone)]
pub struct ScyllaTable {
pub columns: Vec<String>,
pub partition_key: Vec<String>,
pub clustering_key: Vec<String>,
pub partitioner: Option<String>,
}

impl From<Table> for ScyllaTable {
fn from(table: Table) -> Self {
ScyllaTable {
columns: table.columns.clone().into_keys().collect::<Vec<String>>(),
partition_key: table.partition_key.clone(),
clustering_key: table.clustering_key.clone(),
partitioner: table.partitioner.clone(),
}
}
}
// ======================================= //

// ============= MaterializedView ============= //
#[napi(object)]
#[derive(Clone)]
pub struct ScyllaMaterializedView {
pub view_metadata: ScyllaTable,
pub base_table_name: String,
}

impl From<MaterializedView> for ScyllaMaterializedView {
fn from(view: MaterializedView) -> Self {
ScyllaMaterializedView {
view_metadata: ScyllaTable::from(view.view_metadata),
base_table_name: view.base_table_name,
}
}
}
// ======================================= //

0 comments on commit 6e7930d

Please sign in to comment.