-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstreaming_payload.go
More file actions
105 lines (89 loc) · 2.84 KB
/
streaming_payload.go
File metadata and controls
105 lines (89 loc) · 2.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package client
import (
"io"
"sync"
// Packages
"github.com/mutablelogic/go-client/pkg/multipart"
"github.com/mutablelogic/go-server/pkg/types"
)
///////////////////////////////////////////////////////////////////////////////
// TYPES
// streamingRequest is a request payload whose body is produced on demand
// through an io.Pipe instead of being assembled in memory up front, which
// keeps memory use low for large uploads.
type streamingRequest struct {
	// method is the HTTP verb, accept is the response mimetype the caller is
	// willing to receive (may be empty), and mimetype is the content type of
	// the request body.
	method, accept, mimetype string

	// reader is the read half of the pipe; the encoding goroutine owns the
	// matching write half.
	reader *io.PipeReader

	// wg tracks the encoding goroutine so that Close can wait for it to exit.
	wg sync.WaitGroup
}
///////////////////////////////////////////////////////////////////////////////
// LIFECYCLE
// NewStreamingMultipartRequest builds a POST payload that encodes the given
// value as multipart form data and streams it, rather than buffering the whole
// body in memory. Use it for large file uploads.
//
// A background goroutine is started immediately; it encodes into the write
// half of an io.Pipe while the HTTP client drains the read half through Read.
// Because the pipe is synchronous, the goroutine only makes progress while the
// body is being read, and it exits once encoding finishes or the payload is
// closed. Any encoding error (or, failing that, any error from closing the
// encoder) is delivered to the reader through the pipe.
//
// The accept argument is the acceptable response mimetype; it may be empty.
// The returned error is always nil in the current implementation.
func NewStreamingMultipartRequest(payload any, accept string) (Payload, error) {
	reader, writer := io.Pipe()

	// The encoder is created up front because its content type has to be
	// known before the goroutine starts producing data.
	enc := multipart.NewMultipartEncoder(writer)
	req := &streamingRequest{
		method:   "POST",
		accept:   accept,
		mimetype: enc.ContentType(),
		reader:   reader,
	}

	// Produce the body in the background; each write blocks until the HTTP
	// client reads from the other end of the pipe.
	req.wg.Add(1)
	go func() {
		defer req.wg.Done()

		var encodeErr error
		defer func() {
			// Close the encoder before the pipe so the final boundary is
			// written. An encode error takes precedence over a close error.
			closeErr := enc.Close()
			if encodeErr == nil {
				encodeErr = closeErr
			}
			// CloseWithError(nil) is exactly Close(): the reader sees io.EOF
			// on success and the recorded error otherwise.
			_ = writer.CloseWithError(encodeErr)
		}()

		encodeErr = enc.Encode(payload)
	}()

	return req, nil
}
///////////////////////////////////////////////////////////////////////////////
// PAYLOAD METHODS
// Method returns the HTTP method used to send the request. Payloads built by
// NewStreamingMultipartRequest always report "POST".
func (req *streamingRequest) Method() string {
return req.method
}
// Type returns the mimetype of the request body. For payloads built by
// NewStreamingMultipartRequest this is the content type reported by the
// multipart encoder at construction time.
func (req *streamingRequest) Type() string {
return req.mimetype
}
// Accept returns the mimetype the caller is willing to receive in the
// response. When no explicit value was supplied it falls back to
// types.ContentTypeAny.
func (req *streamingRequest) Accept() string {
	if accept := req.accept; accept != "" {
		return accept
	}
	return types.ContentTypeAny
}
// Read implements io.Reader by reading from the read half of the pipe that the
// encoding goroutine writes to. It returns the number of bytes copied into b.
// Once the goroutine has finished, Read reports io.EOF on success or the
// error the goroutine closed the pipe with.
func (req *streamingRequest) Read(b []byte) (n int, err error) {
return req.reader.Read(b)
}
// Close shuts the read half of the pipe and then blocks until the encoding
// goroutine has exited. Closing the reader makes any pending or future write
// by the encoder fail, which is what prompts the goroutine to stop.
//
// Callers must invoke Close once the HTTP client has finished with the request
// body; otherwise the goroutine may stay blocked on the pipe and leak. The
// returned error is the one reported by closing the reader.
func (req *streamingRequest) Close() error {
	// The deferred Wait runs after the reader has been closed and its result
	// captured, preserving the close-then-wait ordering.
	defer req.wg.Wait()
	return req.reader.Close()
}