1 Star 0 Fork 0

pushiqiang / event

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

Message processing framework (redis/kafka/rabbitmq)

Requirement: python:3.7

PEP8 Check

yapf -dr . | (! grep '.')
pylava .
isort -rc . --check-only --diff

Usages

test

1. cd examples
2. Set up the Docker containers with `sudo docker-compose up -d`
3. Open a shell in the container (terminal 1) with `sudo docker exec -it your_container_id bash` and run main.py
4. Open a second container shell (terminal 2) and run send_message.py

in django

# event_manager.py
import os
from event import Manager
from event.server.kafka import Server as KafkaServer
from event.server.rabbitmq import Server as RabbitmqServer
from event.server.redis import Server as RedisServer

# Two dirname() calls: BASE_DIR is the parent of the directory containing this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Single shared Manager; the other modules import it and look servers up by name.
manager = Manager(base_dir=BASE_DIR)
# Register each broker under the name that is later passed to manager.get_server().
manager.register_server('redis', RedisServer(url='redis://redis/3'))
manager.register_server('kafka', KafkaServer(url='kafka:9092'))

# wsgi.py
import os
from django.core.wsgi import get_wsgi_application
from .event_manager import manager

# Point Django at the project settings unless the environment already defines them.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "example.settings")
application = get_wsgi_application()
# Hand off to the event manager's Django entry point once the WSGI application exists.
manager.run_in_django()

# handlers.py
from example.event_manager import manager

# Look up the servers by the names they were registered under in event_manager.py.
redis_server = manager.get_server('redis')
kafka_server = manager.get_server('kafka')

@redis_server.handler(channels=['example:test:django'])
def handle_example_django_redis(message):
    """Print a message received on the 'example:test:django' Redis channel."""
    print('Received redis message', message)


@kafka_server.handler(topics=['example-test-django'])
def handle_example_django_kafka(message):
    """Print a message received on the 'example-test-django' Kafka topic and its ``value``."""
    print('Received kafka message: ', message, message.value)

# views.py
from django.shortcuts import HttpResponse
from django.views import View
from example.event_manager import manager

redis_server = manager.get_server('redis')
kafka_server = manager.get_server('kafka')

class TestView(View):
    """Example view that publishes one test message to each registered broker."""

    def get(self, request, *args, **kwargs):
        """Publish a Redis message, then a Kafka message, and reply with 'good'."""
        # Same channel the Redis handler in handlers.py listens on.
        redis_payload = {'test_id': 'redis_id', 'message': 'redis good'}
        redis_server.publish_wait(channel='example:test:django',
                                  message=redis_payload)

        # Same topic the Kafka handler in handlers.py listens on.
        kafka_payload = {'test_id': 'kafka_id', 'message': 'kafka good'}
        kafka_server.publish_wait(topic='example-test-django',
                                  message=kafka_payload)

        return HttpResponse('good')

redis

# Run server
import os

from event import Manager
from event.server.redis import Server

# Directory that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
manager = Manager(base_dir=BASE_DIR)
# Keyword arguments for the Redis Server constructor.
config = {
    'url': 'redis://redis/3',
}
server = Server(**config)
manager.register_server('default', server)

@server.handler(channels=['example:test:redis'])
def handle_example_message(message):
    """Print a message received on the 'example:test:redis' channel."""
    print('Received message: ', message)

# Called last, after the server and its handler have been registered.
manager.run_forever()

# send message
await server.publish(channel='example-test-redis',
                     message={
                         'test_id': 'redis',
                         'message': 'good test'
                     })
# OR
server.publish_soon(channel='example-test-redis',
                    message={
                        'test_id': 'redis',
                        'message': 'good test'
                    })
# OR
server.publish_wait(channel='example-test-redis',
                    message={
                        'test_id': 'redis',
                        'message': 'good test'
                    })

kafka

# Run server
import os

from event import Manager
from event.server.kafka import Server

# Directory that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
manager = Manager(base_dir=BASE_DIR)
# Keyword arguments for the Kafka Server constructor.
config = {
    'url': 'kafka:9092',
}
server = Server(**config)
manager.register_server('default', server)

@server.handler(topics=['example-test-kafka'])
def handle_example_message(message):
    """Print a message received on the 'example-test-kafka' topic and its ``value``."""
    print('Received message: ', message, message.value)

# Called last, after the server and its handler have been registered.
manager.run_forever()

# send message
# The topic matches the one the handler is registered for ('example-test-kafka').
# Awaited form: must be called from inside a coroutine.
await server.publish(topic='example-test-kafka',
                     message={
                         'test_id': 'kafka',
                         'message': 'good test'
                     })
# OR
# Called without await (presumably schedules the publish - confirm in the library).
server.publish_soon(topic='example-test-kafka',
                    message={
                        'test_id': 'kafka',
                        'message': 'good test'
                    })

rabbitmq

# Run server
import os

from event import Manager
from event.server.rabbitmq import Server

# Directory that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
manager = Manager(base_dir=BASE_DIR)
# Keyword arguments for the RabbitMQ Server constructor: broker URL plus exchange name and type.
config = {
    'url': 'amqp://rabbitmq:5672',
    'exchange': 'test',
    'exchange_type': 'topic',
}
server = Server(**config)
manager.register_server('default', server)

@server.handler(routing_key='example-test-rabbitmq',
                queue='example-test-rabbitmq-queue')
def handle_example_message(message):
    """Print a message received for routing key 'example-test-rabbitmq'."""
    print('Received message: ', message)

# Called last, after the server and its handler have been registered.
manager.run_forever()

# send message
# The routing key matches the one the handler is registered for ('example-test-rabbitmq').
# Awaited form: must be called from inside a coroutine.
await server.publish(routing_key='example-test-rabbitmq',
                     message={
                         'test_id': 'rabbitmq',
                         'message': 'good test'
                     })
# OR
# Called without await (presumably schedules the publish - confirm in the library).
server.publish_soon(routing_key='example-test-rabbitmq',
                    message={
                        'test_id': 'rabbitmq',
                        'message': 'good test'
                    })

TODO-LIST

  • 消息异常处理
  • 消息处理确认
  • 消息处理重试

空文件

简介

Message processing framework (redis/kafka/rabbitmq ) 展开 收起
Python
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/pushiqiang/event.git
git@gitee.com:pushiqiang/event.git
pushiqiang
event
event
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891