2431 {
2433
2434 std::string checkSumMode;
2435 std::string checkSumType;
2436 std::string checkSumPreset;
2437 std::string zipSource;
2438 uint16_t parallelChunks;
2439 uint32_t chunkSize;
2440 uint64_t blockSize;
2441 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2442 rmOnBadCksum, continue_, zipappend, doserver;
2443 int32_t nbXcpSources;
2444 long long xRate;
2445 long long xRateThreshold;
2446 time_t cpTimeout;
2447 std::vector<std::string> addcksums;
2448
2451 pProperties->Get(
"checkSumPreset", checkSumPreset );
2452 pProperties->Get(
"parallelChunks", parallelChunks );
2458 pProperties->Get(
"dynamicSource", dynamicSource );
2462 pProperties->Get(
"preserveXAttr", preserveXAttr );
2464 pProperties->Get(
"xrateThreshold", xRateThreshold );
2471
2472 if( zip )
2474
2475 if( xcp )
2477
2478 if( force && continue_ )
2480 "Invalid argument combination: continue + force." );
2481
2482 if( zipappend && ( continue_ || force ) )
2484 "Invalid argument combination: ( continue | force ) + zip-append." );
2485
2486
2487
2488
2489 std::unique_ptr<timer_sec_t> cptimer;
2490 if( cpTimeout ) cptimer.reset( new timer_sec_t() );
2491
2492
2493
2494
2495 if( rmOnBadCksum ) posc = true;
2496
2497
2498
2499
2500 if( checkSumType == "auto" )
2501 {
2503 if( checkSumType.empty() )
2505 else
2506 log->Info(
UtilityMsg,
"Using inferred checksum type: %s.", checkSumType.c_str() );
2507 }
2508
2509 if( cptimer && cptimer->elapsed() > cpTimeout )
2511
2512
2513
2514
2515 std::unique_ptr<Source> src;
2516 if( xcp )
2517 src.reset(
new XRootDSourceXCp( &
GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2518 else if( zip )
2519 src.reset(
new XRootDSourceZip( zipSource, &
GetSource(), chunkSize, parallelChunks,
2520 checkSumType, addcksums , doserver) );
2521 else if(
GetSource().GetProtocol() ==
"stdio" )
2522 src.reset( new StdInSource( checkSumType, chunkSize, addcksums ) );
2523 else
2524 {
2525 if( dynamicSource )
2526 src.reset(
new XRootDSourceDynamic( &
GetSource(), chunkSize, checkSumType, addcksums ) );
2527 else
2528 src.reset(
new XRootDSource( &
GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2529 }
2530
2531 XRootDStatus st = src->Initialize();
2532 if( !st.IsOK() ) return SourceError( st );
2533 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2534
2535 if( cptimer && cptimer->elapsed() > cpTimeout )
2537
2538 std::unique_ptr<Destination> dest;
2540
2541 if(
GetTarget().GetProtocol() ==
"stdio" )
2542 dest.reset( new StdOutDestination( checkSumType ) );
2543 else if( zipappend )
2544 {
2546 size_t pos = fn.rfind( '/' );
2547 if( pos != std::string::npos )
2548 fn = fn.substr( pos + 1 );
2549 int64_t size = src->GetSize();
2550 dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *this ) );
2551 }
2552
2553
2554
2555 else
2556 {
2557 if( src->GetSize() >= 0 )
2558 {
2560 std::ostringstream o; o << src->GetSize();
2561 params["oss.asize"] = o.str();
2562 newDestUrl.SetParams( params );
2563
2564 }
2565 dest.reset( new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *this ) );
2566 }
2567
2568 dest->SetForce( force );
2569 dest->SetPOSC( posc );
2570 dest->SetCoerce( coerce );
2571 dest->SetMakeDir( makeDir );
2572 dest->SetContinue( continue_ );
2573 st = dest->Initialize();
2574 if( !st.IsOK() ) return DestinationError( st );
2575
2576 if( cptimer && cptimer->elapsed() > cpTimeout )
2578
2579
2580
2581
2582 if( continue_ )
2583 {
2584 size -= dest->GetSize();
2585 XrdCl::XRootDStatus st = src->StartAt( dest->GetSize() );
2586 if( !st.
IsOK() )
return SetResult( st );
2587 }
2588
2589 PageInfo pageInfo;
2590 uint64_t total_processed = 0;
2591 uint64_t processed = 0;
2593 uint16_t threshold_interval = parallelChunks;
2594 bool threshold_draining = false;
2595 timer_nsec_t threshold_timer;
2596 while( 1 )
2597 {
2598 st = src->GetChunk( pageInfo );
2600 return SourceError( st);
2601
2603 break;
2604
2605 if( cptimer && cptimer->elapsed() > cpTimeout )
2607
2608 if( xRate )
2609 {
2610 auto elapsed = (
time_nsec() - start ).count();
2611 double transferred = total_processed + pageInfo.GetLength();
2612 double expected = double( xRate ) /
to_nsec( 1 ) * elapsed;
2613
2614
2615
2616
2617 if( elapsed &&
2618 transferred > expected )
2619 {
2620 auto nsec = ( transferred / xRate *
to_nsec( 1 ) ) - elapsed;
2622 }
2623 }
2624
2625 if( xRateThreshold )
2626 {
2627 auto elapsed = threshold_timer.elapsed();
2628 double transferred = processed + pageInfo.GetLength();
2629 double expected = double( xRateThreshold ) /
to_nsec( 1 ) * elapsed;
2630
2631
2632
2633
2634 if( elapsed &&
2635 transferred < expected &&
2636 threshold_interval == 0 )
2637 {
2638 if( !threshold_draining )
2639 {
2640 log->Warning(
UtilityMsg,
"Transfer rate dropped below requested ehreshold,"
2641 " trying different source!" );
2642 XRootDStatus st = src->TryOtherServer();
2644 "The transfer rate dropped below "
2645 "requested threshold!" );
2646 threshold_draining = true;
2647
2648 }
2649 else
2650 {
2651 processed = 0;
2652 threshold_timer.reset();
2653 threshold_interval = parallelChunks;
2654 threshold_draining = false;
2655 }
2656 }
2657
2658 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2659 }
2660
2661 total_processed += pageInfo.GetLength();
2662 processed += pageInfo.GetLength();
2663
2664 st = dest->PutChunk( std::move( pageInfo ) );
2665 if( !st.IsOK() )
2666 {
2668 {
2669 pResults->Set(
"LastURL", dest->GetLastURL() );
2670 pResults->Set(
"WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2671 return SetResult( st );
2672 }
2673 return DestinationError( st );
2674 }
2675
2676 if( progress )
2677 {
2678 progress->JobProgress(
pJobId, total_processed, size );
2679 if( progress->ShouldCancel(
pJobId ) )
2681 }
2682 }
2683
2684 st = dest->Flush();
2685 if( !st.IsOK() )
2686 return DestinationError( st );
2687
2688
2689
2690
2692 {
2693 std::vector<xattr_t> xattrs;
2694 st = src->GetXAttr( xattrs );
2695 if( !st.IsOK() ) return SourceError( st );
2696 st = dest->SetXAttr( xattrs );
2697 if( !st.IsOK() ) return DestinationError( st );
2698 }
2699
2700
2701
2702
2703
2704 if( src->GetSize() >= 0 && size != total_processed )
2705 {
2706 log->Error(
UtilityMsg,
"The declared source size is %llu bytes, but "
2707 "received %llu bytes.", (unsigned long long) size, (unsigned long long) total_processed );
2709 }
2710 pResults->Set(
"size", total_processed );
2711
2712
2713
2714
2715 st = dest->Finalize();
2716 if( !st.IsOK() )
2717 return DestinationError( st );
2718
2719
2720
2721
2722 if( checkSumMode != "none" )
2723 {
2724 log->Debug(
UtilityMsg,
"Attempting checksum calculation, mode: %s.",
2725 checkSumMode.c_str() );
2726 std::string sourceCheckSum;
2727 std::string targetCheckSum;
2728
2729 if( cptimer && cptimer->elapsed() > cpTimeout )
2731
2732
2733
2734
2735 timeval oStart, oEnd;
2736 XRootDStatus st;
2737
2738 if( checkSumMode == "end2end" || checkSumMode == "source" ||
2739 !checkSumPreset.empty() )
2740 {
2741 gettimeofday( &oStart, 0 );
2742 if( !checkSumPreset.empty() )
2743 {
2744 sourceCheckSum = checkSumType + ":";
2746 checkSumPreset );
2747 }
2748 else
2749 {
2750 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2751 }
2752 gettimeofday( &oEnd, 0 );
2753
2754 if( !st.IsOK() )
2755 return SourceError( st );
2756
2757 pResults->Set(
"sourceCheckSum", sourceCheckSum );
2758 }
2759
2760 if( !addcksums.empty() )
2761 pResults->Set(
"additionalCkeckSum", src->GetAddCks() );
2762
2763 if( cptimer && cptimer->elapsed() > cpTimeout )
2765
2766
2767
2768
2769 timeval tStart, tEnd;
2770
2771 if( checkSumMode == "end2end" || checkSumMode == "target" )
2772 {
2773 gettimeofday( &tStart, 0 );
2774 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2775 if( !st.IsOK() )
2776 return DestinationError( st );
2777 gettimeofday( &tEnd, 0 );
2778 pResults->Set(
"targetCheckSum", targetCheckSum );
2779 }
2780
2781 if( cptimer && cptimer->elapsed() > cpTimeout )
2783
2784
2785
2786
2787 auto sanitize_cksum = []( char c )
2788 {
2789 std::locale loc;
2790 if( std::isalpha( c ) ) return std::tolower( c, loc );
2791 return c;
2792 };
2793
2794 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2795 sourceCheckSum.begin(), sanitize_cksum );
2796
2797 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2798 targetCheckSum.begin(), sanitize_cksum );
2799
2800
2801
2802
2803 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2804 {
2805 bool match = false;
2806 if( sourceCheckSum == targetCheckSum )
2807 match = true;
2808
2810 if( mon )
2811 {
2812 Monitor::CheckSumInfo i;
2815 i.cksum = sourceCheckSum;
2818 i.isOK = match;
2820 }
2821
2822 if( !match )
2823 {
2824 if( rmOnBadCksum )
2825 {
2826 FileSystem fs( newDestUrl );
2827 st = fs.Rm( newDestUrl.GetPath() );
2828 if( !st.IsOK() )
2829 log->Error(
UtilityMsg,
"Invalid checksum: failed to remove the target file: %s", st.ToString().c_str() );
2830 else
2831 log->Info(
UtilityMsg,
"Target file removed due to bad checksum!" );
2832 }
2833
2834 st = dest->Finalize();
2835 if( !st.IsOK() )
2836 log->Error(
UtilityMsg,
"Failed to finalize the destination: %s", st.ToString().c_str() );
2837
2839 }
2840
2841 log->Info(
UtilityMsg,
"Checksum verification: succeeded." );
2842 }
2843 }
2844
2845 return SetResult();
2846 }
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
PropertyList * pProperties
static Monitor * GetMonitor()
Get the monitor object.
@ EvCheckSum
CheckSumInfo: File checksummed.
const std::string & GetPath() const
Get the path.
std::map< std::string, std::string > ParamsMap
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static bool HasXAttr(const XrdCl::URL &url)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInvalidArgs
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.