#------------------------------------------------------------------
# Proxy Script Generated by Delphi Microservices Container 4.3.20
# Do not modify this code!
# Generated at: 2025-04-26 18.03.46
#------------------------------------------------------------------


import requests
import urllib3

# Silences urllib3 warnings process-wide (this includes the warning emitted
# for unverified HTTPS requests, which the proxy issues by default).
urllib3.disable_warnings()


class EmailRPCException(Exception):
    """Raised for server-reported JSON-RPC errors and protocol violations."""


class EmailRPCProxy:
    """JSON-RPC 2.0 client proxy that POSTs every call to a single endpoint.

    Each public method builds a request with positional ``params``, sends it
    to ``base_url`` and returns the ``result`` node of the response.
    """

    def __init__(self, base_url, *, timeout=None, verify=False):
        """Create a proxy bound to *base_url*.

        Args:
            base_url: URL of the JSON-RPC endpoint every call is POSTed to.
            timeout: Forwarded to ``requests.post``. ``None`` (the default,
                matching the original generated code) means no timeout.
            verify: Forwarded to ``requests.post``. Defaults to ``False``
                (TLS certificate verification disabled) to preserve the
                original behaviour of the generated proxy.
                NOTE(review): pass ``True`` or a CA bundle path when talking
                to an endpoint over an untrusted network.
        """
        # Incremented before use, so the first request carries id 2.
        self.__id = 1
        self.base_url = base_url
        self.timeout = timeout
        self.verify = verify
        self.headers = {
            "content-type": "application/json",
            "accept": "application/json",
        }

    def __get_next_id(self):
        """Return the next request id (monotonically increasing per proxy)."""
        self.__id += 1
        return self.__id

    def __execute(self, req):
        """POST *req* and return the ``result`` node of the JSON-RPC response.

        A request without an ``id`` is treated as a notification: an HTTP 204
        reply yields ``None``.

        Raises:
            EmailRPCException: on HTTP 204 for a non-notification request, on
                a missing or non-JSON Content-Type header, on a server
                ``error`` node, or when the response carries neither
                ``result`` nor ``error``.
        """
        isnotif = req.get("id") is None
        res = requests.post(
            self.base_url,
            json=req,
            headers=self.headers,
            timeout=self.timeout,
            verify=self.verify,
        )
        if res.status_code == 204:  # no content
            if isnotif:
                return None
            raise EmailRPCException(0, "Protocol error for Notification")

        # A missing header is reported as a protocol error, not a KeyError.
        content_type = res.headers.get("content-type", "")
        if not content_type.lower().startswith("application/json"):
            raise EmailRPCException(
                "Invalid ContentType in response: " + content_type
            )

        jres = res.json()
        if not isinstance(jres, dict):
            # A list/string/number body has no result or error node at all.
            raise EmailRPCException(
                0,
                "Protocol error in JSON-RPC response - no result nor error node exists",
            )

        # Treat an explicit ``"error": null`` the same as an absent error.
        error = jres.get("error")
        if error is not None:
            message = error.get("message") if isinstance(error, dict) else error
            raise EmailRPCException(message)
        if "result" not in jres:
            raise EmailRPCException(
                0,
                "Protocol error in JSON-RPC response - no result nor error node exists",
            )
        return jres["result"]

    def __new_req(self, method, typ):
        """Build a JSON-RPC 2.0 envelope; only ``"request"`` types get an id."""
        req = dict(jsonrpc="2.0", method=method, params=[])
        if typ == "request":
            req["id"] = self.__get_next_id()
        return req

    def __invoke(self, method, typ, *params):
        """Build the envelope for *method*, attach *params* in order, execute it."""
        req = self.__new_req(method, typ)
        req["params"].extend(params)
        return self.__execute(req)

    # end of library code

    # Following methods are automatically generated

    def set_user_sender(self, token, userid, asender) -> None:
        """
        Invokes [procedure SetUserSender(const Token: string; const UserID: Int64; aSender: TJsonObject)]

        Sent as a JSON-RPC notification (no request id).
        """
        return self.__invoke("SetUserSender", "notification", token, userid, asender)

    def remove_user_sender(self, token, userid):
        """
        Invokes [function RemoveUserSender(const Token: string; const UserID: Int64): TJsonObject]
        """
        return self.__invoke("RemoveUserSender", "request", token, userid)

    def get_user_sender(self, token, userid):
        """
        Invokes [function GetUserSender(const Token: string; const UserID: Int64): TSender]
        """
        return self.__invoke("GetUserSender", "request", token, userid)

    def get_messages_by_user_name(self, token, senderusername):
        """
        Invokes [function GetMessagesByUserName(const Token: string; const SenderUserName: string): TObjectList]
        Returns the messages sent by SenderUserName
        """
        return self.__invoke("GetMessagesByUserName", "request", token, senderusername)

    def get_my_messages(self, token, statuslist):
        """
        Invokes [function GetMyMessages(const Token: string; const StatusList: TJsonArray): TObjectList]
        Returns the messages owned by the logged user
        """
        return self.__invoke("GetMyMessages", "request", token, statuslist)

    def get_message_by_id(self, token, messageid):
        """
        Invokes [function GetMessageByID(const Token: string; const MessageID: Int64): TMessage]
        Returns the message with id MessageID
        """
        return self.__invoke("GetMessageByID", "request", token, messageid)

    def get_messages_by_status(self, token, statuslist):
        """
        Invokes [function GetMessagesByStatus(const Token: string; const StatusList: TJsonArray): TObjectList]
        Returns the messages with the status contained in the StatusList array
        """
        return self.__invoke("GetMessagesByStatus", "request", token, statuslist)

    # ``filter`` shadows the builtin, but the name is part of the generated
    # public signature and is kept so keyword callers continue to work.
    def get_messages_by_rql(self, token, filter):
        """
        Invokes [function GetMessagesByRQL(const Token: string; const Filter: TJsonObject): TJsonObject]
        Returns all messages (up to 6 months) which match the filter.rql property
        """
        return self.__invoke("GetMessagesByRQL", "request", token, filter)

    def send_message(self, token, amessage):
        """
        Invokes [function SendMessage(const Token: string; const aMessage: TJsonObject): TJsonObject]
        Send a message using information stored in aMessage
        """
        return self.__invoke("SendMessage", "request", token, amessage)

    def delete_message_by_id(self, token, messageid):
        """
        Invokes [function DeleteMessageByID(const Token: string; const MessageID: Int64): TJsonObject]
        """
        return self.__invoke("DeleteMessageByID", "request", token, messageid)

    def admin_send_message(self, token, senderusername, amessage):
        """
        Invokes [function AdminSendMessage(const Token: string; const SenderUserName: string; const aMessage: TJsonObject): TJsonObject]
        """
        return self.__invoke(
            "AdminSendMessage", "request", token, senderusername, amessage
        )

    def admin_send_test_message(self, token, senderusername, recipient):
        """
        Invokes [function AdminSendTestMessage(const Token: string; const SenderUserName: string; const Recipient: string): TJsonObject]
        """
        return self.__invoke(
            "AdminSendTestMessage", "request", token, senderusername, recipient
        )

    def create_message(self, token, amessage):
        """
        Invokes [function CreateMessage(const Token: string; const aMessage: TJsonObject): TJsonObject]
        """
        return self.__invoke("CreateMessage", "request", token, amessage)

    def add_attachment_to_message(self, token, amessageid, aisrelated, aattachment):
        """
        Invokes [function AddAttachmentToMessage(const Token: string; const aMessageID: Int64; const aIsRelated: Boolean; const aAttachment: TJsonObject): TJsonObject]
        """
        return self.__invoke(
            "AddAttachmentToMessage",
            "request",
            token,
            amessageid,
            aisrelated,
            aattachment,
        )

    def complete_message(self, token, amessageid):
        """
        Invokes [function CompleteMessage(const Token: string; const aMessageID: Int64): TJsonObject]
        """
        return self.__invoke("CompleteMessage", "request", token, amessageid)

    def bulk_send_messages(self, token, ametamessage, abulkmessagesdata):
        """
        Invokes [function BulkSendMessages(const Token: string; const aMetaMessage: TJsonObject; const aBulkMessagesData: TJsonObject): TJsonObject]
        """
        return self.__invoke(
            "BulkSendMessages", "request", token, ametamessage, abulkmessagesdata
        )

    def login(self, username, password):
        """
        Invokes [function Login(const UserName: string; const Password: string): TJsonObject]
        Returns the JWT token which can be used to call all other methods.
        """
        return self.__invoke("Login", "request", username, password)

    def refresh_token(self, token):
        """
        Invokes [function RefreshToken(const Token: string): TJsonObject]
        Extends the expiration time of a still-valid token. Clients must use the token returned instead of the previous one.
        """
        return self.__invoke("RefreshToken", "request", token)

# end of generated proxy