[Python][Basics 04] Redis Integration

category Language/Python 2023. 2. 12. 17:54

Package installation

(mp-crawler) ➜  ~/Project/python-project/mp-crawler pip install redis
Collecting redis
  Downloading redis-4.5.1-py3-none-any.whl (238 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 238.5/238.5 kB 3.7 MB/s eta 0:00:00
Collecting async-timeout>=4.0.2
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Installing collected packages: async-timeout, redis
Successfully installed async-timeout-4.0.2 redis-4.5.1
(mp-crawler) ➜  ~/Project/python-project/mp-crawler pip list
Package       Version
------------- -------
async-timeout 4.0.2
pip           23.0
redis         4.5.1
setuptools    63.2.0
wheel         0.37.1

GET / SET

import redis

# Connect to Redis
rd = redis.StrictRedis(host='localhost', port=6379, db=0, password="******")

# Store a key-value pair in Redis
rd.set("foo", "bar")

# Read the value back; redis-py returns bytes by default
value = rd.get("foo")

print(value)  # b'bar'
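
Note that `rd.get` returns `bytes` rather than `str` unless the client was created with `decode_responses=True`. As a minimal sketch, the hypothetical helper `to_text` below (not part of redis-py) shows one way to turn a GET result into a string, assuming the value was stored as UTF-8 text:

```python
from typing import Optional


def to_text(value: Optional[bytes], encoding: str = "utf-8") -> Optional[str]:
    """Decode a raw GET result; hypothetical helper, not part of redis-py."""
    # GET returns None when the key does not exist, so pass that through.
    if value is None:
        return None
    return value.decode(encoding)


print(to_text(b"bar"))  # bar
print(to_text(None))    # None
```

With a live connection this would be called as `to_text(rd.get("foo"))`. Passing `decode_responses=True` when creating the client is the simpler choice when every value is text.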

Publish / Subscribe

import time
import redis

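# Connect to Redis
rd = redis.StrictRedis(host='localhost', port=6379, db=0, password="*******")

pubsub = rd.pubsub()
pubsub.subscribe('first', 'second')

# Use either option 1 or option 2: listen() blocks forever,
# so any code placed after that loop never runs.

# 1. When it must run as a daemon (listen)
for msg in pubsub.listen():
    print(msg)

# 2. When a timeout is needed
time_limit = 60  # seconds; example value, adjust as needed
# Start time
start_ts = time.time()
while time.time() - start_ts < time_limit:
    time.sleep(1)
    current_ts = time.time()
    print('Time Elapsed : ', int(current_ts - start_ts))
    # Skip subscribe/unsubscribe confirmations; returns None when nothing is pending
    received_message = pubsub.get_message(ignore_subscribe_messages=True)
    if received_message is not None:
        print(received_message)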
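
Both `listen()` and `get_message()` hand back each message as a dict with the keys `type`, `pattern`, `channel`, and `data`. The sketch below uses a hypothetical helper named `extract_payload` to show one way to keep only real published messages and decode them. The sample dicts are hand-written to mirror that shape, and the code assumes UTF-8 payloads and a client created without `decode_responses=True`:

```python
from typing import Optional, Tuple


def extract_payload(msg: Optional[dict]) -> Optional[Tuple[str, str]]:
    """Return (channel, text) for published messages, otherwise None.

    Hypothetical helper, not part of redis-py.
    """
    # Subscribe/unsubscribe confirmations carry an integer in "data",
    # so only entries with type "message" are decoded here.
    if msg is None or msg.get("type") != "message":
        return None
    return msg["channel"].decode("utf-8"), msg["data"].decode("utf-8")


# Hand-written samples in the shape redis-py delivers
subscribe_confirmation = {
    "type": "subscribe", "pattern": None, "channel": b"first", "data": 1,
}
published = {
    "type": "message", "pattern": None, "channel": b"first",
    "data": b"publish activation code cwlee",
}

print(extract_payload(subscribe_confirmation))  # None
print(extract_payload(published))  # ('first', 'publish activation code cwlee')
```

Inside either loop above, this would replace the bare `print` with something like `payload = extract_payload(msg)` followed by a check for `None`.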

To test, publish a message through redis-cli.

# in Redis CLI
127.0.0.1:6379> publish first "publish activation code cwlee"

Execution result