21 #include "nonblocking_packet_service.h"
26 using std::shared_ptr;
27 using std::unique_ptr;
31 NonblockingPacketService::NonblockingPacketService(
32 shared_ptr<SocketWrapperInterface> socket_wrapper,
33 unique_ptr<NonblockingSenderInterface> sender,
34 shared_ptr<NonblockingReceiverInterface> receiver)
35 : socket_wrapper_(socket_wrapper), sender_(move(sender)), receiver_(receiver),
36 failed_(false), next_key_(0) {}
38 NonblockingPacketService::~NonblockingPacketService() {
// Queues a request for sending and associates it with a handler key.
//
// A key is taken from next_key_ (post-increment, so successive submissions get
// increasing keys), and message, command, value and handler are passed on to
// sender_->Enqueue() together with that key.
//
// NOTE(review): only part of this function is visible here. The statement that
// consumes the CLIENT_SHUTDOWN status, the condition guarding it, and the
// return statement are not shown -- presumably the status is reported to the
// handler when the service can no longer send, and the key is returned;
// confirm against the full file.
42 HandlerKey NonblockingPacketService::Submit(unique_ptr<Message> message, unique_ptr<Command> command,
43 const shared_ptr<const string> value, unique_ptr<HandlerInterface> handler) {
// Use the current counter value for this request, then advance the counter.
44 HandlerKey key = next_key_++;
48 KineticStatus(StatusCode::CLIENT_SHUTDOWN,
"Client already shut down"),
nullptr);
// message, command and handler are moved, so the sender takes ownership of them.
50 sender_->Enqueue(move(message), move(command), value, move(handler), key);
// Performs one round of sending and receiving and reports which descriptor
// sets the caller should wait on.
//
// read_fds / write_fds: descriptor sets to which the socket's fd is added when
//                       the receiver / sender respectively reports kIoWait.
// nfds:                 set to socket fd + 1 whenever the fd is added to a set
//                       (the "highest descriptor plus one" form that select()
//                       takes -- presumably that is how the caller uses it).
//
// NOTE(review): the bodies of the kError branches, the closing braces and the
// return statement are not visible here, so the meaning of the bool result and
// the error handling cannot be documented from this view.
56 bool NonblockingPacketService::Run(fd_set *read_fds, fd_set *write_fds,
int *nfds) {
// Sending is attempted first, then receiving.
60 NonblockingPacketServiceStatus sender_status = sender_->Send();
61 if (sender_status == kError) {
65 NonblockingPacketServiceStatus receiver_status = receiver_->Receive();
66 if (receiver_status == kError) {
// The sender reported kIoWait: register the socket in the write set.
73 if (sender_status == kIoWait) {
74 FD_SET(socket_wrapper_->fd(), write_fds);
75 *nfds = socket_wrapper_->fd() + 1;
// The receiver reported kIoWait: register the socket in the read set.
77 if (receiver_status == kIoWait) {
78 FD_SET(socket_wrapper_->fd(), read_fds);
79 *nfds = socket_wrapper_->fd() + 1;
88 void NonblockingPacketService::CleanUp() {
// Removes the handler identified by handler_key.
//
// The sender is asked first; because of the short-circuiting ||, the receiver's
// Remove() is only called when the sender's Remove() returns false.
// Returns true if either call returned true, false otherwise.
92 bool NonblockingPacketService::Remove(HandlerKey handler_key) {
93 return sender_->Remove(handler_key) || receiver_->Remove(handler_key);