Fork me on GitHub
Blog


  • 首页

  • 标签

  • 分类

  • 归档

  • 关于

【redis】redis Sentinel 原理

发表于 2018-09-15 | 分类于 redis

[TOC]

redis Sentinel 原理

1. 定时监控任务

一套合理的监控机制是Sentinel节点判定节点不可达的重要保证, Redis Sentinel通过三个定时监控任务完成对各个节点发现和监控:

  1. 每隔10秒,每个Sentinel节点会向主节点和从节点发送info命令获取 最新的拓扑结构

定时任务作用:

  • 通过向主节点执行info命令, 获取从节点的信息, 这也是为什么 Sentinel节点不需要显式配置监控从节点。
  • 当有新的从节点加入时都可以立刻感知出来。
  • 节点不可达或者故障转移后, 可以通过info命令实时更新节点拓扑信 息
  1. 每隔2秒,每个Sentinel节点会向Redis数据节点的__sentinel__:hello 频道上发送该Sentinel节点对于主节点的判断以及当前Sentinel节点的信息 ,同时每个Sentinel节点也会订阅该频道,来了解其他 Sentinel节点以及它们对主节点的判断

定时任务作用:

  • 发现新的Sentinel节点: 通过订阅主节点的__sentinel__:hello了解其他 的Sentinel节点信息, 如果是新加入的Sentinel节点, 将该Sentinel节点信息保 存起来, 并与该Sentinel节点创建连接
  • Sentinel节点之间交换主节点的状态, 作为后面客观下线以及领导者选举的依据
  1. 每隔1秒, 每个Sentinel节点会向主节点、 从节点、 其余Sentinel节点 发送一条ping命令做一次心跳检测, 来确认这些节点当前是否可达,这个定时任务是节 点失败判定的重要依据

2. 主观下线和客观下线

2.1 主观下线

每个Sentinel节点会每隔1秒对主节 点、 从节点、 其他Sentinel节点发送ping命令做心跳检测, 当这些节点超过 down-after-milliseconds没有进行有效回复, Sentinel节点就会对该节点做失败 判定, 这个行为叫做主观下线

2.2 客观下线

当Sentinel主观下线的节点是主节点时, 该Sentinel节点会通过sentinel ismaster-down-by-addr命令向其他Sentinel节点询问对主节点的判断, 当超过 <quorum>个数, Sentinel节点认为主节点确实有问题, 这时该Sentinel节点会 做出客观下线的决定

从节点、 Sentinel节点在主观下线后, 没有后续的故障转移操作

3. 领导者Sentinel选举

Redis使用了Raft算法实 现领导者选举

Redis Sentinel进行领导者选举的大致思路:

  • 每个在线的Sentinel节点都有资格成为领导者, 当它确认主节点主观 下线时候, 会向其他Sentinel节点发送sentinel is-master-down-by-addr命令, 要求将自己设置为领导者。
  • 收到命令的Sentinel节点, 如果没有同意过其他Sentinel节点的sentinel is-master-down-by-addr命令, 将同意该请求, 否则拒绝。
  • 如果该Sentinel节点发现自己的票数已经大于等于max(quorum, num(sentinels) /2+1) , 那么它将成为领导者。
  • 如果此过程没有选举出领导者, 将进入下一次选举

4. 故障转移

故障转移流程:

  1. 在从节点列表中选出一个节点作为新的主节点, 选择方法如下:
    • 过滤: “不健康”(主观下线、 断线) 、 5秒内没有回复过Sentinel节 点ping响应、 与主节点失联超过down-after-milliseconds*10秒。
    • 选择slave-priority(从节点优先级) 最高的从节点列表, 如果存在则 返回, 不存在则继续。
    • 选择复制偏移量最大的从节点(复制的最完整) , 如果存在则返 回, 不存在则继续。
    • 选择runid最小的从节点。
  2. Sentinel领导者节点会对第一步选出来的从节点执行slaveof no one命 令让其成为主节点。
  3. Sentinel领导者节点会向剩余的从节点发送命令, 让它们成为新主节 点的从节点, 复制规则和parallel-syncs参数有关。
  4. Sentinel节点集合会将原来的主节点更新为从节点, 并保持着对其关 注, 当其恢复后命令它去复制新的主节点

【redis】redis Sentinel 简介

发表于 2018-09-14 | 分类于 redis

[TOC]

redis Sentinel

1. 高可用

1.1 主从模式的高可用性

redis主从复制模式下主节点故障,故障转移过程:

  • 主节点发生故障后, 客户端(client) 连接主节点失 败, 两个从节点与主节点连接失败造成复制中断
  • 如果主节点无法正常启动, 需要选出一个从节点 (slave-1) , 对其执行slaveof no one命令使其成为新的主节点
  • 原来的从节点(slave-1) 成为新的主节点后, 更新应 用方的主节点信息, 重新启动应用方
  • 客户端命令另一个从节点(slave-2) 去复制新的主节 点(new-master)
  • 待原来的主节点恢复后, 让它去复制新的主节点。

产生的问题:

  • 判断节点不可达的机制是否健全和标 准
  • 如果有多个从节点, 怎样保证只有一个被晋升为主节点
  • 通知客户端新的主节点机制是否足够健壮

1.2 Redis Sentinel的高可用性

当主节点出现故障时, Redis Sentinel能自动完成故障发现和故障转移, 并通知应用方, 从而实现真正的高可用

Redis Sentinel是一个分布式架构, 其中包含若干个Sentinel节点和Redis 数据节点, 每个Sentinel节点会对数据节点和其余Sentinel节点进行监控, 当 它发现节点不可达时, 会对节点做下线标识。 如果被标识的是主节点, 它还 会和其他Sentinel节点进行“协商”, 当大多数Sentinel节点都认为主节点不可 达时, 它们会选举出一个Sentinel节点来完成自动故障转移的工作, 同时会 将这个变化实时通知给Redis应用方。

1个主节点、 2个从节点、 3个Sentinel节点组成的Redis Sentinel 架构故障转移机制:

  • 主节点出现故障, 此时两个从节点与主节点失去连 接, 主从复制失败
  • 每个Sentinel节点通过定期监控发现主节点出现了故 障。
  • 多个Sentinel节点对主节点的故障达成一致, 选举出 sentinel-3节点作为领导者负责故障转移。
  • Sentinel领导者节点执行了故障转移 ,和主从复制模式下流程相同
  • 故障转移完成

1.3 Redis Sentinel 的功能

  • 监控: Sentinel节点会定期检测Redis数据节点、 其余Sentinel节点是否 可达。
  • 通知: Sentinel节点会将故障转移的结果通知给应用方。
  • 主节点故障转移: 实现从节点晋升为主节点并维护后续正确的主从关 系。
  • 配置提供者: 在Redis Sentinel结构中, 客户端在初始化的时候连接的 是Sentinel节点集合, 从中获取主节点信息。

2. redis Sentinel 部署

2.1 配置

1
2
3
4
5
6
7
8
9
10
port 26379
dir /opt/soft/redis/data
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
#主节点设置了密码
#sentinel auth-pass <master-name> <password>
#sentinel notification-script <master-name> <script-path>
#sentinel client-reconfig-script <master-name> <script-path>
  • sentinel monitor
1
sentinel monitor <master-name> <ip> <port> <quorum>

代表要判定主节点最终不可达所需要的票数

启动后配置文件变化,会自动发现了从节点、 其余Sentinel节点 ;去掉了默认配置, 例如parallel-syncs、 failover-timeout参数;添加了配置纪元相关参数。

1
2
3
4
5
6
sentinel leader-epoch mymaster 0
sentinel known-slave mymaster 127.0.0.1 6380
sentinel known-slave mymaster 127.0.0.1 6381
sentinel known-sentinel mymaster 127.0.0.1 26381 6201f902c6d584980bb9e6eb05dc26947ae346e0
sentinel known-sentinel mymaster 127.0.0.1 26380 a9c803c3d4f07b0389163d837de2262559ed0321
sentinel current-epoch 0
  • sentinel down-after-milliseconds
1
sentinel down-after-milliseconds <master-name> <times>

(单位为毫秒) 就是超时时间

每个Sentinel节点都要通过定期发送ping命令来判断Redis数据节点和其余Sentinel节点是否可达, 如果超过了down-after-milliseconds配置的时间且没 有有效的回复, 则判定节点不可达

  • sentinel parallel-syncs
1
sentinel parallel-syncs <master-name> <nums>

parallel-syncs 用来限制在一次故障转移之后, 每次向新的主节点发起复制操作的从节点个数

  • sentinel failover-timeout
1
sentinel failover-timeout <master-name> <times>

failover-timeout故障转移超时时间,作用于故障各个阶段:

  1. 选出合适从节点。
  2. 晋升选出的从节点为主节点。
  3. 命令其余从节点复制新的主节点。
  4. 等待原主节点恢复后命令它去复制新的主节点。
  • sentinel notification-script
1
sentinel notification-script <master-name> <script-path>

sentinel notification-script的作用是在故障转移期间, 当一些警告级别的 Sentinel事件发生(指重要事件, 例如-sdown: 客观下线、 -odown: 主观下 线) 时, 会触发对应路径的脚本, 并向脚本发送相应的事件参数

  • sentinel client-reconfig-script
1
sentinel client-reconfig-script <master-name> <script-path>

sentinel client-reconfig-script的作用是在故障转移结束后, 会触发对应路 径的脚本, 并向脚本发送故障转移结果的相关参数

2.2 监控多 个节点

指定多个masterName来区分不同的主节点 即可

1
2
3
4
5
6
7
8
sentinel monitor master-business-1 10.10.xx.1 6379 2
sentinel down-after-milliseconds master-business-1 60000
sentinel failover-timeout master-business-1 180000
sentinel parallel-syncs master-business-1 1
sentinel monitor master-business-2 10.16.xx.2 6380 2
sentinel down-after-milliseconds master-business-2 10000
sentinel failover-timeout master-business-2 180000
sentinel parallel-syncs master-business-2 1

2.3 部署优化

  • Sentinel节点不应该部署在一台物理“机器”上
  • 部署至少三个且奇数个的Sentinel节点。
  • 如果Sentinel节点集合监控的是同一个业务的多个主节点集合, 那么使用一套Sentinel、 否则一般建议采用多套Sentinel

3. Redis Sentinel 客户端

3.1 客户端实现流程

  1. 遍历Sentinel节点集合获取一个可用的Sentinel节点,Sentinel节点之间可以共享数据, 所以从任意一个Sentinel节点获取主节点信 息都是可以的
  2. 通过sentinel get-master-addr-by-name master-name这个API来获取对应 主节点的相关信息
  3. 验证当前获取的“主节点”是真正的主节点, 这样做的目的是为了防 止故障转移期间主节点的变化
  4. 保持和Sentinel节点集合的“联系”, 时刻获取关于主节点的相关“信 息”

3.2 Jedis Sentinel操作

1
2
3
4
5
public JedisSentinelPool(String masterName, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int connectionTimeout,
final int soTimeout,
final String password, final int database,
final String clientName)

Options:

  • masterName——主节点名。
  • sentinels——Sentinel节点集合。
  • poolConfig——common-pool连接池配置。
  • connectTimeout——连接超时。
  • soTimeout——读写超时。
  • password——主节点密码。
  • database——当前数据库索引。
  • clientName——客户端名
1
2
3
4
5
6
7
8
9
10
11
//获取jedis
Jedis jedis = null;
try {
jedis = jedisSentinelPool.getResource();
// jedis command
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (jedis != null)
jedis.close();
}

【redis】redis 复制

发表于 2018-09-13 | 分类于 redis

[TOC]

redis 复制

1. 配置

参与复制的Redis实例划分为主节点(master) 和从节点(slave),复制的数据流是单向的, 只能由主节点复制到从节 点

1.1 建立复制

配置复制的方式有以下三种:

  • 在配置文件中加入slaveof{masterHost}{masterPort}随Redis启动生 效。
  • 在redis-server启动命令后加入–slaveof{masterHost}{masterPort}生 效。
  • 直接使用命令: slaveof{masterHost}{masterPort}生效。

1.2 断开复制

在从节点执行slaveof no one来断 开与主节点复制关系

1.3 只读

从节点使用slave-read-only=yes配置为只读模式,防止数据不一致

1.4 传输延迟

Redis为我们提供了repl-disable-tcp-nodelay参数用于控制是否关闭 TCP_NODELAY, 默认关闭

  • 当关闭时, 主节点产生的命令数据无论大小都会及时地发送给从节 点, 这样主从之间延迟会变小, 但增加了网络带宽的消耗。 适用于主从之间 的网络环境良好的场景, 如同机架或同机房部署。
  • 当开启时, 主节点会合并较小的TCP数据包从而节省带宽。 默认发送 时间间隔取决于Linux的内核, 一般默认为40毫秒。 这种配置节省了带宽但 增大主从之间的延迟。 适用于主从网络环境复杂或带宽紧张的场景, 如跨机 房部署。

2. 复制原理

2.1 复制过程

在从节点执行slaveof命令后, 复制过程便开始运作,复制过程大致分为6个过程

  1. 保存主节点(master) 信息。执行slaveof后从节点只保存主节点的地址信息便直接返回, 这时建立复 制流程还没有开始
  2. 从节点(slave) 内部通过每秒运行的定时任务维护复制相关逻辑, 当定时任务发现存在新的主节点后, 会尝试与该节点建立网络连接(socket套接字)
  3. 发送ping命令。连接建立成功后从节点发送ping请求进行首次通信,用于检测主从之间网络套接字是否可用,检测主节点当前是否可接受处理命令
  4. 权限验证。从节点必须配置masterauth参数保证与主节点requirepass相同的密码才能通过验证
  5. 同步数据集。主从复制连接正常通信后, 对于首次建立复制的场景,主节点会把持有的数据全部发送给从节点, 这部分操作是耗时最长的步骤
  6. 命令持续复制。主节点会持续地把写命令发送给从节点, 保证主从 数据一致性

2.2 数据同步

  • 全量复制: 一般用于初次复制场景, Redis早期支持的复制功能只有全 量复制, 它会把主节点全部数据一次性发送给从节点, 当数据量较大时, 会对主从节点和网络造成很大的开销。
  • 部分复制: 用于处理在主从复制中因网络闪断等原因造成的数据丢失 场景, 当从节点再次连上主节点后, 如果条件允许, 主节点会补发丢失数据给从节点。 因为补发的数据远远小于全量数据, 可以有效避免全量复制的过高开销

从节点使用psync命令完成部分复制和全量复制功能, 命令格式:

1
psync {runId} {offset}

Options:

runId: 从节点所复制主节点的运行id; offset: 当前从节点已复制的数据偏移量

流程说明:

  1. 从节点(slave) 发送psync命令给主节点, 参数runId是当前从节点保 存的主节点运行ID, 如果没有则默认值为, 参数offset是当前从节点保存的 复制偏移量, 如果是第一次参与复制则默认值为-1。
  2. 主节点(master) 根据psync参数和自身数据情况决定响应结果: ·
    • 如果回复+FULLRESYNC{runId}{offset}, 那么从节点将触发全量复制流程。
    • 如果回复+CONTINUE, 从节点将触发部分复制流程。
    • 如果回复+ERR, 说明主节点版本低于Redis2.8, 无法识别psync命令, 从节点将发送旧版的sync命令触发全量复制流程。

2.3 全量复制

触发全量复制的命令是sync(版本<2.8)和psync(版本>=2.8)

psync流程如下:

  1. 发送psync命令进行数据同步, 由于是第一次进行复制, 从节点没有 复制偏移量和主节点的运行ID, 所以发送psync-1。
  2. 主节点根据psync-1解析出当前为全量复制, 回复+FULLRESYNC响 应。
  3. 从节点接收主节点的响应数据保存运行ID和偏移量offset
  4. 主节点执行bgsave保存RDB文件到本地
  5. 主节点发送RDB文件给从节点, 从节点把接收的RDB文件保存在本 地并直接作为从节点的数据文件
  6. 对于从节点开始接收RDB快照到接收完成期间, 主节点仍然响应读 写命令, 因此主节点会把这期间写命令数据保存在复制客户端缓冲区内, 当 从节点加载完RDB文件后, 主节点再把缓冲区内的数据发送给从节点, 保证 主从之间数据一致性
  7. 从节点接收完主节点传送来的全部数据后会清空自身旧数据
  8. 从节点清空数据后开始加载RDB文件, 对于较大的RDB文件, 这一 步操作依然比较耗时
  9. 从节点成功加载完RDB后, 如果当前节点开启了AOF持久化功能, 它会立刻做bgrewriteaof操作, 为了保证全量复制后AOF持久化文件立刻可 用

2.4 部分复制

流程如下:

  1. 当主从节点之间网络出现中断时, 如果超过repl-timeout时间, 主节点会认为从节点故障并中断复制连接
  2. 主从连接中断期间主节点依然响应命令, 但因复制连接中断命令无 法发送给从节点, 不过主节点内部存在的复制积压缓冲区, 依然可以保存最 近一段时间的写命令数据, 默认最大缓存1MB
  3. 当主从节点网络恢复后, 从节点会再次连上主节点
  4. 当主从连接恢复后, 由于从节点之前保存了自身已复制的偏移量和 主节点的运行ID。 因此会把它们当作psync参数发送给主节点, 要求进行部 分复制操作。
  5. 主节点接到psync命令后首先核对参数runId是否与自身一致, 如果一 致, 说明之前复制的是当前主节点; 之后根据参数offset在自身复制积压缓 冲区查找, 如果偏移量之后的数据存在缓冲区中, 则对从节点发送 +CONTINUE响应, 表示可以进行部分复制
  6. 主节点根据偏移量把复制积压缓冲区里的数据发送给从节点, 保证 主从复制进入正常状态

2.5 心跳机制

主从心跳判断机制:

  1. 主从节点彼此都有心跳检测机制, 各自模拟成对方的客户端进行通 信, 通过client list命令查看复制相关客户端信息, 主节点的连接状态为 flags=M, 从节点连接状态为flags=S。
  2. 主节点默认每隔10秒对从节点发送ping命令, 判断从节点的存活性 和连接状态。 可通过参数repl-ping-slave-period控制发送频率。
  3. 从节点在主线程中每隔1秒发送replconf ack{offset}命令, 给主节点 上报自身当前的复制偏移量

主节点根据replconf命令判断从节点超时时间, 体现在info replication统 计中的lag信息中, lag表示与从节点最后一次通信延迟的秒数, 正常延迟应 该在0和1之间。 如果超过repl-timeout配置的值(默认60秒) , 则判定从节点 下线并断开复制客户端连接

【redis】redis 持久化

发表于 2018-09-13 | 分类于 redis

[TOC]

redis 持久化

Redis支持RDB和AOF两种持久化机制, 持久化功能有效地避免因进程退出造成的数据丢失问题, 当下次重启时利用之前持久化的文件即可实现数据恢复

1. RDB

RDB持久化是把当前进程数据生成快照保存到硬盘的过程, 触发RDB持 久化过程分为手动触发和自动触发

1.1 触发机制

  1. 手动触发分别对应save和bgsave命令

    • save命令:阻塞当前Redis服务器, 直到RDB过程完成为止, 对于内存 比较大的实例会造成长时间阻塞, 线上环境不建议使用
    • bgsave命令:Redis进程执行fork操作创建子进程, RDB持久化过程由子 进程负责, 完成后自动结束。 阻塞只发生在fork阶段, 一般时间很短
  2. 自动触发

    • 使用save相关配置, 如“save m n”。 表示m秒内数据集存在n次修改 时, 自动触发bgsave。
    • 如果从节点执行全量复制操作, 主节点自动执行bgsave生成RDB文件并发送给从节点, 更多细节见6.3节介绍的复制原理。
    • 执行debug reload命令重新加载Redis时, 也会自动触发save操作。
    • 默认情况下执行shutdown命令时, 如果没有开启AOF持久化功能则 自动执行bgsave。

1.2 流程分析

  1. 执行bgsave命令, Redis父进程判断当前是否存在正在执行的子进 程, 如RDB/AOF子进程, 如果存在bgsave命令直接返回
  2. 父进程执行fork操作创建子进程, fork操作过程中父进程会阻塞, 通 过info stats命令查看latest_fork_usec选项, 可以获取最近一个fork操作的耗 时, 单位为微秒
  3. 父进程fork完成后, bgsave命令返回“Background saving started”信息 并不再阻塞父进程, 可以继续响应其他命令
  4. 子进程创建RDB文件, 根据父进程内存生成临时快照文件, 完成后 对原有文件进行原子替换
  5. 进程发送信号给父进程表示完成, 父进程更新统计信息

1.3 优缺点

RDB的优点:

  • RDB是一个紧凑压缩的二进制文件, 代表Redis在某个时间点上的数据 快照。 非常适用于备份, 全量复制等场景。 比如每6小时执行bgsave备份, 并把RDB文件拷贝到远程机器或者文件系统中(如hdfs) , 用于灾难恢复。
  • Redis加载RDB恢复数据远远快于AOF的方式。

RDB的缺点:

  • RDB方式数据没办法做到实时持久化/秒级持久化。 因为bgsave每次运 行都要执行fork操作创建子进程, 属于重量级操作, 频繁执行成本过高。
  • RDB文件使用特定二进制格式保存, Redis版本演进过程中有多个格式 的RDB版本, 存在老版本Redis服务无法兼容新版RDB格式的问题。

2. AOF

以独立日志的方式记录每次写命令, 重启时再重新执行AOF文件中的命令达到恢复数据的目的。

appendonly yes 开启AOF功能

2.1 流程分析

  1. 所有的写入命令会追加到aof_buf(缓冲区) 中
  2. AOF缓冲区根据对应的策略向硬盘做同步操作。
  3. 随着AOF文件越来越大, 需要定期对AOF文件进行重写, 达到压缩 的目的。
  4. 当Redis服务器重启时, 可以加载AOF文件进行数据恢复

2.2 文件同步

Redis提供了多种AOF缓冲区同步文件策略, 由参数appendfsync控制

有三个选项:

  • always:每次有新命令追加到 AOF 文件时就执行一次 fsync ,非常慢,也非常安全。
  • everysec(每秒 fsync 一次):命令写入aof_buf后调用操作系统write操作,write完成后线程返回,fsync同步文件操作由专门线程每秒调用一次。足够快(和使用 RDB 持久化差不多),并且在故障时只会丢失 1 秒钟的数据。
  • no (从不 fsync) :将数据交给操作系统来处理。更快,也更不安全的选择。

2.3 重写机制

重写后文件变小:

  • 进程内已经超时的数据不再写入文件。
  • 旧的AOF文件含有无效命令, 如del key1、 hdel key2、 srem keys、 set a111、 set a222等。 重写使用进程内数据直接生成, 这样新的AOF文件只保 留最终数据的写入命令。
  • 多条写命令可以合并为一个, 如: lpush list a、 lpush list b、 lpush list c可以转化为: lpush list a b c。 为了防止单条命令过大造成客户端缓冲区溢 出, 对于list、 set、 hash、 zset等类型操作, 以64个元素为界拆分为多条。

AOF重写过程可以手动触发和自动触发:

  • 手动触发: 直接调用bgrewriteaof命令。
  • 自动触发: 根据auto-aof-rewrite-min-size和auto-aof-rewrite-percentage参 数确定自动触发时机
    • auto-aof-rewrite-min-size: 表示运行AOF重写时文件最小体积, 默认 为64MB。
    • auto-aof-rewrite-percentage: 代表当前AOF文件空间 (aof_current_size) 和上一次重写后AOF文件空间(aof_base_size) 的比 值。
    • 自动触发时机: aof_current_size>auto-aof-rewrite-minsize &&(aof_current_size-aof_base_size) /aof_base_size>=auto-aof-rewritepercentage 其中aof_current_size和aof_base_size可以在info Persistence统计信息中查 看

重写流程:

  1. 执行AOF重写请求

如果当前进程正在执行AOF重写, 请求不执行并返回如下响应:

1
ERR Background append only file rewriting already in progress

如果当前进程正在执行bgsave操作, 重写命令延迟到bgsave完成之后再 执行, 返回如下响应

1
Background append only file rewriting scheduled
  1. 父进程执行fork创建子进程, 开销等同于bgsave过程。

  2. 1 主进程fork操作完成后, 继续响应其他命令。 所有修改命令依然写 入AOF缓冲区并根据appendfsync策略同步到硬盘, 保证原有AOF机制正确 性。

    2 由于fork操作运用写时复制技术, 子进程只能共享fork操作时的内 存数据。 由于父进程依然响应命令, Redis使用“AOF重写缓冲区”保存这部 分新数据, 防止新AOF文件生成期间丢失这部分数据。

  3. 子进程根据内存快照, 按照命令合并规则写入到新的AOF文件。 每 次批量写入硬盘数据量由配置aof-rewrite-incremental-fsync控制, 默认为 32MB, 防止单次刷盘数据过多造成硬盘阻塞。

    1. 新AOF文件写入完成后, 子进程发送信号给父进程, 父进程更新 统计信息, 具体见info persistence下的aof_*相关统计。
    2. 父进程把AOF重写缓冲区的数据写入到新的AOF文件。
    3. 使用新AOF文件替换老文件, 完成AOF重写。

【redis】redis 特性

发表于 2018-09-12 | 分类于 redis

[TOC]

1. redis 特性

1.1 速度快

  • Redis的所有数据都是存放在内存中的
  • Redis是用C语言实现的, 一般来说C语言实现的程序“距离”操作系统更近, 执行速度相对会更快。
  • Redis使用了单线程架构, 预防了多线程可能产生的竞争问题。
  • 作者对于Redis源代码可以说是精打细磨, 曾经有人评价Redis是少有的集性能和优雅于一身的开源代码

1.2 基于键值对的数据结构服务器

主要提供了5种数据结构: 字符串、 哈希、 列表、 集合、 有序集合
同时在字符串的基础之上演变出了位图(Bitmaps) 和HyperLogLog两种神奇的“数据结构”

1.3 丰富的功能

  • 提供了键过期功能, 可以用来实现缓存。
  • 提供了发布订阅功能, 可以用来实现消息系统。
  • 支持Lua脚本功能, 可以利用Lua创造出新的Redis命令。
  • 提供了简单的事务功能, 能在一定程度上保证事务特性。
  • 提供了流水线(Pipeline) 功能, 这样客户端能将一批命令一次性传到Redis, 减少了网络的开销。

1.4 简单稳定

  • Redis的源码很少
  • Redis使用单线程模型
  • Redis不需要依赖于操作系统中的类库

1.5 客户端语言多

1.6 持久化

Redis提供了两种持久化方式: RDB和AOF, 即可以用两种策略将内存的数据保存到硬盘中

1.7 主从复制

Redis提供了复制功能, 实现了多个相同数据的Redis副本

1.8 高可用和分布式

Redis从2.8版本正式提供了高可用实现Redis Sentinel, 它能够保证Redis节点的故障发现和故障自动转移。
Redis从3.0版本正式提供了分布式实现Redis Cluster, 它是Redis真正的分布式实现, 提供了高可用、 读写和容量的
扩展性。

2. redis 适用场景

2.1 缓存

缓存机制几乎在所有的大型网站都有使用, 合理地使用缓存不仅可以加快数据的访问速度, 而且能够有效地降低后端数据源的压力。
Redis提供了键值过期时间设置, 并且也提供了灵活控制最大内存和内存溢出后的淘汰策略。

2.2 排行榜系统

Redis提供了列表和有序集合数据结构, 合理地使用这些数据结构可以很方便地构建各种排行榜系

2.3 计数器应用

2.4 社交网络

2.5 消息队列系统

Redis提供了发布订阅功能和阻塞队列的功能

3. 单线程架构

Redis是单线程来处理命令的, 所以一条命令从客户端达到服务端不会立刻被执行, 所有命令都会进入一个队列中, 然后逐个被执行

性能快的原因:

  • 纯内存访问, Redis将所有数据放在内存中, 内存的响应时长大约为100纳秒, 这是Redis达到每秒万级别访问的重要基础
  • 非阻塞I/O, Redis使用epoll作为I/O多路复用技术的实现, 再加上Redis自身的事件处理模型将epoll中的连接、 读写、 关闭都转换为事件, 不在网络I/O上浪费过多的时间
  • 单线程避免了线程切换和竞态产生的消耗

4. 内部编码

String字符串类型的内部编码有3种:

  • int: 8个字节的长整型。
  • embstr: 小于等于39个字节的字符串。
  • raw: 大于39个字节的字符串

Hash哈希类型的内部编码有两种:

  • ziplist(压缩列表) : 当哈希类型元素个数小于hash-max-ziplist-entries配置(默认512个) 、 同时所有值都小于hash-max-ziplist-value配置(默认64字节) 时, Redis会使用ziplist作为哈希的内部实现, ziplist使用更加紧凑的结构实现多个元素的连续存储, 所以在节省内存方面比hashtable更加优秀。
  • hashtable(哈希表) : 当哈希类型无法满足ziplist的条件时, Redis会使用hashtable作为哈希的内部实现, 因为此时ziplist的读写效率会下降, 而hashtable的读写时间复杂度为O(1) 。

List列表类型的内部编码有两种。

  • ziplist(压缩列表) : 当列表的元素个数小于list-max-ziplist-entries配置(默认512个) , 同时列表中每个元素的值都小于list-max-ziplist-value配置时(默认64字节) , Redis会选用ziplist来作为列表的内部实现来减少内存的使用。
  • linkedlist(链表) : 当列表类型无法满足ziplist的条件时, Redis会使用linkedlist作为列表的内部实现。

Set集合类型的内部编码有两种:

  • intset(整数集合) : 当集合中的元素都是整数且元素个数小于set-maxintset-entries配置(默认512个) 时, Redis会选用intset来作为集合的内部实 现, 从而减少内存的使用。
  • hashtable(哈希表) : 当集合类型无法满足intset的条件时, Redis会使 用hashtable作为集合的内部实现。

有序集合类型的内部编码有两种:

  • ziplist(压缩列表) : 当有序集合的元素个数小于zset-max-ziplistentries配置(默认128个) , 同时每个元素的值都小于zset-max-ziplist-value配 置(默认64字节) 时, Redis会用ziplist来作为有序集合的内部实现, ziplist 可以有效减少内存的使用。
  • skiplist(跳跃表) : 当ziplist条件不满足时, 有序集合会使用skiplist作 为内部实现, 因为此时ziplist的读写效率会下降。

【Java多线程】JUC集合 08. PriorityBlockingQueue

发表于 2018-08-18 | 分类于 Java多线程 , JUC集合

PriorityBlockingQueue

1. 前言

  • 线程安全的无界优先级队列

  • 基于数组实现的二叉堆,原理和结构根PriorityQueue基本一致

2. 源码分析

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//默认数组容量大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;

//数组最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

//优先级队列数组,queue[n]的2个左右子元素为queue[2*n+1]和queue[2*(n+1)]
private transient Object[] queue;

//队列元素个数
private transient int size;

//比较器,构造时可以选择传入,若没有则使用元素的自然排序
private transient Comparator<? super E> comparator;

//重入锁
private final ReentrantLock lock;

//队列为空的时候条件队列
private final Condition notEmpty;

//自旋锁
private transient volatile int allocationSpinLock;

//序列化的时候使用PriorityQueue
private PriorityQueue q;
}

UML类图如下:

2.2 核心函数

  • 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 默认构造,使用默认容量,没有比较器
*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}

/**
* 最终调用的构造
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
  • 添加元素: offer(E e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();//获取锁
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);//如果元素数量大于数组大小,则进行扩容
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);//自底向上调整堆
size = n + 1;
notEmpty.signal();//唤醒在notEmpty等待的线程
} finally {
lock.unlock();//释放锁
}
return true;
}

//扩容
private void tryGrow(Object[] array, int oldCap) {
//数组扩容的时候使用自旋锁,不需要锁主锁,先释放
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small 双倍扩容
(oldCap >> 1));// 扩容1.5倍
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;//扩容后释放自旋锁
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);//复制数组元素
}
}

//使用比较器,自底向上调整堆
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (cmp.compare(x, (T) e) >= 0)//x比父节点“大”则中止循环
break;
array[k] = e;//向上调整父节点
k = parent;
}
array[k] = x;
}

流程如下:

  1. 加锁,检查是否需要扩容,扩容先释放主锁,使用cas自旋锁,容量最少翻倍,释放自旋锁;可能存在竞争,检查是否扩容,如果扩容则复制数组,再度加主锁;
  2. 看构造入参是否有comparator,没有就使用自然排序;从数组待插入位置和父节点进行比较,如果大于父节点,那就直接待插入位置插入,否则就跟父节点交换,然后循环向上查找;队列数量加1,唤醒非空条件队列上的线程,最后释放锁。
  • 取出元素: take()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();//数组没有元素则阻塞
} finally {
lock.unlock();
}
return result;
}


private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];//取出数组头节点
E x = (E) array[n];
array[n] = null;//清掉最后一个元素
Comparator<? super E> cmp = comparator;
if (cmp == null)//将数组尾节点自顶向下调整
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
//自顶向下调整
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1; //最后一个叶子节点的父节点位置
while (k < half) {
int child = (k << 1) + 1;//左节点
Object c = array[child];
int right = child + 1;//右节点
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];//取左右节点间较小的
if (cmp.compare(x, (T) c) <= 0)//父节点和左右节点较小值比较
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}

流程如下:

  1. 加锁,获取queue[0],清掉堆的最后一个叶子节点,并将其作为比较节点
  2. 调用从顶向下调整的方法:待调整位置节点左右子节点和之前的叶子节点比较,如果之前叶子节点最小,那就直接放入待调整位置;如果是子节点小,那就取小的那个放入待调整位置,并从子节点位置重新循环查找,循环次数根据2分查找,基本是元素数量的一半就到找到位置
  • 删除元素:remove(Object o)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(o);//查找o对于的位置
if (i == -1)//查找不到该元素
return false;
removeAt(i);//删除i处元素
return true;
} finally {
lock.unlock();
}
}

private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
for (int i = 0; i < n; i++)//数组遍历
if (o.equals(array[i]))
return i;
}
return -1;
}

private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i) // removed last element
array[i] = null;//直接删除数组最后元素
else {
E moved = (E) array[n];
array[n] = null;//调整最后一个元素
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(i, moved, array, n);//从删除的位置处自顶向下调整元素
else
siftDownUsingComparator(i, moved, array, n, cmp);
//经过从上向下调整后,如果是直接将比较节点放在待调整位置,那只能说明这个节点在以它为堆顶的堆里面最小,但不能说明从这个节点就向上查找就最大,所以这里需要自底向上再来一次调整
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}

3. 参考

https://blog.csdn.net/xiaoxufox/article/details/51860543

【Java多线程】JUC集合 07. ConcurrentLinkedQueue

发表于 2018-08-18 | 分类于 Java多线程 , JUC集合

ConcurrentLinkedQueue

1. 前言

  • 基于链表的线程安全的队列,适用于高并发
  • 无界FIFO队列

2. 源码分析

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {

private transient volatile Node<E> head;

private transient volatile Node<E> tail;
}

/**
*Node内部类
*/
private static class Node<E> {
volatile E item;
volatile Node<E> next;

/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
}

UML类图如下:

ConcurrentLinkedQueue的链表Node中的item和next的类型是volatile,通过CAS来设置值。

ConcurrentLinkedQueue是通过volatile + CAS来实现多线程对竞争资源的互斥访问的

2.2 核心函数

  • 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

//创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
  • 添加元素: offer(E e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
checkNotNull(e);//不允许空
final Node<E> newNode = new Node<E>(e);

for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {//q是尾节点p的next,设置p的下一节点为newNode
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;//若尾节点没有发生变化的话,那么,应该是头节点发生了变化,则设置p为头节点,然后重新遍历链表;否则(尾节点变化的话),则设置p为尾节点。
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;//如果p和t相等,则设置p为q。否则的话,判断“尾节点是否发生变化”,没有变化的话,则设置p为q;否则,设置p为尾节点
}
}
  • 取出元素: poll()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public E poll() {
// 设置“标记”
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;

// 情况1
// 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话;
// 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。
if (item != null && p.casItem(item, null)) {
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 情况2
// 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 情况3
// 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。
else if (p == q)
continue restartFromHead;
// 情况4
// 设置p为q
else
p = q;
}
}
}

//更新表头为p,并且设置h的next为自身,帮助gc
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
  • size()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//遍历链表获取size
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
//返回队列头节点
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3498995.html

【Java多线程】JUC集合 06. LinkedBlockingDeque

发表于 2018-08-18 | 分类于 Java多线程 , JUC集合

LinkedBlockingDeque

1. 前言

  • 双向链表实现的双向并发阻塞队列,可同时在队列的头和尾进行插入/删除操作
  • 默认大小Integer.MAX_VALUE,可设置容量,防止过度膨胀
  • 线程安全

2. 源码分析

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {

transient Node<E> first;//表头

transient Node<E> last;//表尾

/** Number of items in the deque */
private transient int count;

/** Maximum number of items in the deque */
private final int capacity;

/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();

/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();

/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
}

//双向链表节点
static final class Node<E> {
/**
* The item, or null if this node has been removed.
*/
E item;

/**
* One of:
* - the real predecessor Node
* - this Node, meaning the predecessor is tail
* - null, meaning there is no predecessor
*/
Node<E> prev;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head
* - null, meaning there is no successor
*/
Node<E> next;

Node(E x) {
item = x;
}
}

UML类图:

2.2 核心函数

  • 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含给定 collection 的元素,以该 collection 迭代器的遍历顺序添加
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
lock.unlock();
}
}
  • 添加元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//添加到队列尾部
public boolean offerLast(E e) {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();//获取锁
try {
return linkLast(node);
} finally {
lock.unlock();//释放锁
}
}

private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;//如果“双向链表的节点数量” > “容量”,则返回false,表示插入失败
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();//唤醒notEmpty上的等待线程
return true;
}
  • 取出元素: take()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public E take() throws InterruptedException {
return takeFirst();//取出头部节点
}

public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();//若队列为空,则等待
return x;
} finally {
lock.unlock();
}
}

private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();//唤醒notFull上等待的线程
return item;
}
  • iterator()

提供两种遍历方式:正向遍历和反向遍历

1
2
3
4
5
6
7
8
9
10
11
/** Forward iterator */
private class Itr extends AbstractItr {
Node<E> firstNode() { return first; }
Node<E> nextNode(Node<E> n) { return n.next; }
}

/** Descending iterator */
private class DescendingItr extends AbstractItr {
Node<E> firstNode() { return last; }
Node<E> nextNode(Node<E> n) { return n.prev; }
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3503480.html

【Java多线程】JUC集合 05. LinkedBlockingQueue

发表于 2018-08-18 | 分类于 Java多线程 , JUC集合

LinkedBlockingQueue

1. 前言

  • 单向链表实现的FIFO阻塞队列
  • 链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低
  • 默认容量大小等于Integer.MAX_VALUE ,可设置队列容量大小

2. 源码分析

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

private final int capacity;//链表容量
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();//链表实际大小
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;//从head取数据
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;//从last插入数据

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();//取出锁

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();//非空 条件

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();//插入锁

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();//未满 条件
}

UML类图:

  • 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它线程向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。
  • 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它线程取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。

2.2 核心函数

  • 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);//默认大小
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
  • 添加元素:offer(E e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)//队列已满
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();//插入锁 加锁
try {
if (count.get() < capacity) {
enqueue(node);//入队
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();//c+1后队列未满,则唤醒notFull上的等待线程
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//入队前队列为空,则唤醒notEmpty上的等待线程
return c >= 0;
}

//添加元素到队列尾部
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

//唤醒notEmpty上的等待线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
  • 取出元素: take()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//获取“取出锁”,若当前线程是中断状态,则抛出InterruptedException异常
try {
while (count.get() == 0) {//队列为空
notEmpty.await();
}
x = dequeue();//出队
c = count.getAndDecrement();//取出元素后,将count-1,返回原始count
if (c > 1)
notEmpty.signal();//唤醒notEmpty上等待的线程
} finally {
takeLock.unlock();
}
if (c == capacity)//取出元素前,队列是满的,则唤醒在notFull上等待的线程
signalNotFull();
return x;
}
//删除队列头节点,并把head设置为h.next
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

//唤醒在notFull上等待的线程
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
  • iterator()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public Iterator<E> iterator() {
return new Itr();
}

private class Itr implements Iterator<E> {
/*
* Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/

private Node<E> current;
private Node<E> lastRet;
private E currentElement;

Itr() {
fullyLock();//同时获取插入锁和取出锁
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}

public boolean hasNext() {
return current != null;
}
public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);//下一个节点
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3503458.html

【Java多线程】JUC集合 04. ArrayBlockingQueue

发表于 2018-08-16 | 分类于 Java多线程 , JUC集合

ArrayBlockingQueue

1. 前言

  • 通过循环数组实现的线程安全的有界阻塞队列
  • FIFO队列
  • 内部通过互斥锁ReentrantLock来实现多线程对竞争资源的互斥访问

2. 源码分析

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex; //下一个被取出元素的索引

/** items index for next put, offer, or add */
int putIndex; //下一个被添加元素的索引

/** Number of elements in the queue */
int count; //队列中元素个数

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;
}

UML类图如下:

2.2 核心函数

  • 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

//capacity: 数组容量, fail: true/false - 公平锁/非公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();//锁非空 条件
notFull = lock.newCondition();//锁满 条件
}

public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;//入数组指针
} finally {
lock.unlock();
}
}
  • 添加元素:offer(E e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//将e插入阻塞队列尾部,如果队列已满,返回false; 否则插入成功,返回true
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

实际调用enqueue(E x)方法:

1
2
3
4
5
6
7
8
9
10
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();//唤醒notEmpty上等待的线程
}
  • 取出元素: take()
1
2
3
4
5
6
7
8
9
10
11
12
//取出阻塞队列的头部元素并返回,若队列为空则等待
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

实际调用的dequeue()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//唤醒notFull上等待的线程
return x;
}

3. 参考

http://www.cnblogs.com/skywang12345/p/3498652.html

12…5
JumpsZ

JumpsZ

42 日志
10 分类
8 标签
© 2018 JumpsZ
All rights reserved
|
本站访客数: