Files
netbox/netbox/core/views.py
Arthur Hanson 1d6987bca0 14731 plugins catalog (#16763)
* 14731 plugin catalog

* 14731 detail page

* 14731 plugin table

* 14731 cleanup

* 14731 cache API results

* 14731 fix install name

* 14731 filtering

* 14731 filtering

* 14731 fix detail view

* 14731 fix detail view

* 14731 sort / status

* 14731 sort / status

* 14731 cleanup detail view

* 14731 htmx plugin list

* 14731 align quicksearch

* 14731 remove pytz

* 14731 change to table

* 14731 change to table

* 14731 remove status from table

* 14731 quick search

* 14731 cleanup

* 14731 cleanup

* Employ datetime_from_timestamp() to parse timestamps

* 14731 review changes

* 14731 move to plugins.py file

* 14731 use dataclasses

* 14731 review changes

* Tweak table columns

* Use is_staff (for now) to evaluate user permission for plugin views

* Use table for ordering

* 7025 change to api fields

* 14731 tweaks

* Remove filtering for is_netboxlabs_supported

* Misc cleanup

* Update logic for determining whether to display plugin installation instructions

* 14731 review changes

* 14731 review changes

* 14731 review changes

* 14731 add user agent string, proxy settings

* Clean up templates

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
2024-07-25 14:58:48 -04:00

692 lines
22 KiB
Python

import json
import platform
from django import __version__ as DJANGO_VERSION
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.mixins import UserPassesTestMixin
from django.core.cache import cache
from django.db import connection, ProgrammingError
from django.http import HttpResponse, HttpResponseForbidden, Http404
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from django.views.generic import View
from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection
from django_rq.settings import QUEUES_MAP, QUEUES_LIST
from django_rq.utils import get_jobs, get_statistics, stop_jobs
from rq import requeue_job
from rq.exceptions import NoSuchJobError
from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
from rq.registry import (
DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, StartedJobRegistry,
)
from rq.worker import Worker
from rq.worker_registration import clean_worker_registry
from netbox.config import get_config, PARAMS
from netbox.views import generic
from netbox.views.generic.base import BaseObjectView
from netbox.views.generic.mixins import TableMixin
from utilities.data import shallow_compare_dict
from utilities.forms import ConfirmationForm
from utilities.htmx import htmx_partial
from utilities.query import count_related
from utilities.views import ContentTypePermissionRequiredMixin, GetRelatedModelsMixin, register_model_view
from . import filtersets, forms, tables
from .models import *
from .plugins import get_plugins
from .tables import CatalogPluginTable, PluginVersionTable
#
# Data sources
#
class DataSourceListView(generic.ObjectListView):
    """
    List view for DataSource objects.
    """
    # Each data source carries a `file_count` annotation built by count_related(DataFile, 'source')
    queryset = DataSource.objects.annotate(file_count=count_related(DataFile, 'source'))
    table = tables.DataSourceTable
    filterset = filtersets.DataSourceFilterSet
    filterset_form = forms.DataSourceFilterForm
@register_model_view(DataSource)
class DataSourceView(GetRelatedModelsMixin, generic.ObjectView):
    """
    Detail view for a single DataSource.
    """
    queryset = DataSource.objects.all()

    def get_extra_context(self, request, instance):
        """
        Add the result of get_related_models() for `instance` to the template context.
        """
        related_models = self.get_related_models(request, instance)
        return {'related_models': related_models}
@register_model_view(DataSource, 'sync')
class DataSourceSyncView(BaseObjectView):
    """
    Enqueue a sync job for a DataSource (POST only; GET redirects to the object view).
    """
    queryset = DataSource.objects.all()

    def get_required_permission(self):
        """
        Return the permission name required to use this view.
        """
        return 'core.sync_datasource'

    def get(self, request, pk):
        """
        Redirect GET requests to the data source's own view without queuing anything.
        """
        target = get_object_or_404(self.queryset, pk=pk)
        return redirect(target.get_absolute_url())

    def post(self, request, pk):
        """
        Enqueue a sync job for the data source, flash a success message and redirect to it.
        """
        target = get_object_or_404(self.queryset, pk=pk)
        sync_job = target.enqueue_sync_job(request)
        messages.success(request, f"Queued job #{sync_job.pk} to sync {target}")
        return redirect(target.get_absolute_url())
@register_model_view(DataSource, 'edit')
class DataSourceEditView(generic.ObjectEditView):
    """
    Edit view for a DataSource, backed by DataSourceForm.
    """
    form = forms.DataSourceForm
    queryset = DataSource.objects.all()
@register_model_view(DataSource, 'delete')
class DataSourceDeleteView(generic.ObjectDeleteView):
    """
    Delete view for a single DataSource.
    """
    queryset = DataSource.objects.all()
class DataSourceBulkImportView(generic.BulkImportView):
    """
    Bulk import view for DataSource objects, backed by DataSourceImportForm.
    """
    model_form = forms.DataSourceImportForm
    queryset = DataSource.objects.all()
class DataSourceBulkEditView(generic.BulkEditView):
    """
    Bulk edit view for DataSource objects.
    """
    # NOTE(review): the annotation here is named `count_files`, whereas DataSourceListView
    # annotates `file_count` for the same table — confirm which name DataSourceTable expects.
    queryset = DataSource.objects.annotate(count_files=count_related(DataFile, 'source'))
    form = forms.DataSourceBulkEditForm
    table = tables.DataSourceTable
    filterset = filtersets.DataSourceFilterSet
class DataSourceBulkDeleteView(generic.BulkDeleteView):
    """
    Bulk delete view for DataSource objects.
    """
    # NOTE(review): the annotation here is named `count_files`, whereas DataSourceListView
    # annotates `file_count` for the same table — confirm which name DataSourceTable expects.
    queryset = DataSource.objects.annotate(count_files=count_related(DataFile, 'source'))
    table = tables.DataSourceTable
    filterset = filtersets.DataSourceFilterSet
#
# Data files
#
class DataFileListView(generic.ObjectListView):
    """
    List view for DataFile objects.
    """
    # The `data` field is deferred so it is not loaded when listing files
    queryset = DataFile.objects.defer('data')
    table = tables.DataFileTable
    filterset = filtersets.DataFileFilterSet
    filterset_form = forms.DataFileFilterForm
    # Only bulk deletion is offered, and it maps to the 'delete' permission
    actions = {'bulk_delete': {'delete'}}
@register_model_view(DataFile)
class DataFileView(generic.ObjectView):
    """
    Detail view for a single DataFile.
    """
    queryset = DataFile.objects.all()
@register_model_view(DataFile, 'delete')
class DataFileDeleteView(generic.ObjectDeleteView):
    """
    Delete view for a single DataFile.
    """
    queryset = DataFile.objects.all()
class DataFileBulkDeleteView(generic.BulkDeleteView):
    """
    Bulk delete view for DataFile objects.
    """
    # The `data` field is deferred so it is not loaded for the confirmation table
    queryset = DataFile.objects.defer('data')
    table = tables.DataFileTable
    filterset = filtersets.DataFileFilterSet
#
# Jobs
#
class JobListView(generic.ObjectListView):
    """
    List view for Job objects.
    """
    queryset = Job.objects.all()
    table = tables.JobTable
    filterset = filtersets.JobFilterSet
    filterset_form = forms.JobFilterForm
    # Available actions mapped to the permissions each one requires
    actions = {
        'export': {'view'},
        'bulk_delete': {'delete'},
    }
class JobView(generic.ObjectView):
    """
    Detail view for a single Job.
    """
    queryset = Job.objects.all()
class JobDeleteView(generic.ObjectDeleteView):
    """
    Delete view for a single Job.
    """
    queryset = Job.objects.all()
class JobBulkDeleteView(generic.BulkDeleteView):
    """
    Bulk delete view for Job objects.
    """
    queryset = Job.objects.all()
    table = tables.JobTable
    filterset = filtersets.JobFilterSet
#
# Change logging
#
class ObjectChangeListView(generic.ObjectListView):
    """
    List view for ObjectChange records, limited to the manager's valid_models() set.
    """
    queryset = ObjectChange.objects.valid_models()
    table = tables.ObjectChangeTable
    filterset = filtersets.ObjectChangeFilterSet
    filterset_form = forms.ObjectChangeFilterForm
    template_name = 'core/objectchange_list.html'
    # Export is the only action offered, and it maps to the 'view' permission
    actions = {'export': {'view'}}
@register_model_view(ObjectChange)
class ObjectChangeView(generic.ObjectView):
    """
    Detail view for a single ObjectChange, including its diff and neighbouring changes.
    """
    queryset = ObjectChange.objects.valid_models()

    def get_extra_context(self, request, instance):
        """
        Build the extra template context for a change record.

        The context holds the added/removed diff, the previous and next change to the same
        object, a table of up to 50 other changes sharing this change's request ID (plus
        their total count), and a flag marking a "non-atomic" change.
        """
        # Other changes recorded under the same request ID, excluding this one
        same_request = ObjectChange.objects.valid_models().restrict(request.user, 'view')
        same_request = same_request.filter(request_id=instance.request_id).exclude(pk=instance.pk)
        same_request_table = tables.ObjectChangeTable(data=same_request[:50], orderable=False)

        # All viewable changes to the same object, used to find the neighbours in time
        object_history = ObjectChange.objects.valid_models().restrict(request.user, 'view').filter(
            changed_object_type=instance.changed_object_type,
            changed_object_id=instance.changed_object_id,
        )
        next_change = object_history.filter(time__gt=instance.time).order_by('time').first()
        prev_change = object_history.filter(time__lt=instance.time).order_by('-time').first()

        # An update/delete with no recorded pre-change data borrows the previous change's
        # post-change data as its baseline
        non_atomic_change = bool(
            not instance.prechange_data and instance.action in ['update', 'delete'] and prev_change
        )
        if non_atomic_change:
            prechange_data = prev_change.postchange_data_clean
        else:
            prechange_data = instance.prechange_data_clean

        # A diff is only produced when both sides of the change are present
        diff_added = diff_removed = None
        if prechange_data and instance.postchange_data:
            diff_added = shallow_compare_dict(
                prechange_data,
                instance.postchange_data_clean or dict(),
                exclude=['last_updated'],
            )
            diff_removed = {key: prechange_data.get(key) for key in diff_added}

        return {
            'diff_added': diff_added,
            'diff_removed': diff_removed,
            'next_change': next_change,
            'prev_change': prev_change,
            'related_changes_table': same_request_table,
            'related_changes_count': same_request.count(),
            'non_atomic_change': non_atomic_change
        }
#
# Config Revisions
#
class ConfigRevisionListView(generic.ObjectListView):
    """
    List view for ConfigRevision objects.
    """
    queryset = ConfigRevision.objects.all()
    table = tables.ConfigRevisionTable
    filterset = filtersets.ConfigRevisionFilterSet
    filterset_form = forms.ConfigRevisionFilterForm
@register_model_view(ConfigRevision)
class ConfigRevisionView(generic.ObjectView):
    """
    Detail view for a single ConfigRevision.
    """
    queryset = ConfigRevision.objects.all()
class ConfigRevisionEditView(generic.ObjectEditView):
    """
    Edit view for ConfigRevision objects, backed by ConfigRevisionForm.
    """
    form = forms.ConfigRevisionForm
    queryset = ConfigRevision.objects.all()
@register_model_view(ConfigRevision, 'delete')
class ConfigRevisionDeleteView(generic.ObjectDeleteView):
    """
    Delete view for a single ConfigRevision.
    """
    queryset = ConfigRevision.objects.all()
class ConfigRevisionBulkDeleteView(generic.BulkDeleteView):
    """
    Bulk delete view for ConfigRevision objects.
    """
    queryset = ConfigRevision.objects.all()
    table = tables.ConfigRevisionTable
    filterset = filtersets.ConfigRevisionFilterSet
class ConfigRevisionRestoreView(ContentTypePermissionRequiredMixin, View):
    """
    Preview (GET) and activate (POST) a stored ConfigRevision.
    """

    def get_required_permission(self):
        """
        Return the permission name required to use this view.
        """
        # NOTE(review): this name does not follow the "<app>.<action>_<model>" shape used by
        # 'core.sync_datasource' elsewhere in this file — confirm it resolves as intended.
        return 'core.configrevision_edit'

    def get(self, request, pk):
        """
        Render a per-parameter comparison of the current configuration and the candidate revision.
        """
        candidate_config = get_object_or_404(ConfigRevision, pk=pk)

        # The active revision may no longer exist, in which case "current" values are None
        active_version = get_config().version
        current_config = ConfigRevision.objects.filter(pk=active_version).first()

        # One (name, current value, candidate value) tuple per known parameter
        params = [
            (
                param.name,
                current_config.data.get(param.name, None) if current_config else None,
                candidate_config.data.get(param.name, None),
            )
            for param in PARAMS
        ]

        context = {
            'object': candidate_config,
            'params': params,
        }
        return render(request, 'core/configrevision_restore.html', context)

    def post(self, request, pk):
        """
        Activate the candidate revision, or answer 403 if the user lacks the permission.
        """
        if not request.user.has_perm('core.configrevision_edit'):
            return HttpResponseForbidden()

        candidate_config = get_object_or_404(ConfigRevision, pk=pk)
        candidate_config.activate()
        messages.success(request, f"Restored configuration revision #{pk}")
        return redirect(candidate_config.get_absolute_url())
#
# Background Tasks (RQ)
#
class BaseRQView(UserPassesTestMixin, View):
    """
    Base class for the background-task (RQ) views; only staff users pass the access test.
    """

    def test_func(self):
        """
        Return whether the requesting user is flagged as staff.
        """
        user = self.request.user
        return user.is_staff
class BackgroundQueueListView(TableMixin, BaseRQView):
    """
    List the RQ queues reported by get_statistics().
    """
    table = tables.BackgroundQueueTable

    def get(self, request):
        """
        Render the queue table from the "queues" entry of the RQ statistics.
        """
        statistics = get_statistics(run_maintenance_tasks=True)
        table = self.get_table(statistics["queues"], request, bulk_actions=False)
        return render(request, 'core/rq_queue_list.html', {'table': table})
class BackgroundTaskListView(TableMixin, BaseRQView):
    """
    List the jobs of one RQ queue that are in a given status.
    """
    table = tables.BackgroundTaskTable

    def get_table_data(self, request, queue, status):
        """
        Return the jobs in `queue` having the given `status`.

        Raises Http404 if `status` is neither "queued" nor one of the registry-backed statuses.
        """
        # Queued tasks are read straight from the queue; no registry is involved
        if status == RQJobStatus.QUEUED:
            return queue.get_jobs()

        # Every other supported status is backed by its own registry class
        registry_classes = {
            RQJobStatus.STARTED: StartedJobRegistry,
            RQJobStatus.DEFERRED: DeferredJobRegistry,
            RQJobStatus.FINISHED: FinishedJobRegistry,
            RQJobStatus.FAILED: FailedJobRegistry,
            RQJobStatus.SCHEDULED: ScheduledJobRegistry,
        }
        if status not in registry_classes:
            raise Http404
        registry = registry_classes[status](queue.name, queue.connection)
        job_ids = registry.get_job_ids()

        if status == RQJobStatus.DEFERRED:
            # Deferred jobs are fetched one by one; IDs whose job no longer exists are skipped
            jobs = []
            for job_id in job_ids:
                try:
                    deferred_job = RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer)
                except NoSuchJobError:
                    continue
                jobs.append(deferred_job)
        else:
            jobs = get_jobs(queue, job_ids, registry)

        # Scheduled jobs additionally get their scheduled time attached for display
        if jobs and status == RQJobStatus.SCHEDULED:
            for scheduled_job in jobs:
                scheduled_job.scheduled_at = registry.get_scheduled_time(scheduled_job)

        return jobs

    def get(self, request, queue_index, status):
        """
        Render the task table, as a bare table for HTMX partial requests or as the full page.
        """
        queue = get_queue_by_index(queue_index)
        table = self.get_table(self.get_table_data(request, queue, status), request, False)

        context = {'table': table}

        # If this is an HTMX request, return only the rendered table HTML
        if htmx_partial(request):
            return render(request, 'htmx/table.html', context)

        context.update(queue=queue, status=status)
        return render(request, 'core/rq_task_list.html', context)
class BackgroundTaskView(BaseRQView):
    """
    Detail view for a single RQ job.
    """

    def get(self, request, job_id):
        """
        Render the detail page for the job with the given ID.

        Raises Http404 if no such job exists.
        """
        # all the RQ queues should use the same connection
        redis_connection = get_redis_connection(QUEUES_LIST[0]['connection_config'])
        try:
            job = RQ_Job.fetch(job_id, connection=redis_connection)
        except NoSuchJobError:
            raise Http404(_("Job {job_id} not found").format(job_id=job_id))

        # Resolve the queue the job originated from
        queue_index = QUEUES_MAP[job.origin]
        queue = get_queue_by_index(queue_index)

        # Fall back to None when the job object has no _exc_info attribute
        exc_info = getattr(job, '_exc_info', None)

        context = {
            'queue': queue,
            'job': job,
            'queue_index': queue_index,
            'dependency_id': job._dependency_id,
            'exc_info': exc_info,
        }
        return render(request, 'core/rq_task.html', context)
class BackgroundTaskDeleteView(BaseRQView):
    """
    Confirm (GET, HTMX only) and perform (POST) the deletion of an RQ job.
    """

    def get(self, request, job_id):
        """
        Render the delete confirmation form; non-HTMX requests are sent to the queue list.
        """
        if not request.htmx:
            return redirect(reverse('core:background_queue_list'))

        form = ConfirmationForm(initial=request.GET)

        return render(request, 'htmx/delete_form.html', {
            'object_type': 'background task',
            'object': job_id,
            'form': form,
            'form_url': reverse('core:background_task_delete', kwargs={'job_id': job_id})
        })

    def post(self, request, job_id):
        """
        Delete the job if the confirmation form validates, then redirect to the queue list.

        Raises Http404 if the form is valid but no job with the given ID exists.
        """
        form = ConfirmationForm(request.POST)

        if form.is_valid():
            # all the RQ queues should use the same connection
            config = QUEUES_LIST[0]
            try:
                job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
            except NoSuchJobError:
                raise Http404(_("Job {job_id} not found").format(job_id=job_id))

            queue_index = QUEUES_MAP[job.origin]
            queue = get_queue_by_index(queue_index)

            # Remove job id from queue and delete the actual job
            queue.connection.lrem(queue.key, 0, job.id)
            job.delete()
            messages.success(request, f'Deleted job {job_id}')
        else:
            # form.errors is keyed by field name, so indexing it with [0] raised KeyError
            # instead of reporting the problem; flatten every field's messages instead.
            error_text = '; '.join(
                str(error) for field_errors in form.errors.values() for error in field_errors
            )
            messages.error(request, f'Error deleting job: {error_text}')

        return redirect(reverse('core:background_queue_list'))
class BackgroundTaskRequeueView(BaseRQView):
    """
    Requeue an RQ job and return to its detail view.
    """

    def get(self, request, job_id):
        """
        Requeue the job with the given ID on the queue it originated from.

        Raises Http404 if no such job exists.
        """
        # all the RQ queues should use the same connection
        redis_connection = get_redis_connection(QUEUES_LIST[0]['connection_config'])
        try:
            job = RQ_Job.fetch(job_id, connection=redis_connection)
        except NoSuchJobError:
            raise Http404(_("Job {job_id} not found").format(job_id=job_id))

        origin_queue = get_queue_by_index(QUEUES_MAP[job.origin])

        requeue_job(job_id, connection=origin_queue.connection, serializer=origin_queue.serializer)
        messages.success(request, f'You have successfully requeued: {job_id}')

        return redirect(reverse('core:background_task', args=[job_id]))
class BackgroundTaskEnqueueView(BaseRQView):
    """
    Enqueue an existing RQ job and return to its detail view.
    """

    def get(self, request, job_id):
        """
        Enqueue the job with the given ID on the queue it originated from.

        Raises Http404 if no such job exists.
        """
        # all the RQ queues should use the same connection
        redis_connection = get_redis_connection(QUEUES_LIST[0]['connection_config'])
        try:
            job = RQ_Job.fetch(job_id, connection=redis_connection)
        except NoSuchJobError:
            raise Http404(_("Job {job_id} not found").format(job_id=job_id))

        queue = get_queue_by_index(QUEUES_MAP[job.origin])

        try:
            # _enqueue_job is new in RQ 1.14, this is used to enqueue
            # job regardless of its dependencies
            queue._enqueue_job(job)
        except AttributeError:
            queue.enqueue_job(job)

        # Remove job from correct registry if needed. The status is re-read for every
        # comparison, and only the first matching registry is touched.
        status_registries = (
            (RQJobStatus.DEFERRED, DeferredJobRegistry),
            (RQJobStatus.FINISHED, FinishedJobRegistry),
            (RQJobStatus.SCHEDULED, ScheduledJobRegistry),
        )
        for candidate_status, registry_cls in status_registries:
            if job.get_status() == candidate_status:
                registry_cls(queue.name, queue.connection).remove(job)
                break

        messages.success(request, f'You have successfully enqueued: {job_id}')

        return redirect(reverse('core:background_task', args=[job_id]))
class BackgroundTaskStopView(BaseRQView):
    """
    Stop an RQ job and return to its detail view.
    """

    def get(self, request, job_id):
        """
        Ask RQ to stop the job with the given ID and report the outcome as a flash message.

        Raises Http404 if no such job exists.
        """
        # all the RQ queues should use the same connection
        config = QUEUES_LIST[0]
        try:
            job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
        except NoSuchJobError:
            raise Http404(_("Job {job_id} not found").format(job_id=job_id))

        queue_index = QUEUES_MAP[job.origin]
        queue = get_queue_by_index(queue_index)

        # The second element must not be unpacked into `_`: assigning to `_` anywhere in this
        # function makes it a local name, which shadows the gettext alias used in the 404
        # branch above and turns a missing job into an UnboundLocalError.
        stopped_jobs, _failed_jobs = stop_jobs(queue, job_id)
        if len(stopped_jobs) == 1:
            messages.success(request, f'You have successfully stopped {job_id}')
        else:
            messages.error(request, f'Failed to stop {job_id}')

        return redirect(reverse('core:background_task', args=[job_id]))
class WorkerListView(TableMixin, BaseRQView):
    """
    List the RQ workers that serve a given queue.
    """
    table = tables.WorkerTable

    def get_table_data(self, request, queue):
        """
        Return the workers whose queue names include `queue`, after cleaning the worker registry.
        """
        clean_worker_registry(queue)
        return [
            worker for worker in Worker.all(queue.connection)
            if queue.name in worker.queue_names()
        ]

    def get(self, request, queue_index):
        """
        Render the worker table, as a bare table for HTMX partial requests or as the full page.
        """
        queue = get_queue_by_index(queue_index)
        table = self.get_table(self.get_table_data(request, queue), request, False)
        context = {
            'table': table,
            'queue': queue,
        }

        # Full page render for ordinary requests
        if not htmx_partial(request):
            return render(request, 'core/rq_worker_list.html', context)

        # If this is an HTMX request, return only the rendered table HTML
        if not request.htmx.target:
            table.embedded = True
            # Hide selection checkboxes
            if 'pk' in table.base_columns:
                table.columns.hide('pk')
        return render(request, 'htmx/table.html', context)
class WorkerView(BaseRQView):
    """
    Detail view for a single RQ worker.
    """

    def get(self, request, key):
        """
        Render the detail page for the worker stored under 'rq:worker:<key>'.

        Raises Http404 if no worker is found for the key.
        """
        # all the RQ queues should use the same connection
        config = QUEUES_LIST[0]
        worker = Worker.find_by_key('rq:worker:' + key, connection=get_redis_connection(config['connection_config']))

        # find_by_key() yields no worker for an unknown or expired key; answer with a 404
        # rather than failing on the attribute access below.
        if worker is None:
            raise Http404(_("Worker {key} not found").format(key=key))

        # Convert microseconds to milliseconds
        worker.total_working_time = worker.total_working_time / 1000

        return render(request, 'core/rq_worker.html', {
            'worker': worker,
            'job': worker.get_current_job(),
            # The context value is the scaled attribute multiplied back by 1000
            'total_working_time': worker.total_working_time * 1000,
        })
#
# System
#
class SystemView(UserPassesTestMixin, View):
    """
    Show system statistics and the active configuration; restricted to staff users.
    """

    def test_func(self):
        """
        Return whether the requesting user is flagged as staff.
        """
        return self.request.user.is_staff

    def get(self, request):
        """
        Render the system page, or return the same data as a JSON attachment when the
        `export` query parameter is present.
        """
        # System stats
        psql_version = db_name = db_size = None
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT version()")
                psql_version = cursor.fetchone()[0]
                # Keep only the text before the first parenthesis of the version string
                psql_version = psql_version.split('(')[0].strip()
                cursor.execute("SELECT current_database()")
                db_name = cursor.fetchone()[0]
                # Let the database resolve its own name rather than interpolating the name
                # into the SQL text, which breaks on names that contain a quote character.
                cursor.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
                db_size = cursor.fetchone()[0]
        except (ProgrammingError, IndexError):
            # Best effort: any value not collected before the failure stays None
            pass
        stats = {
            'netbox_release': settings.RELEASE,
            'django_version': DJANGO_VERSION,
            'python_version': platform.python_version(),
            'postgresql_version': psql_version,
            'database_name': db_name,
            'database_size': db_size,
            'rq_worker_count': Worker.count(get_connection('default')),
        }

        # Configuration
        try:
            config = ConfigRevision.objects.get(pk=cache.get('config_version'))
        except ConfigRevision.DoesNotExist:
            # Fall back to using the active config data if no record is found
            config = get_config()

        # Raw data export
        if 'export' in request.GET:
            data = {
                **stats,
                # Configuration parameters are emitted in sorted key order
                'config': {
                    k: config.data[k] for k in sorted(config.data)
                },
            }
            response = HttpResponse(json.dumps(data, indent=4), content_type='text/json')
            response['Content-Disposition'] = 'attachment; filename="netbox.json"'
            return response

        return render(request, 'core/system.html', {
            'stats': stats,
            'config': config,
        })
#
# Plugins
#
class PluginListView(UserPassesTestMixin, View):
    """
    List the plugins returned by get_plugins(); restricted to staff users.
    """

    def test_func(self):
        """
        Return whether the requesting user is flagged as staff.
        """
        user = self.request.user
        return user.is_staff

    def get(self, request):
        """
        Render the plugin table, optionally narrowed by the `q` quick-search parameter.
        """
        search_term = request.GET.get('q', None)

        plugins = get_plugins().values()
        if search_term:
            # Case-insensitive substring match against each plugin's short title
            needle = search_term.casefold()
            plugins = [plugin for plugin in plugins if needle in plugin.title_short.casefold()]

        table = CatalogPluginTable(plugins, user=request.user)
        table.configure(request)
        context = {'table': table}

        # If this is an HTMX request, return only the rendered table HTML
        template_name = 'htmx/table.html' if htmx_partial(request) else 'core/plugin_list.html'
        return render(request, template_name, context)
class PluginView(UserPassesTestMixin, View):
    """
    Detail view for a single plugin; restricted to staff users.
    """

    def test_func(self):
        """
        Return whether the requesting user is flagged as staff.
        """
        user = self.request.user
        return user.is_staff

    def get(self, request, name):
        """
        Render the detail page for the plugin called `name`, with a table of its recent releases.

        Raises Http404 if get_plugins() has no entry for `name`.
        """
        available_plugins = get_plugins()
        if name not in available_plugins:
            raise Http404(_("Plugin {name} not found").format(name=name))
        plugin = available_plugins[name]

        release_table = PluginVersionTable(plugin.release_recent_history, user=request.user)
        release_table.configure(request)

        context = {
            'plugin': plugin,
            'table': release_table,
        }
        return render(request, 'core/plugin.html', context)