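- Redis supports six core data structures: strings, lists, sets, sorted sets, hashes, and streams
- Cache with the cache-aside pattern plus TTLs, and choose an eviction policy that fits the workload
- Pub/Sub suits real-time broadcast; Streams suit reliable event processing with consumer groups
- Use Lua scripts for atomic multi-step operations that avoid race conditions
- Redis Cluster provides automatic sharding and failover; Sentinel provides high availability for non-clustered Redis
- Batching commands with pipelining can raise throughput by roughly 10-100x, depending on network round-trip time
- Secure production deployments with ACLs, TLS, and network isolation; never expose Redis directly to the public internet
- Monitor and alert with INFO, SLOWLOG, and the Prometheus Redis Exporter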
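1. Redis Data Structures
Redis is more than a key-value store. It is a data structure server that supports a rich set of data types, each with its own dedicated commands. Understanding these data structures is the foundation for using Redis effectively.
Strings
Strings are the most basic Redis data type. A string can hold text, integers, or binary data (up to 512 MB). Strings support atomic INCR/DECR operations, which makes them a good fit for counters and distributed locks.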
# String operations
SET user:1001:name "Alice"
GET user:1001:name # "Alice"
# Atomic increment/decrement
SET page:views 0
INCR page:views # 1
INCRBY page:views 10 # 11
# Set with TTL (seconds)
SET session:abc123 "user_data" EX 3600
TTL session:abc123 # 3600
# Set only if not exists (distributed lock)
SET lock:order:5001 "worker-1" NX EX 30
# Multiple operations
MSET user:1:name "Alice" user:1:email "alice@example.com"
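MGET user:1:name user:1:email
Lists
A list is an ordered collection of strings, implemented internally as a quicklist. Lists support push and pop at both ends, which makes them a good fit for message queues, recent-activity records, and timelines.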
# List operations — task queue
LPUSH queue:emails "email_1" "email_2" "email_3"
RPOP queue:emails # "email_1" (FIFO)
LLEN queue:emails # 2
# Blocking pop (wait up to 30s)
BRPOP queue:emails 30
# Recent activity feed (keep last 100)
LPUSH feed:user:1001 "liked post #42"
LTRIM feed:user:1001 0 99
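LRANGE feed:user:1001 0 9 # 10 most recent items
Sets
A set is an unordered collection of unique strings. Sets support intersection, union, and difference, which suits tagging systems, unique-visitor counting, and friend relationships.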
# Set operations — tagging
SADD article:1001:tags "redis" "database" "nosql"
SADD article:1002:tags "redis" "caching" "performance"
# Intersection — articles sharing tags
SINTER article:1001:tags article:1002:tags # ["redis"]
# Union — all tags
SUNION article:1001:tags article:1002:tags
# ["redis", "database", "nosql", "caching", "performance"]
# Membership check
SISMEMBER article:1001:tags "redis" # 1 (true)
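SCARD article:1001:tags # 3 (count)
Sorted Sets
A sorted set is like a set, but every member carries a score and members are kept ordered by that score. Sorted sets suit leaderboards, priority queues, and time-series indexes.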
# Sorted set — game leaderboard
ZADD leaderboard 1500 "player:alice"
ZADD leaderboard 2200 "player:bob"
ZADD leaderboard 1800 "player:charlie"
ZADD leaderboard 3100 "player:diana"
# Top 3 players (highest scores)
ZREVRANGE leaderboard 0 2 WITHSCORES
# ["player:diana", "3100", "player:bob", "2200", "player:charlie", "1800"]
# Rank of a player (0-indexed, descending)
ZREVRANK leaderboard "player:bob" # 1
# Increment score
ZINCRBY leaderboard 500 "player:alice" # 2000
# Range by score
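ZRANGEBYSCORE leaderboard 1500 2500 WITHSCORES
Hashes
A hash is a collection of field-value pairs, similar to an object or dictionary. Hashes suit object data (user profiles, configuration items, and so on) and are more efficient than serializing JSON into a string when individual fields are read or updated.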
# Hash — user profile
HSET user:1001 name "Alice" email "alice@example.com" age 28 role "admin"
HGET user:1001 name # "Alice"
HGETALL user:1001
# {name: "Alice", email: "alice@example.com", age: "28", role: "admin"}
# Update specific fields
HSET user:1001 age 29 last_login "2026-02-28"
# Increment numeric field
HINCRBY user:1001 login_count 1
# Check field existence
HEXISTS user:1001 email # 1 (true)
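HDEL user:1001 role
Streams
Redis Streams, introduced in Redis 5.0, are an append-only log data structure with consumer groups, message acknowledgment, and persistence. They can serve as a lightweight alternative to Apache Kafka.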
# Stream — event log
XADD events * type "order" user_id "1001" amount "59.99"
XADD events * type "payment" user_id "1001" status "completed"
# Read last 10 events
XREVRANGE events + - COUNT 10
# Create consumer group
XGROUP CREATE events order-processors $ MKSTREAM
# Consumer reads (blocks up to 5000ms)
XREADGROUP GROUP order-processors worker-1 COUNT 1 BLOCK 5000 STREAMS events >
# Acknowledge processed message
XACK events order-processors 1677000000000-0
# Check pending messages
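XPENDING events order-processors - + 10
Data structure comparison
| Type | Best for | Time complexity | Max size |
|---|---|---|---|
| String | Caching, counters, distributed locks | O(1) | 512 MB |
| List | Queues, activity feeds, recent items | O(1) push/pop | 4B+ elements |
| Set | Tags, unique values, relationships | O(1) add/check | 4B+ members |
| Sorted Set | Leaderboards, range queries, priority queues | O(log N) | 4B+ members |
| Hash | Object storage, user profiles, configuration | O(1) per field | 4B+ fields |
| Stream | Event logs, message queues, auditing | O(1) append | Limited by memory |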
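2. Redis Caching Strategies
Caching is one of the most common uses of Redis. With a high hit ratio, a sound caching strategy can cut database load by 80% or more and bring response times for cached reads from hundreds of milliseconds down to the sub-millisecond range.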
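The relationship between hit ratio, database load, and latency can be checked with a little arithmetic. The sketch below is only an illustration: the function names and the example timings (1 ms for a cache lookup, 100 ms for a database query) are assumptions, not measurements from this guide.

```python
def effective_latency_ms(hit_ratio: float, cache_ms: float, db_ms: float) -> float:
    """Expected read latency on a cache-aside read path."""
    if not 0.0 <= hit_ratio <= 1.0:
        raise ValueError("hit_ratio must be between 0 and 1")
    # A hit pays only the cache lookup; a miss pays the lookup plus the database query.
    return hit_ratio * cache_ms + (1.0 - hit_ratio) * (cache_ms + db_ms)


def db_load_fraction(hit_ratio: float) -> float:
    """Fraction of reads that still reach the database."""
    return 1.0 - hit_ratio


# Assumed timings: 1 ms cache lookup, 100 ms database query.
for ratio in (0.5, 0.8, 0.95):
    latency = effective_latency_ms(ratio, 1.0, 100.0)
    load = db_load_fraction(ratio)
    print(f"hit ratio {ratio:.0%}: ~{latency:.1f} ms average, {load:.0%} of reads hit the DB")
```

Under these assumed numbers, an 80% hit ratio leaves about 20% of reads on the database, which is where a "database load down by 80%" figure comes from.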
TTL and eviction policies
# TTL strategies
SET product:1001 '{"name":"Widget","price":29.99}' EX 3600 # 1 hour
SET user:session:abc '{"uid":1001}' PX 1800000 # 30 min (ms)
# Check remaining TTL
TTL product:1001 # seconds remaining
PTTL user:session:abc # milliseconds remaining
# Refresh TTL on access (sliding expiration)
GET product:1001
EXPIRE product:1001 3600
# Set TTL only if key exists
EXPIRE product:9999 3600 # 0 (key does not exist)
# Remove TTL (make persistent)
PERSIST product:1001
# --- Eviction policies (redis.conf) ---
# maxmemory 4gb
# maxmemory-policy allkeys-lru # Best for general caching
# maxmemory-policy allkeys-lfu # Best for skewed workloads
# maxmemory-policy volatile-lru # Only evict keys with TTL
# maxmemory-policy volatile-ttl # Evict soonest-expiring first
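# maxmemory-policy noeviction # Return errors when full
Cache-Aside vs Write-Through
Cache-aside is the most common pattern: the application checks the cache first and, on a miss, queries the database and writes the result into the cache. Write-through updates the cache synchronously on every database write, which keeps the cache consistent at the cost of higher write latency.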
// Cache-Aside pattern (Node.js with ioredis)
async function getUser(userId: string) {
const cacheKey = `user:${userId}`;
// 1. Check cache
const cached = await redis.get(cacheKey);
if (cached) {
return JSON.parse(cached); // Cache hit
}
// 2. Cache miss — fetch from DB
const user = await db.query("SELECT * FROM users WHERE id = $1", [userId]);
// 3. Populate cache with TTL
await redis.set(cacheKey, JSON.stringify(user), "EX", 3600);
return user;
}
// Write-Through pattern
async function updateUser(userId: string, data: Partial<User>) {
// 1. Update database
const user = await db.query(
"UPDATE users SET name=$1, email=$2 WHERE id=$3 RETURNING *",
[data.name, data.email, userId]
);
// 2. Update cache synchronously
const cacheKey = `user:${userId}`;
await redis.set(cacheKey, JSON.stringify(user), "EX", 3600);
return user;
}
// Cache invalidation on delete
async function deleteUser(userId: string) {
await db.query("DELETE FROM users WHERE id = $1", [userId]);
await redis.del(`user:${userId}`);
}
3. Redis Pub/Sub and Streams
Redis offers two messaging mechanisms: Pub/Sub for real-time, fire-and-forget broadcast, and Streams for persistent, reliable message processing.
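The difference in delivery semantics is easy to miss, so here is a minimal in-memory model of it. This is not Redis client code: `PubSubModel` and `StreamModel` are hypothetical stand-ins written for illustration, and real stream IDs are timestamp-based rather than simple counters.

```python
from collections import defaultdict


class PubSubModel:
    """Fire-and-forget: a message reaches only subscribers present at publish time."""

    def __init__(self):
        self.subscribers = defaultdict(list)  # channel -> list of subscriber inboxes

    def subscribe(self, channel):
        inbox = []
        self.subscribers[channel].append(inbox)
        return inbox

    def publish(self, channel, message):
        for inbox in self.subscribers[channel]:
            inbox.append(message)
        # Like PUBLISH, report how many subscribers received the message.
        return len(self.subscribers[channel])


class StreamModel:
    """Append-only log: entries are kept, so a late reader can still catch up."""

    def __init__(self):
        self.entries = []  # list of (entry_id, message)

    def add(self, message):
        entry_id = len(self.entries) + 1  # simplified counter ID
        self.entries.append((entry_id, message))
        return entry_id

    def read_after(self, last_id=0):
        return [(i, m) for i, m in self.entries if i > last_id]


bus = PubSubModel()
bus.publish("orders", "order-1")      # nobody is listening yet: the message is lost
inbox = bus.subscribe("orders")
bus.publish("orders", "order-2")
print(inbox)                          # only the message published after subscribing

log = StreamModel()
log.add("order-1")
log.add("order-2")
print(log.read_after(0))              # a reader that starts late still sees both entries
```

In practice this is why Pub/Sub suits notifications that may be dropped, while Streams suit work that must eventually be processed.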
Pub/Sub real-time messaging
// Publisher (Node.js)
import Redis from "ioredis";
const publisher = new Redis();
async function publishEvent(channel: string, event: object) {
await publisher.publish(channel, JSON.stringify(event));
}
// Publish order events
await publishEvent("orders", {
type: "order.created",
orderId: "ORD-5001",
userId: "1001",
total: 99.99,
timestamp: Date.now(),
});
// Subscriber
const subscriber = new Redis();
subscriber.subscribe("orders", "payments", (err, count) => {
console.log(`Subscribed to ${count} channels`);
});
subscriber.on("message", (channel, message) => {
const event = JSON.parse(message);
console.log(`[${channel}] ${event.type}:`, event);
});
// Pattern subscription (wildcard)
subscriber.psubscribe("orders.*", (err, count) => {
console.log(`Pattern subscribed to ${count} patterns`);
});
subscriber.on("pmessage", (pattern, channel, message) => {
console.log(`[${pattern}] ${channel}:`, message);
});
Streams and consumer groups
// Redis Streams with consumer groups (Node.js)
import Redis from "ioredis";
const redis = new Redis();
// Producer: add events to stream
async function addOrderEvent(order: { id: string; userId: string; total: number }) {
const id = await redis.xadd(
"stream:orders",
"*", // Auto-generate ID
"orderId", order.id,
"userId", order.userId,
"total", String(order.total),
"timestamp", String(Date.now())
);
return id;
}
// Create consumer group (run once)
await redis.xgroup("CREATE", "stream:orders", "order-service", "$", "MKSTREAM")
.catch(() => {}); // Ignore if group already exists
// Consumer: read and process
async function consumeOrders(consumerName: string) {
while (true) {
const results = await redis.xreadgroup(
"GROUP", "order-service", consumerName,
"COUNT", "10",
"BLOCK", "5000", // Block 5s if no messages
"STREAMS", "stream:orders", ">"
);
if (results) {
for (const [stream, messages] of results) {
for (const [id, fields] of messages) {
// Process the order
console.log(`Processing order ${fields[1]} for user ${fields[3]}`);
// Acknowledge after successful processing
await redis.xack("stream:orders", "order-service", id);
}
}
}
}
}
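// Start consumers. A blocking XREADGROUP occupies its connection, so in
// production each consumer should use its own Redis connection.
consumeOrders("worker-1");
consumeOrders("worker-2");
4. Redis Transactions and Lua Scripts
Redis transactions (MULTI/EXEC) execute a group of commands as one uninterrupted unit: no other client's commands run in between, although there is no rollback if a command fails. For scenarios that need conditional logic, Lua scripts are the more powerful choice: the script runs atomically on the server, with no race conditions.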
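When a transaction depends on a value read beforehand, WATCH turns it into optimistic locking: EXEC aborts if a watched key changed, and the application retries. The sketch below models that retry loop in memory so it runs without a server. `WatchedStore`, `transfer`, and the `before_exec` hook are hypothetical names for illustration, not part of any Redis client.

```python
class WatchedStore:
    """Tiny in-memory model of WATCH/MULTI/EXEC semantics (not a Redis client)."""

    def __init__(self):
        self.data = {}
        self.versions = {}  # key -> number of writes seen so far

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value
        self.versions[key] = self.versions.get(key, 0) + 1

    def watch(self, *keys):
        # Snapshot the versions; exec() compares against this snapshot later.
        return {k: self.versions.get(k, 0) for k in keys}

    def exec(self, snapshot, writes):
        # Abort (like EXEC returning nil) if any watched key was modified.
        if any(self.versions.get(k, 0) != v for k, v in snapshot.items()):
            return None
        for key, value in writes.items():
            self.set(key, value)
        return ["OK"] * len(writes)


def transfer(store, src, dst, amount, max_retries=5, before_exec=None):
    """Move `amount` from src to dst, retrying when a watched key changes."""
    for _ in range(max_retries):
        snapshot = store.watch(src, dst)
        src_balance = int(store.get(src) or 0)
        dst_balance = int(store.get(dst) or 0)
        if src_balance < amount:
            return False  # insufficient funds
        if before_exec:
            before_exec()  # hook used only to simulate a concurrent writer
        result = store.exec(
            snapshot, {src: src_balance - amount, dst: dst_balance + amount}
        )
        if result is not None:
            return True  # committed without interference
    raise RuntimeError("too much contention, giving up")


store = WatchedStore()
store.set("account:1001:balance", 500)
store.set("account:1002:balance", 300)
print(transfer(store, "account:1001:balance", "account:1002:balance", 100))
```

The retry cap is a design choice: under heavy contention a Lua script is usually the simpler option, because it never needs to retry.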
MULTI/EXEC transactions
# Basic transaction
MULTI
SET account:1001:balance 500
SET account:1002:balance 300
EXEC
# Both commands execute atomically
# Optimistic locking with WATCH
WATCH account:1001:balance
# Read current balance
GET account:1001:balance # "500"
MULTI
DECRBY account:1001:balance 100
INCRBY account:1002:balance 100
EXEC
# EXEC returns nil if account:1001:balance changed between WATCH and EXEC
# Application must retry in that case
Lua scripts (atomic operations)
-- Lua: atomic transfer between accounts
-- KEYS[1] = source account, KEYS[2] = destination account
-- ARGV[1] = transfer amount
local source_balance = tonumber(redis.call("GET", KEYS[1]) or "0") -- treat a missing key as 0
local amount = tonumber(ARGV[1])
if source_balance >= amount then
redis.call("DECRBY", KEYS[1], amount)
redis.call("INCRBY", KEYS[2], amount)
return 1 -- success
else
return 0 -- insufficient funds
end
// Execute Lua script from Node.js (ioredis)
const transferScript = `
local source_balance = tonumber(redis.call("GET", KEYS[1]) or "0")
local amount = tonumber(ARGV[1])
if source_balance >= amount then
redis.call("DECRBY", KEYS[1], amount)
redis.call("INCRBY", KEYS[2], amount)
return 1
else
return 0
end
`;
// Define custom command
redis.defineCommand("transfer", {
numberOfKeys: 2,
lua: transferScript,
});
// Use it
const result = await (redis as any).transfer(
"account:1001:balance",
"account:1002:balance",
"100"
);
console.log(result === 1 ? "Transfer successful" : "Insufficient funds");
5. Redis Cluster and Sentinel
Production Redis needs high availability and scalability. Redis Sentinel provides automatic failover, while Redis Cluster shards data across nodes.
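Sharding in Redis Cluster works by mapping every key to one of 16384 hash slots using CRC16 of the key modulo 16384. If the key contains a non-empty `{...}` hash tag, only the tag is hashed. The sketch below reimplements that mapping in plain Python for illustration. The function names are my own, and on a live cluster `CLUSTER KEYSLOT` gives the authoritative answer.

```python
def crc16_xmodem(data: bytes) -> int:
    """CRC16 (XMODEM variant: polynomial 0x1021, initial value 0)."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Hash slot (0-16383) for a key, honoring {hash tags}."""
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # Only a non-empty tag between the first "{" and the next "}" is used.
        if end != -1 and end != start + 1:
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


print(key_slot("foo"))
# Keys that share a hash tag land in the same slot, so multi-key commands work on them.
print(key_slot("{user1000}.following") == key_slot("{user1000}.followers"))
```

Hash tags are the usual way to keep related keys (for example, both accounts in a transfer script) on the same node.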
Redis Sentinel configuration
# sentinel.conf
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1
sentinel auth-pass mymaster your_strong_password
# Start sentinel
redis-sentinel /etc/redis/sentinel.conf
# Check sentinel status
redis-cli -p 26379 SENTINEL masters
redis-cli -p 26379 SENTINEL replicas mymaster
redis-cli -p 26379 SENTINEL get-master-addr-by-name mymaster
Redis Cluster deployment
# Create a 6-node cluster (3 masters + 3 replicas)
# Start 6 Redis instances on ports 7000-7005
for port in 7000 7001 7002 7003 7004 7005; do
mkdir -p /etc/redis/cluster/$port
cat > /etc/redis/cluster/$port/redis.conf << EOF
port $port
cluster-enabled yes
cluster-config-file nodes-$port.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-$port.aof"
requirepass your_password
masterauth your_password
EOF
redis-server /etc/redis/cluster/$port/redis.conf &
done
# Create the cluster
redis-cli --cluster create \
127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
--cluster-replicas 1 -a your_password
# Check cluster info
redis-cli -p 7000 -a your_password CLUSTER INFO
redis-cli -p 7000 -a your_password CLUSTER NODES
# Add a new node
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000 -a your_password
# Reshard slots to the new node
redis-cli --cluster reshard 127.0.0.1:7000 -a your_password
Connecting to Redis Cluster (Node.js)
import Redis from "ioredis";
// Connect to Redis Cluster
const cluster = new Redis.Cluster(
[
{ host: "127.0.0.1", port: 7000 },
{ host: "127.0.0.1", port: 7001 },
{ host: "127.0.0.1", port: 7002 },
],
{
redisOptions: {
password: "your_password",
},
scaleReads: "slave", // Read from replicas
natMap: {}, // NAT mapping if needed
}
);
// Connect to Sentinel
const sentinel = new Redis({
sentinels: [
{ host: "127.0.0.1", port: 26379 },
{ host: "127.0.0.1", port: 26380 },
{ host: "127.0.0.1", port: 26381 },
],
name: "mymaster",
password: "your_password",
sentinelPassword: "sentinel_password",
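});
6. Redis with Node.js
ioredis is a widely used Redis client for Node.js, with support for Cluster, Sentinel, pipelining, Lua scripting, and streams. It offers a Promise-based API and automatic reconnection.
Connections and pipelining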
import Redis from "ioredis";
// Basic connection with options
const redis = new Redis({
host: "127.0.0.1",
port: 6379,
password: "your_password",
db: 0,
maxRetriesPerRequest: 3,
retryStrategy(times) {
const delay = Math.min(times * 50, 2000);
return delay;
},
lazyConnect: true, // Connect on first command
});
await redis.connect();
// --- Pipelining: batch commands (10-100x throughput) ---
const pipeline = redis.pipeline();
for (let i = 0; i < 1000; i++) {
pipeline.set(`key:${i}`, `value:${i}`, "EX", 3600);
}
const results = await pipeline.exec();
// results: [[null, "OK"], [null, "OK"], ...]
// Pipeline with mixed read/write
const pipe = redis.pipeline();
pipe.hgetall("user:1001");
pipe.lrange("feed:1001", 0, 9);
pipe.zrevrange("leaderboard", 0, 4, "WITHSCORES");
pipe.get("config:feature_flags");
const [user, feed, topPlayers, flags] = await pipe.exec(); // each entry is an [error, result] pair
// --- Connection pool pattern ---
class RedisPool {
private pool: Redis[] = [];
private index = 0;
constructor(private size: number, private options: object) {
for (let i = 0; i < size; i++) {
this.pool.push(new Redis(options));
}
}
getClient(): Redis {
const client = this.pool[this.index % this.size];
this.index++;
return client;
}
async disconnectAll(): Promise<void> {
await Promise.all(this.pool.map((c) => c.quit()));
}
}
7. Redis with Python
redis-py is the standard Redis client for Python. Async support has been built in since version 4.2. It supports connection pools, pipelines, Pub/Sub, and Cluster.
import redis
import json
from datetime import timedelta
# Connection pool (recommended for production)
pool = redis.ConnectionPool(
    host="127.0.0.1",
    port=6379,
    password="your_password",
    db=0,
    max_connections=20,
    decode_responses=True,  # Auto-decode bytes to str
)
r = redis.Redis(connection_pool=pool)
# Basic operations
r.set("user:1001", json.dumps({"name": "Alice", "email": "alice@example.com"}))
r.expire("user:1001", timedelta(hours=1))
user = json.loads(r.get("user:1001"))
# Pipeline (batch commands)
with r.pipeline() as pipe:
    pipe.hset("product:1", mapping={"name": "Widget", "price": "29.99", "stock": "150"})
    pipe.hset("product:2", mapping={"name": "Gadget", "price": "49.99", "stock": "75"})
    pipe.expire("product:1", 3600)
    pipe.expire("product:2", 3600)
    results = pipe.execute()
# Pub/Sub (listen() blocks, so run it in its own thread or process)
pubsub = r.pubsub()
pubsub.subscribe("notifications")
for message in pubsub.listen():
    if message["type"] == "message":
        data = json.loads(message["data"])
        print(f"Received: {data}")
# --- Async redis (built-in since 4.2) ---
import redis.asyncio as aioredis
async def async_example():
    r = aioredis.Redis(
        host="127.0.0.1",
        port=6379,
        password="your_password",
        decode_responses=True,
    )
    await r.set("async_key", "async_value", ex=3600)
    value = await r.get("async_key")
    print(f"Async value: {value}")
    # Async pipeline
    async with r.pipeline() as pipe:
        await pipe.set("k1", "v1").set("k2", "v2").execute()
    await r.aclose()  # aclose() needs redis-py 5.0.1+; older versions use close()
8. Rate Limiting with Redis
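Redis's atomic operations and key expiry make it well suited to distributed rate limiting. Below are implementations of three common rate-limiting algorithms.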
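Of the three, the token bucket has the least obvious arithmetic, so it may help to see the refill calculation on its own first. The sketch below is a plain-Python model of the same steps the Lua token bucket script in this section performs (refill by elapsed time, cap at the maximum, then consume or compute a wait). The function name and the tuple-based state are assumptions for illustration. In Redis the state lives in a hash and the whole step runs atomically inside the script.

```python
import math


def take_tokens(state, max_tokens, refill_rate, now_ms, requested=1):
    """Return (allowed, value, new_state).

    state is (tokens, last_refill_ms), or None for a bucket not seen before.
    value is the remaining whole tokens when allowed, else seconds to wait.
    """
    tokens, last_refill_ms = state if state is not None else (max_tokens, now_ms)
    # Refill in proportion to elapsed time, never exceeding the bucket size.
    elapsed_s = (now_ms - last_refill_ms) / 1000
    tokens = min(max_tokens, tokens + elapsed_s * refill_rate)
    if tokens >= requested:
        tokens -= requested
        return True, math.floor(tokens), (tokens, now_ms)
    # Not enough tokens: report how long until the shortfall is refilled.
    wait_s = math.ceil((requested - tokens) / refill_rate)
    return False, wait_s, (tokens, now_ms)


# Bucket of 2 tokens refilled at 1 token per second.
state = None
for now_ms in (0, 0, 0, 1500):
    allowed, value, state = take_tokens(state, 2, 1.0, now_ms)
    print(now_ms, allowed, value)
```

Passing the clock in as `now_ms` mirrors the script's `ARGV[3]` and keeps the function deterministic to test.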
Sliding window rate limiting
// Sliding Window with Sorted Set (Node.js)
async function slidingWindowRateLimit(
redis: Redis,
key: string,
limit: number,
windowSec: number
): Promise<{ allowed: boolean; remaining: number; retryAfter: number }> {
const now = Date.now();
const windowStart = now - windowSec * 1000;
const pipe = redis.pipeline();
pipe.zremrangebyscore(key, 0, windowStart); // Remove expired
pipe.zadd(key, String(now), `${now}:` + Math.random());
pipe.zcard(key); // Count in window
pipe.expire(key, windowSec); // Auto-cleanup
const results = await pipe.exec();
const count = results![2][1] as number;
if (count > limit) {
// Get oldest entry to calculate retry-after
const oldest = await redis.zrange(key, 0, 0, "WITHSCORES");
const retryAfter = oldest.length > 1
? Math.ceil((Number(oldest[1]) + windowSec * 1000 - now) / 1000)
: windowSec;
return { allowed: false, remaining: 0, retryAfter };
}
return { allowed: true, remaining: limit - count, retryAfter: 0 };
}
// Usage: 100 requests per 60 seconds
const result = await slidingWindowRateLimit(redis, "rate:user:1001", 100, 60);
if (!result.allowed) {
res.status(429).json({
error: "Rate limit exceeded",
retryAfter: result.retryAfter,
});
}
Token bucket algorithm (Lua script)
-- Token Bucket Lua Script
-- KEYS[1] = bucket key
-- ARGV[1] = max tokens, ARGV[2] = refill rate (tokens/sec)
-- ARGV[3] = current timestamp (ms), ARGV[4] = tokens to consume
local key = KEYS[1]
local max_tokens = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
-- Get current state
local data = redis.call("HMGET", key, "tokens", "last_refill")
local tokens = tonumber(data[1]) or max_tokens
local last_refill = tonumber(data[2]) or now
-- Calculate refill
local elapsed = (now - last_refill) / 1000
local new_tokens = math.min(max_tokens, tokens + elapsed * refill_rate)
-- Check if enough tokens
if new_tokens >= requested then
new_tokens = new_tokens - requested
redis.call("HSET", key, "tokens", new_tokens, "last_refill", now) -- HSET replaces the deprecated HMSET
redis.call("EXPIRE", key, math.ceil(max_tokens / refill_rate) * 2)
return {1, math.floor(new_tokens)} -- allowed, remaining
else
redis.call("HSET", key, "tokens", new_tokens, "last_refill", now) -- HSET replaces the deprecated HMSET
redis.call("EXPIRE", key, math.ceil(max_tokens / refill_rate) * 2)
local wait = math.ceil((requested - new_tokens) / refill_rate)
return {0, wait} -- denied, retry_after_seconds
end
Leaky bucket algorithm
# Leaky Bucket with Redis (Python)
import time
import redis
class LeakyBucket:
    def __init__(self, r: redis.Redis, key: str, capacity: int, leak_rate: float):
        """
        capacity: max requests in the bucket
        leak_rate: requests processed per second
        """
        self.r = r
        self.key = key
        self.capacity = capacity
        self.leak_rate = leak_rate
    def allow(self) -> bool:
        now = time.time()
        pipe = self.r.pipeline()
        # Remove leaked (processed) requests
        pipe.zremrangebyscore(self.key, 0, now - self.capacity / self.leak_rate)
        pipe.zcard(self.key)  # Current queue size
        _, queue_size = pipe.execute()
        if queue_size < self.capacity:
            # Add request to queue (check-then-add is not atomic; use a Lua script under heavy concurrency)
            self.r.zadd(self.key, {f"{now}:{id(self)}": now})
            self.r.expire(self.key, int(self.capacity / self.leak_rate) + 10)
            return True
        return False  # Bucket is full
# Usage: 10 requests max, processes 2/sec
bucket = LeakyBucket(r, "leaky:api:user:1001", capacity=10, leak_rate=2.0)
if bucket.allow():
    print("Request accepted")
else:
    print("Rate limited: bucket full")
9. Session Management
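Redis is well suited to distributed session storage. It offers fast reads and writes, automatic expiry, and session sharing across servers.
Distributed sessions (Express.js)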
import express from "express";
import session from "express-session";
import RedisStore from "connect-redis";
import Redis from "ioredis";
const redis = new Redis({
host: "127.0.0.1",
port: 6379,
password: "your_password",
});
const app = express();
app.use(
session({
store: new RedisStore({ client: redis, prefix: "sess:" }),
secret: "your-session-secret",
resave: false,
saveUninitialized: false,
cookie: {
secure: true, // HTTPS only
httpOnly: true, // No JS access
maxAge: 24 * 60 * 60 * 1000, // 24 hours
sameSite: "lax",
},
})
);
// Session is automatically stored in Redis
app.post("/login", async (req, res) => {
const { username, password } = req.body;
const user = await authenticateUser(username, password);
if (user) {
req.session.userId = user.id;
req.session.role = user.role;
res.json({ message: "Logged in" });
} else {
res.status(401).json({ error: "Invalid credentials" });
}
});
// Logout — destroy session in Redis
app.post("/logout", (req, res) => {
req.session.destroy((err) => {
if (err) return res.status(500).json({ error: "Logout failed" });
res.clearCookie("connect.sid");
res.json({ message: "Logged out" });
});
});
JWT + Redis (token blacklist)
import jwt from "jsonwebtoken";
import Redis from "ioredis";
const redis = new Redis();
const JWT_SECRET = process.env.JWT_SECRET!;
// Issue JWT
function issueToken(userId: string, role: string): string {
return jwt.sign(
{ sub: userId, role },
JWT_SECRET,
{ expiresIn: "1h", jwtid: crypto.randomUUID() } // jsonwebtoken sets the jti claim through the jwtid option
);
}
// Revoke JWT by adding to blacklist
async function revokeToken(token: string): Promise<void> {
const decoded = jwt.decode(token) as jwt.JwtPayload;
if (!decoded?.jti || !decoded?.exp) return;
const ttl = decoded.exp - Math.floor(Date.now() / 1000);
if (ttl > 0) {
await redis.set(`blacklist:${decoded.jti}`, "1", "EX", ttl);
}
}
// Verify JWT (check blacklist)
async function verifyToken(token: string): Promise<jwt.JwtPayload | null> {
try {
const decoded = jwt.verify(token, JWT_SECRET) as jwt.JwtPayload;
// Check if token is blacklisted
const isBlacklisted = await redis.exists(`blacklist:${decoded.jti}`);
if (isBlacklisted) return null;
return decoded;
} catch {
return null;
}
}
// Middleware
async function authMiddleware(req: any, res: any, next: any) {
const token = req.headers.authorization?.replace("Bearer ", "");
if (!token) return res.status(401).json({ error: "No token" });
const payload = await verifyToken(token);
if (!payload) return res.status(401).json({ error: "Invalid token" });
req.user = payload;
next();
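}
10. RediSearch and RedisJSON
The RediSearch module adds full-text search, secondary indexes, and aggregation to Redis. RedisJSON allows JSON documents to be stored and manipulated natively in Redis. Combined, they can serve as the basis of a capable document search system.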
# --- RedisJSON ---
# Store JSON document
JSON.SET product:1001 $ '{"name":"Wireless Mouse","brand":"TechCo","price":29.99,"category":"electronics","tags":["wireless","ergonomic","bluetooth"],"specs":{"dpi":1600,"battery":"AA","weight":"85g"}}'
# Read nested fields
JSON.GET product:1001 $.name # '["Wireless Mouse"]' (JSONPath results come back as an array)
JSON.GET product:1001 $.specs.dpi # '[1600]'
JSON.GET product:1001 $.tags[0] # '["wireless"]'
# Update nested fields
JSON.SET product:1001 $.price 24.99
JSON.NUMINCRBY product:1001 $.specs.dpi 400 # '[2000]'
JSON.ARRAPPEND product:1001 $.tags '"usb-c"'
# --- RediSearch ---
# Create search index on JSON documents
FT.CREATE idx:products ON JSON PREFIX 1 product: SCHEMA
$.name AS name TEXT WEIGHT 5.0
$.brand AS brand TEXT
$.category AS category TAG
$.price AS price NUMERIC SORTABLE
$.tags[*] AS tags TAG
# Full-text search
FT.SEARCH idx:products "wireless mouse" LIMIT 0 10
# Filter by category and price range
FT.SEARCH idx:products "@category:{electronics} @price:[10 50]" SORTBY price ASC
# Autocomplete suggestions
FT.SUGADD autocomplete "Wireless Mouse" 100
FT.SUGADD autocomplete "Wireless Keyboard" 90
FT.SUGGET autocomplete "wire" FUZZY MAX 5
# Aggregation — average price per category
FT.AGGREGATE idx:products "*"
GROUPBY 1 @category
REDUCE AVG 1 @price AS avg_price
REDUCE COUNT 0 AS total
SORTBY 2 @avg_price DESC
Using RediSearch from Node.js
import { createClient, SchemaFieldTypes } from "redis";
const client = createClient({ url: "redis://localhost:6379" });
await client.connect();
// Create index
try {
await client.ft.create("idx:products", {
"$.name": { type: SchemaFieldTypes.TEXT, AS: "name", WEIGHT: 5 },
"$.brand": { type: SchemaFieldTypes.TEXT, AS: "brand" },
"$.category": { type: SchemaFieldTypes.TAG, AS: "category" },
"$.price": { type: SchemaFieldTypes.NUMERIC, AS: "price", SORTABLE: true },
}, { ON: "JSON", PREFIX: "product:" });
} catch (e) {
// Index already exists
}
// Store products as JSON
await client.json.set("product:2001", "$", {
name: "Mechanical Keyboard",
brand: "KeyTech",
category: "electronics",
price: 89.99,
tags: ["mechanical", "rgb", "cherry-mx"],
});
// Search
const results = await client.ft.search("idx:products", "@category:{electronics} @price:[50 100]", {
SORTBY: { BY: "price", DIRECTION: "ASC" },
LIMIT: { from: 0, size: 10 },
});
console.log(`Found ${results.total} products:`);
for (const doc of results.documents) {
console.log(doc.id, doc.value);
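}
11. Redis Performance Tuning
Tuning Redis performance involves three areas: memory management, persistence configuration, and command optimization. The following are the key tuning parameters for production environments.
Memory optimization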
# redis.conf — Memory optimization
# Set memory limit
maxmemory 4gb
maxmemory-policy allkeys-lfu
# Ziplist encoding for small hashes (saves 5-10x memory)
# Redis 7+ names these options *-listpack-*; the ziplist names remain as aliases
hash-max-ziplist-entries 128
hash-max-ziplist-value 64
# Ziplist encoding for small sorted sets
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
# Listpack for small lists
list-max-ziplist-size -2 # 8kb per node
list-compress-depth 1 # Compress all but head/tail
# Intset for small integer sets
set-max-intset-entries 512
# Lazy freeing (non-blocking deletes)
lazyfree-lazy-eviction yes
lazyfree-lazy-expire yes
lazyfree-lazy-server-del yes
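replica-lazy-flush yes
Persistence: RDB vs AOF
| Feature | RDB | AOF | RDB + AOF |
|---|---|---|---|
| Mechanism | Periodic snapshots | Append-only log of write operations | Both combined |
| Data loss risk | Depends on the snapshot interval | At most about 1 second (with appendfsync everysec) | At most about 1 second |
| Recovery speed | Fast | Slow (log replay) | Fast (loads the RDB part first) |
| Write performance impact | Brief impact during fork | Small continuous overhead | Both |
| Recommended for | Cache-only use | Workloads that need durability | Production environments |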
# redis.conf — Persistence configuration
# RDB snapshots
save 900 1 # Snapshot if 1+ keys changed in 900s
save 300 10 # Snapshot if 10+ keys changed in 300s
save 60 10000 # Snapshot if 10000+ keys changed in 60s
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir /var/lib/redis
# AOF (recommended for production)
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec # Fsync every second (best tradeoff)
# appendfsync always # Fsync after every write (safest, slowest)
# appendfsync no # Let OS decide (fastest, riskiest)
# AOF rewrite (compact the log)
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
aof-use-rdb-preamble yes # Hybrid format (fast load + AOF durability)
Benchmarking and diagnostics
# Built-in benchmark tool
redis-benchmark -h 127.0.0.1 -p 6379 -c 50 -n 100000 -q
# Benchmark specific commands
redis-benchmark -t set,get,lpush,lpop,zadd -n 100000 -q
# Benchmark with pipeline
redis-benchmark -t set -n 1000000 -P 16 -q
# Latency diagnostics
redis-cli --latency # Continuous latency sampling
redis-cli --latency-history -i 5 # Latency over time (5s intervals)
redis-cli --latency-dist # Latency histogram
redis-cli --intrinsic-latency 10 # System baseline latency (10s)
# Memory analysis
redis-cli INFO memory
redis-cli MEMORY DOCTOR
redis-cli MEMORY USAGE user:1001 # Bytes used by specific key
# Find big keys (scan without blocking)
redis-cli --bigkeys
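redis-cli --memkeys
12. Redis Security
Redis is designed by default to run inside a trusted network. Production deployments must configure authentication, network isolation, and access control to protect their data.
ACL (access control lists)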
# redis.conf — ACL configuration
# Default user (disable or set strong password)
requirepass your_very_strong_password_here
# Create users with specific permissions
# user <username> on|off [password] [commands] [keys]
# Read-only user for analytics
# ACL rules apply left to right, so -@all must come before the commands being granted
user analytics on >analytics_pass ~analytics:* -@all +get +mget +hgetall +zrange +lrange
# Application user with limited commands
user webapp on >webapp_secret_pass ~user:* ~session:* ~cache:* +@read +@write +@set +@hash -@admin -@dangerous
# Admin user (all permissions)
user admin on >admin_super_secret ~* +@all
# Disable default user (after creating named users)
user default off
# --- Runtime ACL management ---
ACL SETUSER readonly on >readonly_pass ~cache:* +get +mget
ACL LIST # List all users
ACL WHOAMI # Current user
ACL GETUSER webapp # Get user details
ACL DELUSER temp_user # Delete user
ACL LOG 10 # Last 10 ACL violations
TLS encryption and network security
# redis.conf — TLS configuration
# Enable TLS
tls-port 6380
port 0 # Disable non-TLS port
tls-cert-file /etc/redis/tls/redis.crt
tls-key-file /etc/redis/tls/redis.key
tls-ca-cert-file /etc/redis/tls/ca.crt
# Require client certificate (mutual TLS)
tls-auth-clients yes
# TLS for replication
tls-replication yes
# TLS for cluster bus
tls-cluster yes
# --- Network security ---
# Bind to specific interfaces only
bind 127.0.0.1 10.0.1.5 # Localhost + private network
# Enable protected mode
protected-mode yes
# Disable dangerous commands
rename-command FLUSHALL ""
rename-command FLUSHDB ""
rename-command DEBUG ""
rename-command CONFIG "REDIS_CONFIG_8fj3k" # Rename to obscure name
# --- Firewall rules (iptables) ---
# Allow only app servers
# iptables -A INPUT -p tcp --dport 6379 -s 10.0.1.0/24 -j ACCEPT
# iptables -A INPUT -p tcp --dport 6379 -j DROP
TLS connection (Node.js)
import Redis from "ioredis";
import fs from "fs";
const redis = new Redis({
host: "redis.example.com",
port: 6380,
username: "webapp",
password: "webapp_secret_pass",
tls: {
ca: fs.readFileSync("/path/to/ca.crt"),
cert: fs.readFileSync("/path/to/client.crt"),
key: fs.readFileSync("/path/to/client.key"),
rejectUnauthorized: true,
},
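});
13. Redis Monitoring and Observability
Monitoring is key to keeping a Redis deployment healthy. Redis ships with the INFO command and SLOWLOG, and combining them with Prometheus + Grafana gives comprehensive observability.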
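INFO returns plain `key:value` lines grouped under `# Section` headers, so its output is easy to post-process. As a minimal sketch, the helpers below parse that text and derive the cache hit ratio from `keyspace_hits` and `keyspace_misses`. The function names and the sample text are illustrative assumptions, and real output contains many more fields.

```python
def parse_info(text: str) -> dict:
    """Parse the key:value lines of INFO output into a dict of strings."""
    info = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and section headers such as "# Stats"
        key, _, value = line.partition(":")
        info[key] = value
    return info


def hit_ratio_percent(info: dict):
    """Cache hit ratio in percent, or None when there have been no lookups yet."""
    hits = int(info.get("keyspace_hits", 0))
    misses = int(info.get("keyspace_misses", 0))
    total = hits + misses
    return None if total == 0 else hits * 100 / total


# Made-up sample in the shape of `redis-cli INFO stats` output.
sample = "# Stats\r\nkeyspace_hits:950\r\nkeyspace_misses:50\r\nevicted_keys:0\r\n"
print(hit_ratio_percent(parse_info(sample)))
```

Returning `None` for a fresh instance avoids a division by zero and lets callers tell "no data yet" apart from a 0% hit ratio.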
The INFO command and key metrics
# Essential monitoring commands
redis-cli INFO server # Version, uptime, OS info
redis-cli INFO memory # Memory usage details
redis-cli INFO stats # Ops/sec, hits, misses
redis-cli INFO replication # Master/replica status
redis-cli INFO clients # Connected clients
redis-cli INFO keyspace # DB key counts
# Key metrics to monitor:
# used_memory / used_memory_rss — Memory usage
# connected_clients — Active connections
# instantaneous_ops_per_sec — Current throughput
# keyspace_hits / keyspace_misses — Cache hit ratio
# evicted_keys — Keys removed by eviction
# rejected_connections — Max clients reached
# rdb_last_bgsave_status — Last snapshot status
# Calculate hit ratio
# hit_ratio = keyspace_hits / (keyspace_hits + keyspace_misses) * 100
# Target: > 95% for caching workloads
# SLOWLOG — queries exceeding threshold
CONFIG SET slowlog-log-slower-than 10000 # Log queries > 10ms
CONFIG SET slowlog-max-len 128
SLOWLOG GET 10 # Last 10 slow queries
SLOWLOG LEN # Total slow query count
SLOWLOG RESET # Clear the log
# Real-time command monitoring (use briefly, high overhead)
redis-cli MONITOR # Prints all commands
# Client list (debug connections)
redis-cli CLIENT LIST
redis-cli CLIENT KILL ID 123 # Kill specific client
Prometheus + Grafana monitoring
# docker-compose.yml: Redis monitoring stack
version: "3.8"
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --requirepass your_password
    volumes:
      - redis_data:/data
  redis-exporter:
    image: oliver006/redis_exporter:latest
    ports:
      - "9121:9121"
    environment:
      REDIS_ADDR: redis://redis:6379
      REDIS_PASSWORD: your_password
    depends_on:
      - redis
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
volumes:
  redis_data:
# prometheus.yml
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: "redis"
    static_configs:
      - targets: ["redis-exporter:9121"]
    metrics_path: /metrics
Alerting rules
# prometheus-alerts.yml
groups:
  - name: redis_alerts
    rules:
      - alert: RedisHighMemoryUsage
        expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Redis memory usage above 85%"
          description: "Redis instance {{ $labels.instance }} memory at {{ $value | humanizePercentage }}"
      - alert: RedisHighLatency
        expr: redis_commands_duration_seconds_total / redis_commands_processed_total > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Redis average latency above 10ms"
      - alert: RedisLowHitRatio
        expr: |
          redis_keyspace_hits_total /
          (redis_keyspace_hits_total + redis_keyspace_misses_total) < 0.9
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Redis cache hit ratio below 90%"
      - alert: RedisReplicationBroken
        expr: redis_connected_slaves < 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Redis has no connected replicas"
      - alert: RedisTooManyConnections
        expr: redis_connected_clients > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Redis has over 1000 connected clients"
Custom health check script
#!/bin/bash
# redis-health-check.sh — Quick Redis health report
REDIS_CLI="redis-cli -a your_password --no-auth-warning"
echo "=== Redis Health Report ==="
echo ""
# Uptime
UPTIME=$($REDIS_CLI INFO server | grep uptime_in_days | tr -d "\r")
echo "Uptime: $UPTIME"
# Memory
USED_MEM=$($REDIS_CLI INFO memory | grep used_memory_human | tr -d "\r")
MAX_MEM=$($REDIS_CLI CONFIG GET maxmemory | tail -1)
echo "Memory: $USED_MEM (max: $MAX_MEM bytes)"
# Hit ratio
HITS=$($REDIS_CLI INFO stats | grep keyspace_hits | cut -d: -f2 | tr -d "\r")
MISSES=$($REDIS_CLI INFO stats | grep keyspace_misses | cut -d: -f2 | tr -d "\r")
if [ "$HITS" -gt 0 ] 2>/dev/null; then
RATIO=$(echo "scale=2; $HITS * 100 / ($HITS + $MISSES)" | bc)
echo "Hit Ratio: $RATIO%"
fi
# Connected clients
CLIENTS=$($REDIS_CLI INFO clients | grep connected_clients | tr -d "\r")
echo "Clients: $CLIENTS"
# Ops per second
OPS=$($REDIS_CLI INFO stats | grep instantaneous_ops_per_sec | tr -d "\r")
echo "Throughput: $OPS"
# Slow queries
SLOW=$($REDIS_CLI SLOWLOG LEN)
echo "Slow queries: $SLOW"
# Evicted keys
EVICTED=$($REDIS_CLI INFO stats | grep evicted_keys | tr -d "\r")
echo "Evicted: $EVICTED"
echo ""
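echo "=== End Report ==="
Summary
Redis is a core component of many modern application architectures. From simple caching to complex real-time data processing, its rich data structures and high performance make it a popular choice among developers. The table below is a quick reference for choosing the right Redis pattern:
| Scenario | Recommended approach | Key commands |
|---|---|---|
| Caching | Cache-aside + TTL + LFU eviction | SET EX, GET, DEL |
| Leaderboards | Sorted sets | ZADD, ZREVRANGE, ZINCRBY |
| Rate limiting | Sliding window (sorted set) or token bucket (Lua) | ZADD, ZRANGEBYSCORE, EVALSHA |
| Session management | Hash + TTL, or connect-redis | HSET, HGETALL, EXPIRE |
| Message queues | Streams + consumer groups | XADD, XREADGROUP, XACK |
| Real-time notifications | Pub/Sub | PUBLISH, SUBSCRIBE, PSUBSCRIBE |
| Full-text search | RediSearch + RedisJSON | FT.CREATE, FT.SEARCH, JSON.SET |
| Distributed locks | SET NX EX + Lua-based renewal | SET NX EX, EVALSHA |
Whether you use Redis as a cache layer, a message broker, or a primary database, the patterns and best practices in this guide will help you build fast, reliable, and secure Redis deployments. Remember to use connection pools, pipeline batched commands, configure a persistence strategy that fits your durability needs, and secure access with ACLs and TLS.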