XRootD
Loading...
Searching...
No Matches
XrdClHttpFile.hh
Go to the documentation of this file.
1/***************************************************************
2 *
3 * Copyright (C) 2025, Morgridge Institute for Research
4 *
5 ***************************************************************/
6
7#ifndef XRDCLHTTP_CURLFILE_HH
8#define XRDCLHTTP_CURLFILE_HH
9
12
#include <XrdCl/XrdClFile.hh>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>
27
28namespace XrdCl {
29
30class Log;
31
32}
33
34namespace XrdClHttp {
35
36class CurlPutOp;
37class CurlReadOp;
38class HandlerQueue;
39
40}
41namespace XrdClHttp {
42
43class File final : public XrdCl::FilePlugIn {
44public:
45 File(std::shared_ptr<XrdClHttp::HandlerQueue> queue, XrdCl::Log *log) :
46 m_queue(queue),
47 m_logger(log),
48 m_default_put_handler(new PutDefaultHandler(*this))
49 {}
50
51 virtual ~File() noexcept;
52
53 virtual XrdCl::XRootDStatus Open(const std::string &url,
54 XrdCl::OpenFlags::Flags flags,
55 XrdCl::Access::Mode mode,
56 XrdCl::ResponseHandler *handler,
57 time_t timeout) override;
58
59 virtual XrdCl::XRootDStatus Close(XrdCl::ResponseHandler *handler,
60 time_t timeout) override;
61
62 virtual XrdCl::XRootDStatus Stat(bool force,
63 XrdCl::ResponseHandler *handler,
64 time_t timeout) override;
65
66 virtual XrdCl::XRootDStatus Fcntl(const XrdCl::Buffer &arg,
67 XrdCl::ResponseHandler *handler,
68 time_t timeout) override;
69
70 virtual XrdCl::XRootDStatus Read(uint64_t offset,
71 uint32_t size,
72 void *buffer,
73 XrdCl::ResponseHandler *handler,
74 time_t timeout) override;
75
76 virtual XrdCl::XRootDStatus PgRead(uint64_t offset,
77 uint32_t size,
78 void *buffer,
79 XrdCl::ResponseHandler *handler,
80 time_t timeout) override;
81
82 virtual XrdCl::XRootDStatus VectorRead(const XrdCl::ChunkList &chunks,
83 void *buffer,
84 XrdCl::ResponseHandler *handler,
85 time_t timeout ) override;
86
87 virtual XrdCl::XRootDStatus Write(uint64_t offset,
88 uint32_t size,
89 const void *buffer,
90 XrdCl::ResponseHandler *handler,
91 time_t timeout) override;
92
93 virtual XrdCl::XRootDStatus Write(uint64_t offset,
94 XrdCl::Buffer &&buffer,
95 XrdCl::ResponseHandler *handler,
96 time_t timeout) override;
97
98 virtual bool IsOpen() const override;
99
100 virtual bool SetProperty( const std::string &name,
101 const std::string &value ) override;
102
103 virtual bool GetProperty( const std::string &name,
104 std::string &value ) const override;
105
106 // Returns the flags used to open the file
107 XrdCl::OpenFlags::Flags Flags() const {return m_open_flags;}
108
109 // Sets the minimum client timeout
110 static void SetMinimumHeaderTimeout(struct timespec &ts) {m_min_client_timeout.tv_sec = ts.tv_sec; m_min_client_timeout.tv_nsec = ts.tv_nsec;}
111
112 // Gets the minimum client timeout
113 static const struct timespec &GetMinimumHeaderTimeout() {return m_min_client_timeout;}
114
115 // Sets the default header timeout
116 static void SetDefaultHeaderTimeout(struct timespec &ts) {m_default_header_timeout.tv_sec = ts.tv_sec; m_default_header_timeout.tv_nsec = ts.tv_nsec;}
117
118 // Gets the default header timeout
119 static const struct timespec &GetDefaultHeaderTimeout() {return m_default_header_timeout;}
120
121 // Sets the open file's header timeout
122 void SetHeaderTimeout(const struct timespec &ts) {m_header_timeout.tv_sec = ts.tv_sec; m_header_timeout.tv_nsec = ts.tv_nsec;}
123
124 // Get the header timeout value, taking into consideration the contents of the header and XrdCl's default values
125 static struct timespec ParseHeaderTimeout(const std::string &header_value, XrdCl::Log *logger);
126
127 // Get the header timeout value, taking into consideration the provided command timeout, the existing open timeout, and XrdCl's default values
128 struct timespec GetHeaderTimeout(time_t oper_timeout) const;
129
130 // Get the header timeout value, taking into consideration the provided command timeout, a default timeout, and XrdCl's default values
131 static struct timespec GetHeaderTimeoutWithDefault(time_t oper_timeout, const struct timespec &header_timeout);
132
133 // Set the federation metadata timeout
134 static void SetFederationMetadataTimeout(const struct timespec &ts) {m_fed_timeout.tv_sec = ts.tv_sec; m_fed_timeout.tv_nsec = ts.tv_nsec;}
135
136 // Get the federation metadata timeout
137 static struct timespec GetFederationMetadataTimeout() {return m_fed_timeout;}
138
139 // Get the global monitoring statistics data
140 static std::string GetMonitoringJson();
141
142private:
143
144 // Try to read a buffer via the prefetch mechanism.
145 //
146 // Returns tuple (status, ok); if `ok` is set to true, then the operation
147 // was attempted. Otherwise, the operation was skipped and `status` should
148 // be ignored.
149 std::tuple<XrdCl::XRootDStatus, bool> ReadPrefetch(uint64_t offset, uint64_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout, bool isPgRead);
150
151 // The "*Response" variant of the callback response objects defined in DirectorCacheResponse.hh
152 // are opt-in; if the caller isn't expecting them, then they will leak memory. This
153 // function determines whether the opt-in is enabled.
154 bool SendResponseInfo() const;
155
156 // Returns a pointer to the connection callout function
157 CreateConnCalloutType GetConnCallout() const;
158
159 // Get the current URL to use for file operations.
160 //
161 // The `XrdClHttpQueryParam` property allows for additional query parameters to be added by
162 // the owner of the file handle; these can change while the file is open (for example, if there
163 // is a credential in the query parameter that might expire) so we must reconstruct the URL,
164 // even for `Read`-type calls.
165 const std::string GetCurrentURL() const;
166
167 // Calculate the current URL given the query parameter value
168 //
169 // Must be called with the m_properties_mutex held for write.
170 void CalculateCurrentURL(const std::string &value) const;
171
172 bool m_is_opened{false};
173 std::atomic<bool> m_full_download{false}; // Whether the file was in "full download mode" when opened.
174
175 // The flags used to open the file
177
178 std::string m_url; // The URL as given to the Open() method.
179 std::string m_last_url; // The last server the file was connected to after Open() (potentially after redirections)
180 mutable std::string m_url_current; // The URL to use for future HTTP requests; may be the last URL plus additional query parameters.
181 std::shared_ptr<XrdClHttp::HandlerQueue> m_queue;
182 XrdCl::Log *m_logger{nullptr};
183 std::unordered_map<std::string, std::string> m_properties;
184
185 // Protects the contents of m_properties
186 mutable std::shared_mutex m_properties_mutex;
187
188 // The header timeout for the current file
189 struct timespec m_timeout{0, 0};
190
191 // The minimum timeout value requested by a client that we will honor
192 static struct timespec m_min_client_timeout;
193
194 // The default header timeout.
195 static struct timespec m_default_header_timeout;
196
197 // The per-file header timeout.
198 struct timespec m_header_timeout;
199
200 // The federation metadata timeout.
201 static struct timespec m_fed_timeout;
202
203 // An in-progress put operation.
204 //
205 // This shared pointer is also copied to the queue and kept
206 // by the curl worker thread. We will need to refer to the
207 // operation later to continue the write.
208 std::shared_ptr<XrdClHttp::CurlPutOp> m_put_op;
209
210 // A response handler for PUT operations that ensures multiple writes are serialized.
211 class PutResponseHandler : public XrdCl::ResponseHandler {
212 public:
213 PutResponseHandler(XrdCl::ResponseHandler *handler);
214
215 virtual void HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) override;
216
217 XrdCl::XRootDStatus QueueWrite(std::variant<std::pair<const void *, size_t>, XrdCl::Buffer> buffer, XrdCl::ResponseHandler *handler);
218
219 void SetOp(std::shared_ptr<XrdClHttp::CurlPutOp> op) {m_op = op;}
220
221 void WaitForCompletion();
222
223 private:
224 bool m_active{true};
225 bool m_initial{true};
226 std::shared_ptr<XrdClHttp::CurlPutOp> m_op;
227 XrdCl::ResponseHandler *m_active_handler;
228 std::condition_variable m_cv;
229 std::mutex m_mutex;
230 std::deque<std::tuple<std::variant<std::pair<const void *, size_t>, XrdCl::Buffer>, XrdCl::ResponseHandler*>> m_pending_writes;
231
232 // Start the next pending write operation.
233 // Returns false if the active handler was invoked due to an error.
234 bool ProcessQueue();
235 };
236
237 // The callback handler for the in-progress put operation.
238 // This handler wraps the user-provided handler to ensure
239 // that multiple writes are serialized.
240 std::atomic<PutResponseHandler *>m_put_handler{nullptr};
241
242 // Ultimate length of the in-progress PUT operation
243 off_t m_asize{-1};
244
245 // Handle a failure in the PUT code while there are no outstanding
246 // write requests
247 class PutDefaultHandler : public XrdCl::ResponseHandler {
248 public:
249 PutDefaultHandler(File &file) : m_logger(file.m_logger) {}
250
251 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
252
253 private:
254 XrdCl::Log *m_logger{nullptr};
255 };
256
257 // The default object for all put failures
258 std::shared_ptr<PutDefaultHandler> m_default_put_handler;
259
260 // An in-progress GET operation
261 //
262 // For the first read from the file, we will issue a GET for
263 // the entire rest of the file. As in-sequence reads are
264 // encountered, they will be fed from the prefetch operation instead
265 // of standalone reads.
266 std::shared_ptr<XrdClHttp::CurlReadOp> m_prefetch_op;
267
268 // Next offset for prefetching.
269 // Protected by m_prefetch_mutex
270 std::atomic<off_t> m_prefetch_offset{0};
271
272 // Prefetch callback handler class
273 //
274 // Objects form a linked list of pending prefetch handlers.
275 // Once the first entry in the list is completed, it will pass the prefetch
276 // operation to the subsequent entry.
277 class PrefetchResponseHandler : public XrdCl::ResponseHandler {
278 public:
279
280 // Constructor for the prefetch response handler.
281 // - `parent`: The parent file object for the prefetch.
282 // - `offset`: The offset within the file to start the prefetch.
283 // - `size`: The size of the prefetch operation.
284 // - `prefetch_offset`: A reference to the prefetch offset. As the prefetch
285 // operation progresses, this offset will be updated to reflect the new
286 // position in the file. Lifetime must exceed that of the response handler.
287 // - `buffer`: A pointer to the buffer to store the prefetch data.
288 // - `handler`: The response handler for the prefetch operation.
289 // - `lock`: A unique lock for the prefetch operation. If `lock` is the `nullptr`,
290 // then we assume this is called during the creation of the m_prefetch_op and we
291 // will assume that this is NOT a continuation of the existing operation. In that
292 // case, the lock will not be dropped during the constructor. A reference to the
293 // lock is not taken outside the constructor. The lock must be held when the
294 // constructor is called.
295 // - `timeout`: The timeout for the prefetch operation.
296 //
297 // The constructor can throw a std::runtime_exception if the handler would have
298 // continued an ongoing prefetch operation but it failed to submit it.
299 PrefetchResponseHandler(File &parent,
300 off_t offset, size_t size, std::atomic<off_t> *prefetch_offset, char *buffer, XrdCl::ResponseHandler *handler,
301 std::unique_lock<std::mutex> *lock, time_t timeout);
302
303 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
304
305 private:
306 // When the prefetch fails, we must resubmit our handler as a non-prefetching read.
307 void ResubmitOperation();
308
309 // The open file we are associated with
310 File &m_parent;
311
312 // A reference to the handler we are wrapping. Note we don't own the handler
313 // so this is not a unique_ptr.
314 XrdCl::ResponseHandler *m_handler;
315
316 // A reference to the next handle in the linked list.
317 PrefetchResponseHandler *m_next{nullptr};
318
319 // The buffer for this prefetch callback
320 char *m_buffer{nullptr};
321
322 // The size of the prefetch callback buffer
323 size_t m_size{0};
324
325 // The offset of the operation within the file.
326 off_t m_offset{0};
327
328 // A pointer to the prefetch offset. If the read is shorter than we had
329 // expected, we'll decrease the offset pointer to match the actual size.
330 std::atomic<off_t> *m_prefetch_offset{nullptr};
331
332
333 // The desired timeout for the operation.
334 time_t m_timeout{0};
335 };
336
337 // Last prefetch handler on the stack.
338 PrefetchResponseHandler *m_last_prefetch_handler{nullptr};
339
340 // Size of prefetch operation
341 off_t m_prefetch_size{-1};
342
343 // Offset of the next write operation;
344 std::atomic<off_t> m_put_offset{0};
345
346 // Handle a failure in the prefetch code while there is no outstanding
347 // read requests
348 //
349 // The status of the File's prefetching is kept in this class because the callback's
350 // lifetime is independent of the File and the callback needs to be able to disable
351 // prefetching.
352 class PrefetchDefaultHandler : public XrdCl::ResponseHandler {
353 public:
354 PrefetchDefaultHandler(File &file) : m_logger(file.m_logger), m_url(file.m_url) {}
355
356 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
357
358 // Disable prefetching for all future operations
359 void DisablePrefetch() {
360 auto enabled = m_prefetch_enabled.load(std::memory_order_relaxed);
361 if (enabled) {
362 std::unique_lock lock(m_prefetch_mutex);
363 m_prefetch_enabled.store(false, std::memory_order_relaxed);
364 }
365 }
366
367 // Determine if we are prefetching
368 bool IsPrefetching() const {
369 auto enabled = m_prefetch_enabled.load(std::memory_order_relaxed);
370 if (enabled) {
371 std::unique_lock lock(m_prefetch_mutex);
372 return m_prefetch_enabled.load(std::memory_order_relaxed);
373 }
374 return false;
375 }
376
377 XrdCl::Log *m_logger{nullptr};
378 std::string m_url;
379
380 // Mutex protecting the state of the in-progress GET operation
381 // and relevant callback handlers and state
382 mutable std::mutex m_prefetch_mutex;
383
384 // Whether prefetching is active
385 //
386 // If set to "false", then prefetch is disabled.
387 // If set to "true", then you must re-read the value with
388 // m_prefetch_mutex held to ensure is actually true and not
389 // a spurious reading.
390 mutable std::atomic<bool> m_prefetch_enabled{true};
391 };
392
393 // "Default" handler for prefetching
394 //
395 // When there is no outstanding read operation but the prefetch
396 // operation fails, this will be called
397 std::shared_ptr<PrefetchDefaultHandler> m_default_prefetch_handler;
398
399 // Pointer to the header callout function
400 std::atomic<XrdClHttp::HeaderCallout *> m_header_callout{nullptr};
401
402 // Class for setting up the required HTTP headers for S3 requests
403 class HeaderCallout : public XrdClHttp::HeaderCallout {
404 public:
405 HeaderCallout(File &fs) : m_parent(fs)
406 {}
407
408 virtual ~HeaderCallout() noexcept = default;
409
410 virtual std::shared_ptr<HeaderList> GetHeaders(const std::string &verb,
411 const std::string &url,
412 const HeaderList &headers) override;
413
414 private:
415 File &m_parent;
416 };
417
418 HeaderCallout m_default_header_callout{*this};
419
420 static std::atomic<uint64_t> m_prefetch_count; // Count of prefetch operations that have been initiated.
421 static std::atomic<uint64_t> m_prefetch_expired_count; // Count of prefetch operations that have expired due to unused data.
422 static std::atomic<uint64_t> m_prefetch_failed_count; // Count of prefetch operations that have failed due to errors.
423 static std::atomic<uint64_t> m_prefetch_reads_hit; // Count of read operations served from prefetch data.
424 static std::atomic<uint64_t> m_prefetch_reads_miss; // Count of read operations that were not served from prefetch data.
425 static std::atomic<uint64_t> m_prefetch_bytes_used; // Count of prefetch operations that have succeeded.
426};
427
428}
429
430#endif // XRDCLHTTP_CURLFILE_HH
static std::string ts()
timestamp output for logging messages
Definition XrdCephOss.cc:53
struct stat Stat
Definition XrdCks.cc:49
static void parent()
int Mode
XrdOucString File
virtual XrdCl::XRootDStatus Open(const std::string &url, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode mode, XrdCl::ResponseHandler *handler, time_t timeout) override
static struct timespec ParseHeaderTimeout(const std::string &header_value, XrdCl::Log *logger)
File(std::shared_ptr< XrdClHttp::HandlerQueue > queue, XrdCl::Log *log)
static const struct timespec & GetDefaultHeaderTimeout()
static struct timespec GetHeaderTimeoutWithDefault(time_t oper_timeout, const struct timespec &header_timeout)
virtual bool SetProperty(const std::string &name, const std::string &value) override
virtual bool IsOpen() const override
virtual XrdCl::XRootDStatus VectorRead(const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
void SetHeaderTimeout(const struct timespec &ts)
virtual XrdCl::XRootDStatus Fcntl(const XrdCl::Buffer &arg, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual ~File() noexcept
static std::string GetMonitoringJson()
virtual bool GetProperty(const std::string &name, std::string &value) const override
XrdCl::OpenFlags::Flags Flags() const
static const struct timespec & GetMinimumHeaderTimeout()
static void SetFederationMetadataTimeout(const struct timespec &ts)
static void SetDefaultHeaderTimeout(struct timespec &ts)
static struct timespec GetFederationMetadataTimeout()
static void SetMinimumHeaderTimeout(struct timespec &ts)
virtual XrdCl::XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
struct timespec GetHeaderTimeout(time_t oper_timeout) const
virtual XrdCl::XRootDStatus Close(XrdCl::ResponseHandler *handler, time_t timeout) override
virtual ~HeaderCallout() noexcept=default
std::vector< std::pair< std::string, std::string > > HeaderList
An interface for file plug-ins.
Handle diagnostics.
Definition XrdClLog.hh:101
Handle an async response.
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
Flags
Open flags, may be or'd when appropriate.