
Redis Complete Guide: Caching, Pub/Sub, Streams, and Production Patterns

13 min read · By DevToolBox
TL;DR: Redis is an in-memory data structure store usable as a database, cache, message broker, and streaming engine. It supports strings, lists, sets, sorted sets, hashes, and streams. For caching, use TTL-based expiration with the cache-aside or write-through pattern. Redis Cluster provides horizontal scaling and automatic failover. Use ioredis for Node.js and redis-py for Python. Secure deployments with ACLs, TLS, and network isolation. Monitor with INFO, SLOWLOG, and the Prometheus exporter.
Key Takeaways
  • Redis supports six core data structures: strings, lists, sets, sorted sets, hashes, and streams
  • Use the cache-aside pattern with TTLs for caching, and pick an eviction policy that matches your workload
  • Pub/Sub suits real-time broadcast; Streams suit reliable event processing with consumer groups
  • Use Lua scripts for atomic multi-step operations and to avoid race conditions
  • Redis Cluster provides automatic sharding and failover; Sentinel provides high availability for standalone Redis
  • Pipelining batches commands for a 10-100x throughput improvement
  • Secure production with ACLs, TLS, and network isolation; never expose Redis directly to the public internet
  • Monitor and alert with INFO, SLOWLOG, and the Prometheus Redis Exporter

1. Redis Data Structures

Redis is more than a key-value store. It is a data structure server supporting a range of rich data types, each with its own set of commands. Understanding these structures is the foundation of using Redis effectively.

Strings

Strings are Redis's most basic data type. A string can hold text, integers, or binary data (up to 512 MB). Strings support atomic INCR/DECR operations, making them a natural fit for counters and distributed locks.

# String operations
SET user:1001:name "Alice"
GET user:1001:name                    # "Alice"

# Atomic increment/decrement
SET page:views 0
INCR page:views                       # 1
INCRBY page:views 10                  # 11

# Set with TTL (seconds)
SET session:abc123 "user_data" EX 3600
TTL session:abc123                    # 3600

# Set only if not exists (distributed lock)
SET lock:order:5001 "worker-1" NX EX 30

# Multiple operations
MSET user:1:name "Alice" user:1:email "alice@example.com"
MGET user:1:name user:1:email

Lists

Lists are ordered collections of strings, implemented internally as quicklists. They support pushes and pops at both ends, making them well suited to message queues, recent-activity feeds, and timelines.

# List operations — task queue
LPUSH queue:emails "email_1" "email_2" "email_3"
RPOP queue:emails                     # "email_1" (FIFO)
LLEN queue:emails                     # 2

# Blocking pop (wait up to 30s)
BRPOP queue:emails 30

# Recent activity feed (keep last 100)
LPUSH feed:user:1001 "liked post #42"
LTRIM feed:user:1001 0 99
LRANGE feed:user:1001 0 9             # Last 10 items

Sets

Sets are unordered collections of unique strings. They support intersection, union, and difference operations, which makes them useful for tagging systems, unique-visitor counting, and friend relationships.

# Set operations — tagging
SADD article:1001:tags "redis" "database" "nosql"
SADD article:1002:tags "redis" "caching" "performance"

# Intersection — articles sharing tags
SINTER article:1001:tags article:1002:tags  # ["redis"]

# Union — all tags
SUNION article:1001:tags article:1002:tags
# ["redis", "database", "nosql", "caching", "performance"]

# Membership check
SISMEMBER article:1001:tags "redis"   # 1 (true)
SCARD article:1001:tags               # 3 (count)

Sorted Sets

Sorted sets are like sets, but each member carries a score and members are ordered by score. They are ideal for leaderboards, priority queues, and time-series indexes.

# Sorted set — game leaderboard
ZADD leaderboard 1500 "player:alice"
ZADD leaderboard 2200 "player:bob"
ZADD leaderboard 1800 "player:charlie"
ZADD leaderboard 3100 "player:diana"

# Top 3 players (highest scores)
ZREVRANGE leaderboard 0 2 WITHSCORES
# ["player:diana", "3100", "player:bob", "2200", "player:charlie", "1800"]

# Rank of a player (0-indexed, descending)
ZREVRANK leaderboard "player:bob"     # 1

# Increment score
ZINCRBY leaderboard 500 "player:alice"  # 2000

# Range by score
ZRANGEBYSCORE leaderboard 1500 2500 WITHSCORES

Hashes

Hashes are collections of field-value pairs, similar to objects or dictionaries. They are a good fit for object data (user profiles, configuration, and so on) and are more memory-efficient than serializing JSON into a string.

# Hash — user profile
HSET user:1001 name "Alice" email "alice@example.com" age 28 role "admin"
HGET user:1001 name                   # "Alice"
HGETALL user:1001
# {name: "Alice", email: "alice@example.com", age: "28", role: "admin"}

# Update specific fields
HSET user:1001 age 29 last_login "2026-02-28"

# Increment numeric field
HINCRBY user:1001 login_count 1

# Check field existence
HEXISTS user:1001 email               # 1 (true)
HDEL user:1001 role

Streams

Redis Streams, introduced in Redis 5.0, are an append-only log data structure with consumer groups, message acknowledgement, and persistence. Think of them as a lightweight alternative to Apache Kafka.

# Stream — event log
XADD events * type "order" user_id "1001" amount "59.99"
XADD events * type "payment" user_id "1001" status "completed"

# Read last 10 events
XREVRANGE events + - COUNT 10

# Create consumer group
XGROUP CREATE events order-processors $ MKSTREAM

# Consumer reads (blocks up to 5000ms)
XREADGROUP GROUP order-processors worker-1 COUNT 1 BLOCK 5000 STREAMS events >

# Acknowledge processed message
XACK events order-processors 1677000000000-0

# Check pending messages
XPENDING events order-processors - + 10
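Auto-generated stream entry IDs have the form <milliseconds>-<sequence>, so an event's timestamp can be recovered from its ID alone. A small illustrative Python helper (the ID below is the same placeholder used in the XACK example):

```python
from datetime import datetime, timezone

def parse_stream_id(stream_id: str) -> tuple[datetime, int]:
    """Split a stream entry ID like '1677000000000-0' into (UTC time, sequence)."""
    ms, seq = stream_id.split("-")
    return datetime.fromtimestamp(int(ms) / 1000, tz=timezone.utc), int(seq)

# The millisecond half is wall-clock time assigned by the server on XADD;
# the sequence half disambiguates entries created in the same millisecond.
ts, seq = parse_stream_id("1677000000000-0")
print(ts.isoformat(), seq)
```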

Data Structure Comparison

Type       | Best for                                     | Time complexity | Max size
String     | Caching, counters, distributed locks         | O(1)            | 512 MB
List       | Queues, activity feeds, recent items         | O(1) push/pop   | 4B+ elements
Set        | Tags, unique values, relationships           | O(1) add/check  | 4B+ members
Sorted Set | Leaderboards, range queries, priority queues | O(log N)        | 4B+ members
Hash       | Object storage, user profiles, config        | O(1) per field  | 4B+ fields
Stream     | Event logs, message queues, audit trails     | O(1) append     | Memory-bound

2. Redis Caching Strategies

Caching is one of the most common uses of Redis. A well-designed caching strategy can cut database load by more than 80% and bring response times down from hundreds of milliseconds to sub-millisecond.

TTL and Eviction Policies

# TTL strategies
SET product:1001 '{"name":"Widget","price":29.99}' EX 3600    # 1 hour
SET user:session:abc '{"uid":1001}' PX 1800000               # 30 min (ms)

# Check remaining TTL
TTL product:1001                      # seconds remaining
PTTL user:session:abc                 # milliseconds remaining

# Refresh TTL on access (sliding expiration)
GET product:1001
EXPIRE product:1001 3600

# Set TTL only if key exists
EXPIRE product:9999 3600              # 0 (key does not exist)

# Remove TTL (make persistent)
PERSIST product:1001

# --- Eviction policies (redis.conf) ---
# maxmemory 4gb
# maxmemory-policy allkeys-lru        # Best for general caching
# maxmemory-policy allkeys-lfu        # Best for skewed workloads
# maxmemory-policy volatile-lru       # Only evict keys with TTL
# maxmemory-policy volatile-ttl       # Evict soonest-expiring first
# maxmemory-policy noeviction         # Return errors when full
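To build intuition for what allkeys-lru approximates, here is a minimal exact-LRU cache in Python. Note this is an illustrative sketch only: Redis itself samples a handful of keys per eviction to approximate LRU rather than tracking exact recency.

```python
from collections import OrderedDict

class LRUCache:
    """Exact least-recently-used cache: eviction picks the coldest key."""

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self.data: OrderedDict = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)  # mark as most recently used
        return self.data[key]

    def set(self, key, value):
        if key in self.data:
            self.data.move_to_end(key)
        self.data[key] = value
        if len(self.data) > self.maxsize:
            self.data.popitem(last=False)  # evict least recently used
```

Under allkeys-lfu, the eviction criterion would instead be an access-frequency counter, which tends to win when a small hot set dominates traffic.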

Cache-Aside vs Write-Through Patterns

Cache-aside is the most common pattern: the application checks the cache first and, on a miss, reads from the database and populates the cache. Write-through updates the cache synchronously on every database write, which guarantees consistency at the cost of extra write latency.

// Cache-Aside pattern (Node.js with ioredis)
async function getUser(userId: string) {
  const cacheKey = `user:${userId}`;

  // 1. Check cache
  const cached = await redis.get(cacheKey);
  if (cached) {
    return JSON.parse(cached);  // Cache hit
  }

  // 2. Cache miss — fetch from DB
  const user = await db.query("SELECT * FROM users WHERE id = $1", [userId]);

  // 3. Populate cache with TTL
  await redis.set(cacheKey, JSON.stringify(user), "EX", 3600);

  return user;
}

// Write-Through pattern
async function updateUser(userId: string, data: Partial<User>) {
  // 1. Update database
  const user = await db.query(
    "UPDATE users SET name=$1, email=$2 WHERE id=$3 RETURNING *",
    [data.name, data.email, userId]
  );

  // 2. Update cache synchronously
  const cacheKey = `user:${userId}`;
  await redis.set(cacheKey, JSON.stringify(user), "EX", 3600);

  return user;
}

// Cache invalidation on delete
async function deleteUser(userId: string) {
  await db.query("DELETE FROM users WHERE id = $1", [userId]);
  await redis.del(`user:${userId}`);
}

3. Redis Pub/Sub and Streams

Redis offers two messaging mechanisms: Pub/Sub for real-time, fire-and-forget broadcast, and Streams for durable, reliable message processing.

Real-Time Messaging with Pub/Sub

// Publisher (Node.js)
import Redis from "ioredis";
const publisher = new Redis();

async function publishEvent(channel: string, event: object) {
  await publisher.publish(channel, JSON.stringify(event));
}

// Publish order events
await publishEvent("orders", {
  type: "order.created",
  orderId: "ORD-5001",
  userId: "1001",
  total: 99.99,
  timestamp: Date.now(),
});

// Subscriber
const subscriber = new Redis();

subscriber.subscribe("orders", "payments", (err, count) => {
  console.log(`Subscribed to ${count} channels`);
});

subscriber.on("message", (channel, message) => {
  const event = JSON.parse(message);
  console.log(`[${channel}] ${event.type}:`, event);
});

// Pattern subscription (wildcard)
subscriber.psubscribe("orders.*", (err, count) => {
  console.log(`Pattern subscribed to ${count} patterns`);
});

subscriber.on("pmessage", (pattern, channel, message) => {
  console.log(`[${pattern}] ${channel}:`, message);
});

Streams with Consumer Groups

// Redis Streams with consumer groups (Node.js)
import Redis from "ioredis";
const redis = new Redis();

// Producer: add events to stream
async function addOrderEvent(order: { id: string; userId: string; total: number }) {
  const id = await redis.xadd(
    "stream:orders",
    "*",                          // Auto-generate ID
    "orderId", order.id,
    "userId", order.userId,
    "total", String(order.total),
    "timestamp", String(Date.now())
  );
  return id;
}

// Create consumer group (run once)
await redis.xgroup("CREATE", "stream:orders", "order-service", "$", "MKSTREAM")
  .catch(() => {}); // Ignore if group already exists

// Consumer: read and process
async function consumeOrders(consumerName: string) {
  while (true) {
    const results = await redis.xreadgroup(
      "GROUP", "order-service", consumerName,
      "COUNT", "10",
      "BLOCK", "5000",            // Block 5s if no messages
      "STREAMS", "stream:orders", ">"
    );

    if (results) {
      for (const [stream, messages] of results) {
        for (const [id, fields] of messages) {
          // Process the order
          console.log(`Processing order ${fields[1]} for user ${fields[3]}`);

          // Acknowledge after successful processing
          await redis.xack("stream:orders", "order-service", id);
        }
      }
    }
  }
}

// Start consumers
consumeOrders("worker-1");
consumeOrders("worker-2");

4. Redis Transactions and Lua Scripting

Redis transactions (MULTI/EXEC) guarantee that a group of commands executes atomically. For scenarios that need conditional logic, Lua scripting is the more powerful option: scripts run atomically on the server, with no race conditions.

MULTI/EXEC Transactions

# Basic transaction
MULTI
SET account:1001:balance 500
SET account:1002:balance 300
EXEC
# Both commands execute atomically

# Optimistic locking with WATCH
WATCH account:1001:balance
# Read current balance
GET account:1001:balance              # "500"
MULTI
DECRBY account:1001:balance 100
INCRBY account:1002:balance 100
EXEC
# EXEC returns nil if account:1001:balance changed between WATCH and EXEC
# Application must retry in that case

Lua Scripts (Atomic Operations)

-- Lua: atomic transfer between accounts
-- KEYS[1] = source account, KEYS[2] = destination account
-- ARGV[1] = transfer amount

local source_balance = tonumber(redis.call("GET", KEYS[1])) or 0  -- missing key counts as zero balance
local amount = tonumber(ARGV[1])

if source_balance >= amount then
  redis.call("DECRBY", KEYS[1], amount)
  redis.call("INCRBY", KEYS[2], amount)
  return 1  -- success
else
  return 0  -- insufficient funds
end

// Execute Lua script from Node.js (ioredis)
const transferScript = `
local source_balance = tonumber(redis.call("GET", KEYS[1])) or 0
local amount = tonumber(ARGV[1])
if source_balance >= amount then
  redis.call("DECRBY", KEYS[1], amount)
  redis.call("INCRBY", KEYS[2], amount)
  return 1
else
  return 0
end
`;

// Define custom command
redis.defineCommand("transfer", {
  numberOfKeys: 2,
  lua: transferScript,
});

// Use it
const result = await (redis as any).transfer(
  "account:1001:balance",
  "account:1002:balance",
  "100"
);
console.log(result === 1 ? "Transfer successful" : "Insufficient funds");

5. Redis Cluster and Sentinel

Production Redis needs high availability and scalability. Redis Sentinel provides automatic failover; Redis Cluster adds data sharding and distributed processing.

Redis Sentinel Configuration

# sentinel.conf
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1
sentinel auth-pass mymaster your_strong_password

# Start sentinel
redis-sentinel /etc/redis/sentinel.conf

# Check sentinel status
redis-cli -p 26379 SENTINEL masters
redis-cli -p 26379 SENTINEL replicas mymaster
redis-cli -p 26379 SENTINEL get-master-addr-by-name mymaster

Redis Cluster Deployment

# Create a 6-node cluster (3 masters + 3 replicas)
# Start 6 Redis instances on ports 7000-7005
for port in 7000 7001 7002 7003 7004 7005; do
  mkdir -p /etc/redis/cluster/$port
  cat > /etc/redis/cluster/$port/redis.conf << EOF
port $port
cluster-enabled yes
cluster-config-file nodes-$port.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-$port.aof"
requirepass your_password
masterauth your_password
EOF
  redis-server /etc/redis/cluster/$port/redis.conf &
done

# Create the cluster
redis-cli --cluster create \
  127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
  127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
  --cluster-replicas 1 -a your_password

# Check cluster info
redis-cli -p 7000 -a your_password CLUSTER INFO
redis-cli -p 7000 -a your_password CLUSTER NODES

# Add a new node
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000 -a your_password

# Reshard slots to the new node
redis-cli --cluster reshard 127.0.0.1:7000 -a your_password
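Resharding moves hash slots between masters. Cluster maps every key to one of 16384 slots using CRC16 (the CRC-16/XMODEM variant) modulo 16384, and a {hash tag} in braces forces related keys into the same slot so multi-key operations stay possible. A small Python sketch of the mapping:

```python
def crc16(data: bytes) -> int:
    """CRC-16/XMODEM (poly 0x1021, init 0), the checksum Redis Cluster uses."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

def hash_slot(key: str) -> int:
    """Map a key to one of the 16384 cluster slots, honoring {hash tags}."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # only a non-empty tag between braces counts
            key = key[start + 1:end]
    return crc16(key.encode()) % 16384
```

Keys like `{user:1001}.cart` and `{user:1001}.orders` hash only the tag `user:1001`, so they land on the same node.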

Connecting to Redis Cluster (Node.js)

import Redis from "ioredis";

// Connect to Redis Cluster
const cluster = new Redis.Cluster(
  [
    { host: "127.0.0.1", port: 7000 },
    { host: "127.0.0.1", port: 7001 },
    { host: "127.0.0.1", port: 7002 },
  ],
  {
    redisOptions: {
      password: "your_password",
    },
    scaleReads: "slave",              // Read from replicas
    natMap: {},                        // NAT mapping if needed
  }
);

// Connect to Sentinel
const sentinel = new Redis({
  sentinels: [
    { host: "127.0.0.1", port: 26379 },
    { host: "127.0.0.1", port: 26380 },
    { host: "127.0.0.1", port: 26381 },
  ],
  name: "mymaster",
  password: "your_password",
  sentinelPassword: "sentinel_password",
});

6. Redis with Node.js

ioredis is the most widely recommended Redis client for Node.js, with support for Cluster, Sentinel, pipelining, Lua scripts, and streams. It offers a Promise-based API and automatic reconnection.

Connections and Pipelining

import Redis from "ioredis";

// Basic connection with options
const redis = new Redis({
  host: "127.0.0.1",
  port: 6379,
  password: "your_password",
  db: 0,
  maxRetriesPerRequest: 3,
  retryStrategy(times) {
    const delay = Math.min(times * 50, 2000);
    return delay;
  },
  lazyConnect: true,                  // Connect on first command
});

await redis.connect();

// --- Pipelining: batch commands (10-100x throughput) ---
const pipeline = redis.pipeline();
for (let i = 0; i < 1000; i++) {
  pipeline.set(`key:${i}`, `value:${i}`, "EX", 3600);
}
const results = await pipeline.exec();
// results: [[null, "OK"], [null, "OK"], ...]

// Pipeline with mixed read/write
const pipe = redis.pipeline();
pipe.hgetall("user:1001");
pipe.lrange("feed:1001", 0, 9);
pipe.zrevrange("leaderboard", 0, 4, "WITHSCORES");
pipe.get("config:feature_flags");
const replies = await pipe.exec();
// Each pipeline reply is an [err, result] pair
const [[, user], [, feed], [, topPlayers], [, flags]] = replies!;

// --- Connection pool pattern ---
class RedisPool {
  private pool: Redis[] = [];
  private index = 0;

  constructor(private size: number, private options: object) {
    for (let i = 0; i < size; i++) {
      this.pool.push(new Redis(options));
    }
  }

  getClient(): Redis {
    const client = this.pool[this.index % this.size];
    this.index++;
    return client;
  }

  async disconnectAll(): Promise<void> {
    await Promise.all(this.pool.map((c) => c.quit()));
  }
}

7. Redis with Python

redis-py is the standard Redis client for Python. Async support has been built in since version 4.2, alongside connection pooling, pipelining, Pub/Sub, and Cluster support.

import redis
import json
from datetime import timedelta

# Connection pool (recommended for production)
pool = redis.ConnectionPool(
    host="127.0.0.1",
    port=6379,
    password="your_password",
    db=0,
    max_connections=20,
    decode_responses=True,           # Auto-decode bytes to str
)
r = redis.Redis(connection_pool=pool)

# Basic operations
r.set("user:1001", json.dumps({"name": "Alice", "email": "alice@example.com"}))
r.expire("user:1001", timedelta(hours=1))
user = json.loads(r.get("user:1001"))

# Pipeline (batch commands)
with r.pipeline() as pipe:
    pipe.hset("product:1", mapping={"name": "Widget", "price": "29.99", "stock": "150"})
    pipe.hset("product:2", mapping={"name": "Gadget", "price": "49.99", "stock": "75"})
    pipe.expire("product:1", 3600)
    pipe.expire("product:2", 3600)
    results = pipe.execute()

# Pub/Sub
pubsub = r.pubsub()
pubsub.subscribe("notifications")

for message in pubsub.listen():
    if message["type"] == "message":
        data = json.loads(message["data"])
        print(f"Received: {data}")

# --- Async redis (built-in since 4.2) ---
import redis.asyncio as aioredis

async def async_example():
    r = aioredis.Redis(
        host="127.0.0.1",
        port=6379,
        password="your_password",
        decode_responses=True,
    )

    await r.set("async_key", "async_value", ex=3600)
    value = await r.get("async_key")
    print(f"Async value: {value}")

    # Async pipeline
    async with r.pipeline() as pipe:
        await pipe.set("k1", "v1").set("k2", "v2").execute()

    await r.aclose()

8. Rate Limiting with Redis

Redis's atomic operations and key expiration make it a natural fit for distributed rate limiting. Below are implementations of three common rate-limiting algorithms.

Sliding Window Rate Limiting

// Sliding Window with Sorted Set (Node.js)
async function slidingWindowRateLimit(
  redis: Redis,
  key: string,
  limit: number,
  windowSec: number
): Promise<{ allowed: boolean; remaining: number; retryAfter: number }> {
  const now = Date.now();
  const windowStart = now - windowSec * 1000;

  const pipe = redis.pipeline();
  pipe.zremrangebyscore(key, 0, windowStart);   // Remove expired
  pipe.zadd(key, String(now), `${now}:` + Math.random());
  pipe.zcard(key);                               // Count in window
  pipe.expire(key, windowSec);                   // Auto-cleanup

  const results = await pipe.exec();
  const count = results![2][1] as number;

  if (count > limit) {
    // Get oldest entry to calculate retry-after
    const oldest = await redis.zrange(key, 0, 0, "WITHSCORES");
    const retryAfter = oldest.length > 1
      ? Math.ceil((Number(oldest[1]) + windowSec * 1000 - now) / 1000)
      : windowSec;

    return { allowed: false, remaining: 0, retryAfter };
  }

  return { allowed: true, remaining: limit - count, retryAfter: 0 };
}

// Usage: 100 requests per 60 seconds
const result = await slidingWindowRateLimit(redis, "rate:user:1001", 100, 60);
if (!result.allowed) {
  res.status(429).json({
    error: "Rate limit exceeded",
    retryAfter: result.retryAfter,
  });
}
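The window arithmetic itself can be sanity-checked without a Redis server. A minimal in-memory Python stand-in, where a plain list of timestamps replaces the sorted set (illustrative only, not distributed; unlike the pipeline above, it counts before adding, so a rejected request leaves no entry behind):

```python
import time
from typing import Optional

class SlidingWindowLimiter:
    """In-memory stand-in for the sorted-set limiter: timestamps are members."""

    def __init__(self, limit: int, window_sec: float):
        self.limit = limit
        self.window = window_sec
        self.hits: list[float] = []

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        cutoff = now - self.window
        self.hits = [t for t in self.hits if t > cutoff]  # ZREMRANGEBYSCORE analogue
        if len(self.hits) >= self.limit:                  # ZCARD vs limit
            return False
        self.hits.append(now)                             # ZADD analogue
        return True
```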

Token Bucket (Lua Script)

-- Token Bucket Lua Script
-- KEYS[1] = bucket key
-- ARGV[1] = max tokens, ARGV[2] = refill rate (tokens/sec)
-- ARGV[3] = current timestamp (ms), ARGV[4] = tokens to consume

local key = KEYS[1]
local max_tokens = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- Get current state
local data = redis.call("HMGET", key, "tokens", "last_refill")
local tokens = tonumber(data[1]) or max_tokens
local last_refill = tonumber(data[2]) or now

-- Calculate refill
local elapsed = (now - last_refill) / 1000
local new_tokens = math.min(max_tokens, tokens + elapsed * refill_rate)

-- Check if enough tokens
if new_tokens >= requested then
  new_tokens = new_tokens - requested
  redis.call("HMSET", key, "tokens", new_tokens, "last_refill", now)
  redis.call("EXPIRE", key, math.ceil(max_tokens / refill_rate) * 2)
  return {1, math.floor(new_tokens)}  -- allowed, remaining
else
  redis.call("HMSET", key, "tokens", new_tokens, "last_refill", now)
  redis.call("EXPIRE", key, math.ceil(max_tokens / refill_rate) * 2)
  local wait = math.ceil((requested - new_tokens) / refill_rate)
  return {0, wait}  -- denied, retry_after_seconds
end
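The refill arithmetic in the script above can be unit-tested without Redis by mirroring it in Python. A dict stands in for the Redis hash; this checks the math only, not the atomicity the Lua script provides:

```python
import math

def token_bucket(state: dict, max_tokens: float, refill_rate: float,
                 now_ms: int, requested: float):
    """Mirror of the Lua script: returns (allowed, remaining_or_wait_seconds)."""
    tokens = state.get("tokens", max_tokens)          # new bucket starts full
    last_refill = state.get("last_refill", now_ms)
    elapsed = (now_ms - last_refill) / 1000
    tokens = min(max_tokens, tokens + elapsed * refill_rate)
    state["last_refill"] = now_ms
    if tokens >= requested:
        state["tokens"] = tokens - requested
        return True, math.floor(state["tokens"])
    state["tokens"] = tokens
    return False, math.ceil((requested - tokens) / refill_rate)
```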

Leaky Bucket

# Leaky Bucket with Redis (Python)
import time
import redis

class LeakyBucket:
    def __init__(self, r: redis.Redis, key: str, capacity: int, leak_rate: float):
        """
        capacity: max requests in the bucket
        leak_rate: requests processed per second
        """
        self.r = r
        self.key = key
        self.capacity = capacity
        self.leak_rate = leak_rate

    def allow(self) -> bool:
        now = time.time()
        pipe = self.r.pipeline()

        # Remove leaked (processed) requests
        pipe.zremrangebyscore(self.key, 0, now - self.capacity / self.leak_rate)
        pipe.zcard(self.key)  # Current queue size
        _, queue_size = pipe.execute()

        if queue_size < self.capacity:
            # Add request to queue
            self.r.zadd(self.key, {f"{now}:{id(self)}": now})
            self.r.expire(self.key, int(self.capacity / self.leak_rate) + 10)
            return True

        return False  # Bucket is full

# Usage: 10 requests max, processes 2/sec
bucket = LeakyBucket(r, "leaky:api:user:1001", capacity=10, leak_rate=2.0)
if bucket.allow():
    print("Request accepted")
else:
    print("Rate limited — bucket full")

9. Session Management

Redis is an excellent choice for distributed session storage: fast reads and writes, automatic expiration, and session sharing across servers.

Distributed Sessions (Express.js)

import express from "express";
import session from "express-session";
import RedisStore from "connect-redis";
import Redis from "ioredis";

const redis = new Redis({
  host: "127.0.0.1",
  port: 6379,
  password: "your_password",
});

const app = express();

app.use(
  session({
    store: new RedisStore({ client: redis, prefix: "sess:" }),
    secret: "your-session-secret",
    resave: false,
    saveUninitialized: false,
    cookie: {
      secure: true,                   // HTTPS only
      httpOnly: true,                 // No JS access
      maxAge: 24 * 60 * 60 * 1000,   // 24 hours
      sameSite: "lax",
    },
  })
);

// Session is automatically stored in Redis
app.post("/login", async (req, res) => {
  const { username, password } = req.body;
  const user = await authenticateUser(username, password);

  if (user) {
    req.session.userId = user.id;
    req.session.role = user.role;
    res.json({ message: "Logged in" });
  } else {
    res.status(401).json({ error: "Invalid credentials" });
  }
});

// Logout — destroy session in Redis
app.post("/logout", (req, res) => {
  req.session.destroy((err) => {
    if (err) return res.status(500).json({ error: "Logout failed" });
    res.clearCookie("connect.sid");
    res.json({ message: "Logged out" });
  });
});

JWT + Redis (Token Blacklist)

import jwt from "jsonwebtoken";
import crypto from "node:crypto";
import Redis from "ioredis";

const redis = new Redis();
const JWT_SECRET = process.env.JWT_SECRET!;

// Issue JWT
function issueToken(userId: string, role: string): string {
  return jwt.sign(
    { sub: userId, role },
    JWT_SECRET,
    { expiresIn: "1h", jwtid: crypto.randomUUID() }  // jsonwebtoken's "jwtid" option sets the jti claim
  );
}

// Revoke JWT by adding to blacklist
async function revokeToken(token: string): Promise<void> {
  const decoded = jwt.decode(token) as jwt.JwtPayload;
  if (!decoded?.jti || !decoded?.exp) return;

  const ttl = decoded.exp - Math.floor(Date.now() / 1000);
  if (ttl > 0) {
    await redis.set(`blacklist:${decoded.jti}`, "1", "EX", ttl);
  }
}

// Verify JWT (check blacklist)
async function verifyToken(token: string): Promise<jwt.JwtPayload | null> {
  try {
    const decoded = jwt.verify(token, JWT_SECRET) as jwt.JwtPayload;

    // Check if token is blacklisted
    const isBlacklisted = await redis.exists(`blacklist:${decoded.jti}`);
    if (isBlacklisted) return null;

    return decoded;
  } catch {
    return null;
  }
}

// Middleware
async function authMiddleware(req: any, res: any, next: any) {
  const token = req.headers.authorization?.replace("Bearer ", "");
  if (!token) return res.status(401).json({ error: "No token" });

  const payload = await verifyToken(token);
  if (!payload) return res.status(401).json({ error: "Invalid token" });

  req.user = payload;
  next();
}

10. RediSearch and RedisJSON

The RediSearch module adds full-text search, secondary indexing, and aggregations to Redis. RedisJSON stores and manipulates JSON documents natively. Together they make a capable document search system.

# --- RedisJSON ---
# Store JSON document
JSON.SET product:1001 $ '{"name":"Wireless Mouse","brand":"TechCo","price":29.99,"category":"electronics","tags":["wireless","ergonomic","bluetooth"],"specs":{"dpi":1600,"battery":"AA","weight":"85g"}}'

# Read nested fields
JSON.GET product:1001 $.name             # '"Wireless Mouse"'
JSON.GET product:1001 $.specs.dpi        # '1600'
JSON.GET product:1001 $.tags[0]          # '"wireless"'

# Update nested fields
JSON.SET product:1001 $.price 24.99
JSON.NUMINCRBY product:1001 $.specs.dpi 400  # 2000
JSON.ARRAPPEND product:1001 $.tags '"usb-c"'

# --- RediSearch ---
# Create search index on JSON documents
FT.CREATE idx:products ON JSON PREFIX 1 product: SCHEMA
  $.name AS name TEXT WEIGHT 5.0
  $.brand AS brand TEXT
  $.category AS category TAG
  $.price AS price NUMERIC SORTABLE
  $.tags[*] AS tags TAG

# Full-text search
FT.SEARCH idx:products "wireless mouse" LIMIT 0 10

# Filter by category and price range
FT.SEARCH idx:products "@category:{electronics} @price:[10 50]" SORTBY price ASC

# Autocomplete suggestions
FT.SUGADD autocomplete "Wireless Mouse" 100
FT.SUGADD autocomplete "Wireless Keyboard" 90
FT.SUGGET autocomplete "wire" FUZZY MAX 5

# Aggregation — average price per category
FT.AGGREGATE idx:products "*"
  GROUPBY 1 @category
  REDUCE AVG 1 @price AS avg_price
  REDUCE COUNT 0 AS total
  SORTBY 2 @avg_price DESC

Using RediSearch from Node.js

import { createClient, SchemaFieldTypes } from "redis";

const client = createClient({ url: "redis://localhost:6379" });
await client.connect();

// Create index
try {
  await client.ft.create("idx:products", {
    "$.name": { type: SchemaFieldTypes.TEXT, AS: "name", WEIGHT: 5 },
    "$.brand": { type: SchemaFieldTypes.TEXT, AS: "brand" },
    "$.category": { type: SchemaFieldTypes.TAG, AS: "category" },
    "$.price": { type: SchemaFieldTypes.NUMERIC, AS: "price", SORTABLE: true },
  }, { ON: "JSON", PREFIX: "product:" });
} catch (e) {
  // Index already exists
}

// Store products as JSON
await client.json.set("product:2001", "$", {
  name: "Mechanical Keyboard",
  brand: "KeyTech",
  category: "electronics",
  price: 89.99,
  tags: ["mechanical", "rgb", "cherry-mx"],
});

// Search
const results = await client.ft.search("idx:products", "@category:{electronics} @price:[50 100]", {
  SORTBY: { BY: "price", DIRECTION: "ASC" },
  LIMIT: { from: 0, size: 10 },
});

console.log(`Found ${results.total} products:`);
for (const doc of results.documents) {
  console.log(doc.id, doc.value);
}

11. Redis Performance Tuning

Tuning Redis performance comes down to three areas: memory management, persistence configuration, and command optimization. The key production parameters follow.

Memory Optimization

# redis.conf — Memory optimization

# Set memory limit
maxmemory 4gb
maxmemory-policy allkeys-lfu

# Ziplist encoding for small hashes (saves 5-10x memory)
hash-max-ziplist-entries 128
hash-max-ziplist-value 64

# Ziplist encoding for small sorted sets
zset-max-ziplist-entries 128
zset-max-ziplist-value 64

# Listpack for small lists
list-max-ziplist-size -2      # 8kb per node
list-compress-depth 1         # Compress all but head/tail

# Intset for small integer sets
set-max-intset-entries 512

# Lazy freeing (non-blocking deletes)
lazyfree-lazy-eviction yes
lazyfree-lazy-expire yes
lazyfree-lazy-server-del yes
replica-lazy-flush yes

Persistence: RDB vs AOF

Feature                  | RDB                         | AOF                      | RDB + AOF
Mechanism                | Periodic snapshots          | Append-only write log    | Both combined
Data loss risk           | Up to the snapshot interval | At most ~1 second        | At most ~1 second
Recovery speed           | Fast                        | Slow (replays the log)   | Fast (loads RDB preamble first)
Write performance impact | Brief hit during fork       | Small constant overhead  | Both
Recommendation           | Cache-only workloads        | When durability matters  | Recommended for production

# redis.conf — Persistence configuration

# RDB snapshots
save 900 1          # Snapshot if 1+ keys changed in 900s
save 300 10         # Snapshot if 10+ keys changed in 300s
save 60 10000       # Snapshot if 10000+ keys changed in 60s
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir /var/lib/redis

# AOF (recommended for production)
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec          # Fsync every second (best tradeoff)
# appendfsync always          # Fsync after every write (safest, slowest)
# appendfsync no              # Let OS decide (fastest, riskiest)

# AOF rewrite (compact the log)
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
aof-use-rdb-preamble yes      # Hybrid format (fast load + AOF durability)

Benchmarks and Diagnostics

# Built-in benchmark tool
redis-benchmark -h 127.0.0.1 -p 6379 -c 50 -n 100000 -q

# Benchmark specific commands
redis-benchmark -t set,get,lpush,lpop,zadd -n 100000 -q

# Benchmark with pipeline
redis-benchmark -t set -n 1000000 -P 16 -q

# Latency diagnostics
redis-cli --latency                    # Continuous latency sampling
redis-cli --latency-history -i 5       # Latency over time (5s intervals)
redis-cli --latency-dist               # Latency histogram
redis-cli --intrinsic-latency 10       # System baseline latency (10s)

# Memory analysis
redis-cli INFO memory
redis-cli MEMORY DOCTOR
redis-cli MEMORY USAGE user:1001       # Bytes used by specific key

# Find big keys (scan without blocking)
redis-cli --bigkeys
redis-cli --memkeys

12. Redis Security

Redis is designed to run inside a trusted network by default. Production deployments must add authentication, network isolation, and access control to protect data.

ACLs (Access Control Lists)

# redis.conf — ACL configuration

# Default user (disable or set strong password)
requirepass your_very_strong_password_here

# Create users with specific permissions
# user <username> on|off [password] [commands] [keys]

# Read-only user for analytics
user analytics on >analytics_pass ~analytics:* +get +mget +hgetall +zrange +lrange -@all

# Application user with limited commands
user webapp on >webapp_secret_pass ~user:* ~session:* ~cache:* +@read +@write +@set +@hash -@admin -@dangerous

# Admin user (all permissions)
user admin on >admin_super_secret ~* +@all

# Disable default user (after creating named users)
user default off

# --- Runtime ACL management ---
ACL SETUSER readonly on >readonly_pass ~cache:* +get +mget
ACL LIST                                # List all users
ACL WHOAMI                              # Current user
ACL GETUSER webapp                      # Get user details
ACL DELUSER temp_user                   # Delete user
ACL LOG 10                              # Last 10 ACL violations

TLS Encryption and Network Security

# redis.conf — TLS configuration

# Enable TLS
tls-port 6380
port 0                                  # Disable non-TLS port

tls-cert-file /etc/redis/tls/redis.crt
tls-key-file /etc/redis/tls/redis.key
tls-ca-cert-file /etc/redis/tls/ca.crt

# Require client certificate (mutual TLS)
tls-auth-clients yes

# TLS for replication
tls-replication yes

# TLS for cluster bus
tls-cluster yes

# --- Network security ---

# Bind to specific interfaces only
bind 127.0.0.1 10.0.1.5              # Localhost + private network

# Enable protected mode
protected-mode yes

# Disable dangerous commands
rename-command FLUSHALL ""
rename-command FLUSHDB ""
rename-command DEBUG ""
rename-command CONFIG "REDIS_CONFIG_8fj3k"  # Rename to obscure name

# --- Firewall rules (iptables) ---
# Allow only app servers
# iptables -A INPUT -p tcp --dport 6379 -s 10.0.1.0/24 -j ACCEPT
# iptables -A INPUT -p tcp --dport 6379 -j DROP

TLS Connections (Node.js)

import Redis from "ioredis";
import fs from "fs";

const redis = new Redis({
  host: "redis.example.com",
  port: 6380,
  username: "webapp",
  password: "webapp_secret_pass",
  tls: {
    ca: fs.readFileSync("/path/to/ca.crt"),
    cert: fs.readFileSync("/path/to/client.crt"),
    key: fs.readFileSync("/path/to/client.key"),
    rejectUnauthorized: true,
  },
});

13. Redis Monitoring and Observability

Monitoring is essential to keeping a Redis deployment healthy. Redis ships with the INFO command and SLOWLOG built in; combined with Prometheus and Grafana, they give full observability.

The INFO Command and Key Metrics

# Essential monitoring commands
redis-cli INFO server                   # Version, uptime, OS info
redis-cli INFO memory                   # Memory usage details
redis-cli INFO stats                    # Ops/sec, hits, misses
redis-cli INFO replication              # Master/replica status
redis-cli INFO clients                  # Connected clients
redis-cli INFO keyspace                 # DB key counts

# Key metrics to monitor:
# used_memory / used_memory_rss         — Memory usage
# connected_clients                     — Active connections
# instantaneous_ops_per_sec             — Current throughput
# keyspace_hits / keyspace_misses       — Cache hit ratio
# evicted_keys                          — Keys removed by eviction
# rejected_connections                  — Max clients reached
# rdb_last_bgsave_status                — Last snapshot status

# Calculate hit ratio
# hit_ratio = keyspace_hits / (keyspace_hits + keyspace_misses) * 100
# Target: > 95% for caching workloads

# SLOWLOG — queries exceeding threshold
CONFIG SET slowlog-log-slower-than 10000   # Log queries > 10ms
CONFIG SET slowlog-max-len 128
SLOWLOG GET 10                              # Last 10 slow queries
SLOWLOG LEN                                 # Total slow query count
SLOWLOG RESET                               # Clear the log

# Real-time command monitoring (use briefly, high overhead)
redis-cli MONITOR                           # Prints all commands

# Client list (debug connections)
redis-cli CLIENT LIST
redis-cli CLIENT KILL ID 123               # Kill specific client
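The hit-ratio formula noted above is easy to compute from the raw `INFO stats` text. A small Python sketch (the field names match the INFO output; only the parsing is shown, not the connection):

```python
def cache_hit_ratio(info_stats: str) -> float:
    """Compute the cache hit ratio (%) from Redis `INFO stats` output."""
    fields = dict(
        line.split(":", 1) for line in info_stats.splitlines() if ":" in line
    )
    hits = int(fields.get("keyspace_hits", 0))
    misses = int(fields.get("keyspace_misses", 0))
    total = hits + misses
    return 100.0 * hits / total if total else 0.0
```

For caching workloads, alert when the result drifts below the 95% target; a falling ratio usually means eviction pressure or a key-design problem.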

Prometheus + Grafana Monitoring

# docker-compose.yml — Redis monitoring stack
version: "3.8"
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --requirepass your_password
    volumes:
      - redis_data:/data

  redis-exporter:
    image: oliver006/redis_exporter:latest
    ports:
      - "9121:9121"
    environment:
      REDIS_ADDR: redis://redis:6379
      REDIS_PASSWORD: your_password
    depends_on:
      - redis

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin

volumes:
  redis_data:

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "redis"
    static_configs:
      - targets: ["redis-exporter:9121"]
    metrics_path: /metrics

告警规则

# prometheus-alerts.yml
groups:
  - name: redis_alerts
    rules:
      - alert: RedisHighMemoryUsage
        expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Redis memory usage above 85%"
          description: "Redis instance {{ $labels.instance }} memory at {{ $value | humanizePercentage }}"

      - alert: RedisHighLatency
        # 计数器要先取 rate(),否则算出的是实例启动以来的终身均值,无法反映当前延迟
        expr: |
          sum by (instance) (rate(redis_commands_duration_seconds_total[5m])) /
          sum by (instance) (rate(redis_commands_processed_total[5m])) > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Redis average latency above 10ms"

      - alert: RedisLowHitRatio
        # 同样使用 rate() 观察最近 5 分钟的命中率,终身累计值对近期劣化不敏感
        expr: |
          rate(redis_keyspace_hits_total[5m]) /
          (rate(redis_keyspace_hits_total[5m]) + rate(redis_keyspace_misses_total[5m])) < 0.9
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Redis cache hit ratio below 90%"

      - alert: RedisReplicationBroken
        expr: redis_connected_slaves < 1
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Redis has no connected replicas"

      - alert: RedisTooManyConnections
        expr: redis_connected_clients > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Redis has over 1000 connected clients"
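除了 Prometheus 告警,同样的阈值也可以在应用侧的健康检查里复用。下面是一个示意性的 Python 片段(`check_redis_health` 为示例函数名,并非任何库的 API;入参字典的键沿用 INFO 字段名,阈值与上面的告警规则保持一致):

```python
def check_redis_health(m: dict) -> list:
    """根据 INFO 指标字典返回触发的告警名称列表,阈值对应上面的 Prometheus 规则。"""
    alerts = []
    # 内存占用超过 maxmemory 的 85%(maxmemory 为 0 表示未设上限,跳过)
    max_mem = m.get("maxmemory", 0)
    if max_mem and m.get("used_memory", 0) / max_mem > 0.85:
        alerts.append("RedisHighMemoryUsage")
    # 命中率低于 90%(无请求时不判断,避免除零)
    hits = m.get("keyspace_hits", 0)
    misses = m.get("keyspace_misses", 0)
    if (hits + misses) > 0 and hits / (hits + misses) < 0.9:
        alerts.append("RedisLowHitRatio")
    # 连接数超过 1000
    if m.get("connected_clients", 0) > 1000:
        alerts.append("RedisTooManyConnections")
    # 没有已连接的副本(注意:独立部署的 Redis 会始终触发此项)
    if m.get("connected_slaves", 0) < 1:
        alerts.append("RedisReplicationBroken")
    return alerts

metrics = {
    "used_memory": 900 * 1024 * 1024,   # 900MB
    "maxmemory": 1024 * 1024 * 1024,    # 1GB,约 88% > 85%,触发内存告警
    "keyspace_hits": 9500,
    "keyspace_misses": 500,             # 命中率 95%,不触发
    "connected_clients": 120,
    "connected_slaves": 2,
}
print(check_redis_health(metrics))  # ['RedisHighMemoryUsage']
```

这种应用侧检查适合做容器的 liveness/readiness 探针,与 Prometheus 告警互为补充。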

自定义健康检查脚本

#!/bin/bash
# redis-health-check.sh — Quick Redis health report

REDIS_CLI="redis-cli -a your_password --no-auth-warning"

echo "=== Redis Health Report ==="
echo ""

# Uptime
UPTIME=$($REDIS_CLI INFO server | grep uptime_in_days | cut -d: -f2 | tr -d "\r")
echo "Uptime: $UPTIME days"

# Memory
USED_MEM=$($REDIS_CLI INFO memory | grep used_memory_human | cut -d: -f2 | tr -d "\r")
MAX_MEM=$($REDIS_CLI CONFIG GET maxmemory | tail -1)
echo "Memory: $USED_MEM (max: $MAX_MEM bytes)"

# Hit ratio
HITS=$($REDIS_CLI INFO stats | grep keyspace_hits | cut -d: -f2 | tr -d "\r")
MISSES=$($REDIS_CLI INFO stats | grep keyspace_misses | cut -d: -f2 | tr -d "\r")
if [ "$HITS" -gt 0 ] 2>/dev/null; then
  RATIO=$(echo "scale=2; $HITS * 100 / ($HITS + $MISSES)" | bc)
  echo "Hit Ratio: $RATIO%"
fi

# Connected clients
CLIENTS=$($REDIS_CLI INFO clients | grep connected_clients | cut -d: -f2 | tr -d "\r")
echo "Clients: $CLIENTS"

# Ops per second
OPS=$($REDIS_CLI INFO stats | grep instantaneous_ops_per_sec | cut -d: -f2 | tr -d "\r")
echo "Throughput: $OPS ops/sec"

# Slow queries
SLOW=$($REDIS_CLI SLOWLOG LEN)
echo "Slow queries: $SLOW"

# Evicted keys
EVICTED=$($REDIS_CLI INFO stats | grep evicted_keys | cut -d: -f2 | tr -d "\r")
echo "Evicted keys: $EVICTED"

echo ""
echo "=== End Report ==="

总结

Redis 是现代应用架构中不可或缺的组件。从简单的缓存到复杂的实时数据处理,Redis 的丰富数据结构和高性能使其成为开发者的首选。以下是选择正确 Redis 模式的快速参考:

| 场景 | 推荐方案 | 关键命令 |
| --- | --- | --- |
| 缓存 | Cache-aside + TTL + LFU 淘汰 | SET EX, GET, DEL |
| 排行榜 | 有序集合 | ZADD, ZREVRANGE, ZINCRBY |
| 限流 | 滑动窗口(有序集合)或令牌桶(Lua) | ZADD, ZRANGEBYSCORE, EVALSHA |
| 会话管理 | 哈希 + TTL 或 connect-redis | HSET, HGETALL, EXPIRE |
| 消息队列 | Streams + 消费者组 | XADD, XREADGROUP, XACK |
| 实时通知 | Pub/Sub | PUBLISH, SUBSCRIBE, PSUBSCRIBE |
| 全文搜索 | RediSearch + RedisJSON | FT.CREATE, FT.SEARCH, JSON.SET |
| 分布式锁 | SET NX EX + Lua 续期 | SET NX EX, EVALSHA |
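以表中的"限流"一行为例,滑动窗口算法本身很简单。下面用纯 Python 做一个示意(`allow_request` 为示例函数名;生产中应改用有序集合,用 `ZREMRANGEBYSCORE` 清理过期时间戳、`ZCARD` 计数、`ZADD` 记录本次请求,并包在 Lua 脚本里保证原子性):

```python
def allow_request(timestamps: list, now: float, window: float = 60.0, limit: int = 100) -> bool:
    """滑动窗口限流:窗口内请求数达到 limit 则拒绝,否则记录并放行。"""
    # 清理窗口外的时间戳,相当于 ZREMRANGEBYSCORE key 0 (now - window)
    timestamps[:] = [t for t in timestamps if t > now - window]
    if len(timestamps) >= limit:          # 相当于 ZCARD key
        return False
    timestamps.append(now)                # 相当于 ZADD key now now
    return True

window_log = []
results = [allow_request(window_log, now=t, window=60, limit=3) for t in [0, 1, 2, 3, 70]]
print(results)  # [True, True, True, False, True]
```

注意 t=3 时窗口内已有 3 个请求而被拒绝,t=70 时前 3 个时间戳已滑出 60 秒窗口,请求重新放行。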

无论你是将 Redis 用作缓存层、消息代理还是主数据库,本指南涵盖的模式和最佳实践将帮助你构建高性能、可靠和安全的 Redis 部署。记住始终使用连接池、管道化批量命令、配置适当的持久化策略,以及通过 ACL 和 TLS 保障安全。
