Chapter2

Chapter Two - Intermediate Stuff

top prev next

1장에서는 ØMQ의 주요 패턴(request-reply, publish-subscribe, and pipeline)에 대한 기본 예제와 소개를 했습니다. 이번 장에서는 손을 좀 놀려 볼 것이며 실제 프로그램에서 이들 도구를 사용하는 방법을 배우기 시작할 것입니다.

본 장에서 다룰 내용 :

  • ØMQ를 생성하고 작동하는 방법.
  • 소켓에 메시지를 송/수신하는 방법.
  • ØMQ 비동기 I/O모듈을 만드는 방법.
  • 한 thread에서 멀티 소켓을 조작하는 방법.
  • 치명적이거나 비 치명적인 에러를 적절하게 처리하는 방법.
  • ‘Ctrl-C’와 같은 interrupt signals를 처리하는 방법
  • MQ 어플리케이션을 정상적으로 종료하는 방법
  • MQ 어플리케이션의 메모리 누수를 확인하는 방법
  • 다중(multipart) 메시지를 송/수신하는 방법
  • 네트워크를 통해 메시지를 전달하는 방법
  • 간단한 메시지 대기열 브로거(broker)를 만드는(build) 방법
  • ØMQ와 함께 멀티 스레드 응용 프로그램을 작성하는 방법.
  • 스레드 사이에 신호처리를 위해 ØMQ를 사용하는 방법.
  • 네트워크 노드를 조율하기 위해 ØMQ를 사용하는 방법.
  • 소켓 ID를 사용하여 내구성(durable) 있는 소켓을 만드는 방법.
  • Pub-Sub 메시지를 생성하고 사용하는 방법
  • 충돌에서 복구할 수 있는 내구성 있는 subscribers를 만드는 방법.
  • 메모리 overflow를 방지하기 위해 high-water mark(HWM)를 사용하는 것

The Zen of Zero

top prev next

ØMQ의 Ø는 모든 tradeoffs에 관한 것입니다. 한편으로 이 이상한 이름은 구글과 트위터에 ØMQ의 게재 빈도를 감소하고 다른 한편으로는, 일부 덴마크 민족이나 사용할 만한 "ØMG røtfl"같은 사용을 귀찮아 합니다. Ø는 쉽게 찾을 수 있는 zero가 아니며, "Rødgrød med Fløde!" 이 “당신 이웃이 Grendel의 직접적인 후예 일 수 있다”라는 의미를 뜻한다는 것은 분명한 모욕입니다. 공정 거래 처럼 보입니다

원래 ØMQ의 zero는 “zero broker”와 “zero latency” 를 의미 하였습니다. 한편으로는, 다른 목표를 충당해 왔습니다.: zero administration, zero cost, zero waste. 더 일반적으로 “zero”는 프로젝트에서 퍼진 최소주의 문화를 말합니다. 우리는 새로운 기능을 발표하기 보다는 복잡성을 제거하는데 힘을 집중합니다.

The Socket API

top prev next

솔직히 말해서, ØMQ는 미끼상품(switch-and-bait)과 같은 종류입니다. 이것은 여러분을 위한 것이고, 여러분 보다는 우리에게 더 피해가 가는 것이기에 사과하진 않겠습니다. 익숙한 BSD 소켓 API가 있지만, 이것은 분산 소프트웨어를 설계하고 작성하는 방법에 대한 여러분의 안목을 형성하는데 느리게 할 것이며 메시지 처리 기계 부분을 숨깁니다.

소켓은 사실상 네트워크 프로그래밍을 위한 표준 API입니다. 특히 개발자의 입맛에 맞게 만든 ØMQ는 표준 소켓 API를 사용합니다. 이것은 “메시지 지향 미들웨어(Message Oriented Middleware)”가 피자(pizza)에 대한 이상한 갈망에서 멀어지게 하는 “Extra Spicy Sockets”으로 변화하고 더 알기를 갈망하도록 했습니다.

좋은 페퍼로니 피자 처럼 ØMQ소켓은 소화하기 쉽습니다. 소켓은 BSD소켓처럼 네 부분으로 구분 됩니다. :

  • 메시지를 쓰고 읽는 것으로 데이터를 운반하기 위해 소켓을 사용(zmq_send(3), zmq_recv(3)).

C코드로는 아래와 같습니다. :

void *mousetrap;

// Create socket for catching mice
mousetrap = zmq_socket (context, ZMQ_PULL);

// Configure the socket
int64_t jawsize = 10000;
zmq_setsockopt (mousetrap, ZMQ_HWM, &jawsize, sizeof jawsize);

// Plug socket into mouse hole
zmq_connect (mousetrap, "tcp://192.168.55.221:5001");

// Wait for juicy mouse to arrive
zmq_msg_t mouse;
zmq_msg_init (&mouse);
zmq_recv (mousetrap, &mouse, 0);
// Destroy the mouse
zmq_msg_close (&mouse);

// Destroy the socket
zmq_close (mousetrap);

소켓은 항상 void pointers이고 구조화된 메시지입니다. 그래서 C에서 당신은 그렇게 소켓을 전달하지만, zmq_send(3) , zmq_recv(3) 와 같이 메시지를 처리하는 모든 함수에서는 메시지의 주소를 전달합니다. 알고 있는 대로 “ØMQ에서 모든 소켓은 우리에게 속해 있다”로 인식하지만, 메시지는 코드에서 실제로 당신이 소유합니다.

소켓을 생성, 소멸, 설정하는 것은 어떤 개체를 위해 필요한 작업입니다. 그러나 ØMQ는 비동기, 탄성 섬유라는 점을 기억해야 합니다. 이것은 우리가 네트워크 토폴로지에 소켓을 어떻게 끼워 넣고, 그 후에 어떻게 소켓을 사용하는지에 따라 좀 다릅니다.

Plugging Sockets Into the Topology

top prev next

두 노드간의 연결을 위해서는 한 노드에서 zmq_bind(3)를 사용하고 다른 노드에는 zmq_connect(3)을 사용합니다. 일반적으로, zmq_bind(3)을 수행하는 노드는 고정 네크워크 주소를 가지고 있는 서버이고 zmq_connect(3) 을 수행하는 노드는 잘 알려지지 않은 혹은 임시 네트워크 주소를 가지는 클라이언트 입니다. 그래서 우리는 "bind a socket to an endpoint"와 "connect a socket to an endpoint"이라 말합니다. Endpoint는 잘 알려진 네트워크 주소입니다.

ØMQ연결은 전통적인 TCP연결과 다소 다릅니다. 주요 주목할만한 차이점은 다음과 같습니다. :

  • 이것은 서버가 이미 endpoint에 zmq_bind(3)를 했든 안 했든 클라이언트가 endpoint에 zmq_connect(3) 했을 때 존재 합니다.
  • 이것은 비동기 이며, 필요할 시점과 장소에 마술과 같이 존재하는 대기열(queues)을 가지고 있습니다.
  • 이것은 각각의 끝점에 사용되는 소켓의 종류에 따라, 특정 "Messaging Pattern"을 표현 할 수 있습니다.
  • 하나의 소켓이 많은 incoming/outgoing 연결을 가질 수 있습니다.
  • zmq_accept() 메소드는 없습니다. 소켓이 endpoint에 바인딩하는 경우 자동으로 연결을 받아 들입니다.
  • 당신의 어플리케이션 코드는 이것을 직접 연결하면 작동하지 않을 수 있으며, 이들은 소켓으로 캡슐화되어 있습니다.

많은 아키텍쳐들은 client-server 모델의 몇가지 유형을 따릅니다 (서버는 가장 안정적인 컴포넌트이고, 클라이언트는 가장 다이나믹한 컴포넌트 입니다.). 종종 주소에 이슈가 있습니다:서버는 클라이언트를 볼 수 있지만 클라이언트는 그렇지 않습니다. 그래서 대개 서버는 zmq_bind(3)해야 되고 클라이언트는 zmq_connect(3) 를 해야 합니다. 이것은 또한 비정상적인 네트워크 아키텍쳐에 대한 몇 가지 예외로, 당신이 사용중인 소켓의 종류에 따라 달라집니다. 우리는 나중에 소켓 유형에 대해서 알아 볼 것입니다.

지금, 서버를 시작하기 전에 클라이언트를 시작하는 것을 생각해 봅시다. 전통적인 네트워크에서 우리는 큰 에러가 발생합니다. 그러나, ØMQ는 시작하고 임의로 중지하는 것을 허용합니다. 클라이언트가 zmq_connect(3)을 하자마자 연결은 존재하며 노드는 소켓에서 메시지를 쓰기위해 시작할 수 있습니다. 몇가지 단계를 거쳐, 서버는 살아나고, zmq_bind(3)하고 ØMQ는 메시지를 전달하기 시작합니다.

서버 노드는 많은 endpoint에 바인딩할 수 있으며 하나의 소켓을 사용하여 이 작업을 수행할 수 있습니다. 이것은 다른 전송매체를 통해 연결을 수락한다는 의미입니다. :

zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "ipc://myserver.ipc");

당신은 두번씩 같은 endpoint에 바인딩 할 수 없습니다. 이것은 에러의 원인이 됩니다.

매번 클라이언트 노드는 endpoints중 하나에 zmq_connect(3)을 하며 서버노드의 소켓의 다른 연결을 얻습니다. 소켓의 연결을 하는 수는 제한이 없습니다. 클라이언트 노드는 단일 소켓을 사용하여 많은 endpoint에 연결할 수 있습니다.

대부분의 경우, 어떤 노드가 클라이언트인지 혹은 서버인지는 메시지 흐름 보다는 네크워크 토폴로지에 따르게 됩니다. 그러나, 동일한 소켓 유형은 서버든 클라이언트든 다르게 처리되는 경우(연결이 끊긴 후 재전송할때)가 있다.

이것이 의미하는 것은 거의 고정된 endpoint 주소를 가진 토폴로지의 안정적인 부분으로 항상 “server”관점에서, 그리고 왔다 갔다하는 유동적인 부분은 ‘clients’ 관점에서 생각해야 된다는 것입니다. 그 다음 어플리케이션은 이 모델을 통해 디자인 합니다. 이처럼 해야 될 것들은 더 많이 있습니다.

소켓은 유형이 있습니다. 소켓유형은 소켓의 의미를 정의합니다. 이것은 안쪽과 바깥쪽 라우팅 메시지, 큐 등에 대한 정책입니다. 당신은 소켓의 어떤 유형(예를 들어 publisher 소켓과 subscriber 소켓)과도 연결할 수 있습니다. 소켓은 메시지 패턴과 함께 동작합니다. 더 자세한 것은 나중에 살펴 보겠습니다.

ØMQ은 기본으로 제공하는 메시지 큐잉 시스템을 이용하여 다른 방법으로 소켓을 연결할 수 있습니다. 우리가 나중에 얘기할 디바이스와 주제 라우팅와 같은 상위 단계가 있습니다. 그러나 본질적으로, ØMQ는 아이의 건축 장난감처럼 함께 조각들을 연결 하며 네트워크 아키텍처를 정의합니다.

Using Sockets to Carry Data

top prev next

당신이 zmq_send(3)zmq_recv(3) 메소드를 사용하여 메시지를 보내고 받을 수 있습니다. 메소드 이름은 일반적이지만, ØMQ의 I/O 모델은 당신이 이해하기에 시간이 필요하며, TCP모델과는 다릅니다.

fig10.png

데이터를 처리하는 경우 TCP 소켓과 ØMQ 소켓 사이의 주요 차이점에 대해 살펴보겠습니다. :

  • ØMQ 소켓은 바이트 (TCP에서와 같이) 또는 프레임(UDP와 같이)이라기 보다는 메시지를 처리합니다. 메시지는 이진 데이터로 길이가 지정된 BLOB입니다. 우리는 곧 설계가 성능에 최적화되어 있고 그래서 다소 이해하기 어려운 메시지를 볼 것입니다.
  • ØMQ 소켓은 백그라운드 스레드에서 I/O를 처리합니다. 이것은 메시지가 어플리케이션이 바쁘든 상관없이 로컬 입력 대기열에 도착하고, 로컬 출력 대기열로부터 전송한다는 것을 의미 합니다. 이러한 방법에 의해, 메모리 대기열을 구성할 수 있습니다.
  • ØMQ 소켓은 소켓의 종류에 따라 다르지만, 많은 다른 소켓을 연결하고 연결 될 수 있습니다. TCP는 일대일 전화통화를 에뮬레이트 하지만, ØMQ는 1:N (라디오 방송 등),N:N (우체국 등), N:1 (메일 박스 등), 그리고 1:1을 구현할 수 있습니다..
  • ØMQ 소켓은 여러 endpoint (fan-out 모델)로 보내거나, 여러 끝점 (fan-in모델)에서 받을 수 있습니다.
fig11.png

따라서 소켓에 메시지를 작성하는 것은 하나 또는 한 번에 여러 다른 곳으로 메시지를 보낼 수 있으며, 반대로 하나의 소켓은 메시지를 보내는 모든 연결에서 메시지를 수집합니다. 각 보낸 사람도 기회를 얻을 수 있도록 zmq_recv(3) 메소드는 공정한 큐잉 알고리즘을 사용합니다.

zmq_send(3)메소드는 실제로 소켓 연결로 메시지를 전송하지 않습니다. 그것은 I/O 스레드가 비동기적으로 그것을 보낼 수 있도록 메시지를 큐잉 합니다. 그것은 몇 가지 예외의 경우를 제외하고 차단하지 않습니다. 그래서 메시지는 어플리케이션이 zmq_send(3)의 리턴을 받아도 반드시 전송되지는 않습니다. 만약 당신이 zmq_msg_init_data(3)을 사용하여 메시지를 생성했다면 데이터를 재사용하거나 free할 수 없습니다. 그렇지 않다면 I/O스레드는 빠르게 그자체를 덮어 쓰거나 할당되지 않은 garbage가 될 것입니다. 이것은 초보자에게는 일반적인 실수입니다. 우리는 적당하게 메시지 처리하는 방법을 나중에 볼 것입니다.

Unicast Transports

top prev next

ØMQ는 유니캐스트(unicast) 전송(inproc, IPC, 그리고 TCP)과 멀티캐스트(multicast) 전송(epgm, PGM)을 제공합니다. 멀티캐스트는 나중에 다루게 될 고급 기술입니다. fan-out 비율이 1-to-N 유니캐스트를 불가능하게 한다는 것을 알지 못한다면 이것을 사용하지 마십시오

대부분의 경우 tcp를 사용합니다. 이것은 유연하고 간편하고 대개 충분히 빠릅니다. ØMQ의 TCP 전송은 연결하기 전에 endpoint가 존재하는지 요구하지 않기 때문에 ‘disconnected’라고 부릅니다. 클라이언트와 서버는 연결할 수 있고 언제든지 바인딩할 수 있으며, 갔다 되돌아 올 수 있습니다. 그리고 이것은 어플리케이션에 투명하게 남아 있습니다.

Inter-process전송, IPC는 LAN에서 추상화되어 있다는 것을 제외하고는 TCP와 같습니다. 이것은 IP주소나 도메인 명이 필요하지 않다는 것입니다. 이것은 몇몇 목적을 위해서 효과적이며 이 책에서 매우 자주 사용합니다. ØMQ의 IPCTCP처럼 연결이 끊어져 있습니다. 이것은 한가지 제한이 있습니다:Window 환경에서는 작동하지 않습니다. 이것은 ØMQ의 향후 버전에서는 가능하게 될지 모릅니다. 관습적으로 우리는 다른 파일명과 잠재적인 충돌을 피하기 위해 ‘.ipc’ 확장자로 endpoint이름을 사용합니다. UNIX에서 당신이 ipc endpoint을 사용한다면 다른 사용자로 실행중인 프로세스가 공유하지 못하도록 이것에 적당한 권한이 필요할 것입니다. 당신은 또한 모든 프로세스가 이 파일을 참조할 수 있도록 해야 합니다.

Inter-thread 전송, inproc는 연결된 다음에 신호 전송을 합니다. 이것은 tcpipc보다 휠씬 빠릅니다. 이것은 ipctcp에 비해 특별한 제한이 있습니다.:당신이 연결하기 전에 바인딩 해야 합니다. 이것은 ØMQ의 향후 버전에서 수정 될 수 있지만, 현재는 inproc소켓을 사용하여 정의합니다. 우리는 한 소켓을 만들고 바인딩하고, 다른 소켓을 생성하고 연결한 자식 스레드를 시작합니다.

ØMQ is Not a Neutral Carrier

top prev next

초심자가 ØMQ에 대해 일반적인 질문일 수 있지만, (이것은 나 자신에게 한 질문의 하나입니다.) "어떻게 ØMQ를 사용하여 XYZ 서버에 메시지를 쓸 수 있습니까?" 예를 들어, "내가 어떻게 ØMQ에서 HTTP 서버와 통신할 수 있습니까?"

이 의미는 만약 우리가 HTTP 요청과 응답을 처리하기 위해 일반적인 소켓을 사용한다면, ØMQ 는 좀더 빠르고 더 잘 할 수 있어야 한다는 것입니다.

아쉽게도 대답은 “이것은 작동되지 않습니다” 입니다. ØMQ는 중간 연결매체가 아니라, 이것을 사용하는 전송 프로토콜에서의 프레임입니다. 이 프레임은 그들 자신 프레임을 사용하는 기존 프로토콜과 호환되지 않습니다. 예를 들어, 여기 TCP/IP 구간에 HTTP요청과 ØMQ요청이 있습니다. :

fig12.png

HTTP 요청은 간단한 프레임 구분자로 CR - LF 사용하고, ØMQ는 길이가 지정된 프레임을 사용합니다. :

fig13.png

그래서 당신은 ØMQ를 사용해서(예를 들면 request-reply소켓 패턴) HTTP와 같은 프로토콜 처럼 사용 할 수 있습니다. 하지만 HTTP 일 수는 없습니다.

질문에 좋은 답변은 “나의 새로운 XYZ서버를 만들 때 어떻게 ØMQ를 사용하여 효과적으로 만들 수 있습니까?” 입니다. 당신은 어떤 경우에서든 연결을 원하는 어떤 프로토콜로든 구현하기를 원할 것입니다. 하지만 당신은 실제 작업을 수행하는 ØMQ 백앤드에 그 프로토콜 서버를 연결할 수 있습니다. 여기서 아름다운 부분은 당신이 원하는 대로 로컬 혹은 원격으로 실행하는 모든 언어 코드를 사용하여 백앤드로 확장할 수 있다는 것입니다. Mongrel2 웹서버가 그러한 구조의 좋은 예입니다

I/O Threads

top prev next

우리는 ØMQ가 백앤드 스레드에서 I/O를 수행한다고 알고 있습니다. 하나의 I / O 스레드(모든 소켓을 위한)는 모든 극단적인 어플리케이션을 제외한 모든 어플리케이션에서 충분합니다. 이것은 context를 만들 때 사용하는 마법’1’이며, “한 I/O스레드를 사용하라”라는 의미 입니다. :

void *context = zmq_init (1);

ØMQ 응용 프로그램과 연결 당 한 소켓을 생성하지 않는 전통적인 네트워크 응용 프로그램 사이의 주요 차이점이 있습니다. 한 소켓이 작업의 특정 지점을 위해 모든 송/수신 연결을 처리합니다. 예를 들어, 당신은 수천 subscriber에 publish할 때, 하나의 소켓을 통해서 합니다. 당신은 20개 서비스 간의 분산 작업을 할 때, 하나의 소켓을 통해서 합니다. 당신은 수천 웹 응용 프로그램에서 데이터를 수집할 때 , 하나의 소켓을 통해서 합니다.

이것은 응용 프로그램을 작성하는 방법에 근본적인 영향을 미치고 있습니다. 전통적인 네트워크 응용 프로그램이 원격 연결 당 한 프로세스 또는 한 스레드를 가지고 있으며, 그 프로세스 또는 스레드가 한 소켓을 핸들링 합니다. ØMQ은 단일 스레드로 이 전체 구조를 깨거나(collapse), 확장의 필요에 의해 그것을 깰(break up) 수 있습니다

Core Messaging Patterns

top prev next

ØMQ 소켓 API의 갈색 표지 안에 메시징 패턴의 세계가 자리잡고 있습니다. 당신은 엔터프라이즈 메시징에 대한 배경 지식이 있다면, 약간 친숙할 것입니다. 대부분 ØMQ 초보자들은 놀라겠지만, 우리는 이렇게 소켓이 다른 노드를 나타내는 TCP 패러다임으로 사용하고 있습니다.

이제 간단히 ØMQ가 당신을 위해 무엇을 하는지 정리해 보겠습니다. 그것은 신속하고 효율적으로 노드에 데이터 (메시지)를 제공합니다. 노드는 스레드, 프로세스, 또는 시스템에 매핑할 수 있습니다. 이것은 당신의 어플리케이션에 사용할 수있는 단일 소켓 API를 제공합니다. 이것은 실제적인 전송형식((like in-process, inter-process, TCP, or multicast)이 무엇이든지 상관없습니다. 이것은 오고 가고 함으로써 대상에 자동으로 재접속 합니다. 필요에 따라 송/수신 양쪽에 메시지를 큐잉할 수 있습니다. 그 때 해당 디스크에 넘치지 않고, 프로세스의 메모리가 부족하지 않도록 신중하게 대기열을 관리합니다. 그것은 소켓 오류로 처리합니다. 그것은 백그라운드 스레드의 I/O 모두에 해당됩니다. 그것은 노드 사이의 통신에 대해 잠금이 없는 기술(lock-free)을 사용하므로 잠금 장치(locks), 대기(waits), 세마포(semaphores), 또는 교착상태(deadlocks)가 절대로 필요 없습니다.

그러나 ‘패턴’이라 불리는 정확한 방법에 따라 메시지를 라우트 하고 큐잉합니다. 이것은 ØMQ가 제공하는 패턴입니다. 이것은 데이터와 분산 작업을 하기 위한 최고의 경험이 녹아 있습니다. ØMQ의 패턴이 고정 되어 있지만, 향후 버전에는 사용자 정의 패턴을 제공할 수도 있습니다.

ØMQ 패턴은 타입이 일치하는 소켓 쌍에 의해 구현됩니다. 즉, 당신은 소켓 유형을 이해하고 그것이 함께 동작하는 방법을 이해해야 ØMQ 패턴을 이해하는 것입니다. 대부분이 배우는데 조금 시간이 걸립니다.

기본으로 제공하는 핵심적인 ØMQ 패턴은 다음과 같습니다. :

  • Request-reply : 클라이언트와 서비스의 집합을 연결하는 패턴. 이것은 원격 프로시저 호출 및 작업 분산 패턴입니다.
  • Publish-subscribe : publisher와 subscriber 집합을 연결하는 패턴, 이것은 데이터 분산 패턴입니다.
  • Pipeline : 여러 단계와 루프를 가질 수 있는 fan-out /fan-in패턴으로 된 노드를 연결합니다. 이것은 병렬 작업 분산 및 수집 패턴입니다.

우리는 1장에서 이들 각각을 살펴 봤습니다. 사람들은 아직도 전통적인 TCP 소켓의 관점에서 ØMQ를 생각하여 사용 하려고 하는 경향이 있는 한 패턴이 더 있습니다. :

  • Exclusive pair : 독점 쌍의 두 소켓을 연결하는 패턴. 이것은 특정 고급 사용 - 경우에 낮은 수준의 패턴입니다. 우리는 이 장의 마지막에서 예를 다룰 것입니다..

zmq_socket(3) 설명(man) 페이지는 이 패턴에 대해 명백하게 설명합니다. 이것은 이해 할 때까지 여러 번 읽을 가치가 있습니다. 우리는 그것에 담겨 있는 각각의 패턴과 사례을 볼 것입니다.

연결 결합 쌍 (양쪽이 바인딩 할 수 있음)으로 유효한 소켓조합 입니다.

  • PUB and SUB
  • REQ and REP
  • REQ and ROUTER
  • DEALER and REP
  • DEALER and ROUTER
  • DEALER and DEALER
  • ROUTER and ROUTER
  • PUSH and PULL
  • PAIR and PAIR

기타 다른 조합은 문서화되지 않았으며, 신뢰할 수 없는 결과를 초래 할 것이고, 당신이 이것을 시도할 경우 ØMQ 향후버전에서는 에러를 리턴 할 것입니다. 당신은 코드를 통해 한 소켓에서 읽고 다른 소켓에 쓰는 것과 같이 다른 소켓유형간에 통할 수 있게 할 수 있습니다.

High-level Messaging Patterns

top prev next

이들 4가지 패턴은 ØMQ에 정의되어 있습니다. 이것들은 핵심 C++ 라이브러리로 구현된 ØMQ의 일부이며, 모든 소매점에서 사용할 수 있도록 보장합니다.

상위에, 우리는 높은 수준의 패턴을 추가합니다. 우리는 ØMQ 위에 이러한 높은 수준의 패턴을 구축하고 우리의 응용 프로그램에서 사용하는 어떤 언어로 그들을 구현합니다. 이것은 핵심 라이브러리의 일부가 아니며, ØMQ패키지도 아니며, ØMQ 커뮤니티에 존재합니다.

이 가이드를 통한 목표 중 하나는 당신이 작고(정상적으로 메시지를 처리하는 법), 큰(신뢰할 수 있는 PUB-SUB 구조를 만드는 방법) high-level 패턴을 만드는데 도움이 되는 것입니다.

Working with Messages

top prev next

ØMQ 메시지는 메모리에 맞는 제로보다 큰 특정 크기의 BOLB입니다. 당신은 Google 프로토콜 버퍼, XDR, JSON, 또는 당신이 원하는 것을 사용해서 직렬화 할 것입니다. 이것은 이식성이 좋고 빠르게 데이터를 표현하는 것을 선택하는 것이 현명하지만, trade-offs에 따라 결정할 수도 있습니다.

메모리에서 ØMQ 메시지는 zmq_msg_t 구조 (언어에 따라 다름)로 표현합니다. 여기에서 C로 ØMQ 메시지를 사용하기 위한 기본 규칙은 다음과 같습니다. :

  • zmq_msg_t 개체를 만들고 사용합니다.
  • 메시지를 읽기위해 당신은 zmq_msg_init(3)을 사용하여 메시지를 초기화 한 후 zmq_recv(3).으로 메시지를 수신합니다.
  • 새로운 데이타로 메시지를 작성하려면 메시지를 만들고 필요한 크기의 데이터 블록를 할당하기 위해 zmq_msg_init_size(3) 사용합니다. 그런 다음 memcpy를 사용하여 데이터를 채우고, zmq_send(3)로 메시지를 전송합니다.
  • 메시지를 릴리즈하기 위해 zmq_msg_close(3)를 호출합니다. 이것은 참조를 끊으며, 결국 ØMQ는 메시지를 초기화 합니다.
  • 메시지 내용에 액세스하려면 zmq_msg_data(3). 를 사용합니다. 메시지에 포함된 데이터의 크기를 알려면 zmq_msg_init_size(3)를 사용합니다.
  • 설명 페이지를 읽지 않고, 여러분이 용도를 정확하게 알지 못한다면, zmq_msg_move(3), zmq_msg_copy(3), 또는 zmq_msg_init_size(3)을 사용하지 말기 바랍니다.

다음은 주의를 기울여 알고 있어야 하는 메시지 처리의 전형적인 코드 입니다. 이것은 우리가 모든 예제에서 사용하는 zhelpers.h 파일에 있습니다. :

// Receive 0MQ string from socket and convert into C string
static char *
s_recv (void *socket) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
int size = zmq_msg_size (&message);
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
zmq_msg_close (&message);
string [size] = 0;
return (string);
}

// Convert C string to 0MQ string and send to socket
static int
s_send (void *socket, char *string) {
int rc;
zmq_msg_t message;
zmq_msg_init_size (&message, strlen (string));
memcpy (zmq_msg_data (&message), string, strlen (string));
rc = zmq_send (socket, &message, 0);
assert (!rc);
zmq_msg_close (&message);
return (rc);
}

당신은 쉽게 임의 길이의 메시지를 보내고 받을 때 이 코드를 확장 할 수 있습니다.

zmq_send (3)으로 메시지를 전송할 때 ØMQ는 메시지를 clear(즉 사이즈를 zero로 설정)한다는 것에 주의하세요. 그래서 두번 같은 메시지를 보낼 수 없고, 메시지를 보낸 후 메시지 데이터를 액세스 할 수도 없습니다.

만약 한번 이상 같은 메시지를 보내려면, 두번째 메시지를 생성하고 zmq_msg_init(3)를 사용하여 초기화 하고 첫번째 메시지를 복사하기 위해 zmq_msg_copy(3) 을 사용하십시오. 이것은 데이터를 복사하지 않고 reference합니다. 그 다음 당신은 두번 메시지를 보낼 수 있습니다.(만약 더 많이 복사하면 더 할 수 있습니다.) 그리고 메시지는 마지막 복사본이 보내지고 종료될 때 결국은 초기화 됩니다.

또한, ØMQ는 단일메시지를 여러 개 모은 리스트로 조작하는 다중(multipart) 메시지를 지원합니다. 이것은 널리 실제 어플리케이션에서 사용되며, 우리는 본 장 후반부와 3장에서 보겠습니다.

일반적인 알고 있는 메시지와 차이점 :

  • ØMQ는 자동으로 메시지를 보내고 받습니다. 즉 전체 메시지를 가져오거나, 아니면 전혀 가져오지 않습니다.
  • ØMQ 어떤 불특정한 시간 이후에 바로 메시지를 보낼 수 있지만 그렇게 하지 않습니다.
  • 당신이 제로 길이 메시지를 보낼 수 있습니다, 예를 들면 한 스레드 에서 다른 쪽으로 신호를 보낼 때 사용합니다.
  • 메시지는 메모리용량 내에서 사용해야 합니다. 당신은 임의 크기의 파일을 보내려면, 당신은 조각으로 그것을 분할하고, 별도의 메시지로 각 조각들을 보내야 합니다.
  • 당신은 프로그램이 종료할 때 자동으로 객체를 파괴하지(destroy) 않는 언어일 경우 메시지처리를 완료 했을 때 zmq_msg_close(3) 를 호출해야 합니다.

그리고 반드시 반복적으로 zmq_msg_init_data (3)을 사용하지 마십시오. 이것은 초기화 하는 메소드이며 문제가 될 수 있습니다. 마이크로초 절약을 걱정하기 전에 ØMQ에 대해서 배울 중요한 것들이 많이 있습니다.

Handling Multiple Sockets

top prev next

지금까지 예제들 중에서 대부분 예제의 메인 루프에서 처리하는 것은 아래와 같습니다. :

  1. 소켓에서 메시지 대기
  2. 메시지 처리
  3. 반복

우리가 동시에 여러 소켓에서 읽고 싶다면? 가장 간단한 방법은 여러 endpoint를 한 소켓에 연결하고 ØMQ fan-in을 하는 것입니다. 원격 endpoint가 같은 패턴에 있다면 이것은 정상적이지만, PULL소켓을 PUB endpoint에 연결하는 것은 비정상적입니다. 당신이 혼합 패턴을 시작하면 확장성이 깨집니다.

올바른 방법은 zmq_poll(3) 사용하는 것입니다. 더 좋은 방법은 좋은 이벤트 중심의 reactor에 그것을 전환하는 프레임워크로 zmq_poll(3)을 감쌀 수도 있지만, 우리가 여기서 다루려고 하려는 것 보다 훨씬 더 많은 노력이 필요합니다.

이것은 non-blocking 읽기를 사용하는 것으로 두 소켓에서 읽기를 하는 간단한 예제입니다. 이것은 날씨를 변경하는 subscriber로써 2가지를 일을 하는 혼재된 프로그램 입니다. worker는 병렬처리 합니다. :


C++ | C# | Clojure | CL | Erlang | F# | Java | Lua | Objective-C | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Haskell | Haxe | Node.js | ooc

이 방법의 문제는 첫 번째 메시지 (처리를 위해 메시지를 기다리지 않을 때, 루프의 끝에 sleep)에 대한 몇 가지 추가 지연이 있습니다. 이것은 하위 밀리초 지연이 필수적 입니다 어플리케이션에서 문제가 될 것입니다. 또한, nanosleep()나 루프를 돌리지 않고 할 수 있는 함수를 위해 매뉴얼을 확인할 필요가 있습니다.

당신은 오히려 우리가 이 예제에서 했던 것처럼 소켓들을 우선순위보다는 오히려 하나에서 첫번째, 두번째 읽는 것처럼 공정하게 소켓을 다룰 수 있습니다. 한 소켓이 많은 소스로부터 메시지를 받을 때 ØMQ가 자동적으로 처리하는 것을 “fair-queuing”라고 부릅니다.

zmq_poll(3)을 사용하는 잘 동작하는 작은 어플리케이션을 봅시다.


C++ | C# | Clojure | CL | Erlang | F# | Haskell | Java | Lua | Node.js | Objective-C | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Haxe | ooc

Handling Errors and ETERM

top prev next

ØMQ의 오류처리 철학은 fail-fast와 탄력성(resilience)을 혼합했습니다. 우리가 생각하는 프로세스는 내부오류에 가능한 약하고 외부 공격이나 에러에는 가능한 한 강해야 합니다. 비유하면, 내부오류를 감지하면 살아 있는 세포는 자동 소멸됩니다. 그렇지만, 모든 가능한 방법에 의한 외부로부터의 공격은 저항합니다. ØMQ의 주장은 강력한 코드가 뒷받침되어야 한다는 것입니다. 그것은 단지 세포 벽 오른쪽에 있어야 하며, 그러한 벽이 있어야 합니다. 만약 내부 혹은 외부 결함인지 분명하지 않다면, 설계에 문제가 있을 수 있습니다.

C에서는 에러가 발생하면 즉시 어플리케이션이 중지합니다. 다른 언어에서는 예외를 얻어오거나, 중단 할 수 있습니다.

ØMQ가 외부 결함을 감지하면 그것을 호출 코드에 오류를 반환합니다. 오류에서 복구에 대한 분명한 전략이 없는 드문 경우 그것은 자동으로 메시지를 폐기 합니다. 몇 가지 관점에서 ØMQ는 여전히 외부 결함에 대한 것이라 주장하지만, 고려되어야 할 버그들이 있습니다.

우리가 지금까지 지켜본 C 예제의 대부분에는 오류 처리가 없습니다. 실제 코드는 모든 단일 ØMQ 호출에서 오류 처리를 해야 합니다. 당신은 C 이외의 언어 바인딩을 사용하는 경우 바인딩은 당신을 위해 오류를 처리할 수 있습니다. C에서는 직접해야 합니다. POSIX 규칙으로 시작하는 몇 가지 간단한 규칙이 있습니다. :

  • 어플리케이션이 실패한 경우, 오브젝트가 NULL을 반환하게 하는 방법
  • 다른 방법은 성공한 경우 0 리턴하고, 특별한 조건 (일반적으로 실패)에 다른 값 (주로 -1) 을 반환합니다.
  • 오류 코드는 errno 또는 zmq_errno(3)로 제공됩니다.
  • 로깅을 위해 설명하는 오류 텍스트는 zmq_strerror(3)에 의해 제공됩니다.

치명적인 오류로 처리 하지 말아야 하는 두가지 주요 예외 상태가 있습니다. :

  • 스레드가 NOBLOCK 옵션으로 zmq_recv(3)을 호출하고 대기 데이터가없는 경우. ØMQ는 errno에 EAGAIN을 설정하고 -1 반환합니다.
  • 스레드가 zmq_term(3)를 호출하고, 다른 스레드는 작업을 기다리고 있을때, zmq_term(3)호출은 컨텍스트를 종료하고 모든 대기중인 호출은 errno를 ETERM으로 설정하고 -1로 종료합니다.

C에서 대부분의 경우 ØMQ호출에 대한 에러처리는 아래와 같습니다. :

void *context = zmq_init (1);
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc;
rc = zmq_bind (socket, "tcp://*:5555");
assert (rc == 0);

이 코드의 첫 번째 버전에서는 assert()를 넣었습니다. 최적화 빌드가 assert()를 null로 바꾸기 때문에 좋은 생각이 아니었습니다.

정상적으로 프로세스를 종료하는 방법을 보겠습니다. 우리는 이전 섹션에서 병렬 파이프라인 예제를 가져 올 것입니다. 우리가 백그라운드에서 전체 worker가 시작하고, 배치가 완료되면 작업들을 종료하고 싶을 것입니다. Worker에 종료메시지를 보내 봅시다. 이것을 하기에 가장 좋은 위치는 sink입니다. 이것은 배치가 끝난 시점을 알고 있기 때문입니다.

어떻게 sink에 worker를 연결합니까? PUSH/PULL소켓은 단지 단방향(one-way)입니다. 표준 ØMQ의 답변 : 당신이 해결해야 할 문제의 각 유형을 위해 새로운 소켓흐름을 만들어라. 우리는 worker에게 종료 메시지를 보내기 위해 publish-subscribe모델을 사용할 것입니다. :

  • sink는 새 endpoint에 PUB소켓을 만듭니다.
  • worker는 이 endpoint에 그들의 입력소켓을 바인딩 합니다.
  • sink가 배치의 끝 부분을 감지하면 만든 PUB소켓에 종료메시지를 보냅니다.
  • Worker가 이 종료메시지를 감지하면 종료합니다.

sink에 많은 새로운 코드가 필요하지는 않습니다.:

void *control = zmq_socket (context, ZMQ_PUB);
zmq_bind (control, "tcp://*:5559");

// Send kill signal to workers
zmq_msg_init_data (&message, "KILL", 5);
zmq_send (control, &message, 0);
zmq_msg_close (&message);

fig14.png

여기에는 우리가 이전에 본 zmq_poll(3) 기술을 사용하여 두 소켓을 (PULL소켓이 작업을 얻어오고 SUB소켓은 제어명령어를 가져옵니다.) 관리하는 작업자 프로세스가 있습니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Objective-C | Perl | PHP | Python | Ruby | Ada | Basic | Go | Haskell | Node.js | ooc | Scala

여기 수정된 sink어플리케이션이 있습니다. 그 결과수집이 완료되면 그것은 모든 노동자에게 KILL 메시지를 전송합니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Lua | Objective-C | Perl | PHP | Python | Ruby | Ada | Basic | Go | Haskell | Java | Node.js | ooc | Scala

Handling Interrupt Signals

top prev next

Ctrl-C또는 SIGNTERM와 같은 제어신호가 발생했을 때 어플리케이션을 정상적으로 종료해야 합니다. 기본적으로, 이들은 단순히 프로세스를 죽이기만 하고, 메시지를 flush하지 않으며, 파일을 정상적으로 종료하지도 않습니다.

아래는 우리가 다양한 언어에서 신호를 처리하는 방법입니다. :


Erlang | Haxe | Lua | Python | Ruby | Ada | Basic | C++ | C# | Clojure | CL | F# | Go | Haskell | Java | Node.js | Objective-C | ooc | Perl | PHP | Scala

이 프로그램은 Ctrl-C (SIGINT)와 SIGTERM을 감지하기 위해 s_catch_signals ()을 제공합니다. 이러한 신호가 도착하면 s_catch_signals ()는 전역 변수 s_interrupted을 설정합니다. 응용 프로그램이 자동으로 죽지는 않을 것이므로, 당신은 지금 명시적으로 인터럽트를 확인하고 적절하게 처리해야 합니다. 방법은 다음과 같습니다. :

  • 당신의 메인코드의 시작에 s_catch_signals ()(interrupt.c에서 복사) 를 호출합니다. 이것은 신호 조작을 설정합니다.
  • 당신의 코드가 zmq_recv(3), zmq_poll(3) 또는 zmq_send(3)에서 차단하는 경우 신호가 도착하면, 호출은 EINTR로 리턴합니다.
  • 만약 그것이 인터럽트 된다면 NULL을 리턴하는 s_recv()와 같이 처리합니다.
  • 그래서, 당신의 어플리케이션은 EINTR반환코드, NULL리턴, 또는 s_interrupted를 체크합니다.

다음은 전형적인 코드의 일부입니다. :

s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break;          //  Ctrl-C used
}
zmq_close (client);

당신이 s_catch_signals ()를 호출하고 인터럽트에 대한 테스트를 하지 않으면, 당신의 어플리케이션은 Ctrl - C와 SIGTERM를 제어하지 않을 것입니다. 이것은 유용할 수 있지만, 일반적이지 않습니다.

Detecting Memory Leaks

top prev next

모든 장기(long-running) 실행 응용 프로그램이 정상적으로 메모리를 관리하거나, 혹은, 결국 그것이 가능한 모든 메모리를 사용하고 에러가 발생될 것입니다. 자동으로 이것을 처리하는 언어를 사용하는 경우에는 행복하겠지만, 만약 C나 C++이나 메모리 관리의 책임이 있는 다른 언어로 개발을 한다면, 당신의 프로그램이 갖고 있는 누수(leak)를 찾아 주는 valgrind를 사용하는 짧은 tutorial을 참고 하시기 바랍니다.

  • Ubuntu 이나 Debian에서 valgrind 설치하기 위해: sudo apt-get install valgrind.
  • 기본적으로 ØMQ는 valgrind을 사용하게 되면 많은 경고가 발생합니다. 이러한 경고를 제거하려면, ZMQ_MAKE_VALGRIND_HAPPY를 매크로에 추가해서 ØMQ를 다시 빌드해야 합니다. 즉
$ cd zeromq2
$ export CPPFLAGS=-DZMQ_MAKE_VALGRIND_HAPPY
$ ./configure
$ make clean; make
$ sudo make install
  • Ctrl-C후 어플리케이션이 정상적으로 종료되게 수정해야 합니다. 저절로 종료 처리하는 어플리케이션에서는 필요하지 않지만, 장기 어플리케이션(like devices)은 이것이 필수적입니다. 그렇지 않다면 valgrind는 모든 현재 할당된 메모리에 대한 문제를 보여 줄 것입니다.
  • 만약 기본 설정이 아니라면, ‘-DDEBUG’로 응용 프로그램을 빌드합니다. 그러면 valgrids는 메모리 누수가 있는 지점을 정확하게 알려 줄 수 있습니다.
  • 마지막으로, valgrind을 실행합니다. :
valgrind --tool=memcheck --leak-check=full someprog

그리고 그것이 보고한 오류를 수정한 다음, 당신은 정상적인 메시지를 얻을 수 있습니다. :

==30536== ERROR SUMMARY: 0 errors from 0 contexts...

Multipart Messages

top prev next

ØMQ은 '다중 메시지(multipart messages)'를 제공하며 여러 프레임 메시지를 작성할 수 있습니다. 보통 어플리케이션은 특히 "봉투(envelop)"를 만들기 위해 어렵게 다중 메시지를 사용합니다. 지금 우리가 배울 것은 우리가 작성한 devices는 다중메시지를 사용하는 어플리케이션이 아니지만, 어떻게 안전하게 다중 메시지를 쓰고 읽느냐는 것입니다.

당신은 여러 부분 메시지로 작업할 때, 각 부분은 zmq_msg 으로 이루어 집니다. 예를 들어, 당신은 다섯 부분으로 메시지를 보내는 경우, 당신은 다섯 zmq_msg 항목을 만들고, 전송하고 파괴 해야 합니다. 당신은 사전에 이 작업을 수행할 수 있으며(zmq_msg 항목을 배열이나 구조체에 저장합니다), 또는 당신은 하나씩 보낼 수 있습니다.

이것은 우리가 다중 메시지에서 (우리는 각 프레임에 한 메시지 개체를 받습니다.) 프레임을 전송하는 방법입니다. :

zmq_send (socket, &message, ZMQ_SNDMORE);

zmq_send (socket, &message, ZMQ_SNDMORE);

zmq_send (socket, &message, 0);

이것은 single part나 multipart로 되어 있는 메시지를 어떻게 받고 처리하는지에 대한 것입니다. :

while (1) {
zmq_msg_t message;
zmq_msg_init (&message);
zmq_recv (socket, &message, 0);
// Process the message part
zmq_msg_close (&message);
int64_t more;
size_t more_size = sizeof (more);
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
if (!more)
break; // Last message part
}

다중 메시지에 대해 알아야 할 몇가지 내용 :

  • 당신이 다중 메시지를 보낼 때, 한번에 보냅니다.(첫 번째부터 마지막 부분까지)
  • 당신이 zmq_poll(3)를 사용하는 경우 메시지의 첫 부분을 받을 때, 나머지도 도착합니다.
  • 당신은 메시지의 전체부분을 받거나 아무것도 받지 못할 것입니다.
  • 메시지의 각 부분은 구분된 zmq_msg 항목입니다.
  • 당신은 RCVMORE 옵션을 선택 여부에 상관없이 메시지의 모든 부분을 받게 됩니다.
  • 메시지를 보내자 마자 ØMQ는 마지막 메시지를 받을 때까지 메시지를 대기열에 넣고 한번에 모두 보낸다.
  • 소켓을 닫는 것을 제외하고 부분적으로 보낸 메시지를 취소할 수 있는 방법은 없습니다.

Intermediates and Devices

top prev next

어떤 연결된 장치는 장치 회원 증가에 따라 복잡한 곡선을 그립니다. 회원이 적을 때는 서로에 대해 알 수 있지만 장치가 커짐으로써, 모든 다른 흥미로운 회원을 알려고 하는 각각 회원의 비용은 선형적으로 증가하고, 연결 회원의 전체 비용은 factorially하게 커집니다. 솔루션은 더 작은 것들로 집합을 만들고, 집합을 연결하는 중계자를 만듭니다.

이 패턴은 현실 세계에서 매우 일반적이며, 우리 사회와 경제가 큰 네트워크의 복잡성과 크기 조정 비용을 절감하기 보다 다른 실제 기능이 없는 중개인으로 가득 차있는 이유입니다. 중개인은 일반적으로 도매업자, 유통, 관리자 등으로 불립니다.

이와 같이 ØMQ 네트워크는 필요한 중개인 없이 특정 크기 이상 성장할 수 없습니다. ØMQ에서, 우리는 이것을 "devices"라고 부릅니다. 우리가 ØMQ을 사용 할때, 우리는 일반적으로 중개인 없이 서로 얘기할 수 있는 노드로 된 네트워크, 노드의 집합으로 우리의 어플리케이션을 구축하기 시작합니다. :

fig15.png

그리고 우리는 특정 장소에 장치를 배치하고 노드의 수를 최대 확장하여, 더 넓은 네트워크를 통해 응용 프로그램을 확장할 수 있습니다. :

fig16.png

ØMQ 장치는 엄격한 설계 규칙은 없습니다, 하지만 일반적으로 '백엔드'소켓 세트에 '프론트 엔드'소켓 세트를 연결합니다. 이것은 이상적으로 상태 없이(no state) 동작합니다. 그래서 필요로 하는 많은 중계로 어플리케이션을 확장하는 것이 가능하게 됩니다. 당신은 프로세스 내에서 스레드, 또는 독립 실행형 프로세스로써 이것을 실행할 수 있습니다. ØMQ는 몇몇 매우 기본적인 devices를 제공하지만 당신은 실제로 자신을 개발하는 것입니다.

ØMQ devices는 주소, 서비스, 큐 혹은 메시지와 소켓 레이어 상에서 정의할 수 있는 어떤 다른 추상적인 것의 중계를 할 수 있습니다. 다른 메시징 패턴은 다른 복잡한 문제를 가지며, 여러 종류의 중개가 더 필요합니다. 예를 들면, request-reply는 대기열과 서비스 추상화와 잘 작동하고, publish-subscribe는 stream이나 topics과 잘 작동합니다.

어떤 전통적인 중앙 중개인에 비해 ØMQ에 대한 흥미로운 건, 당신이 필요로 하는 곳에 정확하게 device를 게재할 수 있다는 것입니다, 이것은 최적의 중개를 할 수 있습니다.

A Publish-Subscribe Proxy Server

top prev next

이것은 하나 이상의 네트워크 세그먼트 또는 전송을 통해 publish-subscribe아키텍처를 확장하기 위한 일반적인 요구 사항입니다. 아마, 원격시스템에 설정한 subscribers가 있다면, 우리는 멀티캐스트를 통해 지역 subscribers, 혹은 TCP를 통해 원격 subscribers에게 전파하기를 원할 것입니다.

우리는 두 네트워크를 연결하는, publisher와 subscribers 집합 사이에 설정된 간단한 프록시 서버를 쓰는 것입니다. 이것은 아마도 유용한 장치의 간단한 경우입니다. 이 장치는 두 소켓을 가집니다. 날씨서버가 있는 내부 네트워크의 frontend와 외부 네트워크에 subscriber가 있는 backend입니다. 이것은 frontend소켓에서 날씨서비스를 subscribe하고 backend소켓에 그 데이터를 republish합니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haskell | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Ada | Basic | Go | Node.js | Objective-C | ooc | Scala

이것은 publisher에게 subscriber역할을 하고 subscriber에게 publisher역할을 하기 때문에 우리는 이것을 proxy라고 부릅니다. 이것은 당신이 그것에 영향을 주지 않는 기존 네트워크에 장치를 끼워 넣을 수 있다는 것을 의미 합니다.(물론 새로운 subscribers는 proxy와 통신하기 위해 알 필요가 있습니다.)

fig17.png

이 어플리케이션은 다중안정(multipart safe) 합니다. 이것은 정확하게 다중 메시지를 감지하고 그것을 읽고 그것을 보냅니다. 우리가 보내는 다중 데이터에 SNDMORE 옵션을 설정하지 않은 경우 최종 수신자가 손상된 메시지를 받을 수 있습니다. 그것이 스위치한 데이터가 손상될 수 있는 리스크를 없애기 위해 당신은 항상 당신의 장치를 다중안전하게 만들어야 합니다.

A Request-Reply Broker

top prev next

ØMQ에 작은 메시지 대기열 브로커를 작성하여 scale의 문제를 해결하는 방법을 알아 봅시다. 우리는 이 경우를 위해 request-reply패턴을 보겠습니다.

Hello World client-server어플리케이션에서 우리는 하나의 서비스와 통신하는 하나의 클라이언트가 있습니다. 그러나 실제의 경우 우리는 일반적으로 여러 서비스뿐만 아니라 여러 클라이언트를 허용해야 합니다. 이것은 우리가 서비스의 능력을 크게 할 필요가 (단지 하나보다는 여러 스레드, 프로세스, 서버) 있습니다. 유일한 제약 조건은 서비스는 무상태(stateless)여야 하며, 모든 상태(state)는 요청이나 데이터베이스 같은 공유 저장소에 존재합니다.

여러 서버에 여러 클라이언트를 연결하는 방법은 두 가지가 있습니다. 이 Brute-force방법은 여러 서비스 끝점에 각 클라이언트 소켓을 연결하는 것입니다. 하나의 클라이언트 소켓은 여러 서비스 소켓에 연결할 수 있고, 요청이 서비스간에 로드밸런스 됩니다. 자, 당신이 세 서비스 끝점에 대한 클라이언트 소켓연결 A, B, C가 있고, 클라이언트는 요청 R1, R2, R3, R4가 있습니다. R1과 R4는 서비스A, R2는 B로 이동하고 R3은 서비스 C로 이동합니다.

fig18.png

이 디자인은 적은 비용으로 더 많은 고객을 추가할 수 있습니다. 당신은 또한 더 많은 서비스를 추가할 수 있습니다. 각 클라이언트는 서비스 요청을 로드밸런스 합니다. 그러나 각각의 클라이언트는 서비스 토폴로지를 알고 있습니다. 당신이 100개 클라이언트를 가지고 있고, 3개 서비스를 추가하려는 경우, 당신은 재구성이 필요하고 3개 새로운 서비스에 대해 알고 있으며, 클라이언트을 위해 100개 클라이언트를 다시 시작합니다.

그것은 분명히 우리의 슈퍼 컴퓨팅 클러스터 리소스가 부족하면 오전 3시에서 일을 하려던 일을 못 합니다. 그래서 우리는 필사적으로 새로운 서비스 노드 수백개를 추가 해야 할 필요가 있습니다. 너무 많은 조작은 액상 콘크리트와 같습니다.:지식은 분산되어 있고 당신이 가지고 있는 많은 안정적인 조작과 노력은 토폴로지를 변경하는 것입니다. 우리가 원하는 것은 토폴로지의 모든 지식을 집중하여 클라이언트와 서버 사이에 두는 것입니다. 이상적으로, 우리는 토폴로지의 다른 부분을 건드리지 않고도 언제든지 서비스 또는 클라이언트를 추가하고 제거할 수 있습니다.

그래서 우리는 이 유연성을 제공하는 작은 메시지 대기열 브로커를 작성합니다. 브로커는 두 종점, 클라이언트을 위한 프런트 엔드 및 서비스에 대한 백엔드에 바인딩합니다. 그런 다음 활성화를 위해 두 소켓을 모니터링 하는 zmq_poll(3)사용하고 두 소켓 사이에 메시지가 돌아 다닙니다. 사실은 명시적으로 모든 대기열을 관리하지 않습니다 ? ØMQ는 각 소켓에서 자동으로 처리합니다.

당신이 REP와 대화하기 위해 REQ를 사용할 때 당신은 엄격한 synchronous request-reply 결과를 얻습니다. 클라이언트는 요청을 보내고 서비스는 요청을 읽고 응답을 보냅니다. 그 다음 클라이언트는 응답을 읽습니다. 만약 클라이언트나 서비스가 뭔가를 (예를 들어 응답을 기다리지 않고 연속 두 요청을 보내는)하려고 하면 에러가 발생합니다.

그러나 브러커는 non-blocking을 가집니다. 분명히 우리는 두 소켓의 활동을 기다리는 zmq_poll(3)를 사용할 수 있지만, REP와 REQ는 사용할 수 없습니다.

다행히 non-blocking request-response은 허락하는 DEALER과 ROUTER라고 하는 두 소켓이 있습니다. 이러한 소켓은 XREQ 및 XREP를 호출하는 데 사용되고, 예전 코드에서 이러한 이름을 볼 수 있습니다. 옛 이름은 XREQ은 " extended REQ"이고 XREP은 " extended REP"이라고 하지만 그건 정확하지 않습니다. 3장에서는 어떻게 DEALER과 ROUTER소켓이 비동기 request-reply 흐름의 모든 종류를 처리하는지 볼 것입니다.

지금, 우리는 단지 어떻게 DEALER와 ROUTER가 장치(즉 우리의 작은 브로커)를 통해 extend REQ-REP를 처리하는가를 볼 것입니다.

이것은 REQ가 ROUTER와 통신하고 DEALER가 REP와 통신하는 간단히 확장된 request-reply패턴입니다. 이 DEALER와 ROUTER사이에서 우리는 한 소켓에서 메시지를 가져오고 다른 쪽에서 그것을 밀어주는 코드(브로커 같은)를 가져야 합니다. :

fig19.png

Request-reply 브로커는 두개의 종점을 바인딩하며, 하나는 클라이언트를 위해 frontend socket에 연결하고 다른 한 개는 서비스를 위해 backend socket에 연결합니다. 이 브로커를 테스트 하기 위해 우리는 그것이 backend소켓에 연결할 수 있도록 서비스를 변경하려고 합니다. 이것의 의미가 무엇인지 보여주는 client와 service입니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haskell | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Node.js | Objective-C | ooc

여기 서비스는 다음과 같습니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haskell | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Node.js | Objective-C | ooc

그리고 여기 브로커가 있습니다. 당신은 다중안전(multipart safe)을 확인 할 수 있을 것 입니다:


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Haskell | Node.js | Objective-C | ooc

Request-reply브로커를 사용하면 client는 service를 직접 볼 수 없고, service또한 client를 볼 수 없기 때문에 client-server 구조를 쉽게 만들 수 있습니다. 단지 안정한 노드는 중간에 있는 장치(device)입니다. :

fig20.png

Built-in Devices

top prev next

대부분의 고급 사용자가 자신의 장치를 작성하지만 ØMQ는 몇 가지 기본 장치를 제공합니다. Built-in장치는 다음과 같습니다

  • QUEUE. request-reply 브로커와 같습니다.
  • FORWARDER. pub-sub 프록시 서버와 같습니다.
  • STREAMER. pipeline 흐름을 제외하고 FORWARDER와 같습니다.

장치를 시작하려면, 당신은 zmq_device(3)을 호출하며, 이것은 두 소켓, 프론트엔드를 위해 하나, 백엔드를 위해 하나를 통과(pass)합니다. :

zmq_device (ZMQ_QUEUE, frontend, backend);

QUEUE를 시작하면 장치가 그 시점에 당신의 코드에 request-reply 브로커의 본체에 연결하는 것과 같습니다. 당신은 zmq_device(3).을 호출하기 전에 소켓을 만들어야 하고 그것에 바인드나 연결을 한 후 가능한 설정을 해야 합니다. 이것은 간단한 작업입니다. 이것은 QUEUE를 호출하기 위해 재 작성한 request-reply입니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haskell | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Ada | Basic | Go | Node.js | Objective-C | ooc | Scala

위의 예제에는 보이지 않지만, built-in devices에는 적절한 에러처리가 되어 있습니다. Devices를 시작하기 전에 당신이 필요로 하는 소켓구성을 할 수 있기 때문에 built-in devices를 사용할 가치가 있습니다.

만약 당신이 대부분의 ØMQ 사용자와 같다면, 이 상황에서 당신의 마음은 “만약 devices에 임의의 소켓 유형을 꽂을 수 있다면 어떤 사악한 물건 같은 것을 만들 수 있을 텐데!”를 생각하기 시작할 것입니다. 짧게 말해서 : 그렇게 하지 말아라, 당신이 혼합된 소켓유형을 만들 수 있지만, 결과는 기괴하게 될 것입니다. 그래서 queue장치를 위해서는 ROUTER/DEALER, forwarder를 위해서는 SUB/PUB 그리고 streamer를 위해서는 PULL/PUSH를 사용하도록 되어 있습니다.

당신이 다른 조합이 필요하기 시작하면 자신의 장치를 작성할 때가 된 것입니다.

Multithreading with ØMQ

top prev next

ØMQ는 아마도 멀티 스레드(MT) 어플리케이션을 작성하는 가장 좋은 방법입니다. 당신이 전통적인 소켓을 사용하는 경우 ØMQ 소켓은 몇몇 재조정이 필요한 반면 ØMQ 멀티스레딩은 당신이 MT 응용 프로그램에 대해 아는대로 다 됩니다.

완전히 완벽한 MT 프로그램을 (그리고 그 말 그대로)하기 위해서 우리는 mutexes, locks, 또는 ØMQ 소켓을 통해 보내는 메시지를 제외하고는 스레드 간 통신의 다른 형태는 필요하지 않습니다.

"완벽한" MT 프로그램이라는 것은 작성과 이해가 쉽고, 어떤 언어/OS에서 한 기술로 동작이 되고, 제로 대기상태(zero wait states) 및 결과의 체감 없이 CPU의 수로 규모산정이 되는 것을 의미 합니다.

만약 당신이 locks, semaphores와 중요한 섹션을 혼자서 빠르게 당신의 MT 코드를 만드는데 몇 년이 소요되는데, 아무 것도 없이 그것이 실현된 다면 당신은 화가 날 것입니다. 만약 우리가 동시성 프로그램(단지, 상태를 공유하지 않는다.)에 30년 이상을 배워야 하는 강의코스가 있다면, 이것은 맥주를 공유하려고 하는 두 술주정꾼과 같습니다. 그들이 좋은 친구인지는 그리 중요하지 않습니다. 조만간 그들은 싸움을 하게 될 겁니다. 그리고 거리에 술수정꾼이 많아질수록 맥주를 통한 싸움은 더 많아 질 것입니다. MT 응용 프로그램의 비극 대부분은 술집 싸움과 같습니다.

이상한 문제리스트를 당신은 직접적인 스트레스나 리스크로 이해하지 않지만 전통적인 shared-state MT코드를 작성함으로써, 고통속에서 갑자기 에러를 발생시킬 것 같은 코드와 싸워야 합니다. 여기에 버그 코드(forgotten synchronization, incorrect granularity, read and write tearing, lock-free reordering, lock convoys, two-step dance, and priority inversion)에 있어서 세계적인 경험을 가지고 있는 큰 회사가 제시한 “당신의 스레드 코드에 11가지 잠재적 문제점”의 목록이 있습니다.

우리는 11개가 아닌 7개가 있습니다. 이것이 요점은 아닙니다. 요점은 바쁜 수요일 오후 3시에 두 단계 잠금 convoys를 시작하려는 전력망이나 주식시장을 운영할 코드를 실제 원하는가? 입니다. 누가 실제 의미하는 용어가 무엇인지 관여 하겠습니까. 이것은 더 복잡한 부작용과 더 복잡한 해킹과 싸우는 프로그램으로 바꾸는 것은 아닙니다.

수십억 달러 산업의 기초임에도 불구하고 몇가지 널리 사용되는 은유는 기본적으로 고장(broken)이며, 공유 상태 동시성(shared state concurrency)은 그중 하나 입니다. 제한 없이 사용하고 싶은 코드는 오류 프로그램 일부만 보이는 것 외에 공유하지 않고 메시지만 보내는 인터넷과 같습니다.

당신은 ØMQ로 만족할 만한 멀티 스레드 코드를 작성하기 위해 몇 가지 규칙을 따라야 합니다 :

  • 당신은 여러 스레드에서 같은 데이터를 액세스할 수 없습니다. mutexes 같은 고전 MT 기법을 사용하는 것은 ØMQ 응용 프로그램에서 anti-pattern입니다. 이것에 유일한 예외는 threadsafe한 ØMQ 컨텍스트 객체입니다.
  • 당신은 당신의 프로세스에서 ØMQ 컨텍스트를 생성하고 inproc 소켓을 통해 연결하려는 모든 스레드에게 전달해야 합니다.
  • 당신은 자신의 컨텍스트와 함께 별도의 작업으로 스레드를 취급해도 되지만, 이 스레드는 inproc을 통해 통신을 할 수 없습니다. 그러나 그것은 나중에 독립(standalone) 프로세스에 침투하기 쉽습니다.
  • 당신은 쓰레드간에 ØMQ 소켓을 공유하지 않아야 합니다. ØMQ 소켓은 threadsafe하지 않습니다. 기술적으로 그렇게 하는 것이 가능하지만 그것은 세마포, 잠금, 또는 mutexes을 요구합니다. 이것은 응용 프로그램이 느리고 약하게 합니다. 이 스레드간에 소켓을 공유하는 유일한 방법은 소켓에 대한 가비지 수집과 같은 기능을 제공하는 언어 바인딩에 있습니다.

당신은 응용 프로그램에서 하나 이상의 장치를 시작할 필요가 있다면 예를 들어, 당신은 자신의 스레드에 각각 실행하는 것이 좋습니다. 그것은 하나의 스레드에 있는 장치 소켓을 만들면 오류를 확인하기 쉽습니다. 그리고 다른 스레드에 있는 장치에 소켓을 통과 합니다. 이것은 작동하는 것처럼 보일 수 있지만 무작위로 실패합니다. 주의 사항 : 이것을 만든 스레드를 제외하고는 소켓을 사용하거나 닫지 마십시오.

당신이 이 규칙을 따른다면 당신이 필요로 할 때, 당신은 아주 쉽게, 별도의 프로세스로 스레드를 분리하실 수 있습니다. 어플리케이션 로직은 규모에 상관없이 스레드, 프로세스, 서버에 있습니다.

ØMQ는 가상의 ‘green’ 스레드보다는 기본 OS 스레드를 사용합니다. 이것의 장점은 당신이 새로운 스레딩 API를 배울 필요가 없다는 것입니다, 그리고 ØMQ 스레드는 운영 체제에 완전하게 연결됩니다. 당신의 어플리케이션이 무엇을 하는지 보기 위해 인텔의 ThreadChecker와 같은 표준 툴을 사용할 수 있습니다. 단점은 코드가, 예를 들어 그것이 새로운 스레드를 시작할 때, 이식성이 좋지 않다는 것입니다. 그리고 당신이 많은 스레드를 실행시키면 일부 운영 체제는 부하를 받을 것이다.

실제로 어떻게 처리되는지 봅시다. 우리는 기존 Hello World 서버에 몇가지 기능을 추가할 것입니다. 기존 서버는 단일 스레드 입니다. 만약 요청마다 작업이 느려도 괜찮습니다.: 단일 ØMQ 스레드는 많은 작업을 수행하는데 wait없이 단일 CPU에서 최대 속도로 실행할 수 있습니다. 그러나 현실적으로 서버는 요청에 따라 중요한 작업을 해야 합니다. 10,000개의 클라이언트가 모두 한번에 서버에 접속할 때 한개의 코어로는 충분하지 않을 수 있습니다. 그래서 현실적인 서버는 여러 개의 작업자 스레드를 시작합니다. 그런 다음 그것을 가능한 한 빨리 요청을 수용하고, 작업자 스레드에 이들을 배포합니다. 작업자 스레드는 작업을 통해 분쇄하고, 결국 다시 그들의 답장을 보냅니다.

당신은 물론 대기열 장치 및 외부 작업자 프로세스를 사용하여 이 모든 것을 할 수 있지만, 보통 한 코어에 16개 프로세스보다 16개 코어에 한 프로세스로 시작하는 것이 더 쉽습니다. 또한, 스레드로 worker를 실행하면 네트워크 홉 (Hop), 지연 시간 및 네트워크 트래픽이 없습니다.

기본적으로 Hello World 서비스의 MT버전은 단일 프로세스의 queue 장치와 worker로 구성됩니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haskell | Haxe | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Java | Node.js | Objective-C | ooc

어떻게 작동하는지 모든 코드를 읽을 수 있어야 합니다. :

  • 서버는 작업자 스레드들을 시작합니다. 각 작업자 스레드는 REP 소켓을 생성하고 이 소켓에 대한 요청을 처리합니다. 작업자 스레드는 단일 스레드 서버와 같습니다. 유일한 차이는 전송매체 (TCP대신 inproc)와 bind-connect direction입니다.
  • 서버가 클라이언트에 연결하기 위해 ROUTER소켓을 생성하고 외부 인터페이스 (TCP상) 자체에 이것을 바인딩합니다.
  • 서버는 worker와 연결하기 위해 DEALER를 생성하고, 내부 인터페이스 (inproc 상) 자체에 이것을 바인딩합니다.
  • 서버는 두 개의 소켓에 연결된 queue 장치를 시작합니다. 대기열 장치는 들어오는 요청에 대해 하나의 queue을 유지하고, workers에게 분배합니다. 그것은 또한 다시 그것의 회신을 라우팅을 합니다.

생성한 스레드는 대부분의 프로그래밍 언어로 이식되지 않습니다. POSIX 라이브러리는 pthreads이지만, 윈도우에서 당신은 다른 API를 사용해야 합니다. Portable API에서 이것을 포장(wrap)하는 방법은 3장에서 볼 것입니다.

여기 ‘work’는 단지 1초 정지됩니다. 우리는 다른 노드에 연결하는 것을 포함하여, worker에 어떤 것을 할 수 있습니다. 이것은 MT 서버가 ØMQ 소켓과 노드의 관점에서 비슷하게 보인다는 것입니다. 어떻게 Request-reply 연결이 REQ-ROUTER-queue-DEALER-REP로 되는지 주의해서 보시기 바랍니다. :

fig21.png

Signaling between Threads

top prev next

당신이 ØMQ로 멀티 스레드 응용 프로그램을 만들기 시작하면, 당신은 스레드를 조정하는 방법에 대한 질문을 던질 것입니다. 당신이 'sheep'문장을 삽입하려 하고, 또는 세마포 또는 mutexes와 같은 멀티 스레딩 기술을 사용하려고 시도 할 수 있지만, 당신이 사용해야 하는 유일한 메커니즘은 ØMQ 메시지입니다. Drunkards의 이야기와 Beer Bottle을 기억하십시오.

아래는 준비가 되었을 때 각각 다른 신호를 보내는 3개 스레드를 보여주는 간단한 예입니다.

fig22.png

이 예제에서 우리는 inproc 전송매체를 통해 PAIR소켓을 사용합니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haskell | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Ada | Basic | Go | Node.js | Objective-C | ooc | Scala

이것은 ØMQ에서 멀티 스레딩에 대한 고전적인 패턴입니다. :

  1. 두 스레드는 공유 컨텍스트를 사용하여 inproc를 통해 통신합니다.

#부모 스레드가 inproc:// endpoint//에 바인딩한 하나의 소켓을 생성하고 그것에 컨텍스트를 전달하는 자식 스레드를 시작합니다.

  1. 자식 스레드가 inproc:// endpoint//에 연결하는 두 번째 소켓을 만들고 준비된 부모 스레드에 신호를 보냅니다.

이 패턴을 사용하는 멀티 스레딩 코드는 http://zguide.zeromq.org//프로세스로 확장되지 않는 것//**에 유의하시기 바랍니다. 당신이 inproc 및 소켓 쌍을 사용한다면, 당신은 밀접하게 바인딩된 응용 프로그램을 구축하고 있는 것입니다. 낮은 지연 시간이 정말 중요할 때 이 작업을 수행합니다. 모든 정상적인 어플리케이션을 위해 스레드 마다 하나의 컨텍스트를 사용하고 ipctcp를 사용합니다. 그러면 당신은 쉽게 필요에 따라, 별도의 프로세스, 또는 서버로 떼어내어 당신의 스레드를 분리할 수 있습니다.

여기서 우리가 PAIR소켓을 사용하는 예제를 보는 것은 처음입니다. 왜 PAIR을 사용합니까? 다른 소켓 조합도 작동하는 것으로 보일지 모르지만, 이것은 신호를 연계하는데 부작용을 가지고 있습니다. :

  • 당신은 sender를 위해 PUSH를 사용하고 receiver를 위해 PULL을 사용 할 수 있습니다. 이것은 간단하게 작동되지만, PUSH는 가능한 모든 receiver에 메시지를 로드밸런스 한다는 것을 기억하기 바랍니다. 만약 당신이 우연히 두개의 receivers를(예, 당신은 이미 한 개가 실행되어 있고, 두번째를 실행합니다.) 실행한다면 신호의 절반을 잃게 됩니다. PAIR는 하나 이상의 연결을 거부하는 장점이 있으며, 두 개가 독점을 합니다.

? 당신은 sender로 PUB, receiver로 SUB를 사용할 수 있습니다. PUB은 PUSH나 DEALER처럼 로드밸런스를 하지 않고 보내려는 메시지를 정확하게 보낼 것입니다. 그러나 당신은 빈 subscription으로 subscriber를 구성해야 합니다. 단점은, PUB-SUB 연결의 신뢰성은 시간에 달려 있고 PUB소켓이 메시지를 보내는 동안 SUB소켓이 연결중에 있다면 메시지는 유실 됩니다.

위와 같은 이유로, PAIR소켓은 한 쌍의 스레드 사이의 조화을 위한 최고의 선택입니다.

Node Coordination

top prev next

당신이 노드를 조정하고자 할 때, PAIR소켓은 더 이상 제대로 작동하지 않습니다. 이것은 스레드와 노드에 대한 전략이 다르기 때문에 발생되는 원인 중 하나입니다. 주로 노드는 스레드가 stable한 반면 오고 갑니다. 원격 노드가 사라지거나 다시 온다면 PAIR 소켓은 자동으로 다시 연결하지 않습니다.

스레드와 노드 사이의 두 번째 큰 차이점은 일반적으로 스레드는 고정 된 수를 가지고 있고, 노드는 다양한 수를 가집니다. 이전 시나리오 중 하나를 보면 (날씨 서버와 클라이언트) subscriber가 시작할 때 데이터 유실을 막기 위해 노드를 조정했습니다.

아래는 어플리케이션이 어떻게 작동하는지 보여 줍니다. :

  • Publisher는 subscriber가 얼마나 많은지 미리 알고 있습니다. 이것은 어딘가에서 얻은 마법의 숫자입니다.
  • Publisher는 시작된 후 모든 subscriber 연결되기를 기다립니다. 이것은 노드 조정 부분입니다. 각 subscriber는 subscribe하고 다음 소켓을 통해 연결된 publisher와 통신합니다.
  • Publisher는 모든 subscriber가 연결된 경우, 데이터를 게시(publish)하기 시작합니다.

이 경우에 우리는 subscriber와 publisher를 동기화 하기 위해 REQ-REP소켓을 사용합니다. 여기 publisher는 다음과 같습니다:


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Ada | Basic | Go | Haskell | Objective-C | ooc | Scala
fig23.png

이것은 subscriber입니다:


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Ada | Basic | Go | Haskell | Objective-C | ooc | Scala

이 리눅스 쉘 스크립트는 10개의 subscriber를 시작하고 그 다음 publisher를 시작합니다. :

echo "Starting subscribers..."
for a in 1 2 3 4 5 6 7 8 9 10; do
    syncsub &
done
echo "Starting publisher..."
syncpub

만족스러운 결과물 입니다. :

Starting subscribers...
Starting publisher...
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates
Received 1000000 updates

우리는 REQ/REP 출력화면이 완료되는 시점에 SUB연결이 끝날 거라고 추측 할 수 없습니다. 당신은 inproc 제외한 모든 전송매체를 사용하는 경우 아웃 바운드가 어떠한 순서로 완료된다고 보장할 수 없습니다. 그래서, 예제는 subscribing사이에 1초를 강제로 sleep하고 REQ/REP동기화를 보냅니다.

보다 강력한 모델이 되기 위해서 :

  • Publisher는 PUB소켓을 열고, “Hello”메시지(데이터는 없음)를 보내기 시작합니다.
  • Subscriber는 SUB소켓을 열고 “Hello”메시지를 받았을 때 REQ/REP 한 쌍의 소켓을 통해 publisher와 통신 합니다.
  • Publisher가 필요한 확인을 거친 후 실제 데이터를 전송하기 시작합니다.

Zero Copy

top prev next

Zero-copy를 모르는 ØMQ 초보였을 때를 지나, 여기까지 왔다면 당신은 zero-copy를 사용할 준비가 된 것입니다. 그러나 잘못 된 곳으로 가는 많은 길이 있고, 조기 최적화는 즐거운 일도 아니고 수익성도 없습니다. 당신의 아키텍쳐가 완벽하지 않는 상태에서 zero-copy를 하려고 하는 것은 아마도 시간 낭비이고 작업을 더 악화시키는 것이기에 좋을 것이 없습니다.

ØMQ의 메시지 API는 데이터를 복사하지 않고 데이터를 응용 프로그램 버퍼로부터 직접 메시지를 보내고 받을 수 있습니다. ØMQ가 백그라운드에서 메시지를 보낸다면 zero-copy는 일부 소스 수정이 필요합니다.

zero-copy를 사용하기 위해 우리는 malloc()으로 힙에 할당된 데이터 블록을 참조하는 메시지를 생성하기 위해 zmq_msg_init_data(3)을 사용하고, 다음 zmq_send(3)으로 전송합니다. 당신은 생성한 메시지의 송신이 완료되었을 때 데이터 블록을 풀기(free)위해 호출하는 함수를 사용합니다. 이것은 'buffer'가 힙에 할당된 1000 바이트의 블록을 설정하는 간단한 예제입니다. :

void my_free (void *data, void *hint) {
free (data);
}
// Send message from buffer, which we allocate and 0MQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_send (socket, &message, 0);

수신에서는 zero-copy하는 방법은 없습니다 : ØMQ는 당신이 원하는대로 저장한 버퍼를 전달할 수 있지만, 응용 프로그램 버퍼에 직접 데이터를 작성하지는 않을 것입니다.

ØMQ의 다중 메시지는 zero-copy와 함께 쓰기에 잘 작동합니다. 전통적인 메시징방식에서 당신은 보낼 수 있는 한 버퍼와 함께 다른 버퍼를 마샬링할 필요가 있습니다. 이것은 복사 데이터를 의미합니다. ØMQ를 사용하면 개별 메시지 부품 등 다양한 소스에서 오는 여러 버퍼를 보낼 수 있습니다. 우리는 length-delimited 프레임으로 각 필드를 보냅니다. 응용 프로그램은 전송과 수신 호출의 반복 입니다. 그러나 내부적으로 여러 부분이 네트워크에 쓰고, 단일 시스템 호출로 다시 읽습니다. 그래서 이것은 매우 효율적입니다.

Transient vs. Durable Sockets

top prev next

고전적인 네트워킹에서, 소켓은 API 객체입니다, 이것들의 수명은 그들을 사용하는 코드보다 절대 길지 않습니다. 그러나 소켓을 보면 자원(네트워크 버퍼)을 수집하는 것을 볼 수 있습니다. ØMQ 사용자가 물었습니다, "내 프로그램이 깨지는 경우 원복 말고, 어떤 방법이 있습니까? "

이것은 매우 유용한 것으로 밝혀 졌습니다. 이것은 간단하지는 않습니다. 특히, ØMQ에서는 pub-sub경우에 유용합니다. 잠시 보겠습니다.

여기 두 소켓이 날씨에 대해서 행복하게 채팅 하는 일반적인 모델이 있습니다.

fig24.png

만약 소켓의 receiver(SUB, PULL, REQ)쪽에서 identity를 설정한다면, 그다음 송신(PUB,PUSH,PULL)쪽은 HWM까지 연결되지 않는 경우 메시지가 버퍼에 쌓일 것입니다. 송신쪽은 처리하기 위해 identity를 설정할 필요가 없습니다.

ØMQ의 전송과 수신 버퍼는 보이지 않으며 자동 동작 합니다.(TCP 버퍼 처럼).

모든 소켓에서 우리는 일시적으로 과도하게 사용해 왔습니다. transient 소켓을 durable한 소켓으로 변경하기 위해 명시적으로 identity를 설정해야 합니다. 모든 ØMQ 소켓이 ID를 가지고 있지만 기본적으로 ØMQ는 누구와 얘기하는지 기억하기 위해 UUID(unique universal identifiers)를 생성합니다.

한 소켓과 다른 소켓이 연결할 때 우리는 모르는 사이 두 소켓은 identities를 교환합니다. 일반적으로 소켓은 대상의 ID를 알려고 하지 않기 때문에 서로간에 임의의 ID를 생성합니다.

fig25.png

하지만 소켓은 그것의 ID를 교환한 후 다음번에 만나서 이럴겁니다. “내가 들은 것은 당신이 사무실 가는 법을 알고 있는 어떤 방법과 다르다고 말했습니다. 그들은 수다쟁이 입니다. 나는 어떤 누구에도 어떤 것도 얘기를 하지 않았으며 사실이 아닙니다.”

fig26.png

여기 내구성 소켓을 만들기 위한 소켓 ID을 설정하는 방법은 다음과 같습니다. :

zmq_setsockopt (socket, ZMQ_IDENTITY, "Lucy", 4);

소켓 ID를 설정하는 몇 가지 설명 :

  • 당신은 소켓을 연결하거나 바인딩 전에 반드시 ID를 설정을 해야 합니다.
  • Receiver가 ID를 설정합니다. : 그것은 client/sender가 사용할 쿠키를 생성하는 경우는 제외하고, HTTP 웹 응용 프로그램에서 세션 쿠키 같은 것입니다.
  • ID는 이진 문자열 입니다 : zero바이트로 시작하는 ID는 ØMQ 사용을 위해 예약되어 있습니다.
  • 하나 이상의 소켓에 동일한 ID를 사용하지 마십시요. 이미 다른 소켓에서 만들어진 ID를 사용하여 연결하려고 하면 연결이 안 됩니다.
  • 많은 소켓을 사용하는 어플리케이션에서 임의의 ID를 사용하지 마십시요. 이렇게 하는 것은 충돌하는 durable 소켓이 많아지는 원인이 되고, 결국 노드가 깨집니다.
  • 당신이 메시지를 받은 peer의 ID를 인지할 필요가 있는 경우 ROUTER소켓이 자동으로 이 작업을 수행합니다. 다른 소켓 유형을 위해 명시적으로 주소를 메시지 일부로써 보내야 합니다.
  • Durable소켓을 사용하는 것은 종종 나쁜 생각이라고 말을 합니다. 이것은 sender가 아키텍쳐를 약하게 만드는 엔트로피를 쌓이게 합니다. ØMQ에서는 명시적인 정체성을 구현하지 않는 게 좋습니다.

ZMQ_IDENTITY 소켓 옵션의 요약을 위해 zmq_setsockopt(3)을 봅시다. zmq_getsockopt(3) 메소드는 작업중인 소켓의ID를 제공합니다.

Pub-sub Message Envelopes

top prev next

우리는 multipart 메시지를 간단히 봤습니다. 지금은 메시지 envelopes에 대해서 보겠습니다. Envelop은 데이터를 건드리지 않고 주소와 함께 안정하게 데이터를 포장하는 한 방법입니다.

pub-sub패턴에서, 적어도 envelop은 필터링을 위한 subscription key를 가지고 있지만, envelop에 발신자ID를 추가할 수 있습니다.

당신이 pub-sub envelop을 사용하기 원한다면, 당신이 직접 envelop을 만들 수 있습니다. 이것은 선택이며, 이전 pub-sub예제에서는 이렇게 하지 않았습니다. Pub-sub envelop을 사용하는 것은 간단한 경우에는 조금 더 많은 작업이 들어가지만, 키와 데이터가 자연스럽게 분리되는 실제 경우에는 더 간단합니다. 당신이 어플리케이션 버퍼에 직접 데이터를 쓴다면 더 속도가 빨라집니다.

envelop된 publish-subscribe 메시지는 아래와 같이 보여집니다. :

fig27.png

Pub-sub은 접두사가 일치하는 메시지를 가져옵니다. 구분된 프레임에 키를 넣는 것은 매우 분명한 일치성을 제공합니다. 그래서 우연히 데이터의 일부만 일치하는 경우를 없애 줍니다.

여기 pub-sub envelop이 코드상으로 어떻게 보이는지 간단한 예제가 있습니다. Publisher는 A,B 두개 유형의 메시지를 보냅니다. Envelop은 메시지 유형을 가집니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Haskell | Node.js | Objective-C | ooc

Subscriber는 B타입의 메시지만 원합니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Haskell | Node.js | Objective-C | ooc

이 두 프로그램을 실행하면, subscriber 결과는 아래와 같이 출력합니다. :

[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
...

이 예제는 subscription 필터를 반환하거나, 전체 multipart메시지(key와 data포함)를 가져오는 것을 보여 줍니다. 결코, multipart메시지의 일부만은 얻을 수 없습니다.

당신은 다중의 publisher에 가입하고 당신이 그들에게 또 다른 소켓을 통해 데이터를 (그리고 이것은 매우 일반적인 사용 케이스 입니다) 보낼 수 있도록 자신의 ID을 알고 싶다면, 당신은 세 부분으로 메시지를 작성하시면 됩니다. :

fig28.png

(Semi-)Durable Subscribers and High-Water Marks

top prev next

ID(Identities)는 모든 종류의 소켓에서 사용합니다.만약 당신이 PUB과 SUB 소켓을 가지고 있고, subscriber가 publisher에게 자체 ID를 준다면 publisher는 subscriber에게서 데이터를 넘길 때까지 잡고 있습니다.

이것은 동시에 놀랍고 끔찍한 것입니다, 이것이 놀라운 이유는 당신이 연결하고 이것을 수집할 때까지 업데이트가 publisher의 전송버퍼에서 기다릴 수 있다는 것입니다. 끔찍한 것은, 기본적으로 이것은 빠르게 publisher를 죽이고 당신의 시스템을 잠글 수 있다는 것입니다.

당신이 durable subscriber 소켓을 사용한다면(예, 당신이 SUB소켓에 ID를 설정하는 경우), 당신은 반드시 publisher 소켓에 HWM(high-water-mark)를 사용하여 대기열이 넘치는 것을 방지해야 합니다. Publisher의 HWM은 모든 subscriber에 독립적으로 영향을 미칩니다.

만약 당신이 이것을 증명하려면, 1장에서 wuclinet및 wuserver를 가져와서, 그것을 연결하기 전에 wuclient 라인을 추가해야 합니다.

zmq_setsockopt (subscriber, ZMQ_IDENTITY, "Hello", 5);

두 프로그램을 빌드하고 실행해 봅시다. 모두 정상으로 보일 것 입니다. 그러나 publisher가 사용하는 메모리를 주시해 보면, subscriber가 종료되면 Publisher의 메모리가 증가하고 있는 것을 확인할 수 있습니다. 당신이 subscriber를 다시 시작하면 publisher 대기열의 증가가 멈춥니다. 즉시 subscriber가 나가게 되면, 이것은 다시 커지고. 그것은 빠른 속도로 시스템을 압도합니다.

우리는 이것이 어떻게 작동 하는지, 그 다음 적당히 수행하는 방법을 볼 것입니다. 여기 2장에는 동기화 하기 위해 ‘node coordination’ 기술을 사용하는 publisher와 subscriber가 있습니다. Publisher는 매번 1초 기다리면서 10개 메시지를 보냅니다. 당신은 Ctrl-C를 사용하여 subscriber를 죽이기 위해 몇 초 동안 기다리고 재 시작 합니다.

여기 publisher는 다음과 같습니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Haskell | Node.js | Objective-C | ooc

그리고 여기 subscriber가 있습니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Haskell | Node.js | Objective-C | ooc

이것을 실행하려면, 자신의 윈도우에서 publisher를 시작하고, 그 다음 subscriber를 실행시킵니다. subscriber는 하나 또는 두 개의 메시지를 수집하도록 허용한 후 Ctrl-C를 누릅니다. 셋을 세고, 그리고 그것을 다시 시작합니다. 아래처럼 볼 수 있습니다 :

$ durasub
Update 0
Update 1
Update 2
^C
$ durasub
Update 3
Update 4
Update 5
Update 6
Update 7
^C
$ durasub
Update 8
Update 9
END

단지 그 차이를 보기 위해 소켓 ID를 설정한 subscriber의 라인을 막고, 다시 시도합니다. 당신은 메시지를 잃는 것을 볼 수 있습니다. ID를 설정하면 일시적인 subscriber를 지속적인 subscriber로 바뀝니다. 당신은 실제적으로 구성 파일에서 ID를 가져오거나, UUIDs를 생성하고 어딘가에 그들을 저장하는 것을 신중하게 선택할 것입니다.

우리가 PUB소켓에 high-water-mark를 설정하면, publisher는 많은 메시지를 저장하지만, 무한한 것은 아닙니다. 우리가 소켓의 publish 시작 전에, publisher에 HWM = 2로 설정하고 테스트해 봅시다.:

uint64_t hwm = 2;
zmq_setsockopt (publisher, ZMQ_HWM, &hwm, sizeof (hwm));

지금 테스트를 실행하고, 죽이고, 몇 초 후 subscriber를 재 시작하면 아래와 같은 결과를 볼 것입니다.:

$ durasub
Update 0
Update 1
^C
$ durasub
Update 2
Update 3
Update 7
Update 8
Update 9
END

자세히 보세요 : 우리는 기다리는 두 메시지가 있고, 여러 메시지들의 차이, 그리고 다시 새로운 Update들이 있습니다. HWM은 ØMQ가 대기열을 담을 수 없는 메시지를 버리게 하는 원인이 됩니다. ØMQ 매뉴얼 ‘exceptional condition’을 참고하세요.

간단히, subscriber ID를 사용한다면 publisher소켓에 high-water-mark를 설정해야 합니다. 그렇지 않으면 메모리가 부족하고 깨져서 서버가 위험해지게 됩니다. 그러나 다른 방법은 있습니다. ØMQ는 ‘swap’이라 불리는 것을 제공합니다. 이것은 대기열에 저장할 수 없는 메시지를 담는 디스크 파일 입니다. 이것은 적용하기에 매우 간단합니다.

// Specify swap space in bytes
uint64_t swap = 25000000;
zmq_setsockopt (publisher, ZMQ_SWAP, &swap, sizeof (swap));

우리는 속도가 저하되고 차단되고, subscriber가 없어지는 좋지 않은 publisher를 만들기 위해 이것을 넣을 수는 있으며, 여전히 이것이 필요한 곳에 영원한 가입(durable descriptions)을 제공하는 것은 가능합니다. :


C++ | C# | Clojure | CL | Erlang | F# | Haxe | Java | Lua | Perl | PHP | Python | Ruby | Scala | Ada | Basic | Go | Haskell | Node.js | Objective-C | ooc

실제로, HWM=1로 설정하여 디스크에 모든 것은 저장하는 것은 pub-sub시스템을 매우 느리게 할 것입니다. 알 수 없는 subscriber를 감당해야만 하는 publisher를 위해 좀더 합리적인 best practice가 있습니다. :

  • PUB소켓에 항상 HWM을 설정해라. 예상되는 최대 subscriber수, 큐에 할당할 수 있는 메모리 양, 메시지의 평균 사이즈에 기초해서 HWM을 설정 할 수 있습니다. 예를 들어, 5000 subscriber를 예상하고, 가용한 메모리가 1GB이고, 메시지가 ~200bytes라면, 그때 적당한 HWM은 (1000000000 / 200 / 5,000) = 1,000 입니다.
  • 만약 subscriber의 속도가 느리거나 데이터를 잃어 버리는 것을 원하지 않으면, 당신이 커버하려는 최대 메시지 속도, 메시지의 평균 크기와 시간, subscriber의 숫자에 따라 최고점(peek)를 처리하기에 충분한 대형의 SWAP을 설정해야 합니다. 예를 들어, subscriber는 5,000이고, 초당 100,000건씩 ~200 바이트의 메시지가 들어오면, 초당 디스크 공간 100MB까지 해야 합니다. 최대 1 분 정도의 정전을 커버하려면, 따라서 디스크 공간 6GB이 필요할 것이며 다른 이야기 이지만, 그것도 빨리 해야만 할 것 입니다.

Durable subscriber의 주의할 사항 :

  • subscriber가 어떻게 죽는지, updates의 빈도, 네트워크 버퍼의 크기, 사용하는 전송 프로토콜에 따라 데이터가 유실될 수 있습니다. Durable subscriber는 transient한 것 보다 훨씬 더 신뢰성을 가지고 있지만 완벽하지는 않습니다.
  • publisher가 죽고 다시 시작하는 경우 SWAP파일은 복구 할 수 없습니다. 이것은 임시버퍼와 네트워크 I/O 버퍼에 있는 데이터로 잃게 됩니다.

HWM 옵션 사용에 주의할 사항

  • 이것은 단일 소켓의 전송과 수신 모두에 영향을 미칩니다. 일부 소켓(PUB,SUB)는 전송버퍼에만 가집니다. 일부(SUB, PULL, REQ, REP)는 수신버퍼에만 가집니다. 일부(DEALER,ROUTER,PAIR)은 전송/수신버퍼 모두를 가집니다.
  • 당신의 소켓이 high-water-mark에 도달할 경우, 소켓 유형에 따라 대기하거나 데이터를 버릴 것입니다. PUB소켓은 high-water-mark에 도달하면 메시지를 버리고, 다른 소켓은 대기할 것입니다.
  • inproc 경우에는 송신자와 수신자가 같은 버퍼를 공유합니다. 그래서 실제 HWM은 양쪽에 설정한 HWM의 합계입니다. 이것은 한쪽에 HWM을 설정하지 않으면 버퍼크기의 제한이 없다는 것을 의미 합니다.

A Bare Necessity

top prev next

ØMQ는 당신의 상상과 진지함으로 한 조각 한 조각 끼워 넣는 조각상자와 같습니다.

당신이 얻을 확장 가능한 아키텍쳐에 눈을 뜨세요. 당신은 커피 한, 두 잔이 필요할 수 있습니다. 한번 만드는 실수를 한 다음 Entkoffeiniert 라벨이 붙은 이국적인 독일 커피를 구입하지 마세요. 이것이 맛있다는 것은 아닙니다. 확장 가능한 아키텍쳐는 새로운 아이디어가 아닙니다.- flow-based programmingErlang같은 언어는 이미 이와 같이 작동합니다. 그러나 ØMQ는 전에 어떤 것보다도 더 사용하기 쉽게 만들었습니다.

Gonzo Diethelm said 말에 따라, ‘나의 직감은 이 문장으로 요약이 됩니다:” 만약 ØMQ가 존재하지 않는다면 발명이 필요합니다.” 나는 몇 년 동안 구상한 후 ØMQ에 뛰어 들었고, 만들게 되었다는 것을 의미 합니다. ØMQ는 요즘 나에게 생활필수품인 것 같습니다.'