XRootD
XrdCl::XCpCtx Class Reference

#include <XrdClXCpCtx.hh>


Public Member Functions

 XCpCtx (const std::vector< std::string > &urls, uint64_t blockSize, uint8_t parallelSrc, uint64_t chunkSize, uint64_t parallelChunks, int64_t fileSize)
bool AllDone ()
void Delete ()
std::pair< uint64_t, uint64_t > GetBlock ()
XRootDStatus GetChunk (XrdCl::PageInfo &ci)
bool GetNextUrl (std::string &url)
int64_t GetSize ()
XRootDStatus Initialize ()
void NotifyIdleSrc ()
void NotifyInitExpectant ()
void PutChunk (PageInfo *chunk)
void Release ()
void RemoveSrc (XCpSrc *src)
XCpCtx * Self ()
void SetFileSize (int64_t size)
XCpSrc * WeakestLink (XCpSrc *exclude)

Detailed Description

Definition at line 40 of file XrdClXCpCtx.hh.

Constructor & Destructor Documentation

◆ XCpCtx()

XrdCl::XCpCtx::XCpCtx ( const std::vector< std::string > & urls,
uint64_t blockSize,
uint8_t parallelSrc,
uint64_t chunkSize,
uint64_t parallelChunks,
int64_t fileSize )

Constructor

Parameters
urls: list of replica urls
blockSize: the default block size
parallelSrc: maximum number of parallel sources
chunkSize: the default chunk size
parallelChunks: the default number of parallel chunks per source
fileSize: the file size if specified in the metalink file (-1 indicates that the file size is not known and a stat should be done)

Definition at line 36 of file XrdClXCpCtx.cc.

:
  pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
  pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
  pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
  pDoneCV( 0 ), pRefCount( 1 ), pDeleteCV( 0 ), pDelete( false )
{
  SetFileSize( fileSize );
}

References SetFileSize().

Referenced by Self().


Member Function Documentation

◆ AllDone()

bool XrdCl::XCpCtx::AllDone ( )

Returns true if all chunks have been transferred, otherwise blocks until NotifyIdleSrc is called, or a 1 minute timeout occurs.

Returns
: true if all chunks have been transferred, false otherwise.

Definition at line 193 of file XrdClXCpCtx.cc.

{
  XrdSysCondVarHelper lck( pDoneCV );

  if( !pDone )
    pDoneCV.Wait( 60 );

  return pDone;
}
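
The single timed wait in AllDone() can be illustrated with standard C++ primitives. This is only a sketch: `DoneFlag`, `MarkDone` and the millisecond timeout parameter are hypothetical names, and `std::condition_variable` stands in for `XrdSysCondVar`. Because the flag is checked with `if` rather than `while`, a `false` result only means "not done yet", so callers are expected to poll.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the pDone / pDoneCV pair.
struct DoneFlag
{
  std::mutex              mtx;
  std::condition_variable cv;
  bool                    done = false;

  // Sets the flag and wakes every waiter.
  void MarkDone()
  {
    std::lock_guard<std::mutex> lck( mtx );
    done = true;
    cv.notify_all();
  }

  // Returns the flag; if it is not set yet, waits once for a
  // notification or for the timeout, whichever comes first.
  bool AllDone( std::chrono::milliseconds timeout )
  {
    std::unique_lock<std::mutex> lck( mtx );
    if( !done )
      cv.wait_for( lck, timeout );
    return done;
  }
};
```

A caller would typically loop on `AllDone` and do other work between calls.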

◆ Delete()

void XrdCl::XCpCtx::Delete ( )
inline

Decrements the reference count, waits for it to reach zero, and then deletes the instance. Must be called only once.

Definition at line 62 of file XrdClXCpCtx.hh.

{
  XrdSysMutexHelper lckmtx( pMtx );
  --pRefCount;
  if( !pRefCount )
  {
    lckmtx.UnLock();
    delete this;
    return;
  }
  lckmtx.UnLock();

  XrdSysCondVarHelper lckcv( pDoneCV );
  pDone = true;
  pDoneCV.Broadcast();
  lckcv.UnLock();

  lckcv.Lock( &pDeleteCV );
  while( !pDelete ) pDeleteCV.Wait();
  lckcv.UnLock();
  delete this;
}

References XrdSysCondVarHelper::Lock(), XrdSysCondVarHelper::UnLock(), and XrdSysMutexHelper::UnLock().


◆ GetBlock()

std::pair< uint64_t, uint64_t > XrdCl::XCpCtx::GetBlock ( )

Get next block that has to be transferred

Returns
: pair of offset and block size

Definition at line 95 of file XrdClXCpCtx.cc.

{
  XrdSysMutexHelper lck( pMtx );

  uint64_t blkSize = pBlockSize, offset = pOffset;
  if( pOffset + blkSize > uint64_t( pFileSize ) )
    blkSize = pFileSize - pOffset;
  pOffset += blkSize;

  return std::make_pair( offset, blkSize );
}
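
The block arithmetic above can be tried in isolation. The following is a minimal sketch, not the library code: `BlockCursor` and its fields are hypothetical stand-ins for `pOffset`, `pBlockSize` and `pFileSize`, locking is omitted, and the file size is assumed to be known already.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical stand-in for the offset / block size / file size members.
struct BlockCursor
{
  uint64_t offset;     // next byte that has not been handed out yet
  uint64_t blockSize;  // nominal block size
  uint64_t fileSize;   // total file size, assumed to be known

  // Returns (offset, length) of the next block and advances the cursor.
  // The final block is shortened so it never reaches past the end of file.
  std::pair<uint64_t, uint64_t> Next()
  {
    assert( offset <= fileSize );
    uint64_t start  = offset;
    uint64_t length = std::min( blockSize, fileSize - offset );
    offset += length;
    return std::make_pair( start, length );
  }
};
```

With a 10-byte file and 4-byte blocks this yields (0, 4), (4, 4), (8, 2), and after that zero-length blocks at offset 10.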

◆ GetChunk()

XRootDStatus XrdCl::XCpCtx::GetChunk ( XrdCl::PageInfo & ci)

Gets the next chunk from the sink; blocks if the sink is empty.

Parameters
ci: the chunk retrieved from sink (output parameter)
Returns
: stError if we failed to transfer the file, stOK otherwise, with one of the following codes:
  • suDone : the whole file has been transferred, we are done
  • suContinue : a chunk has been written into ci, continue calling GetChunk in order to retrieve remaining chunks
  • suRetry : a chunk has not been written into ci, try again.

Definition at line 156 of file XrdClXCpCtx.cc.

{
  // if we received all the data we are done here
  if( pDataReceived == uint64_t( pFileSize ) )
  {
    XrdSysCondVarHelper lck( pDoneCV );
    pDone = true;
    pDoneCV.Broadcast();
    return XRootDStatus( stOK, suDone );
  }

  // if we don't have active sources it means we failed
  if( GetRunning() == 0 )
  {
    XrdSysCondVarHelper lck( pDoneCV );
    pDone = true;
    pDoneCV.Broadcast();
    return XRootDStatus( stError, errNoMoreReplicas );
  }

  PageInfo *chunk = pSink.Get();
  if( chunk )
  {
    pDataReceived += chunk->GetLength();
    ci = std::move( *chunk );
    delete chunk;
    return XRootDStatus( stOK, suContinue );
  }

  return XRootDStatus( stOK, suRetry );
}

References XrdCl::errNoMoreReplicas, XrdCl::PageInfo::GetLength(), XrdCl::stError, XrdCl::stOK, XrdCl::suContinue, XrdCl::suDone, and XrdCl::suRetry.
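
The four outcomes of GetChunk() suggest a simple consumer loop. The sketch below is a toy model under stated assumptions, not XrdCl code: `Step`, `FakeCtx` and `Drain` are hypothetical names, chunks are reduced to their lengths, and a queued length of 0 models a call that found the sink empty.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-ins for suDone, suContinue, suRetry and stError.
enum class Step { Done, Continue, Retry, Failed };

// Hypothetical fake context that hands out queued chunk lengths.
struct FakeCtx
{
  std::deque<uint64_t> sink;      // pending chunk lengths, 0 = nothing ready
  uint64_t             fileSize;  // expected total number of bytes
  uint64_t             received;  // bytes handed to the caller so far

  Step GetChunk( uint64_t &length )
  {
    if( received == fileSize ) return Step::Done;
    if( sink.empty() ) return Step::Failed;   // models "no active sources"
    uint64_t next = sink.front();
    sink.pop_front();
    if( next == 0 ) return Step::Retry;       // nothing written to length
    received += next;
    length = next;
    return Step::Continue;
  }
};

// Calls GetChunk until it reports Done or Failed; returns true on success.
bool Drain( FakeCtx &ctx, std::vector<uint64_t> &out )
{
  for( ;; )
  {
    uint64_t len = 0;
    switch( ctx.GetChunk( len ) )
    {
      case Step::Done:     return true;
      case Step::Failed:   return false;
      case Step::Retry:    continue;           // try again, nothing to store
      case Step::Continue: out.push_back( len ); break;
    }
  }
}
```

The point of the sketch is that only the "continue" outcome carries data; "retry" must not be treated as an error or as end of file.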


◆ GetNextUrl()

bool XrdCl::XCpCtx::GetNextUrl ( std::string & url)

Gets the next URL from the list of file replicas

Parameters
url: the output parameter
Returns
: true if a url has been written to the url parameter, false otherwise

Definition at line 57 of file XrdClXCpCtx.cc.

{
  XrdSysMutexHelper lck( pMtx );
  if( pUrls.empty() ) return false;
  url = pUrls.front();
  pUrls.pop();
  return true;
}

◆ GetSize()

int64_t XrdCl::XCpCtx::GetSize ( )
inline

Gets the file size. The call blocks until the file size has been set using SetFileSize, or until no source is running any more.

Definition at line 157 of file XrdClXCpCtx.hh.

{
  XrdSysCondVarHelper lck( pFileSizeCV );
  while( pFileSize < 0 && GetRunning() > 0 ) pFileSizeCV.Wait();
  return pFileSize;
}

◆ Initialize()

XRootDStatus XrdCl::XCpCtx::Initialize ( )

Starts one thread per source. Each thread tries to open a file, stats the file if necessary, and then starts reading it; all chunks read go to the sink.

Returns
Error if we were not able to create any threads

Definition at line 124 of file XrdClXCpCtx.cc.

{
  for( uint8_t i = 0; i < pParallelSrc; ++i )
  {
    XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this );
    pSources.push_back( src );
  }

  auto scpy = pSources;
  bool ok = false;
  for(auto src: scpy) {
    if( src->Start() )
    {
      // src destructor will remove src from pSources
      src->Delete();
    }
    else
    {
      ok = true;
    }
  }

  if( !ok )
  {
    Log *log = DefaultEnv::GetLog();
    log->Error( UtilityMsg, "Failed to initialize (failed to create new threads)" );
    return XRootDStatus( stError, errInternal, EAGAIN, "XCpCtx: failed to create new threads." );
  }

  return XRootDStatus();
}

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::stError, and XrdCl::UtilityMsg.


◆ NotifyIdleSrc()

void XrdCl::XCpCtx::NotifyIdleSrc ( )

Notifies idle sources. Used in two cases:

  • if one of the sources failed and an idle source needs to take over
  • or if we are done and all idle sources should be stopped

Definition at line 188 of file XrdClXCpCtx.cc.

{
  pDoneCV.Broadcast();
}

◆ NotifyInitExpectant()

void XrdCl::XCpCtx::NotifyInitExpectant ( )
inline

Notify those who are waiting for initialization. In particular the GetSize() caller will be waiting on the result of initialization.

Definition at line 225 of file XrdClXCpCtx.hh.

{
  pFileSizeCV.Broadcast();
}

◆ PutChunk()

void XrdCl::XCpCtx::PutChunk ( PageInfo * chunk)

Put a chunk into the sink

Parameters
chunk: the chunk

Definition at line 90 of file XrdClXCpCtx.cc.

{
  pSink.Put( chunk );
}

◆ Release()

void XrdCl::XCpCtx::Release ( )
inline

Decrements the reference count and signals when it reaches 0.

Definition at line 88 of file XrdClXCpCtx.hh.

{
  XrdSysMutexHelper lck( pMtx );
  --pRefCount;
  if( !pRefCount )
  {
    XrdSysCondVarHelper lckcv( pDeleteCV );
    pDelete = true;
    pDeleteCV.Broadcast();
  }
}
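
Delete(), Self() and Release() together form an owner/borrower handshake: the owner gives up its reference and waits, and the last borrower to release wakes it. The sketch below models that idea with standard C++ primitives. It is a simplification under stated assumptions: `RefCounted`, `WaitForRelease` and `Count` are hypothetical names, a single mutex replaces the separate `pMtx` and `pDeleteCV`, and the object is never destroyed with `delete this`.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical, reduced model of the owner/borrower handshake.
class RefCounted
{
  public:
    // Borrower takes a reference (compare Self()).
    RefCounted* Self()
    {
      std::lock_guard<std::mutex> lck( mtx );
      ++refCount;
      return this;
    }

    // Borrower drops its reference (compare Release()); the last one
    // to do so wakes the waiting owner.
    void Release()
    {
      std::lock_guard<std::mutex> lck( mtx );
      assert( refCount > 0 );
      if( --refCount == 0 )
      {
        released = true;
        cv.notify_all();
      }
    }

    // Owner drops its own reference and blocks until every borrower has
    // released (compare Delete(), minus the actual deletion).
    void WaitForRelease()
    {
      std::unique_lock<std::mutex> lck( mtx );
      assert( refCount > 0 );
      if( --refCount == 0 ) return;
      cv.wait( lck, [this]{ return released; } );
    }

    // Current reference count, for inspection only.
    int Count()
    {
      std::lock_guard<std::mutex> lck( mtx );
      return refCount;
    }

  private:
    std::mutex              mtx;
    std::condition_variable cv;
    int                     refCount = 1;      // the owner's reference
    bool                    released = false;  // set by the last Release()
};
```

In a threaded setting a worker would hold the pointer returned by `Self()` and call `Release()` when finished, while the owning thread calls `WaitForRelease()` exactly once.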

◆ RemoveSrc()

void XrdCl::XCpCtx::RemoveSrc ( XCpSrc * src)
inline

Remove given source

Parameters
src: the source to be removed

Definition at line 195 of file XrdClXCpCtx.hh.

{
  XrdSysMutexHelper lck( pMtx );
  pSources.remove( src );
}

◆ Self()

XCpCtx * XrdCl::XCpCtx::Self ( )
inline

Increments the reference counter.

Returns
: myself.

Definition at line 105 of file XrdClXCpCtx.hh.

{
  XrdSysMutexHelper lck( pMtx );
  ++pRefCount;
  return this;
}

References XCpCtx().


◆ SetFileSize()

void XrdCl::XCpCtx::SetFileSize ( int64_t size)

Sets the file size (GetSize blocks until SetFileSize is called). Also calculates the block size.

Parameters
size: file size

Definition at line 107 of file XrdClXCpCtx.cc.

{
  XrdSysCondVarHelper lckcv( pFileSizeCV );
  XrdSysMutexHelper lckmtx( pMtx );
  if( pFileSize < 0 && size >= 0 )
  {
    pFileSize = size;
    pFileSizeCV.Broadcast();

    if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc )
      pBlockSize = pFileSize / pParallelSrc;

    if( pBlockSize < pChunkSize )
      pBlockSize = pChunkSize;
  }
}

Referenced by XCpCtx().
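
The block-size calculation in SetFileSize() applies two clamps in order: first the block is capped at an even share of the file per source, then it is raised to at least one chunk. A minimal sketch of that arithmetic follows; `EffectiveBlockSize` is a hypothetical free function, and a non-zero number of sources is assumed.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the two clamps applied to the block size.
uint64_t EffectiveBlockSize( uint64_t blockSize, uint64_t chunkSize,
                             uint64_t fileSize, uint8_t parallelSrc )
{
  assert( parallelSrc > 0 );  // assumption: at least one source

  // Cap the block at an even share of the file per source.
  uint64_t perSource = fileSize / parallelSrc;
  if( blockSize > perSource )
    blockSize = perSource;

  // A block is never smaller than one chunk.
  if( blockSize < chunkSize )
    blockSize = chunkSize;

  return blockSize;
}
```

Because the second clamp runs last, the chunk size wins for very small files: with a 16-byte file, 4 sources and 8-byte chunks the result is 8, not 4.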


◆ WeakestLink()

XCpSrc * XrdCl::XCpCtx::WeakestLink ( XCpSrc * exclude)

Gets the 'weakest' source, i.e. the source with the lowest transfer rate that still has data.

Parameters
exclude: the source that is excluded from the search
Returns
: the weakest source

Definition at line 66 of file XrdClXCpCtx.cc.

{
  uint64_t transferRate = -1; // set transferRate to max uint64 value
  XCpSrc *ret = 0;

  std::list<XCpSrc*>::iterator itr;
  XrdSysMutexHelper lck( pMtx );

  for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
  {
    XCpSrc *src = *itr;
    if( src == exclude ) continue;
    uint64_t tmp = src->TransferRate();
    if( src->HasData() && tmp < transferRate )
    {
      ret = src;
      transferRate = tmp;
    }
  }

  if( !ret ) return ret;
  return ret->Self();
}

References XrdCl::XCpSrc::HasData(), XrdCl::XCpSrc::Self(), and XrdCl::XCpSrc::TransferRate().
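
The selection logic of WeakestLink() is a minimum search with two filters. The sketch below reproduces it on a toy type; `FakeSrc` and `Weakest` are hypothetical names, the two fields stand in for `TransferRate()` and `HasData()`, and both the locking and the reference taken through `Self()` are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <list>

// Hypothetical, simplified source: only the two properties the search reads.
struct FakeSrc
{
  uint64_t rate;     // stand-in for TransferRate()
  bool     hasData;  // stand-in for HasData()
};

// Returns the slowest source that still has data, skipping `exclude`;
// returns nullptr when no source qualifies.
FakeSrc* Weakest( const std::list<FakeSrc*> &sources, const FakeSrc *exclude )
{
  uint64_t best = std::numeric_limits<uint64_t>::max();
  FakeSrc *ret  = nullptr;

  for( FakeSrc *src : sources )
  {
    if( src == exclude ) continue;
    if( src->hasData && src->rate < best )
    {
      ret  = src;
      best = src->rate;
    }
  }
  return ret;
}
```

A source without data is ignored even if it is the slowest one, since there would be nothing to take over from it.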


The documentation for this class was generated from the following files: