GVKun编程网logo

Flume02——发送数据到Flume(flume sink 到文件)

13

对于Flume02——发送数据到Flume感兴趣的读者,本文将会是一篇不错的选择,我们将详细介绍flumesink到文件,并为您提供关于FlumesinkKafkaSpoutStormBoltHbas

对于Flume02——发送数据到Flume感兴趣的读者,本文将会是一篇不错的选择,我们将详细介绍flume sink 到文件,并为您提供关于Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)、Flume 篇 ---Flume 安装配置与相关使用、flume+flume+kafka消息传递+storm消费、flume+log4j+hdfs(日志通过flume传到hdfs)的有用信息。

本文目录一览:

Flume02——发送数据到Flume(flume sink 到文件)

Flume02——发送数据到Flume(flume sink 到文件)

构建Flume事件

事件是Flume中数据的基本形式,每个Flume事件包含header的一个map集合和一个body,是表示为字节数组的有效载荷。

package org.apache.flume;
import java.util.Map;

public interface Event {
  public Map<String, String> getHeaders();
  public void setHeaders(Map<String, String> headers);
  public byte[] getBody();
  public void setBody(byte[] body);
}

Event接口的不同实现类的数据内部可能不同,只要其显示接口的指定格式的header 和 body即可。

通常使用EventBuilder API来创建事件,EventBuilder 提供了4个通常用来创建Flume事件的方法。

package org.apache.flume.event;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;

public class EventBuilder {

  //方法1:headers作为map传入
  public static Event withBody(byte[] body, Map<String, String> headers) {
    Event event = new SimpleEvent();
    if(body == null) {
      body = new byte[0];
    }
    event.setBody(body);
    if (headers != null) {
      event.setHeaders(new HashMap<String, String>(headers));
    }
    return event;
  }

  //方法2:没有header
  public static Event withBody(byte[] body) {
    return withBody(body, null);
  }

  //方法3:传入headers , String 和 编码
  public static Event withBody(String body, Charset charset,
      Map<String, String> headers) {
    return withBody(body.getBytes(charset), headers);
  }

  //方法4:传入String 和 编码
  public static Event withBody(String body, Charset charset) {
    return withBody(body, charset, null);
  }

}

使用Flume客户端SDK

创建Flume RPC客户端

通过RpcClientFactory类创建PRC客户端实例。

package org.apache.flume.api;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Locale;
import java.util.Properties;
import org.apache.flume.FlumeException;

public class RpcClientFactory {

  /**
   * Returns an instance of {@link RpcClient}, optionally with failover.
   * To create a failover client, the properties object should have a
   * property <tt>client.type</tt> which has the value "failover". The client
   * connects to hosts specified by <tt>hosts</tt> property in given properties.
   *
   * @see org.apache.flume.api.FailoverRpcClient
   * <p>
   * If no <tt>client.type</tt> is specified, a default client that connects to
   * single host at a given port is created.(<tt>type</tt> can also simply be
   * <tt>DEFAULT</tt> for the default client).
   *
   * @see org.apache.flume.api.NettyAvroClient
   *
   * @param properties The properties to instantiate the client with.
   * @throws FlumeException
   */
  @SuppressWarnings("unchecked")
  public static RpcClient getInstance(Properties properties)
      throws FlumeException {
    String type = null;
    type = properties.getProperty(
        RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE);
    if (type == null || type.isEmpty()) {
      type = ClientType.DEFAULT.getClientClassName();
    }
    Class<? extends AbstractRpcClient> clazz;
    AbstractRpcClient client;
    try {
      String clientClassType = type;
      ClientType clientType = null;
      try{
        clientType = ClientType.valueOf(type.toUpperCase(Locale.ENGLISH));
      } catch (IllegalArgumentException e){
        clientType = ClientType.OTHER;
      }
      if (!clientType.equals(ClientType.OTHER)){
        clientClassType = clientType.getClientClassName();
      }
      clazz =
          (Class<? extends AbstractRpcClient>) Class.forName(clientClassType);
    } catch (ClassNotFoundException e) {
      throw new FlumeException("No such client!", e);
    }

    try {
      client = clazz.newInstance();
    } catch (InstantiationException e) {
      throw new FlumeException("Cannot instantiate client. " +
          "Exception follows:", e);
    } catch (IllegalAccessException e) {
      throw new FlumeException("Cannot instantiate client. " +
          "Exception follows:", e);
    }
    client.configure(properties);
    return client;

  }

  /**
   * Delegates to {@link #getInstance(Properties props)}, given a File path
   * to a {@link Properties} file.
   * @param propertiesFile Valid properties file
   * @return RpcClient configured according to the given Properties file.
   * @throws FileNotFoundException If the file cannot be found
   * @throws IOException If there is an IO error
   */
  public static RpcClient getInstance(File propertiesFile)
      throws FileNotFoundException, IOException {
    Reader reader = new FileReader(propertiesFile);
    Properties props = new Properties();
    props.load(reader);
    return getInstance(props);
  }

  /**
   * Deprecated. Use
   * {@link getDefaultInstance() getDefaultInstance(String, Integer)} instead.
   * @throws FlumeException
   * @deprecated
   */
  @Deprecated
  public static RpcClient getInstance(String hostname, Integer port)
      throws FlumeException {
    return getDefaultInstance(hostname, port);
  }

  /**
   * Returns an instance of {@link RpcClient} connected to the specified
   * {@code hostname} and {@code port}.
   * @throws FlumeException
   */
  public static RpcClient getDefaultInstance(String hostname, Integer port)
      throws FlumeException {
    return getDefaultInstance(hostname, port, 0);

  }

  /**
   * Deprecated. Use
   * {@link getDefaultInstance() getDefaultInstance(String, Integer, Integer)}
   * instead.
   * @throws FlumeException
   * @deprecated
   */
  @Deprecated
  public static RpcClient getInstance(String hostname, Integer port,
      Integer batchSize) throws FlumeException {
    return getDefaultInstance(hostname, port, batchSize);
  }

  /**
   * Returns an instance of {@link RpcClient} connected to the specified
   * {@code hostname} and {@code port} with the specified {@code batchSize}.
   * @throws FlumeException
   */
  public static RpcClient getDefaultInstance(String hostname, Integer port,
      Integer batchSize) throws FlumeException {

    if (hostname == null) {
      throw new NullPointerException("hostname must not be null");
    }
    if (port == null) {
      throw new NullPointerException("port must not be null");
    }
    if (batchSize == null) {
      throw new NullPointerException("batchSize must not be null");
    }

    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
        hostname + ":" + port.intValue());
    props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, batchSize.toString());
    NettyAvroRpcClient client = new NettyAvroRpcClient();
    client.configure(props);
    return client;
  }

  /**
   * Return an {@linkplain RpcClient} that uses Thrift for communicating with
   * the next hop. The next hop must have a ThriftSource listening on the
   * specified port.
   * @param hostname - The hostname of the next hop.
   * @param port - The port on which the ThriftSource is listening
   * @param batchSize - batch size of each transaction.
   * @return an {@linkplain RpcClient} which uses thrift configured with the
   * given parameters.
   */
  public static RpcClient getThriftInstance(String hostname, Integer port,
    Integer batchSize) {
    if (hostname == null) {
      throw new NullPointerException("hostname must not be null");
    }
    if (port == null) {
      throw new NullPointerException("port must not be null");
    }
    if (batchSize == null) {
      throw new NullPointerException("batchSize must not be null");
    }

    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
    props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
      hostname + ":" + port.intValue());
    props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, batchSize.toString());
    ThriftRpcClient client = new ThriftRpcClient();
    client.configure(props);
    return client;
  }

  /**
   * Return an {@linkplain RpcClient} that uses Thrift for communicating with
   * the next hop. The next hop must have a ThriftSource listening on the
   * specified port. This will use the default batch size. See {@linkplain
   * RpcClientConfigurationConstants}
   * @param hostname - The hostname of the next hop.
   * @param port - The port on which the ThriftSource is listening
   * @return - An {@linkplain RpcClient} which uses thrift configured with the
   * given parameters.
   */
  public static RpcClient getThriftInstance(String hostname, Integer port) {
    return getThriftInstance(hostname, port, RpcClientConfigurationConstants
      .DEFAULT_BATCH_SIZE);
  }

  /**
   * Return an {@linkplain RpcClient} that uses Thrift for communicating with
   * the next hop.
   * @param props
   * @return - An {@linkplain RpcClient} which uses thrift configured with the
   * given parameters.
   */
  public static RpcClient getThriftInstance(Properties props) {
    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
      ClientType.THRIFT.clientClassName);
    return getInstance(props);
  }

  public static enum ClientType {
    OTHER(null),
    DEFAULT(NettyAvroRpcClient.class.getCanonicalName()),
    DEFAULT_FAILOVER(FailoverRpcClient.class.getCanonicalName()),
    DEFAULT_LOADBALANCE(LoadBalancingRpcClient.class.getCanonicalName()),
    THRIFT(ThriftRpcClient.class.getCanonicalName());


    private final String clientClassName;

    private ClientType(String className) {
      this.clientClassName = className;
    }

    protected String getClientClassName() {
      return this.clientClassName;
    }

  }
}

阅读源码可知,该类为我们提供了几个典型的接口,从Properties读取客户端配置,从文件读取客户端配置,从host和port获取配置

其中该类有一个必要的参数就是client.type,用来指明客户端的类型。客户端的类型包括:defalut,defalut_failover,defalut_loadbalance或者thrift

Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)

Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)

Flume可以应用于日志采集.在本次的介绍中,主要用于采集应用系统的日志,将日志输出到kafka,再经过storm进行实施处理.

我们会一如既往的光顾一下flume的官网,地址如下:

flume官网

下图是官网的截图,其中的标注是如何配置source以及sink,flume支持多种source和sink,我们本次使用的是监控日志文件使用tail -f 命令作为source,sink则使用sink-kafka,之前已经将kafka和storm集成,所以,日志会直接采集到storm

输入图片说明

配置如下:flume-conf.properties

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called ''agent''

a1.sources = r1  
a1.sinks = k1  
a1.channels = c1  

# Describe/configure the source  
a1.sources.r1.type = exec  
a1.sources.r1.command = tail -F /home/logs/dccfront/dataCollect.log

#Describe the sink  
#a1.sinks.k1.type = logger  
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = dccfront
a1.sinks.k1.brokerList = node2:9092,node3:9092,node4:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory  
a1.channels.c1.type = memory
a1.channels.c1.keep-alive = 60
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel  
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

so easy,接下来就是启动flume

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

启动完成时候,就可向日志文件里写日志啦.比如,我是通过访问应用,通过应用产生日志

tail -f 日志文件截图如下:

输入图片说明

storm集群获取的日志如下:

输入图片说明

/猫小鞭/

温馨提示,官方文档其实很简单,看看就会了,从此丢弃二手鞋.

输入图片说明

输入图片说明

输入图片说明

Flume 篇 ---Flume 安装配置与相关使用

Flume 篇 ---Flume 安装配置与相关使用

一。前述

Copy 过来一段介绍 Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具 / 服务,或者数集中机制。flume 具有高可用,分布式,配置工具,其设计的原理也是基于将数据流,如日志数据从各种网站服务器上汇集起来存储到 HDFS,HBase 等集中存储器中。官网:http://flume.apache.org/FlumeUserGuide.html

二。架构

1. 基本架构

介绍:

Source:(相当于一个来源)

   从数据发生器接收数据,并将接收的数据以 Flume 的 event 格式传递给一个或者多个通道 channal,Flume 提供多种数据接收的方式,比如 Avro,Thrift,twitter1% 等

Channel:(相当于一个中转)

 channal 是一种短暂的存储容器,它将从 source 处接收到的 event 格式的数据缓存起来,直到它们被 sinks 消费掉,它在 source 和 sink 间起着一共桥梁的作用,channal 是一个完整的事务,这一点保证了数据在收发的时候的一致性。并且它可以和任意数量的 source 和 sink 链接。支持的类型有: JDBC channel , File System channel , Memort channel 等.

sink:(相当于最后的写出)

  sink 将数据存储到集中存储器比如 Hbase 和 HDFS, 它从 channals 消费数据 (events) 并将其传递给目标地。目标地可能是另一个 sink, 也可能 HDFS,HBase.

2. 延伸架构

  2.1 利用 AVRO 中转

2.2 一般多个来源时可以配置这样

ps:

Avro([ævrə])是 Hadoop 的一个子项目,由 Hadoop 的创始人 Doug Cutting(也是 Lucene,Nutch 等项目的创始人) 牵头开发 Avro 是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据。
三。具体实施
3.1 安装
1、上传
2、解压
3、修改 conf/flume-env.sh  文件中的 JDK 目录
 注意:JAVA_OPTS 配置  如果我们传输文件过大 报内存溢出时 需要修改这个配置项
4、验证安装是否成功  ./flume-ng version
5、配置环境变量
    export FLUME_HOME=/home/apache-flume-1.6.0-bin


3.2 Source、Channel、Sink 有哪些类型
    Flume Source
    Source 类型                   | 说明
    Avro Source                 | 支持 Avro 协议(实际上是 Avro RPC),内置支持
    Thrift Source               | 支持 Thrift 协议,内置支持
    Exec Source                 | 基于 Unix 的 command 在标准输出上生产数据
    JMS Source                   | 从 JMS 系统(消息、主题)中读取数据
    Spooling Directory Source | 监控指定目录内数据变更
    Twitter 1% firehose Source|    通过 API 持续下载 Twitter 数据,试验性质
    Netcat Source               | 监控某个端口,将流经端口的每一个文本行数据作为 Event 输入
    Sequence Generator Source | 序列生成器数据源,生产序列数据
    Syslog Sources               | 读取 syslog 数据,产生 Event,支持 UDP 和 TCP 两种协议
    HTTP Source                 | 基于 HTTP POST 或 GET 方式的数据源,支持 JSON、BLOB 表示形式
    Legacy Sources               | 兼容老的 Flume OG 中 Source(0.9.x 版本)

    Flume Channel
    Channel 类型       说明
    Memory Channel                | Event 数据存储在内存中
    JDBC Channel                  | Event 数据存储在持久化存储中,当前 Flume Channel 内置支持 Derby
    File Channel                  | Event 数据存储在磁盘文件中
    Spillable Memory Channel   | Event 数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件
    Pseudo Transaction Channel | 测试用途
    Custom Channel                | 自定义 Channel 实现

    Flume Sink
    Sink 类型     说明
    HDFS Sink             | 数据写入 HDFS
    Logger Sink           | 数据写入日志文件
    Avro Sink             | 数据被转换成 Avro Event,然后发送到配置的 RPC 端口上
    Thrift Sink           | 数据被转换成 Thrift Event,然后发送到配置的 RPC 端口上
    IRC Sink              | 数据在 IRC 上进行回放
    File Roll Sink         | 存储数据到本地文件系统
    Null Sink             | 丢弃到所有数据
    HBase Sink             | 数据写入 HBase 数据库
    Morphline Solr Sink | 数据发送到 Solr 搜索服务器(集群)
    ElasticSearch Sink     | 数据发送到 Elastic Search 搜索服务器(集群)
    Kite Dataset Sink     | 写数据到 Kite Dataset,试验性质的
    Custom Sink           | 自定义 Sink 实现



案例 1、 A simple example
    http://flume.apache.org/FlumeUserGuide.html#a-simple-example
    
    配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

启动 flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console 指定配置目录
flume-ng agent -n a1 -f op5 -Dflume.root.logger=INFO,console 不用指定配置目录,将上诉 source,channel,sink 的文件起名为 a1, 同时指定这个文件在哪

安装 telnet
yum install telnet
退出 ctrl+]  quit

Memory Chanel 配置
  capacity:默认该通道中最大的可以存储的 event 数量是 100,
  trasactionCapacity:每次最大可以 source 中拿到或者送到 sink 中的 event 数量也是 100
  keep-alive:event 添加到通道中或者移出的允许时间
  byte**:即 event 的字节量的限制,只包括 eventbody



案例 2、两个 flume 做集群(第一个 agent 的 sink 作为第二个 agent 的 source)

    node01 服务器中,配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = node1
    a1.sources.r1.port = 44444

    # Describe the sink
    # a1.sinks.k1.type = logger
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node2
    a1.sinks.k1.port = 60000

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    node02 服务器中,安装 Flume(步骤略)
    配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node2
    a1.sources.r1.port = 60000

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    先启动 node02 的 Flume
    flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
    
    再启动 node01 的 Flume
    flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
    
    打开 telnet 测试  node02 控制台输出结果


案例 3、Exec Source(监听一个文件)
        http://flume.apache.org/FlumeUserGuide.html#exec-source
        
    配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/flume.exec.log

    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    启动 Flume
    flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
    
    创建空文件演示 touch flume.exec.log
    循环添加数据
    for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
        
案例 4、Spooling Directory Source(监听一个目录)
        http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
    配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

    启动 Flume
    flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

    拷贝文件演示
    mkdir logs
    cp flume.exec.log logs/


案例 5、hdfs sink
        http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
    
        配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    *** 只修改上一个 spool sink 的配置代码块 a1.sinks.k1.type = logger
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M
    
    ## 每隔 60s 或者文件大小超过 10M 的时候产生新文件
    # hdfs 有多少条消息时新建文件,0 不基于消息个数
    a1.sinks.k1.hdfs.rollCount=0
    # hdfs 创建多长时间新建文件,0 不基于时间
    a1.sinks.k1.hdfs.rollInterval=60
    # hdfs 多大时新建文件,0 不基于文件大小
    a1.sinks.k1.hdfs.rollSize=10240
    # 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
    a1.sinks.k1.hdfs.idleTimeout=3
    
    a1.sinks.k1.hdfs.fileType=DataStream
   #时间参数一定要带上 true
    a1.sinks.k1.hdfs.useLocalTimeStamp=true
    
    ## 每五分钟生成一目录:
    # 是否启用时间上的” 舍弃”,这里的” 舍弃”,类似于” 四舍五入”,后面再介绍。如果启用,则会影响除了 % t 的其他所有时间表达式
    a1.sinks.k1.hdfs.round=true
    # 时间上进行 “舍弃” 的值;
    a1.sinks.k1.hdfs.roundValue=5
    # 时间上进行” 舍弃” 的单位,包含:second,minute,hour
    a1.sinks.k1.hdfs.roundUnit=minute

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1(将 source,channel,sink 关联)
    ############################################################
    创建 HDFS 目录
    hadoop fs -mkdir /flume
    
    启动 Flume
    flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console

    查看 hdfs 文件
    hadoop fs -ls /flume/...
    hadoop fs -get /flume/...


http://flume.apache.org/

安装
1、上传
2、解压
3、修改 conf/flume-env.sh  文件中的 JDK 目录
 注意:JAVA_OPTS 配置  如果我们传输文件过大 报内存溢出时 需要修改这个配置项
4、验证安装是否成功  ./flume-ng version
5、配置环境变量
    export FLUME_HOME=/home/apache-flume-1.6.0-bin


Source、Channel、Sink 有哪些类型
    Flume Source
    Source 类型                   | 说明
    Avro Source                 | 支持 Avro 协议(实际上是 Avro RPC),内置支持
    Thrift Source               | 支持 Thrift 协议,内置支持
    Exec Source                 | 基于 Unix 的 command 在标准输出上生产数据
    JMS Source                   | 从 JMS 系统(消息、主题)中读取数据
    Spooling Directory Source | 监控指定目录内数据变更
    Twitter 1% firehose Source|    通过 API 持续下载 Twitter 数据,试验性质
    Netcat Source               | 监控某个端口,将流经端口的每一个文本行数据作为 Event 输入
    Sequence Generator Source | 序列生成器数据源,生产序列数据
    Syslog Sources               | 读取 syslog 数据,产生 Event,支持 UDP 和 TCP 两种协议
    HTTP Source                 | 基于 HTTP POST 或 GET 方式的数据源,支持 JSON、BLOB 表示形式
    Legacy Sources               | 兼容老的 Flume OG 中 Source(0.9.x 版本)

    Flume Channel
    Channel 类型       说明
    Memory Channel                | Event 数据存储在内存中
    JDBC Channel                  | Event 数据存储在持久化存储中,当前 Flume Channel 内置支持 Derby
    File Channel                  | Event 数据存储在磁盘文件中
    Spillable Memory Channel   | Event 数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件
    Pseudo Transaction Channel | 测试用途
    Custom Channel                | 自定义 Channel 实现

    Flume Sink
    Sink 类型     说明
    HDFS Sink             | 数据写入 HDFS
    Logger Sink           | 数据写入日志文件
    Avro Sink             | 数据被转换成 Avro Event,然后发送到配置的 RPC 端口上
    Thrift Sink           | 数据被转换成 Thrift Event,然后发送到配置的 RPC 端口上
    IRC Sink              | 数据在 IRC 上进行回放
    File Roll Sink         | 存储数据到本地文件系统
    Null Sink             | 丢弃到所有数据
    HBase Sink             | 数据写入 HBase 数据库
    Morphline Solr Sink | 数据发送到 Solr 搜索服务器(集群)
    ElasticSearch Sink     | 数据发送到 Elastic Search 搜索服务器(集群)
    Kite Dataset Sink     | 写数据到 Kite Dataset,试验性质的
    Custom Sink           | 自定义 Sink 实现



案例 1、 A simple example
    http://flume.apache.org/FlumeUserGuide.html#a-simple-example
    
    配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

启动 flume
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console

安装 telnet
yum install telnet
退出 ctrl+]  quit

Memory Chanel 配置
  capacity:默认该通道中最大的可以存储的 event 数量是 100,
  trasactionCapacity:每次最大可以 source 中拿到或者送到 sink 中的 event 数量也是 100
  keep-alive:event 添加到通道中或者移出的允许时间
  byte**:即 event 的字节量的限制,只包括 eventbody



案例 2、两个 flume 做集群

    node01 服务器中,配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = node1
    a1.sources.r1.port = 44444

    # Describe the sink
    # a1.sinks.k1.type = logger
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = node2
    a1.sinks.k1.port = 60000

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    node02 服务器中,安装 Flume(步骤略)
    配置文件
    ############################################################
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = node2
    a1.sources.r1.port = 60000

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    先启动 node02 的 Flume
    flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
    
    再启动 node01 的 Flume
    flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
    
    打开 telnet 测试  node02 控制台输出结果


案例 3、Exec Source
        http://flume.apache.org/FlumeUserGuide.html#exec-source
        
    配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/flume.exec.log

    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    
    启动 Flume
    flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console
    
    创建空文件演示 touch flume.exec.log
    循环添加数据
    for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done
        
案例 4、Spooling Directory Source
        http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
    配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################

    启动 Flume
    flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

    拷贝文件演示
    mkdir logs
    cp flume.exec.log logs/


案例 5、hdfs sink
        http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
    
        配置文件
    ############################################################
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/logs
    a1.sources.r1.fileHeader = true

    # Describe the sink
    *** 只修改上一个 spool sink 的配置代码块 a1.sinks.k1.type = logger
    a1.sinks.k1.type=hdfs
    a1.sinks.k1.hdfs.path=hdfs://sxt/flume/%Y-%m-%d/%H%M
    
    ## 每隔 60s 或者文件大小超过 10M 的时候产生新文件
    # hdfs 有多少条消息时新建文件,0 不基于消息个数
    a1.sinks.k1.hdfs.rollCount=0
    # hdfs 创建多长时间新建文件,0 不基于时间
    a1.sinks.k1.hdfs.rollInterval=60
    # hdfs 多大时新建文件,0 不基于文件大小
    a1.sinks.k1.hdfs.rollSize=10240
    # 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
    a1.sinks.k1.hdfs.idleTimeout=3
    
    a1.sinks.k1.hdfs.fileType=DataStream
    a1.sinks.k1.hdfs.useLocalTimeStamp=true
    
    ## 每五分钟生成一个目录:
    # 是否启用时间上的” 舍弃”,这里的” 舍弃”,类似于” 四舍五入”,后面再介绍。如果启用,则会影响除了 % t 的其他所有时间表达式
    a1.sinks.k1.hdfs.round=true
    # 时间上进行 “舍弃” 的值;
    a1.sinks.k1.hdfs.roundValue=5
    # 时间上进行” 舍弃” 的单位,包含:second,minute,hour
    a1.sinks.k1.hdfs.roundUnit=minute

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    ############################################################
    创建 HDFS 目录
    hadoop fs -mkdir /flume
    
    启动 Flume
    flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console

    查看 hdfs 文件
    hadoop fs -ls /flume/...
    hadoop fs -get /flume/...

作业:
1、flume 如何收集 java 请求数据
2、项目当中如何来做? 日志存放 /log/ 目录下 以 yyyyMMdd 为子目录 分别存放每天的数据




flume+flume+kafka消息传递+storm消费

flume+flume+kafka消息传递+storm消费

通过flume收集其他机器上flume的监测数据,发送到本机的kafka进行消费。

环境:slave中安装flume,master中安装flume+kafka(这里用两台虚拟机,也可以用三台以上)

masterIP 192.168.83.128    slaveIP 192.168.83.129

通过监控test.log文件的变化,收集变化信息发送到主机的flume中,再发送到kafka中进行消费

1、配置slave1在flume中配置conf目录中的example.conf文件,没有就创建一个

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
#监控文件夹下的test.log文件 a1.sources.r1.command
= tail -F /home/qq/pp/data/test.log a1.sources.r1.channels = c1 # Describe the sink ##sink端的avro是一个数据发送者 a1.sinks = k1 ##type设置成avro来设置发消息 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 ##下沉到master这台机器 a1.sinks.k1.hostname = 192.168.83.133 ##下沉到mini2中的44444 a1.sinks.k1.port = 44444 a1.sinks.k1.batch-size = 2 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

2、master上配置flume/conf里面的example.conf(标红的注意下)

#me the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
##source中的avro组件是一个接收者服务
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
#a1.sinks.k1.type = logger
#对于sink的配置描述 使用kafka做数据的消费
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_kafka
a1.sinks.k1.brokerList = 192.168.83.128:9092,192.168.83.129:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

3、向监听文件写入字符串(程序循环写入,不用手动修改test.log文件了)

[root@s1 # cd /home/qq/pp/data
[root@s1 home/qq/pp/data# while true
> do
> echo "toms" >> test.log
> sleep 1
> done

4、查看上面的程序是否执行

#cd /home/qq/pp/data
#tail -f test.log

5、打开消息接收者master的flume

进入flume安装目录,执行如下语句

bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console

现在回打印出一些信息

6、启动slave的flume

进入flume安装目录,执行如下语句

bin/flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console

7、 进入master ---kafka安装目录

    1)启动zookeeper

      bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

    2)启动kafka服务

      bin/kafka-server-start.sh -daemon config/server.properties 

    3)创建topic

kafka-topics.sh --create --topic flume_kafka  --zookeeper 192.168.83.129:2181,192.168.83.128:2181 --partitions 2 --replication-factor 1

    4)创建消费者

bin/kafka-console-consumer.sh --bootstrap-server 192.168.83.128:9092,192.168.83.129:9092 --topic flume_kafka --from-beginning

    5)然后就会看到消费之窗口打印写入的信息,  

                    

8、此时启动 eclipse实例(https://www.cnblogs.com/51python/p/10908660.html),注意修改ip以及topic

 如果启动不成功看看是不是kafka设置问题(https://www.cnblogs.com/51python/p/10919330.html第一步虚拟机部署)

   启动后会打印出结果(这是第二次测试不是用的toms而是hollo word测试的,此处只是一个实例)

 

 

 

 ok!一个流程终于走完了!

 

 

参考: 

https://blog.csdn.net/luozhonghua2014/article/details/80369469?utm_source=blogxgwz5

https://blog.csdn.net/wxgxgp/article/details/85701844

https://blog.csdn.net/tototuzuoquan/article/details/73203241

 

flume+log4j+hdfs(日志通过flume传到hdfs)

flume+log4j+hdfs(日志通过flume传到hdfs)

flume+log4j+hdfs(日志通过flume传到hdfs)

  • log4j 日志生成.
  • flume 日志收集系统,收集日志.
  • HDFS Hadoop分布式文件系统,存储日志,使用版本hadoop-3.0.0-alpha1.tar.gz 本文档采用伪分布式方式进行试验,后期进行集群测试.

hdfs 伪分布式安装见博文[hadoop基础环境搭建]一文。

flume安装参考链接


系统环境:centos6.5 linux 64系统 

  1. 官网下载apache-flume-1.6.0-bin.tar.gz
  2. 压缩包上传至/tmp目录,解压缩至/opt/目录
tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /opt/

3.配置flume的环境变量: 修改 /etc/profile(~/.bashrc)文件

export FLUME_HOME=/home/connect/software/flume
export PATH=$FLUME_HOME/bin:$PATH

修改生效:

source /etc/profile(~/.bashrc)

4 进入apahce-flume的bin目录下:

cd /opt/apache-flume-1.6.0-bin/bin

5 运行脚本程序:

./flume-ng version

如果出现版本号,则表明安装成功

6 使用实例: 在/opt/ apache-flume-1.6.0-bin/conf目录创建example.conf文件,内容如下:

# example.conf: A single-node Flume configuration  
  
# Name the components on this agent  
tier1.sources=source1  
tier1.channels=channel1  
tier1.sinks=sink1  
  
tier1.sources.source1.type=avro  
tier1.sources.source1.bind=0.0.0.0  
tier1.sources.source1.port=44444  
tier1.sources.source1.channels=channel1  
  
tier1.channels.channel1.type=memory  
tier1.channels.channel1.capacity=10000  
tier1.channels.channel1.transactionCapacity=1000  
tier1.channels.channel1.keep-alive=30  
  
tier1.sinks.sink1.type=hdfs  
tier1.sinks.sink1.channel=channel1  
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events  
tier1.sinks.sink1.hdfs.fileType=DataStream  
tier1.sinks.sink1.hdfs.writeFormat=Text  
tier1.sinks.sink1.hdfs.rollInterval=0  
tier1.sinks.sink1.hdfs.rollSize=10240  
tier1.sinks.sink1.hdfs.rollCount=0  
tier1.sinks.sink1.hdfs.idleTimeout=60  

然后,启动flume,在目录/opt/ apache-flume-1.6.0-bin下,运行flume

flume-ng  agent -c ../conf -f ../conf/flume_kafka.conf -Dflume.root.logger=INFO,console -n tier1 > ../logs/flume.log 2>&1 &

参数说明:

  • n 指定agent名称

  • c 指定配置文件目录

  • f 指定配置文件

  • Dflume.root.logger=DEBUG,console 设置日志等级

  • 然后idea新建maven工程

    package com.besttone.flume;  
  
    import java.util.Date;  
  
    import org.apache.commons.logging.Log;  
    import org.apache.commons.logging.LogFactory;  
  
    public class WriteLog {  
    protected static final Log logger = LogFactory.getLog(WriteLog.class);  
  
    /** 
     * [@param](https://my.oschina.net/u/2303379) args 
     * [@throws](https://my.oschina.net/throws) InterruptedException  
     */  
    public static void main(String[] args) throws InterruptedException {  
        // TODO Auto-generated method stub  
        while (true) {  
        //每隔两秒log输出一下当前系统时间戳  
            logger.info(new Date().getTime());  
            Thread.sleep(2000);  
        }  
    }  
}  

对应的pom文件为:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>flumeTest</groupId>
  <artifactId>test</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>test</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-war-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <warSourceDirectory>WebContent</warSourceDirectory>
          <failOnMissingWebXml>false</failOnMissingWebXml>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

log4j配置文件为:

### set log levels ###
log4j.rootLogger=INFO, stdout, file, flume  
log4j.logger.per.flume=INFO  

### flume ###
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender  
log4j.appender.flume.layout=org.apache.log4j.PatternLayout  
log4j.appender.flume.Hostname=10.37.167.204
log4j.appender.flume.Port=44444  

### stdout ###
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.Threshold=INFO  
log4j.appender.stdout.Target=System.out  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n  

### file ###
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender  
log4j.appender.file.Threshold=INFO  
log4j.appender.file.File=./logs/tracker/tracker.log  
log4j.appender.file.Append=true  
log4j.appender.file.DatePattern=''.''yyyy-MM-dd  
log4j.appender.file.layout=org.apache.log4j.PatternLayout  
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

然后写一个运行脚本:

#!/bin/sh
jarlist=$(ls  /../flume/lib/*.jar)
CLASSPATH =/.../flume/test-1.0-SNAPSHOT.jar
for jar in ${jarlist}
do
    CLASSPATH=${CLASSPATH}:${jar}
done
echo ${CLASSPATH}

java -classpath $CLASSPATH flumeTest.WriteLog

最后打包发送到服务器,然后运行就ok了。

程序成功运行会出现如下画面:

 2018-01-08 21:11:40 WriteLog [INFO] 1515417100168
  2018-01-08 21:11:42 WriteLog [INFO] 1515417102169
  2018-01-08 21:11:44 WriteLog [INFO] 1515417104170
  2018-01-08 21:11:46 WriteLog [INFO] 1515417106172
  2018-01-08 21:11:48 WriteLog [INFO] 1515417108175
  2018-01-08 21:11:50 WriteLog [INFO] 1515417110177
  2018-01-08 21:11:52 WriteLog [INFO] 1515417112178
  2018-01-08 21:11:54 WriteLog [INFO] 1515417114180
  2018-01-08 21:11:56 WriteLog [INFO] 1515417116181
  2018-01-08 21:11:58 WriteLog [INFO] 1515417118183
  2018-01-08 21:12:00 WriteLog [INFO] 1515417120184
  2018-01-08 21:12:02 WriteLog [INFO] 1515417122185
  2018-01-08 21:12:04 WriteLog [INFO] 1515417124186
  2018-01-08 21:12:06 WriteLog [INFO] 1515417126188
  2018-01-08 21:12:08 WriteLog [INFO] 1515417128189
  2018-01-08 21:12:10 WriteLog [INFO] 1515417130191
  2018-01-08 21:12:12 WriteLog [INFO] 1515417132192
  2018-01-08 21:12:14 WriteLog [INFO] 1515417134193
  2018-01-08 21:12:16 WriteLog [INFO] 1515417136194
  2018-01-08 21:12:18 WriteLog [INFO] 1515417138196
  2018-01-08 21:12:20 WriteLog [INFO] 1515417140197
  2018-01-08 21:12:22 WriteLog [INFO] 1515417142198

5 最后进入hdfs或在浏览器中查看日志是否进入hdfs中输入:

http:localhost:9870
  • [ ] 问题(提示hadoop不能加载本地库)

  • image

  • [ ] 问题2

  • 在linux下,不可避免的会用VIM打开一些windows下编辑过的文本文件。我们会发现文件的每行结尾都会有一个^M$符号,这是因为 DOS下的编辑器和Linux编辑器对文件行末的回车符处理不一致

  • 解决方案: 1 (个人认为是最方便的) 在终端下敲命令:

dos2unix filename 

直接转换成unix格式,就OK了.当出现不能用的时候,则说明dos2unix没有被安装,所以需要:

yum install dos2unix -y

然后继续运行就可以了


  • [ ] 问题3
  • 问题描述: 在hadoop伪分布式中,第一次NameNode格式化后,启动start-dfs.sh后,通过jps查看,namenode/datanode/namesecondly进程都启动了,而当再次格式化namenode之后,DataNode启动不起来了,可以查看问题相似链接 解决方案: 查看core-site.xml中所写的地址/opt/temp/lih-temp中dfs中的name,data,与namesecondary三者中的current中的VERSION中的关系,具体如下两图所示:
  • 正确的图 R-image
  • 错误的图 link

当第二次对namenode进行格式化时,必须要求namenode的clusterID与datanode的clusterID相同,而namesecondary的就不要求了


  • [ ] 4 问题描述:在安装好hadoop之后,配置文件设置如链接所示,当利用wordcount来进行计算: 下文用到的链接
在目录中先新建两个txt文档(如上链接)
#./hdfs dfs -mkdir /hdfsInput

[bin]# ./hadoop jar ../share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha1.jar wordcount /hdfsInput /hdfsOutput

当利用hadoop运行mapreduce的jar程序时,出现了如下问题: image

产生问题的原因: 路径设置不正确

解决办法: 在mapred-site.xm文件中添加(mapreduce的端口为:8088还有8042,虽然不知道是什么,但是后面会知道的):

<property>
    <name>mapreduce.application.classpath</name>
        <value>
                /opt/hadoop/share/hadoop/hdfs/*,
                /opt/hadoop/share/hadoop/hdfs/lib/*,
                /opt/hadoop/share/hadoop/mapreduce/*,
                /opt/hadoop/share/hadoop/mapreduce/lib/*,
                /opt/hadoop/share/hadoop/yarn/*,
                /opt/hadoop/share/hadoop/yarn/lib/*
        </value>
<property>

参考各种网站:

  1. 参考网站1

  2. 参考网站2

  3. 重点参看1

  4. 重点参考2

  5. 参考链接4

  6. hadoop集群运行jar


  • [ ] 拓展知识点

对项目进行打jar包,并利用命令对其进行运行

  1. 利用命令对项目进行打包jar并且进行运行: 编写java程序:
public class Helloword {
    public static void main(String[] args) {
      System.out.println("Hello word!!");
    }
}

然后进行编译:

javac Helloword.java
java Helloword

然后 使用
jar -cvf hello.jar Helloword.class
即将Helloword打成了hello.jar,然后进行运行
java -jar hello.jar

jar -cvf hello.war Helloword.class
将Helloword打成war包
java -jar hello.war
  1. 利用intellij idea进行打包
  • [对一般工程](http://jingyan.baidu.com/article/f25ef254a829a6482c1b8224.html)
    
  • 如果对含有依赖jar的maven项目,需要将maven中的依赖也打成jar包,需要在pom文件中添加相应的依赖:在首先要在pom里<dependencies>和<repositories>间增加<bulid>属性,build配置信息如下:
    
<build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>LIHAO.Helloword</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

然后利用:

mvn clean package install -Dmaven.test.skip -X

即可

关于Flume02——发送数据到Flumeflume sink 到文件的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)、Flume 篇 ---Flume 安装配置与相关使用、flume+flume+kafka消息传递+storm消费、flume+log4j+hdfs(日志通过flume传到hdfs)等相关知识的信息别忘了在本站进行查找喔。

本文标签: