XRootD
Loading...
Searching...
No Matches
XrdClPostMaster.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClPoller.hh"
29#include "XrdCl/XrdClChannel.hh"
31#include "XrdCl/XrdClLog.hh"
33
35
36#include <unordered_set>
37
38namespace XrdCl
39{
40 struct ConnErrJob : public Job
41 {
43 std::function<void( const URL&, const XRootDStatus& )> handler) : url( url ),
44 status( status ),
46 {
47 }
48
49 void Run( void *arg )
50 {
51 handler( url, status );
52 delete this;
53 }
54
57 std::function<void( const URL&, const XRootDStatus& )> handler;
58 };
59
61 {
62 PostMasterImpl() : pPoller( 0 ), pInitialized( false ), pRunning( false )
63 {
64 Env *env = DefaultEnv::GetEnv();
65 int workerThreads = DefaultWorkerThreads;
66 env->GetInt( "WorkerThreads", workerThreads );
67
69 pJobManager = new JobManager(workerThreads);
70 }
71
73 {
74 delete pPoller;
75 delete pTaskManager;
76 delete pJobManager;
77 }
78
79 //--------------------------------------------------------------------------
81 //--------------------------------------------------------------------------
83 {
85 pFinalizeSet.insert( ch );
86 }
87
88 //--------------------------------------------------------------------------
90 //--------------------------------------------------------------------------
91 std::shared_ptr<Channel> GetChannel( const URL &url );
92
93 //--------------------------------------------------------------------------
95 //--------------------------------------------------------------------------
97 {
99 pFinalizeSet.erase( ch );
100 }
101
102 typedef std::map<std::string, std::shared_ptr<Channel> > ChannelMap;
103
107 std::unordered_set<Channel*> pFinalizeSet;
108
109 // Mutex protecting access of pChannelMap
111
112 // Mutex protecting access of pFinalizeSet
114
118
120 std::unique_ptr<Job> pOnConnJob;
121 std::function<void( const URL&, const XRootDStatus& )> pOnConnErrCB;
122 };
123
124 //----------------------------------------------------------------------------
125 // Constructor
126 //----------------------------------------------------------------------------
128 {
129 }
130
131 //----------------------------------------------------------------------------
132 // Destructor
133 //----------------------------------------------------------------------------
137
138 //----------------------------------------------------------------------------
139 // Initializer
140 //----------------------------------------------------------------------------
142 {
143 Env *env = DefaultEnv::GetEnv();
144 std::string pollerPref = DefaultPollerPreference;
145 env->GetString( "PollerPreference", pollerPref );
146
147 pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
148
149 if( !pImpl->pPoller )
150 return false;
151
152 bool st = pImpl->pPoller->Initialize();
153
154 if( !st )
155 {
156 delete pImpl->pPoller;
157 return false;
158 }
159
160 pImpl->pJobManager->Initialize();
161 pImpl->pInitialized = true;
162 return true;
163 }
164
165 //----------------------------------------------------------------------------
166 // Finalizer
167 //----------------------------------------------------------------------------
169 {
170 //--------------------------------------------------------------------------
171 // Clean up the channels
172 //--------------------------------------------------------------------------
173 if( !pImpl->pInitialized )
174 return true;
175
176 pImpl->pInitialized = false;
177 pImpl->pJobManager->Finalize();
178
179 //--------------------------------------------------------------------------
180 // Finalize may cause some of the channels to remove themselves from the
181 // finalize set. So make a copy. Should be no concurrency as poller and
182 // jobmanager are stopped, so no lock.
183 //--------------------------------------------------------------------------
184 auto finSet = pImpl->pFinalizeSet;
185 for( auto ch: finSet ) ch->Finalize();
186
187 pImpl->pChannelMap.clear();
188 return pImpl->pPoller->Finalize();
189 }
190
191 //----------------------------------------------------------------------------
192 // Start the post master
193 //----------------------------------------------------------------------------
195 {
196 if( !pImpl->pInitialized )
197 return false;
198
199 if( !pImpl->pPoller->Start() )
200 return false;
201
202 if( !pImpl->pTaskManager->Start() )
203 {
204 pImpl->pPoller->Stop();
205 return false;
206 }
207
208 if( !pImpl->pJobManager->Start() )
209 {
210 pImpl->pPoller->Stop();
211 pImpl->pTaskManager->Stop();
212 return false;
213 }
214
215 pImpl->pRunning = true;
216 return true;
217 }
218
219 //----------------------------------------------------------------------------
220 // Stop the postmaster
221 //----------------------------------------------------------------------------
223 {
224 if( !pImpl->pInitialized || !pImpl->pRunning )
225 return true;
226
227 if( !pImpl->pJobManager->Stop() )
228 return false;
229 if( !pImpl->pPoller->Stop() )
230 return false;
231 if( !pImpl->pTaskManager->Stop() )
232 return false;
233 pImpl->pRunning = false;
234 return true;
235 }
236
237 //----------------------------------------------------------------------------
238 // Reinitialize after fork
239 //----------------------------------------------------------------------------
241 {
242 return true;
243 }
244
245 //----------------------------------------------------------------------------
246 // Send the message asynchronously
247 //----------------------------------------------------------------------------
249 Message *msg,
250 MsgHandler *handler,
251 bool stateful,
252 time_t expires )
253 {
254 auto channel = pImpl->GetChannel( url );
255
256 if( !channel )
258
259 return channel->Send( msg, handler, stateful, expires );
260 }
261
263 Message *msg,
264 MsgHandler *inHandler )
265 {
267 VirtualRedirector *redirector = registry.Get( url );
268 if( !redirector )
269 return Status( stError, errInvalidOp );
270 return redirector->HandleRequest( msg, inHandler );
271 }
272
273 //----------------------------------------------------------------------------
274 // Query the transport handler
275 //----------------------------------------------------------------------------
277 uint16_t query,
278 AnyObject &result )
279 {
280 std::shared_ptr<Channel> channel;
281 {
282 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
283 PostMasterImpl::ChannelMap::iterator it =
284 pImpl->pChannelMap.find( url.GetChannelId() );
285 if( it == pImpl->pChannelMap.end() )
286 return Status( stError, errInvalidOp );
287 channel = it->second;
288 }
289
290 if( !channel )
291 return Status( stError, errNotSupported );
292
293 return channel->QueryTransport( query, result );
294 }
295
296 //----------------------------------------------------------------------------
297 // Register channel event handler
298 //----------------------------------------------------------------------------
300 ChannelEventHandler *handler )
301 {
302 auto channel = pImpl->GetChannel( url );
303
304 if( !channel )
305 return Status( stError, errNotSupported );
306
307 channel->RegisterEventHandler( handler );
308 return Status();
309 }
310
311 //----------------------------------------------------------------------------
312 // Remove a channel event handler
313 //----------------------------------------------------------------------------
315 ChannelEventHandler *handler )
316 {
317 auto channel = pImpl->GetChannel( url );
318
319 if( !channel )
320 return Status( stError, errNotSupported );
321
322 channel->RemoveEventHandler( handler );
323 return Status();
324 }
325
326 //------------------------------------------------------------------------
327 // Get the task manager object user by the post master
328 //------------------------------------------------------------------------
330 {
331 return pImpl->pTaskManager;
332 }
333
334 //------------------------------------------------------------------------
335 // Get the job manager object user by the post master
336 //------------------------------------------------------------------------
338 {
339 return pImpl->pJobManager;
340 }
341
342 //------------------------------------------------------------------------
343 // Shut down a channel
344 //------------------------------------------------------------------------
346 {
347 return ForceDisconnect(url, false);
348 }
349
350 //------------------------------------------------------------------------
351 // Shut down a channel
352 //------------------------------------------------------------------------
353 Status PostMaster::ForceDisconnect( const URL &url, bool hush )
354 {
355 std::shared_ptr<Channel> channel;
356 {
357 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
358 PostMasterImpl::ChannelMap::iterator it =
359 pImpl->pChannelMap.find( url.GetChannelId() );
360
361 if( it == pImpl->pChannelMap.end() )
362 return Status( stError, errInvalidOp );
363 channel = it->second;
364 pImpl->pChannelMap.erase( it );
365 }
366
367 channel->ForceDisconnect( hush );
368 return Status();
369 }
370
371 //------------------------------------------------------------------------
372 // Shut down a channel. This version is used by the channel itself.
373 //------------------------------------------------------------------------
374 Status PostMaster::ForceDisconnect( std::shared_ptr<Channel> channel,
375 const uint64_t sess )
376 {
377 if( !channel )
378 return Status( stError, errNotSupported );
379
380 {
381 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
382 PostMasterImpl::ChannelMap::iterator it =
383 pImpl->pChannelMap.find( channel->GetURL().GetChannelId() );
384
385 if( it != pImpl->pChannelMap.end() && it->second == channel )
386 pImpl->pChannelMap.erase( it );
387 }
388
389 channel->ForceDisconnect( channel, sess );
390 return Status();
391 }
392
394 {
395 std::shared_ptr<Channel> channel;
396 {
397 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
398 PostMasterImpl::ChannelMap::iterator it =
399 pImpl->pChannelMap.find( url.GetChannelId() );
400
401 if( it == pImpl->pChannelMap.end() )
402 return Status( stError, errInvalidOp );
403 channel = it->second;
404 }
405
406 channel->ForceReconnect();
407 return Status();
408 }
409
410 //------------------------------------------------------------------------
411 // Get the number of connected data streams
412 //------------------------------------------------------------------------
413 uint16_t PostMaster::NbConnectedStrm( const URL &url )
414 {
415 auto channel = pImpl->GetChannel( url );
416 if( !channel ) return 0;
417 return channel->NbConnectedStrm();
418 }
419
420 //------------------------------------------------------------------------
422 //------------------------------------------------------------------------
424 std::shared_ptr<Job> onConnJob )
425 {
426 auto channel = pImpl->GetChannel( url );
427 if( !channel ) return;
428 channel->SetOnDataConnectHandler( onConnJob );
429 }
430
431 //------------------------------------------------------------------------
433 //------------------------------------------------------------------------
434 void PostMaster::SetOnConnectHandler( std::unique_ptr<Job> onConnJob )
435 {
436 XrdSysMutexHelper lck( pImpl->pMtx );
437 pImpl->pOnConnJob = std::move( onConnJob );
438 }
439
440 //------------------------------------------------------------------------
441 // Set the global connection error handler
442 //------------------------------------------------------------------------
443 void PostMaster::SetConnectionErrorHandler( std::function<void( const URL&, const XRootDStatus& )> handler )
444 {
445 XrdSysMutexHelper lck( pImpl->pMtx );
446 pImpl->pOnConnErrCB = std::move( handler );
447 }
448
449 //------------------------------------------------------------------------
450 // Notify the global on-connect handler
451 //------------------------------------------------------------------------
453 {
454 XrdSysMutexHelper lck( pImpl->pMtx );
455 if( pImpl->pOnConnJob )
456 {
457 URL *ptr = new URL( url );
458 pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
459 }
460 }
461
462 //------------------------------------------------------------------------
463 // Notify the global error connection handler
464 //------------------------------------------------------------------------
465 void PostMaster::NotifyConnErrHandler( const URL &url, const XRootDStatus &status )
466 {
467 XrdSysMutexHelper lck( pImpl->pMtx );
468 if( pImpl->pOnConnErrCB )
469 {
470 ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
471 pImpl->pJobManager->QueueJob( job, nullptr );
472 }
473 }
474
475 //----------------------------------------------------------------------------
477 //----------------------------------------------------------------------------
478 void PostMaster::CollapseRedirect( const URL &alias, const URL &url )
479 {
480 XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
481 //--------------------------------------------------------------------------
482 // Get the passive channel
483 //--------------------------------------------------------------------------
484 std::shared_ptr<Channel> passive;
485 PostMasterImpl::ChannelMap::iterator it =
486 pImpl->pChannelMap.find( alias.GetChannelId() );
487 if( it != pImpl->pChannelMap.end() )
488 passive = it->second;
489
490 //--------------------------------------------------------------------------
491 // If the channel does not exist there's nothing to do
492 //--------------------------------------------------------------------------
493 if( !passive ) return;
494
495 //--------------------------------------------------------------------------
496 // Check if this URL is eligible for collapsing. To avoid depencencies
497 // we don't call CanCollapse while holding the channel map mutex. So we
498 // reverify the content of the map afterwards.
499 //--------------------------------------------------------------------------
500 scopedLock.UnLock();
501 if( !passive->CanCollapse( url ) ) return;
502
503 scopedLock.Lock( &pImpl->pChannelMapMutex );
504 it = pImpl->pChannelMap.find( alias.GetChannelId() );
505 if( it == pImpl->pChannelMap.end() || it->second != passive )
506 {
507 // something changed. Retry.
508 scopedLock.UnLock();
509 CollapseRedirect( alias, url );
510 return;
511 }
512
513 //--------------------------------------------------------------------------
514 // Create the active channel
515 //--------------------------------------------------------------------------
517 TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
518
519 if( !trHandler )
520 {
521 Log *log = DefaultEnv::GetLog();
522 log->Error( PostMasterMsg, "Unable to get transport handler for %s "
523 "protocol", url.GetProtocol().c_str() );
524 return;
525 }
526
527 Log *log = DefaultEnv::GetLog();
528 log->Info( PostMasterMsg, "Label channel %s with alias %s.",
529 url.GetHostId().c_str(), alias.GetHostId().c_str() );
530
531 std::shared_ptr<Channel> active(new Channel{ alias,
532 pImpl->pPoller, trHandler, pImpl->pTaskManager, pImpl->pJobManager, url },
533 [this](Channel *ch) { this->pImpl->removeFinalize( ch ); delete ch; });
534 pImpl->addFinalize( active.get() );
535 active->SetSelf( active );
536
537 pImpl->pChannelMap[alias.GetChannelId()] = active;
538 //--------------------------------------------------------------------------
539 // The passive channel will be deallocated by TTL
540 //--------------------------------------------------------------------------
541 }
542
543 //------------------------------------------------------------------------
544 // Decrement file object instance count bound to this channel
545 //------------------------------------------------------------------------
547 {
548 auto channel = pImpl->GetChannel( url );
549
550 if( !channel ) return;
551
552 return channel->DecFileInstCnt();
553 }
554
555 //------------------------------------------------------------------------
556 //true if underlying threads are running, false otherwise
557 //------------------------------------------------------------------------
559 {
560 return pImpl->pRunning;
561 }
562
563 //----------------------------------------------------------------------------
564 // Get the channel
565 //----------------------------------------------------------------------------
566 std::shared_ptr<Channel> PostMasterImpl::GetChannel( const URL &url )
567 {
569 std::shared_ptr<Channel> channel;
570 PostMasterImpl::ChannelMap::iterator it = pChannelMap.find( url.GetChannelId() );
571
572 if( it == pChannelMap.end() )
573 {
575 TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
576
577 if( !trHandler )
578 {
579 Log *log = DefaultEnv::GetLog();
580 log->Error( PostMasterMsg, "Unable to get transport handler for %s "
581 "protocol", url.GetProtocol().c_str() );
582 return 0;
583 }
584
585 std::shared_ptr<Channel> newchan(new Channel{ url, pPoller,
586 trHandler, pTaskManager, pJobManager },
587 [this](Channel *ch) { this->removeFinalize( ch ); delete ch; });
588 addFinalize( newchan.get() );
589 channel = newchan;
590 channel->SetSelf( channel );
591
592 pChannelMap[url.GetChannelId()] = channel;
593 }
594 else
595 channel = it->second;
596 return channel;
597 }
598}
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A synchronized queue.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
The message representation used throughout the system.
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global connection error handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
PostMaster()
Constructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object user by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition XrdClURL.hh:31
std::string GetChannelId() const
Definition XrdClURL.cc:512
std::string GetHostId() const
Get the host part of the URL (user:password@host:port).
Definition XrdClURL.hh:99
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
void Lock(XrdSysMutex *Mutex)
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
const char *const DefaultPollerPreference
const uint16_t errNotSupported
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
std::map< std::string, std::shared_ptr< Channel > > ChannelMap
std::unique_ptr< Job > pOnConnJob
void addFinalize(Channel *ch)
Used to maintain a non-owning set of live Channels. Used by Finalize.
std::unordered_set< Channel * > pFinalizeSet
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
void removeFinalize(Channel *ch)
Used to maintain a non-owning set of live Channels. Used by Finalize.
std::shared_ptr< Channel > GetChannel(const URL &url)
Get a channel for url, creating one if needed.
Procedure execution status.