21 #include "kinetic/byte_stream.h"
26 #include "glog/logging.h"
28 #include "kinetic/incoming_value.h"
29 #include "kinetic/outgoing_value.h"
30 #include "kinetic/reader_writer.h"
34 PlainByteStream::PlainByteStream(
int fd, IncomingValueFactoryInterface &value_factory)
35 : fd_(fd), value_factory_(value_factory) {}
37 bool PlainByteStream::Read(
void *buf,
size_t n) {
38 ReaderWriter reader_writer(fd_);
40 return reader_writer.Read(buf, n, &err);
43 bool PlainByteStream::Write(
const void *buf,
size_t n) {
44 ReaderWriter reader_writer(fd_);
45 return reader_writer.Write(buf, n);
48 IncomingValueInterface *PlainByteStream::ReadValue(
size_t n) {
49 return value_factory_.NewValue(fd_, n);
52 bool PlainByteStream::WriteValue(
const OutgoingValueInterface &value,
int* err) {
53 return value.TransferToSocket(fd_, err);
56 SslByteStream::SslByteStream(SSL *ssl) : ssl_(ssl) {}
58 SslByteStream::~SslByteStream() {
61 bool SslByteStream::Read(
void *buf,
size_t n) {
69 uint8_t* byte_buffer =
static_cast<uint8_t*
>(buf);
72 int bytes_read = SSL_read(ssl_, byte_buffer, n);
77 byte_buffer += bytes_read;
80 LOG(WARNING) <<
"Failed to read " << n <<
" bytes over SSL connection";
88 bool SslByteStream::Write(
const void *buf,
size_t n) {
96 const uint8_t* byte_buffer =
static_cast<const uint8_t*
>(buf);
99 int bytes_written = SSL_write(ssl_, byte_buffer, n);
101 if (bytes_written > 0) {
104 byte_buffer += bytes_written;
107 LOG(WARNING) <<
"Failed to write " << n <<
" bytes over SSL connection";
115 IncomingValueInterface *SslByteStream::ReadValue(
size_t n) {
118 char *buf =
new char[n];
123 std::string value(buf, n);
125 return new IncomingStringValue(value);
128 bool SslByteStream::WriteValue(
const OutgoingValueInterface &value,
int* err) {
130 if (!value.ToString(&s, err)) {
133 return Write(s.data(), s.size());