48 m_default_put_handler(new PutDefaultHandler(*this))
51 virtual ~File() noexcept;
// Open() override; returns an XrdCl::XRootDStatus and takes (at least) the
// URL, a response handler and a timeout.
// NOTE(review): the embedded numbering jumps from 53 to 56, so parameters
// between `url` and `handler` appear to be missing from this listing --
// confirm against the full header before touching the signature.
53 virtual
XrdCl::XRootDStatus
Open(const std::
string &url,
56 XrdCl::ResponseHandler *handler,
57 time_t timeout) override;
60 time_t timeout) override;
62 virtual
XrdCl::XRootDStatus
Stat(
bool force,
63 XrdCl::ResponseHandler *handler,
64 time_t timeout) override;
67 XrdCl::ResponseHandler *handler,
68 time_t timeout) override;
// Read() override; returns an XrdCl::XRootDStatus and takes (at least) an
// offset, a response handler and a timeout.
// NOTE(review): the embedded numbering jumps from 70 to 73, so parameters
// between `offset` and `handler` appear to be missing from this listing --
// confirm against the full header.
70 virtual
XrdCl::XRootDStatus
Read(uint64_t offset,
73 XrdCl::ResponseHandler *handler,
74 time_t timeout) override;
79 XrdCl::ResponseHandler *handler,
80 time_t timeout) override;
84 XrdCl::ResponseHandler *handler,
85 time_t timeout ) override;
// Write() override; returns an XrdCl::XRootDStatus and takes (at least) an
// offset, a response handler and a timeout.
// NOTE(review): the embedded numbering jumps from 87 to 90, so parameters
// between `offset` and `handler` appear to be missing from this listing --
// confirm against the full header.
87 virtual
XrdCl::XRootDStatus
Write(uint64_t offset,
90 XrdCl::ResponseHandler *handler,
91 time_t timeout) override;
93 virtual
XrdCl::XRootDStatus
Write(uint64_t offset,
94 XrdCl::Buffer &&buffer,
95 XrdCl::ResponseHandler *handler,
96 time_t timeout) override;
98 virtual
bool IsOpen() const override;
101 const std::
string &value ) override;
104 std::
string &value ) const override;
122 void SetHeaderTimeout(
const struct timespec &
ts) {m_header_timeout.tv_sec =
ts.tv_sec; m_header_timeout.tv_nsec =
ts.tv_nsec;}
149 std::tuple<XrdCl::XRootDStatus, bool> ReadPrefetch(uint64_t offset, uint64_t size,
void *buffer,
XrdCl::ResponseHandler *handler, time_t timeout,
bool isPgRead);
154 bool SendResponseInfo()
const;
165 const std::string GetCurrentURL()
const;
170 void CalculateCurrentURL(
const std::string &value)
const;
172 bool m_is_opened{
false};
173 std::atomic<bool> m_full_download{
false};
179 std::string m_last_url;
180 mutable std::string m_url_current;
181 std::shared_ptr<XrdClHttp::HandlerQueue> m_queue;
182 XrdCl::Log *m_logger{
nullptr};
183 std::unordered_map<std::string, std::string> m_properties;
186 mutable std::shared_mutex m_properties_mutex;
189 struct timespec m_timeout{0, 0};
192 static struct timespec m_min_client_timeout;
195 static struct timespec m_default_header_timeout;
198 struct timespec m_header_timeout;
201 static struct timespec m_fed_timeout;
208 std::shared_ptr<XrdClHttp::CurlPutOp> m_put_op;
211 class PutResponseHandler :
public XrdCl::ResponseHandler {
213 PutResponseHandler(XrdCl::ResponseHandler *handler);
215 virtual void HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw)
override;
217 XrdCl::XRootDStatus QueueWrite(std::variant<std::pair<const void *, size_t>, XrdCl::Buffer> buffer, XrdCl::ResponseHandler *handler);
219 void SetOp(std::shared_ptr<XrdClHttp::CurlPutOp> op) {m_op = op;}
// Takes no arguments and returns nothing; only declared here.
void WaitForCompletion();
225 bool m_initial{
true};
226 std::shared_ptr<XrdClHttp::CurlPutOp> m_op;
227 XrdCl::ResponseHandler *m_active_handler;
228 std::condition_variable m_cv;
230 std::deque<std::tuple<std::variant<std::pair<const void *, size_t>, XrdCl::Buffer>, XrdCl::ResponseHandler*>> m_pending_writes;
240 std::atomic<PutResponseHandler *>m_put_handler{
nullptr};
247 class PutDefaultHandler :
public XrdCl::ResponseHandler {
249 PutDefaultHandler(
File &file) : m_logger(file.m_logger) {}
251 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
254 XrdCl::Log *m_logger{
nullptr};
258 std::shared_ptr<PutDefaultHandler> m_default_put_handler;
266 std::shared_ptr<XrdClHttp::CurlReadOp> m_prefetch_op;
270 std::atomic<off_t> m_prefetch_offset{0};
277 class PrefetchResponseHandler :
public XrdCl::ResponseHandler {
300 off_t offset,
size_t size, std::atomic<off_t> *prefetch_offset,
char *buffer, XrdCl::ResponseHandler *handler,
301 std::unique_lock<std::mutex> *lock, time_t timeout);
303 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
// Takes no arguments and returns nothing; only declared here.
void ResubmitOperation();
314 XrdCl::ResponseHandler *m_handler;
317 PrefetchResponseHandler *m_next{
nullptr};
320 char *m_buffer{
nullptr};
330 std::atomic<off_t> *m_prefetch_offset{
nullptr};
338 PrefetchResponseHandler *m_last_prefetch_handler{
nullptr};
341 off_t m_prefetch_size{-1};
344 std::atomic<off_t> m_put_offset{0};
352 class PrefetchDefaultHandler :
public XrdCl::ResponseHandler {
354 PrefetchDefaultHandler(
File &file) : m_logger(file.m_logger), m_url(file.m_url) {}
356 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
// Clears the m_prefetch_enabled flag while holding m_prefetch_mutex.
// NOTE(review): the embedded numbering skips 361 and everything after 363,
// so statements (and the closing brace) are missing from this listing; the
// local `enabled` is not used by any visible line -- confirm against the
// full header.
359 void DisablePrefetch() {
// Relaxed read of the flag, taken before the lock is acquired.
360 auto enabled = m_prefetch_enabled.load(std::memory_order_relaxed);
// Take the mutex, then store false into the flag under it.
362 std::unique_lock lock(m_prefetch_mutex);
363 m_prefetch_enabled.store(
false, std::memory_order_relaxed);
// Const query that returns the value of m_prefetch_enabled as read while
// holding m_prefetch_mutex (both are declared mutable).
// NOTE(review): the embedded numbering skips 370 and everything after 372,
// so statements (and the closing brace) are missing from this listing; the
// local `enabled` is not used by any visible line -- confirm against the
// full header.
368 bool IsPrefetching()
const {
// Relaxed read of the flag, taken before the lock is acquired.
369 auto enabled = m_prefetch_enabled.load(std::memory_order_relaxed);
// Re-read the flag under the mutex and return that value.
371 std::unique_lock lock(m_prefetch_mutex);
372 return m_prefetch_enabled.load(std::memory_order_relaxed);
377 XrdCl::Log *m_logger{
nullptr};
382 mutable std::mutex m_prefetch_mutex;
390 mutable std::atomic<bool> m_prefetch_enabled{
true};
397 std::shared_ptr<PrefetchDefaultHandler> m_default_prefetch_handler;
400 std::atomic<XrdClHttp::HeaderCallout *> m_header_callout{
nullptr};
403 class HeaderCallout :
public XrdClHttp::HeaderCallout {
405 HeaderCallout(
File &fs) : m_parent(fs)
410 virtual std::shared_ptr<
HeaderList> GetHeaders(const std::
string &verb,
411 const std::
string &url,
418 HeaderCallout m_default_header_callout{*
this};
420 static std::atomic<uint64_t> m_prefetch_count;
421 static std::atomic<uint64_t> m_prefetch_expired_count;
422 static std::atomic<uint64_t> m_prefetch_failed_count;
423 static std::atomic<uint64_t> m_prefetch_reads_hit;
424 static std::atomic<uint64_t> m_prefetch_reads_miss;
425 static std::atomic<uint64_t> m_prefetch_bytes_used;