1 Star 0 Fork 8

chenp / blog

forked from 1264644959 / blog 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
关于 kafka.md 3.33 KB
一键复制 编辑 原始数据 按行查看 历史
zyl-fun 提交于 2020-08-17 16:40 . commit

关于 kafka


下载-安装-问题

链接:https://pan.baidu.com/s/1RKTD807egx4oPwocMX0mig 提取码:w5s9 复制这段内容后打开百度网盘手机App,操作更方便哦


案例 https://github.com/muscledreamer/Kafka_Demo/tree/master/KfkClient_pykafka


kafka信息消费 https://www.kingname.info/2020/03/23/how-kafka-consume/


为什么学习 kafka https://www.kingname.info/2019/12/14/use-kakfa-in-spider/


使用python读写kafka https://www.kingname.info/2020/03/23/operate-kafka-by-python/

注意 不要安装 kafka库, 要安装 kafka-python,亲自踩坑,反复横跳

参考文章的代码有个地方需要注意

image-20200804185613747

要写成列表的形式


python查看kafka所有的topic


win10安装kafkahttps://blog.csdn.net/github_38482082/article/details/82112641

需要注意,路径一定不要太长,我直接放在 c盘下 C:\kafka_2.12-2.5.0

路径太长在启动服务的时候会报错,亲自踩坑

image-20200804154857446

出现 binding to port 则表示 zookeeper启动成功


实际测试操作

image-20200804154959795

但是测试中文为什么无法被消费者接收到,待解答


WARN Exception causing close of session 0x0: Unreasonable length = 1919902837 (org.apache.zookeeper.server.NIOServerCnxn)


raise Errors.UnrecognizedBrokerVersion() kafka.errors.UnrecognizedBrokerVersion: UnrecognizedBrokerVersion


python2 python3 的坑

kafka kafka-python的安装切换

本人使用 pipenv 出现了bug


self._sslobj.do_handshake() OSError: [Errno 0] Error


更新2020年8月17日

INFO shutting down (kafka.server.KafkaServer)

重新启动报错需要删除 kafka-logs文件夹


使用基本流程

.\bin\windows\zookeeper-server-start.bat  .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

my_pro生产者

import json
import os
import  time
import datetime

import kafka


producer = kafka.KafkaProducer( bootstrap_servers=["localhost:9092"], value_serializer = lambda m:json.dumps(m).encode())

for i in range(100):
    data = {
        "num":i,
        "ts":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


    }
    producer.send("howtousekafka", data)
    time.sleep(1)

my_con消费者

import kafka
consumer = kafka.KafkaConsumer("howtousekafka",bootstrap_servers=["localhost:9092"],group_id = "test_2",auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)

微博的使用

关键词生产者,关键词很多

一个生产者可以依次爬取多个关键词对应的最新消息,一个消费者根据前面传过的key来筛选自己想要的关键词的消息

根据 groupid的不同,可以对同一个主题的数据进行不同的处理,入库,统计,供数据分析等

爬取数据的时候可以多线程(生产者)

接收数据的时候也可以多线程(消费者)


如何过滤数据,以微博文章的id为标识,如果已经存在,则不再进行生产此条数据

使用去重表

id进行标识


img

马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/nchenp/blog.git
git@gitee.com:nchenp/blog.git
nchenp
blog
blog
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891