diff --git a/doc/BrokerDesgin.md b/doc/BrokerDesgin.md index 2355c22b605793ff7690f81f54d0b28d68b5344a..3e006c65e37b3ce453bdb1da7cf9dbdd320f0777 100644 --- a/doc/BrokerDesgin.md +++ b/doc/BrokerDesgin.md @@ -39,3 +39,35 @@ ServerAddress => Voted Tracker list, MqServer's votes get down to some extent(de **BrokerRouteTable.removeServer** **BrokerRouteTable.updateVotes** + + + + + + +###------------------------------Tracker Algorithm----------------------------- + + ServerInfo: + + ServerAddress: { Address:String, SslEnabled:Boolean } + + ServerVersion: String + + TopicTable: { TopicName=>TopicInfo } + + TrackerInfo: + + TrackedServerList: [ServerAddress] + + TopicTable:{ TopicName=>{ServerAddress=>TopicInfo} } + + Algorithm MergeTrackerInfo(trackerInfo[0], trackerInfo[1]...): TrackerInfo + -- 1. Calculate LiveServerList[ServerAddress] based on votes. + Count on ServerAddress on every trackerInfo[i] + Keep ServerAddres's count>=#ActiveAccoun + + -- 2. Merge TopicTable based on TopicName and ServerAddress (If conflicts: trackerInfo1>trackerInfo2...) + NewTopicTable = { TopicName=>{ServerAddress=>TopicInfo} } + For Each trackerInfo in [trackerInfo[0], trackerInfo[1]...] + Merge trackerInfo.TopicTable to NewTopicTable (If entry exists no overwrite) + + -- 3. Remove dead Server entries + For Each ToicName in NewTopicTable: + TopicServerTable = NewTopicTable[TopicName] + Remove entry in TopicServerTable if entry's ServerAddress NOT in LiveServerList + Remove entry in NewTopicTable, if entry's TopicServerTable is empty. \ No newline at end of file diff --git a/src/main/java/io/zbus/mq/Broker.java b/src/main/java/io/zbus/mq/Broker.java index 69ff0d02d4bd93846f51d92fa7e09ddaef89b045..338a650a7753422af4157dae167609531b852514 100644 --- a/src/main/java/io/zbus/mq/Broker.java +++ b/src/main/java/io/zbus/mq/Broker.java @@ -269,7 +269,7 @@ public class Broker implements Closeable { for(ServerAddress serverAddress : serverAddressList){ final MqClient client = connectToServer(serverAddress); try { - ServerInfo info = client.queryServer(); + TrackerInfo info = client.queryTracker(); addrSet.addAll(info.trackedServerList); } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/src/main/java/io/zbus/mq/Message.java b/src/main/java/io/zbus/mq/Message.java index 25c9f0e37fea451e0d0959ab227d91d4b1d3700e..35481c657b35db0118da51acf5c08013f1c0c0f4 100644 --- a/src/main/java/io/zbus/mq/Message.java +++ b/src/main/java/io/zbus/mq/Message.java @@ -293,11 +293,11 @@ public class Message { return this; } - public String getServer(){ - return this.getHeader(SERVER); + public String getHost(){ + return this.getHeader(HOST); } - public void setServer(String value){ - this.setHeader(SERVER, value); + public void setHost(String value){ + this.setHeader(HOST, value); } public String getSender() { diff --git a/src/main/java/io/zbus/mq/MqClient.java b/src/main/java/io/zbus/mq/MqClient.java index 3e318caa6cb8f97dd00057de14d9ae66030cd068..ddbe6eb2fdd480646a7a71e9e69dbe587028dbec 100644 --- a/src/main/java/io/zbus/mq/MqClient.java +++ b/src/main/java/io/zbus/mq/MqClient.java @@ -6,6 +6,7 @@ import io.zbus.kit.JsonKit; import io.zbus.mq.Protocol.ConsumeGroupInfo; import io.zbus.mq.Protocol.ServerInfo; import io.zbus.mq.Protocol.TopicInfo; +import io.zbus.mq.Protocol.TrackerInfo; import io.zbus.mq.net.MessageClient; import io.zbus.net.EventDriver; @@ -63,9 +64,17 @@ public class MqClient extends MessageClient{ return res; } + public TrackerInfo queryTracker() throws IOException, InterruptedException{ + Message msg = new Message(); + msg.setCommand(Protocol.TRACKER); + + Message res = invokeSync(msg, invokeTimeout); + return parseResult(res, TrackerInfo.class); + } + public ServerInfo queryServer() throws IOException, InterruptedException{ Message msg = new Message(); - msg.setCommand(Protocol.QUERY); + msg.setCommand(Protocol.SERVER); Message res = invokeSync(msg, invokeTimeout); return parseResult(res, ServerInfo.class); diff --git a/src/main/java/io/zbus/mq/Protocol.java b/src/main/java/io/zbus/mq/Protocol.java index 2a8cfd8b4f416e5b5ccb42c9796421832c447281..fbe894864bd964ceefa4558507218e04d8ed6b41 100644 --- a/src/main/java/io/zbus/mq/Protocol.java +++ b/src/main/java/io/zbus/mq/Protocol.java @@ -24,6 +24,8 @@ public class Protocol { //High Availability (HA) public static final String TRACK_PUB = "track_pub"; public static final String TRACK_SUB = "track_sub"; + public static final String TRACKER = "tracker"; + public static final String SERVER = "server"; public static final String TRACE = "trace"; //trace latest message in server public static final String VERSION = "version"; @@ -55,13 +57,13 @@ public class Protocol { public static final String RECVER = "recver"; public static final String ID = "id"; - public static final String SERVER = "server"; + public static final String HOST = "host"; public static final String ACK = "ack"; public static final String ENCODING = "encoding"; - public static final String ORIGIN_ID = "origin_id"; //original id, TODO compatible issue: rawid - public static final String ORIGIN_URL = "origin_url"; //original URL - public static final String ORIGIN_STATUS = "origin_status"; //original Status TODO compatible issue: reply_code + public static final String ORIGIN_ID = "origin_id"; + public static final String ORIGIN_URL = "origin_url"; + public static final String ORIGIN_STATUS = "origin_status"; //Security public static final String TOKEN = "token"; @@ -123,9 +125,11 @@ public class Protocol { } public static class ServerEvent{ - public ServerAddress serverAddress; + //public ServerAddress serverAddress; + public ServerInfo serverInfo; public boolean live = true; } + public static class TrackItem { public ServerAddress serverAddress; @@ -133,16 +137,17 @@ public class Protocol { public Exception error; //current item error encountered } - public static class TrackerInfo extends TrackItem{ + public static class TrackerInfo extends TrackItem { + public List trackerList; public List trackedServerList; + public Map serverTable; } - public static class ServerInfo extends TrackerInfo{ - public List trackerList; + public static class ServerInfo extends TrackItem { public Map topicTable = new ConcurrentHashMap(); } - public static class TopicInfo extends TrackItem{ + public static class TopicInfo extends TrackItem { public String topicName; public int mask; diff --git a/src/main/java/io/zbus/mq/server/MqAdaptor.java b/src/main/java/io/zbus/mq/server/MqAdaptor.java index 5471d9422e951e813c64988f2cb0df6a9484c1cf..82b0be5885c84df17626a7767a7a337e6550ef15 100644 --- a/src/main/java/io/zbus/mq/server/MqAdaptor.java +++ b/src/main/java/io/zbus/mq/server/MqAdaptor.java @@ -78,6 +78,9 @@ public class MqAdaptor extends MessageAdaptor implements Closeable { //Tracker registerHandler(Protocol.TRACK_PUB, trackPubServerHandler); registerHandler(Protocol.TRACK_SUB, trackSubHandler); + registerHandler(Protocol.TRACKER, trackerHandler); + + registerHandler(Protocol.SERVER, serverHandler); //Monitor/Management @@ -490,6 +493,20 @@ public class MqAdaptor extends MessageAdaptor implements Closeable { } }; + private MessageHandler trackerHandler = new MessageHandler() { + + @Override + public void handle(Message msg, Session session) throws IOException { + ReplyKit.replyJson(msg, session, tracker.trackerInfo()); + } + }; + + private MessageHandler serverHandler = new MessageHandler() { + public void handle(Message msg, Session sess) throws IOException { + ReplyKit.replyJson(msg, sess, mqServer.serverInfo()); + } + }; + private MessageHandler traceHandler = new MessageHandler() { @Override @@ -661,7 +678,7 @@ public class MqAdaptor extends MessageAdaptor implements Closeable { public void sessionMessage(Object obj, Session sess) throws IOException { Message msg = (Message)obj; msg.setSender(sess.id()); - msg.setServer(mqServer.getServerAddress().address); + msg.setHost(mqServer.getServerAddress().address); msg.setRemoteAddr(sess.getRemoteAddress()); if(msg.getId() == null){ msg.setId(UUID.randomUUID().toString()); diff --git a/src/main/java/io/zbus/mq/server/MqServer.java b/src/main/java/io/zbus/mq/server/MqServer.java index 7b834969d88a4e93b4e9d7afb1fcf0cce951a22b..1d60f5fe0522f99fd7bd57193888f51cd359f38a 100644 --- a/src/main/java/io/zbus/mq/server/MqServer.java +++ b/src/main/java/io/zbus/mq/server/MqServer.java @@ -88,7 +88,7 @@ public class MqServer implements Closeable{ }, 1000, config.cleanMqInterval, TimeUnit.MILLISECONDS); messageTracer = new MessageTracer(); - tracker = new Tracker(serverAddress, eventDriver, config.sslCertFileTable, + tracker = new Tracker(this, config.sslCertFileTable, !config.trackerModeOnly, config.trackReportInterval); mqAdaptor = new MqAdaptor(this); @@ -169,9 +169,7 @@ public class MqServer implements Closeable{ } ServerInfo info = new ServerInfo(); info.serverAddress = serverAddress; - info.topicTable = table; - info.trackerList = config.getTrackerList(); - info.trackedServerList = tracker.trackerInfo().trackedServerList; + info.topicTable = table; return info; } diff --git a/src/main/java/io/zbus/mq/server/Tracker.java b/src/main/java/io/zbus/mq/server/Tracker.java index d513d077b7e9c55d0cfb85333a3245a7ccd39b4b..9593f1f644ec13a163bb1947e307c0cccf93d292 100644 --- a/src/main/java/io/zbus/mq/server/Tracker.java +++ b/src/main/java/io/zbus/mq/server/Tracker.java @@ -4,7 +4,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -20,6 +19,7 @@ import io.zbus.mq.Message; import io.zbus.mq.Protocol; import io.zbus.mq.Protocol.ServerAddress; import io.zbus.mq.Protocol.ServerEvent; +import io.zbus.mq.Protocol.ServerInfo; import io.zbus.mq.Protocol.TrackerInfo; import io.zbus.mq.net.MessageClient; import io.zbus.net.Client.ConnectedHandler; @@ -33,10 +33,13 @@ public class Tracker implements Closeable{ private Map downstreamTrackers = new ConcurrentHashMap(); private Map healthyUpstreamTrackers = new ConcurrentHashMap(); - private List upstreamTrackers = new ArrayList(); + private Map upstreamTrackers = new ConcurrentHashMap(); + + private Map serverTable = new ConcurrentHashMap(); private Set subscribers = new HashSet(); + private MqServer mqServer; private EventDriver eventDriver; private final ServerAddress myServerAddress; private boolean myServerInTrack; @@ -46,44 +49,50 @@ public class Tracker implements Closeable{ protected volatile ScheduledExecutorService heartbeator = Executors.newSingleThreadScheduledExecutor(); - public Tracker(ServerAddress myServerAddress, EventDriver eventDriver, - Map sslCertFileTable, boolean myServerInTrack, long trackReportIntervalMs){ - this.myServerAddress = myServerAddress; - this.eventDriver = eventDriver.duplicate(); + public Tracker(MqServer mqServer, Map sslCertFileTable, boolean myServerInTrack, long trackReportIntervalMs){ + this.mqServer = mqServer; + this.myServerAddress = this.mqServer.getServerAddress(); + this.eventDriver = this.mqServer.getEventDriver(); this.myServerInTrack = myServerInTrack; this.sslCertFileTable = sslCertFileTable; this.heartbeator.scheduleAtFixedRate(new Runnable() { public void run() { try { - Iterator iter = upstreamTrackers.iterator(); - while(iter.hasNext()){ - MessageClient client = iter.next(); + for(MessageClient client : upstreamTrackers.values()){ try{ ServerEvent event = new ServerEvent(); - event.serverAddress = Tracker.this.myServerAddress; + event.serverInfo = serverInfo(); event.live = true; notifyUpstream(client, event); } catch (Exception e) { log.error(e.getMessage(), e); - } - } + } + } } catch (Exception e) { log.warn(e.getMessage(), e); } } }, trackReportIntervalMs, trackReportIntervalMs, TimeUnit.MILLISECONDS); } + + public ServerInfo serverInfo(){ + return mqServer.serverInfo(); + } public TrackerInfo trackerInfo(){ List serverList = new ArrayList(this.downstreamTrackers.keySet()); if(myServerInTrack){ serverList.add(myServerAddress); + serverTable.put(myServerAddress.toString(), serverInfo()); } TrackerInfo trackerInfo = new TrackerInfo(); trackerInfo.serverAddress = myServerAddress; + trackerInfo.trackerList = new ArrayList(this.upstreamTrackers.keySet()); trackerInfo.trackedServerList = serverList; + trackerInfo.serverTable = serverTable; + return trackerInfo; } @@ -113,12 +122,12 @@ public class Tracker implements Closeable{ log.info("Connected to Tracker(%s)", trackerAddress.address); healthyUpstreamTrackers.put(trackerAddress, client); ServerEvent event = new ServerEvent(); - event.serverAddress = myServerAddress; + event.serverInfo = serverInfo(); event.live = true; notifyUpstream(client, event); } }); - upstreamTrackers.add(client); + upstreamTrackers.put(trackerAddress, client); client.ensureConnectedAsync(); } } @@ -150,7 +159,7 @@ public class Tracker implements Closeable{ public void onDownstreamNotified(final ServerEvent event){ - final ServerAddress serverAddress = event.serverAddress; + final ServerAddress serverAddress = event.serverInfo.serverAddress; if(myServerAddress.equals(serverAddress)){//myServer changes, just ignore return; } @@ -180,11 +189,13 @@ public class Tracker implements Closeable{ }catch (Exception e) { log.error(e.getMessage(), e); } + serverTable.put(serverAddress.toString(), event.serverInfo); return; } if(!event.live){ //server down + serverTable.remove(serverAddress.toString()); MessageClient downstreamTracker = downstreamTrackers.remove(serverAddress); if(downstreamTracker != null){ try { @@ -200,7 +211,7 @@ public class Tracker implements Closeable{ public void myServerChanged() { ServerEvent event = new ServerEvent(); - event.serverAddress = myServerAddress; + event.serverInfo = serverInfo(); event.live = true; for(MessageClient tracker : healthyUpstreamTrackers.values()){ @@ -256,7 +267,7 @@ public class Tracker implements Closeable{ @Override public void close() throws IOException { this.heartbeator.shutdown(); - for(MessageClient client : upstreamTrackers){ + for(MessageClient client : upstreamTrackers.values()){ client.close(); } upstreamTrackers.clear(); diff --git a/zbus-dist/lib/zbus-0.8.0-SNAPSHOT.jar b/zbus-dist/lib/zbus-0.8.0-SNAPSHOT.jar index f34eac54b9d572b0b06417c4fd52125de995efa2..eea10923c661c842d591bdf2f5d09296f509edad 100644 Binary files a/zbus-dist/lib/zbus-0.8.0-SNAPSHOT.jar and b/zbus-dist/lib/zbus-0.8.0-SNAPSHOT.jar differ