Surface import counts from record imports
whitfin committed Sep 14, 2024
1 parent 9b0e4e2 commit 86ecf32
Showing 7 changed files with 31 additions and 32 deletions.
4 changes: 2 additions & 2 deletions lib/cachex.ex
@@ -788,7 +788,7 @@ defmodule Cachex do
iex> Cachex.put(:my_cache, "key", "value")
iex> Cachex.import(:my_cache, [ { :entry, "key", "value", 1538714590095, nil } ])
{ :ok, true }
{ :ok, 1 }
"""
@spec import(Cachex.t(), Enumerable.t(), Keyword.t()) :: {status, any}
@@ -1128,7 +1128,7 @@ defmodule Cachex do
{ :ok, 0 }
iex> Cachex.restore(:my_cache, "/tmp/my_backup")
{ :ok, true }
{ :ok, 1 }
iex> Cachex.size(:my_cache)
{ :ok, 1 }
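
The doctest updates above capture the user-facing change of this commit: `Cachex.import/3` and `Cachex.restore/3` now return the number of records written rather than a bare `true`. A minimal sketch of consuming that count, assuming a cache named `:my_cache` has already been started:

```elixir
# Import a single pre-built entry record and report how many were written.
{:ok, count} =
  Cachex.import(:my_cache, [
    {:entry, "key", "value", 1538714590095, nil}
  ])

IO.puts("imported #{count} record(s)")
```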
18 changes: 10 additions & 8 deletions lib/cachex/actions/import.ex
@@ -20,30 +20,32 @@ defmodule Cachex.Actions.Import do
a large import set.
"""
def execute(cache() = cache, entries, _options),
do: {Enum.each(entries, &import(cache, &1, now())), true}
do: {:ok, Enum.reduce(entries, 0, &import(cache, &1, &2, now()))}

# Imports an entry directly when no TTL is included.
#
# As this is a direct import, we just use `Cachex.put/4` with the provided
# key and value from the existing entry record - nothing special here.
defp import(cache, entry(key: k, expiration: nil, value: v), _t),
do: {:ok, true} = Cachex.put(cache, k, v, const(:notify_false))
defp import(cache, entry(key: k, expiration: nil, value: v), c, _t) do
Cachex.put!(cache, k, v, const(:notify_false))
c + 1
end

# Skips over entries which have already expired.
#
# This occurs in the case there was an existing modification time and expiration
# but the expiration time would already have passed (so there's no point in
# adding the record to the cache just to throw it away in future).
defp import(_cache, entry(modified: m, expiration: e), t) when m + e < t,
do: nil
defp import(_cache, entry(modified: m, expiration: e), c, t) when m + e < t,
do: c

# Imports an entry, using the current time to offset the TTL value.
#
# This is required to shift the TTLs set in a backup to match the current
# import time, so that the rest of the lifetime of the key is the same. If
# we didn't do this, the key would live longer in the cache than intended.
defp import(cache, entry(key: k, modified: m, expiration: e, value: v), t) do
opts = const(:notify_false) ++ [expire: m + e - t]
{:ok, true} = Cachex.put(cache, k, v, opts)
defp import(cache, entry(key: k, modified: m, expiration: e, value: v), c, t) do
Cachex.put!(cache, k, v, const(:notify_false) ++ [expire: m + e - t])
c + 1
end
end
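
The reducer above counts only entries that are actually written, skipping anything whose TTL has already lapsed, and the `expire: m + e - t` option keeps each surviving key's absolute expiry unchanged. A small worked example of that offset, using made-up millisecond timestamps rather than values from the changeset:

```elixir
modified = 1_000    # when the entry was last written before the save
expiration = 5_000  # its original TTL
now = 4_000         # the time of the import

# The entry was originally due to expire at modified + expiration = 6_000.
# Importing it with only the remaining lifetime preserves that deadline:
remaining = modified + expiration - now
IO.inspect(remaining) # 2_000, so it still expires at 4_000 + 2_000 = 6_000
```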
2 changes: 1 addition & 1 deletion lib/cachex/actions/restore.ex
@@ -44,7 +44,7 @@ defmodule Cachex.Actions.Restore do

Import.execute(cache, stream, [])
rescue
_error -> error(:unreachable_file)
File.Error -> error(:unreachable_file)
end

# Read the next term from a file handle based on the TLV flags. Each
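Narrowing the rescue clause above from `_error` to `File.Error` means only filesystem failures are translated into the `:unreachable_file` error; other exceptions now propagate instead of being silently swallowed. A hedged sketch of the caller-visible behaviour, with an assumed cache name and path:

```elixir
case Cachex.restore(:my_cache, "/tmp/does_not_exist") do
  {:ok, count} ->
    IO.puts("restored #{count} entries")

  {:error, :unreachable_file} ->
    # Assumed error shape for a missing or unreadable backup file.
    IO.puts("backup could not be read")
end
```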
29 changes: 13 additions & 16 deletions lib/cachex/actions/save.ex
@@ -29,26 +29,23 @@ defmodule Cachex.Actions.Save do
"""
def execute(cache(router: router(module: router)) = cache, path, options) do
batch = Options.get(options, :batch_size, &is_positive_integer/1, 25)
file = File.open!(path, [:write, :compressed])

case File.open(path, [:write, :compressed]) do
{:ok, file} ->
{:ok, stream} =
options
|> Keyword.get(:local)
|> init_stream(router, cache, batch)
{:ok, stream} =
options
|> Keyword.get(:local)
|> init_stream(router, cache, batch)

stream
|> Stream.chunk_every(batch)
|> Stream.map(&handle_batch/1)
|> Enum.each(&IO.binwrite(file, &1))
stream
|> Stream.chunk_every(batch)
|> Stream.map(&handle_batch/1)
|> Enum.each(&IO.binwrite(file, &1))

with :ok <- File.close(file) do
{:ok, true}
end

_error ->
error(:unreachable_file)
with :ok <- File.close(file) do
{:ok, true}
end
rescue
File.Error -> error(:unreachable_file)
end

# Use a local stream to lazily walk through records on a local cache.
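The refactor above swaps the `case File.open/2` branch for `File.open!/2` plus a function-level `rescue`, removing a level of nesting from the happy path while keeping the same `:unreachable_file` result on filesystem failure. A standalone sketch of the same pattern, not Cachex's actual code, with `:erlang.term_to_binary/1` standing in for the real batch serialization:

```elixir
defmodule SaveSketch do
  # Open eagerly, stream the writes, and let a single rescue clause
  # translate filesystem failures into an error tuple.
  def write_terms(path, terms) do
    file = File.open!(path, [:write, :compressed])

    terms
    |> Stream.map(&:erlang.term_to_binary/1)
    |> Enum.each(&IO.binwrite(file, &1))

    with :ok <- File.close(file), do: {:ok, true}
  rescue
    File.Error -> {:error, :unreachable_file}
  end
end
```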
2 changes: 1 addition & 1 deletion test/cachex/actions/import_test.exs
@@ -30,7 +30,7 @@ defmodule Cachex.Actions.ImportTest do
result6 = Cachex.ttl!(cache, 3)

# verify that the import was ok
assert(result4 == {:ok, true})
assert(result4 == {:ok, 2})
assert(result5 == {:ok, 2})

# verify TTL offsetting happens
2 changes: 1 addition & 1 deletion test/cachex/actions/restore_test.exs
@@ -40,7 +40,7 @@ defmodule Cachex.Actions.RestoreTest do
result6 = Cachex.ttl!(cache, 3)

# verify that the load was ok
assert(result4 == {:ok, true})
assert(result4 == {:ok, 2})
assert(result5 == {:ok, 2})

# verify TTL offsetting happens
6 changes: 3 additions & 3 deletions test/cachex/actions/save_test.exs
@@ -33,7 +33,7 @@ defmodule Cachex.Actions.SaveTest do
result5 = Cachex.size(cache)

# verify that the load was ok
assert(result4 == {:ok, true})
assert(result4 == {:ok, 1})
assert(result5 == {:ok, 1})
end

@@ -72,7 +72,7 @@ defmodule Cachex.Actions.SaveTest do
size1 = Cachex.size(cache)

# verify that the load was ok
assert(load1 == {:ok, true})
assert(load1 == {:ok, 1})
assert(size1 == {:ok, 1})

# clear the cache again
@@ -83,7 +83,7 @@ defmodule Cachex.Actions.SaveTest do
size2 = Cachex.size(cache)

# verify that the load was ok
assert(load2 == {:ok, true})
assert(load2 == {:ok, 2})
assert(size2 == {:ok, 2})
end
