filebeat_client.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. import datetime
  7. import json
  8. import socket
  9. class FilebeatExn(Exception):
  10. pass
  11. class FilebeatClient(object):
  12. def __init__(self, logger, host="127.0.0.1", port=9000, timeout=20):
  13. self._logger = logger.getChild("filebeat_client")
  14. self._filebeat_host = host
  15. self._filebeat_port = port
  16. self._socket_timeout = timeout
  17. self._socket = None
  18. self._open_socket()
  19. def send_event(self, payload, index, timestamp=None, pipeline="filebeat"):
  20. if timestamp is None:
  21. timestamp = datetime.datetime.utcnow().timestamp()
  22. event = {
  23. "index": index,
  24. "timestamp": timestamp,
  25. "pipeline": pipeline,
  26. "payload": json.dumps(payload)
  27. }
  28. # Serialise event, add new line and encode as UTF-8 before sending to Filebeat.
  29. data = json.dumps(event, sort_keys=True) + "\n"
  30. data = data.encode()
  31. self._logger.debug(f"-> {data}")
  32. self._send_data(data)
  33. def _open_socket(self):
  34. self._logger.info(f"Connecting to Filebeat on {self._filebeat_host}:{self._filebeat_port}")
  35. self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  36. self._socket.settimeout(self._socket_timeout)
  37. try:
  38. self._socket.connect((self._filebeat_host, self._filebeat_port))
  39. except (ConnectionError, socket.timeout):
  40. raise FilebeatExn("Failed to connect to Filebeat") from None
  41. def _send_data(self, data):
  42. total_sent = 0
  43. while total_sent < len(data):
  44. try:
  45. sent = self._socket.send(data[total_sent:])
  46. except BrokenPipeError:
  47. self._logger.debug("Filebeat socket closed by peer")
  48. self._socket.close()
  49. self._open_socket()
  50. total_sent = 0
  51. else:
  52. total_sent = total_sent + sent