Kinetic C/C++ Client
 All Classes Functions Variables Pages
nonblocking_packet_service.cc
1 /*
2  * kinetic-cpp-client
3  * Copyright (C) 2014 Seagate Technology.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  *
19  */
20 
21 #include "nonblocking_packet_service.h"
22 
23 namespace kinetic {
24 
25 using std::string;
26 using std::shared_ptr;
27 using std::unique_ptr;
28 using std::move;
29 using std::make_pair;
30 
31 NonblockingPacketService::NonblockingPacketService(
32  shared_ptr<SocketWrapperInterface> socket_wrapper,
33  unique_ptr<NonblockingSenderInterface> sender,
34  shared_ptr<NonblockingReceiverInterface> receiver)
35  : socket_wrapper_(socket_wrapper), sender_(move(sender)), receiver_(receiver),
36  failed_(false), next_key_(0) {}
37 
38 NonblockingPacketService::~NonblockingPacketService() {
39  CleanUp();
40 }
41 
42 HandlerKey NonblockingPacketService::Submit(unique_ptr<Message> message, unique_ptr<Command> command,
43  const shared_ptr<const string> value, unique_ptr<HandlerInterface> handler) {
44  HandlerKey key = next_key_++;
45 
46  if (failed_) {
47  handler->Error(
48  KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Client already shut down"), nullptr);
49  } else {
50  sender_->Enqueue(move(message), move(command), value, move(handler), key);
51  }
52 
53  return key;
54 }
55 
56 bool NonblockingPacketService::Run(fd_set *read_fds, fd_set *write_fds, int *nfds) {
57  if (failed_) {
58  return false;
59  }
60  NonblockingPacketServiceStatus sender_status = sender_->Send();
61  if (sender_status == kError) {
62  CleanUp();
63  return false;
64  }
65  NonblockingPacketServiceStatus receiver_status = receiver_->Receive();
66  if (receiver_status == kError) {
67  CleanUp();
68  return false;
69  }
70  FD_ZERO(read_fds);
71  FD_ZERO(write_fds);
72  *nfds = 0;
73  if (sender_status == kIoWait) {
74  FD_SET(socket_wrapper_->fd(), write_fds);
75  *nfds = socket_wrapper_->fd() + 1;
76  }
77  if (receiver_status == kIoWait) {
78  FD_SET(socket_wrapper_->fd(), read_fds);
79  *nfds = socket_wrapper_->fd() + 1;
80  }
81  return true;
82 }
83 
84 // Free all allocated resources and mark the service as having encountered an
85 // irrecoverable error. This function exists so that in the event of an error
86 // we can close the connection immediately instead of leaving it open until the
87 // destructor is called.
88 void NonblockingPacketService::CleanUp() {
89  failed_ = true;
90 }
91 
92 bool NonblockingPacketService::Remove(HandlerKey handler_key) {
93  return sender_->Remove(handler_key) || receiver_->Remove(handler_key);
94 }
95 
96 
97 } // namespace kinetic