-
Notifications
You must be signed in to change notification settings - Fork 388
/
Wdt.h
161 lines (134 loc) · 4.96 KB
/
Wdt.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
* Copyright (c) 2014-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/
#pragma once
#include <wdt/Receiver.h>
#include <wdt/Sender.h>
// For Options
#include <wdt/WdtOptions.h>
// For ErrorCode
#include <wdt/ErrorCodes.h>
// For IAbortChecker and WdtTransferRequest - TODO: split out ?
#include <wdt/WdtBase.h>
#include <wdt/WdtResourceController.h>
#include <wdt/util/EncryptionUtils.h>
#include <ostream>
namespace facebook {
namespace wdt {
// Note: we use Wdt in the method names even if redundant with the class name
// so we can search callers easily
/**
* This class is the main API and entry point for using WDT (Warp speed Data
* Transfers).
*
* Example of use:
* // During the single threaded part of your service's initialization
* Wdt &wdt = Wdt::initializeWdt("app-name");
* // Optionally: change the WdtOptions as you need, for instance:
* wdt.getWdtOptions().overwrite = true;
* // Will use the (possibly changed above) settings, to configure wdt,
* // for instance throttler options
* // Sender for already setup receiver: (abortChecker is optional)
* wdtSend(transferRequest, myAbortChecker);
*/
class Wdt {
public:
/**
* Initialize the Wdt library and parse options from gflags.
* Also initialize crypto library if needed/not already initialized.
* @param:
* applicationName is used at fb for scuba reporting to differentiate apps
*/
static Wdt &initializeWdt(const std::string &appName);
/**
* Mutable reference to WdtOptions
*/
WdtOptions &getWdtOptions();
/**
* Return Wdt lib handle which has been previously initialized by calling
* @see initializeWdt()
* This is only needed if the caller code doesn't want to pass the Wdt
* instance around.
*/
static Wdt &getWdt(const std::string &appName);
/**
* TLS is unsupported for now
*/
virtual bool isTlsEnabled() {
return false;
}
/// High level APIs:
/**
* Send data for the shard identified by shardId to an already running/setup
* receiver whose connection url was used to make a WdtTransferRequest.
* Optionally passes an abort checker.
*/
virtual ErrorCode wdtSend(
const WdtTransferRequest &wdtRequest,
std::shared_ptr<IAbortChecker> abortChecker = nullptr,
bool terminateExistingOne = false);
virtual ErrorCode createWdtSender(const WdtTransferRequest &wdtRequest,
std::shared_ptr<IAbortChecker> abortChecker,
bool terminateExistingOne,
SenderPtr &senderPtr);
virtual ErrorCode releaseWdtSender(const WdtTransferRequest &wdtRequest);
/**
* Receive data. It creates a receiver on specified namespace/identifier and
* initialize it.
*/
virtual ErrorCode wdtReceiveStart(
const std::string &wdtNamespace, WdtTransferRequest &wdtRequest,
const std::string &identifier = "default",
std::shared_ptr<IAbortChecker> abortChecker = nullptr);
/**
* Finish receiving data. It finishes the receiver on specified
* namespace/identifier.
*/
virtual ErrorCode wdtReceiveFinish(const std::string &wdtNamespace,
const std::string &identifier = "default");
virtual ErrorCode printWdtOptions(std::ostream &out);
WdtResourceController *getWdtResourceController() {
return resourceController_.get();
}
/// destroys WDT object for an app
static void releaseWdt(const std::string &appName);
/// @return sender identifier for a transfer request
static std::string getSenderIdentifier(const WdtTransferRequest &req);
virtual void wdtSetReceiverSocketCreator(Receiver & /* unused */) {
}
/// Virtual Destructor (for class hierarchy)
virtual ~Wdt() {
}
protected:
/// Set to true when each instance is initialized
bool initDone_{false};
/// App name which is used in scuba reporting
std::string appName_;
WdtOptions options_;
/// responsible for initializing openssl
WdtCryptoIntializer cryptoInitializer_;
// TODO: share resource controller across apps
/// wdt resource controller
std::unique_ptr<WdtResourceController> resourceController_;
// Internal initialization so sub classes can share the code
virtual ErrorCode initializeWdtInternal(const std::string &appName);
// Optionally set socket creator and progress reporter (used for fb)
virtual ErrorCode wdtSetAbortSocketCreatorAndReporter(
WdtBase *target, const WdtTransferRequest &req,
std::shared_ptr<IAbortChecker> abortChecker);
// Internal wdt object creator/holder
static Wdt &getWdtInternal(const std::string &appName,
std::function<Wdt *()> factory);
/// Private constructor
explicit Wdt();
explicit Wdt(std::shared_ptr<Throttler> throttler);
/// Not copyable
Wdt(const Wdt &) = delete;
Wdt &operator=(const Wdt &) = delete;
};
} // namespace wdt
} // namespace facebook