GVKun编程网logo

java influx DB工具类(java influxdb示例)

4

想了解javainfluxDB工具类的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于javainfluxdb示例的相关问题,此外,我们还将为您介绍关于GaussDB(forInflux)与开

想了解java influx DB工具类的新动态吗?本文将为您提供详细的信息,我们还将为您解答关于java influxdb示例的相关问题,此外,我们还将为您介绍关于GaussDB(for Influx)与开源企业版性能对比、grafana+prometheus 使用 influx DB、Grafana/Influx Db 对数据源的身份验证失败、influx - java client的新知识。

本文目录一览:

java influx DB工具类(java influxdb示例)

java influx DB工具类(java influxdb示例)

配置

application-properties:

spring.influxdb.url=${influxdb_host:127.0.0.1}
spring.influxdb.port=${influxdb_port:8086}
spring.influxdb.username=${influxdb_username:root}
spring.influxdb.password=${influxdb_password:root}
spring.influxdb.database=${influxdb_database:test}

InfluxDBConfig

import com....utils.InfluxDBConnect;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@ConfigurationProperties("application.properties")
@Configuration
public class InfluxDBConfig {

    @Value("${spring.influxdb.username}")
    private String username;
    @Value("${spring.influxdb.password}")
    private String password;
    @Value("${spring.influxdb.url}")
    private String url;
    @Value("${spring.influxdb.port}")
    private String port;
    @Value("${spring.influxdb.database}")
    private String database;

    @Bean
    public InfluxDBConnect getInfluxDBConnect(){
        InfluxDBConnect influxDB = new InfluxDBConnect(username,password,"http://"+url+":"+port,database);

        influxDB.influxDbBuild();

        influxDB.createRetentionPolicy();
        return influxDB;
    }

}

工具类

influxDBConnect类:

import lombok.Data;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import java.util.Map;
import java.util.concurrent.TimeUnit;



@Data
public class InfluxDBConnect {
    private String username;// 用户名
    private String password;// 密码
    private String openurl;// 连接地址
    private String database;// 数据库

    private InfluxDB influxDB;



    public InfluxDBConnect(String username,String password,String openurl,String database) {
        this.username = username;
        this.password = password;
        this.openurl = openurl;
        this.database = database;
    }

    /** 连接时序数据库;获得InfluxDB **/
    public InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(openurl,username,password);
            influxDB.createDatabase(database);

        }
        return influxDB;
    }

    /**
     * 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT
     * 表示 设为默认的策略
     */
    public void createRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT","defalut",database,"7200d",1);
        this.query(command);
    }

    /**
     * 查询
     *
     * @param command 查询语句
     * @return
     */
    public QueryResult query(String command) {
        return influxDB.query(new Query(command,database));
    }
    public QueryResult query(String command,TimeUnit unit) {
        return influxDB.query(new Query(command,database),unit);
    }

    /**
     * 插入
     * @param measurement 表
     * @param tags 标签
     * @param fields 字段
     */
    public void insert(String measurement,Map<String,String> tags,Object> fields,long time) {
        Point.Builder builder = Point.measurement(measurement);
        builder.time(time,TimeUnit.MILLISECONDS);
        builder.tag(tags);
        builder.fields(fields);
        influxDB.write(database,"",builder.build());
    }

    /**
     * 插入
      * @param measurement 表
       * @param tags 标签
       * @param fields 字段
        */
     public void insert(String measurement,Object> fields){
         Point.Builder builder = Point.measurement(measurement);
         builder.tag(tags);
         builder.fields(fields);

         influxDB.write(database,builder.build());
     }


    /**
     * 插入
     * @param batchPoints 批量插入
     */
    public void batchInsert(BatchPoints batchPoints){
        influxDB.write(batchPoints);
    }

    /**
     * 删除
     * @param command 删除语句
     * @return 返回错误信息
     */
    public String deleteMeasurementData(String command) {
        QueryResult result = influxDB.query(new Query(command,database));
        return result.getError();
    }

    /**
     * 创建数据库
     * @param dbname
     */
    public void createDB(String dbname) {
        influxDB.createDatabase(dbname);
    }

    /**
     * 删除数据库
     *
     * @param dbname
     */
    public void deleteDB(String dbname) {
        influxDB.deleteDatabase(dbname);
    }

}

 查询返回值包的层数很多,可以预处理

import lombok.extern.slf4j.Slf4j;
import org.influxdb.dto.QueryResult;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;



@Slf4j
public class InfluxdbUtil {

    public static List<Object> queryInfluxdb(String query_sql,String table_type,InfluxDBConnect influxDB) {
        long starttime = System.currentTimeMillis();
        QueryResult result = influxDB.query(query_sql.toString(),TimeUnit.MILLISECONDS);
        long endtime = System.currentTimeMillis();

        List<Object> influx_data_list = getInfluxData(result.getResults().get(0),table_type);

        return influx_data_list;

    }
    public static List<Object> getInfluxData(QueryResult.Result result,String table_type) {
        List<Object> influx_data_list = null;
        if (null != result) {
            influx_data_list = new ArrayList<>();

            List<QueryResult.Series> series = result.getSeries();
            if (null != series && series.size() > 0) {
                for (QueryResult.Series serie : series) {
                    List<List<Object>> values = serie.getValues();

                    List<Object> result_list = getInfluxDataAndBuild(values,serie,table_type);

                    influx_data_list.addAll(result_list);
                }
            }
        }

        return influx_data_list;
    }

    public static  List<Object> getInfluxDataAndBuild(List<List<Object>> values,QueryResult.Series
            serie,String table_type) {
        List<Object> influx_data_list = new ArrayList<>();
        List<String> influx_cloumns = serie.getColumns();
        Map<String,String> influx_tags = serie.getTags();

        Map<String,Object> build_maps;

        if (values.size() > 0) {

            for (int i = 0; i < values.size(); i++) {
                build_maps = new LinkedHashMap<>();
                if (null != influx_tags) {

                    for (Map.Entry<String,String> entry : influx_tags.entrySet()) {
                        String entry_key = entry.getKey();
                        if (entry_key.contains("tag_")) {
                            entry_key = entry_key.substring(4);
                        }
                        build_maps.put(entry_key,entry.getValue());
                    }
                }
                Object[] influx_Obj = values.get(i).toArray();

                for (int j = 0; j < influx_Obj.length; j++) {
                    String build_maps_key = influx_cloumns.get(j);
                    if (build_maps_key.equals("time")) {
                        if (table_type.equals("normal")) {
                            BigDecimal bd = new BigDecimal(String.valueOf(influx_Obj[j]));
                            build_maps.put("timestamp",bd.toPlainString());
                        } else if (table_type.equals("polymerize")) {
                            //  build_maps.put("timestamp",DateUtil.datetoStampAndAddMinute(String.valueOf(influx_Obj[j])));
                        }
                    } else {
                        if (build_maps_key.contains("tag_")) {
                            build_maps_key = build_maps_key.substring(4);
                        }

                        build_maps.put(build_maps_key,influx_Obj[j]);
                    }
                }

                influx_data_list.add(build_maps);
            }
        }
        return influx_data_list;
    }
}

GaussDB(for Influx)与开源企业版性能对比

GaussDB(for Influx)与开源企业版性能对比

摘要:相比于企业版InfluxDB,GaussDB(for Influx)能为客户提供更高的写入能力、更稳定的查询能力、更高的数据压缩率,高效满足各大时序应用场景需求。

本文分享自华为云社区《华为云GaussDB(for Influx)揭秘第八期:GaussDB(for Influx)与开源企业版性能对比》,作者:高斯Influx官方博客 。

“你们的数据库性能怎么样?”

“能不能满足我们的业务?”

“和其他数据库对比性能有优势么?”

客户在使用数据库时常有这样的担心和疑问。

本文从测试方案、测试工具、测试场景、测试结果等方面详细介绍了GaussDB(for Influx)和开源InfluxDB集群在X86架构下的性能测试情况。测试结果显示,GaussDB(for Influx)较企业版InfluxDB集群能提供更高的写入性能、更低的访问延迟以及更高的数据压缩率。

1. 测试方案

1.1 资源配置

服务端配置

1.2 测试工具

测试工具为开源性能工具TS-benchMark。

2. 测试设计

2.1 测试模型

本次测试采用风力发电数据模型,每个风场50个设备,每个设备50个传感器,1个风场1个线程,通过load数据的线程数来控制时间线的大小,通过收集时间的长短来控制数据量。

模型每条数据大小约为24字节,具体的类型如下:

Timestamp | farm | device |sensor | value

2.2 测试数据量

测试数据分为两个场景,大数据量和小数据量,具体数据量如下:

注:企业版InfluxDB在插入到47亿数据时OOM,以下性能对比都基于此数据量。

2.3 测试场景

2.3.1 数据写入场景

  • batch_size(每个批次写入的数据量) 固定为50,线程数分别从1、2、4、8、16、32、64、128、256、512 递增;
  • 线程数(客户端并发请求的连接数)固定为8, batch_size分别从50、100、150、200、250、300 递增。

2.3.2 数据查询场景

单线程进行不同语句的查询,并统计其时延信息。

第一类查询: 所有TAG查询

select * 
from sensor 
where f=''f1'' and d=''d2'' and s=''s1'' and time>=1514768400000000000 and time<=1514772000000000000

第二类查询: TAG + VALUE查询

select * 
from sensor 
where f=''f1'' and s=''d2'' and value>=3.0 and time>=1514768400000000000 and time<1514854800000000000

第三类查询: 聚合查询

select mean(value) 
from sensor 
where f=''f1'' and s=''s1'' and time>=1514768400000000000 and time<=1514854800000000000 group by f,d,s,time(1h)

第四类查询: 或条件查询

select * 
from sensor 
where f=''f1'' and (s=''s1'' or s=''s2'' or s=''s3'' or s=''s4'' or s=''s5'') and time>=1514768400000000000 and time<=1514769150000000000

第五类查询: 单个TAG查询

select * 
from sensor 
where f=''f1'' and time>=1514768400000000000 and time<=1514769150000000000

3. 测试结果分析

3.1 写入吞性能比对

在小数据量场景下,GaussDB(for Influx)的写入性能是企业版InfluxDB的13倍左右,在大数据量的场景下可以达到1.8倍左右。

3.2 查询性能对比

1)第一类查询(所有TAG查询):无论是大数据量还是小数据量场景下,GaussDB(for Influx)的吞吐量是开源InfluxDB企业版的2倍左右。

2)第二类查询(TAG + VALUE查询):在小数据量场景下,开源InfluxDB企业版性能高于GaussDB(for Influx),GaussDB(for Influx)在大数据量和小数据量场景下性能基本持平。

3)第三类查询(聚合查询):GaussDB(for Influx)查询性能明显优于开源InfluxDB企业版,在小数据量场景下是开源版本的14倍,大数据量下也是开源版本的8倍左右。

4)第四类查询(或条件查询):GaussDB(for Influx)查询性能在两种场景下比较稳定,开源企业版InfluxDB在两种场景下差异较大;GaussDB(for Influx)在小数据量场景下表现优于开源版,在大数据量场景下低于开源版。

5)第五类查询(单个TAG查询):GaussDB(for Influx)查询性能在两种场景下比较稳定,在大数据量场景下低于开源版。

3.3 数据压缩率对比

在250万时间线场景下,GaussDB(for Influx)导入了151亿条数据,导入前数据大小为337.5G,导入后为49.8G,压缩率为6.8;开源企业版导入了47亿条数据,导入前105G,导入后21.3G,压缩率为4.9。GaussDB(for Influx)压缩率是开源企业版的1.4倍左右。

Influx引擎采用LSM tree架构,随着后台compaction的进行,压缩率会进一步提升,当前数据对比是数据刚导入时的结果。

4. 总结

在GaussDB(for Influx)2节点对比开源版3节点场景下,GaussDB(for Influx)给客户带来了更高的写入能力、更稳定的查询能力、更高的压缩率。GaussDB(for Influx)写入能力在小数据量场景下是开源企业版的13倍,在大数据量场景下是开源企业版的1.8倍;查询能力在两种场景下表现稳定,在大部分查询场景下优于开源企业版;在压缩率方面,同样数据模型下,高出开源版本40%。
除了以上优势外,GaussDB(for Influx)还在集群化、冷热分级存储、高可用方面也做了深度优化,能更好地满足时序应用的各种场景。

5. 结束

本文作者:华为 云数据库创新Lab & 华为云时空数据库团队
更多技术文章,关注GaussDB(for Influx)官方博客
https://bbs.huaweicloud.com/community/usersnew/id_1586596796288328
Lab官网https://www.huaweicloud.com/lab/clouddb/home.html
产品首页https://www.huaweicloud.com/product/gaussdbforinflux.html
欢迎加入我们!
云数据库创新Lab(成都、北京)简历投递邮箱:xiangyu9@huawei.com
华为云时空数据库团队(西安、深圳)简历投递邮箱:yujiandong@huawei.com

 

点击关注,第一时间了解华为云新鲜技术~

grafana+prometheus 使用 influx DB

grafana+prometheus 使用 influx DB

如何解决grafana+prometheus 使用 influx DB

我们现有的 Grafana 仪表板使用 Prometheus 作为源并使用 PromQL 查询数据。 由于我们希望将数据保留更长的时间(例如:2个月)并且数据量非常大,我们将 Prometheus 切换到 Influx DB,使用 remote_write 和 remote_read: https://devopstales.github.io/monitoring/prometheus-influxdb/

是否可以切换 Grafana 仪表板以使用存储在我们的配置和现有仪表板中的 influx 中的数据,并使用现有的 PromQL 查询从 InfluxDB 检索数据?

预先感谢您的帮助

Grafana/Influx Db 对数据源的身份验证失败

Grafana/Influx Db 对数据源的身份验证失败

如何解决Grafana/Influx Db 对数据源的身份验证失败

我正在尝试集成 grafana 和 influxdb 以获得一些指标。但不确定当我尝试测试它时,对数据源的身份验证失败。请帮我解决这个问题。

这是下面的 yaml 和 conf 文件。

Docker-compose 文件

  1. version: "3"
  2. services:
  3. grafana:
  4. image: grafana/grafana
  5. container_name: grafana
  6. restart: always
  7. ports:
  8. - 3000:3000
  9. networks:
  10. - monitoring
  11. volumes:
  12. - grafana-volume:/vol01/Docker/monitoring
  13. environment:
  14. - GF_LOG_LEVEL=debug
  15. - GF_DATAPROXY_LOGGING=true
  16. - GF_DATAPROXY_TIMEOUT=60
  17. influxdb:
  18. image: influxdb
  19. container_name: influxdb
  20. restart: always
  21. ports:
  22. - 8086:8086
  23. networks:
  24. - monitoring
  25. volumes:
  26. - influxdb-volume:/vol01/Docker/monitoring
  27. environment:
  28. - INFLUXDB_DB=telegraf
  29. - INFLUXDB_USER=telegraf
  30. - INFLUXDB_ADMIN_ENABLED=true
  31. - INFLUXDB_HTTP_AUTH_ENABLED=false
  32. - INFLUXDB_ADMIN_USER=admin
  33. - INFLUXDB_ADMIN_PASSWORD=Welcome1
  34. - GF_LOG_LEVEL=debug
  35. - GF_DATAPROXY_LOGGING=true
  36. telegraf:
  37. image: telegraf
  38. container_name: telegraf
  39. restart: always
  40. extra_hosts:
  41. - "influxdb:18.216.224.127"
  42. environment:
  43. ST_PROC: /rootfs/proc
  44. HOST_SYS: /rootfs/sys
  45. HOST_ETC: /rootfs/etc
  46. volumes:
  47. - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
  48. - /var/run/docker.sock:/var/run/docker.sock:ro
  49. - /sys:/rootfs/sys:ro
  50. - /proc:/rootfs/proc:ro
  51. - /etc:/rootfs/etc:ro
  52. networks:
  53. monitoring:
  54. volumes:
  55. grafana-volume:

配置文件

  1. [global_tags]
  2. [agent]
  3. interval = "60s"
  4. round_interval = true
  5. metric_batch_size = 1000
  6. metric_buffer_limit = 10000
  7. collection_jitter = "0s"
  8. flush_interval = "10s"
  9. flush_jitter = "0s"
  10. precision = ""
  11. hostname = "18.216.224.127"
  12. omit_hostname = false
  13. [[outputs.influxdb]]
  14. urls = ["http://18.216.224.127:8086"]
  15. database = "telegraf"
  16. timeout = "5s"
  17. username = "telegraf"
  18. password = "Welcome1"
  19. [[inputs.ping]]
  20. interval = "5s"
  21. urls = ["192.168.0.44","192.168.0.131","192.168.0.130","google.com","amazon.com","github.com"]
  22. count = 4
  23. ping_interval = 1.0
  24. timeout = 2.0
  25. [[inputs.cpu]]
  26. percpu = true
  27. totalcpu = true
  28. collect_cpu_time = false
  29. report_active = false
  30. [[inputs.disk]]
  31. ignore_fs = ["tmpfs","devtmpfs","devfs","iso9660","overlay","aufs","squashfs"]
  32. [[inputs.diskio]]
  33. [[inputs.kernel]]
  34. [[inputs.mem]]
  35. [[inputs.processes]]
  36. [[inputs.swap]]
  37. [[inputs.system]]

错误

t=2021-05-08T11:02:29+0000 lvl=info msg="数据源认证失败" logger=data-proxy-log userId=1 orgId=1 uname=admin path=/api/datasources /proxy/1/query remote_addr=108.237.178.97 referer=http://18.216.224.127:3000/datasources/edit/1/ body="{"code":"unauthorized","message":"Unauthorized"}"状态码=401 t=2

influx - java client

influx - java client

maven

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>${influxdb.version}</version>
</dependency>

spring application.yml

spring:
  influx:
    url: http://localhost:8086
    user: canaan
    password: 123456
    database: test

java junit test

import lombok.Data;
import lombok.experimental.Accessors;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBImpl;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;

class InfluxDBTest {

    static InfluxDB influxDB = null;

    @BeforeAll
    static void before() {
        influxDB = influxDB();
        influxDB.setDatabase("test");
        influxDB.setRetentionPolicy("autogen");
//        influxDB.setLogLevel()
//        influxDB.ping()
//        influxDB.version()
//        influxDB.flush();
//        influxDB.close();
//        influxDB.disableBatch();
//        influxDB.disableGzip()
//        influxDB.enableBatch()
//        influxDB.enableGzip()
//        influxDB.isGzipEnabled()
//        influxDB.isBatchEnabled()
    }

    public static InfluxDB influxDB() {
        return new InfluxDBImpl("http://localhost:8086", "username", "password",
                new OkHttpClient.Builder());
    }

    @Test
    void query() {
//        Query query = new Query("select * from test.autogen.device_status");
//        Query query = new Query("select * from autogen.device_status");
        Query query = new Query("select * from device_status");
        QueryResult queryResult = influxDB.query(query);
        for (QueryResult.Result result : queryResult.getResults()) {
            for (QueryResult.Series series : result.getSeries()) {
                System.out.println(series);
            }
        }
    }

    @Test
    void write() {
        DeviceStatus deviceStatus = new DeviceStatus()
                .setStatus("2")
                .setDeviceCode("YB002")
                .setDuration(56L);

        Point point = Point.measurementByPOJO(DeviceStatus.class)
                .addFieldsFromPOJO(deviceStatus)
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .build();
        influxDB.write("test", "autogen", point);
    }


    @Data
    @Accessors(chain = true)
    @Measurement(name = "device_status")
    public static class DeviceStatus {
        /*field*/
        @Column(name = "duration")
        private Long duration;

        /*tag*/
        @Column(name = "device_code", tag = true)
        private String deviceCode;
        @Column(name = "status", tag = true)
        private String status;
    }

}

今天关于java influx DB工具类java influxdb示例的讲解已经结束,谢谢您的阅读,如果想了解更多关于GaussDB(for Influx)与开源企业版性能对比、grafana+prometheus 使用 influx DB、Grafana/Influx Db 对数据源的身份验证失败、influx - java client的相关知识,请在本站搜索。

本文标签: