본문 바로가기

공부

🍎 레디스 톺아보기 - pub/sub

728x90

레디스의 pub/sub은 순서가 보장되는 at-most-once(최대 한 번) 성격을 가진 pub/sub 메시징 구현으로, 구독자(subscriber)는 하나 이상의 채널을 구독하고 메시지를 전달받을 수 있다. 이때 채널을 구독한 RESP2 클라이언트는 SUBSCRIBE, UNSUBSCRIBE를 제외한 명령어를 수행하면 안 된다. (RESP3은 구독 중이라도 명령어를 수행할 수 있다.)

 

앞서 언급했듯 레디스의 pub/sub은 메시지 전달 시 누락될 가능성이 있다(at-most-once) 따라서 메시지 전송에 있어 조건이 있다면 레디스의 stream을 활용하는 것이 좋다.

 

레디스의 pub/sub은 데이터베이스와 상관없이 채널명으로 분류된다. 즉 DB 0에 게시해도 DB 10의 구독자는 메시지를 수신받을 수 있다. 따라서 특정 범위 대상에게만 메시지를 전달하기 위해선 채널명 앞에 그룹을 지정하기 위한 이름을 붙여야 한다. 여기서 채널을 구독할 경우 구독할 채널의 이름과 완벽히 일치한 경우와 패턴 매칭을 통해 구독할 수 있는데, 만약 두 경우 모두 채널을 구독한 경우 메시지를 중복해서 받을 수 있다.

코드 레벨 분석하기

pub/sub과 관련된 코드는 src/pubsub.c를 참고하면 된다.

Subscribe

    /* Add the channel to the client -> channels hash table */
    if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(*type.serverPubSubChannels, channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(*type.serverPubSubChannels, channel, clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }

가장 먼저 구독 명령어를 분석하면 다음과 같다. 구독을 하게 되면 클라이언트는 자신의 구조체 내 pubsub_channels에 구독한 채널 정보를 저장하고, 서버에 해당 채널을 생성 후 구독 요청한 클라이언트를 해당 채널 목록에 추가한다.

Publish

    /* Send to clients listening for that channel */
    de = dictFind(*type.serverPubSubChannels, channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            addReplyPubsubMessage(c,channel,message,*type.messageBulk);
            updateClientMemUsageAndBucket(c);
            receivers++;
        }
    }

    if (type.shard) {
        /* Shard pubsub ignores patterns. */
        return receivers;
    }

    /* Send to clients listening to matching channels */
    di = dictGetIterator(server.pubsub_patterns);
    if (di) {
        channel = getDecodedObject(channel);
        while((de = dictNext(di)) != NULL) {
            robj *pattern = dictGetKey(de);
            list *clients = dictGetVal(de);
            if (!stringmatchlen((char*)pattern->ptr,
                                sdslen(pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) continue;

            listRewind(clients,&li);
            while ((ln = listNext(&li)) != NULL) {
                client *c = listNodeValue(ln);
                addReplyPubsubPatMessage(c,pattern,channel,message);
                updateClientMemUsageAndBucket(c);
                receivers++;
            }
        }
        decrRefCount(channel);
        dictReleaseIterator(di);
    }

발행을 하게 되면, 서버 채널 목록에서 클라이언트 목록을 조회한 후 순회하며 전송하게 된다. 이후 shard flag 유무에 따라서 추가적으로 패턴 매칭된 채널에 추가 메시지를 발행한다.

 

10년 간 redis를 혼자 관리하며 단순한 코드 구조를 유지할 수 있었던 만큼, 작은 코드지만 분석하기 쉬웠다. 향후 c언어를 공부하게 된다면 좋은 참고 자료가 될 것 같다. 고마워요 Salvatore!