21 #include "nonblocking_packet_sender.h"
26 using std::shared_ptr;
27 using std::unique_ptr;
31 NonblockingSender::NonblockingSender(shared_ptr<SocketWrapperInterface> socket_wrapper,
32 shared_ptr<NonblockingReceiverInterface> receiver,
33 shared_ptr<NonblockingPacketWriterFactoryInterface> packet_writer_factory,
34 HmacProvider hmac_provider,
35 const ConnectionOptions &connection_options)
36 : socket_wrapper_(socket_wrapper), receiver_(receiver),
37 packet_writer_factory_(packet_writer_factory), hmac_provider_(hmac_provider),
38 connection_options_(connection_options), sequence_number_(0), current_writer_(nullptr),
41 void NonblockingSender::Enqueue(unique_ptr<Message> message, unique_ptr<Command> command,
42 const shared_ptr<const string> value, unique_ptr<HandlerInterface> handler,
43 HandlerKey handler_key) {
45 command->mutable_header()->set_connectionid(receiver_->connection_id());
46 command->mutable_header()->set_sequence(sequence_number_++);
48 message->set_commandbytes(command->SerializeAsString());
50 if(message->authtype() == com::seagate::kinetic::client::proto::Message_AuthType_HMACAUTH){
51 message->mutable_hmacauth()->set_identity(connection_options_.
user_id);
52 message->mutable_hmacauth()->set_hmac(hmac_provider_.ComputeHmac(*message, connection_options_.
hmac_key));
55 unique_ptr<Request> request(
new Request());
56 request->message = move(message);
57 request->command = move(command);
58 request->value = value;
59 request->handler = move(handler);
60 request->handler_key = handler_key;
62 request_queue_.push_back(move(request));
65 NonblockingSender::~NonblockingSender() {
66 while (!request_queue_.empty()) {
67 unique_ptr<Request> request = move(request_queue_.front());
68 request_queue_.pop_front();
69 request->handler->Error(
70 KineticStatus(StatusCode::CLIENT_SHUTDOWN,
"Sender shutdown"),
75 NonblockingPacketServiceStatus NonblockingSender::Send() {
77 if (!current_writer_) {
78 if (request_queue_.empty()) {
83 unique_ptr<Request> request = move(request_queue_.front());
84 request_queue_.pop_front();
85 message_sequence_ = request->command->header().sequence();
86 handler_key_ = request->handler_key;
87 current_writer_ = move(packet_writer_factory_->CreateWriter(socket_wrapper_,
88 move(request->message), request->value));
89 handler_ = move(request->handler);
92 NonblockingStringStatus status = current_writer_->Write();
93 if (status != kDone) {
94 if (status == kInProgress) {
98 CHECK_EQ(kFailed, status);
101 KineticStatus(StatusCode::CLIENT_IO_ERROR,
"I/O write error"),
nullptr);
104 while (!request_queue_.empty()) {
105 unique_ptr<Request> request = move(request_queue_.front());
106 request_queue_.pop_front();
107 request->handler->Error(KineticStatus(StatusCode::CLIENT_IO_ERROR,
108 "I/O write error"),
nullptr);
114 current_writer_.reset();
116 if (!receiver_->Enqueue(handler_, message_sequence_, handler_key_)) {
117 LOG(WARNING) <<
"Could not enqueue handler; already had a handler for sequence " <<
118 message_sequence_ <<
" and handler key " << handler_key_;
119 handler_->Error(KineticStatus(StatusCode::CLIENT_INTERNAL_ERROR,
120 "Could not enqueue handler"),
nullptr);
126 bool NonblockingSender::Remove(HandlerKey key) {
127 for (
auto it = request_queue_.begin(); it != request_queue_.end(); it++) {
128 if ((*it)->handler_key == key) {
129 request_queue_.erase(it);
int user_id
The ID of the user to connect as.
std::string hmac_key
The HMAC key of the user specified in user_id.