Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-s3streamingoutput-73196.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "bugfix",
"category": "s3, streaming output",
"description": "Output files created by S3 Select and streaming output commands are now created with owner-only permissions (0600). Existing files are also tightened to 0600 when overwritten."
}
56 changes: 34 additions & 22 deletions awscli/customizations/s3events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Add S3 specific event streaming output arg."""
from awscli.arguments import CustomArgument

import os

from awscli.arguments import CustomArgument

STREAM_HELP_TEXT = 'Filename where the records will be saved'

Expand All @@ -24,26 +26,28 @@ class DocSectionNotFoundError(Exception):
def register_event_stream_arg(event_handlers):
event_handlers.register(
'building-argument-table.s3api.select-object-content',
add_event_stream_output_arg)
add_event_stream_output_arg,
)
event_handlers.register_last(
'doc-output.s3api.select-object-content',
replace_event_stream_docs
'doc-output.s3api.select-object-content', replace_event_stream_docs
)


def register_document_expires_string(event_handlers):
event_handlers.register_last(
'doc-output.s3api',
document_expires_string
)
event_handlers.register_last('doc-output.s3api', document_expires_string)

def add_event_stream_output_arg(argument_table, operation_model,
session, **kwargs):

def add_event_stream_output_arg(
argument_table, operation_model, session, **kwargs
):
argument_table['outfile'] = S3SelectStreamOutputArgument(
name='outfile', help_text=STREAM_HELP_TEXT,
cli_type_name='string', positional_arg=True,
name='outfile',
help_text=STREAM_HELP_TEXT,
cli_type_name='string',
positional_arg=True,
stream_key=operation_model.output_shape.serialization['payload'],
session=session)
session=session,
)


def replace_event_stream_docs(help_command, **kwargs):
Expand All @@ -56,11 +60,14 @@ def replace_event_stream_docs(help_command, **kwargs):
# This should never happen, but in the rare case that it does
# we should be raising something with a helpful error message.
raise DocSectionNotFoundError(
'Could not find the "output" section for the command: %s'
% help_command)
f'Could not find the "output" section for the command: {help_command}'
)
doc.write('======\nOutput\n======\n')
doc.write("This command generates no output. The selected "
"object content is written to the specified outfile.\n")
doc.write(
"This command generates no output. The selected "
"object content is written to the specified outfile.\n"
)


def document_expires_string(help_command, **kwargs):
doc = help_command.doc
Expand All @@ -78,7 +85,7 @@ def document_expires_string(help_command, **kwargs):
f'\n\n{" " * doc.style.indentation * doc.style.indent_width}',
'ExpiresString -> (string)\n\n',
'\tThe raw, unparsed value of the ``Expires`` field.',
f'\n\n{" " * doc.style.indentation * doc.style.indent_width}'
f'\n\n{" " * doc.style.indentation * doc.style.indent_width}',
]

for idx, write in enumerate(deprecation_note_and_expires_string):
Expand All @@ -91,7 +98,7 @@ class S3SelectStreamOutputArgument(CustomArgument):
_DOCUMENT_AS_REQUIRED = True

def __init__(self, stream_key, session, **kwargs):
super(S3SelectStreamOutputArgument, self).__init__(**kwargs)
super().__init__(**kwargs)
# This is the key in the response body where we can find the
# streamed contents.
self._stream_key = stream_key
Expand All @@ -100,8 +107,9 @@ def __init__(self, stream_key, session, **kwargs):

def add_to_params(self, parameters, value):
self._output_file = value
self._session.register('after-call.s3.SelectObjectContent',
self.save_file)
self._session.register(
'after-call.s3.SelectObjectContent', self.save_file
)

def save_file(self, parsed, **kwargs):
# This method is hooked into after-call which fires
Expand All @@ -112,7 +120,11 @@ def save_file(self, parsed, **kwargs):
if self._stream_key not in parsed:
return
event_stream = parsed[self._stream_key]
with open(self._output_file, 'wb') as fp:
fd = os.open(
self._output_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
)
os.chmod(self._output_file, 0o600)
with os.fdopen(fd, 'wb') as fp:
for event in event_stream:
if 'Records' in event:
fp.write(event['Records']['Payload'])
Expand Down
36 changes: 23 additions & 13 deletions awscli/customizations/streamingoutputarg.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,26 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os

from botocore.model import Shape

from awscli.arguments import BaseCLIArgument


def add_streaming_output_arg(argument_table, operation_model,
session, **kwargs):
def add_streaming_output_arg(
argument_table, operation_model, session, **kwargs
):
# Implementation detail: hooked up to 'building-argument-table'
# event.
if _has_streaming_output(operation_model):
streaming_argument_name = _get_streaming_argument_name(operation_model)
argument_table['outfile'] = StreamingOutputArgument(
response_key=streaming_argument_name,
operation_model=operation_model,
session=session, name='outfile')
session=session,
name='outfile',
)


def _has_streaming_output(model):
Expand All @@ -36,15 +41,16 @@ def _get_streaming_argument_name(model):


class StreamingOutputArgument(BaseCLIArgument):

BUFFER_SIZE = 32768
HELP = 'Filename where the content will be saved'

def __init__(self, response_key, operation_model, name,
session, buffer_size=None):
def __init__(
self, response_key, operation_model, name, session, buffer_size=None
):
self._name = name
self.argument_model = Shape('StreamingOutputArgument',
{'type': 'string'})
self.argument_model = Shape(
'StreamingOutputArgument', {'type': 'string'}
)
if buffer_size is None:
buffer_size = self.BUFFER_SIZE
self._buffer_size = buffer_size
Expand Down Expand Up @@ -81,15 +87,15 @@ def documentation(self):
return self.HELP

def add_to_parser(self, parser):
parser.add_argument(self._name, metavar=self.py_name,
help=self.HELP)
parser.add_argument(self._name, metavar=self.py_name, help=self.HELP)

def add_to_params(self, parameters, value):
self._output_file = value
service_id = self._operation_model.service_model.service_id.hyphenize()
operation_name = self._operation_model.name
self._session.register('after-call.%s.%s' % (
service_id, operation_name), self.save_file)
self._session.register(
f'after-call.{service_id}.{operation_name}', self.save_file
)

def save_file(self, parsed, **kwargs):
if self._response_key not in parsed:
Expand All @@ -100,7 +106,11 @@ def save_file(self, parsed, **kwargs):
return
body = parsed[self._response_key]
buffer_size = self._buffer_size
with open(self._output_file, 'wb') as fp:
fd = os.open(
self._output_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
)
os.chmod(self._output_file, 0o600)
with os.fdopen(fd, 'wb') as fp:
data = body.read(buffer_size)
while data:
fp.write(data)
Expand Down
98 changes: 70 additions & 28 deletions tests/functional/s3api/test_select_object_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import tempfile
import shutil
import tempfile

from awscli.testutils import BaseAWSCommandParamsTest
from awscli.testutils import BaseAWSHelpOutputTest
from awscli.testutils import (
BaseAWSCommandParamsTest,
BaseAWSHelpOutputTest,
skip_if_windows,
)


class TestGetObject(BaseAWSCommandParamsTest):

prefix = ['s3api', 'select-object-content']

def setUp(self):
Expand All @@ -36,11 +38,23 @@ def create_fake_payload(self):
yield {'Records': {'Payload': b'a,b,c,d\n'}}
# These next two events are ignored because they aren't
# "Records".
yield {'Progress': {'Details': {'BytesScanned': 1048576,
'BytesProcessed': 37748736}}}
yield {
'Progress': {
'Details': {
'BytesScanned': 1048576,
'BytesProcessed': 37748736,
}
}
}
yield {'Records': {'Payload': b'e,f,g,h\n'}}
yield {'Stats': {'Details': {'BytesProcessed': 62605400,
'BytesScanned': 1662276}}}
yield {
'Stats': {
'Details': {
'BytesProcessed': 62605400,
'BytesScanned': 1662276,
}
}
}
yield {'End': {}}

def test_can_stream_to_file(self):
Expand All @@ -51,14 +65,15 @@ def test_can_stream_to_file(self):
cmdline.extend(['--expression', 'SELECT * FROM S3Object'])
cmdline.extend(['--expression-type', 'SQL'])
cmdline.extend(['--request-progress', 'Enabled=True'])
cmdline.extend(['--input-serialization',
'{"CSV": {}, "CompressionType": "GZIP"}'])
cmdline.extend(
['--input-serialization', '{"CSV": {}, "CompressionType": "GZIP"}']
)
cmdline.extend(['--output-serialization', '{"CSV": {}}'])
cmdline.extend([filename])

expected_params = {
'Bucket': 'mybucket',
'Key': u'mykey',
'Key': 'mykey',
'Expression': 'SELECT * FROM S3Object',
'ExpressionType': 'SQL',
'InputSerialization': {'CSV': {}, 'CompressionType': 'GZIP'},
Expand All @@ -67,12 +82,31 @@ def test_can_stream_to_file(self):
}
stdout = self.assert_params_for_cmd(cmdline, expected_params)[0]
self.assertEqual(stdout, '')
with open(filename, 'r') as f:
with open(filename) as f:
contents = f.read()
self.assertEqual(contents, (
'a,b,c,d\n'
'e,f,g,h\n'
))
self.assertEqual(contents, ('a,b,c,d\n' 'e,f,g,h\n'))

@skip_if_windows('chmod is not supported on Windows')
def test_output_file_permissions(self):
filename = os.path.join(self._tempdir, 'outfile_perms')
cmdline = self.prefix + [
'--bucket',
'mybucket',
'--key',
'mykey',
'--expression',
'SELECT * FROM S3Object',
'--expression-type',
'SQL',
'--input-serialization',
'{"CSV": {}}',
'--output-serialization',
'{"CSV": {}}',
filename,
]
self.assert_params_for_cmd(cmdline, ignore_params=True)
# Mask file type bits to isolate permission bits (rwxrwxrwx)
self.assertEqual(os.stat(filename).st_mode & 0o777, 0o600)

def test_errors_are_propagated(self):
self.http_response.status_code = 400
Expand All @@ -83,30 +117,39 @@ def test_errors_are_propagated(self):
}
}
cmdline = self.prefix + [
'--bucket', 'mybucket',
'--key', 'mykey',
'--expression', 'SELECT * FROM S3Object',
'--expression-type', 'SQL',
'--request-progress', 'Enabled=True',
'--input-serialization', '{"CSV": {}, "CompressionType": "GZIP"}',
'--output-serialization', '{"CSV": {}}',
'--bucket',
'mybucket',
'--key',
'mykey',
'--expression',
'SELECT * FROM S3Object',
'--expression-type',
'SQL',
'--request-progress',
'Enabled=True',
'--input-serialization',
'{"CSV": {}, "CompressionType": "GZIP"}',
'--output-serialization',
'{"CSV": {}}',
os.path.join(self._tempdir, 'outfile'),
]
expected_params = {
'Bucket': 'mybucket',
'Key': u'mykey',
'Key': 'mykey',
'Expression': 'SELECT * FROM S3Object',
'ExpressionType': 'SQL',
'InputSerialization': {'CSV': {}, 'CompressionType': 'GZIP'},
'OutputSerialization': {'CSV': {}},
'RequestProgress': {'Enabled': True},
}
self.assert_params_for_cmd(
cmd=cmdline, params=expected_params,
cmd=cmdline,
params=expected_params,
expected_rc=255,
stderr_contains=(
'An error occurred (CastFailed) when '
'calling the SelectObjectContent operation'),
'calling the SelectObjectContent operation'
),
)


Expand All @@ -116,8 +159,7 @@ def test_output(self):
# We don't want to be super picky because the wording may change
# We just want to verify the Output section was customized.
self.assert_contains(
'Output\n======\n'
'This command generates no output'
'Output\n======\n' 'This command generates no output'
)
self.assert_not_contains('[outfile')
self.assert_contains('outfile')
Loading