-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
110 additions
and
110 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,110 +1,110 @@ | ||
import asyncio | ||
import functools | ||
import logging | ||
import pickle | ||
import random | ||
from inspect import iscoroutinefunction | ||
|
||
import redis | ||
from app import app | ||
|
||
app.config.setdefault("REDIS_URL", "redis://redis:6379/0") | ||
|
||
client = redis.from_url(app.config["REDIS_URL"], decode_responses=True) | ||
|
||
|
||
class RedisStorage(object): | ||
def __init__(self, **kwargs): | ||
for k, v in kwargs.items(): | ||
if v: | ||
self.set(k, v) | ||
|
||
def get(self, name): | ||
return client.get(name) | ||
|
||
def set(self, name, value): | ||
client.set(name, value) | ||
|
||
|
||
def get_client(decode_responses=False): | ||
return redis.from_url(app.config["REDIS_URL"], decode_responses=decode_responses) | ||
|
||
|
||
def gen_prefix(obj, method): | ||
return ".".join([obj.__module__, obj.__class__.__name__, method.__name__]) | ||
|
||
|
||
def stalecache( | ||
key=None, | ||
expire=600, | ||
stale=3600, | ||
time_lock=1, | ||
time_delay=1, | ||
max_time_delay=10, | ||
): | ||
def decorate(method): | ||
@functools.wraps(method) | ||
def wrapper(*args, **kwargs): | ||
if kwargs.get("skip_cache"): | ||
return method(*args, **kwargs) | ||
name = args[0] if args and not key else None | ||
|
||
res = get_client(False).pipeline().ttl(name).get(name).execute() | ||
v = pickle.loads(res[1]) if res[0] > 0 and res[1] else None | ||
if res[0] <= 0 or res[0] < stale: | ||
|
||
def func(): | ||
value = method(*args, **kwargs) | ||
logging.debug("update cache: %s", name) | ||
get_client(False).pipeline().set(name, pickle.dumps(value)).expire( | ||
name, expire + stale | ||
).execute() | ||
return value | ||
|
||
# create new cache in blocking modal, if cache not exists. | ||
if res[0] <= 0: | ||
return func() | ||
|
||
# create new cache in non blocking modal, and return stale data. | ||
# set expire to get a "lock", and delay to run the task | ||
real_time_delay = random.randrange(time_delay, max_time_delay) | ||
get_client(False).expire(name, stale + real_time_delay + time_lock) | ||
# 创建一个 asyncio 任务来执行 func | ||
asyncio.create_task(asyncio.sleep(real_time_delay, func())) | ||
|
||
return v | ||
|
||
@functools.wraps(method) | ||
async def async_wrapper(*args, **kwargs): | ||
if kwargs.get("skip_cache"): | ||
return await method(*args, **kwargs) | ||
|
||
name = args[0] if args and not key else None | ||
|
||
res = get_client(False).pipeline().ttl(name).get(name).execute() | ||
v = pickle.loads(res[1]) if res[0] > 0 and res[1] else None | ||
if res[0] <= 0 or res[0] < stale: | ||
|
||
async def func(): | ||
value = await method(*args, **kwargs) | ||
logging.debug("update cache: %s", name) | ||
get_client(False).pipeline().set(name, pickle.dumps(value)).expire( | ||
name, expire + stale | ||
).execute() | ||
return value | ||
|
||
# create new cache in blocking modal, if cache not exists. | ||
if res[0] <= 0: | ||
return await func() | ||
|
||
# create new cache in non blocking modal, and return stale data. | ||
# set expire to get a "lock", and delay to run the task | ||
real_time_delay = random.randrange(time_delay, max_time_delay) | ||
get_client(False).expire(name, stale + real_time_delay + time_lock) | ||
asyncio.create_task(asyncio.sleep(real_time_delay, func())) | ||
|
||
return v | ||
|
||
return async_wrapper if iscoroutinefunction(method) else wrapper | ||
|
||
return decorate | ||
import asyncio | ||
import functools | ||
import logging | ||
import pickle | ||
import random | ||
from inspect import iscoroutinefunction | ||
|
||
import redis | ||
from app import app | ||
|
||
app.config.setdefault("REDIS_URL", "redis://redis:6379/0") | ||
|
||
client = redis.from_url(app.config["REDIS_URL"], decode_responses=True) | ||
|
||
|
||
class RedisStorage(object): | ||
def __init__(self, **kwargs): | ||
for k, v in kwargs.items(): | ||
if v: | ||
self.set(k, v) | ||
|
||
def get(self, name): | ||
return client.get(name) | ||
|
||
def set(self, name, value): | ||
client.set(name, value) | ||
|
||
|
||
def get_client(decode_responses=False): | ||
return redis.from_url(app.config["REDIS_URL"], decode_responses=decode_responses) | ||
|
||
|
||
def gen_prefix(obj, method): | ||
return ".".join([obj.__module__, obj.__class__.__name__, method.__name__]) | ||
|
||
|
||
def stalecache( | ||
key=None, | ||
expire=600, | ||
stale=3600, | ||
time_lock=1, | ||
time_delay=1, | ||
max_time_delay=10, | ||
): | ||
def decorate(method): | ||
@functools.wraps(method) | ||
def wrapper(*args, **kwargs): | ||
if kwargs.get("skip_cache"): | ||
return method(*args, **kwargs) | ||
name = args[0] if args and not key else None | ||
|
||
res = get_client(False).pipeline().ttl(name).get(name).execute() | ||
v = pickle.loads(res[1]) if res[0] > 0 and res[1] else None | ||
if res[0] <= 0 or res[0] < stale: | ||
|
||
def func(): | ||
value = method(*args, **kwargs) | ||
logging.debug("update cache: %s", name) | ||
get_client(False).pipeline().set(name, pickle.dumps(value)).expire( | ||
name, expire + stale | ||
).execute() | ||
return value | ||
|
||
# create new cache in blocking modal, if cache not exists. | ||
if res[0] <= 0: | ||
return func() | ||
|
||
# create new cache in non blocking modal, and return stale data. | ||
# set expire to get a "lock", and delay to run the task | ||
real_time_delay = random.randrange(time_delay, max_time_delay) | ||
get_client(False).expire(name, stale + real_time_delay + time_lock) | ||
# 创建一个 asyncio 任务来执行 func | ||
asyncio.create_task(asyncio.sleep(real_time_delay, func())) | ||
|
||
return v | ||
|
||
@functools.wraps(method) | ||
async def async_wrapper(*args, **kwargs): | ||
if kwargs.get("skip_cache"): | ||
return await method(*args, **kwargs) | ||
|
||
name = args[0] if args and not key else None | ||
|
||
res = get_client(False).pipeline().ttl(name).get(name).execute() | ||
v = pickle.loads(res[1]) if res[0] > 0 and res[1] else None | ||
if res[0] <= 0 or res[0] < stale: | ||
|
||
async def func(): | ||
value = await method(*args, **kwargs) | ||
logging.debug("update cache: %s", name) | ||
get_client(False).pipeline().set(name, pickle.dumps(value)).expire( | ||
name, expire + stale | ||
).execute() | ||
return value | ||
|
||
# create new cache in blocking modal, if cache not exists. | ||
if res[0] <= 0: | ||
return await func() | ||
|
||
# create new cache in non blocking modal, and return stale data. | ||
# set expire to get a "lock", and delay to run the task | ||
real_time_delay = random.randrange(time_delay, max_time_delay) | ||
get_client(False).expire(name, stale + real_time_delay + time_lock) | ||
asyncio.create_task(asyncio.sleep(real_time_delay, func())) | ||
|
||
return v | ||
|
||
return async_wrapper if iscoroutinefunction(method) else wrapper | ||
|
||
return decorate |