Data Model: Versioned Sparse Tables
Partition the Row Space (Region, Region Server)
Transactions
Fault Tolerance: RegionServer Dies
Region data still safely stored in 3x replicated HDFS files
The regions are handed off to healthy RegionServers
Design implication
Storage Layout
Store new data in memory until we have a lot of data
Then do one big write to disk
Tombstones mark deletions: we must write something for the record, since old versions in finalized files cannot be erased in place
If too many files accumulate, reads become slow, so smaller files get compacted/combined into bigger files
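The memtable/flush/tombstone/compaction cycle above can be sketched in a few lines (a toy single-map sketch, not Cassandra's actual SSTable format; `Store`, `TOMBSTONE`, and the method names are illustrative):

```python
TOMBSTONE = object()  # sentinel written in place of a deleted value

class Store:
    def __init__(self):
        self.memtable = {}   # new writes buffered in memory
        self.sstables = []   # immutable "on-disk" files, newest first

    def put(self, key, value):
        self.memtable[key] = value

    def delete(self, key):
        # can't erase old versions from finalized files,
        # so record the deletion as a tombstone
        self.memtable[key] = TOMBSTONE

    def flush(self):
        # one big write to disk once enough data accumulates
        self.sstables.insert(0, dict(self.memtable))
        self.memtable = {}

    def get(self, key):
        # check newest data first; the first hit wins
        for table in [self.memtable] + self.sstables:
            if key in table:
                return None if table[key] is TOMBSTONE else table[key]
        return None

    def compact(self):
        # too many files slow reads: merge them, newer versions shadowing older
        merged = {}
        for table in reversed(self.sstables):  # oldest first
            merged.update(table)
        self.sstables = [merged]
```

Reads consult the memtable and then each file newest-first, which is why file count (and hence compaction) matters for read latency.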
Clusters
Cassandra clusters have many worker nodes
No centralized boss node (unlike HDFS, Spark)
Not necessarily same data center (could be geographically distributed)
Clusters are called rings because some nodes are defined to be adjacent
Ring organization doesn't necessarily correspond to network topology
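One way to picture ring placement (a toy sketch: the node names match the demo below, but the token positions and the md5 hash are stand-ins for Cassandra's Murmur3 partitioner and real token assignment):

```python
import hashlib

RING_SIZE = 2**127
nodes = {                      # node name -> token position on the ring
    "demo-db-1": 0,
    "demo-db-2": RING_SIZE // 3,
    "demo-db-3": 2 * RING_SIZE // 3,
}

def token(partition_key: str) -> int:
    # hash the partition key to a position on the ring
    return int(hashlib.md5(partition_key.encode()).hexdigest(), 16) % RING_SIZE

def owner(partition_key: str) -> str:
    # a partition belongs to the first node at-or-after its token,
    # wrapping around the ring if necessary
    t = token(partition_key)
    ring = sorted(nodes.items(), key=lambda kv: kv[1])
    candidates = [name for name, tok in ring if tok >= t]
    return candidates[0] if candidates else ring[0][0]
```

Adjacency on the ring is defined by token order, which is why it need not match the physical network topology.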
Keyspaces
Partition (Wide Partition)
A partition is a collection of rows that share the same partition key. The partition key is used to determine which node in the cluster will store the partition.
Advantages:
Disadvantages:
Design:
Partition key column(s): uniquely identifies partition, determines machine placement
Static columns: one value per partition key
Clustering column(s): determine sort order within partition
Primary Key: uniquely identifies row (combination of partition key and clustering key)
Example:
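A hedged sketch of how these pieces combine in one DDL statement (the column names mirror the loans table used in the demo below):

```
CREATE TABLE loans (
    bank_id INT,            -- partition key: determines machine placement
    bank_name TEXT STATIC,  -- static column: one value per partition
    amount INT,             -- clustering column: sort order within partition
    loan_id UUID,           -- clustering column: breaks ties between rows
    state TEXT,             -- regular column
    PRIMARY KEY ((bank_id), amount, loan_id)  -- partition key + clustering keys
);
```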
Demos
Setup Environment and Servers
# DOCKER
FROM ubuntu:22.04
RUN apt-get update; apt-get install -y wget curl openjdk-8-jdk python3-pip net-tools lsof nano
# Jupyter
RUN pip3 install jupyterlab==4.0.3 pandas==2.1.1 pyspark==3.4.1 cassandra-driver==3.28.0
# SPARK
RUN wget https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz && tar -xf spark-3.4.1-bin-hadoop3.tgz && rm spark-3.4.1-bin-hadoop3.tgz
# CASSANDRA
RUN wget https://dlcdn.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz; tar -xf apache-cassandra-4.1.3-bin.tar.gz; rm apache-cassandra-4.1.3-bin.tar.gz
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
ENV PATH="${PATH}:/apache-cassandra-4.1.3/bin:/spark-3.4.1-bin-hadoop3/bin"
COPY cassandra.sh /cassandra.sh
CMD ["sh", "/cassandra.sh"]
# cassandra.sh
echo "-Xms128M" >> /apache-cassandra-4.1.3/conf/jvm-server.options
echo "-Xmx128M" >> /apache-cassandra-4.1.3/conf/jvm-server.options
sed -i "s/^listen_address:.*/listen_address: "`hostname`"/" /apache-cassandra-4.1.3/conf/cassandra.yaml
sed -i "s/^rpc_address:.*/rpc_address: "`hostname`"/" /apache-cassandra-4.1.3/conf/cassandra.yaml
sed -i "s/- seeds:.*/- seeds: demo-db-1,demo-db-2,demo-db-3/" /apache-cassandra-4.1.3/conf/cassandra.yaml
/apache-cassandra-4.1.3/bin/cassandra -R
sleep infinity
# YML
services:
  db:
    image: cassandra-demo
    deploy:
      replicas: 3
    volumes:
      - "./nb:/nb"
    ports:
      - "127.0.0.1:5000-5002:5000"
docker exec -d <CONTAINER> python3 -m jupyterlab --no-browser --ip=0.0.0.0 --port=5000 --allow-root --NotebookApp.token=''
# ssh tunnel for JupyterLab, e.g. (user/host assumed): ssh USER@HOST -L 127.0.0.1:5000:127.0.0.1:5000
Command Line
docker exec -it demo-db-1 bash
# get into the command line of a node
nodetool status
# check status of Cassandra workers
nodetool ring
# check which token ranges (and hence partitions) belong to which nodes in the cluster
cqlsh demo-db-1
help
# check commands
help describe
# check details of commands
create keyspace banking with replication = {'class': 'SimpleStrategy', 'replication_factor':3} ;
# creating a keyspace
describe keyspaces
# list all keyspaces
describe tables
# show tables under each keyspace
use banking
# working under banking keyspace
describe tables
# expect to see nothing
JupyterNotebook
from cassandra.cluster import Cluster
cluster = Cluster(["demo-db-1", "demo-db-2", "demo-db-3"])
cass = cluster.connect()
# create keyspace from the notebook (fails if banking already exists from the cqlsh step)
cass.execute("create keyspace banking with replication={'class': 'SimpleStrategy', 'replication_factor': 2};")
# create tables under the banking keyspace
cass.execute("use banking")
cass.execute("drop table if exists loans")
cass.execute("""
CREATE TABLE loans (
bank_id INT,
bank_name TEXT STATIC,
loan_id UUID,
amount INT,
state TEXT,
PRIMARY KEY ((bank_id), amount, loan_id)
) WITH CLUSTERING ORDER BY (amount DESC, loan_id ASC)
""")
# see the full definition Cassandra generated for the table; useful to tweak defaults when recreating it
print(cass.execute("describe table loans").one().create_statement)
# INSERT DATA
cass.execute("""
INSERT INTO loans (bank_id, bank_name)
VALUES (544, 'test2')
""")
import pandas as pd
pd.DataFrame(cass.execute("select * from loans"))
# INSERT is really UPSERT (meaning update OR insert)
cass.execute("""
INSERT INTO loans (bank_id, amount, loan_id)
VALUES (544, 300, UUID())
""") # this does not insert, but update the record
cass.execute("""
INSERT INTO loans (bank_id, bank_name, amount, loan_id, state)
VALUES (544, 'mybank', 400, NOW(), 'wi')
""")
# NOW() and UUID() both return UUIDs
# NOW() returns a TIMEUUID (version 1: MAC address, timestamp, sequence number); UUID() returns a random version-4 UUID
cass.execute("""
INSERT INTO loans (bank_id, bank_name, amount, loan_id, state)
VALUES (999, 'uwcu', 500, NOW(), 'il')
""")
# ADD NEW COLUMN
cass.execute("""
CREATE TYPE FullName (first text, last text)
""")
cass.execute("""
alter table loans add (username FullName)
""")
cass.execute("""
INSERT INTO loans (bank_id, bank_name, amount, loan_id, username)
VALUES (999, 'uwcu', 500, NOW(), {first:'Tyler', last:'Caraza-Harter'})
""")
pd.DataFrame(cass.execute("select username, username.first, username.last from loans"))
# Prepared Statements (Like creating a function)
uwcu_insert = cass.prepare("""
INSERT INTO loans (bank_id, bank_name, amount, loan_id, username)
VALUES (999, 'uwcu', ?, NOW(), {first:?, last:?})
""")
# prepared statements expose config attributes (e.g. uwcu_insert.consistency_level)
cass.execute(uwcu_insert, (301, "TestFirst", "TestLast"))
# QUERIES
# GROUP BY
pd.DataFrame(cass.execute("""
SELECT bank_id, bank_name, AVG(amount)
FROM loans
GROUP BY bank_id
"""))
# can only group by partition key (or partition key with some more columns of the primary key)
# Solution: Spark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("cs544")
.config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
.config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
.getOrCreate())
# approach 1
spark.conf.set("spark.sql.catalog.mycat", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.mycat.spark.cassandra.connection.host", "demo-db-1,demo-db-2,demo-db-3")
# approach 2
(spark.read.format("org.apache.spark.sql.cassandra")
    .option("spark.cassandra.connection.host", "demo-db-1,demo-db-2,demo-db-3")
    .option("keyspace", "banking")
    .option("table", "loans")
    .load())
spark.sql("""
SELECT state, AVG(amount)
FROM mycat.banking.loans
GROUP BY state
""").toPandas()
# write out data for data analysts
# TODO: dump to parquet in HDFS, or Hive, or wherever
# spark.sql("""
# SELECT *
# FROM mycat.banking.loans
# """).write.....