Cassandra
Table of Contents
- Table of Contents
- Theory
- Cassandra
- Keyspaces
- Tables
- Partitions
- Node / Nodetool
- Token Ring
- Peer-to-Peer
- Virtual Nodes (VNodes)
- Gossip
- Snitches
- Replication
- Consistency
- Hinted Handoff
- Read Repair
- Write Path
- Read Path
- Compaction
Theory
- Cassandra Strengths
- Scalability
- Linear scale performance
- Reliability
- No single point of failure (Peer-to-Peer leaderless architecture)
- Availability
- Cassandra characteristics
- IDs are created in an independent manner using UUIDs
- (Also, for time-ordered data you can use TIMEUUID)
Cassandra
- Start Cassandra
- Use
cassandra
to start
- Use
- Logs are located in
node/logs/
- e.g.
node/logs/system.log
Keyspaces
- Keyspaces
- Keyspaces are similar to relational “schemas”
- Serve as a namespace (or wrapper) around DB objects
- Create Keyspace
- When creating a Keyspace you must provide replication parameters:
CREATE KEYSPACE <keyspace> WITH REPLICATION = { 'class': '<replication_strategy>', 'replication_factor': <replication_factor> }
- e.g.
CREATE KEYSPACE killrvideo WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }
- Set Keyspace
USE <keyspace>;
- Show Keyspaces
DESCRIBE KEYSPACES;
SELECT * FROM system_schema.keyspaces;
Tables
- Create table
CREATE TABLE videos (
  video_id TIMEUUID,
  added_date TIMESTAMP,
  title TEXT,
  PRIMARY KEY (video_id)
);
- Insert data
INSERT INTO videos (video_id, added_date, title) VALUES (1645ea59-14bd-11e5-a993-8138354b7e31, '2014-01-29', 'Cassandra History');
- Insert data from file
COPY videos (video_id, added_date, title) FROM 'videos.csv' WITH HEADER = TRUE;
- Show info about table
DESCRIBE TABLE my_table;
- Remove everything
TRUNCATE videos
Partitions
- Partition Key
- Groups related data together
- It's responsible for how the data is placed on the Ring
- So, based on the Partition Key, you know where data is located on the Ring
- It's a part of the `PRIMARY KEY`
- Partitioner uses Consistent Hashing algorithm
- So when remapping the hash table, only `number_of_keys / number_of_slots` keys have to be remapped (in contrast to almost all keys in standard hashing)
- Querying for data is `O(1)`
- `PRIMARY KEY`
- The first value in `PRIMARY KEY` is always the Partition Key
- To make the `PRIMARY KEY` unique, an additional ID is added (like an ordinal ID or a UUID)
- You can't change the `PRIMARY KEY` of an existing data model
- There's no `ALTER TABLE` to add a column to the `PRIMARY KEY`
- Clustering Columns
- The first column in `PRIMARY KEY` is the Partition Key, the remaining ones are Clustering Columns
- e.g. `PRIMARY KEY ((state), city, name, manual_unique_id)`
- Data is sorted according to these Clustering Columns
- (And they are sorted during the insert)
- You can invert the sorting using `PRIMARY KEY (...) WITH CLUSTERING ORDER BY (city DESC, name ASC)`
- 🔨 Descending sorting can be helpful for Time Series data, as we're gonna have the latest data right at the top (see the sketch below)
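- A minimal sketch of such a table (the table and column names below are illustrative, not taken from these notes):

```
-- Hypothetical table: rows are grouped by state (Partition Key),
-- then sorted by city (descending) and name within each partition.
-- user_id is the extra ID that keeps the PRIMARY KEY unique.
CREATE TABLE users_by_state (
    state   TEXT,
    city    TEXT,
    name    TEXT,
    user_id UUID,
    PRIMARY KEY ((state), city, name, user_id)
) WITH CLUSTERING ORDER BY (city DESC, name ASC, user_id ASC);
```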
- 🔨 Querying
- Always provide a Partition Key - you ought to use it so the system knows where the data lies. Otherwise you'd end up with a full scan.
- You can use `=`, `<`, `>` on Clustering Columns
- All equality comparisons must come before inequality operators
- Specify comparisons in the exact order as in the `PRIMARY KEY`
- As data is sorted on disk, range searches are a binary search followed by a linear read
- (Example queries below)
ALLOW FILTERING
- Makes Cassandra scan all partitions in the table
- Don’t use it
- Unless you really, really have to do it
- Avoid larger data sets
- But still, pls, don’t use it
- If you need such a query, there’s probably something wrong with your data model
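- A few example queries against the hypothetical `users_by_state` table sketched above, illustrating the rules from this section:

```
-- Partition Key restricted with equality: fine
SELECT * FROM users_by_state WHERE state = 'TX';

-- Equality on a Clustering Column, in PRIMARY KEY order: fine
SELECT * FROM users_by_state WHERE state = 'TX' AND city = 'Austin';

-- Equality comparisons before the inequality: fine
SELECT * FROM users_by_state WHERE state = 'TX' AND city = 'Austin' AND name > 'M';

-- No Partition Key: rejected unless you add ALLOW FILTERING (don't)
-- SELECT * FROM users_by_state WHERE city = 'Austin';
```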
- Show the token for a given partition key
SELECT column, token(partition_key_column), other_column FROM table
Node / Nodetool
- Node performance
- A single Node can typically handle `6000 - 12000 TRX/s/core` and `2 - 4 TB` of data
- Warning: Run Cassandra on local storage or direct attached storage, never run on SAN.
- If your disk has an Ethernet cable, it's probably the wrong choice.
- Single Node can typically handle
- Show wide basic info
nodetool info
- Check DC and Clusters status
nodetool status
- Get stats for a table (e.g. `SSTable` info)
nodetool tablestats keyspace1.standard1
- obsolete version:
nodetool cfstats keyspace1.standard1
- Show Cluster information
- Settings common across all nodes, current schema used by each of them
nodetool describecluster
- Show ring info
- e.g. which Nodes own what Token Ranges
nodetool ring
- Show Logging Levels
nodetool getlogginglevels
- Set Logging Level
nodetool setlogginglevel <logger> <level>
- e.g.
nodetool setlogginglevel org.apache.cassandra TRACE
- e.g.
- Set trace probability
- It's the percentage of queries whose traces are saved
nodetool settraceprobability 0.1
- Show trace
- Look at tables in the `system_traces` keyspace
- Map a partition key value (e.g. a tag) to endpoints/Nodes
nodetool getendpoints <keyspace> <table> <value>
- e.g.
nodetool getendpoints killrvideo videos_by_tag 'datastax'
- Gossip information
nodetool gossipinfo
- Drain
- Stops writes from occurring on the node and flushes all the data to disk.
- Typically it may be run before stopping a Node
nodetool drain
- Stop node execution
nodetool stopdaemon
- Stress test
- Located in the `cassandra/tools/bin` directory
- Example stress test:
cassandra-stress write n=50000 no-warmup -rate threads=1
- Flush
- Use this to force all data written to the MemTable to be persisted to disk
nodetool flush
- A way to rebuild `SSTables`
nodetool upgradesstables (...)
Token Ring
- Token Ring
- Key concept for performance
- Token Ring keeps track of tokens
- Each Node is responsible for a range of data (Token Range)
- Lets you know exactly which Nodes contain which Partitions
- Helps to avoid Single Points of Failure
- Token
partition_key => token
- A Token is a 64-bit integer
- So the Token Range is `(-2^63 .. 2^63 - 1)`
- Partitioner
- Determines how data will be distributed across the ring
- Murmur3 or MD5 provide a good distribution in a random fashion
- Coordinator
- Any Node can be a Coordinator
- Thanks to Token Ring / Token Range, Coordinator knows where to relay data
- Node joining a Cluster
- The New Node tells any existing Node that it wants to join
- Other Nodes calculate (manually or automatically) where New Node will fit in the Ring
- After joining, other Nodes stream data to the New Node (Joining status)
- There's no downtime during joining
Peer-to-Peer
- What are downsides of Leader-Follower design?
- When leader goes down, you have downtime until a new Leader gets elected
- When internode communication goes down, system may become highly inconsistent, as Leaders can’t see Followers and vice versa
- Then new Leaders will probably be elected, and after connections come back up, you may end up with multiple competing Leaders
- Leader-Follower probably involves sharding, in which at least:
- You lose the advantages of `JOIN`, `GROUP BY` and other aggregations
- You have to introduce some way of routing to/from shards
- What makes Peer-to-Peer a great approach?
- There are no Leaders, so there are no election issues
- There is a Coordinator, which takes data and performs writes asynchronously to Replicas
- You don’t have to target any specific Node, you can reach any of them (as all are equal), and that Node will handle the rest
- Split “Failure” Scenario
- Each Node which can be seen by the client is still online
- As long as it's possible to write to the right Replicas, the system is operational
- The Consistency Level determines whether a split is actually a problem for the system or not
Virtual Nodes (VNodes)
- Adding Nodes to or removing Nodes from the system causes it to be unbalanced for a certain amount of time
- It also requires existing Nodes to stream data from/to other Nodes
- VNode
- It’s a Virtual Node - an artificial SubNode within a real Node
- It helps to distribute data more evenly
- Real Node is responsible for multiple VNodes areas instead of just one big Node area
- VNodes automate Token Range assignment
- Having `num_tokens` > 1 in `cassandra.yaml` turns VNodes on (see the sketch below)
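- A minimal `cassandra.yaml` sketch (the value `128` is just an example, not a recommendation from these notes):

```
# cassandra.yaml - enabling VNodes by assigning multiple token ranges per node
num_tokens: 128
```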
- Consistent Hashing
- Idea: Let almost all Objects stay assigned to the same Servers even when the number of Servers changes
- Old Way (Simple Hashing): Having direct reference from Object to the Server. When Server goes down, all references have to be reassigned to another Server
- Implementation: Here both Objects and Servers have their Hashes. Servers cover the full Hash Space - each Server owns some Range of Hashes. After applying the Hash Function to an Object we compare it with the Ranges and thereby find the target Server. When a new Server is added, it takes over only some subset of Ranges, thus only a subset of Objects needs to be rearranged. The same applies when a Server is removed.
- Improvement: To mitigate the case where removing or adding a Server causes (too) many Objects to be moved (as the **whole** Range's inhabitants have to be reassigned), the actual distribution of Node Ranges is "striped" - Nodes don't have one long continuous space, but many small spaces interlaced with other Nodes' spaces. Therefore a continuous space contains Ranges of multiple Nodes, not just one or even two.
- Consistent Hashing, continuation
- It makes distribution of Objects much more balanced
- Adding/removing Node will require synchronization of less data per Node
- Nodes will have smaller overhead
- Synchronization will take less time as it will be more parallelized
- Show ring info
- e.g. which nodes own which token ranges
nodetool ring
Gossip
- Gossip
- Broadcast Protocol for disseminating (spreading) information
- In Cassandra there’s no central cluster information holder, peers spread such info by themselves (automatically)
- Info is spread out in a polynomial fashion
- so the more Nodes know that info, the more Nodes are sharing it with others
- Gossip round is initiated every second
- There are 1-3 Nodes randomly* chosen to Gossip
- Nodes can Gossip to any other Node in the Cluster
- * Actually there are some Node choosing criteria
- e.g. Seed and Downed Nodes are slightly more favored
- * Actually there are some Node choosing criteria
- Nodes don’t save data about which Nodes they Gossiped to
- Only metadata are Gossiped, not any client data
- Endpoint State
- It’s a data structure each Node has, containing:
- Heartbeat State
- `generation` - a timestamp of when the Node bootstrapped
- `version` - a number incremented by the Node every second
- Application State
- Stores metadata of the Node, like:
- `STATUS=NORMAL` - the one we see when executing `nodetool status`
- `DC=west` - data center
- `RACK=rack1` - rack
- `SCHEMA=c2a2b...` - changes as the schema mutates over time
- `LOAD=100.0` - disk space used
- …and some other metadata
- Gossip Message flow
- Node sends info it possesses about Nodes
- Each digest contains `IP address`, `generation`, `version` (Complete Heartbeat)
- The flow is:
- `SYN`
- Sender initiates Gossip by sending `SYN` to Recipient
- Recipient examines the `SYN` and checks whether the values of `generation`/`version` are newer than the current ones
- `ACK`
- Recipient sends `ACK` back to Sender
- For data outdated from the Recipient's perspective, Recipient sends the related Heartbeats
- For data newer from the Recipient's perspective, Recipient sends fresh Endpoint States
- `ACK2`
- Sender sends `ACK2` to Recipient with fresh Endpoint States for the endpoints where Recipient had stale data
- Gossip Message Flow, continuation
- Constant rate of network usage (a trickle)
- Doesn't cause network spikes
- Minimal message size in comparison with "real" table data
- Actually just a lightweight background process
- Gossip information
- Nodetool
nodetool gossipinfo
- You can also get Gossip data with a query like
SELECT peer, data_center, host_id, rack, release_version, rpc_address, schema_version FROM system.peers;
- (And for the local Node's own Gossip info look at the `system.local` table)
Snitches
- Snitches
- Report Node’s Rack and DC
- Determine “topology” of a cluster (which Nodes belong where)
- Snitches, continuation
- Configured via `endpoint_snitch` in `cassandra.yaml` (see the sketch below)
- 🔨 Be sure all Nodes in a Cluster use the same Snitch!
- Also: changing Cluster network topology (and/or Snitches) requires restarting all Nodes
- 🔨 Be aware that Racks and DCs are logical assignments to Cassandra. Please ensure that your logical racks and data centers align with your physical failure zones.
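- A minimal `cassandra.yaml` sketch of where the Snitch is set (the chosen Snitch here is just an example):

```
# cassandra.yaml
endpoint_snitch: GossipingPropertyFileSnitch
```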
- Types of Snitches
- Regular
- e.g. `SimpleSnitch`, `PropertyFileSnitch`, `GossipingPropertyFileSnitch`, `DynamicSnitch`
- Cloud Based
- Snitches for particular cloud environments like EC2, EC2 Multi Region, CloudStack, Google Cloud
- Snitches Comparison
SimpleSnitch
- Default
- Places all Nodes in the same DC and Rack
- Appropriate only for single DC deployments
PropertyFileSnitch
- Reads DC and Rack info from the `cassandra-topology.properties` file
- The file consists of rows shaped like `IP=DC:RAC`
- When using this Snitch you have to keep the files in sync across all Nodes in the Cluster!
- 🔨 GossipingPropertyFileSnitch
- This should be your go-to Snitch for use in production
- Declare each Node's info in its own `cassandra-rackdc.properties` file
- You don't have to cope with manual sync like in the regular `PropertyFileSnitch` - Gossip automatically spreads the info through the Cluster
dc=DC1
rack=RACK1
- Note: Racks can have the same names in different DCs, but they will still be different Racks (see it like a namespace, e.g. `DC1.RACK1` and `DC2.RACK1` are separate ones)
RackInferringSnitch
- Infers DC and Rack from IP
- Be sure to read the docs about it, as it's a simple yet tricky solution!
- 🔨 DynamicSnitch
- Layered on top of current Snitch (wrapper)
- Turned on by default for all Snitches
- Monitors Cluster health and performance
- Determines which Node to query so it will be the most efficient choice
- Thanks to that e.g. sluggish Nodes aren’t overwhelmed and traffic goes to most performant ones at a given time
Replication
- Replication
- With `RF=1` there's only one "instance" of the data
- So if the Node is unavailable, this data's availability is equal to 0%
- With `RF=2` Nodes keep their own data and their neighbors' data as well
- 🔨 `RF=3` seems to be a really decent (even preferable) factor in general
- It's just a sweet spot
- Any Node has data from both of its neighbors
- The scenario where three Servers go down together has a pretty low probability
- Multi DC Replication
- In Multi DC, the Coordinator replicates data just like in a single DC, but of course according to each DC's RF setting
- In the other DC a Local Coordinator takes over and does the same there
- RF can be different for different DCs
- 🔨 You can e.g. disable replication to a DC when, due to law, data must not leave the country (see the sketch below)
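- A sketch of a multi-DC Keyspace (the keyspace name and the DC names `west`/`east` are illustrative); a DC simply left out of the map gets no replicas, which is one way to keep data inside a country:

```
CREATE KEYSPACE killrvideo_multidc WITH REPLICATION = {
  'class': 'NetworkTopologyStrategy',
  'west': 3,
  'east': 3
};
```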
Consistency
- Consistency Levels - CL
- `ONE` - just one available Node is sufficient for data to be processed successfully
- `QUORUM` (MAJORITY) - it's a reasonable sweet spot
- `ALL` - here you just remove Partition Tolerance from your system (Total Consistency)
- And many, many others.. (But actually only for sophisticated purposes)
- And don't use `ANY`
- Strong Consistency
- It can be described as
- "When I try to read data that I've just written, I'll always receive exactly that data and never anything stale"
- You can have e.g. `Write CL = ALL; Read CL = ONE` and have Strong Consistency (but this example is rather not a great choice..)
- The reasonable way is to have both `Write CL = Read CL = QUORUM`. You get Strong Consistency here without having to Write/Read on all Nodes!
- Consistency Levels - Comparison
ONE
- It can be used for eventually consistent data
- For example: log data, IoT data
- And it's just a tradeoff: a faster system but with slower (eventual) consistency
- However, "slower" can even be in the range of millis (within a DC)
LOCAL_QUORUM
- In Multi DC, standard `QUORUM` would mean that confirmations would have to come from Nodes across all DCs. Do expect high latency (at least).
- Local Quorum handles this by limiting the Quorum to a given DC only
- Check current CL
CONSISTENCY;
- Set CL
CONSISTENCY <level>;
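- Example cqlsh session (assuming the `killrvideo.videos` table from earlier); with `QUORUM` on both writes and reads you get Strong Consistency:

```
CONSISTENCY QUORUM;
INSERT INTO killrvideo.videos (video_id, added_date, title)
VALUES (now(), '2015-06-01', 'Consistency Demo');
SELECT * FROM killrvideo.videos;
```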
Hinted Handoff
- Hinted Handoff
- Hinted Handoff allows the Cluster to handle writes even when Nodes are down
- In short, Coordinator holds data and waits for Node to become available
- Check the scenario:
- A Write request arrives into a Cluster with `RF=3`
- The Coordinator sends the data to the Replicas, however one Replica is down
- So the Coordinator stores the data to send it when the Node eventually comes back up
- Hints
- Hints are stored in the directory specified in `cassandra.yaml`
- Hints can be disabled
- Hints do not fulfill RF/CL settings
- e.g. when you execute a Read Query while all relevant Nodes are down (and the Write Data is stored merely as Hints), that Query will fail
- The default time a Hint is stored for is `3 hours` (see the `cassandra.yaml` sketch below)
- After that time a given write is just lost
- Also, if the Coordinator dies during this time, the data will be lost as well
- (That's why `CL=ANY` is quite unreasonable)
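- The related `cassandra.yaml` knobs (values shown are the usual defaults, quoted here only as an illustration):

```
# cassandra.yaml
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000   # 3 hours - how long hints are kept for a down node
hints_directory: /var/lib/cassandra/hints   # e.g. node/data/hints in a tarball install
```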
- Show saved Hints
ls -l node/data/hints
Read Repair
- Read Data Flow for `CL=ALL`
- Coordinator gets a single instance of actual data and checksums
- The most performant Node is asked for data, other Nodes are asked just for checksums
- Checksums are compared
- When checksums don’t match, Read Repair takes place
- Coordinator asks Nodes for real data
- The Coordinator compares the real data and chooses the data instance with the highest timestamp value
- The Coordinator sends this data instance to all Nodes having outdated data
- At the same time the result is returned to the client
- Read Repair
- Proactive activity improving Consistency
- Read Repair happens only when `Read CL` < `ALL`
- (In other words: if there’s a possibility of getting out of sync, Cassandra can proactively try to fix potentially existing inconsistencies)
- Process happens asynchronously in the background
- Note: Even if Read Repair repairs a given portion of data, the client will receive the stale data, as the process happens at the same time the data is returned
- By default `dclocal_read_repair_chance` (Read Repair taking place in the same DC as the Coordinator) is set to `0.1` (10%)
- By default `read_repair_chance` (Read Repair taking place across all DCs) is set to `0`
- (An example of tuning these table options is sketched below)
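- A sketch of tuning these per table (Cassandra 3.x table options; the `killrvideo.videos` table is just the running example):

```
ALTER TABLE killrvideo.videos
  WITH dclocal_read_repair_chance = 0.1
  AND read_repair_chance = 0.0;
```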
- Full Repair
- You can use `nodetool` to manually execute a Full Repair (see the sketch below)
- But beware! It's an extremely resource intensive operation and can even take down your entire Cluster!
- Treat it as a last line of defense
- Full Repair synchronizes all data in the Cluster
- If you do run it, then only in Clusters servicing high volumes of writes/deletes
- Or to synchronize a failed Node coming back online
- Or run it on Nodes that aren't read very often
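- A sketch of running it manually; scoping it to a single keyspace (here the example `killrvideo`) limits the impact:

```
nodetool repair killrvideo
```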
- Data is stored in `/data/data/<keyspace>/`
Write Path
- When data arrives at a Node it goes into two places
- `MemTable` in RAM
- `MemTable` is always (so at insert time) ordered by Partition Key and then by Clustering Columns
- `MemTable` is for fast serving of data to the user
- `CommitLog` on HDD
- Located in `node/data/commitlog`
- `CommitLog` is stored sequentially - every new record is appended at the very end
- `CommitLog` is for safety purposes - in case of Node failure, not to lose data present in memory
- Node can perform event sourcing using the `CommitLog` (e.g. after a sudden crash)
- After saving data in both the `MemTable` and the `CommitLog`, Cassandra responds to the Client that the Write was successful
- When flush time comes (e.g. when the `MemTable` gets full), data is persisted to disk
- Also the `CommitLog` is erased, because it's no longer needed
- Data is persisted in an `SSTable` (Sorted String Table)
- `SSTable` is an immutable data structure
- Data in an `SSTable` is always sorted by Partition Key and Clustering Columns
- Partitions in an `SSTable` are likely to have different sizes
- As time goes by and more data is added, more `SSTables` appear
- 🔨 It is recommended to store the `CommitLog` on a different physical disk than the `SSTables` (see the sketch below)
- `CommitLog` is just sequential writes - append, append, append..
- `SSTable` access is random
- So it is a good idea to split them - and then even on an HDD seq-read will be surprisingly fast
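- The split is configured in `cassandra.yaml` (the paths are illustrative; point them at different physical disks):

```
# cassandra.yaml
data_file_directories:
    - /disk1/cassandra/data
commitlog_directory: /disk2/cassandra/commitlog
```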
- Watching the `CommitLog`
- To see the contents of the `CommitLog` live, you can use a bash command like:
watch -n 1 -d "ls -lh /data/commitlog"
Read Path
- A Read requires taking data from both the `MemTable` and `SSTables`
- Reading from the `MemTable`
- Very fast, as it lies in Memory
- Also, `MemTables` are organized by Partition Key (hash value / token), so finding data has just the complexity of a binary search
- Reading from an `SSTable`
- Somewhat slower, as it's on disk, not RAM
- Notice that Partitions don't have the same size/length
- Therefore we have file offsets for each given Partition
- Such offsets lie in the Partition Index file
- Partition Index
- It’s an offset index
- Thanks to it, Cassandra doesn't have to perform any search to reach the data, as the exact location is pointed to by the Partition Index
- You can look at it like: `token => partition_byte_offset`
- `SSTables` can contain many Partitions, which can lead to a significant size of the Partition Index
- Also, it's stored on Disk, so it's not as fast as RAM
- Partition Summary
- As Partition Index can grow large, there’s another index - Partition Summary
- Partition Summary is located in RAM
- It samples every N keys
- Provides an estimate of the offset within the Partition Index: `token => index_offset`
- It means: for a given token, jump to a specific area of the Partition Index, where you will likely find that token and therefore be able to jump to the Data Partition offset (where it really lies on disk)
- Key Cache
- Located in RAM
- It can be manually enabled (see the sketch below)
- `token => partition_byte_offset`
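- A sketch of enabling key caching per table via the `caching` table option (the `killrvideo.videos` table is just the running example):

```
ALTER TABLE killrvideo.videos
  WITH caching = { 'keys': 'ALL', 'rows_per_partition': 'NONE' };
```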
- Bloom Filter
- It's about existence, not location of data
- `token => yes / no / maybe`
- Sometimes a False Read can happen - when Bloom says token/data is there, but it isn’t (false positive)
- Bloom Filter can be tuned for better results
ALTER TABLE <table> WITH bloom_filter_fp_chance = <value>;
- The lower the `value`, the lower the false-positive chance, and the bigger the disk usage
- The whole flow
- input -> Bloom Filter -> Key Cache -> Partition Summary -> Partition Index -> `SSTable` -> data
Compaction
- Compaction is
- Removing stale data from persisted `SSTables`
- Combining multiple `SSTables` into one `SSTable`
- Remember `SSTables` are immutable - nothing is modified, a brand new `SSTable` is created
- Compaction is based on timestamps
- the latest record wins
- `gc_grace_seconds`
- the time a tombstone entry is kept before it gets evicted
- prevents resurrection of data in case of repair
- defaults to `10 days`
- Compaction Strategies
- Performing one big ultimate all-round Compaction would be overkill
- Therefore there are particular strategies designed to optimize this process
- Compaction Strategies, continued
SizeTiered Compaction
- Default
- Triggers when there are multiple `SSTables` of a similar size
- Fine for write-heavy workloads
Leveled Compaction
- Groups `SSTables` into Levels
- each Level having a fixed size limit, 10x larger than the previous Level
- Fine for read-heavy workloads
TimeWindow Compaction
- Creates time-windowed buckets of `SSTables`
- which are compacted (with each other) using SizeTiered Compaction
- Fine for data coming in sequentially by time
- (An example of setting a strategy per table is sketched below)
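- A sketch of choosing a strategy (and `gc_grace_seconds`) per table, again using the example `killrvideo.videos` table:

```
ALTER TABLE killrvideo.videos
  WITH compaction = { 'class': 'LeveledCompactionStrategy' }
  AND gc_grace_seconds = 864000;   -- 10 days (the default)
```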