
This setup runs on a multi-node Ubuntu 20.04 cluster; all packages come straight from the Apache website: Kafka 2.4.1 + ZooKeeper 3.6.3 + JDK 1.8. My own group start/stop scripts for ZooKeeper and Kafka are attached at the end of the post.
Note: JDK installation is not covered here; see my previous post, the jdk1.8+hadoop3.2+scala2.12+spark3.1.2 configuration notes.

ZooKeeper 3.6.3 installation and configuration (already extracted to /home/hadoop/Zookeeper)

Copy the sample configuration file and rename it:

cp ~/Zookeeper/conf/zoo_sample.cfg ~/Zookeeper/conf/zoo.cfg

Create a data directory to hold runtime data:

mkdir ~/Zookeeper/zkData

Edit the configuration file:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/hadoop/Zookeeper/zkData
# the port at which the clients will connect
clientPort=2181
#######################cluster##########################
server.1=hadoop101:2888:3888
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
# server.A=B:C:D
# A: the number stored in the myid file at the root of zkData;
# B: the address of that server;
# C: the port Followers use to exchange information with the Leader;
# D: the port the servers use to talk to each other during leader election.

Start it:

/home/hadoop/Zookeeper/bin/zkServer.sh start

Each server must have its own value in the myid file, located at Zookeeper/zkData/myid.
Set the number in that file to the server's ordinal: my hadoop101 keeps 1, hadoop102 becomes 2, and so on.
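
For reference, a minimal sketch of writing those myid files over ssh, assuming the four hostnames above, the /home/hadoop/Zookeeper path, and passwordless ssh between the nodes:

# write each node's myid; hadoop101..hadoop104 map to ids 1..4 as in zoo.cfg
id=1
for host in hadoop101 hadoop102 hadoop103 hadoop104; do
    ssh "$host" "mkdir -p /home/hadoop/Zookeeper/zkData && echo $id > /home/hadoop/Zookeeper/zkData/myid"
    id=$((id + 1))
done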

Note: as in the jdk1.8+hadoop3.2+scala2.12+spark3.1.2 configuration notes, you can also put the bin folder on your PATH via .bashrc.
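
For example, a sketch of the .bashrc additions under the install paths used in this post (adjust to your own layout):

# append to ~/.bashrc on every node, then run: source ~/.bashrc
export ZOOKEEPER_HOME=/home/hadoop/Zookeeper
export KAFKA_HOME=/home/hadoop/Kafka
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin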

Kafka 2.4.1 installation and configuration (already extracted to /home/hadoop/Kafka)

Edit the Kafka/config/server.properties file:

############################# Server Basics #############################
# Globally unique id of the broker; must not repeat across the cluster
broker.id=1
# Enable topic deletion
delete.topic.enable=true

############################# Socket Server Settings #############################
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer of the socket server
socket.send.buffer.bytes=102400
# Receive buffer of the socket server
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600

############################# Log Basics #############################
# Directory where Kafka keeps its log segments (the message data)
log.dirs=/home/hadoop/Kafka/logs
# Default number of partitions per topic on this broker
num.partitions=1
# Number of threads per data directory used for log recovery and cleanup
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# (defaults kept)

############################# Log Retention Policy #############################
# Maximum time a log segment is retained before it becomes eligible for deletion
log.retention.hours=168
# Maximum size of a log segment file; a new segment is rolled once this is exceeded
log.segment.bytes=1073741824
# Interval at which segments are checked against the retention policy
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
# Connection string of the ZooKeeper cluster
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
# Timeout for connecting to ZooKeeper (ms)
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

Note: the broker.id value in this file must be different on every server.
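
One way to set a distinct broker.id everywhere without editing each file by hand, sketched under the assumption that Kafka sits at /home/hadoop/Kafka on every node and the host-to-id mapping matches the ZooKeeper one:

# patch broker.id on each node with sed
id=1
for host in hadoop101 hadoop102 hadoop103 hadoop104; do
    ssh "$host" "sed -i 's/^broker.id=.*/broker.id=$id/' /home/hadoop/Kafka/config/server.properties"
    id=$((id + 1))
done
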
Also adjust Kafka's bundled ZooKeeper config at Kafka/config/zookeeper.properties: set dataDir to the appropriate path and change the other properties as needed.

dataDir=/home/hadoop/Zookeeper

Start Kafka:

/home/hadoop/Kafka/bin/kafka-server-start.sh -daemon /home/hadoop/Kafka/config/server.properties
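
To confirm the broker actually came up and registered in ZooKeeper, a quick smoke test (assuming the default listener port 9092 and the /kafka chroot configured above):

# create a throwaway topic and list topics; both commands ship with Kafka 2.4
/home/hadoop/Kafka/bin/kafka-topics.sh --create --topic smoke-test --partitions 1 --replication-factor 1 --bootstrap-server hadoop101:9092
/home/hadoop/Kafka/bin/kafka-topics.sh --list --bootstrap-server hadoop101:9092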

Naturally I hit quite a few problems along the way; the representative ones are listed below together with their solutions.
1. When Kafka is launched through the group start script, the log reports:

nohup: failed to run command 'java': No such file or directory

Solution:
Edit Kafka/bin/kafka-run-class.sh and add the absolute JAVA_HOME path you need at the top of the file (if the widely suggested source /etc/profile in the group start script doesn't help, this does):

export JAVA_HOME="/home/hadoop/JDK"

2. Kafka exits right after starting, and the log reports a ZooKeeper connection timeout on port 2181.
Solution:
This error usually has one of the following causes (a few quick checks follow the list):

  1. Firewall rules are blocking the port
  2. ZooKeeper is not installed
  3. ZooKeeper is misconfigured
  4. ZooKeeper was not started before Kafka
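
A few quick checks that cover these causes (a sketch; hostnames and ports are the ones configured above, and nc must be installed for the second line):

# is ZooKeeper running and reachable on 2181 from the Kafka host?
/home/hadoop/Zookeeper/bin/zkServer.sh status
echo srvr | nc hadoop101 2181    # prints server stats if the port is reachable
# is a firewall in the way? (Ubuntu)
sudo ufw status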

3. ZooKeeper starts fine when launched locally, but the group start script fails and the log complains that JAVA_HOME cannot be found.
Solution:
Same as problem 1: add the absolute JAVA_HOME path, this time in Zookeeper/bin/zkEnv.sh:

JAVA_HOME="/home/hadoop/JDK"

4. The group start script finishes, but only one machine is actually running ZooKeeper.
First check whether the unique-id values mentioned above (ZooKeeper: myid; Kafka: broker.id) were changed on every machine; only then start looking for other configuration errors.
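
A quick way to audit those ids across the whole cluster (a sketch that reuses the workers file referenced by the scripts below):

for i in $(cat /home/hadoop/Hadoop/etc/hadoop/workers); do
    echo "========== $i =========="
    ssh "$i" "cat /home/hadoop/Zookeeper/zkData/myid; grep '^broker.id' /home/hadoop/Kafka/config/server.properties"
done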

Appendix: group start scripts

Zookeeper

#! /bin/bash

case "$1" in
"start"){
    for i in $(cat /home/hadoop/Hadoop/etc/hadoop/workers)
    do
        echo "========== $i =========="
        ssh "$i" "/home/hadoop/Zookeeper/bin/zkServer.sh start"
        echo $?
    done
};;
"stop"){
    for i in $(cat /home/hadoop/Hadoop/etc/hadoop/workers)
    do
        echo "========== $i =========="
        ssh "$i" "/home/hadoop/Zookeeper/bin/zkServer.sh stop"
        echo $?
    done
};;
"status"){
    for i in $(cat /home/hadoop/Hadoop/etc/hadoop/workers)
    do
        echo "========== $i =========="
        ssh "$i" "/home/hadoop/Zookeeper/bin/zkServer.sh status"
        echo $?
    done
};;
esac

Kafka

#! /bin/bash

case "$1" in
"start"){
    for i in $(cat /home/hadoop/Hadoop/etc/hadoop/workers)
    do
        echo "========== $i =========="
        ssh "$i" "/home/hadoop/Kafka/bin/kafka-server-start.sh -daemon /home/hadoop/Kafka/config/server.properties"
        echo $?
    done
};;
"stop"){
    for i in $(cat /home/hadoop/Hadoop/etc/hadoop/workers)
    do
        echo "========== $i =========="
        ssh "$i" "/home/hadoop/Kafka/bin/kafka-server-stop.sh"
        echo $?
    done
};;
esac

Quick and dirty

# When TensorFlow grabs more GPU memory than it needs; works with TensorFlow 2.x
import os
import tensorflow as tf

os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # pick which GPU to use
config = tf.compat.v1.ConfigProto(allow_soft_placement=True)  # soft placement: ops without a GPU kernel fall back to the CPU
config.gpu_options.per_process_gpu_memory_fraction = 0.3  # this process uses at most 30% of GPU memory (default 1, i.e. all of it)
tf.compat.v1.keras.backend.set_session(tf.compat.v1.Session(config=config))

Problem 1:
Before starting the cluster for the first time, HDFS must be formatted on the master:

hdfs namenode -format

Note: if you ever want to format again, shut down all namenodes and datanodes first and delete the data and logs folders under the Hadoop directory.
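
Put together, a clean re-format looks roughly like this (a sketch; the paths assume Hadoop lives under /home/hadoop/Hadoop, adjust to your layout):

/home/hadoop/Hadoop/sbin/stop-dfs.sh                        # stop all namenodes and datanodes
rm -rf /home/hadoop/Hadoop/data /home/hadoop/Hadoop/logs    # repeat on every node
hdfs namenode -format                                       # on the master only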

Problem 2:
Startup fails with "Attempting to operate on hdfs namenode as root".

You can add an sh script under /etc/profile.d to inject the required environment variables; its content is as follows:

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

Problem 3:
jps shows that the datanode, namenode and nodemanager all started fine, but the resourcemanager is nowhere to be seen.

Don't call ./start-all.sh directly when starting the cluster; run ./start-dfs.sh on the master and ./start-yarn.sh on the server that is supposed to host the resourcemanager.
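
In other words (a sketch; the Hadoop install path is the one assumed throughout this post):

# on the master (namenode) host:
/home/hadoop/Hadoop/sbin/start-dfs.sh
# on the host that should run the resourcemanager:
/home/hadoop/Hadoop/sbin/start-yarn.sh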

Problem 4:
localhost: mv: cannot stat 'XXX'

Either an xml configuration file contains an error, or the firewall is interfering.
The commands to stop/start the firewall on CentOS 7 are:

# stop the firewall:
systemctl stop firewalld.service
# start the firewall:
systemctl start firewalld.service
# disable starting on boot:
systemctl disable firewalld.service
# enable starting on boot:
systemctl enable firewalld.service

Problem 5:
The secondary namenode web UI cannot be opened.

Same as above.

Problem 6:
Starting the nodemanagers prints the warning "localhost: Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts."

Remove the localhost line from the hadoop/etc/hadoop/workers file.

Problem description

A field encoded with base64 decodes fine in ordinary base64 tools, but Python's base64 and binascii modules (b64decode, a2b_base64 and similar) raise an error when decoding it.
The error message is as follows:

---------------------------------------------------------------------------
Error Traceback (most recent call last)
<ipython-input-11-787bc11958b4> in get_proxies(urls)
14 try:
---> 15 raw = base64.b64decode(response)
16 except Exception as r:

c:\program files\python3\lib\base64.py in b64decode(s, altchars, validate)
86 raise binascii.Error('Non-base64 digit found')
---> 87 return binascii.a2b_base64(s)
88

Error: Incorrect padding

Solution

Python reads base64 input four characters at a time, so the string to decode must have a length that is a multiple of 4; pad it with '=' when it falls short.

# check the string a: if its length is already a multiple of 4, leave it; otherwise pad with as many '=' as are missing
a = a + '=' * (4 - len(a) % 4) if len(a) % 4 != 0 else a
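
A quick end-to-end illustration of the fix (the sample string is made up for demonstration):

import base64

a = 'aGVsbG8'                      # "hello" in base64 with its trailing '=' stripped
a = a + '=' * (4 - len(a) % 4) if len(a) % 4 != 0 else a
print(base64.b64decode(a))         # b'hello' instead of binascii.Error: Incorrect padding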

Imports and constants

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torchsummary import summary

BATCH_SIZE = 512
EPOCH = 20
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

Data loading

train_loader = torch.utils.data.DataLoader(
    datasets.MNIST(
        'data',
        train=True,
        download=False,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
    batch_size=BATCH_SIZE, shuffle=True)


test_loader = torch.utils.data.DataLoader(
    datasets.MNIST(
        'data',
        train=False,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
    batch_size=BATCH_SIZE, shuffle=True)

Building the LeNet model

class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        # nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True)
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.pool1 = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(6, 16, 3)
        self.pool2 = nn.MaxPool2d(2)
        # nn.Linear(in_features, out_features, bias=True)
        self.fc1 = nn.Linear(16*5*5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        out = F.relu(self.conv1(x))  # 24
        # nn.MaxPool2d(kernel_size, stride=None, padding=0, dilation=1, return_indices=False, ceil_mode=False)
        # max_pool2d(*args, **kwargs)
        out = self.pool1(out)  # 12
        out = F.relu(self.conv2(out))  # 10
        out = self.pool2(out)  # 5
        out = out.view(out.size(0), -1)  # flatten
        out = F.relu(self.fc1(out))
        out = F.relu(self.fc2(out))
        out = self.fc3(out)
        out = F.log_softmax(out, dim=1)
        return out

Inspect the model structure

model = LeNet().to(DEVICE)
summary(model, (1, 28, 28), device=DEVICE.type)  # torchsummary expects the model and its dummy input on the same device

Training and test functions

model = LeNet().to(DEVICE)
optimizer = optim.Adam(model.parameters())

def train(device, model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()  # zero all gradients
        output = model(data)
        loss = F.nll_loss(output, target)  # calculate loss
        loss.backward()
        optimizer.step()  # update all parameters
        if (batch_idx + 1) % 30 == 0:
            print(f"Train Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} Loss: {loss.item():.6f}]")


def test(device, model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)  # predicted class log-probabilities
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # accumulate the loss over the whole batch
            pred = output.max(1, keepdim=True)[1]  # index of the highest probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print(f'\nTest Epoch: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.2f}%)\n')

Train and evaluate the model

for epoch in range(1, EPOCH + 1):
    train(DEVICE, model, train_loader, optimizer, epoch)
    test(DEVICE, model, test_loader)

Today's little trick
Refreshing the current line when printing to the Python console

To avoid staring at a blank screen while the code runs (better experience for the user, i.e. me), after a few attempts the working code is:

import time
for i in range(100):
    time.sleep(0.1)
    print("\rReading image {}/{}".format(1+i,100),end='')

Environment: Tensorflow 2.4.1
报错: Cannot convert a symbolic Keras input/output to a numpy array. This error may indicate that you're trying to pass a symbolic value to a NumPy call, which is not supported. Or, you may be trying to pass Keras symbolic inputs/outputs to a TF API that does not register dispatching, preventing Keras from automatically converting the API call to a lambda layer in the Functional Model.
Cause: using a custom loss function with Keras under tf
Analysis:
Part of the loss function:

def vae_loss(x, x_decoded_mean):
    xent_loss = original_dim * losses.binary_crossentropy(x, x_decoded_mean)
    print(type(xent_loss))
    kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
    print(type(kl_loss))
    return xent_loss + kl_loss

This is the custom loss function of a variational autoencoder; printing the types shows that it produces two different kinds of tensors:

xent_loss --> tensorflow.python.framework.ops.Tensor
kl_loss --> tensorflow.python.keras.engine.keras_tensor.KerasTensor

The inputs of the custom function are KerasTensors, while the built-in loss functions return Tensors. KerasTensor and Tensor are completely different classes; the keras_tensor source shows that Keras can turn a Tensor into a KerasTensor, but I found nothing that turns a KerasTensor back into a Tensor...
So we can say:

Tensor+KerasTensor = KerasTensor

But the Keras custom loss receives KerasTensors and is expected to return a Tensor; the mixed sum above ends up being a KerasTensor instead, and that is what triggers the error.

Solution: the fix I ended up using:

# in most cases this fixes the problem; afterwards the function's return value is a tensorflow.python.framework.ops.Tensor again
from tensorflow.python.framework.ops import disable_eager_execution
disable_eager_execution()

So maybe I'll keep avoiding Keras for now; I clearly can't use it flexibly enough yet, so I'd better sit down and properly learn how TensorFlow itself works~
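
For what it's worth, a commonly used alternative that avoids disabling eager execution is to register the VAE loss with add_loss instead of passing a custom loss function to compile(). The sketch below assumes the usual Keras VAE layout for this kind of example: x is the model input, z_mean, z_log_var and x_decoded_mean are the symbolic tensors from the encoder/decoder, and vae is the Model built from them:

# build the loss from the symbolic tensors and attach it to the model,
# instead of returning it from a loss function passed to compile()
xent_loss = original_dim * losses.binary_crossentropy(x, x_decoded_mean)
kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
vae.add_loss(K.mean(xent_loss + kl_loss))
vae.compile(optimizer='adam')    # no loss argument needed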

TensorFlow prints a huge stream of log messages at runtime.
It's dizzying; the following suppresses everything except errors (INFO and WARNING messages are hidden):

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import tensorflow as tf

Remember: the log-level setting has to come before importing tensorflow!
