import base64
from lxml import html
from unittest.mock import patch

from odoo import exceptions
from odoo.tools import mute_logger
from odoo.tests.common import users
from odoo.tests import Form, HttpCase, tagged, warmup
from odoo.addons.mail.tests.common import MailCase
from odoo.addons.marketing_card.controllers.marketing_card import SOCIAL_NETWORK_USER_AGENTS

from .common import MarketingCardCommon, mock_image_render, VALID_JPEG


def _extract_values_from_document(rendered_document):
    return {
        'body': rendered_document.find('.//div[@id="body"]'),
        'header': rendered_document.find('.//span[@id="header"]'),
        'subheader': rendered_document.find('.//span[@id="subheader"]'),
        'section': rendered_document.find('.//span[@id="section"]'),
        'sub_section1': rendered_document.find('.//span[@id="sub_section1"]'),
        'sub_section2': rendered_document.find('.//span[@id="sub_section2"]'),
        'button': rendered_document.find('.//span[@id="button"]'),
        'image1': rendered_document.find('.//img[@id="image1"]'),
        'image2': rendered_document.find('.//img[@id="image2"]'),
    }


class TestMarketingCardMail(MailCase, MarketingCardCommon):

    @users('marketing_card_user')
    @warmup
    @mute_logger('odoo.addons.mail.models.mail_mail')
    def test_campaign_send_mailing(self):
        campaign = self.campaign.with_user(self.env.user)
        self.env.user.sudo().groups_id += self.env.ref('mass_mailing.group_mass_mailing_user')
        partners = self.env['res.partner'].sudo().create([{'name': f'Part{n}', 'email': f'partn{n}@test.lan'} for n in range(7)])
        mailing_context = campaign.action_share().get('context') | {
            'default_email_from': 'test@test.lan',
            'default_mailing_domain': [('id', 'in', partners.ids[:5])],
            'default_reply_to': 'test@test.lan',
        }
        mailing = Form(self.env['mailing.mailing'].with_context(mailing_context)).save()
        mailing.body_html = mailing.body_arch  # normally the js html_field would fill this in

        # sending mailing before generating cards sends to no-one
        self.assertTrue(mailing.card_requires_sync_count)
        with self.assertRaises(exceptions.UserError, msg="There are no recipients selected."):
            mailing._action_send_mail()

        with self.assertRaises(exceptions.UserError, msg="You should update all the cards before scheduling a mailing."):
            mailing.action_launch()

        # once cards are updated they can be sent
        with self.mock_image_renderer():
            mailing.action_update_cards()
        self.assertEqual(len(self._wkhtmltoimage_bodies), 5)

        self.assertFalse(mailing.card_requires_sync_count)
        mailing.action_launch()
        mailing.action_cancel()

        # modifying the domain such that there are missing cards prevents sending again
        mailing.mailing_domain = [('id', 'in', partners.ids[1:6])]
        mailing._compute_card_requires_sync_count()
        self.assertTrue(mailing.card_requires_sync_count)
        with self.assertRaises(exceptions.UserError, msg="You should update all the cards before scheduling a mailing."):
            mailing.action_launch()

        # updating when the campaign was not modified only updates cards that need to be
        with self.mock_image_renderer():
            mailing.action_update_cards()
        self.assertEqual(len(self._wkhtmltoimage_bodies), 1)

        self.assertFalse(mailing.card_requires_sync_count)

        # modifying the campaign should lead to all cards relevant being re-rendered
        campaign.content_header = "New Header"
        mailing._compute_card_requires_sync_count()
        self.assertTrue(mailing.card_requires_sync_count)
        with self.mock_image_renderer():
            mailing.action_update_cards()
        self.assertEqual(len(self._wkhtmltoimage_bodies), 5)

        with self.mock_mail_gateway(), self.assertQueryCount(243):
            mailing._action_send_mail()

        cards = self.env['card.card'].search([('campaign_id', '=', campaign.id)])
        self.assertEqual(len(cards), 6)
        self.assertEqual(len(cards.filtered(lambda card: not card.requires_sync)), 5)
        self.assertEqual(len(self._mails), 5)

        IrHttp = self.env['ir.http']
        for sent_mail in self._mails:
            record_id = int(sent_mail['object_id'].split('-')[0])
            card = cards.filtered(lambda card: card.res_id == record_id)
            self.assertEqual(len(card), 1)
            preview_url = f"{campaign.get_base_url()}/cards/{IrHttp._slug(card)}/preview"
            image_url = f"{campaign.get_base_url()}/cards/{IrHttp._slug(card)}/card.jpg"
            self.assertIn(f'<a href="{preview_url}"', sent_mail['body'])
            self.assertIn(f'<img src="{image_url}"', sent_mail['body'])


class TestMarketingCardRender(MarketingCardCommon):

    @users('marketing_card_user')
    def test_campaign(self):
        campaign = self.campaign.with_user(self.env.user)

        with self.mock_image_renderer():
            campaign.write({
                'content_header': 'Come and See',
                'content_header_dyn': False,
                'content_header_color': '#CC8888',
            })
            self.assertTrue(campaign.image_preview)

        role_values = _extract_values_from_document(html.fromstring(self._wkhtmltoimage_bodies[0]))
        self.assertEqual(role_values['body'].attrib['style'], "background-image: url('data:image/png;base64,');")
        self.assertEqual(role_values['header'].text, 'Come and See')
        self.assertEqual(role_values['header'].attrib['style'], 'color: #CC8888;')
        self.assertEqual(role_values['subheader'].text, 'John')
        self.assertEqual(role_values['section'].text, 'Contact')
        self.assertEqual(role_values['sub_section1'].text, 'john93@trombino.scope')
        self.assertFalse(role_values['sub_section2'])
        self.assertEqual(role_values['button'].text, 'Button')
        self.assertFalse(role_values['image1'])
        self.assertFalse(role_values['image2'])

        campaign.action_preview()
        card = self.env['card.card'].search([
            ('campaign_id', '=', campaign.id),
            ('active', '=', False)
        ])
        self.assertEqual(len(card), 1)
        self.assertTrue(card.image)
        self.assertEqual(card.res_id, self.partners[0].id)

        # second record, modified tags

        with self.mock_image_renderer():
            campaign.preview_record_ref = self.partners[1]
            self.assertTrue(campaign.image_preview)
        role_values = _extract_values_from_document(html.fromstring(self._wkhtmltoimage_bodies[0]))
        self.assertEqual(role_values['body'].attrib['style'], "background-image: url('data:image/png;base64,');")
        self.assertEqual(role_values['subheader'].text, 'Bob')
        self.assertEqual(role_values['sub_section1'].text, 'bob@justbob.me')
        self.assertEqual(role_values['sub_section2'].text, '+32 123 446 789')
        self.assertFalse(role_values['image1'])
        self.assertEqual(role_values['image2'].attrib['src'], f'data:image/png;base64,{base64.b64encode(VALID_JPEG).decode()}')

        campaign.action_preview()
        cards = self.env['card.card'].search([
            ('campaign_id', '=', campaign.id),
            ('active', '=', False)
        ])
        self.assertTrue(cards.mapped('res_id'), self.partners.ids)

        # update previewed record fields

        with self.mock_image_renderer():
            campaign.preview_record_ref.sudo().name = 'An updated name'
        self.assertFalse(self._wkhtmltoimage_bodies, 'Updating the preview record does not refresh the preview.')

        # mismatch preview

        with patch('odoo.addons.marketing_card.models.card_campaign.CardCampaign._get_model_selection',
                   lambda Model: [('res.partner', 'Partner'), ('res.users', 'User')]):

            # mismatches without cards
            self.assertEqual(self.static_campaign.res_model, 'res.partner')
            self.assertFalse(self.static_campaign.card_ids)
            self.static_campaign.preview_record_ref = self.env.user
            self.assertEqual(self.static_campaign.res_model, 'res.users')
            self.static_campaign.preview_record_ref = self.partners[1]
            self.assertEqual(self.static_campaign.res_model, 'res.partner')

            # mismatch with card
            self.env['card.card'].sudo().create({'campaign_id': self.static_campaign.id, 'res_id': 1})

            self.assertTrue(self.static_campaign.card_ids)
            with self.assertRaises(exceptions.ValidationError):
                self.static_campaign.preview_record_ref = self.env.user
                self.assertTrue(self.static_campaign.res_model)
            self.assertEqual(self.static_campaign.res_model, 'res.partner')

            # match with card
            self.static_campaign.preview_record_ref = self.partners[0]
            self.assertEqual(self.static_campaign.res_model, 'res.partner')


@tagged('post_install', '-at_install')
class TestMarketingCardRouting(HttpCase, MarketingCardCommon):

    @mock_image_render
    def test_campaign_stats(self):
        partners = self.env['res.partner'].create([{'name': f'Part{n}', 'email': f'partn{n}@test.lan'} for n in range(20)])
        cards = self.campaign._update_cards([('id', 'in', partners.ids)]).sorted('res_id')
        self.assertEqual(len(cards), 20)
        self.assertEqual(self.campaign.card_count, 20)
        self.assertEqual(self.campaign.card_click_count, 0)
        self.assertEqual(self.campaign.card_share_count, 0)
        self.assertListEqual(cards.mapped('image'), [base64.b64encode(VALID_JPEG)] * 20)
        self.assertListEqual(cards.mapped('share_status'), [False] * 20)
        self.assertListEqual(cards.mapped('requires_sync'), [False] * 20)

        # user checks preview
        self.campaign.preview_record_ref = partners[0]
        card = cards.filtered(lambda card: card.res_id == partners[0].id)
        self.assertEqual(self.campaign.action_preview()['url'], card._get_path('preview'))
        self.url_open(card._get_path('preview'))
        image_request_headers = self.url_open(card._get_card_url()).headers
        self.assertEqual(image_request_headers.get('Content-Type'), 'image/jpeg')
        self.assertTrue(image_request_headers.get('Content-Length'))
        self.assertTrue(card.image)
        self.assertEqual(card.share_status, 'visited')
        self.campaign.flush_recordset()
        self.assertEqual(self.campaign.card_count, 20)
        self.assertEqual(self.campaign.card_click_count, 1)
        self.assertEqual(self.campaign.card_share_count, 0, 'A regular user fetching the card should not count as a share.')

        # user publishes redirect url, prompting social network crawler to check open-graph data
        self.opener.headers['User-Agent'] = f'v1 {SOCIAL_NETWORK_USER_AGENTS[0]} v1.2/'
        opengraph_view = html.fromstring(self.url_open(card._get_redirect_url()).content)
        self.assertTrue(opengraph_view is not None, 'Crawler should get a valid html page as response')
        opengraph_image_url_element = opengraph_view.find('.//meta[@property="og:image"]')
        self.assertTrue(opengraph_image_url_element is not None, 'page should contain image opengraph node')
        opengraph_image_url = opengraph_image_url_element.attrib.get('content')
        self.assertTrue(opengraph_image_url)
        self.assertEqual(opengraph_image_url, card._get_card_url())

        image_request_headers = self.url_open(opengraph_image_url).headers
        self.assertEqual(image_request_headers.get('Content-Type'), 'image/jpeg')
        self.assertTrue(image_request_headers.get('Content-Length'))

        self.campaign.flush_recordset()
        self.assertEqual(self.campaign.card_count, 20)
        self.assertEqual(self.campaign.card_click_count, 1)
        self.assertEqual(self.campaign.card_share_count, 1, "A crawler fetching the card is considered a share.")
        self.assertEqual(cards[0].share_status, 'shared')

        # someone clicks the redirect url on the social network platform
        self.assertEqual(self.campaign.target_url_click_count, 0)
        self.opener.headers['User-Agent'] = 'someuseragent'
        redirect_response = self.url_open(card._get_redirect_url(), allow_redirects=False)
        self.assertEqual(redirect_response.status_code, 303)
        self.assertEqual(redirect_response._next.url, self.campaign.link_tracker_id.short_url)
        self.opener.send(redirect_response._next, allow_redirects=False)
        self.assertEqual(self.campaign.target_url_click_count, 1)

        cards[1:10].share_status = 'visited'
        cards[10:].share_status = 'shared'
        self.assertEqual(self.campaign.card_count, 20)
        self.assertEqual(self.campaign.card_click_count, 20, 'Shared cards are considered implicitly visited')
        self.assertEqual(self.campaign.card_share_count, 11)


class TestMarketingCardSecurity(MarketingCardCommon):

    @users('marketing_card_manager')
    @mute_logger('odoo.addons.mail.models.mail_render_mixin')
    def test_campaign_field_paths(self):
        """Check that card updates are performed as the current user."""
        # restrict reading from partner titles (flush to apply new rule)
        rules = self.env['ir.rule'].sudo().create([{
            'name': 'marketing card user read partner title',
            'domain_force': repr([(0, '=', 1)]),
            'groups': self.env.ref('marketing_card.marketing_card_group_user').ids,
            'model_id': self.env['ir.model']._get_id('res.partner.title'),
            'perm_read': True,
        }, {
            'name': 'system user read partner title',
            'domain_force': repr([(1, '=', 1)]),
            'groups': self.env.ref('base.group_system').ids,
            'model_id': self.env['ir.model']._get_id('res.partner.title'),
            'perm_read': True,
        }])
        rules.flush_recordset()
        # set a title as sudo and invalidate to force fetch as test user
        self.marketing_card_user.partner_id.title = self.env['res.partner.title'].sudo().create({
            'name': 'test marketing card title',
        })
        self.marketing_card_user.partner_id.title.invalidate_recordset()

        campaign = self.campaign.with_user(self.env.user)
        campaign.preview_record_ref = self.marketing_card_user.partner_id
        # should work fine with accessible fields
        campaign._update_cards([('id', '=', self.marketing_card_user.partner_id.id)])
        with self.assertRaises(exceptions.UserError):
            campaign.write({
                'content_header_dyn': True,
                'content_header_path': 'title.name',
            })
            # flush to compute image_preview
            campaign.flush_recordset()

        campaign.with_user(self.system_admin).write({
            'content_header_dyn': True,
            'content_header_path': 'title.name',
        })
        campaign.with_user(self.system_admin).flush_recordset()
        # clear title from cache as it was fetched by the admin for the preview render
        self.marketing_card_user.partner_id.title.invalidate_recordset()

        with self.assertRaises(exceptions.UserError), self.mock_image_renderer():
            campaign._update_cards([('id', '=', self.marketing_card_user.partner_id.id)])
        self.assertFalse(self._wkhtmltoimage_bodies, 'There should have been no render on illegal fields')

        with self.mock_image_renderer():
            campaign.with_user(self.system_admin)._update_cards([('id', '=', self.marketing_card_user.partner_id.id)])
        self.assertIn('test marketing card title', self._wkhtmltoimage_bodies[0])

    def test_campaign_ownership(self):
        campaign_as_manager = self.campaign.with_user(self.marketing_card_manager)
        campaign_as_owner = self.campaign.with_user(self.marketing_card_user)
        campaign_as_other = self.campaign.with_user(self.marketing_card_user_2)

        with self.assertRaises(exceptions.AccessError):
            campaign_as_other.content_header = 'Hello'
        campaign_as_owner.content_header = 'Hi'
        campaign_as_manager.content_header = 'Hoy'

        with self.assertRaises(exceptions.AccessError):
            campaign_as_other.unlink()
        campaign_as_owner.unlink()

    def test_mail_render_security_body_html(self):
        """Asserts body_html of card.campaign cannot be written to.

        See _check_access_right_dynamic_template override.
        """
        campaign = self.campaign.with_user(self.marketing_card_manager)
        arbitrary_qweb = """
        <img t-attf-src="data:image/png;base64,{{object.env.ref('base.user_admin').sudo().image_128}}"/>
        """

        campaign.body_html = arbitrary_qweb
        # Normally, this should raise an AccessError, as `body_html` is a related to `card_template_id.body`
        # and the user does not have the write rights on `card.template`.
        # However, the ORM doesn't forward the new value to the related,
        # and the new value is put in the cache without error.
        # In the real world, using the web client or the XMLRPC API,
        # a user would have to do this operation in two requests:
        # First set the body_html on the campaign,
        # Then trigger the render (e.g. with `action_update_cards`),
        # and the cache would have changed between the two operations, hence setting an arbitrary value on body_html
        # on a campaign wouldn't work.
        # Just ensure that the value is well not written in db, nor on the current campaign, nor on the related.
        # Force a cache invalidation to force a re-fetch from database
        campaign.invalidate_recordset(fnames=['body_html'])
        self.assertTrue(arbitrary_qweb not in campaign.body_html)
        self.assertTrue(arbitrary_qweb not in campaign.card_template_id.body)

        with self.assertRaisesRegex(exceptions.AccessError, 'You are not allowed to modify'):
            campaign.card_template_id.body = arbitrary_qweb

    def test_mail_render_security_render_field_write_access(self):
        """Check the rendered fields on card.campaign are not both rendered and writeable.

        See _check_access_right_dynamic_template override.
        """
        CardCampaign = self.env['card.campaign'].with_user(self.marketing_card_manager)
        # Asserts all render fields are related to card.template, not stored on the campaign itself, and readonly
        # If one of the render fields doesn't fulfil this assumption, the `_unrestricted_rendering = True` must be
        # reconsidered for security reasons.
        self.assertTrue(
            all(
                field.related_field.model_name == 'card.template'
                and not field.store
                and field.readonly
                for field in CardCampaign._fields.values() if hasattr(field, 'render_engine')
            )
        )
        # Asserts the manager doesn't have write access to card.template
        self.assertFalse(CardCampaign.card_template_id.has_access('write'))
