Java应用中的数据湖集成:Parquet/ORC文件格式处理与优化
大家好,今天我们来聊聊Java应用如何与数据湖集成,特别是如何高效地处理Parquet和ORC这两种常见的文件格式。数据湖的核心优势在于能够以原始格式存储各种类型的数据,为后续的分析和处理提供灵活性。而Parquet和ORC则是列式存储格式,它们在数据压缩和查询性能方面表现出色,非常适合与数据湖结合使用。
1. 数据湖与文件格式概述
数据湖是一个集中存储各种结构化、半结构化和非结构化数据的存储库。与传统的数据仓库不同,数据湖以原始格式存储数据,允许用户在需要时进行转换和分析。数据湖通常构建在廉价的存储基础设施之上,如HDFS、Amazon S3或Azure Blob Storage。
Parquet和ORC是两种流行的列式存储文件格式,专为大数据分析而设计。它们的主要优势包括:
- 列式存储: 将同一列的数据存储在一起,提高了读取特定列的效率,尤其是在只需要访问部分列的情况下。
- 数据压缩: 采用高效的压缩算法,减少存储空间和I/O开销。
- 谓词下推: 允许将过滤条件推送到存储层,减少需要读取的数据量。
- Schema Evolution: 支持在不影响现有数据的情况下修改schema。
2. Java集成Parquet:读写操作详解
Apache Parquet提供了Java API,方便我们在Java应用中读写Parquet文件。我们需要引入Parquet的依赖:
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.6</version>
</dependency>
2.1 写入Parquet文件
以下是一个简单的示例,演示如何使用Parquet API写入数据到Parquet文件:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ParquetWriterExample {
public static class User {
private int id;
private String name;
private String city;
public User(int id, String name, String city) {
this.id = id;
this.name = name;
this.city = city;
}
public int getId() {
return id;
}
public String getName() {
return name;
}
public String getCity() {
return city;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + ''' +
", city='" + city + ''' +
'}';
}
}
public static class UserWriteSupport extends WriteSupport<User> {
private MessageType schema;
private org.apache.parquet.io.api.RecordConsumer recordConsumer;
@Override
public WriteSupport.WriteContext init(Configuration configuration) {
schema = Types.buildMessage()
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("id").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32).named("id")
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("name").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY).named("name")
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("city").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY).named("city")
.named("User");
return new WriteSupport.WriteContext(schema, null);
}
@Override
public void prepareForWrite(org.apache.parquet.io.api.RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}
@Override
public void write(User record) {
recordConsumer.startMessage();
recordConsumer.startField("id", 0);
recordConsumer.addInteger(record.getId());
recordConsumer.endField("id", 0);
recordConsumer.startField("name", 1);
recordConsumer.addBinary(org.apache.parquet.io.api.Binary.fromString(record.getName()));
recordConsumer.endField("name", 1);
recordConsumer.startField("city", 2);
recordConsumer.addBinary(org.apache.parquet.io.api.Binary.fromString(record.getCity()));
recordConsumer.endField("city", 2);
recordConsumer.endMessage();
}
}
public static void main(String[] args) throws IOException {
String filePath = "users.parquet";
List<User> users = new ArrayList<>();
users.add(new User(1, "Alice", "New York"));
users.add(new User(2, "Bob", "Los Angeles"));
users.add(new User(3, "Charlie", "Chicago"));
Configuration conf = new Configuration();
Path path = new Path(filePath);
MessageType schema = Types.buildMessage()
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("id").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32).named("id")
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("name").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY).named("name")
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("city").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY).named("city")
.named("User");
ParquetWriter<User> writer = ParquetWriter.builder(path, new UserWriteSupport())
.withConf(conf)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
for (User user : users) {
writer.write(user);
}
writer.close();
System.out.println("Parquet file created successfully!");
}
}
代码解释:
- 依赖引入: 首先,我们添加了
parquet-hadoop和hadoop-client依赖。 - 定义数据模型: 创建了一个
User类,表示要写入的数据结构。 - 定义WriteSupport: 需要自定义一个UserWriteSupport类继承WriteSupport,并实现init,prepareForWrite,write方法。
- 构建ParquetWriter: 使用
ParquetWriter.builder()创建ParquetWriter实例。withPath():指定输出文件的路径。withSchema():指定数据的schema。withCompressionCodec():指定压缩编解码器(例如,SNAPPY)。withConf(): 指定Hadoop Configuration。build():构建ParquetWriter实例。
- 写入数据: 循环遍历
users列表,使用writer.write()方法将每个User对象写入Parquet文件。 - 关闭writer: 使用
writer.close()方法关闭writer,确保数据写入完成。
2.2 读取Parquet文件
以下是一个读取Parquet文件的示例:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import java.io.IOException;
public class ParquetReaderExample {
public static class User {
private int id;
private String name;
private String city;
public User(int id, String name, String city) {
this.id = id;
this.name = name;
this.city = city;
}
public int getId() {
return id;
}
public String getName() {
return name;
}
public String getCity() {
return city;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + ''' +
", city='" + city + ''' +
'}';
}
}
public static class UserReadSupport extends ReadSupport<User> {
@Override
public ReadContext init(Configuration configuration, java.util.Map<String, String> keyValueMetaData, MessageType fileSchema) {
return new ReadContext(fileSchema);
}
@Override
public RecordMaterializer<User> prepareForRead(Configuration configuration, java.util.Map<String, String> keyValueMetaData, MessageType fileSchema, ReadContext readContext) {
return new UserRecordMaterializer(fileSchema);
}
}
public static class UserRecordMaterializer extends RecordMaterializer<User> {
private MessageType schema;
private int id;
private String name;
private String city;
public UserRecordMaterializer(MessageType schema) {
this.schema = schema;
}
@Override
public User getCurrentRecord() {
return new User(id, name, city);
}
@Override
public org.apache.parquet.io.api.GroupConverter getRootConverter() {
return new GroupConverter() {
@Override
public void start() {
}
@Override
public void end() {
}
@Override
public Converter getConverter(int fieldIndex) {
switch (fieldIndex) {
case 0:
return new IntConverter() {
@Override
public void addInt(int value) {
id = value;
}
};
case 1:
return new BinaryConverter() {
@Override
public void addBinary(org.apache.parquet.io.api.Binary value) {
name = value.toStringUsingUTF8();
}
};
case 2:
return new BinaryConverter() {
@Override
public void addBinary(org.apache.parquet.io.api.Binary value) {
city = value.toStringUsingUTF8();
}
};
default:
throw new IllegalArgumentException("Unknown field index: " + fieldIndex);
}
}
};
}
}
public static void main(String[] args) throws IOException {
String filePath = "users.parquet";
Configuration conf = new Configuration();
Path path = new Path(filePath);
MessageType schema = Types.buildMessage()
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("id").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32).named("id")
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("name").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY).named("name")
.required(org.apache.parquet.schema.Type.Repetition.REQUIRED).named("city").primitive(org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY).named("city")
.named("User");
ParquetReader<User> reader = ParquetReader.builder(new UserReadSupport(), path)
.withConf(conf)
.build();
User user;
while ((user = reader.read()) != null) {
System.out.println(user);
}
reader.close();
}
}
代码解释:
- 构建ParquetReader: 使用
ParquetReader.builder()创建ParquetReader实例。withPath():指定输入文件的路径。withReadSupport():指定读取数据的ReadSupport实现。withConf():指定Hadoop Configuration。build():构建ParquetReader实例。
- 读取数据: 使用
reader.read()方法逐行读取数据。循环直到reader.read()返回null,表示文件已读取完毕。 - 关闭reader: 使用
reader.close()方法关闭reader。
2.3 使用Avro Schema读写Parquet文件
Parquet可以与Avro Schema集成,简化数据的序列化和反序列化。首先,我们需要添加Avro依赖:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.2</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.12.3</version>
</dependency>
写入:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class AvroParquetWriterExample {
public static void main(String[] args) throws IOException {
String filePath = "users_avro.parquet";
String avroSchemaStr = "{"type":"record","name":"User","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"city","type":"string"}]}";
Schema schema = new Schema.Parser().parse(avroSchemaStr);
List<GenericRecord> users = new ArrayList<>();
GenericRecord user1 = new GenericData.Record(schema);
user1.put("id", 1);
user1.put("name", "Alice");
user1.put("city", "New York");
users.add(user1);
GenericRecord user2 = new GenericData.Record(schema);
user2.put("id", 2);
user2.put("name", "Bob");
user2.put("city", "Los Angeles");
users.add(user2);
Configuration conf = new Configuration();
Path path = new Path(filePath);
ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(path)
.withSchema(schema)
.withConf(conf)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
for (GenericRecord user : users) {
writer.write(user);
}
writer.close();
System.out.println("Avro Parquet file created successfully!");
}
}
读取:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import java.io.IOException;
public class AvroParquetReaderExample {
public static void main(String[] args) throws IOException {
String filePath = "users_avro.parquet";
String avroSchemaStr = "{"type":"record","name":"User","fields":[{"name":"id","type":"int"},{"name":"name","type":"string"},{"name":"city","type":"string"}]}";
Schema schema = new Schema.Parser().parse(avroSchemaStr);
Configuration conf = new Configuration();
Path path = new Path(filePath);
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(path)
.withConf(conf)
.build();
GenericRecord user;
while ((user = reader.read()) != null) {
System.out.println(user);
}
reader.close();
}
}
3. Java集成ORC:读写操作详解
Apache ORC也提供了Java API,用于读写ORC文件。我们需要引入ORC的依赖:
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.6</version>
</dependency>
3.1 写入ORC文件
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspectorFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class OrcWriterExample {
public static void main(String[] args) throws IOException {
String filePath = "users.orc";
Configuration conf = new Configuration();
Path path = new Path(filePath);
List<String> fieldNames = new ArrayList<>();
fieldNames.add("id");
fieldNames.add("name");
fieldNames.add("city");
List<ObjectInspector> fieldOIs = new ArrayList<>();
fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
ObjectInspector structOI = ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
OrcFile.WriterOptions options = OrcFile.writerOptions(conf)
.setSchema(structOI.getTypeString());
Writer writer = OrcFile.createWriter(path, options);
List<Object> row1 = new ArrayList<>();
row1.add(1);
row1.add("Alice");
row1.add("New York");
OrcStruct struct1 = new OrcStruct(row1.size());
struct1.setValues(row1);
writer.write(struct1);
List<Object> row2 = new ArrayList<>();
row2.add(2);
row2.add("Bob");
row2.add("Los Angeles");
OrcStruct struct2 = new OrcStruct(row2.size());
struct2.setValues(row2);
writer.write(struct2);
writer.close();
System.out.println("ORC file created successfully!");
}
}
代码解释:
- 定义Schema: 使用
ObjectInspector定义ORC文件的schema。 - 构建Writer: 使用
OrcFile.createWriter()创建Writer实例。setSchema():设置schema。orc.compress:设置压缩编解码器(例如,ZLIB)。
- 写入数据: 创建
OrcStruct对象,并将数据写入ORC文件。 - 关闭Writer: 使用
writer.close()方法关闭writer。
3.2 读取ORC文件
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import java.io.IOException;
public class OrcReaderExample {
public static void main(String[] args) throws IOException {
String filePath = "users.orc";
Configuration conf = new Configuration();
Path path = new Path(filePath);
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
RecordReader rows = reader.rows();
StructObjectInspector inspector = (StructObjectInspector) reader.getObjectInspector();
Object row = null;
while (rows.hasNext()) {
row = rows.next(row);
System.out.println(inspector.getStructFieldsData(row));
}
rows.close();
}
}
代码解释:
- 构建Reader: 使用
OrcFile.createReader()创建Reader实例。 - 读取数据: 使用
reader.rows()方法获取RecordReader,并逐行读取数据。 - 关闭Reader: 使用
rows.close()方法关闭RecordReader。
4. Parquet/ORC文件格式优化
为了获得最佳的性能,我们需要对Parquet和ORC文件进行优化。
| 优化项 | Parquet | ORC |
|---|---|---|
| 压缩编解码器 | Snappy, Gzip, LZO, Zstd | ZLIB, Snappy, LZO, Zstd |
| Row Group Size | 64MB – 128MB | 64MB – 256MB |
| Page Size | 1MB | N/A |
| Dictionary Encoding | 适用于低基数列 | 适用于低基数列 |
| Bloom Filters | 适用于加速点查询 | 适用于加速点查询 |
| 数据排序 | 根据查询模式排序,提高谓词下推效率 | 根据查询模式排序,提高谓词下推效率 |
| 分区 | 根据常用过滤条件进行分区,减少扫描数据量 | 根据常用过滤条件进行分区,减少扫描数据量 |
| 向量化 | 利用向量化引擎(例如,Arrow)加速数据处理 | 利用向量化引擎(例如,Arrow)加速数据处理 |
| 文件大小 | 避免过多小文件,影响查询性能,建议合并小文件 | 避免过多小文件,影响查询性能,建议合并小文件 |
| Schema设计 | 避免schema过于复杂,影响查询性能,合理拆分schema | 避免schema过于复杂,影响查询性能,合理拆分schema |
4.1 压缩编解码器选择
压缩编解码器的选择直接影响存储空间和I/O性能。以下是一些常见的压缩编解码器:
- Snappy: 速度快,压缩率适中,适用于CPU密集型场景。
- Gzip: 压缩率高,但速度较慢,适用于存储空间敏感型场景。
- LZO: 速度较快,压缩率较低,适用于需要快速解压的场景。
- Zstd: 压缩率和速度都比较好,是一种现代的压缩算法。
选择合适的压缩编解码器需要根据实际的业务场景进行权衡。
4.2 Row Group Size和Page Size
Row Group Size和Page Size是Parquet的两个重要参数,它们影响数据的读取和写入性能。
- Row Group Size: Parquet将数据分成多个Row Group,每个Row Group包含多个Page。较大的Row Group Size可以提高写入性能,但会降低读取性能。建议设置为64MB – 128MB。
- Page Size: 每个Row Group包含多个Page,每个Page包含同一列的数据。较小的Page Size可以提高随机读取的效率,但会增加元数据开销。建议设置为1MB。
4.3 Dictionary Encoding
Dictionary Encoding是一种针对低基数列的优化技术。它将列中的唯一值存储在字典中,然后使用字典中的索引来表示实际的数据。这样可以减少存储空间,并提高查询性能。
4.4 Bloom Filters
Bloom Filters是一种概率数据结构,用于快速判断一个元素是否存在于集合中。Parquet和ORC都支持Bloom Filters,可以用于加速点查询。
4.5 数据排序
对数据进行排序可以提高谓词下推的效率。例如,如果我们需要查询某个时间范围内的数据,可以按照时间戳对数据进行排序。这样可以减少需要扫描的数据量。
4.6 分区
分区是一种将数据按照某个条件进行分割的技术。例如,可以按照日期对数据进行分区。这样可以减少需要扫描的数据量,提高查询性能。
4.7 向量化
向量化是一种利用CPU的SIMD指令加速数据处理的技术。Parquet和ORC都可以与向量化引擎(例如,Arrow)集成,以提高数据处理性能。
5. 总结:Parquet/ORC文件格式的关键点
Parquet和ORC是Java应用集成数据湖的理想选择。通过选择合适的压缩编解码器、调整Row Group Size和Page Size、使用Dictionary Encoding和Bloom Filters、对数据进行排序和分区,以及利用向量化技术,我们可以显著提高数据处理性能。希望今天的分享能够帮助大家更好地利用Parquet和ORC,构建高效的数据湖应用。