2 Star 2 Fork 0

Kenny小狼 / python-tools

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
docker-network-consul.py 3.45 KB
一键复制 编辑 原始数据 按行查看 历史
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import urllib
import base64
import subprocess
from sys import argv
docker_network_name = 'gkxt_default'
# docker exec -i consul consul operator raft -list-peers | awk 'gsub('8300','8500',$2) {print $2}'
api_server_url = None
# api_server_url = 'http://192.168.3.156:8500'
# docker network inspect gkxt_default --format='{{.Id}}'
network_id = None
# network_id = '7647cb7de709cec3006fb578aa954f1befb197b405b5d862dc7bbbd4492e12c8'
if network_id is None:
network_id = subprocess.check_output("docker network inspect %s --format='{{.Id}}'" % docker_network_name,
shell=True).strip()
if api_server_url is None:
api_server_url = 'http://' + subprocess.check_output(
"docker exec -i consul consul operator raft -list-peers | awk 'gsub('8300','8500',$2) {print $2}'",
shell=True).strip()
network_key = 'docker/network/v1.0/endpoint/' + network_id + '/'
print("network_id: %s" % network_id)
print("api_server_url: %s" % api_server_url)
def pp_json(json_thing, sort=True, indents=4):
if type(json_thing) is str:
print(json.dumps(json.loads(json_thing), sort_keys=sort, indent=indents))
else:
print(json.dumps(json_thing, sort_keys=sort, indent=indents))
return None
def get_json(url):
result = json.load(urllib.urlopen(url))
return result
def keys():
url = api_server_url + '/v1/kv/?keys'
return get_json(url)
def recurse():
url = api_server_url + '/v1/kv/?recurse'
return get_json(url)
def network_exist():
ks = keys()
for key in ks:
if network_key == key:
return True
return False
def get_network_services():
if network_exist():
kvs = recurse()
subs = {}
for kv in kvs:
if kv['Key'] != network_key and kv['Key'].startswith(network_key):
v_str = base64.b64decode(kv['Value'])
o = json.loads(v_str)
o['consul_key'] = kv['Key']
subs[o['name']] = o
return subs
def service_exist(name):
return name in get_network_services()
def ip_exist(ip_addr):
servs = get_service_by_ip(ip_addr)
return servs is not None
def get_service_by_ip(ip_addr):
servs = get_network_services()
for name in servs:
if servs[name]['ep_iface']['addr'] == ip_addr + '/24':
return servs[name]
return None
def get_consul_key_by_service(name):
if not service_exist(name):
return None
serv = get_network_services()[name]
return serv['consul_key']
def delete_value_by_service(name):
val = get_consul_key_by_service(name)
if val is None:
return False
url = api_server_url + '/v1/kv/' + val
try:
subprocess.check_output('curl -s --request DELETE %s ' % url, shell=True)
return True
except subprocess.CalledProcessError:
return False
if __name__ == '__main__':
if len(argv[1:]) == 0:
services = get_network_services()
for p in services:
print (p)
elif len(argv) >= 3:
fnc = argv[1]
v = argv[2]
if fnc == 'delete':
print (delete_value_by_service(v))
elif fnc == 'ip_exist':
print (ip_exist(v))
elif fnc == 'ip':
print (pp_json(get_service_by_ip(v)))
elif fnc == 'service_exist':
print (service_exist(v))
elif fnc == 'service':
print (pp_json(get_network_services()[v]))
Python
1
https://gitee.com/kennylee/python-tools.git
git@gitee.com:kennylee/python-tools.git
kennylee
python-tools
python-tools
master

搜索帮助