Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

## Bugfixes

* Fixed GCS filesystem glob matching to correctly handle `/` in object names and support `**` for recursive matching (Go) ([#38059](https://github.com/apache/beam/issues/38059)).
* Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)).

## Security Fixes
Expand Down
103 changes: 97 additions & 6 deletions sdks/go/pkg/beam/io/filesystem/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"context"
"fmt"
"io"
"path/filepath"
"regexp"
"strings"
"time"

"cloud.google.com/go/storage"
Expand All @@ -38,6 +39,91 @@ const (
projectBillingHook = "beam:go:hook:filesystem:billingproject"
)

// globToRegex translates a glob pattern to a regular expression.
// It differs from filepath.Match in that:
// - / is treated as a regular character (not a separator), since GCS object
// names are flat with / being just another character
// - ** matches any sequence of characters including / (zero or more)
// - **/ matches zero or more path segments (e.g., "" or "dir/" or "dir/subdir/")
// - * matches any sequence of characters except / (zero or more)
// - ? matches a single character (any character including /)
//
// This matches the behavior of the Python and Java SDKs.
func globToRegex(pattern string) (*regexp.Regexp, error) {
var result strings.Builder
result.WriteString("^")

for i := 0; i < len(pattern); i++ {
c := pattern[i]
switch c {
case '*':
// Check for ** (double asterisk)
if i+1 < len(pattern) && pattern[i+1] == '*' {
// Check if followed by / (e.g., "**/" matches zero or more path segments)
if i+2 < len(pattern) && pattern[i+2] == '/' {
// **/ matches "" or "something/" or "a/b/c/"
result.WriteString("(.*/)?")
i += 2 // Skip the second * and the /
} else {
// ** at end or before non-slash matches any characters
result.WriteString(".*")
i++ // Skip the second *
}
} else {
result.WriteString("[^/]*")
}
case '?':
result.WriteString(".")
case '[':
// Character class - find the closing bracket
j := i + 1
if j < len(pattern) && pattern[j] == '!' {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ? wildcard is translated to . (matches any character including /), but * is explicitly translated to [^/]* (excludes /). This is inconsistent: a user writing dir?file.txt would unexpectedly match dir/file.txt.

Since the PR's design treats * as a single-segment matcher and ** as the cross-segment matcher, ? should follow the same rule and not cross /:

case '?':
    result.WriteString("[^/]")

The comment above the function even documents this asymmetry as intentional (? matches a single character (any character including /)), but no other major glob implementation (Python pathlib, Java PathMatcher) lets ? cross path separators when * does not.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, resolved the inconsistency and updated the comment

j++
}
if j < len(pattern) && pattern[j] == ']' {
j++
}
for j < len(pattern) && pattern[j] != ']' {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there is no closing ], the code silently treats [ as a literal character. Previously, filepath.Match would return filepath.ErrBadPattern for the same input. This is a silent behavior change: a caller who was relying on the error to detect bad patterns will no longer get one. Consider returning an error instead:

if j >= len(pattern) {
    return nil, fmt.Errorf("syntax error: unclosed '[' in pattern %q", pattern)
}

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Now returns fmt.Errorf("syntax error: unclosed '[' in pattern %q", pattern)

j++
}
if j >= len(pattern) {
// No closing bracket, treat [ as literal
result.WriteString(regexp.QuoteMeta(string(c)))
} else {
// Copy the character class, converting ! to ^ for negation
result.WriteByte('[')
content := pattern[i+1 : j]
if len(content) > 0 && content[0] == '!' {
result.WriteByte('^')
content = content[1:]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isRegexSpecial maintains a hand-written list of regex metacharacters. The list is currently missing ] (harmless in Go RE2 outside [...], but still surprising) and is a maintenance risk if the code is adapted later. The default case in globToRegex can just call regexp.QuoteMeta directly, which is already imported and handles all metacharacters correctly:

default:
    result.WriteString(regexp.QuoteMeta(string(c)))

This removes the need for isRegexSpecial entirely.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, makes a lot of sense. Done. Replaced with regexp.QuoteMeta() and removed isRegexSpecial altogether

}
result.WriteString(content)
result.WriteByte(']')
i = j
}
default:
// Escape regex special characters
if isRegexSpecial(c) {
result.WriteByte('\\')
}
result.WriteByte(c)
}
}

result.WriteString("$") // match end
return regexp.Compile(result.String())
}

// isRegexSpecial returns true if the character is a regex metacharacter
// that needs to be escaped.
func isRegexSpecial(c byte) bool {
switch c {
case '.', '+', '^', '$', '(', ')', '|', '{', '}', '\\':
return true
}
return false
}

var billingProject string = ""

func init() {
Expand Down Expand Up @@ -107,6 +193,15 @@ func (f *fs) List(ctx context.Context, glob string) ([]string, error) {
return nil, err
}

// Compile the glob pattern to a regex. We use a custom glob-to-regex
// translation that treats / as a regular character (not a separator),
// since GCS object names are flat. This also supports ** for recursive
// matching, similar to the Java and Python SDKs.
re, err := globToRegex(object)
if err != nil {
return nil, fmt.Errorf("invalid glob pattern %q: %w", object, err)
}

var candidates []string

// We handle globs by list all candidates and matching them here.
Expand All @@ -125,11 +220,7 @@ func (f *fs) List(ctx context.Context, glob string) ([]string, error) {
return nil, err
}

match, err := filepath.Match(object, obj.Name)
if err != nil {
return nil, err
}
if match {
if re.MatchString(obj.Name) {
candidates = append(candidates, obj.Name)
}
}
Expand Down
118 changes: 118 additions & 0 deletions sdks/go/pkg/beam/io/filesystem/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,124 @@ func TestGCS_copy(t *testing.T) {
}
}

func TestGlobToRegex(t *testing.T) {
tests := []struct {
pattern string
name string
want bool
}{
// Single * should NOT match / in object names
{"*.txt", "file.txt", true},
{"*.txt", "dir/file.txt", false},
{"prefix*", "prefix123", true},
{"prefix*", "prefix/subdir", false},

// ** should match any characters including /
{"**", "file.txt", true},
{"**", "dir/file.txt", true},
{"**", "dir/subdir/file.txt", true},
{"prefix/**", "prefix/file.txt", true},
{"prefix/**", "prefix/subdir/file.txt", true},
{"**/file.txt", "file.txt", true},
{"**/file.txt", "dir/file.txt", true},
{"**/file.txt", "dir/subdir/file.txt", true},

// Mixed patterns
{"dir/*.txt", "dir/file.txt", true},
{"dir/*.txt", "dir/subdir/file.txt", false},
{"dir/**/*.txt", "dir/file.txt", true},
{"dir/**/*.txt", "dir/subdir/file.txt", true},
{"dir/**/file.txt", "dir/file.txt", true},
{"dir/**/file.txt", "dir/a/b/c/file.txt", true},

// ? should match any single character including /
{"file?.txt", "file1.txt", true},
{"file?.txt", "file12.txt", false},

// Character classes
{"file[0-9].txt", "file1.txt", true},
{"file[0-9].txt", "filea.txt", false},
{"file[!0-9].txt", "filea.txt", true},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ? test cases only verify matching a digit (file1.txt) and rejecting two characters (file12.txt). There is no test confirming whether ? matches or rejects /. Given the inconsistency with *, add an explicit case:

{"file?.txt", "file/.txt", false}, // ? should not cross /

(Or true if crossing is intentional, but document it clearly.)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added {"file?.txt", "file/.txt", false}, // ? should not cross /

{"file[!0-9].txt", "file1.txt", false},

// Exact match (no wildcards)
{"exact.txt", "exact.txt", true},
{"exact.txt", "notexact.txt", false},

// Regex special characters should be escaped
{"file.txt", "file.txt", true},
{"file.txt", "fileXtxt", false},
{"file(1).txt", "file(1).txt", true},
}

for _, tt := range tests {
t.Run(tt.pattern+"_"+tt.name, func(t *testing.T) {
re, err := globToRegex(tt.pattern)
if err != nil {
t.Fatalf("globToRegex(%q) error = %v", tt.pattern, err)
}
got := re.MatchString(tt.name)
if got != tt.want {
t.Errorf("globToRegex(%q).MatchString(%q) = %v, want %v", tt.pattern, tt.name, got, tt.want)
}
})
}
}

func TestGCS_listWithSlashesInObjectNames(t *testing.T) {
ctx := context.Background()
bucket := "beamgogcsfilesystemtest"
dirPath := "gs://" + bucket

// Create server with objects that have / in their names
server := fakestorage.NewServer([]fakestorage.Object{
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: bucket, Name: "file.txt"}, Content: []byte("")},
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: bucket, Name: "dir/file.txt"}, Content: []byte("")},
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: bucket, Name: "dir/subdir/file.txt"}, Content: []byte("")},
{ObjectAttrs: fakestorage.ObjectAttrs{BucketName: bucket, Name: "other.txt"}, Content: []byte("")},
})
t.Cleanup(server.Stop)
c := &fs{client: server.Client()}

tests := []struct {
glob string
want []string
}{
// Single * should only match top-level files
{dirPath + "/*.txt", []string{dirPath + "/file.txt", dirPath + "/other.txt"}},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The integration test for /** lists all four objects including file.txt and other.txt. The /** pattern is compiled to ^.*$ which matches any string, including an empty one. This is correct for GCS, but it is also worth adding a case like dirPath + "/dir/subdir/**" to verify deeply nested ** matching, since the nested case (dir/subdir/file.txt) is the core scenario from issue #38059.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added dirPath + "/dir/subdir/**" test case to verify deeply nested matching, which is the core scenario from issue #38059

// ** should match all files recursively
{dirPath + "/**", []string{
dirPath + "/file.txt",
dirPath + "/dir/file.txt",
dirPath + "/dir/subdir/file.txt",
dirPath + "/other.txt",
}},
// dir/* should only match immediate children
{dirPath + "/dir/*", []string{dirPath + "/dir/file.txt"}},
// dir/** should match all descendants
{dirPath + "/dir/**", []string{
dirPath + "/dir/file.txt",
dirPath + "/dir/subdir/file.txt",
}},
}

for _, tt := range tests {
t.Run(tt.glob, func(t *testing.T) {
got, err := c.List(ctx, tt.glob)
if err != nil {
t.Fatalf("List(%q) error = %v", tt.glob, err)
}

sort.Strings(got)
sort.Strings(tt.want)

if !cmp.Equal(got, tt.want) {
t.Errorf("List(%q) = %v, want %v", tt.glob, got, tt.want)
}
})
}
}

func createFakeGCSServer(tb testing.TB) *fakestorage.Server {
tb.Helper()

Expand Down
Loading