Kinetic C/C++ Client
 All Classes Functions Variables Pages
message_stream.cc
1 /*
2  * kinetic-cpp-client
3  * Copyright (C) 2014 Seagate Technology.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18  *
19  */
20 
21 #include "kinetic/message_stream.h"
22 
23 #include <arpa/inet.h>
24 #include <string.h>
25 
26 #include "glog/logging.h"
27 
28 #include "kinetic/incoming_value.h"
29 #include "kinetic/outgoing_value.h"
30 
31 namespace kinetic {
32 
33 MessageStream::MessageStream(uint32_t max_message_size_bytes, ByteStreamInterface *byte_stream)
34  : max_message_size_bytes_(max_message_size_bytes), byte_stream_(byte_stream) {}
35 
36 MessageStream::~MessageStream() {
37  delete byte_stream_;
38 }
39 
40 MessageStream::MessageStreamReadStatus MessageStream::ReadMessage(
41  ::google::protobuf::Message *message,
42  IncomingValueInterface** value) {
43  // First the header
44  uint32_t message_size, value_size;
45  if (!ReadHeader(&message_size, &value_size)) {
46  return MessageStreamReadStatus_INTERNAL_ERROR;
47  }
48 
49  // Reject large messages because they will cause huge allocations and other undesirable
50  // behavior
51  if (message_size > max_message_size_bytes_) {
52  return MessageStreamReadStatus_TOO_LARGE;
53  }
54 
55  // Now the message
56  char *message_bytes = new char[message_size];
57  if (!byte_stream_->Read(message_bytes, message_size)) {
58  LOG(WARNING) << "Unable to read message";
59  delete[] message_bytes;
60  return MessageStreamReadStatus_INTERNAL_ERROR;
61  }
62 
63  if (!message->ParseFromArray(message_bytes, message_size)) {
64  LOG(WARNING) << "Failed to parse protobuf message";
65  delete[] message_bytes;
66  return MessageStreamReadStatus_INTERNAL_ERROR;
67  }
68 
69  delete[] message_bytes;
70 
71  // Now read the value (if any)
72  *value = byte_stream_->ReadValue(value_size);
73  if (*value == NULL) {
74  return MessageStreamReadStatus_INTERNAL_ERROR;
75  }
76 
77  return MessageStreamReadStatus_SUCCESS;
78 }
79 
80 int MessageStream::WriteMessage(const ::google::protobuf::Message &message,
81  const OutgoingValueInterface& value, int* err) {
82  // First the header
83  if (!WriteHeader(message.ByteSize(), value.size())) {
84  LOG(WARNING) << "Failed to write header";
85  return 1;
86  }
87 
88  // Now the message
89  std::string message_string;
90  if (!message.SerializeToString(&message_string)) {
91  LOG(WARNING) << "Failed to serialize protocol buffer";
92  return 2;
93  }
94  if (!byte_stream_->Write(message_string.data(), message_string.size())) {
95  LOG(WARNING) << "Failed to write message";
96  return 3;
97  }
98 
99  // And finally the value if any
100  if (!byte_stream_->WriteValue(value, err)) {
101  LOG(WARNING) << "Failed to write value";
102  return 4;
103  }
104 
105  return 0;
106 }
107 
108 MessageStreamFactory::MessageStreamFactory(SSL_CTX *ssl_context,
109  IncomingValueFactoryInterface &value_factory)
110  : ssl_context_(ssl_context), value_factory_(value_factory) {
111  ssl_created_ = false;
112  }
113 
114 
115 MessageStreamFactory::~MessageStreamFactory() {
116  if (ssl_created_) {
117  SSL_free(ssl_);
118  }
119 }
120 
121 bool MessageStreamFactory::NewMessageStream(int fd, bool use_ssl, SSL *ssl, uint32_t max_message_size_bytes,
122  MessageStreamInterface **message_stream) {
123  if (use_ssl) {
124  if (ssl == NULL) {
125  ssl_ = SSL_new(ssl_context_);
126  // We want to automatically retry reads and writes when a renegotiation
127  // takes place. This way the only errors we have to handle are real,
128  // permanent ones.
129 
130  if (ssl_ == NULL) {
131  LOG(ERROR) << "Failed to create new SSL object";
132  return false;
133  }
134  SSL_set_mode(ssl_, SSL_MODE_AUTO_RETRY);
135  if (SSL_set_fd(ssl_, fd) != 1) {
136  LOG(ERROR) << "Failed to associate SSL object with file descriptor";
137  SSL_free(ssl_);
138  return false;
139  }
140  if (SSL_accept(ssl_) != 1) {
141  LOG(ERROR) << "Failed to perform SSL handshake";
142  LOG(ERROR) << "The client may have attempted to use an SSL/TLS version below TLSv1.1";
143  SSL_free(ssl_);
144  return false;
145  }
146  ssl_created_ = true;
147  ssl = ssl_;
148  }
149  LOG(INFO) << "Successfully performed SSL handshake";
150  *message_stream = new MessageStream(max_message_size_bytes, new SslByteStream(ssl));
151  } else {
152  *message_stream =
153  new MessageStream(max_message_size_bytes, new PlainByteStream(fd, value_factory_));
154  }
155 
156  return true;
157 }
158 
159 bool MessageStream::ReadHeader(uint32_t *message_size, uint32_t *value_size) {
160  char header[9];
161  if (!byte_stream_->Read(header, sizeof(header))) {
162  return false;
163  }
164 
165  if (header[0] != 'F') {
166  LOG(WARNING) << "Received invalid magic value " << header[0];
167  return false;
168  }
169 
170  memcpy(reinterpret_cast<char *>(message_size), header + 1, sizeof(*message_size));
171  memcpy(reinterpret_cast<char *>(value_size), header + 5, sizeof(*value_size));
172  *message_size = ntohl(*message_size);
173  *value_size = ntohl(*value_size);
174 
175  return true;
176 }
177 
178 bool MessageStream::WriteHeader(uint32_t message_size, uint32_t value_size) {
179  char header[9];
180  header[0] = 'F';
181  message_size = htonl(message_size);
182  value_size = htonl(value_size);
183  memcpy(header + 1, reinterpret_cast<char *>(&message_size), sizeof(message_size));
184  memcpy(header + 5, reinterpret_cast<char *>(&value_size), sizeof(value_size));
185  return byte_stream_->Write(header, sizeof(header));
186 }
187 
188 } // namespace kinetic