博客
关于我
RabbitMQ - 以 MQ 为例,手写一个 RPC 框架 demo
阅读量:795 次
发布时间:2023-03-01

本文共 8593 字,大约阅读时间需要 28 分钟。

基于自定义协议的 RabbitMQ 模拟实现

前言

本文是笔者之前撰写的一个系列文章——“基于源码,模拟实现 RabbitMQ” 中的一部分。该系列文章旨在通过实际操作和实践,帮助开发者理解 RabbitMQ 的工作原理及其相关协议实现。如需了解更多内容,可参考笔者的完整系列文章。


一、自定义应用层协议

1.1 消息格式概述

自定义应用层协议主要定义了客户端与服务器之间消息的传输格式。该协议分为四个部分:typelengthpayload

  • type:使用整数类型,表示消息的类型,用于指示消息的具体用途,例如调用服务器端的哪个服务。
  • length:同样为整数类型,表示 payload 的长度。由于 TCP 是面向字节流的协议,数据的分割和拼接可能导致粘包问题,因此协议中加入了 length 字段,用于明确数据的边界。
  • payload:作为消息的数据载荷,承载调用VirtualHost中的具体服务所需的参数。由于 TCP 的特性,payload采用二进制数据序列化的方式进行传输。

1.2 数据序列化方案

由于 TCP 是面向字节流的传输协议,传统的文本格式(如 JSON)在传输过程中可能导致效率低下且可读性差。因此,本文采用二进制数据序列化的方式作为 payload 的编码方案。具体实现如下:

import java.io.Serializable;
class Request implements Serializable {
val type: Int
val length: Int
val payload: ByteArray
}
class Response implements Serializable {
val type: Int
val length: Int
val payload: ByteArray
}

1.3 请求与响应参数封装

为了实现客户端与服务器之间的通信,本文对请求和响应参数进行了封装。以下是基础请求和响应参数的定义:

open class ReqBaseArguments {
open val rid: String = ""
open val channelId: String = ""
}
open class RespBaseArguments {
open val rid: String
open val channelId: String
open val ok: Boolean = false
}

二、BrokerServer 实现

2.1 BrokerServer 的功能概述

BrokerServer 是一个中间服务,其主要功能是接收客户端的 TCP 连接请求,并代理转发到 VirtualHost 中的具体服务。其启动时会通过 accept() 方法阻塞等待客户端连接,成功建立连接后,为每个客户端分配一个线程处理请求。

2.2 BrokerServer 的工作流程

  • 接收客户端连接:BrokerServer 启动时通过 socket.accept() 等待客户端连接。
  • 处理客户端请求:客户端连接建立后,BrokerServer 为该连接分配一个线程,读取客户端的请求数据。
  • 解析请求数据:根据自定义协议,BrokerServer 解析 typelengthpayload,提取出 VirtualHost 中的具体调用参数。
  • 调用 VirtualHost 服务:根据请求类型(如创建交换机、删除队列等),调用 VirtualHost 的相应服务。
  • 生成响应数据:将 VirtualHost 处理后的响应数据封装成自定义协议格式,并通过 IO 流发送给客户端。
  • 2.3 BrokerServer 实现代码

    class BrokerServer(private val port: Int) {
    private val socket = ServerSocket(port)
    private val channelSession = ConcurrentHashMap
    ()
    fun start() {
    println("[BrokerServer] 启动!")
    while (true) {
    val client = socket.accept()
    clientPool.submit { clientProcess(client) }
    }
    }
    private fun clientProcess(client: Socket) {
    try {
    val inputStream = client.getInputStream()
    val outputStream = client.getOutputStream()
    DataInputStream(inputStream).use {
    DataOutputStream(outputStream).use {
    while (true) {
    val request = readRequest(it)
    val response = process(request, client)
    writeResponse(response, it)
    }
    }
    }
    } catch (e: EOFException) {
    println("[BrokerServer] 客户端正常下线!")
    } catch (e: Exception) {
    println("[BrokerServer] 客户端连接异常!")
    } finally {
    client.close()
    removeChannelSession(client)
    }
    }
    private fun process(request: Request, client: Socket): Response {
    val req = BinaryTool.bytesToAny(request.payload)
    val reqBase = req as ReqBaseArguments
    val ok = when (request.type) {
    1 -> {
    channelSession[reqBase.channelId] = client
    println("[BrokerServer] channel 创建成功!")
    true
    }
    2 -> {
    channelSession.remove(reqBase.channelId)
    println("[BrokerServer] channel 销毁成功!")
    true
    }
    3 -> virtualHost.exchangeDeclare(req as ExchangeDeclareReq)
    4 -> virtualHost.exchangeDelete(req as ExchangeDeleteReq)
    5 -> virtualHost.queueDeclare(req as QueueDeclareReq)
    else -> throw RuntimeException("[BrokerServer] 客户端请求 type 非法!")
    }
    val respBase = RespBaseArguments(reqBase.rid, reqBase.channelId, ok)
    val payload = BinaryTool.anyToBytes(respBase)
    Response(request.type, payload.size, payload)
    }
    private fun removeChannelSession(client: Socket) {
    val channelIdList = mutableListOf
    ()
    for (entry in channelSession) {
    if (entry.value == client) {
    channelIdList.add(entry.key)
    }
    }
    for (channelId in channelIdList) {
    channelSession.remove(channelId)
    }
    }
    }

    三、Connection 和 Channel 实现

    3.1 Connection 的功能概述

    Connection 是一个 TCP 连接的抽象表示,其主要功能是为多个 Channel 提供通讯服务。为了提高效率,Connection 实现了连接的复用机制,每个 Connection 可以维护多个 Channel。

    3.2 Channel 的功能概述

    Channel 是 Connection 中的一个逻辑连接,用于实现客户端与 VirtualHost 之间的通信。Channel 提供了一系列方法(如 createChannel()exchangeDeclare() 等),用于调用 VirtualHost 的服务。

    3.3 Connection 的工作流程

  • 建立连接:通过 newConnection() 方法创建一个新的 Connection 实例。
  • 创建 Channel:调用 createChannel() 方法,生成一个新的 Channel 实例,并通过 TCP 通道发送创建请求到 VirtualHost。
  • 等待响应:Channel 会阻塞等待 VirtualHost 的响应,通过 waitResp() 方法提取响应数据,并将其交给客户端处理。
  • 处理消息:客户端通过 putRespToChannel() 方法将响应数据传递给对应的 Channel,完成消息的处理。
  • 3.4 Channel 的实现代码

    class Channel(private val channelId: String, private val connection: Connection) {
    private val ridRespMap = ConcurrentHashMap
    ()
    private val locker = Object()
    fun generateRid(): String {
    return "R-${UUID.randomUUID()}"
    }
    fun waitResp(rid: String): RespBaseArguments {
    while (ridRespMap[rid] == null) {
    synchronized(locker) {
    locker.wait()
    }
    }
    return ridRespMap[rid]!!
    }
    fun notifyResp(respBase: RespBaseArguments) {
    ridRespMap[respBase.rid] = respBase
    synchronized(locker) {
    locker.notifyAll()
    }
    }
    fun createChannel(): Boolean {
    val reqBase = ReqBaseArguments(generateRid(), channelId)
    val payload = BinaryTool.anyToBytes(reqBase)
    val req = Request(1, payload.size, payload)
    connection.writeReq(req)
    val respBase = waitResp(reqBase.rid)
    return respBase.ok
    }
    fun removeChannel(): Boolean {
    val reqBase = ReqBaseArguments(generateRid(), channelId)
    val payload = BinaryTool.anyToBytes(reqBase)
    val req = Request(2, payload.size, payload)
    connection.writeReq(req)
    val respBase = waitResp(reqBase.rid)
    return respBase.ok
    }
    fun exchangeDeclare(
    name: String,
    type: ExchangeType,
    durable: Boolean,
    autoDelete: Boolean,
    arguments: MutableMap
    ): Boolean {
    val exchangeDeclareReq = ExchangeDeclareReq(
    name = name,
    type = type,
    durable = durable,
    autoDelete = autoDelete,
    arguments = arguments,
    rid = generateRid(),
    channelId = channelId
    )
    val payload = BinaryTool.anyToBytes(exchangeDeclareReq)
    val req = Request(3, payload.size, payload)
    connection.writeReq(req)
    val respBase = waitResp(exchangeDeclareReq.rid)
    return respBase.ok
    }
    fun exchangeDelete(name: String): Boolean {
    val exchangeDeleteReq = ExchangeDeleteReq(
    name = name,
    rid = generateRid(),
    channelId = channelId
    )
    val payload = BinaryTool.anyToBytes(exchangeDeleteReq)
    val req = Request(4, payload.size, payload)
    connection.writeReq(req)
    val respBase = waitResp(exchangeDeleteReq.rid)
    return respBase.ok
    }
    fun queueDeclare(
    name: String,
    durable: Boolean,
    exclusive: Boolean,
    autoDelete: Boolean,
    arguments: MutableMap
    ): Boolean {
    val queueDeclareReq = QueueDeclareReq(
    name = name,
    durable = durable,
    exclusive = exclusive,
    autoDelete = autoDelete,
    arguments = arguments,
    rid = generateRid(),
    channelId = channelId
    )
    val payload = BinaryTool.anyToBytes(queueDeclareReq)
    val req = Request(5, payload.size, payload)
    connection.writeReq(req)
    val resp = waitResp(queueDeclareReq.rid)
    return resp.ok
    }
    }

    四、Demo 与 实现说明

    4.1 启动 BrokerServer

    fun main() {
    val server = BrokerServer(9000)
    server.start()
    }

    4.2 客户端连接

    class Test2 {
    fun main() {
    val factory = ConnectionFactory("127.0.0.1", 9000)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    val ok1 = channel.createChannel()
    val ok2 = channel.exchangeDeclare("e1", ExchangeType.DIRECT, false, false, mutableMapOf())
    val ok3 = channel.removeChannel()
    println("ok1: $ok1, ok2: $ok2, ok3: $ok3")
    }
    }

    结论

    通过上述实现,可以看到自定义协议在模拟实现 RabbitMQ 的过程中发挥了重要作用。BrokerServer 作为中间服务,负责接收客户端请求并代理转发;Connection 和 Channel 则实现了 TCP 连接的复用和逻辑连接的管理。整个系统通过自定义协议实现了高效的消息传输和服务调用,具有一定的扩展性和实用性。

    转载地址:http://httfk.baihongyu.com/

    你可能感兴趣的文章
    PHP判断指定目录下是否存在文件
    查看>>
    php判断数组是否为空
    查看>>
    PHP判断数组是否有重复值、获取重复值
    查看>>
    PHP利用正则表达式实现手机号码中间4位用星号(*)替换显示
    查看>>
    PHP加密与安全的最佳实践
    查看>>
    PHP区分 企业微信浏览器 | 普通微信浏览器 | 其他浏览器
    查看>>
    php原生代码怎么连表查询,PHP tp5中使用原生sql查询代码实例
    查看>>
    PHP去掉转义符
    查看>>
    php去除字符串开头或末尾的字符(例如逗号)
    查看>>
    php反射api
    查看>>
    PHP反射ReflectionClass、ReflectionMethod 入门教程
    查看>>
    PHP反射机制
    查看>>
    php取当天的最后一秒_Docker快速搭建PHP开发环境详细教程
    查看>>
    php取绝对值
    查看>>
    PHP变量内容的获取
    查看>>
    php各种常用的算法
    查看>>
    php各种缓存策略对比
    查看>>
    RabbitMQ高级特性 - 消息分发(限流、负载均衡)
    查看>>
    php后台“爬虫”模拟登录第三方系统
    查看>>
    php后台的在控制器中就可以实现阅读数增加
    查看>>