#!/usr/bin/env python3 """ Production Board Model Context Protocol (MCP) server. Stdlib-only; runs on any Python >=3.8 without an extra install. Exposes the Production Board data models as MCP tools so AI assistants (Claude Desktop, Cursor, Cline, Continue, ...) can read and write data on your behalf. Transport: stdio (JSON-RPC 2.0 over stdin/stdout). This is the standard MCP transport every host supports. Auth: Bearer token. Use a personal access token (PAT) for safety - PATs have explicit per-model scopes you control from the Integrations menu. Configure via the ``PRODMCP_TOKEN`` environment variable in your MCP host config (recommended) or run ``prodmcp login --token pat_...`` once to persist it under ``~/.prodmcp/credentials.json``. The server only exposes data-model tools. It cannot create users, change passwords, mint or revoke tokens, modify billing, or touch the integrations surface - those are deliberately out of scope. """ from __future__ import annotations import argparse import getpass import io import json import os import platform import ssl import sys import threading import time import traceback import urllib.error import urllib.parse import urllib.request import uuid from pathlib import Path from typing import Any, Dict, List, Optional, Tuple # ── Configuration (substituted at server build time) ────────────────── APP_SLUG = "prod" APP_NAME = "Production Board" COMMAND_NAME = "prodmcp" MCP_VERSION = "1.0.2" DEFAULT_BASE = "https://deploysition.cloud" ENV_PREFIX = "PRODMCP" MODELS: Dict[str, Dict[str, Any]] = 
json.loads(r"""{"board":{"ops":["list","read","create","update","delete"],"create_fields":["name","description","accent","settings","tags","columns"],"update_fields":["name","description","accent","settings","tags","columns"],"allowed_filters":["data__name","data__accent","data__tags","status","is_archived","owned_by"],"allowed_sorts":["created_at","updated_at","data__name"],"default_sort":"created_at","max_limit":50,"fields":[{"name":"name","type":"string","max_len":200},{"name":"tags","type":"tags"},{"name":"accent","type":"enum","values":["slate","gray","blue","indigo","violet","fuchsia","amber","orange","emerald","green","rose","red"]},{"name":"settings","type":"dict"},{"name":"description","type":"string","max_len":2000}]},"card":{"ops":["list","read","create","update","delete"],"create_fields":["title","description","status","position","priority","tags","assignee","due_date","board_id"],"update_fields":["title","description","status","position","priority","tags","assignee","due_date","board_id"],"allowed_filters":["data__status","data__priority","data__tags","data__assignee","data__board_id","status","is_archived","owned_by"],"allowed_sorts":["created_at","updated_at","data__position","data__status","data__priority","data__due_date"],"default_sort":"data__position","max_limit":200,"fields":[{"name":"tags","type":"tags"},{"name":"title","type":"string","max_len":200},{"name":"status","type":"string","max_len":64},{"name":"assignee","type":"string","max_len":64},{"name":"board_id","type":"string","max_len":64,"ref":{"type":"board","owned":true,"optional":true}},{"name":"due_date","type":"string","max_len":32},{"name":"position","type":"number"},{"name":"priority","type":"enum","values":["low","medium","high","critical"]},{"name":"description","type":"string","max_len":4000}]}}""") # MCP protocol version this server speaks. 
# The MCP host negotiates this during the ``initialize`` handshake; bump
# if the protocol contract changes in a way clients should renegotiate.
MCP_PROTOCOL_VERSION = "2025-06-18"

# Response header carrying a rotated session token (see
# _maybe_persist_refresh below).
REFRESH_HEADER = "x-auth-refresh-token"

# Top-level columns the server lets us query directly. Anything else on
# `filter` / `fields` is assumed to live under `data.*`.
TOP_LEVEL_COLUMNS = frozenset((
    "id", "type", "status", "owned_by", "created_by", "updated_by",
    "is_archived", "is_deleted", "created_at", "updated_at", "data",
))


def _env(name: str, default: str = "") -> str:
    """Read a ``PRODMCP_``-prefixed environment variable."""
    return os.environ.get(f"PRODMCP_{name}", default)


# ── State directory ────────────────────────────────────────────────────


def _root_dir() -> Path:
    """Return ``~/.prodmcp``, creating it (mode 0700) on first use."""
    p = Path(os.path.expanduser("~")) / f".prodmcp"
    p.mkdir(parents=True, exist_ok=True)
    try:
        # Credentials live here; keep the directory private to the user.
        os.chmod(p, 0o700)
    except OSError:
        pass
    return p


def _creds_path() -> Path:
    return _root_dir() / "credentials.json"


def _device_path() -> Path:
    return _root_dir() / "device.json"


def _update_check_path() -> Path:
    return _root_dir() / "update_check.json"


def _read_json(p: Path) -> Dict[str, Any]:
    """Best-effort JSON read: a missing, empty, or corrupt file reads as {}."""
    if not p.exists():
        return {}
    try:
        return json.loads(p.read_text(encoding="utf-8") or "{}")
    except (json.JSONDecodeError, OSError):
        return {}


def _write_json(p: Path, data: Dict[str, Any]) -> None:
    """Write JSON atomically (temp file + os.replace), then chmod 0600."""
    tmp = p.with_suffix(p.suffix + ".tmp")
    tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
    os.replace(tmp, p)
    try:
        os.chmod(p, 0o600)
    except OSError:
        pass


def _load_creds() -> Dict[str, Any]:
    return _read_json(_creds_path())


def _save_creds(data: Dict[str, Any]) -> None:
    _write_json(_creds_path(), data)


def _clear_creds() -> None:
    """Delete the persisted credentials file, ignoring FS errors."""
    p = _creds_path()
    if p.exists():
        try:
            p.unlink()
        except OSError:
            pass


def _device_id() -> str:
    """Stable per-install identifier, minted (uuid4) on first call."""
    blob = _read_json(_device_path())
    did = blob.get("device_id")
    if isinstance(did, str) and len(did) >= 32:
        return did
    did = str(uuid.uuid4())
    _write_json(_device_path(), {"device_id": did})
    return did


_session_id_cache: Optional[str] = None


def _session_id() -> str:
    """Process-lifetime session id: one uuid4 per server run."""
    global _session_id_cache
    if _session_id_cache is None:
        _session_id_cache = str(uuid.uuid4())
    return _session_id_cache


def _base_url() -> str:
    return _env("BASE_URL") or DEFAULT_BASE


def _auth_token() -> Optional[str]:
    # Environment token takes precedence over the persisted credentials file.
    return _env("TOKEN") or _load_creds().get("token") or None


def _telemetry_enabled() -> bool:
    if _env("NO_TELEMETRY", "").lower() in ("1", "true", "yes"):
        return False
    return True


def _autoupdate_enabled() -> bool:
    if _env("NO_AUTOUPDATE", "").lower() in ("1", "true", "yes"):
        return False
    return True


# ── HTTP transport ─────────────────────────────────────────────────────


class ApiError(RuntimeError):
    """HTTP-level failure from the backend.

    ``status`` is the HTTP status (0 for network-layer errors),
    ``message`` a human-readable reason, ``body`` the parsed error JSON
    when the backend supplied one.
    """

    def __init__(self, status: int, message: str, body: Any = None):
        super().__init__(f"HTTP {status}: {message}")
        self.status = status
        self.message = message
        self.body = body


_USER_AGENT = (
    f"prodmcp/1.0.2 "
    f"(mcp; {platform.system().lower()}; py{platform.python_version()})"
)


class _StripAuthRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Strip Authorization on cross-origin redirects (defence against
    misconfigured proxies bouncing tokens to internal hosts)."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        new_req = super().redirect_request(req, fp, code, msg, headers, newurl)
        if new_req is None:
            return None
        try:
            from_host = urllib.parse.urlparse(req.full_url).netloc.lower()
            to_host = urllib.parse.urlparse(newurl).netloc.lower()
        except Exception:
            # If we can't parse either URL, fall through with headers intact.
            return new_req
        if from_host != to_host:
            # Drop both casings; urllib keeps auth under unredirected_hdrs.
            for h in ("Authorization", "authorization"):
                try:
                    new_req.headers.pop(h, None)
                    new_req.unredirected_hdrs.pop(h, None)
                except Exception:
                    pass
        return new_req


def _ensure_opener() -> None:
    """Install the redirect-stripping opener exactly once per process."""
    if getattr(_ensure_opener, "_done", False):
        return
    opener = urllib.request.build_opener(_StripAuthRedirectHandler())
    urllib.request.install_opener(opener)
    setattr(_ensure_opener, "_done", True)


def _maybe_persist_refresh(headers: Any) -> None:
    """Sliding-session token rotation. Skipped for env-supplied tokens
    (intentionally transient) and PATs (the server never refreshes
    them)."""
    if _env("TOKEN"):
        return
    try:
        new_tok = headers.get(REFRESH_HEADER)
    except AttributeError:
        new_tok = None
    if not new_tok:
        return
    creds = _load_creds()
    # Only session tokens rotate; a persisted PAT stays untouched.
    if creds.get("kind") != "session":
        return
    creds["token"] = new_tok
    creds["saved_at"] = int(time.time())
    _save_creds(creds)


def _http(
    method: str,
    path: str,
    *,
    params: Optional[Dict[str, Any]] = None,
    body: Any = None,
    auth: bool = True,
    timeout: float = 30.0,
) -> Any:
    """Issue one JSON request against the backend and decode the reply.

    ``params`` values of None are dropped; list/tuple values produce
    repeated query keys. Returns the decoded JSON body, or None for an
    empty response. Raises ApiError for HTTP errors (preferring the
    backend's i18n/en or detail message when parseable) and for network
    failures (status 0).
    """
    url = _base_url().rstrip("/") + path
    if params:
        flat: List[Tuple[str, str]] = []
        for k, v in params.items():
            if v is None:
                continue
            if isinstance(v, (list, tuple)):
                for item in v:
                    flat.append((k, str(item)))
            else:
                flat.append((k, str(v)))
        url += "?" + urllib.parse.urlencode(flat)
    data = None
    headers = {"Accept": "application/json", "User-Agent": _USER_AGENT}
    if _telemetry_enabled():
        headers["X-Analytics-Device-Id"] = _device_id()
        headers["X-Analytics-Session-Id"] = _session_id()
    # NOTE(review): indentation reconstructed from a whitespace-mangled
    # source - confirm whether the two X-MCP-* headers below were also
    # gated by the telemetry flag in the original.
    headers["X-MCP-Channel"] = "mcp"
    headers["X-MCP-Version"] = MCP_VERSION
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if auth:
        tok = _auth_token()
        if not tok:
            raise ApiError(
                401,
                f"Not signed in. Set PRODMCP_TOKEN=pat_... in the MCP host "
                f"config, or run `prodmcp login --token pat_...` once.",
            )
        headers["Authorization"] = f"Bearer {tok}"
    _ensure_opener()
    ctx = ssl.create_default_context()
    req = urllib.request.Request(url, data=data, method=method, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp:
            if auth:
                # Persist a rotated session token when the server sends one.
                _maybe_persist_refresh(resp.headers)
            raw = resp.read()
            if not raw:
                return None
            return json.loads(raw.decode("utf-8"))
    except urllib.error.HTTPError as e:
        raw = b""
        try:
            raw = e.read() or b""
        except Exception:
            pass
        parsed: Any = None
        msg = e.reason or "request failed"
        try:
            parsed = json.loads(raw.decode("utf-8")) if raw else None
            if isinstance(parsed, dict):
                # Prefer the backend's English i18n message, then `detail`.
                bilingual = parsed.get("i18n")
                if isinstance(bilingual, dict) and bilingual.get("en"):
                    msg = bilingual["en"]
                elif parsed.get("detail"):
                    msg = str(parsed["detail"])
        except (UnicodeDecodeError, json.JSONDecodeError):
            pass
        raise ApiError(e.code, msg, parsed)
    except urllib.error.URLError as e:
        raise ApiError(0, f"network error: {e.reason}")


# ── Telemetry events (best-effort, never raises) ──────────────────────


def _emit_event(event_type: str, **fields: Any) -> None:
    """Fire-and-forget analytics event; swallows every failure.

    None-valued ``fields`` are dropped from the event meta.
    """
    if not _telemetry_enabled():
        return
    try:
        evt: Dict[str, Any] = {
            "type": event_type,
            "ts_client": int(time.time()),
            "meta": {k: v for k, v in fields.items() if v is not None},
        }
        body = {
            "device_id": _device_id(),
            "session_id": _session_id(),
            "events": [evt],
            "meta": {
                "channel": "mcp",
                "mcp_version": MCP_VERSION,
                "command_name": COMMAND_NAME,
                "os": f"{platform.system()} {platform.release()}",
                "py": platform.python_version(),
            },
        }
        _http("POST", "/xapi2/analytics/track", body=body, auth=False, timeout=4.0)
    except Exception:
        pass


# ── Auto-update ────────────────────────────────────────────────────────


def _check_due() -> bool:
    """True when the last update check is missing or at least 24h old."""
    blob = _read_json(_update_check_path())
    last = blob.get("checked_at")
    if not isinstance(last, (int, float)):
        return True
    return (time.time() - float(last)) >= 86400


def _record_check(latest: Optional[str]) -> None:
    """Record the timestamp (and latest advertised version) of a check."""
    _write_json(_update_check_path(), {"checked_at": int(time.time()), "latest": latest})


def _looks_like_valid_server(blob: bytes) -> bool:
    """Sanity-check a downloaded replacement script before installing it:
    minimum size, shebang, expected markers, and a syntax compile."""
    if not blob or len(blob) < 1024:
        return False
    if not blob.startswith(b"#!"):
        return False
    for marker in (b"COMMAND_NAME", b"APP_SLUG", b"MCP_VERSION", b"def main"):
        if marker not in blob:
            return False
    try:
        compile(blob, "", "exec")
    except SyntaxError:
        return False
    return True


def _self_replace(new_source: bytes) -> None:
    """Swap this script's file for ``new_source`` as atomically as possible.

    Writes to a temp file in the same directory then ``os.replace``; if
    the direct replace fails (e.g. target locked), moves the current
    file aside to ``<name>.old`` and retries. The temp file is cleaned
    up on failure; OSError propagates to the caller.
    """
    target = Path(__file__).resolve()
    import tempfile as _tf
    fd, tmp_name = _tf.mkstemp(prefix=COMMAND_NAME + ".", dir=str(target.parent))
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(new_source)
        os.chmod(tmp_name, 0o755)
    except OSError as e:
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise e
    aside = target.with_name(target.name + ".old")
    try:
        os.replace(tmp_name, target)
        return
    except OSError:
        pass
    try:
        if aside.exists():
            aside.unlink()
    except OSError:
        pass
    try:
        os.replace(target, aside)
        os.replace(tmp_name, target)
    except OSError as e:
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise e


def _maybe_autoupdate(force: bool = False) -> Optional[str]:
    """Check for, download, validate, and install a newer server script.

    Returns the new version string on success, None otherwise (no
    update, disabled, not due, or any failure along the way). With
    ``force`` the enabled/due gates are skipped.
    """
    if not force:
        if not _autoupdate_enabled():
            return None
        if not _check_due():
            return None
    try:
        info = _http("GET", "/xapi2/mcp/version", auth=False, timeout=5.0)
    except ApiError:
        _record_check(None)
        return None
    if not isinstance(info, dict):
        _record_check(None)
        return None
    latest = info.get("version")
    _record_check(latest if isinstance(latest, str) else None)
    if not isinstance(latest, str) or latest == MCP_VERSION:
        return None
    try:
        url = _base_url().rstrip("/") + "/xapi2/mcp/script"
        req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT})
        with urllib.request.urlopen(req, timeout=10.0) as resp:
            new_source = resp.read()
    except Exception:
        return None
    if not _looks_like_valid_server(new_source):
        return None
    try:
        _self_replace(new_source)
    except OSError:
        return None
    return latest


# ── Tool catalogue
# ──────────────────────────────────────────────────────────────────────
# Tools map 1:1 to data-model CRUD ops. A model with ops
# {list, read, create, update, delete} produces five tools:
# <type>_list, <type>_get, <type>_create, <type>_update,
# <type>_delete. The server only exposes data-model tools - no user
# creation, no token mint/revoke, no billing, no integrations surface.
# Use the web UI for anything outside of data.

# Map backend op name -> tool-name verb (read -> "get", rest identity).
_OP_VERB = {
    "list": "list",
    "read": "get",
    "create": "create",
    "update": "update",
    "delete": "delete",
}


def _friendly_filter_map(type_name: str) -> Dict[str, str]:
    """{friendly: wire}. Wire keys live in ``allowed_filters``; the
    friendly form drops the ``data__`` prefix and converts ``__`` to
    ``.``."""
    out: Dict[str, str] = {}
    cfg = MODELS.get(type_name) or {}
    for wire in (cfg.get("allowed_filters") or []):
        if not isinstance(wire, str):
            continue
        if wire.startswith("data__"):
            friendly = wire[len("data__"):].replace("__", ".")
        else:
            friendly = wire
        # setdefault: first wire key wins on a friendly-name collision.
        out.setdefault(friendly, wire)
    return out


def _resolve_filter_key(type_name: str, key: str) -> str:
    """Translate a friendly filter key to its wire form (pass-through
    for already-wire ``data__*`` keys and real top-level columns)."""
    if key.startswith("data__") or key in TOP_LEVEL_COLUMNS:
        return key
    return _friendly_filter_map(type_name).get(key, key)


def _field_to_json_schema(field: Dict[str, Any]) -> Dict[str, Any]:
    """Translate one declared field schema entry into the JSON-Schema
    fragment MCP hosts expect under ``inputSchema.properties``. Keeps
    the surface compact: enums + types + length caps."""
    out: Dict[str, Any] = {}
    ftype = (field.get("type") or "").lower()
    values = field.get("values")
    # Enum values trump the declared type.
    if isinstance(values, list) and values:
        out["enum"] = list(values)
        out["type"] = "string"
        return out
    if ftype in ("string", "url"):
        out["type"] = "string"
    elif ftype in ("integer",):
        out["type"] = "integer"
    elif ftype in ("number",):
        out["type"] = "number"
    elif ftype in ("bool", "boolean"):
        out["type"] = "boolean"
    elif ftype in ("dict", "object"):
        out["type"] = "object"
    elif ftype in ("tags",):
        out["type"] = "array"
        out["items"] = {"type": "string"}
    else:
        # Unknown declared types degrade to plain string.
        out["type"] = "string"
    if isinstance(field.get("max_len"), int):
        out["maxLength"] = field["max_len"]
    if field.get("ref"):
        ref = field["ref"]
        out["description"] = f"Reference to a {ref.get('type')} object id."
    return out


def _build_create_schema(type_name: str) -> Dict[str, Any]:
    """inputSchema for the ``<type>_create`` tool."""
    cfg = MODELS.get(type_name) or {}
    create_fields = cfg.get("create_fields") or []
    fields_meta = {f.get("name"): f for f in (cfg.get("fields") or []) if isinstance(f, dict)}
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name in create_fields:
        meta = fields_meta.get(name) or {}
        properties[name] = _field_to_json_schema(meta)
    # Required-by-schema: the BE marks via field_schemas; surfacing
    # *all* create_fields as optional is the safe default - the BE
    # rejects with 400 when a true required is omitted, and the host
    # surfaces that to the user. Conservative on the client side.
    return {
        "type": "object",
        "properties": properties,
        "additionalProperties": True,
        **({"required": required} if required else {}),
    }


def _build_update_schema(type_name: str) -> Dict[str, Any]:
    """inputSchema for the ``<type>_update`` tool (id + writable fields)."""
    cfg = MODELS.get(type_name) or {}
    update_fields = cfg.get("update_fields") or []
    fields_meta = {f.get("name"): f for f in (cfg.get("fields") or []) if isinstance(f, dict)}
    properties: Dict[str, Any] = {
        "id": {"type": "string", "description": "Object id (uuid)."},
    }
    for name in update_fields:
        meta = fields_meta.get(name) or {}
        properties[name] = _field_to_json_schema(meta)
    return {
        "type": "object",
        "properties": properties,
        "required": ["id"],
        "additionalProperties": True,
    }


def _build_list_schema(type_name: str) -> Dict[str, Any]:
    """inputSchema for the ``<type>_list`` tool (paging/sort/filter/search)."""
    cfg = MODELS.get(type_name) or {}
    max_limit = cfg.get("max_limit") or 100
    friendly = sorted(_friendly_filter_map(type_name).keys())
    sorts = list(cfg.get("allowed_sorts") or [])
    return {
        "type": "object",
        "properties": {
            "limit": {
                "type": "integer",
                "minimum": 1,
                "maximum": int(max_limit),
                "description": f"Max rows to return (server cap: {max_limit}).",
            },
            "offset": {
                "type": "integer",
                "minimum": 0,
                "description": "Skip this many rows. Prefer cursor-style paging via offset.",
            },
            "sort": {
                "type": "string",
                "description": (
                    "Sort key. Allowed: " + ", ".join(sorts) + ". "
                    "Prefix with '-' for descending."
                ) if sorts else "Sort key. Prefix with '-' for descending.",
            },
            "q": {
                "type": "string",
                "description": "Free-text search term (matches any indexed text field).",
            },
            "filter": {
                "type": "object",
                "description": (
                    "Filter map. Keys: " + ", ".join(friendly) + ". "
                    "Values are exact matches; arrays produce IN clauses."
                ) if friendly else "Filter map. Keys are friendly field names.",
                "additionalProperties": True,
            },
            "all": {
                "type": "boolean",
                "description": "Walk every page until exhausted (caps at 5000 rows).",
            },
        },
        "additionalProperties": False,
    }


def _build_get_schema(_type_name: str) -> Dict[str, Any]:
    """inputSchema for the ``<type>_get`` tool (id only)."""
    return {
        "type": "object",
        "properties": {"id": {"type": "string", "description": "Object id (uuid)."}},
        "required": ["id"],
        "additionalProperties": False,
    }


def _build_delete_schema(_type_name: str) -> Dict[str, Any]:
    """inputSchema for the ``<type>_delete`` tool (id only)."""
    return {
        "type": "object",
        "properties": {"id": {"type": "string", "description": "Object id (uuid)."}},
        "required": ["id"],
        "additionalProperties": False,
    }


def _build_tool(type_name: str, op: str) -> Optional[Dict[str, Any]]:
    """Build one tool descriptor for (type, op); None when the model's
    config does not allow that op. Keys prefixed ``_app_``/``_`` are
    private bookkeeping stripped before the catalogue is sent to hosts."""
    cfg = MODELS.get(type_name) or {}
    if op not in (cfg.get("ops") or []):
        return None
    verb = _OP_VERB.get(op, op)
    name = f"{type_name}_{verb}"
    if op == "list":
        schema = _build_list_schema(type_name)
        title = f"List {type_name} objects"
        desc = (
            f"List {type_name} objects with optional filter, sort, and pagination. "
            f"Returns {{data: [...], meta: {{count, has_more}} }}. "
            f"Read-only; safe to call repeatedly."
        )
    elif op == "read":
        schema = _build_get_schema(type_name)
        title = f"Get one {type_name}"
        desc = f"Fetch a single {type_name} by id. Read-only; safe to call repeatedly."
    elif op == "create":
        schema = _build_create_schema(type_name)
        title = f"Create a {type_name}"
        desc = (
            f"Create a new {type_name} object. Pass field values directly as the "
            f"top-level keys of the arguments object. Returns the created row. "
            f"Field validation runs server-side; invalid values are rejected with 400."
        )
    elif op == "update":
        schema = _build_update_schema(type_name)
        title = f"Update a {type_name}"
        desc = (
            f"Update a {type_name} by id. Only the fields you pass are touched; "
            f"everything else is preserved. Returns the updated row."
        )
    elif op == "delete":
        schema = _build_delete_schema(type_name)
        title = f"Delete a {type_name}"
        desc = (
            f"Delete a {type_name} by id. The object is removed from list/read "
            f"immediately. Confirm with the user before calling."
        )
    else:
        return None
    return {
        "name": name,
        "title": title,
        "description": desc,
        "inputSchema": schema,
        "_app_op": op,
        "_app_type": type_name,
        "_destructive": op in ("delete",),
        "_readonly": op in ("list", "read"),
    }


def _build_tool_catalogue() -> List[Dict[str, Any]]:
    """Produce the full tool list: CRUD tools for every model + whoami."""
    tools: List[Dict[str, Any]] = []
    for type_name in MODELS.keys():
        for op in ("list", "read", "create", "update", "delete"):
            t = _build_tool(type_name, op)
            if t is not None:
                tools.append(t)
    # whoami-style probe: useful for the AI to confirm auth before doing
    # anything else. Strictly read-only against /xapi2/auth/me.
    tools.append({
        "name": "whoami",
        "title": "Current signed-in user",
        "description": (
            "Return the user the configured token belongs to (id, email, role). "
            "Use this once at the start of a session to confirm auth is set up."
        ),
        "inputSchema": {"type": "object", "properties": {}, "additionalProperties": False},
        "_app_op": "_whoami",
        "_app_type": None,
        "_destructive": False,
        "_readonly": True,
    })
    return tools


# Built once at import; the catalogue is static for a given MODELS blob.
_TOOLS = _build_tool_catalogue()
_TOOLS_BY_NAME: Dict[str, Dict[str, Any]] = {t["name"]: t for t in _TOOLS}


# ── Tool dispatch ──────────────────────────────────────────────────────

# Absolute ceiling on rows merged by the `all: true` pagination walk.
_LIST_HARD_CAP = 5000


def _normalise_filter(type_name: str, raw: Any) -> Dict[str, Any]:
    """Convert a friendly filter dict to wire keys; non-dicts become {}."""
    if not isinstance(raw, dict):
        return {}
    out: Dict[str, Any] = {}
    for k, v in raw.items():
        wire = _resolve_filter_key(type_name, str(k))
        # NOTE(review): both branches yield v, so the isinstance check is
        # a no-op as written; lists pass through unchanged and are turned
        # into repeated query params by _http.
        out[wire] = v if isinstance(v, list) else v
    return out


def _dispatch_tool(name: str, args: Dict[str, Any]) -> Any:
    """Execute one tool call against the backend and return its payload.

    Raises ApiError for unknown tools, missing ids, and any HTTP-level
    failure (the tools/call handler converts these to isError results).
    """
    tool = _TOOLS_BY_NAME.get(name)
    if not tool:
        raise ApiError(404, f"unknown tool: {name}")
    op = tool["_app_op"]
    type_name = tool["_app_type"]
    args = args if isinstance(args, dict) else {}
    if op == "_whoami":
        me = _http("GET", "/xapi2/auth/me")
        _emit_event("mcp.whoami")
        return me
    if op == "list":
        params: Dict[str, Any] = {}
        if isinstance(args.get("limit"), int):
            params["limit"] = args["limit"]
        if isinstance(args.get("offset"), int):
            params["offset"] = args["offset"]
        if isinstance(args.get("sort"), str):
            params["sort"] = args["sort"]
        if isinstance(args.get("q"), str):
            params["q"] = args["q"]
        params.update(_normalise_filter(type_name, args.get("filter")))
        path = f"/xapi2/data/{type_name}"
        if not args.get("all"):
            resp = _http("GET", path, params=params)
            _emit_event("mcp.crud", model=type_name, op="list")
            return resp
        # --all mode: walk pages until empty / hard cap.
        merged: List[Any] = []
        cursor: Optional[str] = None
        pages = 0
        page_meta: Dict[str, Any] = {}
        while True:
            if cursor:
                # Cursor paging: resume after the last row of the prior page.
                params["after"] = cursor
            resp = _http("GET", path, params=params)
            pages += 1
            if not isinstance(resp, dict):
                break
            data = resp.get("data") or []
            if isinstance(data, list):
                merged.extend(data)
            meta = resp.get("meta") or {}
            page_meta = meta if isinstance(meta, dict) else {}
            if not page_meta.get("has_more") or not data:
                break
            last = data[-1] if isinstance(data, list) and data else None
            cursor = (last or {}).get("id") if isinstance(last, dict) else None
            if not cursor:
                break
            if len(merged) >= _LIST_HARD_CAP:
                break
        _emit_event("mcp.crud", model=type_name, op="list_all", pages=pages)
        return {
            "data": merged,
            "meta": {**page_meta, "count": len(merged), "pages": pages, "has_more": False},
        }
    if op == "read":
        oid = args.get("id")
        if not isinstance(oid, str) or not oid:
            raise ApiError(400, "id is required")
        resp = _http("GET", f"/xapi2/data/{type_name}/{urllib.parse.quote(oid)}")
        _emit_event("mcp.crud", model=type_name, op="read")
        return resp
    if op == "create":
        # Drop explicit nulls so the backend applies its own defaults.
        body = {k: v for k, v in args.items() if v is not None}
        resp = _http("POST", f"/xapi2/data/{type_name}", body=body)
        _emit_event("mcp.crud", model=type_name, op="create")
        return resp
    if op == "update":
        oid = args.get("id")
        if not isinstance(oid, str) or not oid:
            raise ApiError(400, "id is required")
        body = {k: v for k, v in args.items() if k != "id" and v is not None}
        resp = _http("PATCH", f"/xapi2/data/{type_name}/{urllib.parse.quote(oid)}", body=body)
        _emit_event("mcp.crud", model=type_name, op="update")
        return resp
    if op == "delete":
        oid = args.get("id")
        if not isinstance(oid, str) or not oid:
            raise ApiError(400, "id is required")
        resp = _http("DELETE", f"/xapi2/data/{type_name}/{urllib.parse.quote(oid)}")
        _emit_event("mcp.crud", model=type_name, op="delete")
        return resp if resp is not None else {"ok": True}
    raise ApiError(500, f"unsupported op: {op}")


# ── JSON-RPC 2.0 over stdio ────────────────────────────────────────────
# MCP framing is line-delimited JSON-RPC 2.0: one request per line on
# stdin, one response per line on stdout. Anything we want the host to
# show its user goes via the `notifications/message` log channel.

_STDIN_LOCK = threading.Lock()  # only one reader thread, but lock for clarity
_STDOUT_LOCK = threading.Lock()


def _write_message(msg: Dict[str, Any]) -> None:
    """Serialize one JSON-RPC message as a single stdout line (locked)."""
    line = json.dumps(msg, ensure_ascii=False, separators=(",", ":"))
    with _STDOUT_LOCK:
        sys.stdout.write(line + "\n")
        sys.stdout.flush()


def _log(level: str, message: str) -> None:
    """Emit an MCP `notifications/message` log entry. Hosts surface
    these in their UI so the user sees what the server is doing."""
    try:
        _write_message({
            "jsonrpc": "2.0",
            "method": "notifications/message",
            "params": {"level": level, "logger": COMMAND_NAME, "data": message},
        })
    except Exception:
        pass


def _resp_ok(req_id: Any, result: Any) -> Dict[str, Any]:
    """Build a JSON-RPC success envelope."""
    return {"jsonrpc": "2.0", "id": req_id, "result": result}


def _resp_err(req_id: Any, code: int, message: str, data: Any = None) -> Dict[str, Any]:
    """Build a JSON-RPC error envelope (``data`` only when provided)."""
    err: Dict[str, Any] = {"code": code, "message": message}
    if data is not None:
        err["data"] = data
    return {"jsonrpc": "2.0", "id": req_id, "error": err}


# JSON-RPC error codes. -32601 method not found, -32602 invalid params,
# -32603 internal, -32000..-32099 reserved for server-defined errors.
_RPC_PARSE_ERROR = -32700
_RPC_INVALID_REQUEST = -32600
_RPC_METHOD_NOT_FOUND = -32601
_RPC_INVALID_PARAMS = -32602
_RPC_INTERNAL = -32603


def _handle_initialize(params: Dict[str, Any]) -> Dict[str, Any]:
    """MCP ``initialize`` handshake: negotiate protocol version and
    advertise server info, capabilities, and usage instructions."""
    # The host tells us which protocol version it speaks; we echo back
    # what we support. MCP says: pick the lower of the two.
    client_proto = params.get("protocolVersion") if isinstance(params, dict) else None
    proto = MCP_PROTOCOL_VERSION
    if isinstance(client_proto, str) and client_proto:
        # Date-formatted versions compare correctly as strings.
        proto = client_proto if client_proto <= MCP_PROTOCOL_VERSION else MCP_PROTOCOL_VERSION
    return {
        "protocolVersion": proto,
        "serverInfo": {
            "name": COMMAND_NAME,
            "title": f"Production Board (MCP)",
            "version": MCP_VERSION,
        },
        "capabilities": {
            "tools": {"listChanged": False},
            "logging": {},
        },
        "instructions": (
            f"Tools in this server map to Production Board data models. "
            f"Read-only tools (`*_list`, `*_get`) are safe to call freely; "
            f"write tools (`*_create`, `*_update`, `*_delete`) mutate data and "
            f"should be confirmed with the user. The server cannot create users, "
            f"manage tokens, or change billing - those happen in the Production Board web UI."
        ),
    }


def _public_tool(t: Dict[str, Any]) -> Dict[str, Any]:
    """Strip our private bookkeeping keys before sending to the host."""
    return {
        "name": t["name"],
        "title": t["title"],
        "description": t["description"],
        "inputSchema": t["inputSchema"],
        "annotations": {
            "readOnlyHint": bool(t.get("_readonly")),
            "destructiveHint": bool(t.get("_destructive")),
            "idempotentHint": bool(t.get("_readonly")),
            "openWorldHint": True,
        },
    }


def _handle_tools_list(_params: Dict[str, Any]) -> Dict[str, Any]:
    """MCP ``tools/list``: the public view of the static catalogue."""
    return {"tools": [_public_tool(t) for t in _TOOLS]}


def _format_text_content(value: Any) -> str:
    """Render a tool result as text: strings pass through, everything
    else is pretty-printed JSON (repr as a last resort)."""
    if isinstance(value, str):
        return value
    try:
        return json.dumps(value, ensure_ascii=False, indent=2)
    except (TypeError, ValueError):
        return str(value)


def _handle_tools_call(params: Dict[str, Any]) -> Dict[str, Any]:
    """MCP ``tools/call``: validate params, dispatch, wrap the result.

    Raises _MCPError for malformed requests; backend failures become
    ``isError: true`` tool results instead (see comment below).
    """
    if not isinstance(params, dict):
        raise _MCPError(_RPC_INVALID_PARAMS, "params must be an object")
    name = params.get("name")
    args = params.get("arguments") or {}
    if not isinstance(name, str) or not name:
        raise _MCPError(_RPC_INVALID_PARAMS, "name is required")
    if name not in _TOOLS_BY_NAME:
        raise _MCPError(_RPC_METHOD_NOT_FOUND, f"unknown tool: {name}")
    try:
        result = _dispatch_tool(name, args if isinstance(args, dict) else {})
    except ApiError as e:
        # Surface as a tool-call error (isError=True) rather than a
        # JSON-RPC error. The host treats the difference: JSON-RPC
        # errors mean "the framing was wrong"; tool errors mean "the
        # tool ran and reported a failure" - so the AI can reason
        # about it.
        return {
            "isError": True,
            "content": [{
                "type": "text",
                "text": _format_text_content({
                    "error": e.message,
                    "http_status": e.status,
                }),
            }],
        }
    text = _format_text_content(result)
    return {
        "content": [{"type": "text", "text": text}],
        # Send the raw structure too - hosts that support
        # ``structuredContent`` (newer MCP clients) can render it
        # natively without parsing the text.
        "structuredContent": result if isinstance(result, (dict, list)) else {"value": result},
        "isError": False,
    }


class _MCPError(Exception):
    """Internal: a request-level failure with a JSON-RPC error code."""

    def __init__(self, code: int, message: str, data: Any = None):
        super().__init__(message)
        self.code = code
        self.data = data


_HANDLERS = {
    "initialize": _handle_initialize,
    "tools/list": _handle_tools_list,
    "tools/call": _handle_tools_call,
    # A few well-known reads we deliberately respond as empty so hosts
    # that probe them don't see a method-not-found error.
    "resources/list": lambda _p: {"resources": []},
    "prompts/list": lambda _p: {"prompts": []},
    "ping": lambda _p: {},
}


def _handle_message(raw: str) -> Optional[Dict[str, Any]]:
    """Process one raw JSON-RPC line; return the response dict, or None
    when no response is owed (notifications)."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError as e:
        return _resp_err(None, _RPC_PARSE_ERROR, f"invalid json: {e}")
    if not isinstance(msg, dict):
        return _resp_err(None, _RPC_INVALID_REQUEST, "request must be an object")
    method = msg.get("method")
    req_id = msg.get("id")
    params = msg.get("params") or {}
    # Notifications (no id) get no response per JSON-RPC 2.0.
    is_notification = "id" not in msg
    if not isinstance(method, str):
        if is_notification:
            return None
        return _resp_err(req_id, _RPC_INVALID_REQUEST, "method must be a string")
    handler = _HANDLERS.get(method)
    if handler is None:
        if is_notification:
            # Common: notifications/initialized, notifications/cancelled.
            # Silently accept; we don't have lifecycle work tied to them.
            return None
        return _resp_err(req_id, _RPC_METHOD_NOT_FOUND, f"method not found: {method}")
    try:
        result = handler(params)
    except _MCPError as e:
        return _resp_err(req_id, e.code, str(e), e.data)
    except Exception as e:
        # Capture the traceback for the log channel; surface a terse
        # message in the JSON-RPC response so hosts have something to
        # show without the full stack.
        _log("error", f"unhandled exception in {method}: {e}\n{traceback.format_exc()}")
        return _resp_err(req_id, _RPC_INTERNAL, str(e))
    if is_notification:
        return None
    return _resp_ok(req_id, result)


def _serve_stdio() -> int:
    """Run the line-delimited JSON-RPC loop until stdin closes.

    Returns the process exit code (always 0; shutdown on EOF/interrupt).
    """
    # MCP hosts launch us as a subprocess and pipe JSON-RPC over stdio.
    # Read line-delimited; respond line-delimited. Any unexpected stderr
    # output is fine - hosts capture it for diagnostics.
    if sys.platform != "win32":
        # Line-buffered stdout so hosts see responses as they happen.
        try:
            sys.stdout.reconfigure(line_buffering=True)  # py3.7+
        except Exception:
            pass
    _ensure_opener()
    _emit_event("mcp.serve_start")
    if not _auth_token():
        _log(
            "warning",
            f"No token configured. Set PRODMCP_TOKEN=pat_... in your MCP host "
            f"config, or run `prodmcp login --token pat_...` once. Tool calls "
            f"will return 401 until then.",
        )
    # Background autoupdate is fire-and-forget so the handshake doesn't
    # block on a slow upstream. We do NOT replace the running process
    # mid-session - the new bytes activate on the next host start.
    if _autoupdate_enabled():
        threading.Thread(target=_maybe_autoupdate, kwargs={"force": False}, daemon=True).start()
    reader = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8", newline="")
    while True:
        try:
            line = reader.readline()
        except (KeyboardInterrupt, OSError):
            break
        if line == "":
            # EOF - host closed stdin (normal shutdown).
            break
        line = line.strip()
        if not line:
            continue
        resp = _handle_message(line)
        if resp is not None:
            _write_message(resp)
    _emit_event("mcp.serve_stop")
    return 0


# ── CLI surface (login / logout / version / update / serve) ───────────
# The same script is also runnable from a shell for setup tasks. The
# default verb is `serve`, so an MCP host running `prodmcp` (no args)
# gets a JSON-RPC server immediately.
def _is_tty() -> bool:
    """True when human-facing output is going to an interactive terminal.

    FIX: colored text is only ever emitted through ``_print_human``, which
    writes to **stderr** - so the tty check belongs on stderr, not stdout
    (stdout is the JSON-RPC channel and is typically a pipe).
    """
    return sys.stderr.isatty()


def _color(text: str, code: str) -> str:
    """Wrap *text* in an ANSI SGR escape when writing to a terminal."""
    if not _is_tty():
        return text
    return f"\033[{code}m{text}\033[0m"


def _print_human(msg: str) -> None:
    """Print a human-facing message on stderr.

    Anything we print on the human path lands on stderr so it can't
    contaminate stdio when this script is wrapped by a future caller.
    """
    print(msg, file=sys.stderr)


def _validate_token_and_save(token: str, kind: str) -> Dict[str, Any]:
    """Persist *token*, probe ``/auth/me`` with it, then enrich the creds file.

    Exits the process (status 1) when the token is rejected upstream; the
    provisionally-saved credentials are cleared first. Returns the user
    record (``{}`` when the upstream response is not a dict).
    """
    # Save first so _http picks the token up for the probe request.
    _save_creds({"token": token, "kind": kind})
    try:
        me = _http("GET", "/xapi2/auth/me")
    except ApiError as e:
        _clear_creds()
        _print_human(f"login failed: {e.message}")
        sys.exit(1)
    _save_creds({
        "token": token,
        "kind": kind,
        "user_id": me.get("id") if isinstance(me, dict) else None,
        "email": me.get("email") if isinstance(me, dict) else None,
        "saved_at": int(time.time()),
    })
    return me if isinstance(me, dict) else {}


def cmd_login(args: argparse.Namespace) -> int:
    """``login`` verb: store a PAT, or fall back to email+password session.

    Returns 0 on success, 1 on upstream rejection, 2 on missing email,
    130 on Ctrl-C/EOF at a prompt.
    """
    token = args.token
    if not token:
        try:
            token = input("Paste your personal access token (pat_...): ").strip() or None
        except (EOFError, KeyboardInterrupt):
            print()
            return 130
    if not token:
        # Fall back to email + password.
        try:
            email = input("Email: ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            return 130
        if not email:
            _print_human("email is required")
            return 2
        try:
            password = getpass.getpass("Password: ")
        except (EOFError, KeyboardInterrupt):
            print()
            return 130
        try:
            resp = _http("POST", "/xapi2/auth/login", body={"email": email, "password": password}, auth=False)
        except ApiError as e:
            _print_human(f"login failed: {e.message}")
            return 1
        if not isinstance(resp, dict) or not resp.get("access_token"):
            _print_human("login failed: unexpected response")
            return 1
        user = resp.get("user") or {}
        _save_creds({
            "token": resp["access_token"],
            "kind": "session",
            "user_id": user.get("id") if isinstance(user, dict) else None,
            "email": user.get("email") if isinstance(user, dict) else email,
            "saved_at": int(time.time()),
        })
        # FIX: guard with isinstance like the lines above - `user` may be
        # a non-dict truthy value, and `(user or {}).get` would then raise.
        shown = user.get("email") if isinstance(user, dict) else None
        _print_human(_color("ok", "32") + f" signed in as {shown or email}")
        return 0
    kind = "pat" if token.startswith("pat_") else "session"
    me = _validate_token_and_save(token, kind=kind)
    _print_human(_color("ok", "32") + f" signed in as {me.get('email') or me.get('id') or '?'}")
    return 0


def cmd_logout(_args: argparse.Namespace) -> int:
    """``logout`` verb: drop the saved token (no-op when not signed in)."""
    if not _load_creds().get("token"):
        _print_human("not signed in")
        return 0
    _clear_creds()
    _print_human(_color("ok", "32") + " signed out")
    return 0


def cmd_whoami(_args: argparse.Namespace) -> int:
    """``whoami`` verb: print the identity behind the configured token.

    Returns 3 when no token is configured, 1 on upstream failure.
    """
    if not _auth_token():
        _print_human("not signed in")
        return 3
    try:
        me = _http("GET", "/xapi2/auth/me")
    except ApiError as e:
        _print_human(f"error: {e.message}")
        return 1
    print(json.dumps(me, indent=2, ensure_ascii=False))
    return 0


def cmd_version(_args: argparse.Namespace) -> int:
    """``version`` verb: print version, base URL and surface sizes."""
    # FIX: use the build-time constants instead of re-hard-coding the
    # command name / version / app identity (hard-coded literals drift
    # out of sync with the constants at the top of the file). The output
    # is byte-identical for the current constant values.
    print(f"{COMMAND_NAME} {MCP_VERSION}")
    print(f"server: {_base_url()}")
    print(f"app: {APP_NAME} ({APP_SLUG})")
    print(f"models: {len(MODELS)}")
    print(f"tools: {len(_TOOLS)}")
    return 0


def cmd_tools(_args: argparse.Namespace) -> int:
    """Print the tool catalogue for inspection.

    Useful for debugging an MCP host config without launching the server."""
    print(json.dumps([_public_tool(t) for t in _TOOLS], indent=2, ensure_ascii=False))
    return 0


def cmd_update(_args: argparse.Namespace) -> int:
    """``update`` verb: fetch newer server bytes and replace this script."""
    new_version = _maybe_autoupdate(force=True)
    if not new_version:
        _print_human("already up to date")
        return 0
    _print_human(_color("ok", "32") + f" updated to {new_version}")
    return 0


def cmd_serve(_args: argparse.Namespace) -> int:
    """``serve`` verb (the default): run the stdio JSON-RPC server."""
    return _serve_stdio()


def cmd_doctor(_args: argparse.Namespace) -> int:
    """Quick health check: server reachable, token valid, tool surface rendered.

    Helpful when an MCP host says 'server failed to start' and you want
    to know whether it's the token, the network, or the host."""
    # FIX: constants instead of hard-coded "prodmcp 1.0.2" (same output).
    print(f"{COMMAND_NAME} {MCP_VERSION}")
    print(f"server: {_base_url()}")
    tok = _auth_token()
    print(f"token: {'set' if tok else 'NOT SET'} ({'env' if _env('TOKEN') else 'file' if tok else '-'})")
    try:
        info = _http("GET", "/xapi2/mcp/version", auth=False, timeout=5.0)
        print(f"upstream: ok (version {info.get('version') if isinstance(info, dict) else '?'})")
    except ApiError as e:
        print(f"upstream: FAIL - {e.message}")
        return 1
    if tok:
        try:
            me = _http("GET", "/xapi2/auth/me")
            who = me.get("email") if isinstance(me, dict) else None
            print(f"identity: {who or '?'}")
        except ApiError as e:
            print(f"identity: FAIL - {e.message}")
            return 1
    else:
        print("identity: skipped (no token)")
    print(f"tools: {len(_TOOLS)}")
    for t in _TOOLS[:5]:
        print(f" - {t['name']}")
    if len(_TOOLS) > 5:
        print(f" ...and {len(_TOOLS) - 5} more")
    return 0


def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point. No sub-command means ``serve`` (stdio JSON-RPC)."""
    parser = argparse.ArgumentParser(
        prog=COMMAND_NAME,
        # FIX: derive the app name from the build-time constant (same text).
        description=f"{APP_NAME} MCP server. Default action is `serve` (stdio JSON-RPC).",
    )
    sub = parser.add_subparsers(dest="cmd")
    p_serve = sub.add_parser("serve", help="run the MCP server over stdio (default)")
    p_serve.set_defaults(func=cmd_serve)
    p_login = sub.add_parser("login", help="store a token for the MCP host to use")
    p_login.add_argument("--token", help="personal access token (pat_...)")
    p_login.set_defaults(func=cmd_login)
    p_logout = sub.add_parser("logout", help="remove the saved token")
    p_logout.set_defaults(func=cmd_logout)
    p_who = sub.add_parser("whoami", help="print the user the saved token belongs to")
    p_who.set_defaults(func=cmd_whoami)
    p_ver = sub.add_parser("version", help="print version + base URL")
    p_ver.set_defaults(func=cmd_version)
    p_tools = sub.add_parser("tools", help="print the tool catalogue (json)")
    p_tools.set_defaults(func=cmd_tools)
    p_doctor = sub.add_parser("doctor", help="run a health check: server, token, tools")
    p_doctor.set_defaults(func=cmd_doctor)
    p_update = sub.add_parser("update", help="check for a newer version and replace this script")
    p_update.set_defaults(func=cmd_update)
    args = parser.parse_args(argv)
    # No sub-command given -> default to serving over stdio.
    func = getattr(args, "func", cmd_serve)
    return int(func(args) or 0)


if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.exit(130)