在本文中,我们将为您详细介绍大数据还是大炒作,这是一个问题——为什么NoSQL很重要的相关知识,并且为您解答关于nosql在大数据体系中的作用的疑问,此外,我们还会提供一些关于AI:是猫还是狗,这是个
在本文中,我们将为您详细介绍大数据还是大炒作,这是一个问题 ——为什么NoSQL很重要的相关知识,并且为您解答关于nosql在大数据体系中的作用的疑问,此外,我们还会提供一些关于AI:是猫还是狗,这是个问题、K8s 选 cgroupfs 还是 systemd?这是一个问题、netty 系列之:选 byte 还是选 message? 这是一个问题、netty系列之:选byte还是选message?这是一个问题的有用信息。
本文目录一览:- 大数据还是大炒作,这是一个问题 ——为什么NoSQL很重要(nosql在大数据体系中的作用)
- AI:是猫还是狗,这是个问题
- K8s 选 cgroupfs 还是 systemd?这是一个问题
- netty 系列之:选 byte 还是选 message? 这是一个问题
- netty系列之:选byte还是选message?这是一个问题
大数据还是大炒作,这是一个问题 ——为什么NoSQL很重要(nosql在大数据体系中的作用)
现如今,有关“大数据”的各种评论和赞美每天都占据着人们的视线,但作为更加重要且更加有现实意义的一点,大数据技术对现实公司的显著影响,尤其是在与顾客的交互方面的极大提升却鲜有人关注。不可否认的是,大数据确实是革命性的,但前提是它关注的焦点需要集中在这项技术本身真正可以解决的问题上,以及大数据对这个世界起到的推动作用上。在现实的商业社会中,被动的、消极的公司只会着眼于解决现有的问题,而主动的、积极的、有远见的公司则已经开始寻找新的技术来帮助他们能够以更加激动人心的新方式与顾客进行交互,并由此来产生有价值的数据,用以帮助公司制定下一步赢得竞争击败对手的计划。
Nosql 是大数据的一种基础技术,它正引领着新一代的数据库来解决那些困扰传统数据库已久的挑战。可以说,Nosql数据库就是为解决关系型数据库的一系列主要问题而生的:
关系型数据库可扩展性差
扩展关系型数据库代价高昂(服务器昂贵,新功能开发周期长)
关系型数据库难以处理多类型数据
关系型数据库难以处理特定的数据类型(非结构化数据,大量的文本,传感器数据)
因此,在绝大多数企业中,从过去简单地使用数据来进行信息管理向现代利用大数据来创造极致用户体验的转变,因为传统数据库自身的限制成为了不切实际的挑战。于是一些先驱者们已经开始寻找新的替代者了——这就是 Nosql。国内某视频网站巨头就是一个绝佳的Nosql成功应用案例。这家视频网站仅运营数据,每天收集到的各类访问日志总量已经达到TB级,经分析及压缩处理后留存下来的历史运营数据已达数百TB,而且很快将会达到 PB级。通过追踪用户的每一次页面浏览、评论收藏、视频播放以及播放时的各种操作来达成以下目标:1.确保用户体验不断地提升;2.分析并创造用户喜欢的内容; 3.更加精准的广告投放。而这家视频网站企业在自营原创流媒体领域的快速发展就是其完美利用Nosql的最好证明,一步步利用其在大数据利用领域的优势赶超着传统媒体企业。
这家视频网站只是众多例子中的一个。为数众多的有远见的公司正在利用类似的方法重新设计他们与用户、供应商和合作者之间的交互,创造分析着大量的各类数据,将他们的事业推向全新的高度。这些新的交互方式不仅产生了全新类型的数据,而且还创造出更加丰富的数据集,在这样一个史无前例的规模下,我们却又回到了问题的原点——关系型数据库难以处理应对这其中的任何一种情景。
在今天需要处理丰富海量数据的网络应用环境中,Nosql的发展与应用非常繁荣。它使得企业变得更加敏捷迅速,尤其是在部署新功能的时候,同时也变得更加灵活(存储更加多种多样复杂的数据类型)。而它的扩展也相对廉价和简单的多。例如一组支持大规模Oracle部署的硬件动辄就耗费数百万美元,而使用Nosql解决方案可以将开销轻易控制在百万美元以下。如果再算上在软件上的投入,这两者的差距会进一步增加。小型企业甚至可以用低于5万美元的低廉成本使用Nosql部署自己的内部应用(在云端甚至更加低廉),方便他们更加灵活敏捷地将产品展示在顾客面前。
随着企业持续地创新,开辟与顾客交互的新方式,并创造出大量的数据和不同的数据类型,企业必然将不断面临类似这样的问题:“我们何时才会需要Nosql来解决问题?”或是“我们当下怎样才能更好地利用Nosql来改善我们的产品?”。有远见的公司正在思考着第二个问题。
AI:是猫还是狗,这是个问题
首发自公众号:RAIS
如果你不喜欢小猫和小狗,你可能不知道他们具体是哪一种品种,但是一般来说,你都能区分出这是猫还是狗,猫和狗的特征还是不一样的,那我们如何用机器学习的方法训练一个网络区分猫狗呢?
我们选用的是 Kaggle 的一个数据集(https://www.kaggle.com/c/dogs...,用神经网络的方法进行模型的训练。下载下来的数据集对于我们测试来说数据有点大,这里面分别有 12500 个猫和狗的训练图片,我们先来缩小一下训练集,然后再进行模型的搭建和训练。我们的做法做法是猫和狗分别选择 1000 个训练图片,500 个验证集和 500 个测试集,我们可以手工完成这个工作,需要做的就是:
// 如下非可执行代码,含义非常清楚的表达,最后会附上可执行代码
mkdir dog-vs-cats-small
cp dog-vs-cats/train/cat/pic-{0-999}.jpg dog-vs-cats-small/train/cat/
cp dog-vs-cats/train/dog/pic-{0-999}.jpg dog-vs-cats-small/train/dog/
cp dog-vs-cats/validation/cat/pic-{1000-1499}.jpg dog-vs-cats-small/validation/cat/
cp dog-vs-cats/validation/dog/pic-{1000-1499}.jpg dog-vs-cats-small/validation/dog/
cp dog-vs-cats/test/cat/pic-{1500-1999}.jpg dog-vs-cats-small/test/cat/
cp dog-vs-cats/test/dog/pic-{1500-1999}.jpg dog-vs-cats-small/test/dog/
从我们前面文章的经验中,我们可以知道,这个卷积神经网络我们可以用 relu 激活的 Conv2D 层与 MaxPooling2D 层堆叠而成,与之前相比稍微需要修改就是网络的大小,更大的网络处理更多是数据。
卷积神经网络网络的深度往往与特征图的尺寸负相关,越深的网络每个特征图的尺寸往往是越小的,我看到的数据往往是:深度 32-> 128,特征图尺寸 150x150 -> 7x7。如下,这是我们构架的网络:
优化器依旧采用 RMSprop,学习率由默认的 0.001 设置为 0.0001,后续我们也将 对不同的优化器进行介绍。由于需要输出的结果是“猫 or 狗”,所以我们最后一层激活参数为 sigmoid,自然损失函数就为 binary_crossentropy 了,如此一来,网络就构建好了,接下来就应该喂给网络数据了。
由于我们这里是一张又一张的图片,jpg 格式,这可不是我们网络所喜欢的格式,需要进行处理,读出图片,将其解码为 RGB 像素,再将 RGB 中的像素值转换成浮点数进行计算,又由于我们的网络对于处理 0-1 之间的数效果更好,因此我们需要将像素值转换区间,即从 0-255 转换到 0-1,是不是觉得有点麻烦,确实!Keras 之所以说是最容易上手的深度学习框架,就是因为它同样把这些繁琐但是使用的工具内置了,Image 包下的 ImageDataGenerator 就可以帮上大忙,这样我们就可以得到 RGB 图像与二进制标签组成的批量。
接下来,我们就要对数据进行拟合了,fit_generator,上面的生成器也将传给它,这样,这一个网络我们就建立完成了,可以进行训练了,与前文一样,我们仍然画出损失曲线和精度曲线。
训练精度逐渐接近百分之百,提醒我们注意过拟合的危险;训练精度在第五次(或六次)次后就维持在 70%左右不再上升了。
第五次或第十次后,验证损失就达到了最小值,嗯……,很显然,过拟合了,我们需要降低过拟合。
出现过拟合的原因是学习样本太少了,我们采用 数据增强 来解决这个问题。我们的做法就是在现有的训练数据中生成更多的训练数据,就是增加一些随机变换,这种随机变化生成的图片依然要保证是有效的。这样模型在训练的时候就可以看到不同的更多的图像了,这就使得训练出的模型泛化能力更好。怎么做呢,就可以把图片进行随机的旋转,缩放,平移和翻转等,ImageDataGenerator 提供了这样的能力。同时在密集层之前添加一个 Dropout 层,会更好的降低过拟合,如此一来,看看结果:
可以看出来,效果好了很多。训练精度至少可以到达 80%,再想大幅度提高精度,就需要一些其他的方法了,下一篇文章我们再聊。
老规矩,附上全部代码:
#!/usr/bin/env python3
import os
import shutil
import time
import matplotlib.pyplot as plt
from keras import layers
from keras import models
from keras import optimizers
from keras.preprocessing.image import ImageDataGenerator
def make_small():
original_dataset_dir = ''/Users/renyuzhuo/Desktop/cat/dogs-vs-cats/train''
base_dir = ''/Users/renyuzhuo/Desktop/cat/dogs-vs-cats-small''
os.mkdir(base_dir)
train_dir = os.path.join(base_dir, ''train'')
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, ''validation'')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, ''test'')
os.mkdir(test_dir)
train_cats_dir = os.path.join(train_dir, ''cats'')
os.mkdir(train_cats_dir)
train_dogs_dir = os.path.join(train_dir, ''dogs'')
os.mkdir(train_dogs_dir)
validation_cats_dir = os.path.join(validation_dir, ''cats'')
os.mkdir(validation_cats_dir)
validation_dogs_dir = os.path.join(validation_dir, ''dogs'')
os.mkdir(validation_dogs_dir)
test_cats_dir = os.path.join(test_dir, ''cats'')
os.mkdir(test_cats_dir)
test_dogs_dir = os.path.join(test_dir, ''dogs'')
os.mkdir(test_dogs_dir)
fnames = [''cat.{}.jpg''.format(i) for i in range(1000)]
for fname in fnames:
src = os.path.join(original_dataset_dir, fname)
dst = os.path.join(train_cats_dir, fname)
shutil.copyfile(src, dst)
fnames = [''cat.{}.jpg''.format(i) for i in range(1000, 1500)]
for fname in fnames:
src = os.path.join(original_dataset_dir, fname)
dst = os.path.join(validation_cats_dir, fname)
shutil.copyfile(src, dst)
fnames = [''cat.{}.jpg''.format(i) for i in range(1500, 2000)]
for fname in fnames:
src = os.path.join(original_dataset_dir, fname)
dst = os.path.join(test_cats_dir, fname)
shutil.copyfile(src, dst)
fnames = [''dog.{}.jpg''.format(i) for i in range(1000)]
for fname in fnames:
src = os.path.join(original_dataset_dir, fname)
dst = os.path.join(train_dogs_dir, fname)
shutil.copyfile(src, dst)
fnames = [''dog.{}.jpg''.format(i) for i in range(1000, 1500)]
for fname in fnames:
src = os.path.join(original_dataset_dir, fname)
dst = os.path.join(validation_dogs_dir, fname)
shutil.copyfile(src, dst)
fnames = [''dog.{}.jpg''.format(i) for i in range(1500, 2000)]
for fname in fnames:
src = os.path.join(original_dataset_dir, fname)
dst = os.path.join(test_dogs_dir, fname)
shutil.copyfile(src, dst)
def cat():
base_dir = ''/Users/renyuzhuo/Desktop/cat/dogs-vs-cats-small''
train_dir = os.path.join(base_dir, ''train'')
validation_dir = os.path.join(base_dir, ''validation'')
model = models.Sequential()
model.add(layers.Conv2D(32, (3, 3), activation=''relu'', input_shape=(150, 150, 3)))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation=''relu''))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(128, (3, 3), activation=''relu''))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Conv2D(128, (3, 3), activation=''relu''))
model.add(layers.MaxPooling2D((2, 2)))
model.add(layers.Flatten())
model.add(layers.Dropout(0.5))
model.add(layers.Dense(512, activation=''relu''))
model.add(layers.Dense(1, activation=''sigmoid''))
model.summary()
model.compile(loss=''binary_crossentropy'', optimizer=optimizers.RMSprop(lr=1e-4), metrics=[''acc''])
# train_datagen = ImageDataGenerator(rescale=1. / 255)
train_datagen = ImageDataGenerator(
rescale=1. / 255,
rotation_range=40,
width_shift_range=0.2,
height_shift_range=0.2,
shear_range=0.2,
zoom_range=0.2,
horizontal_flip=True, )
test_datagen = ImageDataGenerator(rescale=1. / 255)
train_generator = train_datagen.flow_from_directory(
train_dir,
target_size=(150, 150),
batch_size=32,
class_mode=''binary'')
validation_generator = test_datagen.flow_from_directory(
validation_dir,
target_size=(150, 150),
batch_size=32,
class_mode=''binary'')
history = model.fit_generator(
train_generator,
steps_per_epoch=100,
epochs=100,
validation_data=validation_generator,
validation_steps=50)
model.save(''cats_and_dogs_small_2.h5'')
acc = history.history[''acc'']
val_acc = history.history[''val_acc'']
loss = history.history[''loss'']
val_loss = history.history[''val_loss'']
epochs = range(len(acc))
plt.plot(epochs, acc, ''bo'', label=''Training acc'')
plt.plot(epochs, val_acc, ''b'', label=''Validation acc'')
plt.title(''Training and validation accuracy'')
plt.legend()
plt.show()
plt.figure()
plt.plot(epochs, loss, ''bo'', label=''Training loss'')
plt.plot(epochs, val_loss, ''b'', label=''Validation loss'')
plt.title(''Training and validation loss'')
plt.legend()
plt.show()
if __name__ == "__main__":
time_start = time.time()
# make_small()
cat()
time_end = time.time()
print(''Time Used: '', time_end - time_start)
- 首发自公众号:RAIS
K8s 选 cgroupfs 还是 systemd?这是一个问题
原文链接:https://xyz.uscwifi.xyz/post/C6TKCS8wZ/
什么是 cgroup
Cgroup 是一个 Linux 内核特性,对一组进程的资源使用(CPU、内存、磁盘 I/O 和网络等)进行限制、审计和隔离。
cgroups (Control Groups) 是 linux 内核提供的一种机制,这种机制可以根据需求把一系列系统任务及其子任务整合 (或分隔) 到按资源划分等级的不同组内,从而为系统资源管理提供一个统一的框架。简单说,cgroups 可以限制、记录任务组所使用的物理资源。本质上来说,cgroups 是内核附加在程序上的一系列钩子 (hook),通过程序运行时对资源的调度触发相应的钩子以达到资源追踪和限制的目的。
什么是 cgroupfs
docker 默认的 Cgroup Driver 是 cgroupfs
$ docker info | grep cgroup
Cgroup Driver: cgroupfs
Cgroup 提供了一个原生接口并通过 cgroupfs 提供(从这句话我们可以知道 cgroupfs 就是 Cgroup 的一个接口的封装)。类似于 procfs 和 sysfs,是一种虚拟文件系统。并且 cgroupfs 是可以挂载的,默认情况下挂载在 /sys/fs/cgroup 目录。
什么是 Systemd?
Systemd 也是对于 Cgroup 接口的一个封装。systemd 以 PID1 的形式在系统启动的时候运行,并提供了一套系统管理守护程序、库和实用程序,用来控制、管理 Linux 计算机操作系统资源。
为什么使用 systemd 而不是 croupfs
这里引用以下 kubernetes 官方的原话 [1]:
❝当某个 Linux 系统发行版使用 systemd[2] 作为其初始化系统时,初始化进程会生成并使用一个 root 控制组(
cgroup
),并充当 cgroup 管理器。Systemd 与 cgroup 集成紧密,并将为每个 systemd 单元分配一个 cgroup。你也可以配置容器运行时和 kubelet 使用cgroupfs
。连同 systemd 一起使用cgroupfs
意味着将有两个不同的 cgroup 管理器。单个 cgroup 管理器将简化分配资源的视图,并且默认情况下将对可用资源和使用 中的资源具有更一致的视图。 当有两个管理器共存于一个系统中时,最终将对这些资源产生两种视图。在此领域人们已经报告过一些案例,某些节点配置让 kubelet 和 docker 使用
cgroupfs
,而节点上运行的其余进程则使用 systemd; 这类节点在资源压力下 会变得不稳定。
ubuntu 系统,debian 系统,centos7 系统,都是使用 systemd 初始化系统的。systemd 这边已经有一套 cgroup 管理器了,如果容器运行时和 kubelet 使用 cgroupfs,此时就会存在 cgroups 和 systemd 两种 cgroup 管理器。也就意味着操作系统里面存在两种资源分配的视图,当操作系统上存在 CPU,内存等等资源不足的时候,操作系统上的进程会变得不稳定。
我们可以简单得理解为一山不要容二虎,一个国家只能有一个国王。
注意事项: 不要尝试修改集群里面某个节点的 cgroup 驱动,如果有需要,最好移除该节点重新加入。
如何修改 docker 默认的 cgroup 驱动
增加 "exec-opts": ["native.cgroupdriver=systemd"]
配置,重启 docker 即可
$ cat /etc/docker/daemon.json
{
"exec-opts": ["native.cgroupdriver=systemd"],
"registry-mirrors": [
"https://docker.mirrors.ustc.edu.cn",
"http://hub-mirror.c.163.com"
],
"max-concurrent-downloads": 10,
"log-driver": "json-file",
"log-level": "warn",
"log-opts": {
"max-size": "10m",
"max-file": "3"
},
"data-root": "/var/lib/docker"
}
kubelet 配置 cgroup 驱动
参考官方 [3]
❝说明: 在版本 1.22 中,如果用户没有在
KubeletConfiguration
中设置cgroupDriver
字段,kubeadm init
会将它设置为默认值systemd
。
# kubeadm-config.yaml
kind: ClusterConfiguration
apiVersion: kubeadm.k8s.io/v1beta3
kubernetesVersion: v1.21.0
---
kind: KubeletConfiguration
apiVersion: kubelet.config.k8s.io/v1beta1
cgroupDriver: systemd
然后使用 kubeadm 初始化
$ kubeadm init --config kubeadm-config.yaml
引用链接
[1]
原话: https://kubernetes.io/zh-cn/docs/setup/production-environment/container-runtimes/#cgroup-drivers
[2]
systemd: https://www.freedesktop.org/wiki/Software/systemd/
[3]
官方: https://kubernetes.io/zh-cn/docs/tasks/administer-cluster/kubeadm/configure-cgroup-driver/
netty 系列之:选 byte 还是选 message? 这是一个问题
简介
UDT 给了你两种选择,byte stream 或者 message, 到底选哪一种呢?经验告诉我们,只有小学生才做选择题,而我们应该全都要!
类型的定义
UDT 的两种类型是怎么定义的呢?
翻看 com.barchart.udt 包,可以发现这两种类型定义在 TypeUDT 枚举类中。
STREAM(1),
DATAGRAM(2),
一个叫做 STREAM,它的 code 是 1。一个叫做 DATAGRAM,他的 code 是 2.
根据两个不同的类型我们可以创建不同的 selectorProvider 和 channelFactory。而这两个正是构建 netty 服务所需要的。
在 NioUdtProvider 这个工具类中,netty 为我们提供了 TypeUDT 和 KindUDT 的六种组合 ChannelFactory,他们分别是:
用于 Stream 的:BYTE_ACCEPTOR,BYTE_CONNECTOR,BYTE_RENDEZVOUS。
和用于 Message 的:MESSAGE_ACCEPTOR,MESSAGE_CONNECTOR 和 MESSAGE_RENDEZVOUS。
同样的,还有两个对应的 SelectorProvider, 分别是:
BYTE_PROVIDER 和 MESSAGE_PROVIDER.
搭建 UDT stream 服务器
如果要搭建 UDT stream 服务器,首先需要使用 NioUdtProvider.BYTE_PROVIDER 来创建 NioEventLoopGroup:
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);
这里,我们创建两个 eventLoop,分别是 acceptLoop 和 connectLoop。
接下来就是在 ServerBootstrap 中绑定上面的两个 group,并且指定 channelFactory。这里我们需要 NioUdtProvider.BYTE_ACCEPTOR:
final ServerBootstrap boot = new ServerBootstrap();
boot.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 10)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(final UdtChannel ch) {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new UDTByteEchoServerHandler());
}
});
就这么简单。
搭建 UDT message 服务器
搭建 UDT message 服务器的步骤和 stream 很类似,不同的是需要使用 NioUdtProvider.MESSAGE_PROVIDER 作为 selectorProvider:
final NioEventLoopGroup acceptGroup =
new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
然后在绑定 ServerBootstrap 的时候使用 NioUdtProvider.MESSAGE_ACCEPTOR 作为 channelFactory:
final ServerBootstrap boot = new ServerBootstrap();
boot.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.MESSAGE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 10)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(final UdtChannel ch)
throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new UDTMsgEchoServerHandler());
}
});
同样很简单。
Stream 和 Message 的 handler
不同的 UDT 类型,需要使用不同的 handler。
对于 Stream 来说,它的底层是 byte, 所以我们的消息处理也是以 byte 的形式进行的,我们以下面的方式来构建 message:
private final ByteBuf message;
message = Unpooled.buffer(UDTByteEchoClient.SIZE);
message.writeBytes("www.flydean.com".getBytes(StandardCharsets.UTF_8));
然后使用 ctx.writeAndFlush (message) 将其写入到 channel 中。
对于 message 来说,它实际上格式对 ByteBuf 的封装。netty 中有个对应的类叫做 UdtMessage:
public final class UdtMessage extends DefaultByteBufHolder
UdtMessage 是一个 ByteBufHolder,所以它实际上是一个 ByteBuf 的封装。
我们需要将 ByteBuf 封装成 UdtMessage:
private final UdtMessage message;
final ByteBuf byteBuf = Unpooled.buffer(UDTMsgEchoClient.SIZE);
byteBuf.writeBytes("www.flydean.com".getBytes(StandardCharsets.UTF_8));
message = new UdtMessage(byteBuf);
然后将这个 UdtMessage 发送到 channel 中:
ctx.writeAndFlush(message);
这样你就学会了在 UDT 协议中使用 stream 和 message 两种数据类型了。
总结
大家可能觉得不同的数据类型原来实现起来这么简单。这全都要归功于 netty 优秀的封装和设计。
感谢 netty!
本文的例子可以参考:learn-netty4
本文已收录于 http://www.flydean.com/40-netty-udt-support-2/
最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!
欢迎关注我的公众号:「程序那些事」, 懂技术,更懂你!
netty系列之:选byte还是选message?这是一个问题
简介
UDT给了你两种选择,byte stream或者message,到底选哪一种呢?经验告诉我们,只有小学生才做选择题,而我们应该全都要!
类型的定义
UDT的两种类型是怎么定义的呢?
翻看com.barchart.udt包,可以发现这两种类型定义在TypeUDT枚举类中。
STREAM(1),
DATAGRAM(2),
一个叫做STREAM,它的code是1。一个叫做DATAGRAM,他的code是2.
根据两个不同的类型我们可以创建不同的selectorProvider和channelFactory。而这两个正是构建netty服务所需要的。
在NioUdtProvider这个工具类中,netty为我们提供了TypeUDT和KindUDT的六种组合ChannelFactory,他们分别是:
用于Stream的:BYTE_ACCEPTOR,BYTE_CONNECTOR,BYTE_RENDEZVOUS。
和用于Message的:MESSAGE_ACCEPTOR,MESSAGE_CONNECTOR和MESSAGE_RENDEZVOUS。
同样的,还有两个对应的SelectorProvider,分别是:
BYTE_PROVIDER 和 MESSAGE_PROVIDER.
搭建UDT stream服务器
如果要搭建UDT stream服务器,首先需要使用NioUdtProvider.BYTE_PROVIDER来创建NioEventLoopGroup:
final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.BYTE_PROVIDER);
final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1, connectFactory, NioUdtProvider.BYTE_PROVIDER);
这里,我们创建两个eventLoop,分别是acceptLoop和connectLoop。
接下来就是在ServerBootstrap中绑定上面的两个group,并且指定channelFactory。这里我们需要NioUdtProvider.BYTE_ACCEPTOR:
final ServerBootstrap boot = new ServerBootstrap();
boot.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 10)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(final UdtChannel ch) {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new UDTByteEchoServerHandler());
}
});
就这么简单。
搭建UDT message服务器
搭建UDT message服务器的步骤和stream很类似,不同的是需要使用NioUdtProvider.MESSAGE_PROVIDER作为selectorProvider:
final NioEventLoopGroup acceptGroup =
new NioEventLoopGroup(1, acceptFactory, NioUdtProvider.MESSAGE_PROVIDER);
final NioEventLoopGroup connectGroup =
new NioEventLoopGroup(1, connectFactory, NioUdtProvider.MESSAGE_PROVIDER);
然后在绑定ServerBootstrap的时候使用NioUdtProvider.MESSAGE_ACCEPTOR作为channelFactory:
final ServerBootstrap boot = new ServerBootstrap();
boot.group(acceptGroup, connectGroup)
.channelFactory(NioUdtProvider.MESSAGE_ACCEPTOR)
.option(ChannelOption.SO_BACKLOG, 10)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<UdtChannel>() {
@Override
public void initChannel(final UdtChannel ch)
throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new UDTMsgEchoServerHandler());
}
});
同样很简单。
Stream和Message的handler
不同的UDT类型,需要使用不同的handler。
对于Stream来说,它的底层是byte,所以我们的消息处理也是以byte的形式进行的,我们以下面的方式来构建message:
private final ByteBuf message;
message = Unpooled.buffer(UDTByteEchoClient.SIZE);
message.writeBytes("www.flydean.com".getBytes(StandardCharsets.UTF_8));
然后使用ctx.writeAndFlush(message)将其写入到channel中。
对于message来说,它实际上格式对ByteBuf的封装。netty中有个对应的类叫做UdtMessage:
public final class UdtMessage extends DefaultByteBufHolder
UdtMessage是一个ByteBufHolder,所以它实际上是一个ByteBuf的封装。
我们需要将ByteBuf封装成UdtMessage:
private final UdtMessage message;
final ByteBuf byteBuf = Unpooled.buffer(UDTMsgEchoClient.SIZE);
byteBuf.writeBytes("www.flydean.com".getBytes(StandardCharsets.UTF_8));
message = new UdtMessage(byteBuf);
然后将这个UdtMessage发送到channel中:
ctx.writeAndFlush(message);
这样你就学会了在UDT协议中使用stream和message两种数据类型了。
总结
大家可能觉得不同的数据类型原来实现起来这么简单。这全都要归功于netty优秀的封装和设计。
感谢netty!
本文的例子可以参考:learn-netty4
本文已收录于 http://www.flydean.com/40-netty-udt-support-2/
最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!
欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!
关于大数据还是大炒作,这是一个问题 ——为什么NoSQL很重要和nosql在大数据体系中的作用的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于AI:是猫还是狗,这是个问题、K8s 选 cgroupfs 还是 systemd?这是一个问题、netty 系列之:选 byte 还是选 message? 这是一个问题、netty系列之:选byte还是选message?这是一个问题的相关知识,请在本站寻找。
本文标签: