简介
什么是跳跃表?
所谓跳跃表,就是在普通链表的基础上增加了多层索引链表,这样在查找时就可以通过在上下不同层级的索引链表间跳跃,以达到快速查找的目的。当然,我这样说可能比较抽象,下面我用一张图来简单解释一下。
从上面这张图可以看出,跳跃表有三种不同的对象。
i)以蓝色表示的数据链表
图中最下层是真正存储数据的数据链表,其中每个Node节点
都有三个字段:
- key:键值对的KEY
- value:键值对的VALUE
- next:指向下一个数据节点
此外,数据链表还有一个特点,那就是它是顺序存储的,通过插入数据时处理实现。
ii)以橙色表示的索引链表
通过观察图中的索引链表,我们可以发现它有一个特点,那就是它是分层级的,而且同时指向它的下层索引节点以及同层级的右侧的索引节点。因此,每个Index节点
也有三个字段:
- node:同一列的
Index节点
都指向最下层的同一个Node节点
,目的是可以随时通过索引节点判断出它对应的数据节点是哪个 - down:指向下一层的索引节点
- right:指向同一层的右侧的索引节点
iii)以紫色表示的头部索引链表
仔细观察上面的示意图可以发现,头部索引链表本质上也是索引链表,只是在普通的索引链表的基础上新增了一个表示当前横向链表层级的字段。因此,每个HeadIndex节点
就需要用四个字段来表示:
- level:当前索引链表的层级,从 1 开始计数
- node:指向左下角的基础Node节点,这个Node节点只是起到一个标志性作用,因此没有KEY
- down:指向下一层的头部索引节点
- right:指向同一层的右侧的索引节点
跳跃表的查找
还是用上面那个示意图举例,我们来看两个典型的查找示例。
i)查找数据节点D:
很明显,先从左上角的HeadIndex节点
开始查找。
- 第三层右侧的索引节点是
Index[G]
,其数据节点大于D,所以不能移动到右侧的Index[G]
; - 下降到第二层的
HeadIndex
节点继续向右查找,Index[B]
的数据节点的KEY是B,小于D,因此移动到第二层的Index[B]
; - 再次将第二层的
Index[B]
跟右侧的Index[G]
比较,发现不能继续向右移动; - 下降到第一层的
Index[B]
,然后跟右侧的Index[F]
比较,发现不能继续向右移动; - 紧接着,下面已经没有其他索引链表,因此在数据链表上继续向右查找D,最后查得返回结果。
ii)查找数据节点H:
看完上面的例子后,相信你应该已经明白,在跳跃表中查找数据一般需要以下几个步骤:
- 从左上角的
HeadIndex节点
开始查找; - 然后先在横向链表中向右查找小于目标节点的最大值的索引节点(PS:如果这个索引节点对应的数据节点就是我们查找的那个节点,则直接返回);
- 如果执行完上面步骤后的那个索引节点还存在更底层的索引节点,那么继续向下移动一步;
- 重复上面的2、3步骤,直到下面已经没有更底层的索引节点;
- 最后在数据链表上继续向右查找目标节点,并返回最终结果。
根据上面叙述的一般性步骤,下面我们通过一张示意图再来看看查找H需要什么步骤。
(3)跳跃表的插入
在上面的示意图可以看出,跳跃表主要分为两个部分:数据链表和索引链表。因此,在插入数据的时候首先需要做的是将数据插入到数据链表中,然后再给新插入的数据设置一列合理的索引。
我们这里还是以上面那个图为例,假设节点B是新插入到跳跃表的节点,我们来看一下整个插入过程需要做哪些步骤。
i)将数据插入到数据链表:
因为跳跃表是在普通链表的基础上增加了多个层级的索引链表,以实现快速查找的目的,所以它的核心功能仍然有一个数据链表来存储数据,因此我们需要做的第一步就是将数据插入到数据链表合适的位置。
有了上面查询流程的介绍,这个步骤的实现就很简单了,总共需要以下两步来实现:
- 查找比待插入节点小的最大数据节点;
- 将新节点插入到数据链表。
ii)建立随机层级的合理的索引:
在将数据插入到数据链表后,我们还需要做的是为这个新的数据节点建立合适的索引。这一个步骤没有标准答案,只需要为新节点建立随机层级的索引节点就可以了,当然最好还可以做到均匀分布。
在Java8
的ConcurrentSkipListMap
源码中,它设置的策略是:
- 生成一个随机整数(可能是整数,也可能是负数);
- 判断这个随机数的二进制表示的最高位和最低位是否同时为1,如果都为1,则不需要为该节点建立索引,否则走下面建立索引节点的流程(00、01、10、11,因此有3/4的概率会创建索引);
- 设置默认待建立的索引的层级是1,然后判断上面这个随机数的二进制表示从右数第二位起一共有几个1,有几个1前面的默认层级(1)就自增几次作为最终的新节点的索引层级。当然,如果这个数字太大,超过了原来索引链表的最大层级,则将新节点的索引层级设置为原来索引链表的最大层级再加一;
- 循环初始化新节点新增的那一列的索引节点,这个步骤结束。
iii)将新建立的索引节点关联到原有的索引链表中:
在上面那个步骤完成索引节点的初始化后,这一步需要做的是将各层级的索引节点关联到原来的横向索引链表中。因此可以通过以下四步来实现:
- 从左上角的
HeadIndex
节点开始查找; - 根据level和新节点的KEY找到新节点的最顶层的索引节点(PS:上面步骤完成后得到的就是这个最顶层的索引节点)的左边的那个索引节点,然后在这一层的横向索引链表中加入新节点的最顶层的索引节点;
- 继续将新节点的最顶层的索引节点的底层的索引节点插入到它所在的横向链表中。同样先是根据当前level和新节点的KEY查到其左边的那个索引节点,然后将新节点的这一层的索引节点加入到横向索引链表中;
- 重复上面的2、3步骤,直到将新节点的所有索引节点都关联到原来的横向索引链表中。
(4)跳跃表的删除
跳跃表的删除实际上就是插入过程的逆向操作,因此理解了上面是怎么插入的,自然就知道如何删除了。当然,主要步骤也是三个:
- 查找到需要删除的那个数据节点;
- 将数据节点从数据链表上移除;
- 循环删除这个节点的所有层级的索引节点,并重新关联索引节点。
跳跃表的Java实现
在上面,我花了很大篇幅介绍跳跃表的基本操作步骤,下面我将给出一份用Java实现的跳跃表,详细逻辑可以参考上述说明以及代码中的注释。
package com.zrkworld.skiplist;
import com.zrkworld.hashtable.Map;
import java.text.MessageFormat;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
public class SkipListMap <K extends Comparable, V> implements Map<K, V> {
/**
* 单个节点信息
*/
static class Node<K extends Comparable, V> {
/**
* key
*/
final K key;
/**
* value
*/
volatile Object value;
/**
* 下一个节点
*/
volatile Node<K, V> next;
public Node(K key, V value) {
this.key = key;
this.value = value;
this.next = null;
}
public Node(K key, Object value, Node<K, V> next) {
this.key = key;
this.value = value;
this.next = next;
}
/**
* 返回是否是“链表最左下角的 BASE_HEADER 节点”
*/
boolean isBaseHeader(){
return value == BASE_HEADER;
}
/**
* 返回是否是数据节点
*/
V getValidValue() {
Object v = value;
if (v == this || this.isBaseHeader()) {
return null;
}
return (V)v;
}
/**
* 跟另一个节点比较KEY的大小
* @param o 另一个节点
* @return int
*/
int compareKeyTo(Node<K, V> o){
return this.key.compareTo(o.key);
}
/**
* 当前节点KEY跟另一个节点的KEY比较
* @param key 另一个节点的KEY
* @return int
*/
int compareKeyTo(K key){
return this.key.compareTo(key);
}
@Override
public final String toString() {
return key + "=" + value;
}
@Override
public final boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Node<?, ?> node = (Node<?, ?>) o;
return SkipListMap.equals(key, node.key) &&
SkipListMap.equals(value, node.value);
}
/**
* 取 key 和 value 的 hashCode 的异或
*/
@Override
public final int hashCode() {
return SkipListMap.hashCode(key) ^ SkipListMap.hashCode(value);
}
}
/**
* 索引节点
*/
static class Index<K extends Comparable, V> {
/**
* 索引指向的最底层的数据节点
*/
final Node<K, V> node;
/**
* 下边的索引节点
*/
final Index<K, V> down;
/**
* 右边的索引节点
*/
volatile Index<K, V> right;
public Index(Node<K, V> node, Index<K, V> down) {
this(node, down, null);
}
public Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
this.node = node;
this.down = down;
this.right = right;
}
}
/**
* 头部索引节点
*/
static class HeadIndex<K extends Comparable, V> extends Index<K, V>{
/**
* 用于标识当前索引的层级
*/
int level;
public HeadIndex(Node<K, V> node, Index<K, V> down, Index<K, V> right, int level) {
super(node, down, right);
this.level = level;
}
}
/* ---------------- Fields -------------- */
/**
* 初始首节点
*/
private static final Object BASE_HEADER = new Object();
/**
* 默认第一层的level
*/
public static final int DEFAULT_LEVEL = 1;
/**
* 整个跳跃表的最顶层的头节点
*/
private transient volatile HeadIndex<K, V> head;
/**
* 索引的level
*/
private volatile int level;
public SkipListMap() {
this.initialize();
}
public static int hashCode(Object o) {
return o != null ? o.hashCode() : 0;
}
public static boolean equals(Object a, Object b) {
return (a == b) || (a != null && a.equals(b));
}
@Override
public int size() {
int count = 0;
for(Node<K, V> temp = this.findFirst(); temp != null; temp = temp.next){
if(temp.getValidValue() != null){
count++;
}
}
return count;
}
@Override
public boolean isEmpty() {
return this.findFirst() == null;
}
@Override
public boolean containsKey(K key) {
return this.getNode(key) != null;
}
@Override
public V get(K key) {
Node<K, V> temp;
return (temp = this.getNode(key)) != null ? (V) temp.value : null;
}
@Override
public void put(K key, V value) {
this.putVal(key, value);
}
@Override
public void remove(K key) {
this.removeVal(key);
}
@Override
public void clear() {
this.initialize();
}
@Override
public void forEach(BiConsumer<? super K, ? super V> action) {
}
/**
* 格式化遍历输出
*/
public void forEach() {
HeadIndex<K, V> headIdx = this.head;
//1. 获取索引节点的数组
Index<?, ?>[] idxArr = new Index<?, ?>[this.level];
while (headIdx != null){
idxArr[headIdx.level-1] = headIdx.right;
headIdx = (HeadIndex<K, V>) headIdx.down;
}
//2. 获取第一个数据节点
Node<K, V> node = this.findFirst();
//3. 遍历索引节点
for(int i = this.level; i > 0; i--){
System.out.print(MessageFormat.format("HeadIdx[level={0}]", i));
if(idxArr.length == this.level){
Index<?, ?> idx = idxArr[i - 1];
Node<K, V> tmpNode = node;
//以数据链表为基准遍历索引列表,如果某个数据节点不存在索引则用占位符替代
while (tmpNode != null){
if(idx != null && tmpNode.equals(idx.node)){
System.out.print(MessageFormat.format(", Idx[key={0}]", tmpNode.key));
idx = idx.right;
}else{
System.out.print(", Idx[null]");
}
tmpNode = tmpNode.next;
}
}
System.out.print("\n");
}
//4. 遍历数据节点
System.out.print("HeadNode[BASE_HEADER]");
while (node != null){
System.out.print(MessageFormat.format(", Node[key={0}, value={1}]", node.key, node.value));
node = node.next;
}
System.out.print("\n");
}
/**
* 初始化
*/
private void initialize() {
level = DEFAULT_LEVEL;
head = new HeadIndex<K, V>(new Node<K, V>(null, BASE_HEADER, null),
null, null, DEFAULT_LEVEL);
}
/**
* 获取第一个数据节点
*/
private Node<K, V> findFirst(){
Node<K, V> preNode = this.head.node;
//数据部分的节点
Node<K, V> node = preNode.next;
if(node != null){
return node;
}else{
return null;
}
}
/**
* 移除某个键值对
*/
private void removeVal(K key){
//1. 删除数据节点
//1.1 获取key的前置数据节点
Node<K, V> preNode = this.findPreNode(key);
//下一个数据节点
Node<K, V> node = preNode.next;
//1.2 判断是否是我们查找的那个节点
if(node == null || node.key.compareTo(key) != 0){
return;
}
//1.3 将数据节点从链表上移除
preNode.next = node.next;
//2. 删除索引节点
Index<K, V> preLevelIdx = this.findDownPreIdx(this.head, node);
if(preLevelIdx != null){
//2.1 获取目标节点的索引节点
Index<K, V> levelIdx = preLevelIdx.right;
//2.2 重新关联索引链表
preLevelIdx.right = levelIdx.right;
//2.3 继续删除下层的索引节点
while (preLevelIdx != null){
preLevelIdx = this.findDownPreIdx(preLevelIdx, node);
if (preLevelIdx != null) {
levelIdx = preLevelIdx.right;
preLevelIdx.right = levelIdx.right;
}
}
}
}
/**
* 根据 key 查找对应数据节点
*/
private Node<K, V> getNode(K key){
//1. 获取key的前置数据节点
Node<K, V> preNode = this.findPreNode(key);
//下一个数据节点
Node<K, V> node = preNode.next;
//2. 判断是否是我们查找的那个节点
if(node != null && node.key.compareTo(key) == 0){
return node;
}else{
return null;
}
}
/**
* 存储某个键值对
*/
private void putVal(K key, V value){
//1. 获取key的前置数据节点
Node<K, V> preNode = this.findPreNode(key);
//获取后置节点,作为新节点的后置节点
Node<K, V> nextNode = preNode.next;
//如果发现重复节点,则直接替换值并返回
if(nextNode != null && nextNode.compareKeyTo(key) == 0){
nextNode.value = value;
preNode.next = nextNode;
return;
}
//2. 创建新的数据节点
Node<K, V> newNode = new Node<>(key, value);
//3. 将新节点挂载到链表
newNode.next = nextNode;
preNode.next = newNode;
//4. 设置新节点的索引
this.createNodeIndex(newNode);
}
/**
* 设置新插入数据节点的索引情况
* @param newNode 新插入到链表的数据节点
*/
private void createNodeIndex(Node<K, V> newNode){
//1. 生成一个随机整数
int rnd = ThreadLocalRandom.current().nextInt();
//2. 如果最高位和最低位都为1,则直接插入链表(1/4的概率),其他的需要创建索引节点
if((rnd & 0x80000001) == 0){
HeadIndex<K, V> headIdx = this.head;
Index<K, V> idx = null;
//索引节点的层级
int level = 1, maxLevel = headIdx.level;
//2.1 判断应该建立几层的索引节点(从右边第2为开始计算连续为1的个数)
while (((rnd = rnd >>> 1) & 1) == 1){
level++;
}
//2.2 创建对应的索引节点
//如果计算出来的层级没有超过原来最大的层级,则直接循环建立索引节点,否则需要在顶层再新建一层
if(level <= maxLevel){
for(int i = 1; i <= level; i++){
idx = new Index<>(newNode, idx);
}
}else{
//在顶层再新建一层索引节点
level = maxLevel + 1;
this.level = level;
//2.2.1 创建索引节点
for(int i = 1; i <= level; i++){
idx = new Index<>(newNode, idx);
}
//2.2.2 重新设置各层级的头索引结点
HeadIndex<K, V> newHeadIdx = new HeadIndex<>(headIdx.node, headIdx, null, level);
HeadIndex<K, V> levelHeadIdx = newHeadIdx;
for(int i = level; i >= 1; i--){
levelHeadIdx.level = i;
levelHeadIdx = (HeadIndex<K, V>) levelHeadIdx.down;
}
//创建新的最顶层的头节点
this.head = newHeadIdx;
headIdx = newHeadIdx;
}
//2.3 将新创建的那一列索引节点跟原来的关联起来
Index<K, V> levelIdx = idx;
Index<K, V> preLevelIdx = this.findDownPreIdx(headIdx, headIdx.level, level, idx.node);
//横向关联索引节点
levelIdx.right = preLevelIdx.right;
preLevelIdx.right = levelIdx;
for(int i = level; i > 1; i--){
levelIdx = levelIdx.down;
preLevelIdx = this.findDownPreIdx(preLevelIdx, i, (i - 1), levelIdx.node);
//横向关联索引节点
levelIdx.right = preLevelIdx.right;
preLevelIdx.right = levelIdx;
}
}
}
/**
* 查找底层的指定索引节点的前置索引节点,没有则返回空
* @param current 当前索引位置
* @param expectedNode 目标节点
* @return com.zrkworld.skiplist.SkipListMap.Index<K, V>
*/
private Index<K, V> findDownPreIdx(Index<K, V> current, Node<K, V> expectedNode){
Index<K, V> currentIdx = current;
//右边索引节点
Index<K, V> rightIdx;
//右边数据节点
Node<K, V> rightNode;
//不断向右和向下搜索指定层级的前置索引节点
while (currentIdx != null){
//1. 将索引向右移动
while (true){
rightIdx = currentIdx.right;
if(rightIdx == null){
break;
}
rightNode = rightIdx.node;
//如果右边索引节点还在目标节点的左边,则将索引向右移动
if(rightNode.compareKeyTo(expectedNode) < 0){
currentIdx = currentIdx.right;
}else if(rightNode.compareKeyTo(expectedNode) == 0){
return currentIdx;
}else{
break;
}
}
//2. 将索引向下移动一步
currentIdx = currentIdx.down;
}
return null;
}
/**
* 查找底层的指定层级的前置索引节点
* @param current 当前索引位置
* @param currentLevel 当前层级
* @param expectedLevel 待查找的层级
* @param expectedNode 目标节点
* @return com.zrkworld.skiplist.SkipListMap.Index<K, V>
*/
private Index<K, V> findDownPreIdx(Index<K, V> current, int currentLevel,
int expectedLevel, Node<K, V> expectedNode){
Index<K, V> currentIdx = current;
//右边索引节点
Index<K, V> rightIdx;
//右边数据节点
Node<K, V> rightNode;
//不断向右和向下搜索指定层级的前置索引节点
while (currentLevel >= 1){
//1. 将索引向右移动
while (true){
rightIdx = currentIdx.right;
if(rightIdx == null){
break;
}
rightNode = rightIdx.node;
//如果右边索引节点还在目标节点的左边,则将索引向右移动
if(rightNode.compareKeyTo(expectedNode) < 0){
currentIdx = currentIdx.right;
}else{
break;
}
}
//2. 将索引向下移动一步
if(currentLevel > expectedLevel){
currentLevel = (currentLevel -1);
currentIdx = currentIdx.down;
}else{
break;
}
}
return currentIdx;
}
/**
* 查找指定KEY的前置
* @param key KEY
* @return com.zrkworld.skiplist.SkipListMap.Node<K,V>
*/
private Node<K, V> findPreNode(K key){
Index<K, V> currentIdx = head;
//下边索引节点
Index<K, V> downIdx;
//右边索引节点
Index<K, V> rightIdx;
//右边数据节点
Node<K, V> rightNode;
//不断向右和向下搜索指定KEY的前置数据节点
while (true){
//1. 将索引向右移动
while (true){
rightIdx = currentIdx.right;
if(rightIdx == null){
break;
}
rightNode = rightIdx.node;
//如果右边索引节点还在目标节点的左边,则将索引向右移动
if(rightNode.compareKeyTo(key) < 0){
currentIdx = currentIdx.right;
}else{
break;
}
}
downIdx = currentIdx.down;
//2. 如果下边索引节点不为空,则将索引向下移动一步
if(downIdx != null){
currentIdx = downIdx;
}else{
break;
}
}
//3. 在数据链表上继续向右移动
Node<K, V> idxNode = currentIdx.node;
while (true){
rightNode = idxNode.next;
//如果右边数据节点还在目标节点的左边,则将索引向右移动
if(rightNode == null || rightNode.compareKeyTo(key) >= 0){
break;
}else{
idxNode = idxNode.next;
}
}
return idxNode;
}
}
测试用例:
@Test
public void test(){
SkipListMap<String,Integer > skipListMap = new SkipListMap<>();
//1.1 插入
skipListMap.put(“A”, 1);
skipListMap.put(“B”, 2);
skipListMap.put(“J”, 10);
skipListMap.put(“D”, 4);
skipListMap.put(“C”, 3);
skipListMap.put(“E”, 5);
skipListMap.put(“I”, 9);
skipListMap.put(“F”, 6);
skipListMap.put(“H”, 8);
skipListMap.put(“G”, 7);
//1.2 遍历
skipListMap.forEach();
System.out.println("--------------------------------------------------");
//2.1 查找
System.out.println("KEY=D,VALUE=" + skipListMap.get("D"));
System.out.println("KEY=H,VALUE=" + skipListMap.get("H"));
//2.2 遍历
skipListMap.forEach();
System.out.println("--------------------------------------------------");
//3. 删除
skipListMap.remove("B");
skipListMap.remove("C");
skipListMap.remove("G");
//3.2 遍历
skipListMap.forEach();
System.out.println("--------------------------------------------------");
123456789101112131415161718
}
测试用例输出如下:
HeadIdx[level=3], Idx[null], Idx[null], Idx[null], Idx[null], Idx[key=E], Idx[null], Idx[key=G], Idx[null], Idx[null], Idx[null]
HeadIdx[level=2], Idx[key=A], Idx[null], Idx[key=C], Idx[null], Idx[key=E], Idx[null], Idx[key=G], Idx[null], Idx[null], Idx[null]
HeadIdx[level=1], Idx[key=A], Idx[null], Idx[key=C], Idx[null], Idx[key=E], Idx[null], Idx[key=G], Idx[null], Idx[null], Idx[null]
HeadNode[BASE_HEADER], Node[key=A, value=1], Node[key=B, value=2], Node[key=C, value=3], Node[key=D, value=4], Node[key=E, value=5], Node[key=F, value=6], Node[key=G, value=7], Node[key=H, value=8], Node[key=I, value=9], Node[key=J, value=10]
--------------------------------------------------
KEY=D,VALUE=4
KEY=H,VALUE=8
HeadIdx[level=3], Idx[null], Idx[null], Idx[null], Idx[null], Idx[key=E], Idx[null], Idx[key=G], Idx[null], Idx[null], Idx[null]
HeadIdx[level=2], Idx[key=A], Idx[null], Idx[key=C], Idx[null], Idx[key=E], Idx[null], Idx[key=G], Idx[null], Idx[null], Idx[null]
HeadIdx[level=1], Idx[key=A], Idx[null], Idx[key=C], Idx[null], Idx[key=E], Idx[null], Idx[key=G], Idx[null], Idx[null], Idx[null]
HeadNode[BASE_HEADER], Node[key=A, value=1], Node[key=B, value=2], Node[key=C, value=3], Node[key=D, value=4], Node[key=E, value=5], Node[key=F, value=6], Node[key=G, value=7], Node[key=H, value=8], Node[key=I, value=9], Node[key=J, value=10]
--------------------------------------------------
HeadIdx[level=3], Idx[null], Idx[null], Idx[key=E], Idx[null], Idx[null], Idx[null], Idx[null]
HeadIdx[level=2], Idx[key=A], Idx[null], Idx[key=E], Idx[null], Idx[null], Idx[null], Idx[null]
HeadIdx[level=1], Idx[key=A], Idx[null], Idx[key=E], Idx[null], Idx[null], Idx[null], Idx[null]
HeadNode[BASE_HEADER], Node[key=A, value=1], Node[key=D, value=4], Node[key=E, value=5], Node[key=F, value=6], Node[key=H, value=8], Node[key=I, value=9], Node[key=J, value=10]
--------------------------------------------------
1234567891011121314151617
注:跳跃表的输出结果是不确定的,这里只展示了其中一种输出情况,仅供参考。