同步操作将从 开放金融技术/zbus 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
##ZBUS 特性
##ZBUS 通讯基础之znet
##ZBUS SDK
##ZBUS 桥接
总线默认占用15555端口, http://localhost:15555 可以直接进入监控,注意zbus因为原生兼容HTTP协议所以监控与消息队列使用同一个端口
高可用模式启动总线 分别启动ZbusServer与TrackServer,无顺序之分,默认ZbusServer占用15555端口,TrackServer占用16666端口。
<dependency>
<groupId>org.zbus</groupId>
<artifactId>zbus</artifactId>
<version>5.2.0</version>
</dependency>
//1)创建Broker代理【重量级对象,需要释放】
SingleBrokerConfig config = new SingleBrokerConfig();
config.setBrokerAddress("127.0.0.1:15555");
final Broker broker = new SingleBroker(config);
//2) 创建生产者 【轻量级对象,不需要释放,随便使用】
Producer producer = new Producer(broker, "MyMQ");
producer.createMQ(); //如果已经确定存在,不需要创建
Message msg = new Message();
msg.setBody("hello world");
Message res = producer.sendSync(msg, 1000);
System.out.println(res);
//3)销毁Broker
broker.close();
//1)创建Broker代表
SingleBrokerConfig brokerConfig = new SingleBrokerConfig();
brokerConfig.setBrokerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(brokerConfig);
MqConfig config = new MqConfig();
config.setBroker(broker);
config.setMq("MyMQ");
//2) 创建消费者
@SuppressWarnings("resource")
Consumer c = new Consumer(config);
c.onMessage(new MessageCallback() {
public void onMessage(Message msg, Session sess) throws IOException {
System.out.println(msg);
}
});
参考源码test目下的rpc部分
SingleBrokerConfig config = new SingleBrokerConfig();
config.setBrokerAddress("127.0.0.1:15555");
Broker broker = new SingleBroker(config);
RpcConfig rpcConfig = new RpcConfig();
rpcConfig.setBroker(broker);
rpcConfig.setMq("MyRpc");
//动态代理处Interface通过zbus调用的动态实现类
Interface hello = RpcProxy.getService (Interface.class, rpcConfig);
Object[] res = hello.objectArray();
for (Object obj : res) {
System.out.println(obj);
}
Object[] array = new Object[] { getUser("rushmore"), "hong", true, 1,
String.class };
int saved = hello.saveObjectArray(array);
System.out.println(saved);
Class<?> ret = hello.classTest(String.class);
System.out.println(ret);
无任何代码侵入使得你已有的业务接口接入到zbus,获得跨平台和多语言支持
<!-- 暴露的的接口实现示例 -->
<bean id="interface" class="org.zbus.rpc.biz.InterfaceImpl"></bean>
<bean id="serviceHandler" class="org.zbus.client.rpc.RpcServiceHandler">
<constructor-arg>
<list>
<!-- 放入你需要暴露的接口 ,其他配置基本不变-->
<ref bean="interface"/>
</list>
</constructor-arg>
</bean>
<!-- 切换至高可用模式,只需要把broker的实现改为HaBroker配置 -->
<bean id="broker" class="org.zbus.client.broker.SingleBroker">
<constructor-arg>
<bean class="org.zbus.client.broker.SingleBrokerConfig">
<property name="brokerAddress" value="127.0.0.1:15555" />
</bean>
</constructor-arg>
</bean>
<!-- 默认调用了start方法,由Spring容器直接带起来注册到zbus总线上 -->
<bean id="zbusService" class="org.zbus.client.service.Service" init-method="start">
<constructor-arg>
<bean class="org.zbus.client.service.ServiceConfig">
<property name="broker" ref="broker"/>
<property name="mq" value="MyRpc"/>
<property name="threadCount" value="2"/>
<property name="serviceHandler" ref="serviceHandler"/>
</bean>
</constructor-arg>
</bean>
<!-- 切换至高可用模式,只需要把broker的实现改为HaBroker配置 -->
<bean id="broker" class="org.zbus.client.broker.SingleBroker">
<constructor-arg>
<bean class="org.zbus.client.broker.SingleBrokerConfig">
<property name="brokerAddress" value="127.0.0.1:15555" />
</bean>
</constructor-arg>
</bean>
<!-- 动态代理由RpcProxy的getService生成,需要知道对应的MQ配置信息(第二个参数) -->
<bean id="interface" class="org.zbus.client.rpc.RpcProxy" factory-method="getService">
<constructor-arg type="java.lang.Class" value="org.zbus.rpc.biz.Interface"/>
<constructor-arg>
<bean class="org.zbus.client.rpc.RpcConfig">
<property name="broker" ref="broker"/>
<property name="mq" value="MyRpc"/>
</bean>
</constructor-arg>
</bean>
Spring完成zbus代理透明化,zbus设施从你的应用逻辑中彻底消失
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("ZbusSpringClient.xml");
Interface intf = (Interface) context.getBean("interface");
System.out.println(intf.listMap());
}
ZBUS协议继承于HTTP协议格式,主体采用HTTP头部协议扩展完成,HTTP协议由HTTP头部和HTTP包体组成, ZBUS协议在HTTP头部KV值做了扩展,支持浏览器方式直接访问,但是ZBUS链接机制采用保持长连接方式。 原则上, 编写客户端SDK只需要遵循下面ZBUS协议扩展即可。
ZBUS扩展头部主要是完成
扩展头部Key-Value解释
###1. 消息命令 命令标识,决定Broker(ZbusServer|TrackServer)的处理
cmd: produce | consume | request | heartbeat | admin(默认值)
###2. 消息队列寻址
mq: 消息目标队列
mq_reply: 消息回复队列
###3. 异步消息匹配 msgid: 消息唯一UUID
msgid_raw: 原始消息唯一UUID, 消息消费路由后ID发生变化,该字段保留最原始的消息ID
###4. 安全控制 token: 访问控制码,不填默认空
###5. 其他可扩展 broker: 消息经过Broker的地址
topic: 消息主题,发布订阅时使用
ack: 是否需要对当前消息ACK,不填默认true
encoding: 消息体的编码格式
sub_cmd: 管理命令的二级命令
###6 HTTP头部第一行,ZBUS协议保持一致理解
请求:GET|POST URI
应答:200 OK
URI做扩展Key-Value的字符串理解
请求格式
应答格式(在启用ack的时候才有应答)
请求格式
应答格式
请求格式
应答格式(在启用ack的时候才有应答)
请求格式
应答格式
URI = /
监控首页 = /?cmd=admin&&method=index
URI = /MyMQ
第一个?之前理解为消息队列 mq=MyMQ
第一个?之后理解为Key-Value, URI的KV优先级低于头部扩展
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。