(十一)JAVA操作InfluxDB

04-26 725阅读

以下内容来自 尚硅谷,写这一系列的文章,主要是为了方便后续自己的查看,不用带着个PDF找来找去的,太麻烦!

第 11 章 JAVA操作InfluxDB

1、InfluxDB客户端可以参考:https://github.com/influxdata/influxdb-client-java

11.1 创建一个maven项目

1、这里我创建了一个名为java4influx的maven项目

(十一)JAVA操作InfluxDB

11.2 导入maven依赖

1、在pom.xml里加入如下依赖。

(十一)JAVA操作InfluxDB

 
 com.influxdb
 influxdb-client-java
 6.5.0
 

刷新一下maven,下载依赖。

11.3 创建一个package

1、在src/main/java下创建一个package。这里名为com.atguigu.influxdb.client。最后,项目结构如图所示

(十一)JAVA操作InfluxDB

11.4 示例:查看InfluxDB健康状态

11.4.1 创建ExampleHealthy类

1、如图所示

(十一)JAVA操作InfluxDB

11.4.2 创建客户端对象

1、influxdb-client-java内部其实封装的是各种HTTP请求,所以 token,org什么 HTTPAPI上需要的东西,在创建客户端的时候都需要考虑。

(十一)JAVA操作InfluxDB

2、从上图可以看到InfluxDBClientFactory.create方法其实有多种重载。这是因为不同的接口它需要的权限和操作的范围不同。比如一个读写权限的token,它只能对某个存储桶进行操作,那么建立连接时就应该指定bucket,也就是使用下图的重载。

(十一)JAVA操作InfluxDB

3、但如果你用的是操作员token,希望完成一些创建组织,删除用户的操作,那么就不应该在创建连接时指定存储桶。此时,应该使用下图所示的重载。

(十一)JAVA操作InfluxDB

4、不过、检查InfluxDB的健康状态不需要任何权限和token。此时,我们只需指定一个URL,那就可以使用下图所示的重载了。

(十一)JAVA操作InfluxDB

5、我这里是在Ubuntu上演示,目标URL是http://localhost:8086 。所以最终代码如下。

InfluxDBClient对象就是我们的客户端对象。

InfluxDBClient可以返回各种Api对象。

InfluxDBClient influxDBClient =  InfluxDBClientFactory.create("http://localhost:8086");

6、如下图所示。这体现了java对InfluxDB HTTP API的封装。

(十一)JAVA操作InfluxDB

11.4.3 调用API

1、一些简单的api,也可以通过InfluxDBClient对象直接调用。比如我们的检查InfluxDB健康状态,就可以直接调用InfluxDBClient对象的ping方法。如下所示。

System.out.println(influxDBClient.ping());

(十一)JAVA操作InfluxDB

11.4.4 运行

1、ping方法会返回一个布尔值。如果InfluxDB可以ping通,那么就会得到true,否则返回flase,并记录一条失败日志。

(十一)JAVA操作InfluxDB

11.4.5 补充

1、在之前的版本,有一个测试InfluxDB是否健康的API叫做health,不过现在这个接口已经被标记为废弃。health方法返回一个HealthCheck对象,相对而言,对这个对象的处理比直接处理布尔值要麻烦很多。在以后的版本,提倡用ping方法检查健康状态。

(十一)JAVA操作InfluxDB

11.5 示例:查询InfluxDB中的数据

11.5.1 创建一个JAVA类

1、在com.atguigu.influxdb.client下创建一个新的java类,ExampleQuery。

(十一)JAVA操作InfluxDB

11.5.2 加入一个main方法

1、稍后main方法里面会写我们的查询逻辑。

(十一)JAVA操作InfluxDB

11.5.3 创建InfluxDB客户端对象

1、这次我们要操作InfluxDB中具体存储桶的数据,建立连接时,推荐选择图中的重载方法。

(十一)JAVA操作InfluxDB

2、这个方法需要 4 个参数。

参数说明
urlInfluxDB服务的URL,在老师这里就是http://localhost:8086。
token授权的token,而且类型还必须得是char[ ]。
org指定要访问的组织。
bucket要访问的存储桶。

3、现在,我们在ExampleQuery下声明 4 个静态变量。如下图所示:

(十一)JAVA操作InfluxDB

11.5.4 获取查询API对象

1、使用InfluxDBClient对象点一下,可以看到InfluxDBClient其实提供了两种API。这也是为了兼容性来考虑的。InfluxQLQueryAPI是InfluxDB2.x做的向前兼容。这里我们选择第一个方法,也就是getQueryApi,这意味着我们使用v2 api进行查询。

(十一)JAVA操作InfluxDB

11.5.5 了解查询API

1、概括性地说,QueryApi对象下有两个方法query和queryRaw。

(十一)JAVA操作InfluxDB

2、两个方法都需要传入一个FLUX脚本作为查询语句。但主要的不同点在于返回的结果上。

  • queryRaw 方法返回API中的CSV格式数据(String类型)。
  • query 方法视图将查询后的结果封装为各种对象(可以自己指定也可以使用influxdb-client-java提供的FluxTable)。

    3、两个方法各有很多不同的实现,其中一大部分是用来制定连接参数的,比如你创建连接对象的时候没有制定org和bucket,那么可以延迟到调用具体api的时候再指定。

    11.5.6 query

    1、现在,我们先用query方法去查询一下InfluxDB中的数据,现在我们要查询 test_init 存储桶最近 2 分钟的数据。代码如下:

    List query = queryApi.query("from(bucket:\"test_init\") |> range(start:-2m)");
    

    (十一)JAVA操作InfluxDB

    2、我们的查询结果List 其实对应了FLUX查询语言中的表流概念。我们可以打印一下query变量。

    (十一)JAVA操作InfluxDB

    3、现在,我们只取第一个FluxTable,看看里面有什么。

    (十一)JAVA操作InfluxDB

    4、之前我们讲FLUX的时候,有讲过表流和groupKey之间的关系。现在可以打印一下groupKey,看看里面有什么东西。

    代码如下:

    (十一)JAVA操作InfluxDB

    5、输出结果如下:

    (十一)JAVA操作InfluxDB

    6、这其实可以说明我们的整个表流是以_start,_stop,_field,_measurement 4 列为groupKey分组的结果。现在,我们可以尝试打印一下数据。代码如下:

    (十一)JAVA操作InfluxDB

    7、结果如下:

    (十一)JAVA操作InfluxDB

    8、剩下的功能大家可以自己探索。

    11.5.7 queryRaw

    1、我在这里创建了一个新类,叫ExampleQueryRaw。代码复制的都是ExampleQuery的。唯一不同的地方就是把queryApi.query改为了queryApi.queryRaw。同时,query变量的类型也从List变成了String

    (十一)JAVA操作InfluxDB

    2、现在,我们将查询的结果打印一下。

    (十一)JAVA操作InfluxDB

    3、可以看到,我们打印出了CSV格式的数据,这是因为InfluxDB HTTP API本来在请求体中放的就是CSV格式的数据。所以QueryRaw方法其实就是返回原始的CSV。

    11.6 同步写和异步写的区别

    1、同步写,就是当我调用写入方法时,立刻向InfluxDB发起一个请求,将数据传送过去,而且当前线程会一直阻塞等待写入操作完成。

    2、异步写,其实是我调用写入方法的时候,先不执行写入这个操作,而是将数据放入一个缓冲区。当缓冲区满了,我再真正地将数据发送给InfluxDB,这样相当于实现了一个攒批的效果。在后面的示例中,我会向建议先在InfluxDB中创建一个名为example_java的存储桶,再了解后面的写入示例。

    11.7 示例:同步写入InfluxDB

    11.7.1 创建一个类

    1、这次我们创建一个名为ExampleWriteSync的类,org,bucket,url,token什么的还是复制之前例子中的,但是此处我们把bucket改为example_java。总体代码如下:

    public class ExampleWriteSync {
    	private static String org = "atguigu";
    	private static String bucket = "example_java";
    	private static String url = "http://localhost:8086";
    	private static char[] token = "ZA8uWTSRFflhKhFvNW4TcZwwvd2NHFW1YIVlcj9Am5iJ4ueHawWh49_jszoKybEymHqgR5mAWg4XMv4tb9TP3w==".toCharArray();
    	
    	public static void main(String[] args) {
    	InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token, org, bucket);
    	}
    }
    

    (十一)JAVA操作InfluxDB

    11.7.2 获取API对象

    1、我们可以看到,InfluxDBClient 上有多种方法获取写操作 API 对象。其中WriteApiBlocking是同步写入,WriteApi是异步写入。

    (十一)JAVA操作InfluxDB

    2、我们现在先使用getWriteApiBlocking方法获取同步写入的API。

    11.7.3 有哪些写入方法

    (十一)JAVA操作InfluxDB

    1、简单归纳,写入API给用户提供了 3 类方法写入数据。另外,这三类方法都有与之对应的带s后缀的版本,表示可以一次写入多条。

    项目Value
    writeMeasuremen用户可以写入自己的POJO类
    writePointinfluxdb-client-java提供了一个Point类,用户可以将一条条数据封装为一个个Point,写入InfluxDB。
    writeRecord用户可以用符合InfluxDB行协议的字符串向InfluxDB写入数据。

    11.7.4 通过Point对象写入InfluxDB

    11.7.4.1 构建Point对象

    1、使用下面的代码创建一个point对象。

    Point point = Point.measurement("temperature")
    		.addTag("location", "west")
    		.addField("value", 55D)
    		.time(Instant.now(), WritePrecision.MS);
    

    2、这是典型的构造器设计模式,measurement是一个静态方法,它会帮我们 new一个Point。addTag和addField不再解释。最终的time,我们通过第二个参数指定写入时间戳的精度。这里是将写入的时间精度确定为了毫秒,如果你传入了一个纳秒时间戳,但精度指明了毫秒,那超出毫秒的部分会被直接截断。

    11.7.4.2 将point写出

    1、使用下面的代码,直接将point写到InfluxDB中。记得在此之前创建example_java存储桶。

    writeApiBlocking.writePoint(point);
    
    11.7.4.3 验证写入结果

    1、执行程序后,在InfluxDB DataExplorer查看example_java里面有没有新的数据,并将它展示出来。

    (十一)JAVA操作InfluxDB

    11.7.5 通过行协议写入InfluxDB

    11.7.5.1 注释上一个示例的代码

    1、现在,我们将前面通过Point写数据的代码注释掉。

    (十一)JAVA操作InfluxDB

    11.7.5.2 编写代码

    1、在main方法中追下下面的代码

    此处我们在行协议中省略时间戳,让InfluxDB自动帮我们把时间补上。

    writeApiBlocking.writeRecord(WritePrecision.NS,"temperature,location=west value=60.0");
    
    11.7.5.3 验证写入结果

    1、运行代码后,一样还是去InfluxDB上查看数据。如图所示,第二条数据已经成功进入InfluxDB了。

    (十一)JAVA操作InfluxDB

    11.7.6 通过POJO类写入InfluxDB

    11.7.6.1 注释上一个示例的代码

    1、同样,我们还是先注释掉上一次用InfluxDB行协议写入数据的代码。

    (十一)JAVA操作InfluxDB

    11.7.6.2 添加一个静态内部类
    private static class Temperature {
    }
    
    11.7.6.3 @Measurement注解

    1、给静态内部类加一个注解。

    @Measuremet注解必须加到类上,表示这个类对应InfluxDB中的哪个测量名称。

    @Measurement(name = "temperature")
    private static class Temperature {
    }
    
    11.7.6.4 添加成员变量
    @Measurement(name = "temperature")
    private static class Temperature {
    String location;
    Double value;
    Instant time;
    } 
    
    11.7.6.5 @Column注解

    1、@Column注解只能用在成员变量上。

    2、 @Column有 4 种实现,如下图所示。

    (十一)JAVA操作InfluxDB

    3、你可以将一个成员变量指定为tag、measurement、timestamp还是field。最终代码如下:

    @Measurement(name = "temperature")
    private static class Temperature {
    @Column(tag = true)
    String location;
    @Column
    Double value;
    @Column(timestamp = true)
    Instant time;
    }
    
    11.7.6.6 创建一个Temperature对象并给其属性赋值

    1、代码如下

    Temperature temperature = new Temperature();
    temperature.location = "west";
    temperature.value = 40D;
    temperature.time = Instant.now();
    
    11.7.6.7 写到InfluxDB

    现在,我们将这个POJO类的对象写到InfluxDB

    writeApiBlocking.writeMeasurement(WritePrecision.NS,temperature);
    

    最终的代码如下图所示:

    (十一)JAVA操作InfluxDB

    11.7.6.8 验证写入效果

    1、运行main方法,去DataExplorer上查看输出效果。如图所示,数据已经成功进入InfluxDB。

    (十一)JAVA操作InfluxDB

    11.8 示例:异步写入InfluxDB

    11.8.1 创建一个类

    1、创建一个名为ExampleWriteAsync的类,一样还是复用我们之前的org、bucket、url和token。并创建InfluxDBClient对象,基础代码如下图所示。

    (十一)JAVA操作InfluxDB

    11.8.2 获取API对象

    1、可以看到getWriteApi方法已经被标记为弃用了

    (十一)JAVA操作InfluxDB

    2、现在鼓励使用的是makeWriteApi方法。实际上,在目前版本(2.4.0),getWriteApi的内部实现已经是直接调用makeWriteApi了。

    (十一)JAVA操作InfluxDB

    11.8.3 编写写入代码

    1、和之前的同步写入一样,writeApi对象也有writeRecord、writePoint、wirteMeasurement多种写入方法,这里不再赘述。这里,我们只用最简单的writeRecord方法插入一条数据,把代码跑通。

    writeApi.writeRecord(WritePrecision.NS,"temperature,location=north value=60.0");
    

    11.8.4 验证写入结果(写入失败)

    1、此时,我们运行的代码如下图所示。

    (十一)JAVA操作InfluxDB

    2、在Web UI上打开Data Explorer查看写入结果。可以看到,这里没有出现我们刚才的数据,这表示我们刚才的写入失败了.

    (十一)JAVA操作InfluxDB

    3、但是,我们的java程序没有报错。这是因为WriteApi会使用一个守护线程,帮我们管理缓冲区,它会在缓冲区满或者距离上次写出数据过 1 秒时将数据写出去。我们刚才就放了一条数据,缓冲区没满、write方法调用完程序就立刻退出了,所以后台线程压根就没有做写的操作。

    11.8.5 修改代码

    1、现在,有两种方式让守护线程执行写的操作。

    • 手动触发缓冲区刷写
       writeApi.flush();
      
      • 关闭InfluxDBClient
        influxDBClient.close();
        

        2、这里,我们先用第二种。修改后的代码如下

        (十一)JAVA操作InfluxDB

        3、运行,之后去Data Explorer上查看结果。

        11.8.6 验证写入结果(写入成功)

        1、如果能看到location Tag上出现的north标签,而且能够查出来一条数据,那么写入操作就成功了!

        (十一)JAVA操作InfluxDB

        11.8.7 小结:异步写入工作逻辑

        • writeApi里有一个缓冲区,这个缓冲区的大小默认是 10000 条数据。
        • 虽然有缓冲区但是writeApi写出数据并不是一次把整个缓冲区都写出去,而是按照批次(默认是 1000 条)的单位来写。
        • 当产生被压或者写入失败时,守护线程会自动重试写入数据。

          11.8.8 异步写入的配置

          1、异步攒批的操作的守护线程隐式进行的,好在它的行为我们可以进行具体的配置。

          (十一)JAVA操作InfluxDB

          2、influxdb-client-java为我们提供了一个WriteOption对象,调用makeWriteApi时可以传入这个对象,通过上图的提示我们可以看到,缓冲区的大小,批的大小,刷写的间隔我们都是可以进行明确指定的。

          11.8.9 默认配置

          (十一)JAVA操作InfluxDB

          11.9 兼容V1 Api

          1、这里只做简单的介绍。

          2、使用InfluxDBClientFactory创建Client对象时,调用createV1方法。这个时候你获取的就是兼容V1 Api 的Client对象。

          (十一)JAVA操作InfluxDB

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]