XRootD
Loading...
Searching...
No Matches
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

Inheritance diagram for XrdPfc::Cache:
Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor.
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue.
virtual XrdOucCacheIOAttach (XrdOucCacheIO *, int Options=0)
bool blocksize_str2value (const char *from, const char *str, long long &val, long long min, long long max) const
void ClearPurgeProtectedSet ()
bool Config (const char *config_filename, const char *parameters, XrdOucEnv *env)
 Parse configuration file.
virtual int ConsiderCached (const char *url)
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached.
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
void DeRegisterPrefetchFile (File *)
long long DetermineFullFileSize (const std::string &cinfo_fname)
void ExecuteCommandUrl (const std::string &command_url)
void FileSyncDone (File *, bool high_debug)
int GetCacheControlXAttr (const std::string &cinfo_fname, std::string &res) const
int GetCacheControlXAttr (int fd, std::string &res) const
FileGetFile (const std::string &, IO *, long long off=0, long long filesize=0)
XrdXrootdGStreamGetGStream ()
XrdSysErrorGetLog () const
FileGetNextFileToPrefetch ()
XrdOssGetOss () const
PurgePinGetPurgePin () const
XrdSysTraceGetTrace () const
bool is_prefetch_enabled () const
bool IsFileActiveOrPurgeProtected (const std::string &) const
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
void Prefetch ()
bool prefetch_str2value (const char *from, const char *str, int &val, int min, int max) const
virtual int Prepare (const char *url, int oflags, mode_t mode)
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk.
const ConfigurationRefConfiguration () const
 Reference XrdPfc configuration.
ResourceMonitorRefResMon ()
void RegisterPrefetchFile (File *)
void ReleaseFile (File *, IO *)
void ReleaseRAM (char *buf, long long size)
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.
char * RequestRAM (long long size)
void ScheduleFileSync (File *f)
virtual int Stat (const char *url, struct stat &sbuff)
virtual int Unlink (const char *url)
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache.
void WriteCacheControlXAttr (int cinfo_fd, const char *path, const std::string &cc)
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
long long WritesSinceLastCall ()
Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
virtual ~XrdOucCache ()
 Destructor.
virtual int Fcntl (XrdOucCacheOp::Code opc, const std::string &args, std::string &resp)
virtual int Rename (const char *oldp, const char *newp)
virtual int Rmdir (const char *dirp)
virtual int Truncate (const char *path, off_t size)
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)

Static Public Member Functions

static const ConfigurationConf ()
static CacheCreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation.
static CacheGetInstance ()
 Singleton access.
static ResourceMonitorResMon ()
static const CacheTheOne ()
static bool VCheck (XrdVersionInfo &urVersion)
 Version check.

Static Public Attributes

static XrdSchedulerschedP = nullptr
Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file).
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write).
static const int optRW = 0x0004
 File is read/write (o/w read/only).
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache.

Additional Inherited Members

Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
enum  XeqCmd { xeqNoop = 0 }
Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc).
XrdOucCacheStats Statistics

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 168 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env )

Constructor.

Definition at line 162 of file XrdPfc.cc.

162 :
163 XrdOucCache("pfc"),
164 m_env(env),
165 m_log(logger, "XrdPfc_"),
166 m_trace(new XrdSysTrace("XrdPfc", logger)),
167 m_traceID("Cache"),
168 m_oss(0),
169 m_gstream(0),
170 m_purge_pin(0),
171 m_prefetch_condVar(0),
172 m_prefetch_enabled(false),
173 m_RAM_used(0),
174 m_RAM_write_queue(0),
175 m_RAM_std_size(0),
176 m_isClient(false),
177 m_active_cond(0)
178{
179 // Default log level is Warning.
180 m_trace->What = 2;
181}
XrdOucCache(const char *ctype)

References XrdOucCache::XrdOucCache().

Referenced by CreateInstance(), GetInstance(), and TheOne().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool from_read )

Add downloaded block in write queue.

Definition at line 229 of file XrdPfc.cc.

230{
231 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
232
233 {
234 XrdSysMutexHelper lock(&m_RAM_mutex);
235 m_RAM_write_queue += b->get_size();
236 }
237
238 m_writeQ.condVar.Lock();
239 if (fromRead)
240 m_writeQ.queue.push_back(b);
241 else
242 m_writeQ.queue.push_front(b);
243 m_writeQ.size++;
244 m_writeQ.condVar.Signal();
245 m_writeQ.condVar.UnLock();
246}
#define TRACE(act, x)
Definition XrdTrace.hh:63
int get_size() const
long long m_offset
File * get_file() const
const std::string & GetLocalPath() const

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

Here is the call graph for this function:

◆ Attach()

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int Options = 0 )
virtual

Implements XrdOucCache.

Definition at line 183 of file XrdPfc.cc.

184{
185 const char* tpfx = "Attach() ";
186
187 if (Options & XrdOucCache::optRW)
188 {
189 TRACE(Info, tpfx << "passing through write operation" << obfuscateAuth(io->Path()));
190 }
191 else if (Cache::GetInstance().Decide(io))
192 {
193 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
194
195 IO *cio;
196
197 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
198 {
199 cio = new IOFileBlock(io, *this);
200 }
201 else
202 {
203 IOFile *iof = new IOFile(io, *this);
204
205 if ( ! iof->HasFile())
206 {
207 delete iof;
208 // TODO - redirect instead. But this is kind of an awkward place for it.
209 // errno is set during IOFile construction.
210 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
211 return io;
212 }
213
214 cio = iof;
215 }
216
217 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
218 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
219
220 return cio;
221 }
222 else
223 {
224 TRACE(Info, tpfx << "decision decline " << io->Path());
225 }
226 return io;
227}
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
bool Debug
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
static const int optRW
File is read/write (o/w read/only).
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:225
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:136
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:141
bool HasFile() const
Check if File was opened successfully.

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCache::optRW, XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

Here is the call graph for this function:

◆ blocksize_str2value()

bool Cache::blocksize_str2value ( const char * from,
const char * str,
long long & val,
long long min,
long long max ) const

Definition at line 106 of file XrdPfcConfiguration.cc.

108{
109 if (XrdOuca2x::a2sz(m_log, "Error parsing block-size", str, &val, min, max))
110 return false;
111
112 if (val & 0xFFF) {
113 val &= ~0x0FFF;
114 val += 0x1000;
115 m_log.Emsg(from, "blocksize must be a multiple of 4 kB. Rounded up.");
116 }
117
118 return true;
119}
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257

References XrdOuca2x::a2sz().

Here is the call graph for this function:

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 694 of file XrdPfc.cc.

695{
696 XrdSysCondVarHelper lock(&m_active_cond);
697 m_purge_delay_set.clear();
698}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 138 of file XrdPfc.cc.

138{ return m_instance->RefConfiguration(); }

Referenced by XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), XrdPfc::ResourceMonitor::update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char * config_filename,
const char * parameters,
XrdOucEnv * env )

Parse configuration file.

Parameters
config_filenamepath to configuration file
parametersoptional parameters to be passed
envoptional environment to use for configuration
Returns
parse status

Definition at line 434 of file XrdPfcConfiguration.cc.

435{
436 // Indicate whether or not we are a client instance
437 const char *theINS = getenv("XRDINSTANCE");
438 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
439
440 // Tell everyone else we are a caching proxy
441 XrdOucEnv::Export("XRDPFC", 1);
442
443 XrdOucEnv emptyEnv;
444 XrdOucEnv *myEnv = env ? env : &emptyEnv;
445
446 XrdOucStream Config(&m_log, theINS, myEnv, "=====> ");
447
448 if (! config_filename || ! *config_filename)
449 {
450 TRACE(Error, "Config() configuration file not specified.");
451 return false;
452 }
453
454 int fd;
455 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
456 {
457 TRACE( Error, "Config() can't open configuration file " << config_filename);
458 return false;
459 }
460
461 Config.Attach(fd);
462 static const char *cvec[] = { "*** pfc plugin config:", 0 };
463 Config.Capture(cvec);
464
465 // Obtain OFS configurator for OSS plugin.
466 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
467 &XrdVERSIONINFOVAR(XrdOucGetCache));
468 if (! ofsCfg) return false;
469
470 TmpConfiguration tmpc;
471
472 Configuration &CFG = m_configuration;
473
474 // Adjust default parameters for client/serverless caching
475 if (m_isClient)
476 {
477 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
478 m_configuration.m_wqueue_blocks = 8;
479 m_configuration.m_wqueue_threads = 1;
480 }
481
482 // If network checksum processing is the default, indicate so.
483 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
484
485 // Actual parsing of the config file.
486 bool retval = true, aOK = true;
487 char *var;
488 while ((var = Config.GetMyFirstWord()))
489 {
490 if (! strcmp(var,"pfc.osslib"))
491 {
492 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
493 }
494 else if (! strcmp(var,"pfc.cschk"))
495 {
496 retval = xcschk(Config);
497 }
498 else if (! strcmp(var,"pfc.decisionlib"))
499 {
500 retval = xdlib(Config);
501 }
502 else if (! strcmp(var,"pfc.purgelib"))
503 {
504 retval = xplib(Config);
505 }
506 else if (! strcmp(var,"pfc.trace"))
507 {
508 retval = xtrace(Config);
509 }
510 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
511 {
512 m_configuration.m_allow_xrdpfc_command = true;
513 }
514 else if (! strncmp(var,"pfc.", 4))
515 {
516 retval = ConfigParameters(std::string(var+4), Config, tmpc);
517 }
518
519 if ( ! retval)
520 {
521 TRACE(Error, "Config() error in parsing");
522 aOK = false;
523 }
524 }
525
526 Config.Close();
527
528 // Load OSS plugin.
529 auto orig_runmode = myEnv->Get("oss.runmode");
530 myEnv->Put("oss.runmode", "pfc");
531 if (m_configuration.is_cschk_cache())
532 {
533 char csi_conf[128];
534 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
535 {
536 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
537 } else {
538 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
539 return false;
540 }
541 }
542 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, myEnv))
543 {
544 ofsCfg->Plugin(m_oss);
545 }
546 else
547 {
548 TRACE(Error, "Config() Unable to create an OSS object");
549 return false;
550 }
551 if (orig_runmode) myEnv->Put("oss.runmode", orig_runmode);
552 else myEnv->Put("oss.runmode", "");
553
554 // Test if OSS is operational, determine optional features.
555 aOK &= test_oss_basics_and_features();
556
557 // sets default value for disk usage
558 XrdOssVSInfo sP;
559 {
560 if (m_configuration.m_meta_space != m_configuration.m_data_space &&
561 m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
562 {
563 m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
564 return false;
565 }
566 if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
567 {
568 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
569 m_configuration.m_meta_space.c_str());
570 return false;
571 }
572 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
573 {
574 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
575 return false;
576 }
577 if (sP.Total < 10ll << 20)
578 {
579 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
580 m_configuration.m_data_space.c_str());
581 return false;
582 }
583
584 m_configuration.m_diskTotalSpace = sP.Total;
585
586 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
587 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
588 {
589 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
590 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
591 aOK = false;
592 }
593 }
594 else aOK = false;
595
596 if ( ! tmpc.m_fileUsageMax.empty())
597 {
598 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
599 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
600 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
601 {
602 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
603 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
604 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
605 {
606 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
607 aOK = false;
608 }
609
610
611 if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
612 {
613 m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
614 aOK = false;
615 }
616 }
617 else aOK = false;
618 }
619 }
620
621 // sets flush frequency
622 if ( ! tmpc.m_flushRaw.empty())
623 {
624 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
625 {
626 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
627 &m_configuration.m_flushCnt,
628 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
629 {
630 return false;
631 }
632 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
633 }
634 else
635 {
636 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
637 &m_configuration.m_flushCnt, 100, 100000))
638 {
639 return false;
640 }
641 }
642 }
643
644 // get number of available RAM blocks after process configuration
645 if (m_configuration.m_RamAbsAvailable == 0)
646 {
647 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
648 char buff[1024];
649 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
650 m_log.Say("Config info: ", buff);
651 }
652 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
653 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
654
655 // Set tracing to debug if this is set in environment
656 char* cenv = getenv("XRDDEBUG");
657 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
658
659 if (aOK)
660 {
661// 000 001 010
662 const char *csc[] = {"off", "cache nonet", "nocache net notls",
663// 011
664 "cache net notls",
665// 100 101 110
666 "off", "cache nonet", "nocache net tls",
667// 111
668 "cache net tls"};
669 char uvk[32];
670 if (m_configuration.m_cs_UVKeep < 0)
671 strcpy(uvk, "lru");
672 else
673 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
674 float ram_gb = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
675
676 char urlcgi_blks[64] = "ignore", urlcgi_npref[32] = "ignore";
678 snprintf(urlcgi_blks, sizeof(urlcgi_blks), "%lldk %lldk",
679 CFG.m_cgi_min_bufferSize >> 10, CFG.m_cgi_max_bufferSize >> 10);
681 snprintf(urlcgi_npref, sizeof(urlcgi_npref), "%d %d",
683
684 char buff[8192];
685 int loff = 0;
686 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
687 " pfc.cschk %s uvkeep %s\n"
688 " pfc.blocksize %lldk\n"
689 " pfc.prefetch %d\n"
690 " pfc.urlcgi blocksize %s prefetch %s\n"
691 " pfc.ram %.fg\n"
692 " pfc.writequeue %d %d\n"
693 " # Total available disk: %lld\n"
694 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
695 " pfc.spaces %s %s\n"
696 " pfc.trace %d\n"
697 " pfc.flush %lld\n"
698 " pfc.acchistorysize %d\n"
699 " pfc.onlyIfCachedMinBytes %lld\n"
700 " pfc.onlyIfCachedMinFrac %.2f\n",
701 config_filename,
702 csc[int(m_configuration.m_cs_Chk)], uvk,
703 m_configuration.m_bufferSize >> 10,
704 m_configuration.m_prefetch_max_blocks,
705 urlcgi_blks, urlcgi_npref,
706 ram_gb,
707 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
708 sP.Total,
709 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
710 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
711 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
712 m_configuration.m_data_space.c_str(),
713 m_configuration.m_meta_space.c_str(),
714 m_trace->What,
715 m_configuration.m_flushCnt,
716 m_configuration.m_accHistorySize,
717 m_configuration.m_onlyIfCachedMinSize,
718 m_configuration.m_onlyIfCachedMinFrac);
719
720 if (m_configuration.is_dir_stat_reporting_on())
721 {
722 loff += snprintf(buff + loff, sizeof(buff) - loff,
723 " pfc.dirstats interval %d maxdepth %d (internal: size_of_dirlist %d, size_of_globlist %d)\n",
724 m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsStoreDepth,
725 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
726 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
727 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
728 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
729 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
730 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
731 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
732 }
733
734 if (m_configuration.m_hdfsmode)
735 {
736 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
737 }
738
739 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.writethrough %s\n", m_configuration.m_write_through ? "on" : "off");
740
741 if (m_configuration.m_username.empty())
742 {
743 char unameBuff[256];
744 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
745 m_configuration.m_username = unameBuff;
746 }
747 else
748 {
749 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
750 }
751
752 if (m_configuration.m_httpcc)
753 {
754 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.httpcc on\n");
755 }
756 if (m_configuration.m_qfsredir)
757 {
758 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.qfsredir on\n");
759 }
760
761 m_log.Say(buff);
762
763 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
764 }
765
766 // Derived settings
767 m_prefetch_enabled = CFG.m_prefetch_max_blocks > 0 || CFG.m_cgi_max_prefetch_max_blocks > 0;
769
770 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
771
772 m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
773
774 // Create the ResourceMonitor and get it ready for starting the main thread function.
775 if (aOK)
776 {
777 m_res_mon = new ResourceMonitor(*m_oss);
778 m_res_mon->init_before_main();
779 }
780
781 m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
782
783 if (ofsCfg) delete ofsCfg;
784
785 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
786 // Building of xrdpfc_print fails when this is enabled.
787#ifdef XRDPFC_CKSUM_TEST
788 {
789 int xxx = m_configuration.m_cs_Chk;
790
791 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
792 {
793 Info::TestCksumStuff();
794 }
795
796 m_configuration.m_cs_Chk = xxx;
797 }
798#endif
799
800 return aOK;
801}
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:80
#define open
Definition XrdPosix.hh:76
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition XrdOssVS.hh:90
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
static size_t s_maxNumAccess
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:118
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:103
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:119
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition XrdPfc.hh:122
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition XrdPfc.hh:115
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:117
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:120
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition XrdPfc.hh:121
std::string m_diskUsageLWM
Definition XrdPfc.hh:147
std::string m_diskUsageHWM
Definition XrdPfc.hh:148
std::string m_fileUsageBaseline
Definition XrdPfc.hh:149
std::string m_fileUsageNominal
Definition XrdPfc.hh:150
std::string m_flushRaw
Definition XrdPfc.hh:152
std::string m_fileUsageMax
Definition XrdPfc.hh:151

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), Config(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, Error, XrdOucEnv::Export(), XrdOucEnv::Get(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_cgi_blocksize_allowed, XrdPfc::Configuration::m_cgi_max_bufferSize, XrdPfc::Configuration::m_cgi_max_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_min_bufferSize, XrdPfc::Configuration::m_cgi_min_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_prefetch_allowed, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_prefetch_max_blocks, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), and XrdOucGetCache().

Referenced by Config(), and XrdOucGetCache().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char * curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 1064 of file XrdPfc.cc.

1065{
1066 static const char* tpfx = "ConsiderCached ";
1067
1068 TRACE(Debug, tpfx << curl);
1069
1070 XrdCl::URL url(curl);
1071 std::string f_name = url.GetPath();
1072
1073 File *file = nullptr;
1074 {
1075 XrdSysCondVarHelper lock(&m_active_cond);
1076 auto it = m_active.find(f_name);
1077 if (it != m_active.end()) {
1078 file = it->second;
1079 // If the file-open is in progress, `file` is a nullptr
1080 // so we cannot increase the reference count. For now,
1081 // simply treat it as if the file open doesn't exist instead
1082 // of trying to wait and see if it succeeds.
1083 if (file) {
1084 inc_ref_cnt(file, false, false);
1085 }
1086 }
1087 }
1088 if (file) {
1089 struct stat sbuff;
1090 int res = file->Fstat(sbuff);
1091 dec_ref_cnt(file, false);
1092 if (res)
1093 return res;
1094 // DecideIfConsideredCached() already called in File::Fstat().
1095 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1096 }
1097
1098 struct stat sbuff;
1099 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1100 if (res != XrdOssOK) {
1101 TRACE(Debug, tpfx << curl << " -> " << res);
1102 return res;
1103 }
1104 if (S_ISDIR(sbuff.st_mode))
1105 {
1106 TRACE(Debug, tpfx << curl << " -> EISDIR");
1107 return -EISDIR;
1108 }
1109
1110 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1111 if (file_size < 0) {
1112 TRACE(Debug, tpfx << curl << " -> " << file_size);
1113 return (int) file_size;
1114 }
1115 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1116
1117 return is_cached ? 0 : -EREMOTE;
1118}
#define XrdOssOK
Definition XrdOss.hh:54
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString File
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:949
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:1031
int Fstat(struct stat &sbuff)
static const char * s_infoExtension

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Here is the call graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger * logger,
XrdOucEnv * env )
static

Singleton creation.

Definition at line 129 of file XrdPfc.cc.

130{
131 assert (m_instance == 0);
132 m_instance = new Cache(logger, env);
133 return *m_instance;
134}
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:162

References Cache().

Referenced by XrdOucGetCache().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO * io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
&URL of file
Returns
decision if IO object will be cached.

Definition at line 141 of file XrdPfc.cc.

142{
143 if (! m_decisionpoints.empty())
144 {
145 XrdCl::URL url(io->Path());
146 std::string filename = url.GetPath();
147 std::vector<Decision*>::const_iterator it;
148 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
149 {
150 XrdPfc::Decision *d = *it;
151 if (! d) continue;
152 if (! d->Decide(filename, *m_oss))
153 {
154 return false;
155 }
156 }
157 }
158
159 return true;
160}
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long file_size,
long long bytes_on_disk )

Definition at line 1031 of file XrdPfc.cc.

1032{
1033 if (file_size == 0 || bytes_on_disk >= file_size)
1034 return true;
1035
1036 double frac_on_disk = (double) bytes_on_disk / file_size;
1037
1038 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
1039 {
1040 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1041 return true;
1042 }
1043 else
1044 {
1045 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
1046 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1047 return true;
1048 }
1049 return false;
1050}

Referenced by ConsiderCached(), and Stat().

Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File * file)

Definition at line 720 of file XrdPfc.cc.

721{
722 // Can be called with other locks held.
723
724 if ( ! m_prefetch_enabled)
725 {
726 return;
727 }
728
729 m_prefetch_condVar.Lock();
730 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
731 {
732 if (*it == file)
733 {
734 m_prefetchList.erase(it);
735 break;
736 }
737 }
738 m_prefetch_condVar.UnLock();
739}

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string & cinfo_fname)

Definition at line 949 of file XrdPfc.cc.

950{
951 if (m_metaXattr) {
952 char pfn[4096];
953 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
954 long long fsize = -1ll;
955 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
956 if (res == sizeof(long long))
957 {
958 return fsize;
959 }
960 else
961 {
962 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
963 }
964 }
965
966 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
967 XrdOucEnv env;
968 long long ret;
969 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
970 if (res < 0) {
971 ret = res;
972 } else {
973 Info info(m_trace, 0);
974 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
975 ret = -EBADF;
976 } else {
977 ret = info.GetFileSize();
978 }
979 infoFile->Close();
980 }
981 delete infoFile;
982 return ret;
983}
XrdSysXAttr * XrdSysXAttrActive
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:228
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdPfc::Info::GetFileSize(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string & command_url)

Definition at line 51 of file XrdPfcCommand.cc.

52{
53 static const char *top_epfx = "ExecuteCommandUrl ";
54
55 SplitParser cp(command_url, "/");
56
57 std::string token = cp.get_token();
58
59 if (token != "xrdpfc_command")
60 {
61 TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
62 return;
63 }
64
65 // Get the command
66 token = cp.get_token_as_string();
67
68 auto get_opt = [](SplitParser &sp) -> char {
69 const char *t = sp.get_token();
70 if (t)
71 return (t[0] == '-' && t[1] != 0) ? t[1] : 0;
72 else
73 return -1;
74 };
75
76 //================================================================
77 // create_file
78 //================================================================
79
80 if (token == "create_file")
81 {
82 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
83 static const char* usage =
84 "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
85 " Creates a cache file with given parameters. Data in file is random.\n"
86 " Useful for cache purge testing.\n"
87 "Notes:\n"
88 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
89 " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
90 " . -t and -d can be given multiple times to record several accesses.\n"
91 " . Negative arguments given to -t are interpreted as relative to now.\n";
92
93 const Configuration &conf = m_configuration;
94
95 token = cp.get_token_as_string();
96 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
97 if (token.empty()) {
98 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
99 return;
100 }
101 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
102 if ( ! cp.has_reminder()) {
103 TRACE(Error, err_prefix << "Path section must not be empty.");
104 return;
105 }
106
107 long long file_size = ONE_GB;
108 long long block_size = conf.m_bufferSize;
109 int access_time [MAX_ACCESSES];
110 int access_duration[MAX_ACCESSES];
111 int at_count = 0, ad_count = 0;
112
113 time_t time_now = time(0);
114
115 SplitParser ap(token, " ");
116 char theOpt;
117
118 while ((theOpt = get_opt(ap)) != (char) -1)
119 {
120 switch (theOpt)
121 {
122 case 'h': {
123 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
124 return;
125 }
126 case 's': {
127 if (XrdOuca2x::a2sz(m_log, "Error getting filesize", ap.get_token(),
128 &file_size, 0ll, 32 * ONE_GB))
129 return;
130 break;
131 }
132 case 'b': {
133 if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", ap.get_token(),
134 &block_size, 0ll, 64 * ONE_MB))
135 return;
136 break;
137 }
138 case 't': {
139 if (XrdOuca2x::a2i(m_log, "Error getting access time", ap.get_token(),
140 &access_time[at_count++], INT_MIN, INT_MAX))
141 return;
142 break;
143 }
144 case 'd': {
145 if (XrdOuca2x::a2i(m_log, "Error getting access duration", ap.get_token(),
146 &access_duration[ad_count++], 0, 24 * 3600))
147 return;
148 break;
149 }
150 default: {
151 TRACE(Error, err_prefix << "Unhandled command argument.");
152 return;
153 }
154 }
155 }
156
157 if (at_count < 1) access_time [at_count++] = time_now - 10;
158 if (ad_count < 1) access_duration[ad_count++] = 10;
159
160 if (at_count != ad_count)
161 {
162 TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
163 return;
164 }
165
166 std::string file_path (cp.get_reminder_with_delim());
167 std::string cinfo_path(file_path + Info::s_infoExtension);
168
169 TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
170
171 // Check if cinfo exists ... bail out if it does.
172 {
173 struct stat infoStat;
174 if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
175 {
176 TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
177 return;
178 }
179 }
180
181 TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
182
183 {
184 const char *myUser = conf.m_username.c_str();
185 XrdOucEnv myEnv;
186
187 // Create the data file.
188
189 char size_str[32]; sprintf(size_str, "%lld", file_size);
190 myEnv.Put("oss.asize", size_str);
191 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
192 int cret;
193 if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
194 {
195 TRACE(Error, err_prefix << "Create failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
196 return;
197 }
198
199 XrdOssDF *myFile = GetOss()->newFile(myUser);
200 if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
201 {
202 TRACE(Error, err_prefix << "Open failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
203 delete myFile;
204 return;
205 }
206
207 // Create the info file.
208
209 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
210 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
211 if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
212 {
213 TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
214 myFile->Close(); delete myFile;
215 return;
216 }
217
218 XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
219 if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
220 {
221 TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
222 delete myInfoFile;
223 myFile->Close(); delete myFile;
224 return;
225 }
226
227 // Allocate space for the data file.
228
229 if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
230 {
231 TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ERRNO_AND_ERRSTR(cret));
232 }
233
234 // Fill up cinfo.
235
236 Info myInfo(m_trace, false);
237 myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
238 myInfo.SetAllBitsSynced();
239
240 for (int i = 0; i < at_count; ++i)
241 {
242 time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
243
244 myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
245 }
246
247 myInfo.Write(myInfoFile, cinfo_path.c_str());
248
249 // Fake last modified time to the last access_time
250 {
251 time_t last_detach;
252 myInfo.GetLatestDetachTime(last_detach);
253 struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
254
255 futimens(myInfoFile->getFD(), acc_mod_time);
256 }
257
258 myInfoFile->Close(); delete myInfoFile;
259 myFile->Close(); delete myFile;
260
261 struct stat dstat;
262 GetOss()->Stat(file_path.c_str(), &dstat);
263 TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB, "
264 << "st_blocks=" << dstat.st_blocks);
265
266 {
267 XrdSysCondVarHelper lock(&m_writeQ.condVar);
268
269 m_writeQ.writes_between_purges += file_size;
270 }
271 {
272 int token = m_res_mon->register_file_open(file_path, time_now, false);
273 XrdPfc::Stats stats;
274 stats.m_BytesWritten = file_size;
275 stats.m_StBlocksAdded = dstat.st_blocks;
276 m_res_mon->register_file_update_stats(token, stats);
277 m_res_mon->register_file_close(token, time(0), stats);
278 }
279 }
280 }
281
282 //================================================================
283 // remove_file
284 //================================================================
285
286 else if (token == "remove_file")
287 {
288 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
289 static const char* usage =
290 "Usage: remove_file/ [-h] /<path>\n"
291 " Removes given file from the cache unless it is currently open.\n"
292 " Useful for removal of stale files or duplicate files in a caching cluster.\n"
293 "Notes:\n"
294 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
295
296 token = cp.get_token_as_string();
297 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
298 if (token.empty()) {
299 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
300 return;
301 }
302 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
303 if ( ! cp.has_reminder()) {
304 TRACE(Error, err_prefix << "Path section must not be empty.");
305 return;
306 }
307
308 SplitParser ap(token, " ");
309 char theOpt;
310
311 while ((theOpt = get_opt(ap)) != (char) -1)
312 {
313 switch (theOpt)
314 {
315 case 'h': {
316 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
317 return;
318 }
319 default: {
320 TRACE(Error, err_prefix << "Unhandled command argument.");
321 return;
322 }
323 }
324 }
325
326 std::string f_name(cp.get_reminder_with_delim());
327
328 TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
329
330 int ret = UnlinkFile(f_name, true);
331
332 TRACE(Info, err_prefix << "returned with status " << ret);
333 }
334
335 //================================================================
336 // unknown command
337 //================================================================
338
339 else
340 {
341 TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
342 }
343}
struct stat Stat
Definition XrdCks.cc:49
void usage()
#define XRDOSS_mkpath
Definition XrdOss.hh:526
#define ERRNO_AND_ERRSTR(err_code)
bool Create
virtual int getFD()
Definition XrdOss.hh:486
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1264
XrdOss * GetOss() const
Definition XrdPfc.hh:290
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesWritten
number of bytes written to disk
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:91
long long m_bufferSize
cache block size, default 128 kB
Definition XrdPfc.hh:110
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:92
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:90

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdPfc::SplitParser::get_token_as_string(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), GetOss(), XrdPfc::SplitParser::has_reminder(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::Info::s_infoExtension, XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), Stat, XrdOss::Stat(), stat, TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File * f,
bool high_debug )

Definition at line 551 of file XrdPfc.cc.

552{
553 dec_ref_cnt(f, high_debug);
554}

◆ GetCacheControlXAttr() [1/2]

int Cache::GetCacheControlXAttr ( const std::string & cinfo_fname,
std::string & res ) const

Definition at line 989 of file XrdPfc.cc.

990{
991 if (m_metaXattr) {
992
993 char pfn[4096];
994 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
995
996 char cc[512];
997 int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, pfn, -1);
998 if (res > 0)
999 {
1000 std::string tmp(cc, res);
1001 ival = tmp;
1002 }
1003 return res;
1004 }
1005 return 0;
1006}

References XrdSysXAttrActive.

◆ GetCacheControlXAttr() [2/2]

int Cache::GetCacheControlXAttr ( int fd,
std::string & res ) const

Definition at line 1012 of file XrdPfc.cc.

1013{
1014 if (m_metaXattr) {
1015 char cc[512];
1016 int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, nullptr, fd);
1017 if (res > 0)
1018 {
1019 ival = std::string(cc, res);
1020 return res;
1021 }
1022 }
1023 return 0;
1024}

References XrdSysXAttrActive.

◆ GetFile()

File * Cache::GetFile ( const std::string & path,
IO * io,
long long off = 0,
long long filesize = 0 )

Definition at line 397 of file XrdPfc.cc.

398{
399 // Called from virtual IOFile constructor.
400
401 TRACE(Debug, "GetFile " << path << ", io " << io);
402
403 ActiveMap_i it;
404
405 {
406 XrdSysCondVarHelper lock(&m_active_cond);
407
408 while (true)
409 {
410 it = m_active.find(path);
411
412 // File is not open or being opened. Mark it as being opened and
413 // proceed to opening it outside of while loop.
414 if (it == m_active.end())
415 {
416 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
417 break;
418 }
419
420 if (it->second != 0)
421 {
422 it->second->AddIO(io);
423 inc_ref_cnt(it->second, false, true);
424
425 return it->second;
426 }
427 else
428 {
429 // Wait for some change in m_active, then recheck.
430 m_active_cond.Wait();
431 }
432 }
433 }
434
435 // This is always true, now that IOFileBlock is unsupported.
436
437 if (filesize == 0)
438 {
439 struct stat st;
440 int res = io->Fstat(st);
441 if (res < 0) {
442 errno = res;
443 TRACE(Error, "GetFile, could not get valid stat");
444 } else if (res > 0) {
445 errno = ENOTSUP;
446 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
447 } else {
448 filesize = st.st_size;
449 }
450 }
451
452 File *file = 0;
453
454 if (filesize >= 0)
455 {
456 file = File::FileOpen(path, off, filesize, io->GetInput());
457 }
458
459 {
460 XrdSysCondVarHelper lock(&m_active_cond);
461
462 if (file)
463 {
464 inc_ref_cnt(file, false, true);
465 it->second = file;
466
467 file->AddIO(io);
468 }
469 else
470 {
471 m_active.erase(it);
472 }
473
474 m_active_cond.Broadcast();
475 }
476
477 return file;
478}
virtual int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31

References XrdPfc::File::AddIO(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), XrdPfc::IO::GetInput(), stat, and TRACE.

Referenced by XrdPfc::IOFile::IOFile().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream * XrdPfc::Cache::GetGStream ( )
inline

Definition at line 308 of file XrdPfc.hh.

308{ return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 136 of file XrdPfc.cc.

136{ return *m_instance; }

References Cache().

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), Prepare(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetLog()

XrdSysError * XrdPfc::Cache::GetLog ( ) const
inline

Definition at line 304 of file XrdPfc.hh.

304{ return &m_log; }

Referenced by XrdPfc::File::GetLog().

Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 742 of file XrdPfc.cc.

743{
744 m_prefetch_condVar.Lock();
745 while (m_prefetchList.empty())
746 {
747 m_prefetch_condVar.Wait();
748 }
749
750 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
751
752 size_t l = m_prefetchList.size();
753 int idx = rand() % l;
754 File* f = m_prefetchList[idx];
755
756 m_prefetch_condVar.UnLock();
757 return f;
758}

Referenced by Prefetch().

Here is the caller graph for this function:

◆ GetOss()

XrdOss * XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 290 of file XrdPfc.hh.

290{ return m_oss; }

Referenced by ExecuteCommandUrl().

Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin * XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 294 of file XrdPfc.hh.

294{ return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace * XrdPfc::Cache::GetTrace ( ) const
inline

Definition at line 305 of file XrdPfc.hh.

305{ return m_trace; }

Referenced by XrdPfc::File::GetTrace().

Here is the caller graph for this function:

◆ is_prefetch_enabled()

bool XrdPfc::Cache::is_prefetch_enabled ( ) const
inline

Definition at line 317 of file XrdPfc.hh.

317{ return m_prefetch_enabled; }

Referenced by XrdOucGetCache().

Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string & path) const

Definition at line 686 of file XrdPfc.cc.

687{
688 XrdSysCondVarHelper lock(&m_active_cond);
689
690 return m_active.find(path) != m_active.end() ||
691 m_purge_delay_set.find(path) != m_purge_delay_set.end();
692}

◆ LocalFilePath()

int Cache::LocalFilePath ( const char * curl,
char * buff = 0,
int blen = 0,
LFP_Reason why = ForAccess,
bool forall = false )
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 803 of file XrdPfc.cc.

805{
806 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
807 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
808 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
809
810 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
811
812 if (buff && blen > 0) buff[0] = 0;
813
814 XrdCl::URL url(curl);
815 std::string f_name = url.GetPath();
816 std::string i_name = f_name + Info::s_infoExtension;
817
818 if (why == ForPath)
819 {
820 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
821 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
822 return ret;
823 }
824
825 {
826 XrdSysCondVarHelper lock(&m_active_cond);
827 m_purge_delay_set.insert(f_name);
828 }
829
830 struct stat sbuff, sbuff2;
831 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
832 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
833 {
834 if (S_ISDIR(sbuff.st_mode))
835 {
836 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
837 return -EISDIR;
838 }
839 else
840 {
841 bool read_ok = false;
842 bool is_complete = false;
843
844 // Lock and check if the file is active. If NOT, keep the lock
845 // and add dummy access after successful reading of info file.
846 // If it IS active, just release the lock, this ongoing access will
847 // assure the file continues to exist.
848
849 // XXXX How can I just loop over the cinfo file when active?
850 // Can I not get is_complete from the existing file?
851 // Do I still want to inject access record?
852 // Oh, it writes only if not active .... still let's try to use existing File.
853
854 m_active_cond.Lock();
855
856 bool is_active = m_active.find(f_name) != m_active.end();
857
858 if (is_active) m_active_cond.UnLock();
859
860 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
861 XrdOucEnv myEnv;
862 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
863 if (res >= 0)
864 {
865 Info info(m_trace, 0);
866 if (info.Read(infoFile, i_name.c_str()))
867 {
868 read_ok = true;
869
870 is_complete = info.IsComplete();
871
872 // Add full-size access if reason is for access.
873 if ( ! is_active && is_complete && why == ForAccess)
874 {
875 info.WriteIOStatSingle(info.GetFileSize());
876 info.Write(infoFile, i_name.c_str());
877 }
878 }
879 infoFile->Close();
880 }
881 delete infoFile;
882
883 if ( ! is_active) m_active_cond.UnLock();
884
885 if (read_ok)
886 {
887 if ((is_complete || why == ForInfo) && buff != 0)
888 {
889 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
890 if (res2 < 0)
891 return res2;
892
893 // Normally, files are owned by us but when direct cache access
894 // is wanted and possible, make sure the file is world readable.
895 if (why == ForAccess)
896 {mode_t mode = (forall ? worldReadable : groupReadable);
897 if (((sbuff.st_mode & worldReadable) != mode)
898 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
899 {is_complete = false;
900 *buff = 0;
901 }
902 }
903 }
904
905 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
906 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
907
908 return is_complete ? 0 : -EREMOTE;
909 }
910 }
911 }
912
913 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
914 return -ENOENT;
915}

References XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, TRACE, XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 761 of file XrdPfc.cc.

762{
763 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
764
765 while (true)
766 {
767 m_RAM_mutex.Lock();
768 bool doPrefetch = (m_RAM_used < limit_RAM);
769 m_RAM_mutex.UnLock();
770
771 if (doPrefetch)
772 {
774 f->Prefetch();
775 }
776 else
777 {
779 }
780 }
781}
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:742
static void Wait(int milliseconds)

References GetNextFileToPrefetch(), XrdPfc::File::Prefetch(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ prefetch_str2value()

bool Cache::prefetch_str2value ( const char * from,
const char * str,
int & val,
int min,
int max ) const

Definition at line 121 of file XrdPfcConfiguration.cc.

123{
124 if (XrdOuca2x::a2i(m_log, "Error parsing prefetch block count", str, &val, min, max))
125 return false;
126
127 return true;
128}

References XrdOuca2x::a2i().

Here is the call graph for this function:

◆ Prepare()

int Cache::Prepare ( const char * curl,
int oflags,
mode_t mode )
virtual

Preapare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow defering an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1130 of file XrdPfc.cc.

1131{
1132 XrdCl::URL url(curl);
1133 std::string f_name = url.GetPath();
1134 std::string i_name = f_name + Info::s_infoExtension;
1135
1136 // Do not allow write access.
1137 if ((oflags & O_ACCMODE) != O_RDONLY)
1138 {
1139 if (Cache::GetInstance().RefConfiguration().m_write_through)
1140 {
1141 return 0;
1142 }
1143 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1144 return -EROFS;
1145 }
1146
1147 // Intercept xrdpfc_command requests.
1148 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1149 {
1150 // Schedule a job to process command request.
1151 {
1152 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1153
1154 schedP->Schedule(ce);
1155 }
1156
1157 return -EAGAIN;
1158 }
1159
1160 {
1161 XrdSysCondVarHelper lock(&m_active_cond);
1162 m_purge_delay_set.insert(f_name);
1163 }
1164
1165 struct stat sbuff;
1166 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1167 {
1168
1169 if (m_configuration.m_httpcc && !is_http_cache_valid(f_name, i_name, url))
1170 {
1171 TRACE(Info, "Http cache not valid " << f_name);
1172 UnlinkFile(f_name, false);
1173 return 0;
1174 }
1175
1176 TRACE(Dump, "Prepare defer open " << f_name);
1177 return 1;
1178 }
1179 else
1180 {
1181 return 0;
1182 }
1183}
static XrdScheduler * schedP
Definition XrdPfc.hh:312

References GetInstance(), XrdCl::URL::GetPath(), RefConfiguration(), XrdPfc::Info::s_infoExtension, schedP, stat, TRACE, UnlinkFile(), and XrdOssOK.

Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 281 of file XrdPfc.cc.

282{
283 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
284
285 while (true)
286 {
287 m_writeQ.condVar.Lock();
288 while (m_writeQ.size == 0)
289 {
290 m_writeQ.condVar.Wait();
291 }
292
293 // MT -- optimize to pop several blocks if they are available (or swap the list).
294 // This makes sense especially for smallish block sizes.
295
296 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
297 long long sum_size = 0;
298
299 for (int bi = 0; bi < n_pushed; ++bi)
300 {
301 Block* block = m_writeQ.queue.front();
302 m_writeQ.queue.pop_front();
303 m_writeQ.writes_between_purges += block->get_size();
304 sum_size += block->get_size();
305
306 blks_to_write[bi] = block;
307
308 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
309 }
310 m_writeQ.size -= n_pushed;
311
312 m_writeQ.condVar.UnLock();
313
314 {
315 XrdSysMutexHelper lock(&m_RAM_mutex);
316 m_RAM_write_queue -= sum_size;
317 }
318
319 for (int bi = 0; bi < n_pushed; ++bi)
320 {
321 Block* block = blks_to_write[bi];
322
323 block->m_file->WriteBlockToDisk(block);
324 }
325 }
326}
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration & XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 225 of file XrdPfc.hh.

225{ return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Prepare(), and XrdOucGetCache().

Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor & XrdPfc::Cache::RefResMon ( )
inline

Definition at line 307 of file XrdPfc.hh.

307{ return *m_res_mon; }

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File * file)

Definition at line 704 of file XrdPfc.cc.

705{
706 // Can be called with other locks held.
707
708 if ( ! m_prefetch_enabled)
709 {
710 return;
711 }
712
713 m_prefetch_condVar.Lock();
714 m_prefetchList.push_back(file);
715 m_prefetch_condVar.Signal();
716 m_prefetch_condVar.UnLock();
717}

◆ ReleaseFile()

void Cache::ReleaseFile ( File * f,
IO * io )

Definition at line 480 of file XrdPfc.cc.

481{
482 // Called from virtual IO::DetachFinalize.
483
484 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
485
486 {
487 XrdSysCondVarHelper lock(&m_active_cond);
488
489 f->RemoveIO(io);
490 }
491 dec_ref_cnt(f, true);
492}
void RemoveIO(IO *io)

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char * buf,
long long size )

Definition at line 379 of file XrdPfc.cc.

380{
381 bool std_size = (size == m_configuration.m_bufferSize);
382 {
383 XrdSysMutexHelper lock(&m_RAM_mutex);
384
385 m_RAM_used -= size;
386
387 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
388 {
389 m_RAM_std_blocks.push_back(buf);
390 ++m_RAM_std_size;
391 return;
392 }
393 }
394 free(buf);
395}

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File * f)

Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.

Definition at line 248 of file XrdPfc.cc.

249{
250 std::list<Block*> removed_blocks;
251 long long sum_size = 0;
252
253 m_writeQ.condVar.Lock();
254 std::list<Block*>::iterator i = m_writeQ.queue.begin();
255 while (i != m_writeQ.queue.end())
256 {
257 if ((*i)->m_file == file)
258 {
259 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
260 std::list<Block*>::iterator j = i++;
261 removed_blocks.push_back(*j);
262 sum_size += (*j)->get_size();
263 m_writeQ.queue.erase(j);
264 --m_writeQ.size;
265 }
266 else
267 {
268 ++i;
269 }
270 }
271 m_writeQ.condVar.UnLock();
272
273 {
274 XrdSysMutexHelper lock(&m_RAM_mutex);
275 m_RAM_write_queue -= sum_size;
276 }
277
278 file->BlocksRemovedFromWriteQ(removed_blocks);
279}

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long size)

Definition at line 339 of file XrdPfc.cc.

340{
341 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
342
343 bool std_size = (size == m_configuration.m_bufferSize);
344
345 m_RAM_mutex.Lock();
346
347 long long total = m_RAM_used + size;
348
349 if (total <= m_configuration.m_RamAbsAvailable)
350 {
351 m_RAM_used = total;
352 if (std_size && m_RAM_std_size > 0)
353 {
354 char *buf = m_RAM_std_blocks.back();
355 m_RAM_std_blocks.pop_back();
356 --m_RAM_std_size;
357
358 m_RAM_mutex.UnLock();
359
360 return buf;
361 }
362 else
363 {
364 m_RAM_mutex.UnLock();
365 char *buf;
366 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
367 {
368 // Report out of mem? Probably should report it at least the first time,
369 // then periodically.
370 return 0;
371 }
372 return buf;
373 }
374 }
375 m_RAM_mutex.UnLock();
376 return 0;
377}

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 139 of file XrdPfc.cc.

139{ return m_instance->RefResMon(); }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File * f)
inline

Definition at line 300 of file XrdPfc.hh.

300{ schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char * curl,
struct stat & sbuff )
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1193 of file XrdPfc.cc.

1194{
1195 const char *tpfx = "Stat ";
1196
1197 XrdCl::URL url(curl);
1198 std::string f_name = url.GetPath();
1199
1200 File *file = nullptr;
1201 {
1202 XrdSysCondVarHelper lock(&m_active_cond);
1203 auto it = m_active.find(f_name);
1204 if (it != m_active.end()) {
1205 file = it->second;
1206 // If `file` is nullptr, the file-open is in progress; instead
1207 // of waiting for the file-open to finish, simply treat it as if
1208 // the file-open doesn't exist.
1209 if (file) {
1210 inc_ref_cnt(file, false, false);
1211 }
1212 }
1213 }
1214 if (file) {
1215 int res = file->Fstat(sbuff);
1216 dec_ref_cnt(file, false);
1217 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1218 return res;
1219 }
1220
1221 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1222 if (res != XrdOssOK) {
1223 TRACE(Debug, tpfx << curl << " -> " << res);
1224 return 1; // res; -- for only-if-cached
1225 }
1226 if (S_ISDIR(sbuff.st_mode))
1227 {
1228 TRACE(Debug, tpfx << curl << " -> EISDIR");
1229 return -EISDIR;
1230 }
1231
1232 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1233 if (file_size < 0) {
1234 TRACE(Debug, tpfx << curl << " -> " << file_size);
1235 return 1; // (int) file_size; -- for only-if-cached
1236 }
1237 sbuff.st_size = file_size;
1238 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1239 if ( ! is_cached)
1240 sbuff.st_atime = 0;
1241
1242 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1243
1244 return 0;
1245}

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Here is the call graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 137 of file XrdPfc.cc.

137{ return *m_instance; }

References Cache().

Referenced by XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char * curl)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information.

Reimplemented from XrdOucCache.

Definition at line 1254 of file XrdPfc.cc.

1255{
1256 XrdCl::URL url(curl);
1257 std::string f_name = url.GetPath();
1258
1259 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1260
1261 return UnlinkFile(f_name, false);
1262}

References XrdCl::URL::GetPath(), and UnlinkFile().

Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string & f_name,
bool fail_if_open )

Remove cinfo and data files from cache.

Definition at line 1264 of file XrdPfc.cc.

1265{
1266 static const char* trc_pfx = "UnlinkFile ";
1267 ActiveMap_i it;
1268 File *file = 0;
1269 long long st_blocks_to_purge = 0;
1270 {
1271 XrdSysCondVarHelper lock(&m_active_cond);
1272
1273 it = m_active.find(f_name);
1274
1275 if (it != m_active.end())
1276 {
1277 if (fail_if_open)
1278 {
1279 TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1280 return -EBUSY;
1281 }
1282
1283 // Null File* in m_active map means an operation is ongoing, probably
1284 // Attach() with possible File::Open(). Ask for retry.
1285 if (it->second == 0)
1286 {
1287 TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1288 return -EAGAIN;
1289 }
1290
1291 file = it->second;
1292 st_blocks_to_purge = file->initiate_emergency_shutdown();
1293 it->second = 0;
1294 }
1295 else
1296 {
1297 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1298 }
1299 }
1300
1301 if (file) {
1303 } else {
1304 struct stat f_stat;
1305 if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1306 st_blocks_to_purge = f_stat.st_blocks;
1307 }
1308
1309 std::string i_name = f_name + Info::s_infoExtension;
1310
1311 // Unlink file & cinfo
1312 int f_ret = m_oss->Unlink(f_name.c_str());
1313 int i_ret = m_oss->Unlink(i_name.c_str());
1314
1315 if (st_blocks_to_purge)
1316 m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1317
1318 TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1319
1320 {
1321 XrdSysCondVarHelper lock(&m_active_cond);
1322 m_active.erase(it);
1323 m_active_cond.Broadcast();
1324 }
1325
1326 return std::min(f_ret, i_ret);
1327}
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:248
long long initiate_emergency_shutdown()

References Debug, XrdPfc::File::initiate_emergency_shutdown(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl(), Prepare(), XrdPfc::File::Sync(), and Unlink().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ VCheck()

bool XrdPfc::Cache::VCheck ( XrdVersionInfo & urVersion)
inlinestatic

Version check.

Definition at line 255 of file XrdPfc.hh.

255{ return true; }

◆ WriteCacheControlXAttr()

void Cache::WriteCacheControlXAttr ( int cinfo_fd,
const char * path,
const std::string & cc )

Definition at line 921 of file XrdPfc.cc.

922{
923 if (m_metaXattr) {
924 int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0);
925 if (res != 0) {
926 TRACE(Error, "WritecacheControlXAttr error setting xattr " << res);
927 }
928 }
929}
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Error, TRACE, and XrdSysXAttrActive.

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int cinfo_fd,
long long file_size )

Definition at line 934 of file XrdPfc.cc.

935{
936 if (m_metaXattr) {
937 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
938 if (res != 0) {
939 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
940 }
941 }
942}

References Debug, TRACE, and XrdSysXAttrActive.

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 328 of file XrdPfc.cc.

329{
330 // Called from ResourceMonitor for an alternative estimation of disk writes.
331 XrdSysCondVarHelper lock(&m_writeQ.condVar);
332 long long ret = m_writeQ.writes_between_purges;
333 m_writeQ.writes_between_purges = 0;
334 return ret;
335}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: