本文共 8593 字,大约阅读时间需要 28 分钟。
本文是笔者之前撰写的一个系列文章——“基于源码,模拟实现 RabbitMQ” 中的一部分。该系列文章旨在通过实际操作和实践,帮助开发者理解 RabbitMQ 的工作原理及其相关协议实现。如需了解更多内容,可参考笔者的完整系列文章。
自定义应用层协议主要定义了客户端与服务器之间消息的传输格式。该协议分为四个部分:type、length 和 payload。
payload 的长度。由于 TCP 是面向字节流的协议,数据的分割和拼接可能导致粘包问题,因此协议中加入了 length 字段,用于明确数据的边界。payload采用二进制数据序列化的方式进行传输。由于 TCP 是面向字节流的传输协议,传统的文本格式(如 JSON)在传输过程中可能导致效率低下且可读性差。因此,本文采用二进制数据序列化的方式作为 payload 的编码方案。具体实现如下:
import java.io.Serializable;class Request implements Serializable { val type: Int val length: Int val payload: ByteArray} class Response implements Serializable { val type: Int val length: Int val payload: ByteArray} 为了实现客户端与服务器之间的通信,本文对请求和响应参数进行了封装。以下是基础请求和响应参数的定义:
open class ReqBaseArguments { open val rid: String = "" open val channelId: String = ""}open class RespBaseArguments { open val rid: String open val channelId: String open val ok: Boolean = false} BrokerServer 是一个中间服务,其主要功能是接收客户端的 TCP 连接请求,并代理转发到 VirtualHost 中的具体服务。其启动时会通过 accept() 方法阻塞等待客户端连接,成功建立连接后,为每个客户端分配一个线程处理请求。
socket.accept() 等待客户端连接。type、length 和 payload,提取出 VirtualHost 中的具体调用参数。class BrokerServer(private val port: Int) { private val socket = ServerSocket(port) private val channelSession = ConcurrentHashMap () fun start() { println("[BrokerServer] 启动!") while (true) { val client = socket.accept() clientPool.submit { clientProcess(client) } } } private fun clientProcess(client: Socket) { try { val inputStream = client.getInputStream() val outputStream = client.getOutputStream() DataInputStream(inputStream).use { DataOutputStream(outputStream).use { while (true) { val request = readRequest(it) val response = process(request, client) writeResponse(response, it) } } } } catch (e: EOFException) { println("[BrokerServer] 客户端正常下线!") } catch (e: Exception) { println("[BrokerServer] 客户端连接异常!") } finally { client.close() removeChannelSession(client) } } private fun process(request: Request, client: Socket): Response { val req = BinaryTool.bytesToAny(request.payload) val reqBase = req as ReqBaseArguments val ok = when (request.type) { 1 -> { channelSession[reqBase.channelId] = client println("[BrokerServer] channel 创建成功!") true } 2 -> { channelSession.remove(reqBase.channelId) println("[BrokerServer] channel 销毁成功!") true } 3 -> virtualHost.exchangeDeclare(req as ExchangeDeclareReq) 4 -> virtualHost.exchangeDelete(req as ExchangeDeleteReq) 5 -> virtualHost.queueDeclare(req as QueueDeclareReq) else -> throw RuntimeException("[BrokerServer] 客户端请求 type 非法!") } val respBase = RespBaseArguments(reqBase.rid, reqBase.channelId, ok) val payload = BinaryTool.anyToBytes(respBase) Response(request.type, payload.size, payload) } private fun removeChannelSession(client: Socket) { val channelIdList = mutableListOf () for (entry in channelSession) { if (entry.value == client) { channelIdList.add(entry.key) } } for (channelId in channelIdList) { channelSession.remove(channelId) } }} Connection 是一个 TCP 连接的抽象表示,其主要功能是为多个 Channel 提供通讯服务。为了提高效率,Connection 实现了连接的复用机制,每个 Connection 可以维护多个 Channel。
Channel 是 Connection 中的一个逻辑连接,用于实现客户端与 VirtualHost 之间的通信。Channel 提供了一系列方法(如 createChannel()、exchangeDeclare() 等),用于调用 VirtualHost 的服务。
newConnection() 方法创建一个新的 Connection 实例。createChannel() 方法,生成一个新的 Channel 实例,并通过 TCP 通道发送创建请求到 VirtualHost。waitResp() 方法提取响应数据,并将其交给客户端处理。putRespToChannel() 方法将响应数据传递给对应的 Channel,完成消息的处理。class Channel(private val channelId: String, private val connection: Connection) { private val ridRespMap = ConcurrentHashMap () private val locker = Object() fun generateRid(): String { return "R-${UUID.randomUUID()}" } fun waitResp(rid: String): RespBaseArguments { while (ridRespMap[rid] == null) { synchronized(locker) { locker.wait() } } return ridRespMap[rid]!! } fun notifyResp(respBase: RespBaseArguments) { ridRespMap[respBase.rid] = respBase synchronized(locker) { locker.notifyAll() } } fun createChannel(): Boolean { val reqBase = ReqBaseArguments(generateRid(), channelId) val payload = BinaryTool.anyToBytes(reqBase) val req = Request(1, payload.size, payload) connection.writeReq(req) val respBase = waitResp(reqBase.rid) return respBase.ok } fun removeChannel(): Boolean { val reqBase = ReqBaseArguments(generateRid(), channelId) val payload = BinaryTool.anyToBytes(reqBase) val req = Request(2, payload.size, payload) connection.writeReq(req) val respBase = waitResp(reqBase.rid) return respBase.ok } fun exchangeDeclare( name: String, type: ExchangeType, durable: Boolean, autoDelete: Boolean, arguments: MutableMap ): Boolean { val exchangeDeclareReq = ExchangeDeclareReq( name = name, type = type, durable = durable, autoDelete = autoDelete, arguments = arguments, rid = generateRid(), channelId = channelId ) val payload = BinaryTool.anyToBytes(exchangeDeclareReq) val req = Request(3, payload.size, payload) connection.writeReq(req) val respBase = waitResp(exchangeDeclareReq.rid) return respBase.ok } fun exchangeDelete(name: String): Boolean { val exchangeDeleteReq = ExchangeDeleteReq( name = name, rid = generateRid(), channelId = channelId ) val payload = BinaryTool.anyToBytes(exchangeDeleteReq) val req = Request(4, payload.size, payload) connection.writeReq(req) val respBase = waitResp(exchangeDeleteReq.rid) return respBase.ok } fun queueDeclare( name: String, durable: Boolean, exclusive: Boolean, autoDelete: Boolean, arguments: MutableMap ): Boolean { val queueDeclareReq = QueueDeclareReq( name = name, durable = durable, exclusive = exclusive, autoDelete = autoDelete, arguments = arguments, rid = generateRid(), channelId = channelId ) val payload = BinaryTool.anyToBytes(queueDeclareReq) val req = Request(5, payload.size, payload) connection.writeReq(req) val resp = waitResp(queueDeclareReq.rid) return resp.ok }} fun main() { val server = BrokerServer(9000) server.start()} class Test2 { fun main() { val factory = ConnectionFactory("127.0.0.1", 9000) val connection = factory.newConnection() val channel = connection.createChannel() val ok1 = channel.createChannel() val ok2 = channel.exchangeDeclare("e1", ExchangeType.DIRECT, false, false, mutableMapOf()) val ok3 = channel.removeChannel() println("ok1: $ok1, ok2: $ok2, ok3: $ok3") }} 通过上述实现,可以看到自定义协议在模拟实现 RabbitMQ 的过程中发挥了重要作用。BrokerServer 作为中间服务,负责接收客户端请求并代理转发;Connection 和 Channel 则实现了 TCP 连接的复用和逻辑连接的管理。整个系统通过自定义协议实现了高效的消息传输和服务调用,具有一定的扩展性和实用性。
转载地址:http://httfk.baihongyu.com/