XRootD
XrdCl::XRootDTransport Class Reference

XRootD transport handler.

#include <XrdClXRootDTransport.hh>


Public Member Functions

 XRootDTransport ()
 Constructor.
 ~XRootDTransport ()
 Destructor.
virtual void DecFileInstCnt (AnyObject &channelData)
 Decrement file object instance count bound to this channel.
virtual void Disconnect (AnyObject &channelData, uint16_t subStreamId)
 The stream has been disconnected; do the cleanups.
virtual void FinalizeChannel (AnyObject &channelData)
 Finalize channel.
virtual URL GetBindPreference (const URL &url, AnyObject &channelData)
 Get bind preference for the next data stream.
virtual XRootDStatus GetBody (Message &message, Socket *socket)
virtual XRootDStatus GetHeader (Message &message, Socket *socket)
virtual XRootDStatus GetMore (Message &message, Socket *socket)
virtual Status GetSignature (Message *toSign, Message *&sign, AnyObject &channelData)
 Get signature for given message.
virtual Status GetSignature (Message *toSign, Message *&sign, XRootDChannelInfo *info)
 Get signature for given message.
virtual XRootDStatus HandShake (HandShakeData *handShakeData, AnyObject &channelData)
 HandShake.
virtual bool HandShakeDone (HandShakeData *handShakeData, AnyObject &channelData)
virtual void InitializeChannel (const URL &url, AnyObject &channelData)
 Initialize channel.
virtual Status IsStreamBroken (time_t inactiveTime, AnyObject &channelData)
virtual bool IsStreamTTLElapsed (time_t time, AnyObject &channelData)
 Check if the stream should be disconnected.
virtual uint32_t MessageReceived (Message &msg, uint16_t subStream, AnyObject &channelData)
 Check if the message invokes a stream action.
virtual void MessageSent (Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
 Notify the transport about a message having been sent.
virtual PathID Multiplex (Message *msg, AnyObject &channelData, PathID *hint=0)
virtual PathID MultiplexSubStream (Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedControlConnection ()
virtual bool NeedEncryption (HandShakeData *handShakeData, AnyObject &channelData)
virtual Status Query (uint16_t query, AnyObject &result, AnyObject &channelData)
 Query the channel.
virtual uint16_t SubStreamNumber (AnyObject &channelData)
 Return the number of substreams per stream that should be created.
virtual void WaitBeforeExit ()
 Wait until the program can safely exit.
Public Member Functions inherited from XrdCl::TransportHandler
virtual ~TransportHandler ()

Static Public Member Functions

static void GenerateDescription (char *msg, std::ostringstream &o)
 Get the description of a message.
static void LogErrorResponse (const Message &msg)
 Log server error response.
static XRootDStatus MarshallRequest (char *msg)
 Marshal the outgoing message.
static XRootDStatus MarshallRequest (Message *msg)
 Marshal the outgoing message.
static uint16_t NbConnectedStrm (AnyObject &channelData)
 Number of currently connected data streams.
static void SetDescription (Message *msg)
 Set the description of a message.
static XRootDStatus UnMarchalStatusMore (Message &msg)
 Unmarshall the correction-segment of the status response for pgwrite.
static XRootDStatus UnMarshallBody (Message *msg, uint16_t reqType)
 Unmarshall the body of the incoming message.
static void UnMarshallHeader (Message &msg)
 Unmarshall the header of the incoming message.
static XRootDStatus UnMarshallRequest (Message *msg)
static XRootDStatus UnMarshalStatusBody (Message &msg, uint16_t reqType)
 Unmarshall the body of the status response.

Friends

struct PluginUnloadHandler

Additional Inherited Members

Public Types inherited from XrdCl::TransportHandler
enum  StreamAction {
  NoAction = 0x0000 ,
  DigestMsg = 0x0001 ,
  AbortStream = 0x0002 ,
  CloseStream = 0x0004 ,
  ResumeStream = 0x0008 ,
  HoldStream = 0x0010 ,
  RequestClose = 0x0020
}
 Stream actions that may be triggered by incoming control messages.
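
The StreamAction values are distinct bits, and MessageReceived returns a uint32_t, which suggests that several actions can be OR-ed into one result. The following is a minimal sketch under that assumption. It uses a local copy of the enum so that it compiles without the XRootD headers, and HasAction is a hypothetical helper, not part of XrdCl.

```cpp
#include <cassert>
#include <cstdint>

// Local copy of the values listed above, so the sketch compiles without
// the XRootD headers.
enum StreamAction : uint32_t
{
  NoAction     = 0x0000,
  DigestMsg    = 0x0001,
  AbortStream  = 0x0002,
  CloseStream  = 0x0004,
  ResumeStream = 0x0008,
  HoldStream   = 0x0010,
  RequestClose = 0x0020
};

// Hypothetical helper: true if every bit of `wanted` is set in `actions`.
// NoAction has no bits, so it never counts as "present".
inline bool HasAction( uint32_t actions, uint32_t wanted )
{
  return wanted != 0 && ( actions & wanted ) == wanted;
}
```

A caller could then test a combined result with, for example, HasAction( result, RequestClose ).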

Detailed Description

XRootD transport handler.

Definition at line 47 of file XrdClXRootDTransport.hh.

Constructor & Destructor Documentation

◆ XRootDTransport()

XrdCl::XRootDTransport::XRootDTransport ( )

Constructor.

Definition at line 293 of file XrdClXRootDTransport.cc.

293 :
294 pSecUnloadHandler( new PluginUnloadHandler() )
295 {
296 }

References PluginUnloadHandler.

◆ ~XRootDTransport()

XrdCl::XRootDTransport::~XRootDTransport ( )

Destructor.

Definition at line 301 of file XrdClXRootDTransport.cc.

302 {
303 delete pSecUnloadHandler; pSecUnloadHandler = 0;
304 }

Member Function Documentation

◆ DecFileInstCnt()

void XrdCl::XRootDTransport::DecFileInstCnt ( AnyObject & channelData)
virtual

Decrement file object instance count bound to this channel.

Implements XrdCl::TransportHandler.

Definition at line 1828 of file XrdClXRootDTransport.cc.

1829 {
1830 XRootDChannelInfo *info = 0;
1831 channelData.Get( info );
1832 if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1833 info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1834 }

References XrdCl::XRootDChannelInfo::finstcnt, and XrdCl::AnyObject::Get().
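
The listing checks the counter and then subtracts from it in two separate atomic operations. One way to express "decrement unless already zero" as a single atomic step is a compare-exchange loop. The sketch below illustrates that technique only. DecrementIfPositive is a hypothetical helper, and the uint32_t counter type is an assumption, since the type of finstcnt is not shown here.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical helper, not part of XrdCl: decrement `counter` unless it is
// already zero. Returns true if a decrement happened.
inline bool DecrementIfPositive( std::atomic<uint32_t> &counter )
{
  uint32_t current = counter.load( std::memory_order_relaxed );
  while( current > 0 )
  {
    // On failure compare_exchange_weak stores the fresh value in `current`,
    // so the loop re-checks the > 0 condition before trying again.
    if( counter.compare_exchange_weak( current, current - 1,
                                       std::memory_order_relaxed ) )
      return true;
  }
  return false;
}
```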

◆ Disconnect()

void XrdCl::XRootDTransport::Disconnect ( AnyObject & channelData,
uint16_t subStreamId )
virtual

The stream has been disconnected; do the cleanups.

Implements XrdCl::TransportHandler.

Definition at line 1561 of file XrdClXRootDTransport.cc.

1563 {
1564 XRootDChannelInfo *info = 0;
1565 channelData.Get( info );
1566
1567 if (!info) {
1568 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1569 return;
1570 }
1571
1572 XrdSysMutexHelper scopedLock( info->mutex );
1573
1574 if( !info->stream.empty() )
1575 {
1576 XRootDStreamInfo &sInfo = info->stream[subStreamId];
1577 sInfo.status = XRootDStreamInfo::Disconnected;
1578 }
1579
1580 if( subStreamId == 0 )
1581 {
1582 CleanUpProtection( info );
1583 info->sidManager->ReleaseAllTimedOut();
1584 info->sentOpens.clear();
1585 info->sentCloses.clear();
1586 info->openFiles = 0;
1587 info->waitBarrier = 0;
1588 }
1589 }

References XrdCl::XRootDStreamInfo::Disconnected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.
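
The listing indexes info->stream with subStreamId after checking only that the vector is not empty. The sketch below shows a simplified variant that also checks the index range. StreamInfo, ChannelInfo and MarkDisconnected are hypothetical stand-ins, not the XrdCl types, and the sketch resets only one channel-wide field where the listing resets several.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for XRootDStreamInfo / XRootDChannelInfo.
struct StreamInfo
{
  enum Status { Disconnected, Connected };
  Status status = Connected;
};

struct ChannelInfo
{
  std::vector<StreamInfo> stream;
  uint32_t                openFiles = 0;
};

// Marks one substream as disconnected; returns false if the index is out
// of range. Substream 0 also resets the channel-wide state.
inline bool MarkDisconnected( ChannelInfo &info, uint16_t subStreamId )
{
  if( subStreamId >= info.stream.size() )
    return false;

  info.stream[subStreamId].status = StreamInfo::Disconnected;

  if( subStreamId == 0 )
    info.openFiles = 0;

  return true;
}
```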

◆ FinalizeChannel()

void XrdCl::XRootDTransport::FinalizeChannel ( AnyObject & channelData)
virtual

Finalize channel.

Implements XrdCl::TransportHandler.

Definition at line 462 of file XrdClXRootDTransport.cc.

463 {
464 }

◆ GenerateDescription()

void XrdCl::XRootDTransport::GenerateDescription ( char * msg,
std::ostringstream & o )
static

Get the description of a message.

Definition at line 3006 of file XrdClXRootDTransport.cc.

3007 {
3008 Log *log = DefaultEnv::GetLog();
3009 if( log->GetLevel() < Log::ErrorMsg )
3010 return;
3011
3012 ClientRequestHdr *req = (ClientRequestHdr *)msg;
3013 switch( req->requestid )
3014 {
3015 //------------------------------------------------------------------------
3016 // kXR_open
3017 //------------------------------------------------------------------------
3018 case kXR_open:
3019 {
3020 ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
3021 o << "kXR_open (";
3022 char *fn = GetDataAsString( msg );
3023 o << "file: " << fn << ", ";
3024 delete [] fn;
3025 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3026 o << std::setbase(10);
3027 o << "flags: ";
3028 if( sreq->options == 0 )
3029 o << "none ";
3030 else
3031 {
3032 if( sreq->options & kXR_compress )
3033 o << "kXR_compress ";
3034 if( sreq->options & kXR_delete )
3035 o << "kXR_delete ";
3036 if( sreq->options & kXR_force )
3037 o << "kXR_force ";
3038 if( sreq->options & kXR_mkpath )
3039 o << "kXR_mkpath ";
3040 if( sreq->options & kXR_new )
3041 o << "kXR_new ";
3042 if( sreq->options & kXR_nowait )
3043 o << "kXR_nowait ";
3044 if( sreq->options & kXR_open_apnd )
3045 o << "kXR_open_apnd ";
3046 if( sreq->options & kXR_open_read )
3047 o << "kXR_open_read ";
3048 if( sreq->options & kXR_open_updt )
3049 o << "kXR_open_updt ";
3050 if( sreq->options & kXR_open_wrto )
3051 o << "kXR_open_wrto ";
3052 if( sreq->options & kXR_posc )
3053 o << "kXR_posc ";
3054 if( sreq->options & kXR_prefname )
3055 o << "kXR_prefname ";
3056 if( sreq->options & kXR_refresh )
3057 o << "kXR_refresh ";
3058 if( sreq->options & kXR_4dirlist )
3059 o << "kXR_4dirlist ";
3060 if( sreq->options & kXR_replica )
3061 o << "kXR_replica ";
3062 if( sreq->options & kXR_seqio )
3063 o << "kXR_seqio ";
3064 if( sreq->options & kXR_async )
3065 o << "kXR_async ";
3066 if( sreq->options & kXR_retstat )
3067 o << "kXR_retstat ";
3068 }
3069 o << "flagt: ";
3070 if( sreq->optiont == 0 )
3071 o << "none ";
3072 else
3073 {
3074 if( sreq->optiont & kXR_dup )
3075 o << "kXR_dup ";
3076 if( sreq->optiont & kXR_samefs )
3077 o << "kXR_samefs ";
3078 }
3079 o << "fhtemplt: " << FileHandleToStr( sreq->fhtemplt );
3080 o << ")";
3081 break;
3082 }
3083
3084 //------------------------------------------------------------------------
3085 // kXR_clone
3086 //------------------------------------------------------------------------
3087 case kXR_clone:
3088 {
3089 ClientCloneRequest *sreq = (ClientCloneRequest *)msg;
3090 XrdProto::clone_list *dataChunk = (XrdProto::clone_list*)(msg + 24 );
3091 o << "kXR_clone ( ";
3092 o << "handle: " << FileHandleToStr( sreq->fhandle );
3093 o << std::setbase(10);
3094 o << " list [ ";
3095 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::clone_list); ++i )
3096 {
3097 o << "(src_handle: ";
3098 o << FileHandleToStr( dataChunk[i].srcFH );
3099 o << ", ";
3100 o << std::setbase(10);
3101 o << "src_offset: " << dataChunk[i].srcOffs;
3102 o << ", src_length: " << dataChunk[i].srcLen;
3103 o << ", dst_offset: " << dataChunk[i].dstOffs << "); ";
3104 }
3105
3106 o << " ] )";
3107 break;
3108 }
3109
3110 //------------------------------------------------------------------------
3111 // kXR_close
3112 //------------------------------------------------------------------------
3113 case kXR_close:
3114 {
3115 ClientCloseRequest *sreq = (ClientCloseRequest *)msg;
3116 o << "kXR_close (";
3117 o << "handle: " << FileHandleToStr( sreq->fhandle );
3118 o << ")";
3119 break;
3120 }
3121
3122 //------------------------------------------------------------------------
3123 // kXR_stat
3124 //------------------------------------------------------------------------
3125 case kXR_stat:
3126 {
3127 ClientStatRequest *sreq = (ClientStatRequest *)msg;
3128 o << "kXR_stat (";
3129 if( sreq->dlen )
3130 {
3131 char *fn = GetDataAsString( msg );
3132 o << "path: " << fn << ", ";
3133 delete [] fn;
3134 }
3135 else
3136 {
3137 o << "handle: " << FileHandleToStr( sreq->fhandle );
3138 o << ", ";
3139 }
3140 o << "flags: ";
3141 if( sreq->options == 0 )
3142 o << "none";
3143 else
3144 {
3145 if( sreq->options & kXR_vfs )
3146 o << "kXR_vfs";
3147 }
3148 o << ")";
3149 break;
3150 }
3151
3152 //------------------------------------------------------------------------
3153 // kXR_read
3154 //------------------------------------------------------------------------
3155 case kXR_read:
3156 {
3157 ClientReadRequest *sreq = (ClientReadRequest *)msg;
3158 o << "kXR_read (";
3159 o << "handle: " << FileHandleToStr( sreq->fhandle );
3160 o << std::setbase(10);
3161 o << ", ";
3162 o << "offset: " << sreq->offset << ", ";
3163 o << "size: " << sreq->rlen << ")";
3164 break;
3165 }
3166
3167 //------------------------------------------------------------------------
3168 // kXR_pgread
3169 //------------------------------------------------------------------------
3170 case kXR_pgread:
3171 {
3172 ClientPgReadRequest *sreq = (ClientPgReadRequest *)msg;
3173 o << "kXR_pgread (";
3174 o << "handle: " << FileHandleToStr( sreq->fhandle );
3175 o << std::setbase(10);
3176 o << ", ";
3177 o << "offset: " << sreq->offset << ", ";
3178 o << "size: " << sreq->rlen << ")";
3179 break;
3180 }
3181
3182 //------------------------------------------------------------------------
3183 // kXR_write
3184 //------------------------------------------------------------------------
3185 case kXR_write:
3186 {
3187 ClientWriteRequest *sreq = (ClientWriteRequest *)msg;
3188 o << "kXR_write (";
3189 o << "handle: " << FileHandleToStr( sreq->fhandle );
3190 o << std::setbase(10);
3191 o << ", ";
3192 o << "offset: " << sreq->offset << ", ";
3193 o << "size: " << sreq->dlen << ")";
3194 break;
3195 }
3196
3197 //------------------------------------------------------------------------
3198 // kXR_pgwrite
3199 //------------------------------------------------------------------------
3200 case kXR_pgwrite:
3201 {
3202 ClientPgWriteRequest *sreq = (ClientPgWriteRequest *)msg;
3203 o << "kXR_pgwrite (";
3204 o << "handle: " << FileHandleToStr( sreq->fhandle );
3205 o << std::setbase(10);
3206 o << ", ";
3207 o << "offset: " << sreq->offset << ", ";
3208 o << "size: " << sreq->dlen << ")";
3209 break;
3210 }
3211
3212 //------------------------------------------------------------------------
3213 // kXR_fattr
3214 //------------------------------------------------------------------------
3215 case kXR_fattr:
3216 {
3217 ClientFattrRequest *sreq = (ClientFattrRequest *)msg;
3218 int nattr = sreq->numattr;
3219 int options = sreq->options;
3220 o << "kXR_fattr";
3221 switch (sreq->subcode) {
3222 case kXR_fattrGet:
3223 o << "Get";
3224 break;
3225 case kXR_fattrSet:
3226 o << "Set";
3227 break;
3228 case kXR_fattrList:
3229 o << "List";
3230 break;
3231 case kXR_fattrDel:
3232 o << "Delete";
3233 break;
3234 default:
3235 o << " unknown subcode: " << sreq->subcode;
3236 break;
3237 }
3238 o << " (handle: " << FileHandleToStr( sreq->fhandle );
3239 o << std::setbase(10);
3240 if (nattr)
3241 o << ", numattr: " << nattr;
3242 if (options) {
3243 o << ", options: ";
3244 if (options & 0x01)
3245 o << "new";
3246 if (options & 0x10)
3247 o << "list values";
3248 }
3249 o << ", total size: " << req->dlen << ")";
3250 break;
3251 }
3252
3253 //------------------------------------------------------------------------
3254 // kXR_sync
3255 //------------------------------------------------------------------------
3256 case kXR_sync:
3257 {
3258 ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3259 o << "kXR_sync (";
3260 o << "handle: " << FileHandleToStr( sreq->fhandle );
3261 o << ")";
3262 break;
3263 }
3264
3265 //------------------------------------------------------------------------
3266 // kXR_truncate
3267 //------------------------------------------------------------------------
3268 case kXR_truncate:
3269 {
3270 ClientTruncateRequest *sreq = (ClientTruncateRequest *)msg;
3271 o << "kXR_truncate (";
3272 if( !sreq->dlen )
3273 o << "handle: " << FileHandleToStr( sreq->fhandle );
3274 else
3275 {
3276 char *fn = GetDataAsString( msg );
3277 o << "file: " << fn;
3278 delete [] fn;
3279 }
3280 o << std::setbase(10);
3281 o << ", ";
3282 o << "offset: " << sreq->offset;
3283 o << ")";
3284 break;
3285 }
3286
3287 //------------------------------------------------------------------------
3288 // kXR_readv
3289 //------------------------------------------------------------------------
3290 case kXR_readv:
3291 {
3292 unsigned char *fhandle = 0;
3293 o << "kXR_readv (";
3294
3295 o << "handle: ";
3296 readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3297 fhandle = dataChunk[0].fhandle;
3298 if( fhandle )
3299 o << FileHandleToStr( fhandle );
3300 else
3301 o << "unknown";
3302 o << ", ";
3303 o << std::setbase(10);
3304 o << "chunks: [";
3305 uint64_t size = 0;
3306 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3307 {
3308 size += dataChunk[i].rlen;
3309 o << "(offset: " << dataChunk[i].offset;
3310 o << ", size: " << dataChunk[i].rlen << "); ";
3311 }
3312 o << "], ";
3313 o << "total size: " << size << ")";
3314 break;
3315 }
3316
3317 //------------------------------------------------------------------------
3318 // kXR_writev
3319 //------------------------------------------------------------------------
3320 case kXR_writev:
3321 {
3322 unsigned char *fhandle = 0;
3323 o << "kXR_writev (";
3324
3325 XrdProto::write_list *wrtList =
3326 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3327 uint64_t size = 0;
3328 uint32_t numChunks = 0;
3329 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3330 {
3331 fhandle = wrtList[i].fhandle;
3332 size += wrtList[i].wlen;
3333 ++numChunks;
3334 }
3335 o << "handle: ";
3336 if( fhandle )
3337 o << FileHandleToStr( fhandle );
3338 else
3339 o << "unknown";
3340 o << ", ";
3341 o << std::setbase(10);
3342 o << "chunks: " << numChunks << ", ";
3343 o << "total size: " << size << ")";
3344 break;
3345 }
3346
3347 //------------------------------------------------------------------------
3348 // kXR_locate
3349 //------------------------------------------------------------------------
3350 case kXR_locate:
3351 {
3352 ClientLocateRequest *sreq = (ClientLocateRequest *)msg;
3353 char *fn = GetDataAsString( msg );
3354 o << "kXR_locate (";
3355 o << "path: " << fn << ", ";
3356 delete [] fn;
3357 o << "flags: ";
3358 if( sreq->options == 0 )
3359 o << "none";
3360 else
3361 {
3362 if( sreq->options & kXR_refresh )
3363 o << "kXR_refresh ";
3364 if( sreq->options & kXR_prefname )
3365 o << "kXR_prefname ";
3366 if( sreq->options & kXR_nowait )
3367 o << "kXR_nowait ";
3368 if( sreq->options & kXR_force )
3369 o << "kXR_force ";
3370 if( sreq->options & kXR_compress )
3371 o << "kXR_compress ";
3372 }
3373 o << ")";
3374 break;
3375 }
3376
3377 //------------------------------------------------------------------------
3378 // kXR_mv
3379 //------------------------------------------------------------------------
3380 case kXR_mv:
3381 {
3382 ClientMvRequest *sreq = (ClientMvRequest *)msg;
3383 o << "kXR_mv (";
3384 o << "source: ";
3385 o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3386 o << ", ";
3387 o << "destination: ";
3388 o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3389 o << ")";
3390 break;
3391 }
3392
3393 //------------------------------------------------------------------------
3394 // kXR_query
3395 //------------------------------------------------------------------------
3396 case kXR_query:
3397 {
3398 ClientQueryRequest *sreq = (ClientQueryRequest *)msg;
3399 o << "kXR_query (";
3400 o << "code: ";
3401 switch( sreq->infotype )
3402 {
3403 case kXR_Qconfig: o << "kXR_Qconfig"; break;
3404 case kXR_Qckscan: o << "kXR_Qckscan"; break;
3405 case kXR_Qcksum: o << "kXR_Qcksum"; break;
3406 case kXR_Qopaque: o << "kXR_Qopaque"; break;
3407 case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3408 case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3409 case kXR_QPrep: o << "kXR_QPrep"; break;
3410 case kXR_Qspace: o << "kXR_Qspace"; break;
3411 case kXR_QStats: o << "kXR_QStats"; break;
3412 case kXR_Qvisa: o << "kXR_Qvisa"; break;
3413 case kXR_Qxattr: o << "kXR_Qxattr"; break;
3414 default: o << sreq->infotype; break;
3415 }
3416 o << ", ";
3417
3418 if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3419 {
3420 o << "handle: " << FileHandleToStr( sreq->fhandle );
3421 o << ", ";
3422 }
3423
3424 o << "arg length: " << sreq->dlen << ")";
3425 break;
3426 }
3427
3428 //------------------------------------------------------------------------
3429 // kXR_rm
3430 //------------------------------------------------------------------------
3431 case kXR_rm:
3432 {
3433 o << "kXR_rm (";
3434 char *fn = GetDataAsString( msg );
3435 o << "path: " << fn << ")";
3436 delete [] fn;
3437 break;
3438 }
3439
3440 //------------------------------------------------------------------------
3441 // kXR_mkdir
3442 //------------------------------------------------------------------------
3443 case kXR_mkdir:
3444 {
3445 ClientMkdirRequest *sreq = (ClientMkdirRequest *)msg;
3446 o << "kXR_mkdir (";
3447 char *fn = GetDataAsString( msg );
3448 o << "path: " << fn << ", ";
3449 delete [] fn;
3450 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3451 o << std::setbase(10);
3452 o << "flags: ";
3453 if( sreq->options[0] == 0 )
3454 o << "none";
3455 else
3456 {
3457 if( sreq->options[0] & kXR_mkdirpath )
3458 o << "kXR_mkdirpath";
3459 }
3460 o << ")";
3461 break;
3462 }
3463
3464 //------------------------------------------------------------------------
3465 // kXR_rmdir
3466 //------------------------------------------------------------------------
3467 case kXR_rmdir:
3468 {
3469 o << "kXR_rmdir (";
3470 char *fn = GetDataAsString( msg );
3471 o << "path: " << fn << ")";
3472 delete [] fn;
3473 break;
3474 }
3475
3476 //------------------------------------------------------------------------
3477 // kXR_chmod
3478 //------------------------------------------------------------------------
3479 case kXR_chmod:
3480 {
3481 ClientChmodRequest *sreq = (ClientChmodRequest *)msg;
3482 o << "kXR_chmod (";
3483 char *fn = GetDataAsString( msg );
3484 o << "path: " << fn << ", ";
3485 delete [] fn;
3486 o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3487 break;
3488 }
3489
3490 //------------------------------------------------------------------------
3491 // kXR_ping
3492 //------------------------------------------------------------------------
3493 case kXR_ping:
3494 {
3495 o << "kXR_ping ()";
3496 break;
3497 }
3498
3499 //------------------------------------------------------------------------
3500 // kXR_protocol
3501 //------------------------------------------------------------------------
3502 case kXR_protocol:
3503 {
3504 ClientProtocolRequest *sreq = (ClientProtocolRequest *)msg;
3505 o << "kXR_protocol (";
3506 o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3507 break;
3508 }
3509
3510 //------------------------------------------------------------------------
3511 // kXR_dirlist
3512 //------------------------------------------------------------------------
3513 case kXR_dirlist:
3514 {
3515 o << "kXR_dirlist (";
3516 char *fn = GetDataAsString( msg );
3517 o << "path: " << fn << ")";
3518 delete [] fn;
3519 break;
3520 }
3521
3522 //------------------------------------------------------------------------
3523 // kXR_set
3524 //------------------------------------------------------------------------
3525 case kXR_set:
3526 {
3527 o << "kXR_set (";
3528 char *fn = GetDataAsString( msg );
3529 o << "data: " << fn << ")";
3530 delete [] fn;
3531 break;
3532 }
3533
3534 //------------------------------------------------------------------------
3535 // kXR_prepare
3536 //------------------------------------------------------------------------
3537 case kXR_prepare:
3538 {
3539 ClientPrepareRequest *sreq = (ClientPrepareRequest *)msg;
3540 o << "kXR_prepare (";
3541 o << "flags: ";
3542
3543 if( sreq->options == 0 )
3544 o << "none";
3545 else
3546 {
3547 if( sreq->options & kXR_stage )
3548 o << "kXR_stage ";
3549 if( sreq->options & kXR_wmode )
3550 o << "kXR_wmode ";
3551 if( sreq->options & kXR_coloc )
3552 o << "kXR_coloc ";
3553 if( sreq->options & kXR_fresh )
3554 o << "kXR_fresh ";
3555 }
3556
3557 o << ", priority: " << (int) sreq->prty << ", ";
3558
3559 char *fn = GetDataAsString( msg );
3560 char *cursor;
3561 for( cursor = fn; *cursor; ++cursor )
3562 if( *cursor == '\n' ) *cursor = ' ';
3563
3564 o << "paths: " << fn << ")";
3565 delete [] fn;
3566 break;
3567 }
3568
3569 case kXR_chkpoint:
3570 {
3571 ClientChkPointRequest *sreq = (ClientChkPointRequest*)msg;
3572 o << "kXR_chkpoint (";
3573 o << "opcode: ";
3574 if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3575 else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3576 else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3577 else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3578 else if( sreq->opcode == kXR_ckpXeq )
3579 {
3580 o << "kXR_ckpXeq) ";
3581 // In this case our request body will be one of kXR_pgwrite,
3582 // kXR_truncate, kXR_write, or kXR_writev request.
3583 GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3584 }
3585
3586 break;
3587 }
3588
3589 //------------------------------------------------------------------------
3590 // Default
3591 //------------------------------------------------------------------------
3592 default:
3593 {
3594 o << "kXR_unknown (length: " << req->dlen << ")";
3595 break;
3596 }
3597 };
3598 }

References ClientMvRequest::arg1len, ClientProtocolRequest::clientpv, ClientMvRequest::dlen, ClientPgWriteRequest::dlen, ClientQueryRequest::dlen, ClientRequestHdr::dlen, ClientStatRequest::dlen, ClientTruncateRequest::dlen, ClientWriteRequest::dlen, XrdProto::clone_list::dstOffs, XrdCl::Log::ErrorMsg, ClientCloneRequest::fhandle, ClientCloseRequest::fhandle, ClientFattrRequest::fhandle, ClientPgReadRequest::fhandle, ClientPgWriteRequest::fhandle, ClientQueryRequest::fhandle, ClientReadRequest::fhandle, ClientStatRequest::fhandle, ClientSyncRequest::fhandle, ClientTruncateRequest::fhandle, ClientWriteRequest::fhandle, readahead_list::fhandle, XrdProto::write_list::fhandle, ClientOpenRequest::fhtemplt, GenerateDescription(), XrdCl::Log::GetLevel(), XrdCl::DefaultEnv::GetLog(), ClientQueryRequest::infotype, kXR_4dirlist, kXR_async, kXR_chkpoint, kXR_chmod, kXR_ckpBegin, kXR_ckpCommit, kXR_ckpQuery, kXR_ckpRollback, kXR_ckpXeq, kXR_clone, kXR_close, kXR_coloc, kXR_compress, kXR_delete, kXR_dirlist, kXR_dup, kXR_fattr, kXR_fattrDel, kXR_fattrGet, kXR_fattrList, kXR_fattrSet, kXR_force, kXR_fresh, kXR_locate, kXR_mkdir, kXR_mkdirpath, kXR_mkpath, kXR_mv, kXR_new, kXR_nowait, kXR_open, kXR_open_apnd, kXR_open_read, kXR_open_updt, kXR_open_wrto, kXR_pgread, kXR_pgwrite, kXR_ping, kXR_posc, kXR_prefname, kXR_prepare, kXR_protocol, kXR_Qckscan, kXR_Qcksum, kXR_Qconfig, kXR_Qopaque, kXR_Qopaquf, kXR_Qopaqug, kXR_QPrep, kXR_Qspace, kXR_QStats, kXR_query, kXR_Qvisa, kXR_Qxattr, kXR_read, kXR_readv, kXR_refresh, kXR_replica, kXR_retstat, kXR_rm, kXR_rmdir, kXR_samefs, kXR_seqio, kXR_set, kXR_stage, kXR_stat, kXR_sync, kXR_truncate, kXR_vfs, kXR_wmode, kXR_write, kXR_writev, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientFattrRequest::numattr, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, readahead_list::offset, ClientChkPointRequest::opcode, 
ClientFattrRequest::options, ClientLocateRequest::options, ClientMkdirRequest::options, ClientOpenRequest::options, ClientPrepareRequest::options, ClientStatRequest::options, ClientOpenRequest::optiont, ClientPrepareRequest::prty, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, XrdProto::clone_list::srcLen, XrdProto::clone_list::srcOffs, ClientFattrRequest::subcode, and XrdProto::write_list::wlen.

Referenced by GenerateDescription(), and SetDescription().
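
Most branches of GenerateDescription follow the same pattern: print a mode in octal, switch the stream back to decimal, then print one name per set option bit or "none" if no bit is set. The sketch below isolates that pattern. The DemoOpenFlags values and DescribeOpen are hypothetical and chosen for illustration only; the real constants are defined in XProtocol.hh.

```cpp
#include <cassert>
#include <cstdint>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical flag values for illustration only; the real constants live
// in XProtocol.hh.
enum DemoOpenFlags : uint16_t
{
  DemoRead   = 0x0001,
  DemoUpdate = 0x0002,
  DemoNew    = 0x0004
};

// Builds "mode: 0<octal>, flags: <names or none>" following the pattern
// used in the listing.
inline std::string DescribeOpen( uint16_t mode, uint16_t options )
{
  std::ostringstream o;
  o << "mode: 0" << std::setbase( 8 ) << mode << ", ";
  o << std::setbase( 10 );   // restore decimal for any later numeric output
  o << "flags: ";
  if( options == 0 )
    o << "none ";
  else
  {
    if( options & DemoRead )   o << "DemoRead ";
    if( options & DemoUpdate ) o << "DemoUpdate ";
    if( options & DemoNew )    o << "DemoNew ";
  }
  return o.str();
}
```

Resetting the base matters because std::setbase is sticky: without it, every later integer written to the same stream would also be printed in octal.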

◆ GetBindPreference()

URL XrdCl::XRootDTransport::GetBindPreference ( const URL & url,
AnyObject & channelData )
virtual

Get bind preference for the next data stream.

Implements XrdCl::TransportHandler.

Definition at line 1926 of file XrdClXRootDTransport.cc.

1928 {
1929 XRootDChannelInfo *info = 0;
1930 channelData.Get( info );
1931
1932 if(!info || !info->bindSelector)
1933 return url;
1934
1935 return URL( info->bindSelector->Get() );
1936 }

References XrdCl::XRootDChannelInfo::bindSelector, and XrdCl::AnyObject::Get().
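The fallback rule in the listing above (use the selector's choice when one exists, otherwise return the URL that was passed in) can be sketched in isolation. This is a minimal illustration, not the library's code: `BindSelector`, `ChannelInfo` and `BindPreference` are hypothetical stand-ins, and plain strings replace `XrdCl::URL`.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for the object held in XRootDChannelInfo::bindSelector.
struct BindSelector
{
  std::string next;
  std::string Get() const { return next; }
};

// Hypothetical stand-in for XRootDChannelInfo, reduced to the one field used here.
struct ChannelInfo
{
  std::unique_ptr<BindSelector> bindSelector;
};

// Return the selector's choice if there is one, otherwise the URL passed in.
std::string BindPreference( const std::string &url, const ChannelInfo *info )
{
  if( !info || !info->bindSelector )
    return url;
  return info->bindSelector->Get();
}
```

Both "no channel info" and "no selector configured" fall back to the original URL, so a caller never has to distinguish the two cases.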


◆ GetBody()

XRootDStatus XrdCl::XRootDTransport::GetBody ( Message & message,
Socket * socket )
virtual

Read the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message: the message buffer containing the header
socket: the socket
Returns
stOK & suDone if the whole message has been processed; stOK & suRetry if more data is needed; stError on failure

Implements XrdCl::TransportHandler.

Definition at line 349 of file XrdClXRootDTransport.cc.

350 {
351 //--------------------------------------------------------------------------
352 // Retrieve the body
353 //--------------------------------------------------------------------------
354 size_t leftToBeRead = 0;
355 uint32_t bodySize = 0;
356 ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
357 bodySize = rsphdr->dlen;
358
359 if( message.GetSize() < bodySize + 8 )
360 message.ReAllocate( bodySize + 8 );
361
362 leftToBeRead = bodySize-(message.GetCursor()-8);
363 while( leftToBeRead )
364 {
365 int bytesRead = 0;
366 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
367
368 if( !status.IsOK() || status.code == suRetry )
369 return status;
370
371 leftToBeRead -= bytesRead;
372 message.AdvanceCursor( bytesRead );
373 }
374
375 return XRootDStatus( stOK, suDone );
376 }

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Status::code, ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), XrdCl::stOK, XrdCl::suDone, and XrdCl::suRetry.
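The resumable read pattern used by GetBody can be sketched without the XrdCl types. The cursor stored in the message records how much has already arrived, so a later call continues where the previous one stopped. `FakeSocket`, `Buffer`, `ReadResult` and `ReadBody` are hypothetical names introduced for this illustration only; they are not part of the library.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

enum class ReadResult { Done, Retry, Error };

// Hypothetical non-blocking source: hands out at most `chunk` bytes per call
// and reports Retry once the data that has arrived so far is exhausted.
struct FakeSocket
{
  std::vector<char> pending;   // bytes that have "arrived" so far
  size_t            chunk = 4; // maximum bytes returned by one Read

  ReadResult Read( char *dst, size_t want, int &got )
  {
    got = 0;
    if( pending.empty() ) return ReadResult::Retry;
    size_t n = std::min( { want, chunk, pending.size() } );
    std::memcpy( dst, pending.data(), n );
    pending.erase( pending.begin(), pending.begin() + n );
    got = static_cast<int>( n );
    return ReadResult::Done;
  }
};

// Hypothetical message buffer that remembers how far it has been filled.
struct Buffer
{
  std::vector<char> data;
  size_t            cursor = 0;
};

// Read the body that follows an 8-byte header; safe to call repeatedly.
ReadResult ReadBody( Buffer &msg, FakeSocket &sock, uint32_t bodySize )
{
  const size_t headerSize = 8;
  assert( msg.cursor >= headerSize ); // the header must already be in place
  if( msg.data.size() < headerSize + bodySize )
    msg.data.resize( headerSize + bodySize );

  size_t left = bodySize - ( msg.cursor - headerSize );
  while( left )
  {
    int got = 0;
    ReadResult r = sock.Read( msg.data.data() + msg.cursor, left, got );
    if( r != ReadResult::Done ) return r; // keep the cursor, resume later
    left       -= got;
    msg.cursor += got;
  }
  return ReadResult::Done;
}
```

Because the remaining byte count is derived from the cursor on every entry, the function holds no state of its own between calls.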


◆ GetHeader()

XRootDStatus XrdCl::XRootDTransport::GetHeader ( Message & message,
Socket * socket )
virtual

Read a message header from the socket. The socket is non-blocking, so if there is not enough data the function should return suRetry, in which case it will be called again when more data arrives, with the data previously read stored in the message buffer.

Parameters
message: the message buffer
socket: the socket
Returns
stOK & suDone if the whole message has been processed; stOK & suRetry if more data is needed; stError on failure

Implements XrdCl::TransportHandler.

Definition at line 309 of file XrdClXRootDTransport.cc.

310 {
311 //--------------------------------------------------------------------------
312 // A new message - allocate the space needed for the header
313 //--------------------------------------------------------------------------
314 if( message.GetCursor() == 0 && message.GetSize() < 8 )
315 message.Allocate( 8 );
316
317 //--------------------------------------------------------------------------
318 // Read the message header
319 //--------------------------------------------------------------------------
320 if( message.GetCursor() < 8 )
321 {
322 size_t leftToBeRead = 8 - message.GetCursor();
323 while( leftToBeRead )
324 {
325 int bytesRead = 0;
326 XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
327 leftToBeRead, bytesRead );
328 if( !status.IsOK() || status.code == suRetry )
329 return status;
330
331 leftToBeRead -= bytesRead;
332 message.AdvanceCursor( bytesRead );
333 }
334 UnMarshallHeader( message );
335
336 uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
337 Log *log = DefaultEnv::GetLog();
338 log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
339 "body", (void*)&message, bodySize );
340
341 return XRootDStatus( stOK, suDone );
342 }
343 return XRootDStatus( stError, errInternal );
344 }

References XrdCl::Buffer::AdvanceCursor(), XrdCl::Buffer::Allocate(), XrdCl::Status::code, XrdCl::Log::Dump(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), XrdCl::Socket::Read(), XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarshallHeader(), and XrdCl::XRootDTransportMsg.
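As an illustration of what unmarshalling the 8-byte header involves, the sketch below decodes the fields from network byte order. The listing above only shows that the body length sits at byte offset 4; the positions of the stream id (bytes 0-1) and the status code (bytes 2-3) are assumptions taken from the XRootD response header layout. `ResponseHeader` and `DecodeHeader` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

// Host-order view of the 8-byte response header (hypothetical helper type).
struct ResponseHeader
{
  uint8_t  streamid[2];
  uint16_t status;
  uint32_t dlen;
};

// Decode the big-endian wire bytes byte by byte, so the result does not
// depend on alignment or on the byte order of the host.
ResponseHeader DecodeHeader( const uint8_t *wire )
{
  assert( wire != nullptr );
  ResponseHeader h;
  h.streamid[0] = wire[0];
  h.streamid[1] = wire[1];
  h.status = static_cast<uint16_t>( ( wire[2] << 8 ) | wire[3] );
  h.dlen   = ( uint32_t( wire[4] ) << 24 ) | ( uint32_t( wire[5] ) << 16 ) |
             ( uint32_t( wire[6] ) <<  8 ) |   uint32_t( wire[7] );
  return h;
}
```

The library converts the fields in place inside the message buffer; the sketch returns a separate struct instead, to keep the example free of pointer casts.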


◆ GetMore()

XRootDStatus XrdCl::XRootDTransport::GetMore ( Message & message,
Socket * socket )
virtual

Read more of the message body from the socket. The socket is non-blocking, so the method may be called multiple times; see GetHeader for details.

Parameters
message: the message buffer containing the header
socket: the socket
Returns
stOK & suDone if the whole message has been processed; stOK & suRetry if more data is needed; stError on failure

Implements XrdCl::TransportHandler.

Definition at line 381 of file XrdClXRootDTransport.cc.

382 {
383 ServerResponseHeader* rsphdr = (ServerResponseHeader*)message.GetBuffer();
384 if( rsphdr->status != kXR_status )
385 return XRootDStatus( stError, errInvalidOp );
386
387 //--------------------------------------------------------------------------
388 // In case of non kXR_status responses we read all the response, including
389 // data. For kXR_status responses we first read only the remainder of the
390 // header. The header must then be unmarshalled, and then a second call to
391 // GetMore (repeated for suRetry as needed) will read the data.
392 //--------------------------------------------------------------------------
393
394 uint32_t bodySize = rsphdr->dlen;
395 if( bodySize+8 < sizeof( ServerResponseStatus ) )
396 return XRootDStatus( stError, errInvalidMessage, 0,
397 "kXR_status: invalid message size." );
398
399 ServerResponseStatus *rspst = (ServerResponseStatus*)message.GetBuffer();
400 bodySize += rspst->bdy.dlen;
401
402 if( message.GetSize() < bodySize + 8 )
403 message.ReAllocate( bodySize + 8 );
404
405 size_t leftToBeRead = bodySize-(message.GetCursor()-8);
406 while( leftToBeRead )
407 {
408 int bytesRead = 0;
409 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
410
411 if( !status.IsOK() || status.code == suRetry )
412 return status;
413
414 leftToBeRead -= bytesRead;
415 message.AdvanceCursor( bytesRead );
416 }
417
418 // Unmarshal the message body
419 Log *log = DefaultEnv::GetLog();
420 XRootDStatus st = XRootDTransport::UnMarchalStatusMore( message );
421 if( !st.IsOK() && st.code == errDataError )
422 {
423 log->Error( XRootDTransportMsg, "[msg: %p] %s", (void*)&message,
424 st.GetErrorMessage().c_str() );
425 return st;
426 }
427
428 if( !st.IsOK() )
429 {
430 log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
431 (void*)&message );
432 return st;
433 }
434
435 return XRootDStatus( stOK, suDone );
436 }

References XrdCl::Buffer::AdvanceCursor(), ServerResponseStatus::bdy, XrdCl::Status::code, ServerResponseBody_Status::dlen, ServerResponseHeader::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errInvalidOp, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), XrdCl::Status::IsOK(), kXR_status, XrdCl::Socket::Read(), XrdCl::Buffer::ReAllocate(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stOK, XrdCl::suDone, XrdCl::suRetry, UnMarchalStatusMore(), and XrdCl::XRootDTransportMsg.
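The size arithmetic in GetMore can be sketched on its own. For a kXR_status response, the total payload is the length announced in the generic header plus the length announced in the status body, and the amount still to be read is that total minus what the cursor has already covered past the 8-byte header. `StatusBytesLeft` and `kHeaderSize` are hypothetical names, and the size of the fixed status structure is passed in as a parameter rather than hard-coded.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

const size_t kHeaderSize = 8; // generic response header

// Compute how many bytes are still to be read for a status response.
// Returns false if hdrDlen is too small to hold the fixed status part.
bool StatusBytesLeft( uint32_t hdrDlen, uint32_t bdyDlen, size_t statusSize,
                      size_t cursor, size_t &left )
{
  if( hdrDlen + kHeaderSize < statusSize ) return false;
  assert( cursor >= kHeaderSize );
  size_t total = size_t( hdrDlen ) + bdyDlen; // status part + trailing data
  left = total - ( cursor - kHeaderSize );
  return true;
}
```

With a status structure size of 24 (a value chosen for the example), `StatusBytesLeft( 16, 100, 24, 24, left )` leaves `left` at 100: the fixed part has been consumed and only the trailing data remains.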


◆ GetSignature() [1/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
AnyObject & channelData )
virtual

Get signature for given message.

Implements XrdCl::TransportHandler.

Definition at line 1788 of file XrdClXRootDTransport.cc.

1789 {
1790 XRootDChannelInfo *info = 0;
1791 channelData.Get( info );
1792 return GetSignature( toSign, sign, info );
1793 }

References XrdCl::AnyObject::Get(), and GetSignature().

Referenced by GetSignature().


◆ GetSignature() [2/2]

Status XrdCl::XRootDTransport::GetSignature ( Message * toSign,
Message *& sign,
XRootDChannelInfo * info )
virtual

Get signature for given message.

Definition at line 1798 of file XrdClXRootDTransport.cc.

1801 {
1802 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1803 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1804
1805 ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1806 if( !info ) return Status( stError, errInternal );
1807 if( info->protection )
1808 {
1809 SecurityRequest *newreq = 0;
1810 // check if we have to secure the request in the first place
1811 if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1812 // secure (sign/encrypt) the request
1813 int rc = info->protection->Secure( newreq, *thereq, 0 );
1814 // there was an error
1815 if( rc < 0 )
1816 return Status( stError, errInternal, -rc );
1817
1818 sign = new Message();
1819 sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1820 }
1821
1822 return Status();
1823 }

References XrdCl::errInternal, XrdCl::errInvalidOp, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::Grab(), NEED2SECURE, XrdCl::XRootDChannelInfo::protection, XrdSecProtect::Secure(), and XrdCl::stError.


◆ HandShake()

XRootDStatus XrdCl::XRootDTransport::HandShake ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual

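Perform the handshake: substream 0 is handled by HandShakeMain(), every other substream by HandShakeParallel().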

Implements XrdCl::TransportHandler.

Definition at line 469 of file XrdClXRootDTransport.cc.

471 {
472 XRootDChannelInfo *info = 0;
473 channelData.Get( info );
474
475 if (!info)
476 return XRootDStatus(stFatal, errInternal);
477
478 XrdSysMutexHelper scopedLock( info->mutex );
479
480 if( info->stream.size() <= handShakeData->subStreamId )
481 {
482 Log *log = DefaultEnv::GetLog();
483 log->Error( XRootDTransportMsg,
484 "[%s] Internal error: not enough substreams",
485 handShakeData->streamName.c_str() );
486 return XRootDStatus( stFatal, errInternal );
487 }
488
489 if( handShakeData->subStreamId == 0 )
490 {
491 info->streamName = handShakeData->streamName;
492 return HandShakeMain( handShakeData, channelData );
493 }
494 return HandShakeParallel( handShakeData, channelData );
495 }

References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::stFatal, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::XRootDChannelInfo::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.


◆ HandShakeDone()

bool XrdCl::XRootDTransport::HandShakeDone ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual

Implements XrdCl::TransportHandler.

Definition at line 748 of file XrdClXRootDTransport.cc.

750 {
751 XRootDChannelInfo *info = 0;
752 channelData.Get( info );
753
754 if (!info) {
755 DefaultEnv::GetLog()->Error(XRootDTransportMsg,
756 "[%s] Internal error: no channel info",
757 handShakeData->streamName.c_str());
758 return false;
759 }
760
761 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
762 return ( sInfo.status == XRootDStreamInfo::Connected );
763 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, XrdCl::HandShakeData::streamName, XrdCl::HandShakeData::subStreamId, and XrdCl::XRootDTransportMsg.


◆ InitializeChannel()

void XrdCl::XRootDTransport::InitializeChannel ( const URL & url,
AnyObject & channelData )
virtual

Initialize channel.

Implements XrdCl::TransportHandler.

Definition at line 441 of file XrdClXRootDTransport.cc.

443 {
444 XRootDChannelInfo *info = new XRootDChannelInfo( url );
445 XrdSysMutexHelper scopedLock( info->mutex );
446 channelData.Set( info );
447
448 Env *env = DefaultEnv::GetEnv();
449 int streams = DefaultSubStreamsPerChannel;
450 env->GetInt( "SubStreamsPerChannel", streams );
451 if( streams < 1 ) streams = 1;
452 info->stream.resize( streams );
453 info->strmSelector.reset( new StreamSelector( streams ) );
454 info->encrypted = url.IsSecure();
455 info->istpc = url.IsTPC();
456 info->logintoken = url.GetLoginToken();
457 }

References XrdCl::DefaultSubStreamsPerChannel, XrdCl::XRootDChannelInfo::encrypted, XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::URL::GetLoginToken(), XrdCl::URL::IsSecure(), XrdCl::URL::IsTPC(), XrdCl::XRootDChannelInfo::istpc, XrdCl::XRootDChannelInfo::logintoken, XrdCl::XRootDChannelInfo::mutex, XrdCl::AnyObject::Set(), XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDChannelInfo::strmSelector.
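The way InitializeChannel derives the number of substreams (start from a default, let the environment override it, never go below one) can be sketched as follows. `Settings` is a hypothetical stand-in for the client environment and `SubStreamCount` is a name introduced for this example; only the "SubStreamsPerChannel" key comes from the listing above.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for the client environment: integer settings by name.
using Settings = std::map<std::string, int>;

// Look up the substream count, fall back to a default, and never go below one.
int SubStreamCount( const Settings &env, int defaultCount )
{
  int streams = defaultCount;
  auto it = env.find( "SubStreamsPerChannel" );
  if( it != env.end() ) streams = it->second;
  if( streams < 1 ) streams = 1;
  return streams;
}
```

Clamping after the lookup means that a misconfigured value of zero or less still yields one usable stream.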


◆ IsStreamBroken()

Status XrdCl::XRootDTransport::IsStreamBroken ( time_t inactiveTime,
AnyObject & channelData )
virtual

Check whether the stream is broken, i.e. the TCP connection got broken and this went undetected by the TCP stack.

Implements XrdCl::TransportHandler.

Definition at line 821 of file XrdClXRootDTransport.cc.

823 {
824 XRootDChannelInfo *info = 0;
825 channelData.Get( info );
826 Env *env = DefaultEnv::GetEnv();
827 Log *log = DefaultEnv::GetLog();
828
829 if (!info) {
830 log->Error(XRootDTransportMsg,
831 "Internal error: no channel info, behaving as if stream is broken");
832 return true;
833 }
834
835 int streamTimeout = DefaultStreamTimeout;
836 env->GetInt( "StreamTimeout", streamTimeout );
837
838 XrdSysMutexHelper scopedLock( info->mutex );
839
840 const time_t now = time(0);
841 const bool anySID =
842 info->sidManager->IsAnySIDOldAs( now - streamTimeout );
843
844 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
845 "stream timeout: %d, any SID: %d, wait barrier: %s",
846 info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
847 anySID, Utils::TimeToString(info->waitBarrier).c_str() );
848
849 if( inactiveTime < streamTimeout )
850 return Status();
851
852 if( now < info->waitBarrier )
853 return Status();
854
855 if( !anySID )
856 return Status();
857
858 return Status( stError, errSocketTimeout );
859 }

References XrdCl::DefaultStreamTimeout, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::errSocketTimeout, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::sidManager, XrdCl::stError, XrdCl::XRootDChannelInfo::streamName, XrdCl::Utils::TimeToString(), XrdCl::XRootDChannelInfo::waitBarrier, and XrdCl::XRootDTransportMsg.
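The decision made at the end of IsStreamBroken reduces to three conditions that must all hold before a timeout is reported. The sketch below restates them as a pure function over a hypothetical `StreamState` struct; it leaves out the locking, logging and environment lookup of the real method.

```cpp
#include <cassert>
#include <ctime>

// Inputs of the decision, gathered into a hypothetical plain struct.
struct StreamState
{
  time_t inactiveTime;  // seconds without traffic
  int    streamTimeout; // configured timeout in seconds
  time_t now;
  time_t waitBarrier;   // no timeout is declared before this moment
  bool   anyOldSID;     // some request has been pending longer than the timeout
};

// The stream is considered broken only if it has been idle long enough,
// the wait barrier has passed, and at least one old request is still pending.
bool LooksBroken( const StreamState &s )
{
  assert( s.streamTimeout >= 0 );
  if( s.inactiveTime < s.streamTimeout ) return false;
  if( s.now < s.waitBarrier )            return false;
  if( !s.anyOldSID )                     return false;
  return true;
}
```

An idle stream with nothing outstanding is therefore never reported as broken, regardless of how long it has been quiet.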


◆ IsStreamTTLElapsed()

bool XrdCl::XRootDTransport::IsStreamTTLElapsed ( time_t time,
AnyObject & channelData )
virtual

Check if the stream should be disconnected.

Implements XrdCl::TransportHandler.

Definition at line 768 of file XrdClXRootDTransport.cc.

770 {
771 XRootDChannelInfo *info = 0;
772 channelData.Get( info );
773
774 Env *env = DefaultEnv::GetEnv();
775 Log *log = DefaultEnv::GetLog();
776
777 if (!info) {
778 log->Error(XRootDTransportMsg,
779 "Internal error: no channel info, behaving as if TTL has elapsed");
780 return true;
781 }
782
783 //--------------------------------------------------------------------------
784 // Check the TTL settings for the current server
785 //--------------------------------------------------------------------------
786 int ttl;
787 if( info->serverFlags & kXR_isServer )
788 {
789 ttl = DefaultDataServerTTL;
790 env->GetInt( "DataServerTTL", ttl );
791 }
792 else
793 {
794 ttl = DefaultLoadBalancerTTL;
795 env->GetInt( "LoadBalancerTTL", ttl );
796 }
797
798 //--------------------------------------------------------------------------
799 // See whether we can give a go-ahead for the disconnection
800 //--------------------------------------------------------------------------
801 XrdSysMutexHelper scopedLock( info->mutex );
802 uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
803 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
804 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
805 info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
806 info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
807
808 if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
809 return false;
810
811 if( !allocatedSIDs && inactiveTime > ttl )
812 return true;
813
814 return false;
815 }

References XrdCl::DefaultDataServerTTL, XrdCl::DefaultLoadBalancerTTL, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), kXR_isServer, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::openFiles, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::sidManager, XrdCl::XRootDChannelInfo::streamName, and XrdCl::XRootDTransportMsg.
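The go-ahead test at the end of IsStreamTTLElapsed can be restated as a pure function. The sketch mirrors the two conditions of the listing as written; `TTLElapsed` and its parameter names are hypothetical, and the TTL is assumed to have been chosen already.

```cpp
#include <cassert>
#include <ctime>

// Decide whether an idle stream may be disconnected.
bool TTLElapsed( time_t inactiveTime, int ttl, unsigned allocatedSIDs,
                 unsigned openFiles, unsigned boundFileObjects )
{
  assert( ttl >= 0 );
  // Files are open and File objects are still bound: keep the stream.
  if( openFiles != 0 && boundFileObjects != 0 ) return false;
  // Nothing in flight and idle for longer than the TTL: disconnect.
  if( allocatedSIDs == 0 && inactiveTime > ttl ) return true;
  return false;
}
```

For example, `TTLElapsed( 1000, 300, 0, 0, 0 )` gives the go-ahead, while the same idle time with two allocated SIDs does not.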


◆ LogErrorResponse()

void XrdCl::XRootDTransport::LogErrorResponse ( const Message & msg)
static

Log server error response.

Definition at line 1524 of file XrdClXRootDTransport.cc.

1525 {
1526 Log *log = DefaultEnv::GetLog();
1527 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1528 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1529 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1530 log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1531 rsp->body.error.errnum, errmsg );
1532 delete [] errmsg;
1533 }

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, and XrdCl::XRootDTransportMsg.
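The listing copies `dlen-4` bytes of message text that follow the 4-byte error number. A minimal sketch of the same extraction using `std::string` is shown below. `ErrorText` is a hypothetical helper, and the guard for bodies of four bytes or fewer is an addition of this sketch that the listing does not contain.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Copy the message text that follows the 4-byte error number in an error body.
// `body` points just past the 8-byte header; `dlen` is the body length.
std::string ErrorText( const char *body, uint32_t dlen )
{
  assert( body != nullptr );
  if( dlen <= 4 ) return std::string();   // nothing after the error number
  std::string text( body + 4, dlen - 4 );
  size_t nul = text.find( '\0' );          // drop a trailing terminator, if any
  if( nul != std::string::npos ) text.resize( nul );
  return text;
}
```

Using `std::string` removes the manual `new[]`/`delete[]` pair, and the early return avoids computing a buffer size from a body shorter than the error number.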


◆ MarshallRequest() [1/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( char * msg)
static

Marshal the outgoing message.

Definition at line 1105 of file XrdClXRootDTransport.cc.

1106 {
1107 ClientRequest *req = (ClientRequest*)msg;
1108 switch( req->header.requestid )
1109 {
1110 //------------------------------------------------------------------------
1111 // kXR_protocol
1112 //------------------------------------------------------------------------
1113 case kXR_protocol:
1114 req->protocol.clientpv = htonl( req->protocol.clientpv );
1115 break;
1116
1117 //------------------------------------------------------------------------
1118 // kXR_login
1119 //------------------------------------------------------------------------
1120 case kXR_login:
1121 req->login.pid = htonl( req->login.pid );
1122 break;
1123
1124 //------------------------------------------------------------------------
1125 // kXR_locate
1126 //------------------------------------------------------------------------
1127 case kXR_locate:
1128 req->locate.options = htons( req->locate.options );
1129 break;
1130
1131 //------------------------------------------------------------------------
1132 // kXR_query
1133 //------------------------------------------------------------------------
1134 case kXR_query:
1135 req->query.infotype = htons( req->query.infotype );
1136 break;
1137
1138 //------------------------------------------------------------------------
1139 // kXR_truncate
1140 //------------------------------------------------------------------------
1141 case kXR_truncate:
1142 req->truncate.offset = htonll( req->truncate.offset );
1143 break;
1144
1145 //------------------------------------------------------------------------
1146 // kXR_mkdir
1147 //------------------------------------------------------------------------
1148 case kXR_mkdir:
1149 req->mkdir.mode = htons( req->mkdir.mode );
1150 break;
1151
1152 //------------------------------------------------------------------------
1153 // kXR_chmod
1154 //------------------------------------------------------------------------
1155 case kXR_chmod:
1156 req->chmod.mode = htons( req->chmod.mode );
1157 break;
1158
1159 //------------------------------------------------------------------------
1160 // kXR_open
1161 //------------------------------------------------------------------------
1162 case kXR_open:
1163 req->open.mode = htons( req->open.mode );
1164 req->open.options = htons( req->open.options );
1165 req->open.optiont = htons( req->open.optiont );
1166 break;
1167
1168 //------------------------------------------------------------------------
1169 // kXR_read
1170 //------------------------------------------------------------------------
1171 case kXR_read:
1172 req->read.offset = htonll( req->read.offset );
1173 req->read.rlen = htonl( req->read.rlen );
1174 break;
1175
1176 //------------------------------------------------------------------------
1177 // kXR_write
1178 //------------------------------------------------------------------------
1179 case kXR_write:
1180 req->write.offset = htonll( req->write.offset );
1181 break;
1182
1183 //------------------------------------------------------------------------
1184 // kXR_mv
1185 //------------------------------------------------------------------------
1186 case kXR_mv:
1187 req->mv.arg1len = htons( req->mv.arg1len );
1188 break;
1189
1190 //------------------------------------------------------------------------
1191 // kXR_readv
1192 //------------------------------------------------------------------------
1193 case kXR_readv:
1194 {
1195 uint16_t numChunks = (req->readv.dlen)/16;
1196 readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1197 for( size_t i = 0; i < numChunks; ++i )
1198 {
1199 dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1200 dataChunk[i].offset = htonll( dataChunk[i].offset );
1201 }
1202 break;
1203 }
1204
1205 case kXR_clone:
1206 {
1207 uint32_t numChunks = (req->clone.dlen)/sizeof(XrdProto::clone_list);
1208 XrdProto::clone_list *dataChunk =
1209 (XrdProto::clone_list*)( msg + sizeof( ClientRequestHdr ) );
1210 for( size_t i = 0; i < numChunks; ++i )
1211 {
1212 dataChunk[i].srcOffs = htonll( dataChunk[i].srcOffs );
1213 dataChunk[i].srcLen = htonll( dataChunk[i].srcLen );
1214 dataChunk[i].dstOffs = htonll( dataChunk[i].dstOffs );
1215 }
1216 break;
1217 }
1218
1219 //------------------------------------------------------------------------
1220 // kXR_writev
1221 //------------------------------------------------------------------------
1222 case kXR_writev:
1223 {
1224 uint16_t numChunks = (req->writev.dlen)/16;
1225 XrdProto::write_list *wrtList =
1226 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1227 for( size_t i = 0; i < numChunks; ++i )
1228 {
1229 wrtList[i].wlen = htonl( wrtList[i].wlen );
1230 wrtList[i].offset = htonll( wrtList[i].offset );
1231 }
1232
1233 break;
1234 }
1235
1236 case kXR_pgread:
1237 {
1238 req->pgread.offset = htonll( req->pgread.offset );
1239 req->pgread.rlen = htonl( req->pgread.rlen );
1240 break;
1241 }
1242
1243 case kXR_pgwrite:
1244 {
1245 req->pgwrite.offset = htonll( req->pgwrite.offset );
1246 break;
1247 }
1248
1249 //------------------------------------------------------------------------
1250 // kXR_prepare
1251 //------------------------------------------------------------------------
1252 case kXR_prepare:
1253 {
1254 req->prepare.optionX = htons( req->prepare.optionX );
1255 req->prepare.port = htons( req->prepare.port );
1256 break;
1257 }
1258
1259 case kXR_chkpoint:
1260 {
1261 if( req->chkpoint.opcode == kXR_ckpXeq )
1262 MarshallRequest( msg + 24 );
1263 break;
1264 }
1265 };
1266
1267 req->header.requestid = htons( req->header.requestid );
1268 req->header.dlen = htonl( req->header.dlen );
1269 return XRootDStatus();
1270 }

References ClientMvRequest::arg1len, ClientRequest::chkpoint, ClientRequest::chmod, ClientProtocolRequest::clientpv, ClientRequest::clone, ClientCloneRequest::dlen, ClientReadVRequest::dlen, ClientRequestHdr::dlen, ClientWriteVRequest::dlen, XrdProto::clone_list::dstOffs, ClientRequest::header, ClientQueryRequest::infotype, kXR_chkpoint, kXR_chmod, kXR_ckpXeq, kXR_clone, kXR_locate, kXR_login, kXR_mkdir, kXR_mv, kXR_open, kXR_pgread, kXR_pgwrite, kXR_prepare, kXR_protocol, kXR_query, kXR_read, kXR_readv, kXR_truncate, kXR_write, kXR_writev, ClientRequest::locate, ClientRequest::login, MarshallRequest(), ClientRequest::mkdir, ClientChmodRequest::mode, ClientMkdirRequest::mode, ClientOpenRequest::mode, ClientRequest::mv, ClientPgReadRequest::offset, ClientPgWriteRequest::offset, ClientReadRequest::offset, ClientTruncateRequest::offset, ClientWriteRequest::offset, readahead_list::offset, XrdProto::write_list::offset, ClientChkPointRequest::opcode, ClientRequest::open, ClientLocateRequest::options, ClientOpenRequest::options, ClientOpenRequest::optiont, ClientPrepareRequest::optionX, ClientRequest::pgread, ClientRequest::pgwrite, ClientLoginRequest::pid, ClientPrepareRequest::port, ClientRequest::prepare, ClientRequest::protocol, ClientRequest::query, ClientRequest::read, ClientRequest::readv, ClientRequestHdr::requestid, ClientPgReadRequest::rlen, ClientReadRequest::rlen, readahead_list::rlen, XrdProto::clone_list::srcLen, XrdProto::clone_list::srcOffs, ClientRequest::truncate, XrdProto::write_list::wlen, ClientRequest::write, and ClientRequest::writev.
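To illustrate the byte-order conversion applied per request type, the sketch below serialises a read request with a 16-bit request id, a 64-bit offset and 32-bit lengths into network byte order. The 24-byte layout is an assumption consistent with the `msg + 24` offsets in the listing; `ReadRequest`, `StoreBE` and `MarshalRead` are hypothetical names. Unlike the library, which converts fields in place with htons/htonl/htonll, the sketch writes into a separate buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Write v into out[0..sizeof(T)) most significant byte first.
template <typename T>
void StoreBE( uint8_t *out, T v )
{
  for( size_t i = 0; i < sizeof( T ); ++i )
    out[i] = static_cast<uint8_t>( v >> ( 8 * ( sizeof( T ) - 1 - i ) ) );
}

// Hypothetical host-order view of a read request.
struct ReadRequest
{
  uint8_t  streamid[2];
  uint16_t requestid;
  uint8_t  fhandle[4];
  uint64_t offset;
  uint32_t rlen;
  uint32_t dlen;
};

// Serialise the request; byte arrays are copied as-is, integers are converted.
void MarshalRead( const ReadRequest &r, uint8_t (&wire)[24] )
{
  std::memcpy( wire, r.streamid, 2 );
  StoreBE<uint16_t>( wire + 2, r.requestid );
  std::memcpy( wire + 4, r.fhandle, 4 );
  StoreBE<uint64_t>( wire + 8, r.offset );
  StoreBE<uint32_t>( wire + 16, r.rlen );
  StoreBE<uint32_t>( wire + 20, r.dlen );
}
```

Writing to a separate buffer leaves the host-order struct untouched, so the same request cannot be converted twice by accident. The in-place approach has to track that state, which is what `SetIsMarshalled( true )` in the `Message*` overload records.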


◆ MarshallRequest() [2/2]

XRootDStatus XrdCl::XRootDTransport::MarshallRequest ( Message * msg)
inline static

Marshal the outgoing message.

Definition at line 175 of file XrdClXRootDTransport.hh.

176 {
177 MarshallRequest( msg->GetBuffer() );
178 msg->SetIsMarshalled( true );
179 return XRootDStatus();
180 }

References XrdCl::Buffer::GetBuffer(), MarshallRequest(), and XrdCl::Message::SetIsMarshalled().

Referenced by MarshallRequest(), MarshallRequest(), MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), XrdCl::MessageUtils::SendMessage(), and UnMarshallRequest().


◆ MessageReceived()

uint32_t XrdCl::XRootDTransport::MessageReceived ( Message & msg,
uint16_t subStream,
AnyObject & channelData )
virtual

Check if the message invokes a stream action.

Implements XrdCl::TransportHandler.

Definition at line 1646 of file XrdClXRootDTransport.cc.

1649 {
1650 XRootDChannelInfo *info = 0;
1651 channelData.Get( info );
1652 XrdSysMutexHelper scopedLock( info->mutex );
1653 Log *log = DefaultEnv::GetLog();
1654
1655 //--------------------------------------------------------------------------
1656 // Update the substream queues
1657 //--------------------------------------------------------------------------
1658 info->strmSelector->MsgReceived( subStream );
1659
1660 //--------------------------------------------------------------------------
1661 // Check whether this message is a response to a request that has
1662 // timed out, and if so, drop it
1663 //--------------------------------------------------------------------------
1664 ServerResponse *rsp = (ServerResponse*)msg.GetBuffer();
1665 if( rsp->hdr.status == kXR_attn )
1666 {
1667 return NoAction;
1668 }
1669
1670 if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1671 {
1672 log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1673 "response that we're no longer interested in (timed out)",
1674 (void*)&msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1675 //------------------------------------------------------------------------
1676 // If it is kXR_waitresp there will be another one,
1677 // so we don't release the sid yet
1678 //------------------------------------------------------------------------
1679 if( rsp->hdr.status != kXR_waitresp )
1680 info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1681 //------------------------------------------------------------------------
1682 // If it is a successful response to an open request
1683 // that timed out, we need to send a close
1684 //------------------------------------------------------------------------
1685 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1686 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1687 if( sidIt != info->sentOpens.end() )
1688 {
1689 info->sentOpens.erase( sidIt );
1690 if( rsp->hdr.status == kXR_ok ) return RequestClose;
1691 }
1692 return DigestMsg;
1693 }
1694
1695 //--------------------------------------------------------------------------
1696 // If we have a wait or waitresp
1697 //--------------------------------------------------------------------------
1698 uint32_t seconds = 0;
1699 if( rsp->hdr.status == kXR_wait )
1700 seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1701 // to re-send the request
1702 else if( rsp->hdr.status == kXR_waitresp )
1703 {
1704 seconds = ntohl( rsp->body.waitresp.seconds );
1705
1706 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1707 "setting up wait barrier.",
1708 info->streamName.c_str(),
1709 seconds );
1710 }
1711
1712 time_t barrier = time(0) + seconds;
1713 if( info->waitBarrier < barrier )
1714 info->waitBarrier = barrier;
1715
1716 //--------------------------------------------------------------------------
1717 // If we got a response to an open request, we may need to bump the counter
1718 // of open files
1719 //--------------------------------------------------------------------------
1720 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1721 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1722 if( sidIt != info->sentOpens.end() )
1723 {
1724 if( rsp->hdr.status == kXR_waitresp )
1725 return NoAction;
1726 info->sentOpens.erase( sidIt );
1727 if( rsp->hdr.status == kXR_ok )
1728 {
1729 ++info->openFiles;
1730 info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another File object instance has been bound to this connection
1731 }
1732 return NoAction;
1733 }
1734
1735 //--------------------------------------------------------------------------
1736 // If we got a response to a close, we may need to decrement the counter of
1737 // open files
1738 //--------------------------------------------------------------------------
1739 sidIt = info->sentCloses.find( sid );
1740 if( sidIt != info->sentCloses.end() )
1741 {
1742 if( rsp->hdr.status == kXR_waitresp )
1743 return NoAction;
1744 info->sentCloses.erase( sidIt );
1745 --info->openFiles;
1746 return NoAction;
1747 }
1748 return NoAction;
1749 }
kXR_char streamid[2]
Definition XProtocol.hh:956
@ kXR_waitresp
Definition XProtocol.hh:948
@ kXR_ok
Definition XProtocol.hh:941
@ kXR_attn
Definition XProtocol.hh:943
@ kXR_wait
Definition XProtocol.hh:947
@ RequestClose
Send a close request.
const uint64_t XRootDMsg

References ServerResponse::body, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::XRootDChannelInfo::finstcnt, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), ServerResponse::hdr, kXR_attn, kXR_ok, kXR_wait, kXR_waitresp, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportHandler::NoAction, XrdCl::XRootDChannelInfo::openFiles, XrdCl::TransportHandler::RequestClose, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, XrdCl::XRootDChannelInfo::sidManager, ServerResponseHeader::status, ServerResponseHeader::streamid, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, XrdCl::XRootDChannelInfo::waitBarrier, XrdCl::XRootDMsg, and XrdCl::XRootDTransportMsg.
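
The wait-barrier update in the listing above can be looked at in isolation. The following is a minimal sketch, not the XrdCl implementation: `ExtendBarrier` and `WaitKind` are hypothetical names, and the delay is assumed to be in host byte order already. It shows the two properties visible in the listing: a kXR_wait response gets 5 extra seconds so the request can be re-sent, and the barrier only ever moves forward.

```cpp
#include <cassert>
#include <cstdint>
#include <ctime>

// Hypothetical stand-ins for the two response kinds handled in the listing.
enum class WaitKind { Wait, WaitResp };

// Returns the new barrier: the later of the current barrier and now + delay.
// For Wait, 5 extra seconds are added, mirroring the listing.
inline std::time_t ExtendBarrier( std::time_t current, std::time_t now,
                                  WaitKind kind, uint32_t seconds )
{
  if( kind == WaitKind::Wait ) seconds += 5;
  std::time_t candidate = now + static_cast<std::time_t>( seconds );
  return current < candidate ? candidate : current;
}
```

Taking the current time as a parameter instead of calling `time(0)` inside the helper is a choice made here only to keep the sketch deterministic.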

◆ MessageSent()

void XrdCl::XRootDTransport::MessageSent ( Message * msg,
uint16_t subStream,
uint32_t bytesSent,
AnyObject & channelData )
virtual

Notify the transport about a message having been sent.

Implements XrdCl::TransportHandler.

Definition at line 1754 of file XrdClXRootDTransport.cc.

1758 {
1759 // Called when a message has been sent. For messages that return on a
1760 // different pathid (and hence may use a different poller) it is possible
1761 // that the server has already replied and the reply will trigger
1762 // MessageReceived() before this method has been called. However for open
1763 // and close this is never the case and this method is used for tracking
1764 // only those.
1765 XRootDChannelInfo *info = 0;
1766 channelData.Get( info );
1767 XrdSysMutexHelper scopedLock( info->mutex );
1768 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1769 uint16_t reqid = ntohs( req->header.requestid );
1770
1771
1772 //--------------------------------------------------------------------------
1773 // We need to track opens to know if we can close streams due to idleness
1774 //--------------------------------------------------------------------------
1775 uint16_t sid;
1776 memcpy( &sid, req->header.streamid, 2 );
1777
1778 if( reqid == kXR_open )
1779 info->sentOpens.insert( sid );
1780 else if( reqid == kXR_close )
1781 info->sentCloses.insert( sid );
1782 }
kXR_char streamid[2]
Definition XProtocol.hh:158

References XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_close, kXR_open, XrdCl::XRootDChannelInfo::mutex, ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::sentCloses, XrdCl::XRootDChannelInfo::sentOpens, and ClientRequestHdr::streamid.
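
MessageSent() records the stream IDs of opens and closes, and MessageReceived() consumes them to keep the open-file counter up to date. The sketch below models that bookkeeping under simplifying assumptions: `OpenTracker` and its methods are hypothetical, locking is omitted, and the response status is reduced to two booleans.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical, simplified model of the bookkeeping (not the XrdCl type).
struct OpenTracker
{
  std::set<uint16_t> sentOpens;
  std::set<uint16_t> sentCloses;
  int openFiles = 0;

  void OnOpenSent( uint16_t sid )  { sentOpens.insert( sid ); }
  void OnCloseSent( uint16_t sid ) { sentCloses.insert( sid ); }

  // ok:      the final response reported success
  // pending: an interim response (like kXR_waitresp); the final one follows
  void OnResponse( uint16_t sid, bool ok, bool pending )
  {
    if( pending ) return;                 // keep the sid until the final answer
    if( sentOpens.erase( sid ) )          // response to a tracked open
    {
      if( ok ) ++openFiles;
      return;
    }
    if( sentCloses.erase( sid ) )         // response to a tracked close
      --openFiles;
  }
};
```

As in the listing, a close decrements the counter regardless of the response status, while an open increments it only on success.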

◆ Multiplex()

PathID XrdCl::XRootDTransport::Multiplex ( Message * msg,
AnyObject & channelData,
PathID * hint = 0 )
virtual

Return the ID of the up stream on which this message should be sent and of the down stream on which the answer should be expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 864 of file XrdClXRootDTransport.cc.

865 {
866 return PathID( 0, 0 );
867 }

◆ MultiplexSubStream()

PathID XrdCl::XRootDTransport::MultiplexSubStream ( Message * msg,
AnyObject & channelData,
PathID * hint = 0 )
virtual

Return the ID of the up substream on which this message should be sent and of the down substream on which the answer should be expected. Modify the message itself if necessary. If hint is non-zero, the message should be modified so that the answer is returned via the hinted stream.

Implements XrdCl::TransportHandler.

Definition at line 872 of file XrdClXRootDTransport.cc.

875 {
876 XRootDChannelInfo *info = 0;
877 channelData.Get( info );
878
879 if (!info) {
880 DefaultEnv::GetLog()->Error(XRootDTransportMsg,
881 "Internal error: no channel info, cannot multiplex");
882 return PathID(0,0);
883 }
884
885 XrdSysMutexHelper scopedLock( info->mutex );
886
887 //--------------------------------------------------------------------------
888 // If we're not connected to a data server or we don't know that yet
889 // we stream through 0
890 //--------------------------------------------------------------------------
891 if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
892 return PathID( 0, 0 );
893
894 //--------------------------------------------------------------------------
895 // Select the streams
896 //--------------------------------------------------------------------------
897 Log *log = DefaultEnv::GetLog();
898 uint16_t upStream = 0;
899 uint16_t downStream = 0;
900
901 if( hint )
902 {
903 upStream = hint->up;
904 downStream = hint->down;
905 }
906 else
907 {
908 upStream = 0;
909 std::vector<bool> connected;
910 connected.reserve( info->stream.size() - 1 );
911 size_t nbConnected = 0;
912 for( size_t i = 1; i < info->stream.size(); ++i )
913 if( info->stream[i].status == XRootDStreamInfo::Connected )
914 {
915 connected.push_back( true );
916 ++nbConnected;
917 }
918 else
919 connected.push_back( false );
920
921 if( nbConnected == 0 )
922 downStream = 0;
923 else
924 downStream = info->strmSelector->Select( connected );
925 }
926
927 if( upStream >= info->stream.size() )
928 {
929 log->Debug( XRootDTransportMsg,
930 "[%s] Up link stream %d does not exist, using 0",
931 info->streamName.c_str(), upStream );
932 upStream = 0;
933 }
934
935 if( downStream >= info->stream.size() )
936 {
937 log->Debug( XRootDTransportMsg,
938 "[%s] Down link stream %d does not exist, using 0",
939 info->streamName.c_str(), downStream );
940 downStream = 0;
941 }
942
943 //--------------------------------------------------------------------------
944 // Modify the message
945 //--------------------------------------------------------------------------
946 UnMarshallRequest( msg );
947 ClientRequestHdr *hdr = (ClientRequestHdr*)msg->GetBuffer();
948 switch( hdr->requestid )
949 {
950 //------------------------------------------------------------------------
951 // Read - we update the path id to tell the server where we want to
952 // get the response, but we still send the request through stream 0
953 // We need to allocate space for read_args if we don't have it
954 // included yet
955 //------------------------------------------------------------------------
956 case kXR_read:
957 {
958 if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
959 {
960 msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
961 void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
962 memset( newBuf, 0, 8 );
963 ClientReadRequest *req = (ClientReadRequest*)msg->GetBuffer();
964 req->dlen += 8;
965 }
966 read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
967 args->pathid = info->stream[downStream].pathId;
968 break;
969 }
970
971
972 //------------------------------------------------------------------------
973 // PgRead - we update the path id to tell the server where we want to
974 // get the response, but we still send the request through stream 0
975 // We need to allocate space for ClientPgReadReqArgs if we don't have it
976 // included yet
977 //------------------------------------------------------------------------
978 case kXR_pgread:
979 {
980 if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
981 {
982 msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
983 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
984 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
985 ClientPgReadRequest *req = (ClientPgReadRequest*)msg->GetBuffer();
986 req->dlen += sizeof( ClientPgReadReqArgs );
987 }
988 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
989 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
990 args->pathid = info->stream[downStream].pathId;
991 break;
992 }
993
994 //------------------------------------------------------------------------
995 // ReadV - the situation is identical to read but we don't need any
996 // additional structures to specify the return path
997 //------------------------------------------------------------------------
998 case kXR_readv:
999 {
1000 ClientReadVRequest *req = (ClientReadVRequest*)msg->GetBuffer();
1001 req->pathid = info->stream[downStream].pathId;
1002 break;
1003 }
1004
1005 //------------------------------------------------------------------------
1006 // Write - multiplexing writes doesn't work properly in the server
1007 //------------------------------------------------------------------------
1008 case kXR_write:
1009 {
1010// ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
1011// req->pathid = info->stream[downStream].pathId;
1012 break;
1013 }
1014
1015 //------------------------------------------------------------------------
1016 // WriteV - multiplexing writes doesn't work properly in the server
1017 //------------------------------------------------------------------------
1018 case kXR_writev:
1019 {
1020// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1021// req->pathid = info->stream[downStream].pathId;
1022 break;
1023 }
1024
1025 //------------------------------------------------------------------------
1026 // PgWrite - multiplexing writes doesn't work properly in the server
1027 //------------------------------------------------------------------------
1028 case kXR_pgwrite:
1029 {
1030// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
1031// req->pathid = info->stream[downStream].pathId;
1032 break;
1033 }
1034 };
1035 MarshallRequest( msg );
1036 return PathID( upStream, downStream );
1037 }
kXR_char pathid
Definition XProtocol.hh:689
static XRootDStatus UnMarshallRequest(Message *msg)

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Debug(), ClientPgReadRequest::dlen, ClientReadRequest::dlen, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetLog(), XrdCl::Buffer::GetSize(), kXR_isServer, kXR_pgread, kXR_pgwrite, kXR_read, kXR_readv, kXR_write, kXR_writev, MarshallRequest(), XrdCl::XRootDChannelInfo::mutex, ClientPgReadReqArgs::pathid, ClientReadVRequest::pathid, read_args::pathid, XrdCl::Buffer::ReAllocate(), ClientRequestHdr::requestid, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::streamName, XrdCl::XRootDChannelInfo::strmSelector, UnMarshallRequest(), XrdCl::PathID::up, and XrdCl::XRootDTransportMsg.
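
The stream-selection part of the listing (without the message rewriting) can be sketched as follows. Both helpers are hypothetical. In particular, `PickDownStream` simply takes the first connected data stream, whereas the real code delegates the choice to `strmSelector->Select()`, whose policy is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical picker: returns the number of the first connected data
// stream, or 0 (the control stream) if no data stream is connected.
inline uint16_t PickDownStream( const std::vector<bool> &streamConnected )
{
  // index 0 is the control stream and is skipped, as in the listing
  for( size_t i = 1; i < streamConnected.size(); ++i )
    if( streamConnected[i] ) return static_cast<uint16_t>( i );
  return 0;
}

// Falls back to stream 0 when the requested stream does not exist.
inline uint16_t ClampStream( uint16_t requested, size_t nbStreams )
{
  return requested >= nbStreams ? 0 : requested;
}
```

The clamp mirrors the two range checks in the listing, which also apply to a caller-supplied hint.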

◆ NbConnectedStrm()

uint16_t XrdCl::XRootDTransport::NbConnectedStrm ( AnyObject & channelData)
static

Number of currently connected data streams.

Definition at line 1538 of file XrdClXRootDTransport.cc.

1539 {
1540 XRootDChannelInfo *info = 0;
1541 channelData.Get( info );
1542
1543 if (!info) {
1544 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1545 return 0;
1546 }
1547
1548 XrdSysMutexHelper scopedLock( info->mutex );
1549
1550 uint16_t nbConnected = 0;
1551 for( size_t i = 1; i < info->stream.size(); ++i )
1552 if( info->stream[i].status == XRootDStreamInfo::Connected )
1553 ++nbConnected;
1554
1555 return nbConnected;
1556 }

References XrdCl::XRootDStreamInfo::Connected, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::stream, and XrdCl::XRootDTransportMsg.

Referenced by XrdCl::Channel::NbConnectedStrm().

◆ NeedControlConnection()

virtual bool XrdCl::XRootDTransport::NeedControlConnection ( )
inlinevirtual

Return whether a control connection must be valid before other connections are established.

Definition at line 167 of file XrdClXRootDTransport.hh.

168 {
169 return true;
170 }

◆ NeedEncryption()

bool XrdCl::XRootDTransport::NeedEncryption ( HandShakeData * handShakeData,
AnyObject & channelData )
virtual
Returns
true if encryption should be turned on, false otherwise.

Implements XrdCl::TransportHandler.

Definition at line 1848 of file XrdClXRootDTransport.cc.

1850 {
1851 XRootDChannelInfo *info = 0;
1852 channelData.Get( info );
1853
1854 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
1855 int notlsok = DefaultNoTlsOK;
1856 env->GetInt( "NoTlsOK", notlsok );
1857
1858
1859 if( notlsok )
1860 return info->encrypted;
1861
1862 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1863
1864 // Did the server instruct us to switch to TLS right away?
1865 if( sInfo.serverFlags & kXR_gotoTLS )
1866 {
1867 if( handShakeData->subStreamId == 0 ) info->encrypted = true;
1868 return true ;
1869 }
1870
1871 //--------------------------------------------------------------------------
1872 // The control stream (sub-stream 0) might need to switch to TLS before
1873 // login or after login
1874 //--------------------------------------------------------------------------
1875 if( handShakeData->subStreamId == 0 )
1876 {
1877 //------------------------------------------------------------------------
1878 // We are about to login and the server asked to start encrypting
1879 // before login
1880 //------------------------------------------------------------------------
1881 if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1882 ( info->serverFlags & kXR_tlsLogin ) )
1883 {
1884 info->encrypted = true;
1885 return true;
1886 }
1887
1888 //--------------------------------------------------------------------
1889 // The hand-shake is done and the server requested to encrypt the session
1890 //--------------------------------------------------------------------
1891 if( (sInfo.status == XRootDStreamInfo::Connected ||
1892 //--------------------------------------------------------------------
1893 // we really need to turn on TLS before we send kXR_endsess and we
1894 // are about to do so (1st enable encryption, then send kXR_endsess)
1895 //--------------------------------------------------------------------
1896 sInfo.status == XRootDStreamInfo::EndSessionSent ) &&
1897 ( info->serverFlags & kXR_tlsSess ) )
1898 {
1899 info->encrypted = true;
1900 return true;
1901 }
1902 }
1903 //--------------------------------------------------------------------------
1904 // A data stream (sub-stream > 0) if need be will be switched to TLS before
1905 // bind.
1906 //--------------------------------------------------------------------------
1907 else
1908 {
1909 //------------------------------------------------------------------------
1910 // We are about to bind a data stream and the server asked to start
1911 // encrypting before bind
1912 //------------------------------------------------------------------------
1913 if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1914 ( info->serverFlags & kXR_tlsData ) )
1915 {
1916 return true;
1917 }
1918 }
1919
1920 return false;
1921 }
#define kXR_tlsLogin
#define kXR_gotoTLS
#define kXR_tlsSess
#define kXR_tlsData
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
const int DefaultNoTlsOK

References XrdCl::XRootDStreamInfo::BindSent, XrdCl::XRootDStreamInfo::Connected, XrdCl::DefaultNoTlsOK, XrdCl::XRootDChannelInfo::encrypted, XrdCl::XRootDStreamInfo::EndSessionSent, XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), kXR_gotoTLS, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDStreamInfo::LoginSent, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDStreamInfo::serverFlags, XrdCl::XRootDStreamInfo::status, XrdCl::XRootDChannelInfo::stream, and XrdCl::HandShakeData::subStreamId.
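
The decision made by NeedEncryption() can be condensed into a pure function. This is a sketch under stated assumptions, not the library code: the flag bits and `StreamState` values below are hypothetical placeholders (the real constants are defined in XProtocol.hh), the per-stream and per-channel server flags are merged into one `flags` argument, and the side effect of setting `info->encrypted` is left out.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical flag bits; the real values live in XProtocol.hh.
enum : uint32_t { GotoTLS = 1u << 0, TlsLogin = 1u << 1,
                  TlsSess = 1u << 2, TlsData  = 1u << 3 };

// Hypothetical subset of the stream states used in the listing.
enum class StreamState { LoginSent, Connected, EndSessionSent, BindSent, Other };

inline bool NeedTls( bool noTlsOk, bool alreadyEncrypted, uint16_t subStream,
                     StreamState state, uint32_t flags )
{
  // With NoTlsOK set, only the user's own choice matters
  if( noTlsOk ) return alreadyEncrypted;

  // The server asked for TLS right away
  if( flags & GotoTLS ) return true;

  if( subStream == 0 )
  {
    // Control stream: encrypt before login, or once the session is up
    if( state == StreamState::LoginSent && ( flags & TlsLogin ) ) return true;
    bool sessionPhase = state == StreamState::Connected ||
                        state == StreamState::EndSessionSent;
    return sessionPhase && ( flags & TlsSess );
  }

  // Data stream: encrypt before bind if the server wants encrypted data
  return state == StreamState::BindSent && ( flags & TlsData );
}
```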

◆ Query()

Status XrdCl::XRootDTransport::Query ( uint16_t query,
AnyObject & result,
AnyObject & channelData )
virtual

Query the channel.

Implements XrdCl::TransportHandler.

Definition at line 1594 of file XrdClXRootDTransport.cc.

1597 {
1598 XRootDChannelInfo *info = 0;
1599 channelData.Get( info );
1600
1601 if (!info)
1602 return XRootDStatus(stFatal, errInternal);
1603
1604 XrdSysMutexHelper scopedLock( info->mutex );
1605
1606 switch( query )
1607 {
1608 //------------------------------------------------------------------------
1609 // Protocol name
1610 //------------------------------------------------------------------------
1611 case TransportQuery::Name:
1612 result.Set( (const char*)"XRootD", false );
1613 return Status();
1614
1615 //------------------------------------------------------------------------
1616 // Authentication
1617 //------------------------------------------------------------------------
1618 case TransportQuery::Auth:
1619 result.Set( new std::string( info->authProtocolName ), false );
1620 return Status();
1621
1622 //------------------------------------------------------------------------
1623 // Server flags
1624 //------------------------------------------------------------------------
1625 case XRootDQuery::ServerFlags:
1626 result.Set( new int( info->serverFlags ), false );
1627 return Status();
1628
1629 //------------------------------------------------------------------------
1630 // Protocol version
1631 //------------------------------------------------------------------------
1632 case XRootDQuery::ProtocolVersion:
1633 result.Set( new int( info->protocolVersion ), false );
1634 return Status();
1635
1636 case XRootDQuery::IsEncrypted:
1637 result.Set( new bool( info->encrypted ), false );
1638 return Status();
1639 };
1640 return Status( stError, errQueryNotSupported );
1641 }
const uint16_t errQueryNotSupported
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted

References XrdCl::TransportQuery::Auth, XrdCl::XRootDChannelInfo::authProtocolName, XrdCl::XRootDChannelInfo::encrypted, XrdCl::errInternal, XrdCl::errQueryNotSupported, XrdCl::AnyObject::Get(), XrdCl::XRootDQuery::IsEncrypted, XrdCl::XRootDChannelInfo::mutex, XrdCl::TransportQuery::Name, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::XRootDChannelInfo::protocolVersion, XrdCl::XRootDQuery::ServerFlags, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::AnyObject::Set(), XrdCl::stError, and XrdCl::stFatal.

◆ SetDescription()

void XrdCl::XRootDTransport::SetDescription ( Message * msg)
inlinestatic

Generate the description of a message and set it on the message.

Definition at line 245 of file XrdClXRootDTransport.hh.

246 {
247 std::ostringstream o;
248 GenerateDescription( msg->GetBuffer(), o );
249 msg->SetDescription( o.str() );
250 }

References GenerateDescription(), XrdCl::Buffer::GetBuffer(), and XrdCl::Message::SetDescription().

Referenced by XrdCl::FileStateHandler::Checkpoint(), XrdCl::FileStateHandler::ChkptWrt(), XrdCl::FileStateHandler::ChkptWrtV(), XrdCl::FileSystem::ChMod(), XrdCl::FileStateHandler::Clone(), XrdCl::FileStateHandler::Close(), XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::Fcntl(), XrdCl::FileSystem::Locate(), XrdCl::FileSystem::MkDir(), XrdCl::FileSystem::Mv(), XrdCl::FileStateHandler::PgReadImpl(), XrdCl::FileStateHandler::PgWriteImpl(), XrdCl::FileSystem::Ping(), XrdCl::FileSystem::Prepare(), XrdCl::FileStateHandler::PreRead(), XrdCl::FileSystem::Protocol(), XrdCl::FileSystem::Query(), XrdCl::FileStateHandler::Read(), XrdCl::FileStateHandler::ReadV(), XrdCl::MessageUtils::RewriteCGIAndPath(), XrdCl::FileSystem::Rm(), XrdCl::FileSystem::RmDir(), XrdCl::FileStateHandler::Stat(), XrdCl::FileSystem::Stat(), XrdCl::FileSystem::StatVFS(), XrdCl::FileStateHandler::Sync(), XrdCl::FileStateHandler::Truncate(), XrdCl::FileSystem::Truncate(), XrdCl::FileStateHandler::VectorRead(), XrdCl::FileStateHandler::VectorWrite(), XrdCl::FileStateHandler::Visa(), XrdCl::FileStateHandler::Write(), and XrdCl::FileStateHandler::WriteV().

◆ SubStreamNumber()

uint16_t XrdCl::XRootDTransport::SubStreamNumber ( AnyObject & channelData)
virtual

Return the number of substreams per stream that should be created.

Implements XrdCl::TransportHandler.

Definition at line 1044 of file XrdClXRootDTransport.cc.

1045 {
1046 XRootDChannelInfo *info = 0;
1047 channelData.Get( info );
1048
1049 if (!info) {
1050 DefaultEnv::GetLog()->Error(XRootDTransportMsg, "Internal error: no channel info");
1051 return 1;
1052 }
1053
1054 XrdSysMutexHelper scopedLock( info->mutex );
1055
1056 //--------------------------------------------------------------------------
1057 // If the connection has been opened in order to orchestrate a TPC or
1058 // the remote server is a Manager or Metamanager we will need only one
1059 // (control) stream.
1060 //--------------------------------------------------------------------------
1061 if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1062
1063 //--------------------------------------------------------------------------
1064 // Number of streams requested by user
1065 //--------------------------------------------------------------------------
1066 uint16_t ret = info->stream.size();
1067
1068 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
1069 int nodata = DefaultTlsNoData;
1070 env->GetInt( "TlsNoData", nodata );
1071
1072 // Does the server require the stream 0 to be encrypted?
1073 bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1074 ( info->serverFlags & kXR_tlsLogin ) ||
1075 ( info->serverFlags & kXR_tlsSess );
1076 // Does the server NOT require the data streams to be encrypted?
1077 bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1078 // Does the user require the stream 0 to be encrypted?
1079 bool usrTlsStrm0 = info->encrypted;
1080 // Does the user NOT require the data streams to be encrypted?
1081 bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1082
1083 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1084 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1085 {
1086 //------------------------------------------------------------------------
1087 // The server or user asked us to encrypt stream 0, but to send the data
1088 // (read/write) using a plain TCP connection
1089 //------------------------------------------------------------------------
1090 if( ret == 1 ) ++ret;
1091 }
1092
1093 if( ret > info->stream.size() )
1094 {
1095 info->stream.resize( ret );
1096 info->strmSelector->AdjustQueues( ret );
1097 }
1098
1099 return ret;
1100 }
const int DefaultTlsNoData

References XrdCl::DefaultTlsNoData, XrdCl::XRootDChannelInfo::encrypted, XrdCl::Log::Error(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetEnv(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::XRootDChannelInfo::istpc, kXR_gotoTLS, kXR_isServer, kXR_tlsData, kXR_tlsLogin, kXR_tlsSess, XrdCl::XRootDChannelInfo::mutex, XrdCl::XRootDChannelInfo::serverFlags, XrdCl::XRootDChannelInfo::stream, XrdCl::XRootDChannelInfo::strmSelector, and XrdCl::XRootDTransportMsg.
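
The core of the listing is the rule that a second (plain TCP) stream is added when stream 0 must be encrypted but neither side requires encrypted data. The sketch below restates that rule as a pure function. `StreamCount` and its parameters are hypothetical names, and the early return for TPC connections and managers is omitted.

```cpp
#include <cassert>
#include <cstdint>

// requested:    number of streams currently configured
// srvTlsStrm0:  server requires TLS on stream 0 (gotoTLS, tlsLogin or tlsSess)
// srvTlsData:   server requires TLS on data streams
// usrEncrypted: user asked for an encrypted channel
// usrTlsNoData: user asked to keep data streams unencrypted (TlsNoData)
inline uint16_t StreamCount( uint16_t requested, bool srvTlsStrm0, bool srvTlsData,
                             bool usrEncrypted, bool usrTlsNoData )
{
  bool srvNoTlsData = !srvTlsData;
  bool usrNoTlsData = !usrEncrypted || usrTlsNoData;

  // Stream 0 is encrypted on somebody's request, data may go in the clear
  bool split = ( usrEncrypted && usrNoTlsData && srvNoTlsData ) ||
               ( srvTlsStrm0  && srvNoTlsData && usrNoTlsData );

  // A single stream cannot be both encrypted and plain, so add one
  return ( split && requested == 1 ) ? 2 : requested;
}
```

Note that `!encrypted || ( encrypted && nodata )` in the listing simplifies to `!encrypted || nodata`, which is the form used here.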

◆ UnMarchalStatusMore()

XRootDStatus XrdCl::XRootDTransport::UnMarchalStatusMore ( Message & msg)
static

Unmarshall the correction-segment of the status response for pgwrite.

Definition at line 1451 of file XrdClXRootDTransport.cc.

1452 {
1453 ServerResponseV2 *rsp = (ServerResponseV2*)msg.GetBuffer();
1454 uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1455
1456 switch( reqType )
1457 {
1458 case kXR_pgwrite:
1459 {
1460 //--------------------------------------------------------------------------
1461 // If there's no additional data there's nothing to unmarshal
1462 //--------------------------------------------------------------------------
1463 if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1464 //--------------------------------------------------------------------------
1465 // If there's not enough data to form correction-segment report an error
1466 //--------------------------------------------------------------------------
1467 if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1468 return XRootDStatus( stError, errInvalidMessage, 0,
1469 "kXR_status: invalid message size." );
1470
1471 //--------------------------------------------------------------------------
1472 // Calculate the crc32c for the additional data
1473 //--------------------------------------------------------------------------
1474 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)msg.GetBuffer( sizeof( ServerResponseV2 ) );
1475 cse->cseCRC = ntohl( cse->cseCRC );
1476 size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1477 void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1478 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1479
1480 //--------------------------------------------------------------------------
1481 // Do the integrity checks
1482 //--------------------------------------------------------------------------
1483 if( crcval != cse->cseCRC )
1484 {
1485 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1486 "corrupted (crc32c integrity check failed)." );
1487 }
1488
1489 cse->dlFirst = ntohs( cse->dlFirst );
1490 cse->dlLast = ntohs( cse->dlLast );
1491
1492 size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1493 sizeof( kXR_int64 );
1494 kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1495 sizeof( ServerResponseBody_pgWrCSE ) );
1496
1497 for( size_t i = 0; i < pgcnt; ++i )
1498 pgoffs[i] = ntohll( pgoffs[i] );
1499
1500 return XRootDStatus();
1501 break;
1502 }
1503
1504 default:
1505 break;
1506 }
1507
1508 return XRootDStatus( stError, errNotSupported );
1509 }
ServerResponseStatus status
@ kXR_1stRequest
Definition XProtocol.hh:112
long long kXR_int64
Definition XPtypes.hh:98
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
const uint16_t errNotSupported

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_pgWrCSE::cseCRC, ServerResponseBody_Status::dlen, ServerResponseBody_pgWrCSE::dlFirst, ServerResponseBody_pgWrCSE::dlLast, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::errNotSupported, XrdCl::Buffer::GetBuffer(), kXR_1stRequest, kXR_pgwrite, ServerResponseBody_Status::requestid, ServerResponseV2::status, and XrdCl::stError.

Referenced by GetMore().
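
The integrity check in the listing follows a common pattern: a big-endian CRC32C is stored in front of the data it covers. The sketch below illustrates that pattern with a plain bitwise CRC-32C. It is an illustration only: `Crc32c`, `LoadBE32` and `SegmentIntact` are hypothetical helpers, the layout is reduced to a 4-byte checksum followed by the payload, and it is an assumption that `XrdOucCRC::Calc32C` with its default `prevcs` yields the same values as this routine.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78.
inline uint32_t Crc32c( const void *data, size_t len )
{
  const unsigned char *p = static_cast<const unsigned char*>( data );
  uint32_t crc = 0xFFFFFFFFu;
  for( size_t i = 0; i < len; ++i )
  {
    crc ^= p[i];
    for( int b = 0; b < 8; ++b )
      crc = ( crc >> 1 ) ^ ( 0x82F63B78u & ( 0u - ( crc & 1u ) ) );
  }
  return ~crc;
}

// Reads a big-endian (network order) 32-bit value byte by byte.
inline uint32_t LoadBE32( const unsigned char *p )
{
  return ( uint32_t( p[0] ) << 24 ) | ( uint32_t( p[1] ) << 16 ) |
         ( uint32_t( p[2] ) << 8 )  |   uint32_t( p[3] );
}

// Assumed layout: [4-byte big-endian CRC][payload covered by the CRC].
inline bool SegmentIntact( const unsigned char *seg, size_t len )
{
  if( len < 4 ) return false;                    // too short to hold a CRC
  return LoadBE32( seg ) == Crc32c( seg + 4, len - 4 );
}
```

As in the listing, the checksum is verified before any of the covered fields are converted to host byte order.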

◆ UnMarshallBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshallBody ( Message * msg,
uint16_t reqType )
static

Unmarshall the body of the incoming message.

Definition at line 1297 of file XrdClXRootDTransport.cc.

1298 {
1299 ServerResponse *m = (ServerResponse *)msg->GetBuffer();
1300
1301 //--------------------------------------------------------------------------
1302 // kXR_ok
1303 //--------------------------------------------------------------------------
1304 if( m->hdr.status == kXR_ok )
1305 {
1306 switch( reqType )
1307 {
1308 //----------------------------------------------------------------------
1309 // kXR_protocol
1310 //----------------------------------------------------------------------
1311 case kXR_protocol:
1312 if( m->hdr.dlen < 8 )
1313 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1314 m->body.protocol.pval = ntohl( m->body.protocol.pval );
1315 m->body.protocol.flags = ntohl( m->body.protocol.flags );
1316 break;
1317 }
1318 }
1319 //--------------------------------------------------------------------------
1320 // kXR_error
1321 //--------------------------------------------------------------------------
1322 else if( m->hdr.status == kXR_error )
1323 {
1324 if( m->hdr.dlen < 4 )
1325 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1326 m->body.error.errnum = ntohl( m->body.error.errnum );
1327 }
1328
1329 //--------------------------------------------------------------------------
1330 // kXR_wait
1331 //--------------------------------------------------------------------------
1332 else if( m->hdr.status == kXR_wait )
1333 {
1334 if( m->hdr.dlen < 4 )
1335 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
1336 m->body.wait.seconds = htonl( m->body.wait.seconds );
1337 }
1338
1339 //--------------------------------------------------------------------------
1340 // kXR_redirect
1341 //--------------------------------------------------------------------------
1342 else if( m->hdr.status == kXR_redirect )
1343 {
1344 if( m->hdr.dlen < 4 )
1345 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1346 m->body.redirect.port = htonl( m->body.redirect.port );
1347 }
1348
1349 //--------------------------------------------------------------------------
1350 // kXR_waitresp
1351 //--------------------------------------------------------------------------
1352 else if( m->hdr.status == kXR_waitresp )
1353 {
1354 if( m->hdr.dlen < 4 )
1355 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1356 m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1357 }
1358
1359 //--------------------------------------------------------------------------
1360 // kXR_attn
1361 //--------------------------------------------------------------------------
1362 else if( m->hdr.status == kXR_attn )
1363 {
1364 if( m->hdr.dlen < 4 )
1365 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1366 m->body.attn.actnum = htonl( m->body.attn.actnum );
1367 }
1368
1369 return XRootDStatus();
1370 }
@ kXR_redirect
Definition XProtocol.hh:946
@ kXR_error
Definition XProtocol.hh:945

References ServerResponse::body, ServerResponseHeader::dlen, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), ServerResponse::hdr, kXR_attn, kXR_error, kXR_ok, kXR_protocol, kXR_redirect, kXR_wait, kXR_waitresp, ServerResponseHeader::status, and XrdCl::stError.

Referenced by XrdCl::XRootDMsgHandler::Process().

◆ UnMarshallHeader()

void XrdCl::XRootDTransport::UnMarshallHeader ( Message & msg)
static

Unmarshall the header of the incoming message.

Definition at line 1514 of file XrdClXRootDTransport.cc.

1515 {
1516 ServerResponseHeader *header = (ServerResponseHeader *)msg.GetBuffer();
1517 header->status = ntohs( header->status );
1518 header->dlen = ntohl( header->dlen );
1519 }

References ServerResponseHeader::dlen, XrdCl::Buffer::GetBuffer(), and ServerResponseHeader::status.

Referenced by GetHeader().
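
As an illustration of the conversion above, the sketch below unmarshalls an 8-byte header from raw wire bytes. DemoResponseHeader and DemoUnMarshallHeader are hypothetical names; the struct only mirrors the fields the listing touches and its field types are assumptions, not the definition from XProtocol.hh.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical 8-byte header with the fields used in the listing.
struct DemoResponseHeader
{
  uint8_t  streamid[2]; // single bytes, no conversion needed
  uint16_t status;      // sent in network byte order
  uint32_t dlen;        // sent in network byte order
};
static_assert( sizeof( DemoResponseHeader ) == 8, "unexpected padding" );

// Copies the first 8 wire bytes into a header and converts the
// multi-byte fields from network to host byte order.
DemoResponseHeader DemoUnMarshallHeader( const uint8_t *wire, size_t size )
{
  assert( wire != nullptr && size >= sizeof( DemoResponseHeader ) );
  DemoResponseHeader hdr;
  std::memcpy( &hdr, wire, sizeof( hdr ) );
  hdr.status = ntohs( hdr.status );
  hdr.dlen   = ntohl( hdr.dlen );
  return hdr;
}
```

Given the wire bytes 01 02 0F A3 00 00 00 10, the function yields status 4003 and dlen 16 regardless of the host's byte order. The stream ID bytes pass through unchanged because single bytes have no byte order.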


◆ UnMarshallRequest()

XRootDStatus XrdCl::XRootDTransport::UnMarshallRequest ( Message * msg)
static

Unmarshall the request. Requests sometimes need to be rewritten, which requires unmarshalling them first.

Definition at line 1276 of file XrdClXRootDTransport.cc.

1277 {
1278 if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1279 // We rely on the marshaling process to be symmetric!
1280 // First we unmarshall the request ID and the length because
1281 // MarshallRequest() relies on these, and then we need to unmarshall these
1282 // two again, because they get marshalled in MarshallRequest().
1283 // All this is pretty damn ugly and should be rewritten.
1284 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1285 req->header.requestid = htons( req->header.requestid );
1286 req->header.dlen = htonl( req->header.dlen );
1287 XRootDStatus st = MarshallRequest( msg );
1288 req->header.requestid = htons( req->header.requestid );
1289 req->header.dlen = htonl( req->header.dlen );
1290 msg->SetIsMarshalled( false );
1291 return st;
1292 }

References ClientRequestHdr::dlen, XrdCl::Buffer::GetBuffer(), ClientRequest::header, XrdCl::Message::IsMarshalled(), MarshallRequest(), ClientRequestHdr::requestid, XrdCl::Message::SetIsMarshalled(), XrdCl::stOK, and XrdCl::suAlreadyDone.

Referenced by MultiplexSubStream(), XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().
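
The listing above works because a byte swap is its own inverse, so running the marshalling routine on marshalled data restores host order. The request ID and length are converted first only because the marshalling routine dispatches on the host-order request ID, and they are converted once more afterwards because that routine swaps them again. A minimal sketch of this idea follows. DemoRequest, kDemoRead, DemoMarshall and DemoUnMarshall are hypothetical and far simpler than the real ClientRequest and MarshallRequest().

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cstdint>

// Hypothetical, simplified request; NOT the real ClientRequest layout.
struct DemoRequest
{
  uint16_t requestid; // selects which body fields exist
  uint32_t dlen;
  uint32_t rlen;      // body field used only by kDemoRead
};

const uint16_t kDemoRead = 3013; // illustrative value only

// Host order -> network order. Dispatches on the host-order request ID
// and swaps the two common header fields last.
void DemoMarshall( DemoRequest *req )
{
  assert( req != nullptr );
  if( req->requestid == kDemoRead )
    req->rlen = htonl( req->rlen );
  req->requestid = htons( req->requestid );
  req->dlen      = htonl( req->dlen );
}

// Network order -> host order, reusing DemoMarshall for the body fields.
void DemoUnMarshall( DemoRequest *req )
{
  assert( req != nullptr );
  // 1. Restore the fields DemoMarshall dispatches on.
  req->requestid = ntohs( req->requestid );
  req->dlen      = ntohl( req->dlen );
  // 2. Swap the body fields back; this also re-swaps the ID and length.
  DemoMarshall( req );
  // 3. Undo the re-swap of the ID and length.
  req->requestid = ntohs( req->requestid );
  req->dlen      = ntohl( req->dlen );
}
```

Calling DemoMarshall followed by DemoUnMarshall returns every field to its original host-order value. The cost of the shortcut is that both directions must stay exactly symmetric, which is the caveat the source comment raises.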


◆ UnMarshalStatusBody()

XRootDStatus XrdCl::XRootDTransport::UnMarshalStatusBody ( Message & msg,
uint16_t reqType )
static

Unmarshall the body of the status response.

Definition at line 1375 of file XrdClXRootDTransport.cc.

1376 {
1377 //--------------------------------------------------------------------------
1378 // Calculate the crc32c before unmarshalling the body!
1379 //--------------------------------------------------------------------------
1380 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
1381 char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1382 size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1383 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1384
1385 size_t stlen = sizeof( ServerResponseStatus );
1386 switch( reqType )
1387 {
1388 case kXR_pgread:
1389 {
1390 stlen += sizeof( ServerResponseBody_pgRead );
1391 break;
1392 }
1393
1394 case kXR_pgwrite:
1395 {
1396 stlen += sizeof( ServerResponseBody_pgWrite );
1397 break;
1398 }
1399 }
1400
1401 if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1402 "kXR_status: invalid message size." );
1403
1404 rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1405 rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1406
1407 switch( reqType )
1408 {
1409 case kXR_pgread:
1410 {
1411 ServerResponseBody_pgRead *pgrdbdy = (ServerResponseBody_pgRead*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1412 pgrdbdy->offset = ntohll( pgrdbdy->offset );
1413 break;
1414 }
1415
1416 case kXR_pgwrite:
1417 {
1418 ServerResponseBody_pgWrite *pgwrtbdy = (ServerResponseBody_pgWrite*)msg.GetBuffer( sizeof( ServerResponseStatus ) );
1419 pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1420 break;
1421 }
1422 }
1423
1424 //--------------------------------------------------------------------------
1425 // Do the integrity checks
1426 //--------------------------------------------------------------------------
1427 if( crcval != rspst->bdy.crc32c )
1428 {
1429 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1430 "corrupted (crc32c integrity check failed)." );
1431 }
1432
1433 if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1434 rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1435 {
1436 return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1437 "(stream ID mismatch)." );
1438 }
1439
1440
1441
1442 if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1443 {
1444 return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1445 "(request ID mismatch)." );
1446 }
1447
1448 return XRootDStatus();
1449 }

References ServerResponseStatus::bdy, XrdOucCRC::Calc32C(), ServerResponseBody_Status::crc32c, ServerResponseBody_Status::dlen, ServerResponseHeader::dlen, XrdCl::errDataError, XrdCl::errInvalidMessage, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetSize(), ServerResponseStatus::hdr, kXR_1stRequest, kXR_pgread, kXR_pgwrite, ServerResponseBody_pgRead::offset, ServerResponseBody_pgWrite::offset, ServerResponseBody_Status::requestid, XrdCl::stError, ServerResponseBody_Status::streamID, and ServerResponseHeader::streamid.

Referenced by XrdCl::XRootDMsgHandler::InspectStatusRsp().
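
The checksum in the listing above is computed before any field is converted because it covers the bytes as they arrived on the wire; swapping fields first would change those bytes on a little-endian host and make the comparison fail. The sketch below illustrates that ordering under stated assumptions. DemoCrc32c is a plain bitwise CRC32C standing in for XrdOucCRC::Calc32C (that the two agree bit for bit is an assumption), and the frame layout and all Demo names are hypothetical.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Plain bitwise CRC32C (Castagnoli polynomial, reflected form).
uint32_t DemoCrc32c( const uint8_t *data, size_t len )
{
  uint32_t crc = 0xFFFFFFFFu;
  for( size_t i = 0; i < len; ++i )
  {
    crc ^= data[i];
    for( int bit = 0; bit < 8; ++bit )
      crc = ( crc >> 1 ) ^ ( 0x82F63B78u & ( 0u - ( crc & 1u ) ) );
  }
  return crc ^ 0xFFFFFFFFu;
}

// Hypothetical frame: 8 header bytes, a 4-byte checksum in network byte
// order, then the bytes the checksum covers.
const size_t kDemoHdrLen = 8;
const size_t kDemoCrcLen = 4;

// Verifies the checksum over the raw wire bytes, before any of the
// covered fields has been byte-swapped.
bool DemoCrcMatches( const uint8_t *frame, size_t size )
{
  assert( frame != nullptr );
  if( size < kDemoHdrLen + kDemoCrcLen )
    return false;                      // too short to hold a checksum
  uint32_t stored;
  std::memcpy( &stored, frame + kDemoHdrLen, kDemoCrcLen );
  const uint8_t *covered = frame + kDemoHdrLen + kDemoCrcLen;
  size_t length = size - kDemoHdrLen - kDemoCrcLen;
  return DemoCrc32c( covered, length ) == ntohl( stored );
}
```

Only the stored checksum itself is converted to host order for the comparison; the covered bytes are read exactly as received.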


◆ WaitBeforeExit()

void XrdCl::XRootDTransport::WaitBeforeExit ( )
virtual

Wait until the program can safely exit.

Implements XrdCl::TransportHandler.

Definition at line 1839 of file XrdClXRootDTransport.cc.

1840 {
1841 XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1842 pSecUnloadHandler->unloaded = true;
1843 }

◆ PluginUnloadHandler

friend struct PluginUnloadHandler
friend

Definition at line 432 of file XrdClXRootDTransport.hh.

References PluginUnloadHandler.

Referenced by XRootDTransport(), and PluginUnloadHandler.


The documentation for this class was generated from the following files: