Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ description = "A generic object store interface for uniformly interacting with A
keywords = ["object", "storage", "cloud"]
repository = "https://github.com/apache/arrow-rs-object-store"
rust-version = "1.85"
include = ["src/**/*.rs", "README.md", "LICENSE.txt", "NOTICE.txt", "Cargo.toml"]
include = [
"src/**/*.rs",
"README.md",
"LICENSE.txt",
"NOTICE.txt",
"Cargo.toml",
]

[package.metadata.docs.rs]
all-features = true
Expand All @@ -46,10 +52,14 @@ url = "2.2"
walkdir = { version = "2", optional = true }

# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"], optional = true }
base64 = { version = "0.22", default-features = false, features = [
"std",
], optional = true }
form_urlencoded = { version = "1.2", optional = true }
http-body-util = { version = "0.1.2", optional = true }
httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true }
httparse = { version = "1.8.0", default-features = false, features = [
"std",
], optional = true }
hyper = { version = "1.2", default-features = false, optional = true }
md-5 = { version = "0.11.0", default-features = false, optional = true }
quick-xml = { version = "0.39.0", features = ["serialize", "overlapped-lists"], optional = true }
Expand All @@ -65,6 +75,9 @@ serde_urlencoded = { version = "0.7", optional = true }
tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"], optional = true }
tracing = { version = "0.1", optional = true }

[target.'cfg(target_family="unix")'.dependencies]
xattr = { version = "1", optional = true }

[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.31.1", features = ["fs"] }

Expand All @@ -74,7 +87,7 @@ wasm-bindgen-futures = "0.4.18"
futures-channel = {version = "0.3", features = ["sink"]}

[features]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this missing the xattr feature?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I thought you didn't want it by default.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but you still need to define the feature. An optional dependency doesn't automatically define a feature, no?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$ cargo metadata --format-version 1 --no-deps | jq '.packages[0].features'

[...]
  "xattr": [
    "dep:xattr"
  ]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Weird, but since it passes the test this is obviously working 🤷

default = ["fs"]
default = ["fs", "xattr"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded", "tokio"]
azure = ["cloud", "httparse"]
fs = ["walkdir", "tokio"]
Expand Down
222 changes: 205 additions & 17 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ pub(crate) enum Error {

#[error("Upload aborted")]
Aborted,

#[cfg(feature = "xattr")]
#[error("Unable to set extended attribute on {}: {source}", path.display())]
UnableToSetXattr {
source: std::io::Error,
path: PathBuf,
},

#[cfg(feature = "xattr")]
#[error("Unable to read extended attribute on {}: {source}", path.display())]
UnableToReadXattr {
source: std::io::Error,
path: PathBuf,
},
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -325,6 +339,103 @@ fn is_valid_file_path(path: &Path) -> bool {
}
}

/// Namespace under which every extended attribute managed by this store lives.
#[cfg(all(feature = "xattr", target_family = "unix"))]
const XATTR_PREFIX: &str = "user.object_store.";

/// Sub-namespace reserved for user-defined `Attribute::Metadata` keys.
///
/// Keeping user keys apart from the well-known names guarantees that a
/// metadata key such as `content_type` can never overwrite, or be read back
/// as, the `Attribute::ContentType` entry.
#[cfg(all(feature = "xattr", target_family = "unix"))]
const XATTR_METADATA_PREFIX: &str = "user.object_store.metadata.";

/// Sets extended attributes on a file from an Attributes collection
///
/// Well-known attributes are written as `user.object_store.<name>`, while
/// user-defined metadata is written as `user.object_store.metadata.<key>`.
///
/// # Errors
///
/// Returns [`Error::UnableToSetXattr`] as soon as one attribute cannot be
/// written; attributes written before the failure are left in place.
#[cfg(all(feature = "xattr", target_family = "unix"))]
fn set_xattrs(path: &std::path::Path, attributes: &Attributes) -> Result<()> {
    use crate::Attribute;
    use std::borrow::Cow;

    for (attr, value) in attributes {
        let name: Cow<'static, str> = match attr {
            Attribute::CacheControl => Cow::Borrowed("user.object_store.cache_control"),
            Attribute::ContentDisposition => {
                Cow::Borrowed("user.object_store.content_disposition")
            }
            Attribute::ContentEncoding => Cow::Borrowed("user.object_store.content_encoding"),
            Attribute::ContentLanguage => Cow::Borrowed("user.object_store.content_language"),
            Attribute::ContentType => Cow::Borrowed("user.object_store.content_type"),
            Attribute::StorageClass => Cow::Borrowed("user.object_store.storage_class"),
            // User keys get their own namespace so they cannot shadow the names above
            Attribute::Metadata(key) => Cow::Owned(format!("{XATTR_METADATA_PREFIX}{key}")),
        };
        xattr::set(path, name.as_ref(), value.as_ref().as_bytes()).map_err(|source| {
            Error::UnableToSetXattr {
                source,
                path: path.into(),
            }
        })?;
    }
    Ok(())
}

/// Reads extended attributes from a file and returns an Attributes collection
///
/// Only names inside the `user.object_store.` namespace are considered.
/// Entries that are not valid UTF-8 (name or value), that disappear between
/// listing and reading, or whose name is neither a well-known attribute nor a
/// `metadata.<key>` entry are skipped.
///
/// If listing fails with [`std::io::ErrorKind::Unsupported`] the file is
/// treated as having no attributes, so reads keep working on filesystems
/// that cannot store extended attributes.
///
/// # Errors
///
/// Returns [`Error::UnableToReadXattr`] for any other failure to list or read
/// the attributes.
#[cfg(all(feature = "xattr", target_family = "unix"))]
fn get_xattrs(path: &std::path::Path) -> Result<Attributes> {
    use crate::Attribute;

    let mut attributes = Attributes::new();

    let names = match xattr::list(path) {
        Ok(names) => names,
        // A filesystem without xattr support cannot hold any attributes
        Err(source) if source.kind() == std::io::ErrorKind::Unsupported => {
            return Ok(attributes);
        }
        Err(source) => {
            return Err(Error::UnableToReadXattr {
                source,
                path: path.into(),
            }
            .into());
        }
    };

    for name in names {
        let Some(full_name) = name.to_str() else {
            continue;
        };

        // Check the metadata namespace first: it is nested inside XATTR_PREFIX
        let attr = if let Some(user_key) = full_name.strip_prefix(XATTR_METADATA_PREFIX) {
            Attribute::Metadata(user_key.to_string().into())
        } else {
            match full_name.strip_prefix(XATTR_PREFIX) {
                Some("cache_control") => Attribute::CacheControl,
                Some("content_disposition") => Attribute::ContentDisposition,
                Some("content_encoding") => Attribute::ContentEncoding,
                Some("content_language") => Attribute::ContentLanguage,
                Some("content_type") => Attribute::ContentType,
                Some("storage_class") => Attribute::StorageClass,
                // Outside our namespace, or a name this store never writes
                _ => continue,
            }
        };

        let value = match xattr::get(path, &name) {
            Ok(Some(bytes)) => match String::from_utf8(bytes) {
                Ok(text) => text,
                Err(_) => continue,
            },
            // Removed between `list` and `get`
            Ok(None) => continue,
            Err(source) => {
                return Err(Error::UnableToReadXattr {
                    source,
                    path: path.into(),
                }
                .into());
            }
        };

        attributes.insert(attr, value.into());
    }
    Ok(attributes)
}

/// Rejects non-empty attributes when the `xattr` feature is enabled on a
/// target for which the `xattr` dependency is not available.
#[cfg(all(feature = "xattr", not(target_family = "unix")))]
fn set_xattrs(_path: &std::path::Path, attributes: &Attributes) -> Result<()> {
    if !attributes.is_empty() {
        return Err(super::Error::NotSupported {
            source: "Extended attributes are only supported on unix targets".into(),
        });
    }
    Ok(())
}

/// Returns empty attributes when the `xattr` feature is enabled on a target
/// for which the `xattr` dependency is not available.
#[cfg(all(feature = "xattr", not(target_family = "unix")))]
fn get_xattrs(_path: &std::path::Path) -> Result<Attributes> {
    Ok(Attributes::new())
}

/// Returns an error if attributes are non-empty when xattr feature is disabled.
#[cfg(not(feature = "xattr"))]
fn set_xattrs(_path: &std::path::Path, attributes: &Attributes) -> Result<()> {
if !attributes.is_empty() {
return Err(super::Error::NotSupported {
source: "Setting extended attributes requires the 'xattr' feature".into(),
});
}
Ok(())
}

/// Fallback used when the crate is built without the `xattr` feature: the
/// path is ignored and the returned collection is always empty.
#[cfg(not(feature = "xattr"))]
fn get_xattrs(_path: &std::path::Path) -> Result<Attributes> {
    let empty = Attributes::new();
    Ok(empty)
}

#[async_trait]
impl ObjectStore for LocalFileSystem {
async fn put_opts(
Expand All @@ -340,13 +451,6 @@ impl ObjectStore for LocalFileSystem {
});
}

if !opts.attributes.is_empty() {
return Err(crate::Error::NotImplemented {
operation: "`put_opts` with `opts.attributes` specified".into(),
implementer: self.to_string(),
});
}

let path = self.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
let (mut file, staging_path) = new_staged_upload(&path)?;
Expand All @@ -359,6 +463,9 @@ impl ObjectStore for LocalFileSystem {
path: path.to_string_lossy().to_string(),
})?;
e_tag = Some(get_etag(&metadata));

set_xattrs(&staging_path, &opts.attributes)?;

match opts.mode {
PutMode::Overwrite => {
// For some fuse types of file systems, the file must be closed first
Expand Down Expand Up @@ -406,16 +513,9 @@ impl ObjectStore for LocalFileSystem {
location: &Path,
opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
if !opts.attributes.is_empty() {
return Err(crate::Error::NotImplemented {
operation: "`put_multipart_opts` with `opts.attributes` specified".into(),
implementer: self.to_string(),
});
}

let dest = self.path_to_filesystem(location)?;
let (file, src) = new_staged_upload(&dest)?;
Ok(Box::new(LocalUpload::new(src, dest, file)))
Ok(Box::new(LocalUpload::new(src, dest, file, opts.attributes)))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand All @@ -434,9 +534,11 @@ impl ObjectStore for LocalFileSystem {
None => 0..meta.size,
};

let attributes = get_xattrs(&path)?;

Ok(GetResult {
payload: GetResultPayload::File(file, path),
attributes: Attributes::default(),
attributes,
range,
meta,
})
Expand Down Expand Up @@ -835,6 +937,8 @@ struct LocalUpload {
src: Option<PathBuf>,
/// The next offset to write into the file
offset: u64,
/// Attributes to set on the file
attributes: Attributes,
}

#[derive(Debug)]
Expand All @@ -844,14 +948,15 @@ struct UploadState {
}

impl LocalUpload {
/// Creates the state for a new upload.
///
/// `dest` and `file` (wrapped in a `Mutex`) are placed in an `UploadState`
/// shared through an `Arc`; `src` is stored as `Some(..)`, the write offset
/// starts at 0, and `attributes` are kept on the upload for later use.
pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, attributes: Attributes) -> Self {
Self {
state: Arc::new(UploadState {
dest,
file: Mutex::new(file),
}),
src: Some(src),
offset: 0,
attributes,
}
}
}
Expand Down Expand Up @@ -882,9 +987,13 @@ impl MultipartUpload for LocalUpload {
async fn complete(&mut self) -> Result<PutResult> {
let src = self.src.take().ok_or(Error::Aborted)?;
let s = Arc::clone(&self.state);
let attributes = std::mem::take(&mut self.attributes);
maybe_spawn_blocking(move || {
// Ensure no inflight writes
let file = s.file.lock();

set_xattrs(&src, &attributes)?;

std::fs::rename(&src, &s.dest)
.map_err(|source| Error::UnableToRenameFile { source })?;
let metadata = file.metadata().map_err(|e| Error::Metadata {
Expand Down Expand Up @@ -1280,6 +1389,8 @@ mod tests {
copy_rename_nonexistent_object(&integration).await;
stream_get(&integration).await;
put_opts(&integration, false).await;
#[cfg(feature = "xattr")]
put_get_attributes(&integration).await;
}

#[test]
Expand Down Expand Up @@ -1914,3 +2025,80 @@ mod unix_test {
spawned.await.unwrap();
}
}

/// Round-trip tests for attributes stored as extended attributes.
///
/// Gated on unix as well as on the feature: the `xattr` dependency is only
/// declared for `cfg(target_family = "unix")` targets.
#[cfg(all(test, feature = "xattr", target_family = "unix"))]
mod xattr_test {
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{Attribute, Attributes, ObjectStore, ObjectStoreExt, Path, PutOptions};

    /// Creates a store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store so the caller keeps
    /// it alive for the duration of the test.
    fn new_store() -> (TempDir, LocalFileSystem) {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        (root, integration)
    }

    /// Every attribute passed to `put_opts` is returned by a later `get`.
    #[tokio::test]
    async fn test_put_get_attributes() {
        let (_root, integration) = new_store();

        let location = Path::from("test_file");
        let data = "test data";

        let mut attributes = Attributes::new();
        attributes.insert(Attribute::ContentType, "text/plain".into());
        attributes.insert(Attribute::CacheControl, "max-age=3600".into());
        attributes.insert(Attribute::ContentDisposition, "inline".into());
        attributes.insert(Attribute::ContentEncoding, "gzip".into());
        attributes.insert(Attribute::ContentLanguage, "en-US".into());
        attributes.insert(Attribute::StorageClass, "STANDARD".into());
        attributes.insert(
            Attribute::Metadata("custom_key".into()),
            "custom_value".into(),
        );

        let opts = PutOptions {
            attributes: attributes.clone(),
            ..Default::default()
        };

        integration
            .put_opts(&location, data.into(), opts)
            .await
            .unwrap();

        let result = integration.get(&location).await.unwrap();
        assert_eq!(result.attributes, attributes);

        let bytes = result.bytes().await.unwrap();
        assert_eq!(bytes.as_ref(), data.as_bytes());
    }

    /// Attributes passed to `put_multipart_opts` are present once the upload
    /// has been completed.
    #[tokio::test]
    async fn test_multipart_attributes() {
        let (_root, integration) = new_store();

        let location = Path::from("multipart_file");

        let mut attributes = Attributes::new();
        attributes.insert(Attribute::ContentType, "application/octet-stream".into());
        attributes.insert(Attribute::Metadata("part_count".into()), "2".into());

        let opts = crate::PutMultipartOptions {
            attributes: attributes.clone(),
            ..Default::default()
        };

        let mut upload = integration
            .put_multipart_opts(&location, opts)
            .await
            .unwrap();
        upload.put_part("part1".into()).await.unwrap();
        upload.put_part("part2".into()).await.unwrap();
        upload.complete().await.unwrap();

        let result = integration.get(&location).await.unwrap();
        assert_eq!(result.attributes, attributes);

        let bytes = result.bytes().await.unwrap();
        assert_eq!(bytes.as_ref(), b"part1part2");
    }
}
Loading