同步操作将从 开放金融技术/zbus 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
- 企业SOA服务治理
摈弃所谓的企业级,回归到服务总线,做两件简单的事情:
ZBus追求Z(Zero)的轻量级理念,尽量少的配置,尽量小的设计,跨平台几百K拷贝随时随地运行(Linux简单源码编译)
public class RpcService {
public static void main(String[] args) throws Exception {
RpcServiceHandler handler = new RpcServiceHandler();
//简单添加模块
handler.addModule("ServiceInterface", new TestService());
//注册服务名为MyRpc的服务到ZBus总线上
ZBusService service = new ZBusService("MyRpc", handler);
service.setHost("127.0.0.1");
service.setPort(15555);
service.setWorkerThreadCount(2); //以2个工作者线程运行
service.run();
}
}
其中TestService可以是任何JAVA代码逻辑
public class ServiceProxyClient {
public static void main(String[] args) throws Throwable {
ServiceInterface rpc = RpcFactory.getService(
ServiceInterface.class,
"zbus://localhost:15555/MyRpc");
String pong = rpc.echo();
System.out.println(pong);
int c = rpc.plus(1, 2);
System.out.println(c);
RpcFactory.destroy();
}
}
消息队列支持消息发布者PUB发布消息,消息消费者SUB订阅消息。消息队列种类包括:
请求应答服务设计:
消息由 消息业务 与 消息控制 两大部分组成
- 消息出队列[mq_fetch_msg]
mq_fetch_msg(mq_t* self, char* recver)
1.1 如果消息队列模式为Roller负载均衡模式,从公共消息队列头部取出消息,同时删除接收者队列中对应的节点,否则进入下面的操作
1.2 根据接收者查找对应私有消息队列,删除从消息队列头部取出消息,并将指向的公共消息队列节点清楚
增加订阅者[mq_put_recver] [见代码,略]
删除订阅者[mq_rem_recver] [见代码,略]
pub与sub可以分裂,由不同的socket,也可以由相同的socket完成
创建客户端 zbus_connect()
销毁客户端 zbus_destroy()
创建队列 zbus_createmq 在ZBus总线上创建队列
发布消息 zbus_pub 发布PUB消息指令,可选的ACK设置
同步订阅消息 zbus_sub 发送SUB消息队列指令,等到消息至超时
同步请求服务 zbus_call(zmsg) 请求服务,等待返回
注册提供服务 zbus_serve(service_handler)
以ZBUS实现RPC为例,ZBUS内部都是Binary二进制格式处理数据,具体业务协议格式与ZBUS本身透明。ZBUS的Java API提供了一个默认的基于JSON协议的RPC实现
服务端可以接口化,也可单纯的POJO,没有任何限制,这里采用服务Interface。经过简单代理封装后,服务端与客户端代码几乎与ZBUS无关(透明化总线的存在)
服务接口,客户端只需要这个接口类
public interface ServiceInterface {
public String echo(String ping);
public int plus(int a, int b);
public byte[] bin();
}
服务具体实现 MyService中唯一给ZBUS侵入的是注解@Remote,用于方便Java API检查哪些方法需要注册,如果采用默认全部注册的规则,则完全可以与ZBUS无关
class MyService implements ServiceInterface {
@Remote()
public String echo(String ping){
return ping;
}
@Remote()
public int plus(int a, int b) {
return a + b;
}
@Remote
public byte[] bin() {
byte[] res = new byte[100];
for (int i = 0; i < res.length; i++)
res[i] = (byte) i;
return res;
}
}
最后一步是如何把上述服务注册到ZBUS总线,Java API可以很简单注册如下
public class RpcService {
public static void main(String[] args) throws Exception {
RpcServiceHandler handler = new RpcServiceHandler();
//简单添加模块
handler.addModule("ServiceInterface", new TestService());
//注册服务名为MyRpc的服务到ZBus总线上
ZBusService service = new ZBusService("MyRpc", handler);
service.setHost("127.0.0.1");
service.setPort(15555);
service.setWorkerThreadCount(2); //以2个工作者线程运行
service.run();
}
}
###客户端
通过RpcFactory动态获取ServiceInterface的动态实现类
public class RpcFactoryExample {
public static void main(String[] args) throws Throwable {
ServiceInterface rpc = RpcFactory.getService(
ServiceInterface.class,
"zbus://localhost:15555/MyRpc");
String pong = rpc.echo("ping");
System.out.println(pong);
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。