Compare commits

...

10 commits

Author SHA1 Message Date
031c5ff8dd Improved error messages with AlphaVantage stock API calls. 2026-01-20 19:40:21 -05:00
08e22632bc Improved comments 2026-01-16 00:21:07 -05:00
fa2717e1f7 Split the AlphaVantage stock script into 2 scripts. One to make the API calls to AlphaVantage (this runs infrequently due to rate limit of free API keys) and saves the retrieved JSON files to cache. The other script can run more frequently and scans the cache dir for updated JSON files, and returns the formatted stock data to Conky. 2026-01-15 23:47:22 -05:00
6b04d9296a Filtered out temporary mounts like from timeshift, systemd, etc from being considered as mount points to display in the Conky drive list. 2026-01-14 01:28:18 -05:00
579509fe87 The AlphaVantage stock script now uses a cache file that is built slowly while waiting 15 seconds per API call, because the free AlphaVantage API keys are rate limited and only the first stock symbol was being returned. 2026-01-03 21:43:01 -05:00
f78d5d4778 Fixed df error about swap, fixed user instructions for which dependencies are required and scripts require executable permissions 2025-11-08 11:49:17 -05:00
a4b49e05e6 Cleaned up the formatting of the active connections 2025-01-31 00:53:53 -05:00
02b71f33fe Added another filter to name the CPU properly 2025-01-29 00:14:19 -05:00
9238875364 AMD graphics card name was not being parsed correctly from lspci 2025-01-28 17:15:53 -05:00
f8b3fda0b5 People were having trouble with wrongly named folder when downloaded from git 2025-01-28 13:22:09 -05:00
5 changed files with 379 additions and 110 deletions

View file

@ -4,4 +4,4 @@ This should be the folder:
~/.conky/mgconky/
Thank you.
Thank you!

33
conf
View file

@ -15,10 +15,11 @@
-- !!!IMPORTANT!!! THE FOLLOWING ARE REQUIREMENTS FOR THIS SCRIPT TO WORK PROPERLY:
-- (1) DEPENDENCIES.
-- Install the following software, if not already installed:
-- "Conky" sudo apt-get install conky-all (or use software manager)
-- "Jq" sudo apt-get install jq (or use software manager)
-- "Curl" sudo apt-get install curl (or use software manager)
-- "Wget" sudo apt-get install wget (or use software manager)
-- "Conky" sudo apt-get install conky-all
-- "Jq" sudo apt-get install jq
-- "Curl" sudo apt-get install curl
-- "Wget" sudo apt-get install wget
-- "python3" sudo apt-get install python3
-- (2) FONTS.
-- Install the following custom fonts:
-- "Neuropolitical" Place .ttf file in ~/.fonts/ https://www.dafont.com/font-comment.php?file=neuropolitical
@ -28,8 +29,9 @@
-- chmod +x ~/.conky/mgconky/weather/get_weather.sh
-- chmod +x ~/.conky/mgconky/weather/parse_weather.sh
-- chmod +x ~/.conky/mgconky/weather/parse_forecast.sh
-- chmod +x ~/.conky/mgconky/weather/parse_forecast.sh
-- chmod +x ~/.conky/mgconky/stocks/get_stocks.py (Requires python3, which is usually pre-installed on your OS)
-- chmod +x ~/.conky/mgconky/stocks/get_stocks_alphavantage.py
-- chmod +x ~/.conky/mgconky/stocks/process_stocks_alphavantage.py
-- chmod +x ~/.conky/mgconky/stocks/get_stocks_finnhub.py
-- (4) WEATHER.
-- Make a free account at https://openweathermap.org/
-- Write down your API key, which is found on the "API keys" tab after you log in. (https://home.openweathermap.org/api_key
@ -91,16 +93,16 @@ conky.config = {
color7 = "#FF0000", -- Bad values (red)
-- Weather variables
template0 = "YOUR_OPENWEATHERMAP_API_KEY_HERE", -- OpenWeatherMap API key (https://home.openweathermap.org/api_keys)
template1 = "YOUR_OPENWEATHERMAP_CITY_ID_HERE", -- OpenWeatherMap City ID (the number in the URL of your city, for example: https://openweathermap.org/city/5128581)
template0 = "<ENTER YOUR OPEANWEATHERMAP API KEY HERE>", -- OpenWeatherMap API key (https://home.openweathermap.org/api_keys)
template1 = "4467657", -- OpenWeatherMap City ID (the number in the URL of your city, for example: https://openweathermap.org/city/5128581)
template2 = "imperial", -- Temp unit ("default" for Kelvin, "metric" for Celsius, "imperial" for Fahrenheit)
template3 = "", -- Locale (e.g., "es_ES.UTF-8") # Leave empty for default
-- Stock variables
template4 = "YOUR_FINNHUB_API_KEY_HERE", -- FinnHub API key (https://finnhub.io/)
template5 = "YOUR_ALPHAVANTAGE_API_KEY_HERE", -- Alpha Vantage API key (https://www.alphavantage.co/)
template6 = "goog,amzn,aapl,msft,meta,tsla,avgo,tsm,brk.a,pg,nvda", -- Stock symbols for FinnHub (comma separated, no spaces, i.e. goog,amzn,aapl)
template7 = "nvda", -- Stock symbols for Alpha Vantage (keep to a minimum unless you have a paid API key)
template4 = "<ENTER YOUR FINNHUB API KEY HERE>", -- FinnHub API key (https://finnhub.io/)
template5 = "<ENTER YOUR ALPHAVANTAGE API KEY HERE", -- Alpha Vantage API key (https://www.alphavantage.co/)
template6 = "spy,dia,nvda,tsla,amzn,aapl,msft,meta,pg,ibit", -- Stock symbols for FinnHub (comma separated, no spaces, i.e. goog,amzn,aapl)
template7 = "tsla,spy,dia", -- Stock symbols for Alpha Vantage (keep to a minimum unless you have a paid API key)
-- Load Lua script(s) -- If multiple files, separate each path with a space. They should all be loaded on a single lua_load command.
-- lua_load = "~/.conky/mgconky/script1.lua ~/.conky/mgconky/script2.lua",
@ -177,7 +179,8 @@ ${voffset -6}${execpi 60 $HOME/.conky/mgconky/stocks/get_stocks_finnhub.py --api
${endif}
# ***** Alpha Vantage API *****
${if_match "${template5}" != "YOUR_ALPHAVANTAGE_API_KEY_HERE"}
${voffset -12}${execpi 7200 $HOME/.conky/mgconky/stocks/get_stocks_alphavantage.py --api_key ${template5} --symbols ${template7} --range_in_days 30 --price_dec_places 0 --percent_dec_places 1}
${execpi 43200 $HOME/.conky/mgconky/stocks/get_stocks_alphavantage.py --api_key ${template5} --symbols ${template7} --range_in_days 30}
${voffset -12}${execpi 60 $HOME/.conky/mgconky/stocks/process_stocks_alphavantage.py --symbols ${template7} --range_in_days 30 --price_dec_places 0 --percent_dec_places 1 --stale_seconds 43200}
${endif}
#
#--------------------------
@ -234,7 +237,7 @@ ${voffset -6}Total: ${color3}${totaldown ${gw_iface}}${color}${goto 140}Total: $
# Connections - netstat shows number of connections from your computer and application/PID making it. Kill spyware!
#--------------------
${voffset 6}${color0}${font Neuropolitical:size=8:bold}CONNECTIONS ${color1}${hr 2}${color}${font Courier:size=9}
${voffset 6}Num. connections / PID / Process
${voffset 2}${color3}${execi 30 netstat -ept | grep ESTAB | awk '{print $9}' | cut -d: -f1 | sort | uniq -c | sort -nr}${color}
#${voffset 2}${color3}${execi 30 netstat -ept | grep ESTAB | awk '{print $9}' | cut -d: -f1 | sort | uniq -c | sort -nr}${color}
${voffset 2}${lua_parse conky_get_connections}
]]

View file

@ -13,6 +13,7 @@ function conky_shorten_cpu_name(cpu_name)
:gsub("CPU", "") -- Remove "CPU"
:gsub("Processor", "") -- Remove "Processor"
:gsub("@[%s%w%.]+", "") -- Remove "@ XGHz"
:gsub("with.*", "") -- Remove "with" and anything after the word "with"
:gsub("%s%s+", " ") -- Remove extra spaces
:gsub("^%s+", "") -- Trim leading spaces
:gsub("%s+$", "") -- Trim trailing spaces
@ -51,7 +52,7 @@ function conky_get_cpu_info()
end
end
-- ########################################################## GET GU MAKE AND MODEL ###########################################################
-- ########################################################## GET GPU MAKE AND MODEL ##########################################################
function conky_shorten_gpu_name(gpu_name)
return (gpu_name
@ -73,11 +74,15 @@ function conky_shorten_gpu_name(gpu_name)
:gsub("Raspberry Pi Foundation", "Raspberry Pi")
:gsub("Broadcom Inc%.", "Broadcom")
:gsub("Lite Hash Rate", "LHR")
:gsub("GA%d+%s%[", "") -- Remove chip identifier like "GA106 ["
:gsub("%].*", "") -- Remove everything after "]"
:gsub("%(.*%)", "") -- Remove revision info like "(rev a1)"
:gsub("%s+", " ") -- Normalize spaces
:match("^%s*(.-)%s*$") -- Trim leading/trailing spaces
:gsub("Renoir", "") -- Remove "Renoir"
:gsub("Ryzen %d+/%d+ Mobile Series", "") -- Remove Ryzen details (for APUs)
:gsub("%(Ryzen.-%)", "") -- Remove Ryzen-related info in parentheses
:gsub("GA%d+%s%[", "") -- Remove chip identifier like "GA106 ["
:gsub("%[.*%]%s*", "") -- Remove chip code in brackets like "[1002:1636]"
:gsub("%((rev .-)%)", "") -- Remove revision info like "(rev a1)"
:gsub("%s+", " ") -- Normalize spaces
:gsub("%]%s*$", "") -- Remove trailing square bracket "]" if it exists
:match("^%s*(.-)%s*$") -- Trim leading/trailing spaces
)
end
@ -166,6 +171,23 @@ function conky_get_drives_and_volumes()
if #data.mountpoints > 0 then
output = output .. '${voffset 0}${color0}${font Neuropolitical:size=8:bold}DRIVE${font Courier:size=9} /dev/' .. drive .. '${color} ${color1}${hr 2}${color}\n'
for _, mount in ipairs(data.mountpoints) do
-- Skip entries that are not real mountpoints like [SWAP]
if mount:sub(1, 1) ~= "/" then
goto continue
end
-- Skip common temporary mounts
if mount:match("^/sys/") then goto continue end
if mount:match("^/proc/") then goto continue end
if mount:match("^/dev/") then goto continue end
-- Skip mounts under /var/, but allow a mount at /var itself
if mount ~= "/var" and mount:match("^/var/") then goto continue end
-- Skip runtime mounts except removable media
if mount:match("^/run/") and not mount:match("^/run/media/") then goto continue end
-- Use df to get the size and used space for the mount
local handle = io.popen("df -h --output=target,size,used " .. mount .. " | tail -n 1")
local df_result = handle:read("*a")
@ -173,10 +195,19 @@ function conky_get_drives_and_volumes()
-- Parse df output
local target, size, used = df_result:match("(%S+)%s+(%S+)%s+(%S+)")
-- Ensure mount still exists before emitting fs_bar
local test = io.open(target, "r")
if not test then goto continue end
test:close()
-- Create output lines for this mountpoint.
if target and size and used then
output = output .. '${voffset 2}' .. target .. ': ${alignr}${color3}' .. used .. 'B${color} of ${color3}' .. size .. 'B${color}\n'
output = output .. '${voffset -2}${color5}${fs_bar ' .. target .. '}${color}\n'
end
::continue::
end
output = output .. '\n' -- Add a blank line after the last mount point of the current drive
end
@ -356,3 +387,64 @@ print(" CHECK SWAP STATUS = " .. conky_check_swap_status())
print(" GET MEMORY USAGE = " .. conky_get_memory_usage("mem"))
print(" GET SWAP USAGE = " .. conky_get_memory_usage("swap"))
-- ####################################################### GET ACTIVE CONNECTIONS ##############################################################
function conky_get_key_with_highest_value(t)
local max_key = nil
local max_value = -math.huge -- Initialize with the smallest possible number
for key, value in pairs(t) do
if type(value) == "number" and value > max_value then
max_value = value
max_key = key
end
end
return max_key
end
function conky_get_connections()
local handle = io.popen("ss -eptH | grep ESTAB 2>/dev/null")
if not handle then return "Error: Unable to execute ss command" end
local process_names = {}
local process_counts = {}
-- Read each line of output
for line in handle:lines() do
-- Extract process name and PID using pattern matching
local pname, pid = line:match('users:%(%("([^"]+)",pid=(%d+)')
pid = tonumber(pid) -- Ensure pid is a number
if pname and pid then
-- Store process name by PID and count occurrences
process_names[pid] = pname
process_counts[pid] = (process_counts[pid] or 0) + 1
end
end
handle:close()
-- Header line
local tab1_offset = "${goto 145}"
local line_prefix = "${voffset 0}${color}${font StyleBats:size=10}h${font Courier:size=9}${voffset -1}${color3}"
local result = string.format("${color}Process%sPID${alignr}Num \n", tab1_offset)
-- Sort and format output
while true do
local max_key = conky_get_key_with_highest_value(process_counts)
if max_key == nil then
break -- Stop when the table is empty
end
result = result .. string.format("%s %s%s%d${alignr}%d \n", line_prefix, string.sub(process_names[max_key], 1, 16), tab1_offset, max_key, process_counts[max_key])
-- Remove the highest key from both tables
process_counts[max_key] = nil
process_names[max_key] = nil
end
return result
end

View file

@ -1,11 +1,31 @@
#!/usr/bin/env python3
import os
import sys
import time
import json
import argparse
import requests
from datetime import datetime, timedelta
import argparse
# Directory where per-symbol cache JSON files are stored
CACHE_DIR = os.path.expanduser("~/.cache/mgconky/stocks_alphavantage/")
# Delay between API calls to respect AlphaVantage rate limits
API_DELAY_SECONDS = 1
# Minimum age before re-querying a symbol to avoid API thrashing
NO_THRASH_SECONDS = 60 * 60 # 1 hour
def log_error(message):
"""Print timestamped error message to stderr and flush immediately."""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{ts}] {message}", file=sys.stderr, flush=True)
def fetch_intraday_data(api_key, symbol, interval="1min"):
"""Fetch current and historical intraday price using TIME_SERIES_INTRADAY."""
# Query AlphaVantage intraday endpoint for the given symbol
url = "https://www.alphavantage.co/query"
params = {
"function": "TIME_SERIES_INTRADAY",
@ -19,36 +39,31 @@ def fetch_intraday_data(api_key, symbol, interval="1min"):
data = response.json()
time_series = data.get(f"Time Series ({interval})")
# Debug: Log the raw response
#print(f"DEBUG: Response data for {symbol}: {data}")
if time_series:
# Sort the timestamps to get the first (open) and latest price
# Use earliest bar as open price and latest bar as current price
sorted_timestamps = sorted(time_series.keys())
open_time = sorted_timestamps[0] # Earliest timestamp of the day
latest_time = sorted_timestamps[-1] # Most recent timestamp
# Extract open price and latest price
open_price = float(time_series[open_time]["1. open"])
latest_price = float(time_series[latest_time]["4. close"])
open_time = sorted_timestamps[0]
latest_time = sorted_timestamps[-1]
return {
"current_price": latest_price,
"compare_price": open_price
"current_price": float(time_series[latest_time]["4. close"]),
"compare_price": float(time_series[open_time]["1. open"]),
}
else:
print(f"Error: No 'Time Series ({interval})' data found for {symbol}.")
return None
# API responded but did not include expected time series data
log_error(f"No 'Time Series ({interval})' data found for {symbol}.")
else:
print(f"Error: Failed to fetch intraday data for {symbol} (HTTP {response.status_code})")
return None
# HTTP error from AlphaVantage
log_error(f"Failed to fetch intraday data for {symbol} (HTTP {response.status_code})")
except requests.RequestException as e:
print(f"Error: Failed to fetch intraday data for {symbol} - {e}")
return None
# Network or request failure
log_error(f"Failed to fetch intraday data for {symbol} - {e}")
return None
def fetch_historical_data(api_key, symbol, range_in_days):
"""Fetch current and historical price using TIME_SERIES_DAILY, allowing fallback to nearby dates."""
# Query AlphaVantage daily endpoint for historical comparison
url = "https://www.alphavantage.co/query"
params = {
"function": "TIME_SERIES_DAILY",
@ -61,96 +76,109 @@ def fetch_historical_data(api_key, symbol, range_in_days):
data = response.json()
time_series = data.get("Time Series (Daily)")
# Debug: Log the raw response
#print(f"DEBUG: Response data for {symbol}: {data}")
if time_series:
# Current price is the latest closing price
# Most recent trading day's close is the current price
latest_date = max(time_series.keys())
latest_data = time_series[latest_date]
current_price = float(latest_data["4. close"])
current_price = float(time_series[latest_date]["4. close"])
# Dates to check: exact, one day before, one day after
target_date = (datetime.now() - timedelta(days=range_in_days)).strftime("%Y-%m-%d")
fallback_dates = [
target_date,
(datetime.now() - timedelta(days=range_in_days + 1)).strftime("%Y-%m-%d"),
(datetime.now() - timedelta(days=range_in_days - 1)).strftime("%Y-%m-%d")
]
# Target calendar date for comparison
target = datetime.now() - timedelta(days=range_in_days)
# Find the first available date in the fallback list
# Walk backward up to 10 days to find a valid trading day
historical_price = None
for date in fallback_dates:
if date in time_series:
historical_price = float(time_series[date]["4. close"])
for offset in range(10):
d = (target - timedelta(days=offset)).strftime("%Y-%m-%d")
if d in time_series:
historical_price = float(time_series[d]["4. close"])
break
return {
"current_price": current_price,
"compare_price": historical_price
"compare_price": historical_price,
}
else:
print(f"Error: No 'Time Series (Daily)' data found for {symbol}.")
return None
# API responded but did not include expected daily series data
log_error(f"No 'Time Series (Daily)' data found for {symbol}.")
else:
print(f"Error: Failed to fetch historical data for {symbol} (HTTP {response.status_code})")
return None
# HTTP error from AlphaVantage
log_error(f"Failed to fetch historical data for {symbol} (HTTP {response.status_code})")
except requests.RequestException as e:
print(f"Error: Failed to fetch historical data for {symbol} - {e}")
return None
# Network or request failure
log_error(f"Failed to fetch historical data for {symbol} - {e}")
return None
def main():
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Fetch stock data from Alpha Vantage.")
parser.add_argument("--api_key", required=True, help="Your Alpha Vantage API key")
parser.add_argument("--symbols", required=True, help="Comma-separated list of stock symbols")
parser.add_argument("--range_in_days", type=int, default=0, help="Number of days for historical comparison (0 for none)")
parser.add_argument("--price_dec_places", type=int, default=0, help="Decimal places for prices")
parser.add_argument("--percent_dec_places", type=int, default=1, help="Decimal places for percentages")
parser = argparse.ArgumentParser()
parser.add_argument("--api_key", required=True)
parser.add_argument("--symbols", required=True)
parser.add_argument("--range_in_days", type=int, default=0)
args = parser.parse_args()
# Split symbols
# Normalize and split ticker symbols
symbols = args.symbols.strip().upper().split(",")
# Use a list to build the final output string
output = []
# Ensure cache directory exists
os.makedirs(CACHE_DIR, exist_ok=True)
# Prepare for printing output
color_header = "${color}"
color_label = "${color3}"
color_value = "${color3}"
color_good = "${color6}"
color_bad = "${color7}"
line_tab1_offset = "${goto 25}"
line_tab2_offset = "${goto 90}"
line_tab3_offset = "${alignr}" # Could also replace this with goto 120 if you don't like the right alignment
for i, symbol in enumerate(symbols):
cache_path = os.path.join(CACHE_DIR, f"{symbol}.json")
# Iterate symbols
for symbol in symbols:
fetched_data = fetch_intraday_data(args.api_key, symbol) if args.range_in_days < 1 else fetch_historical_data(args.api_key, symbol, args.range_in_days)
if fetched_data:
current_price = fetched_data["current_price"]
compare_price = fetched_data["compare_price"]
price_difference = current_price - compare_price
percent_change = (price_difference / current_price) * 100
color_dynamic = (
color_good if round(price_difference, args.price_dec_places) > 0
else color_bad if round(price_difference, args.price_dec_places) < 0
else color_value
)
output.append(
f"{line_tab1_offset}{color_label}{symbol}: {line_tab2_offset}{color_value}{round(current_price, args.price_dec_places):.{args.price_dec_places}f} "
f"{line_tab3_offset}{color_dynamic}{round(price_difference, args.price_dec_places):+.{args.price_dec_places}f} "
f"({round(percent_change, args.percent_dec_places):+.{args.percent_dec_places}f}%)"
)
else:
output.append(f"{symbol}: Error fetching data")
# --- No-thrash protection: skip API call if cache is still fresh ---
if os.path.exists(cache_path):
try:
with open(cache_path, "r") as f:
payload = json.load(f)
if (
isinstance(payload, dict)
and "timestamp" in payload
and isinstance(payload["timestamp"], (int, float))
and (time.time() - payload["timestamp"]) < NO_THRASH_SECONDS
):
# Cached data is recent enough -> skip querying AlphaVantage
continue
except Exception:
# Any read/parse error -> ignore cache and refetch
pass
# Fetch either intraday or historical data depending on range
fetched_data = (
fetch_intraday_data(args.api_key, symbol)
if args.range_in_days < 1
else fetch_historical_data(args.api_key, symbol, args.range_in_days)
)
# Only write cache if fetched data is structurally valid
if (
isinstance(fetched_data, dict)
and "current_price" in fetched_data
and isinstance(fetched_data["current_price"], (int, float))
):
tmp = os.path.join(CACHE_DIR, f"{symbol}.json.tmp")
final = os.path.join(CACHE_DIR, f"{symbol}.json")
# Payload includes metadata for staleness and debugging
payload = {
"symbol": symbol,
"range_in_days": args.range_in_days,
"timestamp": int(time.time()),
"data": fetched_data,
}
# Atomic write to avoid corrupting existing cache
with open(tmp, "w") as f:
json.dump(payload, f)
os.replace(tmp, final)
# Rate-limit delay between symbols
if i < len(symbols) - 1:
time.sleep(API_DELAY_SECONDS)
# Join all parts of the output and print it
header_label = f"Intraday" if args.range_in_days < 1 else f"{args.range_in_days} Day"
header_line = f"{line_tab1_offset}{color_header}Ticker{line_tab2_offset}Price ($$){line_tab3_offset}{header_label}{color_label}"
return header_line + "\n" + f"{line_tab1_offset}{color_header}${{voffset -5}}${{hr 1}}" + "\n" + "\n".join(output)
if __name__ == "__main__":
print(main())
main()

View file

@ -0,0 +1,146 @@
#!/usr/bin/env python3
import os
import sys
import time
import json
import argparse
from datetime import datetime, timedelta
# Directory containing cached per-symbol JSON files
CACHE_DIR = os.path.expanduser("~/.cache/mgconky/stocks_alphavantage/")
def main():
# Parse command-line arguments (reader does not need api_key)
parser = argparse.ArgumentParser(description="Process cached Alpha Vantage stock data.")
parser.add_argument("--symbols", required=True, help="Comma-separated list of stock symbols")
parser.add_argument("--range_in_days", type=int, default=0, help="Number of days for historical comparison")
parser.add_argument("--price_dec_places", type=int, default=0, help="Decimal places for prices")
parser.add_argument("--percent_dec_places", type=int, default=1, help="Decimal places for percentages")
parser.add_argument("--stale_seconds", type=int, default=13 * 3600, help="Seconds before cached data is considered stale")
args = parser.parse_args()
# Split ticker symbols by comma
symbols = args.symbols.strip().upper().split(",")
# Collect formatted output lines for Conky
output = []
# Conky color and layout formatting
color_header = "${color}"
color_label = "${color3}"
color_value = "${color3}"
color_good = "${color6}"
color_bad = "${color7}"
line_tab1_offset = "${goto 25}"
line_tab2_offset = "${goto 90}"
line_tab3_offset = "${alignr}"
# Process each symbol
for symbol in symbols:
cache_path = os.path.join(CACHE_DIR, f"{symbol}.json")
# Missing cache file -- display formatted placeholder
if not os.path.exists(cache_path):
output.append(
f"{line_tab1_offset}{color_bad}{symbol}: "
f"{line_tab2_offset}{color_value}-- "
f"{line_tab3_offset}{color_value}-- (--%)"
)
continue
# Determine cache age using file modification time
age_seconds = time.time() - os.stat(cache_path).st_mtime
# Symbol turns red (color_bad) if cached data is stale
symbol_color = color_bad if age_seconds > args.stale_seconds else color_label
# Load cached JSON payload
try:
with open(cache_path, "r") as f:
payload = json.load(f)
# Validate payload structure before use
if not isinstance(payload, dict) or "data" not in payload:
raise ValueError("Invalid cache payload")
fetched_data = payload["data"]
except Exception:
# Corrupt or unreadable cache file -- display formatted placeholder
output.append(
f"{line_tab1_offset}{color_bad}{symbol}: "
f"{line_tab2_offset}{color_value}-- "
f"{line_tab3_offset}{color_value}-- (--%)"
)
continue
# ------------------------------
# VALIDATION
# ------------------------------
if (
isinstance(fetched_data, dict)
and "current_price" in fetched_data
and isinstance(fetched_data["current_price"], (int, float))
):
current_price = fetched_data["current_price"]
compare_price = fetched_data["compare_price"]
# Comparison price may be missing if no trading day was found
if not isinstance(compare_price, (int, float)):
symbol_color = color_bad
output.append(
f"{line_tab1_offset}{symbol_color}{symbol}: "
f"{line_tab2_offset}{color_value}-- "
f"{line_tab3_offset}{color_value}-- (--%)"
)
continue
# Compute absolute and percentage change
price_difference = current_price - compare_price
percent_change = (price_difference / current_price) * 100
# Color change based on price movement direction
color_dynamic = (
color_good if round(price_difference, args.price_dec_places) > 0
else color_bad if round(price_difference, args.price_dec_places) < 0
else color_value
)
# The actual formatted line with valid stock data
output.append(
f"{line_tab1_offset}{symbol_color}{symbol}: "
f"{line_tab2_offset}{color_value}{round(current_price, args.price_dec_places):.{args.price_dec_places}f} "
f"{line_tab3_offset}{color_dynamic}{round(price_difference, args.price_dec_places):+.{args.price_dec_places}f} "
f"({round(percent_change, args.percent_dec_places):+.{args.percent_dec_places}f}%)"
)
else:
# Cached data missing required fields -- display formatted placeholder
output.append(
f"{line_tab1_offset}{color_bad}{symbol}: "
f"{line_tab2_offset}{color_value}-- "
f"{line_tab3_offset}{color_value}-- (--%)"
)
# Header label depends on intraday vs historical mode
header_label = "Intraday" if args.range_in_days < 1 else f"{args.range_in_days} Day"
# Formatted header line
header_line = (
f"{line_tab1_offset}{color_header}Ticker"
f"{line_tab2_offset}Price ($$)"
f"{line_tab3_offset}{header_label}{color_label}"
)
# Print final output for Conky to render
print(
header_line
+ "\n"
+ f"{line_tab1_offset}{color_header}${{voffset -5}}${{hr 1}}"
+ "\n"
+ "\n".join(output)
)
if __name__ == "__main__":
main()