1086 {
1087 int max_pending = 50;
1089 m_continue_queue.reset(new HandlerQueue(max_pending));
1090 auto &queue = *m_queue.get();
1092
1093 CURLM *multi_handle = curl_multi_init();
1094 if (multi_handle == nullptr) {
1095 throw std::runtime_error("Failed to create curl multi-handle");
1096 }
1097
1098 int running_handles = 0;
1099 time_t last_maintenance = time(NULL);
1100 CURLMcode mres = CURLM_OK;
1101
1102
1103
1104 std::unordered_map<int, WaitingForBroker> broker_reqs;
1105 std::vector<struct curl_waitfd> waitfds;
1106
1107 bool want_shutdown = false;
1108 while (!want_shutdown) {
1109 m_last_completed_cycle.store(std::chrono::system_clock::now().time_since_epoch().count());
1110 auto oldest_op = std::chrono::system_clock::now();
1111 for (const auto &entry : m_op_map) {
1112 OpRecord(*entry.second.first, OpKind::Update);
1113 if (entry.second.second < oldest_op) {
1114 oldest_op = entry.second.second;
1115 }
1116 }
1117 m_oldest_op.store(oldest_op.time_since_epoch().count());
1118
1119
1120 while (true) {
1121 auto op = m_continue_queue->TryConsume();
1122 if (!op) {
1123 break;
1124 }
1125
1126
1127 if (op->IsDone()) {
1128 m_logger->Debug(
kLogXrdClHttp,
"Ignoring continuation of operation that has already completed");
1129 continue;
1130 }
1131 m_logger->Debug(
kLogXrdClHttp,
"Continuing the curl handle from op %p on thread %d", op.get(), getthreadid());
1132 auto curl = op->GetCurlHandle();
1133 if (!op->ContinueHandle()) {
1134 op->Fail(
XrdCl::errInternal, 0,
"Failed to continue the curl handle for the operation");
1135 OpRecord(*op, OpKind::Error);
1136 op->ReleaseHandle();
1137 if (curl) {
1138 curl_multi_remove_handle(multi_handle, curl);
1139 curl_easy_cleanup(curl);
1140 m_op_map.erase(curl);
1141 }
1142 running_handles -= 1;
1143 continue;
1144 } else {
1145 auto iter = m_op_map.find(curl);
1146 if (iter != m_op_map.end()) iter->second.second = std::chrono::system_clock::now();
1147 }
1148 }
1149
1150 while (running_handles < static_cast<int>(m_max_ops)) {
1151 auto op = running_handles == 0 ? queue.Consume(std::chrono::seconds(1)) : queue.TryConsume();
1152 if (!op) {
1153 break;
1154 }
1155 auto curl = queue.GetHandle();
1156 if (curl == nullptr) {
1157 m_logger->Debug(
kLogXrdClHttp,
"Unable to allocate a curl handle");
1159 continue;
1160 }
1161 try {
1162 auto rv = op->Setup(curl, *this);
1163 if (!rv) {
1164 m_logger->Debug(
kLogXrdClHttp,
"Failed to setup the curl handle");
1165 op->Fail(
XrdCl::errInternal, ENOMEM,
"Failed to setup the curl handle for the operation");
1166 continue;
1167 }
1168 if (!op->FinishSetup(curl)) {
1169 m_logger->Debug(
kLogXrdClHttp,
"Failed to finish setup of the curl handle");
1170 op->Fail(
XrdCl::errInternal, ENOMEM,
"Failed to finish setup of the curl handle for the operation");
1171 continue;
1172 }
1173 } catch (...) {
1174 m_logger->Debug(
kLogXrdClHttp,
"Unable to setup the curl handle");
1175 op->Fail(
XrdCl::errInternal, ENOMEM,
"Failed to setup the curl handle for the operation");
1176 continue;
1177 }
1178 op->SetContinueQueue(m_continue_queue);
1179
1180 if (op->IsDone()) {
1181 continue;
1182 }
1183 m_op_map[curl] = {op, std::chrono::system_clock::now()};
1184
1185
1186
1187 if (op->RequiresOptions()) {
1188 std::string modified_url;
1189 std::shared_ptr<CurlOptionsOp> options_op(
1190 new CurlOptionsOp(
1191 curl, op,
1192 std::string(
1194 ),
1195 m_logger, op->GetConnCalloutFunc()
1196 )
1197 );
1198
1199
1200
1201 curl = queue.GetHandle();
1202 if (curl == nullptr) {
1203 m_logger->Debug(
kLogXrdClHttp,
"Unable to allocate a curl handle");
1205 OpRecord(*op, OpKind::Error);
1206 continue;
1207 }
1208 auto rv = options_op->Setup(curl, *this);
1209 if (!rv) {
1210 m_logger->Debug(
kLogXrdClHttp,
"Failed to allocate a curl handle for OPTIONS");
1211 continue;
1212 }
1213 m_op_map[curl] = {options_op, std::chrono::system_clock::now()};
1214 OpRecord(*options_op, OpKind::Start);
1215 running_handles += 1;
1216 } else {
1217 OpRecord(*op, OpKind::Start);
1218 }
1219
1220 auto mres = curl_multi_add_handle(multi_handle, curl);
1221 if (mres != CURLM_OK) {
1222 m_logger->Debug(
kLogXrdClHttp,
"Unable to add operation to the curl multi-handle");
1223 op->Fail(
XrdCl::errInternal, mres,
"Unable to add operation to the curl multi-handle");
1224 OpRecord(*op, OpKind::Error);
1225 continue;
1226 }
1227 m_logger->Debug(
kLogXrdClHttp,
"Added request for URL %s to worker thread for processing", op->GetUrl().c_str());
1228 running_handles += 1;
1229 }
1230
1231
1232
1233 time_t now = time(NULL);
1234 time_t next_maintenance = last_maintenance + m_maintenance_period.load(std::memory_order_relaxed);
1235 if (now >= next_maintenance) {
1236 m_queue->Expire();
1237 m_continue_queue->Expire();
1238 m_logger->Debug(
kLogXrdClHttp,
"Curl worker thread %d is running %d operations",
1239 getthreadid(), running_handles);
1240 last_maintenance = now;
1241
1242
1243 std::vector<std::pair<int, CURL *>> expired_ops;
1244 for (const auto &entry : broker_reqs) {
1245 if (entry.second.expiry < now) {
1246 expired_ops.emplace_back(entry.first, entry.second.curl);
1247 }
1248 }
1249 for (const auto &entry : expired_ops) {
1250 auto iter = m_op_map.find(entry.second);
1251 if (iter == m_op_map.end()) {
1252 m_logger->Warning(
kLogXrdClHttp,
"Found an expired curl handle with no corresponding operation!");
1253 } else {
1254
1255 CurlOptionsOp *options_op = nullptr;
1256 if ((options_op = dynamic_cast<CurlOptionsOp*>(iter->second.first.get())) != nullptr) {
1258 bool parent_op_failed = false;
1259 if (parent_op->IsRedirect()) {
1260 std::string target;
1263 if (iter != m_op_map.end()) {
1264 OpRecord(*iter->second.first, OpKind::Error);
1266 m_op_map.erase(iter);
1267 running_handles -= 1;
1268 }
1269 parent_op_failed = true;
1270 } else {
1271 OpRecord(*parent_op, OpKind::Start);
1272 }
1273 } else {
1274 OpRecord(*parent_op, OpKind::Start);
1275 }
1276 if (!parent_op_failed){
1278 }
1279 }
1280
1282 iter->second.first->ReleaseHandle();
1283 OpRecord(*(iter->second.first), OpKind::ConncallTimeout);
1284 m_op_map.erase(entry.second);
1285 curl_easy_cleanup(entry.second);
1286 running_handles -= 1;
1287 }
1288 broker_reqs.erase(entry.first);
1289 m_conncall_timeout.fetch_add(1, std::memory_order_relaxed);
1290 }
1291
1292
1294 }
1295
1296 waitfds.clear();
1297 waitfds.resize(3 + broker_reqs.size());
1298
1299 waitfds[0].fd = queue.PollFD();
1300 waitfds[0].events = CURL_WAIT_POLLIN;
1301 waitfds[0].revents = 0;
1302 waitfds[1].fd = m_continue_queue->PollFD();
1303 waitfds[1].events = CURL_WAIT_POLLIN;
1304 waitfds[1].revents = 0;
1305 waitfds[2].fd = m_shutdown_pipe_r;
1306 waitfds[2].revents = 0;
1307 waitfds[2].events = CURL_WAIT_POLLIN | CURL_WAIT_POLLPRI;
1308
1309 int idx = 3;
1310 for (const auto &entry : broker_reqs) {
1311 waitfds[idx].fd = entry.first;
1312 waitfds[idx].events = CURL_WAIT_POLLIN|CURL_WAIT_POLLPRI;
1313 waitfds[idx].revents = 0;
1314 idx += 1;
1315 }
1316
1317 long timeo;
1318 curl_multi_timeout(multi_handle, &timeo);
1319
1320
1321
1322 if (running_handles && timeo == -1) {
1323
1324
1325
1326
1327 mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50, nullptr);
1328 } else {
1329
1330
1331
1332
1333 mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50, nullptr);
1334 }
1335 if (mres != CURLM_OK) {
1336 m_logger->Warning(
kLogXrdClHttp,
"Failed to wait on multi-handle: %d", mres);
1337 }
1338
1339
1340 for (const auto &entry : waitfds) {
1341
1342 if (waitfds[0].fd == entry.fd || waitfds[1].fd == entry.fd) {
1343 continue;
1344 }
1345
1346 if ((waitfds[2].fd == entry.fd) && entry.revents) {
1347 want_shutdown = true;
1348 break;
1349 }
1350 if ((entry.revents & CURL_WAIT_POLLIN) != CURL_WAIT_POLLIN) {
1351 continue;
1352 }
1353 auto handle = broker_reqs[entry.fd].curl;
1354 auto iter = m_op_map.find(handle);
1355 if (iter == m_op_map.end()) {
1356 m_logger->Warning(
kLogXrdClHttp,
"Internal error: broker responded on FD %d but no corresponding curl operation", entry.fd);
1357 broker_reqs.erase(entry.fd);
1358 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1359 continue;
1360 }
1361 std::string err;
1362 auto result = iter->second.first->WaitSocketCallback(err);
1363 if (result == -1) {
1364 m_logger->Warning(
kLogXrdClHttp,
"Error when invoking the broker callback: %s", err.c_str());
1365
1366 CurlOptionsOp *options_op = nullptr;
1367 if ((options_op = dynamic_cast<CurlOptionsOp*>(iter->second.first.get())) != nullptr) {
1369 bool parent_op_failed = false;
1370 if (parent_op->IsRedirect()) {
1371 std::string target;
1374 if (iter != m_op_map.end()) {
1375 OpRecord(*iter->second.first, OpKind::Error);
1377 m_op_map.erase(iter);
1378 running_handles -= 1;
1379 }
1380 parent_op_failed = true;
1381 } else {
1382 OpRecord(*parent_op, OpKind::Start);
1383 }
1384 } else {
1385 OpRecord(*parent_op, OpKind::Start);
1386 }
1387 if (!parent_op_failed){
1389 }
1390 }
1391
1393 OpRecord(*iter->second.first, OpKind::Error);
1394 m_op_map.erase(handle);
1395 broker_reqs.erase(entry.fd);
1396 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1397 running_handles -= 1;
1398 } else {
1399 broker_reqs.erase(entry.fd);
1400 curl_multi_add_handle(multi_handle, handle);
1401 m_conncall_success.fetch_add(1, std::memory_order_relaxed);
1402 }
1403 }
1404
1405
1406 int still_running;
1407 auto mres = curl_multi_perform(multi_handle, &still_running);
1408 if (mres == CURLM_CALL_MULTI_PERFORM) {
1409 continue;
1410 } else if (mres != CURLM_OK) {
1411 m_logger->Warning(
kLogXrdClHttp,
"Failed to perform multi-handle operation: %d", mres);
1412 break;
1413 }
1414
1415 CURLMsg *msg;
1416 do {
1417 int msgq = 0;
1418 msg = curl_multi_info_read(multi_handle, &msgq);
1419 if (msg && (msg->msg == CURLMSG_DONE)) {
1420 if (!msg->easy_handle) {
1421 m_logger->Warning(
kLogXrdClHttp,
"Logic error: got a callback for a null handle");
1422 mres = CURLM_BAD_EASY_HANDLE;
1423 break;
1424 }
1425 auto iter = m_op_map.find(msg->easy_handle);
1426 if (iter == m_op_map.end()) {
1427 m_logger->Error(
kLogXrdClHttp,
"Logic error: got a callback for an entry that doesn't exist");
1428 mres = CURLM_BAD_EASY_HANDLE;
1429 break;
1430 }
1431 auto op = iter->second.first;
1432 auto res = msg->data.result;
1433 bool keep_handle = false;
1434 bool waiting_on_callout = false;
1435 if (res == CURLE_OK) {
1436 auto sc = op->GetStatusCode();
1437 OpRecord(*op, OpKind::Finish);
1440 op->Fail(httpErr.first, httpErr.second, op->GetStatusMessage());
1441 op->ReleaseHandle();
1442
1443
1444
1445 CurlOptionsOp *options_op = nullptr;
1446 if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1448 bool parent_op_failed = false;
1449 if (parent_op->IsRedirect()) {
1450 std::string target;
1452 OpRecord(*parent_op, OpKind::Error);
1454 running_handles -= 1;
1455 parent_op_failed = true;
1456 } else {
1457 OpRecord(*parent_op, OpKind::Start);
1458 }
1459 } else {
1460 OpRecord(*parent_op, OpKind::Start);
1461 }
1462
1463 if (!parent_op_failed) {
1465 }
1466 }
1467
1468 queue.RecycleHandle(iter->first);
1469 } else {
1470 CurlOptionsOp *options_op = nullptr;
1471
1472 if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get()))) {
1475
1477 op->OptionsDone();
1478 OpRecord(*op, OpKind::Start);
1480 curl_multi_remove_handle(multi_handle, iter->first);
1481 queue.RecycleHandle(iter->first);
1482 }
1483
1484
1485
1486 if (op->IsRedirect()) {
1487 std::string target;
1488 switch (op->Redirect(target)) {
1490 if (options_op) {
1491
1492
1493
1494
1495
1496 OpRecord(*op, OpKind::Error);
1497 }
1498 keep_handle = false;
1499 break;
1501 if (!options_op) {
1502
1503
1504
1505 keep_handle = true;
1506 OpRecord(*op, OpKind::Start);
1507 }
1508 break;
1510 {
1511
1512
1513
1514
1515
1516 std::string modified_url;
1518 options_op = new CurlOptionsOp(iter->first, op, target, m_logger, op->GetConnCalloutFunc());
1519 std::shared_ptr<CurlOperation> new_op(options_op);
1520 auto curl = queue.GetHandle();
1521 if (curl == nullptr) {
1522 m_logger->Debug(
kLogXrdClHttp,
"Unable to allocate a curl handle");
1524 keep_handle = false;
1525 options_op = nullptr;
1526 break;
1527 }
1528 OpRecord(*new_op, OpKind::Start);
1529 try {
1530 auto rv = new_op->Setup(curl, *this);
1531 if (!rv) {
1532 m_logger->Debug(
kLogXrdClHttp,
"Unable to configure a curl handle for OPTIONS");
1533 keep_handle = false;
1534 options_op = nullptr;
1535 break;
1536 }
1537 } catch (...) {
1538 m_logger->Debug(
kLogXrdClHttp,
"Unable to setup the curl handle for the OPTIONS operation");
1539 new_op->Fail(
XrdCl::errInternal, ENOMEM,
"Failed to setup the curl handle for the OPTIONS operation");
1540 OpRecord(*new_op, OpKind::Error);
1541 keep_handle = false;
1542 break;
1543 }
1544 new_op->SetContinueQueue(m_continue_queue);
1545 m_op_map[curl] = {new_op, std::chrono::system_clock::now()};
1546 auto mres = curl_multi_add_handle(multi_handle, curl);
1547 if (mres != CURLM_OK) {
1548 m_logger->Debug(
kLogXrdClHttp,
"Unable to add OPTIONS operation to the curl multi-handle: %s", curl_multi_strerror(mres));
1549 op->Fail(
XrdCl::errInternal, mres,
"Unable to add OPTIONS operation to the curl multi-handle");
1550 OpRecord(*new_op, OpKind::Error);
1551 break;
1552 }
1553 running_handles += 1;
1554 m_logger->Debug(
kLogXrdClHttp,
"Invoking the OPTIONS operation before redirect to %s", target.c_str());
1555
1556
1557 keep_handle = true;
1558 }
1559 }
1560 int callout_socket = op->WaitSocket();
1561 if ((waiting_on_callout = callout_socket >= 0)) {
1562 auto expiry = time(nullptr) + 20;
1563 m_logger->Debug(
kLogXrdClHttp,
"Creating a callout wait request on socket %d", callout_socket);
1564 broker_reqs[callout_socket] = {iter->first, expiry};
1565 m_conncall_req.fetch_add(1, std::memory_order_relaxed);
1566 }
1567 } else if (options_op) {
1568
1570 }
1571 if (keep_handle) {
1572 curl_multi_remove_handle(multi_handle, iter->first);
1573 if (!waiting_on_callout && !options_op) {
1574 curl_multi_add_handle(multi_handle, iter->first);
1575 }
1576 } else if (!options_op) {
1577 op->Success();
1578 op->ReleaseHandle();
1579
1580 queue.RecycleHandle(iter->first);
1581 }
1582 }
1583 } else if (res == CURLE_COULDNT_CONNECT && op->UseConnectionCallout() && !op->GetTriedBoker()) {
1584
1585
1586 keep_handle = true;
1587 op->SetTriedBoker();
1588 std::string err;
1589 int wait_socket = -1;
1590 if (!op->StartConnectionCallout(err) || (wait_socket=op->WaitSocket()) == -1) {
1591 m_logger->Error(
kLogXrdClHttp,
"Failed to start broker-based connection: %s", err.c_str());
1592 op->ReleaseHandle();
1593 keep_handle = false;
1594 } else {
1595 curl_multi_remove_handle(multi_handle, iter->first);
1596 auto expiry = time(nullptr) + 20;
1597 m_logger->Debug(
kLogXrdClHttp,
"Curl operation requires a new TCP socket; waiting for callout to respond on socket %d", wait_socket);
1598 broker_reqs[wait_socket] = {iter->first, expiry};
1599 m_conncall_req.fetch_add(1, std::memory_order_relaxed);
1600 }
1601 } else {
1602 if (res == CURLE_ABORTED_BY_CALLBACK || res == CURLE_WRITE_ERROR) {
1603
1604
1605 switch (op->GetError()) {
1607#ifdef HAVE_XPROTOCOL_TIMEREXPIRED
1609#else
1611#endif
1612 OpRecord(*op, OpKind::Error);
1613 break;
1615 auto [ecode,
emsg] = op->GetCallbackError();
1617 OpRecord(*op, OpKind::Error);
1618 break;
1619 }
1622 OpRecord(*op, op->IsPaused() ? OpKind::ClientTimeout : OpKind::ServerTimeout);
1623 break;
1626 OpRecord(*op, OpKind::ServerTimeout);
1627 break;
1630 OpRecord(*op, OpKind::ClientTimeout);
1631 break;
1634 OpRecord(*op, OpKind::ServerTimeout);
1635 break;
1637 op->Fail(
XrdCl::errInternal, 0,
"Operation was aborted without recording an abort reason");
1638 OpRecord(*op, OpKind::Error);
1639 break;
1640 };
1641 CurlOptionsOp *options_op = nullptr;
1642 if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1644 bool parent_op_failed = false;
1645 if (parent_op->IsRedirect()) {
1646 std::string target;
1649 if (iter != m_op_map.end()) {
1650 OpRecord(*iter->second.first, OpKind::Error);
1652 m_op_map.erase(iter);
1653 running_handles -= 1;
1654 }
1655 parent_op_failed = true;
1656 } else {
1657 OpRecord(*parent_op, OpKind::Start);
1658 }
1659 } else {
1660 OpRecord(*parent_op, OpKind::Start);
1661 }
1662 if (!parent_op_failed){
1664 }
1665 }
1666 } else {
1668 const auto curl_err = op->GetCurlErrorMessage();
1669 const char *curl_easy_err = curl_easy_strerror(res);
1670 const std::string fail_err = !curl_err.empty() ? curl_err : curl_easy_err;
1671 m_logger->Debug(
kLogXrdClHttp,
"Curl generated an error: %s (%d)", fail_err.c_str(), res);
1672 op->Fail(xrdCode.first, xrdCode.second, fail_err);
1673 OpRecord(*op, OpKind::Error);
1674 CurlOptionsOp *options_op = nullptr;
1675 if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1677 bool parent_op_failed = false;
1678 if (parent_op->IsRedirect()) {
1679 std::string target;
1682 if (iter != m_op_map.end()) {
1683 OpRecord(*iter->second.first, OpKind::Error);
1685 m_op_map.erase(iter);
1686 running_handles -= 1;
1687 }
1688 parent_op_failed = true;
1689 }
1690 }
1691 if (!parent_op_failed){
1693 }
1694 }
1695 }
1696 op->ReleaseHandle();
1697 }
1698 if (!keep_handle) {
1699 curl_multi_remove_handle(multi_handle, iter->first);
1700 if (res != CURLE_OK) {
1701 curl_easy_cleanup(iter->first);
1702 }
1703 for (auto &req : broker_reqs) {
1704 if (req.second.curl == iter->first) {
1705 m_logger->Warning(
kLogXrdClHttp,
"Curl handle finished while a broker operation was outstanding");
1706 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1707 }
1708 }
1709 m_op_map.erase(iter);
1710 running_handles -= 1;
1711 }
1712 }
1713 } while (msg);
1714 }
1715
1716 for (auto map_entry : m_op_map) {
1717 if (mres) {
1719 OpRecord(*map_entry.second.first, OpKind::Error);
1720 }
1721 if (multi_handle && map_entry.first) curl_multi_remove_handle(multi_handle, map_entry.first);
1722 }
1723
1724 m_queue->ReleaseHandles();
1725 curl_multi_cleanup(multi_handle);
1726}
std::pair< uint16_t, uint32_t > CurlCodeConvert(CURLcode res)
int emsg(int rc, char *msg)
static void CleanupDnsCache()
std::shared_ptr< CurlOperation > GetOperation() const
CURL * GetParentCurlHandle() const
void ReleaseHandle() override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override
static std::string_view GetUrlKey(const std::string &url, std::string &modified_url)
bool GetInt(const std::string &key, int &value)
std::pair< uint16_t, uint32_t > HTTPStatusConvert(unsigned status)
bool HTTPStatusIsError(unsigned status)
const uint64_t kLogXrdClHttp
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t errInternal
Internal error.
const uint16_t errConnectionError