Skip to content

Commit

Permalink
Raw JSON writer (apache#5314)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 23, 2024
1 parent a0148ba commit 259163b
Show file tree
Hide file tree
Showing 3 changed files with 478 additions and 43 deletions.
99 changes: 58 additions & 41 deletions arrow-json/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,6 @@
//! This JSON writer converts Arrow [`RecordBatch`]es into arrays of
//! JSON objects or JSON formatted byte streams.
//!
//! ## Writing JSON Objects
//!
//! To serialize [`RecordBatch`]es into array of
//! [JSON](https://docs.serde.rs/serde_json/) objects, use
//! [`record_batches_to_json_rows`]:
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_array::{Int32Array, RecordBatch};
//! # use arrow_schema::{DataType, Field, Schema};
//!
//! let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
//! let a = Int32Array::from(vec![1, 2, 3]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
//!
//! let json_rows = arrow_json::writer::record_batches_to_json_rows(&[&batch]).unwrap();
//! assert_eq!(
//! serde_json::Value::Object(json_rows[1].clone()),
//! serde_json::json!({"a": 2}),
//! );
//! ```
//!
//! ## Writing JSON formatted byte streams
//!
//! To serialize [`RecordBatch`]es into line-delimited JSON bytes, use
Expand Down Expand Up @@ -97,6 +75,8 @@
//! In order to explicitly write null values for keys, configure a custom [`Writer`] by
//! using a [`WriterBuilder`] to construct a [`Writer`].
mod encoder;

use std::iter;
use std::{fmt::Debug, io::Write};

Expand All @@ -109,7 +89,9 @@ use arrow_array::types::*;
use arrow_array::*;
use arrow_schema::*;

use crate::writer::encoder::EncoderOptions;
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use encoder::make_encoder;

fn primitive_array_to_json<T>(array: &dyn Array) -> Result<Vec<Value>, ArrowError>
where
Expand Down Expand Up @@ -481,6 +463,7 @@ fn set_column_for_json_rows(

/// Converts an arrow [`RecordBatch`] into a `Vec` of Serde JSON
/// [`JsonMap`]s (objects)
#[deprecated(note = "Use Writer")]
pub fn record_batches_to_json_rows(
batches: &[&RecordBatch],
) -> Result<Vec<JsonMap<String, Value>>, ArrowError> {
Expand Down Expand Up @@ -597,11 +580,7 @@ pub type ArrayWriter<W> = Writer<W, JsonArray>;

/// JSON writer builder.
#[derive(Debug, Clone, Default)]
pub struct WriterBuilder {
/// Controls whether null values should be written explicitly for keys
/// in objects, or whether the key should be omitted entirely.
explicit_nulls: bool,
}
pub struct WriterBuilder(EncoderOptions);

impl WriterBuilder {
/// Create a new builder for configuring JSON writing options.
Expand Down Expand Up @@ -629,7 +608,7 @@ impl WriterBuilder {

/// Returns `true` if this writer is configured to keep keys with null values.
pub fn explicit_nulls(&self) -> bool {
self.explicit_nulls
self.0.explicit_nulls
}

/// Set whether to keep keys with null values, or to omit writing them.
Expand All @@ -654,7 +633,7 @@ impl WriterBuilder {
///
/// Default is to skip nulls (set to `false`).
pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
self.explicit_nulls = explicit_nulls;
self.0.explicit_nulls = explicit_nulls;
self
}

Expand All @@ -669,7 +648,7 @@ impl WriterBuilder {
started: false,
finished: false,
format: F::default(),
explicit_nulls: self.explicit_nulls,
options: self.0,
}
}
}
Expand Down Expand Up @@ -703,7 +682,7 @@ where
format: F,

/// Whether keys with null values should be written or skipped
explicit_nulls: bool,
options: EncoderOptions,
}

impl<W, F> Writer<W, F>
Expand All @@ -718,11 +697,12 @@ where
started: false,
finished: false,
format: F::default(),
explicit_nulls: false,
options: EncoderOptions::default(),
}
}

/// Write a single JSON row to the output writer
#[deprecated(note = "Use Writer::write")]
pub fn write_row(&mut self, row: &Value) -> Result<(), ArrowError> {
let is_first_row = !self.started;
if !self.started {
Expand All @@ -738,18 +718,48 @@ where
Ok(())
}

/// Convert the `RecordBatch` into JSON rows, and write them to the output
/// Serialize `batch` to JSON output
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
for row in record_batches_to_json_rows_internal(&[batch], self.explicit_nulls)? {
self.write_row(&Value::Object(row))?;
if batch.num_rows() == 0 {
return Ok(());
}

// BufWriter uses a buffer size of 8KB
// We therefore double this and flush once we have more than 8KB
let mut buffer = Vec::with_capacity(16 * 1024);

let mut is_first_row = !self.started;
if !self.started {
self.format.start_stream(&mut buffer)?;
self.started = true;
}

let array = StructArray::from(batch.clone());
let mut encoder = make_encoder(&array, &self.options)?;

for idx in 0..batch.num_rows() {
self.format.start_row(&mut buffer, is_first_row)?;
is_first_row = false;

encoder.encode(idx, &mut buffer);
if buffer.len() > 8 * 1024 {
self.writer.write_all(&buffer)?;
buffer.clear();
}
self.format.end_row(&mut buffer)?;
}

if !buffer.is_empty() {
self.writer.write_all(&buffer)?;
}

Ok(())
}

/// Convert the [`RecordBatch`] into JSON rows, and write them to the output
/// Serialize `batches` to JSON output
pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
for row in record_batches_to_json_rows_internal(batches, self.explicit_nulls)? {
self.write_row(&Value::Object(row))?;
for b in batches {
self.write(b)?;
}
Ok(())
}
Expand Down Expand Up @@ -803,6 +813,9 @@ mod tests {

/// Asserts that the NDJSON `input` is semantically identical to `expected`
fn assert_json_eq(input: &[u8], expected: &str) {
let s = std::str::from_utf8(input).unwrap();
println!("{s}");

let expected: Vec<Option<Value>> = expected
.split('\n')
.map(|s| (!s.is_empty()).then(|| serde_json::from_str(s).unwrap()))
Expand Down Expand Up @@ -1453,6 +1466,7 @@ mod tests {
}

#[test]
#[allow(deprecated)]
fn json_writer_one_row() {
let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
let v = json!({ "an": "object" });
Expand All @@ -1465,6 +1479,7 @@ mod tests {
}

#[test]
#[allow(deprecated)]
fn json_writer_two_rows() {
let mut writer = ArrayWriter::new(vec![] as Vec<u8>);
let v = json!({ "an": "object" });
Expand Down Expand Up @@ -1564,9 +1579,9 @@ mod tests {
r#"{"a":{"list":[1,2]},"b":{"list":[1,2]}}
{"a":{"list":[null]},"b":{"list":[null]}}
{"a":{"list":[]},"b":{"list":[]}}
{"a":null,"b":{"list":[3,null]}}
{"b":{"list":[3,null]}}
{"a":{"list":[4,5]},"b":{"list":[4,5]}}
{"a":null,"b":{}}
{"b":{}}
{"a":{},"b":{}}
"#,
);
Expand Down Expand Up @@ -1621,7 +1636,7 @@ mod tests {
assert_json_eq(
&buf,
r#"{"map":{"foo":10}}
{"map":null}
{}
{"map":{}}
{"map":{"bar":20,"baz":30,"qux":40}}
{"map":{"quux":50}}
Expand Down Expand Up @@ -1918,6 +1933,8 @@ mod tests {
writer.finish()?;
}

println!("{}", std::str::from_utf8(&buf).unwrap());

let actual = serde_json::from_slice::<Vec<Value>>(&buf).unwrap();
let expected = serde_json::from_value::<Vec<Value>>(json!([
{
Expand Down
Loading

0 comments on commit 259163b

Please sign in to comment.