Daytime.3 - An asynchronous TCP daytime server

Programming/Boost asio 2016. 4. 3. 23:09

다음에 기반함

http://www.boost.org/doc/libs/1_57_0/doc/html/boost_asio/tutorial/tutdaytime3.html



main 함수

int main()
{
  try
  {



먼저, 클라이언트의 연결요청을 받아들일 객체가 필요하다. io_service 객체는 소켓과 같은 I/O 서비스를 제공하므로, 이를 이용하자.


boost::asio::io_service io_service; tcp_server server(io_service);


io_service 객체를 run 하여 비동기 연산을 수행하도록 하자. 


    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }

  return 0;
}


tcp_server 클래스

class tcp_server
{
public:


생성자에서 포트 13의 요청을 listen 하도록 초기화 해주자.

  tcp_server(boost::asio::io_service& io_service)
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), 13))
  {
    start_accept();
  }

private:


start_accept() 함수에서 소켓을 생성하고 async accept 를 시작하여 새로운 연결을 받아들이자.

  void start_accept()
  {
    tcp_connection::pointer new_connection =
      tcp_connection::create(acceptor_.get_io_service());

    acceptor_.async_accept(new_connection->socket(),
        boost::bind(&tcp_server::handle_accept, this, new_connection,
          boost::asio::placeholders::error));
  }


handle_accept 는 start_accept 함수가 종료되어 accept 연산이 개시될 때 호출된다. 이 함수에서 클라이언트의 요청을 처리하고, start_accept() 를 다시 호출하여 다음 연결 요청을 받을 수 있도록 하자.

  void handle_accept(tcp_connection::pointer new_connection,
      const boost::system::error_code& error)
  {
    if (!error)
    {
      new_connection->start();
    }

    start_accept();
  }


tcp_connection 클래스


shared_ptr 과 enable_shared_from_this 를 활용하여 관련한 연산이 수행되는 한 tcp_connection 객체가 소멸되지 않도록 유지하자.

class tcp_connection
  : public boost::enable_shared_from_this<tcp_connection>
{
public:
  typedef boost::shared_ptr<tcp_connection> pointer;

  static pointer create(boost::asio::io_service& io_service)
  {
    return pointer(new tcp_connection(io_service));
  }

  tcp::socket& socket()
  {
    return socket_;
  }


start() 함수에서는 boost::asio::async_write() 함수를 활용하여 클라이언트로 데이터를 전송한다.

ip::tcp::socket::async_write_some() 대신 async_write 를 사용하여 전체 데이터 블록이 전송될 수 있도록 했음을 확인하자. 

  void start()
  {



비동기로 메시지를 전송하는 것이 완료될 때까지 데이터가 유효함을 보장하기 위해 클래스 멤버로 데이터를 보관 할 것이다. 

  message_ = make_daytime_string();



boost::bind() 를 활용하여 비동기 연산을 수행할때, 핸들러의 parameters 와 호출부의 arguments 가 일치하도록 주의한다.

placeholder 들 (boost::asio::placeholders::error / boost::asio::placeholders::bytes_transferred) 이 handle_write() 에서 사용되지 않기때문에, 실수로 제거될 수 있다.

    boost::asio::async_write(socket_, boost::asio::buffer(message_),
        boost::bind(&tcp_connection::handle_write, shared_from_this(),
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));


이 연결에 필요한 기타 추가 작업들은 이제 handle_write() 에서 모두 처리하면 된다. 

  }

private:
  tcp_connection(boost::asio::io_service& io_service)
    : socket_(io_service)
  {
  }

  void handle_write(const boost::system::error_code& /*error*/,
      size_t /*bytes_transferred*/)
  {
  }

  tcp::socket socket_;
  std::string message_;
};



필요한 핸들러 인자 제거


우리의 예제에서, error 와 bytes_transsferred 인자는 handle_write 함수 몸체에서 불필요하다. 이러한 경우, 함수에서 이를 제거하여 다음과 같은 시그니쳐를 갖게 할 수 있다.

  void handle_write()
  {
  }


이 함수를 호출부인 boost::asio::async_write() 를 호출할때에는 다음과 같이 수정하면 된다. 

  boost::asio::async_write(socket_, boost::asio::buffer(message_),
      boost::bind(&tcp_connection::handle_write, shared_from_this()));


  

정리

1. main 함수

* tcp_server 에 io_service 를 전달하여 생성하고, tcp_server 클래스를 initiate 한다.

* 메인 스레드에서 tcp_server 에서 사용하는 io_service 의 run() 함수 호출


2. tcp_server 클래스

* boost::asio::ip::tcp::begin_accept 를 활용하여 클라이언트의 연결을 받고, tcp_connection 객체를 생성한다.

* connection 의 start 함수를 호출한다.

* 완료 된 후, 다음 연결을 받기 위해 다시 accept 를 시작한다. 


3. tcp_connection 클래스

* enable_shared_from_this 를 활용하여 이 객체에 대한 연산이 수행되는 경우 소멸되지 않도록 보장한다.

* 클라이언트로 전달하는 데이터가 소멸되지 않게 하기 위해, 클래스의 멤버로 데이터를 보전한다.

* 전송 데이터가 분리되지 않게 하기 위해 async_write() 함수를 활용

* 사용하지 않는 place_holder 를 제거하여 핸들러 인자의 시그니쳐를 단순히 가져 갈 수도 있다 (async_write)


Full source code

--

#pragma once

#include 
#include 
#include 
#include "boost/shared_ptr.hpp"
#include "boost/asio.hpp"
#include "boost/bind.hpp"
#include "boost/enable_shared_from_this.hpp"

//* enable_shared_from_this 를 활용하여 이 객체에 대한 연산이 수행되는 경우 소멸되지 않도록 보장한다.
//* 클라이언트로 전달하는 데이터가 소멸되지 않게 하기 위해, 클래스의 멤버로 데이터를 보전한다.
//* 전송 데이터가 분리되지 않게 하기 위해 async_write() 함수를 활용
//* 사용하지 않는 place_holder 를 제거하여 핸들러 인자의 시그니쳐를 단순히 가져 갈 수도 있다(async_write)
class TCPConnection : public boost::enable_shared_from_this
{
public:
  typedef boost::shared_ptr pointer;
  static pointer create(boost::asio::io_service& io)
  {
    return pointer(new TCPConnection(io));
  }

  boost::asio::ip::tcp::socket& socket()
  {
    return socket_;
  }

  void start()
  {
    message_ = make_daytime_string();

    boost::asio::async_write(socket_, boost::asio::buffer(message_),
      boost::bind(&TCPConnection::handle_write, shared_from_this()));
  }

private:
  TCPConnection(boost::asio::io_service& io) : socket_(io)
  {
  }

  void handle_write()
  {
    std::cout << message_ << ", write_done" << std::endl;
  }

  std::string make_daytime_string()
  {
    std::time_t now = time(0);
    return ctime(&now);
  }

  boost::asio::ip::tcp::socket socket_;
  std::string message_;
};

//* boost::asio::ip::tcp::begin_accept 를 활용하여 클라이언트의 연결을 받고, tcp_connection 객체를 생성한다.
//* connection 의 start() 함수를 호출한다.
//* 완료 된 후, 다음 연결을 받기 위해 다시 accept 를 시작한다.
class TCPServer
{
public:
  TCPServer(boost::asio::io_service& io) :
    acceptor_(io, boost::asio::ip::tcp::endpoint(
      boost::asio::ip::tcp::v4(), 13))
  {
    start_accept();
  }
private:
  void start_accept()
  {
    TCPConnection::pointer new_connection =
      TCPConnection::create(acceptor_.get_io_service());
    // async_accept(socket, ACCEPT_HANDLER);
    acceptor_.async_accept(new_connection->socket(),
      boost::bind(&TCPServer::handle_accept, this, new_connection, boost::asio::placeholders::error));
  }
  void handle_accept(TCPConnection::pointer new_connection,
    const boost::system::error_code& error)
  {
    if (!error)
    {
      new_connection->start();
    }
    start_accept();
  }

  boost::asio::ip::tcp::acceptor acceptor_;
};

//* tcp_server 에 io_service 를 전달하여 생성하고, tcp_server 클래스를 initiate 한다.
//* 메인 스레드에서 tcp_server 에서 사용하는 io_service 의 run() 함수 호출
void server_run()
{
  try 
  {
    boost::asio::io_service io_service;
    TCPServer server(io_service);
    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }
}



2.3.4 Huffman Encoding Trees

허프만 인코딩 트리





이 절에서는 리스트 구조와 set 과 tree 를 다루기 위한 데이터 추상화를 연습해 보았다.

컴퓨터에서는 데이터를 나타내기 위해 0, 1 의 연속된 값을 사용하는데, 

아스키코드가 각의 문자를 7개 비트를 활용하여 표현하는 것이 그 예라 할 수 있다.


7 비트를 사용하면 2^7 개의 데이터를 구분할 수 있다, 


일반적으로 n 개를 표현하기 위해 개의 비트가 필요하다


고정길이 인코딩 방식


예를 들면 문자를 표현하기 위해


A 000 C 010 E 100 G 110

B 001 D 011 F 101 H 111


와 같이 표현할 수 있다.

 

이를 따를때, 

BACADAEAFABBAAAGAH

는 다음과 같이 인코딩 된다 


001000010000011000100000101000001001000000000110000111 (54비트)


이런 코드방식을 각각의 심볼이 같은 개수의 비트로 구성되어 있으므로, fixed-length (고정길이) 코드라고 부른다.


고정 길이 대신 가변 방식의 코딩도 쓸모 있을때가 있다. 


가변길이 인코딩 방식


빈도가 월등히 높은 문자에 대해 좀더 적은 비트수를 할당하여, 데이터를 좀더 효율적으로 메시지를 인코딩 할 수 있다.


위 고정길이 인코딩방식의 예를 variable-length (가변길이) 인코딩 방식으로 다음과 같이 재정의해보자.

A 0 C 1010 E 1100 G 1110

B 100 D 1011 F 1101 H 1111


이제, 같은 문자를 다음과 같이 인코딩한다.

100010100101101100011010100100000111001111 (42비트)

고정 길이 인코딩 방식과 비교했을때 20% 가량의 공간절약 효과를 얻을 수 있다.


이런 가변길이 인코딩방식에도 문제가 있는데, 바로 어떤 지점에서 하나의 심볼이 종료된 것인지 알기가 어렵다는 것이다. 

모스코드는 이런문제를 특별한 구분자(seperator code) 를 두는 것으로 해결 했다. 또 다른 해결책은, 

어떤 심볼도 다른 문자의 인코딩 값을 시작 코드값으로 가질 수 없도록 디자인 하는 것이다.

A를 0 이라고 정한 위 예의 경우, 어떤 문자도 0으로 시작할 수 없다. 


가변길이 인코딩 방식의 문제점

* 특정 심볼 종료 지점을 알기가 어렵다


solution

1. separator code

각각의 구분자를 심볼종료시점에 넣는 인코딩 방식

2. prefix code

어떤 문자도 다른 문자를 시작코드 값으로 가질 수 없는 인코딩 방식 


일반적으로 가변길이 prefix 인코딩 방식을 적절히 사용하면 상당한 이점을 볼수있다. 

이런 인코딩의 한 scheme 이 huffman 인코딩 방식이며, 허프만인코딩은 바이너리 트리로 표현되어 심볼을 인코드 할 수 있다.


허프만 인코딩


허프만 인코드는

* 각각의 리프가 아닌 트리의 노드는 자기 자식 노드들의 심볼을 모두 포함하고 있다. 

* 리프노드는 상대적 빈도에 따른 가중치가 부여 되어있다.

* 리프가 아닌 노드는 자기 자식 노드들의 가중치 값의 합을 가지고 있다. 

* 이 가중치는 인코딩/디코딩 과정 모두에 사용 된다.  


허프만트리에서 각 리프는 상대빈도와 나타내고자하는 문자가 함께 표시되어 있다. 

어떤 심볼이든 root 에서 시작하여 리프로 도달하여야 문자가 있음을 확인할 수 있다. 

아래로 내려갈때, 왼쪽 가지로 가는경우 0을 추가하고, 반대로 오른쪽으로 이동하면 1을 추가한다. 

D 의 경우 1011 이 된다.


비트열을 허프만 트리를 통해 decode 하고자 하면, root 에서 시작하여 비트열의 숫자를 보고 트리를 쫗아가면 된다.

리프에 도달할때마다 심볼을 알아낸 것이되고, 다시 다음 심볼을 알아내기위해 root 로 돌아가면 된다. 


허프만 트리의 생성


알파벳과 상대빈도가 주어져있으면, 어떻게 코드를 만드는게 최선일까? 

다음 예제를 살펴보자. 





알고리즘은 간단하다. 

가장 적은 상대빈도를 지닌 심볼이 가장 멀리 위치하는 것이다.

리프노드의 집합으로부터 시작하여 가장 빈도수가 낮은 두개의 노드를 찾아 이를 왼쪽노드, 오른쪽노드로 갖는 

부모노드를 만들며, 이 노드가 두 가중치의 합을 갖게끔한다.

두 노드를 set 에서 제거하고, 대신 이 새 노드로 대체한다. 이제 이 과정을 반복한다. 

노드가 단 하나만 남을때까지 이 과정을 반복한다. 


다음과 같은 과정으로 진행된다 .


Initial leaves {(A 8) (B 3) (C 1) (D 1) (E 1) (F 1) (G 1) (H 1)}

Merge {(A 8) (B 3) ({C D} 2) (E 1) (F 1) (G 1) (H 1)}

Merge {(A 8) (B 3) ({C D} 2) ({E F} 2) (G 1) (H 1)}

Merge {(A 8) (B 3) ({C D} 2) ({E F} 2) ({G H} 2)}

Merge {(A 8) (B 3) ({C D} 2) ({E F G H} 4)}

Merge {(A 8) ({B C D} 5) ({E F G H} 4)}

Merge {(A 8) ({B C D E F G H} 9)}

Final merge {({A B C D E F G H} 17)}


이 알고리즘은 항상 유일한 트리를 만드는 것이 아닌데, 여러 노드가 가장 작은 가중치를 가질 수도 있기때문에다. 또한, 어떤 두 노드가 먼저 merge 될지 결정하는 것도 임의기 때문이기도 하다. 



허프만 트리의 표현


이제 허프만트리를 메시지를 인코드/디코드하는데 사용하고, 허프만트리를 생성하는 시스템을 살펴보자.

각 리프들은 leaf 라는 심볼을 포함한 리스트로 표현한다.


(define (make-leaf symbol weight)

(list 'leaf symbol weight))

(define (leaf? object)

(eq? (car object) 'leaf))

(define (symbol-leaf x) (cadr x))

(define (weight-leaf x) (caddr x))


두 노드를 merge 하게 되면, 두 노드의 union 을 한 심볼의 집합이 된다.

우리의 심볼집합은 리스트로 표현했기 때문에, union 을 위해 append 프로시져를 사용한다.


(define (make-code-tree left right)

(list left

right

(append (symbols left) (symbols right))

(+ (weight left) (weight right))))


- (left right (symbol-l symbol-r) sum-wgt-lr) 와 같은 리스트 구조가 되겠다.


이제 selector 를 정의한다. 


(define (left-branch tree) (car tree))

(define (right-branch tree) (cadr tree))

(define (symbols tree)

(if (leaf? tree)

   (list (symbol-leaf tree))

   (caddr tree)))

(define (weight tree)

(if (leaf? tree)

(weight-leaf tree)

(cadddr tree)))


- 위에서 설명한 리스트 형태와 컬러링으로 매칭시켜보면 (left right (symbol-l symbol-r) sum-wgt-lr)


여기서보면 symbols 와 weight selector 들은 leaf 여부에 따라 그 동작이 다르다.

이를 잘 다루기 위해 친숙한 개념인 generic 을 scheme 에서도 사용할 수 있는데, 이는 2.4 절과 2.5절에서 살펴보도록 한다. 


디코딩 프로시저


(define (decode bits tree)

(define (decode-1 bits current-branch)

(if (null? bits)

   '()

   (let ((next-branch

   (choose-branch (car bits) current-branch)))

   (if (leaf? next-branch)

 (cons (symbol-leaf next-branch)

     (decode-1 (cdr bits) tree))

 (decode-1 (cdr bits) next-branch)))))

(decode-1 bits tree))

(define (choose-branch bit branch)

(cond ((= bit 0) (left-branch branch))

  ((= bit 1) (right-branch branch))

           (else (error "bad bit -- CHOOSE-BRANCH" bit))))


- leaf 까지 내려왔으면 그때의 심볼을 다음 디코딩대상 비트들의 결과에 붙인다.


각 요소에 가중치가 있는 set


방금의 트리표현에서, leaf 가 아닌 트리 노드의 경우 리스트로 표현한 심볼의 set 을 구성했었다. (symbol-l symbol-r)

하지만 요전에 알고리즘을 설명했을때 두 가장 작은 아이템을 merge 함으로써 얻어진 leaf 들과 tree 의 집합에 대해 동작할 필요가 있었다.

반복해서 set 에서 가장 작은 아이템들을 찾을 필요가 있었으므로, 순서가 있는 표현 방식을 사용하는것이 좋겠다.


가중치의 순서에 따라 정렬한 리프와 노드의 리스트의 집합을 구성할것이다. 

아래의 adjoin-set 프로시져는 예제 2.61 에 묘사된 것과 비슷하나, weight 를 통해서 비교가 되고 이미 존재하는 요소는 추가되지 않는 다는 점이 다르다.


(define (adjoin-set x set)

(cond ((null? set) (list x))

  ((< (weight x) (weight (car set))) (cons x set))

 (else (cons (car set)

  (adjoin-set x (cdr set))))))


아래의 프로시져는 심볼-빈도 페어리스트를 받아 최초의 leaf 들에 대한 ordered set 을 구성한다. 


(define (make-leaf-set pairs)

(if (null? pairs)

   '()

   (let ((pair (car pairs)))

       (adjoin-set (make-leaf (car pair) ; symbol

      (cadr pair)) ; frequency

  (make-leaf-set (cdr pairs))))))


'SICP > 2. Building Abstractions with Data' 카테고리의 다른 글

2.5.3 Example: Symbolic Algebra  (0) 2015.11.01
2.5 System with Generic Operations  (0) 2015.09.13
2.1.3 데이터란 무엇인가 ?  (0) 2015.02.07

Tutorial 5. Synchronising handlers in multithreaded programs

Programming/Boost asio 2015. 4. 11. 23:00

다음 내용에 기반함 

http://www.boost.org/doc/libs/1_57_0/doc/html/boost_asio/tutorial/tuttimer5.html


이번 튜토리얼 에서는 boost::asio::strand 클래스를 사용하여 멀티스레드 프로그램에서 콜백 핸들러를 동기화하는 방법을 알아보자.

이전 4개의 튜토리얼에서는 io_service::run() 함수를 오로지 하나의 스레드에서만 호출하였기 때문에 핸들러 동기화 이슈를 피할수 있었다. ( asio 라이브러리는 콜백 핸들러들이 io_service::run() 을 호출한 스레드에서만 실행될 수 있도록 보장해준다. 거기에, io_service::run() 을 하나의 스레드에서만 부름으로써, 동시에 실행되지 않는 것을 보장받은 것이다.) 


스레드를 하나만 이용하여 접근하는 것은 asio 를 이용한 어플리케이션을 시작하기에 좋았지만, 서버와 같은 프로그램에서 다음과 같은 제한이 존재한다는 것이다. 


* 핸들러가 많은 시간을 소요할 경우 응답성이 떨어진다.

* 멀티 프로세서 시스템에서 확장성이 없다.


이 문제를 해결하기 위해, io_service::run() 을 스레드 풀에서 호출하는 것으로 차선책을 찾아볼 수 있다.

그러나, 이렇게 되어 핸들러가 동시에 실행되는 것이 가능하게 되어, 스레드 safe 하지 않은 공용되는 데이터에 접근하는 핸들러들을 동기화하는 방법이 필요하게 된다.


이전 튜토리얼과 마찬가지로 Printer 를 정의하는 것에서부터 시작해보자. 타이머를 동시에 두개 돌리는 것으로 확장해볼 것이다.

class printer
{
public:


여기에 boost::asio::deadline_timer 멤버 쌍을 초기화 하는 코드와 boost::asio::strand 타입인 stand_ 멤버를 초기화하는 코드를 추가한다. strand 를 통해 디스패치 되는 핸들러들이 다음에 호출 되는 핸들러가 시작되기 전에 끝나는 것을 보장한다. 

이건 몇개의 스레드가 io_service::run() 을 호출 했냐와는 관계가 없다. 물론, 핸들러들은 동시 다발적으로 다른 핸들러들에서 실행 되어 boost::asio::strand 로 인해 디스패치 되지 않을 것이며, 다른 boost::asio::strand 객체를 통해 디스패치 되지 않을 것이다. 


  printer(boost::asio::io_service& io)
    : strand_(io),
      timer1_(io, boost::posix_time::seconds(1)),
      timer2_(io, boost::posix_time::seconds(1)),
      count_(0)
  {


비동기 동작들을 초기화 할때, 각각의 콜백들은 boost::asio::strand 객체를 통해 "wrapped" 된다. strand::wrap() 함수는 핸들러를 포함한 객체를 boost::asio::strand 객체를 통해 새롭게 반환한다. 핸들러를 동일한 boost::asio::strand 를 통해 'wrapping' 함으로써, 동시에 실행되지 않음을 보장할 수 있다.


    timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
    timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
  }

  ~printer()
  {
    std::cout << "Final count is " << count_ << "\n";
  }


멀티스레드 프로그램에서, 비동기 동작들은 공유 리소스에 접근할때는 동기화가 이루어 져야한다. 이 튜토리얼에서는, 핸들러 (print1 과 print2 ) 에서 공유되는 리소스가 std::cout 과 count_ 라고 하자.

  void print1()
  {
    if (count_ < 10)
    {
      std::cout << "Timer 1: " << count_ << "\n";
      ++count_;

      timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
      timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1, this)));
    }
  }

  void print2()
  {
    if (count_ < 10)
    {
      std::cout << "Timer 2: " << count_ << "\n";
      ++count_;

      timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
      timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2, this)));
    }
  }

private:
  boost::asio::io_service::strand strand_;
  boost::asio::deadline_timer timer1_;
  boost::asio::deadline_timer timer2_;
  int count_;
};


메인 함수에서는 이제 io_service::run() 이 두개의 스레드에서 호출되게 된다 : 하나는 메인스레드, 하나는 추가된 스레드에서 - 이것은 boost::thread 객체를 통해 이루어진다. 


싱글스레드에서 그랬던 것처럼 io_service::run() 에 대한 동시적 호출은 work 가 가 남아 있는 동안 실행되게 된다. 백그라운드의 스레드는 비동기 작업이 완료되기 전까지 완료되지 않는다.


int main()
{
  boost::asio::io_service io;
  printer p(io);
  boost::thread t(boost::bind(&boost::asio::io_service::run, &io));
  io.run();
  t.join();

  return 0;
}


full-source code



사용 코드 - main


실행 결과